Source code for scripts.planner

#! /usr/bin/env python
"""
.. module:: planner
    :platform: Unix
    :synopsis: the controller_ python script in topological_map_robot_control package
    
.. moduleauthor:: Mohammad Reza Haji Hosseini <mrhhosseini75@gmail.com>

Brief explanation of this node:
    Get client pose to anm.SERVER_GET_POSE service and use execute callback to 
    construct the feedback and loop for each set of points.

Service:
    GET_POSE to get position from 'robot_state' node

Param:
        Get PLANNER_POINTS
        Get ENVIRONMENT_SIZE 
"""
import rospy
import sys
sys.path.append('/root/ros_ws/src/tmrc')
# Import constant name defined to structure the architecture.
from utilities import architecture_name_mapper as anm
# Import the ActionServer implementation used.
# This is required to pass the `PlanAction` type for instantiating the `SimpleActionServer`.
from actionlib import SimpleActionServer
# Import custom message, actions and services.
from tmrc.msg import Point, PlanFeedback, PlanResult
from tmrc.srv import GetPose
import tmrc  

# A tag for identifying logs producer.
LOG_TAG = anm.NODE_PLANNER

[docs]class PlaningAction(object): """ This class has: 1. An action server to simulate motion planning. 2. Given a target position, it retrieve the current robot position from the `robot-state` node, and return a plan as a set of via points. """ def __init__(self): # Get based parameters used by this server self._plan_points = rospy.get_param(anm.PARAM_PLANNER_POINTS, [2, 8]) self._environment_size = rospy.get_param(anm.PARAM_ENVIRONMENT_SIZE) # Instantiate and start the action server based on the `SimpleActionServer` class. self._as = SimpleActionServer(anm.ACTION_PLANNER, tmrc.msg.PlanAction, execute_cb=self.execute_callback, auto_start=False) self._as.start() # Log information. log_msg = (f'`{anm.ACTION_PLANNER}` Action Server initialised. It will create path with a number of point ' f'spanning in [{self._plan_points[0]}, {self._plan_points[1]}). Each point will be generated ')
[docs] def execute_callback(self, goal): """ This function does: 1. The callback invoked when a client set a goal to the `planner` server. 2. Will return a list of points (i.e., the plan) when the fist point is the current robot position (retrieved from the `robot-state` node), while the last point is the `goal` position (given as input parameter). Args: goal (float): target position of robot """ # Get the input parameters to compute the plan, i.e., the start (or current) and target positions. start_point = _get_pose_client() target_point = goal.target # Initialise the `feedback` with the starting point of the plan. feedback = PlanFeedback() feedback.via_points = [] feedback.via_points.append(start_point) feedback.via_points.append(target_point) # Publish the feedback and wait to simulate computation. self._as.publish_feedback(feedback) log_msg = f'Server is planning ...' rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) # Publish the results to the client. result = PlanResult() result.via_points = feedback.via_points self._as.set_succeeded(result) log_msg = 'Motion plan succeeded with plan: ' log_msg += ''.join('(' + str(point.x) + ', ' + str(point.y) + '), ' for point in result.via_points) rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
def _get_pose_client(): """ This function retrieve the current robot pose by the `state/get_pose` server of the `robot-state` node. Returns: msg: pose of client """ # Eventually, wait for the server to be initialised. rospy.wait_for_service(anm.SERVER_GET_POSE) try: # Call the service and get a response with the current robot position. service = rospy.ServiceProxy(anm.SERVER_GET_POSE, GetPose) response = service() pose = response.pose # Log service response. log_msg = f'Retrieving current robot position from the `{anm.NODE_ROBOT_STATE}` node as: ({pose.x}, {pose.y}).' rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) return pose except rospy.ServiceException as e: log_msg = f'Server cannot get current robot position: {e}' rospy.logerr(anm.tag_log(log_msg, LOG_TAG)) if __name__ == '__main__': # Initialise the node, its action server, and wait. rospy.init_node(anm.NODE_PLANNER, log_level=rospy.INFO) server = PlaningAction() rospy.spin()