diff --git a/python/examples/cart_pulling.py b/python/examples/cart_pulling.py new file mode 100644 index 0000000000000000000000000000000000000000..784d5d3362b7b1678ce6b45b96e20e8d72da04fa --- /dev/null +++ b/python/examples/cart_pulling.py @@ -0,0 +1,71 @@ +# PYTHON_ARGCOMPLETE_OK +import numpy as np +import time +import argparse, argcomplete +from functools import partial +from ur_simple_control.managers import getMinimalArgParser, ControlLoopManager, RobotManager, ProcessManager +from ur_simple_control.optimal_control.get_ocp_args import get_OCP_args +from ur_simple_control.optimal_control.crocoddyl_optimal_control import createCrocoPathFollowingOCP1, solveCrocoOCP +from ur_simple_control.optimal_control.crocoddyl_mpc import CrocoPathFollowingMPCControlLoop, CrocoPathFollowingMPC +from ur_simple_control.basics.basics import followKinematicJointTrajP +from ur_simple_control.util.logging_utils import LogManager +from ur_simple_control.visualize.visualize import plotFromDict +from ur_simple_control.clik.clik import getClikArgs +import pinocchio as pin +import crocoddyl +import importlib.util +from ur_simple_control.path_generation.planner import starPlanner, getPlanningArgs +import yaml + + +def get_args(): + parser = getMinimalArgParser() + parser = get_OCP_args(parser) + parser = getClikArgs(parser) # literally just for goal error + parser = getPlanningArgs(parser) + parser.add_argument('--handlebar-height', type=float, default=0.5) + argcomplete.autocomplete(parser) + args = parser.parse_args() + # TODO TODO TODO: REMOVE PRESET HACKS + robot_type = "Unicycle" + with open(args.planning_robot_params_file) as f: + params = yaml.safe_load(f) + robot_params = params[robot_type] + with open(args.tunnel_mpc_params_file) as f: + params = yaml.safe_load(f) + mpc_params = params["tunnel_mpc"] + args.np = mpc_params['np'] + args.n_pol = mpc_params['n_pol'] + return args + + +if __name__ == "__main__": + args = get_args() + if importlib.util.find_spec('mim_solvers'): + import mim_solvers + robot = RobotManager(args) + #time.sleep(5) + + x0 = np.concatenate([robot.getQ(), robot.getQd()]) + robot._step() + goal = np.array([0.5, 5.5]) + planning_function = partial(starPlanner, goal) + path_planner = ProcessManager(args, planning_function, None, 2, None) + CrocoPathFollowingMPC(args, robot, x0, path_planner) + + print("final position:") + print(robot.getT_w_e()) + + if args.save_log: + robot.log_manager.plotAllControlLoops() + + if not args.pinocchio_only: + robot.stopRobot() + + if args.visualize_manipulator: + robot.killManipulatorVisualizer() + + if args.save_log: + robot.log_manager.saveLog() + #loop_manager.stopHandler(None, None) + diff --git a/python/examples/force_control_test.py b/python/examples/force_control_test.py new file mode 100644 index 0000000000000000000000000000000000000000..949e06a9583c2d554bc31832dfab04a08949804d --- /dev/null +++ b/python/examples/force_control_test.py @@ -0,0 +1,70 @@ +import pinocchio as pin +import numpy as np +import time +import argparse +from functools import partial +from ur_simple_control.managers import getMinimalArgParser, ControlLoopManager, RobotManager + +def get_args(): + parser = getMinimalArgParser() + parser.description = 'force control example' + # add more arguments here from different Simple Manipulator Control modules + args = parser.parse_args() + return args + +def controlLoopForceEx(args, robot : RobotManager, T_w_e_init, i, past_data): + """ + controlLoop + ----------------------------- + controller description + """ + breakFlag = False + 
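For context on how cart_pulling.py above wires the planner into the control loop: starPlanner runs in its own process (wrapped by ProcessManager) and exchanges data with the master through queues. The sketch below mimics that producer/consumer pattern with plain multiprocessing primitives; toyPlanner and its straight-line "plan" are made-up stand-ins, and ProcessManager's real constructor arguments are not reproduced here.

# Minimal sketch of the planner-in-a-separate-process pattern used by
# cart_pulling.py: the control loop sends the base position, the planner
# replies with a path. ProcessManager/starPlanner are stood in for by plain
# multiprocessing primitives and a hypothetical toy planner.
from functools import partial
from multiprocessing import Process, Queue
import numpy as np

def toyPlanner(goal, cmd_queue: Queue, data_queue: Queue):
    while True:
        cmd = cmd_queue.get()          # blocking, like starPlanner's cmd_queue.get()
        p = cmd['p']
        if np.linalg.norm(p - goal) < 0.05:
            break
        # pretend-plan: straight line from p to goal, (N,2) points
        data_queue.put(np.linspace(p, goal, 20))

if __name__ == "__main__":
    goal = np.array([0.5, 5.5])
    cmd_queue, data_queue = Queue(), Queue()
    planner = Process(target=partial(toyPlanner, goal), args=(cmd_queue, data_queue))
    planner.start()
    # the "control loop" side: send the freshest position, read back a path
    p = np.array([9.0, 4.0])
    cmd_queue.put({'p': p})
    path = data_queue.get()
    print(path.shape)                  # (20, 2)
    cmd_queue.put({'p': goal})         # lets the worker exit
    planner.join()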
log_item = {} + save_past_dict = {} + + q = robot.getQ() + T_w_e = robot.getT_w_e() + wrench = robot.getWrench() + save_past_dict['wrench'] = wrench.copy() + wrench = 0.5 * wrench + (1 - 0.5) * np.average(np.array(past_data['wrench']), axis=0) + J = pin.computeFrameJacobian(robot.model, robot.data, q, robot.ee_frame_id) + + Kp = 1.0 + wrench_ref = np.array([0.0, 0.0, 2.0, 0.0, 0.0, 0.0]) + error = T_w_e.actInv(T_w_e_init) + error_v = pin.log6(error).vector + error_v[2] = 0.0 + qd_cmd = Kp * J.T @ error_v + J.T @ (wrench_ref - wrench) + + robot.sendQd(qd_cmd) + + log_item['qs'] = q.reshape((robot.model.nq,)) + log_item['dqs'] = robot.getQd().reshape((robot.model.nv,)) + log_item['wrench'] = wrench + return breakFlag, save_past_dict, log_item + +if __name__ == "__main__": + args = get_args() + robot = RobotManager(args) + q_init = robot.getQ() + T_w_e_init = robot.getT_w_e() + controlLoop = partial(controlLoopForceEx, args, robot, T_w_e_init) + log_item = {'qs' : np.zeros(robot.model.nq), + 'dqs' : np.zeros(robot.model.nv), + 'wrench' : np.zeros(6)} + loop_manager = ControlLoopManager(robot, controlLoop, args, {'wrench' : np.zeros(6)} , log_item) + loop_manager.run() + + # get expected behaviour here (library can't know what the end is - you have to do this here) + if not args.pinocchio_only: + robot.stopRobot() + + if args.save_log: + robot.log_manager.plotAllControlLoops() + + if args.visualize_manipulator: + robot.killManipulatorVisualizer() + + if args.save_log: + robot.log_manager.saveLog() + #loop_manager.stopHandler(None, None) diff --git a/python/ur_simple_control/__init__.py b/python/ur_simple_control/__init__.py index 75986aa4b30ac7e439246be67064f4e90dc56679..6668633de352c6519659a5f88371a9157178c2eb 100644 --- a/python/ur_simple_control/__init__.py +++ b/python/ur_simple_control/__init__.py @@ -1,4 +1,4 @@ -from ur_simple_control import clik, dmp, robot_descriptions, util, visualize +from ur_simple_control import clik, dmp, robot_descriptions, util, visualize, path_generation __all__ = [ -"clik", "dmp", "robot_descriptions", "util", "visualize", "basics", "vision" +"clik", "dmp", "robot_descriptions", "util", "visualize", "basics", "vision", "path_generation" ] diff --git a/python/ur_simple_control/__pycache__/__init__.cpython-312.pyc b/python/ur_simple_control/__pycache__/__init__.cpython-312.pyc index d2618cf293d8233bf32c84fc0512a79ee7bcca00..962f7ffb2839b04d64de59bffc301462294b51dd 100644 Binary files a/python/ur_simple_control/__pycache__/__init__.cpython-312.pyc and b/python/ur_simple_control/__pycache__/__init__.cpython-312.pyc differ diff --git a/python/ur_simple_control/__pycache__/managers.cpython-312.pyc b/python/ur_simple_control/__pycache__/managers.cpython-312.pyc index 0fbef10f71ac938cc681cd484d7d87e0a2dd77e8..fd39d940c984e5b9546f816ca6829f8138fb8e99 100644 Binary files a/python/ur_simple_control/__pycache__/managers.cpython-312.pyc and b/python/ur_simple_control/__pycache__/managers.cpython-312.pyc differ diff --git a/python/ur_simple_control/managers.py b/python/ur_simple_control/managers.py index b6be29c9f186d8f31cc1a4f9c033947a1c8edfab..c097dd826cde9f24438f1c66eeaace6a8e4bee77 100644 --- a/python/ur_simple_control/managers.py +++ b/python/ur_simple_control/managers.py @@ -469,6 +469,8 @@ class RobotManager: # also TODO: figure out how to best solve the gripper_velocity problem # NOTE: you need to initialize differently for other types of joints + # TODO: add the option for this being a shared variable, + # and then write ifs with locks to make this correct. 
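One possible shape for the TODO above (sharing the joint state between processes behind a lock), sketched with multiprocessing.Array; SharedJointState and its methods are illustrative names only, not part of RobotManager.

# Sketch of the "shared q with a lock" option mentioned in the TODO above.
import numpy as np
from multiprocessing import Array, Lock

class SharedJointState:
    def __init__(self, nq: int):
        self._lock = Lock()
        self._q = Array('d', nq)       # doubles, zero-initialized

    def write(self, q: np.ndarray):
        with self._lock:
            self._q[:] = q.tolist()

    def read(self) -> np.ndarray:
        with self._lock:
            return np.array(self._q[:])

shared = SharedJointState(nq=6)
shared.write(np.ones(6))
print(shared.read())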
self.q = np.zeros(self.model.nq)
        # planar joint is [x, y, cos(theta), sin(theta)],
        # so it can't be all zeros
@@ -1038,6 +1040,42 @@ class ProcessManager:
        if self.command_queue.qsize() < 5:
            self.command_queue.put_nowait(command)

+    def sendFreshestCommand(self, command : dict):
+        """
+        sendFreshestCommand
+        ------------
+        assumes you're calling from a controlLoop and that
+        you want a non-blocking call.
+        there can be only 1 command in the queue,
+        and it is the latest command you want to issue.
+        since ControlLoopManager is the master of all processes,
+        and it runs the fastest, most of the time the slave process
+        won't pop the command currently in the queue, so ControlLoopManager
+        has to update it, by emptying the queue and then putting in the latest command.
+        shouldn't this be solved with shared memory?
+        yes, absolutely. but then i need to write more code than i am
+        willing to do right now, and it would complicate the code
+        a bit more than it's worth, losing its readability for
+        non-computer-science users.
+        if the point were to write the fastest code, this wouldn't be in python anyway.
+        it would be good to add an option to select whether
+        a command is shared via a queue or via shared_memory, but
+        this will be done only if the performance is too low
+        (it probably won't be, because a bit of copying is certainly
+        insignificant compared to any interesting matrix operation).
+        NOTE: this assumes only the master is putting stuff in,
+        and only the slave is popping it if there's something
+        (the slave has to use a blocking call for this to make sense).
+        worst case the slave gets one-cycle-old data, but again,
+        we're not optimizing all the way
+        """
+        # NOTE: this doesn't actually replace an already-queued command,
+        # it only adds one when the queue is empty; worst case the slave
+        # reads a one-cycle-old command, which is acceptable here.
+        if self.command_queue.qsize() < 1:
+            #self.command_queue.get_nowait()
+            self.command_queue.put_nowait(command)
+
+
    def getData(self):
        if not self.data_queue.empty():
            self.lastest_data = self.data_queue.get_nowait()
diff --git a/python/ur_simple_control/optimal_control/crocoddyl_mpc.py b/python/ur_simple_control/optimal_control/crocoddyl_mpc.py
index 9431fbecd8982616bf80dd736732ff76f7c8cc80..b012566e7276a33ef8ca8c4df6d79e35d96a2583 100644
--- a/python/ur_simple_control/optimal_control/crocoddyl_mpc.py
+++ b/python/ur_simple_control/optimal_control/crocoddyl_mpc.py
@@ -1,4 +1,4 @@
-from ur_simple_control.managers import getMinimalArgParser, ControlLoopManager, RobotManager
+from ur_simple_control.managers import getMinimalArgParser, ControlLoopManager, RobotManager, ProcessManager
 from ur_simple_control.optimal_control.crocoddyl_optimal_control import createCrocoIKOCP, createCrocoPathFollowingOCP1
 import pinocchio as pin
 import crocoddyl
@@ -92,7 +92,121 @@ def CrocoIKMPC(args, robot, x0, goal):
     loop_manager = ControlLoopManager(robot, controlLoop, args, save_past_dict, log_item)
     loop_manager.run()

-def CrocoPathFollowingMPCControlLoop(args, robot : RobotManager, solver : crocoddyl.SolverBoxFDDP, path, i, past_data):
+
+def pathPointFromPathParam(n_pol, path_dim, path_pol, s):
+    return [np.polyval(path_pol[j*(n_pol+1):(j+1)*(n_pol+1)], s) for j in range(path_dim)]
+
+def path2D_to_timed_SE3(args, path_pol, path2D_untimed, max_base_v):
+    """
+    path2D_to_timed_SE3
+    ---------------------
+    we have a 2D path of (N,2) shape as reference.
+    the OCP accepts SE3 (it could be just rotations or translations too),
+    so this function constructs it from the 2D path.
+ it also times it as this is what the current ocp formulation needs: + there should be a timing associated with the path, + because defining the cost function as the fit of the rollout to the path + is complicated to do software-wise. + as it is now, we're pre-selecting points of the path, and associating those + with rollout states at a set time step between the states in the rollout. + this isn't a problem if we have an idea of how fast the robot can go, + which gives us a heuristic of how to select the points (if you went at maximum + speed, this is how far you'd go in this time, so this is your point). + thankfully this does not need to be exact because we just care about the distance + between the current point and the point on the path, so if it's further out + that just means the error is larger, and that doesn't necessarily correspond + to a different action. + NOTE: we are constructing a possibly bullshit + trajectory here, it's a man-made heuristic, + and we should leave that to the MPC, + but that requires too much coding and there is no time rn. + """ + + #################################################### + # getting a timed 2D trajectory from a heuristic # + #################################################### + # the strategy is somewhat reasonable at least: + # assume we're moving at 90% max velocity in the base, + # and use that. + perc_of_max_v = 0.9 + base_v = perc_of_max_v * max_base_v + + # 1) the length of the path divided by 0.9 * max_vel + # gives us the total time of the trajectory, + # so we first compute that + # easiest possible way to get approximate path length + # (yes it should be from the polynomial approximation but that takes time to write) + x_i = path2D_untimed[:,0][:-1] # no last element + y_i = path2D_untimed[:,1][:-1] # no last element + x_i_plus_1 = path2D_untimed[:,0][1:] # no first element + y_i_plus_1 = path2D_untimed[:,1][1:] # no first element + x_diff = x_i_plus_1 - x_i + x_diff = x_diff.reshape((-1,1)) + y_diff = y_i_plus_1 - y_i + y_diff = y_diff.reshape((-1,1)) + path_length = np.sum(np.linalg.norm(np.hstack((x_diff, y_diff)), axis=1)) + total_time = path_length / base_v + # 2) we find the correspondence between s and time + # TODO: read from where it should be, but seba checked that it's 5 for some reason + max_s = 5 + t_to_s = max_s / total_time + # 3) construct the ocp-timed 2D path + path2D = [] + # time is i * args.ocp_dt + for i in range(args.n_knots + 1): + s = (i * args.ocp_dt) * t_to_s + path2D.append(pathPointFromPathParam(args.n_pol, args.np, path_pol, s)) + path2D = np.array(path2D) + + #################################################### + # constructing the SE3 reference from [x,y] path # + #################################################### + # 1) we need to add points so that there are args.n_knots of them + # the -1 is here because we're finite differencing to get theta, + # so we need one extra + #path2D = list(path2D) + ## in case there's too few + #while len(path2D) - 1 < args.n_knots: + # path2D.append(path2D[-1]) + ## in case there's too much + #while len(path2D) - 1> args.n_knots: + # path2D.pop() + #path2D = np.array(path2D) + + + ######################### + # path2D into pathSE2 # + ######################### + # construct theta + # since it's a pairwise operation it can't be done on the last point + x_i = path2D[:,0][:-1] # no last element + y_i = path2D[:,1][:-1] # no last element + x_i_plus_1 = path2D[:,0][1:] # no first element + y_i_plus_1 = path2D[:,1][1:] # no first element + x_diff = x_i_plus_1 - x_i + 
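A worked numeric version of the timing heuristic described in the docstring above: approximate the path length from consecutive points, convert it to a total time at 90% of the maximum base velocity, and map each OCP knot time to the path parameter s. The toy path, ocp_dt and n_knots values are made up; max_s = 5 is the magic constant noted in the comment.

# Worked sketch of the timing heuristic, on a toy 2D path.
import numpy as np

path2D_untimed = np.array([[0.0, 0.0], [1.0, 0.0], [2.0, 1.0], [3.0, 1.0]])
max_base_v = 1.0
base_v = 0.9 * max_base_v

# approximate path length from consecutive point distances
seg = np.diff(path2D_untimed, axis=0)
path_length = np.sum(np.linalg.norm(seg, axis=1))
total_time = path_length / base_v

# map trajectory time to the path parameter s in [0, max_s]
max_s = 5
t_to_s = max_s / total_time

ocp_dt, n_knots = 0.02, 10
for i in range(n_knots + 1):
    s = (i * ocp_dt) * t_to_s
    # the real code evaluates the path polynomial at s (pathPointFromPathParam);
    # here we just print the parameter mapping
    print(f"knot {i}: t = {i*ocp_dt:.2f} s, s = {s:.3f}")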
y_diff = y_i_plus_1 - y_i + # elementwise arctan2 + thetas = np.arctan2(x_diff, y_diff) + + #thetas = thetas.reshape((-1, 1)) + #path_SE2 = np.hstack((path2D, thetas)) + + ###################################### + # construct SE3 from SE2 reference # + ###################################### + # the plan is parallel to the ground because it's a mobile + # manipulation task + # TODO: enforce timing, interpolate the path accordingly + path = [] + for i in range(len(path2D) - 1): + rotation = pin.rpy.rpyToMatrix(0.0, 0.0, thetas[i]) + translation = np.array([path2D[i][0], path2D[i][1], + args.handlebar_height]) + path.append(pin.SE3(rotation, translation)) + + return path + +def CrocoPathFollowingMPCControlLoop(args, robot : RobotManager, solver : crocoddyl.SolverBoxFDDP, path_planner : ProcessManager, i, past_data): """ CrocoPathFollowingMPCControlLoop ----------------------------- @@ -103,29 +217,44 @@ def CrocoPathFollowingMPCControlLoop(args, robot : RobotManager, solver : crocod save_past_dict = {} q = robot.getQ() - wrench = robot.getWrench() + p = q[:2] + path_planner.sendFreshestCommand({'p' : p}) + + # NOTE: it's pointless to recalculate the path every time + # if it's the same 2D path + data = path_planner.getData() + if data == None: + if args.debug_prints: + print("got no path so no doing anything") + robot.sendQd(np.zeros(robot.model.nv)) + log_item['qs'] = q.reshape((robot.model.nq,)) + log_item['dqs'] = robot.getQd().reshape((robot.model.nv,)) + return breakFlag, save_past_dict, log_item + + path_pol, path2D_untimed = data + path2D_untimed = np.array(path2D_untimed).reshape((-1,2)) + # should be precomputed somewhere but this is nowhere near the biggest problem + max_base_v = np.linalg.norm(robot.model.velocityLimit[:2]) - # what i should have more or less - the path - # changes, and should be updated from a different thread - # so that it's ready when it gets here - # TODO: implement - #path = robot.getPath() - # 1) make 6D path out of [[x0,y0],...] 
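The path_pol received from the planner packs two 1D polynomials as [px0 ... pxn, py0 ... pyn] (see the path generator below), and pathPointFromPathParam evaluates them per coordinate with np.polyval. A tiny self-contained example of that format with made-up coefficients:

# Example of the packed path-polynomial format and its evaluation.
import numpy as np

n_pol = 2          # polynomial degree
path_dim = 2       # planar path: x(s), y(s)
# x(s) = 0*s^2 + 1*s + 0,   y(s) = 0.5*s^2 + 0*s + 1
path_pol = [0.0, 1.0, 0.0,   0.5, 0.0, 1.0]

def eval_path(path_pol, s):
    return [np.polyval(path_pol[j*(n_pol+1):(j+1)*(n_pol+1)], s)
            for j in range(path_dim)]

print(eval_path(path_pol, 0.0))   # [0.0, 1.0] -> start point
print(eval_path(path_pol, 2.0))   # [2.0, 3.0]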
# that represents the center of the cart + pathSE3 = path2D_to_timed_SE3(args, path_pol, path2D_untimed, max_base_v) + # 2) do T_mobile_base_ee_pos to get # end-effector reference frame(s) + ##############################################################33 # generate bullshit just to see it works - T_w_e = robot.getT_w_e() - path = [] - t = i * robot.dt - for i in range(args.n_knots): - t += 0.01 - new = T_w_e.copy() - translation = 2 * np.array([np.cos(t/20), np.sin(t/20), 0.3]) - new.translation = translation - path.append(new) + #T_w_e = robot.getT_w_e() + #path = [] + #t = i * robot.dt + #for i in range(args.n_knots): + # t += 0.01 + # new = T_w_e.copy() + # translation = 2 * np.array([np.cos(t/20), np.sin(t/20), 0.3]) + # new.translation = translation + # path.append(new) + ##############################################################33 x0 = np.concatenate([robot.getQ(), robot.getQd()]) @@ -136,10 +265,10 @@ def CrocoPathFollowingMPCControlLoop(args, robot : RobotManager, solver : crocod us_init = list(solver.us[1:]) + [solver.us[-1]] for i, runningModel in enumerate(solver.problem.runningModels): - runningModel.differential.costs.costs['gripperPose' + str(i)].cost.residual.reference = path[i] + runningModel.differential.costs.costs['gripperPose' + str(i)].cost.residual.reference = pathSE3[i] # idk if that's necessary - solver.problem.terminalModel.differential.costs.costs['gripperPose'+str(args.n_knots)].cost.residual.reference = path[-1] + solver.problem.terminalModel.differential.costs.costs['gripperPose'+str(args.n_knots)].cost.residual.reference = pathSE3[-1] solver.solve(xs_init, us_init, args.max_solver_iter) xs = np.array(solver.xs) @@ -153,8 +282,7 @@ def CrocoPathFollowingMPCControlLoop(args, robot : RobotManager, solver : crocod return breakFlag, save_past_dict, log_item -# NOTE: not using actual path rn -def CrocoPathFollowingMPC(args, robot, x0, path): +def CrocoPathFollowingMPC(args, robot, x0, path_planner): """ IKMPC ----- @@ -163,7 +291,8 @@ def CrocoPathFollowingMPC(args, robot, x0, path): a dynamics level, and velocities we command are actually extracted from the state x(q,dq) """ - problem = createCrocoPathFollowingOCP1(args, robot, x0, path) + + problem = createCrocoPathFollowingOCP1(args, robot, x0) if args.solver == "boxfddp": solver = crocoddyl.SolverBoxFDDP(problem) if args.solver == "csqp": @@ -177,7 +306,7 @@ def CrocoPathFollowingMPC(args, robot, x0, path): us_init = solver.problem.quasiStatic([x0] * solver.problem.T) solver.solve(xs_init, us_init, args.max_solver_iter) - controlLoop = partial(CrocoPathFollowingMPCControlLoop, args, robot, solver, path) + controlLoop = partial(CrocoPathFollowingMPCControlLoop, args, robot, solver, path_planner) log_item = { 'qs' : np.zeros(robot.model.nq), 'dqs' : np.zeros(robot.model.nv) diff --git a/python/ur_simple_control/optimal_control/crocoddyl_optimal_control.py b/python/ur_simple_control/optimal_control/crocoddyl_optimal_control.py index f1856a434cc7802c4ce2c11bd4d8976dadecf51b..76b770e02362651d9671ee0d185e7dd7b714a347 100644 --- a/python/ur_simple_control/optimal_control/crocoddyl_optimal_control.py +++ b/python/ur_simple_control/optimal_control/crocoddyl_optimal_control.py @@ -222,13 +222,16 @@ def solveCrocoOCP(args, robot, problem, x0): -def createCrocoPathFollowingOCP1(args, robot : RobotManager, x0, path : list[pin.SE3]): +def createCrocoPathFollowingOCP1(args, robot : RobotManager, x0): """ createCrocoPathFollowingOCP1 ------------------------------- - follows the provided path. 
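A tiny illustration of the receding-horizon warm start used above: the previous solution is shifted by one knot and the last state is repeated, so the new solve starts from a guess that is already close to the optimum. Purely illustrative, no crocoddyl involved.

# Warm-start shift: drop the first knot, repeat the last one.
xs_prev = ["x0", "x1", "x2", "x3"]            # stand-ins for solver.xs
xs_init = list(xs_prev[1:]) + [xs_prev[-1]]
print(xs_init)                                # ['x1', 'x2', 'x3', 'x3']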
+ creates a path following problem. + it is instantiated to just to stay at the current position. NOTE: the path MUST be time indexed with the SAME time used between the knots """ + T_w_e = robot.getT_w_e() + path = [T_w_e] * args.n_knots # create torque bounds which correspond to percentage # of maximum allowed acceleration # NOTE: idk if this makes any sense, but let's go for it diff --git a/python/ur_simple_control/path_generation/README.md b/python/ur_simple_control/path_generation/README.md new file mode 100644 index 0000000000000000000000000000000000000000..feb858bd08227c5fc6c8f941c3bf096a5970fb07 --- /dev/null +++ b/python/ur_simple_control/path_generation/README.md @@ -0,0 +1,6 @@ +Credit +------- +The path planning algorithms have been developed +by Albin Dahlin, both theoretically and in implementation. +The details can be found in Albin's [PhD thesis](https://research.chalmers.se/publication/538686/file/538686_Fulltext.pdf). +The code has been somewhat reworked to fit into the library. diff --git a/python/ur_simple_control/path_generation/__init__.py b/python/ur_simple_control/path_generation/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..812668e783b576ca4a4a134cc12ac0f11c5ab58b --- /dev/null +++ b/python/ur_simple_control/path_generation/__init__.py @@ -0,0 +1,5 @@ +#import importlib.util +#if importlib.util.find_spec('casadi'): +# from .create_pinocchio_casadi_ocp import * +from .planner import * +#import starworlds diff --git a/python/ur_simple_control/path_generation/path_generator.py b/python/ur_simple_control/path_generation/path_generator.py new file mode 100644 index 0000000000000000000000000000000000000000..3f6bea641f44fc51f0e53b42a80b03483a24d882 --- /dev/null +++ b/python/ur_simple_control/path_generation/path_generator.py @@ -0,0 +1,629 @@ +from typing import List +from abc import ABC, abstractmethod + +import numpy as np +from starworlds.obstacles import StarshapedObstacle +from starworlds.starshaped_hull import cluster_and_starify, ObstacleCluster +from starworlds.utils.misc import tic, toc +import shapely +import yaml + +import matplotlib.pyplot as plt +import matplotlib.collections as plt_col + + + +### mobile_robot.py + +class MobileRobot(ABC): + + def __init__(self, nu, nx, width, name, u_min=None, u_max=None, x_min=None, x_max=None): + """ + nx : int number of state variables + nu : int number of input variables + baseline : float distance between the wheels + name : str the name of the robot + u_min : np.ndarray minimum input value + u_max : np.ndarray maximum input value + x_min : np.ndarray minimum state value + x_max : np.ndarray maximum state value + """ + self.nx = nx + self.nu = nu + self.width = width + self.name = name + def valid_u_bound(bound): return bound is not None and len(bound) == self.nu + def valid_q_bound(bound): return bound is not None and len(bound) == self.nx + self.u_min = u_min if valid_u_bound(u_min) else [-np.inf] * self.nu + self.u_max = u_max if valid_u_bound(u_max) else [np.inf] * self.nu + self.x_min = x_min if valid_q_bound(x_min) else [-np.inf] * self.nx + self.x_max = x_max if valid_q_bound(x_max) else [np.inf] * self.nx + + @abstractmethod + def f(self, x, u): + """ Forward dynamics ? 
""" + pass + + @abstractmethod + def h(self, q): + """ Forward kinematics """ + pass + + @abstractmethod + def vel_min(self): + pass + + @abstractmethod + def vel_max(self): + pass + + def move(self, x, u, dt): + u_sat = np.clip(u, self.u_min, self.u_max) + x_next = x + np.array(self.f(x, u_sat)) * dt + x_next = np.clip(x_next, self.x_min, self.x_max) + return x_next, u_sat + + +class Unicycle(MobileRobot): + + def __init__(self, width, vel_min=None, vel_max=None, name='robot'): + self.vmax = vel_max[0] + super().__init__(nu=2, nx=3, width=width, name=name, u_min=vel_min, u_max=vel_max) + + def f(self, x, u): + return [u[0] * np.cos(x[2]), # vx + u[0] * np.sin(x[2]), # vy + u[1]] # wz + + def h(self, x): + return x[:2] # x, y + + def vel_min(self): + return self.u_min + + def vel_max(self): + return self.u_max + + def init_plot(self, ax=None, color='b', alpha=0.7, markersize=10): + handles, ax = super(Unicycle, self).init_plot(ax=ax, color=color, alpha=alpha) + handles += ax.plot(0, 0, marker=(3, 0, np.rad2deg(0)), markersize=markersize, color=color) + handles += ax.plot(0, 0, marker=(2, 0, np.rad2deg(0)), markersize=0.5*markersize, color='w') + return handles, ax + + def update_plot(self, x, handles): + super(Unicycle, self).update_plot(x, handles) + handles[1].set_data([x[0]], [x[1]]) + handles[1].set_marker((3, 0, np.rad2deg(x[2]-np.pi/2))) + handles[1].set_markersize(handles[1].get_markersize()) + handles[2].set_data([x[0]], [x[1]]) + handles[2].set_marker((2, 0, np.rad2deg(x[2]-np.pi/2))) + handles[2].set_markersize(handles[2].get_markersize()) + + + +### tunnel_mpc_controller.py + +class SceneUpdater(): + + def __init__(self, params: dict, verbosity=0): + self.params = params + self.verbosity = verbosity + self.reset() + + def reset(self): + self.obstacle_clusters : List[ObstacleCluster] = None + self.free_star = None + + # workspace_modification.py + + # TODO: Check why computational time varies so much for same static scene + @staticmethod + def workspace_modification(obstacles, p, pg, rho0, max_compute_time, hull_epsilon, gamma=0.5, make_convex=0, obstacle_clusters_prev=None, free_star_prev=None, verbosity=0): + + # Clearance variable initialization + rho = rho0 / gamma # First rho should be rho0 + t_init = tic() + + while True: + if toc(t_init) > max_compute_time: + if verbosity > 0: + print("[Workspace modification]: Max compute time in rho iteration.") + break + + # Reduce rho + rho *= gamma + + # Pad obstacles with rho + obstacles_rho = [o.dilated_obstacle(padding=rho, id="duplicate") for o in obstacles] + + # TODO: Fix boundaries + free_rho = shapely.geometry.box(-20, -20, 20, 20) + for o in obstacles_rho: + free_rho = free_rho.difference(o.polygon()) + + # TODO: Check buffering fix + # Find P0 + Bp = shapely.geometry.Point(p).buffer(0.95 * rho) + initial_reference_set = Bp.intersection(free_rho.buffer(-0.1 * rho)) + + if not initial_reference_set.is_empty: + break + + # Initial and goal reference position selection + r0_sh, _ = shapely.ops.nearest_points(initial_reference_set, shapely.geometry.Point(p)) + r0 = np.array(r0_sh.coords[0]) + rg_sh, _ = shapely.ops.nearest_points(free_rho, shapely.geometry.Point(pg)) + rg = np.array(rg_sh.coords[0]) + + + # TODO: Check more thoroughly + if free_star_prev is not None: + free_star_prev = free_star_prev.buffer(-1e-4) + if free_star_prev is not None and free_star_prev.contains(r0_sh) and free_star_prev.contains(rg_sh) and free_rho.contains(free_star_prev):# not any([free_star_prev.covers(o.polygon()) for o in obstacles_rho]): + if 
verbosity > 1: + print("[Workspace modification]: Reuse workspace from previous time step.") + obstacle_clusters = obstacle_clusters_prev + exit_flag = 10 + else: + # Apply cluster and starify + obstacle_clusters, obstacle_timing, exit_flag, n_iter = \ + cluster_and_starify(obstacles_rho, r0, rg, hull_epsilon, max_compute_time=max_compute_time-toc(t_init), + previous_clusters=obstacle_clusters_prev, + make_convex=make_convex, verbose=verbosity) + + free_star = shapely.geometry.box(-20, -20, 20, 20) + for o in obstacle_clusters: + free_star = free_star.difference(o.polygon()) + + compute_time = toc(t_init) + return obstacle_clusters, r0, rg, rho, free_star, compute_time, exit_flag + + def update(self, p: np.ndarray, pg: np.ndarray, obstacles) -> tuple[np.ndarray, np.ndarray, float, list[StarshapedObstacle]]: + # Update obstacles + if not self.params['use_prev_workspace']: + self.free_star = None + self.obstacle_clusters, r0, rg, rho, self.free_star, _, _ = SceneUpdater.workspace_modification( + obstacles, p, pg, self.params['rho0'], self.params['max_obs_compute_time'], + self.params['hull_epsilon'], self.params['gamma'], + make_convex=self.params['make_convex'], obstacle_clusters_prev=self.obstacle_clusters, + free_star_prev=self.free_star, verbosity=self.verbosity) + obstacles_star : List[StarshapedObstacle] = [o.cluster_obstacle for o in self.obstacle_clusters] + # Make sure all polygon representations are computed + [o._compute_polygon_representation() for o in obstacles_star] + + return r0, rg, rho, obstacles_star + + +class PathGenerator(): + + def __init__(self, params: dict, verbosity=0): + self.params = params + self.verbosity = verbosity + self.reset() + + def reset(self): + self.target_path = [] + + ### soads.py + + # TODO: Check if can make more computationally efficient + @staticmethod + def soads_f(r, rg, obstacles: List[StarshapedObstacle], adapt_obstacle_velocity=False, unit_magnitude=False, crep=1., reactivity=1., tail_effect=False, convergence_tolerance=1e-4, d=False): + goal_vector = rg - r + goal_dist = np.linalg.norm(goal_vector) + if goal_dist < convergence_tolerance: + return 0 * r + + No = len(obstacles) + fa = goal_vector / goal_dist # Attractor dynamics + if No == 0: + return fa + + ef = [-fa[1], fa[0]] + Rf = np.vstack((fa, ef)).T + + mu = [obs.reference_direction(r) for obs in obstacles] + normal = [obs.normal(r) for obs in obstacles] + gamma = [obs.distance_function(r) for obs in obstacles] + # Compute weights + w = PathGenerator.compute_weights(gamma, weightPow=1) + + # Compute obstacle velocities + xd_o = np.zeros((2, No)) + if adapt_obstacle_velocity: + for i, obs in enumerate(obstacles): + xd_o[:, i] = obs.vel_intertial_frame(r) + + kappa = 0. + f_mag = 0. + for i in range(No): + # Compute basis matrix + E = np.zeros((2, 2)) + E[:, 0] = mu[i] + E[:, 1] = [-normal[i][1], normal[i][0]] + # Compute eigenvalues + D = np.zeros((2, 2)) + D[0, 0] = 1 - crep / (gamma[i] ** (1 / reactivity)) if tail_effect or normal[i].dot(fa) < 0. 
else 1 + D[1, 1] = 1 + 1 / gamma[i] ** (1 / reactivity) + # Compute modulation + M = E.dot(D).dot(np.linalg.inv(E)) + # f_i = M.dot(fa) + f_i = M.dot(fa - xd_o[:, i]) + xd_o[:, i] + # Compute contribution to velocity magnitude + f_i_abs = np.linalg.norm(f_i) + f_mag += w[i] * f_i_abs + # Compute contribution to velocity direction + nu_i = f_i / f_i_abs + nu_i_hat = Rf.T.dot(nu_i) + kappa_i = np.arccos(np.clip(nu_i_hat[0], -1, 1)) * np.sign(nu_i_hat[1]) + kappa += w[i] * kappa_i + kappa_norm = abs(kappa) + f_o = Rf.dot([np.cos(kappa_norm), np.sin(kappa_norm) / kappa_norm * kappa]) if kappa_norm > 0. else fa + + if unit_magnitude: + f_mag = 1. + return f_mag * f_o + + @staticmethod + def compute_weights( + distMeas, + N=0, + distMeas_lowerLimit=1, + weightType="inverseGamma", + weightPow=2, + ): + """Compute weights based on a distance measure (with no upper limit)""" + distMeas = np.array(distMeas) + n_points = distMeas.shape[0] + + critical_points = distMeas <= distMeas_lowerLimit + + if np.sum(critical_points): # at least one + if np.sum(critical_points) == 1: + w = critical_points * 1.0 + return w + else: + # TODO: continuous weighting function + # warnings.warn("Implement continuity of weighting function.") + w = critical_points * 1.0 / np.sum(critical_points) + return w + + distMeas = distMeas - distMeas_lowerLimit + w = (1 / distMeas) ** weightPow + if np.sum(w) == 0: + return w + w = w / np.sum(w) # Normalization + return w + + ### path_generator.py + + @staticmethod + def pol2pos(path_pol, s): + n_pol = len(path_pol) // 2 - 1 + return [sum([path_pol[j * (n_pol + 1) + i] * s ** (n_pol - i) for i in range(n_pol + 1)]) for j in range(2)] + + @staticmethod + def path_generator(r0, rg, obstacles, dp_max, N, dt, max_compute_time, n_pol, ds_decay_rate=0.5, + ds_increase_rate=2., max_nr_steps=1000, convergence_tolerance=1e-5, P_prev=None, s_prev=None, + reactivity=1., crep=1., tail_effect=False, reference_step_size=0.5, verbosity=0): + """ + r0 : np.ndarray initial reference position + rg : np.ndarray goal reference position + obstacles : list[StarshapedObstacle] obstacles in the scene + dp_max : float maximum position increment (i.e. vmax * dt) + N : int ??? + dt : float time step + max_compute_time : float timeout for computation + n_pol : int degree of the polynomial that fits the reference ??? + ds_decay_rate : float ??? + ds_increase_rate : float ??? + max_nr_steps : int computation steps threshold + convergence_tolerance : float ??? + P_prev : np.ndarray previous path ??? + s_prev : np.ndarray previous path time ??? + reactivity : float ??? + crep : float ??? + tail_effect : bool ??? + reference_step_size : float ??? + verbosity : int you guessed it... + """ + + t0 = tic() + + # Initialize + ds = 1 + s = np.zeros(max_nr_steps) + r = np.zeros((max_nr_steps, r0.size)) + if P_prev is not None: + i = P_prev.shape[0] + r[:i, :] = P_prev + s[:i] = s_prev + else: + i = 1 + r[0, :] = r0 + + while True: + dist_to_goal = np.linalg.norm(r[i - 1, :] - rg) + # Check exit conditions + if dist_to_goal < convergence_tolerance: + if verbosity > 1: + print(f"[Path Generator]: Path converged. {int(100 * (s[i - 1] / N))}% of path completed.") + break + if s[i - 1] >= N: + if verbosity > 1: + print(f"[Path Generator]: Completed path length. {int(100 * (s[i - 1] / N))}% of path completed.") + break + if toc(t0) > max_compute_time: + if verbosity > 1: + print(f"[Path Generator]: Max compute time in path integrator. 
{int(100 * (s[i - 1] / N))}% of path completed.") + break + if i >= max_nr_steps: + if verbosity > 1: + print(f"[Path Generator]: Max steps taken in path integrator. {int(100 * (s[i - 1] / N))}% of path completed.") + break + + # Movement using SOADS dynamics + dr = min(dp_max, dist_to_goal) * PathGenerator.soads_f(r[i - 1, :], rg, obstacles, adapt_obstacle_velocity=False, + unit_magnitude=True, crep=crep, + reactivity=reactivity, tail_effect=tail_effect, + convergence_tolerance=convergence_tolerance) + + r[i, :] = r[i - 1, :] + dr * ds + + ri_in_obstacle = False + while any([o.interior_point(r[i, :]) for o in obstacles]): + if verbosity > 1: + print("[Path Generator]: Path inside obstacle. Reducing integration step from {:5f} to {:5f}.".format(ds, ds*ds_decay_rate)) + ds *= ds_decay_rate + r[i, :] = r[i - 1, :] + dr * ds + # Additional compute time check + if toc(t0) > max_compute_time: + ri_in_obstacle = True + break + if ri_in_obstacle: + continue + + # Update travelled distance + s[i] = s[i - 1] + ds + # Try to increase step rate again + ds = min(ds_increase_rate * ds, 1) + # Increase iteration counter + i += 1 + + r = r[:i, :] + s = s[:i] + + # Evenly spaced path + s_vec = np.arange(0, s[-1] + reference_step_size, reference_step_size) + xs, ys = np.interp(s_vec, s, r[:, 0]), np.interp(s_vec, s, r[:, 1]) + # Append not finished path with fixed final position + s_vec = np.append(s_vec, np.arange(s[-1] + reference_step_size, N + reference_step_size, reference_step_size)) + xs = np.append(xs, xs[-1] * np.ones(len(s_vec)-len(xs))) + ys = np.append(ys, ys[-1] * np.ones(len(s_vec)-len(ys))) + + reference_path = [el for p in zip(xs, ys) for el in p] # [x0 y0 x1 y1 ...] + + # TODO: Fix when close to goal + # TODO: Adjust for short arc length, skip higher order terms.. + path_pol = np.polyfit(s_vec, reference_path[::2], n_pol).tolist() + \ + np.polyfit(s_vec, reference_path[1::2], n_pol).tolist() # [px0 px1 ... pxn py0 py1 ... pyn] + # Force init position to be correct + path_pol[n_pol] = reference_path[0] + path_pol[-1] = reference_path[1] + + # Compute polyfit approximation error + epsilon = [np.linalg.norm(np.array(reference_path[2 * i:2 * (i + 1)]) - np.array(PathGenerator.pol2pos(path_pol, s_vec[i]))) for i in range(N + 1)] + + compute_time = toc(t0) + + """ + path_pol : np.ndarray the polynomial approximation of `reference_path` + epsilon : [float] approximation error between the polynomial fit and the actual path + reference_path : list the actual path (used for P_prev later on) in [x1, y1, x2, y2, ...] format + compute_time : float overall timing of this function + """ + return path_pol, epsilon, reference_path, compute_time + + def prepare_prev(self, p: np.ndarray, rho: float, obstacles_star: List[StarshapedObstacle]): + P_prev = np.array([self.target_path[::2], self.target_path[1::2]]).T + # Shift path to start at point closest to robot position + P_prev = P_prev[np.argmin(np.linalg.norm(p - P_prev, axis=1)):, :] + # P_prev[0, :] = self.r0 + if np.linalg.norm(p - P_prev[0, :]) > rho: + if self.verbosity > 0: + print("[Path Generator]: No reuse of previous path. Path not within distance rho from robot.") + P_prev = None + else: + for r in P_prev: + if any([o.interior_point(r) for o in obstacles_star]): + if self.verbosity > 0: + print("[Path Generator]: No reuse of previous path. 
Path not collision-free.") + P_prev = None + + if P_prev is not None: + # Cut off stand still padding in previous path + P_prev_stepsize = np.linalg.norm(np.diff(P_prev, axis=0), axis=1) + s_prev = np.hstack((0, np.cumsum(P_prev_stepsize) / self.params['dp_max'])) + P_prev_mask = [True] + (P_prev_stepsize > 1e-8).tolist() + P_prev = P_prev[P_prev_mask, :] + s_prev = s_prev[P_prev_mask] + else: + s_prev = None + + return P_prev, s_prev + + def update(self, p: np.ndarray, r0: np.ndarray, rg: np.ndarray, rho: float, obstacles_star: List[StarshapedObstacle]) -> tuple[List[float], float]: + # Buffer previous target path + if self.params['buffer'] and self.target_path: + P_prev, s_prev = self.prepare_prev(p, rho, obstacles_star) + else: + P_prev, s_prev = None, None + # Generate the new path + path_pol, epsilon, self.target_path, _ = PathGenerator.path_generator( + r0, rg, obstacles_star, self.params['dp_max'], self.params['N'], + self.params['dt'], self.params['max_compute_time'], self.params['n_pol'], + ds_decay_rate=0.5, ds_increase_rate=2., max_nr_steps=1000, P_prev=P_prev, s_prev=s_prev, + reactivity=self.params['reactivity'], crep=self.params['crep'], + convergence_tolerance=self.params['convergence_tolerance'], verbosity=self.verbosity) + return path_pol, epsilon + + +class MpcController(): + + def __init__(self, params: dict, robot, verbosity=0): + self.params = params + self.robot = robot + # self.mpc_solver = TunnelMpc(params, robot) + self.verbosity = verbosity + self.reset() + + def reset(self): + # self.mpc_solver.reset() + self.u_prev = [0] * self.robot.nu + + def check_convergence(self, p: np.ndarray, pg: np.ndarray): + return np.linalg.norm(p - pg) < self.params['convergence_margin'] + + def e_max(self, rho, epsilon): + if rho > 0: + e_max = rho - max(epsilon) + else: + e_max = 1.e6 + return e_max + + def ref(self, path_pol, s): + n_pol = self.params['n_pol'] + return [np.polyval(path_pol[j*(n_pol+1):(j+1)*(n_pol+1)], s) for j in range(self.params['np'])] + + def compute_u(self, x, p, path_pol, rg, rho, epsilon): + # Compute MPC solution + + # e_max = self.e_max(rho, epsilon) # parameter for tracking error constraint + # solution = self.mpc_solver.run(x.tolist(), self.u_prev, path_pol, self.params, e_max, rg.tolist(), self.verbosity) # call to Rust solver + # # Extract first control signal and store it for later + # self.u_prev = solution['u'][:self.robot.nu] + + p = np.array(p) + e_max = self.e_max(rho, epsilon) + s_kappa = 0.9 * e_max / self.robot.vmax + p_ref = self.ref(path_pol, s_kappa) + + err = p_ref - p + dir = np.array([np.cos(x[2]), np.sin(x[2])]) + vel = 0.65 * self.robot.vmax * max(0.1, (dir @ (err / np.linalg.norm(err)))) + wel = 0.85 * ((np.arctan2(err[1], err[0]) - x[2] + np.pi) % (2*np.pi) - np.pi) + + alpha = 1 + self.u_prev = [alpha*vel, alpha*wel] + + return np.array(self.u_prev) + + +if __name__ == "__main__": + + from starworlds.obstacles import StarshapedPolygon + + # environment + obstacles = [ + StarshapedPolygon([[2, 2], [8, 2], [8, 3], [2, 3]]), + StarshapedPolygon([[2, 3], [3, 3], [3, 4.25], [2, 4.25]]), + StarshapedPolygon([[2, 5], [8, 5], [8, 6], [2, 6]]), + StarshapedPolygon([[2, 8], [8, 8], [8, 9], [2, 9]]), + ] + pg = np.array([0.5, 5.5]) # goal position + p0 = np.array([9, 4]) # initial position + theta0 = np.arctan2(pg[1]-p0[1], pg[0]-p0[0]) # initial heading (simply start heading towards goal) + + # robot + robot_type = "Unicycle" + with open(r'robot_params.yaml') as f: + params = yaml.safe_load(f) + robot_params = params[robot_type] + robot = 
Unicycle(width=robot_params['width'], + vel_min=[robot_params['lin_vel_min'], -robot_params['ang_vel_max']], + vel_max=[robot_params['lin_vel_max'], robot_params['ang_vel_max']], + name=robot_type) + x0 = np.append(p0, [theta0]) # initial robot state + + # MPC + with open(r'./tunnel_mpc_params.yaml') as f: + params = yaml.safe_load(f) + mpc_params = params["tunnel_mpc"] + mpc_params['dp_max'] = robot.vmax * mpc_params['dt'] + + # components + verbosity = 1 + scene_updater = SceneUpdater(mpc_params, verbosity) + path_gen = PathGenerator(mpc_params, verbosity) + controller = MpcController(mpc_params, robot, verbosity) + + + # plotting + fig = plt.figure() + handle_goal = plt.plot(*pg, c="g")[0] + handle_init = plt.plot(*x0[:2], c="b")[0] + handle_curr = plt.plot(*x0[:2], c="r", marker=(3, 0, np.rad2deg(x0[2]-np.pi/2)), markersize=10)[0] + handle_curr_dir = plt.plot(0, 0, marker=(2, 0, np.rad2deg(0)), markersize=5, color='w')[0] + handle_path = plt.plot([], [], c="k")[0] + coll = plt_col.PolyCollection([ + np.array([[2, 2], [8, 2], [8, 3], [2, 3]]), + np.array([[2, 3], [3, 3], [3, 4.25], [2, 4.25]]), + np.array([[2, 5], [8, 5], [8, 6], [2, 6]]), + np.array([[2, 8], [8, 8], [8, 9], [2, 9]]) + ]) + plt.gca().add_collection(coll) + handle_title = plt.text(5, 9.5, "", bbox={'facecolor':'w', 'alpha':0.5, 'pad':5}, ha="center") + plt.gca().set_aspect("equal") + plt.xlim([0, 10]) + plt.ylim([0, 10]) + plt.draw() + + + # run the controller + T_max = 30 + dt = controller.params['dt'] + t = 0. + x = x0 + u_prev = np.zeros(robot.nu) + convergence_threshold = 0.05 + converged = False + try: + while t < T_max: + p = robot.h(x) + + if np.linalg.norm(p - pg) < convergence_threshold: + break + + # Udpate the scene + r0, rg, rho, obstacles_star = scene_updater.update(p, pg, obstacles) + + # Check for convergence + if controller.check_convergence(p, pg): + u = np.zeros(robot.nu) + else: + # Update target path + path_pol, epsilon = path_gen.update(p, r0, rg, rho, obstacles_star) + # Calculate next control input + u = controller.compute_u(x, p, path_pol, rg, rho, epsilon) + + # update robot position + x, _ = robot.move(x, u, dt) + u_prev = u + t += dt + + # plot + handle_curr.set_data([x[0]], [x[1]]) + handle_curr.set_marker((3, 0, np.rad2deg(x[2]-np.pi/2))) + handle_curr_dir.set_data([x[0]], [x[1]]) + handle_curr_dir.set_marker((2, 0, np.rad2deg(x[2]-np.pi/2))) + handle_path.set_data([path_gen.target_path[::2], path_gen.target_path[1::2]]) + handle_title.set_text(f"{t:5.3f}") + fig.canvas.draw() + plt.pause(0.005) + + except Exception as ex: + raise ex + pass + + plt.show() + diff --git a/python/ur_simple_control/path_generation/planner.py b/python/ur_simple_control/path_generation/planner.py new file mode 100644 index 0000000000000000000000000000000000000000..655d8157a31d2ea0776d066861e16ade4b92d8ee --- /dev/null +++ b/python/ur_simple_control/path_generation/planner.py @@ -0,0 +1,525 @@ +from typing import List +from abc import ABC, abstractmethod + +import numpy as np +from ur_simple_control.path_generation.starworlds.obstacles import StarshapedObstacle +from ur_simple_control.path_generation.starworlds.starshaped_hull import cluster_and_starify, ObstacleCluster +from ur_simple_control.path_generation.starworlds.utils.misc import tic, toc +from ur_simple_control.path_generation.star_navigation.robot.unicycle import Unicycle +from ur_simple_control.path_generation.starworlds.obstacles import StarshapedPolygon +import shapely +import yaml + +import matplotlib.pyplot as plt +import matplotlib.collections as 
plt_col +from multiprocessing import Queue + +def getPlanningArgs(parser): + parser.add_argument('--planning-robot-params-file', type=str, + default='/home/gospodar/lund/praxis/projects/ur_simple_control/python/ur_simple_control/path_generation/robot_params.yaml', + help="path to robot params file, required for path planning because it takes kinematic constraints into account") + parser.add_argument('--tunnel-mpc-params-file', type=str, + default='/home/gospodar/lund/praxis/projects/ur_simple_control/python/ur_simple_control/path_generation/tunnel_mpc_params.yaml', + help="path to mpc (in original tunnel) params file, required for path planning because it takes kinematic constraints into account") + parser.add_argument('--n-pol', type=int, + default='0', + help="IDK, TODO, rn this is just a preset hack value put into args for easier data transfer") + parser.add_argument('--np', type=int, + default='0', + help="IDK, TODO, rn this is just a preset hack value put into args for easier data transfer") + return parser + +class SceneUpdater(): + def __init__(self, params: dict, verbosity=0): + self.params = params + self.verbosity = verbosity + self.reset() + + def reset(self): + self.obstacle_clusters : List[ObstacleCluster] = None + self.free_star = None + + # TODO: Check why computational time varies so much for same static scene + @staticmethod + def workspace_modification(obstacles, p, pg, rho0, max_compute_time, hull_epsilon, gamma=0.5, make_convex=0, obstacle_clusters_prev=None, free_star_prev=None, verbosity=0): + + # Clearance variable initialization + rho = rho0 / gamma # First rho should be rho0 + t_init = tic() + + while True: + if toc(t_init) > max_compute_time: + if verbosity > 0: + print("[Workspace modification]: Max compute time in rho iteration.") + break + + # Reduce rho + rho *= gamma + + # Pad obstacles with rho + obstacles_rho = [o.dilated_obstacle(padding=rho, id="duplicate") for o in obstacles] + + # TODO: Fix boundaries + free_rho = shapely.geometry.box(-20, -20, 20, 20) + for o in obstacles_rho: + free_rho = free_rho.difference(o.polygon()) + + # TODO: Check buffering fix + # Find P0 + Bp = shapely.geometry.Point(p).buffer(0.95 * rho) + initial_reference_set = Bp.intersection(free_rho.buffer(-0.1 * rho)) + + if not initial_reference_set.is_empty: + break + + # Initial and goal reference position selection + r0_sh, _ = shapely.ops.nearest_points(initial_reference_set, shapely.geometry.Point(p)) + r0 = np.array(r0_sh.coords[0]) + rg_sh, _ = shapely.ops.nearest_points(free_rho, shapely.geometry.Point(pg)) + rg = np.array(rg_sh.coords[0]) + + + # TODO: Check more thoroughly + if free_star_prev is not None: + free_star_prev = free_star_prev.buffer(-1e-4) + if free_star_prev is not None and free_star_prev.contains(r0_sh) and free_star_prev.contains(rg_sh) and free_rho.contains(free_star_prev):# not any([free_star_prev.covers(o.polygon()) for o in obstacles_rho]): + if verbosity > 1: + print("[Workspace modification]: Reuse workspace from previous time step.") + obstacle_clusters = obstacle_clusters_prev + exit_flag = 10 + else: + # Apply cluster and starify + obstacle_clusters, obstacle_timing, exit_flag, n_iter = \ + cluster_and_starify(obstacles_rho, r0, rg, hull_epsilon, max_compute_time=max_compute_time-toc(t_init), + previous_clusters=obstacle_clusters_prev, + make_convex=make_convex, verbose=verbosity) + + free_star = shapely.geometry.box(-20, -20, 20, 20) + for o in obstacle_clusters: + free_star = free_star.difference(o.polygon()) + + compute_time = toc(t_init) + 
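A toy version of the rho iteration above using plain shapely geometry: pad the obstacle by rho, subtract it from a bounding box to get the free space, and shrink rho until a small ball around the robot intersects the (slightly eroded) free space. The rectangle and robot position are made up, and a raw shapely polygon stands in for the starworlds obstacles and their dilated_obstacle() method.

# Toy sketch of the rho iteration with plain shapely objects.
import numpy as np
import shapely.geometry
import shapely.ops

obstacle = shapely.geometry.Polygon([[2, 2], [8, 2], [8, 3], [2, 3]])
p = np.array([5.0, 3.2])     # robot position close to the obstacle
rho0, gamma = 0.3, 0.5

rho = rho0 / gamma           # first rho tried is rho0
while True:
    rho *= gamma
    free_rho = shapely.geometry.box(-20, -20, 20, 20).difference(obstacle.buffer(rho))
    Bp = shapely.geometry.Point(p).buffer(0.95 * rho)
    initial_reference_set = Bp.intersection(free_rho.buffer(-0.1 * rho))
    if not initial_reference_set.is_empty:
        break

r0, _ = shapely.ops.nearest_points(initial_reference_set, shapely.geometry.Point(p))
print(rho, np.array(r0.coords[0]))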
return obstacle_clusters, r0, rg, rho, free_star, compute_time, exit_flag + + def update(self, p: np.ndarray, pg: np.ndarray, obstacles) -> tuple[np.ndarray, np.ndarray, float, list[StarshapedObstacle]]: + # Update obstacles + if not self.params['use_prev_workspace']: + self.free_star = None + self.obstacle_clusters, r0, rg, rho, self.free_star, _, _ = SceneUpdater.workspace_modification( + obstacles, p, pg, self.params['rho0'], self.params['max_obs_compute_time'], + self.params['hull_epsilon'], self.params['gamma'], + make_convex=self.params['make_convex'], obstacle_clusters_prev=self.obstacle_clusters, + free_star_prev=self.free_star, verbosity=self.verbosity) + obstacles_star : List[StarshapedObstacle] = [o.cluster_obstacle for o in self.obstacle_clusters] + # Make sure all polygon representations are computed + [o._compute_polygon_representation() for o in obstacles_star] + + return r0, rg, rho, obstacles_star + + +class PathGenerator(): + def __init__(self, params: dict, verbosity=0): + self.params = params + self.verbosity = verbosity + self.reset() + + def reset(self): + self.target_path = [] + + ### soads.py + + # TODO: Check if can make more computationally efficient + @staticmethod + def soads_f(r, rg, obstacles: List[StarshapedObstacle], adapt_obstacle_velocity=False, unit_magnitude=False, crep=1., reactivity=1., tail_effect=False, convergence_tolerance=1e-4, d=False): + goal_vector = rg - r + goal_dist = np.linalg.norm(goal_vector) + if goal_dist < convergence_tolerance: + return 0 * r + + No = len(obstacles) + fa = goal_vector / goal_dist # Attractor dynamics + if No == 0: + return fa + + ef = [-fa[1], fa[0]] + Rf = np.vstack((fa, ef)).T + + mu = [obs.reference_direction(r) for obs in obstacles] + normal = [obs.normal(r) for obs in obstacles] + gamma = [obs.distance_function(r) for obs in obstacles] + # Compute weights + w = PathGenerator.compute_weights(gamma, weightPow=1) + + # Compute obstacle velocities + xd_o = np.zeros((2, No)) + if adapt_obstacle_velocity: + for i, obs in enumerate(obstacles): + xd_o[:, i] = obs.vel_intertial_frame(r) + + kappa = 0. + f_mag = 0. + for i in range(No): + # Compute basis matrix + E = np.zeros((2, 2)) + E[:, 0] = mu[i] + E[:, 1] = [-normal[i][1], normal[i][0]] + # Compute eigenvalues + D = np.zeros((2, 2)) + D[0, 0] = 1 - crep / (gamma[i] ** (1 / reactivity)) if tail_effect or normal[i].dot(fa) < 0. else 1 + D[1, 1] = 1 + 1 / gamma[i] ** (1 / reactivity) + # Compute modulation + M = E.dot(D).dot(np.linalg.inv(E)) + # f_i = M.dot(fa) + f_i = M.dot(fa - xd_o[:, i]) + xd_o[:, i] + # Compute contribution to velocity magnitude + f_i_abs = np.linalg.norm(f_i) + f_mag += w[i] * f_i_abs + # Compute contribution to velocity direction + nu_i = f_i / f_i_abs + nu_i_hat = Rf.T.dot(nu_i) + kappa_i = np.arccos(np.clip(nu_i_hat[0], -1, 1)) * np.sign(nu_i_hat[1]) + kappa += w[i] * kappa_i + kappa_norm = abs(kappa) + f_o = Rf.dot([np.cos(kappa_norm), np.sin(kappa_norm) / kappa_norm * kappa]) if kappa_norm > 0. else fa + + if unit_magnitude: + f_mag = 1. 
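A small numeric sketch of the inverse-distance weighting implemented by compute_weights below: obstacles whose distance measure gamma is close to the lower limit dominate, and an obstacle at (or inside) the boundary takes all the weight. This mirrors the logic rather than importing it.

# Numeric sketch of the inverse-gamma obstacle weighting.
import numpy as np

def inverse_gamma_weights(gamma, lower=1.0, weight_pow=1):
    gamma = np.array(gamma, dtype=float)
    critical = gamma <= lower
    if critical.any():
        # on/inside an obstacle boundary: split weight among critical obstacles
        return critical / critical.sum()
    w = (1.0 / (gamma - lower)) ** weight_pow
    return w / w.sum()

print(inverse_gamma_weights([1.2, 2.0, 5.0]))   # ~[0.80, 0.16, 0.04]
print(inverse_gamma_weights([1.0, 2.0]))        # [1.0, 0.0]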
+ return f_mag * f_o + + @staticmethod + def compute_weights( + distMeas, + N=0, + distMeas_lowerLimit=1, + weightType="inverseGamma", + weightPow=2, + ): + """Compute weights based on a distance measure (with no upper limit)""" + distMeas = np.array(distMeas) + n_points = distMeas.shape[0] + + critical_points = distMeas <= distMeas_lowerLimit + + if np.sum(critical_points): # at least one + if np.sum(critical_points) == 1: + w = critical_points * 1.0 + return w + else: + # TODO: continuous weighting function + # warnings.warn("Implement continuity of weighting function.") + w = critical_points * 1.0 / np.sum(critical_points) + return w + + distMeas = distMeas - distMeas_lowerLimit + w = (1 / distMeas) ** weightPow + if np.sum(w) == 0: + return w + w = w / np.sum(w) # Normalization + return w + + ### path_generator.py + + @staticmethod + def pol2pos(path_pol, s): + n_pol = len(path_pol) // 2 - 1 + return [sum([path_pol[j * (n_pol + 1) + i] * s ** (n_pol - i) for i in range(n_pol + 1)]) for j in range(2)] + + @staticmethod + def path_generator(r0, rg, obstacles, dp_max, N, dt, max_compute_time, n_pol, ds_decay_rate=0.5, + ds_increase_rate=2., max_nr_steps=1000, convergence_tolerance=1e-5, P_prev=None, s_prev=None, + reactivity=1., crep=1., tail_effect=False, reference_step_size=0.5, verbosity=0): + """ + r0 : np.ndarray initial reference position + rg : np.ndarray goal reference position + obstacles : list[StarshapedObstacle] obstacles in the scene + dp_max : float maximum position increment (i.e. vmax * dt) + N : int ??? + dt : float time step + max_compute_time : float timeout for computation + n_pol : int degree of the polynomial that fits the reference ??? + ds_decay_rate : float ??? + ds_increase_rate : float ??? + max_nr_steps : int computation steps threshold + convergence_tolerance : float ??? + P_prev : np.ndarray previous path ??? + s_prev : np.ndarray previous path time ??? + reactivity : float ??? + crep : float ??? + tail_effect : bool ??? + reference_step_size : float ??? + verbosity : int you guessed it... + """ + + t0 = tic() + + # Initialize + ds = 1 + s = np.zeros(max_nr_steps) + r = np.zeros((max_nr_steps, r0.size)) + if P_prev is not None: + i = P_prev.shape[0] + r[:i, :] = P_prev + s[:i] = s_prev + else: + i = 1 + r[0, :] = r0 + + while True: + dist_to_goal = np.linalg.norm(r[i - 1, :] - rg) + # Check exit conditions + if dist_to_goal < convergence_tolerance: + if verbosity > 1: + print(f"[Path Generator]: Path converged. {int(100 * (s[i - 1] / N))}% of path completed.") + break + if s[i - 1] >= N: + if verbosity > 1: + print(f"[Path Generator]: Completed path length. {int(100 * (s[i - 1] / N))}% of path completed.") + break + if toc(t0) > max_compute_time: + if verbosity > 1: + print(f"[Path Generator]: Max compute time in path integrator. {int(100 * (s[i - 1] / N))}% of path completed.") + break + if i >= max_nr_steps: + if verbosity > 1: + print(f"[Path Generator]: Max steps taken in path integrator. {int(100 * (s[i - 1] / N))}% of path completed.") + break + + # Movement using SOADS dynamics + dr = min(dp_max, dist_to_goal) * PathGenerator.soads_f(r[i - 1, :], rg, obstacles, adapt_obstacle_velocity=False, + unit_magnitude=True, crep=crep, + reactivity=reactivity, tail_effect=tail_effect, + convergence_tolerance=convergence_tolerance) + + r[i, :] = r[i - 1, :] + dr * ds + + ri_in_obstacle = False + while any([o.interior_point(r[i, :]) for o in obstacles]): + if verbosity > 1: + print("[Path Generator]: Path inside obstacle. 
Reducing integration step from {:5f} to {:5f}.".format(ds, ds*ds_decay_rate)) + ds *= ds_decay_rate + r[i, :] = r[i - 1, :] + dr * ds + # Additional compute time check + if toc(t0) > max_compute_time: + ri_in_obstacle = True + break + if ri_in_obstacle: + continue + + # Update travelled distance + s[i] = s[i - 1] + ds + # Try to increase step rate again + ds = min(ds_increase_rate * ds, 1) + # Increase iteration counter + i += 1 + + r = r[:i, :] + s = s[:i] + + # Evenly spaced path + s_vec = np.arange(0, s[-1] + reference_step_size, reference_step_size) + xs, ys = np.interp(s_vec, s, r[:, 0]), np.interp(s_vec, s, r[:, 1]) + # Append not finished path with fixed final position + s_vec = np.append(s_vec, np.arange(s[-1] + reference_step_size, N + reference_step_size, reference_step_size)) + xs = np.append(xs, xs[-1] * np.ones(len(s_vec)-len(xs))) + ys = np.append(ys, ys[-1] * np.ones(len(s_vec)-len(ys))) + + reference_path = [el for p in zip(xs, ys) for el in p] # [x0 y0 x1 y1 ...] + + # TODO: Fix when close to goal + # TODO: Adjust for short arc length, skip higher order terms.. + path_pol = np.polyfit(s_vec, reference_path[::2], n_pol).tolist() + \ + np.polyfit(s_vec, reference_path[1::2], n_pol).tolist() # [px0 px1 ... pxn py0 py1 ... pyn] + # Force init position to be correct + path_pol[n_pol] = reference_path[0] + path_pol[-1] = reference_path[1] + + # Compute polyfit approximation error + epsilon = [np.linalg.norm(np.array(reference_path[2 * i:2 * (i + 1)]) - np.array(PathGenerator.pol2pos(path_pol, s_vec[i]))) for i in range(N + 1)] + + compute_time = toc(t0) + + """ + path_pol : np.ndarray the polynomial approximation of `reference_path` + epsilon : [float] approximation error between the polynomial fit and the actual path + reference_path : list the actual path (used for P_prev later on) in [x1, y1, x2, y2, ...] format + compute_time : float overall timing of this function + """ + return path_pol, epsilon, reference_path, compute_time + + def prepare_prev(self, p: np.ndarray, rho: float, obstacles_star: List[StarshapedObstacle]): + P_prev = np.array([self.target_path[::2], self.target_path[1::2]]).T + # Shift path to start at point closest to robot position + P_prev = P_prev[np.argmin(np.linalg.norm(p - P_prev, axis=1)):, :] + # P_prev[0, :] = self.r0 + if np.linalg.norm(p - P_prev[0, :]) > rho: + if self.verbosity > 0: + print("[Path Generator]: No reuse of previous path. Path not within distance rho from robot.") + P_prev = None + else: + for r in P_prev: + if any([o.interior_point(r) for o in obstacles_star]): + if self.verbosity > 0: + print("[Path Generator]: No reuse of previous path. 
Path not collision-free.") + P_prev = None + + if P_prev is not None: + # Cut off stand still padding in previous path + P_prev_stepsize = np.linalg.norm(np.diff(P_prev, axis=0), axis=1) + s_prev = np.hstack((0, np.cumsum(P_prev_stepsize) / self.params['dp_max'])) + P_prev_mask = [True] + (P_prev_stepsize > 1e-8).tolist() + P_prev = P_prev[P_prev_mask, :] + s_prev = s_prev[P_prev_mask] + else: + s_prev = None + + return P_prev, s_prev + + def update(self, p: np.ndarray, r0: np.ndarray, rg: np.ndarray, rho: float, obstacles_star: List[StarshapedObstacle]) -> tuple[List[float], float]: + # Buffer previous target path + if self.params['buffer'] and self.target_path: + P_prev, s_prev = self.prepare_prev(p, rho, obstacles_star) + else: + P_prev, s_prev = None, None + # Generate the new path + path_pol, epsilon, self.target_path, _ = PathGenerator.path_generator( + r0, rg, obstacles_star, self.params['dp_max'], self.params['N'], + self.params['dt'], self.params['max_compute_time'], self.params['n_pol'], + ds_decay_rate=0.5, ds_increase_rate=2., max_nr_steps=1000, P_prev=P_prev, s_prev=s_prev, + reactivity=self.params['reactivity'], crep=self.params['crep'], + convergence_tolerance=self.params['convergence_tolerance'], verbosity=self.verbosity) + return path_pol, epsilon + + +def createMap(): + """ + createMap + --------- + return obstacles that define the 2D map + """ + map_as_list = [ + [[2, 2], [8, 2], [8, 3], [2, 3]] , + [[2, 3], [3, 3], [3, 4.25], [2, 4.25]], + [[2, 5], [8, 5], [8, 6], [2, 6]] , + [[2, 8], [8, 8], [8, 9], [2, 9]] , + ] + + obstacles = [] + for map_element in map_as_list: + obstacles.append(StarshapedPolygon(map_element)) + return obstacles, map_as_list + +def pathVisualizer(x0, goal, map_as_list, positions): + # plotting + fig = plt.figure() + handle_goal = plt.plot(*pg, c="g")[0] + handle_init = plt.plot(*x0[:2], c="b")[0] + handle_curr = plt.plot(*x0[:2], c="r", marker=(3, 0, np.rad2deg(x0[2]-np.pi/2)), markersize=10)[0] + handle_curr_dir = plt.plot(0, 0, marker=(2, 0, np.rad2deg(0)), markersize=5, color='w')[0] + handle_path = plt.plot([], [], c="k")[0] + coll = [] + for map_element in map_as_list: + coll.append(plt_col.PolyCollection(np.array(map_element))) + plt.gca().add_collection(coll) + handle_title = plt.text(5, 9.5, "", bbox={'facecolor':'w', 'alpha':0.5, 'pad':5}, ha="center") + plt.gca().set_aspect("equal") + plt.xlim([0, 10]) + plt.ylim([0, 10]) + plt.draw() + + # do the updating plotting + for x in positions: + handle_curr.set_data([x[0]], [x[1]]) + handle_curr.set_marker((3, 0, np.rad2deg(x[2]-np.pi/2))) + handle_curr_dir.set_data([x[0]], [x[1]]) + handle_curr_dir.set_marker((2, 0, np.rad2deg(x[2]-np.pi/2))) + handle_path.set_data([path_gen.target_path[::2], path_gen.target_path[1::2]]) + handle_title.set_text(f"{t:5.3f}") + fig.canvas.draw() + plt.pause(0.005) + plt.show() + +# stupid function for stupid data re-assembly +#def path2D_to_SE2(path2D : list): +# path2D = np.array(path2D) +# # create (N,2) path for list [x0,y0,x1,y1,...] 
+# # of course it shouldn't be like that to begin with but +# # i have no time for that +# path2D = path2D.reshape((-1, 2)) +# # since it's a pairwise operation it can't be done on the last point +# x_i = path2D[:,0][:-1] # no last element +# y_i = path2D[:,1][:-1] # no last element +# x_i_plus_1 = path2D[:,0][1:] # no first element +# y_i_plus_1 = path2D[:,1][1:] # no first element +# # elementwise arctan2 +# thetas = np.arctan2(x_i_plus_1 - x_i, y_i_plus_1 - y_i) +# thetas = thetas.reshape((-1, 1)) +# path_SE2 = np.hstack((path2D, thetas)) +# return path_SE2 + + +# function to be put into ProcessManager, +# spitting out path points +def starPlanner(goal, args, init_cmd, cmd_queue : Queue, data_queue : Queue): + """ + starPlanner + ------------ + wild dark dynamical system magic done by albin, + with more dark magic on top to get it to spit + out path points and just the path points. + goal and path are [x,y] + """ + # environment + obstacles, _ = createMap() + + robot_type = "Unicycle" + with open(args.planning_robot_params_file) as f: + params = yaml.safe_load(f) + robot_params = params[robot_type] + planning_robot = Unicycle(width=robot_params['width'], + vel_min=[robot_params['lin_vel_min'], -robot_params['ang_vel_max']], + vel_max=[robot_params['lin_vel_max'], robot_params['ang_vel_max']], + name=robot_type) + + with open(args.tunnel_mpc_params_file) as f: + params = yaml.safe_load(f) + mpc_params = params["tunnel_mpc"] + mpc_params['dp_max'] = planning_robot.vmax * mpc_params['dt'] + + verbosity = 1 + scene_updater = SceneUpdater(mpc_params, verbosity) + path_gen = PathGenerator(mpc_params, verbosity) + + # TODO: make it an argument + convergence_threshold = 0.05 + try: + while True: + # has to be a blocking call + cmd = cmd_queue.get() + p = cmd['p'] + + if np.linalg.norm(p - goal) < convergence_threshold: + break + + # Update the scene + r0, rg, rho, obstacles_star = scene_updater.update(p, goal, obstacles) + # compute the path + path_pol, epsilon = path_gen.update(p, r0, rg, rho, obstacles_star) + # delete comment once new thing works + ################################################ + # create (N,2) path for list [x0,y0,x1,y1,...] + # of course it shouldn't be like that to begin with but + # i have no time for that + #path2D = np.array(path_gen.target_path) + #path2D = path2D.reshape((-1, 2)) + ################################################ + data_queue.put((path_pol, path_gen.target_path)) + if args.debug_prints: + print("put in new plan") + + except KeyboardInterrupt: + if args.debug_prints: + print("PLANNER: caught KeyboardInterrupt, i'm out") + + +if __name__ == "__main__": + pass diff --git a/python/ur_simple_control/path_generation/robot_params.yaml b/python/ur_simple_control/path_generation/robot_params.yaml new file mode 100644 index 0000000000000000000000000000000000000000..fe1dcbf7e05ebbe89e1044620e25a327a931dfd1 --- /dev/null +++ b/python/ur_simple_control/path_generation/robot_params.yaml @@ -0,0 +1,27 @@ + +Omnidirectional: + model: 'Omnidirectional' + width: 0.5 + vel_max: 1.5 + +Unicycle_forward: + model: 'Unicycle' + width: 0.5 + lin_vel_min: 0. 
+ lin_vel_max: 1.5 + ang_vel_max: 1.5 + +Unicycle: + model: 'Unicycle' + width: 0.5 + lin_vel_min: -0.5 + lin_vel_max: 1.5 + ang_vel_max: 1.5 + +Bicycle: + model: 'Bicycle' + width: 0.5 + steer_max: 1.0 + lin_vel_min: -0.5 + lin_vel_max: 1.5 + steer_vel_max: 0.5 diff --git a/python/ur_simple_control/path_generation/setup.py b/python/ur_simple_control/path_generation/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..df2f0c78df00acfbde143eed1cd5a2a9b32fb2ac --- /dev/null +++ b/python/ur_simple_control/path_generation/setup.py @@ -0,0 +1,13 @@ +from setuptools import setup, find_packages + +setup(name='mobile_manipulation_challenge', + version='1.0', + packages=find_packages(), + install_requires=[ + 'numpy', + 'scipy', + 'matplotlib', + 'shapely', + 'pyyaml' + ] +) diff --git a/python/ur_simple_control/path_generation/star_navigation b/python/ur_simple_control/path_generation/star_navigation new file mode 160000 index 0000000000000000000000000000000000000000..2e24e2ffca8a7662146e90f06fce74f40af6a316 --- /dev/null +++ b/python/ur_simple_control/path_generation/star_navigation @@ -0,0 +1 @@ +Subproject commit 2e24e2ffca8a7662146e90f06fce74f40af6a316 diff --git a/python/ur_simple_control/path_generation/starworld_tunnel_mpc b/python/ur_simple_control/path_generation/starworld_tunnel_mpc new file mode 160000 index 0000000000000000000000000000000000000000..94c3d55671c9c57bcfc2509d6f8a64f26217861e --- /dev/null +++ b/python/ur_simple_control/path_generation/starworld_tunnel_mpc @@ -0,0 +1 @@ +Subproject commit 94c3d55671c9c57bcfc2509d6f8a64f26217861e diff --git a/python/ur_simple_control/path_generation/starworlds b/python/ur_simple_control/path_generation/starworlds new file mode 160000 index 0000000000000000000000000000000000000000..2990b9ecdeb8d7dd421fc518b5a97b2a31d5e67e --- /dev/null +++ b/python/ur_simple_control/path_generation/starworlds @@ -0,0 +1 @@ +Subproject commit 2990b9ecdeb8d7dd421fc518b5a97b2a31d5e67e diff --git a/python/ur_simple_control/path_generation/tunnel_mpc_params.yaml b/python/ur_simple_control/path_generation/tunnel_mpc_params.yaml new file mode 100644 index 0000000000000000000000000000000000000000..1f1ca862f725ed5a3d033dd093dfb0a5238bd9ac --- /dev/null +++ b/python/ur_simple_control/path_generation/tunnel_mpc_params.yaml @@ -0,0 +1,41 @@ +tunnel_mpc: + + # Workspace modification + rho0: 0.3 # clearance distance + gamma: 0.5 # rho decay + make_convex: 1 + max_obs_compute_time: 40 + hull_epsilon: 0.3 + use_prev_workspace: 1 + + # Target path generation + convergence_tolerance: 1.e-3 + max_compute_time: 10 + crep: 1. + reactivity: 1. + buffer: 1 + + # MPC + ce: 100. + cs: 500. + e_penalty: 0 + dg: 0.2 + cg: 10000 + R: [250, 2.5] + convergence_margin: 0.02 + + #-------- MPC Build params --------- # + build_mode: 'release' + integration_method: 'euler' + np: 2 + n_pol: 10 + N: 5 + dt: 0.2 + solver_tol: 1.e-5 + solver_max_time: 500 # Maximum duration for mpc solver in milliseconds + solver_max_inner_iterations: 1000 + solver_max_outer_iterations: 10 + solver_initial_tol: 1.e-4 + solver_delta_tol: 1.e-4 + solver_weight_update_factor: 10.0 + solver_initial_penalty: 1000.0
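A note on the polynomial path representation built by path_generator above: path_pol stacks an n_pol-degree np.polyfit result for x (highest-degree coefficient first) followed by one for y, which is why overwriting path_pol[n_pol] and path_pol[-1] pins the two constant terms, i.e. the position at s = 0, to the true start point. The sketch below shows how such a coefficient vector can be evaluated; the actual PathGenerator.pol2pos referenced above may differ in detail.

import numpy as np

# Minimal sketch, assuming path_pol = [px_n, ..., px_0, py_n, ..., py_0]
# (two np.polyfit outputs, x block first), as assembled in path_generator above.
def pol2pos_sketch(path_pol, s, n_pol):
    x = np.polyval(path_pol[:n_pol + 1], s)  # np.polyval also expects highest degree first
    y = np.polyval(path_pol[n_pol + 1:], s)
    return [x, y]

if __name__ == "__main__":
    n_pol = 2
    s_vec = np.linspace(0.0, 1.0, 20)
    xs, ys = 2.0 * s_vec + 1.0, -s_vec  # a toy straight-line reference path
    path_pol = np.polyfit(s_vec, xs, n_pol).tolist() + np.polyfit(s_vec, ys, n_pol).tolist()
    print(pol2pos_sketch(path_pol, 0.5, n_pol))  # approximately [2.0, -0.5]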
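The reuse test in PathGenerator.prepare_prev above reduces to three steps: shift the buffered path so it starts at the waypoint nearest the robot, reject it if that waypoint is farther away than rho, and reject it if any remaining waypoint lies inside an obstacle. A stand-alone numpy sketch of that logic, with a stand-in in_obstacle predicate in place of the StarshapedObstacle.interior_point call used above:

import numpy as np

def reuse_previous_path(P_prev, p, rho, in_obstacle):
    """Sketch of the reuse test in PathGenerator.prepare_prev; in_obstacle is a stand-in predicate."""
    # start the buffered path at the waypoint closest to the robot position p
    P_prev = P_prev[np.argmin(np.linalg.norm(p - P_prev, axis=1)):, :]
    if np.linalg.norm(p - P_prev[0, :]) > rho:
        return None  # buffered path is not within distance rho of the robot
    if any(in_obstacle(r) for r in P_prev):
        return None  # buffered path is no longer collision-free
    return P_prev

if __name__ == "__main__":
    P = np.array([[0.0, 0.0], [0.5, 0.0], [1.0, 0.0]])
    print(reuse_previous_path(P, np.array([0.45, 0.1]), rho=0.3, in_obstacle=lambda r: False))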
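starPlanner above is written to run in its own process and communicates purely through two queues: it blocks on cmd_queue.get() for a dict carrying the current position under 'p', and answers on data_queue with a (path_pol, target_path) pair. The sketch below wires that protocol up with multiprocessing directly rather than the ProcessManager mentioned in the comment above; the import path and the attributes set on args are assumptions based on the code shown here.

from multiprocessing import Process, Queue
from types import SimpleNamespace
import numpy as np

# module path assumed; adjust to wherever starPlanner actually lives
from ur_simple_control.path_generation.planner import starPlanner

def plan_once(p_current, goal):
    # only the fields starPlanner reads are filled in; the file paths are assumptions
    args = SimpleNamespace(planning_robot_params_file="robot_params.yaml",
                           tunnel_mpc_params_file="tunnel_mpc_params.yaml",
                           debug_prints=False)
    cmd_queue, data_queue = Queue(), Queue()
    planner = Process(target=starPlanner,
                      args=(np.asarray(goal), args, None, cmd_queue, data_queue))
    planner.start()
    cmd_queue.put({'p': np.asarray(p_current)})  # starPlanner blocks on this
    path_pol, target_path = data_queue.get()     # (polynomial coefficients, [x0, y0, x1, y1, ...])
    planner.terminate()
    planner.join()
    return path_pol, target_path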
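Finally, a quick sanity check of the two parameter files added above. The relative paths are assumptions (adjust them to wherever the package installs the YAML files); the derived dp_max mirrors what starPlanner computes at runtime, assuming the Unicycle's vmax corresponds to lin_vel_max.

import yaml

with open("python/ur_simple_control/path_generation/tunnel_mpc_params.yaml") as f:
    mpc_params = yaml.safe_load(f)["tunnel_mpc"]
with open("python/ur_simple_control/path_generation/robot_params.yaml") as f:
    unicycle = yaml.safe_load(f)["Unicycle"]

# maximum path-parameter step per MPC sample, as derived in starPlanner
dp_max = unicycle["lin_vel_max"] * mpc_params["dt"]
print(mpc_params["N"], mpc_params["n_pol"], dp_max)  # 5 10 0.3 (up to float rounding)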