Source code for openai_ros.gazebo_connection

#!/usr/bin/env python

import rospy
from std_srvs.srv import Empty
from gazebo_msgs.msg import ODEPhysics
from gazebo_msgs.srv import SetPhysicsProperties, SetPhysicsPropertiesRequest
from std_msgs.msg import Float64
from geometry_msgs.msg import Vector3

[docs]class GazeboConnection():
[docs] def __init__(self, start_init_physics_parameters, reset_world_or_sim): self.unpause = rospy.ServiceProxy('/gazebo/unpause_physics', Empty) self.pause = rospy.ServiceProxy('/gazebo/pause_physics', Empty) self.reset_simulation_proxy = rospy.ServiceProxy('/gazebo/reset_simulation', Empty) self.reset_world_proxy = rospy.ServiceProxy('/gazebo/reset_world', Empty) # Setup the Gravity Controle system service_name = '/gazebo/set_physics_properties' rospy.logdebug("Waiting for service " + str(service_name)) rospy.wait_for_service(service_name) rospy.logdebug("Service Found " + str(service_name)) self.set_physics = rospy.ServiceProxy(service_name, SetPhysicsProperties) self.start_init_physics_parameters = start_init_physics_parameters self.reset_world_or_sim = reset_world_or_sim self.init_values() # We always pause the simulation, important for legged robots learning self.pauseSim()
[docs] def pauseSim(self): rospy.wait_for_service('/gazebo/pause_physics') try: self.pause() except rospy.ServiceException as e: print ("/gazebo/pause_physics service call failed")
[docs] def unpauseSim(self): rospy.wait_for_service('/gazebo/unpause_physics') try: self.unpause() except rospy.ServiceException as e: print ("/gazebo/unpause_physics service call failed")
[docs] def resetSim(self): """ This was implemented because some simulations, when reseted the simulation the systems that work with TF break, and because sometime we wont be able to change them we need to reset world that ONLY resets the object position, not the entire simulation systems. """ if self.reset_world_or_sim == "SIMULATION": rospy.logerr("SIMULATION RESET") self.resetSimulation() elif self.reset_world_or_sim == "WORLD": rospy.logerr("WORLD RESET") self.resetWorld() elif self.reset_world_or_sim == "NO_RESET_SIM": rospy.logerr("NO RESET SIMULATION SELECTED") else: rospy.logerr("WRONG Reset Option:"+str(self.reset_world_or_sim))
[docs] def resetSimulation(self): rospy.wait_for_service('/gazebo/reset_simulation') try: self.reset_simulation_proxy() except rospy.ServiceException as e: print ("/gazebo/reset_simulation service call failed")
[docs] def resetWorld(self): rospy.wait_for_service('/gazebo/reset_world') try: self.reset_world_proxy() except rospy.ServiceException as e: print ("/gazebo/reset_world service call failed")
[docs] def init_values(self): self.resetSim() if self.start_init_physics_parameters: rospy.logwarn("Initialising Simulation Physics Parameters") self.init_physics_parameters() else: rospy.logerr("NOT Initialising Simulation Physics Parameters")
[docs] def init_physics_parameters(self): """ We initialise the physics parameters of the simulation, like gravity, friction coeficients and so on. """ self._time_step = Float64(0.001) self._max_update_rate = Float64(1000.0) self._gravity = Vector3() self._gravity.x = 0.0 self._gravity.y = 0.0 self._gravity.z = -9.81 self._ode_config = ODEPhysics() self._ode_config.auto_disable_bodies = False self._ode_config.sor_pgs_precon_iters = 0 self._ode_config.sor_pgs_iters = 50 self._ode_config.sor_pgs_w = 1.3 self._ode_config.sor_pgs_rms_error_tol = 0.0 self._ode_config.contact_surface_layer = 0.001 self._ode_config.contact_max_correcting_vel = 0.0 self._ode_config.cfm = 0.0 self._ode_config.erp = 0.2 self._ode_config.max_contacts = 20 self.update_gravity_call()
[docs] def update_gravity_call(self): self.pauseSim() set_physics_request = SetPhysicsPropertiesRequest() set_physics_request.time_step = self._time_step.data set_physics_request.max_update_rate = self._max_update_rate.data set_physics_request.gravity = self._gravity set_physics_request.ode_config = self._ode_config rospy.logdebug(str(set_physics_request.gravity)) result = self.set_physics(set_physics_request) rospy.logdebug("Gravity Update Result==" + str(result.success) + ",message==" + str(result.status_message)) self.unpauseSim()
[docs] def change_gravity(self, x, y, z): self._gravity.x = x self._gravity.y = y self._gravity.z = z self.update_gravity_call()