Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • liuhomewreckers/liu-home-wreckers
  • tdde19-2021-5/tdde-19-group-5-pepper
2 results
Show changes
Showing
with 1048 additions and 1474 deletions
from .behaviour import Behaviour
from std_msgs.msg import String
from lhw_interfaces.msg._result import Result
class GeneralBehaviour(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("GeneralBehaviour started")
# ------------- State's properties --------------
self.state_done = False
self.state_name = String()
self.state_name.data = "GENERAL_STATE"
self.run_timer = self.node.create_timer(1, self.run)
def run(self):
if self.state_done or self._behaviour_timeout():
self.run_timer.cancel()
self.stop_robot()
self.send_result()
return
#def subscriber_cb(self, data):
# self.state_done = True
from .behaviour import Behaviour
from ..help_functions.Map import Map
from std_msgs.msg import String
from lhw_interfaces.msg._result import Result
from nav_msgs.msg import OccupancyGrid
from geometry_msgs.msg import PoseStamped
import numpy as np
from scipy.spatial.distance import euclidean
from PIL import Image
import os
METER_PER_PIXEL = 0.01
class GoToEntity(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("GoToEntity started")
# ------------- State's properties --------------
self.state_done = False
self.specific_id = args.id
self.state_name = String()
self.state_name.data = "GO_TO_ENTITY"
self.goal_reached = False
self.relative_pose = False
goal_entity = self.node.db.get_entity(self.specific_id)
self.goal_pose = self._position_to_pose(goal_entity["position"])
self.move_robot_to_pose(self.goal_pose)
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
#TEMPORARY
self.moving_to_goal_counter = 5
def run_behaviour_loop(self):
if self.state_done or self._behaviour_timeout():
self.run_timer.cancel()
self.stop_robot()
self.send_result()
return
self.node.log.info("GoToEntity looop")
if self.relative_pose:
self.move_robot_to_relative_pose(relative_pose)
#TEMPORARY
self.moving_to_goal_counter -= 1
if not self.moving_to_goal_counter:
self.navigate_to_pose_done_callback("done")
def entities_callback(self, data):
for entity in data.entities:
if entity.id == self.specific_id:
self.relative_pose = self._relative_xyz_to_relative_pose(entity.xyz)
def navigate_to_pose_done_callback(self, result):
self.state_done = True
import py_trees
class GuardBehaviour(py_trees.behaviour.Behaviour):
"""
Behaviour for returning succes if a blackbord guard is True
Initializes the guard to False on creation
"""
def __init__(
self,
name: str,
blackboard_key: str
):
super(GuardBehaviour, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name = self.name)
self.blackboard.register_key(
key="guard",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", blackboard_key)
)
self.blackboard.set("guard", False, overwrite=False)
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self.blackboard.guard:
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.FAILURE
class SetGuardBehaviour(py_trees.behaviour.Behaviour):
def __init__(self, name: str, blackboard_key: str, val : bool):
super(SetGuardBehaviour, self).__init__(name = name)
self.blackboard.register_key(
key="guard",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", blackboard_key)
)
self.val = val
def initialise(self):
self.blackboard.guard = self.val
def update(self) -> py_trees.common.Status:
return py_trees.common.Status.SUCCESS
\ No newline at end of file
from .behaviour import Behaviour
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from lhw_interfaces.msg._result import Result
class HaveAConversation(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("HaveAConversation started")
self.stop_intention = args.type
at_start = args.at_start
self.stop_intention_detected = False
# ------------- Behaviour's properties --------------
self.state_name = String()
self.state_name.data = "SPEECH_WITH_PERSON"
print(at_start)
self.say(at_start)
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
def run_behaviour_loop(self):
if self.stop_intention_detected or self._behaviour_timeout():
self.run_timer.cancel()
self.node.log.info("Done with conversation")
self.send_result()
return
def response_callback(self, data):
if data.intention == self.stop_intention:
self.stop_intention_detected = True
for parameter in data.parameters:
self._add_to_db("nlp_parameters", parameter)
self.say(data.response)
return False
\ No newline at end of file
##############################################################################
# Documentation
##############################################################################
"""
Behaviours for basic motions
"""
##############################################################################
# Imports
##############################################################################
import py_trees
import lhw_interfaces.msg as lhw_msgs
import lhw_interfaces.srv as lhw_srvs
import lhw_interfaces.actions as lhw_actions
from geometry_msgs.msg import Pose
from std_msgs.msg import Bool
##############################################################################
# Behaviours
##############################################################################
class MoveForward(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(MoveForward, self).__init__(name=name)
def setup(self, **kwargs):
"""
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
"""
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
# Do something
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
class MoveToPoint(py_trees.behaviour.Behaviour):
def __init__(self, name, pose: Pose, ssh_client) -> None:
super(MoveToPoint, self).__init__(name)
self.pose = pose
self.time = 0
self.ssh_client = ssh_client
def setup(self, **kwargs):
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.fix_head_pub = self.node.fix_head_pub
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="move_base_status", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="move_base_status", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="updated", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="updated", access=py_trees.common.Access.WRITE)
self.blackboard.move_base_status = "Default Status"
def initialise(self):
print("Initialising MoveToPoint")
msg = Bool()
msg.data = True
self.fix_head_pub.publish(msg)
#Orientation
goal_orientation = '{{x: {0}, y: {1}, z: {2}, w: {3}}}'
g_o = goal_orientation.format(self.pose.orientation.x, self.pose.orientation.y, self.pose.orientation.z, self.pose.orientation.w) # Unpack the list as laid out args
print(g_o)
#Position
goal_position = '{{x: {0}, y: {1}, z: {2}}}'
g_p = goal_position.format(self.pose.position.x, self.pose.position.y, self.pose.position.z)
goal_cmd = f"rostopic pub /lhw/nav/goal_server/goal geometry_msgs/PoseStamped \"{{header: {{frame_id: 'map'}}, pose: {{position: {g_p}, orientation: {g_o} }}}}\" --once"
#print(goal_cmd)
#print("Goal cmd: {goal_cmd}")
#print(goal_cmd)
self.ssh_client.invoke_shell()
stdin, stdout, stderr = self.ssh_client.exec_command("source ~/.bash_profile && " + goal_cmd)
def update(self):
print("Listening for callback...")
#self.node.log.info(str(self.blackboard.move_base_status))
if self.blackboard.move_base_status == 3:
self.blackboard.updated = False
self.blackboard.move_base_status = 0
return py_trees.common.Status.SUCCESS
if self.blackboard.move_base_status == 4:
return py_trees.common.Status.FAILED
else:
return py_trees.common.Status.RUNNING
""" Move the robot to a specified location
"""
class SetPose(py_trees.behaviour.Behaviour):
def __init__(self, name: str, pose: Pose, ssh_client) -> None:
super(SetPose, self).__init__(name)
self.pose = pose
self.ssh_client = ssh_client
def setup(self, **kwargs):
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
print("Initialising SetPose")
position = '{{x: {0}, y: {1}, z: {2}}}'
p = position.format(self.pose.position.x, self.pose.position.y, self.pose.position.z)
orientation = '{{x: {0}, y: {1}, z: {2}, w: {3}}}'
o = orientation.format(self.pose.orientation.x, self.pose.orientation.y, self.pose.orientation.z, self.pose.orientation.w ) # Unpack the list as laid out args
pose_cmd = f"""
rostopic pub /lhw/nav/initialpose geometry_msgs/PoseWithCovarianceStamped \"header:
frame_id: 'map'
pose:
pose:
position: {p}
orientation: {o}\" --once"""
self.ssh_client.invoke_shell()
try:
print("")
stdin, stdout, stderr = self.ssh_client.exec_command("source ~/.bash_profile && " + pose_cmd, timeout = 1)#while not self.action_client.wait_for_server():
except:
print()
def update(self):
print("Setting pose...")
#Timer for next step in tree -> SetPose Doesn't need a callback (as long as stack works)
return py_trees.common.Status.SUCCESS
""" Standby
"""
class Standby(py_trees.behaviour.Behaviour):
def __init__(self, name) -> None:
super(Standby, self).__init__(name)
def update(self):
return py_trees.common.Status.RUNNING
\ No newline at end of file
->;;Sequence
| (Timer);;Timer;;10
| [Rotate Robot]
| (Timer2);;Timer;;20
| [Stop Robot]
\ No newline at end of file
from ..behaviour import Behaviour
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from lhw_interfaces.msg._result import Result
from lhw_interfaces.msg import Response
'''
SpeacCore
The behaviour that handles the AI for having a conversation.
Indata:
String Goal.type - 3 different options:
- "event" - At start, sends an event request to dialogflow with name Goal.at_start
- "say" - At start, makes the robot say Goal.at_start
- "" - Do nothing at start.
String Goal.at_start -
- Name of an event, if Goal.type is "event"
- Message that the robot should say, if Goal.type is "say"
'''
class SpeakCore(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("Speak Core started")
node.log.info("Got goal: " + str(args))
self.stop_listening = False
type_at_start = args.type
at_start = args.at_start
# ------------- Behaviour's properties --------------
self.state_name = String()
self.state_name.data = "SPEAK_CORE"
if at_start:
if type_at_start == "event":
self.send_event(at_start)
if type_at_start == "say":
self.say(at_start)
self.dialogflow_response = False
self.gpt_response = False
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
def run_behaviour_loop(self):
if self.stop_listening or self._behaviour_timeout():
self.run_timer.cancel()
self.node.log.info("Stopped listening")
self.send_result()
return
if self.dialogflow_response and self.gpt_response:
response = self.__get_response()
self.say( response )
self.dialogflow_response = False
self.gpt_response = False
def dialogflow_response_callback(self, data):
if data.source == Response.DIALOGFLOW:
self.node.log.info("Data from Dialogflow: ")
self.node.log.info(str(data))
self.dialogflow_response = data
else:
self.node.log.info("No source defined...")
def gpt_response_callback(self, data):
if data.source == Response.GPT:
self.node.log.info("Data from GPT: ")
self.node.log.info(str(data))
self.gpt_response = data
else:
self.node.log.info("No source defined...")
def __get_response(self):
low_confidence = self.dialogflow_response.confidence < 0.5
no_matching_intent_found = self.dialogflow_response.intention == "Default Fallback Intent"
if low_confidence:
self.node.log.info("Low confidence - GPT response was chosen")
return self.gpt_response.response
if no_matching_intent_found:
self.node.log.info("No matching intent - GPT response was chosen")
return self.gpt_response.response
return self.dialogflow_response.response
\ No newline at end of file
import py_trees
import std_msgs
import lhw_interfaces.msg as lhw_msgs
import time
""" Pepper says the message"""
class Say(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str,
message:str
):
super(Say, self).__init__(name=name)
self.message = message
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.say_pub = self.node.create_publisher(std_msgs.msg.String, 'say', 100)
self.is_talking_sub = self.node.create_subscription(
std_msgs.msg.Bool, 'is_talking', self._is_talking_callback, 10
)
self.is_talking = False
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
msg = std_msgs.msg.String()
msg.data = self.message
self.say_pub.publish(msg)
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self.is_talking:
return py_trees.common.Status.RUNNING
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
# WRITE CODE HERE
def _is_talking_callback(self, msg):
self.is_talking = msg.data
class Listen(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str,
category: str
):
super(Listen, self).__init__(name=name)
self.category = category
self.blackboard = py_trees.blackboard.Client(name=self.name)
self.blackboard.register_key(
key="recognized_people",
access=py_trees.common.Access.WRITE
)
self.blackboard.register_key(
key="current_person_id",
access=py_trees.common.Access.WRITE
)
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self.time_start = time.time()
self.callback_executed = False
self.dialog_sub = self.node.create_subscription(
lhw_msgs.Response,
'dialogflow_response',
self._answer_callback,
10
)
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("testing -----------------------------------------------------------")
self.logger.debug("%s.update()" % self.__class__.__name__)
# TODO: change to node timer
time_duration_sec = time.gmtime(time.time() - self.time_start).tm_sec
self.node.get_logger().info(str(time_duration_sec))
if self.callback_executed:
return py_trees.common.Status.SUCCESS
if time_duration_sec > 10.0: # If more than 10 sec --> fail
return py_trees.common.Status.FAILURE
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
# Destroy subscription if still exists
if self.dialog_sub:
self.node.destroy_subscription(self.dialog_sub)
# WRITE CODE HERE
def _answer_callback(self, msg):
# self.blackboard.recognized_people = False
print(self.blackboard)
self.logger.debug("aa parametes")
print(msg.parameters)
# self.get_logger().info(msg.parameters)
for i in range(len(msg.parameters)):
# Check if correct category was found
self.logger.debug(msg.parameters[i].key)
print(msg.parameters[i].key)
if msg.parameters[i].key == self.category:
self.callback_executed = True
self.answer = msg.parameters[i].value
self.blackboard.recognized_people[self.blackboard.current_person_id][self.category] \
= self.answer
from socket import timeout
import py_trees
#from .actions import Actions
#from naoqi_bridge_msgs.msg import HeadTouch
#TODO: Tidy this up when finished
from std_msgs.msg import Bool, String, Empty, Int16, Header, Int32
from typing import List
from lhw_interfaces.action import TabletAction
from lhw_interfaces.msg import Parameter
from typing import List
FIA_LOGO_URL = 'assets/fia_logo.png'
......@@ -17,17 +9,17 @@ def while_wait_for_server(behaviour:py_trees.behaviour.Behaviour, node):
while not behaviour.action_client.wait_for_server(timeout_sec=1):
node.log.info(f"waiting for tablet server connection")
""" Move the robot to a specified location
"""
class TabletSpeech(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
def __init__(self, name) -> None:
super(TabletSpeech, self).__init__(name)
self.node = node
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
while_wait_for_server(self, self.node)
......@@ -42,13 +34,17 @@ class TabletSpeech(py_trees.behaviour.Behaviour):
return py_trees.common.Status.SUCCESS
class TabletLoading(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
def __init__(self, name) -> None:
super(TabletLoading, self).__init__(name)
self.node = node
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
while_wait_for_server(self, self.node)
......@@ -62,14 +58,18 @@ class TabletLoading(py_trees.behaviour.Behaviour):
return py_trees.common.Status.SUCCESS
class TabletImage(py_trees.behaviour.Behaviour):
def __init__(self, name, node, url:str) -> None:
def __init__(self, name, url:str) -> None:
super(TabletImage, self).__init__(name)
self.node = node
self.url = url
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
......@@ -87,17 +87,21 @@ class TabletImage(py_trees.behaviour.Behaviour):
class TabletQuestion(py_trees.behaviour.Behaviour):
def __init__(self, name, node, text:str, options:List[str]) -> None:
def __init__(self, name, text:str, options:List[str]) -> None:
super(TabletQuestion, self).__init__(name)
self.node = node
self.text = text
self.options = options
self.chosen_option = ""
self.result = False
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
......@@ -141,13 +145,17 @@ class TabletQuestion(py_trees.behaviour.Behaviour):
class TabletShowFIALogo(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
def __init__(self, name) -> None:
super(TabletShowFIALogo, self).__init__(name)
self.node = node
self.action_client = self.node.tablet_action_client
def setup(self, **kwargs):
pass
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = self.node.tablet_action_client
def initialise(self):
......@@ -161,81 +169,4 @@ class TabletShowFIALogo(py_trees.behaviour.Behaviour):
def update(self):
self.node.log.info(f"UPDATE")
return py_trees.common.Status.SUCCESS
class TabletShowWebpage(py_trees.behaviour.Behaviour):
def __init__(self, name, node, url) -> None:
super(TabletShowWebpage, self).__init__(name)
self.node = node
self.action_client = self.node.tablet_action_client
self.url = url
def setup(self, **kwargs):
pass
def initialise(self):
while_wait_for_server(self, self.node)
goal = TabletAction.Goal()
goal.type = "webpage"
goal.url = self.url
self.future = self.action_client.send_goal_async(goal)
def update(self):
self.node.log.info(f"UPDATE")
return py_trees.common.Status.SUCCESS
#ros2 action send_goal tablet_action_server lhw_interfaces/action/TabletAction "{type: 'input', text: 'Vad heter du?'}"
class TabletInput(py_trees.behaviour.Behaviour):
def __init__(self, name, node, text:str, type:str) -> None:
super(TabletInput, self).__init__(name)
self.node = node
self.text = text
self.result = False
self.tablet_input = ""
self.type = type
self.action_client = self.node.tablet_action_client
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.WRITE)
def setup(self, **kwargs):
pass
def initialise(self):
while_wait_for_server(self, self.node)
goal = TabletAction.Goal()
goal.type = "input"
goal.text = self.text
self.future = self.action_client.send_goal_async(goal)
self.future.add_done_callback(self.goal_response_callback)
def update(self):
if self.result:
param = Parameter()
param.key = self.type
param.value = str(self.tablet_input)
self.blackboard.last_recognized_item = param
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.RUNNING
def terminate(self, new_status):
self.result = False
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
return
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
self.result = True
self.tablet_input = future.result().result.response
\ No newline at end of file
return py_trees.common.Status.SUCCESS
\ No newline at end of file
##############################################################################
# Documentation
##############################################################################
"""
Template behaviour that is used for implementation of new behaviours.
"""
##############################################################################
# Imports
##############################################################################
import py_trees
import lhw_interfaces.msg as lhw_msgs
import lhw_interfaces.srv as lhw_srvs
import lhw_interfaces.actions as lhw_actions
##############################################################################
# Behaviours
##############################################################################
class Template(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(Template, self).__init__(name=name)
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
# WRITE CLIENT SETUP CODE HERE
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
# WRITE CODE HERE
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
# WRITE CODE HERE
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
# WRITE CODE HERE
# ADD PRIVATE FUNCTIONS HERE "_function_name"
\ No newline at end of file
import typing
import py_trees
import rclpy
from lhw_interfaces.action import TabletAction
from naoqi_bridge_msgs.msg import HeadTouch
import typing
import std_msgs
"""
Behaviour to wait until head is touched, and blinking lights while waiting
"""
class WaitForHeadTouch(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str
):
super(WaitForHeadTouch, self).__init__(name=name)
self.blackboard = self.attach_blackboard_client(name = self.name)
self.blackboard.register_key(
key = "scenario_started",
access=py_trees.common.Access.WRITE
)
self.led_is_on = False
def setup(self, **kwargs):
"""
This function is executed when tree is setup.
Args:
**kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# Fetch 'node' object passed down from the tree.
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.led_pub = self.node.create_publisher(std_msgs.msg.Bool, "turn_on_off", 10)
self.tablet_action_client = rclpy.action.ActionClient(
node=self.node,
action_type=TabletAction,
action_name="tablet_action_server"
)
if not self.tablet_action_client.wait_for_server(timeout_sec=3.0):
raise RuntimeError("client timed out waiting for server tablet_action_server")
self.logger.debug("Action client successfully created")
def initialise(self):
"""
This function is first to be executed when behaviour is runned.
Creates a timer that toggles all LED:s every second.
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self.timer = self.node.create_timer(1.0, self._toggle_leds_callback)
self.head_touch_sub = self.node.create_subscription(HeadTouch, 'head_touch', self._head_touch_callback, 10)
goal = TabletAction.Goal()
goal.type = "text"
goal.text = "Slap my head, boi"
self.send_goal_future = self.tablet_action_client.send_goal_async(
goal,
feedback_callback=None
)
self.logger.debug("Action goal sent")
def update(self) -> py_trees.common.Status:
"""
This function is second to be executed right after initialise
when behaviour is runned. The execution will continue as long as
return flag is RUNNING and will be stopped when return flag is
SUCCESS or FAILURE.
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if self.blackboard.scenario_started:
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
def terminate(self, new_status: py_trees.common.Status):
"""
This function is executed when behaviour is terminated.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
self.timer.destroy()
self.node.destroy_subscription(self.head_touch_sub)
if not self.led_is_on:
msg = std_msgs.msg.Bool()
msg.data = True
self.led_pub.publish(msg)
goal = TabletAction.Goal()
goal.type = "text"
goal.text = "Scenario Started"
self.send_goal_future = self.tablet_action_client.send_goal_async(
goal,
feedback_callback=None
)
def shutdown(self):
"""
This function is executed when the tree is destroyed. Destroy the publisher created in setup.
"""
goal = TabletAction.Goal()
goal.type = "image"
goal.url = "assets/fia_logo.png"
self.send_goal_future = self.tablet_action_client.send_goal_async(
goal,
feedback_callback=None
)
self.led_pub.destroy()
self.tablet_action_client.destroy()
def _toggle_leds_callback(self):
msg = std_msgs.msg.Bool()
msg.data = not self.led_is_on
self.led_is_on = not self.led_is_on
self.led_pub.publish(msg)
def _head_touch_callback(self, msg):
self.blackboard.scenario_started = True
import py_trees
from .behaviour_copy import Behaviour #TODO: Move behaviours to here
#TODO: Tidy this up when finished
from std_msgs.msg import Bool, String, Empty, Int16
from lhw_interfaces.msg import Entities, Response, Entity, Result
from geometry_msgs.msg import Twist, Pose, PoseStamped
import numpy as np
# class Foo(py_trees.behaviour.Behaviour):
# def __init__(self, name):
# """
# Minimal one-time initialisation. A good rule of thumb is
# to only include the initialisation relevant for being able
# to insert this behaviour in a tree for offline rendering to
# dot graphs.
# Other one-time initialisation requirements should be met via
# the setup() method.
# """
# super(Foo, self).__init__(name)
# def setup(self):
# """
# When is this called?
# This function should be either manually called by your program
# to setup this behaviour alone, or more commonly, via
# :meth:`~py_trees.behaviour.Behaviour.setup_with_descendants`
# or :meth:`~py_trees.trees.BehaviourTree.setup`, both of which
# will iterate over this behaviour, it's children (it's children's
# children ...) calling :meth:`~py_trees.behaviour.Behaviour.setup`
# on each in turn.
# If you have vital initialisation necessary to the success
# execution of your behaviour, put a guard in your
# :meth:`~py_trees.behaviour.Behaviour.initialise` method
# to protect against entry without having been setup.
# What to do here?
# Delayed one-time initialisation that would otherwise interfere
# with offline rendering of this behaviour in a tree to dot graph
# or validation of the behaviour's configuration.
# Good examples include:
# - Hardware or driver initialisation
# - Middleware initialisation (e.g. ROS pubs/subs/services)
# - A parallel checking for a valid policy configuration after
# children have been added or removed
# """
# self.logger.debug(" %s [Foo::setup()]" % self.name)
# def initialise(self):
# """
# When is this called?
# The first time your behaviour is ticked and anytime the
# status is not RUNNING thereafter.
# What to do here?
# Any initialisation you need before putting your behaviour
# to work.
# """
# self.logger.debug(" %s [Foo::initialise()]" % self.name)
# def update(self):
# """
# When is this called?
# Every time your behaviour is ticked.
# What to do here?
# - Triggering, checking, monitoring. Anything...but do not block!
# - Set a feedback message
# - return a py_trees.common.Status.[RUNNING, SUCCESS, FAILURE]
# """
# self.logger.debug(" %s [Foo::update()]" % self.name)
# ready_to_make_a_decision = random.choice([True, False])
# decision = random.choice([True, False])
# if not ready_to_make_a_decision:
# return py_trees.common.Status.RUNNING
# elif decision:
# self.feedback_message = "We are not bar!"
# return py_trees.common.Status.SUCCESS
# else:
# self.feedback_message = "Uh oh"
# return py_trees.common.Status.FAILURE
# def terminate(self, new_status):
# """
# When is this called?
# Whenever your behaviour switches to a non-running state.
# - SUCCESS || FAILURE : your behaviour's work cycle has finished
# - INVALID : a higher priority branch has interrupted, or shutting down
# """
# self.logger.debug(" %s [Foo::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
""" Rotates the robot counter (?) clockwise.
"""
class RotateRobot(py_trees.behaviour.Behaviour):
def __init__(self, name, node):
super(RotateRobot, self).__init__(name)
self.node = node
def setup(self, **kwargs):
pass
def update(self):
# self.node.log.info("Rotate robot")
# msg = Twist()
# msg.angular.z = -1.0
# self.node.cmd_vel_publisher.publish(msg)
return py_trees.common.Status.SUCCESS
def terminate(self, new_status):
pass
\ No newline at end of file
#!/usr/bin/env python
#
# License: BSD
# https://raw.githubusercontent.com/splintered-reality/py_trees/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
.. argparse::
:module: py_trees.demos.action
:func: command_line_argument_parser
:prog: py-trees-demo-action-behaviour
.. image:: images/action.gif
"""
##############################################################################
# Imports
##############################################################################
import py_trees
import argparse
import atexit
import multiprocessing
import py_trees.common
import time
import py_trees.console as console
##############################################################################
# Classes
##############################################################################
def description():
content = "Demonstrates the characteristics of a typical 'action' behaviour.\n"
content += "\n"
content += "* Mocks an external process and connects to it in the setup() method\n"
content += "* Kickstarts new goals with the external process in the initialise() method\n"
content += "* Monitors the ongoing goal status in the update() method\n"
content += "* Determines RUNNING/SUCCESS pending feedback from the external process\n"
if py_trees.console.has_colours:
banner_line = console.green + "*" * 79 + "\n" + console.reset
s = "\n"
s += banner_line
s += console.bold_white + "Action Behaviour".center(79) + "\n" + console.reset
s += banner_line
s += "\n"
s += content
s += "\n"
s += banner_line
else:
s = content
return s
def epilog():
if py_trees.console.has_colours:
return console.cyan + "And his noodly appendage reached forth to tickle the blessed...\n" + console.reset
else:
return None
def command_line_argument_parser():
return argparse.ArgumentParser(description=description(),
epilog=epilog(),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
def planning(pipe_connection):
"""
Emulates an external process which might accept long running planning jobs.
"""
idle = True
percentage_complete = 0
try:
while(True):
if pipe_connection.poll():
pipe_connection.recv()
percentage_complete = 0
idle = False
if not idle:
percentage_complete += 10
pipe_connection.send([percentage_complete])
if percentage_complete == 100:
idle = True
time.sleep(0.5)
except KeyboardInterrupt:
pass
class Action(py_trees.behaviour.Behaviour):
"""
Connects to a subprocess to initiate a goal, and monitors the progress
of that goal at each tick until the goal is completed, at which time
the behaviour itself returns with success or failure (depending on
success or failure of the goal itself).
This is typical of a behaviour that is connected to an external process
responsible for driving hardware, conducting a plan, or a long running
processing pipeline (e.g. planning/vision).
Key point - this behaviour itself should not be doing any work!
"""
def __init__(self, name="Action"):
"""
Default construction.
"""
super(Action, self).__init__(name)
self.logger.debug("%s.__init__()" % (self.__class__.__name__))
def setup(self):
"""
No delayed initialisation required for this example.
"""
self.logger.debug("%s.setup()->connections to an external process" % (self.__class__.__name__))
self.parent_connection, self.child_connection = multiprocessing.Pipe()
self.planning = multiprocessing.Process(target=planning, args=(self.child_connection,))
atexit.register(self.planning.terminate)
self.planning.start()
def initialise(self):
"""
Reset a counter variable.
"""
self.logger.debug("%s.initialise()->sending new goal" % (self.__class__.__name__))
self.parent_connection.send(['new goal'])
self.percentage_completion = 0
def update(self):
"""
Increment the counter and decide upon a new status result for the behaviour.
"""
new_status = py_trees.common.Status.RUNNING
if self.parent_connection.poll():
self.percentage_completion = self.parent_connection.recv().pop()
if self.percentage_completion == 100:
new_status = py_trees.common.Status.SUCCESS
if new_status == py_trees.common.Status.SUCCESS:
self.feedback_message = "Processing finished"
self.logger.debug("%s.update()[%s->%s][%s]" % (self.__class__.__name__, self.status, new_status, self.feedback_message))
else:
self.feedback_message = "{0}%".format(self.percentage_completion)
self.logger.debug("%s.update()[%s][%s]" % (self.__class__.__name__, self.status, self.feedback_message))
return new_status
def terminate(self, new_status):
"""
Nothing to clean up in this example.
"""
self.logger.debug("%s.terminate()[%s->%s]" % (self.__class__.__name__, self.status, new_status))
##############################################################################
# Main
##############################################################################
def main():
"""
Entry point for the demo script.
"""
command_line_argument_parser().parse_args()
print(description())
py_trees.logging.level = py_trees.logging.Level.DEBUG
action = Action()
action.setup()
try:
for unused_i in range(0, 12):
action.tick_once()
time.sleep(0.5)
print("\n")
except KeyboardInterrupt:
pass
\ No newline at end of file
import py_trees
from .behaviour_copy import Behaviour #TODO: Move behaviours to here
#TODO: Tidy this up when finished
from std_msgs.msg import Bool, String, Empty, Int16
from lhw_interfaces.msg import Entities, Response, Entity, Result
from geometry_msgs.msg import Twist, Pose, PoseStamped
import numpy as np
""" Standby """
class Standby (py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(Standby, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.logger.debug(" %s [Standby::setup()]" % self.name)
def initialise(self):
# todo: make pepper do the standby pose
self.logger.debug(" %s [Standby::initialise()]" % self.name)
def update(self):
self.logger.debug(" %s [Standby::update()]" % self.name)
return py_trees.common.Status.RUNNING
def terminate(self, new_status):
self.logger.debug(" %s [Standby::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
""" Is Head Touched """
class IsHeadTouched (py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(Standby, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.logger.debug(" %s [IsHeadTouched::setup()]" % self.name)
def initialise(self):
# todo: make pepper do the standby pose
self.logger.debug(" %s [IsHeadTouched::initialise()]" % self.name)
def update(self):
self.logger.debug(" %s [IsHeadTouched::update()]" % self.name)
if (head_touched):
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.FAILURE
def terminate(self, new_status):
self.logger.debug(" %s [IsHeadTouched::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
\ No newline at end of file
import py_trees
import py_trees_ros
import random
class ExampleBehaviour(py_trees.behaviour.Behaviour):
def __init__(self, name):
"""
Minimal one-time initialisation. A good rule of thumb is
to only include the initialisation relevant for being able
to insert this behaviour in a tree for offline rendering to
dot graphs.
Other one-time initialisation requirements should be met via
the setup() method.
"""
super(ExampleBehaviour, self).__init__(name)
def setup(self):
"""
When is this called?
This function should be either manually called by your program
to setup this behaviour alone, or more commonly, via
:meth:`~py_trees.behaviour.Behaviour.setup_with_descendants`
or :meth:`~py_trees.trees.BehaviourTree.setup`, both of which
will iterate over this behaviour, it's children (it's children's
children ...) calling :meth:`~py_trees.behaviour.Behaviour.setup`
on each in turn.
If you have vital initialisation necessary to the success
execution of your behaviour, put a guard in your
:meth:`~py_trees.behaviour.Behaviour.initialise` method
to protect against entry without having been setup.
What to do here?
Delayed one-time initialisation that would otherwise interfere
with offline rendering of this behaviour in a tree to dot graph
or validation of the behaviour's configuration.
Good examples include:
- Hardware or driver initialisation
- Middleware initialisation (e.g. ROS pubs/subs/services)
- A parallel checking for a valid policy configuration after
children have been added or removed
"""
self.logger.debug(" %s [ExampleBehaviour::setup()]" % self.name)
def initialise(self):
"""
When is this called?
The first time your behaviour is ticked and anytime the
status is not RUNNING thereafter.
What to do here?
Any initialisation you need before putting your behaviour
to work.
"""
self.logger.debug(" %s [ExampleBehaviour::initialise()]" % self.name)
def update(self):
"""
When is this called?
Every time your behaviour is ticked.
What to do here?
- Triggering, checking, monitoring. Anything...but do not block!
- Set a feedback message
- return a py_trees.common.Status.[RUNNING, SUCCESS, FAILURE]
"""
self.logger.debug(" %s [ExampleBehaviour::update()]" % self.name)
ready_to_make_a_decision = random.choice([True, False])
decision = random.choice([True, False])
if not ready_to_make_a_decision:
return py_trees.common.Status.RUNNING
elif decision:
self.feedback_message = "We are not bar!"
return py_trees.common.Status.SUCCESS
else:
self.feedback_message = "Uh oh"
return py_trees.common.Status.FAILURE
def terminate(self, new_status):
"""
When is this called?
Whenever your behaviour switches to a non-running state.
- SUCCESS || FAILURE : your behaviour's work cycle has finished
- INVALID : a higher priority branch has interrupted, or shutting down
"""
self.logger.debug(" %s [ExampleBehaviour::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
\ No newline at end of file
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from lhw_interfaces.msg import Response, Event
'''
OBS! This node is only for testing.
'''
class Tester(Node):
def __init__(self):
super().__init__('DFNOde')
self.publisher_ = self.create_publisher(Response, 'dialogflow_response', 10)
response_sub = self.create_subscription(Event, 'nlp_event', self.event_callback, 1)
def send_dialogflow_msg(self, response, recognized_text = ""):
msg = Response()
msg.source = Response.DIALOGFLOW
msg.recognized_text = recognized_text
msg.response = response
msg.confidence = 0.0
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg)
def event_callback(self, data):
self.get_logger().info('Received: "%s"' % data)
self.get_logger().info('Sending event')
self.get_logger().info('Receiving response from dialogflow')
self.send_dialogflow_msg("Thank you for your event")
def main(args=None):
rclpy.init(args=args)
ros_node = Tester()
rclpy.spin(ros_node)
rclpy.shutdown()
if __name__ == '__main__':
main()
\ No newline at end of file
This diff is collapsed.