diff --git a/examples/cart_pulling/base_and_ee_path_following_from_planner.py b/examples/cart_pulling/base_and_ee_path_following_from_planner.py new file mode 100644 index 0000000000000000000000000000000000000000..06f2f29d8c7388b5f71f13b4a21cd583fb7e39ec --- /dev/null +++ b/examples/cart_pulling/base_and_ee_path_following_from_planner.py @@ -0,0 +1,97 @@ +from smc import getRobotFromArgs +from smc import getMinimalArgParser +from smc.path_generation.maps.premade_maps import createSampleStaticMap +from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3_fixed +from smc.control.optimal_control.util import get_OCP_args +from smc.control.cartesian_space import getClikArgs +from smc.path_generation.planner import starPlanner, getPlanningArgs +from smc.control.optimal_control.croco_mpc_point_to_point import CrocoIKMPC +from smc.control.optimal_control.croco_mpc_path_following import ( + BaseAndEEPathFollowingMPC, +) +from smc.multiprocessing import ProcessManager + +import time +import numpy as np +from functools import partial +import pinocchio as pin + + +def get_args(): + parser = getMinimalArgParser() + parser = get_OCP_args(parser) + parser = getClikArgs(parser) # literally just for goal error + parser = getPlanningArgs(parser) + parser.add_argument( + "--handlebar-height", + type=float, + default=0.5, + help="heigh of handlebar of the cart to be pulled", + ) + parser.add_argument( + "--base-to-handlebar-preferred-distance", + type=float, + default=0.7, + help="prefered path arclength from mobile base position to handlebar", + ) + args = parser.parse_args() + return args + + +if __name__ == "__main__": + args = get_args() + robot = getRobotFromArgs(args) + # TODO: HOW IS IT POSSIBLE THAT T_W_E IS WRONG WITHOUT STEP CALLED HERE????????????????? + robot._step() + T_w_e = robot.T_w_e + robot._q[0] = 9.0 + robot._q[1] = 4.0 + robot._step() + x0 = np.concatenate([robot.q, robot.v]) + goal = np.array([0.5, 5.5]) + + planning_function = partial(starPlanner, goal) + # here we're following T_w_e reference so that's what we send + path_planner = ProcessManager( + args, planning_function, T_w_e.translation[:2], 3, None + ) + _, map_as_list = createSampleStaticMap() + if args.visualizer: + robot.sendRectangular2DMapToVisualizer(map_as_list) + # time.sleep(5) + + T_w_e = robot.T_w_e + data = None + # get first path + ##########################################3 + # initialize + ########################################### + while data is None: + path_planner.sendCommand(T_w_e.translation[:2]) + data = path_planner.getData() + # time.sleep(1 / args.ctrl_freq) + time.sleep(1) + + _, path2D = data + path2D = np.array(path2D).reshape((-1, 2)) + pathSE3 = path2D_to_SE3_fixed(path2D, args.handlebar_height) + if args.visualizer: + # TODO: document this somewhere + robot.visualizer_manager.sendCommand({"Mgoal": pathSE3[0]}) + if np.linalg.norm(pin.log6(T_w_e.actInv(pathSE3[0]))) > 1e-2: + print("going to initial path position") + CrocoIKMPC(args, robot, pathSE3[0]) + print("initialized!") + BaseAndEEPathFollowingMPC(args, robot, path_planner) + + print("final position:", robot.T_w_e) + + if args.real: + robot.stopRobot() + + if args.save_log: + robot._log_manager.saveLog() + robot._log_manager.plotAllControlLoops() + + if args.visualizer: + robot.killManipulatorVisualizer() diff --git a/examples/cart_pulling/cart_pulling.py b/examples/cart_pulling/cart_pulling.py index 6ed8beddd3f5955f509d04c62cc94220600df12d..896a3a33c1d6b3a002a5ef7531b200c572a3aaf7 100644 --- a/examples/cart_pulling/cart_pulling.py +++ b/examples/cart_pulling/cart_pulling.py @@ -61,8 +61,8 @@ def cartPullingControlLoop( robot: RobotManager, goal, goal_transform, - solver_grasp, - solver_pulling, + ocp_grasp, + ocp_pulling, path_planner: ProcessManager, i: int, past_data, @@ -77,10 +77,7 @@ def cartPullingControlLoop( """ q = robot.q - if robot.robot_name != "yumi": - T_w_e = robot.T_w_e - else: - T_w_e_left, T_w_e_right = robot.T_w_e + T_w_e_left, T_w_e_right = robot.T_w_e # we use binary as string representation (i don't want to deal with python's binary representation). # the reason for this is that then we don't have disgusting nested ifs @@ -88,12 +85,9 @@ def cartPullingControlLoop( priority_register = ["0", "1", "1"] # TODO implement this based on laser scanning or whatever # priority_register[0] = str(int(areObstaclesTooClose())) - if robot.robot_name != "yumi": - graspOK, grasp_pose = isGripperRelativeToBaseOK(args, robot) - else: - graspOK, grasp_pose, grasp_pose_left, grasp_pose_right = ( - areDualGrippersRelativeToBaseOK(args, goal_transform, robot) - ) + graspOK, grasp_pose, grasp_pose_left, grasp_pose_right = ( + areDualGrippersRelativeToBaseOK(args, goal_transform, robot) + ) # NOTE: this keeps getting reset after initial grasp has been completed. # and we want to let mpc cook priority_register[1] = str(int(not graspOK)) # set if not ok @@ -112,42 +106,15 @@ def cartPullingControlLoop( # case 0) if priority_int >= 4: - robot.sendQd(np.zeros(robot.model.nv)) + robot.sendVelocityCommand(np.zeros(robot.model.nv)) # case 1) # TODO: make it an argument obviously usempc = False if (priority_int < 4) and (priority_int >= 2): - # TODO: make goal an argument, remove Mgoal from robot - robot.Mgoal = grasp_pose if usempc: - for i, runningModel in enumerate(solver_grasp.problem.runningModels): - if robot.robot_name != "yumi": - runningModel.differential.costs.costs[ - "gripperPose" - ].cost.residual.reference = grasp_pose - else: - runningModel.differential.costs.costs[ - "gripperPose_l" - ].cost.residual.reference = grasp_pose_left - runningModel.differential.costs.costs[ - "gripperPose_r" - ].cost.residual.reference = grasp_pose_right - if robot.robot_name != "yumi": - solver_grasp.problem.terminalModel.differential.costs.costs[ - "gripperPose" - ].cost.residual.reference = grasp_pose - else: - solver_grasp.problem.terminalModel.differential.costs.costs[ - "gripperPose_l" - ].cost.residual.reference = grasp_pose_left - solver_grasp.problem.terminalModel.differential.costs.costs[ - "gripperPose_r" - ].cost.residual.reference = grasp_pose_right - - robot.Mgoal = grasp_pose - # breakFlag, save_past_item, log_item = CrocoIKMPCControlLoop(args, robot, solver_grasp, i, past_data) - CrocoIKMPCControlLoop(args, robot, solver_grasp, i, past_data) + ocp_grasp.updateGoalInModels(grasp_pose_left, grasp_pose_right): + CrocoIKMPCControlLoop(args, robot, ocp_grasp, i, past_data) else: # controlLoopClik(robot, invKinmQP, i, past_data) clikController = partial(dampedPseudoinverse, 1e-3) @@ -206,14 +173,14 @@ def cartPullingControlLoop( breakFlag, save_past_item, log_item = BaseAndEEPathFollowingMPCControlLoop( args, robot, - solver_pulling, + ocp_pulling, path_planner, grasp_pose, goal_transform, i, past_data, ) - # BaseAndEEPathFollowingMPCControlLoop(args, robot, solver_pulling, path_planner, i, past_data) + # BaseAndEEPathFollowingMPCControlLoop(args, robot, ocp_pulling, path_planner, i, past_data) p = q[:2] # needed for cart pulling @@ -239,28 +206,16 @@ def cartPulling(args, robot: RobotManager, goal, path_planner): # setup cart-pulling mpc # ############################ x0 = np.concatenate([robot.q, robot.v]) - problem_pulling = createBaseAndEEPathFollowingOCP(args, robot, x0) - if args.solver == "boxfddp": - solver_pulling = crocoddyl.SolverBoxFDDP(problem_pulling) - if args.solver == "csqp": - solver_pulling = mim_solvers.SolverCSQP(problem_pulling) - xs_init = [x0] * (solver_pulling.problem.T + 1) - us_init = solver_pulling.problem.quasiStatic([x0] * solver_pulling.problem.T) - solver_pulling.solve(xs_init, us_init, args.max_solver_iter) + ocp_pulling = BaseAndDualArmEEPathFollowingOCP(args, robot, x0) + ocp_pulling.solveInitialOCP(x0) ############################################# # setup point-to-point handlebar grasping # # TODO: have option to swith this for clik # ############################################# - grasp_pose = robot.T_w_e() - problem_grasp = createCrocoIKOCP(args, robot, x0, grasp_pose) - if args.solver == "boxfddp": - solver_grasp = crocoddyl.SolverBoxFDDP(problem_grasp) - if args.solver == "csqp": - solver_grasp = mim_solvers.SolverCSQP(problem_grasp) - xs_init = [x0] * (solver_grasp.problem.T + 1) - us_init = solver_grasp.problem.quasiStatic([x0] * solver_grasp.problem.T) - solver_grasp.solve(xs_init, us_init, args.max_solver_iter) + grasp_pose = robot.T_w_e + ocp_grasp = CrocoIKOCP(args, robot, x0, grasp_pose) + ocp_grasp.solveInitialOCP(x0) controlLoop = partial( cartPullingControlLoop, @@ -268,8 +223,8 @@ def cartPulling(args, robot: RobotManager, goal, path_planner): robot, goal, goal_transform, - solver_grasp, - solver_pulling, + ocp_grasp, + ocp_pulling, path_planner, ) diff --git a/examples/cart_pulling/dual_arm_path_following.py b/examples/cart_pulling/dual_arm_path_following.py index 2b0e218d566aeeec6a3ec796ff5269ccaef262a1..35c272cd13234ebc52c198144181223f3824fad3 100644 --- a/examples/cart_pulling/dual_arm_path_following.py +++ b/examples/cart_pulling/dual_arm_path_following.py @@ -1,2 +1,2 @@ -# same as path_following_mpc in a circle, just use a dual arm instead. +# same as ee_only_path_following_mpc in a circle, just use a dual arm instead. # the reference is still a single 6D path, but now with dual arms diff --git a/examples/cart_pulling/path_following_from_planner.py b/examples/cart_pulling/ee_only_path_following_from_plannner.py similarity index 100% rename from examples/cart_pulling/path_following_from_planner.py rename to examples/cart_pulling/ee_only_path_following_from_plannner.py diff --git a/examples/cart_pulling/pose_initialization.py b/examples/cart_pulling/pose_initialization.py index f73a16029b9bc0ee549e5b38ad88c28f1c2dd2ea..cf14b7321aaca76ecf65afbbfc40adbfef267ea4 100644 --- a/examples/cart_pulling/pose_initialization.py +++ b/examples/cart_pulling/pose_initialization.py @@ -1,3 +1,5 @@ # whatever control you use to get to the starting point for cart pulling. # in the past this might have been clik, but we can run with p2p mpc honestly. # that too needs to be dual-arm compatible though + +# ==> for single ee solved in ee_only_path_following_from_planner by just running IKOCP on the first path point diff --git a/examples/cart_pulling/starify_nav2_map.py b/examples/cart_pulling/starify_nav2_map.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..fc84f744bb1e167a38d4b1b3e22351032352bb35 100644 --- a/examples/cart_pulling/starify_nav2_map.py +++ b/examples/cart_pulling/starify_nav2_map.py @@ -0,0 +1 @@ +# the thing in the title diff --git a/examples/cart_pulling/trajectory_following_mpc.py b/examples/cart_pulling/trajectory_following_mpc.py deleted file mode 100644 index d7a53935e5b3d0bbfcf2812a1d90043e351a9565..0000000000000000000000000000000000000000 --- a/examples/cart_pulling/trajectory_following_mpc.py +++ /dev/null @@ -1 +0,0 @@ -# same as dual_arm_path_following, but with the trajectory math in it diff --git a/examples/optimal_control/croco_ik_ocp.py b/examples/optimal_control/croco_ik_ocp.py index 9ca52152057d71bb23f3708c28afa3fb00be8af9..7e72f7488a878741e6ea53c2c3a1f7200b9fbb72 100644 --- a/examples/optimal_control/croco_ik_ocp.py +++ b/examples/optimal_control/croco_ik_ocp.py @@ -38,7 +38,8 @@ if __name__ == "__main__": x0 = np.concatenate([robot.q, robot.v]) # this shouldn't really depend on x0 but i can't be bothered ocp = SingleArmIKOCP(args, robot, x0, T_w_goal) - reference = ocp.solveOCP(x0) + ocp.solveInitialOCP(x0) + reference = ocp.getSolvedReference(x0) # NOTE: IF YOU PLOT SOMETHING OTHER THAN REAL-TIME PLOTTING FIRST IT BREAKS EVERYTHING # if args.solver == "boxfddp": diff --git a/python/smc/control/optimal_control/abstract_croco_ocp.py b/python/smc/control/optimal_control/abstract_croco_ocp.py index 0d4fa0f124a0a93b65eae6d8bde07763005abab9..5669969632303e8d7202fe15d81d5614d57ede72 100644 --- a/python/smc/control/optimal_control/abstract_croco_ocp.py +++ b/python/smc/control/optimal_control/abstract_croco_ocp.py @@ -1,13 +1,8 @@ -# TODO: make a bundle method which solves and immediately follows the traj. -from smc.control.joint_space.joint_space_trajectory_following import ( - followKinematicJointTrajP, -) from smc.robots.robotmanager_abstract import AbstractRobotManager import numpy as np import crocoddyl from argparse import Namespace -import time import importlib.util import abc from typing import Any @@ -264,17 +259,19 @@ class CrocoOCP(abc.ABC): ) # this shouldn't really depend on x0 but i can't be bothered - def solveOCP(self, x0: np.ndarray) -> tuple[dict, Any]: + def solveInitialOCP(self, x0: np.ndarray): # run solve # NOTE: there are some possible parameters here that I'm not using xs = [x0] * (self.solver.problem.T + 1) us = self.solver.problem.quasiStatic([x0] * self.solver.problem.T) - start = time.time() - self.solver.solve(xs, us, 500, False, 1e-9) - end = time.time() - print("solved in:", end - start, "seconds") + #start = time.time() + #self.solver.solve(xs, us, 500, False, 1e-9) + self.solver.solve(xs, us, self.args.max_solver_iter) + #end = time.time() + #print("solved in:", end - start, "seconds") + def getSolvedReference(self) -> dict[str, Any]: # solver.solve() # get reference (state trajectory) # we aren't using controls because we only have velocity inputs @@ -283,3 +280,18 @@ class CrocoOCP(abc.ABC): vs = xs[:, self.robot.model.nq :] reference = {"qs": qs, "vs": vs, "dt": self.args.ocp_dt} return reference + + # NOTE: this is ugly, but idk how to deal with the fact that i don't know + # which kind of arguments this function needs + def updateCosts(self, data): + raise NotImplementedError("if you want to warmstart and resolve, you need \ + to specify how do you update the cost function (could be nothing) \ + in between resolving") + + def warmstartAndReSolve(self, x0: np.ndarray, data=None) -> None: + self.solver.problem.x0 = x0 + xs_init = list(self.solver.xs[1:]) + [self.solver.xs[-1]] + xs_init[0] = x0 + us_init = list(self.solver.us[1:]) + [self.solver.us[-1]] + self.updateCosts(data) + self.solver.solve(xs_init, us_init, self.args.max_solver_iter) diff --git a/python/smc/control/optimal_control/croco_mpc_path_following.py b/python/smc/control/optimal_control/croco_mpc_path_following.py index 222dd98db36b8bae6833bb3824a40838b9ec7d83..1f7fdb83313f44233a08043e21312b154c997f89 100644 --- a/python/smc/control/optimal_control/croco_mpc_path_following.py +++ b/python/smc/control/optimal_control/croco_mpc_path_following.py @@ -1,39 +1,27 @@ from smc.robots.interfaces.single_arm_interface import SingleArmInterface -from smc.control.optimal_control.croco_mpc_point_to_point import CrocoIKMPC from smc.control.control_loop_manager import ControlLoopManager from smc.multiprocessing.process_manager import ProcessManager +from smc.control.optimal_control.abstract_croco_ocp import CrocoOCP from smc.control.optimal_control.path_following_croco_ocp import ( CrocoEEPathFollowingOCP, BaseAndEEPathFollowingOCP, + BaseAndDualArmEEPathFollowingOCP, ) from smc.path_generation.path_math.path2d_to_6d import ( path2D_to_SE3_fixed, - path2D_to_SE3_with_transition_from_current_pose, ) from smc.path_generation.path_math.path_to_trajectory import path2D_timed -import crocoddyl import numpy as np from functools import partial import types -import importlib.util from argparse import Namespace -import pinocchio as pin -import time - -if importlib.util.find_spec("mim_solvers"): - try: - import mim_solvers - except ModuleNotFoundError: - print( - "mim_solvers installation is broken, rebuild and reinstall it if you want it" - ) def CrocoEndEffectorPathFollowingMPCControlLoop( args, robot: SingleArmInterface, - solver: crocoddyl.SolverBoxFDDP, + ocp: CrocoOCP, path_planner: ProcessManager, t, _, @@ -63,16 +51,11 @@ def CrocoEndEffectorPathFollowingMPCControlLoop( else: path_planner.sendCommand(p) data = path_planner.getData() - if data == None: - if args.debug_prints: - print("CTRL: got no path so i won't move") - robot.sendVelocityCommand(np.zeros(robot.model.nv)) - log_item["qs"] = q - log_item["dqs"] = robot.v - return breakFlag, save_past_item, log_item if data == "done": breakFlag = True + + if data == "done" or data is None: robot.sendVelocityCommand(np.zeros(robot.model.nv)) log_item["qs"] = q log_item["dqs"] = robot.v @@ -95,27 +78,9 @@ def CrocoEndEffectorPathFollowingMPCControlLoop( robot.visualizer_manager.sendCommand({"frame_path": pathSE3}) x0 = np.concatenate([robot.q, robot.v]) - solver.problem.x0 = x0 - # warmstart solver with previous solution - xs_init = list(solver.xs[1:]) + [solver.xs[-1]] - xs_init[0] = x0 - us_init = list(solver.us[1:]) + [solver.us[-1]] - - for i, runningModel in enumerate(solver.problem.runningModels): - runningModel.differential.costs.costs[ - "gripperPose" + str(i) - ].cost.residual.reference = pathSE3[i] - # runningModel.differential.costs.costs['gripperPose'].cost.residual.reference = pathSE3[i] - - # idk if that's necessary - solver.problem.terminalModel.differential.costs.costs[ - "gripperPose" + str(args.n_knots) - ].cost.residual.reference = pathSE3[-1] - # solver.problem.terminalModel.differential.costs.costs['gripperPose'].cost.residual.reference = pathSE3[-1] - - solver.solve(xs_init, us_init, args.max_solver_iter) - xs = np.array(solver.xs) - us = np.array(solver.us) + ocp.warmstartAndReSolve(x0, data=pathSE3) + xs = np.array(ocp.solver.xs) + us = np.array(ocp.solver.us) vel_cmds = xs[1, robot.model.nq :] robot.sendVelocityCommand(vel_cmds) @@ -139,21 +104,18 @@ def CrocoEndEffectorPathFollowingMPC( """ ocp = CrocoEEPathFollowingOCP(args, robot, x0) - solver = ocp.getSolver() # technically should be done in controlloop because now # it's solved 2 times before the first command, # but we don't have time for details rn x0 = np.concatenate([robot.q, robot.v]) - xs_init = [x0] * (solver.problem.T + 1) - us_init = solver.problem.quasiStatic([x0] * solver.problem.T) - solver.solve(xs_init, us_init, args.max_solver_iter) + ocp.solveInitialOCP(x0) controlLoop = partial( - CrocoEndEffectorPathFollowingMPCControlLoop, args, robot, solver, path_planner + CrocoEndEffectorPathFollowingMPCControlLoop, args, robot, ocp, path_planner ) log_item = {"qs": np.zeros(robot.model.nq), "dqs": np.zeros(robot.model.nv)} - save_past_item = {"T_w_e": robot.T_w_e} + save_past_item = {} loop_manager = ControlLoopManager( robot, controlLoop, args, save_past_item, log_item ) @@ -165,7 +127,179 @@ def CrocoEndEffectorPathFollowingMPC( def BaseAndEEPathFollowingMPCControlLoop( args, robot, - solver: crocoddyl.SolverBoxFDDP, + ocp: CrocoOCP, + path_planner: ProcessManager, + iter_n, + past_data, +): + """ + CrocoPathFollowingMPCControlLoop + ----------------------------- + end-effector(s) follow their path(s). + + path planner can either be a function which spits out a list of path points + or an instance of ProcessManager which spits out path points + by calling ProcessManager.getData() + """ + breakFlag = False + log_item = {} + save_past_dict = {} + + q = robot.q + T_w_e = robot.T_w_e + p = q[:2] + # NOTE: this is the actual position, not what the path suggested + # whether this or path reference should be put in is debateable. + # this feels more correct to me. + save_past_dict["path2D_untimed"] = p + path_planner.sendCommand(p) + + ########################### + # get path from planner # + ########################### + data = path_planner.getData() + + if data == "done": + breakFlag = True + if data == "done" or data is None: + robot.sendVelocityCommand(np.zeros(robot.model.nv)) + log_item["qs"] = q.reshape((robot.model.nq,)) + log_item["dqs"] = robot.v.reshape((robot.model.nv,)) + return breakFlag, save_past_dict, log_item + + ########################################## + # construct timed 2D path for the base # + ########################################## + _, path2D_untimed_base = data + path2D_untimed_base = np.array(path2D_untimed_base).reshape((-1, 2)) + # ideally should be precomputed somewhere + max_base_v = np.linalg.norm(robot.model.velocityLimit[:2]) + # base just needs timing on the path + path_base = path2D_timed(args, path2D_untimed_base, max_base_v) + # and it's of height 0 (i.e. the height of base's planar joint) + path_base = np.hstack((path_base, np.zeros((len(path_base), 1)))) + + ################################################### + # construct timed SE3 path for the end-effector # + ################################################### + # this works as follow + # 1) find the previous path point of arclength base_to_handlebar_preferred_distance. + # first part of the path is from there to current base position, + # second is just the current base's plan. + # 2) construct timing on the first part. + # 3) join that with the already timed second part. + # 4) turn the timed 2D path into an SE3 trajectory + + # NOTE: this can be O(1) instead of O(n) but i can't be bothered + path_arclength = np.linalg.norm(p - past_data["path2D_untimed"]) + handlebar_path_index = -1 + for i in range(-2, -1 * len(past_data["path2D_untimed"]), -1): + if path_arclength > args.base_to_handlebar_preferred_distance: + handlebar_path_index = i + break + path_arclength += np.linalg.norm( + past_data["path2D_untimed"][i - 1] - past_data["path2D_untimed"][i] + ) + # i shouldn't need to copy-paste everything but what can you do + path2D_handlebar_1_untimed = np.array(past_data["path2D_untimed"]) + # NOTE: BIG ASSUMPTION + # let's say we're computing on time, and that's the time spacing + # of previous path points. + # this means you need to lower the control frequency argument + # if you're not meeting deadlines. + # if you really need timing information, you should actually + # get it from ControlLoopManager instead of i, + # but let's say this is better because you're forced to know + # how fast you are instead of ducktaping around delays. + # TODO: actually save timing, pass t instead of i to controlLoops + # from controlLoopManager + # NOTE: this might not working when rosified so check that first + time_past = np.linspace( + 0.0, args.past_window_size * robot.dt, args.past_window_size + ) + s = np.linspace(0.0, args.n_knots * args.ocp_dt, args.n_knots) + path2D_handlebar_1 = np.hstack( + ( + np.interp(s, time_past, path2D_handlebar_1_untimed[:, 0]).reshape((-1, 1)), + np.interp(s, time_past, path2D_handlebar_1_untimed[:, 1]).reshape((-1, 1)), + ) + ) + + pathSE3_handlebar = path2D_to_SE3_fixed(path2D_handlebar_1, args.handlebar_height) + if args.visualizer: + if iter_n % 20 == 0: + robot.visualizer_manager.sendCommand({"path": path_base}) + robot.visualizer_manager.sendCommand({"frame_path": pathSE3_handlebar}) + + x0 = np.concatenate([robot.q, robot.v]) + ocp.warmstartAndReSolve(x0, data=(path_base, pathSE3_handlebar)) + xs = np.array(ocp.solver.xs) + us = np.array(ocp.solver.us) + vel_cmds = xs[1, robot.model.nq :] + + robot.sendVelocityCommand(vel_cmds) + + log_item["qs"] = q.reshape((robot.model.nq,)) + log_item["dqs"] = robot.v.reshape((robot.model.nv,)) + return breakFlag, save_past_dict, log_item + + +def BaseAndEEPathFollowingMPC(args, robot, path_planner): + """ + CrocoEndEffectorPathFollowingMPC + ----- + run mpc for a point-to-point inverse kinematics. + note that the actual problem is solved on + a dynamics level, and velocities we command + are actually extracted from the state x(q,dq). + """ + + x0 = np.concatenate([robot.q, robot.v]) + ocp = BaseAndEEPathFollowingOCP(args, robot, x0) + ocp.solveInitialOCP(x0) + + # prepopulate past data to make base and cart be on the same path in the past + # (which didn't actually happen because this just started) + T_w_e = robot.T_w_e + p_cart = robot.q[:2] + p_ee = T_w_e.translation[:2] + straigh_line_path = np.linspace(p_ee, p_cart, args.past_window_size) + # time it the same way the base path is timed + time_past = np.linspace( + 0.0, args.past_window_size * robot.dt, args.past_window_size + ) + s = np.linspace(0.0, args.n_knots * args.ocp_dt, args.past_window_size) + path2D_handlebar = np.hstack( + ( + np.interp(s, time_past, straigh_line_path[:, 0]).reshape((-1, 1)), + np.interp(s, time_past, straigh_line_path[:, 1]).reshape((-1, 1)), + ) + ) + + controlLoop = partial( + BaseAndEEPathFollowingMPCControlLoop, args, robot, ocp, path_planner + ) + log_item = {"qs": np.zeros(robot.model.nq), "dqs": np.zeros(robot.model.nv)} + save_past_dict = {"path2D_untimed": T_w_e.translation[:2]} + loop_manager = ControlLoopManager( + robot, controlLoop, args, save_past_dict, log_item + ) + + + loop_manager.past_data["path2D_untimed"].clear() + loop_manager.past_data["path2D_untimed"].extend( + path2D_handlebar[i] for i in range(args.past_window_size) + ) + + loop_manager.run() + + +# TODO: the robot put in is a whole body robot, +# which you need to implement +def BaseAndDualArmEEPathFollowingMPCControlLoop( + args, + robot, + ocp: CrocoOCP, path_planner: ProcessManager, grasp_pose, goal_transform, @@ -186,7 +320,7 @@ def BaseAndEEPathFollowingMPCControlLoop( save_past_dict = {} q = robot.q - T_w_e = robot.getT_w_e() + T_w_e = robot.T_w_e p = q[:2] # NOTE: this is the actual position, not what the path suggested # whether this or path reference should be put in is debateable. @@ -202,16 +336,10 @@ def BaseAndEEPathFollowingMPCControlLoop( # get the path from the base from the current base position onward. # but we're still fast so who cares data = path_planner.getData() - if data == None: - if args.debug_prints: - print("CTRL: got no path so not i won't move") - robot.sendVelocityCommand(np.zeros(robot.model.nv)) - log_item["qs"] = q.reshape((robot.model.nq,)) - log_item["dqs"] = robot.v.reshape((robot.model.nv,)) - return breakFlag, save_past_dict, log_item if data == "done": breakFlag = True + if data == "done" or data is None: robot.sendVelocityCommand(np.zeros(robot.model.nv)) log_item["qs"] = q.reshape((robot.model.nq,)) log_item["dqs"] = robot.v.reshape((robot.model.nv,)) @@ -220,7 +348,7 @@ def BaseAndEEPathFollowingMPCControlLoop( ########################################## # construct timed 2D path for the base # ########################################## - path_pol_base, path2D_untimed_base = data + _, path2D_untimed_base = data path2D_untimed_base = np.array(path2D_untimed_base).reshape((-1, 2)) # ideally should be precomputed somewhere max_base_v = np.linalg.norm(robot.model.velocityLimit[:2]) @@ -275,7 +403,7 @@ def BaseAndEEPathFollowingMPCControlLoop( ) ) - pathSE3_handlebar = path2D_to_SE3(path2D_handlebar_1, args.handlebar_height) + pathSE3_handlebar = path2D_to_SE3_fixed(path2D_handlebar_1, args.handlebar_height) pathSE3_handlebar_left = [] pathSE3_handlebar_right = [] for pathSE3 in pathSE3_handlebar: @@ -288,49 +416,11 @@ def BaseAndEEPathFollowingMPCControlLoop( robot.visualizer_manager.sendCommand({"frame_path": pathSE3_handlebar}) x0 = np.concatenate([robot.q, robot.v]) - solver.problem.x0 = x0 - # warmstart solver with previous solution - xs_init = list(solver.xs[1:]) + [solver.xs[-1]] - xs_init[0] = x0 - us_init = list(solver.us[1:]) + [solver.us[-1]] - - for i, runningModel in enumerate(solver.problem.runningModels): - # print('adding base', path_base[i]) - # print("this was the prev ref", runningModel.differential.costs.costs['base_translation' + str(i)].cost.residual.reference) - runningModel.differential.costs.costs[ - "base_translation" + str(i) - ].cost.residual.reference = path_base[i] - if robot.robot_name != "yumi": - runningModel.differential.costs.costs[ - "ee_pose" + str(i) - ].cost.residual.reference = pathSE3_handlebar[i] - else: - runningModel.differential.costs.costs[ - "l_ee_pose" + str(i) - ].cost.residual.reference = pathSE3_handlebar_left[i] - runningModel.differential.costs.costs[ - "r_ee_pose" + str(i) - ].cost.residual.reference = pathSE3_handlebar_right[i] - - # idk if that's necessary - solver.problem.terminalModel.differential.costs.costs[ - "base_translation" + str(args.n_knots) - ].cost.residual.reference = path_base[-1] - if robot.robot_name != "yumi": - solver.problem.terminalModel.differential.costs.costs[ - "ee_pose" + str(args.n_knots) - ].cost.residual.reference = pathSE3_handlebar[-1] - else: - solver.problem.terminalModel.differential.costs.costs[ - "l_ee_pose" + str(args.n_knots) - ].cost.residual.reference = pathSE3_handlebar[-1] - solver.problem.terminalModel.differential.costs.costs[ - "r_ee_pose" + str(args.n_knots) - ].cost.residual.reference = pathSE3_handlebar[-1] - - solver.solve(xs_init, us_init, args.max_solver_iter) - xs = np.array(solver.xs) - us = np.array(solver.us) + ocp.warmstartAndReSolve( + x0, data=(path_base, pathSE3_handlebar_left, pathSE3_handlebar_right) + ) + xs = np.array(ocp.solver.xs) + us = np.array(ocp.solver.us) vel_cmds = xs[1, robot.model.nq :] robot.sendVelocityCommand(vel_cmds) @@ -340,7 +430,7 @@ def BaseAndEEPathFollowingMPCControlLoop( return breakFlag, save_past_dict, log_item -def BaseAndEEPathFollowingMPC(args, robot, path_planner): +def BaseAndDualARmEEPathFollowingMPC(args, robot, path_planner): """ CrocoEndEffectorPathFollowingMPC ----- @@ -351,25 +441,18 @@ def BaseAndEEPathFollowingMPC(args, robot, path_planner): """ x0 = np.concatenate([robot.q, robot.v]) - problem = createBaseAndEEPathFollowingOCP(args, robot, x0) - if args.solver == "boxfddp": - solver = crocoddyl.SolverBoxFDDP(problem) - if args.solver == "csqp": - solver = mim_solvers.SolverCSQP(problem) - - xs_init = [x0] * (solver.problem.T + 1) - us_init = solver.problem.quasiStatic([x0] * solver.problem.T) - solver.solve(xs_init, us_init, args.max_solver_iter) + ocp = BaseAndDualArmEEPathFollowingOCP(args, robot, x0) + ocp.solveInitialOCP(x0) controlLoop = partial( - BaseAndEEPathFollowingMPCControlLoop, args, robot, solver, path_planner + BaseAndEEPathFollowingMPCControlLoop, args, robot, ocp, path_planner ) log_item = {"qs": np.zeros(robot.model.nq), "dqs": np.zeros(robot.model.nv)} # TODO: put ensurance that save_past is not too small for this # or make a specific argument for THIS past-moving-window size # this is the end-effector's reference, so we should initialize that # TODO: verify this initialization actually makes sense - T_w_e = robot.getT_w_e() + T_w_e = robot.T_w_e save_past_dict = {"path2D_untimed": T_w_e.translation[:2]} loop_manager = ControlLoopManager( robot, controlLoop, args, save_past_dict, log_item diff --git a/python/smc/control/optimal_control/croco_mpc_point_to_point.py b/python/smc/control/optimal_control/croco_mpc_point_to_point.py index 02bc018de3db160b9d8a4cb00e635037be6eda2b..d391f5fde4f589db972ae7bfb55ee18fed293a49 100644 --- a/python/smc/control/optimal_control/croco_mpc_point_to_point.py +++ b/python/smc/control/optimal_control/croco_mpc_point_to_point.py @@ -1,24 +1,13 @@ from smc.robots.interfaces.single_arm_interface import SingleArmInterface from smc.control.control_loop_manager import ControlLoopManager -from smc.multiprocessing.process_manager import ProcessManager from smc.control.optimal_control.point_to_point_croco_ocp import SingleArmIKOCP import pinocchio as pin -import crocoddyl import numpy as np from functools import partial -import importlib.util -if importlib.util.find_spec("mim_solvers"): - try: - import mim_solvers - except ModuleNotFoundError: - print( - "mim_solvers installation is broken, rebuild and reinstall it if you want it" - ) - -def CrocoIKMPCControlLoop(args, robot: SingleArmInterface, solver, T_w_goal, _, __): +def CrocoIKMPCControlLoop(args, robot: SingleArmInterface, ocp, T_w_goal, _, __): """ CrocoIKMPCControlLoop --------------------- @@ -35,15 +24,9 @@ def CrocoIKMPCControlLoop(args, robot: SingleArmInterface, solver, T_w_goal, _, # set initial state from sensor x0 = np.concatenate([robot.q, robot.v]) - solver.problem.x0 = x0 - # warmstart solver with previous solution - xs_init = list(solver.xs[1:]) + [solver.xs[-1]] - xs_init[0] = x0 - us_init = list(solver.us[1:]) + [solver.us[-1]] - - solver.solve(xs_init, us_init, args.max_solver_iter) - xs = np.array(solver.xs) - us = np.array(solver.us) + ocp.warmstartAndReSolve(x0) + xs = np.array(ocp.solver.xs) + us = np.array(ocp.solver.us) # NOTE: for some reason the first value is always some wild bs vel_cmds = xs[1, robot.model.nq :] robot.sendVelocityCommand(vel_cmds) @@ -68,16 +51,9 @@ def CrocoIKMPC(args, robot, T_w_goal, run=True): """ x0 = np.concatenate([robot.q, robot.v]) ocp = SingleArmIKOCP(args, robot, x0, T_w_goal) - solver = ocp.getSolver() - - # technically should be done in controlloop because now - # it's solved 2 times before the first command, - # but we don't have time for details rn - xs_init = [x0] * (solver.problem.T + 1) - us_init = solver.problem.quasiStatic([x0] * solver.problem.T) - solver.solve(xs_init, us_init, args.max_solver_iter) + ocp.solveInitialOCP(x0) - controlLoop = partial(CrocoIKMPCControlLoop, args, robot, solver, T_w_goal) + controlLoop = partial(CrocoIKMPCControlLoop, args, robot, ocp, T_w_goal) log_item = { "qs": np.zeros(robot.model.nq), "dqs": np.zeros(robot.model.nv), diff --git a/python/smc/control/optimal_control/path_following_croco_ocp.py b/python/smc/control/optimal_control/path_following_croco_ocp.py index 26fdae9f5eb727b0404740447c660133da41c0aa..6d75a495bceb4d6b0040359e9b12e8481cf2632a 100644 --- a/python/smc/control/optimal_control/path_following_croco_ocp.py +++ b/python/smc/control/optimal_control/path_following_croco_ocp.py @@ -47,6 +47,16 @@ class CrocoEEPathFollowingOCP(CrocoOCP): ) # terminalCostModel.addCost("gripperPose", goalTrackingCost, 1e2) + def updateCosts(self, data): + for i, runningModel in enumerate(self.solver.problem.runningModels): + runningModel.differential.costs.costs[ + "gripperPose" + str(i) + ].cost.residual.reference = data[i] + + self.solver.problem.terminalModel.differential.costs.costs[ + "gripperPose" + str(self.args.n_knots) + ].cost.residual.reference = data[-1] + class BaseAndEEPathFollowingOCP(CrocoOCP): """ @@ -57,7 +67,7 @@ class BaseAndEEPathFollowingOCP(CrocoOCP): NOTE: the path MUST be time indexed with the SAME time used between the knots """ - def __init__(self, args, robot: AbstractRobotManager, x0): + def __init__(self, args, robot: SingleArmInterface, x0): goal = None super().__init__(args, robot, x0, goal) @@ -69,7 +79,7 @@ class BaseAndEEPathFollowingOCP(CrocoOCP): for i in range(self.args.n_knots): eePoseResidual = crocoddyl.ResidualModelFramePlacement( self.state, - self.robot.model.ee_frame_id, + self.robot.ee_frame_id, path_ee[i], # this better be done with the same time steps as the knots # NOTE: this should be done outside of this function to clarity self.state.nv, @@ -80,7 +90,7 @@ class BaseAndEEPathFollowingOCP(CrocoOCP): ) baseTranslationResidual = crocoddyl.ResidualModelFrameTranslation( - self.state, self.robot.model.base_frame_id, path_base[i], self.state.nv + self.state, self.robot.base_frame_id, path_base[i], self.state.nv ) baseTrackingCost = crocoddyl.CostModelResidual( self.state, baseTranslationResidual @@ -100,6 +110,25 @@ class BaseAndEEPathFollowingOCP(CrocoOCP): self.args.base_translation_cost, ) + def updateCosts(self, data): + path_base = data[0] + pathSE3 = data[1] + for i, runningModel in enumerate(self.solver.problem.runningModels): + runningModel.differential.costs.costs[ + "base_translation" + str(i) + ].cost.residual.reference = path_base[i] + runningModel.differential.costs.costs[ + "ee_pose" + str(i) + ].cost.residual.reference = pathSE3[i] + + # idk if that's necessary + self.solver.problem.terminalModel.differential.costs.costs[ + "base_translation" + str(self.args.n_knots) + ].cost.residual.reference = path_base[-1] + self.solver.problem.terminalModel.differential.costs.costs[ + "ee_pose" + str(self.args.n_knots) + ].cost.residual.reference = pathSE3[-1] + class BaseAndDualArmEEPathFollowingOCP(CrocoOCP): def __init__(self, args, robot: DualArmInterface, x0): @@ -116,7 +145,6 @@ class BaseAndDualArmEEPathFollowingOCP(CrocoOCP): l_eePoseResidual = crocoddyl.ResidualModelFramePlacement( self.state, self.robot.model.l_ee_frame_id, - # MASSIVE TODO: actually put in reference for left arm path_ee_left[i], self.state.nv, ) @@ -127,7 +155,6 @@ class BaseAndDualArmEEPathFollowingOCP(CrocoOCP): r_eePoseResidual = crocoddyl.ResidualModelFramePlacement( self.state, self.robot.model.r_ee_frame_id, - # MASSIVE TODO: actually put in reference for left arm path_ee_right[i], self.state.nv, ) @@ -161,3 +188,29 @@ class BaseAndDualArmEEPathFollowingOCP(CrocoOCP): baseTrackingCost, self.args.base_translation_cost, ) + + def updateCosts(self, data): + path_base = data[0] + pathSE3_l = data[1] + pathSE3_r = data[2] + for i, runningModel in enumerate(self.solver.problem.runningModels): + runningModel.differential.costs.costs[ + "base_translation" + str(i) + ].cost.residual.reference = path_base[i] + runningModel.differential.costs.costs[ + "l_ee_pose" + str(i) + ].cost.residual.reference = pathSE3_l[i] + runningModel.differential.costs.costs[ + "r_ee_pose" + str(i) + ].cost.residual.reference = pathSE3_r[i] + + # idk if that's necessary + self.solver.problem.terminalModel.differential.costs.costs[ + "base_translation" + str(self.args.n_knots) + ].cost.residual.reference = path_base[-1] + self.solver.problem.terminalModel.differential.costs.costs[ + "l_ee_pose" + str(self.args.n_knots) + ].cost.residual.reference = pathSE3_l[-1] + self.solver.problem.terminalModel.differential.costs.costs[ + "r_ee_pose" + str(self.args.n_knots) + ].cost.residual.reference = pathSE3_r[-1] diff --git a/python/smc/control/optimal_control/point_to_point_croco_ocp.py b/python/smc/control/optimal_control/point_to_point_croco_ocp.py index 6748a091ba9b0a778eb15ea2bbd425a5fbd95017..f771c705307d67ed35ef03f41cf6785b4c760fc2 100644 --- a/python/smc/control/optimal_control/point_to_point_croco_ocp.py +++ b/python/smc/control/optimal_control/point_to_point_croco_ocp.py @@ -56,6 +56,19 @@ class SingleArmIKOCP(CrocoOCP): ) self.terminalCostModel.addCost("velFinal", frameVelocityCost, 1e3) + # there is nothing to update in a point-to-point task + def updateCosts(self, data): + pass + + def updateGoalInModels(self, T_w_goal): + for i, runningModel in enumerate(self.solver.problem.runningModels): + runningModel.differential.costs.costs[ + "gripperPose" + str(i) + ].cost.residual.reference = T_w_goal + self.solver.problem.terminalModel.differential.costs.costs[ + "gripperPose" + str(self.args.n_knots) + ].cost.residual.reference = T_w_goal + class DualArmIKOCP(CrocoOCP): def __init__( @@ -116,3 +129,23 @@ class DualArmIKOCP(CrocoOCP): ) self.terminalCostModel.addCost("velFinal_l", frameVelocityCost_l, 1e3) self.terminalCostModel.addCost("velFinal_r", frameVelocityCost_r, 1e3) + + # there is nothing to update in a point-to-point task + def updateCosts(self, data): + pass + + def updateGoalInModels(self, T_w_lgoal, T_w_rgoal): + for i, runningModel in enumerate(self.solver.problem.runningModels): + runningModel.differential.costs.costs[ + "gripperPose_l" + str(i) + ].cost.residual.reference = T_w_lgoal + runningModel.differential.costs.costs[ + "gripperPose_r" + str(i) + ].cost.residual.reference = T_w_rgoal + + self.solver.problem.terminalModel.differential.costs.costs[ + "gripperPose_l" + str(self.args.n_knots) + ].cost.residual.reference = T_w_lgoal + self.solver.problem.terminalModel.differential.costs.costs[ + "gripperPose_r" + str(self.args.n_knots) + ].cost.residual.reference = T_w_rgoal