Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • liuhomewreckers/liu-home-wreckers
  • tdde19-2021-5/tdde-19-group-5-pepper
2 results
Show changes
Showing
with 0 additions and 3473 deletions
import numpy as np
from geometry_msgs.msg import Pose
WHITE = np.array([255, 255, 255, 255])
BLACK = np.array([0, 0, 0, 255])
RED = np.array([255, 0, 0, 255])
BLUE = np.array([0, 0, 255, 255])
class Map:
def __init__(self, map_image, resolution):
self.grid = np.transpose(np.asarray(map_image).copy(), (1,0,2))
self.resolution = resolution
def get_start_and_goal_points(self):
map_grid = self.grid.copy()
goal_points = []
start_point = None
for x_pos, x_col in enumerate(map_grid):
for y_pos, cell in enumerate(x_col):
if np.array_equal(cell, RED):
goal_x_pos, goal_y_pos = self.__get_center_of_box_in_map(map_grid, x_pos, y_pos, RED)
goal_points.append((goal_x_pos, goal_y_pos))
goal_box_height = 2 * (goal_x_pos-x_pos)
goal_box_width = 2 * (goal_y_pos-y_pos)
map_grid = self.__remove_box_from_map(map_grid, x_pos, y_pos, goal_box_height, goal_box_width)
if np.array_equal(cell, BLUE):
start_x_pos, start_y_pos = self.__get_center_of_box_in_map(map_grid, x_pos, y_pos, BLUE)
start_point = (start_x_pos, start_y_pos)
start_box_height = 2 * (start_x_pos-x_pos)
start_box_width = 2 * (start_y_pos-y_pos)
map_grid = self.__remove_box_from_map(map_grid, x_pos, y_pos, start_box_height, start_box_width)
return start_point, goal_points
def pos_to_pose(self, pos):
x = pos[0]*self.resolution
y = pos[1]*self.resolution
pose = Pose()
pose.position.x = x
pose.position.y = y
pose.position.z = 0.0
return pose
def map2_grid_to_occupancy_grid(self, map_grid):
map = OccupancyGrid()
map.info.resolution = METER_PER_PIXEL
map.info.width = map_grid.shape[0]
map.info.height = map_grid.shape[1]
map_data = []
for x_pos, row in enumerate(map_grid):
for y_pos, cell in enumerate(row):
if np.array_equal(cell, BLACK):
map_data.append(1)
if np.array_equal(cell, WHITE):
map_data.append(0)
map.data = map_data
return map
def __get_center_of_box_in_map(self, map_grid, x_pos, y_pos, color):
diagonal_negihbour_position = (x_pos + 1, y_pos + 1)
diagonal_negihbour_value = map_grid[diagonal_negihbour_position]
while np.array_equal(diagonal_negihbour_value, color):
diagonal_negihbour_position = (x_pos + 1, y_pos + 1)
diagonal_negihbour_value = map_grid[diagonal_negihbour_position]
x_pos += 1
y_pos += 1
end_x_pos = diagonal_negihbour_position[0]
end_y_pos = diagonal_negihbour_position[1]
center_of_goal_row = (end_x_pos + x_pos) / 2
center_of_goal_col = (end_y_pos + y_pos) / 2
return center_of_goal_row, center_of_goal_col
def __remove_box_from_map(self, map_grid, x_pos, y_pos, goal_box_height, goal_box_width):
end_x_pos = int(x_pos + goal_box_height)
end_y_pos = int(y_pos + goal_box_width)
map_grid[x_pos:end_x_pos, y_pos:end_y_pos] = WHITE
return map_grid
\ No newline at end of file
src/lhw_intelligence/lhw_intelligence/maps/simple_map.png

19.8 KiB

