# Source code for openai_ros.task_envs.parrotdrone.parrotdrone_goto

import rospy
import numpy
from gym import spaces
from openai_ros.robot_envs import parrotdrone_env
from gym.envs.registration import register
from geometry_msgs.msg import Point
from geometry_msgs.msg import Vector3
from tf.transformations import euler_from_quaternion

# Max steps per episode before gym forces a reset; any positive value works.
timestep_limit_per_episode = 10000

register(
        id='ParrotDroneGoto-v0',
        # entry_point must be '<importable.module>:<ClassName>' -- gym splits on
        # the colon, imports the left part and getattr()s the right part, so the
        # colon belongs right before the class name, not after the top package.
        entry_point='openai_ros.task_envs.parrotdrone.parrotdrone_goto:ParrotDroneGotoEnv',
        timestep_limit=timestep_limit_per_episode,
    )

class ParrotDroneGotoEnv(parrotdrone_env.ParrotDroneEnv):

    def __init__(self):
        """
        Task environment in which a Parrot drone learns to navigate to a
        goal point inside a cubic workspace.

        Every limit, speed and reward below is read from the ROS parameter
        server under the /drone namespace, so those parameters must be
        loaded (e.g. from the task yaml) before this class is instantiated.
        """
        # One discrete action per movement primitive (see _set_action).
        number_actions = rospy.get_param('/drone/n_actions')
        self.action_space = spaces.Discrete(number_actions)

        # Declaring the reward range is optional in gym; set for completeness.
        self.reward_range = (-numpy.inf, numpy.inf)

        # Speeds used when turning actions into velocity commands.
        self.linear_forward_speed = rospy.get_param('/drone/linear_forward_speed')
        self.angular_turn_speed = rospy.get_param('/drone/angular_turn_speed')
        self.angular_speed = rospy.get_param('/drone/angular_speed')

        # Linear/angular speeds commanded while resetting the drone pose.
        self.init_linear_speed_vector = Vector3()
        self.init_linear_speed_vector.x = rospy.get_param('/drone/init_linear_speed_vector/x')
        self.init_linear_speed_vector.y = rospy.get_param('/drone/init_linear_speed_vector/y')
        self.init_linear_speed_vector.z = rospy.get_param('/drone/init_linear_speed_vector/z')
        self.init_angular_turn_speed = rospy.get_param('/drone/init_angular_turn_speed')

        # Sonar range limits; readings below min mean an obstacle is too close.
        self.min_sonar_value = rospy.get_param('/drone/min_sonar_value')
        self.max_sonar_value = rospy.get_param('/drone/max_sonar_value')

        # Workspace cube the drone has to stay inside.
        self.work_space_x_max = rospy.get_param("/drone/work_space/x_max")
        self.work_space_x_min = rospy.get_param("/drone/work_space/x_min")
        self.work_space_y_max = rospy.get_param("/drone/work_space/y_max")
        self.work_space_y_min = rospy.get_param("/drone/work_space/y_min")
        self.work_space_z_max = rospy.get_param("/drone/work_space/z_max")
        self.work_space_z_min = rospy.get_param("/drone/work_space/z_min")

        # Maximum roll/pitch/yaw magnitudes before the drone counts as flipped.
        self.max_roll = rospy.get_param("/drone/max_roll")
        self.max_pitch = rospy.get_param("/drone/max_pitch")
        self.max_yaw = rospy.get_param("/drone/max_yaw")

        # Goal point and the tolerance used to decide it has been reached.
        self.desired_point = Point()
        self.desired_point.x = rospy.get_param("/drone/desired_pose/x")
        self.desired_point.y = rospy.get_param("/drone/desired_pose/y")
        self.desired_point.z = rospy.get_param("/drone/desired_pose/z")
        self.desired_point_epsilon = rospy.get_param("/drone/desired_point_epsilon")

        # Observation bounds for [x, y, z, roll, pitch, yaw, sonar].
        # Yaw is unbounded below (-inf) in this formulation.
        high = numpy.array([self.work_space_x_max,
                            self.work_space_y_max,
                            self.work_space_z_max,
                            self.max_roll,
                            self.max_pitch,
                            self.max_yaw,
                            self.max_sonar_value])

        low = numpy.array([self.work_space_x_min,
                           self.work_space_y_min,
                           self.work_space_z_min,
                           -1 * self.max_roll,
                           -1 * self.max_pitch,
                           -numpy.inf,
                           self.min_sonar_value])

        self.observation_space = spaces.Box(low, high)

        rospy.logdebug("ACTION SPACES TYPE===>" + str(self.action_space))
        rospy.logdebug("OBSERVATION SPACES TYPE===>" + str(self.observation_space))

        # Reward shaping parameters (see _compute_reward).
        self.closer_to_point_reward = rospy.get_param("/drone/closer_to_point_reward")
        self.not_ending_point_reward = rospy.get_param("/drone/not_ending_point_reward")
        self.end_episode_points = rospy.get_param("/drone/end_episode_points")

        self.cumulated_steps = 0.0

        # Initialise the underlying robot environment last.
        super(ParrotDroneGotoEnv, self).__init__()
