From 8460b284d34f68f3d71c1b24b3acb958624736ac Mon Sep 17 00:00:00 2001 From: m-guberina <gubi.guberina@gmail.com> Date: Thu, 27 Feb 2025 22:49:15 +0100 Subject: [PATCH] in the middle of templetizing control loops and putting in dual arm features. i have no idea what works or what broken at the moment. the key next steps are making dualarmwholebodyinterface, probably separating point-to-point motions into different files depending on the robot, and finalizing dual-arm ocps and mpcs --- development_plan.toml | 23 +- examples/cart_pulling/MASSIVE_TODO | 2 - .../fixed_paths/dual_arm_path_following.py | 55 +++- ...base_and_ee_path_following_from_planner.py | 4 +- .../ee_only_path_following_from_plannner.py | 4 +- .../cart_pulling/disjoint_control/demo.py | 4 +- examples/optimal_control/croco_ik_mpc.py | 12 +- examples/optimal_control/dual_arm_ik_mpc.py | 44 +++ .../cartesian_space_point_to_point.py | 120 +++----- .../controller_templates/point_to_point.py | 214 ++++++++++++++- .../croco_mpc_path_following.py | 2 +- .../croco_mpc_point_to_point.py | 259 +++++++++++++----- .../point_to_point_croco_ocp.py | 60 +++- 13 files changed, 619 insertions(+), 184 deletions(-) create mode 100644 examples/optimal_control/dual_arm_ik_mpc.py diff --git a/development_plan.toml b/development_plan.toml index f21add6..dde571a 100644 --- a/development_plan.toml +++ b/development_plan.toml @@ -146,7 +146,7 @@ dependencies = ["robotmanager abstract class"] purpose = """extending existing control for dual arm robots without compromising the cleanliness of existing controllers for single arms""" hours_required = 8 -is_done = false +is_done = true [[project_plan.action_items.action_items]] @@ -204,8 +204,23 @@ deadline = 2025-01-31 dependencies = ["robotmanager abstract class"] purpose = """actual process management beyond ducktaping plotter and visualizer to loopmanager and robotmanager""" hours_required = 5 -is_done = false +is_done = true +[[project_plan.action_items.action_items]] +name = "write out control templates" +description = """ +setting the controlloopmanager - controlloop relationship to be composition, inspired by the functional programming paradigm is the winner of the previous todo. + +now the control stack should be re-written to reflect this. + +the KEY benefit of composition is that you don't need to use any of these templates. but using them in within the core of the library will drastically reduce the amount of code, thereby improving extensibility and maintainability. +""" +priority = 2 +deadline = 2025-01-31 +dependencies = ["explicit relationship between controlLoop and ControlLoopManager"] +purpose = """reducing the amount of code, thereby improving extensibility and maintainability""" +hours_required = 5 +is_done = false [[project_plan.action_items.action_items]] @@ -326,7 +341,7 @@ street cred""" hours_required = 80 is_done = false -[[project_plan.action_items.action_items]] +[[project_plan.action_items]] name = "tidy up typing" description = """ sit down and get typing coverage to 100% or whatever is the limit of reason. @@ -337,7 +352,7 @@ dependencies = ["create/fix type-checking workflow"] purpose = """easier code writting and debugging, making library professional-grade""" # -1 means it should be a sum over sub-items hours_required = 8 -is_done = true +is_done = false [[project_plan.action_items]] name = "python stdlib logging instead of printing" diff --git a/examples/cart_pulling/MASSIVE_TODO b/examples/cart_pulling/MASSIVE_TODO index eecf06f..3c16504 100644 --- a/examples/cart_pulling/MASSIVE_TODO +++ b/examples/cart_pulling/MASSIVE_TODO @@ -1,5 +1,3 @@ -1. you have to find a way to keep track of the path for the end-effector. one trillion percent it manages to break itself, you need to save it in a log and replay everything. make change to the log_item thing to make not everything automatically plottable (check value type or something) - --> preferably do that while running the disjoint control because it survives the hassle 2. make proper path-tracking cost in python, that's shouldn't be too much of a hassle 2.1. do it with numdiff first 2.2. put in the analytic derivative diff --git a/examples/cart_pulling/control_sub_problems/fixed_paths/dual_arm_path_following.py b/examples/cart_pulling/control_sub_problems/fixed_paths/dual_arm_path_following.py index 35c272c..2e11a54 100644 --- a/examples/cart_pulling/control_sub_problems/fixed_paths/dual_arm_path_following.py +++ b/examples/cart_pulling/control_sub_problems/fixed_paths/dual_arm_path_following.py @@ -1,2 +1,53 @@ -# same as ee_only_path_following_mpc in a circle, just use a dual arm instead. -# the reference is still a single 6D path, but now with dual arms +from smc import getRobotFromArgs +from smc import getMinimalArgParser +from smc.control.optimal_control.util import get_OCP_args +from smc.control.cartesian_space import getClikArgs +from smc.control.optimal_control.croco_mpc_path_following import ( + CrocoEEPathFollowingMPC, +) +import numpy as np + + +def path(T_w_e, i): + # 2) do T_mobile_base_ee_pos to get + # end-effector reference frame(s) + + # generate bullshit just to see it works + path = [] + t = i * robot.dt + for i in range(args.n_knots): + t += 0.01 + new = T_w_e.copy() + translation = 2 * np.array([np.cos(t / 20), np.sin(t / 20), 0.3]) + new.translation = translation + path.append(new) + return path + + +def get_args(): + parser = getMinimalArgParser() + parser = get_OCP_args(parser) + parser = getClikArgs(parser) # literally just for goal error + args = parser.parse_args() + return args + + +if __name__ == "__main__": + args = get_args() + robot = getRobotFromArgs(args) + x0 = np.concatenate([robot.q, robot.v]) + robot._step() + + CrocoEEPathFollowingMPC(args, robot, x0, path) + + print("final position:", robot.T_w_e) + + if args.real: + robot.stopRobot() + + if args.save_log: + robot._log_manager.saveLog() + robot._log_manager.plotAllControlLoops() + + if args.visualizer: + robot.killManipulatorVisualizer() diff --git a/examples/cart_pulling/control_sub_problems/with_planner/base_and_ee_path_following_from_planner.py b/examples/cart_pulling/control_sub_problems/with_planner/base_and_ee_path_following_from_planner.py index e74dbd5..2d34a85 100644 --- a/examples/cart_pulling/control_sub_problems/with_planner/base_and_ee_path_following_from_planner.py +++ b/examples/cart_pulling/control_sub_problems/with_planner/base_and_ee_path_following_from_planner.py @@ -5,7 +5,7 @@ from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3 from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs from smc.path_generation.planner import starPlanner, getPlanningArgs -from smc.control.optimal_control.croco_mpc_point_to_point import EEAndBaseP2PMPC +from smc.control.optimal_control.croco_mpc_point_to_point import CrocoEEAndBaseP2PMPC from smc.control.optimal_control.croco_mpc_path_following import ( BaseAndEEPathFollowingMPC, ) @@ -86,7 +86,7 @@ if __name__ == "__main__": print(pathSE3[0].translation) print(p_base) # TODO: UNCOMMENT - EEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) + CrocoEEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) print("initialized!") BaseAndEEPathFollowingMPC(args, robot, path_planner) diff --git a/examples/cart_pulling/control_sub_problems/with_planner/ee_only_path_following_from_plannner.py b/examples/cart_pulling/control_sub_problems/with_planner/ee_only_path_following_from_plannner.py index 05efbb4..783743a 100644 --- a/examples/cart_pulling/control_sub_problems/with_planner/ee_only_path_following_from_plannner.py +++ b/examples/cart_pulling/control_sub_problems/with_planner/ee_only_path_following_from_plannner.py @@ -5,7 +5,7 @@ from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3 from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs from smc.path_generation.planner import starPlanner, getPlanningArgs -from smc.control.optimal_control.croco_mpc_point_to_point import EEP2PMPC +from smc.control.optimal_control.croco_mpc_point_to_point import CrocoEEP2PMPC from smc.control.optimal_control.croco_mpc_path_following import ( CrocoEEPathFollowingMPC, ) @@ -71,7 +71,7 @@ if __name__ == "__main__": robot.visualizer_manager.sendCommand({"Mgoal": pathSE3[0]}) if np.linalg.norm(pin.log6(T_w_e.actInv(pathSE3[0]))) > 1e-2: print("going to initial path position") - EEP2PMPC(args, robot, pathSE3[0]) + CrocoEEP2PMPC(args, robot, pathSE3[0]) print("initialized!") CrocoEEPathFollowingMPC(args, robot, x0, path_planner) diff --git a/examples/cart_pulling/disjoint_control/demo.py b/examples/cart_pulling/disjoint_control/demo.py index 0531b4c..ba2f6bd 100644 --- a/examples/cart_pulling/disjoint_control/demo.py +++ b/examples/cart_pulling/disjoint_control/demo.py @@ -5,7 +5,7 @@ from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3 from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs from smc.path_generation.planner import starPlanner, getPlanningArgs -from smc.control.optimal_control.croco_mpc_point_to_point import EEAndBaseP2PMPC +from smc.control.optimal_control.croco_mpc_point_to_point import CrocoEEAndBaseP2PMPC from smc.multiprocessing import ProcessManager from mpc_base_clik_arm_control_loop import BaseMPCANDEECLIKCartPulling @@ -84,7 +84,7 @@ if __name__ == "__main__": print(pathSE3[0].translation) print(p_base) # TODO: UNCOMMENT - EEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) + CrocoEEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) print("initialized!") BaseMPCANDEECLIKCartPulling(args, robot, path_planner) diff --git a/examples/optimal_control/croco_ik_mpc.py b/examples/optimal_control/croco_ik_mpc.py index d2c2328..71f1c43 100644 --- a/examples/optimal_control/croco_ik_mpc.py +++ b/examples/optimal_control/croco_ik_mpc.py @@ -1,7 +1,6 @@ -import numpy as np from smc import getMinimalArgParser, getRobotFromArgs from smc.control.optimal_control.croco_mpc_point_to_point import ( - EEP2PMPC, + CrocoEEP2PMPC, ) from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs @@ -24,19 +23,12 @@ if __name__ == "__main__": # TODO: put this back for nicer demos # Mgoal = defineGoalPointCLI() T_w_goal = pin.SE3.Random() - if robot.robot_name == "yumi": - T_w_goal.rotation = np.eye(3) - T_w_goal_transform = pin.SE3.Identity() - T_w_goal_transform.translation[1] = 0.1 - Mgoal_left = T_w_goal_transform.act(T_w_goal) - Mgoal_right = T_w_goal_transform.inverse().act(T_w_goal) - T_w_goal = (Mgoal_left, Mgoal_right) if args.visualizer: # TODO: document this somewhere robot.visualizer_manager.sendCommand({"Mgoal": T_w_goal}) - EEP2PMPC(args, robot, T_w_goal) + CrocoEEP2PMPC(args, robot, T_w_goal) print("final position:") print(robot.T_w_e) diff --git a/examples/optimal_control/dual_arm_ik_mpc.py b/examples/optimal_control/dual_arm_ik_mpc.py new file mode 100644 index 0000000..7b00318 --- /dev/null +++ b/examples/optimal_control/dual_arm_ik_mpc.py @@ -0,0 +1,44 @@ +from smc import getMinimalArgParser, getRobotFromArgs +from smc.control.optimal_control.croco_mpc_point_to_point import ( + CrocoDualEEP2PMPC, +) +from smc.control.optimal_control.util import get_OCP_args +from smc.control.cartesian_space import getClikArgs +from smc.util.define_random_goal import getRandomlyGeneratedGoal +import pinocchio as pin +import argcomplete + + +def get_args(): + parser = getMinimalArgParser() + parser = get_OCP_args(parser) + parser = getClikArgs(parser) # literally just for goal error + argcomplete.autocomplete(parser) + args = parser.parse_args() + return args + + +if __name__ == "__main__": + args = get_args() + robot = getRobotFromArgs(args) + T_w_goal = getRandomlyGeneratedGoal(args) + T_absgoal_lgoal = pin.SE3.Identity() + T_absgoal_lgoal.translation[1] = 0.1 + T_absgoal_rgoal = pin.SE3.Identity() + T_absgoal_rgoal.translation[1] = -0.1 + + if args.visualizer: + # TODO: document this somewhere + robot.visualizer_manager.sendCommand({"Mgoal": T_w_goal}) + + CrocoDualEEP2PMPC(args, robot, T_w_goal) + + if args.real: + robot.stopRobot() + + if args.visualizer: + robot.killManipulatorVisualizer() + + if args.save_log: + robot._log_manager.saveLog() + robot._log_manager.plotAllControlLoops() diff --git a/python/smc/control/cartesian_space/cartesian_space_point_to_point.py b/python/smc/control/cartesian_space/cartesian_space_point_to_point.py index f9b373a..153eddf 100644 --- a/python/smc/control/cartesian_space/cartesian_space_point_to_point.py +++ b/python/smc/control/cartesian_space/cartesian_space_point_to_point.py @@ -1,7 +1,10 @@ -from IPython import embed from smc.control.control_loop_manager import ControlLoopManager from smc.robots.interfaces.single_arm_interface import SingleArmInterface from smc.robots.interfaces.dual_arm_interface import DualArmInterface +from smc.control.controller_templates.point_to_point import ( + EEP2PCtrlLoopTemplate, + DualEEP2PCtrlLoopTemplate, +) from smc.robots.interfaces.force_torque_sensor_interface import ( ForceTorqueOnSingleArmWrist, ) @@ -15,14 +18,14 @@ from typing import Callable def controlLoopClik( - args: Namespace, - robot: SingleArmInterface, # J err_vec v_cmd ik_solver: Callable[[np.ndarray, np.ndarray], np.ndarray], T_w_goal: pin.SE3, + args: Namespace, + robot: SingleArmInterface, i: int, past_data: dict[str, deque[np.ndarray]], -) -> tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]: +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ controlLoopClik --------------- @@ -30,36 +33,23 @@ def controlLoopClik( in some version of the universe this could be extended to a generic point-to-point motion control loop. """ - breakFlag = False - log_item = {} - save_past_item = {} - q = robot.q T_w_e = robot.T_w_e - # first check whether we're at the goal SEerror = T_w_e.actInv(T_w_goal) err_vector = pin.log6(SEerror).vector - if np.linalg.norm(err_vector) < args.goal_error: - breakFlag = True J = robot.getJacobian() # compute the joint velocities based on controller you passed # qd = ik_solver(J, err_vector, past_qd=past_data['dqs_cmd'][-1]) - qd = ik_solver(J, err_vector) - if qd is None: + v_cmd = ik_solver(J, err_vector) + if v_cmd is None: print("the controller you chose didn't work, using dampedPseudoinverse instead") - qd = dampedPseudoinverse(1e-2, J, err_vector) - robot.sendVelocityCommand(qd) + v_cmd = dampedPseudoinverse(1e-2, J, err_vector) - log_item["qs"] = q.reshape((robot.nq,)) - log_item["dqs"] = robot.v - log_item["dqs_cmd"] = qd.reshape((robot.nv,)) - log_item["err_norm"] = np.linalg.norm(err_vector).reshape((1,)) - # we're not saving here, but need to respect the API, - # hence the empty dict - save_past_item["dqs_cmd"] = qd.reshape((robot.nv,)) - return breakFlag, save_past_item, log_item + return v_cmd, {}, {} -def moveL(args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3) -> None: +def moveL( + args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3, run=True +) -> None | ControlLoopManager: """ moveL ----- @@ -69,7 +59,9 @@ def moveL(args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3) -> None """ # assert type(goal_point) == pin.pinocchio_pywrap.SE3 ik_solver = getIKSolver(args, robot) - controlLoop = partial(controlLoopClik, robot, ik_solver, T_w_goal) + controlLoop = partial( + EEP2PCtrlLoopTemplate, ik_solver, T_w_goal, controlLoopClik, args, robot + ) # we're not using any past data or logging, hence the empty arguments log_item = { "qs": np.zeros(robot.nq), @@ -77,13 +69,14 @@ def moveL(args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3) -> None "dqs_cmd": np.zeros(robot.nv), "err_norm": np.zeros(1), } - save_past_dict = { - "dqs_cmd": np.zeros(robot.nv), - } + save_past_dict = {} loop_manager = ControlLoopManager( robot, controlLoop, args, save_past_dict, log_item ) - loop_manager.run() + if run: + loop_manager.run() + else: + return loop_manager # TODO: implement @@ -156,7 +149,7 @@ def moveUntilContactControlLoop( def moveUntilContact( args: Namespace, robot: ForceTorqueOnSingleArmWrist, speed: np.ndarray -): +) -> None: """ moveUntilContact ----- @@ -174,16 +167,16 @@ def moveUntilContact( def controlLoopClikDualArm( - args: Namespace, - robot: DualArmInterface, # J err_vec v_cmd ik_solver: Callable[[np.ndarray, np.ndarray], np.ndarray], T_w_absgoal: pin.SE3, T_abs_lgoal: pin.SE3, T_abs_rgoal: pin.SE3, + args: Namespace, + robot: DualArmInterface, i: int, past_data: dict[str, deque[np.ndarray]], -) -> tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]: +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ controlLoopClikDualArm --------------- @@ -192,8 +185,6 @@ def controlLoopClikDualArm( and an SE3 transformation on the goal for each arm """ - breakFlag = False - log_item = {} T_w_lgoal = T_abs_lgoal.act(T_w_absgoal) T_w_rgoal = T_abs_rgoal.act(T_w_absgoal) @@ -203,23 +194,10 @@ def controlLoopClikDualArm( err_vector_left = pin.log6(SEerror_left).vector err_vector_right = pin.log6(SEerror_right).vector - if (np.linalg.norm(err_vector_left) < robot.args.goal_error) and ( - np.linalg.norm(err_vector_right) < robot.args.goal_error - ): - breakFlag = True err_vector = np.concatenate((err_vector_left, err_vector_right)) J = robot.getJacobian() - # compute the joint velocities based on controller you passed - # this works exactly the way you think it does: - # the velocities for the other arm are 0 - # what happens to the base hasn't been investigated but it seems ok v_cmd = ik_solver(J, err_vector) - robot.sendVelocityCommand(v_cmd) - - log_item["qs"] = robot.q - log_item["dqs"] = robot.v - log_item["dqs_cmd"] = v_cmd - return breakFlag, {}, log_item + return v_cmd, {}, {} def moveLDualArm( @@ -229,7 +207,7 @@ def moveLDualArm( T_abs_l: pin.SE3, T_abs_r: pin.SE3, run=True, -) -> None: +) -> None | ControlLoopManager: """ moveLDualArm ----------- @@ -238,13 +216,22 @@ def moveLDualArm( robot._mode = DualArmInterface.control_mode.both ik_solver = getIKSolver(args, robot) controlLoop = partial( - controlLoopClikDualArm, args, robot, ik_solver, T_w_goal, T_abs_l, T_abs_r + DualEEP2PCtrlLoopTemplate, + ik_solver, + T_w_goal, + T_abs_l, + T_abs_r, + controlLoopClikDualArm, + args, + robot, ) # we're not using any past data or logging, hence the empty arguments log_item = { "qs": np.zeros(robot.nq), "dqs": np.zeros(robot.nv), "dqs_cmd": np.zeros(robot.nv), + "l_err_norm": np.zeros(6), + "r_err_norm": np.zeros(6), } save_past_dict = {} loop_manager = ControlLoopManager( @@ -254,34 +241,3 @@ def moveLDualArm( loop_manager.run() else: return loop_manager - - -# NOTE: this shit makes no sense, it's old and broken -# but you will want to look at it for dual arm stuff so i'm leaving it here for the time being -""" -def moveLDualArm( - args, robot: AbstractRobotManager, goal_point, goal_transform, run=True -): - - moveL - ----- - does moveL. - send a SE3 object as goal point. - if you don't care about rotation, make it np.zeros((3,3)) - - # assert type(goal_point) == pin.pinocchio_pywrap.SE3 - robot.Mgoal = copy.deepcopy(goal_point) - ik_solver = getIKSolver(args, robot) - controlLoop = partial(controlLoopClikDualArm, robot, ik_solver, goal_transform) - # we're not using any past data or logging, hence the empty arguments - log_item = { - "qs": np.zeros(robot.model.nq), - "dqs": np.zeros(robot.model.nv), - "dqs_cmd": np.zeros(robot.model.nv), - } - loop_manager = ControlLoopManager(robot, controlLoop, args, {}, log_item) - if run: - loop_manager.run() - else: - return loop_manager -""" diff --git a/python/smc/control/controller_templates/point_to_point.py b/python/smc/control/controller_templates/point_to_point.py index 1f2a61e..e2b3596 100644 --- a/python/smc/control/controller_templates/point_to_point.py +++ b/python/smc/control/controller_templates/point_to_point.py @@ -1,38 +1,46 @@ from smc.robots.interfaces.single_arm_interface import SingleArmInterface +from smc.robots.interfaces.dual_arm_interface import DualArmInterface from pinocchio import SE3, log6 from argparse import Namespace from typing import Any, Callable import numpy as np +from collections import deque + +from smc.robots.interfaces.whole_body_interface import SingleArmWholeBodyInterface global control_loop_return control_loop_return = tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]] -def Point2PointControlLoop( - SOLVER: Callable, +def EEP2PCtrlLoopTemplate( + SOLVER: Any, T_w_goal: SE3, control_loop: Callable[ - [Any, SE3, Namespace, SingleArmInterface, dict[str, np.ndarray], int], - np.ndarray, + [Any, SE3, Namespace, SingleArmInterface, int, dict[str, deque[np.ndarray]]], + tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]], ], args: Namespace, robot: SingleArmInterface, - past_data: dict[str, np.ndarray], t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], ) -> control_loop_return: """ - Point2PointControlLoop + EEP2PCtrlLoopTemplate --------------- - generic control loop for point to point motion + generic control loop for point to point motion with for the end-effector (handling error to final point etc). """ breakFlag = False log_item = {} save_past_item = {} - v_cmd = control_loop(SOLVER, T_w_goal, args, robot, past_data, t) + v_cmd, past_item_inner, log_item_inner = control_loop( + SOLVER, T_w_goal, args, robot, t, past_data + ) robot.sendVelocityCommand(v_cmd) + log_item.update(log_item_inner) + save_past_item.update(past_item_inner) T_w_e = robot.T_w_e SEerror = T_w_e.actInv(T_w_goal) @@ -44,3 +52,193 @@ def Point2PointControlLoop( log_item["dqs"] = robot.v log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) return breakFlag, save_past_item, log_item + + +def DualEEP2PCtrlLoopTemplate( + SOLVER: Any, + T_w_absgoal: SE3, + T_abs_l: SE3, + T_abs_r: SE3, + control_loop: Callable[ + [ + Any, + SE3, + SE3, + SE3, + Namespace, + DualArmInterface, + int, + dict[str, deque[np.ndarray]], + ], + tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]], + ], + args: Namespace, + robot: DualArmInterface, + t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], +) -> control_loop_return: + """ + DualEEP2PCtrlLoopTemplate + --------------- + generic control loop for point to point motion for end-effectors of a dual arm robot + (handling error to final point etc). + """ + breakFlag = False + log_item = {} + save_past_item = {} + + v_cmd, past_item_inner, log_item_inner = control_loop( + SOLVER, T_w_absgoal, T_abs_l, T_abs_r, args, robot, t, past_data + ) + robot.sendVelocityCommand(v_cmd) + log_item.update(log_item_inner) + save_past_item.update(past_item_inner) + + T_w_lgoal = T_abs_l.act(T_w_absgoal) + T_w_rgoal = T_abs_r.act(T_w_absgoal) + + SEerror_left = robot.T_w_l.actInv(T_w_lgoal) + SEerror_right = robot.T_w_r.actInv(T_w_rgoal) + + err_vector_left = log6(SEerror_left).vector + err_vector_right = log6(SEerror_right).vector + + if (np.linalg.norm(err_vector_left) < robot.args.goal_error) and ( + np.linalg.norm(err_vector_right) < robot.args.goal_error + ): + breakFlag = True + + log_item["qs"] = robot.q + log_item["dqs"] = robot.v + log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) + log_item["l_err_norm"] = np.linalg.norm(err_vector_left) + log_item["r_err_norm"] = np.linalg.norm(err_vector_right) + return breakFlag, save_past_item, log_item + +def EEAndBaseP2PCtrlLoopTemplate( + SOLVER: Any, + T_w_goal: SE3, + p_basegoal: np.ndarray, + control_loop: Callable[ + [ + Any, + SE3, + np.ndarray, + Namespace, + SingleArmWholeBodyInterface, + int, + dict[str, deque[np.ndarray]], + ], + tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]], + ], + args: Namespace, + robot: SingleArmWholeBodyInterface, + t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], +) -> control_loop_return: + """ + EEAndBaseP2PCtrlLoopTemplate + --------------- + generic control loop for point to point motion for end-effectors of a dual arm robot + (handling error to final point etc). + """ + breakFlag = False + log_item = {} + save_past_item = {} + + v_cmd, past_item_inner, log_item_inner = control_loop( + SOLVER, T_w_goal, p_basegoal, args, robot, t, past_data + ) + robot.sendVelocityCommand(v_cmd) + log_item.update(log_item_inner) + save_past_item.update(past_item_inner) + + T_w_e = robot.T_w_e + SEerror = T_w_e.actInv(T_w_goal) + ee_err_vector = log6(SEerror).vector + ee_err_vector_norm = np.linalg.norm(ee_err_vector) + + p_base = np.array(list(robot.q[:2]) + [0.0]) + base_err_vector_norm = np.linalg.norm(p_basegoal - p_base) + + if (ee_err_vector_norm < robot.args.goal_error) and ( + base_err_vector_norm < robot.args.goal_error + ): + breakFlag = True + + log_item["qs"] = robot.q + log_item["dqs"] = robot.v + log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) + log_item["ee_err_norm"] = ee_err_vector_norm.reshape((1,)) + log_item["base_err_norm"] = base_err_vector_norm.reshape((1,)) + return breakFlag, save_past_item, log_item + + +def DualEEAndBaseP2PCtrlLoopTemplate( + SOLVER: Any, + T_w_absgoal: SE3, + T_abs_l: SE3, + T_abs_r: SE3, + p_basegoal: np.ndarray, + control_loop: Callable[ + [ + Any, + SE3, + SE3, + SE3, + np.ndarray, + Namespace, + DualArmWholeBodyInterface, + int, + dict[str, deque[np.ndarray]], + ], + tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]], + ], + args: Namespace, + robot: DualArmWholeBodyInterface, + t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], +) -> control_loop_return: + """ + DualEEAndBaseP2PCtrlLoopTemplate + --------------- + generic control loop for point to point motion for end-effectors of a dual arm robot + (handling error to final point etc). + """ + breakFlag = False + log_item = {} + save_past_item = {} + + v_cmd, past_item_inner, log_item_inner = control_loop( + SOLVER, T_w_absgoal, T_abs_l, T_abs_r, p_basegoal, args, robot, t, past_data + ) + robot.sendVelocityCommand(v_cmd) + log_item.update(log_item_inner) + save_past_item.update(past_item_inner) + + T_w_lgoal = T_abs_l.act(T_w_absgoal) + T_w_rgoal = T_abs_r.act(T_w_absgoal) + SEerror_left = robot.T_w_l.actInv(T_w_lgoal) + SEerror_right = robot.T_w_r.actInv(T_w_rgoal) + err_vector_left = log6(SEerror_left).vector + err_vector_right = log6(SEerror_right).vector + err_vector_left_norm = np.linalg.norm(err_vector_left) + err_vector_right_norm = np.linalg.norm(err_vector_right) + + p_base = np.array(list(robot.q[:2]) + [0.0]) + base_err_vector_norm = np.linalg.norm(p_basegoal - p_base) + + if ( + (err_vector_left_norm < robot.args.goal_error) + and (err_vector_right_norm< robot.args.goal_error) + and (base_err_vector_norm < robot.args.goal_error) + ): + breakFlag = True + + log_item["qs"] = robot.q + log_item["dqs"] = robot.v + log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) + log_item["l_err_norm"] = err_vector_left_norm.reshape((1,)) + log_item["r_err_norm"] = err_vector_right_norm.reshape((1,)) + log_item["base_err_norm"] = base_err_vector_norm.reshape((1,)) + return breakFlag, save_past_item, log_item diff --git a/python/smc/control/optimal_control/croco_mpc_path_following.py b/python/smc/control/optimal_control/croco_mpc_path_following.py index dfab194..c8cb925 100644 --- a/python/smc/control/optimal_control/croco_mpc_path_following.py +++ b/python/smc/control/optimal_control/croco_mpc_path_following.py @@ -106,7 +106,7 @@ def CrocoEEPathFollowingMPCControlLoop( path_planner: types.FunctionType, t: int, _: dict[str, deque[np.ndarray]], -) -> tuple[np.ndarray, dict[str, np.ndarray]]: +) -> tuple[np.ndarray, dict[str, np.ndarray],dict[str, np.ndarray]]: """ CrocoPathFollowingMPCControlLoop ----------------------------- diff --git a/python/smc/control/optimal_control/croco_mpc_point_to_point.py b/python/smc/control/optimal_control/croco_mpc_point_to_point.py index 0e0aeb6..72a34e4 100644 --- a/python/smc/control/optimal_control/croco_mpc_point_to_point.py +++ b/python/smc/control/optimal_control/croco_mpc_point_to_point.py @@ -1,58 +1,117 @@ -from argparse import Namespace +from smc.control.controller_templates.point_to_point import ( + DualEEAndBaseP2PCtrlLoopTemplate, + DualEEP2PCtrlLoopTemplate, + EEAndBaseP2PCtrlLoopTemplate, + EEP2PCtrlLoopTemplate, +) +from smc.control.optimal_control.abstract_croco_ocp import CrocoOCP from smc.robots.interfaces.single_arm_interface import SingleArmInterface -from smc.control.control_loop_manager import ControlLoopManager +from smc.robots.interfaces.dual_arm_interface import DualArmInterface from smc.control.optimal_control.point_to_point_croco_ocp import ( SingleArmIKOCP, + DualArmIKOCP, BaseAndSingleArmIKOCP, + BaseAndDualArmIKOCP ) +from smc.control.control_loop_manager import ControlLoopManager import pinocchio as pin import numpy as np from functools import partial +from collections import deque +from argparse import Namespace + +from smc.robots.interfaces.whole_body_interface import SingleArmWholeBodyInterface def CrocoEEP2PMPCControlLoop( + ocp: CrocoOCP, + T_w_goal: pin.SE3, args: Namespace, robot: SingleArmInterface, - ocp, - T_w_goal: pin.SE3, _: int, __: dict[str, deque[np.ndarray]], -): +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ CrocoIKMPCControlLoop --------------------- """ - breakFlag = False - log_item = {} + # set initial state from sensor + x0 = np.concatenate([robot.q, robot.v]) + ocp.warmstartAndReSolve(x0) + xs = np.array(ocp.solver.xs) + # NOTE: for some reason the first value is always some wild bs + vel_cmd = xs[1, robot.model.nq :] + return vel_cmd, {}, {} + + +def CrocoEEP2PMPC( + args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3, run=True +): + """ + IKMPC + ----- + run mpc for a point-to-point inverse kinematics. + note that the actual problem is solved on + a dynamics level, and velocities we command + are actually extracted from the state x(q,dq) + """ + x0 = np.concatenate([robot.q, robot.v]) + ocp = SingleArmIKOCP(args, robot, x0, T_w_goal) + ocp.solveInitialOCP(x0) + + controlLoop = partial( + EEP2PCtrlLoopTemplate, ocp, T_w_goal, CrocoEEP2PMPCControlLoop, args, robot + ) + log_item = { + "qs": np.zeros(robot.nq), + "err_norm": np.zeros(1), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), + } save_past_dict = {} + loop_manager = ControlLoopManager( + robot, controlLoop, args, save_past_dict, log_item + ) + if run: + loop_manager.run() + else: + return loop_manager - SEerror = robot.T_w_e.actInv(T_w_goal) - err_vector = pin.log6(SEerror).vector - if np.linalg.norm(err_vector) < robot.args.goal_error: - breakFlag = True +def CrocoDualEEP2PMPCControlLoop( + ocp: CrocoOCP, + T_w_abseegoal: pin.SE3, + T_absgoal_lgoal: pin.SE3, + T_absgoal_rgoal: pin.SE3, + args: Namespace, + robot: DualArmInterface, + t: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: + """ + CrocoIKMPCControlLoop + --------------------- + """ # set initial state from sensor x0 = np.concatenate([robot.q, robot.v]) ocp.warmstartAndReSolve(x0) xs = np.array(ocp.solver.xs) - us = np.array(ocp.solver.us) # NOTE: for some reason the first value is always some wild bs - vel_cmds = xs[1, robot.model.nq :] - robot.sendVelocityCommand(vel_cmds) - - log_item["qs"] = x0[: robot.model.nq] - log_item["dqs"] = x0[robot.model.nq :] - # log_item["dqs_cmd"] = vel_cmds - log_item["u_tau"] = us[0] - log_item["err_norm"] = np.linalg.norm(err_vector).reshape((1,)) - - return breakFlag, save_past_dict, log_item + vel_cmd = xs[1, robot.model.nq :] + return vel_cmd, {}, {} -def EEP2PMPC(args: Namespace, robot, T_w_goal: pin.SE3, run=True): +def CrocoDualEEP2PMPC( + args: Namespace, + robot: DualArmInterface, + T_w_absgoal: pin.SE3, + T_absgoal_l: pin.SE3, + T_absgoal_r: pin.SE3, + run=True, +): """ - IKMPC + DualEEP2PMPC ----- run mpc for a point-to-point inverse kinematics. note that the actual problem is solved on @@ -60,16 +119,24 @@ def EEP2PMPC(args: Namespace, robot, T_w_goal: pin.SE3, run=True): are actually extracted from the state x(q,dq) """ x0 = np.concatenate([robot.q, robot.v]) - ocp = SingleArmIKOCP(args, robot, x0, T_w_goal) + ocp = DualArmIKOCP(args, robot, x0, T_w_absgoal) ocp.solveInitialOCP(x0) - controlLoop = partial(CrocoEEP2PMPCControlLoop, args, robot, ocp, T_w_goal) + controlLoop = partial( + DualEEP2PCtrlLoopTemplate, + ocp, + T_w_absgoal, + T_absgoal_l, + T_absgoal_r, + CrocoDualEEP2PMPCControlLoop, + args, + robot, + ) log_item = { - "qs": np.zeros(robot.model.nq), - "dqs": np.zeros(robot.model.nv), - # "dqs_cmd": np.zeros(robot.model.nv), # we're assuming full actuation here - "u_tau": np.zeros(robot.model.nv), + "qs": np.zeros(robot.nq), "err_norm": np.zeros(1), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), } save_past_dict = {} loop_manager = ControlLoopManager( @@ -82,50 +149,30 @@ def EEP2PMPC(args: Namespace, robot, T_w_goal: pin.SE3, run=True): def CrocoP2PEEAndBaseMPCControlLoop( - args, - robot: SingleArmInterface, - ocp, + ocp: CrocoOCP, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, - _: int, - __: dict[str, deque[np.ndarray]], -): + args: Namespace, + robot: SingleArmWholeBodyInterface, + t: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ CrocoP2PEEAndBaseMPCControlLoop --------------------- """ - breakFlag = False - log_item = {} - save_past_dict = {} - - ee_SEerror = robot.T_w_e.actInv(T_w_eegoal) - ee_err_vector = pin.log6(ee_SEerror).vector - ee_err_vector_norm = np.linalg.norm(ee_err_vector) - # TODO: make this a property of mobilebaseinterface - p_base = np.array(list(robot.q[:2]) + [0.0]) - base_err_vector_norm = np.linalg.norm(p_basegoal - p_base) - if (ee_err_vector_norm < args.goal_error) and ( - base_err_vector_norm < args.goal_error - ): - breakFlag = True - # set initial state from sensor x0 = np.concatenate([robot.q, robot.v]) ocp.warmstartAndReSolve(x0) xs = np.array(ocp.solver.xs) # NOTE: for some reason the first value is always some wild bs - vel_cmds = xs[1, robot.model.nq :] - robot.sendVelocityCommand(vel_cmds) - - log_item["qs"] = x0[: robot.model.nq] - log_item["dqs"] = x0[robot.model.nq :] - log_item["ee_err_norm"] = ee_err_vector_norm.reshape((1,)) - log_item["base_err_norm"] = base_err_vector_norm.reshape((1,)) - - return breakFlag, save_past_dict, log_item + vel_cmd = xs[1, robot.model.nq :] + return vel_cmd, {}, {} -def EEAndBaseP2PMPC(args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, run=True): +def CrocoEEAndBaseP2PMPC( + args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, run=True +): """ IKMPC ----- @@ -140,13 +187,18 @@ def EEAndBaseP2PMPC(args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, ru ocp.solveInitialOCP(x0) controlLoop = partial( - CrocoP2PEEAndBaseMPCControlLoop, args, robot, ocp, T_w_eegoal, p_basegoal + EEAndBaseP2PCtrlLoopTemplate, + ocp, + T_w_eegoal, + p_basegoal, + CrocoP2PEEAndBaseMPCControlLoop, + args, + robot, ) log_item = { - "qs": np.zeros(robot.model.nq), - "dqs": np.zeros(robot.model.nv), - # "dqs_cmd": np.zeros(robot.model.nv), # we're assuming full actuation here - # "u_tau": np.zeros(robot.model.nv), + "qs": np.zeros(robot.nq), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), "ee_err_norm": np.zeros(1), "base_err_norm": np.zeros(1), } @@ -158,3 +210,78 @@ def EEAndBaseP2PMPC(args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, ru loop_manager.run() else: return loop_manager + + +def CrocoP2PDualEEAndBaseMPCControlLoop( + ocp, + T_w_abseegoal: pin.SE3, + T_absgoal_lgoal: pin.SE3, + T_absgoal_rgoal: pin.SE3, + p_basegoal: np.ndarray, + args: Namespace, + robot: DualArmWholeBodyInterface, + t: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: + """ + CrocoP2PEEAndBaseMPCControlLoop + --------------------- + """ + x0 = np.concatenate([robot.q, robot.v]) + ocp.warmstartAndReSolve(x0) + xs = np.array(ocp.solver.xs) + # NOTE: for some reason the first value is always some wild bs + v_cmd = xs[1, robot.model.nq :] + + return v_cmd, {}, {} + + +def CrocoDualEEAndBaseP2PMPC( + args: Namespace, + robot: DualArmWholeBodyInterface, + T_w_abseegoal: pin.SE3, + T_absgoal_lgoal: pin.SE3, + T_absgoal_rgoal: pin.SE3, + p_basegoal: np.ndarray, + run=True, +) -> None | ControlLoopManager: + """ + IKMPC + ----- + run mpc for a point-to-point inverse kinematics. + note that the actual problem is solved on + a dynamics level, and velocities we command + are actually extracted from the state x(q,dq) + """ + x0 = np.concatenate([robot.q, robot.v]) + goal = (T_w_abseegoal, p_basegoal) + ocp = BaseAndDualArmIKOCP(args, robot, x0, goal) + ocp.solveInitialOCP(x0) + + controlLoop = partial( + DualEEAndBaseP2PCtrlLoopTemplate, + ocp, + T_w_abseegoal, + T_absgoal_lgoal, + T_absgoal_rgoal, + p_basegoal, + CrocoP2PDualEEAndBaseMPCControlLoop, + args, + robot, + ) + log_item = { + "qs": np.zeros(robot.nq), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), + "l_err_norm": np.zeros(1), + "r_err_norm": np.zeros(1), + "base_err_norm": np.zeros(1), + } + save_past_dict = {} + loop_manager = ControlLoopManager( + robot, controlLoop, args, save_past_dict, log_item + ) + if run: + loop_manager.run() + else: + return loop_manager diff --git a/python/smc/control/optimal_control/point_to_point_croco_ocp.py b/python/smc/control/optimal_control/point_to_point_croco_ocp.py index d532e77..707c0a1 100644 --- a/python/smc/control/optimal_control/point_to_point_croco_ocp.py +++ b/python/smc/control/optimal_control/point_to_point_croco_ocp.py @@ -22,7 +22,7 @@ class SingleArmIKOCP(CrocoOCP): T_w_goal = goal framePlacementResidual = crocoddyl.ResidualModelFramePlacement( self.state, - self.robot.model.getFrameId("tool0"), + self.robot.ee_frame_id, T_w_goal.copy(), self.state.nv, ) @@ -31,7 +31,7 @@ class SingleArmIKOCP(CrocoOCP): ) frameVelocityResidual = crocoddyl.ResidualModelFrameVelocity( self.state, - self.robot.model.getFrameId("tool0"), + self.robot.ee_frame_id, pin.Motion(np.zeros(6)), pin.ReferenceFrame.WORLD, ) @@ -160,7 +160,7 @@ class BaseAndSingleArmIKOCP(SingleArmIKOCP): # self, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray self, goal, - ): + ) -> None: T_w_eegoal, p_basegoal = goal super().constructTaskObjectiveFunction(T_w_eegoal) for i in range(self.args.n_knots): @@ -196,3 +196,57 @@ class BaseAndSingleArmIKOCP(SingleArmIKOCP): self.solver.problem.terminalModel.differential.costs.costs[ "base_translation" + str(self.args.n_knots) ].cost.residual.reference = p_basegoal + +class BaseAndDualArmIKOCP(DualArmIKOCP): + def __init__( + self, + args: Namespace, + robot: DualArmWholeBodyInterface, + x0: np.ndarray, + goal, + # T_w_eegoal: pin.SE3, + # p_basegoal: np.ndarray, + ): + # T_w_eegoal, p_basegoal = goal + super().__init__(args, robot, x0, goal) + + def constructTaskObjectiveFunction( + # self, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray + self, + goal, + )-> None: + T_w_lgoal, T_w_rgoal, p_basegoal = goal + super().constructTaskObjectiveFunction((T_w_lgoal, T_w_rgoal)) + for i in range(self.args.n_knots): + baseTranslationResidual = crocoddyl.ResidualModelFrameTranslation( + self.state, self.robot.base_frame_id, p_basegoal, self.state.nv + ) + baseTrackingCost = crocoddyl.CostModelResidual( + self.state, baseTranslationResidual + ) + self.runningCostModels[i].addCost( + "base_translation" + str(i), + baseTrackingCost, + self.args.base_translation_cost, + ) + self.terminalCostModel.addCost( + "base_translation" + str(self.args.n_knots), + baseTrackingCost, + self.args.base_translation_cost, + ) + + # there is nothing to update in a point-to-point task + def updateCosts(self, data) -> None: + pass + + def updateGoalInModels(self, goal) -> None: + # self, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray) -> None: + T_w_lgoal, T_w_rgoal, p_basegoal = goal + super().updateGoalInModels(T_w_lgoal, T_w_rgoal) + for i, runningModel in enumerate(self.solver.problem.runningModels): + runningModel.differential.costs.costs[ + "base_translation" + str(i) + ].cost.residual.reference = p_basegoal + self.solver.problem.terminalModel.differential.costs.costs[ + "base_translation" + str(self.args.n_knots) + ].cost.residual.reference = p_basegoal -- GitLab