Source code for openai_ros.task_envs.husarion.husarion_get_to_position_turtlebot_playground

import rospy
import numpy
from gym import spaces
from openai_ros.robot_envs import husarion_env
from gym.envs.registration import register
from geometry_msgs.msg import Vector3
from geometry_msgs.msg import Point
from tf.transformations import euler_from_quaternion
from sensor_msgs.msg import LaserScan
from std_msgs.msg import Header


# The path is __init__.py of openai_ros, where we import the SumitXlMazeEnv directly
timestep_limit_per_episode = 10000 # Can be any Value

register(
        id='HusarionGetToPosTurtleBotPlayGround-v0',
        entry_point='openai_ros:task_envs.husarion.husarion_get_to_position_turtlebot_playground.HusarionGetToPosTurtleBotPlayGroundEnv',
        timestep_limit=timestep_limit_per_episode,
    )

[docs]class HusarionGetToPosTurtleBotPlayGroundEnv(husarion_env.HusarionEnv):
[docs] def __init__(self): """ This Task Env is designed for having the husarion in the husarion world closed room with columns. It will learn how to move around without crashing. """ # Only variable needed to be set here number_actions = rospy.get_param('/husarion/n_actions') self.action_space = spaces.Discrete(number_actions) # We set the reward range, which is not compulsory but here we do it. self.reward_range = (-numpy.inf, numpy.inf) # Actions and Observations self.init_linear_forward_speed = rospy.get_param('/husarion/init_linear_forward_speed') self.init_linear_turn_speed = rospy.get_param('/husarion/init_linear_turn_speed') self.linear_forward_speed = rospy.get_param('/husarion/linear_forward_speed') self.linear_turn_speed = rospy.get_param('/husarion/linear_turn_speed') self.angular_speed = rospy.get_param('/husarion/angular_speed') self.new_ranges = rospy.get_param('/husarion/new_ranges') self.max_laser_value = rospy.get_param('/husarion/max_laser_value') self.min_laser_value = rospy.get_param('/husarion/min_laser_value') self.work_space_x_max = rospy.get_param("/husarion/work_space/x_max") self.work_space_x_min = rospy.get_param("/husarion/work_space/x_min") self.work_space_y_max = rospy.get_param("/husarion/work_space/y_max") self.work_space_y_min = rospy.get_param("/husarion/work_space/y_min") # Get Desired Point to Get self.desired_position = Point() self.desired_position.x = rospy.get_param("/husarion/desired_pose/x") self.desired_position.y = rospy.get_param("/husarion/desired_pose/y") self.precision = rospy.get_param('/husarion/precision') self.precision_epsilon = 1.0 / (10.0 * self.precision) self.move_base_precision = rospy.get_param('/husarion/move_base_precision') # We create the arrays for the laser readings # We also create the arrays for the odometry readings # We join them toeguether. laser_scan = self._check_laser_scan_ready() num_laser_readings = len(laser_scan.ranges)/self.new_ranges high_laser = numpy.full((num_laser_readings), self.max_laser_value) low_laser = numpy.full((num_laser_readings), self.min_laser_value) # We place the Maximum and minimum values of the X,Y and YAW of the odometry # The odometry yaw can be any value in the circunference. high_odometry = numpy.array([ self.work_space_x_max, self.work_space_y_max, 3.14]) low_odometry = numpy.array([ self.work_space_x_min, self.work_space_y_min, -1*3.14]) # Now we fetch the max and min of the Desired Position in 2D XY # We use the exact same as the workspace, just because make no sense # Consider points outside the workspace high_des_pos = numpy.array([ self.work_space_x_max, self.work_space_y_max ]) low_des_pos = numpy.array([ self.work_space_x_min, self.work_space_y_min ]) # We join both arrays high = numpy.concatenate([high_laser, high_odometry, high_des_pos]) low = numpy.concatenate([low_laser, low_odometry, low_des_pos]) self.observation_space = spaces.Box(low, high) rospy.logdebug("ACTION SPACES TYPE===>"+str(self.action_space)) rospy.logdebug("OBSERVATION SPACES TYPE===>"+str(self.observation_space)) # Rewards self.closer_to_point_reward = rospy.get_param("/husarion/closer_to_point_reward") self.alive_reward = rospy.get_param("/husarion/alive_reward") self.end_episode_points = rospy.get_param("/husarion/end_episode_points") self.cumulated_steps = 0.0 self.laser_filtered_pub = rospy.Publisher('/rosbot/laser/scan_filtered', LaserScan, queue_size=1) # Here we will add any init functions prior to starting the MyRobotEnv super(HusarionGetToPosTurtleBotPlayGroundEnv, self).__init__()
[docs] def _set_init_pose(self): """Sets the Robot in its init pose """ self.move_base( self.init_linear_forward_speed, self.init_linear_turn_speed, epsilon=self.move_base_precision, update_rate=10) return True
[docs] def _init_env_variables(self): """ Inits variables needed to be initialised each time we reset at the start of an episode. :return: """ # For Info Purposes self.cumulated_reward = 0.0 self.index = 0 odometry = self.get_odom() self.previous_distance_from_des_point = self.get_distance_from_desired_point(odometry.pose.pose.position, self.desired_position)
[docs] def _set_action(self, action): """ This set action will Set the linear and angular speed of the SumitXl based on the action number given. :param action: The action integer that set s what movement to do next. """ rospy.logdebug("Start Set Action ==>"+str(action)) # We convert the actions to speed movements to send to the parent class CubeSingleDiskEnv if action == 0: #FORWARD linear_speed = self.linear_forward_speed angular_speed = 0.0 last_action = "FORWARDS" elif action == 1: #LEFT linear_speed = self.linear_turn_speed angular_speed = self.angular_speed last_action = "TURN_LEFT" elif action == 2: #RIGHT linear_speed = self.linear_turn_speed angular_speed = -1*self.angular_speed last_action = "TURN_RIGHT" elif action == 3: # BACKWARDS linear_speed = self.linear_forward_speed angular_speed = 0.0 last_action = "BACKWARDS" # We tell Husarion the linear and angular speed to set to execute self.move_base(linear_speed, angular_speed, epsilon=self.move_base_precision, update_rate=10) rospy.logdebug("END Set Action ==>"+str(action)+", ACTION="+str(last_action))
[docs] def _get_obs(self): """ Here we define what sensor data defines our robots observations To know which Variables we have acces to, we need to read the HusarionEnv API DOCS :return: """ rospy.logdebug("Start Get Observation ==>") # We get the laser scan data laser_scan = self.get_laser_scan() discretized_laser_scan = self.discretize_scan_observation( laser_scan, self.new_ranges ) # We get the odometry so that SumitXL knows where it is. odometry = self.get_odom() x_position = odometry.pose.pose.position.x y_position = odometry.pose.pose.position.y # We get the orientation of the cube in RPY roll, pitch, yaw = self.get_orientation_euler() # We round to only two decimals to avoid very big Observation space # We only want the X and Y position and the Yaw odometry_array = [ round(x_position, 1), round(y_position, 1), round(yaw, 1)] # We fetch also the desired position because it conditions the learning # It also make it dynamic, because we can change the desired position and the # learning will be able to adapt. desired_position = [round(self.desired_position.x, 1), round(self.desired_position.y, 1)] # We concatenate all the lists. observations = discretized_laser_scan + odometry_array + desired_position rospy.logwarn("Observations==>"+str(observations)) rospy.logwarn("END Get Observation ==>") return observations
[docs] def _is_done(self, observations): """ We consider that the episode has finished when: 1) Husarion has moved ouside the workspace defined. 2) Husarion is too close to an object 3) Husarion has reached the desired position """ # We fetch data through the observations # Its all the array except from the last four elements, which are XY odom and XY des_pos laser_readings = observations[:-5] current_position = Point() current_position.x = observations[-5] current_position.y = observations[-4] current_position.z = 0.0 desired_position = Point() desired_position.x = observations[-2] desired_position.y = observations[-1] desired_position.z = 0.0 rospy.logwarn("is DONE? laser_readings=" + str(laser_readings)) rospy.logwarn("is DONE? current_position=" + str(current_position)) rospy.logwarn("is DONE? desired_position=" + str(desired_position)) too_close_to_object = self.check_husarion_has_crashed(laser_readings) inside_workspace = self.check_inside_workspace(current_position) reached_des_pos = self.check_reached_desired_position( current_position, desired_position, self.precision_epsilon) is_done = too_close_to_object or not(inside_workspace) or reached_des_pos rospy.logwarn("####################") rospy.logwarn("too_close_to_object=" + str(too_close_to_object)) rospy.logwarn("inside_workspace=" + str(inside_workspace)) rospy.logwarn("reached_des_pos=" + str(reached_des_pos)) rospy.logwarn("is_done=" + str(is_done)) rospy.logwarn("######## END DONE ##") return is_done
[docs] def _compute_reward(self, observations, done): """ We will reward the following behaviours: 1) The distance to the desired point has increase from last step 2) The robot has reached the desired point We will penalise the following behaviours: 1) Ending the episode without reaching the desired pos. That means it has crashed or it has gone outside the workspace """ laser_readings = observations[:-5] current_position = Point() current_position.x = observations[-5] current_position.y = observations[-4] current_position.z = 0.0 desired_position = Point() desired_position.x = observations[-2] desired_position.y = observations[-1] desired_position.z = 0.0 distance_from_des_point = self.get_distance_from_desired_point(current_position, desired_position) distance_difference = distance_from_des_point - self.previous_distance_from_des_point rospy.logwarn("current_position=" + str(current_position)) rospy.logwarn("desired_point=" + str(desired_position)) rospy.logwarn("total_distance_from_des_point=" + str(self.previous_distance_from_des_point)) rospy.logwarn("distance_from_des_point=" + str(distance_from_des_point)) rospy.logwarn("distance_difference=" + str(distance_difference)) if not done: # If there has been a decrease in the distance to the desired point, we reward it if distance_difference < 0.0: rospy.logwarn("DECREASE IN DISTANCE GOOD") reward = self.closer_to_point_reward else: reward = self.alive_reward else: reached_des_pos = self.check_reached_desired_position( current_position, desired_position, self.precision_epsilon) if reached_des_pos: reward = self.end_episode_points rospy.logwarn("GOT TO DESIRED POINT ; DONE, reward=" + str(reward)) else: reward = -1*self.end_episode_points rospy.logerr("SOMETHING WENT WRONG ; DONE, reward=" + str(reward)) self.previous_distance_from_des_point = distance_from_des_point rospy.logwarn("reward=" + str(reward)) self.cumulated_reward += reward rospy.logdebug("Cumulated_reward=" + str(self.cumulated_reward)) self.cumulated_steps += 1 rospy.logdebug("Cumulated_steps=" + str(self.cumulated_steps)) return reward
# Internal TaskEnv Methods
[docs] def update_desired_pos(self,new_position): """ With this method you can change the desired position that you want Usarion to be that initialy is set through rosparams loaded through a yaml file possibly. :new_position: Type Point, because we only value the position. """ self.desired_position.x = new_position.x self.desired_position.y = new_position.y
[docs] def discretize_scan_observation(self,data,new_ranges): """ Discards all the laser readings that are not multiple in index of new_ranges value. """ discretized_ranges = [] mod = len(data.ranges)/new_ranges filtered_range = [] rospy.logdebug("data=" + str(data)) rospy.logdebug("new_ranges=" + str(new_ranges)) rospy.logdebug("mod=" + str(mod)) nan_value = (self.min_laser_value + self.min_laser_value) / 2.0 for i, item in enumerate(data.ranges): if (i%mod==0): if item == float ('Inf') or numpy.isinf(item): rospy.logerr("Infinite Value=" + str(item)+"Assigning Max value") discretized_ranges.append(self.max_laser_value) elif numpy.isnan(item): rospy.logerr("Nan Value=" + str(item)+"Assigning MIN value") discretized_ranges.append(self.min_laser_value) else: # We clamp the laser readings if item > self.max_laser_value: rospy.logwarn("Item Bigger Than MAX, CLAMPING=>" + str(item)+", MAX="+str(self.max_laser_value)) discretized_ranges.append(round(self.max_laser_value,1)) elif item < self.min_laser_value: rospy.logwarn("Item smaller Than MIN, CLAMPING=>" + str(item)+", MIN="+str(self.min_laser_value)) discretized_ranges.append(round(self.min_laser_value,1)) else: rospy.logwarn("Normal Item, no processing=>" + str(item)) discretized_ranges.append(round(item,1)) # We add last value appended filtered_range.append(discretized_ranges[-1]) else: # We add value zero filtered_range.append(0.0) rospy.logwarn(">>>>>>>>>>>>>>>>>>>>>>discretized_ranges=>" + str(discretized_ranges)) self.publish_filtered_laser_scan( laser_original_data=data, new_filtered_laser_range=filtered_range) return discretized_ranges
[docs] def get_orientation_euler(self): # We convert from quaternions to euler orientation_list = [self.odom.pose.pose.orientation.x, self.odom.pose.pose.orientation.y, self.odom.pose.pose.orientation.z, self.odom.pose.pose.orientation.w] roll, pitch, yaw = euler_from_quaternion(orientation_list) return roll, pitch, yaw
[docs] def get_distance_from_desired_point(self, current_position, desired_position): """ Calculates the distance from the current position to the desired point :param current_position: :param desired_position: :return: """ distance = self.get_distance_from_point(current_position, desired_position) return distance
[docs] def get_distance_from_point(self, pstart, p_end): """ Given a Vector3 Object, get distance from current position :param p_end: :return: """ a = numpy.array((pstart.x, pstart.y, pstart.z)) b = numpy.array((p_end.x, p_end.y, p_end.z)) distance = numpy.linalg.norm(a - b) return distance
[docs] def check_husarion_has_crashed(self, laser_readings): """ Based on the laser readings we check if any laser readingdistance is below the minimum distance acceptable. """ husarion_has_crashed = False for laser_distance in laser_readings: rospy.logwarn("laser_distance==>"+str(laser_distance)) if laser_distance == self.min_laser_value: husarion_has_crashed = True rospy.logwarn("HAS CRASHED==>"+str(laser_distance)+", min="+str(self.min_laser_value)) break elif laser_distance < self.min_laser_value: rospy.logerr("Value of laser shouldnt be lower than min==>"+str(laser_distance)+", min="+str(self.min_laser_value)) elif laser_distance > self.max_laser_value: rospy.logerr("Value of laser shouldnt be higher than max==>"+str(laser_distance)+", max="+str(self.min_laser_value)) return husarion_has_crashed
[docs] def check_inside_workspace(self,current_position): """ We check that the current position is inside the given workspace. """ is_inside = False rospy.logwarn("##### INSIDE WORK SPACE? #######") rospy.logwarn("XYZ current_position"+str(current_position)) rospy.logwarn("work_space_x_max"+str(self.work_space_x_max)+",work_space_x_min="+str(self.work_space_x_min)) rospy.logwarn("work_space_y_max"+str(self.work_space_y_max)+",work_space_y_min="+str(self.work_space_y_min)) rospy.logwarn("############") if current_position.x > self.work_space_x_min and current_position.x <= self.work_space_x_max: if current_position.y > self.work_space_y_min and current_position.y <= self.work_space_y_max: is_inside = True return is_inside
[docs] def check_reached_desired_position(self,current_position, desired_position, epsilon=0.1): """ It return True if the current position is similar to the desired poistion """ is_in_desired_pos = False x_pos_plus = desired_position.x + epsilon x_pos_minus = desired_position.x - epsilon y_pos_plus = desired_position.y + epsilon y_pos_minus = desired_position.y - epsilon x_current = current_position.x y_current = current_position.y x_pos_are_close = (x_current <= x_pos_plus) and (x_current > x_pos_minus) y_pos_are_close = (y_current <= y_pos_plus) and (y_current > y_pos_minus) is_in_desired_pos = x_pos_are_close and y_pos_are_close rospy.logdebug("###### IS DESIRED POS ? ######") rospy.logdebug("epsilon==>"+str(epsilon)) rospy.logdebug("current_position"+str(current_position)) rospy.logdebug("x_pos_plus"+str(x_pos_plus)+",x_pos_minus="+str(x_pos_minus)) rospy.logdebug("y_pos_plus"+str(y_pos_plus)+",y_pos_minus="+str(y_pos_minus)) rospy.logdebug("x_pos_are_close"+str(x_pos_are_close)) rospy.logdebug("y_pos_are_close"+str(y_pos_are_close)) rospy.logdebug("is_in_desired_pos"+str(is_in_desired_pos)) rospy.logdebug("############") return is_in_desired_pos
[docs] def publish_filtered_laser_scan(self, laser_original_data, new_filtered_laser_range): length_range = len(laser_original_data.ranges) length_intensities = len(laser_original_data.intensities) laser_filtered_object = LaserScan() h = Header() h.stamp = rospy.Time.now() # Note you need to call rospy.init_node() before this will work h.frame_id = "chassis" laser_filtered_object.header = h laser_filtered_object.angle_min = laser_original_data.angle_min laser_filtered_object.angle_max = laser_original_data.angle_max laser_filtered_object.angle_increment = laser_original_data.angle_increment laser_filtered_object.time_increment = laser_original_data.time_increment laser_filtered_object.scan_time = laser_original_data.scan_time laser_filtered_object.range_min = laser_original_data.range_min laser_filtered_object.range_max = laser_original_data.range_max laser_filtered_object.ranges = [] laser_filtered_object.intensities = [] for item in new_filtered_laser_range: laser_filtered_object.ranges.append(item) laser_filtered_object.intensities.append(item) self.laser_filtered_pub.publish(laser_filtered_object)