In many cities around the world, especially in developing countries such as India, traffic congestion has become a growing concern. Because of the facilities available in urban areas, people are rapidly migrating from rural to urban areas. As the number of residents moving to cities continues to grow, the number of vehicles on the road has risen drastically. In large cities, this results in hours-long road congestion.
To tackle this issue, there is an urgent need to monitor the number of vehicles passing through each road every day. Analyzing this data will enable the government and authorities to divert traffic appropriately and prevent congestion.
In this article we will create a simple vehicle detection system based on Computer Vision technologies.
To test our system, we will use the following YouTube video.
https://www.youtube.com/watch?v=PJ5xXXcfuTc
This is a demonstration video of the Alibi ALI-IPU3030RV 3.0 megapixel bullet IP camera monitoring highway traffic.
The system will contain two files namely 'vehicles.py' and 'main.py'.
Save the below code in the file named 'vehicles.py'.
from random import randint
class Car:
    """One tracked vehicle: centroid history, travel direction and age.

    The main loop creates a ``Car`` when a new foreground blob appears and
    calls :meth:`updateCoords` each frame the blob is matched again.
    ``state`` is ``'0'`` until the car crosses a counting line, then ``'1'``;
    ``dir`` records which way it crossed.
    """

    # NOTE(review): class-level attribute is immediately shadowed by the
    # instance attribute set in __init__; kept for interface compatibility.
    tracks = []

    def __init__(self, i, xi, yi, max_age):
        self.i = i              # unique track id
        self.x = xi             # current centroid x
        self.y = yi             # current centroid y
        self.tracks = []        # history of previous [x, y] centroids
        # Random colour used when drawing this car's id on the frame.
        self.R = randint(0, 255)
        self.G = randint(0, 255)
        self.B = randint(0, 255)
        self.done = False       # True once the track should be discarded
        self.state = '0'        # '0' = not yet counted, '1' = counted
        self.age = 0            # frames since the last matched detection
        self.max_age = max_age  # unmatched frames allowed before timing out
        self.dir = None         # 'up' / 'down' once a counting line is crossed

    def getRGB(self):
        """Return this car's (R, G, B) drawing colour."""
        return (self.R, self.G, self.B)

    def getTracks(self):
        """Return the list of previous [x, y] centroids."""
        return self.tracks

    def getId(self):
        """Return the unique track id."""
        return self.i

    def getState(self):
        """Return '0' (not counted yet) or '1' (already counted)."""
        return self.state

    def getDir(self):
        """Return 'up', 'down' or None if no line was crossed yet."""
        return self.dir

    def getX(self):
        """Return the current centroid x coordinate."""
        return self.x

    def getY(self):
        """Return the current centroid y coordinate."""
        return self.y

    def updateCoords(self, xn, yn):
        """Record the current position in the history and move to (xn, yn)."""
        self.age = 0  # a fresh match resets the timeout counter
        self.tracks.append([self.x, self.y])
        self.x = xn
        self.y = yn

    def setDone(self):
        """Mark this track for removal."""
        self.done = True

    def timedOut(self):
        """Return True once the track has been marked done."""
        return self.done

    def going_UP(self, mid_start, mid_end):
        """Return True the first time the car crosses *mid_end* moving up.

        BUG FIX: the original assigned a bare local ``state = '1'`` instead of
        ``self.state``, so the car was never marked as counted and could be
        counted again on every subsequent frame.
        """
        if len(self.tracks) < 2 or self.state != '0':
            return False
        if self.tracks[-1][1] < mid_end and self.tracks[-2][1] >= mid_end:
            self.state = '1'
            self.dir = 'up'
            return True
        return False

    def going_DOWN(self, mid_start, mid_end):
        """Return True the first time the car crosses *mid_start* moving down.

        BUG FIX: the original assigned ``start = '1'`` (a typo for
        ``self.state = '1'``), with the same double-counting effect as in
        ``going_UP``.
        """
        if len(self.tracks) < 2 or self.state != '0':
            return False
        if self.tracks[-1][1] > mid_start and self.tracks[-2][1] <= mid_start:
            self.state = '1'
            self.dir = 'down'
            return True
        return False

    def age_one(self):
        """Age the track by one frame; mark it done past max_age.

        Returns True when the track has just aged out, else False (the
        original returned None on the not-aged-out path).
        """
        self.age += 1
        if self.age > self.max_age:
            self.done = True
            return True
        return False
class MultiCar:
    """A cluster of several cars that are tracked together as one object."""

    def __init__(self, cars, xi, yi):
        # The member cars and the cluster's current centroid.
        self.cars = cars
        self.x = xi
        self.y = yi
        # Centroid history, drawing colour and lifecycle flag — mirrors Car.
        self.tracks = []
        self.R, self.G, self.B = (randint(0, 255) for _ in range(3))
        self.done = False
And save the below code in the file named 'main.py'.
import cv2
import numpy as np
import vehicles
import time
# Vehicle counting script: read a traffic video, segment moving blobs with
# MOG2 background subtraction, track them with vehicles.Car, and count the
# cars crossing two virtual lines (one per direction of travel).

cnt_up = 0    # vehicles counted moving up
cnt_down = 0  # vehicles counted moving down

cap = cv2.VideoCapture("traffic_camera.m4v")

# Frame geometry; cap.get(3) / cap.get(4) are frame width / height.
frame_w = cap.get(3)
frame_h = cap.get(4)
frameArea = frame_h * frame_w
areaTH = frameArea / 400  # minimum contour area accepted as a vehicle

# Counting lines and outer limits, placed at fractions of the frame height.
line_up = int(2 * (frame_h / 5))
line_down = int(3 * (frame_h / 5))
up_limit = int(1 * (frame_h / 5))
down_limit = int(4 * (frame_h / 5))

print("Red line y:", str(line_down))
print("Blue line y:", str(line_up))
line_down_color = (255, 0, 0)
line_up_color = (255, 0, 255)

# Polyline endpoints for the four horizontal guide lines.
pts_L1 = np.array([[0, line_down], [frame_w, line_down]], np.int32).reshape((-1, 1, 2))
pts_L2 = np.array([[0, line_up], [frame_w, line_up]], np.int32).reshape((-1, 1, 2))
pts_L3 = np.array([[0, up_limit], [frame_w, up_limit]], np.int32).reshape((-1, 1, 2))
pts_L4 = np.array([[0, down_limit], [frame_w, down_limit]], np.int32).reshape((-1, 1, 2))

fgbg = cv2.createBackgroundSubtractorMOG2(detectShadows=True)

# Morphology kernels.  BUG FIX: the close kernel was created with np.uint
# (platform 64-bit) and re-cast to float32 on every frame; OpenCV expects a
# uint8 kernel.  The unused 5x5 kernel (kernalOp2) was removed.
kernalOp = np.ones((3, 3), np.uint8)
kernalCl = np.ones((11, 11), np.uint8)

font = cv2.FONT_HERSHEY_SIMPLEX
cars = []        # active vehicles.Car tracks
max_p_age = 5    # frames a track may go unmatched before being dropped
pid = 1          # next track id to assign

while cap.isOpened():
    ret, frame = cap.read()
    # BUG FIX: the original ran the background subtractor before checking
    # ret, so it crashed on the None frame at end of video.
    if not ret:
        break

    # Age every track; unmatched tracks eventually time out.
    for car in cars:
        car.age_one()

    # BUG FIX: fgbg.apply() was called twice per frame on the same image,
    # which updates the background model twice as fast; once is enough
    # (the second mask was thresholded into mask2 and never used).
    fgmask = fgbg.apply(frame)

    # Keep only confident foreground (MOG2 marks shadows as 127, below 200).
    _, imBin = cv2.threshold(fgmask, 200, 255, cv2.THRESH_BINARY)
    mask = cv2.morphologyEx(imBin, cv2.MORPH_OPEN, kernalOp)   # remove speckle
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernalCl)   # fill holes

    countours0, hierarchy = cv2.findContours(
        mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    for cnt in countours0:
        area = cv2.contourArea(cnt)
        if area <= areaTH:
            continue

        m = cv2.moments(cnt)
        cx = int(m['m10'] / m['m00'])
        cy = int(m['m01'] / m['m00'])
        # BUG FIX: the bounding box was unpacked into w/h, clobbering the
        # frame width/height read earlier from the capture.
        bx, by, bw, bh = cv2.boundingRect(cnt)

        new = True
        if up_limit <= cy < down_limit:
            # Try to match the blob to an existing track.
            for car in cars:
                if abs(bx - car.getX()) <= bw and abs(by - car.getY()) <= bh:
                    new = False
                    car.updateCoords(cx, cy)
                    if car.going_UP(line_down, line_up):
                        cnt_up += 1
                        print("ID:", car.getId(),
                              'crossed going up at', time.strftime("%c"))
                    elif car.going_DOWN(line_down, line_up):
                        cnt_down += 1
                        # BUG FIX: this message said "going up" for cars
                        # counted in the downward direction.
                        print("ID:", car.getId(),
                              'crossed going down at', time.strftime("%c"))
                    break

            # Retire counted tracks that left the counting band, then drop
            # all timed-out tracks.  BUG FIX: the original popped from
            # `cars` while iterating it, which skips elements; rebuilding
            # the list is safe.
            for car in cars:
                if car.getState() == '1':
                    if car.getDir() == 'down' and car.getY() > down_limit:
                        car.setDone()
                    elif car.getDir() == 'up' and car.getY() < up_limit:
                        car.setDone()
            cars = [car for car in cars if not car.timedOut()]

            if new:
                cars.append(vehicles.Car(pid, cx, cy, max_p_age))
                pid += 1

        cv2.circle(frame, (cx, cy), 5, (0, 0, 255), -1)
        cv2.rectangle(frame, (bx, by), (bx + bw, by + bh), (0, 255, 0), 2)

    # Draw each track's id in its own colour.
    for car in cars:
        cv2.putText(frame, str(car.getId()), (car.getX(), car.getY()),
                    font, 0.3, car.getRGB(), 1, cv2.LINE_AA)

    str_up = 'UP: ' + str(cnt_up)
    str_down = 'DOWN: ' + str(cnt_down)
    frame = cv2.polylines(frame, [pts_L1], False, line_down_color, thickness=2)
    frame = cv2.polylines(frame, [pts_L2], False, line_up_color, thickness=2)
    frame = cv2.polylines(frame, [pts_L3], False, (255, 255, 255), thickness=1)
    frame = cv2.polylines(frame, [pts_L4], False, (255, 255, 255), thickness=1)
    # Each counter is drawn twice: a thick white pass as an outline, then a
    # thin coloured pass on top.
    cv2.putText(frame, str_up, (10, 40), font, 0.5,
                (255, 255, 255), 2, cv2.LINE_AA)
    cv2.putText(frame, str_up, (10, 40), font,
                0.5, (0, 0, 255), 1, cv2.LINE_AA)
    cv2.putText(frame, str_down, (10, 90), font,
                0.5, (255, 255, 255), 2, cv2.LINE_AA)
    cv2.putText(frame, str_down, (10, 90), font,
                0.5, (255, 0, 0), 1, cv2.LINE_AA)

    cv2.imshow('Frame', frame)
    if cv2.waitKey(1) & 0xff == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
The 'main.py' file uses the YouTube video mentioned above which is named as 'traffic_camera.m4v'.
Comments