Source code for openai_ros.task_envs.sumit_xl.sumit_xl_room

import rospy
import numpy
from gym import spaces
from openai_ros.robot_envs import sumitxl_env
from gym.envs.registration import register
from geometry_msgs.msg import Vector3
from geometry_msgs.msg import Point
from tf.transformations import euler_from_quaternion


# The path is __init__.py of openai_ros, where we import the SumitXlMazeEnv directly
timestep_limit_per_episode = 10000 # Can be any Value

register(
        id='SumitXlRoom-v0',
        entry_point='openai_ros:task_envs.sumit_xl.sumit_xl_room.SumitXlRoom',
        timestep_limit=timestep_limit_per_episode,
    )

[docs]class SumitXlRoom(sumitxl_env.SumitXlEnv):
[docs] def __init__(self): """ This Task Env is designed for having the sumit_xl in the sumit_xl world closed room with columns. It will learn how to move around without crashing. """ # Only variable needed to be set here number_actions = rospy.get_param('/sumit_xl/n_actions') self.action_space = spaces.Discrete(number_actions) # We set the reward range, which is not compulsory but here we do it. self.reward_range = (-numpy.inf, numpy.inf) # Actions and Observations self.linear_forward_speed = rospy.get_param('/sumit_xl/linear_forward_speed') self.linear_turn_speed = rospy.get_param('/sumit_xl/linear_turn_speed') self.angular_speed = rospy.get_param('/sumit_xl/angular_speed') self.init_linear_forward_speed = rospy.get_param('/sumit_xl/init_linear_forward_speed') self.init_linear_turn_speed = rospy.get_param('/sumit_xl/init_linear_turn_speed') self.new_ranges = rospy.get_param('/sumit_xl/new_ranges') self.min_range = rospy.get_param('/sumit_xl/min_range') self.max_laser_value = rospy.get_param('/sumit_xl/max_laser_value') self.min_laser_value = rospy.get_param('/sumit_xl/min_laser_value') self.max_linear_aceleration = rospy.get_param('/sumit_xl/max_linear_aceleration') self.max_distance = rospy.get_param('/sumit_xl/max_distance') # Get Desired Point to Get self.desired_point = Point() self.desired_point.x = rospy.get_param("/sumit_xl/desired_pose/x") self.desired_point.y = rospy.get_param("/sumit_xl/desired_pose/y") self.desired_point.z = rospy.get_param("/sumit_xl/desired_pose/z") # We create the arrays for the laser readings # We also create the arrays for the odometry readings # We join them toeguether. laser_scan = self._check_laser_scan_ready() num_laser_readings = len(laser_scan.ranges)/self.new_ranges high_laser = numpy.full((num_laser_readings), self.max_laser_value) low_laser = numpy.full((num_laser_readings), self.min_laser_value) # We place the Maximum and minimum values of the X,Y and YAW of the odometry # The odometry yaw can be any value in the circunference. high_odometry = numpy.array([self.max_distance, self.max_distance, 3.14]) low_odometry = numpy.array([-1*self.max_distance, -1*self.max_distance, -1*3.14]) # We join both arrays high = numpy.concatenate([high_laser, high_odometry]) low = numpy.concatenate([low_laser, low_odometry]) self.observation_space = spaces.Box(low, high) rospy.logdebug("ACTION SPACES TYPE===>"+str(self.action_space)) rospy.logdebug("OBSERVATION SPACES TYPE===>"+str(self.observation_space)) # Rewards self.closer_to_point_reward = rospy.get_param("/sumit_xl/closer_to_point_reward") self.not_ending_point_reward = rospy.get_param("/sumit_xl/not_ending_point_reward") self.end_episode_points = rospy.get_param("/sumit_xl/end_episode_points") self.cumulated_steps = 0.0 # Here we will add any init functions prior to starting the MyRobotEnv super(SumitXlRoom, self).__init__()
[docs] def _set_init_pose(self): """Sets the Robot in its init pose """ self.move_base( self.init_linear_forward_speed, self.init_linear_turn_speed, epsilon=0.05, update_rate=10) return True
[docs] def _init_env_variables(self): """ Inits variables needed to be initialised each time we reset at the start of an episode. :return: """ # For Info Purposes self.cumulated_reward = 0.0 # Set to false Done, because its calculated asyncronously self._episode_done = False odometry = self.get_odom() self.previous_distance_from_des_point = self.get_distance_from_desired_point(odometry.pose.pose.position)
[docs] def _set_action(self, action): """ This set action will Set the linear and angular speed of the SumitXl based on the action number given. :param action: The action integer that set s what movement to do next. """ rospy.logdebug("Start Set Action ==>"+str(action)) # We convert the actions to speed movements to send to the parent class CubeSingleDiskEnv if action == 0: #FORWARD linear_speed = self.linear_forward_speed angular_speed = 0.0 self.last_action = "FORWARDS" elif action == 1: #LEFT linear_speed = self.linear_turn_speed angular_speed = self.angular_speed self.last_action = "TURN_LEFT" elif action == 2: #RIGHT linear_speed = self.linear_turn_speed angular_speed = -1*self.angular_speed self.last_action = "TURN_RIGHT" """ elif action == 3: #STOP linear_speed = 0.0 angular_speed = 0.0 self.last_action = "STOP" """ # We tell SumitXL the linear and angular speed to set to execute self.move_base(linear_speed, angular_speed, epsilon=0.05, update_rate=10) rospy.logdebug("END Set Action ==>"+str(action))
[docs] def _get_obs(self): """ Here we define what sensor data defines our robots observations To know which Variables we have acces to, we need to read the SumitXlEnv API DOCS WALL CLOSE LEFT [1, 1, 9, 0, 0, 0, -1.8, 0.46, 0.01] WALL CLOSE RIGHT [0, 0, 0, 10, 1, 2, -1.8, -0.61, 0.01] WALL BACK [0, 9, 1, 1, 6, 0, -1.8, -0.54, 1.59] WALL FRONT [2, 9, 0, 0, 2, 2, -1.83, 0.51, 1.58] 0 in reality is arround front 0.4, back 0.5, sides 0.3 :return: """ rospy.logdebug("Start Get Observation ==>") # We get the laser scan data laser_scan = self.get_laser_scan() discretized_laser_scan = self.discretize_scan_observation( laser_scan, self.new_ranges ) # We get the odometry so that SumitXL knows where it is. odometry = self.get_odom() x_position = odometry.pose.pose.position.x y_position = odometry.pose.pose.position.y # We get the orientation of the cube in RPY roll, pitch, yaw = self.get_orientation_euler() # We round to only two decimals to avoid very big Observation space odometry_array = [ round(x_position, 2), round(y_position, 2), round(yaw, 2)] # We only want the X and Y position and the Yaw observations = discretized_laser_scan + odometry_array rospy.logdebug("Observations==>"+str(observations)) rospy.logdebug("END Get Observation ==>") return observations
[docs] def _is_done(self, observations): if self._episode_done: rospy.logerr("SumitXl is Too Close to wall==>") else: rospy.logdebug("SumitXl is NOT close to a wall ==>") # Now we check if it has crashed based on the imu imu_data = self.get_imu() linear_acceleration_magnitude = self.get_vector_magnitude(imu_data.linear_acceleration) if linear_acceleration_magnitude > self.max_linear_aceleration: rospy.logerr("SumitXl Crashed==>"+str(linear_acceleration_magnitude)+">"+str(self.max_linear_aceleration)) self._episode_done = True else: rospy.logerr("DIDNT crash SumitXl ==>"+str(linear_acceleration_magnitude)+">"+str(self.max_linear_aceleration)) current_position = Point() current_position.x = observations[-3] current_position.y = observations[-2] current_position.z = 0.0 if abs(current_position.x) <= self.max_distance: if abs(current_position.y) <= self.max_distance: rospy.logdebug("SummitXL Position is OK ==>["+str(current_position.x)+","+str(current_position.y)+"]") else: rospy.logerr("SummitXL to Far in Y Pos ==>"+str(current_position.x)) self._episode_done = True else: rospy.logerr("SummitXL to Far in X Pos ==>"+str(current_position.x)) self._episode_done = True if self.is_in_desired_position(current_position): self._episode_done = True return self._episode_done
[docs] def _compute_reward(self, observations, done): """ We give reward to the robot when it gets closer to the desired point. We Dont give it contsnatly, but only if there is an improvement """ # We get the current Position from the obervations current_position = Point() current_position.x = observations[-3] current_position.y = observations[-2] current_position.z = 0.0 distance_from_des_point = self.get_distance_from_desired_point(current_position) distance_difference = distance_from_des_point - self.previous_distance_from_des_point rospy.logwarn("current_position=" + str(current_position)) rospy.logwarn("desired_point=" + str(self.desired_point)) rospy.logwarn("total_distance_from_des_point=" + str(self.previous_distance_from_des_point)) rospy.logwarn("distance_from_des_point=" + str(distance_from_des_point)) rospy.logwarn("distance_difference=" + str(distance_difference)) if not done: # If there has been a decrease in the distance to the desired point, we reward it if distance_difference < 0.0: rospy.logwarn("DECREASE IN DISTANCE GOOD") reward = self.closer_to_point_reward else: # If it didnt get closer, we give much less points in theory # This should trigger the behaviour of moving towards the point rospy.logwarn("NO DECREASE IN DISTANCE, so much less points") reward = self.not_ending_point_reward else: if self.is_in_desired_position(current_position): reward = self.end_episode_points rospy.logwarn("GOT TO DESIRED POINT ; DONE, reward=" + str(reward)) else: reward = -1*self.end_episode_points rospy.logerr("SOMETHING WENT WRONG ; DONE, reward=" + str(reward)) self.previous_distance_from_des_point = distance_from_des_point rospy.logwarn("reward=" + str(reward)) self.cumulated_reward += reward rospy.logdebug("Cumulated_reward=" + str(self.cumulated_reward)) self.cumulated_steps += 1 rospy.logdebug("Cumulated_steps=" + str(self.cumulated_steps)) return reward
# Internal TaskEnv Methods
[docs] def discretize_scan_observation(self,data,new_ranges): """ Discards all the laser readings that are not multiple in index of new_ranges value. """ self._episode_done = False discretized_ranges = [] mod = len(data.ranges)/new_ranges rospy.logdebug("data=" + str(data)) rospy.logdebug("new_ranges=" + str(new_ranges)) rospy.logdebug("mod=" + str(mod)) for i, item in enumerate(data.ranges): if (i%mod==0): if item == float ('Inf') or numpy.isinf(item): discretized_ranges.append(self.max_laser_value) elif numpy.isnan(item): discretized_ranges.append(self.min_laser_value) else: discretized_ranges.append(int(item)) if (self.min_range > item > 0): rospy.logerr("done Validation >>> item=" + str(item)+"< "+str(self.min_range)) self._episode_done = True else: rospy.logdebug("NOT done Validation >>> item=" + str(item)+"< "+str(self.min_range)) return discretized_ranges
[docs] def get_vector_magnitude(self, vector): """ It calculated the magnitude of the Vector3 given. This is usefull for reading imu accelerations and knowing if there has been a crash :return: """ contact_force_np = numpy.array((vector.x, vector.y, vector.z)) force_magnitude = numpy.linalg.norm(contact_force_np) return force_magnitude
[docs] def get_orientation_euler(self): # We convert from quaternions to euler orientation_list = [self.odom.pose.pose.orientation.x, self.odom.pose.pose.orientation.y, self.odom.pose.pose.orientation.z, self.odom.pose.pose.orientation.w] roll, pitch, yaw = euler_from_quaternion(orientation_list) return roll, pitch, yaw
[docs] def get_distance_from_desired_point(self, current_position): """ Calculates the distance from the current position to the desired point :param start_point: :return: """ distance = self.get_distance_from_point(current_position, self.desired_point) return distance
[docs] def get_distance_from_point(self, pstart, p_end): """ Given a Vector3 Object, get distance from current position :param p_end: :return: """ a = numpy.array((pstart.x, pstart.y, pstart.z)) b = numpy.array((p_end.x, p_end.y, p_end.z)) distance = numpy.linalg.norm(a - b) return distance
[docs] def is_in_desired_position(self,current_position, epsilon=0.05): """ It return True if the current position is similar to the desired poistion """ is_in_desired_pos = False x_pos_plus = self.desired_point.x + epsilon x_pos_minus = self.desired_point.x - epsilon y_pos_plus = self.desired_point.y + epsilon y_pos_minus = self.desired_point.y - epsilon x_current = current_position.x y_current = current_position.y x_pos_are_close = (x_current <= x_pos_plus) and (x_current > x_pos_minus) y_pos_are_close = (y_current <= y_pos_plus) and (y_current > y_pos_minus) is_in_desired_pos = x_pos_are_close and y_pos_are_close return is_in_desired_pos