Source code for openai_ros.robot_gazebo_env

import rospy
import gym
from gym.utils import seeding
from .gazebo_connection import GazeboConnection
from .controllers_connection import ControllersConnection
from openai_ros.msg import RLExperimentInfo

[docs]class RobotGazeboEnv(gym.Env):
[docs] def __init__(self, robot_name_space, controllers_list, reset_controls, start_init_physics_parameters=True, reset_world_or_sim="SIMULATION"): # To reset Simulations rospy.logdebug("START init RobotGazeboEnv") self.gazebo = GazeboConnection(start_init_physics_parameters,reset_world_or_sim) self.controllers_object = ControllersConnection(namespace=robot_name_space, controllers_list=controllers_list) self.reset_controls = reset_controls self.seed() # Set up ROS related variables self.episode_num = 0 self.cumulated_episode_reward = 0 self.reward_pub = rospy.Publisher('/openai/reward', RLExperimentInfo, queue_size=1) rospy.logdebug("END init RobotGazeboEnv")
# Env methods
[docs] def seed(self, seed=None): self.np_random, seed = seeding.np_random(seed) return [seed]
[docs] def step(self, action): """ Function executed each time step. Here we get the action execute it in a time step and retrieve the observations generated by that action. :param action: :return: obs, reward, done, info """ """ Here we should convert the action num to movement action, execute the action in the simulation and get the observations result of performing that action. """ self.gazebo.unpauseSim() self._set_action(action) self.gazebo.pauseSim() obs = self._get_obs() done = self._is_done(obs) info = {} reward = self._compute_reward(obs, done) self.cumulated_episode_reward += reward return obs, reward, done, info
[docs] def reset(self): rospy.logdebug("Reseting RobotGazeboEnvironment") self._reset_sim() self._init_env_variables() self._update_episode() obs = self._get_obs() rospy.logdebug("END Reseting RobotGazeboEnvironment") return obs
[docs] def close(self): """ Function executed when closing the environment. Use it for closing GUIS and other systems that need closing. :return: """ rospy.logdebug("Closing RobotGazeboEnvironment") rospy.signal_shutdown("Closing RobotGazeboEnvironment")
[docs] def _update_episode(self): """ Publishes the cumulated reward of the episode and increases the episode number by one. :return: """ self._publish_reward_topic( self.cumulated_episode_reward, self.episode_num ) self.episode_num += 1 self.cumulated_episode_reward = 0
[docs] def _publish_reward_topic(self, reward, episode_number=1): """ This function publishes the given reward in the reward topic for easy access from ROS infrastructure. :param reward: :param episode_number: :return: """ reward_msg = RLExperimentInfo() reward_msg.episode_number = episode_number reward_msg.episode_reward = reward self.reward_pub.publish(reward_msg)
# Extension methods # ----------------------------
[docs] def _reset_sim(self): """Resets a simulation """ rospy.logdebug("START robot gazebo _reset_sim") if self.reset_controls : self.gazebo.unpauseSim() self.controllers_object.reset_controllers() self._check_all_systems_ready() self._set_init_pose() self.gazebo.pauseSim() self.gazebo.resetSim() self.gazebo.unpauseSim() self.controllers_object.reset_controllers() self._check_all_systems_ready() self.gazebo.pauseSim() else: self.gazebo.unpauseSim() self._check_all_systems_ready() self._set_init_pose() self.gazebo.pauseSim() self.gazebo.resetSim() self.gazebo.unpauseSim() self._check_all_systems_ready() self.gazebo.pauseSim() rospy.logdebug("END robot gazebo _reset_sim") return True
[docs] def _set_init_pose(self): """Sets the Robot in its init pose """ raise NotImplementedError()
[docs] def _check_all_systems_ready(self): """ Checks that all the sensors, publishers and other simulation systems are operational. """ raise NotImplementedError()
[docs] def _get_obs(self): """Returns the observation. """ raise NotImplementedError()
[docs] def _init_env_variables(self): """Inits variables needed to be initialised each time we reset at the start of an episode. """ raise NotImplementedError()
[docs] def _set_action(self, action): """Applies the given action to the simulation. """ raise NotImplementedError()
[docs] def _is_done(self, observations): """Indicates whether or not the episode is done ( the robot has fallen for example). """ raise NotImplementedError()
[docs] def _compute_reward(self, observations, done): """Calculates the reward to give based on the observations given. """ raise NotImplementedError()
[docs] def _env_setup(self, initial_qpos): """Initial configuration of the environment. Can be used to configure initial state and extract information from the simulation. """ raise NotImplementedError()