[docs] def _set_init_pose(self): """ Sets the Robot in its init linear and angular speeds and lands the robot. Its preparing it to be reseted in the world. """ #raw_input("INIT SPEED PRESS") self.move_base(self.init_linear_speed_vector, self.init_angular_turn_speed, epsilon=0.05, update_rate=10) # We Issue the landing command to be sure it starts landing #raw_input("LAND PRESS") #self.land() return True
[docs] def _init_env_variables(self): """ Inits variables needed to be initialised each time we reset at the start of an episode. :return: """ #raw_input("TakeOFF PRESS") # We TakeOff before sending any movement commands self.takeoff() # For Info Purposes self.cumulated_reward = 0.0 # We get the initial pose to mesure the distance from the desired point. gt_pose = self.get_gt_pose() self.previous_distance_from_des_point = self.get_distance_from_desired_point(gt_pose.position)
[docs] def _set_action(self, action): """ This set action will Set the linear and angular speed of the drone based on the action number given. :param action: The action integer that set s what movement to do next. """ rospy.logdebug("Start Set Action ==>"+str(action)) # We convert the actions to speed movements to send to the parent class of Parrot linear_speed_vector = Vector3() angular_speed = 0.0 if action == 0: #FORWARDS linear_speed_vector.x = self.linear_forward_speed self.last_action = "FORWARDS" elif action == 1: #BACKWARDS linear_speed_vector.x = -1*self.linear_forward_speed self.last_action = "BACKWARDS" elif action == 2: #STRAFE_LEFT linear_speed_vector.y = self.linear_forward_speed self.last_action = "STRAFE_LEFT" elif action == 3: #STRAFE_RIGHT linear_speed_vector.y = -1*self.linear_forward_speed self.last_action = "STRAFE_RIGHT" elif action == 4: #UP linear_speed_vector.z = self.linear_forward_speed self.last_action = "UP" elif action == 5: #DOWN linear_speed_vector.z = -1*self.linear_forward_speed self.last_action = "DOWN" # We tell drone the linear and angular speed to set to execute self.move_base(linear_speed_vector, angular_speed, epsilon=0.05, update_rate=10) rospy.logdebug("END Set Action ==>"+str(action))
[docs] def _get_obs(self): """ Here we define what sensor data defines our robots observations To know which Variables we have acces to, we need to read the droneEnv API DOCS :return: """ rospy.logdebug("Start Get Observation ==>") # We get the laser scan data gt_pose = self.get_gt_pose() # We get the orientation of the cube in RPY roll, pitch, yaw = self.get_orientation_euler(gt_pose.orientation) # We get the sonar value sonar = self.get_sonar() sonar_value = sonar.range """ observations = [ round(gt_pose.position.x, 1), round(gt_pose.position.y, 1), round(gt_pose.position.z, 1), round(roll, 1), round(pitch, 1), round(yaw, 1), round(sonar_value,1)] """ # We simplify a bit the spatial grid to make learning faster observations = [ int(gt_pose.position.x), int(gt_pose.position.y), int(gt_pose.position.z), round(roll,1), round(pitch,1), round(yaw,1), round(sonar_value,1)] rospy.logdebug("Observations==>"+str(observations)) rospy.logdebug("END Get Observation ==>") return observations
[docs] def _is_done(self, observations): """ The done can be done due to three reasons: 1) It went outside the workspace 2) It detected something with the sonar that is too close 3) It flipped due to a crash or something 4) It has reached the desired point """ episode_done = False current_position = Point() current_position.x = observations[0] current_position.y = observations[1] current_position.z = observations[2] current_orientation = Point() current_orientation.x = observations[3] current_orientation.y = observations[4] current_orientation.z = observations[5] sonar_value = observations[6] is_inside_workspace_now = self.is_inside_workspace(current_position) sonar_detected_something_too_close_now = self.sonar_detected_something_too_close(sonar_value) drone_flipped = self.drone_has_flipped(current_orientation) has_reached_des_point = self.is_in_desired_position(current_position, self.desired_point_epsilon) rospy.logwarn(">>>>>> DONE RESULTS <<<<<") if not is_inside_workspace_now: rospy.logerr("is_inside_workspace_now="+str(is_inside_workspace_now)) else: rospy.logwarn("is_inside_workspace_now="+str(is_inside_workspace_now)) if sonar_detected_something_too_close_now: rospy.logerr("sonar_detected_something_too_close_now="+str(sonar_detected_something_too_close_now)) else: rospy.logwarn("sonar_detected_something_too_close_now="+str(sonar_detected_something_too_close_now)) if drone_flipped: rospy.logerr("drone_flipped="+str(drone_flipped)) else: rospy.logwarn("drone_flipped="+str(drone_flipped)) if has_reached_des_point: rospy.logerr("has_reached_des_point="+str(has_reached_des_point)) else: rospy.logwarn("has_reached_des_point="+str(has_reached_des_point)) # We see if we are outside the Learning Space episode_done = not(is_inside_workspace_now) or sonar_detected_something_too_close_now or drone_flipped or has_reached_des_point if episode_done: rospy.logerr("episode_done====>"+str(episode_done)) else: rospy.logwarn("episode_done====>"+str(episode_done)) return 
episode_done
[docs] def _compute_reward(self, observations, done): current_position = Point() current_position.x = observations[0] current_position.y = observations[1] current_position.z = observations[2] distance_from_des_point = self.get_distance_from_desired_point(current_position) distance_difference = distance_from_des_point - self.previous_distance_from_des_point if not done: # If there has been a decrease in the distance to the desired point, we reward it if distance_difference < 0.0: rospy.logwarn("DECREASE IN DISTANCE GOOD") reward = self.closer_to_point_reward else: rospy.logerr("ENCREASE IN DISTANCE BAD") reward = 0 else: if self.is_in_desired_position(current_position, epsilon=0.5): reward = self.end_episode_points else: reward = -1*self.end_episode_points self.previous_distance_from_des_point = distance_from_des_point rospy.logdebug("reward=" + str(reward)) self.cumulated_reward += reward rospy.logdebug("Cumulated_reward=" + str(self.cumulated_reward)) self.cumulated_steps += 1 rospy.logdebug("Cumulated_steps=" + str(self.cumulated_steps)) return reward
# Internal TaskEnv Methods
[docs] def is_in_desired_position(self,current_position, epsilon=0.05): """ It return True if the current position is similar to the desired poistion """ is_in_desired_pos = False x_pos_plus = self.desired_point.x + epsilon x_pos_minus = self.desired_point.x - epsilon y_pos_plus = self.desired_point.y + epsilon y_pos_minus = self.desired_point.y - epsilon x_current = current_position.x y_current = current_position.y x_pos_are_close = (x_current <= x_pos_plus) and (x_current > x_pos_minus) y_pos_are_close = (y_current <= y_pos_plus) and (y_current > y_pos_minus) is_in_desired_pos = x_pos_are_close and y_pos_are_close rospy.logwarn("###### IS DESIRED POS ? ######") rospy.logwarn("current_position"+str(current_position)) rospy.logwarn("x_pos_plus"+str(x_pos_plus)+",x_pos_minus="+str(x_pos_minus)) rospy.logwarn("y_pos_plus"+str(y_pos_plus)+",y_pos_minus="+str(y_pos_minus)) rospy.logwarn("x_pos_are_close"+str(x_pos_are_close)) rospy.logwarn("y_pos_are_close"+str(y_pos_are_close)) rospy.logwarn("is_in_desired_pos"+str(is_in_desired_pos)) rospy.logwarn("############") return is_in_desired_pos
[docs] def is_inside_workspace(self,current_position): """ Check if the Drone is inside the Workspace defined """ is_inside = False rospy.logwarn("##### INSIDE WORK SPACE? #######") rospy.logwarn("XYZ current_position"+str(current_position)) rospy.logwarn("work_space_x_max"+str(self.work_space_x_max)+",work_space_x_min="+str(self.work_space_x_min)) rospy.logwarn("work_space_y_max"+str(self.work_space_y_max)+",work_space_y_min="+str(self.work_space_y_min)) rospy.logwarn("work_space_z_max"+str(self.work_space_z_max)+",work_space_z_min="+str(self.work_space_z_min)) rospy.logwarn("############") if current_position.x > self.work_space_x_min and current_position.x <= self.work_space_x_max: if current_position.y > self.work_space_y_min and current_position.y <= self.work_space_y_max: if current_position.z > self.work_space_z_min and current_position.z <= self.work_space_z_max: is_inside = True return is_inside
[docs] def sonar_detected_something_too_close(self, sonar_value): """ Detects if there is something too close to the drone front """ rospy.logwarn("##### SONAR TOO CLOSE? #######") rospy.logwarn("sonar_value"+str(sonar_value)+",min_sonar_value="+str(self.min_sonar_value)) rospy.logwarn("############") too_close = sonar_value < self.min_sonar_value return too_close
[docs] def drone_has_flipped(self,current_orientation): """ Based on the orientation RPY given states if the drone has flipped """ has_flipped = True self.max_roll = rospy.get_param("/drone/max_roll") self.max_pitch = rospy.get_param("/drone/max_pitch") rospy.logwarn("#### HAS FLIPPED? ########") rospy.logwarn("RPY current_orientation"+str(current_orientation)) rospy.logwarn("max_roll"+str(self.max_roll)+",min_roll="+str(-1*self.max_roll)) rospy.logwarn("max_pitch"+str(self.max_pitch)+",min_pitch="+str(-1*self.max_pitch)) rospy.logwarn("############") if current_orientation.x > -1*self.max_roll and current_orientation.x <= self.max_roll: if current_orientation.y > -1*self.max_pitch and current_orientation.y <= self.max_pitch: has_flipped = False return has_flipped
[docs] def get_distance_from_desired_point(self, current_position): """ Calculates the distance from the current position to the desired point :param start_point: :return: """ distance = self.get_distance_from_point(current_position, self.desired_point) return distance
[docs] def get_distance_from_point(self, pstart, p_end): """ Given a Vector3 Object, get distance from current position :param p_end: :return: """ a = numpy.array((pstart.x, pstart.y, pstart.z)) b = numpy.array((p_end.x, p_end.y, p_end.z)) distance = numpy.linalg.norm(a - b) return distance
[docs] def get_orientation_euler(self, quaternion_vector): # We convert from quaternions to euler orientation_list = [quaternion_vector.x, quaternion_vector.y, quaternion_vector.z, quaternion_vector.w] roll, pitch, yaw = euler_from_quaternion(orientation_list) return roll, pitch, yaw