Source code for scripts.finite_state_machine

#!/usr/bin/env python
"""
.. module:: finit_state_machine
    :platform: Unix
    :synopsis: the finite_state_machine python script in topological_map_robot_control package
    
.. moduleauthor:: Mohammad Reza Haji Hosseini <mrhhosseini75@gmail.com>

Brief explanation of this node:
    1. Add individuals to classes (ROOM, DOOR, ROBOT)
    2. Disjoint individuals (ROOM, DOOR)
    3. Assign DOORs to ROOMs 
    4. Use last visited times method for update time visit
    5. Add robot in map and set first pose
    6. Use urgency threshold method for visit all the rooms
    7. Add battery lvl of robot and use charge and discharge method
    8. Get pose client and get location of robot to get current pose of robot
    9. Update ontological map
    10. Moves robot between different ROOMs
    11. Run smach viewer to show current sate of robot

Publishe :
    /target_point to 'planner_client' node 

Service :
    GET_POSE to get robot pose from 'robot_state' node
"""
import rospy
import smach
import smach_ros
import time
from os.path import dirname, realpath
from std_msgs.msg import String
# Import custom message, actions and services.
from tmrc.msg import PlanGoal
from tmrc.srv import GetPose
import tmrc  
import sys
# Import constant name defined to structure the architecture.
sys.path.append('/root/ros_ws/src/tmrc')
from utilities import architecture_name_mapper as anm
from utilities.armor_client import ArmorClient



# A tag for identifying logs producer.
LOG_TAG = anm.NODE_FINITE_STATE_MACHINE

# Define global variabiles
robot = ""
client = None
is_in = " "
prev_time = " "
target_room = " "

