Source code for openai_ros.task_envs.cartpole_stay_up.stay_up

from gym import utils
from openai_ros.robot_envs import cartpole_env
from gym.envs.registration import register
from gym import error, spaces
import rospy
import math
import numpy as np

# The path is __init__.py of openai_ros, where we import the MovingCubeOneDiskWalkEnv directly
register(
        id='CartPoleStayUp-v0',
        entry_point='openai_ros:task_envs.cartpole_stay_up.stay_up.CartPoleStayUpEnv',
        timestep_limit=1000,
    )

[docs]class CartPoleStayUpEnv(cartpole_env.CartPoleEnv):
[docs] def __init__(self): self.get_params() self.action_space = spaces.Discrete(self.n_actions) high = np.array([ 2.5 * 2, np.finfo(np.float32).max, 0.7 * 2, np.finfo(np.float32).max]) self.observation_space = spaces.Box(-high, high) cartpole_env.CartPoleEnv.__init__( self, control_type=self.control_type )
[docs] def get_params(self): #get configuration parameters self.n_actions = rospy.get_param('/cartpole_v0/n_actions') self.min_pole_angle = rospy.get_param('/cartpole_v0/min_pole_angle') self.max_pole_angle = rospy.get_param('/cartpole_v0/max_pole_angle') self.max_base_velocity = rospy.get_param('/cartpole_v0/max_base_velocity') self.min_base_pose_x = rospy.get_param('/cartpole_v0/min_base_pose_x') self.max_base_pose_x = rospy.get_param('/cartpole_v0/max_base_pose_x') self.pos_step = rospy.get_param('/cartpole_v0/pos_step') self.running_step = rospy.get_param('/cartpole_v0/running_step') self.init_pos = rospy.get_param('/cartpole_v0/init_pos') self.wait_time = rospy.get_param('/cartpole_v0/wait_time') self.control_type = rospy.get_param('/cartpole_v0/control_type')
[docs] def _set_action(self, action): # Take action if action == 0: #LEFT rospy.loginfo("GO LEFT...") self.pos[0] -= self.pos_step elif action == 1: #RIGHT rospy.loginfo("GO RIGHT...") self.pos[0] += self.pos_step elif action == 2: #LEFT BIG rospy.loginfo("GO LEFT BIG...") self.pos[0] -= self.pos_step * 10 elif action == 3: #RIGHT BIG rospy.loginfo("GO RIGHT BIG...") self.pos[0] += self.pos_step * 10 # Apply action to simulation. rospy.loginfo("MOVING TO POS=="+str(self.pos)) # 1st: unpause simulation rospy.logdebug("Unpause SIM...") self.gazebo.unpauseSim() self.move_joints(self.pos) rospy.logdebug("Wait for some time to execute movement, time="+str(self.running_step)) rospy.sleep(self.running_step) #wait for some time rospy.logdebug("DONE Wait for some time to execute movement, time=" + str(self.running_step)) # 3rd: pause simulation rospy.logdebug("Pause SIM...") self.gazebo.pauseSim()
[docs] def _get_obs(self): data = self.joints # base_postion base_velocity pole angle pole velocity obs = [round(data.position[1],1), round(data.velocity[1],1), round(data.position[0],1), round(data.velocity[0],1)] return obs
[docs] def _is_done(self, observations): done = False data = self.joints rospy.loginfo("BASEPOSITION=="+str(observations[0])) rospy.loginfo("POLE ANGLE==" + str(observations[2])) if (self.min_base_pose_x >= observations[0] or observations[0] >= self.max_base_pose_x): #check if the base is still within the ranges of (-2, 2) rospy.logerr("Base Outside Limits==>min="+str(self.min_base_pose_x)+",pos="+str(observations[0])+",max="+str(self.max_base_pose_x)) done = True if (self.min_pole_angle >= observations[2] or observations[2] >= self.max_pole_angle): #check if pole has toppled over rospy.logerr( "Pole Angle Outside Limits==>min=" + str(self.min_pole_angle) + ",pos=" + str(observations[2]) + ",max=" + str( self.max_pole_angle)) done = True return done
[docs] def _compute_reward(self, observations, done): """ Gives more points for staying upright, gets data from given observations to avoid having different data than other previous functions :return:reward """ if not done: reward = 1.0 elif self.steps_beyond_done is None: # Pole just fell! self.steps_beyond_done = 0 reward = 1.0 else: if self.steps_beyond_done == 0: logger.warning("You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True' -- any further steps are undefined behavior.") self.steps_beyond_done += 1 reward = 0.0 return reward
[docs] def _init_env_variables(self): """ Inits variables needed to be initialised each time we reset at the start of an episode. :return: """ self.steps_beyond_done = None
[docs] def _set_init_pose(self): """ Sets joints to initial position [0,0,0] :return: """ self.check_publishers_connection() # Reset Internal pos variable self.init_internal_vars(self.init_pos) self.move_joints(self.pos)