Source code for openai_ros.robot_envs.cube_rl_utils
#!/usr/bin/env python
import time
import rospy
import math
import copy
import numpy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Point
from tf.transformations import euler_from_quaternion
[docs]class CubeRLUtils(object):
[docs] def __init__(self):
self.check_all_sensors_ready()
rospy.Subscriber("/moving_cube/joint_states", JointState, self.joints_callback)
rospy.Subscriber("/moving_cube/odom", Odometry, self.odom_callback)
self._roll_vel_pub = rospy.Publisher('/moving_cube/inertia_wheel_roll_joint_velocity_controller/command',
Float64, queue_size=1)
self.check_publishers_connection()
[docs] def check_all_sensors_ready(self):
self.disk_joints_data = None
while self.disk_joints_data is None and not rospy.is_shutdown():
try:
self.disk_joints_data = rospy.wait_for_message("/moving_cube/joint_states", JointState, timeout=1.0)
rospy.loginfo("Current moving_cube/joint_states READY=>" + str(self.disk_joints_data))
except:
rospy.logerr("Current moving_cube/joint_states not ready yet, retrying for getting joint_states")
self.cube_odom_data = None
while self.disk_joints_data is None and not rospy.is_shutdown():
try:
self.cube_odom_data = rospy.wait_for_message("/moving_cube/odom", Odometry, timeout=1.0)
rospy.loginfo("Current /moving_cube/odom READY=>" + str(self.cube_odom_data))
except:
rospy.logerr("Current /moving_cube/odom not ready yet, retrying for getting odom")
rospy.loginfo("ALL SENSORS READY")
[docs] def check_publishers_connection(self):
"""
Checks that all the publishers are working
:return:
"""
rate = rospy.Rate(10) # 10hz
while (self._roll_vel_pub.get_num_connections() == 0 and not rospy.is_shutdown()):
rospy.loginfo("No susbribers to _roll_vel_pub yet so we wait and try again")
try:
rate.sleep()
except rospy.ROSInterruptException:
# This is to avoid error when world is rested, time when backwards.
pass
rospy.loginfo("_base_pub Publisher Connected")
rospy.loginfo("All Publishers READY")
[docs] def joints_callback(self, data):
self.joints = data
[docs] def odom_callback(self, data):
self.odom = data
# Reinforcement Learning Utility Code
[docs] def move_joints(self, roll_speed):
joint_speed_value = Float64()
joint_speed_value.data = roll_speed
rospy.loginfo("Single Disk Roll Velocity>>" + str(joint_speed_value))
self._roll_vel_pub.publish(joint_speed_value)
[docs] def get_cube_state(self):
# We convert from quaternions to euler
orientation_list = [self.odom.pose.pose.orientation.x,
self.odom.pose.pose.orientation.y,
self.odom.pose.pose.orientation.z,
self.odom.pose.pose.orientation.w]
roll, pitch, yaw = euler_from_quaternion(orientation_list)
# We get the distance from the origin
start_position = Point()
start_position.x = 0.0
start_position.y = 0.0
start_position.z = 0.0
distance = self.get_distance_from_point(start_position,
self.odom.pose.pose.position)
cube_state = [
round(self.joints.velocity[0], 1),
round(distance, 1),
round(roll, 1),
round(pitch, 1),
round(yaw, 1)
]
return cube_state
[docs] def observation_checks(self, cube_state):
# MAximum distance to travel permited in meters from origin
max_distance = 2.0
if (cube_state[1] > max_distance):
rospy.logerr("Cube Too Far==>" + str(cube_state[1]))
done = True
else:
rospy.loginfo("Cube NOT Too Far==>" + str(cube_state[1]))
done = False
return done
[docs] def get_distance_from_point(self, pstart, p_end):
"""
Given a Vector3 Object, get distance from current position
:param p_end:
:return:
"""
a = numpy.array((pstart.x, pstart.y, pstart.z))
b = numpy.array((p_end.x, p_end.y, p_end.z))
distance = numpy.linalg.norm(a - b)
return distance
[docs] def get_reward_for_observations(self, state):
# We reward it for lower speeds and distance traveled
speed = state[0]
distance = state[1]
# Positive Reinforcement
reward_distance = distance * 10.0
# Negative Reinforcement for magnitude of speed
reward_for_efective_movement = -1 * abs(speed)
reward = reward_distance + reward_for_efective_movement
rospy.loginfo("Reward_distance=" + str(reward_distance))
rospy.loginfo("Reward_for_efective_movement= " + str(reward_for_efective_movement))
return reward
[docs]def cube_rl_systems_test():
rospy.init_node('cube_rl_systems_test_node', anonymous=True, log_level=rospy.INFO)
cube_rl_utils_object = CubeRLUtils()
rospy.loginfo("Moving to Speed==>80")
cube_rl_utils_object.move_joints(roll_speed=80.0)
time.sleep(2)
rospy.loginfo("Moving to Speed==>-80")
cube_rl_utils_object.move_joints(roll_speed=-80.0)
time.sleep(2)
rospy.loginfo("Moving to Speed==>0.0")
cube_rl_utils_object.move_joints(roll_speed=0.0)
time.sleep(2)
cube_state = cube_rl_utils_object.get_cube_state()
done = cube_rl_utils_object.observation_checks(cube_state)
reward = cube_rl_utils_object.get_reward_for_observations(cube_state)
rospy.loginfo("Done==>" + str(done))
rospy.loginfo("Reward==>" + str(reward))
if __name__ == "__main__":
cube_rl_systems_test()