[docs]def add_individuals_to_classes(): """Function to add all individuals like ROOMs, DOORs, and ROBOT into the class of topological map""" # Add indiviuals to class --> ROOM client.manipulation.add_ind_to_class("E", "ROOM") client.manipulation.add_ind_to_class("C1", "ROOM") client.manipulation.add_ind_to_class("C2", "ROOM") client.manipulation.add_ind_to_class("R1", "ROOM") client.manipulation.add_ind_to_class("R2", "ROOM") client.manipulation.add_ind_to_class("R3", "ROOM") client.manipulation.add_ind_to_class("R4", "ROOM") #Add indiviuals to class --> DOOR client.manipulation.add_ind_to_class("D1", "DOOR") client.manipulation.add_ind_to_class("D2", "DOOR") client.manipulation.add_ind_to_class("D3", "DOOR") client.manipulation.add_ind_to_class("D4", "DOOR") client.manipulation.add_ind_to_class("D5", "DOOR") client.manipulation.add_ind_to_class("D6", "DOOR") client.manipulation.add_ind_to_class("D7", "DOOR") #Add indiviuals to class --> ROBOT client.manipulation.add_ind_to_class("Robot", "ROBOT")
[docs]def disjoint_individuals(): """Function to disjoint individuals like ROOMs and DOORs in topological map""" client.manipulation.disj_inds_of_class("ROOM") client.manipulation.disj_inds_of_class("DOOR")
[docs]def assign_doors_to_rooms(): """Function to assign DOORs to ROOMs in topological map""" client.manipulation.add_objectprop_to_ind("hasDoor", "R1", "D1") client.manipulation.add_objectprop_to_ind("hasDoor", "R2", "D2") client.manipulation.add_objectprop_to_ind("hasDoor", "R3", "D3") client.manipulation.add_objectprop_to_ind("hasDoor", "R4", "D4") client.manipulation.add_objectprop_to_ind("hasDoor", "R4", "D4") client.manipulation.add_objectprop_to_ind("hasDoor", "E", "D6") client.manipulation.add_objectprop_to_ind("hasDoor", "E", "D7") client.manipulation.add_objectprop_to_ind("hasDoor", "C1", "D1") client.manipulation.add_objectprop_to_ind("hasDoor", "C1", "D2") client.manipulation.add_objectprop_to_ind("hasDoor", "C1", "D5") client.manipulation.add_objectprop_to_ind("hasDoor", "C1", "D7") client.manipulation.add_objectprop_to_ind("hasDoor", "C2", "D3") client.manipulation.add_objectprop_to_ind("hasDoor", "C2", "D4") client.manipulation.add_objectprop_to_ind("hasDoor", "C2", "D5") client.manipulation.add_objectprop_to_ind("hasDoor", "C2", "D6")
[docs]def add_last_visit_times(): """Function to update the last time the robot was there""" # Time now = rospy.get_rostime() client.manipulation.add_dataprop_to_ind("visitedAt", "R1", "Int", str(now.secs - 70)) client.manipulation.add_dataprop_to_ind("visitedAt", "R2", "Int", str(now.secs - 50)) client.manipulation.add_dataprop_to_ind("visitedAt", "R3", "Int", str(now.secs - 30)) client.manipulation.add_dataprop_to_ind("visitedAt", "R4", "Int", str(now.secs - 20)) client.manipulation.add_dataprop_to_ind("visitedAt", "E", "Int", str(now.secs)) client.manipulation.add_dataprop_to_ind("visitedAt", "C1", "Int", str(now.secs - 60)) client.manipulation.add_dataprop_to_ind("visitedAt", "C2", "Int", str(now.secs - 40))
[docs]def add_robot(): """Function to add robot in topological map""" client.manipulation.add_objectprop_to_ind("isIn", "Robot", "E") # Time now = rospy.get_rostime() client.manipulation.add_dataprop_to_ind("now", "Robot", "Long", str(now.secs))
[docs]def add_urgency_threshold(): """Function to creat an urgency threshold in case more than 7 secs have passed since the last time the robot was there """ client.manipulation.add_dataprop_to_ind("urgencyThreshold", "Robot", "Int", "7")
[docs]def add_battery_level(): """Function to create battery level for robot, battery level of robot is 20 percent.""" client.manipulation.add_dataprop_to_ind("batteryLvl", "Robot", "Int", "20")
[docs]def charge_battery(): """Function to charge battery of robot. If battery level of robot is equal to 20 function stops.""" prev_battery_lvl = cut_dataprop_list(client.query.dataprop_b2_ind("batteryLvl", "Robot"))[0] if int(prev_battery_lvl) + 10 < 20: battery_lvl = int(prev_battery_lvl) + 5 else: battery_lvl = 20 client.manipulation.replace_dataprop_b2_ind("batteryLvl", "Robot", "Int", str(battery_lvl), prev_battery_lvl)
[docs]def consume_battery(): """Function to consume battery of robot when robot move between ROOMs.""" prev_battery_lvl = cut_dataprop_list(client.query.dataprop_b2_ind("batteryLvl", "Robot"))[0] battery_lvl = int(prev_battery_lvl) - 1 client.manipulation.replace_dataprop_b2_ind("batteryLvl", "Robot", "Int", str(battery_lvl), prev_battery_lvl)
[docs]def get_pose_client(): """ Function to retrieve the current robot pose by the `state/get_pose` server of the `robot-state` node. """ # Eventually, wait for the server to be initialised. rospy.wait_for_service(anm.SERVER_GET_POSE) try: # Call the service and get a response with the current robot position. service = rospy.ServiceProxy(anm.SERVER_GET_POSE, GetPose) response = service() pose = response.pose # Log service response. log_msg = f'Retrieving current robot position from the `{anm.NODE_ROBOT_STATE}` node as: ({pose.x}, {pose.y}).' rospy.loginfo(anm.tag_log(log_msg, LOG_TAG)) return pose except rospy.ServiceException as e: log_msg = f'Server cannot get current robot position: {e}' rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
[docs]def get_location(): """ Function to detect robot current location and replace new location with previous location. Return: is_in(string): current location of robot """ global is_in now = rospy.get_rostime() position = get_pose_client() if position.y <= 2: is_in = "E" prev_time = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "E"))[0] client.manipulation.replace_dataprop_b2_ind("visitedAt", "E", "Int", str(now.secs), prev_time) elif position.x <= 2 and position.y > 2 and position.y <= 6: is_in = "R1" prev_time = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "R1"))[0] client.manipulation.replace_dataprop_b2_ind("visitedAt", "R1", "Int", str(now.secs), prev_time) elif position.x <= 2 and position.y > 6: is_in = "R2" prev_time = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "R2"))[0] client.manipulation.replace_dataprop_b2_ind("visitedAt", "R2", "Int", str(now.secs), prev_time) elif position.x > 2 and position.x <= 5 and position.y > 2: is_in = "C1" prev_time = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "C1"))[0] client.manipulation.replace_dataprop_b2_ind("visitedAt", "C1", "Int", str(now.secs), prev_time) elif position.x > 5 and position.x <= 8 and position.y > 2: is_in = "C2" prev_time = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "C2"))[0] client.manipulation.replace_dataprop_b2_ind("visitedAt", "C2", "Int", str(now.secs), prev_time) elif position.x > 8 and position.y > 2 and position.y <= 6: is_in = "R3" prev_time = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "R3"))[0] client.manipulation.replace_dataprop_b2_ind("visitedAt", "R3", "Int", str(now.secs), prev_time) elif position.x > 8 and position.y > 6: is_in = "R4" prev_time = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "R4"))[0] client.manipulation.replace_dataprop_b2_ind("visitedAt", "R4", "Int", str(now.secs), prev_time) return is_in
[docs]def update_ontology(): """ Function to update topological map and sync the reasoner. If pass more than 7 secs since the last time robot was there, the location is added to the list of URGENT locations to visit. If battery level of robot less than 7, the robot first has to go to ROOM E to charge. Return: target_room(string): next destination to vist """ client.utils.sync_buffered_reasoner() loc = get_location() print("Battery level: ", cut_dataprop_list(client.query.dataprop_b2_ind("batteryLvl", "Robot"))[0]) print("Current location: ", loc) print("Urgent locations: ", client.query.ind_b2_class("URGENT")) now = rospy.get_rostime() urgency_threshold = cut_dataprop_list(client.query.dataprop_b2_ind("urgencyThreshold", "Robot"))[0] visitedAt_E = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "E"))[0] visitedAt_R1 = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "R1"))[0] visitedAt_R2 = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "R2"))[0] visitedAt_R3 = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "R3"))[0] visitedAt_R4 = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "R4"))[0] visitedAt_C1 = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "C1"))[0] visitedAt_C2 = cut_dataprop_list(client.query.dataprop_b2_ind("visitedAt", "C2"))[0] visitedAt_dict = {visitedAt_R1: "R1", visitedAt_R2: "R2", visitedAt_R3: "R3", visitedAt_R4: "R4", visitedAt_C1: "C1", visitedAt_C2: "C2", visitedAt_E: "E"} visitedAt_dict = dict(sorted(visitedAt_dict.items())) room_list = list(visitedAt_dict.values()) target_room = room_list[0] print("VisitedAt times: ", visitedAt_dict) print("Target room: ", target_room) if now.secs - int(visitedAt_E) > int(urgency_threshold): client.manipulation.add_ind_to_class("E", "URGENT") else: client.manipulation.remove_ind_from_class("E", "URGENT") if now.secs - int(visitedAt_R1) > int(urgency_threshold): client.manipulation.add_ind_to_class("R1", "URGENT") else: client.manipulation.remove_ind_from_class("R1", "URGENT") if now.secs - int(visitedAt_R2) > int(urgency_threshold): client.manipulation.add_ind_to_class("R2", "URGENT") else: client.manipulation.remove_ind_from_class("R2", "URGENT") if now.secs - int(visitedAt_R3) > int(urgency_threshold): client.manipulation.add_ind_to_class("R3", "URGENT") else: client.manipulation.remove_ind_from_class("R3", "URGENT") if now.secs - int(visitedAt_R4) > int(urgency_threshold): client.manipulation.add_ind_to_class("R4", "URGENT") else: client.manipulation.remove_ind_from_class("R4", "URGENT") if now.secs - int(visitedAt_C1) > int(urgency_threshold): client.manipulation.add_ind_to_class("C1", "URGENT") else: client.manipulation.remove_ind_from_class("C1", "URGENT") if now.secs - int(visitedAt_C2) > int(urgency_threshold): client.manipulation.add_ind_to_class("C2", "URGENT") else: client.manipulation.remove_ind_from_class("C2", "URGENT") battery_lvl = cut_dataprop_list(client.query.dataprop_b2_ind("batteryLvl", "Robot"))[0] if int(battery_lvl) > int(urgency_threshold): return target_room else: return "E"
[docs]def move_robot_to_roomE(): """Function to move robot to ROOM E""" move_robot(3.5, 1.0)
[docs]def move_robot_to_room1(): """Function to move robot to ROOM 1""" move_robot(1.0, 4.5)
[docs]def move_robot_to_room2(): """Function to move robot to ROOM 2""" move_robot(1.0, 7.5)
[docs]def move_robot_to_room3(): """Function to move robot to ROOM 3""" move_robot(9.0, 4.5)
[docs]def move_robot_to_room4(): """Function to move robot to ROOM 4""" move_robot(9.0, 7.5)
[docs]def move_robot_to_corridor1(): """Function to move robot to CORRIDOR 1""" move_robot(3.0, 4.5)
[docs]def move_robot_to_corridor2(): """Function to move robot to CORRIDOR 2""" move_robot(6.5, 4.5)
[docs]def move_to_room(room): """ Function to move to target room Arg: room(string): target room """ rospy.loginfo("[RDST] Received request for robot to move to " + room) if room == 'E' : move_robot_to_roomE() elif room == 'R1' : move_robot_to_room1() elif room == 'R2' : move_robot_to_room2() elif room == 'R3' : move_robot_to_room3() elif room == 'R4' : move_robot_to_room4() elif room == 'C1' : move_robot_to_corridor1() elif room == 'C2' : move_robot_to_corridor2()
[docs]def move_robot(x, y): """ Function to move the robot to specific coordinate Args: x(float): x-coordinate of the target room y(float): y-coordinate of the target room """ goal = PlanGoal() goal.target.x = x goal.target.y = y pub.publish(goal) rospy.loginfo("[RDST] Request sento to planner server")
[docs]def cut_dataprop(data_prop): """ Very simple function that cut the data prop from a string received from armor. Return: data_prop(float) """ start = 0 end = data_prop.rfind('^') - 2 data_prop = data_prop[(start+1) : end] return data_prop
[docs]def cut_dataprop_list(data_prop_list): """ Very simple function that cut the data prop from a list of strings received from armor. Return: data_prop(float) """ for i in range(len(data_prop_list)): data_prop_list[i] = cut_dataprop(data_prop_list[i]) return data_prop_list
[docs]class RoomE(smach.State): """Class to define state Room E """ def __init__(self): """Initialisation function, it should not wait""" smach.State.__init__(self, outcomes=['D6','D7', 'stay'], )
[docs] def execute(self, userdata): """Function called when exiting from the node, it can be blacking""" time.sleep(5) target_room = update_ontology() if target_room == "C1" or target_room == "R1" or target_room == "R2": move_to_room('C1') consume_battery() return 'D7' elif target_room == "C2" or target_room == "R3" or target_room == "R4": move_to_room('C2') consume_battery() return 'D6' else: charge_battery() return 'stay'
[docs]class Room1(smach.State): """Class to define state Room 1 """ def __init__(self): """initialisation function, it should not wait""" smach.State.__init__(self, outcomes=['D1', 'stay'], )
[docs] def execute(self, userdata): """Function called when exiting from the node, it can be blacking""" time.sleep(5) target_room = update_ontology() if target_room != "R1": move_to_room('C1') consume_battery() return 'D1' else: return 'stay'
[docs]class Room2(smach.State): """Class to define state Room 2 """ def __init__(self): """Initialisation function, it should not wait""" smach.State.__init__(self, outcomes=['D2', 'stay'], )
[docs] def execute(self, userdata): """Function called when exiting from the node, it can be blacking""" time.sleep(5) target_room = update_ontology() if target_room != "R2": move_to_room('C1') consume_battery() return 'D2' else: return 'stay'
[docs]class Room3(smach.State): """Class to define state Room 3 """ def __init__(self): """Initialisation function, it should not wait""" smach.State.__init__(self, outcomes=['D3', 'stay'], )
[docs] def execute(self, userdata): """Function called when exiting from the node, it can be blacking""" time.sleep(5) target_room = update_ontology() if target_room != "R3": move_to_room('C2') consume_battery() return 'D3' else: return 'stay'
[docs]class Room4(smach.State): """Class to define state Room 4 """ def __init__(self): """Initialisation function, it should not wait""" smach.State.__init__(self, outcomes=['D4', 'stay'], )
[docs] def execute(self, userdata): """Function called when exiting from the node, it can be blacking""" time.sleep(5) target_room = update_ontology() if target_room != "R4": move_to_room('C2') consume_battery() return 'D4' else: return 'stay'
[docs]class Corridor1(smach.State): """Class to define state Corridor 1 """ def __init__(self): """Initialisation function, it should not wait""" smach.State.__init__(self, outcomes=['D1','D2','D5','D7','stay'], ) self.sensor_input = 0
[docs] def execute(self, userdata): """Function called when exiting from the node, it can be blacking""" time.sleep(5) target_room = update_ontology() if target_room == "R1": move_to_room('R1') consume_battery() return 'D1' elif target_room == "R2": move_to_room('R2') consume_battery() return 'D2' elif target_room == "C2" or target_room == "R3" or target_room == "R4": move_to_room('C2') consume_battery() return 'D5' elif target_room == "E": move_to_room('E') consume_battery() return 'D7' else: return 'stay'
[docs]class Corridor2(smach.State): """Class to define state Corridor 2 """ def __init__(self): """Initialisation function, it should not wait""" smach.State.__init__(self, outcomes=['D3','D4','D5','D6','stay'], )
[docs] def execute(self, userdata): """Function called when exiting from the node, it can be blacking""" time.sleep(5) target_room = update_ontology() if target_room == "R3": move_to_room('R3') consume_battery() return 'D3' elif target_room == "R4": move_to_room('R4') consume_battery() return 'D4' elif target_room == "C1" or target_room == "R1" or target_room == "R2": move_to_room('C1') consume_battery() return 'D5' elif target_room == "E": move_to_room('E') consume_battery() return 'D6' else: return 'stay'
[docs]def SMACH_state_machine(): """Function to create a SMACH state machine and start the introspection server for visualization and execute the state machine.""" sm = smach.StateMachine(outcomes=[]) sm.userdata.sm_counter = 0 # Open the container with sm: # Add states to the container smach.StateMachine.add('E', RoomE(), transitions={'D7':'C1', 'D6':'C2', 'stay':'E'}) smach.StateMachine.add('C1', Corridor1(), transitions={'D7':'E', 'D1':'R1', 'D2':'R2', 'D5':'C2', 'stay':'C1'}) smach.StateMachine.add('C2', Corridor2(), transitions={'D6':'E', 'D3':'R3', 'D4':'R4', 'D5':'C1', 'stay':'C2'}) smach.StateMachine.add('R1', Room1(), transitions={'D1':'C1', 'stay':'R1'}) smach.StateMachine.add('R2', Room2(), transitions={'D2':'C1', 'stay':'R2'}) smach.StateMachine.add('R3', Room3(), transitions={'D3':'C2', 'stay':'R3'}) smach.StateMachine.add('R4', Room4(), transitions={'D4':'C2', 'stay':'R4'}) #Create and start the introspection server for visualization sis = smach_ros.IntrospectionServer('server_name', sm, 'START') sis.start() #Execute the state machine outcome = sm.execute() #Wait for ctrl-c to stop the application rospy.spin() sis.stop()
if __name__ == '__main__': #Initialise this node rospy.init_node(anm.NODE_FINITE_STATE_MACHINE, log_level=rospy.INFO) #Publishes to: /target_point to 'planner_client' node pub = rospy.Publisher('/target_point', tmrc.msg.PlanGoal, queue_size=10) #Build topological map path = dirname(dirname(realpath(__file__))) path = path + "/ontology/" client = ArmorClient(robot, "ontoRef") client.utils.load_ref_from_file(path + 'robot-ontology.owl', '', True, 'PELLET',False,False) client.utils.set_log_to_terminal(True) # Call functions defined above add_individuals_to_classes() disjoint_individuals() assign_doors_to_rooms() add_last_visit_times() add_robot() add_urgency_threshold() add_battery_level() # Sync the reasoner client.utils.sync_buffered_reasoner() SMACH_state_machine()