diff --git a/air_navigation_examples/__init__.py b/air_navigation_examples/__init__.py index 54cfea66336aa09d97ae8cd73f641df1659189ac..cc16d0c4ebc8a00eb04c7fa374eb827c5719de16 100644 --- a/air_navigation_examples/__init__.py +++ b/air_navigation_examples/__init__.py @@ -1,71 +1,93 @@ import air_navigation import sys -from air_project_interfaces import Humans -import rclpy -from rclpy.node import Node +from nav2_msgs.action import NavigateToPose import math +from std_msgs.msg import Float64 class TestController(air_navigation.Controller): def __init__(self): - print("β TestController constructor START") super().__init__() - print("β TestController constructor END") - - # Use the controller's internal ROS 2 node methods - self.create_subscription( - Humans, - '/humans', - self.humans_callback, - 10 - ) - - self.closest_human_distance = float('inf') - print("β Subscribed to /humans using internal node") - - def humans_callback(self, msg): - min_dist = float('inf') - for human in msg.humans: - dist = math.sqrt(human.position.x**2 + human.position.y**2) - min_dist = min(min_dist, dist) - self.closest_human_distance = min_dist - print(f"π₯ Closest human at {min_dist:.2f} meters") + self.global_plan = [] + self.lookahead_dist = 0.5 + self.desired_linear_vel = 0.5 + self.max_angular_vel = 1.0 + self.current_speed_limit = None + self.limit_is_percentage = False def configure(self, name): - print("TestController configure") + self.get_logger().info("β configure") def cleanup(self): - print("TestController cleanup") + self.get_logger().info("β cleanup") def activate(self): - print("TestController activate") + self.get_logger().info("β activate") def deactivate(self): - print("TestController deactivate") + self.get_logger().info("β deactivate") - def computeVelocityCommands(self, pose, velocity, goal): - print("TestController computeVelocityCommands:", pose, velocity, goal) + def setPlan(self, path): + self.global_plan = path.poses + self.get_logger().info(f"πΊοΈ setPlan: received {len(self.global_plan)} poses") - base_speed = 1.0 - slow_down_radius = 2.0 - if self.closest_human_distance < slow_down_radius: - scale = self.closest_human_distance / slow_down_radius - speed = max(0.1, base_speed * scale) - else: - speed = base_speed + def computeVelocityCommands(self, pose, velocity, goal_checker): + if not hasattr(self, 'global_plan') or not self.global_plan: + print("β οΈ No path available") + return air_navigation.TwistStamped() + + # Choose the first pose ahead of the robot as the goal (simplified) + lookahead_dist = 0.5 + target = None + dist = None + for p in self.global_plan: + dx = p.pose.position.x - pose.pose.position.x + dy = p.pose.position.y - pose.pose.position.y + dist = math.hypot(dx, dy) + if dist > lookahead_dist: + target = p.pose.position + break + + if not target: + print("β Goal reached or no valid point ahead.") + cmd = air_navigation.TwistStamped() + cmd.twist.linear.x = 0.0 + cmd.twist.angular.z = 0.0 + return cmd + + + # Compute angle to target + dx = target.x - pose.pose.position.x + dy = target.y - pose.pose.position.y + angle_to_target = math.atan2(dy, dx) + + # Get robot yaw + q = pose.pose.orientation + yaw = math.atan2(2*(q.w*q.z + q.x*q.y), 1 - 2*(q.y*q.y + q.z*q.z)) + + # Compute angular error + angle_diff = angle_to_target - yaw + angle_diff = math.atan2(math.sin(angle_diff), math.cos(angle_diff)) # Normalize + + # Speed + speed = self.desired_linear_vel + if self.current_speed_limit is not None: + if self.limit_is_percentage: + speed *= self.current_speed_limit # Expected to be 0.0β1.0 + else: + speed = min(speed, self.current_speed_limit) + + # Command + cmd = air_navigation.TwistStamped() + cmd.twist.linear.x = speed + cmd.twist.angular.z = angle_diff * 3 + return cmd - print(f"π Adjusted speed: {speed:.2f}") + def setSpeedLimit(self, speed_limit, percentage): + self.current_speed_limit = speed_limit + self.limit_is_percentage = percentage + self.get_logger().info(f"β Speed limit: {speed_limit:.2f} m/s, percentage={percentage}") - t = air_navigation.TwistStamped() - t.twist.linear.x = speed - t.twist.angular.z = 0.0 - return t - def setPlan(self, path): - print("TestController setPlan:", path) - - def setSpeedLimit(self, speed_limit, percentage): - print("TestController setSpeedLimit:", speed_limit, percentage) - class TrajectoryCritic(air_navigation.TrajectoryCritic): def __init__(self): super().__init__() diff --git a/air_navigation_examples/__pycache__/__init__.cpython-310.pyc b/air_navigation_examples/__pycache__/__init__.cpython-310.pyc index 45985231f184d670ec1eb4ef30a8dd2186978fdf..da1416d6c745262778741ec6d1a3fe56730f6c19 100644 Binary files a/air_navigation_examples/__pycache__/__init__.cpython-310.pyc and b/air_navigation_examples/__pycache__/__init__.cpython-310.pyc differ