#! /usr/bin/env python
"""
.. module:: controller
:platform: Unix
:synopsis: the controller python script in topological_map_robot_control package
.. moduleauthor:: Mohammad Reza Haji Hosseini <mrhhosseini75@gmail.com>
Brief explanation of this node:
Set client pose to anm.SERVER_SET_POSE service and use execute callback to
construct the feedback and loop for each set of points.
Service:
SET_POSE to set postion of robot to 'robot_state' node
"""
import rospy
import sys
sys.path.append('/root/ros_ws/src/tmrc')
# Import constant name defined to structure the architecture.
from utilities import architecture_name_mapper as anm
# Import the ActionServer implementation used.
# This is required to pass the `Action_controller` type for instantiating the `SimpleActionServer`.
from actionlib import SimpleActionServer
# Import custom message, actions and services.
from tmrc.msg import ControlFeedback, ControlResult
from tmrc.srv import SetPose
import tmrc
# A tag for identifying logs producer.
LOG_TAG = anm.NODE_CONTROLLER
[docs]class ControllingAction(object):
"""
This class has:
1. An SimpleActionServer to simulate motion controlling.
2. Given a plan as a set of via points, it simulate the movements to reach each point.
3. This server updatesthe current robot position stored in the `robot-state` node.
"""
def __init__(self):
# Instantiate and start the action server based on the `SimpleActionServer` class.
self._as = SimpleActionServer(anm.ACTION_CONTROLLER,
tmrc.msg.ControlAction,
execute_cb=self.execute_callback,
auto_start=False)
self._as.start()
[docs] def execute_callback(self, goal):
"""
This function does:
1. The callback invoked when a client set a goal to the `controller` server.
2. Requires a list of via points (i.e., the plan), and it simulate a movement through each point.
3. The related robot position is updated in the `robot-state` node.
Arg:
goal(float): [x,y] coordinates of next destination of robot
"""
# Construct the feedback and loop for each via point.
feedback = ControlFeedback()
rospy.loginfo(anm.tag_log('Server is controlling...', LOG_TAG))
for point in goal.via_points:
# Check that the client did not cancel this service.
if self._as.is_preempt_requested():
rospy.loginfo(anm.tag_log('Service has been cancelled by the client!', LOG_TAG))
# Actually cancel this service.
self._as.set_preempted()
return
# Publish a feedback to the client to simulate that a via point has been reached.
feedback.reached_point = point
self._as.publish_feedback(feedback)
# Set the new current position into the `robot-state` node.
_set_pose_client(point)
# Log current robot position.
log_msg = f'Reaching point ({point.x}, {point.y}).'
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
# Publish the results to the client.
result = ControlResult()
result.reached_point = feedback.reached_point
rospy.loginfo(anm.tag_log('Motion control successes.', LOG_TAG))
self._as.set_succeeded(result)
return # Succeeded.
def _set_pose_client(pose):
"""
A simple function to update the current robot `pose` stored in the `robot-state` node.
This method is performed for each point provided in the action's server feedback.
Arg:
pose(float): [x,y] coordinates of current position
"""
# Eventually, wait for the server to be initialised.
rospy.wait_for_service(anm.SERVER_SET_POSE)
try:
# Log service call.
log_msg = f'Set current robot position to the `{anm.SERVER_SET_POSE}` node.'
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
# Call the service and set the current robot position.
service = rospy.ServiceProxy(anm.SERVER_SET_POSE, SetPose)
service(pose) # The `response` is not used.
except rospy.ServiceException as e:
log_msg = f'Server cannot set current robot position: {e}'
rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
if __name__ == '__main__':
# Initialise the node, its action server, and wait.
rospy.init_node(anm.NODE_CONTROLLER, log_level=rospy.INFO)
server = ControllingAction()
rospy.spin()