import py_trees
from lhw_interfaces.action import RobotAction
from lhw_interfaces.msg import RobotActionGoal
from std_msgs.msg import String
from lhw_interfaces.srv import PixelToXYZ
import numpy as np
class PointQuit(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str,
key: str,
arm: RobotActionGoal.arm,
node
):
super(PointQuit, self).__init__(name=name)
self.key = key
self.arm = arm
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(
key="goal",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", key)
)
self.goal = None
self.node = node
def setup(self, **kwargs):
"""
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# ros2 node
# try:
# self.node = kwargs['node']
# except KeyError as e:
# error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
# raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
"""
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self._goal_to_blackboard()
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
return py_trees.common.Status.SUCCESS
def terminate(self, new_status: py_trees.common.Status):
"""
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _goal_to_blackboard(self):
self.node.get_logger().info("adding goal to {0} in blackboard".format(self.key))
self.goal = RobotAction.Goal()
self.goal.action.type = RobotActionGoal.POINT_QUIT
self.goal.action.arm = self.arm
self.blackboard.goal = self.goal
"""
Transforms a xy-pixel on the blackboard variable 'point_to_pixel'
to a coordinate and makes Pepper point at this location
"""
class PointAt(py_trees.behaviour.Behaviour):
"""
"""
def __init__(
self,
name: str,
key: str,
node
):
super(PointAt, self).__init__(name=name)
self.key = key
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key(key="point_to_pixel", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="point_to_pixel", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(
key="goal",
access=py_trees.common.Access.WRITE,
remap_to=py_trees.blackboard.Blackboard.absolute_name("/", key)
)
self.xyz = None
self.node = node
def setup(self, **kwargs):
"""
"""
self.logger.debug("%s.setup()" % self.__class__.__name__)
# # ros2 node
# try:
# self.node = kwargs['node']
# except KeyError as e:
# error_message = "didn't find 'node' in setup's kwargs [{}]".format(self.qualified_name)
# raise KeyError(error_message) from e # 'direct cause' traceability
def initialise(self):
"""
"""
self.logger.debug("%s.initialise()" % self.__class__.__name__)
self.xyz = None
def update(self) -> py_trees.common.Status:
"""
"""
self.logger.debug("%s.update()" % self.__class__.__name__)
if (self._convert_pixel_to_xyz()):
self._goal_to_blackboard()
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.Failure
def terminate(self, new_status: py_trees.common.Status):
"""
"""
self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status))
def _convert_pixel_to_xyz(self) -> bool:
point_to_pixel = self.blackboard.point_to_pixel
if (point_to_pixel == [-1, -1]):
return False
RES = 4
PIXELS = [53*RES, 158*RES, 265*RES]
COORDINATES = [[1.0, 0.3, 0.0, 0], [1.0, 0.0, 0.0, 1], [1.0, -0.4, 0.0, 1]]
PIXELS.reverse()
point_pixel = point_to_pixel[0]
if point_to_pixel[0] == -1:
self.xyz = [1.0, 1.0, 0.0, 0]
return True
min_pixel_distance = 10000
for (pixel, coordinate) in zip(PIXELS, COORDINATES):
diff = point_pixel - pixel
if abs(diff) < min_pixel_distance:
self.xyz = coordinate
min_pixel_distance = abs(diff)
return True
def _goal_to_blackboard(self):
self.node.get_logger().info("adding goal to {0} in blackboard".format(self.key))
goal = RobotAction.Goal()
goal.action.type = RobotActionGoal.POINT_AT
goal.action.arm = bool(self.xyz[-1])
goal.action.xyz = [self.xyz[0], self.xyz[1], self.xyz[2]]
self.blackboard.goal = goal
""" Generic way of sending a premade action msg to Pepper
"""
class PepperAction(py_trees.behaviour.Behaviour):
def __init__(self, name, node, action: RobotActionGoal) -> None:
super(PepperAction, self).__init__(name)
self.node = node
self.action = action
def setup(self, **kwargs):
self.pub = self.node.robot_action_pub
def initialise(self):
self.pub.publish(self.action)
def update(self):
return py_trees.common.Status.SUCCESS
""" Wake up Pepper
"""
class PepperWakeUp(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(PepperWakeUp, self).__init__(name)
self.node = node
self.pub = self.node.robot_action_pub
def initialise(self):
self.node.log.info("Waking up Pepper...")
action = RobotActionGoal()
action.type = RobotActionGoal.WAKEUP
self.pub.publish(action)
def update(self):
return py_trees.common.Status.SUCCESS
""" Put Pepper to rest
"""
class PepperRest(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(PepperRest, self).__init__(name)
self.node = node
self.pub = self.node.robot_action_pub
def initialise(self):
self.node.log.info("Putting Pepper to Rest...")
action = RobotActionGoal()
action.type = RobotActionGoal.REST
self.pub.publish(action)
def update(self):
return py_trees.common.Status.SUCCESS
""" Set the safety of Pepper, default is ON
"""
class PepperSetSafety(py_trees.behaviour.Behaviour):
def __init__(self, name, node, safety_on: bool = True) -> None:
super(PepperSetSafety, self).__init__(name)
self.node = node
self.pub = self.node.robot_action_pub
self.safety_on = safety_on
def initialise(self):
action = RobotActionGoal()
if self.safety_on:
self.node.log.info("Safety is now ON")
action.type = RobotActionGoal.SAFETY_ON
else:
self.node.log.info("Safety is now OFF")
action.type = RobotActionGoal.SAFETY_OFF
self.pub.publish(action)
def update(self):
return py_trees.common.Status.SUCCESS
class PepperCenterHead(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(PepperCenterHead, self).__init__(name)
self.node = node
self.pub = self.node.robot_action_pub
def initialise(self):
action = RobotActionGoal()
action.type = RobotActionGoal.CENTER_HEAD
self.pub.publish(action)
def update(self):
return py_trees.common.Status.SUCCESS
class PepperTiltHead(py_trees.behaviour.Behaviour):
def __init__(self, name, node, tilt_up: bool = True) -> None:
super(PepperTiltHead, self).__init__(name)
self.node = node
self.pub = self.node.robot_action_pub
self.tilt_up = tilt_up
def initialise(self):
action = RobotActionGoal()
if self.tilt_up:
action.type = RobotActionGoal.TILT_HEAD_UP
else:
action.type = RobotActionGoal.TILT_HEAD_DOWN
self.pub.publish(action)
def update(self):
return py_trees.common.Status.SUCCESS
class SetIdle(py_trees.behaviour.Behaviour):
def __init__(self, name, node, go_idle) -> None:
super(SetIdle, self).__init__(name)
self.node = node
self.robot_action_pub = self.node.robot_action_pub
self.go_idle = go_idle
def initialise(self):
action = RobotActionGoal()
if self.go_idle:
action.type = RobotActionGoal.IDLE_POSTURE_ON
else:
action.type = RobotActionGoal.IDLE_POSTURE_OFF
self.robot_action_pub.publish(action)
def update(self):
return py_trees.common.Status.SUCCESS
class PalmOpen(py_trees.behaviour.Behaviour):
def __init__(self, name, node, right_hand: bool) -> None:
super(PalmOpen, self).__init__(name)
self.node = node
self.robot_action_pub = self.node.robot_action_pub
self.is_right_hand = right_hand
def initialise(self):
action = RobotActionGoal()
action.hand = self.is_right_hand
self.action.type = RobotActionGoal.PALM_OPEN
self.robot_action_pub.publish(action)
def update(self):
return py_trees.common.Status.SUCCESS
class PalmClose(py_trees.behaviour.Behaviour):
def __init__(self, name, node, right_hand: bool) -> None:
super(PalmClose, self).__init__(name)
self.node = node
self.robot_action_pub = self.node.robot_action_pub
self.is_right_hand = right_hand
def initialise(self):
action = RobotActionGoal()
action.right_arm = self.is_right_hand
action.type = RobotActionGoal.PALM_CLOSE
self.robot_action_pub.publish(action)
def update(self):
return py_trees.common.Status.SUCCESS
class YawElbowOpen(py_trees.behaviour.Behaviour):
def __init__(self, name, node, right_hand: bool) -> None:
super(YawElbowOpen, self).__init__(name)
self.node = node
self.robot_action_pub = self.node.robot_action_pub
self.is_right_hand = right_hand
def setup(self, **kwargs):
self.action = RobotActionGoal()
self.action.hand = self.is_right_hand
self.action.type = RobotActionGoal.YAW_ELBOW_OPEN
def initialise(self):
self.robot_action_pub.publish(self.action)
def update(self):
return py_trees.common.Status.SUCCESS
class YawElbowClose(py_trees.behaviour.Behaviour):
def __init__(self, name, node, right_hand: bool) -> None:
super(YawElbowClose, self).__init__(name)
self.node = node
self.robot_action_pub = self.node.robot_action_pub
self.is_right_hand = right_hand
def setup(self, **kwargs):
self.action = RobotActionGoal()
self.action.hand = self.is_right_hand
self.action.type = RobotActionGoal.YAW_ELBOW_CLOSE
def initialise(self):
self.robot_action_pub.publish(self.action)
def update(self):
return py_trees.common.Status.SUCCESS
{
"The Shawshank Redemption": {
"genres": ["Drama"],
"actors": [
"Tim Robbins",
"Morgan Freeman",
"Bob Gunton"
],
"director": "Frank Darabont",
"release_year": "1994"
},
"The Godfather": {
"genres": ["Crime", "Drama"],
"actors": [
"Marlon Brando",
"Al Pacino",
"James Caan"
],
"director": "Francis Ford Coppola",
"release_year": "1972"
},
"The Dark Knight": {
"genres": ["Action", "Crime", "Drama"],
"actors": [
"Christian Bale",
"Heath Ledger",
"Aaron Eckhart"
],
"director": "Christopher Nolan",
"release_year": "2008"
},
"The Godfather Part 2": {
"genres": ["Crime", "Drama"],
"actors": [
"Al Pacino",
"Robert De Niro",
"Robert Duvall"
],
"director": "Francis Ford Coppola",
"release_year": "1974"
},
"12 Angry Men": {
"genres": ["Crime", "Drama"],
"actors": [
"Henry Fonda",
"Lee J. Cobb",
"Martin Balsam"
],
"director": "Sidney Lumet",
"release_year": "1957"
},
"Schindler's List": {
"genres": ["Biography", "Drama", "History"],
"actors": [
"Liam Neeson",
"Ralph Fiennes",
"Ben Kingsley"
],
"director": "Steven Spielberg",
"release_year": "1993"
},
"The Lord of the Rings: The Return of the King": {
"genres": ["Action", "Adventure", "Drama"],
"actors": [
"Elijah Wood",
"Viggo Mortensen",
"Ian McKellen"
],
"director": "Peter Jackson",
"release_year": "2003"
},
"Pulp Fiction": {
"genres": ["Crime", "Drama"],
"actors": [
"John Travolta",
"Uma Thurman",
"Samuel L Jackson"
],
"director": "Quentin Tarantino",
"release_year": "1994"
},
"The Lord of the Rings: The Fellowship of the Ring": {
"genres": ["Action", "Adventure", "Drama"],
"actors": [
"Elijah Wood",
"Ian McKellen",
"Orlando Bloom"
],
"director": "Peter Jackson",
"release_year": "2001"
},
"The Good the Bad and the Ugly": {
"genres": ["Adventure", "Western"],
"actors": [
"Clint Eastwood",
"Eli Wallach",
"Lee Van Cleef"
],
"director": "Sergio Leone",
"release_year": "1966"
},
"Forrest Gump": {
"genres": ["Drama", "Romance"],
"actors": [
"Tom Hanks",
"Robin Wright",
"Garry Sinise"
],
"director": "Robert Zemeckis",
"release_year": "1994"
},
"Fight Club": {
"genres": ["Drama"],
"actors": [
"Brad Pitt",
"Edward Norton",
"Meat Loaf"
],
"director": "David Fincher",
"release_year": "1999"
},
"Inception": {
"genres": ["Action", "Adventure", "Sci-Fi"],
"actors": [
"Leonardo DiCaprio",
"Joseph Gordon-Levitt",
"Elliot Page"
],
"director": "Christopher Nolan",
"release_year": "2010"
},
"The Lord of the Rings: The Two Towers": {
"genres": ["Action", "Adventure", "Drama"],
"actors": [
"Elijah Wood",
"Ian McKellen",
"Viggo Mortensen"
],
"director": "Peter Jackson",
"release_year": "2002"
},
"Star Wars: Episode 5 - The Empire Strikes Back": {
"genres": ["Action", "Adventure", "Fantasy"],
"actors": [
"Mark Hamill",
"Harrison Ford",
"Carrie Fisher"
],
"director": "Irvin Kershner",
"release_year": "1980"
},
"The Matrix": {
"genres": ["Action", "Sci-Fi"],
"actors": [
"Keanu Reeves",
"Laurence Fishburne",
"Carrie Anne Moss"
],
"director": "Lana Wachowski",
"release_year": "1999"
},
"Goodfellas": {
"genres": ["Biography", "Crime", "Drama"],
"actors": [
"Robert De Niro",
"Ray Liotta",
"Joe Pesci"
],
"director": "Martin Scorsese",
"release_year": "1990"
},
"One Flew Over the Cuckoo's Nest": {
"genres": ["Drama"],
"actors": [
"Jack Nicholson",
"Louise Fletcher",
"Michael Berryman"
],
"director": "Milos Forman",
"release_year": "1975"
},
"Seven": {
"genres": ["Crime", "Drama", "Mystery"],
"actors": [
"Morgan Freeman",
"Brad Pitt",
"Kevin Spacey"
],
"director": "David Fincher",
"release_year": "1995"
},
"Seven Samurai": {
"genres": ["Action", "Drama"],
"actors": [
"Toshiro Mifune",
"Takashi Shimura",
"Keiko Tsushima"
],
"director": "Akira Kurosawa",
"release_year": "1954"
},
"It's a Wonderful Life": {
"genres": ["Drama", "Family", "Fantasy"],
"actors": [
"Roberto Benigni",
"Nicoletta Braschi",
"Lieonel Barrymore"
],
"director": "Roberto Benigni",
"release_year": "1997"
},
"The Silence of the Lambs": {
"genres": ["Crime", "Drama", "Thriller"],
"actors": [
"Jodie Foster",
"Anthony Hopkins",
"Lawrence A Bonney"
],
"director": "Jonathan Demme",
"release_year": "1991"
},
"City of God": {
"genres": ["Crime", "Drama"],
"actors": [
"Alexandre Rodrigues",
"Leandro Firmino",
"Matheus Nachtergaele"
],
"director": "Fernando Meirelles",
"release_year": "2002"
},
"Saving Private Ryan": {
"genres": ["Drama", "War"],
"actors": [
"Tom Hanks",
"Matt Damon",
"Tom Sizemore"
],
"director": "Steven Spielberg",
"release_year": "1998"
},
"Life Is Beautiful": {
"genres": ["Comedy", "Drama", "Romance"],
"actors": [
"Tom Hanks",
"Matt Damon",
"Giorgio Cantarini"
],
"director": "Reberto Benigni",
"release_year": "1997"
},
"The Green Mile": {
"genres": ["Crime", "Drama", "Fantasy"],
"actors": [
"Tom Hanks",
"Michael Clarke Duncan",
"David Morse"
],
"director": "Frank Darabont",
"release_year": "1999"
},
"Interstellar": {
"genres": ["Adventure", "Drama", "Sci-Fi"],
"actors": [
"Matthew McConaughey",
"Anne Hathaway",
"Jessica Chastain"
],
"director": "Christopher Nolan",
"release_year": "2014"
},
"Star Wars": {
"genres": ["Action", "Adventure", "Fantasy"],
"actors": [
"Mark Hamill",
"Harrison Ford",
"Carrie Fisher"
],
"director": "George Lucas",
"release_year": "1977"
},
"Terminator 2 Judgment Day": {
"genres": ["Action", "Sci-Fi"],
"actors": [
"Arnold Schwarzenegger",
"Linda Hamilton",
"Edward Furlong"
],
"director": "James Cameron",
"release_year": "1991"
},
"Back to the Future": {
"genres": ["Adventure", "Comedy", "Sci-Fi"],
"actors": [
"Michael J. Fox",
"Christopher Lloyd",
"Lea Thompson"
],
"director": "Robert Zemeckis",
"release_year": "1985"
},
"Spirited Away": {
"genres": ["Animation", "Adventure", "Family"],
"actors": [
"Daveigh Chase",
"Suzanne Pleshette",
"Miyu Irino"
],
"director": "Hayao Miyazaki",
"release_year": "2001"
},
"Psycho": {
"genres": ["Horror", "Mystery", "Thriller"],
"actors": [
"Anthony Perkins",
"Janet Leigh",
"Vera Miles"
],
"director": "Alfred Hitchcock",
"release_year": "1960"
},
"The Pianist": {
"genres": ["Biography", "Drama", "Music"],
"actors": [
"Adrien Brody",
"Thomas Kretschmann",
"Frank Finlay"
],
"director": "Roman Polanski",
"release_year": "2002"
},
"Leon The Professional": {
"genres": ["Action", "Crime", "Drama"],
"actors": [
"Jean Reno",
"Gary Oldman",
"Natalie Portman"
],
"director": "Luc Besson",
"release_year": "1994"
},
"Parasite": {
"genres": ["Comedy", "Drama", "Thriller"],
"actors": [
"Song Kang-ho",
"Sun-kyun Lee",
"Cho Yeo-jeong"
],
"director": "Bong Joon Ho",
"release_year": "2019"
},
"The Lion King": {
"genres": ["Animation", "Adventure", "Drama"],
"actors": [
"Matthew Broderick",
"Jeremy Irons",
"James Earl Jones"
],
"director": "Roger Allers",
"release_year": "1994"
},
"Gladiator": {
"genres": ["Action", "Adventure", "Drama"],
"actors": [
"Russell Crowe",
"Joaquin Phoenix",
"Connie Nielsen"
],
"director": "Ridley Scott",
"release_year": "2000"
},
"American History X": {
"genres": ["Crime", "Drama"],
"actors": [
"Edward Norton",
"Edward Furlong",
"Beverly D'Angelo"
],
"director": "Tony Kaye",
"release_year": "1998"
},
"The Departed": {
"genres": ["Crime", "Drama", "Thriller"],
"actors": [
"Leonardo DiCaprio",
"Matt Damon",
"Jack Nicholson"
],
"director": "Martin Scorsese",
"release_year": "2006"
},
"The Usual Suspects": {
"genres": ["Crime", "Drama", "Mystery"],
"actors": [
"Kevin Spacey",
"Gabriel Byrne",
"Chazz Palminteri"
],
"director": "Bryan Singer",
"release_year": "1995"
},
"The Prestige": {
"genres": ["Drama", "Mystery", "Sci-Fi"],
"actors": [
"Christian Bale",
"Hugh Jackman",
"Scarlett Johansson"
],
"director": "Christopher Nolan",
"release_year": "2006"
},
"Casablanca": {
"genres": ["Drama", "Romance", "War"],
"actors": [
"Humphrey Bogart",
"Ingrid Bergman",
"Paul Henreid"
],
"director": "Michael Curtiz",
"release_year": "1942"
},
"Whiplash": {
"genres": ["Drama", "Music"],
"actors": [
"Miles Teller",
"J.K. Simmons",
"Melissa Benoist"
],
"director": "Damien Chazelle",
"release_year": "2014"
},
"Intouchables": {
"genres": ["Biography", "Comedy", "Drama"],
"actors": [
"Francois Cluzet",
"Omar Sy",
"Anne Le Ny"
],
"director": "Olivier Nakache",
"release_year": "2011"
},
"Harakiri": {
"genres": ["Action", "Drama", "Mystery"],
"actors": [
"Tatsuya Nakadai",
"Akira Ishihama",
"Shima Iwashita"
],
"director": "Masaki Kobayashi",
"release_year": "1962"
},
"Grave of the Fireflies": {
"genres": ["Animation", "Drama", "War"],
"actors": [
"Tsutomu Tatsumi",
"Ayano Shiraishi",
"Akemi Yamaguchi"
],
"director": "Isao Takahata",
"release_year": "1988"
},
"Modern Times": {
"genres": ["Comedy", "Drama", "Romance"],
"actors": [
"Charles Chaplin",
"Paulette Goddard",
"Henry Bergman"
],
"director": "Charles Chaplin",
"release_year": "1936"
},
"Once Upon a Time in the West": {
"genres": ["Western"],
"actors": [
"Henry Fonda",
"Charles Bronson",
"Claudia Cardinale"
],
"director": "Sergio Leone",
"release_year": "1968"
},
"Top Gun: Maverick": {
"genres": ["Action", "Drama"],
"actors": [
"Tom Cruise",
"Jennifer Connelly",
"Miles Teller"
],
"director": "Joseph Kosinski",
"release_year": "2022"
},
"Rear Window": {
"genres": ["Mystery", "Thriller"],
"actors": [
"James Stewart",
"Grace Kelly",
"Wendell Corey"
],
"director": "Alfred Hitchcock",
"release_year": "1954"
},
"Alien": {
"genres": ["Horror", "Sci-Fi"],
"actors": [
"Sigourney Weaver",
"Tom Skerritt",
"John Hurt"
],
"director": "Ridley Scott",
"release_year": "1979"
},
"City Lights": {
"genres": ["Comedy", "Drama", "Romance"],
"actors": [
"Charles Chaplin",
"Virginia Cherrill",
"Florence Lee"
],
"director": "Charles Chaplin",
"release_year": "1931"
},
"Cinema Paradiso": {
"genres": ["Drama", "Romance"],
"actors": [
"Philippe Noiret",
"Enzo Cannavale",
"Antonella Attili"
],
"director": "Giuseppe Tornatore",
"release_year": "1988"
},
"Apocalypse Now": {
"genres": ["Drama", "Mystery", "War"],
"actors": [
"Martin Sheen",
"Marlon Brando",
"Robert Duvall"
],
"director": "Francis Ford Coppola",
"release_year": "1979"
},
"Memento": {
"genres": ["Mystery", "Thriller"],
"actors": [
"Guy Pearce",
"Carrie-Anne Moss",
"Joe Pantoliano"
],
"director": "Christopher Nolan",
"release_year": "2000"
},
"Indiana Jones and the Raiders of the Lost Ark": {
"genres": ["Action", "Adventure"],
"actors": [
"Harrison Ford",
"Karen Allen",
"Paul Freeman"
],
"director": "Steven Spielberg",
"release_year": "1981"
},
"Django Unchained": {
"genres": ["Drama", "Western"],
"actors": [
"Jamie Foxx",
"Christoph Waltz",
"Leonardo DiCaprio"
],
"director": "Quentin Tarantino",
"release_year": "2012"
},
"Wall-E": {
"genres": ["Animation", "Adventure", "Family"],
"actors": [
"Ben Burtt",
"Elissa Knight",
"Jeff Garlin"
],
"director": "Andrew Stanton",
"release_year": "2008"
},
"The Lives of Others": {
"genres": ["Drama", "Mystery", "Thriller"],
"actors": [
"Ulrich Muhe",
"Martina Gedeck",
"Sebastian Koch"
],
"director": "Florian Henckel von Donnersmarck",
"release_year": "2006"
},
"Sunset Boulevard": {
"genres": ["Drama", "Film-Noir"],
"actors": [
"William Holden",
"Gloria Swanson",
"Erich von Stroheim"
],
"director": "Billy Wilder",
"release_year": "1950"
},
"Paths of Glory": {
"genres": ["Drama", "War"],
"actors": [
"Kirk Douglas",
"Ralph Meeker",
"Adolphe Menjou"
],
"director": "Stanley Kubrick",
"release_year": "1957"
},
"The Shining": {
"genres": ["Drama", "Horror"],
"actors": [
"Jack Nicholson",
"Shelley Duvall",
"Danny Lloyd"
],
"director": "Stanley Kubrick",
"release_year": "1980"
},
"The Great Dictator": {
"genres": ["Comedy", "Drama", "War"],
"actors": [
"Charles Chaplin",
"Paulette Goddard",
"Jack Oakie"
],
"director": "Charles Chaplin",
"release_year": "1940"
},
"Witness for the Prosecution": {
"genres": ["Crime", "Drama", "Mystery"],
"actors": [
"Tyrone Power",
"Marlene Dietrich",
"Charles Laughton"
],
"director": "Billy Wilder",
"release_year": "1957"
},
"Avengers: Infinity War": {
"genres": ["Action", "Adventure", "Sci-Fi"],
"actors": [
"Robert Downey Jr.",
"Chris Hemsworth",
"Mark Ruffalo",
"Chris Evans",
"Scarlett Johansson",
"Benedict Cumberbatch",
"Tom Holland",
"Chadwick Boseman"
],
"director": "Anthony Russo",
"release_year": "2018"
},
"Aliens": {
"genres": ["Action", "Adventure", "Sci-Fi"],
"actors": [
"Sigourney Weaver",
"Michael Biehn",
"Carrie Henn"
],
"director": "James Cameron",
"release_year": "1986"
},
"American Beauty": {
"genres": ["Drama"],
"actors": [
"Kevin Spacey",
"Annette Bening",
"Thora Birch"
],
"director": "Sam Mendes",
"release_year": "1999"
},
"Dr Strangelove or: How I Learned to Stop Worrying and Love the Bomb": {
"genres": ["Comedy", "War"],
"actors": [
"Peter Sellers",
"George C. Scott",
"Sterling Hayden"
],
"director": "Stanley Kubrick",
"release_year": "1964"
},
"Spider-Man: Into the Spider-Verse": {
"genres": ["Animation", "Action", "Adventure"],
"actors": [
"Shameik Moore",
"Jake Johnson",
"Hailee Steinfeld"
],
"director": "Bob Persichetti",
"release_year": "2018"
},
"The Dark Knight Rises": {
"genres": ["Action", "Drama"],
"actors": [
"Christian Bale",
"Tom Hardy",
"Anne Hathaway"
],
"director": "Christopher Nolan",
"release_year": "2012"
},
"Old Boy": {
"genres": ["Action", "Drama", "Mystery", "Thriller"],
"actors": [
"Choi Min-sik",
"Yoo Ji-tae"
],
"director": "Park Chan-wook",
"release_year": "2003"
},
"Joker": {
"genres": ["Crime", "Drama", "Thriller"],
"actors": [
"Joaquin Phoenix",
"Robert De Niro"
],
"director": "Todd Phillips",
"release_year": "2019"
},
"Amadeus": {
"genres": ["Biography", "Drama", "Music"],
"actors": [
"F. Murray Abraham",
"Tom Hulce"
],
"director": "Milos Forman",
"release_year": "1984"
},
"Braveheart": {
"genres": ["Biography", "Drama", "History", "War"],
"actors": [
"Mel Gibson",
"Sophie Marceau"
],
"director": "Mel Gibson",
"release_year": "1995"
},
"Toy Story": {
"genres": ["Animation", "Adventure", "Comedy", "Family", "Fantasy"],
"actors": [
"Tom Hanks",
"Tim Allen"
],
"director": "John Lasseter",
"release_year": "1995"
},
"Coco": {
"genres": ["Animation", "Adventure", "Comedy", "Family", "Fantasy", "Music", "Mystery"],
"actors": [
"Anthony Gonzalez",
"Gael García Bernal"
],
"director": "Lee Unkrich",
"release_year": "2017"
},
"Das Boot": {
"genres": ["Drama", "War"],
"actors": [
"Jürgen Prochnow",
"Herbert Grönemeyer"
],
"director": "Wolfgang Petersen",
"release_year": "1981"
},
"Inglourious Basterds": {
"genres": ["Adventure", "Drama", "War"],
"actors": [
"Brad Pitt",
"Diane Kruger"
],
"director": "Quentin Tarantino",
"release_year": "2009"
},
"Prinsessan Mononoke": {
"genres": ["Animation", "Adventure", "Fantasy"],
"actors": [
"Yôji Matsuda",
"Yuriko Ishida"
],
"director": "Hayao Miyazaki",
"release_year": "1997"
},
"Avengers: Endgame": {
"genres": ["Action", "Adventure", "Drama", "Sci-Fi"],
"actors": [
"Robert Downey Jr.",
"Chris Hemsworth",
"Mark Ruffalo",
"Chris Evans",
"Scarlett Johansson",
"Benedict Cumberbatch",
"Tom Holland",
"Chadwick Boseman"
],
"director": "Anthony Russo",
"release_year": "2019"
},
"Once Upon a Time in America": {
"genres": ["Crime", "Drama"],
"actors": [
"Robert De Niro",
"James Woods"
],
"director": "Sergio Leone",
"release_year": "1984"
},
"Will Hunting": {
"genres": ["Drama", "Romance"],
"actors": [
"Robin Williams",
"Matt Damon"
],
"director": "Gus Van Sant",
"release_year": "1997"
},
"Your Name": {
"genres": ["Animation", "Drama", "Fantasy", "Romance"],
"actors": [
"Ryûnosuke Kamiki",
"Mone Kamishiraishi"
],
"director": "Makoto Shinkai",
"release_year": "2016"
},
"Toy Story 3": {
"genres": ["Animation", "Adventure", "Comedy", "Family", "Fantasy"],
"actors": [
"Tom Hanks",
"Tim Allen"
],
"director": "Lee Unkrich",
"release_year": "2010"
},
"Requiem for a Dream": {
"genres": ["Drama"],
"actors": [
"Ellen Burstyn",
"Jared Leto"
],
"director": "Darren Aronofsky",
"release_year": "2000"
},
"Singin' in the Rain": {
"genres": ["Comedy", "Musical", "Romance"],
"actors": [
"Gene Kelly",
"Donald O'Connor"
],
"director": "Stanley Donen",
"release_year": "1952"
},
"3 Idiots": {
"genres": ["Comedy", "Drama"],
"actors": [
"Aamir Khan",
"Madhavan"
],
"director": "Rajkumar Hirani",
"release_year": "2009"
},
"Star Wars: Episode VI - Return of the Jedi": {
"genres": ["Action", "Adventure", "Fantasy", "Sci-Fi"],
"actors": [
"Mark Hamill",
"Harrison Ford"
],
"director": "Richard Marquand",
"release_year": "1983"
},
"High and Low": {
"genres": ["Crime", "Drama", "Mystery", "Thriller"],
"actors": [
"Toshirô Mifune",
"Yutaka Sada"
],
"director": "Akira Kurosawa",
"release_year": "1963"
},
"2001: A Space Odyssey": {
"genres": ["Adventure", "Sci-Fi"],
"actors": [
"Keir Dullea",
"Gary Lockwood"
],
"director": "Stanley Kubrick",
"release_year": "1968"
},
"Eternal Sunshine of the Spotless Mind": {
"genres": ["Drama", "Romance", "Sci-Fi"],
"actors": [
"Jim Carrey",
"Kate Winslet"
],
"director": "Michel Gondry",
"release_year": "2004"
},
"Reservoir Dogs": {
"genres": ["Crime", "Drama", "Thriller"],
"actors": [
"Harvey Keitel",
"Tim Roth"
],
"director": "Quentin Tarantino",
"release_year": "1992"
},
"Kapernaum": {
"genres": ["Drama"],
"actors": [
"Zain Al Rafeea",
"Yordanos Shiferaw"
],
"director": "Nadine Labaki",
"release_year": "2018"
},
"Citizen Kane": {
"genres": ["Drama", "Mystery"],
"actors": [
"Orson Welles",
"Joseph Cotten"
],
"director": "Orson Welles",
"release_year": "1941"
},
"Lawrence of Arabia": {
"genres": ["Adventure", "Biography", "Drama", "War"],
"actors": [
"Peter O'Toole",
"Alec Guinness"
],
"director": "David Lean",
"release_year": "1962"
},
"Jagten": {
"genres": ["Drama"],
"actors": [
"Mads Mikkelsen",
"Thomas Bo Larsen"
],
"director": "Thomas Vinterberg",
"release_year": "2012"
},
"M - Eine Stadt sucht einen Mörder": {
"genres": ["Crime", "Mystery", "Thriller"],
"actors": [
"Peter Lorre",
"Ellen Widmann"
],
"director": "Fritz Lang",
"release_year": "1931"
},
"North by Northwest": {
"genres": ["Action", "Adventure", "Mystery", "Thriller"],
"actors": [
"Cary Grant",
"Eva Marie Saint"
],
"director": "Alfred Hitchcock",
"release_year": "1959"
},
"Vertigo": {
"genres": ["Mystery", "Romance", "Thriller"],
"actors": [
"James Stewart",
"Kim Novak"
],
"director": "Alfred Hitchcock",
"release_year": "1958"
},
"Idi i smotri": {
"genres": ["Drama", "Thriller", "War"],
"actors": [
"Aleksey Kravchenko",
"Olga Mironova"
],
"director": "Elem Klimov",
"release_year": "1985"
}
}
\ No newline at end of file
import json
from .motion_behaviours import *
from .scenario_trees import *
from .py_tree_test_functions import Py_Tree_Tests
import py_trees
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
import os
from diagnostic_msgs.msg._diagnostic_array import DiagnosticArray
from move_base_msgs.action import MoveBase
from std_msgs.msg import String, Bool
from sensor_msgs.msg import Range
from naoqi_bridge_msgs.msg import HeadTouch
from lhw_interfaces.msg import Entities, Response, RobotActionGoal, FaceDetected, FacesDetected, WordDetected
from nav_msgs.msg import OccupancyGrid
from nav2_msgs.action import NavigateToPose
from lhw_interfaces.action import TabletAction
#from action_msgs.msg import GoalStatusArray
from actionlib_msgs.msg import GoalStatusArray
from geometry_msgs.msg import Twist
from naoqi_bridge_msgs.msg import AudioBuffer as NaoqiAudioBuffer
from .FredrikDemo import guess_movie_demo
######################################################################################
# Global variables
######################################################################################
BB_WRITE_VARIABLES = ["battery_percentage", "battery_low_warning", "head_touched", "door_open", "in_motion", "at_destination",
"entities", "point_to_pixel", "recognized_text", "last_recognized_item", "recognized_people", "found_new_person",
"current_person", "already_introduced_people", "is_talking", "say_dynamic_str", "move_base_status", "updated",
"movie_answers"]
BB_READ_VARIABLES = ["entities", "movie_answers", "head_touched"]
class Py_Tree(Node):
def __init__(self):
super().__init__('py_tree')
self.log = self.get_logger()
self.now = time.time()
self.goal_time_stamp = 0
######################################################################################
# For running the SSS Tunnel
######################################################################################
self.pepper_ip = "10.133.5.240"#"192.168.0.108"#os.environ["PEPPER_IP"]
self.user = "nao"
self.pw = "nao"
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
print("start pytree")
"""try:
self.ssh_client.connect(hostname = self.pepper_ip, username = self.user, password = self.pw)
print(f"Successful Connection")
except Exception as e:
print(f"Failed to connect to {self.pepper_ip} vi Shh")"""
######################################################################################
# Publishers
######################################################################################
self.say_publisher = self.create_publisher(String, 'say', 10)
self.robot_action_pub = self.create_publisher(RobotActionGoal, 'robot_action', 10)
self.camera_on_off_publisher = self.create_publisher(Bool, 'camera_on_off', 10)
self.microphone_on_off_publisher = self.create_publisher(Bool, 'microphone_on_off', 10)
self.fix_head_pub = self.create_publisher(Bool, 'lhw/head_fix', 10)
self.cmd_vel_publisher = self.create_publisher(Twist, 'cmd_vel', 10)
self.listener_on_off_publisher = self.create_publisher(Bool, "listener_on_off", 10)
######################################################################################
# Subscribers
######################################################################################
self.dialogflow_sub = self.create_subscription(Response, 'dialogflow_response', self.dialogflow_response_callback, 10)
self.face_detected_sub = self.create_subscription(FaceDetected, 'face_detected', self.face_detected_callback, 10)
self.n_faces_detected_sub = self.create_subscription(FacesDetected, 'faces_detected', self.faces_detected_callback, 10)
self.entities_sub = self.create_subscription(Entities, 'yolov5_entities', self.entities_callback, 10)
self.head_touched_subscriber = self.create_subscription(HeadTouch, 'head_touch', self.head_touched_cb, 10)
self.sonar_front_subscriber = self.create_subscription(Range, 'sonar/front', self.sonar_front_cb, 10)
self.battery_sub = self.create_subscription(DiagnosticArray, "diagnostics", self.battery_cb, 10)
self.is_talking_sub = self.create_subscription(Bool, "is_talking", self.is_talking_cb, 10) # TODO: Test with things running on Pepper
self.yes_no_sub = self.create_subscription(WordDetected, "word_detected", self.yes_no_cb, 10)
self.move_base_status = self.create_subscription(GoalStatusArray, "/lhw/nav/move_base/status", self.goal_success_cb, 10)
######################################################################################
# Action and Service Clients
######################################################################################
#client server (/navigate_to_pose)
#self._action_client = ActionClient(self, MoveBase, 'mauqi/nav/goal_server/goal')
self.pixel_to_xyz_serv = self.create_client(PixelToXYZ, "pixel_to_xyz")
self.tablet_action_client = ActionClient(self, TabletAction, "tablet_action_server")
######################################################################################
# Blackboard setup
# Add the name of the variable you want to have read/write access to in the global
# variables (and optionally add initialation of it in the initialize_bb_variables())
######################################################################################
self.blackboard = py_trees.blackboard.Client(name="bb")
self.set_all_read_blackboard(BB_READ_VARIABLES)
self.set_all_write_blackboard(BB_WRITE_VARIABLES)
self.initialize_bb_variables()
######################################################################################
# Some useful behaviours
######################################################################################
center_head = PepperCenterHead(name="Center Head", node=self)
idle_off = SetIdle(name="Idle Off", node=self, go_idle=True)
mock = MockNavigation(name="test mock", node=self)
######################################################################################
# Setup and run PyTree
######################################################################################
#Points for navigation can be read from the configfile
config = json.load(open("src/lhw_intelligence/lhw_intelligence/config.json"))
#test_functions object can be used to call functions in py_tree_test_functions which are returning
#sequences which root can be set to
test_functions = Py_Tree_Tests(self)
self.log.info("py_tree started")
s_c_tree = safety_check_tree(self, config, self.ssh_client)
receptionist_tree = receptionist(self, config, self.ssh_client)
#test_functions.test_pointing_straight_forward()
#Root is set to the tree which we want to run
self.behaviour_tree = py_trees.trees.BehaviourTree(root=guess_movie_demo(self))#test_functions.better_point_at_face_test())#test_functions.point_at_face_test())
self.behaviour_tree.setup(timeout=15)
""" Run before every tick to clear all backlogged callbacks
"""
def pre_tick_spin(tree):
rclpy.spin_once(self, timeout_sec=2)
...
def tick_print(tree):
self.get_logger().info(py_trees.display.unicode_tree(root=tree.root, show_status=True))
print(self.blackboard.movie_answers)
#print(self.blackboard.recognized_people)
#print(self.blackboard.point_to_pixel)
try:
self.behaviour_tree.tick_tock(
period_ms=500,
number_of_iterations=py_trees.trees.CONTINUOUS_TICK_TOCK,
pre_tick_handler=pre_tick_spin,
post_tick_handler=tick_print
)
except KeyboardInterrupt:
self.behaviour_tree.interrupt()
######################################################################################
# Callbacks
######################################################################################
def goal_success_cb(self,msg):
try:
print(msg.status_list[0].status, "Current Status")
except IndexError:
return
#current_time_stamp = msg.status_list[0].goal_id.stamp.sec
if msg.status_list[0].status == 1:
self.blackboard.updated = True
self.goal_time_stamp = msg.status_list[0].goal_id.stamp.sec
#Using the move_base/status topic we check the current status and write to the blackboard
self.get_logger().info("----------------------------")
self.get_logger().info(str(self.blackboard.move_base_status))
#May Add more Statuses if necessary
if (msg.status_list[0].status == 3 or msg.status_list[0].status == 2) and self.blackboard.updated:
self.get_logger().info('Publishing: "%s"' % "Goal Reached")
self.blackboard.move_base_status = 3
if msg.status_list[0].status == 4:
self.get_logger().info('Publishing: "%s"' % "Goal Failed")
self.blackboard.move_base_status = "4"
def head_touched_cb(self, msg):
print("head touched!")
self.blackboard.head_touched = True
def sonar_front_cb(self, sonar_msg):
#Sat this to true when testing security with navigation
self.get_logger().info('Publishing: "%s"' % sonar_msg.range)
if sonar_msg.range > 0.5:
self.blackboard.door_open = True
def battery_cb(self, data):
# How to get the data for the battery from the DiagnosticArray
# data.status[len(data.status)-3].values
PERCENTAGE_LIM = 30
battery_index = len(data.status)-3
percentage = float(data.status[battery_index].values[0].value)
self.blackboard.battery_percentage = percentage
if percentage < PERCENTAGE_LIM:
self.blackboard.battery_low_warning = True
elif percentage > PERCENTAGE_LIM + 5: # To prevent ping-pong behaviour
self.blackboard.battery_low_warning = False
# Checking for gaps in the sofa to point at
# TODO: Test this
def entities_callback(self, yolo_data):
width = yolo_data.source_width
height = yolo_data.source_height
self.blackboard.entities = {}
self.blackboard.entities["picture_width"] = width
self.blackboard.entities["height"] = height
yolo_dict = yolo_data.entities
for entity in yolo_dict:
if entity.type == "person":
#TODO: Maybe need to check size of bbox so that we dont insert people from the background
#TODO: Test if this becomes nestled list for several persons
self.dict_insert_list_or_append(self.blackboard.entities, entity.type, [entity.bbox[0], entity.bbox[2]])
if entity.type == "couch":
self.blackboard.entities["couch"] = [entity.bbox[0], entity.bbox[2]]
def dialogflow_response_callback(self, data):
self.blackboard.recognized_text = data.recognized_text
if data.parameters:
"""for parameter in data.parameters:
params[parameter.key] = parameter.value"""
param = data.parameters[0]
self.blackboard.last_recognized_item = param # TODO: Test this
def face_detected_callback(self, face_msg):
print("FaceDetected callback!")
if not face_msg:
return
#print(face_msg)
# Add the person and initialize different items if not recognized since earlier
if face_msg.id not in self.blackboard.recognized_people:
self.blackboard.found_new_person = True
self.blackboard.recognized_people[face_msg.id] = {
"z_index" : face_msg.z_index,
"name" : None,
"drinks" : None,
"age" : face_msg.age,
"face_center" : [face_msg.face_center_xy[0], face_msg.face_center_xy[1]]
}
# Update the z_index of the detected person
else:
self.blackboard.recognized_people[face_msg.id]["z_index"] = face_msg.z_index
self.blackboard.recognized_people[face_msg.id]["face_center"] = [face_msg.face_center_xy[0], face_msg.face_center_xy[1]]
# Only change age if it is lower than previous, prevents age bouncing around
if face_msg.age < self.blackboard.recognized_people[face_msg.id]["age"]:
self.blackboard.recognized_people[face_msg.id]["age"] = face_msg.age
print(self.blackboard.recognized_people)
def faces_detected_callback(self, n_faces_msg):
print("FacesDetected callback!")
if not n_faces_msg:
return
#print(face_msg)
# Add the person and initialize different items if not recognized since earlier
# Currently allows for 3 people to be detected at the same time
for face_msg in n_faces_msg.faces_detected:
if face_msg.id not in self.blackboard.recognized_people:
self.blackboard.found_new_person = True
print("FOUND PERSON")
self.blackboard.recognized_people[face_msg.id] = {
"z_index" : face_msg.z_index,
"name" : None,
"drinks" : None,
"age" : face_msg.age,
"face_center" : [face_msg.face_center_xy[0], face_msg.face_center_xy[1]]
}
# Update different real time values for each person
else:
self.blackboard.recognized_people[face_msg.id]["z_index"] = face_msg.z_index
self.blackboard.recognized_people[face_msg.id]["face_center"] = [face_msg.face_center_xy[0], face_msg.face_center_xy[1]]
# Only change age if it is lower than previous, prevents age from bouncing around
if face_msg.age < self.blackboard.recognized_people[face_msg.id]["age"]:
self.blackboard.recognized_people[face_msg.id]["age"] = face_msg.age
print(self.blackboard.recognized_people)
def is_talking_cb(self, msg):
print(msg.data)
self.blackboard.is_talking = msg.data
def yes_no_cb(self, msg):
if msg.confidence > 0.45:
self.blackboard.recognized_text = msg.word
def amcl_pose_cordinates(self, msg):
self.blackboard.is_talking = msg.data
def dict_insert_list_or_append(self, adict: dict, key, val) -> None:
'''Insert a value in dict at key if one does not exist
Otherwise, convert value to list and append. We are sorting so that
all people in the sofa will be in "order" in the "person" key in entities
'''
if key in adict:
adict[key].append(val)
adict[key] = sorted(adict[key], key=lambda x: x[0], reverse= True)
else:
adict[key] = [val]
def set_all_read_blackboard(self, list):
for read_variable in list:
self.blackboard.register_key(key= read_variable, access=py_trees.common.Access.READ)
def set_all_write_blackboard(self, list):
for write_variable in list:
self.blackboard.register_key(key=write_variable, access=py_trees.common.Access.WRITE)
def initialize_bb_variables(self):
self.blackboard.head_touched = False
self.blackboard.door_open = False
self.blackboard.in_motion = False
self.blackboard.at_destination = False
self.blackboard.entities = {}
self.blackboard.point_to_pixel = [0,0]
self.blackboard.recognized_text = ""
self.blackboard.recognized_people = {}
self.blackboard.current_person = ""
self.blackboard.last_recognized_item = None
self.blackboard.found_new_person = False
self.blackboard.already_introduced_people = []
self.blackboard.say_dynamic_str = ""
self.blackboard.is_talking = False
self.blackboard.move_base_status = "Default navigation blackboard"
self.blackboard.updated = False
self.blackboard.movie_answers = {}#{"release_year": 1988, "actors": ["Philippe Noiret"], "genres": []}
def main(args=None):
rclpy.init(args=args)
ai_node = Py_Tree()
rclpy.spin(ai_node)
ai_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
from socket import timeout
from paramiko import channel, ssh_exception
from .motion_behaviours import PepperCenterHead
import py_trees
from .actions import Actions
from actionlib_msgs.msg import GoalStatusArray
#from naoqi_bridge_msgs.msg import HeadTouch
from .behaviour_copy import Behaviour #TODO: Move behaviours to here
from move_base_msgs.action import MoveBase
#TODO: Tidy this up when finished
from std_msgs.msg import String, Header, Bool
from builtin_interfaces.msg import Time
import time, random
from geometry_msgs.msg import Twist, Pose, PoseStamped
import paramiko
import sys
import yaml
import os
import time
""" Rotates the robot counter (?) clockwise.
"""
class RotateRobot(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(RotateRobot, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.actions = Actions(self.node)
def update(self):
self.actions.rotate_robot()
return py_trees.common.Status.RUNNING
"""if (Robotidle):
self.node.log.info("Rotation Succeeded")
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.RUNNING"""
""" Stops the robot
"""
class StopRobot(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(StopRobot, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.actions = Actions(self.node)
def initialise(self):
self.actions.stop_robot()
def update(self):
return py_trees.common.Status.SUCCESS
""" mocking navigation for testing
"""
class MockNavigation(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(MockNavigation, self).__init__(name)
self.node = node
def update(self):
return py_trees.common.Status.SUCCESS
rand = random.random()
if rand < 0.3:
return py_trees.common.Status.FAILURE
elif rand < 0.6:
return py_trees.common.Status.RUNNING
else:
return py_trees.common.Status.SUCCESS
""" Moves the robot forward x meters
"""
class MoveForward(py_trees.behaviour.Behaviour):
def __init__(self, name, node, x: float) -> None:
super(MoveForward, self).__init__(name)
self.node = node
self.x = x
def setup(self, **kwargs):
self.action = Actions(self.node)
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="in_motion", access=py_trees.common.Access.READ)
def initialise(self):
self.node.log.info(f"Moving forward {self.x}m")
msg = Twist()
speed = 1.0
msg.linear.x = speed
msg.linear.y = 0.0
msg.linear.z = 0.0
msg.angular.z = 0.0
self.node.cmd_vel_publisher.publish(msg)
self.start_time = time.time()
self.time_diff = self.x * 2 * speed + 1
print(self.start_time)
def update(self):
return py_trees.common.Status.SUCCESS
if float(time.time() - self.start_time) < self.time_diff:
return py_trees.common.Status.RUNNING
else:
return py_trees.common.Status.SUCCESS
"""
if (self.blackboard.in_motion):
return py_trees.common.Status.RUNNING
else:
return py_trees.common.Status.SUCCESS
"""
#TODO: Få tag i data från navigation, när är vi stilla, har vi nått slutmålet
"""if (<idle> && <in location>):
return py_trees.common.Status.SUCCESS
elif (<idle> && <not at localtion>):
return py_trees.common.Status.FAILURE
else:
return py_trees.common.Status.RUNNING"""
""" Checks using ultra sound if the door in front of the robot is open or not
"""
class IsDoorOpen(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(IsDoorOpen, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="door_open", access=py_trees.common.Access.READ)
def update(self):
if self.blackboard.door_open == True:
print("Door is open!")
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.RUNNING
""" Move the robot to a specified location
"""
class SetPose(py_trees.behaviour.Behaviour):
def __init__(self, name, pose: Pose, ssh_client) -> None:
super(SetPose, self).__init__(name)
self.pose = pose
self.ssh_client = ssh_client
def initialise(self):
print("Initialising SetPose")
position = '{{x: {0}, y: {1}, z: {2}}}'
p = position.format(self.pose.position.x, self.pose.position.y, self.pose.position.z)
orientation = '{{x: {0}, y: {1}, z: {2}, w: {3}}}'
o = orientation.format(self.pose.orientation.x, self.pose.orientation.y, self.pose.orientation.z, self.pose.orientation.w ) # Unpack the list as laid out args
pose_cmd = f"""
rostopic pub /lhw/nav/initialpose geometry_msgs/PoseWithCovarianceStamped \"header:
frame_id: 'map'
pose:
pose:
position: {p}
orientation: {o}\" --once"""
self.ssh_client.invoke_shell()
try:
print("")
stdin, stdout, stderr = self.ssh_client.exec_command("source ~/.bash_profile && " + pose_cmd, timeout = 1)#while not self.action_client.wait_for_server():
except:
print()
def update(self):
print("Setting pose...")
#Timer for next step in tree -> SetPose Doesn't need a callback (as long as stack works)
return py_trees.common.Status.SUCCESS
class MoveToPoint(py_trees.behaviour.Behaviour):
def __init__(self, name, node, pose: Pose, ssh_client) -> None:
super(MoveToPoint, self).__init__(name)
self.pose = pose
self.time = 0
self.ssh_client = ssh_client
self.node = node
self.fix_head_pub = node.fix_head_pub
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="move_base_status", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="move_base_status", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="updated", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="updated", access=py_trees.common.Access.WRITE)
self.blackboard.move_base_status = "Default Status"
def initialise(self):
print("Initialising MoveToPoint")
msg = Bool()
msg.data = True
self.fix_head_pub.publish(msg)
#Orientation
goal_orientation = '{{x: {0}, y: {1}, z: {2}, w: {3}}}'
g_o = goal_orientation.format(self.pose.orientation.x, self.pose.orientation.y, self.pose.orientation.z, self.pose.orientation.w) # Unpack the list as laid out args
print(g_o)
#Position
goal_position = '{{x: {0}, y: {1}, z: {2}}}'
g_p = goal_position.format(self.pose.position.x, self.pose.position.y, self.pose.position.z)
goal_cmd = f"rostopic pub /lhw/nav/goal_server/goal geometry_msgs/PoseStamped \"{{header: {{frame_id: 'map'}}, pose: {{position: {g_p}, orientation: {g_o} }}}}\" --once"
#print(goal_cmd)
#print("Goal cmd: {goal_cmd}")
#print(goal_cmd)
self.ssh_client.invoke_shell()
stdin, stdout, stderr = self.ssh_client.exec_command("source ~/.bash_profile && " + goal_cmd)
def update(self):
print("Listening for callback...")
#self.node.log.info(str(self.blackboard.move_base_status))
if self.blackboard.move_base_status == 3:
self.blackboard.updated = False
self.blackboard.move_base_status = 0
return py_trees.common.Status.SUCCESS
if self.blackboard.move_base_status == 4:
return py_trees.common.Status.FAILED
else:
return py_trees.common.Status.RUNNING
""" Checks if the head has been tapped
This is a blocking behaviour
"""
class IsHeadTouched(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(IsHeadTouched, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="head_touched", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="head_touched", access=py_trees.common.Access.WRITE)
def initialize(self):
#self.blackboard.head_touched = False
pass
def update(self):
if self.blackboard.head_touched == True:
self.node.log.info(f"Head has been touched")
self.blackboard.head_touched = False
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.RUNNING
""" Set head touch to false in blackboard"""
class ResetHeadTouched(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(ResetHeadTouched, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="head_touched", access=py_trees.common.Access.WRITE)
def update(self):
self.blackboard.head_touched = False
return py_trees.common.Status.SUCCESS
class ResetPixel(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(ResetPixel, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="point_to_pixel", access=py_trees.common.Access.WRITE)
def update(self):
self.blackboard.point_to_pixel = None
return py_trees.common.Status.SUCCESS
""" Standby
"""
class Standby(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(Standby, self).__init__(name)
self.node = node
def update(self):
return py_trees.common.Status.RUNNING
"""" Used to calculate where a gap in a sofa can be found so a person can sit down"""
class CalculateGapPointLocation(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(CalculateGapPointLocation, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="entities", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="point_to_pixel", access=py_trees.common.Access.WRITE)
def update(self):
# TODO: Test this
GAP_LIM = 50
if "couch" not in self.blackboard.entities or "person" not in self.blackboard.entities:
return py_trees.common.Status.RUNNING
print(self.blackboard.entities)
couch = {"left":self.blackboard.entities["couch"][0], "right": self.blackboard.entities["couch"][1]}
persons = self.blackboard.entities["person"]
spaces = []
# Calculate all gaps in the sofa
for i in range(len(persons)):
left = persons[i][0]
right = persons[i][1]
if i == 0:
spaces.append({"left": couch["left"], "right": left, "gap": couch["left"] - left})
else:
prev_right = persons[i-1][1]
spaces.append({"left": prev_right, "right": left, "gap": prev_right - right})
right = persons[len(persons)-1][1]
spaces.append({"left": right, "right": couch["right"], "gap": right - couch["right"]})
largest_gap = max(spaces, key= lambda x: x["gap"])
# Check if gap is actually big enough
if largest_gap["gap"] >= GAP_LIM:
# Gets middle location of the left and right x pixel
mid_x = (largest_gap["right"] + largest_gap["left"])/2
y = self.blackboard.entities["height"]/2 # Decide for value (if even needed)
self.blackboard.point_to_pixel = [mid_x, y]
print(self.blackboard.point_to_pixel)
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.FAILURE
"""" Used to calculate where a gap in a sofa can be found so a person can sit down"""
class PlacePerson(py_trees.behaviour.Behaviour):
def __init__(self, name, node, fin: bool) -> None:
super(PlacePerson, self).__init__(name)
self.node = node
self.fin = fin
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="recognized_people", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="current_person", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="point_to_pixel", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="say_dynamic_str", access=py_trees.common.Access.WRITE)
def initialize(self):
# Change this if resolution of image is changed
RES = 4
points = [53*RES, 158*RES, 265*RES]
self.blackboard.say_dynamic_str = "Please sit right there"
people = self.blackboard.recognized_people
if len(people) == 1:
self.blackboard.point_to_pixel = [265,150]
print(self.blackboard.point_to_pixel)
return
elif self.fin:
self.blackboard.point_to_pixel = [-1,150]
print(self.blackboard.point_to_pixel)
return
face_centers_x = [person["face_center"][0] for person in people if (person["name"] is not None and person["name"] != self.blackboard.current_person)]
face_centers_x.sort()
# List is sorted so only compare each of the same indicies until an empty spot is found
for (pixel, point) in zip(face_centers_x, points):
if abs(pixel - point) > 50:
self.blackboard.point_to_pixel = [point, 150]
return
self.blackboard.point_to_pixel = [-1,-1]
self.blackboard.say_dynamic_str = "Sorry there doesn't appear to be any available seats to you, sadly"
print(self.blackboard.point_to_pixel)
def update(self):
return py_trees.common.Status.SUCCESS
"""
Gets and saves the pixel coordinate of the first person found which has not
been introduced yet and adds them to the list of introduced people.
If such a person cannot be found, halts the tree
"""
class GetFacePixelCoordinate(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(GetFacePixelCoordinate, self).__init__(name)
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="already_introduced_people", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="recognized_people", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="current_person", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="already_introduced_people", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="point_to_pixel", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="say_dynamic_str", access=py_trees.common.Access.WRITE)
def update(self):
for id in self.blackboard.recognized_people:
name = self.blackboard.recognized_people[id]["name"]
# If name is None, that means that it's a person that we have not introduced but
# the facial recognition has found
if name is None:
continue #pass??
# This means we have found a person that we have not introduced before and that has a name
if name not in self.blackboard.already_introduced_people: # and name != self.blackboard.current_person:
# face_center will be None if the person was not in frame the last time faces were captured
if self.blackboard.recognized_people[id]["face_center"] is None:
continue
print("Face pixel: " + str(self.blackboard.recognized_people[id]["face_center"]))
x = self.blackboard.recognized_people[id]["face_center"][0]
y = self.blackboard.recognized_people[id]["face_center"][1]
#print("Face pixel for: " + name)
print([x,y])
self.blackboard.point_to_pixel = [x, y]
#self.blackboard.already_introduced_people.append(name)
#self.blackboard.say_dynamic_str = "This is " + name
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
"""
Checks if everyone that is currently saved has been introduced,
if so return SUCCESS otherwise return FAILURE
"""
# class IsEveryoneIntroduced(py_trees.behaviour.Behaviour):
# def __init__(self, name, node) -> None:
# super(IsEveryoneIntroduced, self).__init__(name)
# def setup(self, **kwargs):
# self.blackboard = py_trees.blackboard.Client(name="bb")
# self.blackboard.register_key(key="already_introduced_people", access=py_trees.common.Access.READ)
# self.blackboard.register_key(key="already_introduced_people", access=py_trees.common.Access.WRITE)
# self.blackboard.register_key(key="recognized_people", access=py_trees.common.Access.READ)
# self.blackboard.register_key(key="current_person", access=py_trees.common.Access.READ)
# self.blackboard.register_key(key="say_dynamic_str", access=py_trees.common.Access.WRITE)
# self.blackboard.say_dynamic_str = ""
# def update(self):
# # Succeed when everyone has been introduced
# print(self.blackboard.recognized_people)
# for id in self.blackboard.recognized_people:
# id_name = self.blackboard.recognized_people[id]['name']
# #if we have detected a person more than once, the name will be None for id
# if id_name is not None:
# if id_name not in self.blackboard.already_introduced_people and id_name != self.blackboard.current_person:
# return py_trees.common.Status.FAILURE
# self.blackboard.already_introduced_people = []
# return py_trees.common.Status.SUCCESS
class IsEveryoneIntroduced(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(IsEveryoneIntroduced, self).__init__(name)
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="already_introduced_people", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="already_introduced_people", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="recognized_people", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="current_person", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="say_dynamic_str", access=py_trees.common.Access.WRITE)
self.blackboard.say_dynamic_str = "This is"
def initialize(self):
string = ""
for id in self.blackboard.recognized_people:
id_name = self.blackboard.recognized_people[id]['name']
#if we have detected a person more than once, the name will be None for id
if id_name is not None:
if id_name != self.blackboard.current_person:
drink = str(self.blackboard.recognized_people[id]["drinks"])
string = string + f" {id_name} and their favorite drink is {drink}, "
print("TETSS: " + string)
print("STRING" + string)
self.blackboard.say_dynamic_str = "This is " + string
def update(self):
# Succeed when everyone has been introduced
print(self.blackboard.recognized_people)
return py_trees.common.Status.SUCCESS
"""
Checks if everyone that is currently saved has been introduced,
if so return SUCCESS otherwise return FAILURE
"""
class RepeatReceptionist(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(RepeatReceptionist, self).__init__(name)
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="recognized_people", access=py_trees.common.Access.READ)
def update(self):
n_people = 0
for person in self.blackboard.recognized_people:
if person["name"] is not None:
n_people += 1
if n_people == 3:
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.FAILURE
class IsBatteryLow(py_trees.behaviour.Behaviour):
def __init__(self, name) -> None:
super(IsBatteryLow, self).__init__(name)
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="Battery")
self.blackboard.register_key(key="low_percentage", access=py_trees.common.Access.READ)
def update(self):
if self.blackboard.low_percentage:
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.FAILURE
""" Pepper says the message"""
class Say(py_trees.behaviour.Behaviour):
def __init__(self, name, node, message: str) -> None:
super(Say, self).__init__(name)
self.node = node
self.message = message
print(message)
self.say_pub = self.node.say_publisher
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="is_talking", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="is_talking", access=py_trees.common.Access.WRITE)
def initialise(self):
msg = String()
msg.data = self.message
self.say_pub.publish(msg)
self.blackboard.is_talking = True
def update(self):
"""if self.blackboard.is_talking:
return py_trees.common.Status.RUNNING"""
return py_trees.common.Status.SUCCESS
""" Pepper says whatver is on the blackboard message variable"""
class SayDynamic(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(SayDynamic, self).__init__(name)
self.node = node
self.say_pub = self.node.say_publisher
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="say_dynamic_str", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="say_dynamic_str", access=py_trees.common.Access.WRITE)
def initialise(self):
msg = String()
msg.data = self.blackboard.say_dynamic_str
self.say_pub.publish(msg)
print("SAY DNM -------->: " + self.blackboard.say_dynamic_str)
def update(self):
return py_trees.common.Status.SUCCESS
""" Waiting for a parameter response from dialog flow"""
class WaitForItem(py_trees.behaviour.Behaviour):
def __init__(self, name, node, key: str) -> None:
super(WaitForItem, self).__init__(name)
self.node = node
self.key = key
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.READ)
def update(self):
print(self.blackboard.last_recognized_item)
if self.blackboard.last_recognized_item and self.blackboard.last_recognized_item.key == self.key:
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
""" Reset blackboard variable keeping track for last recognized item"""
class RemoveItem(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(RemoveItem, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.WRITE)
def initialise(self):
self.blackboard.last_recognized_item = None
def update(self):
self.blackboard.last_recognized_item = None
return py_trees.common.Status.SUCCESS
""" Is used to detect whether a person has been found, and is causing a wait until one has been found """
class WaitForPerson(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(WaitForPerson, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="found_new_person", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="found_new_person", access=py_trees.common.Access.WRITE)
def update(self):
if self.blackboard.found_new_person:
self.blackboard.found_new_person = False
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
""" Used to confirm whether an answer from the human was correct"""
class ConfirmSay(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(ConfirmSay, self).__init__(name)
self.node = node
self.say_pub = self.node.say_publisher # FOR TESTING
self.on_off_pub = self.node.listener_on_off_publisher
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="recognized_text", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="recognized_text", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.WRITE)
def initiate(self):
msg = Bool()
msg.data = True
self.on_off_pub.publish(msg)
def update(self):
msg = String() # TESTING
if "yes" in self.blackboard.recognized_text.lower():
msg.data = "Confirmed"
self.say_pub.publish(msg)
self.blackboard.recognized_text = ""
msg = Bool()
msg.data = False
self.on_off_pub.publish(msg)
return py_trees.common.Status.SUCCESS
elif "no" in self.blackboard.recognized_text.lower():
msg.data = "No"
self.say_pub.publish(msg)
self.blackboard.recognized_text = ""
self.blackboard.last_recognized_item = None # TODO: Is this too "hard-coded"?
msg = Bool()
msg.data = False
self.on_off_pub.publish(msg)
return py_trees.common.Status.FAILURE
return py_trees.common.Status.RUNNING
""" Used to remember a person's answer. The item is saved on the currently found face id"""
class AddItemToPerson(py_trees.behaviour.Behaviour):
def __init__(self, name, node) -> None:
super(AddItemToPerson, self).__init__(name)
self.node = node
def setup(self, **kwargs):
self.blackboard = py_trees.blackboard.Client(name="bb")
self.blackboard.register_key(key="recognized_people", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.READ)
self.blackboard.register_key(key="last_recognized_item", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="current_person", access=py_trees.common.Access.WRITE)
def initialise(self):
# Get the closest person to Pepper
closest_person = min(self.blackboard.recognized_people.values(), key=lambda x: x["z_index"])
param = self.blackboard.last_recognized_item
closest_person[param.key] = param.value
# Save the name of the current person being introduced
if param.key == "name":
self.blackboard.current_person = param.value
self.blackboard.last_recognized_item = None
# TODO: Code if multiple parameters can be passed through last_recognized_item
"""for key in params:
# In case we ever use other parameters with dialogflow
if key in ["name", "drink", "age"]:
value = params[key]
closest_person[key] = value"""
def update(self):
return py_trees.common.Status.SUCCESS
class TurnCameraOnOrOff(py_trees.behaviour.Behaviour):
def __init__(self, name, node, turn_on:bool) -> None:
super(TurnCameraOnOrOff, self).__init__(name)
self.node = node
self.turn_on = turn_on
def initialise(self):
msg = Bool()
msg.data = self.turn_on
self.node.camera_on_off_publisher.publish(msg)
def update(self):
return py_trees.common.Status.SUCCESS
class TurnMicrophoneOnOrOff(py_trees.behaviour.Behaviour):
def __init__(self, name, node, turn_on:bool) -> None:
super(TurnMicrophoneOnOrOff, self).__init__(name)
self.node = node
self.turn_on = turn_on
def initialise(self):
msg = Bool()
msg.data = self.turn_on
self.node.microphone_on_off_publisher.publish(msg)
def update(self):
return py_trees.common.Status.SUCCESS
=1;;Root
| ->;;Topics2BB
| | (Scan2BB)
| | [Cancel2BB]
| | (Battery2BB)
| ?;;Tasks
| | [Flash Red];;Battery Low?;;EternalGuard
| | ->;;Scan
| | | [Send Result]
| | | (Scan?)
| | | ?;;Scan or Die
| | | | ->;;Ere We Go
| | | | | [UnDock]
| | | | | ?;;Scan or Be Cancelled
| | | | | | ->;;Move Out and Scan
| | | | | | | [Move Home]
| | | | | | | =1;;Scanning
| | | | | | | | [Rotate]
| | | | | | | | [Flash Blue]
| | | | | | | | [Context Switch]
| | | | | | | [Move Out]
| | | | | | | [Result2BB succeeded]
| | | | | | ->;;Cancelling?
| | | | | | | (Cancel?)
| | | | | | | [Move Home]
| | | | | | | [Result2BB cancelled]
| | | | | [Dock]
| | | | | =3;;Celebrate;;Pause, Flash Green
| | | | | | [Pause]
| | | | | | [Flash Green]
| | | | ->;;Die
| | | | | =1;;Notification
| | | | | | [Pause]
| | | | | | [Flash Red]
| | | | | [Result2BB failed]
| | (Idle);;Running
\ No newline at end of file
import py_trees
root = py_trees.composites.Parallel(name="Root", policy=py_trees.common.ParallelPolicy.SuccessOnAll(synchronize=True))
topics2bb = py_trees.composites.Sequence(name="Topics2BB")
scan2bb = Scan2BB(name="Scan2BB") #CONDITION
cancel2bb = Cancel2BB(name="Cancel2BB") #ACTION
battery2bb = Battery2BB(name="Battery2BB") #CONDITION
tasks = py_trees.composites.Selector(name="Tasks")
flash_red = FlashRed(name="Flash Red") #ACTION
scan = py_trees.composites.Sequence(name="Scan")
send_result = SendResult(name="Send Result") #ACTION
scan_QUESTION = Scan_QUESTION(name="Scan?") #CONDITION
scan_or_die = py_trees.composites.Selector(name="Scan or Die")
ere_we_go = py_trees.composites.Sequence(name="Ere We Go")
undock = UnDock(name="UnDock") #ACTION
scan_or_be_cancelled = py_trees.composites.Selector(name="Scan or Be Cancelled")
move_out_and_scan = py_trees.composites.Sequence(name="Move Out and Scan")
move_home = MoveHome(name="Move Home") #ACTION
scanning = py_trees.composites.Parallel(name="Scanning", policy=py_trees.common.ParallelPolicy.SuccessOnAll(synchronize=True))
rotate = Rotate(name="Rotate") #ACTION
flash_blue = FlashBlue(name="Flash Blue") #ACTION
context_switch = ContextSwitch(name="Context Switch") #ACTION
move_out = MoveOut(name="Move Out") #ACTION
result2bb_succeeded = Result2BBsucceeded(name="Result2BB succeeded") #ACTION
cancelling_QUESTION = py_trees.composites.Sequence(name="Cancelling?")
cancel_QUESTION = Cancel_QUESTION(name="Cancel?") #CONDITION
result2bb_cancelled = Result2BBcancelled(name="Result2BB cancelled") #ACTION
dock = Dock(name="Dock") #ACTION
pause = Pause(name="Pause") #ACTION
flash_green = FlashGreen(name="Flash Green") #ACTION
celebrate = py_trees.composites.Parallel(name="Celebrate", policy=py_trees.common.ParallelPolicy.SuccessOnSelected(children=[pause, flash_green], synchronize=True))
celebrate = py_trees.composites.Parallel(name="Celebrate", policy=py_trees.common.ParallelPolicy.SuccessOnSelected(children=[pause, flash_green], synchronize=True))
die = py_trees.composites.Sequence(name="Die")
notification = py_trees.composites.Parallel(name="Notification", policy=py_trees.common.ParallelPolicy.SuccessOnAll(synchronize=True))
result2bb_failed = Result2BBfailed(name="Result2BB failed") #ACTION
idle = py_trees.behaviours.Running(name="Idle")
cancelling_QUESTION.add_children([cancel_QUESTION, move_home, result2bb_cancelled])
celebrate.add_children([pause, flash_green])
die.add_children([notification, result2bb_failed])
ere_we_go.add_children([undock, scan_or_be_cancelled, dock, celebrate])
move_home = MoveHome(name="Move Home") #ACTION
move_out_and_scan.add_children([move_home, scanning, move_out, result2bb_succeeded])
pause = Pause(name="Pause") #ACTION
notification.add_children([pause, flash_red])
root.add_children([topics2bb, tasks])
scan.add_children([send_result, scan_QUESTION, scan_or_die])
scan_or_be_cancelled.add_children([move_out_and_scan, cancelling_QUESTION])
scan_or_die.add_children([ere_we_go, die])
scanning.add_children([rotate, flash_blue, context_switch])
flash_red = FlashRed(name="Flash Red") #ACTION
tasks.add_children([flash_red, scan, idle])
topics2bb.add_children([scan2bb, cancel2bb, battery2bb])
behaviour_tree = py_trees.trees.BehaviourTree(root=root)
behaviour_tree.setup(timeout=15)
try:
behaviour_tree.tick_tock(
period_ms=500,
number_of_iterations=py_trees.trees.CONTINOUS_TICK_TOCK,
pre_tick_handler=None,
post_tick_handler=None
)
except KeyboardInterrupt:
behaviour_tree.interrupt()
->;;Sequence
| ?;;Selector
| | =1;;Default Parallel
| | | [Normal Behaviour]
| | | (Also Normal Behaviour)
| | | !(Inverted Behaviour);;Inverter Decorator Name
| | | [EternalGuard Behaviour];;EternalGuard;;Eternal Guard Decorator Name
| | | (OneShot Behaviour);;OneShot;;OneShot Decorator Name
| | | (TimeOut Behaviour);;Timeout;;Timeout Decorator Name;;5
| | | (Condition Behaviour);;Condition;;Condition Decorator Name;;Success
| | ;;Allowed Comment
| | =2;;SuccessOnOne Parallel
| | | (Failure Is Running Behaviour);;FailureIsRunning;;Failure Is Running Decorator Name
| | | [Failure Is Success Behaviour];;FailureIsSuccess;;Failure Is Success Decorator Name
| | | (Running Is Failure);;RunningIsFailure;;Running Is Failure Decorator Name
| | | [Running Is Success];;RunningIsSuccess;;Running Is Success Decorator Name
| | | [Success Is Running];;SuccessIsRunning;;Success Is Running Decorator Name
| | | (Success Is Failure);;SuccessIsFailure;;Success Is Failure Decorator Name
| ;; Parallels can have any int after '=', but any other than 2 or 3 will result in the default one being used
| =1000;;Also default Parallel
| | =3;;SuccessOnSelected Parallel Without Selected
| | | (Constant Success);;Success
| | | (Constant Failure);;Failure
| | | (Constant Running);;Running
| | | (Timer Name);;Timer;;10
| | | (Meta Name);;Meta;;function_name
| | =3;;SuccessOnSelected Parallel With Selected;;Selected Behaviour1, Selected Behaviour2
| | | [Selected Behaviour1]
| | | ->;;OneShot Idiom Name;;OneShot
| | | | ?;;OneShot Selector
| | | | | (Behaviour2)
| | | | | [Behaviour3]
| | | ?;;Eternal Guard Idiom Name;;EternalGuard
| | | | (Behaviour5)
| | | | ->;;Eternal Guard Sequence
| | | | | [Behaviour6]
| | | | | [Behaviour7]
| | | | (Selected Behaviour2)
| | | ->;;EitherOr Idiom Name;;EitherOr
| | | | ->;;EitherOr Task1 Sequence
| | | | | (Behaviour8)
| | | | | (Behaviour9)
| | | | ->;;EitherOr Task2 Sequence
| | | | | [Behaviour10]
| | | | | [Behaviour11]
| | | ->;;PUWYLO Idiom Name;;PickUpWhereYouLeftOff
| | | | ?;;PUWYLO Selector
| | | | | (Behaviour8)
| | | | | (Behaviour9)
| | | | [Behaviour10]
| | |
import py_trees
def receptionist():
root = py_trees.composites.Sequence(name="Root")
is_head_touched = IsHeadTouched(name="Is Head Touched", node=node)
wait = py_trees.timers.Timer(name="Wait", duration=5)
tell_referee_to_open_door = Say(name="Tell Referee To Open Door", node=node)
greet_person_subtree = greet_person(node=node)
tell_person_to_follow = Say(name="Tell Person To Follow", node=node)
look_down = PepperTiltHead(name="Look Down", node=node)
move_to_sofa = MoveToPoint(name="Move To Sofa", node=node)
pepper_center_head = PepperCenterHead(name="Pepper Center Head", node=node)
at_sofa_seq = py_trees.composites.Sequence(name="At Sofa Seq")
intros_wait_sel = py_trees.composites.Selector(name="Intros Wait Sel")
all_people_introduced = IsEveryoneIntroduced(name="All People Introduced", node=node)
introduce_person = py_trees.composites.Sequence(name=" Introduce person")
get_face_pixel = GetFacePixelCoordinate(name="Get Face Pixel", node=node)
point_at_person = PointAtPixel(name="Point At Person", node=node)
say_name_of_person = Say(name="Say Name Of Person", node=node)
const_fail = py_trees.behaviours.Failure(name="Const Fail")
introduce_retry = Standby(name="Introduce Retry", node=node)
empty_seat_sel = py_trees.composites.Selector(name="Empty Seat Sel")
successful_seating = py_trees.composites.Sequence(name="Successful Seating")
get_couch_gap_pixel = CalculateGapPointLocation(name="Get Couch Gap Pixel", node=node)
point_at_empty_seat = PointAtPixel(name="Point At Empty Seat", node=node)
tell_person_to_sit = Say(name="Tell Person To Sit", node=node)
unsuccessful_seating = py_trees.composites.Sequence(name="Unsuccessful Seating")
say_there_is_no_seat = Say(name="Say There Is No Seat", node=node)
move_to_door = MoveToPoint(name="Move To Door", node=node)
at_sofa_seq.add_children([intros_wait_sel, empty_seat_sel])
empty_seat_sel.add_children([successful_seating, unsuccessful_seating])
introduce_person.add_children([get_face_pixel, point_at_person, say_name_of_person, const_fail])
intros_wait_sel.add_children([all_people_introduced, introduce_person, introduce_retry])
root.add_children([is_head_touched, wait, tell_referee_to_open_door, greet_person_subtree, tell_person_to_follow, look_down, move_to_sofa, pepper_center_head, at_sofa_seq, move_to_door])
successful_seating.add_children([get_couch_gap_pixel, point_at_empty_seat, tell_person_to_sit])
unsuccessful_seating.add_child(say_there_is_no_seat)
return root
\ No newline at end of file
?;;Root
| ->;;Sequence
| | (Ghost Close)
| | ?;;3-Way-Selector
| | | ->;;Sequence2
| | | | !(Ghost Scared);;Inverter
| | | | (Power Pill Close)
| | | | [Eat Power Pill]
| | | ->;;Sequence3
| | | | (Ghost Scared)
| | | | [Chase Ghost]
| | | [Avoid Ghost]
| =1;;Parallel
| | [Eat Pills]
| | [Eat Fruit]
\ No newline at end of file
from .scenario_trees import greet_person
from .py_tree_behaviours import IsDoorOpen, IsHeadTouched, MoveToPoint, RotateRobot, Standby, StopRobot, Say, TurnCameraOnOrOff, TurnMicrophoneOnOrOff, WaitForPerson, GetFacePixelCoordinate
from .motion_behaviours import *
from .behaviour_copy import Behaviour
import py_trees
from lhw_interfaces.msg import RobotActionGoal
RESOLUTION_MULTIPLIER = 4
class Py_Tree_Tests():
def __init__(self, py_trees_node):
self.py_trees_node = py_trees_node
"""Test for rotating and stopping the robot. This works"""
def rotate_and_stop(self) -> py_trees.composites.Sequence:
timer = py_trees.timers.Timer(name="Timer", duration=2)
rotate_robot = RotateRobot(name="Rotate Robot", node=self.py_trees_node)
timer2 = py_trees.timers.Timer(name="Timer2", duration=10)
stop_robot = StopRobot(name="Stop Robot", node=self.py_trees_node)
timer3 = py_trees.timers.Timer(name="Timer3", duration=10)
root = py_trees.composites.Sequence(name="rotate sequence", children= [timer, rotate_robot, timer2, stop_robot, timer3])
return root
"""
For testing repeating behaviour with resetting of blackboard variables
"""
def very_random_test(self):
greet_person_subtree = py_trees.composites.Sequence(name="Greet Person Subtree")
greet = Say(name="Greet", node=self.py_trees_node, message="Hello!")
name_wait_sel = py_trees.composites.Selector(name="Name Wait Sel")
get_name_seq = py_trees.composites.Sequence(name="Get Name Seq")
ask_for_name = Say(name="Ask For Name", node=self.py_trees_node, message="Whats your name?")
head_touched = IsHeadTouched(name="Head Touched", node=self.py_trees_node)
is_door_open = IsDoorOpen(name="Is Door Open", node=self.py_trees_node)
complete = Say(name="Complete", node=self.py_trees_node, message="Complete")
ask_name_again_seq = py_trees.composites.Sequence(name="Ask Name Again Seq")
ask_name_again = Say(name="Ask Name Again", node=self.py_trees_node, message="I'm sorry I didnt catch that")
#reset_head_touched = ResetHeadTouched(name="Reset Head Touched", node=node)
fail = py_trees.behaviours.Failure(name="Failure")
name_retry = Standby(name="Name Retry", node=self.py_trees_node)
ask_name_again_seq.add_children([ask_name_again, fail])
get_name_seq.add_children([ask_for_name, head_touched, is_door_open, complete])
greet_person_subtree.add_children([greet, name_wait_sel])
name_wait_sel.add_children([get_name_seq, ask_name_again_seq, name_retry])
return greet_person_subtree
""" To help test the navstack """
def navigation_test(self):
behaviour = Behaviour(self)
move_to_safety_point = MoveToPoint(name="Move To Safety Point", node=self.py_trees_node, pose=behaviour.position_to_pose({"x": 1.0,"y": 0.0,"z": 0.0}))
standby2 = Standby(name="Standby", node=self.py_trees_node)
return py_trees.composites.Sequence(name="Nav Test Seq", children=[move_to_safety_point, standby2])
def test_pointing_straight_forward(self):
action = RobotAction()
action.type = RobotAction.POINT_ABS_STRAIGHT
action.right_arm = False
pepper_point_straight_l = PepperAction(name="Point with left", node=self.py_trees_node, action=action)
turn_idle_off = SetIdle(name="go idle", node=self.py_trees_node, go_idle=False)
action2 = RobotAction()
action2.type = RobotAction.POINT_ABS_STRAIGHT
action2.right_arm = False
pepper_point_straight_r = PepperAction(name="Point with right", node=self.py_trees_node, action=action2)
return py_trees.composites.Sequence(name="Pointing Test", children=[turn_idle_off, py_trees.timers.Timer(name="Timer2", duration=500)])
"""In development (WIP)"""
def point_at_face_test(self):
#greet_person_subtree = greet_person(self.py_trees_node)
find_person = WaitForPerson(name="Find person", node=self.py_trees_node)
#getPixel = GetFacePixelCoordinate(name="getPixel", node=self.py_trees_node)
pointAt = PointAtPixel(name="Point At", node=self.py_trees_node)
timer = py_trees.timers.Timer(name="Timer", duration=5)
#reset_pixel = ResetPixel(name="Reset", node=self)
# greet_person_subtree, getPixel,
return py_trees.composites.Sequence(name="POINT", children=[pointAt, timer])
"""In development (WIP)"""
def better_point_at_face_test(self):
center_head = PepperCenterHead(name="Center", node=self.py_trees_node)
#greet_person_subtree = greet_person(self.py_trees_node)
find_person = WaitForPerson(name="Find person", node=self.py_trees_node)
getPixel = GetFacePixelCoordinate(name="getPixel", node=self.py_trees_node)
pointAt = PointAtPixel(name="Point At", node=self.py_trees_node)
timer = py_trees.timers.Timer(name="Timer", duration=5)
#reset_pixel = ResetPixel(name="Reset", node=self)
# greet_person_subtree, getPixel,
return py_trees.composites.Sequence(name="POINT", children=[center_head, getPixel, pointAt, timer])
def test_pepper_security(self):
rotate_robot = RotateRobot(name="Rotate Robot", node=self.py_trees_node)
timer = py_trees.timers.Timer(name="Timer2", duration=20)
stop_robot = StopRobot(name="Stop Robot", node=self.py_trees_node)
safety_off = PepperSetSafety(name="Safety Off", node=self.py_trees_node, safety_on = False)
return py_trees.composites.Sequence(name="ROTATE", children=[safety_off, rotate_robot, timer, stop_robot])
def pepper_nod(self):
timer1 = py_trees.timers.Timer(name="Timer1", duration=2)
timer2 = py_trees.timers.Timer(name="Timer2", duration=2)
tilt_up = PepperTiltHead(name="Tilt Up", node=self.py_trees_node, tilt_up=True)
tilt_down = PepperTiltHead(name="Tilt Down", node=self.py_trees_node, tilt_up=False)
return py_trees.composites.Sequence(name="Head Bob", children=[tilt_down, timer1, tilt_up, timer2])
def test_touch_head_and_say(self):
timer = py_trees.timers.Timer(name="Timer1", duration=5)
head_touched = IsHeadTouched(name="Head Touched", node=self.py_trees_node)
speak = Say(name="speak", node=self.py_trees_node, message="Hello World")
return py_trees.composites.Sequence(name="TEST", children=[head_touched, speak, timer])
def test_turn_on_and_off_camera(self):
timer = py_trees.timers.Timer(name="Timer1", duration=10)
timer2 = py_trees.timers.Timer(name="Timer1", duration=10)
return py_trees.composites.Sequence(name="TEST", children=[TurnCameraOnOrOff("camera_off",self.py_trees_node, False), timer, TurnCameraOnOrOff("camera_on",self.py_trees_node, True), timer2 ])
def test_turn_on_and_off_mic(self):
timer = py_trees.timers.Timer(name="Timer1", duration=10)
timer2 = py_trees.timers.Timer(name="Timer1", duration=10)
return py_trees.composites.Sequence(name="TEST", children=[TurnMicrophoneOnOrOff("mic_off",self.py_trees_node, False), timer, TurnMicrophoneOnOrOff("mic_on",self.py_trees_node, True), timer2 ])
\ No newline at end of file
from paramiko.util import log_to_file
import py_trees
import operator
from .py_tree_behaviours import *
from .motion_behaviours import *
from lhw_intelligence.behaviour_copy import Behaviour
from .tablet_behaviours import *
from . import action_clients
from . import behaviours
""" Py_tree for the safety check procedure
"""
def safety_check_tree(node, config: dict, ssh):
behaviour = Behaviour(node)
starting_point = behaviour.position_to_pose(config["starting_point"])
waiting_point = behaviour.position_to_pose(config["waiting_point"])
safety_point = behaviour.position_to_pose(config["safety_point"])
exit_pose = behaviour.position_to_pose(config["exit"])
move_forward = MoveForward(name="Forward", node=node, x=2.0)
show_speech_beginning = TabletShowFIALogo(name="show_speech_beginning", node=node)
say_door_open = Say(name="test say", node=node, message="Checking that door is open")
set_initial_pose = SetPose(name = "Set Starting Point", pose = starting_point, ssh_client=ssh)
is_door_open = IsDoorOpen(name="Is Door Open", node=node)
move_to_waiting_point = MoveToPoint(name="Move to Point",node=node, pose=waiting_point, ssh_client=ssh)
#move_to_start = MockNavigation(name="Move to point", node=node)
wait = py_trees.timers.Timer(name="Wait", duration=2)
move_to_safety_point = MoveToPoint(name="Move to Point", node=node,pose=safety_point, ssh_client=ssh)
head_touch = IsHeadTouched(name="head touched", node=node)
#move_to_safety_point= MockNavigation(name="Move to point", node=node)
say_ready_for_safety_check = Say(name="safety check say", node=node, message="I am ready for the safety check. Press the button when it's done.")
is_button_clicked = TabletQuestion(name="Confirm continue", node=node, text="Press button to continue", options=["this is the button"])
button_clicked_retry = Standby(name="Head Touched Retry", node=node)
show_speech_end = TabletShowFIALogo(name="show_speech_end", node=node)
move_to_exit = MoveToPoint(name="Move to Point", node=node,pose=exit_pose, ssh_client=ssh)
#move_to_exit = MockNavigation(name="Move to point", node=node)
standby = Standby(name="Standby", node=node)
button_clicked_selector_wait = py_trees.composites.Selector(
name="Head Touched Selector Wait",
children=[is_button_clicked, button_clicked_retry]
)
safety_check_sequence = py_trees.composites.Sequence(
name="Safety Check Sequence",
children=[show_speech_beginning,say_door_open, is_door_open, move_forward, move_to_safety_point, wait, say_ready_for_safety_check, button_clicked_selector_wait,show_speech_end, move_to_waiting_point , move_to_exit]
)
return py_trees.composites.Sequence(name="Standby Sequence", children=[safety_check_sequence, standby])
#return py_trees.composites.Sequence(name="Standby Sequence", children=[set_initial_pose])
""" Py_tree for the safety check procedure
"""
def enter_arena(node):
#tell_referee_to_open_door = Say(name="Open The Door", node=node, message="Please open the door for me")
is_door_open = IsDoorOpen(name="Is Door Open", node=node)
move_forward = MoveForward(name="Move Forward", node=node, x=1.0)
timer = py_trees.timers.Timer(name="Wait", duration=5)
#rotate = RotateRobot(name="Rotate", node=node) # TODO: Add radian
standby = Standby(name="Standby", node=node)
enter_arena_sequence = py_trees.composites.Sequence(
name="Enter Arena Sequence",
children=[is_door_open, move_forward, timer]
)
return py_trees.composites.Sequence(name="Standby Sequence", children=[enter_arena_sequence, standby])
"""
Behaviour tree to greet new people and get what their name and favourite drink is.
Required running nodes:
- face_analysis node & deepface server
- Dialogflow
- Naoqi_driver & animated speech & microphone
Reads/Writes to blackboard variables:
- last_recognized_item
- recognized_people
- recognized_text
"""
def greet_person(node, repeat_message="Okay, can you repeat that?"):
start = Say(name="Start", node=node, message="Hello! Please bend down and look into my eyes so I can identify you?")
find_person = WaitForPerson(name="Find person", node=node)
turn_off_camera = TurnCameraOnOrOff(name="Camera Off1", node=node, turn_on=False)
greet = Say(name="Greet", node=node, message="Welcome!")
ask_for_name = Say(name="Ask For Name", node=node, message="What's your name?")
wait_for_name = WaitForItem(name="Wait For Name", node=node, key="name") #TabletInput(name="Tablet Name Input", node=node, text="Please input your name", type="name")
#say_name_confirm = Say(name="test1", node=node, message="If this is your name say yes. If you want to retype your answer say no.")
confirm_name = ConfirmSay(name="Confirm Name", node=node)
add_name = AddItemToPerson(name="Add Name", node=node)
ask_name_again = Say(name="Ask Name Again", node=node, message=repeat_message)
fail_name = py_trees.behaviours.Failure(name='Name Failure')
name_retry = Standby(name="Name Retry", node=node)
ask_for_favorite_drink = Say(name="Ask For Favorite Drink", node=node, message="What's your favorite drink?")
wait_for_drink = WaitForItem(name="Wait For Drink", node=node, key="drinks") #TabletInput(name="Tablet Name Input", node=node, text="Please input your favorite drink", type="drinks")
#say_drink_confirm = Say(name="test1", node=node, message="If this is your favourite drink say yes. If you want to retype your answer say no.")
confirm_drink = ConfirmSay(name="Confirm Drink", node=node)
add_drink = AddItemToPerson(name="Add Drink", node=node)
ask_drink_again = Say(name="Ask Drink Again", node=node,message=repeat_message)
fail_drink = py_trees.behaviours.Failure(name='Drink Failure')
drink_retry = Standby(name="Drink Retry", node=node)
completed_greet = Say(name="Complete", node=node, message="Thank you! That should be all I need to know.")
turn_on_camera= TurnCameraOnOrOff(name="Camera On1", node=node, turn_on=True)
get_name_seq = py_trees.composites.Sequence(
name="Get Name Seq",
children=[ask_for_name, wait_for_name, add_name] #say_name_confirm, confirm_name,
)
ask_name_again_seq = py_trees.composites.Sequence(
name="Ask Name Again Seq",
children=[ask_name_again, fail_name]
)
name_wait_sel = py_trees.composites.Selector(
name="Name Wait Sel",
children=[get_name_seq, ask_name_again_seq, name_retry]
)
get_drink_seq = py_trees.composites.Sequence(
name="Get Drink Seq",
children=[ask_for_favorite_drink, wait_for_drink, add_drink] #say_drink_confirm, confirm_drink,
)
ask_drink_again_seq = py_trees.composites.Sequence(
name="Ask Drink Again Seq",
children=[ask_drink_again, fail_drink]
)
drink_wait_sel = py_trees.composites.Selector(
name="Drink Wait Sel",
children=[get_drink_seq, ask_drink_again_seq, drink_retry]
)
return py_trees.composites.Sequence(
name="Greet Person Subtree",
#children=[start, find_person, greet, name_wait_sel, drink_wait_sel, completed_greet])
children=[start, find_person, greet, name_wait_sel, drink_wait_sel, completed_greet])
def receptionist(node, config: dict, ssh):
behaviour = Behaviour(node)
sofa = behaviour.position_to_pose(config["sofa"])
door = behaviour.position_to_pose(config["door"])
starting_point = behaviour.position_to_pose(config["receptionist_starting_point"])
loading_tablet = TabletLoading(name="loading tablet", node=node)
set_initial_pose = SetPose(name = "Set Starting Point", pose = starting_point, ssh_client=ssh)
wait = py_trees.timers.Timer(name="Wait", duration=2)
is_button_clicked = TabletQuestion(name="Confirm continue", node=node, text="Press button to continue", options=["this is the button"])
speech = TabletSpeech(name="Speech", node=node)
start_move_to_door = MoveToPoint(name="Move To Door At Start", node=node, pose=door, ssh_client=ssh)
"""
either_or = py_trees.idioms.either_or(
name="EitherOr",
conditions=[
py_trees.common.ComparisonExpression("head_touched", False, operator.eq),
py_trees.common.ComparisonExpression("head_touched", True, operator.eq)],
subtrees=[?, ?]
)"""
initialize_scenario = py_trees.composites.Sequence(
name="Initialize Scenario",
children=[set_initial_pose,loading_tablet, wait,is_button_clicked, speech]
)
loop1 = _receptionist_repeating(node, sofa, door, ssh)
loop2 = _receptionist_repeating(node, sofa, door, ssh)
loop3 = _receptionist_repeating(node, sofa, door, ssh, True)
end_say = Say(name="End", node=node, message="Thank you for attending my party, bye")
ask_referee_to_identify = Say(name="Identify referee", node=node, message="Can the host please stand in front of me.")
timer = py_trees.timers.Timer(name="Wait", duration=2)
timer2 = py_trees.timers.Timer(name="Wait", duration=2)
ask_referee_to_sit = Say(name="Sit mfker", node=node, message="Please sit back down")
stop = py_trees.behaviours.Running(name="Const Running")
return py_trees.composites.Sequence(
name="receptionist tree",
children=[initialize_scenario, ask_referee_to_identify, timer, greet_person(node), ask_referee_to_sit, timer2, start_move_to_door, loop1, loop2, loop3, end_say, stop]#, move_to_door]#, move_to_sofa, at_sofa_seq, move_to_door]
)
def _receptionist_repeating(node, sofa, door, ssh, fin=False):
tell_referee_to_open_door = Say(name="Tell Referee To Open Door", node=node, message="Please step through the door")
greet_person_subtree = greet_person(node=node)
tell_person_to_follow = Say(name="Tell Person To Follow", node=node, message="Please come with me")
move_to_sofa = MoveToPoint(name="Move to Point",node=node, pose=sofa, ssh_client=ssh)
all_people_introduced = IsEveryoneIntroduced(name="All People Introduced", node=node)
get_face_pixel = GetFacePixelCoordinate(name="Get Face Pixel", node=node)
point_at_pixel = PointAt(name="Point At Pixel", node=node, key="point_at_person_goal")
# Motion Action
point_at_person = action_clients.FromBlackboard(
name="Point At Person",
action_name="robot_action",
action_type=RobotAction,
key="point_at_person_goal",
node=node
)
say_name_of_person = SayDynamic(name="Say Name Of Person", node=node)
point_quit_both_goal = PointQuit(
name="Point Quit Both Goal",
key="point_quit_both",
arm = RobotActionGoal.ARM_BOTH,
node=node
)
point_quit_both_action = action_clients.FromBlackboard(
name="Point Quit Both Action",
action_name="robot_action",
action_type=RobotAction,
key="point_quit_both",
node=node
)
const_fail = py_trees.behaviours.Failure(name="Const Fail")
introduce_retry = Standby(name="Introduce Retry", node=node)
#get_couch_gap_pixel = CalculateGapPointLocation(name="Get Couch Gap Pixel", node=node)
place_person = PlacePerson(name="Place Person", node=node, fin=fin)
pixel = [0, 0]
prepare_point_goal = behaviours.ToBlackboard(
name="PreparePointGoal",
key="point_to_pixel",
variable=pixel,
node=node
)
point_at_empty_seat = PointAt(name="Point At Empty Seat", node=node, key="point_at_empty_seat_goal")
# Motion Action
point_at_person = action_clients.FromBlackboard(
name="Point At Face",
action_name="robot_action",
action_type=RobotAction,
key="point_at_empty_seat_goal",
node=node
)
tell_person_to_sit = SayDynamic(name="Tell Person To Sit", node=node)
pause3 = py_trees.timers.Timer("Pause5", duration=3.0)
pause10 = py_trees.timers.Timer("Pause10", duration=10.0)
point_quit_both_goal_2 = PointQuit(
name="Point Quit Both Goal 2",
key="point_quit_both_2",
arm = RobotActionGoal.ARM_BOTH,
node=node
)
point_quit_both_action_2 = action_clients.FromBlackboard(
name="Point Quit Both Action 2",
action_name="robot_action",
action_type=RobotAction,
key="point_quit_both_2",
node=node
)
point_at_person_2 = action_clients.FromBlackboard(
name="Point At Empty Seat",
action_name="robot_action",
action_type=RobotAction,
key="point_at_empty_seat_goal",
node=node
)
#say_there_is_no_seat = Say(name="Say There Is No Seat", node=node, message="There is no seat for you")
move_to_door = MoveToPoint(name="Move To Door",node=node,pose=door, ssh_client=ssh)
"""receptionist_sel = py_trees.composites.Selector(
name="Receptionist pytree",
children=[tell_referee_to_open_door, greet_person_subtree, tell_person_to_follow, move_to_sofa, at_sofa_seq, move_to_door]
)"""
# introduce_person = py_trees.composites.Sequence(
# name=" Introduce person",
# children=[get_face_pixel, point_at_pixel, say_name_of_person, point_quit_both_goal, point_quit_both_action, const_fail]
# )
# unsuccessful_seating = py_trees.composites.Sequence(
# name="Unsuccessful Seating",
# children=[say_there_is_no_seat]
# )
timer = py_trees.timers.Timer(name="Wait", duration=5)
random_say = Say(name="DSAD", node=node, message="Please sit here")
successful_seating = py_trees.composites.Sequence(
name="Successful Seating",
children=[place_person, point_at_empty_seat, random_say, point_at_person_2, pause3, point_quit_both_goal_2, point_quit_both_action_2, timer]#[get_couch_gap_pixel, point_at_empty_seat, tell_person_to_sit]
)
# empty_seat_sel = py_trees.composites.Selector(
# name="Empty Seat Sel",
# children=[successful_seating, unsuccessful_seating]
# )
intros_wait_sel = py_trees.composites.Sequence(
name="Intros Wait Sel",
children=[Say(name="<bläsad", node=node, message="Let me introduce you to everyone"), all_people_introduced, SayDynamic(name="Say all people", node=node)]
)
at_sofa_seq = py_trees.composites.Sequence(
name="At Sofa Seq",
children=[intros_wait_sel, successful_seating]
)
return py_trees.composites.Sequence(
name="receptionist tree",
children=[tell_referee_to_open_door, greet_person_subtree, tell_person_to_follow, move_to_sofa, at_sofa_seq, move_to_door]#, move_to_door]#, move_to_sofa, at_sofa_seq, move_to_door]
)
#return py_trees.composites.Sequence(name="receptionist tree", children=[TabletInput(name="guhjk", node=node, text="TYPE your name:", type="name")])
\ No newline at end of file
->;;Greet Person Subtree
| [Find Person];;WaitForPerson
| [Greet];;Say
| ?;;Name Wait Sel
| | ->;;Get Name Seq
| | | [Ask For Name];;Say
| | | [Wait For Name];;WaitForItem
| | | [Confirm Name];;Confirm Say
| | | [Add Name];;AddItemToPerson
| | ->;;Ask Name Again Seq
| | | [Ask Name Again];;Say
| | | [Fail Name];;failure
| | [Name Retry];;Standby
;;| ?;;Age Wait Sel
;;| | ->;;Get Age Seq
;;| | | [Ask For Age];;Say
;;| | | [Wait For Age];;WaitForItem
;;| | | [Confirm Age];;Confirm Say
;;| | | [Add age];;AddItemToPerson
;;| | [Ask Age Again];;Say
;;| | [Age Retry];;Standby
| ?;;Drink Wait Sel
| | ->;;Get Drink Seq
| | | [Ask For Favorite Drink];;Say
| | | [Wait For Drink];;WaitForItem
| | | [Confirm Drink];;Confirm Say
| | | [Add Drink];;AddItemToPerson
| | ->;;Ask Drink Again Seq
| | | [Ask Drink Again];;Say
| | | [Fail Drink];;Failure
| | [Drink Retry];;Standby
| [Completed Greet];;Say
| (Greet Complete Delay);;Timer;;1
\ No newline at end of file
->;;Root
| (Is Head Touched)
| (Wait);;Timer;;5
| [Tell Referee To Open Door];;Say
| (Greet Person Subtree);;Subtree
| [Tell Person To Follow];;Say
| [Look Down];;Pepper Tilt Head
| [Move To Sofa];;Move To Point
| [Pepper Center Head]
| ->;;At Sofa Seq
| | ?;;Intros Wait Sel
| | | (All People Introduced);;IsEveryoneIntroduced
| | | ->;; Introduce person
| | | | [Get Face Pixel];;GetFacePixelCoordinate
| | | | [Point At Person];;PointAtPixel
| | | | [Say Name Of Person];;SayDynamic
| | | | [Const Fail];;Failure
| | | (Introduce Retry);;Standby
| | ?;;Empty Seat Sel
| | | ->;;Successful Seating
| | | | [Get Couch Gap Pixel];;CalculateGapPointLocation
| | | | [Point At Empty Seat];;PointAtPixel
| | | | [Tell Person To Sit];;Say
| | | ->;;Unsuccessful Seating
| | | | [Say There Is No Seat];;Say
| |
| [Look Down];;Pepper Tilt Head
| [Move To Door];;Move To Point
| [Pepper Center Head]
;; Say to touch the head
;; Send text to tablet
\ No newline at end of file
->;;Root
| (Is Head Touched)
| (Wait);;Timer;;5
| [Tell Referee To Open Door];;Say
| ->;;Greet Person Seq
| | [Find Person]
| | [Greet];;Say
| | ?;;Name Wait Sel
| | | ->;;Get Name Seq
| | | | [Ask For Name];;Say
| | | | [Wait For Name];;WaitForItem
| | | | [Confirm Name];;Confirm Say
| | | | [Add Name];;AddItemToPerson
| | | [Ask Name Again];;Say
| | | [Name Retry];;Standby
| | ?;;Age Wait Sel
| | | ->;;Get Age Seq
| | | | [Ask For Age];;Say
| | | | [Wait For Age];;WaitForItem
| | | | [Confirm Age];;Confirm Say
| | | | [Add age];;AddItemToPerson
| | | [Ask Age Again];;Say
| | | [Age Retry];;Standby
| | ?;;Drink Wait Sel
| | | ->;;Get Drink Seq
| | | | [Ask For Favorite Drink];;Say
| | | | [Wait For Drink];;WaitForItem
| | | | [Confirm Drink];;Confirm Say
| | | | [Add Drink];;AddItemToPerson
| | | [Ask Drink Again];;Say
| | | [Drink Retry];;Standby
| | [Tell Person To Follow];;Say
| [Look Down];;Pepper Tilt Head
| [Move To Sofa];;Move To Point
| [Pepper Center Head]
| ->;;At Sofa Seq
| | ?;;Intros Wait Sel
| | | ->;;Intro Seq
| | | | [Introduce new person];;Introduce
| | | | [Point At Person Introduced];;Point At
| | | | [Increase Introduced]
| | | | (Wait);;Timer;;5
| | | | (Check If Introduced everyone);;Introduced All
| | | (Introduce Retry);;Standby
| | [Point At Empty Seat];;Point At
| | [Tell Person To Sit];;Say
| [Look Down];;Pepper Tilt Head
| [Move To Door];;Move To Point
| [Pepper Center Head]
->;;Root
| ?;;Preempt?
| | ->;;Battery Low
| | | (Is Battery Low)
| | | [TEST3]
| | ->;;SEQ
| | | [Scenario Tree]
| | | (TEST)
| | | (TEST2)
\ No newline at end of file
?;;Safety Check
| ->;;Procedure
| | (Is Door Open)
| | [Move Forward]
| | (Wait);;Timer;;2
| | [Look Down];;Pepper Tilt Head
| | [Move To Safety Point];;Move To Point
| | [Center Head];;Pepper Center Head
| | ?;;Head Touched Sel
| | | (Is Head Touched)
| | | [Head Touched Retry];;Standby
| | [Look Down];;Pepper Tilt Head
| | [Move To Exit];;Move To Point
| [Standby]
\ No newline at end of file