Source code for openai_ros.robot_envs.cartpole_env

#!/usr/bin/env python

import gym
import rospy
import roslaunch
import time
import numpy as np
from gym import utils, spaces
from geometry_msgs.msg import Twist
from std_srvs.srv import Empty
from gym.utils import seeding
from gym.envs.registration import register
import copy
import math
import os

from sensor_msgs.msg import JointState
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from std_msgs.msg import Float64
from gazebo_msgs.srv import SetLinkState
from gazebo_msgs.msg import LinkState
from rosgraph_msgs.msg import Clock
from openai_ros import robot_gazebo_env

[docs]class CartPoleEnv(robot_gazebo_env.RobotGazeboEnv):
[docs] def __init__( self, control_type ): self.publishers_array = [] self._base_pub = rospy.Publisher('/cartpole_v0/foot_joint_velocity_controller/command', Float64, queue_size=1) self._pole_pub = rospy.Publisher('/cartpole_v0/pole_joint_velocity_controller/command', Float64, queue_size=1) self.publishers_array.append(self._base_pub) self.publishers_array.append(self._pole_pub) rospy.Subscriber("/cartpole_v0/joint_states", JointState, self.joints_callback) self.control_type = control_type if self.control_type == "velocity": self.controllers_list = ['joint_state_controller', 'pole_joint_velocity_controller', 'foot_joint_velocity_controller', ] elif self.control_type == "position": self.controllers_list = ['joint_state_controller', 'pole_joint_position_controller', 'foot_joint_position_controller', ] elif self.control_type == "effort": self.controllers_list = ['joint_state_controller', 'pole_joint_effort_controller', 'foot_joint_effort_controller', ] self.robot_name_space = "cartpole_v0" self.reset_controls = True # Seed the environment self._seed() self.steps_beyond_done = None super(CartPoleEnv, self).__init__( controllers_list=self.controllers_list, robot_name_space=self.robot_name_space, reset_controls=self.reset_controls )
[docs] def joints_callback(self, data): self.joints = data
[docs] def _seed(self, seed=None): self.np_random, seed = seeding.np_random(seed) return [seed]
# RobotEnv methods # ----------------------------
[docs] def _env_setup(self, initial_qpos): self.init_internal_vars(self.init_pos) self.set_init_pose() self.check_all_systems_ready()
[docs] def init_internal_vars(self, init_pos_value): self.pos = [init_pos_value] self.joints = None
[docs] def check_publishers_connection(self): """ Checks that all the publishers are working :return: """ rate = rospy.Rate(10) # 10hz while (self._base_pub.get_num_connections() == 0 and not rospy.is_shutdown()): rospy.logdebug("No susbribers to _base_pub yet so we wait and try again") try: rate.sleep() except rospy.ROSInterruptException: # This is to avoid error when world is rested, time when backwards. pass rospy.logdebug("_base_pub Publisher Connected") while (self._pole_pub.get_num_connections() == 0 and not rospy.is_shutdown()): rospy.logdebug("No susbribers to _pole_pub yet so we wait and try again") try: rate.sleep() except rospy.ROSInterruptException: # This is to avoid error when world is rested, time when backwards. pass rospy.logdebug("_pole_pub Publisher Connected") rospy.logdebug("All Publishers READY")
[docs] def _check_all_systems_ready(self, init=True): self.base_position = None while self.base_position is None and not rospy.is_shutdown(): try: self.base_position = rospy.wait_for_message("/cartpole_v0/joint_states", JointState, timeout=1.0) rospy.logdebug("Current cartpole_v0/joint_states READY=>"+str(self.base_position)) if init: # We Check all the sensors are in their initial values positions_ok = all(abs(i) <= 1.0e-02 for i in self.base_position.position) velocity_ok = all(abs(i) <= 1.0e-02 for i in self.base_position.velocity) efforts_ok = all(abs(i) <= 1.0e-01 for i in self.base_position.effort) base_data_ok = positions_ok and velocity_ok and efforts_ok rospy.logdebug("Checking Init Values Ok=>" + str(base_data_ok)) except: rospy.logerr("Current cartpole_v0/joint_states not ready yet, retrying for getting joint_states") rospy.logdebug("ALL SYSTEMS READY")
[docs] def move_joints(self, joints_array): joint_value = Float64() joint_value.data = joints_array[0] rospy.logdebug("Single Base JointsPos>>"+str(joint_value)) self._base_pub.publish(joint_value)
[docs] def get_clock_time(self): self.clock_time = None while self.clock_time is None and not rospy.is_shutdown(): try: self.clock_time = rospy.wait_for_message("/clock", Clock, timeout=1.0) rospy.logdebug("Current clock_time READY=>" + str(self.clock_time)) except: rospy.logdebug("Current clock_time not ready yet, retrying for getting Current clock_time") return self.clock_time