Source code for openai_ros.task_envs.shadow_tc.learn_to_pick_ball

import rospy
import numpy
from gym import spaces
from openai_ros.robot_envs import shadow_tc_env
from gym.envs.registration import register
from geometry_msgs.msg import Point
from geometry_msgs.msg import Vector3
from tf.transformations import euler_from_quaternion

timestep_limit_per_episode = 10000 # Can be any Value

register(
        id='ShadowTcGetBall-v0',
        entry_point='openai_ros:task_envs.shadow_tc.learn_to_pick_ball.ShadowTcGetBallEnv',
        timestep_limit=timestep_limit_per_episode,
    )

[docs]class ShadowTcGetBallEnv(shadow_tc_env.ShadowTcEnv):
[docs] def __init__(self): """ Make ShadowTc learn how pick up a ball """ # We execute this one before because there are some functions that this # TaskEnv uses that use variables from the parent class, like the effort limit fetch. super(ShadowTcGetBallEnv, self).__init__() # Here we will add any init functions prior to starting the MyRobotEnv # Only variable needed to be set here rospy.logdebug("Start ShadowTcGetBallEnv INIT...") number_actions = rospy.get_param('/shadow_tc/n_actions') self.action_space = spaces.Discrete(number_actions) # We set the reward range, which is not compulsory but here we do it. self.reward_range = (-numpy.inf, numpy.inf) self.movement_delta =rospy.get_param("/shadow_tc/movement_delta") self.work_space_x_max = rospy.get_param("/shadow_tc/work_space/x_max") self.work_space_x_min = rospy.get_param("/shadow_tc/work_space/x_min") self.work_space_y_max = rospy.get_param("/shadow_tc/work_space/y_max") self.work_space_y_min = rospy.get_param("/shadow_tc/work_space/y_min") self.work_space_z_max = rospy.get_param("/shadow_tc/work_space/z_max") self.work_space_z_min = rospy.get_param("/shadow_tc/work_space/z_min") self.dec_obs = rospy.get_param("/shadow_tc/number_decimals_precision_obs") self.acceptable_distance_to_ball = rospy.get_param("/shadow_tc/acceptable_distance_to_ball") # We place the Maximum and minimum values of observations # TODO: Fill when get_observations is done. high = numpy.array([self.work_space_x_max, self.work_space_y_max, self.work_space_z_max, 1,1,1]) low = numpy.array([ self.work_space_x_min, self.work_space_y_min, self.work_space_z_min, 0,0,0]) self.observation_space = spaces.Box(low, high) rospy.logdebug("ACTION SPACES TYPE===>"+str(self.action_space)) rospy.logdebug("OBSERVATION SPACES TYPE===>"+str(self.observation_space)) # Rewards self.done_reward =rospy.get_param("/shadow_tc/done_reward") self.closer_to_block_reward = rospy.get_param("/shadow_tc/closer_to_block_reward") self.cumulated_steps = 0.0 rospy.logdebug("END shadow_tcGetBallEnv INIT...")
[docs] def _set_init_pose(self): """ Sets the UR5 arm to the initial position and the objects to the original position. """ rospy.logdebug("START _set_init_pose...") # We set the angles to zero of the limb self.reset_scene() rospy.logdebug("END _set_init_pose...") return True
[docs] def _init_env_variables(self): """ Inits variables needed to be initialised each time we reset at the start of an episode. :return: """ rospy.logdebug("START TaskEnv _init_env_variables") # For Info Purposes self.cumulated_reward = 0.0 self.ball_pose = self.get_ball_pose() tcp_pose = self.get_tip_pose() rospy.logdebug("TCP POSE ===>"+str(tcp_pose)) self.previous_distance_from_ball = self.get_distance_from_point(self.ball_pose.position, tcp_pose.position) rospy.logdebug("END TaskEnv _init_env_variables")
[docs] def _set_action(self, action): """ It sets the joints of shadow_tc based on the action integer given based on the action number given. :param action: The action integer that sets what movement to do next. """ rospy.logdebug("Start Set Action ==>"+str(action)) increment_vector = Vector3() action_id="move" if action == 0: # Increase X increment_vector.x = self.movement_delta elif action == 1: # Decrease X increment_vector.x = -1*self.movement_delta elif action == 2: # Increase Y increment_vector.y = self.movement_delta elif action == 3: # Decrease Y increment_vector.y = -1*self.movement_delta elif action == 4: # Increase Z increment_vector.z = self.movement_delta elif action == 5: # Decrease Z increment_vector.z = -1*self.movement_delta elif action == 6: # Open Claw action_id = "open" elif action == 7: # Close Claw action_id = "close" rospy.logdebug("Action_id="+str(action_id)+",IncrementVector===>"+str(increment_vector)) if action_id == "move": # We tell shadow_tc the action to perform # We dont change the RPY, therefore it will always be zero self.move_tip( x=increment_vector.x, y=increment_vector.y, z=increment_vector.z) elif action_id == "open": self.open_hand() elif action_id == "close": self.close_hand() rospy.logdebug("END Set Action ==>"+str(action)+",action_id="+str(action_id)+",IncrementVector===>"+str(increment_vector))
[docs] def _get_obs(self): """ Here we define what sensor data defines our robots observations To know which Variables we have access to, we need to read the shadow_tcEnv API DOCS. :return: observation """ rospy.logdebug("Start Get Observation ==>") tcp_pose = self.get_tip_pose() # We dont add it to the observations because is not part of the robot self.ball_pose = self.get_ball_pose() # We activate the Finguer collision detection self.finger_collided_dict = self.get_fingers_colision(object_collision_name="cricket_ball__link") f1_collided = self.finger_collided_dict["f1"] f2_collided = self.finger_collided_dict["f2"] f3_collided = self.finger_collided_dict["f3"] observation = [ round(tcp_pose.position.x,self.dec_obs), round(tcp_pose.position.y,self.dec_obs), round(tcp_pose.position.z,self.dec_obs), int(f1_collided), int(f2_collided), int(f3_collided) ] rospy.logdebug("Observations ==>"+str(observation)) rospy.logdebug("END Get Observation ==>") return observation
[docs] def _is_done(self, observations): """ We consider the episode done if: 1) The shadow_tc TCP is outside the workspace. 2) The TCP to block distance is lower than a threshold ( it got to the place ) and the the collisions in the figuers are true. """ tcp_pos = Vector3() tcp_pos.x = observations[0] tcp_pos.y = observations[1] tcp_pos.z = observations[2] # We check if all three finguers have collided with the ball finguers_collided = observations[3] and observations[4] and observations[5] bool_is_inside_workspace = self.is_inside_workspace(tcp_pos) has_reached_the_ball = self.reached_ball( tcp_pos, self.ball_pose.position, self.acceptable_distance_to_ball, finguers_collided) done = has_reached_the_ball or not(bool_is_inside_workspace) rospy.logdebug("#### IS DONE ? ####") rospy.logdebug("Not bool_is_inside_workspace ?="+str(not(bool_is_inside_workspace))) rospy.logdebug("has_reached_the_ball ?="+str(has_reached_the_ball)) rospy.logdebug("done ?="+str(done)) rospy.logdebug("#### #### ####") return done
[docs] def _compute_reward(self, observations, done): """ We Base the rewards in if its done or not and we base it on if the distance to the ball has increased or not. :return: """ tcp_pos = Vector3() tcp_pos.x = observations[0] tcp_pos.y = observations[1] tcp_pos.z = observations[2] # We check if all three finguers have collided with the ball finguers_collided = observations[3] and observations[4] and observations[5] distance_from_ball = self.get_distance_from_point(self.ball_pose.position, tcp_pos) distance_difference = distance_from_ball - self.previous_distance_from_ball if not done: # If there has been a decrease in the distance to the desired point, we reward it if distance_difference < 0.0: rospy.logerr("NOT ERROR: DECREASE IN DISTANCE GOOD") reward = self.closer_to_block_reward else: rospy.logerr("NOT ERROR: ENCREASE IN DISTANCE BAD") #reward = -1*self.closer_to_block_reward reward = 0.0 else: has_reached_the_ball = self.reached_ball( tcp_pos, self.ball_pose.position, self.acceptable_distance_to_ball, finguers_collided) if has_reached_the_ball: reward = self.done_reward else: reward = -1*self.done_reward self.previous_distance_from_ball = distance_from_ball rospy.logdebug("reward=" + str(reward)) self.cumulated_reward += reward rospy.logdebug("Cumulated_reward=" + str(self.cumulated_reward)) self.cumulated_steps += 1 rospy.logdebug("Cumulated_steps=" + str(self.cumulated_steps)) return reward
# Internal TaskEnv Methods
[docs] def reached_ball(self,tcp_position, ball_position, minimum_distance, finguers_collided): """ Return true if the distance from TCP position to the ball position is lower than the minimum_distance and all three finguers are touching the ball. """ distance_from_ball = self.get_distance_from_point(tcp_position, ball_position) distance_to_ball_ok = distance_from_ball < minimum_distance reached_ball_b = distance_to_ball_ok and finguers_collided rospy.logdebug("###### REACHED BLOCK ? ######") rospy.logdebug("distance_from_ball==>"+str(distance_from_ball)) rospy.logdebug("distance_to_ball_ok==>"+str(distance_to_ball_ok)) rospy.logdebug("reached_ball_b==>"+str(reached_ball_b)) rospy.logdebug("finguers_collided==>"+str(finguers_collided)) rospy.logdebug("############") return reached_ball_b
[docs] def get_distance_from_point(self, pstart, p_end): """ Given a Vector3 Object, get distance from current position :param p_end: :return: """ a = numpy.array((pstart.x, pstart.y, pstart.z)) b = numpy.array((p_end.x, p_end.y, p_end.z)) distance = numpy.linalg.norm(a - b) return distance
[docs] def is_inside_workspace(self,current_position): """ Check if the shadow_tc is inside the Workspace defined """ is_inside = False rospy.logdebug("##### INSIDE WORK SPACE? #######") rospy.logdebug("XYZ current_position"+str(current_position)) rospy.logdebug("work_space_x_max"+str(self.work_space_x_max)+",work_space_x_min="+str(self.work_space_x_min)) rospy.logdebug("work_space_y_max"+str(self.work_space_y_max)+",work_space_y_min="+str(self.work_space_y_min)) rospy.logdebug("work_space_z_max"+str(self.work_space_z_max)+",work_space_z_min="+str(self.work_space_z_min)) rospy.logdebug("############") if current_position.x > self.work_space_x_min and current_position.x <= self.work_space_x_max: if current_position.y > self.work_space_y_min and current_position.y <= self.work_space_y_max: if current_position.z > self.work_space_z_min and current_position.z <= self.work_space_z_max: is_inside = True return is_inside