import rospy
import numpy
from gym import spaces
from openai_ros.robot_envs import hopper_env
from gym.envs.registration import register
from geometry_msgs.msg import Point
from geometry_msgs.msg import Vector3
from tf.transformations import euler_from_quaternion
timestep_limit_per_episode = 10000 # Can be any Value
register(
id='HopperStayUp-v0',
entry_point='openai_ros:task_envs.hopper.hopper_stay_up.HopperStayUpEnv',
timestep_limit=timestep_limit_per_episode,
)
[docs]class HopperStayUpEnv(hopper_env.HopperEnv):
[docs] def __init__(self):
"""
Make Hopper Learn how to Stay Up indefenitly
"""
# Only variable needed to be set here
"""
For this version, we consider 6 actions
1-2) Increment/Decrement haa_joint
3-4) Increment/Decrement hfe_joint
5-6) Increment/Decrement kfe_joint
"""
rospy.logdebug("Start HopperStayUpEnv INIT...")
number_actions = rospy.get_param('/monoped/n_actions')
self.action_space = spaces.Discrete(number_actions)
# We set the reward range, which is not compulsory but here we do it.
self.reward_range = (-numpy.inf, numpy.inf)
# Actions and Observations
self.init_joint_states = Vector3()
self.init_joint_states.x = rospy.get_param('/monoped/init_joint_states/haa_joint')
self.init_joint_states.y = rospy.get_param('/monoped/init_joint_states/hfe_joint')
self.init_joint_states.z = rospy.get_param('/monoped/init_joint_states/kfe_joint')
# Get Desired Point to Get
self.desired_point = Point()
self.desired_point.x = rospy.get_param("/monoped/desired_point/x")
self.desired_point.y = rospy.get_param("/monoped/desired_point/y")
self.desired_point.z = rospy.get_param("/monoped/desired_point/z")
self.accepted_error_in_des_pos = rospy.get_param("/monoped/accepted_error_in_des_pos")
self.desired_yaw = rospy.get_param("/monoped/desired_yaw")
self.joint_increment_value = rospy.get_param("/monoped/joint_increment_value")
self.accepted_joint_error = rospy.get_param("/monoped/accepted_joint_error")
self.update_rate = rospy.get_param("/monoped/update_rate")
self.dec_obs = rospy.get_param("/monoped/number_decimals_precision_obs")
self.desired_force = rospy.get_param("/monoped/desired_force")
self.max_x_pos = rospy.get_param("/monoped/max_x_pos")
self.max_y_pos = rospy.get_param("/monoped/max_y_pos")
self.min_height = rospy.get_param("/monoped/min_height")
self.max_height = rospy.get_param("/monoped/max_height")
self.distance_from_desired_point_max = rospy.get_param("/monoped/distance_from_desired_point_max")
self.max_incl_roll = rospy.get_param("/monoped/max_incl")
self.max_incl_pitch = rospy.get_param("/monoped/max_incl")
self.max_contact_force = rospy.get_param("/monoped/max_contact_force")
self.maximum_haa_joint = rospy.get_param("/monoped/max_incl")
self.maximum_hfe_joint = rospy.get_param("/monoped/max_incl")
self.maximum_kfe_joint = rospy.get_param("/monoped/max_incl")
self.min_kfe_joint = rospy.get_param("/monoped/max_incl")
# We place the Maximum and minimum values of observations
high = numpy.array([self.distance_from_desired_point_max,
self.max_incl_roll,
self.max_incl_pitch,
3.14,
self.max_contact_force,
self.maximum_haa_joint,
self.maximum_hfe_joint,
self.maximum_kfe_joint,
self.max_x_pos,
self.max_y_pos,
self.max_height
])
low = numpy.array([ 0.0,
-1*self.max_incl_roll,
-1*self.max_incl_pitch,
-1*3.14,
0.0,
self.maximum_haa_joint,
self.maximum_hfe_joint,
self.min_kfe_joint,
-1*self.max_x_pos,
-1*self.max_y_pos,
self.min_height
])
self.observation_space = spaces.Box(low, high)
rospy.logdebug("ACTION SPACES TYPE===>"+str(self.action_space))
rospy.logdebug("OBSERVATION SPACES TYPE===>"+str(self.observation_space))
# Rewards
self.weight_joint_position = rospy.get_param("/monoped/rewards_weight/weight_joint_position")
self.weight_contact_force = rospy.get_param("/monoped/rewards_weight/weight_contact_force")
self.weight_orientation = rospy.get_param("/monoped/rewards_weight/weight_orientation")
self.weight_distance_from_des_point = rospy.get_param("/monoped/rewards_weight/weight_distance_from_des_point")
self.alive_reward =rospy.get_param("/monoped/alive_reward")
self.done_reward =rospy.get_param("/monoped/done_reward")
# Here we will add any init functions prior to starting the MyRobotEnv
super(HopperStayUpEnv, self).__init__()
rospy.logdebug("END HopperStayUpEnv INIT...")
[docs] def _set_init_pose(self):
"""
Sets the Robot in its init linear and angular speeds
and lands the robot. Its preparing it to be reseted in the world.
"""
joints_array = [self.init_joint_states.x,
self.init_joint_states.y,
self.init_joint_states.z]
self.move_joints( joints_array,
epsilon=self.accepted_joint_error,
update_rate=self.update_rate)
return True
[docs] def _init_env_variables(self):
"""
Inits variables needed to be initialised each time we reset at the start
of an episode.
:return:
"""
# For Info Purposes
self.cumulated_reward = 0.0
# We get the initial pose to mesure the distance from the desired point.
odom = self.get_odom()
self.previous_distance_from_des_point = self.get_distance_from_desired_point(odom.pose.pose.position)
[docs] def _set_action(self, action):
"""
It sets the joints of monoped based on the action integer given
based on the action number given.
:param action: The action integer that sets what movement to do next.
"""
rospy.logdebug("Start Set Action ==>"+str(action))
# We get current Joints values
joint_states = self.get_joint_states()
joint_states_position = joint_states.position
rospy.logdebug("get_action_to_position>>>"+str(joint_states_position))
action_position = [0.0,0.0,0.0]
if action == 0: #Increment haa_joint
action_position[0] = joint_states_position[0] + self.joint_increment_value
action_position[1] = joint_states_position[1]
action_position[2] = joint_states_position[2]
elif action == 1: #Decrement haa_joint
action_position[0] = joint_states_position[0] - self.joint_increment_value
action_position[1] = joint_states_position[1]
action_position[2] = joint_states_position[2]
elif action == 2: #Increment hfe_joint
action_position[0] = joint_states_position[0]
action_position[1] = joint_states_position[1] + self.joint_increment_value
action_position[2] = joint_states_position[2]
elif action == 3: #Decrement hfe_joint
action_position[0] = joint_states_position[0]
action_position[1] = joint_states_position[1] - self.joint_increment_value
action_position[2] = joint_states_position[2]
elif action == 4: #Increment kfe_joint
action_position[0] = joint_states_position[0]
action_position[1] = joint_states_position[1]
action_position[2] = joint_states_position[2] + self.joint_increment_value
elif action == 5: #Decrement kfe_joint
action_position[0] = joint_states_position[0]
action_position[1] = joint_states_position[1]
action_position[2] = joint_states_position[2] - self.joint_increment_value
# We tell monoped where to place its joints next
self.move_joints( action_position,
epsilon=self.accepted_joint_error,
update_rate=self.update_rate)
rospy.logdebug("END Set Action ==>"+str(action))
[docs] def _get_obs(self):
"""
Here we define what sensor data defines our robots observations
To know which Variables we have access to, we need to read the
HopperEnv API DOCS
Returns the state of the robot needed for OpenAI QLearn Algorithm
The state will be defined by an array of the:
1) distance from desired point in meters
2) The pitch orientation in radians
3) the Roll orientation in radians
4) the Yaw orientation in radians
5) Force in contact sensor in Newtons
6-7-8) State of the 3 joints in radians
9) Height of the Base
observation = [distance_from_desired_point,
base_roll,
base_pitch,
base_yaw,
force_magnitude,
joint_states_haa,
joint_states_hfe,
joint_states_kfe,
height_base]
:return: observation
"""
rospy.logdebug("Start Get Observation ==>")
distance_from_desired_point = self.get_distance_from_desired_point(self.desired_point)
base_orientation = self.get_base_rpy()
base_roll = base_orientation.x
base_pitch = base_orientation.y
base_yaw = base_orientation.z
force_magnitude = self.get_contact_force_magnitude()
joint_states = self.get_joint_states()
joint_states_haa = joint_states.position[0]
joint_states_hfe = joint_states.position[1]
joint_states_kfe = joint_states.position[2]
odom = self.get_odom()
base_position = odom.pose.pose.position
observation = []
observation.append(round(distance_from_desired_point,self.dec_obs))
observation.append(round(base_roll,self.dec_obs))
observation.append(round(base_pitch,self.dec_obs))
observation.append(round(base_yaw,self.dec_obs))
observation.append(round(force_magnitude,self.dec_obs))
observation.append(round(joint_states_haa,self.dec_obs))
observation.append(round(joint_states_hfe,self.dec_obs))
observation.append(round(joint_states_kfe,self.dec_obs))
observation.append(round(base_position.x,self.dec_obs))
observation.append(round(base_position.y,self.dec_obs))
observation.append(round(base_position.z,self.dec_obs)) # height
return observation
[docs] def _is_done(self, observations):
"""
We consider the episode done if:
1) The Monopeds height is lower than a threshhold
2) The Orientation is outside a threshold
"""
height_base = observations[10]
monoped_height_ok = self.monoped_height_ok(height_base)
monoped_orientation_ok = self.monoped_orientation_ok()
done = not(monoped_height_ok and monoped_orientation_ok)
return done
[docs] def _compute_reward(self, observations, done):
"""
We Base the rewards in if its done or not and we base it on
the joint poisition, effort, contact force, orientation and distance from desired point.
:return:
"""
joints_state_array = observations[5:8]
r1 = self.calculate_reward_joint_position(joints_state_array, self.weight_joint_position)
# Desired Force in Newtons, taken form idle contact with 9.81 gravity.
force_magnitude = observations[4]
r2 = self.calculate_reward_contact_force(force_magnitude, self.weight_contact_force)
rpy_array = observations[1:4]
r3 = self.calculate_reward_orientation(rpy_array, self.weight_orientation)
current_position = Point()
current_position.x = observations[8]
current_position.y = observations[9]
current_position.z = observations[10]
r4 = self.calculate_reward_distance_from_des_point(current_position, self.weight_distance_from_des_point)
# The sign depend on its function.
total_reward = self.alive_reward - r1 - r2 - r3 - r4
rospy.logdebug("###############")
rospy.logdebug("alive_bonus=" + str(self.alive_reward))
rospy.logdebug("r1 joint_position=" + str(r1))
rospy.logdebug("r2 contact_force=" + str(r2))
rospy.logdebug("r3 orientation=" + str(r3))
rospy.logdebug("r4 distance=" + str(r4))
rospy.logdebug("total_reward=" + str(total_reward))
rospy.logdebug("###############")
return total_reward
# Internal TaskEnv Methods
[docs] def is_in_desired_position(self,current_position, epsilon=0.05):
"""
It return True if the current position is similar to the desired poistion
"""
is_in_desired_pos = False
x_pos_plus = self.desired_point.x + epsilon
x_pos_minus = self.desired_point.x - epsilon
y_pos_plus = self.desired_point.y + epsilon
y_pos_minus = self.desired_point.y - epsilon
x_current = current_position.x
y_current = current_position.y
x_pos_are_close = (x_current <= x_pos_plus) and (x_current > x_pos_minus)
y_pos_are_close = (y_current <= y_pos_plus) and (y_current > y_pos_minus)
is_in_desired_pos = x_pos_are_close and y_pos_are_close
rospy.logdebug("###### IS DESIRED POS ? ######")
rospy.logdebug("current_position"+str(current_position))
rospy.logdebug("x_pos_plus"+str(x_pos_plus)+",x_pos_minus="+str(x_pos_minus))
rospy.logdebug("y_pos_plus"+str(y_pos_plus)+",y_pos_minus="+str(y_pos_minus))
rospy.logdebug("x_pos_are_close"+str(x_pos_are_close))
rospy.logdebug("y_pos_are_close"+str(y_pos_are_close))
rospy.logdebug("is_in_desired_pos"+str(is_in_desired_pos))
rospy.logdebug("############")
return is_in_desired_pos
[docs] def is_inside_workspace(self,current_position):
"""
Check if the monoped is inside the Workspace defined
"""
is_inside = False
rospy.logdebug("##### INSIDE WORK SPACE? #######")
rospy.logdebug("XYZ current_position"+str(current_position))
rospy.logdebug("work_space_x_max"+str(self.work_space_x_max)+",work_space_x_min="+str(self.work_space_x_min))
rospy.logdebug("work_space_y_max"+str(self.work_space_y_max)+",work_space_y_min="+str(self.work_space_y_min))
rospy.logdebug("work_space_z_max"+str(self.work_space_z_max)+",work_space_z_min="+str(self.work_space_z_min))
rospy.logdebug("############")
if current_position.x > self.work_space_x_min and current_position.x <= self.work_space_x_max:
if current_position.y > self.work_space_y_min and current_position.y <= self.work_space_y_max:
if current_position.z > self.work_space_z_min and current_position.z <= self.work_space_z_max:
is_inside = True
return is_inside
[docs] def sonar_detected_something_too_close(self, sonar_value):
"""
Detects if there is something too close to the monoped front
"""
rospy.logdebug("##### SONAR TOO CLOSE? #######")
rospy.logdebug("sonar_value"+str(sonar_value)+",min_sonar_value="+str(self.min_sonar_value))
rospy.logdebug("############")
too_close = sonar_value < self.min_sonar_value
return too_close
[docs] def monoped_has_flipped(self,current_orientation):
"""
Based on the orientation RPY given states if the monoped has flipped
"""
has_flipped = True
self.max_roll = rospy.get_param("/monoped/max_roll")
self.max_pitch = rospy.get_param("/monoped/max_pitch")
rospy.logdebug("#### HAS FLIPPED? ########")
rospy.logdebug("RPY current_orientation"+str(current_orientation))
rospy.logdebug("max_roll"+str(self.max_roll)+",min_roll="+str(-1*self.max_roll))
rospy.logdebug("max_pitch"+str(self.max_pitch)+",min_pitch="+str(-1*self.max_pitch))
rospy.logdebug("############")
if current_orientation.x > -1*self.max_roll and current_orientation.x <= self.max_roll:
if current_orientation.y > -1*self.max_pitch and current_orientation.y <= self.max_pitch:
has_flipped = False
return has_flipped
[docs] def get_distance_from_desired_point(self, current_position):
"""
Calculates the distance from the current position to the desired point
:param start_point:
:return:
"""
distance = self.get_distance_from_point(current_position,
self.desired_point)
return distance
[docs] def get_distance_from_point(self, pstart, p_end):
"""
Given a Vector3 Object, get distance from current position
:param p_end:
:return:
"""
a = numpy.array((pstart.x, pstart.y, pstart.z))
b = numpy.array((p_end.x, p_end.y, p_end.z))
distance = numpy.linalg.norm(a - b)
return distance
[docs] def get_orientation_euler(self, quaternion_vector):
# We convert from quaternions to euler
orientation_list = [quaternion_vector.x,
quaternion_vector.y,
quaternion_vector.z,
quaternion_vector.w]
roll, pitch, yaw = euler_from_quaternion(orientation_list)
return roll, pitch, yaw
[docs] def get_base_rpy(self):
imu = self.get_imu()
base_orientation = imu.orientation
euler_rpy = Vector3()
euler = euler_from_quaternion([ base_orientation.x,
base_orientation.y,
base_orientation.z,
base_orientation.w]
)
euler_rpy.x = euler[0]
euler_rpy.y = euler[1]
euler_rpy.z = euler[2]
return euler_rpy
[docs] def monoped_height_ok(self, height_base):
height_ok = self.min_height <= height_base < self.max_height
return height_ok
[docs] def monoped_orientation_ok(self):
orientation_rpy = self.get_base_rpy()
roll_ok = self.max_incl_roll > abs(orientation_rpy.x)
pitch_ok = self.max_incl_pitch > abs(orientation_rpy.y)
orientation_ok = roll_ok and pitch_ok
return orientation_ok
[docs] def calculate_reward_joint_position(self, joints_state_array, weight=1.0):
"""
We calculate reward base on the joints configuration. The more near 0 the better.
:return:
"""
acumulated_joint_pos = 0.0
for joint_pos in joints_state_array:
# Abs to remove sign influence, it doesnt matter the direction of turn.
acumulated_joint_pos += abs(joint_pos)
rospy.logdebug("calculate_reward_joint_position>>acumulated_joint_pos=" + str(acumulated_joint_pos))
reward = weight * acumulated_joint_pos
rospy.logdebug("calculate_reward_joint_position>>reward=" + str(reward))
return reward
[docs] def calculate_reward_orientation(self, rpy_array, weight=1.0):
"""
We calculate the reward based on the orientation.
The more its closser to 0 the better because it means its upright
desired_yaw is the yaw that we want it to be.
to praise it to have a certain orientation, here is where to set it.
:param: rpy_array: Its an array with Roll Pitch and Yaw in place 0, 1 and 2 respectively.
:return:
"""
yaw_displacement = rpy_array[2] - self.desired_yaw
acumulated_orientation_displacement = abs(rpy_array[0]) + abs(rpy_array[1]) + abs(yaw_displacement)
reward = weight * acumulated_orientation_displacement
rospy.logdebug("calculate_reward_orientation>>reward=" + str(reward))
return reward
[docs] def calculate_reward_distance_from_des_point(self, current_position, weight=1.0):
"""
We calculate the distance from the desired point.
The closser the better
:param weight:
:return:reward
"""
distance = self.get_distance_from_desired_point(current_position)
reward = weight * distance
rospy.logdebug("calculate_reward_orientation>>reward=" + str(reward))
return reward