Source code for openai_ros.task_envs.moving_cube.one_disk_walk

import rospy
import numpy
import math
from gym import spaces
from openai_ros.robot_envs import cube_single_disk_env
from gym.envs.registration import register
from geometry_msgs.msg import Point
from tf.transformations import euler_from_quaternion

# The path is __init__.py of openai_ros, where we import the MovingCubeOneDiskWalkEnv directly
timestep_limit_per_episode = 10000 # Can be any Value

register(
        id='MovingCubeOneDiskWalk-v0',
        entry_point='openai_ros:task_envs.moving_cube.one_disk_walk.MovingCubeOneDiskWalkEnv',
        timestep_limit=timestep_limit_per_episode,
    )

[docs]class MovingCubeOneDiskWalkEnv(cube_single_disk_env.CubeSingleDiskEnv):
[docs] def __init__(self): # Only variable needed to be set here number_actions = rospy.get_param('/moving_cube/n_actions') self.action_space = spaces.Discrete(number_actions) #number_observations = rospy.get_param('/moving_cube/n_observations') """ We set the Observation space for the 6 observations cube_observations = [ round(current_disk_roll_vel, 0), round(y_distance, 1), round(roll, 1), round(pitch, 1), round(y_linear_speed,1), round(yaw, 1), ] """ # Actions and Observations self.roll_speed_fixed_value = rospy.get_param('/moving_cube/roll_speed_fixed_value') self.roll_speed_increment_value = rospy.get_param('/moving_cube/roll_speed_increment_value') self.max_distance = rospy.get_param('/moving_cube/max_distance') max_roll = 2 * math.pi self.max_pitch_angle = rospy.get_param('/moving_cube/max_pitch_angle') self.max_y_linear_speed = rospy.get_param('/moving_cube/max_y_linear_speed') self.max_yaw_angle = rospy.get_param('/moving_cube/max_yaw_angle') high = numpy.array([ self.roll_speed_fixed_value, self.max_distance, max_roll, self.max_pitch_angle, self.max_y_linear_speed, self.max_y_linear_speed, ]) self.observation_space = spaces.Box(-high, high) rospy.logwarn("ACTION SPACES TYPE===>"+str(self.action_space)) rospy.logwarn("OBSERVATION SPACES TYPE===>"+str(self.observation_space)) # Variables that we retrieve through the param server, loded when launch training launch. self.init_roll_vel = rospy.get_param("/moving_cube/init_roll_vel") # Get Observations self.start_point = Point() self.start_point.x = rospy.get_param("/moving_cube/init_cube_pose/x") self.start_point.y = rospy.get_param("/moving_cube/init_cube_pose/y") self.start_point.z = rospy.get_param("/moving_cube/init_cube_pose/z") # Rewards self.move_distance_reward_weight = rospy.get_param("/moving_cube/move_distance_reward_weight") self.y_linear_speed_reward_weight = rospy.get_param("/moving_cube/y_linear_speed_reward_weight") self.y_axis_angle_reward_weight = rospy.get_param("/moving_cube/y_axis_angle_reward_weight") self.end_episode_points = rospy.get_param("/moving_cube/end_episode_points") self.cumulated_steps = 0.0 # Here we will add any init functions prior to starting the MyRobotEnv super(MovingCubeOneDiskWalkEnv, self).__init__()
[docs] def _set_init_pose(self): """Sets the Robot in its init pose """ self.move_joints(self.init_roll_vel) return True
[docs] def _init_env_variables(self): """ Inits variables needed to be initialised each time we reset at the start of an episode. :return: """ self.total_distance_moved = 0.0 self.current_y_distance = self.get_y_dir_distance_from_start_point(self.start_point) self.roll_turn_speed = rospy.get_param('/moving_cube/init_roll_vel') # For Info Purposes self.cumulated_reward = 0.0
#self.cumulated_steps = 0.0
[docs] def _set_action(self, action): # We convert the actions to speed movements to send to the parent class CubeSingleDiskEnv if action == 0:# Move Speed Wheel Forwards self.roll_turn_speed = self.roll_speed_fixed_value elif action == 1:# Move Speed Wheel Backwards self.roll_turn_speed = -1*self.roll_speed_fixed_value elif action == 2:# Stop Speed Wheel self.roll_turn_speed = 0.0 elif action == 3:# Increment Speed self.roll_turn_speed += self.roll_speed_increment_value elif action == 4:# Decrement Speed self.roll_turn_speed -= self.roll_speed_increment_value # We clamp Values to maximum rospy.logdebug("roll_turn_speed before clamp=="+str(self.roll_turn_speed)) self.roll_turn_speed = numpy.clip(self.roll_turn_speed, -1*self.roll_speed_fixed_value, self.roll_speed_fixed_value) rospy.logdebug("roll_turn_speed after clamp==" + str(self.roll_turn_speed)) # We tell the OneDiskCube to spin the RollDisk at the selected speed self.move_joints(self.roll_turn_speed)
[docs] def _get_obs(self): """ Here we define what sensor data defines our robots observations To know which Variables we have acces to, we need to read the MyCubeSingleDiskEnv API DOCS :return: """ # We get the orientation of the cube in RPY roll, pitch, yaw = self.get_orientation_euler() # We get the distance from the origin #distance = self.get_distance_from_start_point(self.start_point) y_distance = self.get_y_dir_distance_from_start_point(self.start_point) # We get the current speed of the Roll Disk current_disk_roll_vel = self.get_roll_velocity() # We get the linear speed in the y axis y_linear_speed = self.get_y_linear_speed() cube_observations = [ round(current_disk_roll_vel, 0), round(y_distance, 1), round(roll, 1), round(pitch, 1), round(y_linear_speed,1), round(yaw, 1) ] rospy.logdebug("Observations==>"+str(cube_observations)) return cube_observations
[docs] def _is_done(self, observations): pitch_angle = observations[3] yaw_angle = observations[5] if abs(pitch_angle) > self.max_pitch_angle: rospy.logerr("WRONG Cube Pitch Orientation==>" + str(pitch_angle)) done = True else: rospy.logdebug("Cube Pitch Orientation Ok==>" + str(pitch_angle)) if abs(yaw_angle) > self.max_yaw_angle: rospy.logerr("WRONG Cube Yaw Orientation==>" + str(yaw_angle)) done = True else: rospy.logdebug("Cube Yaw Orientation Ok==>" + str(yaw_angle)) done = False return done
[docs] def _compute_reward(self, observations, done): if not done: y_distance_now = observations[1] delta_distance = y_distance_now - self.current_y_distance rospy.logdebug("y_distance_now=" + str(y_distance_now)+", current_y_distance=" + str(self.current_y_distance)) rospy.logdebug("delta_distance=" + str(delta_distance)) reward_distance = delta_distance * self.move_distance_reward_weight self.current_y_distance = y_distance_now y_linear_speed = observations[4] rospy.logdebug("y_linear_speed=" + str(y_linear_speed)) reward_y_axis_speed = y_linear_speed * self.y_linear_speed_reward_weight # Negative Reward for yaw different from zero. yaw_angle = observations[5] rospy.logdebug("yaw_angle=" + str(yaw_angle)) # Worst yaw is 90 and 270 degrees, best 0 and 180. We use sin function for giving reward. sin_yaw_angle = math.sin(yaw_angle) rospy.logdebug("sin_yaw_angle=" + str(sin_yaw_angle)) reward_y_axis_angle = -1 * abs(sin_yaw_angle) * self.y_axis_angle_reward_weight # We are not intereseted in decimals of the reward, doesnt give any advatage. reward = round(reward_distance, 0) + round(reward_y_axis_speed, 0) + round(reward_y_axis_angle, 0) rospy.logdebug("reward_distance=" + str(reward_distance)) rospy.logdebug("reward_y_axis_speed=" + str(reward_y_axis_speed)) rospy.logdebug("reward_y_axis_angle=" + str(reward_y_axis_angle)) rospy.logdebug("reward=" + str(reward)) else: reward = -1*self.end_episode_points self.cumulated_reward += reward rospy.logdebug("Cumulated_reward=" + str(self.cumulated_reward)) self.cumulated_steps += 1 rospy.logdebug("Cumulated_steps=" + str(self.cumulated_steps)) return reward
# Internal TaskEnv Methods
[docs] def get_y_dir_distance_from_start_point(self, start_point): """ Calculates the distance from the given point and the current position given by odometry. In this case the increase or decrease in y. :param start_point: :return: """ y_dist_dir = self.odom.pose.pose.position.y - start_point.y return y_dist_dir
[docs] def get_distance_from_start_point(self, start_point): """ Calculates the distance from the given point and the current position given by odometry :param start_point: :return: """ distance = self.get_distance_from_point(start_point, self.odom.pose.pose.position) return distance
[docs] def get_distance_from_point(self, pstart, p_end): """ Given a Vector3 Object, get distance from current position :param p_end: :return: """ a = numpy.array((pstart.x, pstart.y, pstart.z)) b = numpy.array((p_end.x, p_end.y, p_end.z)) distance = numpy.linalg.norm(a - b) return distance
[docs] def get_orientation_euler(self): # We convert from quaternions to euler orientation_list = [self.odom.pose.pose.orientation.x, self.odom.pose.pose.orientation.y, self.odom.pose.pose.orientation.z, self.odom.pose.pose.orientation.w] roll, pitch, yaw = euler_from_quaternion(orientation_list) return roll, pitch, yaw
[docs] def get_roll_velocity(self): # We get the current joint roll velocity roll_vel = self.joints.velocity[0] return roll_vel
[docs] def get_y_linear_speed(self): # We get the current joint roll velocity y_linear_speed = self.odom.twist.twist.linear.y return y_linear_speed