Skip to content
Snippets Groups Projects
Commit 8263ad2b authored by Lucas Lindahl's avatar Lucas Lindahl
Browse files

Working navigation and slowing down when being close to humans.

parent 5bbda65f
No related branches found
No related tags found
No related merge requests found
import air_navigation
from nav2_msgs.action import NavigateToPose
import math
from air_simple_sim_msgs.msg import SemanticObservation
import rclpy
......@@ -7,13 +6,10 @@ import rclpy
import rclpy.executors
import threading
import time
from rclpy.action import ActionClient
from geometry_msgs.msg import PoseStamped, Point, Pose, PointStamped
from sensor_msgs.msg import PointCloud2, PointField
import sensor_msgs_py.point_cloud2 as pc2
from std_msgs.msg import Header
from geometry_msgs.msg import Point32
from builtin_interfaces.msg import Time
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
import tf2_geometry_msgs
class TestController(air_navigation.Controller):
def __init__(self):
......@@ -25,7 +21,7 @@ class TestController(air_navigation.Controller):
pass
self.node = rclpy.create_node("test_controller_node")
self.node.get_logger().info("Created node inside TestController")
self.node.get_logger().info("Created node inside TestController")
self.executor = rclpy.executors.MultiThreadedExecutor()
self.executor.add_node(self.node)
......@@ -40,14 +36,12 @@ class TestController(air_navigation.Controller):
self.current_speed_limit = self.desired_linear_vel
self.limit_is_percentage = False
self.original_goal_pose = None
self.detouring = False
self.detour_goal_active = False
self.original_goal_sent = False
self.detour_plan = []
self.human_positions = {}
# TF2 listener setup
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self.node)
def configure(self, name):
self.node.create_subscription(
SemanticObservation,
......@@ -55,24 +49,20 @@ class TestController(air_navigation.Controller):
self.semantic_callback,
10
)
self.node.get_logger().info("Subscribed to /semantic_sensor")
self.node.get_logger().info("Subscribed to /semantic_sensor")
def semantic_callback(self, msg):
if msg.klass != "human":
return
msg.point.header.stamp = Time()
try:
uuid = msg.uuid
now = time.time()
pos = msg.point.point
self.human_positions[uuid] = (pos, now)
transformed_point = self.tf_buffer.transform(msg.point, "map", timeout=rclpy.duration.Duration(seconds=1.0))
self.node.get_logger().info(f'Transformed point: {transformed_point.point}')
self.human_positions[msg.uuid] = (transformed_point.point, time.time())
except Exception as e:
self.node.get_logger().error(f"Exception in semantic_callback: {e}")
#self.node.get_logger().info(f'DICTIONARY OF HUMANS: {self.human_positions}')
self.node.get_logger().warn(f"TF transform failed: {e}")
def get_closest_human_distance(self, robot_pose):
now = time.time()
......@@ -88,111 +78,29 @@ class TestController(air_navigation.Controller):
min_dist = dist
closest_human = uuid
return min_dist, closest_human
'''
def send_temp_goal(self, x, y):
if not hasattr(self, 'action_client'):
self.action_client = ActionClient(self.node, NavigateToPose, 'navigate_to_pose')
if not self.action_client.wait_for_server(timeout_sec=2.0):
self.node.get_logger().warn("🛑 Goal client not available.")
return
if abs(y) > 10: # assuming map height is ~20 meters
self.node.get_logger().warn(f"⚠️ Detour Y position {y:.2f} may be out of bounds!")
return
goal_msg = NavigateToPose.Goal()
goal_msg.pose.header.frame_id = "map"
goal_msg.pose.header.stamp = self.node.get_clock().now().to_msg()
goal_msg.pose.pose.position.x = x
goal_msg.pose.pose.position.y = y
goal_msg.pose.pose.orientation.z = 0.0
goal_msg.pose.pose.orientation.w = 1.0
self.detouring = True
self.detour_goal_active = True
self.original_goal_sent = False
def done_callback(future):
self.detouring = False
self.detour_goal_active = False
self.node.get_logger().info("✅ Detour completed.")
if not self.original_goal_sent:
self.original_goal_sent = True
restore_msg = NavigateToPose.Goal()
restore_msg.pose = self.original_goal_pose
self.node.get_logger().info("🎯 Resending original goal")
self.action_client.send_goal_async(restore_msg)
self.action_client.send_goal_async(goal_msg).add_done_callback(done_callback)
'''
def setPlan(self, path):
'''
MAIN PROBLEMS:
1. How do we set a detour goal
2. How do we know when detour is finished
3. How do we set back the original goal after detour is completed
'''
self.global_plan = path.poses
'''
if not self.detouring:
self.original_goal_pose = path.poses[-1]
if path.poses:
current_pose = path.poses[0]
dist, human_id = self.get_closest_human_distance(current_pose)
if dist < 0.5 and human_id:
human_pos, _ = self.human_positions[human_id]
dx = human_pos.x - current_pose.pose.position.x
dy = human_pos.y - current_pose.pose.position.y
heading = math.atan2(dy, dx)
angle = heading + math.pi / 2
detour_x = current_pose.pose.position.x + 0.5 * math.cos(angle)
detour_y = current_pose.pose.position.y + 0.5 * math.sin(angle)
self.send_temp_goal(detour_x, detour_y)
'''
def computeVelocityCommands(self, pose, velocity, goal_checker):
self.lookahead_dist = 0.2 if self.detouring else 0.5
if not self.global_plan:
print("⚠️ No path available")
print("No path available")
return air_navigation.TwistStamped()
dist, human_id = self.get_closest_human_distance(pose)
if dist < 1.0:
#self.node.get_logger().warn("Human really close nearby!!")
self.node.get_logger().warn("Human really close nearby!!")
self.current_speed_limit = self.max_linear_vel * 0.2
elif dist < 1.5:
#self.node.get_logger().warn("Human nearby!!")
self.node.get_logger().warn("Human nearby!!")
self.current_speed_limit = self.max_linear_vel * 0.35
else:
self.current_speed_limit = self.desired_linear_vel
'''
if not self.detour_goal_active:
if dist < 0.8:
human_pos, seen_time = self.human_positions[human_id]
dx = human_pos.x - pose.pose.position.x
dy = human_pos.y - pose.pose.position.y
dist = math.hypot(dx, dy)
self.node.get_logger().info("moving away from human")
self.node.get_logger().info(f"robot x pos:{pose.pose.position.x}, robot y pos:{pose.pose.position.x}")
self.node.get_logger().info(f"human x pos:{human_pos.x}, human y pos:{human_pos.y}")
heading = math.atan2(dy, dx)
angle = heading + math.pi / 2 # detour 90° to the side
detour_x = pose.pose.position.x + 0.5 * math.cos(angle)
detour_y = pose.pose.position.y + 0.5 * math.sin(angle)
self.send_temp_goal(detour_x, detour_y)
#return air_navigation.TwistStamped()
'''
# Find target position
target = None
dist = None
......
No preview for this file type
......@@ -225,7 +225,6 @@ local_costmap:
plugin: "nav2_costmap_2d::InflationLayer"
cost_scaling_factor: 3.0
inflation_radius: 0.2
voxel_layer:
plugin: "nav2_costmap_2d::VoxelLayer"
enabled: True
......@@ -278,11 +277,11 @@ global_costmap:
clearing: True
marking: True
data_type: "LaserScan"
raytrace_max_range: 20.0
raytrace_max_range: 3.0
raytrace_min_range: 0.0
obstacle_max_range: 20.0
obstacle_max_range: 3.0
obstacle_min_range: 0.0
voxel_layer:
voxel_layer: # Voexel layer added to add costmap layers around humans
plugin: "nav2_costmap_2d::VoxelLayer"
enabled: true
topic: /humans_as_obstacles
......@@ -296,7 +295,7 @@ global_costmap:
marking: True
data_type: PointCloud2
observation_persistence: 1.0
obstacle_range: 100.0
obstacle_range: 100.0 # Maybe change these to use time instead
raytrace_range: 100.0
z_voxels: 1
voxel_size: 0.05
......@@ -305,9 +304,7 @@ global_costmap:
max_obstacle_height: 2.0
static_layer:
plugin: "nav2_costmap_2d::StaticLayer"
map_topic: /map
subscribe_to_updates: True
# map_subscribe_transient_local: True
map_subscribe_transient_local: True
inflation_layer:
plugin: "nav2_costmap_2d::InflationLayer"
cost_scaling_factor: 9.0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment