Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • liuhomewreckers/liu-home-wreckers
  • tdde19-2021-5/tdde-19-group-5-pepper
2 results
Show changes
Commits on Source (7)
Showing
with 178 additions and 2110 deletions
...@@ -12,6 +12,7 @@ services: ...@@ -12,6 +12,7 @@ services:
image: lhw image: lhw
volumes: volumes:
- /tmp/.X11-unix:/tmp/.X11-unix - /tmp/.X11-unix:/tmp/.X11-unix
- ~/.Xauthority:/root/.Xauthority
- ./src/lhw_interfaces:/workspace/liu-home-wreckers/src/lhw_interfaces - ./src/lhw_interfaces:/workspace/liu-home-wreckers/src/lhw_interfaces
- ./waiter:/workspace/liu-home-wreckers/waiter - ./waiter:/workspace/liu-home-wreckers/waiter
- ./build:/workspace/liu-home-wreckers/build - ./build:/workspace/liu-home-wreckers/build
......
#!/usr/bin/python3
import sys
import subprocess
import time
import datetime
import json
GET_PID_STR = "docker exec {} zsh -c \"ps\" | grep -v 'ros2\|zsh\|ps\|PID' | awk '{{print $1}}'"
class Command:
def __init__(self, container, val, timeout, verbose):
self.container = container
self.val = val
self.timeout = timeout
self.verbose = verbose
class Launch:
def __init__(self, json_preset):
self.containers = [container for container in json_preset["containers"]]
self.commands = []
for command in json_preset["commands"]:
container = command["container"]
val = command["command"]
timeout = command.get("timeout", 3)
verbose = command.get("verbose", False)
self.commands.append(Command(container, val, timeout, verbose))
if len(self.commands) > 0:
self.commands[len(self.commands) - 1].timeout = 0
def run(self):
created = []
try:
for container in self.containers:
name = container + "_launch_" + str(datetime.datetime.now()).replace(" ", "_").replace(":", ".")
if subprocess.run(["docker-compose run -d --name {} --rm {} zsh".format(name, container)], shell=True).returncode != 0:
print("Failed to start " + container + ", aborting")
break
created.append(name)
print("Starting: " + container)
children = []
if len(created) == len(self.containers):
for command in self.commands:
name = created[self.containers.index(command.container)]
output = None if command.verbose else subprocess.DEVNULL
print("Running: " + command.val)
children.append(subprocess.Popen(
["/usr/bin/docker", "exec", name, "zsh", "-c", "source /etc/zsh/zshrc && " + command.val],
stdout=output,
stderr=output
))
time.sleep(command.timeout)
for child in children:
child.wait()
except KeyboardInterrupt:
for container in reversed(created):
res = subprocess.run([GET_PID_STR.format(container)], shell=True, stdout = subprocess.PIPE)
for pid in reversed(res.stdout.decode("utf-8").split("\n")):
if pid == '':
continue
try:
subprocess.run(["docker exec {} zsh -c \"kill -2 {}\"".format(container, pid)], shell=True, timeout=5)
except TimeoutError:
pass
print("Exited")
for container in created:
subprocess.run(["docker kill {} && docker container rm {}".format(container, container)], shell=True)
def main():
if "--help" in sys.argv:
print("Usage: launch.py <PRESET_NAME>")
print("Available presets in presets.json")
return
with open("presets.json", "r") as file:
data = file.read()
json_data = json.JSONDecoder().decode(data)
if len(sys.argv) < 2:
print("No preset name given")
return
name = sys.argv[1]
if not name in json_data:
print("Invalid preset name")
return
preset = Launch(json_data[name])
preset.run()
if __name__ == "__main__":
main()
{
"head_touched" : {
"containers" : ["lhw_qi", "lhw_tablet", "lhw_intelligence"],
"commands" : [
{
"container" : "lhw_qi",
"command" : "ros2 launch naoqi_driver naoqi_driver.launch.py nao_ip:=$PEPPER_IP",
"timeout" : 8,
"verbose" : true
},
{
"container" : "lhw_qi",
"command" : "ros2 run lhw_qi tablet -i $PEPPER_IP"
},
{
"container" : "lhw_qi",
"command" : "ros2 run lhw_qi leds -i $PEPPER_IP"
},
{
"container" : "lhw_tablet",
"command" : "ros2 run lhw_tablet action_server",
"verbose" : false
},
{
"container" : "lhw_intelligence",
"command" : "ros2 run lhw_intelligence head_touch_tree",
"verbose" : true
}
]
},
"greet_person" : {
"containers" : ["lhw_qi", "lhw_nlp", "lhw_intelligence"],
"commands" : [
{
"container" : "lhw_qi",
"command" : "ros2 run lhw_qi microphone -i $PEPPER_IP",
"verbose" : true
},
{
"container" : "lhw_qi",
"command" : "ros2 run lhw_qi animated_speech -i $PEPPER_IP",
"verbose" : true
},
{
"container" : "lhw_nlp",
"command" : "ros2 run lhw_nlp dialogflow",
"verbose" : true
},
{
"container" : "lhw_intelligence",
"command" : "ros2 run lhw_intelligence greet_person_tree",
"verbose" : true
}
]
}
}
\ No newline at end of file
...@@ -24,6 +24,4 @@ if [ $ENV_LAUNCH ]; then ...@@ -24,6 +24,4 @@ if [ $ENV_LAUNCH ]; then
echo -e "${GREEN}Intelligence started${NC}" echo -e "${GREEN}Intelligence started${NC}"
fi fi
export AMENT_PREFIX_PATH=$AMENT_PREFIX_PATH:/workspace/liu-home-wreckers/install/lhw_intelligence
ldconfig ldconfig
"""
Function and helper functions for performing a Pepper demonstration
where it guesses a movie from the top 100 IMDB movies based on the given answers from the user.
Pepper will ask for:
- last name of the director of the movie (input manually on tablet, rest is verbal)
- one of the top 2 starring actors
- release year
- a genre
- "ploy" question what the movie is about (which is just for show and has no real implementation).
Author: Joline Hellström, Sebastian Olsson
Date: 16/8/2022
"""
import os
import py_trees
import json
from std_msgs.msg import String
from .tablet_behaviours import TabletQuestion, TabletInput
from .py_tree_behaviours import Say, SayDynamic
def guess_movie_demo(node):
"""Run the 'guess a movie' demo
How to run:
Container 1 (If Pepper is not connected to the FIA lab wifi, change 10.133.5.240 to whatever Axel Wennström says):
- docker-compose run lhw_qi zsh
- export PEPPER_IP=10.133.5.240
- ros2 run lhw_qi microphone -i $PEPPER_IP
- ros2 run lhw_qi animated_speech -i $PEPPER_IP
- ros2 run lhw_qi tablet -i $PEPPER_IP
Container 2:
- docker-compose run lhw_tablet zsh
- ros2 run lhw_tablet action_server
Container 3:
- docker-compose run lhw_nlp zsh
- ros2 run lhw_nlp dialogflow
IMPORTANT: Please stop this node if not running the demo actively, otherwise the quota can be depleted and it will no longer work
Container 4 (This is what starts the entire demo, everything else must be started beforehand):
- docker-compose run lhw_intelligence zsh
- ros2 run lhw_intelligence py_tree
Warning: If Pepper is not in the FIA lab and on the eduroam wifi; Changes WILL need to be made to the hardcoded IP variables for the tablet
Please contact Axel Wennström on slack in this case since he knows how the tablet package works.
"""
ask_pick_movie = Say(name="Ask To Pick Movie", node=node, message="Please choose a movie from the top 100 movies, when done press the button on my tablet")
wait_for_choice = TabletQuestion(name="Wait for Choice Button", node=node, text="Press button to continue", options=["this is the button"])
director_input = TabletInput(name="Director Tablet Input", node=node, text="Please input the movie director's last name", type="director")
ask_director = DialogflowListener(name="Ask for Director", node=node)
ask_for_actor = Say(name="Ask for First actor", node=node, message="Name one of the two top starring actors in the movie")
ask_actors = DialogflowListener(name="Ask for Actors", node=node)
say_release_year_question = Say("Release year question", node, message="What year was the movie released?")
ask_release_year= DialogflowListener(name="Ask for Release Year", node=node)
say_genre_question = Say("Genre question", node, message="Name one of the movie's genres")
ask_genre = DialogflowListener(name="Ask for Genre", node=node)
ask_what_movie_is_about = Say(name="What is Movie About", node=node, message="What is the movie about? When done answering, or if you don't know, please press the button once again and I will make my guess")
wait_for_guess = TabletQuestion(name="Wait for allow guess", node=node, text="Press button to continue", options=["this is the button"])
guess_movie = GuessMovie("Guess the movie", node)
say_movie = SayDynamic("Say the movie", node)
restart_question = TabletQuestion("Restart?", node, "Press the button to guess another movie", ["I am the button"])
return py_trees.composites.Sequence(
name="Guess The Movie Seq",
children=[
ask_pick_movie,
wait_for_choice,
director_input,
ask_director,
ask_for_actor,
ask_actors,
say_release_year_question,
ask_release_year,
say_genre_question,
ask_genre,
ask_what_movie_is_about,
wait_for_guess,
guess_movie,
say_movie,
restart_question
]
)
class DialogflowListener(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(DialogflowListener, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="movie_answers", access=py_trees.common.Access.WRITE)
self.say_pub = self.node.say_publisher
def initialise(self):
file_name = "./src/lhw_intelligence/lhw_intelligence/movies.json"
self.movies = json.load(open(file_name))
def update(self):
# Get the closest person to Pepper
param = self.blackboard.last_recognized_item
print(param)
if param:
recieved_value = param.value
found_val = False
if param.key != "director":
for val in self.movies.values():
if param.key == "release_year":
if "19" + str(int(float(param.value))) == val[param.key]:
recieved_value = "19" + str(int(float(param.value)))
found_val = True
break
elif str(int(float(param.value))) == val[param.key]:
found_val = True
break
else:
print(param.value)
for name in val[param.key]:
if param.value.lower() in name.lower():
found_val = True
break
if not found_val:
msg = String()
msg.data = "I'm sorry, I didn't quite catch that. Please try say again"
self.say_pub.publish(msg)
self.blackboard.last_recognized_item = None
return py_trees.common.Status.RUNNING
self.blackboard.movie_answers[param.key] = recieved_value
self.blackboard.last_recognized_item = None
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
class GuessMovie(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(GuessMovie, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="movie_answers", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="say_dynamic_str", access=py_trees.common.Access.WRITE)
def initialise(self):
file_name = "./src/lhw_intelligence/lhw_intelligence/movies.json"
movies = json.load(open(file_name))
movie_points = {}
ACTOR_SCORE = 1
DIRECTOR_SCORE = 1
RELEASE_YEAR_SCORE = 2
GENRE_SCORE = 0.5
for movie in movies:
movie_points[movie] = 0
for key, value in self.blackboard.movie_answers.items():
if key == "actors":
#for actor in value:
if value.lower() in map(lambda x : x.lower(), movies[movie][key]):
movie_points[movie] += ACTOR_SCORE
else:
for name in movies[movie][key]:
if value.lower() in name.lower():
movie_points[movie] += ACTOR_SCORE * 0.5
elif key == "genres":
#for genre in value:
if value.lower() in list(map(lambda x : x.lower(), movies[movie][key])):
movie_points[movie] += GENRE_SCORE
elif key == "release_year":
if str(int(float(value))) == movies[movie][key]:
movie_points[movie] += RELEASE_YEAR_SCORE
elif key == "director":
# Check for last name
if value.lower() in movies[movie][key].lower():
movie_points[movie] += DIRECTOR_SCORE
print(self.blackboard.movie_answers)
print(movie_points)
most_likely_movie_name = None # TODO: Make this prettier :) #max(movie_points, key=lambda x : x.values())
most_likely_movie_points = 0
for key, value in movie_points.items():
if value > most_likely_movie_points:
most_likely_movie_name = key
most_likely_movie_points = value
print(f"Picked movie: {most_likely_movie_name} with {most_likely_movie_points} points")
print("-" * 20)
self.blackboard.say_dynamic_str = most_likely_movie_name
def update(self):
return py_trees.common.Status.SUCCESS
\ No newline at end of file
import typing
import action_msgs.msg as action_msgs # GoalStatus
import py_trees
import rclpy.action
from py_trees_ros import exceptions
##############################################################################
# Behaviours
##############################################################################
class FromBlackboard(py_trees.behaviour.Behaviour):
"""
An action client interface that draws goals from the blackboard. The
lifecycle of this behaviour works as follows:
* :meth:`initialise`: check blackboard for a goal and send
* :meth:`update`: if a goal was sent, monitor progress
* :meth:`terminate`: if interrupted while running, send a cancel request
As a consequence, the status of this behaviour can be interpreted as follows:
* :data:`~py_trees.common.Status.FAILURE`: no goal was found to send,
it was rejected or it failed while executing
* :data:`~py_trees.common.Status.RUNNING`: a goal was sent and is still
executing on the server
* :data:`~py_trees.common.Status.SUCCESS`: sent goal has completed with success
To block on the arrival of a goal on the blackboard, use with the
:class:`py_trees.behaviours.WaitForBlackboardVariable` behaviour. e.g.
.. code-block:: python
sequence = py_trees.composites.Sequence(name="Sequence")
wait_for_goal = py_trees.behaviours.WaitForBlackboardVariable(
name="WaitForGoal",
variable_name="/my_goal"
)
action_client = py_trees_ros.aciton_clients.FromBlackboard(
action_type=py_trees_actions.Dock,
action_name="dock",
name="ActionClient"
)
sequence.add_children([wait_for_goal, action_client])
To customise a more interesting feedback message, pass in a method to the
constructor, for example:
.. code-block:: python
action_client = py_trees_ros.action_clients.FromBlackboard(
action_type=py_trees_actions.Dock,
action_name="dock",
name="ActionClient",
generate_message=lambda msg: "{:.2f}%%".format(msg.feedback.percentage_completed)
)
Args:
action_type: spec type for the action (e.g. move_base_msgs.action.MoveBase)
action_name: where you can find the action topics & services (e.g. "bob/move_base")
key: name of the key on the blackboard
name: name of the behaviour (default: lowercase class name)
generate_feedback_message: formatter for feedback messages, takes action_type.Feedback
messages and returns strings (default: None)
wait_for_server_timeout_sec: use negative values for a blocking but periodic check (default: -3.0)
.. note::
The default setting for timeouts (a negative value) will suit
most use cases. With this setting the behaviour will periodically check and
issue a warning if the server can't be found. Actually aborting the setup can
usually be left up to the behaviour tree manager.
"""
def __init__(self,
node,
action_type: typing.Any,
action_name: str,
key: str,
name: str=py_trees.common.Name.AUTO_GENERATED,
generate_feedback_message: typing.Callable[[typing.Any], str]=None,
wait_for_server_timeout_sec: float=-3.0
):
super().__init__(name)
self.action_type = action_type
self.action_name = action_name
self.wait_for_server_timeout_sec = wait_for_server_timeout_sec
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(
key="goal",
access=py_trees.common.Access.READ, # make sure to namespace it if not already
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", key)
)
self.blackboard.register_key(
key="result",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/%s_result", key)
)
self.blackboard.register_key(
key="feedback",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/%s_feedback", key)
)
self.generate_feedback_message = generate_feedback_message
self.node = node
self.action_client = None
self.status_strings = {
action_msgs.GoalStatus.STATUS_UNKNOWN : "STATUS_UNKNOWN", # noqa
action_msgs.GoalStatus.STATUS_ACCEPTED : "STATUS_ACCEPTED", # noqa
action_msgs.GoalStatus.STATUS_EXECUTING: "STATUS_EXECUTING", # noqa
action_msgs.GoalStatus.STATUS_CANCELING: "STATUS_CANCELING", # noqa
action_msgs.GoalStatus.STATUS_SUCCEEDED: "STATUS_SUCCEEDED", # noqa
action_msgs.GoalStatus.STATUS_CANCELED : "STATUS_CANCELED", # noqa
action_msgs.GoalStatus.STATUS_ABORTED : "STATUS_ABORTED" # noqa
}
def setup(self, **kwargs):
"""
Setup the action client services and subscribers.
Args:
**kwargs (:obj:`dict`): distribute arguments to this
behaviour and in turn, all of it's children
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
:class:`~py_trees_ros.exceptions.TimedOutError`: if the action server could not be found
"""
self.logger.debug("{}.setup()".format(self.qualified_name))
# try:
# self.node = kwargs['node']
# except KeyError as e:
# error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
# raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = rclpy.action.ActionClient(
node=self.node,
action_type=self.action_type,
action_name=self.action_name
)
result = None
if self.wait_for_server_timeout_sec > 0.0:
result = self.action_client.wait_for_server(timeout_sec=self.wait_for_server_timeout_sec)
else:
iterations = 0
period_sec = -1.0*self.wait_for_server_timeout_sec
while not result:
iterations += 1
result = self.action_client.wait_for_server(timeout_sec=period_sec)
if not result:
self.node.get_logger().warning(
"waiting for action server ... [{}s][{}][{}]".format(
iterations * period_sec,
self.action_name,
self.qualified_name
)
)
if not result:
self.feedback_message = "timed out waiting for the server [{}]".format(self.action_name)
self.node.get_logger().error("{}[{}]".format(self.feedback_message, self.qualified_name))
raise exceptions.TimedOutError(self.feedback_message)
else:
self.feedback_message = "... connected to action server [{}]".format(self.action_name)
self.node.get_logger().info("{}[{}]".format(self.feedback_message, self.qualified_name))
def initialise(self):
"""
Reset the internal variables and kick off a new goal request.
"""
self.logger.debug("{}.initialise()".format(self.qualified_name))
# initialise some temporary variables
self.goal_handle = None
self.send_goal_future = None
self.get_result_future = None
self.result_message = None
self.result_status = None
self.result_status_string = None
self.blackboard.feedback = None
self.blackboard.result = None
try:
self.send_goal_request(self.blackboard.goal)
self.feedback_message = "sent goal request"
except KeyError:
pass # self.send_goal_future will be None, check on that
def update(self):
"""
Check only to see whether the underlying action server has
succeeded, is running, or has cancelled/aborted for some reason and
map these to the usual behaviour return states.
Returns:
:class:`py_trees.common.Status`
"""
self.logger.debug("{}.update()".format(self.qualified_name))
if self.send_goal_future is None:
self.feedback_message = "no goal to send"
return py_trees.common.Status.FAILURE
if self.goal_handle is not None and not self.goal_handle.accepted:
# goal was rejected
self.feedback_message = "goal rejected"
return py_trees.common.Status.FAILURE
if self.result_status is None:
return py_trees.common.Status.RUNNING
elif not self.get_result_future.done():
# should never get here
self.node.get_logger().warn("got result, but future not yet done [{}]".format(self.qualified_name))
return py_trees.common.Status.RUNNING
else:
self.node.get_logger().debug("goal result [{}]".format(self.qualified_name))
self.node.get_logger().debug(" status: {}".format(self.result_status_string))
self.node.get_logger().debug(" message: {}".format(self.result_message))
if self.result_status == action_msgs.GoalStatus.STATUS_SUCCEEDED: # noqa
self.feedback_message = "successfully completed"
return py_trees.common.Status.SUCCESS
else:
self.feedback_message = "failed"
return py_trees.common.Status.FAILURE
def terminate(self, new_status: py_trees.common.Status):
"""
If running and the current goal has not already succeeded, cancel it.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug(
"{}.terminate({})".format(
self.qualified_name,
"{}->{}".format(self.status, new_status) if self.status != new_status else "{}".format(new_status)
)
)
if (
self.status == py_trees.common.Status.RUNNING and
new_status == py_trees.common.Status.INVALID
):
self.send_cancel_request()
def shutdown(self):
"""
Clean up the action client when shutting down.
"""
self.action_client.destroy()
########################################
# Action Client Methods
########################################
def feedback_callback(self, msg: typing.Any):
"""
Default generator for feedback messages from the action server. This will
update the behaviour's feedback message with a stringified version of the
incoming feedback message.
Args:
msg: incoming feedback message (e.g. move_base_msgs.action.MoveBaseFeedback)
"""
self.blackboard.feedback = msg
if self.generate_feedback_message is not None:
self.feedback_message = "feedback: {}".format(self.generate_feedback_message(msg))
self.node.get_logger().debug(
'{} [{}]'.format(
self.feedback_message,
self.qualified_name
)
)
def send_goal_request(self, goal: typing.Any):
"""
Send the goal, get a future back and start lining up the
chain of callbacks that will lead to a result.
"""
self.feedback_message = "sending goal ..."
self.node.get_logger().debug("{} [{}]".format(
self.feedback_message,
self.qualified_name
))
self.send_goal_future = self.action_client.send_goal_async(
goal,
feedback_callback=self.feedback_callback,
# A random uuid is always generated, since we're not sending more than one
# at a time, we don't need to generate and track them here
# goal_uuid=unique_identifier_msgs.UUID(uuid=list(uuid.uuid4().bytes))
)
self.send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future: rclpy.task.Future):
"""
Handle goal response, proceed to listen for the result if accepted.
Args:
future: incoming goal request result delivered from the action server
"""
if future.result() is None:
self.feedback_message = "goal request failed :[ [{}]\n{!r}".format(self.qualified_name, future.exception())
self.node.get_logger().debug('... {}'.format(self.feedback_message))
return
self.goal_handle = future.result()
if not self.goal_handle.accepted:
self.feedback_message = "goal rejected :( [{}]".format(self.qualified_name)
self.node.get_logger().debug('... {}'.format(self.feedback_message))
return
else:
self.feedback_message = "goal accepted :) [{}]".format(self.qualified_name)
self.node.get_logger().debug("... {}".format(self.feedback_message))
self.node.get_logger().debug(" {!s}".format(future.result()))
self.get_result_future = self.goal_handle.get_result_async()
self.get_result_future.add_done_callback(self.get_result_callback)
def send_cancel_request(self):
"""
Send a cancel request to the server. This is triggered when the
behaviour's status switches from :attr:`~py_trees.common.Status.RUNNING` to
:attr:`~py_trees.common.Status.INVALID` (typically a result of a priority
interrupt).
"""
self.feedback_message = "cancelling goal ... [{}]".format(self.qualified_name)
self.node.get_logger().debug(self.feedback_message)
if self.goal_handle is not None:
future = self.goal_handle.cancel_goal_async()
future.add_done_callback(self.cancel_response_callback)
def cancel_response_callback(self, future: rclpy.task.Future):
"""
Immediate callback for the result of a cancel request. This will
set the behaviour's feedback message accordingly.
Args:
future: incoming cancellation result delivered from the action server
"""
cancel_response = future.result()
if len(cancel_response.goals_canceling) > 0:
self.feedback_message = "goal successfully cancelled [{}]".format(self.qualified_name)
else:
self.feedback_message = "goal failed to cancel [{}]".format(self.qualified_name)
self.node.get_logger().debug('... {}'.format(self.feedback_message))
def get_result_callback(self, future: rclpy.task.Future):
"""
Immediate callback for the result, saves data into local variables so that
the update method can react accordingly.
Args:
future: incoming goal result delivered from the action server
"""
self.blackboard.result = future.result()
self.result_message = future.result()
self.result_status = future.result().status
self.result_status_string = self.status_strings[self.result_status]
\ No newline at end of file
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from geometry_msgs.msg import Twist
from lhw_interfaces.action import RotateRobot
class RotateRobotActionClient(Node):
def __init__(self):
super().__init__('turn_in_direction_action_client')
self._action_client = ActionClient(self, RotateRobot, 'rotate_robot')
def send_goal(self, angle):
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
def main(args=None):
rclpy.init(args=args)
action_client = TurnInDirectionActionClient()
future = action_client.send_goal()
rclpy.spin_until_future_complete(action_client, future)
if __name__ == '__main__':
main()
\ No newline at end of file
import typing
import action_msgs.msg as action_msgs # GoalStatus
import py_trees
import rclpy.action
from py_trees_ros import exceptions
##############################################################################
# Behaviours
##############################################################################
class FromBlackboard(py_trees.behaviour.Behaviour):
"""
An action client interface that draws goals from the blackboard. The
lifecycle of this behaviour works as follows:
* :meth:`initialise`: check blackboard for a goal and send
* :meth:`update`: if a goal was sent, monitor progress
* :meth:`terminate`: if interrupted while running, send a cancel request
As a consequence, the status of this behaviour can be interpreted as follows:
* :data:`~py_trees.common.Status.FAILURE`: no goal was found to send,
it was rejected or it failed while executing
* :data:`~py_trees.common.Status.RUNNING`: a goal was sent and is still
executing on the server
* :data:`~py_trees.common.Status.SUCCESS`: sent goal has completed with success
To block on the arrival of a goal on the blackboard, use with the
:class:`py_trees.behaviours.WaitForBlackboardVariable` behaviour. e.g.
.. code-block:: python
sequence = py_trees.composites.Sequence(name="Sequence")
wait_for_goal = py_trees.behaviours.WaitForBlackboardVariable(
name="WaitForGoal",
variable_name="/my_goal"
)
action_client = py_trees_ros.aciton_clients.FromBlackboard(
action_type=py_trees_actions.Dock,
action_name="dock",
name="ActionClient"
)
sequence.add_children([wait_for_goal, action_client])
To customise a more interesting feedback message, pass in a method to the
constructor, for example:
.. code-block:: python
action_client = py_trees_ros.action_clients.FromBlackboard(
action_type=py_trees_actions.Dock,
action_name="dock",
name="ActionClient",
generate_message=lambda msg: "{:.2f}%%".format(msg.feedback.percentage_completed)
)
Args:
action_type: spec type for the action (e.g. move_base_msgs.action.MoveBase)
action_name: where you can find the action topics & services (e.g. "bob/move_base")
key: name of the key on the blackboard
name: name of the behaviour (default: lowercase class name)
generate_feedback_message: formatter for feedback messages, takes action_type.Feedback
messages and returns strings (default: None)
wait_for_server_timeout_sec: use negative values for a blocking but periodic check (default: -3.0)
.. note::
The default setting for timeouts (a negative value) will suit
most use cases. With this setting the behaviour will periodically check and
issue a warning if the server can't be found. Actually aborting the setup can
usually be left up to the behaviour tree manager.
"""
def __init__(self,
action_type: typing.Any,
action_name: str,
key: str,
name: str=py_trees.common.Name.AUTO_GENERATED,
generate_feedback_message: typing.Callable[[typing.Any], str]=None,
wait_for_server_timeout_sec: float=-3.0
):
super().__init__(name)
self.action_type = action_type
self.action_name = action_name
self.wait_for_server_timeout_sec = wait_for_server_timeout_sec
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(
key="goal",
access=py_trees.common.Access.READ, # make sure to namespace it if not already
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", key)
)
self.blackboard.register_key(
key="result",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/%s_result", key)
)
self.blackboard.register_key(
key="feedback",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/%s_feedback", key)
)
self.generate_feedback_message = generate_feedback_message
self.node = None
self.action_client = None
self.status_strings = {
action_msgs.GoalStatus.STATUS_UNKNOWN : "STATUS_UNKNOWN", # noqa
action_msgs.GoalStatus.STATUS_ACCEPTED : "STATUS_ACCEPTED", # noqa
action_msgs.GoalStatus.STATUS_EXECUTING: "STATUS_EXECUTING", # noqa
action_msgs.GoalStatus.STATUS_CANCELING: "STATUS_CANCELING", # noqa
action_msgs.GoalStatus.STATUS_SUCCEEDED: "STATUS_SUCCEEDED", # noqa
action_msgs.GoalStatus.STATUS_CANCELED : "STATUS_CANCELED", # noqa
action_msgs.GoalStatus.STATUS_ABORTED : "STATUS_ABORTED" # noqa
}
def setup(self, **kwargs):
"""
Setup the action client services and subscribers.
Args:
**kwargs (:obj:`dict`): distribute arguments to this
behaviour and in turn, all of it's children
Raises:
:class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs
:class:`~py_trees_ros.exceptions.TimedOutError`: if the action server could not be found
"""
self.logger.debug("{}.setup()".format(self.qualified_name))
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.action_client = rclpy.action.ActionClient(
node=self.node,
action_type=self.action_type,
action_name=self.action_name
)
result = None
if self.wait_for_server_timeout_sec > 0.0:
result = self.action_client.wait_for_server(timeout_sec=self.wait_for_server_timeout_sec)
else:
iterations = 0
period_sec = -1.0*self.wait_for_server_timeout_sec
while not result:
iterations += 1
result = self.action_client.wait_for_server(timeout_sec=period_sec)
if not result:
self.node.get_logger().warning(
"waiting for action server ... [{}s][{}][{}]".format(
iterations * period_sec,
self.action_name,
self.qualified_name
)
)
if not result:
self.feedback_message = "timed out waiting for the server [{}]".format(self.action_name)
self.node.get_logger().error("{}[{}]".format(self.feedback_message, self.qualified_name))
raise exceptions.TimedOutError(self.feedback_message)
else:
self.feedback_message = "... connected to action server [{}]".format(self.action_name)
self.node.get_logger().info("{}[{}]".format(self.feedback_message, self.qualified_name))
def initialise(self):
"""
Reset the internal variables and kick off a new goal request.
"""
self.logger.debug("{}.initialise()".format(self.qualified_name))
# initialise some temporary variables
self.goal_handle = None
self.send_goal_future = None
self.get_result_future = None
self.result_message = None
self.result_status = None
self.result_status_string = None
self.blackboard.feedback = None
self.blackboard.result = None
try:
self.send_goal_request(self.blackboard.goal)
self.feedback_message = "sent goal request"
except KeyError:
pass # self.send_goal_future will be None, check on that
def update(self):
"""
Check only to see whether the underlying action server has
succeeded, is running, or has cancelled/aborted for some reason and
map these to the usual behaviour return states.
Returns:
:class:`py_trees.common.Status`
"""
self.logger.debug("{}.update()".format(self.qualified_name))
if self.send_goal_future is None:
self.feedback_message = "no goal to send"
return py_trees.common.Status.FAILURE
if self.goal_handle is not None and not self.goal_handle.accepted:
# goal was rejected
self.feedback_message = "goal rejected"
return py_trees.common.Status.FAILURE
if self.result_status is None:
return py_trees.common.Status.RUNNING
elif not self.get_result_future.done():
# should never get here
self.node.get_logger().warn("got result, but future not yet done [{}]".format(self.qualified_name))
return py_trees.common.Status.RUNNING
else:
self.node.get_logger().debug("goal result [{}]".format(self.qualified_name))
self.node.get_logger().debug(" status: {}".format(self.result_status_string))
self.node.get_logger().debug(" message: {}".format(self.result_message))
if self.result_status == action_msgs.GoalStatus.STATUS_SUCCEEDED: # noqa
self.feedback_message = "successfully completed"
return py_trees.common.Status.SUCCESS
else:
self.feedback_message = "failed"
return py_trees.common.Status.FAILURE
def terminate(self, new_status: py_trees.common.Status):
"""
If running and the current goal has not already succeeded, cancel it.
Args:
new_status: the behaviour is transitioning to this new status
"""
self.logger.debug(
"{}.terminate({})".format(
self.qualified_name,
"{}->{}".format(self.status, new_status) if self.status != new_status else "{}".format(new_status)
)
)
if (
self.status == py_trees.common.Status.RUNNING and
new_status == py_trees.common.Status.INVALID
):
self.send_cancel_request()
def shutdown(self):
"""
Clean up the action client when shutting down.
"""
self.action_client.destroy()
########################################
# Action Client Methods
########################################
def feedback_callback(self, msg: typing.Any):
"""
Default generator for feedback messages from the action server. This will
update the behaviour's feedback message with a stringified version of the
incoming feedback message.
Args:
msg: incoming feedback message (e.g. move_base_msgs.action.MoveBaseFeedback)
"""
self.blackboard.feedback = msg
if self.generate_feedback_message is not None:
self.feedback_message = "feedback: {}".format(self.generate_feedback_message(msg))
self.node.get_logger().debug(
'{} [{}]'.format(
self.feedback_message,
self.qualified_name
)
)
def send_goal_request(self, goal: typing.Any):
"""
Send the goal, get a future back and start lining up the
chain of callbacks that will lead to a result.
"""
self.feedback_message = "sending goal ..."
self.node.get_logger().debug("{} [{}]".format(
self.feedback_message,
self.qualified_name
))
self.send_goal_future = self.action_client.send_goal_async(
goal,
feedback_callback=self.feedback_callback,
# A random uuid is always generated, since we're not sending more than one
# at a time, we don't need to generate and track them here
# goal_uuid=unique_identifier_msgs.UUID(uuid=list(uuid.uuid4().bytes))
)
self.send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future: rclpy.task.Future):
"""
Handle goal response, proceed to listen for the result if accepted.
Args:
future: incoming goal request result delivered from the action server
"""
if future.result() is None:
self.feedback_message = "goal request failed :[ [{}]\n{!r}".format(self.qualified_name, future.exception())
self.node.get_logger().debug('... {}'.format(self.feedback_message))
return
self.goal_handle = future.result()
if not self.goal_handle.accepted:
self.feedback_message = "goal rejected :( [{}]".format(self.qualified_name)
self.node.get_logger().debug('... {}'.format(self.feedback_message))
return
else:
self.feedback_message = "goal accepted :) [{}]".format(self.qualified_name)
self.node.get_logger().debug("... {}".format(self.feedback_message))
self.node.get_logger().debug(" {!s}".format(future.result()))
self.get_result_future = self.goal_handle.get_result_async()
self.get_result_future.add_done_callback(self.get_result_callback)
def send_cancel_request(self):
"""
Send a cancel request to the server. This is triggered when the
behaviour's status switches from :attr:`~py_trees.common.Status.RUNNING` to
:attr:`~py_trees.common.Status.INVALID` (typically a result of a priority
interrupt).
"""
self.feedback_message = "cancelling goal ... [{}]".format(self.qualified_name)
self.node.get_logger().debug(self.feedback_message)
if self.goal_handle is not None:
future = self.goal_handle.cancel_goal_async()
future.add_done_callback(self.cancel_response_callback)
def cancel_response_callback(self, future: rclpy.task.Future):
"""
Immediate callback for the result of a cancel request. This will
set the behaviour's feedback message accordingly.
Args:
future: incoming cancellation result delivered from the action server
"""
cancel_response = future.result()
if len(cancel_response.goals_canceling) > 0:
self.feedback_message = "goal successfully cancelled [{}]".format(self.qualified_name)
else:
self.feedback_message = "goal failed to cancel [{}]".format(self.qualified_name)
self.node.get_logger().debug('... {}'.format(self.feedback_message))
def get_result_callback(self, future: rclpy.task.Future):
"""
Immediate callback for the result, saves data into local variables so that
the update method can react accordingly.
Args:
future: incoming goal result delivered from the action server
"""
self.blackboard.result = future.result()
self.result_message = future.result()
self.result_status = future.result().status
self.result_status_string = self.status_strings[self.result_status]
\ No newline at end of file
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from lhw_interfaces.action import RotateRobot
class RotateRobotActionServer(Node):
def __init__(self):
super().__init__('rotate_robot_action_server')
self._action_server = ActionServer(
self,
RotateRobot,
'rotate_robot',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = RotateRobot.Result()
return result
def main(args=None):
rclpy.init(args=args)
rotate_robot_action_server = RotateRobotActionServer()
rclpy.spin(rotate_robot_action_server)
if __name__ == '__main__':
main()
\ No newline at end of file
from typing import Any, Iterable, Union
from std_msgs.msg import Bool, String, Empty, Int16
from lhw_interfaces.msg import Entities, Response, Entity, Result
from geometry_msgs.msg import Twist, Pose, PoseStamped
import numpy as np
class Actions:
"""
Abstract class, used for all behaviours (in the behaviours folder)
The node is passed to the behaviour, so create new subscribers and
publishers is possible. This should only be done if the callbacks or
publish function isn't already defined in this class.
"""
def __init__(self, node):
self.node = node
self.start_time = self.node.get_clock().now()
def __del__(self):
pass
# ------- Common methods -------
def behaviour_timeout(self) -> bool:
""" Checks if the behaviour has gone on for too long
Returns:
True if too long, False otherwise.
"""
if self.behaviour_timeout_in_sec:
current_time = self.node.get_clock().now()
current_behaviour_time = current_time - self.start_time
current_behaviour_time_in_sec = round(current_behaviour_time.nanoseconds * 10**(-9))
return current_behaviour_time_in_sec >= self.behaviour_timeout_in_sec
else:
return False
def add_to_db(self, collection: str, element: Union[Entity,Any]):
""" Adds an arbitrary element to the database TODO: Not sure of the format of the nlp_parameters
"""
self.node.db.add_to_db(collection, element)
def get_categories_of_type(self, type):
return self.node.db.get_categories_of_type(type)
def get_entity(self, id: int) -> Entity:
""" Returns the entitity from the database corresponding to the supplied id
Args:
id: The identity of the object
Returns:
The entity
"""
return self.node.db.get_entity(id)
def relative_xyz_to_relative_pose(self, xyz: Iterable) -> Pose:
""" Converts relative coordinates from the vision stystem to a Pose message
Note that this function assumes that the coordinates from vision are not from a rotated head for example.
Args:
xyz: x - right , y - down, z - depth
Returns:
Pose where x - right, y - forward
"""
vector_in_2d = np.array((float(xyz[0]), float(xyz[2])))
direction_vector_in_2d = vector_in_2d / np.linalg.norm(vector_in_2d)
pose = Pose()
pose.position.x = direction_vector_in_2d[0]
pose.position.y = direction_vector_in_2d[1]
pose.position.z = 0.0
return pose
def relative_xyz_to_global_position(self, xyz):
#DO SOME MAGIC HERE!
position = {}
position["x"] = float(xyz[0])
position["y"] = float(xyz[1])
position["z"] = float(xyz[2])
return position
def position_to_pose(self, position: Iterable) -> Pose:
""" Converts a position to a Pose message
Args:
position: The position
Returns:
The corresponding pose
"""
pose = Pose()
pose.position.x = position["x"]
pose.position.y = position["y"]
pose.position.z = position["z"]
return pose
# ------- Publishers -----------
def rotate_robot(self):
""" Rotates the robot counter (?) clockwise.
"""
self.node.log.info("Rotating robot")
msg = Twist()
msg.angular.z = -1.0
self.node.cmd_vel_publisher.publish(msg)
def stop_robot(self):
""" Stops the robot
"""
self.node.log.info("Stop robot")
msg = Twist()
self.node.cmd_vel_publisher.publish(msg)
def move_robot_to_relative_pose(self, pose: Pose):
""" Takes a pose and
"""
goal = PoseStamped()
goal.pose = pose
self.node.log.info("Navigate to relative pose " + str(goal))
self.node.relative_goal_pose_publisher.publish(goal)
def move_robot_to_pose(self, pose):
goal = PoseStamped()
goal.pose = pose
self.node.log.info("Navigate to pose " + str(goal))
self.node.goal_pose_publisher.publish(goal)
def send_result(self, success=True, id=-1):
result = Result()
result.success = success
result.id = id
self.node.log.info("Result: " + str(result))
self.node.result_publisher.publish(result)
def say(self, line):
msg = String()
msg.data = line
self.node.log.info("Saying " + line)
self.node.say_publisher.publish(msg)
def send_event(self, event_name, parameters = {}):
msg = Event()
msg.event_name = event_name
msg_parameters = []
for key, value in parameters.items():
parameter = Parameter()
parameter.key = key
parameter.value = value
msg_parameters.append(parameter)
msg.parameters = msg_parameters
self.node.log.info("Sending event " + event_name + " with parameters " + str(parameters) + "to Dialogflow")
self.node.event_publisher.publish(msg)
# ------- Subscribers ---------
def entities_callback(self, data):
return
def dialogflow_response_callback(self, data):
print("received data from dialogflow")
return
def gpt_response_callback(self, data):
print("received data from gpt")
return
'''
def tracked_callback(self, msg):
""" Contains visually tracked objects with 3D positions inserted """
return
def publish_say(self, msg):
""" For using pepper text to speech """
self.node.say_publisher.publish(msg)
'''
\ No newline at end of file
"""
This is the main AI file for the whole of lhw. If you want to perform a task,
you also have to launch one of the state machines in this package.
In other words, this is a hub which reacts to sensory input.
"""
import rclpy
from rclpy.node import Node
from lhw_interfaces.msg._goal import Goal
from lhw_interfaces.msg._feedback import Feedback
from lhw_interfaces.msg._result import Result
from std_msgs.msg import String
from lhw_interfaces.msg import Entities, Response, Event
from geometry_msgs.msg import Twist, Point, PoseStamped
from nav_msgs.msg import OccupancyGrid
from .help_functions.DataBase import DataBase
import time
def import_behaviour(what):
if not hasattr(import_behaviour, "imported"):
import_behaviour.imported = dict()
file_name = what.split(":")[0]
class_name = what.split(":")[1]
if what not in import_behaviour.imported:
lib = __import__("behaviours." + file_name, globals(), locals(), [file_name], 1)
import_behaviour.imported[what] = getattr(lib, class_name)
return import_behaviour.imported[what]
class AI(Node):
def __init__(self):
super().__init__('ai')
self.log = self.get_logger()
self.log.info("Main AI started")
#self.goal_subscriber = self.create_subscription(Entities, "sm_goal", self.new_goal, 10)
# ------- Communication with SM ---------
self.goal_subscriber = self.create_subscription(Goal, "sm_goal", self.new_goal, 10)
self.feedback_publisher = self.create_publisher(Feedback, 'sm_feedback', 1)
self.result_publisher = self.create_publisher(Result, 'sm_result', 1)
# ------- Subscribers ---------
entities_sub = self.create_subscription(Entities, 'entities', self.entities_callback, 1)
dialogflow_sub = self.create_subscription(Response, 'dialogflow_response', self.dialogflow_response_callback, 1)
gpt_sub = self.create_subscription(Response, 'gpt_response', self.gpt_response_callback, 1)
# ------- Publishers ---------
self.cmd_vel_publisher = self.create_publisher(Twist, 'cmd_vel', 1)
self.goal_pose_publisher = self.create_publisher(PoseStamped, 'goal_pose', 1)
self.realative_goal_pose_publisher = self.create_publisher(PoseStamped, 'realative_goal_pose', 1)
self.map = self.create_publisher(OccupancyGrid, 'map', 1)
self.say_publisher = self.create_publisher(String, 'say', 100)
self.event_publisher = self.create_publisher(Event, 'nlp_event', 100)
self.current_behaviour = None
# Initialize DB:
self.db = DataBase()
### For testing: ###
#self.db.add_to_db("seen_entities", {"id": 1, "type": "apple", "position": {"x": 10.0, "y": 30.0, "z": 2.0}})
goal = Goal()
goal.what = "speak_core:SpeakCore"
#goal.specific_id = False
#goal.id = -1
goal.type = "event"
goal.at_start = "introduce_people"
#goal.behaviour_timeout_in_sec = 109
#goal.map_name = "simple_map_with_goals.png"
for count in [3,2,1,0]:
time.sleep(1)
self._logger.info("Start in " + str(count))
#self.new_goal(goal) # TODO: TAKE THIS BACK, someday
#self.new_goal({"what": "find_person:FindPerson"}) # TEST
def new_goal(self, msg):
# Terminate old behaviour object
if self.current_behaviour is not None:
del self.current_behaviour # Blocking wait
self.current_behaviour = None
# Import behaviour class
if isinstance(msg, dict):
behaviour_class = import_behaviour(msg["what"])
else:
behaviour_class = import_behaviour(msg.what)
# Instantiate behaviour
self.current_behaviour = behaviour_class(self, msg)
# ------- Subscribers ---------
def entities_callback(self, data):
if self.current_behaviour is not None:
self.current_behaviour.entities_callback(data)
def dialogflow_response_callback(self, data):
# OLD
# if self.current_behaviour is not None:
# self.current_behaviour.dialogflow_response_callback(data)
#To test dialogflow
if data.response:
msg = String()
#print("THIS IS RESPONSE: " + data.response)
msg.data = data.response
self.say_publisher.publish(msg)
#self.current_behaviour.dialogflow_response_callback(data)
def gpt_response_callback(self, data):
if self.current_behaviour is not None:
self.current_behaviour.gpt_response_callback(data)
def main(args=None):
rclpy.init(args=args)
ai_node = AI()
rclpy.spin(ai_node)
ai_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
"""
Abstract class, used for all behaviours (in the behaviours folder)
The node is passed to the behaviour, so create new subscribers and
publishers is possible. This should only be done if the callbacks or
publish function isn't already defined in this class.
"""
from std_msgs.msg import Bool, String, Empty, Int16
from lhw_interfaces.msg import Entities, Response, Event, Parameter
from geometry_msgs.msg import Twist, Pose, PoseStamped
from lhw_interfaces.msg._result import Result
class Behaviour:
def __init__(self, node, args):
self.node = node
self.start_time = self.node.get_clock().now()
self.behaviour_timeout_in_sec = args.behaviour_timeout_in_sec
def __del__(self):
pass
# ------- Common methods -------
def _behaviour_timeout(self):
if self.behaviour_timeout_in_sec:
current_time = self.node.get_clock().now()
current_behaviour_time = current_time - self.start_time
current_behaviour_time_in_sec = round(current_behaviour_time.nanoseconds * 10**(-9))
return current_behaviour_time_in_sec >= self.behaviour_timeout_in_sec
else:
return False
def _add_to_db(self, collection, element):
self.node.db.add_to_db(collection, element)
def _get_categories_of_type(self, type):
return self.node.db.get_categories_of_type(type)
def _get_entity(self, id):
return self.node.db.get_entity(id)
def _relative_xyz_to_relative_pose(self, xyz):
pose = Pose()
pose.position.x = float(xyz[0])
pose.position.y = float(xyz[1])
pose.position.z = float(xyz[2])
return pose
def _relative_xyz_to_global_position(self, xyz):
#DO SOME MAGIC HERE!
position = {}
position["x"] = float(xyz[0])
position["y"] = float(xyz[1])
position["z"] = float(xyz[2])
return position
def _position_to_pose(self, position):
pose = Pose()
pose.position.x = position["x"]
pose.position.y = position["y"]
pose.position.z = position["z"]
return pose
# ------- Publishers -----------
def rotate_robot(self):
self.node.log.info("Rotating robot")
msg = Twist()
msg.angular.z = -1.0
self.node.cmd_vel_publisher.publish(msg)
def stop_robot(self):
self.node.log.info("Stop robot")
msg = Twist()
self.node.cmd_vel_publisher.publish(msg)
def move_robot_to_relative_pose(self, pose):
goal = PoseStamped()
goal.pose = pose
self.node.log.info("Navigate to relative pose " + str(goal))
self.node.realative_goal_pose_publisher.publish(goal)
def move_robot_to_pose(self, pose):
goal = PoseStamped()
goal.pose = pose
self.node.log.info("Navigate to pose " + str(goal))
self.node.goal_pose_publisher.publish(goal)
def send_result(self, success=True, id=-1):
result = Result()
result.success = success
result.id = id
self.node.log.info("Result: " + str(result))
self.node.result_publisher.publish(result)
def say(self, line):
msg = String()
msg.data = line
self.node.log.info("Saying " + line)
self.node.say_publisher.publish(msg)
def send_event(self, event_name, parameters = {}):
msg = Event()
msg.event_name = event_name
msg_parameters = []
for key, value in parameters.items():
parameter = Parameter()
parameter.key = key
parameter.value = value
msg_parameters.append(parameter)
msg.parameters = msg_parameters
self.node.log.info("Sending event " + event_name + " with parameters " + str(parameters) + "to Dialogflow")
self.node.event_publisher.publish(msg)
# ------- Subscribers ---------
def entities_callback(self, data):
return
def dialogflow_response_callback(self, data):
print("received data from dialogflow")
return
def gpt_response_callback(self, data):
print("received data from gpt")
return
'''
def tracked_callback(self, msg):
""" Contains visually tracked objects with 3D positions inserted """
return
def publish_say(self, msg):
""" For using pepper text to speech """
self.node.say_publisher.publish(msg)
'''
\ No newline at end of file
from typing import Any, Iterable, Union
from std_msgs.msg import Bool, String, Empty, Int16
from lhw_interfaces.msg import Entities, Response, Entity, Result
from geometry_msgs.msg import Twist, Pose, PoseStamped
import numpy as np
class Behaviour:
"""
Abstract class, used for all behaviours (in the behaviours folder)
The node is passed to the behaviour, so create new subscribers and
publishers is possible. This should only be done if the callbacks or
publish function isn't already defined in this class.
"""
def __init__(self, node):
self.node = node
#self.start_time = self.node.get_clock().now()
#self.behaviour_timeout_in_sec = args.behaviour_timeout_in_sec
def __del__(self):
pass
# ------- Common methods -------
def behaviour_timeout(self) -> bool:
""" Checks if the behaviour has gone on for too long
Returns:
True if too long, False otherwise.
"""
if self.behaviour_timeout_in_sec:
#current_time = self.node.get_clock().now()
current_behaviour_time = current_time - self.start_time
current_behaviour_time_in_sec = round(current_behaviour_time.nanoseconds * 10**(-9))
return current_behaviour_time_in_sec >= self.behaviour_timeout_in_sec
else:
return False
def add_to_db(self, collection: str, element: Union[Entity,Any]):
""" Adds an arbitrary element to the database TODO: Not sure of the format of the nlp_parameters
"""
self.node.db.add_to_db(collection, element)
def get_categories_of_type(self, type):
return self.node.db.get_categories_of_type(type)
def get_entity(self, id: int) -> Entity:
""" Returns the entitity from the database corresponding to the supplied id
Args:
id: The identity of the object
Returns:
The entity
"""
return self.node.db.get_entity(id)
def relative_xyz_to_relative_pose(self, xyz: Iterable) -> Pose:
""" Converts relative coordinates from the vision stystem to a Pose message
Note that this function assumes that the coordinates from vision are not from a rotated head for example.
Args:
xyz: x - right , y - down, z - depth
Returns:
Pose where x - right, y - forward
"""
vector_in_2d = np.array((float(xyz[0]), float(xyz[2])))
direction_vector_in_2d = vector_in_2d / np.linalg.norm(vector_in_2d)
pose = Pose()
pose.position.x = direction_vector_in_2d[0]
pose.position.y = direction_vector_in_2d[1]
pose.position.z = 0.0
return pose
def relative_xyz_to_global_position(self, xyz):
#DO SOME MAGIC HERE!
position = {}
position["x"] = float(xyz[0])
position["y"] = float(xyz[1])
position["z"] = float(xyz[2])
return position
def position_to_pose(self, position: Iterable) -> Pose:
""" Converts a position to a Pose message
Args:
position: The position
Returns:
The corresponding pose
"""
pose = Pose()
pose.position.x = position["x"]
pose.position.y = position["y"]
pose.position.z = position["z"]
pose.orientation.x = position["x_o"]
pose.orientation.y = position["y_o"]
pose.orientation.z = position["z_o"]
pose.orientation.w = position["w_o"]
return pose
# ------- Publishers -----------
def rotate_robot(self):
""" Rotates the robot counter (?) clockwise.
"""
self.node.log.info("Rotating robot")
msg = Twist()
msg.angular.z = -1.0
self.node.cmd_vel_publisher.publish(msg)
def stop_robot(self):
""" Stops the robot
"""
self.node.log.info("Stop robot")
msg = Twist()
self.node.cmd_vel_publisher.publish(msg)
def move_robot_to_relative_pose(self, pose: Pose):
""" Takes a pose and
"""
goal = PoseStamped()
goal.pose = pose
self.node.log.info("Navigate to relative pose " + str(goal))
self.node.relative_goal_pose_publisher.publish(goal)
def move_robot_to_pose(self, pose):
goal = PoseStamped()
goal.pose = pose
self.node.log.info("Navigate to pose " + str(goal))
self.node.goal_pose_publisher.publish(goal)
def send_result(self, success=True, id=-1):
result = Result()
result.success = success
result.id = id
self.node.log.info("Result: " + str(result))
self.node.result_publisher.publish(result)
def say(self, line):
msg = String()
msg.data = line
self.node.log.info("Saying " + line)
self.node.say_publisher.publish(msg)
def send_event(self, event_name, parameters = {}):
msg = Event()
msg.event_name = event_name
msg_parameters = []
for key, value in parameters.items():
parameter = Parameter()
parameter.key = key
parameter.value = value
msg_parameters.append(parameter)
msg.parameters = msg_parameters
self.node.log.info("Sending event " + event_name + " with parameters " + str(parameters) + "to Dialogflow")
self.node.event_publisher.publish(msg)
# ------- Subscribers ---------
def entities_callback(self, data):
return
def dialogflow_response_callback(self, data):
print("received data from dialogflow")
return
def gpt_response_callback(self, data):
print("received data from gpt")
return
'''
def tracked_callback(self, msg):
""" Contains visually tracked objects with 3D positions inserted """
return
def publish_say(self, msg):
""" For using pepper text to speech """
self.node.say_publisher.publish(msg)
'''
\ No newline at end of file
from typing import Any, Iterable, Union
from std_msgs.msg import Bool, String, Empty, Int16
from lhw_interfaces.msg import Entities, Response, Entity, Result
from geometry_msgs.msg import Twist, Pose, PoseStamped
import numpy as np
class Behaviour:
"""
Abstract class, used for all behaviours (in the behaviours folder)
The node is passed to the behaviour, so create new subscribers and
publishers is possible. This should only be done if the callbacks or
publish function isn't already defined in this class.
"""
def __init__(self, node, args):
self.node = node
self.start_time = self.node.get_clock().now()
self.behaviour_timeout_in_sec = args.behaviour_timeout_in_sec
def __del__(self):
pass
# ------- Common methods -------
def behaviour_timeout(self) -> bool:
""" Checks if the behaviour has gone on for too long
Returns:
True if too long, False otherwise.
"""
if self.behaviour_timeout_in_sec:
current_time = self.node.get_clock().now()
current_behaviour_time = current_time - self.start_time
current_behaviour_time_in_sec = round(current_behaviour_time.nanoseconds * 10**(-9))
return current_behaviour_time_in_sec >= self.behaviour_timeout_in_sec
else:
return False
def add_to_db(self, collection: str, element: Union[Entity,Any]):
""" Adds an arbitrary element to the database TODO: Not sure of the format of the nlp_parameters
"""
self.node.db.add_to_db(collection, element)
def get_categories_of_type(self, type):
return self.node.db.get_categories_of_type(type)
def get_entity(self, id: int) -> Entity:
""" Returns the entitity from the database corresponding to the supplied id
Args:
id: The identity of the object
Returns:
The entity
"""
return self.node.db.get_entity(id)
def relative_xyz_to_relative_pose(self, xyz: Iterable) -> Pose:
""" Converts relative coordinates from the vision stystem to a Pose message
Note that this function assumes that the coordinates from vision are not from a rotated head for example.
Args:
xyz: x - right , y - down, z - depth
Returns:
Pose where x - right, y - forward
"""
vector_in_2d = np.array((float(xyz[0]), float(xyz[2])))
direction_vector_in_2d = vector_in_2d / np.linalg.norm(vector_in_2d)
pose = Pose()
pose.position.x = direction_vector_in_2d[0]
pose.position.y = direction_vector_in_2d[1]
pose.position.z = 0.0
return pose
def relative_xyz_to_global_position(self, xyz):
#DO SOME MAGIC HERE!
position = {}
position["x"] = float(xyz[0])
position["y"] = float(xyz[1])
position["z"] = float(xyz[2])
return position
def position_to_pose(self, position: Iterable) -> Pose:
""" Converts a position to a Pose message
Args:
position: The position
Returns:
The corresponding pose
"""
pose = Pose()
pose.position.x = position["x"]
pose.position.y = position["y"]
pose.position.z = position["z"]
return pose
# ------- Publishers -----------
def rotate_robot(self):
""" Rotates the robot counter (?) clockwise.
"""
self.node.log.info("Rotating robot")
msg = Twist()
msg.angular.z = -1.0
self.node.cmd_vel_publisher.publish(msg)
def stop_robot(self):
""" Stops the robot
"""
self.node.log.info("Stop robot")
msg = Twist()
self.node.cmd_vel_publisher.publish(msg)
def move_robot_to_relative_pose(self, pose: Pose):
""" Takes a pose and
"""
goal = PoseStamped()
goal.pose = pose
self.node.log.info("Navigate to relative pose " + str(goal))
self.node.relative_goal_pose_publisher.publish(goal)
def move_robot_to_pose(self, pose):
goal = PoseStamped()
goal.pose = pose
self.node.log.info("Navigate to pose " + str(goal))
self.node.goal_pose_publisher.publish(goal)
def send_result(self, success=True, id=-1):
result = Result()
result.success = success
result.id = id
self.node.log.info("Result: " + str(result))
self.node.result_publisher.publish(result)
def say(self, line):
msg = String()
msg.data = line
self.node.log.info("Saying " + line)
self.node.say_publisher.publish(msg)
def send_event(self, event_name, parameters = {}):
msg = Event()
msg.event_name = event_name
msg_parameters = []
for key, value in parameters.items():
parameter = Parameter()
parameter.key = key
parameter.value = value
msg_parameters.append(parameter)
msg.parameters = msg_parameters
self.node.log.info("Sending event " + event_name + " with parameters " + str(parameters) + "to Dialogflow")
self.node.event_publisher.publish(msg)
# ------- Subscribers ---------
def entities_callback(self, data):
return
def dialogflow_response_callback(self, data):
print("received data from dialogflow")
return
def gpt_response_callback(self, data):
print("received data from gpt")
return
'''
def tracked_callback(self, msg):
""" Contains visually tracked objects with 3D positions inserted """
return
def publish_say(self, msg):
""" For using pepper text to speech """
self.node.say_publisher.publish(msg)
'''
\ No newline at end of file
from .behaviour import Behaviour
from std_msgs.msg import String
from lhw_interfaces.msg._result import Result
class Categorize(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("Categorize started")
# ------------- State's properties --------------
self.state_done = False
self.state_name = String()
self.state_name.data = "CATEGORIZE"
self.specific_id = args.id
self.run_timer = self.node.create_timer(1, self.run)
self.state_done = False
self.found_categories = []
self.specific_entity_possible_categories = False
specific_entity = self._get_entity(self.specific_id)
if specific_entity:
self.specific_entity_possible_categories = self._get_categories_of_type(specific_entity["type"])
print("possible: " + str(self.specific_entity_possible_categories))
else:
print("ERROR: Specific object hasn't been found yet")
def run(self):
if self.state_done or self._behaviour_timeout():
self.run_timer.cancel()
self.send_result()
return
if not self.specific_entity_possible_categories:
self.run_timer.cancel()
self.send_result(success=False)
return
for category in self.specific_entity_possible_categories:
if category in self.found_categories:
self.state_done = True
def entities_callback(self, data):
for entity in data.entities:
entity_info = {}
entity_info["id"] = entity.id
entity_info["type"] = entity.type
#Fix this:
entity_info["position"] = {"x": 2.0, "y": 6.0, "z": 1.0}
self._add_to_db("seen_entities", entity_info)
if entity.id == self.specific_id:
continue
element_categories = self._get_categories_of_type(entity.type)
print(element_categories)
for category in element_categories:
if category not in self.found_categories:
self.found_categories.append(category)
print("found: " + str(self.found_categories))
import py_trees
""" Checks using ultra sound if the door in front of the robot is open or not
"""
class IsDoorOpen(py_trees.behaviour.Behaviour):
def __init__(self, name) -> None:
super(IsDoorOpen, self).__init__(name)
def setup(self, **kwargs):
try:
self.node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
raise KeyError(error_message) from e # 'direct cause' traceability
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="door_open", access=py_trees.common.Access.READ)
def update(self):
if self.blackboard.door_open == True:
print("Door is open!")
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.RUNNING
\ No newline at end of file
from .behaviour import Behaviour
from ..help_functions.Map import Map
from std_msgs.msg import String
from lhw_interfaces.msg._result import Result
from nav_msgs.msg import OccupancyGrid
from geometry_msgs.msg import PoseStamped
import numpy as np
from scipy.spatial.distance import euclidean
from PIL import Image
import os
METER_PER_PIXEL = 0.01
class ExploreRoom(Behaviour):
def __init__(self, node, args):
super().__init__(node, args)
self.node = node
node.log.info("ExploreRoom started")
# ------------- State's properties --------------
self.state_done = False
self.is_looking_for_specific_id = args.specific_id
self.specific_id = args.id
self.is_looking_for_specific_type = args.specific_type
self.specific_type = args.type
self.state_name = String()
self.state_name.data = "EXPLORE_ROOM"
self.goal_points = []
self.start_point = None
self.goal_reached = False
self.current_goal = None
self.map = None
# Get goals and start position from drawing:
if args.map_name:
image = Image.open('src/lhw_intelligence/lhw_intelligence/maps/' + args.map_name)
self.map = Map(image, METER_PER_PIXEL)
node.log.info("Searching for start and goals in map...")
self.start_point, self.goal_points = self.map.get_start_and_goal_points()
self.current_pos = self.start_point
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
#TEMPORARY
self.moving_to_goal_counter = 3
def run_behaviour_loop(self):
all_goals_visited = self.goal_reached and not self.goal_points
if self.state_done or self._behaviour_timeout() or all_goals_visited:
self.run_timer.cancel()
self.stop_robot()
self.send_result()
return
if self.goal_reached or self.current_goal is None:
#Rotate pepper and look around
self.current_goal = self.__get_next_goal(self.goal_points, self.current_pos)
self.goal_points.remove(self.current_goal)
self.node.log.info("New goal. Move to " + str(self.current_goal))
self.goal_reached = False
goal_pose = self.map.pos_to_pose(self.current_goal)
self.move_robot_to_pose( goal_pose )
#TEMPORARY
self.moving_to_goal_counter -= 1
if not self.moving_to_goal_counter:
self.goal_reached = True
self.moving_to_goal_counter = 5
self.current_pos = self.current_goal
#self.get_next_point
def entities_callback(self, data):
for entity in data.entities:
if self.is_looking_for_specific_id and entity.id != self.specific_id:
self.state_done = True
if self.is_looking_for_specific_type and entity.type != self.specific_type:
self.state_done = True
entity_info = {}
entity_info["id"] = entity.id
entity_info["type"] = entity.type
entity_info["position"] = self._relative_xyz_to_global_position(entity.xyz)
self._add_to_db("seen_entities", entity_info)
def __get_next_goal(self, goal_points, current_pos):
closest_goal = min(goal_points, key=lambda goal : euclidean(goal, current_pos))
return closest_goal
from .behaviour import Behaviour
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from lhw_interfaces.msg import Result, Entity, Entities
from rclpy.node import Node
from typing import Any, List, Optional
class FindPerson(Behaviour):
""" Tries to find either any person or a specific person given by an identity.
If not given a specific person, it will simply select the first person identity it sees.
If no person is seen, the robot will rotate.
"""
def __init__(self, node: Node, args: Any):
""" Initializes the behaviour given the corresponding AI-node and behaviour specific arguments.
Additionally starts a timer which periodically runs the associated behaviour.
Args:
node: A ROS node.
args: Behaviour modifying arguments such as if we are looking for a specific person
"""
super().__init__(node, args)
self.node = node
node.log.info("FindPerson started")
# ------------- Behaviour's properties --------------
self.is_looking_for_specific_person = args.specific_id
self.specific_person_id = args.id
self.person = None
self.person_found = False
self.state_name = String()
self.state_name.data = "FIND_PERSON"
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
def run_behaviour_loop(self):
""" Checks if a person has been found, if so it notifies the AI and cancels the behaviour loop.
If not, the robot is rotated
"""
if self.person_found:
self.stop_robot()
self.run_timer.cancel()
self.node.log.info("Person " + str(self.person.id) + " found" )
self.send_result(id = self.person.id)
else:
self.rotate_robot()
def entities_callback(self, data: Entities):
""" Recieves entities from the nodes subscriber. Usually comes from the tracked_3d_targets topic,
but might come from somewhere else in the future. If we find the person we are looking for we set person_found to true
Args:
data: The observed entities (may be people, animals, or inanimate objects)
"""
self.person = self.__get_person(data)
if not self.person:
return
if self.is_looking_for_specific_person and self.person.id != self.specific_person_id:
return
self.person_found = True
def __get_person(self, data: Entities)->Optional[Entity]:
""" If looking for a specific person, checks all entities for the specific ID. Else returns the first person.
If no person is found, returns None.
Args:
data: The observed entities (may be people, animals, or inanimate objects)
Returns:
The person, if found. Else None.
"""
for entity in data.entities:
if entity.type == "person":
if self.is_looking_for_specific_person and entity.id != self.specific_person_id:
continue
else:
return entity
return None
\ No newline at end of file
from .behaviour import Behaviour
from geometry_msgs.msg import Twist, Pose
from lhw_interfaces.msg import Entities, Entity
from rclpy.node import Node
from typing import Any
class FollowPerson(Behaviour):
""" Given a person which is currently visible in the image, and its relative coordinate in the world,
this behaviour aims to go towards the person. The behaviour will stop when given the "stop following" intention.
"""
def __init__(self, node: Node, args: Any):
super().__init__(node, args)
node.log.info("FollowPerson started")
self.node = node
# ------------- Behaviour's properties --------------
self.person_id = args.id
self.person_relative_pose = None
self.stop_follow_person = False
self.run_timer = self.node.create_timer(1, self.run_behaviour_loop)
def run_behaviour_loop(self):
""" Follows the person. If told to stop_follow_person, will stop the robot and send back results to the node.
Otherwise will move to the relative position of the person. If person is not seen the robot will rotate until the person comes back.
"""
if self.stop_follow_person:
self.run_timer.cancel()
self.stop_robot()
self.send_result()
return
if self.person_relative_pose:
self.move_robot_to_relative_pose(self.person_relative_pose)
else:
self.rotate_robot()
def entities_callback(self, data: Entities):
""" Goes through all seen entities, if the tracked target is spotted, returns its relative pose.
If not found, the pose is set to none
Args:
data: The seen entitites
"""
for entity in data.entities:
self.node.log.info("Saw: " + str(entity))
if entity.type == "person" and entity.id == self.person_id:
self.person_relative_pose = self._relative_xyz_to_relative_pose(entity.xyz)
return
self.person_relative_pose = None
def response_callback(self, data):
if data.intention == "stop following":
self.stop_follow_person = True