Skip to content
Snippets Groups Projects
Commit e1d3def6 authored by Tommy Persson's avatar Tommy Persson
Browse files

waraps ui

parent 9886c68a
No related branches found
No related tags found
No related merge requests found
import rospy
import uuid
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List
from lrs_msgs_common.srv import *
@dataclass
class Signal:
task_uuid: str
ns: str
def signal(self, signal):
mobj = {}
mobj["com-uuid"] = str(uuid.uuid4())
mobj["command"] = "signal-task"
mobj["sender"] = "commander"
mobj["signal"] = signal
mobj["task-uuid"] = self.task_uuid
mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': '))
topic = "exec/command"
try:
client = rospy.ServiceProxy('send_topic_msg_to_unit', SendTopicMsgToUnit)
resp = client(self.ns.lstrip("/"), topic, mstr)
print("RESP:", resp)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
@dataclass
class SignalNode:
node_uuid: str
ns: str
tree_uuid: str
def signal(self, signal):
mobj = {}
mobj["com-uuid"] = str(uuid.uuid4())
mobj["command"] = "signal-tst-node"
mobj["sender"] = "commander"
mobj["signal"] = signal
mobj["node-uuid"] = self.node_uuid
mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': '))
topic = "tst/command"
ns = "/op0"
print(ns, topic, mstr)
try:
client = rospy.ServiceProxy(f'{ns}/send_topic_msg_to_unit', SendTopicMsgToUnit)
resp = client(self.ns.lstrip("/"), topic, mstr)
print("RESP:", resp)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
def signal_tree(self, signal):
mobj = {}
mobj["com-uuid"] = str(uuid.uuid4())
mobj["command"] = "signal-tst-tree"
mobj["sender"] = "commander"
mobj["signal"] = signal
mobj["node-uuid"] = self.tree_uuid
mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': '))
topic = "tst/command"
ns = "/op0"
print(ns, topic, mstr)
try:
client = rospy.ServiceProxy(f'{ns}/send_topic_msg_to_unit', SendTopicMsgToUnit)
resp = client(self.ns.lstrip("/"), topic, mstr)
print("RESP:", resp)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
def signal_unit(self, unit, signal):
mobj = {}
mobj["com-uuid"] = str(uuid.uuid4())
mobj["command"] = "signal-unit"
mobj["sender"] = "commander"
mobj["signal"] = signal
mobj["unit"] = unit
mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': '))
topic = "tst/command"
ns = "/op0"
print(ns, topic, mstr)
try:
client = rospy.ServiceProxy(f'{ns}/send_topic_msg_to_unit', SendTopicMsgToUnit)
resp = client(self.ns.lstrip("/"), topic, mstr)
print("RESP:", resp)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
def cleanup_task(task):
task.pop("children", None)
common_params = task.pop("common_params", None)
cleanup_params(task["params"])
return common_params["execunit"]
def cleanup_common_params(jobj):
jobj.pop("use_lock")
def cleanup_params(jobj):
jobj.pop("use_lock", None)
def cleanup_tst(task):
cleanup_common_params(task["common_params"])
cleanup_params(task["params"])
for child in task["children"]:
cleanup_tst(child)
def do_tst_command(jobj):
if len(jobj["children"]) != 1:
print("ERROR: Not a simple TST")
tstobj = jobj
tstobj["common_params"]["execunit"] = "/op0"
tstobj["common_params"]["node-uuid"] = str(uuid.uuid4())
cleanup_tst(tstobj)
termobj = tstobj["children"][0]
termobj["common_params"]["node-uuid"] = str(uuid.uuid4())
signalobj = None
print(termobj)
ns = "/op0"
if "node-uuid" in termobj["common_params"]:
signalobj = SignalNode(termobj["common_params"]["node-uuid"], "op0", tstobj["common_params"]["node-uuid"])
mobj = {}
mobj["com-uuid"] = str(uuid.uuid4())
mobj["command"] = "start-tst"
mobj["sender"] = "commander"
mobj["receiver"] = "op0"
# mobj["execution-unit"] = execunit.lstrip("/")
# mobj["task-uuid"] = str(uuid.uuid4())
mobj["tst"] = tstobj
mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': '))
#ns = jobj["common_params"]["execunit"]
ns = "/op0"
topic = "tst/command"
print(mstr)
print("TOPIC:", topic)
try:
client = rospy.ServiceProxy(f'{ns}/send_topic_msg_to_unit', SendTopicMsgToUnit)
resp = client(ns.lstrip("/"), topic, mstr)
print("RESP:", resp)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
return signalobj
def do_task_command(jobj):
if len(jobj["children"]) != 1:
print("ERROR: Not a task")
taskobj = jobj["children"][0]
execunit = cleanup_task(taskobj)
mobj = {}
mobj["com-uuid"] = str(uuid.uuid4())
mobj["command"] = "start-task"
mobj["sender"] = "commander"
mobj["execution-unit"] = execunit.lstrip("/")
mobj["task-uuid"] = str(uuid.uuid4())
mobj["task"] = taskobj
mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': '))
ns = jobj["common_params"]["execunit"]
signalobj = Signal(mobj["task-uuid"], ns)
topic = "exec/command"
print(mstr)
print("TOPIC:", topic)
try:
client = rospy.ServiceProxy('send_topic_msg_to_unit', SendTopicMsgToUnit)
resp = client(execunit.lstrip("/"), topic, mstr)
print("RESP:", resp)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
return signalobj
def do_del_command(jobj):
if len(jobj["children"]) != 1:
print("ERROR: Not a simple TST")
tstobj = jobj
tstobj["common_params"]["execunit"] = "/op0"
tstobj["common_params"]["node-uuid"] = str(uuid.uuid4())
cleanup_tst(tstobj)
termobj = tstobj["children"][0]
signalobj = None
print(termobj)
ns = "/op0"
if "node-uuid" in termobj["common_params"]:
signalobj = SignalNode(termobj["common_params"]["node-uuid"], "op0")
mobj = {}
mobj["com-uuid"] = str(uuid.uuid4())
mobj["command"] = "start-tst"
mobj["sender"] = "commander"
mobj["receiver"] = "op0"
# mobj["execution-unit"] = execunit.lstrip("/")
# mobj["task-uuid"] = str(uuid.uuid4())
mobj["tst"] = tstobj
mstr = json.dumps(mobj, sort_keys=True, indent=4, separators=(',', ': '))
#ns = jobj["common_params"]["execunit"]
ns = "/op0"
topic = "tst/command"
print(mstr)
print("TOPIC:", topic)
try:
client = rospy.ServiceProxy(f'{ns}/send_topic_msg_to_unit', SendTopicMsgToUnit)
resp = client(ns.lstrip("/"), topic, mstr)
print("RESP:", resp)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
return signalobj
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment