Commit 613205dc authored by Ivan Tyagov's avatar Ivan Tyagov

Add a new mode of operation which will disable camera integration

(for testing purposes).
parent 78f6249b
...@@ -40,6 +40,7 @@ a('--ipv4', help='The IPv4 address on which the OPC UA server runs', default="0. ...@@ -40,6 +40,7 @@ a('--ipv4', help='The IPv4 address on which the OPC UA server runs', default="0.
a('--port', help='The port on which the OPC UA server runs', default="4840") a('--port', help='The port on which the OPC UA server runs', default="4840")
a('--camera', help='The index of the camera (i.e. indxed in /dev/videoX)', default=0) a('--camera', help='The index of the camera (i.e. indxed in /dev/videoX)', default=0)
a('--headless', help='Run without screen in a headless mode (boolean, default=0)', default=False) a('--headless', help='Run without screen in a headless mode (boolean, default=0)', default=False)
a('--mode', help='Mode of operation. The default is to read data from camera device. If set to 1 do NOT read data from camera device. (default=0)', default=0)
# XXX: allow to specify from CLI DEFAULT_LH & friends # XXX: allow to specify from CLI DEFAULT_LH & friends
args = parser.parse_args() args = parser.parse_args()
...@@ -47,6 +48,7 @@ ipv4 = args.ipv4 ...@@ -47,6 +48,7 @@ ipv4 = args.ipv4
port = args.port port = args.port
camera = int(args.camera) camera = int(args.camera)
headless = bool(int(args.headless)) headless = bool(int(args.headless))
mode = bool(int(args.mode))
def nothing(x): def nothing(x):
# any operation # any operation
...@@ -74,20 +76,21 @@ async def main(): ...@@ -74,20 +76,21 @@ async def main():
result_stack = [] result_stack = []
current_shape = 0.0 current_shape = 0.0
# init camera if not mode:
cap = cv2.VideoCapture(camera) # init camera
font = cv2.FONT_HERSHEY_COMPLEX cap = cv2.VideoCapture(camera)
if not headless: font = cv2.FONT_HERSHEY_COMPLEX
# create UI if not headless:
cv2.namedWindow("Trackbars") # create UI
cv2.createTrackbar("L-H", "Trackbars", DEFAULT_LH[0], DEFAULT_LH[1], nothing) cv2.namedWindow("Trackbars")
cv2.createTrackbar("L-S", "Trackbars", DEFAULT_LS[0], DEFAULT_LS[1], nothing) cv2.createTrackbar("L-H", "Trackbars", DEFAULT_LH[0], DEFAULT_LH[1], nothing)
cv2.createTrackbar("L-V", "Trackbars", DEFAULT_LV[0], DEFAULT_LV[1], nothing) cv2.createTrackbar("L-S", "Trackbars", DEFAULT_LS[0], DEFAULT_LS[1], nothing)
cv2.createTrackbar("U-H", "Trackbars", DEFAULT_UH[0], DEFAULT_UH[1], nothing) cv2.createTrackbar("L-V", "Trackbars", DEFAULT_LV[0], DEFAULT_LV[1], nothing)
cv2.createTrackbar("U-S", "Trackbars", DEFAULT_US[0], DEFAULT_US[1], nothing) cv2.createTrackbar("U-H", "Trackbars", DEFAULT_UH[0], DEFAULT_UH[1], nothing)
cv2.createTrackbar("U-V", "Trackbars", DEFAULT_UV[0], DEFAULT_UV[1], nothing) cv2.createTrackbar("U-S", "Trackbars", DEFAULT_US[0], DEFAULT_US[1], nothing)
cv2.createTrackbar("Area (min)", "Trackbars", DEFAULT_AREA[0], DEFAULT_AREA[1], nothing) cv2.createTrackbar("U-V", "Trackbars", DEFAULT_UV[0], DEFAULT_UV[1], nothing)
cv2.createTrackbar("Area (max)", "Trackbars", DEFAULT_AREA[1], DEFAULT_AREA[1], nothing) cv2.createTrackbar("Area (min)", "Trackbars", DEFAULT_AREA[0], DEFAULT_AREA[1], nothing)
cv2.createTrackbar("Area (max)", "Trackbars", DEFAULT_AREA[1], DEFAULT_AREA[1], nothing)
_logger.info("Starting server!") _logger.info("Starting server!")
async with server: async with server:
...@@ -97,98 +100,99 @@ async def main(): ...@@ -97,98 +100,99 @@ async def main():
# recognition. Thus give (roughly) some CPU time so both can work together. # recognition. Thus give (roughly) some CPU time so both can work together.
await asyncio.sleep(SLEEP_DURATION) await asyncio.sleep(SLEEP_DURATION)
before = time.time() * 1000 if not mode:
# read and process camera before = time.time() * 1000
_, frame = cap.read() # read and process camera
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) _, frame = cap.read()
if not headless: hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
l_h = cv2.getTrackbarPos("L-H", "Trackbars") if not headless:
l_s = cv2.getTrackbarPos("L-S", "Trackbars") l_h = cv2.getTrackbarPos("L-H", "Trackbars")
l_v = cv2.getTrackbarPos("L-V", "Trackbars") l_s = cv2.getTrackbarPos("L-S", "Trackbars")
u_h = cv2.getTrackbarPos("U-H", "Trackbars") l_v = cv2.getTrackbarPos("L-V", "Trackbars")
u_s = cv2.getTrackbarPos("U-S", "Trackbars") u_h = cv2.getTrackbarPos("U-H", "Trackbars")
u_v = cv2.getTrackbarPos("U-V", "Trackbars") u_s = cv2.getTrackbarPos("U-S", "Trackbars")
designated_area_min = cv2.getTrackbarPos("Area (min)", "Trackbars") u_v = cv2.getTrackbarPos("U-V", "Trackbars")
designated_area_max = cv2.getTrackbarPos("Area (max)", "Trackbars") designated_area_min = cv2.getTrackbarPos("Area (min)", "Trackbars")
else: designated_area_max = cv2.getTrackbarPos("Area (max)", "Trackbars")
# read defaults provided else:
l_h = DEFAULT_LH[0] # read defaults provided
l_s = DEFAULT_LS[0] l_h = DEFAULT_LH[0]
l_v = DEFAULT_LV[0] l_s = DEFAULT_LS[0]
u_h = DEFAULT_UH[0] l_v = DEFAULT_LV[0]
u_s = DEFAULT_US[0] u_h = DEFAULT_UH[0]
u_v = DEFAULT_UV[0] u_s = DEFAULT_US[0]
designated_area_min = DEFAULT_AREA[0] u_v = DEFAULT_UV[0]
designated_area_max = DEFAULT_AREA[1] designated_area_min = DEFAULT_AREA[0]
designated_area_max = DEFAULT_AREA[1]
lower_red = np.array([l_h, l_s, l_v])
upper_red = np.array([u_h, u_s, u_v]) lower_red = np.array([l_h, l_s, l_v])
upper_red = np.array([u_h, u_s, u_v])
mask = cv2.inRange(hsv, lower_red, upper_red)
kernel = np.ones((5, 5), np.uint8) mask = cv2.inRange(hsv, lower_red, upper_red)
mask = cv2.erode(mask, kernel) kernel = np.ones((5, 5), np.uint8)
mask = cv2.erode(mask, kernel)
# Contours detection
contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) # Contours detection
now = time.time() * 1000 contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
diff = (now-before) now = time.time() * 1000
#print("Processing time = %.2f ms, countours = %d" %(diff, len(contours))) diff = (now-before)
contour_detected = False #print("Processing time = %.2f ms, countours = %d" %(diff, len(contours)))
for cnt in contours: contour_detected = False
area = cv2.contourArea(cnt) for cnt in contours:
approx = cv2.approxPolyDP(cnt, 0.02*cv2.arcLength(cnt, True), True) area = cv2.contourArea(cnt)
x = approx.ravel()[0] approx = cv2.approxPolyDP(cnt, 0.02*cv2.arcLength(cnt, True), True)
y = approx.ravel()[1] x = approx.ravel()[0]
number_of_points = len(approx) y = approx.ravel()[1]
if area > designated_area_min and area < designated_area_max: number_of_points = len(approx)
contour_detected = True if area > designated_area_min and area < designated_area_max:
if not headless: contour_detected = True
cv2.drawContours(frame, [approx], 0, (0, 0, 0), 5)
if number_of_points == 3:
if not headless: if not headless:
cv2.putText(frame, "Triangle", (x, y), font, 1, (0, 0, 0)) cv2.drawContours(frame, [approx], 0, (0, 0, 0), 5)
result = 1.0 if number_of_points == 3:
elif number_of_points == 4: if not headless:
if not headless: cv2.putText(frame, "Triangle", (x, y), font, 1, (0, 0, 0))
cv2.putText(frame, "Rectangle", (x, y), font, 1, (0, 0, 0)) result = 1.0
result = 2.0 elif number_of_points == 4:
elif 7 < number_of_points < 20: if not headless:
if not headless: cv2.putText(frame, "Rectangle", (x, y), font, 1, (0, 0, 0))
cv2.putText(frame, "Circle (%s)" %number_of_points, (x, y), font, 1, (0, 0, 0)) result = 2.0
result = 3.0 elif 7 < number_of_points < 20:
if not headless:
# printout cv2.putText(frame, "Circle (%s)" %number_of_points, (x, y), font, 1, (0, 0, 0))
#print("\tDetected area (px)=%.2f, result=%d, shape changes=%d" %(area, result, shape_change_counter)) result = 3.0
# update list for last X results (FILO) # printout
#print("\tDetected area (px)=%.2f, result=%d, shape changes=%d" %(area, result, shape_change_counter))
# update list for last X results (FILO)
if current_shape != result:
result_stack.append(result)
current_shape = result
shape_change_counter += 1
await myvar.write_value(result)
# break current countour detection loop as in this example we care
# for first detected SHAPE, we do not expect more shapes
break
if not contour_detected:
# no countours actually detected thus update OPC UA server's node attribute
result = 0.0
if current_shape != result: if current_shape != result:
result_stack.append(result) result_stack.append(result)
current_shape = result
shape_change_counter += 1 shape_change_counter += 1
await myvar.write_value(result) await myvar.write_value(result)
current_shape = result
# break current countour detection loop as in this example we care #print("\tNo shape detected, result=%d, shape changes=%d" %(result, shape_change_counter))
# for first detected SHAPE, we do not expect more shapes
break # show current MASK and camera output windows
if not headless:
if not contour_detected: cv2.imshow("Frame", frame)
# no countours actually detected thus update OPC UA server's node attribute cv2.imshow("Mask", mask)
result = 0.0 # stop iteration if key pressed in a non headless mode
if current_shape != result: key = cv2.waitKey(1)
result_stack.append(result) if key == 27:
shape_change_counter += 1 break
await myvar.write_value(result)
current_shape = result
#print("\tNo shape detected, result=%d, shape changes=%d" %(result, shape_change_counter))
# show current MASK and camera output windows
if not headless:
cv2.imshow("Frame", frame)
cv2.imshow("Mask", mask)
# stop iteration if key pressed in a non headless mode
key = cv2.waitKey(1)
if key == 27:
break
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.ERROR) logging.basicConfig(level=logging.ERROR)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment