Skip to content
Snippets Groups Projects
Commit 6c6ee2f4 authored by Tommy Persson's avatar Tommy Persson
Browse files

Work on qr code.

parent 02a0cb55
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
import rospy
import time
import os
import qrcode
import numpy
from PIL import Image as PilImage
from optparse import OptionParser
import cv2
from sensor_msgs.msg import Image, CompressedImage
from cv_bridge import CvBridge, CvBridgeError
import numpy as np
parser = OptionParser()
parser.add_option ("", "--image-topic", action="store", type="string", dest="image_topic", help="Topic to take image from", default="/dji21/camera0/image_raw_orig")
parser.add_option ("", "--compressed", action="store_true", dest="compressed", help="If true image in is compressed.")
#parser.add_option ("", "--fifo", action="store", type="string", dest="fifo", help="Fifo to write raw images to", default="/tmp/fifo_dji21")
parser.add_option ("", "--test", action="store_true", dest="test", help="Test mode.")
parser.add_option ("", "--qr", action="store_true", dest="qr", help="Time encoding qr code.")
(options, args) = parser.parse_args()
def image_callback(data):
global fifo
try:
# print("image_callback_main")
if options.compressed:
img = bridge.compressed_imgmsg_to_cv2(data, desired_encoding='bgr8')
else:
img = bridge.imgmsg_to_cv2(data, desired_encoding='bgr8')
scaledimg = cv2.resize(img, (1024, 600))
if options.qr:
# print("SCALEDIMG:", scaledimg.shape, scaledimg.dtype)
frame_pil = PilImage.fromarray(scaledimg)
timestamp = time.time()
qr = qrcode.QRCode(box_size=2)
qr.add_data(str(timestamp))
qr.make()
img_qr = qr.make_image()
pos = (frame_pil.size[0] - img_qr.size[0] - 1, 1) #better result when 1 pixel in, thus -1 and +1
# print("POS QR:", pos)
frame_pil.paste(img_qr, pos)
final_image = np.asarray(frame_pil)
else:
final_image = scaledimg
# print("FINALIMAGE:", final_image.shape, final_image.dtype)
#gray = numpy.array(img_qr).astype(numpy.float32)
#print("GRAY:", gray.shape, gray.dtype)
#cv_img_qr = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
#print("CV_IMG_QR:", cv_img_qr.shape, cv_img_qr.dtype)
# gray = cv2. cvtColor(image, cv2. COLOR_BGR2GRAY)
# x_offset=y_offset=50
# scaledimg[y_offset:y_offset+cv_img_qr.shape[0], x_offset:x_offset+cv_img_qr.shape[1]] = cv_img_qr
if options.test:
cv2.imshow("image", final_image)
cv2.waitKey(1)
if options.compressed:
msg = bridge.cv2_to_compressed_imgmsg(final_image)
image_pub.publish(msg)
#retval, points = detector.detect(final_image)
#print("DETECT:", retval, points)
#qr_timestamp, _ , _ = detector.detectAndDecode(final_image)
#print("QR_TIMESTAMP:", qr_timestamp)
except Exception as exc:
print("image_callback exception:", exc)
fifo.close()
fifo = open(options.fifo, 'wb')
print(f'FIFO RE-OPENED: {options.fifo}')
if __name__ == "__main__":
rospy.init_node ("add_qr")
# detector = cv2.QRCodeDetector()
#try:
# os.mkfifo(options.fifo)
#except Exception as exc:
# print(f'{options.fifo} exist:', exc)
#if not options.test:
# fifo = open(options.fifo, 'wb')
# print(f'FIFO OPENED: {options.fifo}')
print("Create cv bridge")
bridge = CvBridge()
if options.compressed:
in_topic = f'{options.image_topic}/compressed'
out_topic = f'{options.image_topic}_qr/compressed'
else:
in_topic = f'{options.image_topic}'
out_topic = f'{options.image_topic}_qr'
print("in_topic:", in_topic)
print("out_topic:", out_topic)
if options.compressed:
image_sub = rospy.Subscriber(in_topic, CompressedImage, image_callback)
image_pub = rospy.Publisher(out_topic, CompressedImage, queue_size=10)
else:
image_sub = rospy.Subscriber(in_topic, Image, image_callback)
image_pub = rospy.Publisher(out_topic, Image, queue_size=10)
rospy.spin()
...@@ -110,7 +110,7 @@ def cleanup_tst(task): ...@@ -110,7 +110,7 @@ def cleanup_tst(task):
for child in task["children"]: for child in task["children"]:
cleanup_tst(child) cleanup_tst(child)
def do_tst_command(jobj, root_uuid=str(uuid.uuid4())): def do_tst_command(jobj, root_uuid=str(uuid.uuid4()), ns="/op0"):
print(jobj) print(jobj)
if len(jobj["children"]) != 1: if len(jobj["children"]) != 1:
print("ERROR: Not a simple TST") print("ERROR: Not a simple TST")
...@@ -122,9 +122,8 @@ def do_tst_command(jobj, root_uuid=str(uuid.uuid4())): ...@@ -122,9 +122,8 @@ def do_tst_command(jobj, root_uuid=str(uuid.uuid4())):
termobj["common_params"]["node-uuid"] = str(uuid.uuid4()) termobj["common_params"]["node-uuid"] = str(uuid.uuid4())
signalobj = None signalobj = None
print(termobj) print(termobj)
ns = "/op0"
if "node-uuid" in termobj["common_params"]: if "node-uuid" in termobj["common_params"]:
signalobj = SignalNode(termobj["common_params"]["node-uuid"], "op0", tstobj["common_params"]["node-uuid"]) signalobj = SignalNode(termobj["common_params"]["node-uuid"], ns.lstrip("/"), tstobj["common_params"]["node-uuid"])
mobj = {} mobj = {}
mobj["com-uuid"] = str(uuid.uuid4()) mobj["com-uuid"] = str(uuid.uuid4())
mobj["command"] = "start-tst" mobj["command"] = "start-tst"
...@@ -135,12 +134,11 @@ def do_tst_command(jobj, root_uuid=str(uuid.uuid4())): ...@@ -135,12 +134,11 @@ def do_tst_command(jobj, root_uuid=str(uuid.uuid4())):
mobj["tst"] = tstobj mobj["tst"] = tstobj
mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': ')) mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': '))
#ns = jobj["common_params"]["execunit"] #ns = jobj["common_params"]["execunit"]
ns = "/op0"
topic = "tst/command" topic = "tst/command"
print(mstr) print(mstr)
print("TOPIC:", topic) print("TOPIC:", topic)
try: try:
client = rospy.ServiceProxy(f'{ns}/send_topic_msg_to_unit', SendTopicMsgToUnit) client = rospy.ServiceProxy(f'/op0/send_topic_msg_to_unit', SendTopicMsgToUnit)
resp = client(ns.lstrip("/"), topic, mstr) resp = client(ns.lstrip("/"), topic, mstr)
print("RESP:", resp) print("RESP:", resp)
except rospy.ServiceException as e: except rospy.ServiceException as e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment