Source code for openai_ros.task_envs.turtlebot2.turtlebot2_maze
import rospy
import numpy
from gym import spaces
from openai_ros.robot_envs import turtlebot2_env
from gym.envs.registration import register
# The path is __init__.py of openai_ros, where we import the TurtleBot2MazeEnv directly
timestep_limit_per_episode = 10000 # Can be any Value
register(
id='TurtleBot2Maze-v0',
entry_point='openai_ros:task_envs.turtlebot2.turtlebot2_maze.TurtleBot2MazeEnv',
timestep_limit=timestep_limit_per_episode,
)
[docs]class TurtleBot2MazeEnv(turtlebot2_env.TurtleBot2Env):
[docs] def __init__(self):
"""
This Task Env is designed for having the TurtleBot2 in some kind of maze.
It will learn how to move around the maze without crashing.
"""
# Only variable needed to be set here
number_actions = rospy.get_param('/turtlebot2/n_actions')
self.action_space = spaces.Discrete(number_actions)
# We set the reward range, which is not compulsory but here we do it.
self.reward_range = (-numpy.inf, numpy.inf)
#number_observations = rospy.get_param('/turtlebot2/n_observations')
"""
We set the Observation space for the 6 observations
cube_observations = [
round(current_disk_roll_vel, 0),
round(y_distance, 1),
round(roll, 1),
round(pitch, 1),
round(y_linear_speed,1),
round(yaw, 1),
]
"""
# Actions and Observations
self.linear_forward_speed = rospy.get_param('/turtlebot2/linear_forward_speed')
self.linear_turn_speed = rospy.get_param('/turtlebot2/linear_turn_speed')
self.angular_speed = rospy.get_param('/turtlebot2/angular_speed')
self.init_linear_forward_speed = rospy.get_param('/turtlebot2/init_linear_forward_speed')
self.init_linear_turn_speed = rospy.get_param('/turtlebot2/init_linear_turn_speed')
self.new_ranges = rospy.get_param('/turtlebot2/new_ranges')
self.min_range = rospy.get_param('/turtlebot2/min_range')
self.max_laser_value = rospy.get_param('/turtlebot2/max_laser_value')
self.min_laser_value = rospy.get_param('/turtlebot2/min_laser_value')
# We create two arrays based on the binary values that will be assigned
# In the discretization method.
laser_scan = self._check_laser_scan_ready()
num_laser_readings = len(laser_scan.ranges)/self.new_ranges
high = numpy.full((num_laser_readings), self.max_laser_value)
low = numpy.full((num_laser_readings), self.min_laser_value)
# We only use two integers
self.observation_space = spaces.Box(low, high)
rospy.logdebug("ACTION SPACES TYPE===>"+str(self.action_space))
rospy.logdebug("OBSERVATION SPACES TYPE===>"+str(self.observation_space))
# Rewards
self.forwards_reward = rospy.get_param("/turtlebot2/forwards_reward")
self.turn_reward = rospy.get_param("/turtlebot2/turn_reward")
self.end_episode_points = rospy.get_param("/turtlebot2/end_episode_points")
self.cumulated_steps = 0.0
# Here we will add any init functions prior to starting the MyRobotEnv
super(TurtleBot2MazeEnv, self).__init__()
[docs] def _set_init_pose(self):
"""Sets the Robot in its init pose
"""
self.move_base( self.init_linear_forward_speed,
self.init_linear_turn_speed,
epsilon=0.05,
update_rate=10)
return True
[docs] def _init_env_variables(self):
"""
Inits variables needed to be initialised each time we reset at the start
of an episode.
:return:
"""
# For Info Purposes
self.cumulated_reward = 0.0
# Set to false Done, because its calculated asyncronously
self._episode_done = False
[docs] def _set_action(self, action):
"""
This set action will Set the linear and angular speed of the turtlebot2
based on the action number given.
:param action: The action integer that set s what movement to do next.
"""
rospy.logdebug("Start Set Action ==>"+str(action))
# We convert the actions to speed movements to send to the parent class CubeSingleDiskEnv
if action == 0: #FORWARD
linear_speed = self.linear_forward_speed
angular_speed = 0.0
self.last_action = "FORWARDS"
elif action == 1: #LEFT
linear_speed = self.linear_turn_speed
angular_speed = self.angular_speed
self.last_action = "TURN_LEFT"
elif action == 2: #RIGHT
linear_speed = self.linear_turn_speed
angular_speed = -1*self.angular_speed
self.last_action = "TURN_RIGHT"
# We tell TurtleBot2 the linear and angular speed to set to execute
self.move_base(linear_speed, angular_speed, epsilon=0.05, update_rate=10)
rospy.logdebug("END Set Action ==>"+str(action))
[docs] def _get_obs(self):
"""
Here we define what sensor data defines our robots observations
To know which Variables we have acces to, we need to read the
TurtleBot2Env API DOCS
:return:
"""
rospy.logdebug("Start Get Observation ==>")
# We get the laser scan data
laser_scan = self.get_laser_scan()
discretized_observations = self.discretize_observation( laser_scan,
self.new_ranges
)
rospy.logdebug("Observations==>"+str(discretized_observations))
rospy.logdebug("END Get Observation ==>")
return discretized_observations
[docs] def _is_done(self, observations):
if self._episode_done:
rospy.logerr("TurtleBot2 is Too Close to wall==>")
else:
rospy.logerr("TurtleBot2 is Ok ==>")
return self._episode_done
[docs] def _compute_reward(self, observations, done):
if not done:
if self.last_action == "FORWARDS":
reward = self.forwards_reward
else:
reward = self.turn_reward
else:
reward = -1*self.end_episode_points
rospy.logdebug("reward=" + str(reward))
self.cumulated_reward += reward
rospy.logdebug("Cumulated_reward=" + str(self.cumulated_reward))
self.cumulated_steps += 1
rospy.logdebug("Cumulated_steps=" + str(self.cumulated_steps))
return reward
# Internal TaskEnv Methods
[docs] def discretize_observation(self,data,new_ranges):
"""
Discards all the laser readings that are not multiple in index of new_ranges
value.
"""
self._episode_done = False
discretized_ranges = []
mod = len(data.ranges)/new_ranges
rospy.logdebug("data=" + str(data))
rospy.logwarn("new_ranges=" + str(new_ranges))
rospy.logwarn("mod=" + str(mod))
for i, item in enumerate(data.ranges):
if (i%mod==0):
if item == float ('Inf') or numpy.isinf(item):
discretized_ranges.append(self.max_laser_value)
elif numpy.isnan(item):
discretized_ranges.append(self.min_laser_value)
else:
discretized_ranges.append(int(item))
if (self.min_range > item > 0):
rospy.logerr("done Validation >>> item=" + str(item)+"< "+str(self.min_range))
self._episode_done = True
else:
rospy.logwarn("NOT done Validation >>> item=" + str(item)+"< "+str(self.min_range))
return discretized_ranges