diff --git a/development_plan.toml b/development_plan.toml index f21add641660e643f3487d89b27f6a9a2cb7bd0d..dde571a98043dfc0ba0c935d874c5252d1729250 100644 --- a/development_plan.toml +++ b/development_plan.toml @@ -146,7 +146,7 @@ dependencies = ["robotmanager abstract class"] purpose = """extending existing control for dual arm robots without compromising the cleanliness of existing controllers for single arms""" hours_required = 8 -is_done = false +is_done = true [[project_plan.action_items.action_items]] @@ -204,8 +204,23 @@ deadline = 2025-01-31 dependencies = ["robotmanager abstract class"] purpose = """actual process management beyond ducktaping plotter and visualizer to loopmanager and robotmanager""" hours_required = 5 -is_done = false +is_done = true +[[project_plan.action_items.action_items]] +name = "write out control templates" +description = """ +setting the controlloopmanager - controlloop relationship to be composition, inspired by the functional programming paradigm is the winner of the previous todo. + +now the control stack should be re-written to reflect this. + +the KEY benefit of composition is that you don't need to use any of these templates. but using them in within the core of the library will drastically reduce the amount of code, thereby improving extensibility and maintainability. +""" +priority = 2 +deadline = 2025-01-31 +dependencies = ["explicit relationship between controlLoop and ControlLoopManager"] +purpose = """reducing the amount of code, thereby improving extensibility and maintainability""" +hours_required = 5 +is_done = false [[project_plan.action_items.action_items]] @@ -326,7 +341,7 @@ street cred""" hours_required = 80 is_done = false -[[project_plan.action_items.action_items]] +[[project_plan.action_items]] name = "tidy up typing" description = """ sit down and get typing coverage to 100% or whatever is the limit of reason. @@ -337,7 +352,7 @@ dependencies = ["create/fix type-checking workflow"] purpose = """easier code writting and debugging, making library professional-grade""" # -1 means it should be a sum over sub-items hours_required = 8 -is_done = true +is_done = false [[project_plan.action_items]] name = "python stdlib logging instead of printing" diff --git a/examples/cart_pulling/MASSIVE_TODO b/examples/cart_pulling/MASSIVE_TODO index eecf06f159f04179c22031c7d0ed83b9b86d6ad8..3c16504c9c08d1ba81dd26b6d7dbf08373630217 100644 --- a/examples/cart_pulling/MASSIVE_TODO +++ b/examples/cart_pulling/MASSIVE_TODO @@ -1,5 +1,3 @@ -1. you have to find a way to keep track of the path for the end-effector. one trillion percent it manages to break itself, you need to save it in a log and replay everything. make change to the log_item thing to make not everything automatically plottable (check value type or something) - --> preferably do that while running the disjoint control because it survives the hassle 2. make proper path-tracking cost in python, that's shouldn't be too much of a hassle 2.1. do it with numdiff first 2.2. put in the analytic derivative diff --git a/examples/cart_pulling/control_sub_problems/fixed_paths/dual_arm_path_following.py b/examples/cart_pulling/control_sub_problems/fixed_paths/dual_arm_path_following.py index 35c272cd13234ebc52c198144181223f3824fad3..2e11a54104211e46764924edde04603c624414d5 100644 --- a/examples/cart_pulling/control_sub_problems/fixed_paths/dual_arm_path_following.py +++ b/examples/cart_pulling/control_sub_problems/fixed_paths/dual_arm_path_following.py @@ -1,2 +1,53 @@ -# same as ee_only_path_following_mpc in a circle, just use a dual arm instead. -# the reference is still a single 6D path, but now with dual arms +from smc import getRobotFromArgs +from smc import getMinimalArgParser +from smc.control.optimal_control.util import get_OCP_args +from smc.control.cartesian_space import getClikArgs +from smc.control.optimal_control.croco_mpc_path_following import ( + CrocoEEPathFollowingMPC, +) +import numpy as np + + +def path(T_w_e, i): + # 2) do T_mobile_base_ee_pos to get + # end-effector reference frame(s) + + # generate bullshit just to see it works + path = [] + t = i * robot.dt + for i in range(args.n_knots): + t += 0.01 + new = T_w_e.copy() + translation = 2 * np.array([np.cos(t / 20), np.sin(t / 20), 0.3]) + new.translation = translation + path.append(new) + return path + + +def get_args(): + parser = getMinimalArgParser() + parser = get_OCP_args(parser) + parser = getClikArgs(parser) # literally just for goal error + args = parser.parse_args() + return args + + +if __name__ == "__main__": + args = get_args() + robot = getRobotFromArgs(args) + x0 = np.concatenate([robot.q, robot.v]) + robot._step() + + CrocoEEPathFollowingMPC(args, robot, x0, path) + + print("final position:", robot.T_w_e) + + if args.real: + robot.stopRobot() + + if args.save_log: + robot._log_manager.saveLog() + robot._log_manager.plotAllControlLoops() + + if args.visualizer: + robot.killManipulatorVisualizer() diff --git a/examples/cart_pulling/control_sub_problems/with_planner/base_and_ee_path_following_from_planner.py b/examples/cart_pulling/control_sub_problems/with_planner/base_and_ee_path_following_from_planner.py index e74dbd591649193532cfc1f6709dfe7f63bc7b9f..2d34a8528513c6f8fafbda71ef093e0b260ef59a 100644 --- a/examples/cart_pulling/control_sub_problems/with_planner/base_and_ee_path_following_from_planner.py +++ b/examples/cart_pulling/control_sub_problems/with_planner/base_and_ee_path_following_from_planner.py @@ -5,7 +5,7 @@ from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3 from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs from smc.path_generation.planner import starPlanner, getPlanningArgs -from smc.control.optimal_control.croco_mpc_point_to_point import EEAndBaseP2PMPC +from smc.control.optimal_control.croco_mpc_point_to_point import CrocoEEAndBaseP2PMPC from smc.control.optimal_control.croco_mpc_path_following import ( BaseAndEEPathFollowingMPC, ) @@ -86,7 +86,7 @@ if __name__ == "__main__": print(pathSE3[0].translation) print(p_base) # TODO: UNCOMMENT - EEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) + CrocoEEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) print("initialized!") BaseAndEEPathFollowingMPC(args, robot, path_planner) diff --git a/examples/cart_pulling/control_sub_problems/with_planner/ee_only_path_following_from_plannner.py b/examples/cart_pulling/control_sub_problems/with_planner/ee_only_path_following_from_plannner.py index 05efbb44f1dfa211317f6791be278dfbe6ef20cc..783743ab63ea13a1c2fe0c05610867a5a1ff60fc 100644 --- a/examples/cart_pulling/control_sub_problems/with_planner/ee_only_path_following_from_plannner.py +++ b/examples/cart_pulling/control_sub_problems/with_planner/ee_only_path_following_from_plannner.py @@ -5,7 +5,7 @@ from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3 from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs from smc.path_generation.planner import starPlanner, getPlanningArgs -from smc.control.optimal_control.croco_mpc_point_to_point import EEP2PMPC +from smc.control.optimal_control.croco_mpc_point_to_point import CrocoEEP2PMPC from smc.control.optimal_control.croco_mpc_path_following import ( CrocoEEPathFollowingMPC, ) @@ -71,7 +71,7 @@ if __name__ == "__main__": robot.visualizer_manager.sendCommand({"Mgoal": pathSE3[0]}) if np.linalg.norm(pin.log6(T_w_e.actInv(pathSE3[0]))) > 1e-2: print("going to initial path position") - EEP2PMPC(args, robot, pathSE3[0]) + CrocoEEP2PMPC(args, robot, pathSE3[0]) print("initialized!") CrocoEEPathFollowingMPC(args, robot, x0, path_planner) diff --git a/examples/cart_pulling/disjoint_control/demo.py b/examples/cart_pulling/disjoint_control/demo.py index 0531b4cbd96fb4c093b2dc12d3a855b3fb9f90e2..ba2f6bd3dfa289807eefb7ba657817eb6cd1ec92 100644 --- a/examples/cart_pulling/disjoint_control/demo.py +++ b/examples/cart_pulling/disjoint_control/demo.py @@ -5,7 +5,7 @@ from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3 from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs from smc.path_generation.planner import starPlanner, getPlanningArgs -from smc.control.optimal_control.croco_mpc_point_to_point import EEAndBaseP2PMPC +from smc.control.optimal_control.croco_mpc_point_to_point import CrocoEEAndBaseP2PMPC from smc.multiprocessing import ProcessManager from mpc_base_clik_arm_control_loop import BaseMPCANDEECLIKCartPulling @@ -84,7 +84,7 @@ if __name__ == "__main__": print(pathSE3[0].translation) print(p_base) # TODO: UNCOMMENT - EEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) + CrocoEEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) print("initialized!") BaseMPCANDEECLIKCartPulling(args, robot, path_planner) diff --git a/examples/optimal_control/croco_ik_mpc.py b/examples/optimal_control/croco_ik_mpc.py index d2c23286211435fd8c59725ba8a2bef1f84bf036..71f1c434ee83b46d66f04d7e7dd2dcaee681a1ef 100644 --- a/examples/optimal_control/croco_ik_mpc.py +++ b/examples/optimal_control/croco_ik_mpc.py @@ -1,7 +1,6 @@ -import numpy as np from smc import getMinimalArgParser, getRobotFromArgs from smc.control.optimal_control.croco_mpc_point_to_point import ( - EEP2PMPC, + CrocoEEP2PMPC, ) from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs @@ -24,19 +23,12 @@ if __name__ == "__main__": # TODO: put this back for nicer demos # Mgoal = defineGoalPointCLI() T_w_goal = pin.SE3.Random() - if robot.robot_name == "yumi": - T_w_goal.rotation = np.eye(3) - T_w_goal_transform = pin.SE3.Identity() - T_w_goal_transform.translation[1] = 0.1 - Mgoal_left = T_w_goal_transform.act(T_w_goal) - Mgoal_right = T_w_goal_transform.inverse().act(T_w_goal) - T_w_goal = (Mgoal_left, Mgoal_right) if args.visualizer: # TODO: document this somewhere robot.visualizer_manager.sendCommand({"Mgoal": T_w_goal}) - EEP2PMPC(args, robot, T_w_goal) + CrocoEEP2PMPC(args, robot, T_w_goal) print("final position:") print(robot.T_w_e) diff --git a/examples/optimal_control/dual_arm_ik_mpc.py b/examples/optimal_control/dual_arm_ik_mpc.py new file mode 100644 index 0000000000000000000000000000000000000000..7b0031825a7d33ce3723ae590450d5d6dee56b1f --- /dev/null +++ b/examples/optimal_control/dual_arm_ik_mpc.py @@ -0,0 +1,44 @@ +from smc import getMinimalArgParser, getRobotFromArgs +from smc.control.optimal_control.croco_mpc_point_to_point import ( + CrocoDualEEP2PMPC, +) +from smc.control.optimal_control.util import get_OCP_args +from smc.control.cartesian_space import getClikArgs +from smc.util.define_random_goal import getRandomlyGeneratedGoal +import pinocchio as pin +import argcomplete + + +def get_args(): + parser = getMinimalArgParser() + parser = get_OCP_args(parser) + parser = getClikArgs(parser) # literally just for goal error + argcomplete.autocomplete(parser) + args = parser.parse_args() + return args + + +if __name__ == "__main__": + args = get_args() + robot = getRobotFromArgs(args) + T_w_goal = getRandomlyGeneratedGoal(args) + T_absgoal_lgoal = pin.SE3.Identity() + T_absgoal_lgoal.translation[1] = 0.1 + T_absgoal_rgoal = pin.SE3.Identity() + T_absgoal_rgoal.translation[1] = -0.1 + + if args.visualizer: + # TODO: document this somewhere + robot.visualizer_manager.sendCommand({"Mgoal": T_w_goal}) + + CrocoDualEEP2PMPC(args, robot, T_w_goal) + + if args.real: + robot.stopRobot() + + if args.visualizer: + robot.killManipulatorVisualizer() + + if args.save_log: + robot._log_manager.saveLog() + robot._log_manager.plotAllControlLoops() diff --git a/python/smc/control/cartesian_space/cartesian_space_point_to_point.py b/python/smc/control/cartesian_space/cartesian_space_point_to_point.py index f9b373a0908d2cf85d3060502462d6288757a1c7..153eddf3844b847030bf736ef07b9bfbd7b112f6 100644 --- a/python/smc/control/cartesian_space/cartesian_space_point_to_point.py +++ b/python/smc/control/cartesian_space/cartesian_space_point_to_point.py @@ -1,7 +1,10 @@ -from IPython import embed from smc.control.control_loop_manager import ControlLoopManager from smc.robots.interfaces.single_arm_interface import SingleArmInterface from smc.robots.interfaces.dual_arm_interface import DualArmInterface +from smc.control.controller_templates.point_to_point import ( + EEP2PCtrlLoopTemplate, + DualEEP2PCtrlLoopTemplate, +) from smc.robots.interfaces.force_torque_sensor_interface import ( ForceTorqueOnSingleArmWrist, ) @@ -15,14 +18,14 @@ from typing import Callable def controlLoopClik( - args: Namespace, - robot: SingleArmInterface, # J err_vec v_cmd ik_solver: Callable[[np.ndarray, np.ndarray], np.ndarray], T_w_goal: pin.SE3, + args: Namespace, + robot: SingleArmInterface, i: int, past_data: dict[str, deque[np.ndarray]], -) -> tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]: +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ controlLoopClik --------------- @@ -30,36 +33,23 @@ def controlLoopClik( in some version of the universe this could be extended to a generic point-to-point motion control loop. """ - breakFlag = False - log_item = {} - save_past_item = {} - q = robot.q T_w_e = robot.T_w_e - # first check whether we're at the goal SEerror = T_w_e.actInv(T_w_goal) err_vector = pin.log6(SEerror).vector - if np.linalg.norm(err_vector) < args.goal_error: - breakFlag = True J = robot.getJacobian() # compute the joint velocities based on controller you passed # qd = ik_solver(J, err_vector, past_qd=past_data['dqs_cmd'][-1]) - qd = ik_solver(J, err_vector) - if qd is None: + v_cmd = ik_solver(J, err_vector) + if v_cmd is None: print("the controller you chose didn't work, using dampedPseudoinverse instead") - qd = dampedPseudoinverse(1e-2, J, err_vector) - robot.sendVelocityCommand(qd) + v_cmd = dampedPseudoinverse(1e-2, J, err_vector) - log_item["qs"] = q.reshape((robot.nq,)) - log_item["dqs"] = robot.v - log_item["dqs_cmd"] = qd.reshape((robot.nv,)) - log_item["err_norm"] = np.linalg.norm(err_vector).reshape((1,)) - # we're not saving here, but need to respect the API, - # hence the empty dict - save_past_item["dqs_cmd"] = qd.reshape((robot.nv,)) - return breakFlag, save_past_item, log_item + return v_cmd, {}, {} -def moveL(args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3) -> None: +def moveL( + args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3, run=True +) -> None | ControlLoopManager: """ moveL ----- @@ -69,7 +59,9 @@ def moveL(args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3) -> None """ # assert type(goal_point) == pin.pinocchio_pywrap.SE3 ik_solver = getIKSolver(args, robot) - controlLoop = partial(controlLoopClik, robot, ik_solver, T_w_goal) + controlLoop = partial( + EEP2PCtrlLoopTemplate, ik_solver, T_w_goal, controlLoopClik, args, robot + ) # we're not using any past data or logging, hence the empty arguments log_item = { "qs": np.zeros(robot.nq), @@ -77,13 +69,14 @@ def moveL(args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3) -> None "dqs_cmd": np.zeros(robot.nv), "err_norm": np.zeros(1), } - save_past_dict = { - "dqs_cmd": np.zeros(robot.nv), - } + save_past_dict = {} loop_manager = ControlLoopManager( robot, controlLoop, args, save_past_dict, log_item ) - loop_manager.run() + if run: + loop_manager.run() + else: + return loop_manager # TODO: implement @@ -156,7 +149,7 @@ def moveUntilContactControlLoop( def moveUntilContact( args: Namespace, robot: ForceTorqueOnSingleArmWrist, speed: np.ndarray -): +) -> None: """ moveUntilContact ----- @@ -174,16 +167,16 @@ def moveUntilContact( def controlLoopClikDualArm( - args: Namespace, - robot: DualArmInterface, # J err_vec v_cmd ik_solver: Callable[[np.ndarray, np.ndarray], np.ndarray], T_w_absgoal: pin.SE3, T_abs_lgoal: pin.SE3, T_abs_rgoal: pin.SE3, + args: Namespace, + robot: DualArmInterface, i: int, past_data: dict[str, deque[np.ndarray]], -) -> tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]: +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ controlLoopClikDualArm --------------- @@ -192,8 +185,6 @@ def controlLoopClikDualArm( and an SE3 transformation on the goal for each arm """ - breakFlag = False - log_item = {} T_w_lgoal = T_abs_lgoal.act(T_w_absgoal) T_w_rgoal = T_abs_rgoal.act(T_w_absgoal) @@ -203,23 +194,10 @@ def controlLoopClikDualArm( err_vector_left = pin.log6(SEerror_left).vector err_vector_right = pin.log6(SEerror_right).vector - if (np.linalg.norm(err_vector_left) < robot.args.goal_error) and ( - np.linalg.norm(err_vector_right) < robot.args.goal_error - ): - breakFlag = True err_vector = np.concatenate((err_vector_left, err_vector_right)) J = robot.getJacobian() - # compute the joint velocities based on controller you passed - # this works exactly the way you think it does: - # the velocities for the other arm are 0 - # what happens to the base hasn't been investigated but it seems ok v_cmd = ik_solver(J, err_vector) - robot.sendVelocityCommand(v_cmd) - - log_item["qs"] = robot.q - log_item["dqs"] = robot.v - log_item["dqs_cmd"] = v_cmd - return breakFlag, {}, log_item + return v_cmd, {}, {} def moveLDualArm( @@ -229,7 +207,7 @@ def moveLDualArm( T_abs_l: pin.SE3, T_abs_r: pin.SE3, run=True, -) -> None: +) -> None | ControlLoopManager: """ moveLDualArm ----------- @@ -238,13 +216,22 @@ def moveLDualArm( robot._mode = DualArmInterface.control_mode.both ik_solver = getIKSolver(args, robot) controlLoop = partial( - controlLoopClikDualArm, args, robot, ik_solver, T_w_goal, T_abs_l, T_abs_r + DualEEP2PCtrlLoopTemplate, + ik_solver, + T_w_goal, + T_abs_l, + T_abs_r, + controlLoopClikDualArm, + args, + robot, ) # we're not using any past data or logging, hence the empty arguments log_item = { "qs": np.zeros(robot.nq), "dqs": np.zeros(robot.nv), "dqs_cmd": np.zeros(robot.nv), + "l_err_norm": np.zeros(6), + "r_err_norm": np.zeros(6), } save_past_dict = {} loop_manager = ControlLoopManager( @@ -254,34 +241,3 @@ def moveLDualArm( loop_manager.run() else: return loop_manager - - -# NOTE: this shit makes no sense, it's old and broken -# but you will want to look at it for dual arm stuff so i'm leaving it here for the time being -""" -def moveLDualArm( - args, robot: AbstractRobotManager, goal_point, goal_transform, run=True -): - - moveL - ----- - does moveL. - send a SE3 object as goal point. - if you don't care about rotation, make it np.zeros((3,3)) - - # assert type(goal_point) == pin.pinocchio_pywrap.SE3 - robot.Mgoal = copy.deepcopy(goal_point) - ik_solver = getIKSolver(args, robot) - controlLoop = partial(controlLoopClikDualArm, robot, ik_solver, goal_transform) - # we're not using any past data or logging, hence the empty arguments - log_item = { - "qs": np.zeros(robot.model.nq), - "dqs": np.zeros(robot.model.nv), - "dqs_cmd": np.zeros(robot.model.nv), - } - loop_manager = ControlLoopManager(robot, controlLoop, args, {}, log_item) - if run: - loop_manager.run() - else: - return loop_manager -""" diff --git a/python/smc/control/controller_templates/point_to_point.py b/python/smc/control/controller_templates/point_to_point.py index 1f2a61ee780585294fb1d510474f5fd694b8fb60..e2b35960c420835447f7353dfa2b207ad566b7bd 100644 --- a/python/smc/control/controller_templates/point_to_point.py +++ b/python/smc/control/controller_templates/point_to_point.py @@ -1,38 +1,46 @@ from smc.robots.interfaces.single_arm_interface import SingleArmInterface +from smc.robots.interfaces.dual_arm_interface import DualArmInterface from pinocchio import SE3, log6 from argparse import Namespace from typing import Any, Callable import numpy as np +from collections import deque + +from smc.robots.interfaces.whole_body_interface import SingleArmWholeBodyInterface global control_loop_return control_loop_return = tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]] -def Point2PointControlLoop( - SOLVER: Callable, +def EEP2PCtrlLoopTemplate( + SOLVER: Any, T_w_goal: SE3, control_loop: Callable[ - [Any, SE3, Namespace, SingleArmInterface, dict[str, np.ndarray], int], - np.ndarray, + [Any, SE3, Namespace, SingleArmInterface, int, dict[str, deque[np.ndarray]]], + tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]], ], args: Namespace, robot: SingleArmInterface, - past_data: dict[str, np.ndarray], t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], ) -> control_loop_return: """ - Point2PointControlLoop + EEP2PCtrlLoopTemplate --------------- - generic control loop for point to point motion + generic control loop for point to point motion with for the end-effector (handling error to final point etc). """ breakFlag = False log_item = {} save_past_item = {} - v_cmd = control_loop(SOLVER, T_w_goal, args, robot, past_data, t) + v_cmd, past_item_inner, log_item_inner = control_loop( + SOLVER, T_w_goal, args, robot, t, past_data + ) robot.sendVelocityCommand(v_cmd) + log_item.update(log_item_inner) + save_past_item.update(past_item_inner) T_w_e = robot.T_w_e SEerror = T_w_e.actInv(T_w_goal) @@ -44,3 +52,193 @@ def Point2PointControlLoop( log_item["dqs"] = robot.v log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) return breakFlag, save_past_item, log_item + + +def DualEEP2PCtrlLoopTemplate( + SOLVER: Any, + T_w_absgoal: SE3, + T_abs_l: SE3, + T_abs_r: SE3, + control_loop: Callable[ + [ + Any, + SE3, + SE3, + SE3, + Namespace, + DualArmInterface, + int, + dict[str, deque[np.ndarray]], + ], + tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]], + ], + args: Namespace, + robot: DualArmInterface, + t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], +) -> control_loop_return: + """ + DualEEP2PCtrlLoopTemplate + --------------- + generic control loop for point to point motion for end-effectors of a dual arm robot + (handling error to final point etc). + """ + breakFlag = False + log_item = {} + save_past_item = {} + + v_cmd, past_item_inner, log_item_inner = control_loop( + SOLVER, T_w_absgoal, T_abs_l, T_abs_r, args, robot, t, past_data + ) + robot.sendVelocityCommand(v_cmd) + log_item.update(log_item_inner) + save_past_item.update(past_item_inner) + + T_w_lgoal = T_abs_l.act(T_w_absgoal) + T_w_rgoal = T_abs_r.act(T_w_absgoal) + + SEerror_left = robot.T_w_l.actInv(T_w_lgoal) + SEerror_right = robot.T_w_r.actInv(T_w_rgoal) + + err_vector_left = log6(SEerror_left).vector + err_vector_right = log6(SEerror_right).vector + + if (np.linalg.norm(err_vector_left) < robot.args.goal_error) and ( + np.linalg.norm(err_vector_right) < robot.args.goal_error + ): + breakFlag = True + + log_item["qs"] = robot.q + log_item["dqs"] = robot.v + log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) + log_item["l_err_norm"] = np.linalg.norm(err_vector_left) + log_item["r_err_norm"] = np.linalg.norm(err_vector_right) + return breakFlag, save_past_item, log_item + +def EEAndBaseP2PCtrlLoopTemplate( + SOLVER: Any, + T_w_goal: SE3, + p_basegoal: np.ndarray, + control_loop: Callable[ + [ + Any, + SE3, + np.ndarray, + Namespace, + SingleArmWholeBodyInterface, + int, + dict[str, deque[np.ndarray]], + ], + tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]], + ], + args: Namespace, + robot: SingleArmWholeBodyInterface, + t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], +) -> control_loop_return: + """ + EEAndBaseP2PCtrlLoopTemplate + --------------- + generic control loop for point to point motion for end-effectors of a dual arm robot + (handling error to final point etc). + """ + breakFlag = False + log_item = {} + save_past_item = {} + + v_cmd, past_item_inner, log_item_inner = control_loop( + SOLVER, T_w_goal, p_basegoal, args, robot, t, past_data + ) + robot.sendVelocityCommand(v_cmd) + log_item.update(log_item_inner) + save_past_item.update(past_item_inner) + + T_w_e = robot.T_w_e + SEerror = T_w_e.actInv(T_w_goal) + ee_err_vector = log6(SEerror).vector + ee_err_vector_norm = np.linalg.norm(ee_err_vector) + + p_base = np.array(list(robot.q[:2]) + [0.0]) + base_err_vector_norm = np.linalg.norm(p_basegoal - p_base) + + if (ee_err_vector_norm < robot.args.goal_error) and ( + base_err_vector_norm < robot.args.goal_error + ): + breakFlag = True + + log_item["qs"] = robot.q + log_item["dqs"] = robot.v + log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) + log_item["ee_err_norm"] = ee_err_vector_norm.reshape((1,)) + log_item["base_err_norm"] = base_err_vector_norm.reshape((1,)) + return breakFlag, save_past_item, log_item + + +def DualEEAndBaseP2PCtrlLoopTemplate( + SOLVER: Any, + T_w_absgoal: SE3, + T_abs_l: SE3, + T_abs_r: SE3, + p_basegoal: np.ndarray, + control_loop: Callable[ + [ + Any, + SE3, + SE3, + SE3, + np.ndarray, + Namespace, + DualArmWholeBodyInterface, + int, + dict[str, deque[np.ndarray]], + ], + tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]], + ], + args: Namespace, + robot: DualArmWholeBodyInterface, + t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], +) -> control_loop_return: + """ + DualEEAndBaseP2PCtrlLoopTemplate + --------------- + generic control loop for point to point motion for end-effectors of a dual arm robot + (handling error to final point etc). + """ + breakFlag = False + log_item = {} + save_past_item = {} + + v_cmd, past_item_inner, log_item_inner = control_loop( + SOLVER, T_w_absgoal, T_abs_l, T_abs_r, p_basegoal, args, robot, t, past_data + ) + robot.sendVelocityCommand(v_cmd) + log_item.update(log_item_inner) + save_past_item.update(past_item_inner) + + T_w_lgoal = T_abs_l.act(T_w_absgoal) + T_w_rgoal = T_abs_r.act(T_w_absgoal) + SEerror_left = robot.T_w_l.actInv(T_w_lgoal) + SEerror_right = robot.T_w_r.actInv(T_w_rgoal) + err_vector_left = log6(SEerror_left).vector + err_vector_right = log6(SEerror_right).vector + err_vector_left_norm = np.linalg.norm(err_vector_left) + err_vector_right_norm = np.linalg.norm(err_vector_right) + + p_base = np.array(list(robot.q[:2]) + [0.0]) + base_err_vector_norm = np.linalg.norm(p_basegoal - p_base) + + if ( + (err_vector_left_norm < robot.args.goal_error) + and (err_vector_right_norm< robot.args.goal_error) + and (base_err_vector_norm < robot.args.goal_error) + ): + breakFlag = True + + log_item["qs"] = robot.q + log_item["dqs"] = robot.v + log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) + log_item["l_err_norm"] = err_vector_left_norm.reshape((1,)) + log_item["r_err_norm"] = err_vector_right_norm.reshape((1,)) + log_item["base_err_norm"] = base_err_vector_norm.reshape((1,)) + return breakFlag, save_past_item, log_item diff --git a/python/smc/control/optimal_control/croco_mpc_path_following.py b/python/smc/control/optimal_control/croco_mpc_path_following.py index dfab194e89fc2a3fb31284e244f22baa23621f5d..c8cb92558d13fbcb6e4dcf363630a1fbd32bbd34 100644 --- a/python/smc/control/optimal_control/croco_mpc_path_following.py +++ b/python/smc/control/optimal_control/croco_mpc_path_following.py @@ -106,7 +106,7 @@ def CrocoEEPathFollowingMPCControlLoop( path_planner: types.FunctionType, t: int, _: dict[str, deque[np.ndarray]], -) -> tuple[np.ndarray, dict[str, np.ndarray]]: +) -> tuple[np.ndarray, dict[str, np.ndarray],dict[str, np.ndarray]]: """ CrocoPathFollowingMPCControlLoop ----------------------------- diff --git a/python/smc/control/optimal_control/croco_mpc_point_to_point.py b/python/smc/control/optimal_control/croco_mpc_point_to_point.py index 0e0aeb6cc2d4f441792bae2ef601b53a412bb6a3..72a34e48219dceda839bab92962094994311eb94 100644 --- a/python/smc/control/optimal_control/croco_mpc_point_to_point.py +++ b/python/smc/control/optimal_control/croco_mpc_point_to_point.py @@ -1,58 +1,117 @@ -from argparse import Namespace +from smc.control.controller_templates.point_to_point import ( + DualEEAndBaseP2PCtrlLoopTemplate, + DualEEP2PCtrlLoopTemplate, + EEAndBaseP2PCtrlLoopTemplate, + EEP2PCtrlLoopTemplate, +) +from smc.control.optimal_control.abstract_croco_ocp import CrocoOCP from smc.robots.interfaces.single_arm_interface import SingleArmInterface -from smc.control.control_loop_manager import ControlLoopManager +from smc.robots.interfaces.dual_arm_interface import DualArmInterface from smc.control.optimal_control.point_to_point_croco_ocp import ( SingleArmIKOCP, + DualArmIKOCP, BaseAndSingleArmIKOCP, + BaseAndDualArmIKOCP ) +from smc.control.control_loop_manager import ControlLoopManager import pinocchio as pin import numpy as np from functools import partial +from collections import deque +from argparse import Namespace + +from smc.robots.interfaces.whole_body_interface import SingleArmWholeBodyInterface def CrocoEEP2PMPCControlLoop( + ocp: CrocoOCP, + T_w_goal: pin.SE3, args: Namespace, robot: SingleArmInterface, - ocp, - T_w_goal: pin.SE3, _: int, __: dict[str, deque[np.ndarray]], -): +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ CrocoIKMPCControlLoop --------------------- """ - breakFlag = False - log_item = {} + # set initial state from sensor + x0 = np.concatenate([robot.q, robot.v]) + ocp.warmstartAndReSolve(x0) + xs = np.array(ocp.solver.xs) + # NOTE: for some reason the first value is always some wild bs + vel_cmd = xs[1, robot.model.nq :] + return vel_cmd, {}, {} + + +def CrocoEEP2PMPC( + args: Namespace, robot: SingleArmInterface, T_w_goal: pin.SE3, run=True +): + """ + IKMPC + ----- + run mpc for a point-to-point inverse kinematics. + note that the actual problem is solved on + a dynamics level, and velocities we command + are actually extracted from the state x(q,dq) + """ + x0 = np.concatenate([robot.q, robot.v]) + ocp = SingleArmIKOCP(args, robot, x0, T_w_goal) + ocp.solveInitialOCP(x0) + + controlLoop = partial( + EEP2PCtrlLoopTemplate, ocp, T_w_goal, CrocoEEP2PMPCControlLoop, args, robot + ) + log_item = { + "qs": np.zeros(robot.nq), + "err_norm": np.zeros(1), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), + } save_past_dict = {} + loop_manager = ControlLoopManager( + robot, controlLoop, args, save_past_dict, log_item + ) + if run: + loop_manager.run() + else: + return loop_manager - SEerror = robot.T_w_e.actInv(T_w_goal) - err_vector = pin.log6(SEerror).vector - if np.linalg.norm(err_vector) < robot.args.goal_error: - breakFlag = True +def CrocoDualEEP2PMPCControlLoop( + ocp: CrocoOCP, + T_w_abseegoal: pin.SE3, + T_absgoal_lgoal: pin.SE3, + T_absgoal_rgoal: pin.SE3, + args: Namespace, + robot: DualArmInterface, + t: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: + """ + CrocoIKMPCControlLoop + --------------------- + """ # set initial state from sensor x0 = np.concatenate([robot.q, robot.v]) ocp.warmstartAndReSolve(x0) xs = np.array(ocp.solver.xs) - us = np.array(ocp.solver.us) # NOTE: for some reason the first value is always some wild bs - vel_cmds = xs[1, robot.model.nq :] - robot.sendVelocityCommand(vel_cmds) - - log_item["qs"] = x0[: robot.model.nq] - log_item["dqs"] = x0[robot.model.nq :] - # log_item["dqs_cmd"] = vel_cmds - log_item["u_tau"] = us[0] - log_item["err_norm"] = np.linalg.norm(err_vector).reshape((1,)) - - return breakFlag, save_past_dict, log_item + vel_cmd = xs[1, robot.model.nq :] + return vel_cmd, {}, {} -def EEP2PMPC(args: Namespace, robot, T_w_goal: pin.SE3, run=True): +def CrocoDualEEP2PMPC( + args: Namespace, + robot: DualArmInterface, + T_w_absgoal: pin.SE3, + T_absgoal_l: pin.SE3, + T_absgoal_r: pin.SE3, + run=True, +): """ - IKMPC + DualEEP2PMPC ----- run mpc for a point-to-point inverse kinematics. note that the actual problem is solved on @@ -60,16 +119,24 @@ def EEP2PMPC(args: Namespace, robot, T_w_goal: pin.SE3, run=True): are actually extracted from the state x(q,dq) """ x0 = np.concatenate([robot.q, robot.v]) - ocp = SingleArmIKOCP(args, robot, x0, T_w_goal) + ocp = DualArmIKOCP(args, robot, x0, T_w_absgoal) ocp.solveInitialOCP(x0) - controlLoop = partial(CrocoEEP2PMPCControlLoop, args, robot, ocp, T_w_goal) + controlLoop = partial( + DualEEP2PCtrlLoopTemplate, + ocp, + T_w_absgoal, + T_absgoal_l, + T_absgoal_r, + CrocoDualEEP2PMPCControlLoop, + args, + robot, + ) log_item = { - "qs": np.zeros(robot.model.nq), - "dqs": np.zeros(robot.model.nv), - # "dqs_cmd": np.zeros(robot.model.nv), # we're assuming full actuation here - "u_tau": np.zeros(robot.model.nv), + "qs": np.zeros(robot.nq), "err_norm": np.zeros(1), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), } save_past_dict = {} loop_manager = ControlLoopManager( @@ -82,50 +149,30 @@ def EEP2PMPC(args: Namespace, robot, T_w_goal: pin.SE3, run=True): def CrocoP2PEEAndBaseMPCControlLoop( - args, - robot: SingleArmInterface, - ocp, + ocp: CrocoOCP, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, - _: int, - __: dict[str, deque[np.ndarray]], -): + args: Namespace, + robot: SingleArmWholeBodyInterface, + t: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ CrocoP2PEEAndBaseMPCControlLoop --------------------- """ - breakFlag = False - log_item = {} - save_past_dict = {} - - ee_SEerror = robot.T_w_e.actInv(T_w_eegoal) - ee_err_vector = pin.log6(ee_SEerror).vector - ee_err_vector_norm = np.linalg.norm(ee_err_vector) - # TODO: make this a property of mobilebaseinterface - p_base = np.array(list(robot.q[:2]) + [0.0]) - base_err_vector_norm = np.linalg.norm(p_basegoal - p_base) - if (ee_err_vector_norm < args.goal_error) and ( - base_err_vector_norm < args.goal_error - ): - breakFlag = True - # set initial state from sensor x0 = np.concatenate([robot.q, robot.v]) ocp.warmstartAndReSolve(x0) xs = np.array(ocp.solver.xs) # NOTE: for some reason the first value is always some wild bs - vel_cmds = xs[1, robot.model.nq :] - robot.sendVelocityCommand(vel_cmds) - - log_item["qs"] = x0[: robot.model.nq] - log_item["dqs"] = x0[robot.model.nq :] - log_item["ee_err_norm"] = ee_err_vector_norm.reshape((1,)) - log_item["base_err_norm"] = base_err_vector_norm.reshape((1,)) - - return breakFlag, save_past_dict, log_item + vel_cmd = xs[1, robot.model.nq :] + return vel_cmd, {}, {} -def EEAndBaseP2PMPC(args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, run=True): +def CrocoEEAndBaseP2PMPC( + args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, run=True +): """ IKMPC ----- @@ -140,13 +187,18 @@ def EEAndBaseP2PMPC(args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, ru ocp.solveInitialOCP(x0) controlLoop = partial( - CrocoP2PEEAndBaseMPCControlLoop, args, robot, ocp, T_w_eegoal, p_basegoal + EEAndBaseP2PCtrlLoopTemplate, + ocp, + T_w_eegoal, + p_basegoal, + CrocoP2PEEAndBaseMPCControlLoop, + args, + robot, ) log_item = { - "qs": np.zeros(robot.model.nq), - "dqs": np.zeros(robot.model.nv), - # "dqs_cmd": np.zeros(robot.model.nv), # we're assuming full actuation here - # "u_tau": np.zeros(robot.model.nv), + "qs": np.zeros(robot.nq), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), "ee_err_norm": np.zeros(1), "base_err_norm": np.zeros(1), } @@ -158,3 +210,78 @@ def EEAndBaseP2PMPC(args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, ru loop_manager.run() else: return loop_manager + + +def CrocoP2PDualEEAndBaseMPCControlLoop( + ocp, + T_w_abseegoal: pin.SE3, + T_absgoal_lgoal: pin.SE3, + T_absgoal_rgoal: pin.SE3, + p_basegoal: np.ndarray, + args: Namespace, + robot: DualArmWholeBodyInterface, + t: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: + """ + CrocoP2PEEAndBaseMPCControlLoop + --------------------- + """ + x0 = np.concatenate([robot.q, robot.v]) + ocp.warmstartAndReSolve(x0) + xs = np.array(ocp.solver.xs) + # NOTE: for some reason the first value is always some wild bs + v_cmd = xs[1, robot.model.nq :] + + return v_cmd, {}, {} + + +def CrocoDualEEAndBaseP2PMPC( + args: Namespace, + robot: DualArmWholeBodyInterface, + T_w_abseegoal: pin.SE3, + T_absgoal_lgoal: pin.SE3, + T_absgoal_rgoal: pin.SE3, + p_basegoal: np.ndarray, + run=True, +) -> None | ControlLoopManager: + """ + IKMPC + ----- + run mpc for a point-to-point inverse kinematics. + note that the actual problem is solved on + a dynamics level, and velocities we command + are actually extracted from the state x(q,dq) + """ + x0 = np.concatenate([robot.q, robot.v]) + goal = (T_w_abseegoal, p_basegoal) + ocp = BaseAndDualArmIKOCP(args, robot, x0, goal) + ocp.solveInitialOCP(x0) + + controlLoop = partial( + DualEEAndBaseP2PCtrlLoopTemplate, + ocp, + T_w_abseegoal, + T_absgoal_lgoal, + T_absgoal_rgoal, + p_basegoal, + CrocoP2PDualEEAndBaseMPCControlLoop, + args, + robot, + ) + log_item = { + "qs": np.zeros(robot.nq), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), + "l_err_norm": np.zeros(1), + "r_err_norm": np.zeros(1), + "base_err_norm": np.zeros(1), + } + save_past_dict = {} + loop_manager = ControlLoopManager( + robot, controlLoop, args, save_past_dict, log_item + ) + if run: + loop_manager.run() + else: + return loop_manager diff --git a/python/smc/control/optimal_control/point_to_point_croco_ocp.py b/python/smc/control/optimal_control/point_to_point_croco_ocp.py index d532e77ef872fb225300f25a7a72f3af63a53637..707c0a137b5b4744da25a4a82dc072ab53061d1f 100644 --- a/python/smc/control/optimal_control/point_to_point_croco_ocp.py +++ b/python/smc/control/optimal_control/point_to_point_croco_ocp.py @@ -22,7 +22,7 @@ class SingleArmIKOCP(CrocoOCP): T_w_goal = goal framePlacementResidual = crocoddyl.ResidualModelFramePlacement( self.state, - self.robot.model.getFrameId("tool0"), + self.robot.ee_frame_id, T_w_goal.copy(), self.state.nv, ) @@ -31,7 +31,7 @@ class SingleArmIKOCP(CrocoOCP): ) frameVelocityResidual = crocoddyl.ResidualModelFrameVelocity( self.state, - self.robot.model.getFrameId("tool0"), + self.robot.ee_frame_id, pin.Motion(np.zeros(6)), pin.ReferenceFrame.WORLD, ) @@ -160,7 +160,7 @@ class BaseAndSingleArmIKOCP(SingleArmIKOCP): # self, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray self, goal, - ): + ) -> None: T_w_eegoal, p_basegoal = goal super().constructTaskObjectiveFunction(T_w_eegoal) for i in range(self.args.n_knots): @@ -196,3 +196,57 @@ class BaseAndSingleArmIKOCP(SingleArmIKOCP): self.solver.problem.terminalModel.differential.costs.costs[ "base_translation" + str(self.args.n_knots) ].cost.residual.reference = p_basegoal + +class BaseAndDualArmIKOCP(DualArmIKOCP): + def __init__( + self, + args: Namespace, + robot: DualArmWholeBodyInterface, + x0: np.ndarray, + goal, + # T_w_eegoal: pin.SE3, + # p_basegoal: np.ndarray, + ): + # T_w_eegoal, p_basegoal = goal + super().__init__(args, robot, x0, goal) + + def constructTaskObjectiveFunction( + # self, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray + self, + goal, + )-> None: + T_w_lgoal, T_w_rgoal, p_basegoal = goal + super().constructTaskObjectiveFunction((T_w_lgoal, T_w_rgoal)) + for i in range(self.args.n_knots): + baseTranslationResidual = crocoddyl.ResidualModelFrameTranslation( + self.state, self.robot.base_frame_id, p_basegoal, self.state.nv + ) + baseTrackingCost = crocoddyl.CostModelResidual( + self.state, baseTranslationResidual + ) + self.runningCostModels[i].addCost( + "base_translation" + str(i), + baseTrackingCost, + self.args.base_translation_cost, + ) + self.terminalCostModel.addCost( + "base_translation" + str(self.args.n_knots), + baseTrackingCost, + self.args.base_translation_cost, + ) + + # there is nothing to update in a point-to-point task + def updateCosts(self, data) -> None: + pass + + def updateGoalInModels(self, goal) -> None: + # self, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray) -> None: + T_w_lgoal, T_w_rgoal, p_basegoal = goal + super().updateGoalInModels(T_w_lgoal, T_w_rgoal) + for i, runningModel in enumerate(self.solver.problem.runningModels): + runningModel.differential.costs.costs[ + "base_translation" + str(i) + ].cost.residual.reference = p_basegoal + self.solver.problem.terminalModel.differential.costs.costs[ + "base_translation" + str(self.args.n_knots) + ].cost.residual.reference = p_basegoal