Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • liuhomewreckers/liu-home-wreckers
  • tdde19-2021-5/tdde-19-group-5-pepper
2 results
Show changes
Showing
with 1048 additions and 1474 deletions
from .behaviour import Behaviour
from std_msgs.msg import String
from lhw_interfaces.msg._result import Result
class GeneralBehaviour(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("GeneralBehaviour started")
# ------------- State's properties --------------
self.state_done = False
self.state_name = String()
self.state_name.data = "GENERAL_STATE"
self.run_timer = self.node.create_timer(1, self.run)
def run(self):
if self.state_done or self._behaviour_timeout():
self.run_timer.cancel()
self.stop_robot()
self.send_result()
return
#def subscriber_cb(self, data):
# self.state_done = True
from .behaviour import Behaviour
from ..help_functions.Map import Map
from std_msgs.msg import String
from lhw_interfaces.msg._result import Result
from nav_msgs.msg import OccupancyGrid
from geometry_msgs.msg import PoseStamped
import numpy as np
from scipy.spatial.distance import euclidean
from PIL import Image
import os
METER_PER_PIXEL = 0.01
class GoToEntity(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("GoToEntity started")
# ------------- State's properties --------------
self.state_done = False
self.specific_id = args.id
self.state_name = String()
self.state_name.data = "GO_TO_ENTITY"
self.goal_reached = False
self.relative_pose = False
goal_entity = self.node.db.get_entity(self.specific_id)
self.goal_pose = self._position_to_pose(goal_entity["position"])
self.move_robot_to_pose(self.goal_pose)
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
#TEMPORARY
self.moving_to_goal_counter = 5
def run_behaviour_loop(self):
if self.state_done or self._behaviour_timeout():
self.run_timer.cancel()
self.stop_robot()
self.send_result()
return
self.node.log.info("GoToEntity looop")
if self.relative_pose:
self.move_robot_to_relative_pose(relative_pose)
#TEMPORARY
self.moving_to_goal_counter -= 1
if not self.moving_to_goal_counter:
self.navigate_to_pose_done_callback("done")
def entities_callback(self, data):
for entity in data.entities:
if entity.id == self.specific_id:
self.relative_pose = self._relative_xyz_to_relative_pose(entity.xyz)
def navigate_to_pose_done_callback(self, result):
self.state_done = True
#
# Assignee: Markus Handstedt (@marha066)
# Issue: #115 Create service for using face recognition
# Branch: 115-create-service-for-using-face-recognition
#
##############################################################################
# Documentation
##############################################################################
"""
Behaviour for a analyse faces service call to the visage_face_analysis server.
Template behaviour that is used for implementation of new behaviours.
"""
##############################################################################
# Imports
##############################################################################
import typing
import py_trees
import lhw_interfaces.msg as lhw_msgs
import lhw_interfaces.srv as lhw_srvs
import lhw_interfaces.action as lhw_action
# import lhw_interfaces.actions as lhw_actions
import std_msgs
import time
##############################################################################
# Behaviours
##############################################################################
class AnalyseFaces(py_trees.behaviour.Behaviour):
class BlackboardUpdated(py_trees.behaviour.Behaviour):
"""
This behaviour makes a service call to the visage_face_analysis server
over the service '/analyse_faces'. The behaviour makes several request to
the server in order to validate the data and return :attr:`~py_trees.common.Status.RUNNING`.
When it has recieved 3 sequent responses with the same FaceDetection.id from the server
it then stores the FaceDetection data in the given blackboard and returns
:attr:`~py_trees.common.Status.SUCCESS`.
Service Client:
* **/analyse_faces** (:class:`lhw_interfaces.srv.AnalyseFaces`)
Args;
name: name of the behaviour
blackboard: blackboard object from tree node (:class:`py_trees.blackboard.Client`)
"""
def __init__(
self,
name: str
self,
name: str,
category: str
):
super(AnalyseFaces, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(key="found_new_person", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="recognized_people", access=py_trees.common.Access.WRITE)
self.blackboard.found_new_person = False
self.blackboard.recognized_people = {}
self.iterations_done = 0
self.analysed_faces = None
self.request_id = None
self.response_id = None
super(BlackboardUpdated, self).__init__(name=name)
self.category = category
self.blackboard = py_trees.blackboard.Client(name=self.name)
def setup(self, **kwargs):
"""
Setup the service client which will send face analyse requests to the visage_face_analysis server.
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
......@@ -66,238 +43,315 @@ class AnalyseFaces(py_trees.behaviour.Behaviour):
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
# analyse faces service client
self.analyse_faces_client = self.node.create_client(lhw_srvs.AnalyseFaces, '/analyse_faces')
if not self.analyse_faces_client.wait_for_service(timeout_sec=3.0):
raise RuntimeError("client timed out waiting for server visage_face_analysis")
# Tries to read person from blackboard.
# If person not found -> creates a mock person for debugging.
try:
self.blackboard.register_key(
key="recognized_people",
access=py_trees.common.Access.WRITE
)
self.blackboard.register_key(
key="current_person_id",
access=py_trees.common.Access.READ
)
self.blackboard.recognized_people
self.blackboard.current_person_id
self.node.get_logger().info("In TRY")
except:
self.node.get_logger().info("In Except")
# Temporary write access
self.blackboard.register_key(
key="current_person_id",
access=py_trees.common.Access.WRITE
)
# recognized people borde se ut såhär: [{name: "", drink: "", id: 0}]
#self.blackboard.set(self.variable_name, self.variable_value, overwrite=True)
self.blackboard.set("recognized_people", [{"id": 0}], overwrite=True)
self.blackboard.set("current_person_id", 0, overwrite=True)
def initialise(self):
"""
Reset the iterations_done counter and the analyse_faces srv.result() object and
triggers a service request to the visage_face_analysis server.
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self.iterations_done = 0
self.analysed_faces = None
self.request_id = 0
self.response_id = None
self._send_analyse_faces_request()
# WRITE CODE HERE
def update(self) -> py_trees.common.Status:
"""
Ceep make service calls to the visage_face_analysis server until valid data is recieved.
Returns:
When data not yet valid or service response not done :attr:`~py_trees.common.Status.RUNNING`
When data valid and stored in blackboard :attr:`~py_trees.common.Status.SUCCESS`
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
all_done = False
if self.iterations_done >= 3:
all_done = True
# wait for analyse faces to return
if not all_done:
if self._process_analyse_faces_response():
self._send_analyse_faces_request()
return py_trees.common.Status.RUNNING
# add analysed faces to blackboard
self.request_id = 0
self._add_analysed_faces_to_blackboard()
return py_trees.common.Status.SUCCESS
# value = self.blackboard.recognized_people[self.blackboard.current_person_id][self.key]
# Try to read a value from recognized people
try:
value = self.blackboard.recognized_people[self.blackboard.current_person_id][self.category]
confirmed = self.blackboard.recognized_people[self.blackboard.current_person_id][f"{self.category}_confirmed"]
except:
return py_trees.common.Status.FAILURE
# Return succes if value for key exists in blackboard and it is confirmed
if value and confirmed:
return py_trees.common.Status.SUCCESS
# Otherwise return false
return py_trees.common.Status.FAILURE
def terminate(self, new_status: py_trees.common.Status):
"""
Only logs the terminate call.
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _send_analyse_faces_request(self):
self.request_id += 1
self.feedback_message = "sent analyse faces request {0}".format(self.request_id)
self.node.get_logger().info(self.feedback_message)
request = lhw_srvs.AnalyseFaces.Request()
self.analyse_faces_future = self.analyse_faces_client.call_async(request)
def _process_analyse_faces_response(self) -> bool:
if not self.analyse_faces_future.done():
return False
if self.analyse_faces_future.result() is None:
self.feedback_message = "Service call to visage_face_analysis failed %r" % (self.analyse_faces_future.exception(),)
self.node.get_logger().error(self.feedback_message)
faces_detected = self.analyse_faces_future.result().faces_detected
if self.analysed_faces is not None:
previous_faces_detected = self.analysed_faces.faces_detected
same_faces = False
self.node.get_logger().info("faces_detected: {0}".format(faces_detected))
self.node.get_logger().info("previous_faces_detected: {0}".format(previous_faces_detected))
if len(faces_detected) == len(previous_faces_detected):
same_faces = True
for (curr_id, prev_id) in zip(faces_detected, previous_faces_detected):
if curr_id != prev_id:
same_faces = False
if not same_faces:
self.feedback_message = "not same as previous result"
self.node.get_logger().info(self.feedback_message)
self.iterations_done = 0
self.iterations_done += 1
self.analysed_faces = self.analyse_faces_future.result()
self.response_id = self.request_id
self.node.get_logger().info("response recieved {0}".format(self.response_id))
return True
def _add_analysed_faces_to_blackboard(self):
if self.analysed_faces is not None:
for face in self.analysed_faces.faces_detected:
if face.id not in self.blackboard.recognized_people:
self.blackboard.found_new_person = True
self.blackboard.recognized_people[face.id] = {
"z_index" : face.z_index,
"name" : None,
"drinks" : None,
"age" : face.age,
"face_center" : [face.face_center_xy[0], face.face_center_xy[1]]
}
else:
self.blackboard.found_new_person = False
self.blackboard.recognized_people[face.id]["z_index"] = face.z_index
self.blackboard.recognized_people[face.id]["face_center"] = [face.face_center_xy[0], face.face_center_xy[1]]
# Only change age if it is lower than previous, prevents age bouncing around
if face.age < self.blackboard.recognized_people[face.id]["age"]:
self.blackboard.recognized_people[face.id]["age"] = face.age
class PixelToXYZ(py_trees.behaviour.Behaviour):
# WRITE CODE HERE
class ListenConfirmation(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
self,
name: str,
category: str
):
super(PixelToXYZ, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(key="point_to_pixel", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="point_to_coordinate", access=py_trees.common.Access.WRITE)
super(ListenConfirmation, self).__init__(name=name)
self.category = category
self.blackboard = py_trees.blackboard.Client(name=self.name)
self.blackboard.register_key(
key="recognized_people",
access=py_trees.common.Access.WRITE
)
self.blackboard.register_key(
key="current_person_id",
access=py_trees.common.Access.READ
)
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
# service client
self.client = self.node.create_client(lhw_srvs.PixelToXYZ, '/pixel_to_xyz')
if not self.client.wait_for_service(timeout_sec=3.0):
raise RuntimeError("client timed out waiting for server depth_node")
# WRITE CLIENT SETUP CODE HERE
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self._send_request()
self.time_start = time.time()
self.dialog_sub = self.node.create_subscription(
lhw_msgs.Response,
'dialogflow_response',
self._answer_callback,
10
)
self.confirmed = ""
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self._process_response():
self._add_to_blackboard()
# TODO: change to node timer
time_duration_sec = time.gmtime(time.time() - self.time_start).tm_sec
if self.confirmed == "True":
self.blackboard.recognized_people[self.blackboard.current_person_id][f"{self.category}_confirmed"] \
= True
return py_trees.common.Status.SUCCESS
else:
# self._send_request()
return py_trees.common.Status.RUNNING
elif self.confirmed == "False":
self.blackboard.recognized_people[self.blackboard.current_person_id][f"{self.category}_confirmed"] \
= False
return py_trees.common.Status.FAILURE
if time_duration_sec > 10.0: # If more than 10 sec --> fail
return py_trees.common.Status.FAILURE
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
# Destroy subscription if still exists
try:
self.node.destroy_subscription(self.dialog_sub)
except:
pass
def _send_request(self):
self.feedback_message = "sending pixel to xyz request..."
self.node.get_logger().info(self.feedback_message)
request = lhw_srvs.PixelToXYZ.Request()
request.row = int(self.blackboard.point_to_pixel[0]/4)
request.col = int(self.blackboard.point_to_pixel[1]/4)
self.future = self.client.call_async(request)
def _process_response(self):
if not self.future.done():
return False
if self.future.result() is None:
self.feedback_message = "Service call to depth_node failed %r" % (self.future.exception(),)
self.node.get_logger().error(self.feedback_message)
return True
def _answer_callback(self, msg):
if "yes" in msg.recognized_text:
self.confirmed = "True"
elif "no" in msg.recognized_text:
self.confirmed = "False"
def _add_to_blackboard(self):
self.blackboard.point_to_coordinate = self.future.result().xyz
# ADD PRIVATE FUNCTIONS HERE "_function_name"
class Idle(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name:str
):
super(Idle, self).__init__(name=name)
class ToBlackboard(py_trees.behaviour.Behaviour):
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
return py_trees.common.Status.RUNNING
class AskForConfirmation(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
self,
name: str,
key: str,
variable: typing.Any,
node
variable: str
):
super(ToBlackboard, self).__init__(name=name)
self.key = key
super(AskForConfirmation, self).__init__(name=name)
self.variable = variable
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard = py_trees.blackboard.Client(name=name)
self.blackboard.register_key(
key="variable",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", key)
key="current_person_id",
access=py_trees.common.Access.READ
)
self.node = node
self.blackboard.register_key(
key="recognized_people",
access=py_trees.common.Access.READ
)
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
# try:
# self.node = kwargs['node']
# except KeyError as e:
# error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
# raise KeyError(error_message) from e # 'direct cause' traceability
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.say_pub = self.node.create_publisher(std_msgs.msg.String, 'say', 100)
self.is_talking_sub = self.node.create_subscription(
std_msgs.msg.Bool, 'is_talking', self._is_talking_callback, 10
)
self.is_talking = False
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self._goal_to_blackboard()
current_person_id = self.blackboard.current_person_id
# Get information about recognized people from blackboard
blackboard_variable = self.blackboard.recognized_people[current_person_id][self.variable]
print(blackboard_variable)
msg = std_msgs.msg.String()
if self.variable == "actors":
msg.data = "Is your name " + blackboard_variable + "?"
elif self.variable == "drinks":
msg.data = "Is your favorite drink " + blackboard_variable + "?"
self.say_pub.publish(msg)
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self.is_talking:
return py_trees.common.Status.RUNNING
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _goal_to_blackboard(self):
self.node.get_logger().info("added goal to {0}".format(self.key))
self.blackboard.variable = self.variable
# WRITE CODE HERE
def _is_talking_callback(self, msg):
self.is_talking = msg.data
import py_trees
class GuardBehaviour(py_trees.behaviour.Behaviour):
"""
Behaviour for returning succes if a blackbord guard is True
Initializes the guard to False on creation
"""
def __init__(
self,
name: str,
blackboard_key: str
):
super(GuardBehaviour, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name = self.name)
self.blackboard.register_key(
key="guard",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", blackboard_key)
)
self.blackboard.set("guard", False, overwrite=False)
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self.blackboard.guard:
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.FAILURE
class SetGuardBehaviour(py_trees.behaviour.Behaviour):
def __init__(self, name: str, blackboard_key: str, val : bool):
super(SetGuardBehaviour, self).__init__(name = name)
self.blackboard.register_key(
key="guard",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", blackboard_key)
)
self.val = val
def initialise(self):
self.blackboard.guard = self.val
def update(self) -> py_trees.common.Status:
return py_trees.common.Status.SUCCESS
\ No newline at end of file
from .behaviour import Behaviour
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from lhw_interfaces.msg._result import Result
class HaveAConversation(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("HaveAConversation started")
self.stop_intention = args.type
at_start = args.at_start
self.stop_intention_detected = False
# ------------- Behaviour's properties --------------
self.state_name = String()
self.state_name.data = "SPEECH_WITH_PERSON"
print(at_start)
self.say(at_start)
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
def run_behaviour_loop(self):
if self.stop_intention_detected or self._behaviour_timeout():
self.run_timer.cancel()
self.node.log.info("Done with conversation")
self.send_result()
return
def response_callback(self, data):
if data.intention == self.stop_intention:
self.stop_intention_detected = True
for parameter in data.parameters:
self._add_to_db("nlp_parameters", parameter)
self.say(data.response)
return False
\ No newline at end of file
##############################################################################
# Documentation
##############################################################################
"""
Behaviours for basic motions
"""
##############################################################################
# Imports
##############################################################################
import py_trees
import lhw_interfaces.msg as lhw_msgs
import lhw_interfaces.srv as lhw_srvs
import lhw_interfaces.actions as lhw_actions
from geometry_msgs.msg import Pose
from std_msgs.msg import Bool
##############################################################################
# Behaviours
##############################################################################
class MoveForward(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(MoveForward, self).__init__(name=name)
def setup(self, **kwargs):
"""
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
"""
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
# Do something
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
class MoveToPoint(py_trees.behaviour.Behaviour):
def __init__(self, name, pose: Pose, ssh_client) -> None:
super(MoveToPoint, self).__init__(name)
self.pose = pose
self.time = 0
self.ssh_client = ssh_client
def setup(self, **kwargs):
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.fix_head_pub = self.node.fix_head_pub
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="move_base_status", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="move_base_status", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="updated", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="updated", access=py_trees.common.Access.WRITE)
self.blackboard.move_base_status = "Default Status"
def initialise(self):
print("Initialising MoveToPoint")
msg = Bool()
msg.data = True
self.fix_head_pub.publish(msg)
#Orientation
goal_orientation = '{{x: {0}, y: {1}, z: {2}, w: {3}}}'
g_o = goal_orientation.format(self.pose.orientation.x, self.pose.orientation.y, self.pose.orientation.z, self.pose.orientation.w) # Unpack the list as laid out args
print(g_o)
#Position
goal_position = '{{x: {0}, y: {1}, z: {2}}}'
g_p = goal_position.format(self.pose.position.x, self.pose.position.y, self.pose.position.z)
goal_cmd = f"rostopic pub /lhw/nav/goal_server/goal geometry_msgs/PoseStamped \"{{header: {{frame_id: 'map'}}, pose: {{position: {g_p}, orientation: {g_o} }}}}\" --once"
#print(goal_cmd)
#print("Goal cmd: {goal_cmd}")
#print(goal_cmd)
self.ssh_client.invoke_shell()
stdin, stdout, stderr = self.ssh_client.exec_command("source ~/.bash_profile && " + goal_cmd)
def update(self):
print("Listening for callback...")
#self.node.log.info(str(self.blackboard.move_base_status))
if self.blackboard.move_base_status == 3:
self.blackboard.updated = False
self.blackboard.move_base_status = 0
return py_trees.common.Status.SUCCESS
if self.blackboard.move_base_status == 4:
return py_trees.common.Status.FAILED
else:
return py_trees.common.Status.RUNNING
""" Move the robot to a specified location
"""
class SetPose(py_trees.behaviour.Behaviour):
def __init__(self, name: str, pose: Pose, ssh_client) -> None:
super(SetPose, self).__init__(name)
self.pose = pose
self.ssh_client = ssh_client
def setup(self, **kwargs):
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
print("Initialising SetPose")
position = '{{x: {0}, y: {1}, z: {2}}}'
p = position.format(self.pose.position.x, self.pose.position.y, self.pose.position.z)
orientation = '{{x: {0}, y: {1}, z: {2}, w: {3}}}'
o = orientation.format(self.pose.orientation.x, self.pose.orientation.y, self.pose.orientation.z, self.pose.orientation.w ) # Unpack the list as laid out args
pose_cmd = f"""
rostopic pub /lhw/nav/initialpose geometry_msgs/PoseWithCovarianceStamped \"header:
frame_id: 'map'
pose:
pose:
position: {p}
orientation: {o}\" --once"""
self.ssh_client.invoke_shell()
try:
print("")
stdin, stdout, stderr = self.ssh_client.exec_command("source ~/.bash_profile && " + pose_cmd, timeout = 1)#while not self.action_client.wait_for_server():
except:
print()
def update(self):
print("Setting pose...")
#Timer for next step in tree -> SetPose Doesn't need a callback (as long as stack works)
return py_trees.common.Status.SUCCESS
""" Standby
"""
class Standby(py_trees.behaviour.Behaviour):
def __init__(self, name) -> None:
super(Standby, self).__init__(name)
def update(self):
return py_trees.common.Status.RUNNING
\ No newline at end of file
->;;Sequence
| (Timer);;Timer;;10
| [Rotate Robot]
| (Timer2);;Timer;;20
| [Stop Robot]
\ No newline at end of file
from ..behaviour import Behaviour
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from lhw_interfaces.msg._result import Result
from lhw_interfaces.msg import Response
'''
SpeacCore
The behaviour that handles the AI for having a conversation.
Indata:
String Goal.type - 3 different options:
- "event" - At start, sends an event request to dialogflow with name Goal.at_start
- "say" - At start, makes the robot say Goal.at_start
- "" - Do nothing at start.
String Goal.at_start -
- Name of an event, if Goal.type is "event"
- Message that the robot should say, if Goal.type is "say"
'''
class SpeakCore(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("Speak Core started")
node.log.info("Got goal: " + str(args))
self.stop_listening = False
type_at_start = args.type
at_start = args.at_start
# ------------- Behaviour's properties --------------
self.state_name = String()
self.state_name.data = "SPEAK_CORE"
if at_start:
if type_at_start == "event":
self.send_event(at_start)
if type_at_start == "say":
self.say(at_start)
self.dialogflow_response = False
self.gpt_response = False
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
def run_behaviour_loop(self):
if self.stop_listening or self._behaviour_timeout():
self.run_timer.cancel()
self.node.log.info("Stopped listening")
self.send_result()
return
if self.dialogflow_response and self.gpt_response:
response = self.__get_response()
self.say( response )
self.dialogflow_response = False
self.gpt_response = False
def dialogflow_response_callback(self, data):
if data.source == Response.DIALOGFLOW:
self.node.log.info("Data from Dialogflow: ")
self.node.log.info(str(data))
self.dialogflow_response = data
else:
self.node.log.info("No source defined...")
def gpt_response_callback(self, data):
if data.source == Response.GPT:
self.node.log.info("Data from GPT: ")
self.node.log.info(str(data))
self.gpt_response = data
else:
self.node.log.info("No source defined...")
def __get_response(self):
low_confidence = self.dialogflow_response.confidence < 0.5
no_matching_intent_found = self.dialogflow_response.intention == "Default Fallback Intent"
if low_confidence:
self.node.log.info("Low confidence - GPT response was chosen")
return self.gpt_response.response
if no_matching_intent_found:
self.node.log.info("No matching intent - GPT response was chosen")
return self.gpt_response.response
return self.dialogflow_response.response
\ No newline at end of file
import py_trees
import std_msgs
import lhw_interfaces.msg as lhw_msgs
import time
""" Pepper says the message"""
class Say(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str,
message:str
):
super(Say, self).__init__(name=name)
self.message = message
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.say_pub = self.node.create_publisher(std_msgs.msg.String, 'say', 100)
self.is_talking_sub = self.node.create_subscription(
std_msgs.msg.Bool, 'is_talking', self._is_talking_callback, 10
)
self.is_talking = False
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
msg = std_msgs.msg.String()
msg.data = self.message
self.say_pub.publish(msg)
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self.is_talking:
return py_trees.common.Status.RUNNING
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
# WRITE CODE HERE
def _is_talking_callback(self, msg):
self.is_talking = msg.data
class Listen(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str,
category: str
):
super(Listen, self).__init__(name=name)
self.category = category
self.blackboard = py_trees.blackboard.Client(name=self.name)
self.blackboard.register_key(
key="recognized_people",
access=py_trees.common.Access.WRITE
)
self.blackboard.register_key(
key="current_person_id",
access=py_trees.common.Access.WRITE
)
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self.time_start = time.time()
self.callback_executed = False
self.dialog_sub = self.node.create_subscription(
lhw_msgs.Response,
'dialogflow_response',
self._answer_callback,
10
)
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("testing -----------------------------------------------------------")
self.logger.debug("%s.update()" % self.__class__.__name__)
# TODO: change to node timer
time_duration_sec = time.gmtime(time.time() - self.time_start).tm_sec
self.node.get_logger().info(str(time_duration_sec))
if self.callback_executed:
return py_trees.common.Status.SUCCESS
if time_duration_sec > 10.0: # If more than 10 sec --> fail
return py_trees.common.Status.FAILURE
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
# Destroy subscription if still exists
if self.dialog_sub:
self.node.destroy_subscription(self.dialog_sub)
# WRITE CODE HERE
def _answer_callback(self, msg):
# self.blackboard.recognized_people = False
print(self.blackboard)
self.logger.debug("aa parametes")
print(msg.parameters)
# self.get_logger().info(msg.parameters)
for i in range(len(msg.parameters)):
# Check if correct category was found
self.logger.debug(msg.parameters[i].key)
print(msg.parameters[i].key)
if msg.parameters[i].key == self.category:
self.callback_executed = True
self.answer = msg.parameters[i].value
self.blackboard.recognized_people[self.blackboard.current_person_id][self.category] \
= self.answer
from socket import timeout
import py_trees
#from .actions import Actions
#from naoqi_bridge_msgs.msg import HeadTouch
#TODO: Tidy this up when finished
from std_msgs.msg import Bool, String, Empty, Int16, Header, Int32
from typing import List
from lhw_interfaces.action import TabletAction
from lhw_interfaces.msg import Parameter
from typing import List
FIA_LOGO_URL = 'assets/fia_logo.png'
......@@ -17,17 +9,17 @@ def while_wait_for_server(behaviour:py_trees.behaviour.Behaviour, node):
while not behaviour.action_client.wait_for_server(timeout_sec=1):
node.log.info(f"waiting for tablet server connection")
""" Move the robot to a specified location
"""
class TabletSpeech(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
def __init__(self, name) -> None:
super(TabletSpeech, self).__init__(name)
self.node = node
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
while_wait_for_server(self, self.node)
......@@ -42,13 +34,17 @@ class TabletSpeech(py_trees.behaviour.Behaviour):
return py_trees.common.Status.SUCCESS
class TabletLoading(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
def __init__(self, name) -> None:
super(TabletLoading, self).__init__(name)
self.node = node
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
while_wait_for_server(self, self.node)
......@@ -62,14 +58,18 @@ class TabletLoading(py_trees.behaviour.Behaviour):
return py_trees.common.Status.SUCCESS
class TabletImage(py_trees.behaviour.Behaviour):
def __init__(self, name, node, url:str) -> None:
def __init__(self, name, url:str) -> None:
super(TabletImage, self).__init__(name)
self.node = node
self.url = url
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
......@@ -87,17 +87,21 @@ class TabletImage(py_trees.behaviour.Behaviour):
class TabletQuestion(py_trees.behaviour.Behaviour):
def __init__(self, name, node, text:str, options:List[str]) -> None:
def __init__(self, name, text:str, options:List[str]) -> None:
super(TabletQuestion, self).__init__(name)
self.node = node
self.text = text
self.options = options
self.chosen_option = ""
self.result = False
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
......@@ -141,13 +145,17 @@ class TabletQuestion(py_trees.behaviour.Behaviour):
class TabletShowFIALogo(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
def __init__(self, name) -> None:
super(TabletShowFIALogo, self).__init__(name)
self.node = node
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
......@@ -161,81 +169,4 @@ class TabletShowFIALogo(py_trees.behaviour.Behaviour):
def update(self):
self.node.log.info(f"UPDATE")
return py_trees.common.Status.SUCCESS
class TabletShowWebpage(py_trees.behaviour.Behaviour):
def __init__(self, name, node, url) -> None:
super(TabletShowWebpage, self).__init__(name)
self.node = node
self.action_client = self.node.tablet_action_client
self.url = url
def setup(self, **kwargs):
pass
def initialise(self):
while_wait_for_server(self, self.node)
goal = TabletAction.Goal()
goal.type = "webpage"
goal.url = self.url
self.future = self.action_client.send_goal_async(goal)
def update(self):
self.node.log.info(f"UPDATE")
return py_trees.common.Status.SUCCESS
#ros2 action send_goal tablet_action_server lhw_interfaces/action/TabletAction "{type: 'input', text: 'Vad heter du?'}"
class TabletInput(py_trees.behaviour.Behaviour):
def __init__(self, name, node, text:str, type:str) -> None:
super(TabletInput, self).__init__(name)
self.node = node
self.text = text
self.result = False
self.tablet_input = ""
self.type = type
self.action_client = self.node.tablet_action_client
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.WRITE)
def setup(self, **kwargs):
pass
def initialise(self):
while_wait_for_server(self, self.node)
goal = TabletAction.Goal()
goal.type = "input"
goal.text = self.text
self.future = self.action_client.send_goal_async(goal)
self.future.add_done_callback(self.goal_response_callback)
def update(self):
if self.result:
param = Parameter()
param.key = self.type
param.value = str(self.tablet_input)
self.blackboard.last_recognized_item = param
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.RUNNING
def terminate(self, new_status):
self.result = False
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
return
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
self.result = True
self.tablet_input = future.result().result.response
\ No newline at end of file
return py_trees.common.Status.SUCCESS
\ No newline at end of file
##############################################################################
# Documentation
##############################################################################
"""
Template behaviour that is used for implementation of new behaviours.
"""
##############################################################################
# Imports
##############################################################################
import py_trees
import lhw_interfaces.msg as lhw_msgs
import lhw_interfaces.srv as lhw_srvs
import lhw_interfaces.actions as lhw_actions
##############################################################################
# Behaviours
##############################################################################
class Template(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(Template, self).__init__(name=name)
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
# WRITE CLIENT SETUP CODE HERE
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
# WRITE CODE HERE
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
# WRITE CODE HERE
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
# WRITE CODE HERE
# ADD PRIVATE FUNCTIONS HERE "_function_name"
\ No newline at end of file
import typing
import py_trees
import rclpy
from lhw_interfaces.action import TabletAction
from naoqi_bridge_msgs.msg import HeadTouch
import typing
import std_msgs
"""
Behaviour to wait until head is touched, and blinking lights while waiting
"""
class WaitForHeadTouch(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(WaitForHeadTouch, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name = self.name)
self.blackboard.register_key(
key = "scenario_started",
access=py_trees.common.Access.WRITE
)
self.led_is_on = False
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.led_pub = self.node.create_publisher(std_msgs.msg.Bool, "turn_on_off", 10)
self.tablet_action_client = rclpy.action.ActionClient(
node=self.node,
action_type=TabletAction,
action_name="tablet_action_server"
)
if not self.tablet_action_client.wait_for_server(timeout_sec=3.0):
raise RuntimeError("client timed out waiting for server tablet_action_server")
self.logger.debug("Action client successfully created")
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
Creates a timer that toggles all LED:s every second.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self.timer = self.node.create_timer(1.0, self._toggle_leds_callback)
self.head_touch_sub = self.node.create_subscription(HeadTouch, 'head_touch', self._head_touch_callback, 10)
goal = TabletAction.Goal()
goal.type = "text"
goal.text = "Slap my head, boi"
self.send_goal_future = self.tablet_action_client.send_goal_async(
goal,
feedback_callback=None
)
self.logger.debug("Action goal sent")
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self.blackboard.scenario_started:
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
self.timer.destroy()
self.node.destroy_subscription(self.head_touch_sub)
if not self.led_is_on:
msg = std_msgs.msg.Bool()
msg.data = True
self.led_pub.publish(msg)
goal = TabletAction.Goal()
goal.type = "text"
goal.text = "Scenario Started"
self.send_goal_future = self.tablet_action_client.send_goal_async(
goal,
feedback_callback=None
)
def shutdown(self):
"""
This function is executed when the tree is destroyed. Destroy the publisher created in setup.
"""
goal = TabletAction.Goal()
goal.type = "image"
goal.url = "assets/fia_logo.png"
self.send_goal_future = self.tablet_action_client.send_goal_async(
goal,
feedback_callback=None
)
self.led_pub.destroy()
self.tablet_action_client.destroy()
def _toggle_leds_callback(self):
msg = std_msgs.msg.Bool()
msg.data = not self.led_is_on
self.led_is_on = not self.led_is_on
self.led_pub.publish(msg)
def _head_touch_callback(self, msg):
self.blackboard.scenario_started = True
#
# Assignee: Markus Handstedt (@marha066)
# Issue: #115 Create service for using face recognition
# Branch: 115-create-service-for-using-face-recognition
#
##############################################################################
# Documentation
##############################################################################
"""
Behaviour for a analyse faces service call to the visage_face_analysis server.
"""
##############################################################################
# Imports
##############################################################################
import typing
import py_trees
import lhw_interfaces.msg as lhw_msgs
import lhw_interfaces.srv as lhw_srvs
import lhw_interfaces.action as lhw_action
import std_msgs
##############################################################################
# Behaviours
##############################################################################
class AnalyseFaces(py_trees.behaviour.Behaviour):
"""
This behaviour makes a service call to the visage_face_analysis server
over the service '/analyse_faces'. The behaviour makes several request to
the server in order to validate the data and return :attr:`~py_trees.common.Status.RUNNING`.
When it has recieved 3 sequent responses with the same FaceDetection.id from the server
it then stores the FaceDetection data in the given blackboard and returns
:attr:`~py_trees.common.Status.SUCCESS`.
Service Client:
* **/analyse_faces** (:class:`lhw_interfaces.srv.AnalyseFaces`)
Args;
name: name of the behaviour
blackboard: blackboard object from tree node (:class:`py_trees.blackboard.Client`)
"""
def __init__(
self,
name: str
):
super(AnalyseFaces, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(key="found_new_person", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="recognized_people", access=py_trees.common.Access.WRITE)
self.blackboard.found_new_person = False
self.blackboard.recognized_people = {}
self.iterations_done = 0
self.analysed_faces = None
self.request_id = None
self.response_id = None
def setup(self, **kwargs):
"""
Setup the service client which will send face analyse requests to the visage_face_analysis server.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
# analyse faces service client
self.analyse_faces_client = self.node.create_client(lhw_srvs.AnalyseFaces, '/analyse_faces')
if not self.analyse_faces_client.wait_for_service(timeout_sec=3.0):
raise RuntimeError("client timed out waiting for server visage_face_analysis")
def initialise(self):
"""
Reset the iterations_done counter and the analyse_faces srv.result() object and
triggers a service request to the visage_face_analysis server.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self.iterations_done = 0
self.analysed_faces = None
self.request_id = 0
self.response_id = None
self._send_analyse_faces_request()
def update(self) -> py_trees.common.Status:
"""
Ceep make service calls to the visage_face_analysis server until valid data is recieved.
Returns:
When data not yet valid or service response not done :attr:`~py_trees.common.Status.RUNNING`
When data valid and stored in blackboard :attr:`~py_trees.common.Status.SUCCESS`
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
all_done = False
if self.iterations_done >= 3:
all_done = True
# wait for analyse faces to return
if not all_done:
if self._process_analyse_faces_response():
self._send_analyse_faces_request()
return py_trees.common.Status.RUNNING
# add analysed faces to blackboard
self.request_id = 0
self._add_analysed_faces_to_blackboard()
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
Only logs the terminate call.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _send_analyse_faces_request(self):
self.request_id += 1
self.feedback_message = "sent analyse faces request {0}".format(self.request_id)
self.node.get_logger().info(self.feedback_message)
request = lhw_srvs.AnalyseFaces.Request()
self.analyse_faces_future = self.analyse_faces_client.call_async(request)
def _process_analyse_faces_response(self) -> bool:
if not self.analyse_faces_future.done():
return False
if self.analyse_faces_future.result() is None:
self.feedback_message = "Service call to visage_face_analysis failed %r" % (self.analyse_faces_future.exception(),)
self.node.get_logger().error(self.feedback_message)
faces_detected = self.analyse_faces_future.result().faces_detected
if self.analysed_faces is not None:
previous_faces_detected = self.analysed_faces.faces_detected
same_faces = False
self.node.get_logger().info("faces_detected: {0}".format(faces_detected))
self.node.get_logger().info("previous_faces_detected: {0}".format(previous_faces_detected))
if len(faces_detected) == len(previous_faces_detected):
same_faces = True
for (curr_id, prev_id) in zip(faces_detected, previous_faces_detected):
if curr_id != prev_id:
same_faces = False
if not same_faces:
self.feedback_message = "not same as previous result"
self.node.get_logger().info(self.feedback_message)
self.iterations_done = 0
self.iterations_done += 1
self.analysed_faces = self.analyse_faces_future.result()
self.response_id = self.request_id
self.node.get_logger().info("response recieved {0}".format(self.response_id))
return True
def _add_analysed_faces_to_blackboard(self):
if self.analysed_faces is not None:
for face in self.analysed_faces.faces_detected:
if face.id not in self.blackboard.recognized_people:
self.blackboard.found_new_person = True
self.blackboard.recognized_people[face.id] = {
"z_index" : face.z_index,
"name" : None,
"drinks" : None,
"age" : face.age,
"face_center" : [face.face_center_xy[0], face.face_center_xy[1]]
}
else:
self.blackboard.found_new_person = False
self.blackboard.recognized_people[face.id]["z_index"] = face.z_index
self.blackboard.recognized_people[face.id]["face_center"] = [face.face_center_xy[0], face.face_center_xy[1]]
# Only change age if it is lower than previous, prevents age bouncing around
if face.age < self.blackboard.recognized_people[face.id]["age"]:
self.blackboard.recognized_people[face.id]["age"] = face.age
class PixelToXYZ(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(PixelToXYZ, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(key="point_to_pixel", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="point_to_coordinate", access=py_trees.common.Access.WRITE)
def setup(self, **kwargs):
"""
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
# service client
self.client = self.node.create_client(lhw_srvs.PixelToXYZ, '/pixel_to_xyz')
if not self.client.wait_for_service(timeout_sec=3.0):
raise RuntimeError("client timed out waiting for server depth_node")
def initialise(self):
"""
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self._send_request()
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self._process_response():
self._add_to_blackboard()
return py_trees.common.Status.SUCCESS
else:
# self._send_request()
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _send_request(self):
self.feedback_message = "sending pixel to xyz request..."
self.node.get_logger().info(self.feedback_message)
request = lhw_srvs.PixelToXYZ.Request()
request.row = int(self.blackboard.point_to_pixel[0]/4)
request.col = int(self.blackboard.point_to_pixel[1]/4)
self.future = self.client.call_async(request)
def _process_response(self):
if not self.future.done():
return False
if self.future.result() is None:
self.feedback_message = "Service call to depth_node failed %r" % (self.future.exception(),)
self.node.get_logger().error(self.feedback_message)
return True
def _add_to_blackboard(self):
self.blackboard.point_to_coordinate = self.future.result().xyz
class ToBlackboard(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str,
key: str,
variable: typing.Any
):
super(ToBlackboard, self).__init__(name=name)
self.key = key
self.variable = variable
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(
key="variable",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", key)
)
def setup(self, **kwargs):
"""
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
"""
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self._goal_to_blackboard()
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _goal_to_blackboard(self):
self.node.get_logger().info("added goal to {0}".format(self.key))
self.blackboard.variable = self.variable
class Scan(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(Scan, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(key="colors", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="text", access=py_trees.common.Access.WRITE)
def setup(self, **kwargs):
"""
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
# scan service client
self.scan_client = self.node.create_client(lhw_srvs.Scan, '/scan')
if not self.scan_client.wait_for_service(timeout_sec=3.0):
raise RuntimeError("client timed out waiting for server scan")
def initialise(self):
"""
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self._send_scan_request()
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
# wait for scan to return
if self._process_scan_response():
self._add_to_blackboard()
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _send_scan_request(self):
self.node.get_logger().info("added goal to {0}".format(self.key))
request = lhw_srvs.Scan.Request()
self.scan_future = self.scan_client.call_async(request)
def _process_scan_response(self):
if not self.scan_future.done():
return False
if self.scan_future.result() is None:
self.feedback_message = "Service call to scan server failed %r" % (self.scan_future.exception(),)
self.node.get_logger().error(self.feedback_message)
return True
def _add_to_blackboard(self):
self.blackboard.colors = self.scan_future.result().colors
self.blackboard.text = self.scan_future.result().text
class DiscoverObject(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(DiscoverObject, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(key="colors", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="text", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="data", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="object", access=py_trees.common.Access.WRITE)
def setup(self, **kwargs):
"""
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
# discover_object service client
self.discover_object_client = self.node.create_client(lhw_srvs.DiscoverObject, '/discover_object')
if not self.discover_object_client.wait_for_service(timeout_sec=3.0):
raise RuntimeError("client timed out waiting for server discover_object")
def initialise(self):
"""
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self._send_discover_object_request()
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
# wait for scan to return
if self._process_discover_object_response():
self._add_to_blackboard()
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _send_discover_object_request(self):
self.node.get_logger().info("added goal to blackboard")
request = lhw_srvs.DiscoverObject.Request()
request.data = self.blackboard.colors + ". " + self.blackboard.text + ". " + self.blackboard.data
self.discover_object_future = self.discover_object_client.call_async(request)
def _process_discover_object_response(self):
if not self.discover_object_future.done():
return False
if self.discover_object_future.result() is None:
self.feedback_message = "Service call to discover_object server failed %r" % (self.discover_object_future.exception(),)
self.node.get_logger().error(self.feedback_message)
return True
def _add_to_blackboard(self):
self.blackboard.object = self.discover_object_future.result().object
class Say(py_trees.behaviour.Behaviour):
def __init__(
self,
name,
message: str,
from_blackboard: bool,
check_if_True: bool
) -> None:
super(Say, self).__init__(name)
self.message = message
self.from_blackboard = from_blackboard
self.blackboard = py_trees.blackboard.Client(name=self.name)
self.blackboard.register_key(key="is_talking", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="is_talking", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="object", access=py_trees.common.Access.READ)
def setup(self, **kwargs):
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.say_pub = self.node.say_publisher
def initialise(self):
msg = std_msgs.String()
if (self.from_blackboard):
msg.data = self.message + self.blackboard.object
else:
msg.data = self.message
self.say_pub.publish(msg)
self.blackboard.is_talking = True
def update(self):
if self.blackboard.is_talking:
return py_trees.common.Status.RUNNING
return py_trees.common.Status.SUCCESS
class Listen(py_trees.behaviour.Behaviour):
def __init__(self, name, key: str) -> None:
super(Listen, self).__init__(name)
self.key = key
self.blackboard = py_trees.blackboard.Client(name=self.name)
self.blackboard.register_key(
key="answer",
access=py_trees.common.Access.READ,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", key)
)
self.blackboard.register_key(
name="object",
access=py_trees.common.Access.READ
)
def setup(self, **kwargs):
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.dialog_sub = self.node.create_subscription(
lhw_msgs.Response,
'dialogflow_response',
self.answer_callback,
10
)
self.finished = False
self.answer = None
def update(self):
if self.finished:
return py_trees.common.Status.SUCCESS
if (self.check_if_true):
if (self.blackboard.object in self.answer):
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.FAILURE
return py_trees.common.Status.RUNNING
def answer_callback(self, msg):
self.answer = msg.recognized_text
self.blackboard.answer += ". " + msg.recognized_text
self.finished = True
import py_trees
from .behaviour_copy import Behaviour #TODO: Move behaviours to here
#TODO: Tidy this up when finished
from std_msgs.msg import Bool, String, Empty, Int16
from lhw_interfaces.msg import Entities, Response, Entity, Result
from geometry_msgs.msg import Twist, Pose, PoseStamped
import numpy as np
# class Foo(py_trees.behaviour.Behaviour):
# def __init__(self, name):
# """
# Minimal one-time initialisation. A good rule of thumb is
# to only include the initialisation relevant for being able
# to insert this behaviour in a tree for offline rendering to
# dot graphs.
# Other one-time initialisation requirements should be met via
# the setup() method.
# """
# super(Foo, self).__init__(name)
# def setup(self):
# """
# When is this called?
# This function should be either manually called by your program
# to setup this behaviour alone, or more commonly, via
# :meth:`~py_trees.behaviour.Behaviour.setup_with_descendants`
# or :meth:`~py_trees.trees.BehaviourTree.setup`, both of which
# will iterate over this behaviour, it's children (it's children's
# children ...) calling :meth:`~py_trees.behaviour.Behaviour.setup`
# on each in turn.
# If you have vital initialisation necessary to the success
# execution of your behaviour, put a guard in your
# :meth:`~py_trees.behaviour.Behaviour.initialise` method
# to protect against entry without having been setup.
# What to do here?
# Delayed one-time initialisation that would otherwise interfere
# with offline rendering of this behaviour in a tree to dot graph
# or validation of the behaviour's configuration.
# Good examples include:
# - Hardware or driver initialisation
# - Middleware initialisation (e.g. ROS pubs/subs/services)
# - A parallel checking for a valid policy configuration after
# children have been added or removed
# """
# self.logger.debug(" %s [Foo::setup()]" % self.name)
# def initialise(self):
# """
# When is this called?
# The first time your behaviour is ticked and anytime the
# status is not RUNNING thereafter.
# What to do here?
# Any initialisation you need before putting your behaviour
# to work.
# """
# self.logger.debug(" %s [Foo::initialise()]" % self.name)
# def update(self):
# """
# When is this called?
# Every time your behaviour is ticked.
# What to do here?
# - Triggering, checking, monitoring. Anything...but do not block!
# - Set a feedback message
# - return a py_trees.common.Status.[RUNNING, SUCCESS, FAILURE]
# """
# self.logger.debug(" %s [Foo::update()]" % self.name)
# ready_to_make_a_decision = random.choice([True, False])
# decision = random.choice([True, False])
# if not ready_to_make_a_decision:
# return py_trees.common.Status.RUNNING
# elif decision:
# self.feedback_message = "We are not bar!"
# return py_trees.common.Status.SUCCESS
# else:
# self.feedback_message = "Uh oh"
# return py_trees.common.Status.FAILURE
# def terminate(self, new_status):
# """
# When is this called?
# Whenever your behaviour switches to a non-running state.
# - SUCCESS || FAILURE : your behaviour's work cycle has finished
# - INVALID : a higher priority branch has interrupted, or shutting down
# """
# self.logger.debug(" %s [Foo::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
""" Rotates the robot counter (?) clockwise.
"""
class RotateRobot(py_trees.behaviour.Behaviour):
def __init__(self, name, node):
super(RotateRobot, self).__init__(name)
self.node = node
def setup(self, **kwargs):
pass
def update(self):
# self.node.log.info("Rotate robot")
# msg = Twist()
# msg.angular.z = -1.0
# self.node.cmd_vel_publisher.publish(msg)
return py_trees.common.Status.SUCCESS
def terminate(self, new_status):
pass
\ No newline at end of file
#!/usr/bin/env python
#
# License: BSD
# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
.. argparse::
:module: py_trees.demos.action
:func: command_line_argument_parser
:prog: py-trees-demo-action-behaviour
.. image:: images/action.gif
"""
##############################################################################
# Imports
##############################################################################
import py_trees
import argparse
import atexit
import multiprocessing
import py_trees.common
import time
import py_trees.console as console
##############################################################################
# Classes
##############################################################################
def description():
content = "Demonstrates the characteristics of a typical 'action' behaviour.\n"
content += "\n"
content += "* Mocks an external process and connects to it in the setup() method\n"
content += "* Kickstarts new goals with the external process in the initialise() method\n"
content += "* Monitors the ongoing goal status in the update() method\n"
content += "* Determines RUNNING/SUCCESS pending feedback from the external process\n"
if py_trees.console.has_colours:
banner_line = console.green + "*" * 79 + "\n" + console.reset
s = "\n"
s += banner_line
s += console.bold_white + "Action Behaviour".center(79) + "\n" + console.reset
s += banner_line
s += "\n"
s += content
s += "\n"
s += banner_line
else:
s = content
return s
def epilog():
if py_trees.console.has_colours:
return console.cyan + "And his noodly appendage reached forth to tickle the blessed...\n" + console.reset
else:
return None
def command_line_argument_parser():
return argparse.ArgumentParser(description=description(),
epilog=epilog(),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
def planning(pipe_connection):
"""
Emulates an external process which might accept long running planning jobs.
"""
idle = True
percentage_complete = 0
try:
while(True):
if pipe_connection.poll():
pipe_connection.recv()
percentage_complete = 0
idle = False
if not idle:
percentage_complete += 10
pipe_connection.send([percentage_complete])
if percentage_complete == 100:
idle = True
time.sleep(0.5)
except KeyboardInterrupt:
pass
class Action(py_trees.behaviour.Behaviour):
"""
Connects to a subprocess to initiate a goal, and monitors the progress
of that goal at each tick until the goal is completed, at which time
the behaviour itself returns with success or failure (depending on
success or failure of the goal itself).
This is typical of a behaviour that is connected to an external process
responsible for driving hardware, conducting a plan, or a long running
processing pipeline (e.g. planning/vision).
Key point - this behaviour itself should not be doing any work!
"""
def __init__(self, name="Action"):
"""
Default construction.
"""
super(Action, self).__init__(name)
self.logger.debug("%s.__init__()" % (self.__class__.__name__))
def setup(self):
"""
No delayed initialisation required for this example.
"""
self.logger.debug("%s.setup()->connections to an external process" % (self.__class__.__name__))
self.parent_connection, self.child_connection = multiprocessing.Pipe()
self.planning = multiprocessing.Process(target=planning, args=(self.child_connection,))
atexit.register(self.planning.terminate)
self.planning.start()
def initialise(self):
"""
Reset a counter variable.
"""
self.logger.debug("%s.initialise()->sending new goal" % (self.__class__.__name__))
self.parent_connection.send(['new goal'])
self.percentage_completion = 0
def update(self):
"""
Increment the counter and decide upon a new status result for the behaviour.
"""
new_status = py_trees.common.Status.RUNNING
if self.parent_connection.poll():
self.percentage_completion = self.parent_connection.recv().pop()
if self.percentage_completion == 100:
new_status = py_trees.common.Status.SUCCESS
if new_status == py_trees.common.Status.SUCCESS:
self.feedback_message = "Processing finished"
self.logger.debug("%s.update()[%s->%s][%s]" % (self.__class__.__name__, self.status, new_status, self.feedback_message))
else:
self.feedback_message = "{0}%".format(self.percentage_completion)
self.logger.debug("%s.update()[%s][%s]" % (self.__class__.__name__, self.status, self.feedback_message))
return new_status
def terminate(self, new_status):
"""
Nothing to clean up in this example.
"""
self.logger.debug("%s.terminate()[%s->%s]" % (self.__class__.__name__, self.status, new_status))
##############################################################################
# Main
##############################################################################
def main():
"""
Entry point for the demo script.
"""
command_line_argument_parser().parse_args()
print(description())
py_trees.logging.level = py_trees.logging.Level.DEBUG
action = Action()
action.setup()
try:
for unused_i in range(0, 12):
action.tick_once()
time.sleep(0.5)
print("\n")
except KeyboardInterrupt:
pass
\ No newline at end of file
import py_trees
from .behaviour_copy import Behaviour #TODO: Move behaviours to here
#TODO: Tidy this up when finished
from std_msgs.msg import Bool, String, Empty, Int16
from lhw_interfaces.msg import Entities, Response, Entity, Result
from geometry_msgs.msg import Twist, Pose, PoseStamped
import numpy as np
""" Standby """
class Standby (py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(Standby, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.logger.debug(" %s [Standby::setup()]" % self.name)
def initialise(self):
# todo: make pepper do the standby pose
self.logger.debug(" %s [Standby::initialise()]" % self.name)
def update(self):
self.logger.debug(" %s [Standby::update()]" % self.name)
return py_trees.common.Status.RUNNING
def terminate(self, new_status):
self.logger.debug(" %s [Standby::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
""" Is Head Touched """
class IsHeadTouched (py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(Standby, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.logger.debug(" %s [IsHeadTouched::setup()]" % self.name)
def initialise(self):
# todo: make pepper do the standby pose
self.logger.debug(" %s [IsHeadTouched::initialise()]" % self.name)
def update(self):
self.logger.debug(" %s [IsHeadTouched::update()]" % self.name)
if (head_touched):
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.FAILURE
def terminate(self, new_status):
self.logger.debug(" %s [IsHeadTouched::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
\ No newline at end of file
import py_trees
import py_trees_ros
import random
class ExampleBehaviour(py_trees.behaviour.Behaviour):
def __init__(self, name):
"""
Minimal one-time initialisation. A good rule of thumb is
to only include the initialisation relevant for being able
to insert this behaviour in a tree for offline rendering to
dot graphs.
Other one-time initialisation requirements should be met via
the setup() method.
"""
super(ExampleBehaviour, self).__init__(name)
def setup(self):
"""
When is this called?
This function should be either manually called by your program
to setup this behaviour alone, or more commonly, via
:meth:`~py_trees.behaviour.Behaviour.setup_with_descendants`
or :meth:`~py_trees.trees.BehaviourTree.setup`, both of which
will iterate over this behaviour, it's children (it's children's
children ...) calling :meth:`~py_trees.behaviour.Behaviour.setup`
on each in turn.
If you have vital initialisation necessary to the success
execution of your behaviour, put a guard in your
:meth:`~py_trees.behaviour.Behaviour.initialise` method
to protect against entry without having been setup.
What to do here?
Delayed one-time initialisation that would otherwise interfere
with offline rendering of this behaviour in a tree to dot graph
or validation of the behaviour's configuration.
Good examples include:
- Hardware or driver initialisation
- Middleware initialisation (e.g. ROS pubs/subs/services)
- A parallel checking for a valid policy configuration after
children have been added or removed
"""
self.logger.debug(" %s [ExampleBehaviour::setup()]" % self.name)
def initialise(self):
"""
When is this called?
The first time your behaviour is ticked and anytime the
status is not RUNNING thereafter.
What to do here?
Any initialisation you need before putting your behaviour
to work.
"""
self.logger.debug(" %s [ExampleBehaviour::initialise()]" % self.name)
def update(self):
"""
When is this called?
Every time your behaviour is ticked.
What to do here?
- Triggering, checking, monitoring. Anything...but do not block!
- Set a feedback message
- return a py_trees.common.Status.[RUNNING, SUCCESS, FAILURE]
"""
self.logger.debug(" %s [ExampleBehaviour::update()]" % self.name)
ready_to_make_a_decision = random.choice([True, False])
decision = random.choice([True, False])
if not ready_to_make_a_decision:
return py_trees.common.Status.RUNNING
elif decision:
self.feedback_message = "We are not bar!"
return py_trees.common.Status.SUCCESS
else:
self.feedback_message = "Uh oh"
return py_trees.common.Status.FAILURE
def terminate(self, new_status):
"""
When is this called?
Whenever your behaviour switches to a non-running state.
- SUCCESS || FAILURE : your behaviour's work cycle has finished
- INVALID : a higher priority branch has interrupted, or shutting down
"""
self.logger.debug(" %s [ExampleBehaviour::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
\ No newline at end of file
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from lhw_interfaces.msg import Response, Event
'''
OBS! This node is only for testing.
'''
class Tester(Node):
def __init__(self):
super().__init__('DFNOde')
self.publisher_ = self.create_publisher(Response, 'dialogflow_response', 10)
response_sub = self.create_subscription(Event, 'nlp_event', self.event_callback, 1)
def send_dialogflow_msg(self, response, recognized_text = ""):
msg = Response()
msg.source = Response.DIALOGFLOW
msg.recognized_text = recognized_text
msg.response = response
msg.confidence = 0.0
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg)
def event_callback(self, data):
self.get_logger().info('Received: "%s"' % data)
self.get_logger().info('Sending event')
self.get_logger().info('Receiving response from dialogflow')
self.send_dialogflow_msg("Thank you for your event")
def main(args=None):
rclpy.init(args=args)
ros_node = Tester()
rclpy.spin(ros_node)
rclpy.shutdown()
if __name__ == '__main__':
main()
\ No newline at end of file
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from lhw_interfaces.msg import Response, Event
'''
OBS! This node is only for testing.
'''
class Tester(Node):
def __init__(self):
super().__init__('GPTNode')
self.publisher_ = self.create_publisher(Response, 'gpt_response', 10)
dialogflow_sub = self.create_subscription(Response, 'dialogflow_response', self.dialogflow_callback, 1)
say_sub = self.create_subscription(String, 'say', self.say_callback, 1)
def send_gpt_msg(self, response):
msg = Response()
msg.source = Response.GPT
msg.response = response
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg)
def dialogflow_callback(self, data):
self.get_logger().info('GPT Received: "%s"' % data)
self.send_gpt_msg("Hi from GPT")
def say_callback(self, data):
message = data.data
# Save message for Machine learning
def main(args=None):
rclpy.init(args=args)
ros_node = Tester()
rclpy.spin(ros_node)
rclpy.shutdown()
if __name__ == '__main__':
main()
\ No newline at end of file
class DataBase:
def __init__(self):
self.categories = {
"furniture": ["table", "chair"],
"persons": ["person"],
"fruits": ["apple", "banana"]
}
self.seen_entities = {}
self.nlp_parameters = []
def add_to_db(self, collection, element):
if collection == "seen_entities":
self.seen_entities[element["id"]] = element
print(self.seen_entities)
if collection == "nlp_parameters":
self.nlp_parameters.append(element)
print(self.seen_entities)
def get_categories_of_type(self, type):
possible_categories = []
for category, types_in_category in self.categories.items():
if type in types_in_category:
possible_categories.append(category)
return possible_categories
def get_entity(self, id):
return self.seen_entities.get(id)