diff --git a/development_plan.toml b/development_plan.toml index c6cd1e0863d44bba8d34d535f2628f584e351d12..f21add641660e643f3487d89b27f6a9a2cb7bd0d 100644 --- a/development_plan.toml +++ b/development_plan.toml @@ -182,6 +182,31 @@ purpose = """making it easy, simple, and error-abstinent to only navigate the ba hours_required = 4 is_done = true +[[project_plan.action_items.action_items]] +name = "explicit relationship between controlLoop and ControlLoopManager" +description = """ +the goal is that they can access each others private, i.e. local variables. could be that controlLoopManager has an controlLoop as an abstractmethod. this is probably the easiest way to go about it. alternatively it could be some more explicit composition than it is now. +UPDATE! YOU DON'T WANT OOP, YOU WANT TO DO THIS WITH FUNCTIONAL PROGRAMMING AND COMPOSITION. THIS IS WHAT YOU HAD SO FAR, JUST EXPANDED TO THE NEW CONDITIONS! IT IS THE ONLY SOLUTION THAT MAKES PERFECT SENSE IN THIS CONTEXT!!!! THE ONLY WAY TO FULLFILL ALL THE REQUIREMENTS AND THE ONLY WAY TO CONTINUE WITH THE CURRENT STYLE WHICH MAKES PERFECT SENSE AND JUST NEEDS TO SUPPORT MORE FEATURES!!! + +this should get you the following things: +1) you can decouple visualization commands specific to this control loop from the gloryous math +2) if there's some oop somewhere in there, you can create control-type based classes, which basically store away repetitive code, like data parsing after receiving something from multiprocessor (could even be in new optional overridable method "processinput") +3) you made the relationship between controlloopmanager, a controlloop and the function you can all to instantiate and execute everything. there is no cost to calcifying this relationship, while you wrote the perfect documentation - code. +4) the what's being logged is enforced... +... + +what this buys is that you can remove log_item management from ... + +you can also make an argument for why the current setup is optimal. i mean everything works perfectly fine, there is just a bunch of uncessary code around. +""" +priority = 2 +deadline = 2025-01-31 +dependencies = ["robotmanager abstract class"] +purpose = """actual process management beyond ducktaping plotter and visualizer to loopmanager and robotmanager""" +hours_required = 5 +is_done = false + + [[project_plan.action_items.action_items]] name = "actual process management" diff --git a/examples/cart_pulling/base_and_ee_path_following_from_planner.py b/examples/cart_pulling/base_and_ee_path_following_from_planner.py index 06f2f29d8c7388b5f71f13b4a21cd583fb7e39ec..54c7dfbc2a1427f7eb960c33437eb3584a30eb80 100644 --- a/examples/cart_pulling/base_and_ee_path_following_from_planner.py +++ b/examples/cart_pulling/base_and_ee_path_following_from_planner.py @@ -5,7 +5,7 @@ from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3_fixed from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs from smc.path_generation.planner import starPlanner, getPlanningArgs -from smc.control.optimal_control.croco_mpc_point_to_point import CrocoIKMPC +from smc.control.optimal_control.croco_mpc_point_to_point import EEAndBaseP2PMPC from smc.control.optimal_control.croco_mpc_path_following import ( BaseAndEEPathFollowingMPC, ) @@ -31,7 +31,7 @@ def get_args(): parser.add_argument( "--base-to-handlebar-preferred-distance", type=float, - default=0.7, + default=0.5, help="prefered path arclength from mobile base position to handlebar", ) args = parser.parse_args() @@ -80,7 +80,12 @@ if __name__ == "__main__": robot.visualizer_manager.sendCommand({"Mgoal": pathSE3[0]}) if np.linalg.norm(pin.log6(T_w_e.actInv(pathSE3[0]))) > 1e-2: print("going to initial path position") - CrocoIKMPC(args, robot, pathSE3[0]) + p_base = pathSE3[0].translation.copy() + p_base[0] -= args.base_to_handlebar_preferred_distance + p_base[2] = 0.0 + print(pathSE3[0].translation) + print(p_base) + EEAndBaseP2PMPC(args, robot, pathSE3[0], p_base) print("initialized!") BaseAndEEPathFollowingMPC(args, robot, path_planner) diff --git a/examples/cart_pulling/cart_pulling.py b/examples/cart_pulling/cart_pulling.py index 896a3a33c1d6b3a002a5ef7531b200c572a3aaf7..d54a4bffeb7396a2cd765f7bd7ff07df7bd77489 100644 --- a/examples/cart_pulling/cart_pulling.py +++ b/examples/cart_pulling/cart_pulling.py @@ -114,7 +114,7 @@ def cartPullingControlLoop( if (priority_int < 4) and (priority_int >= 2): if usempc: ocp_grasp.updateGoalInModels(grasp_pose_left, grasp_pose_right): - CrocoIKMPCControlLoop(args, robot, ocp_grasp, i, past_data) + CrocoEEP2PMPCControlLoop(args, robot, ocp_grasp, i, past_data) else: # controlLoopClik(robot, invKinmQP, i, past_data) clikController = partial(dampedPseudoinverse, 1e-3) diff --git a/examples/cart_pulling/ee_only_path_following_from_plannner.py b/examples/cart_pulling/ee_only_path_following_from_plannner.py index d7aea034e81b18a48e7681c8eee405bfe64156aa..fcd16af1ed24fb60e5cfdf3e73c230240ef4d209 100644 --- a/examples/cart_pulling/ee_only_path_following_from_plannner.py +++ b/examples/cart_pulling/ee_only_path_following_from_plannner.py @@ -5,7 +5,7 @@ from smc.path_generation.path_math.path2d_to_6d import path2D_to_SE3_fixed from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs from smc.path_generation.planner import starPlanner, getPlanningArgs -from smc.control.optimal_control.croco_mpc_point_to_point import CrocoIKMPC +from smc.control.optimal_control.croco_mpc_point_to_point import EEP2PMPC from smc.control.optimal_control.croco_mpc_path_following import ( CrocoEEPathFollowingMPC, ) @@ -71,7 +71,7 @@ if __name__ == "__main__": robot.visualizer_manager.sendCommand({"Mgoal": pathSE3[0]}) if np.linalg.norm(pin.log6(T_w_e.actInv(pathSE3[0]))) > 1e-2: print("going to initial path position") - CrocoIKMPC(args, robot, pathSE3[0]) + EEP2PMPC(args, robot, pathSE3[0]) print("initialized!") CrocoEEPathFollowingMPC(args, robot, x0, path_planner) diff --git a/examples/log_analysis/log_analysis_example.py b/examples/log_analysis/log_analysis_example.py index ea531f16c09e8f21aaabd02bae94ca18669bded9..1e68c40d76c07e41fa300cb39d22871721239c63 100644 --- a/examples/log_analysis/log_analysis_example.py +++ b/examples/log_analysis/log_analysis_example.py @@ -1,6 +1,3 @@ -import pinocchio as pin -import numpy as np -import argparse from smc.logging.logger import Logger from smc.visualization.manipulator_comparison_visualizer import ( getLogComparisonArgs, diff --git a/examples/optimal_control/croco_ik_mpc.py b/examples/optimal_control/croco_ik_mpc.py index 5fc1ff99b65dc8efe9ab17237b92bbad79e67607..d2c23286211435fd8c59725ba8a2bef1f84bf036 100644 --- a/examples/optimal_control/croco_ik_mpc.py +++ b/examples/optimal_control/croco_ik_mpc.py @@ -1,7 +1,7 @@ import numpy as np from smc import getMinimalArgParser, getRobotFromArgs from smc.control.optimal_control.croco_mpc_point_to_point import ( - CrocoIKMPC, + EEP2PMPC, ) from smc.control.optimal_control.util import get_OCP_args from smc.control.cartesian_space import getClikArgs @@ -36,7 +36,7 @@ if __name__ == "__main__": # TODO: document this somewhere robot.visualizer_manager.sendCommand({"Mgoal": T_w_goal}) - CrocoIKMPC(args, robot, T_w_goal) + EEP2PMPC(args, robot, T_w_goal) print("final position:") print(robot.T_w_e) diff --git a/python/smc/control/control_loop_manager.py b/python/smc/control/control_loop_manager.py index 47fc612eb8aa89277ededead202ad800336745c2..bf2c31a8359452353b672226c175ac2852e3327e 100644 --- a/python/smc/control/control_loop_manager.py +++ b/python/smc/control/control_loop_manager.py @@ -1,3 +1,4 @@ +from argparse import Namespace from smc.robots.robotmanager_abstract import AbstractRobotManager from smc.multiprocessing.process_manager import ProcessManager from smc.visualization.plotters import realTimePlotter @@ -64,16 +65,16 @@ class ControlLoopManager: ): signal.signal(signal.SIGINT, self.stopHandler) self.pid = getpid() - self.max_iterations = args.max_iterations - self.robot_manager = robot_manager - self.controlLoop = controlLoop + self.max_iterations: int = args.max_iterations + self.robot_manager: AbstractRobotManager = robot_manager + self.controlLoop = controlLoop # TODO: declare partial function type self.final_iteration = -1 # because we didn't even start yet - self.args = args - self.iter_n = 0 - self.past_data = {} - # NOTE: viz update rate is a magic number that seems to work fine and i don't have + self.args: Namespace = args + self.iter_n: int = 0 + self.past_data: dict[str, deque[np.ndarray]] = {} + # NOTE: viz update rate is a magic number that seems to work fine and i don't have # any plans to make it smarter - self.viz_update_rate = int(np.ceil(self.args.ctrl_freq / 25)) + self.viz_update_rate: int = int(np.ceil(self.args.ctrl_freq / 25)) # save_past_dict has to have the key and 1 example of what you're saving # so that it's type can be inferred (but we're in python so types don't really work). # the good thing is this way you also immediatelly put in the initial values @@ -89,12 +90,14 @@ class ControlLoopManager: # except this is not used in the control loop, # we don't predeclare sizes, but instead # just shove items into linked lists (python lists) in dictionaries (hash-maps) - self.log_dict = {} + self.log_dict: dict[str, list[np.ndarray]] = {} for key in log_item: self.log_dict[key] = [] if self.args.plotter: - self.plotter_manager = ProcessManager(args, realTimePlotter, log_item, 0) + self.plotter_manager: ProcessManager = ProcessManager( + args, realTimePlotter, log_item, 0 + ) def run_one_iter(self, i): """ @@ -160,7 +163,7 @@ class ControlLoopManager: # self.robot_manager.visualizer_manager.sendCommand({"T_base" : T_base}) if self.args.visualize_collision_approximation: - pass + raise NotADirectoryError # TODO: here call robot manager's update ellipses function if self.args.plotter: diff --git a/python/smc/control/optimal_control/croco_mpc_path_following.py b/python/smc/control/optimal_control/croco_mpc_path_following.py index eac9b86877b47847190d09ed190b7eef73586745..57299aafdb0b103f14df354e90d1bbc98ad12b9b 100644 --- a/python/smc/control/optimal_control/croco_mpc_path_following.py +++ b/python/smc/control/optimal_control/croco_mpc_path_following.py @@ -10,12 +10,17 @@ from smc.control.optimal_control.path_following_croco_ocp import ( from smc.path_generation.path_math.path2d_to_6d import ( path2D_to_SE3_fixed, ) +from smc.path_generation.path_math.cart_pulling_path_math import ( + time_base_path, + construct_EE_path, +) from smc.path_generation.path_math.path_to_trajectory import path2D_timed import numpy as np from functools import partial import types from argparse import Namespace +from pinocchio import SE3, log6 def CrocoEEPathFollowingMPCControlLoop( @@ -145,7 +150,6 @@ def CrocoEEPathFollowingMPC( """ ocp = CrocoEEPathFollowingOCP(args, robot, x0) - # technically should be done in controlloop because now # it's solved 2 times before the first command, # but we don't have time for details rn @@ -209,6 +213,10 @@ def BaseAndEEPathFollowingMPCControlLoop( robot.visualizer_manager.sendCommand({"path": path_base}) robot.visualizer_manager.sendCommand({"frame_path": path_EE}) + err_vector_ee = T_w_e.actInv(path_EE[0]) + err_vector_base = np.linalg.norm(q[:2] - path_base[0][:2]) # z axis is irrelevant + log_item["err_norm_ee"] = np.linalg.norm(err_vector_ee).reshape((1,)) + log_item["err_norm_base"] = np.linalg.norm(err_vector_base).reshape((1,)) log_item["qs"] = q.reshape((robot.model.nq,)) log_item["dqs"] = robot.v.reshape((robot.model.nv,)) return breakFlag, save_past_dict, log_item @@ -261,61 +269,19 @@ def BaseAndEEPathFollowingFromPlannerMPCControlLoop( # construct timed 2D path for the base # ########################################## _, path2D_untimed_base = data - path2D_untimed_base = np.array(path2D_untimed_base).reshape((-1, 2)) - # ideally should be precomputed somewhere - max_base_v = np.linalg.norm(robot.model.velocityLimit[:2]) - # base just needs timing on the path - path_base = path2D_timed(args, path2D_untimed_base, max_base_v) - # and it's of height 0 (i.e. the height of base's planar joint) - path_base = np.hstack((path_base, np.zeros((len(path_base), 1)))) - - ################################################### - # construct timed SE3 path for the end-effector # - ################################################### - # this works as follow - # 1) find the previous path point of arclength base_to_handlebar_preferred_distance. - # first part of the path is from there to current base position, - # second is just the current base's plan. - # 2) construct timing on the first part. - # 3) join that with the already timed second part. - # 4) turn the timed 2D path into an SE3 trajectory - - # NOTE: this can be O(1) instead of O(n) but i can't be bothered - path_arclength = np.linalg.norm(p - past_data["path2D_untimed"]) - handlebar_path_index = -1 - for i in range(-2, -1 * len(past_data["path2D_untimed"]), -1): - if path_arclength > args.base_to_handlebar_preferred_distance: - handlebar_path_index = i - break - path_arclength += np.linalg.norm( - past_data["path2D_untimed"][i - 1] - past_data["path2D_untimed"][i] - ) - # i shouldn't need to copy-paste everything but what can you do - path2D_handlebar_1_untimed = np.array(past_data["path2D_untimed"]) - # NOTE: BIG ASSUMPTION - # let's say we're computing on time, and that's the time spacing - # of previous path points. - # this means you need to lower the control frequency argument - # if you're not meeting deadlines. - # if you really need timing information, you should actually - # get it from ControlLoopManager instead of i, - # but let's say this is better because you're forced to know - # how fast you are instead of ducktaping around delays. - # TODO: actually save timing, pass t instead of i to controlLoops - # from controlLoopManager - # NOTE: this might not working when rosified so check that first - time_past = np.linspace( - 0.0, args.past_window_size * robot.dt, args.past_window_size - ) - s = np.linspace(0.0, args.n_knots * args.ocp_dt, args.n_knots) - path2D_handlebar_1 = np.hstack( - ( - np.interp(s, time_past, path2D_handlebar_1_untimed[:, 0]).reshape((-1, 1)), - np.interp(s, time_past, path2D_handlebar_1_untimed[:, 1]).reshape((-1, 1)), - ) + path_base = time_base_path( + args, path2D_untimed_base, np.linalg.norm(robot._max_v[:2]) ) - pathSE3_handlebar = path2D_to_SE3_fixed(path2D_handlebar_1, args.handlebar_height) + pathSE3_handlebar = construct_EE_path(args, p, past_data["path2D_untimed"]) + # TODO: remove + # SETTING ROTATION IS BROKEN ATM + # BUT SETTING DISTANCES (TRANSLATIONS) IS TOO. + # WE DEAL WITH DISTANCES FIRST, UNTIL THAT'S DONE THIS STAYS HERE + for ppp in pathSE3_handlebar: + ppp.rotation = T_w_e.rotation + ########################################### + if args.visualizer: if t % int(np.ceil(args.ctrl_freq / 25)) == 0: robot.visualizer_manager.sendCommand({"path": path_base}) @@ -329,18 +295,43 @@ def BaseAndEEPathFollowingFromPlannerMPCControlLoop( robot.sendVelocityCommand(vel_cmds) + err_vector_ee = log6(T_w_e.actInv(pathSE3_handlebar[0])) + err_vector_base = np.linalg.norm(q[:2] - path_base[0][:2]) # z axis is irrelevant + log_item["err_vec_ee"] = err_vector_ee + log_item["err_norm_ee"] = np.linalg.norm(err_vector_ee).reshape((1,)) + log_item["err_norm_base"] = np.linalg.norm(err_vector_base).reshape((1,)) log_item["qs"] = q.reshape((robot.model.nq,)) - log_item["dqs"] = robot.v.reshape((robot.model.nv,)) + #log_item["dqs"] = robot.v.reshape((robot.model.nv,)) return breakFlag, save_past_dict, log_item +def initializePastData(args: Namespace, T_w_e: SE3, p_base: np.ndarray) -> np.ndarray: + # prepopulate past data to make base and cart be on the same path in the past + # (which didn't actually happen because this just started) + p_ee = T_w_e.translation[:2] + straight_line_path = np.linspace(p_ee, p_base, args.past_window_size) + # time it the same way the base path is timed + time_past = np.linspace( + 0.0, args.past_window_size * (1 / args.ctrl_freq), args.past_window_size + ) + s = np.linspace(0.0, args.n_knots * args.ocp_dt, args.past_window_size) + path2D_handlebar = np.hstack( + ( + np.interp(s, time_past, straight_line_path[:, 0]).reshape((-1, 1)), + np.interp(s, time_past, straight_line_path[:, 1]).reshape((-1, 1)), + ) + ) + #return path2D_handlebar + return straight_line_path + + def BaseAndEEPathFollowingMPC( args: Namespace, robot: SingleArmInterface, path_planner: ProcessManager | types.FunctionType, ): """ - CrocoEndEffectorPathFollowingMPC + BaseAndEEPathFollowingMPC ----- run mpc for a point-to-point inverse kinematics. note that the actual problem is solved on @@ -348,27 +339,12 @@ def BaseAndEEPathFollowingMPC( are actually extracted from the state x(q,dq). """ + T_w_e = robot.T_w_e x0 = np.concatenate([robot.q, robot.v]) ocp = BaseAndEEPathFollowingOCP(args, robot, x0) ocp.solveInitialOCP(x0) - # prepopulate past data to make base and cart be on the same path in the past - # (which didn't actually happen because this just started) - T_w_e = robot.T_w_e - p_cart = robot.q[:2] - p_ee = T_w_e.translation[:2] - straigh_line_path = np.linspace(p_ee, p_cart, args.past_window_size) - # time it the same way the base path is timed - time_past = np.linspace( - 0.0, args.past_window_size * robot.dt, args.past_window_size - ) - s = np.linspace(0.0, args.n_knots * args.ocp_dt, args.past_window_size) - path2D_handlebar = np.hstack( - ( - np.interp(s, time_past, straigh_line_path[:, 0]).reshape((-1, 1)), - np.interp(s, time_past, straigh_line_path[:, 1]).reshape((-1, 1)), - ) - ) + path2D_handlebar = initializePastData(args, T_w_e, robot.q[:2]) if type(path_planner) == types.FunctionType: controlLoop = partial( @@ -382,12 +358,19 @@ def BaseAndEEPathFollowingMPC( ocp, path_planner, ) - log_item = {"qs": np.zeros(robot.model.nq), "dqs": np.zeros(robot.model.nv)} + log_item = { + "qs": np.zeros(robot.model.nq), + # "dqs": np.zeros(robot.model.nv), + "err_vec_ee": np.zeros((6,)), + "err_norm_ee": np.zeros((1,)), + "err_norm_base": np.zeros((1,)), + } save_past_dict = {"path2D_untimed": T_w_e.translation[:2]} loop_manager = ControlLoopManager( robot, controlLoop, args, save_past_dict, log_item ) + # actually put past data into the past window loop_manager.past_data["path2D_untimed"].clear() loop_manager.past_data["path2D_untimed"].extend( path2D_handlebar[i] for i in range(args.past_window_size) @@ -453,7 +436,7 @@ def BaseAndDualArmEEPathFollowingMPCControlLoop( _, path2D_untimed_base = data path2D_untimed_base = np.array(path2D_untimed_base).reshape((-1, 2)) # ideally should be precomputed somewhere - max_base_v = np.linalg.norm(robot.model.velocityLimit[:2]) + max_base_v = np.linalg.norm(robot._max_v[:2]) # base just needs timing on the path path_base = path2D_timed(args, path2D_untimed_base, max_base_v) # and it's of height 0 (i.e. the height of base's planar joint) @@ -482,6 +465,8 @@ def BaseAndDualArmEEPathFollowingMPCControlLoop( ) # i shouldn't need to copy-paste everything but what can you do path2D_handlebar_1_untimed = np.array(past_data["path2D_untimed"]) + # TODO: try this + # path2D_handlebar_1_untimed = np.array(past_data["path2D_untimed"][handlebar_path_index:]) # NOTE: BIG ASSUMPTION # let's say we're computing on time, and that's the time spacing # of previous path points. diff --git a/python/smc/control/optimal_control/croco_mpc_point_to_point.py b/python/smc/control/optimal_control/croco_mpc_point_to_point.py index d391f5fde4f589db972ae7bfb55ee18fed293a49..3f8ce0e7af11f2e875f34c49c0297c49208b6495 100644 --- a/python/smc/control/optimal_control/croco_mpc_point_to_point.py +++ b/python/smc/control/optimal_control/croco_mpc_point_to_point.py @@ -1,13 +1,16 @@ from smc.robots.interfaces.single_arm_interface import SingleArmInterface from smc.control.control_loop_manager import ControlLoopManager -from smc.control.optimal_control.point_to_point_croco_ocp import SingleArmIKOCP +from smc.control.optimal_control.point_to_point_croco_ocp import ( + SingleArmIKOCP, + BaseAndSingleArmIKOCP, +) import pinocchio as pin import numpy as np from functools import partial -def CrocoIKMPCControlLoop(args, robot: SingleArmInterface, ocp, T_w_goal, _, __): +def CrocoEEP2PMPCControlLoop(args, robot: SingleArmInterface, ocp, T_w_goal, _, __): """ CrocoIKMPCControlLoop --------------------- @@ -19,7 +22,6 @@ def CrocoIKMPCControlLoop(args, robot: SingleArmInterface, ocp, T_w_goal, _, __) SEerror = robot.T_w_e.actInv(T_w_goal) err_vector = pin.log6(SEerror).vector if np.linalg.norm(err_vector) < robot.args.goal_error: - # print("Convergence achieved, reached destionation!") breakFlag = True # set initial state from sensor @@ -40,7 +42,7 @@ def CrocoIKMPCControlLoop(args, robot: SingleArmInterface, ocp, T_w_goal, _, __) return breakFlag, save_past_dict, log_item -def CrocoIKMPC(args, robot, T_w_goal, run=True): +def EEP2PMPC(args, robot, T_w_goal, run=True): """ IKMPC ----- @@ -53,7 +55,7 @@ def CrocoIKMPC(args, robot, T_w_goal, run=True): ocp = SingleArmIKOCP(args, robot, x0, T_w_goal) ocp.solveInitialOCP(x0) - controlLoop = partial(CrocoIKMPCControlLoop, args, robot, ocp, T_w_goal) + controlLoop = partial(CrocoEEP2PMPCControlLoop, args, robot, ocp, T_w_goal) log_item = { "qs": np.zeros(robot.model.nq), "dqs": np.zeros(robot.model.nv), @@ -69,3 +71,82 @@ def CrocoIKMPC(args, robot, T_w_goal, run=True): loop_manager.run() else: return loop_manager + + +def CrocoP2PEEAndBaseMPCControlLoop( + args, + robot: SingleArmInterface, + ocp, + T_w_eegoal: pin.SE3, + p_basegoal: np.ndarray, + _, + __, +): + """ + CrocoP2PEEAndBaseMPCControlLoop + --------------------- + """ + breakFlag = False + log_item = {} + save_past_dict = {} + + ee_SEerror = robot.T_w_e.actInv(T_w_eegoal) + ee_err_vector = pin.log6(ee_SEerror).vector + ee_err_vector_norm = np.linalg.norm(ee_err_vector) + # TODO: make this a property of mobilebaseinterface + p_base = np.array(list(robot.q[:2]) + [0.0]) + base_err_vector_norm = np.linalg.norm(p_basegoal - p_base) + if (ee_err_vector_norm < args.goal_error) and ( + base_err_vector_norm < args.goal_error + ): + breakFlag = True + + # set initial state from sensor + x0 = np.concatenate([robot.q, robot.v]) + ocp.warmstartAndReSolve(x0) + xs = np.array(ocp.solver.xs) + # NOTE: for some reason the first value is always some wild bs + vel_cmds = xs[1, robot.model.nq :] + robot.sendVelocityCommand(vel_cmds) + + log_item["qs"] = x0[: robot.model.nq] + log_item["dqs"] = x0[robot.model.nq :] + log_item["ee_err_norm"] = ee_err_vector_norm.reshape((1,)) + log_item["base_err_norm"] = base_err_vector_norm.reshape((1,)) + + return breakFlag, save_past_dict, log_item + + +def EEAndBaseP2PMPC(args, robot, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray, run=True): + """ + IKMPC + ----- + run mpc for a point-to-point inverse kinematics. + note that the actual problem is solved on + a dynamics level, and velocities we command + are actually extracted from the state x(q,dq) + """ + x0 = np.concatenate([robot.q, robot.v]) + goal = (T_w_eegoal, p_basegoal) + ocp = BaseAndSingleArmIKOCP(args, robot, x0, goal) + ocp.solveInitialOCP(x0) + + controlLoop = partial( + CrocoP2PEEAndBaseMPCControlLoop, args, robot, ocp, T_w_eegoal, p_basegoal + ) + log_item = { + "qs": np.zeros(robot.model.nq), + "dqs": np.zeros(robot.model.nv), + # "dqs_cmd": np.zeros(robot.model.nv), # we're assuming full actuation here + # "u_tau": np.zeros(robot.model.nv), + "ee_err_norm": np.zeros(1), + "base_err_norm": np.zeros(1), + } + save_past_dict = {} + loop_manager = ControlLoopManager( + robot, controlLoop, args, save_past_dict, log_item + ) + if run: + loop_manager.run() + else: + return loop_manager diff --git a/python/smc/control/optimal_control/point_to_point_croco_ocp.py b/python/smc/control/optimal_control/point_to_point_croco_ocp.py index f771c705307d67ed35ef03f41cf6785b4c760fc2..d532e77ef872fb225300f25a7a72f3af63a53637 100644 --- a/python/smc/control/optimal_control/point_to_point_croco_ocp.py +++ b/python/smc/control/optimal_control/point_to_point_croco_ocp.py @@ -18,17 +18,10 @@ class SingleArmIKOCP(CrocoOCP): ): super().__init__(args, robot, x0, T_w_goal) - def constructTaskObjectiveFunction(self, goal): + def constructTaskObjectiveFunction(self, goal) -> None: T_w_goal = goal - # cost 3) distance to goal -> SE(3) error - # TODO: make this follow a path. - # to do so, you need to implement a residualmodel for that in crocoddyl. - # there's an example of exending crocoddyl in colmpc repo - # (look at that to see how to compile, make python bindings etc) framePlacementResidual = crocoddyl.ResidualModelFramePlacement( self.state, - # TODO: check if this is the frame we actually want (ee tip) - # the id is an integer and that's what api wants self.robot.model.getFrameId("tool0"), T_w_goal.copy(), self.state.nv, @@ -36,8 +29,6 @@ class SingleArmIKOCP(CrocoOCP): goalTrackingCost = crocoddyl.CostModelResidual( self.state, framePlacementResidual ) - # cost 4) final ee velocity is 0 (can't directly formulate joint velocity, - # because that's a part of the state, and we don't know final joint positions) frameVelocityResidual = crocoddyl.ResidualModelFrameVelocity( self.state, self.robot.model.getFrameId("tool0"), @@ -60,7 +51,8 @@ class SingleArmIKOCP(CrocoOCP): def updateCosts(self, data): pass - def updateGoalInModels(self, T_w_goal): + def updateGoalInModels(self, goal) -> None: + T_w_goal = goal for i, runningModel in enumerate(self.solver.problem.runningModels): runningModel.differential.costs.costs[ "gripperPose" + str(i) @@ -76,7 +68,7 @@ class DualArmIKOCP(CrocoOCP): ): super().__init__(args, robot, x0, goal) - def constructTaskObjectiveFunction(self, goal): + def constructTaskObjectiveFunction(self, goal) -> None: T_w_lgoal, T_w_rgoal = goal framePlacementResidual_l = crocoddyl.ResidualModelFramePlacement( self.state, @@ -134,7 +126,7 @@ class DualArmIKOCP(CrocoOCP): def updateCosts(self, data): pass - def updateGoalInModels(self, T_w_lgoal, T_w_rgoal): + def updateGoalInModels(self, T_w_lgoal: pin.SE3, T_w_rgoal: pin.SE3) -> None: for i, runningModel in enumerate(self.solver.problem.runningModels): runningModel.differential.costs.costs[ "gripperPose_l" + str(i) @@ -149,3 +141,58 @@ class DualArmIKOCP(CrocoOCP): self.solver.problem.terminalModel.differential.costs.costs[ "gripperPose_r" + str(self.args.n_knots) ].cost.residual.reference = T_w_rgoal + + +class BaseAndSingleArmIKOCP(SingleArmIKOCP): + def __init__( + self, + args: Namespace, + robot: SingleArmInterface, + x0: np.ndarray, + goal, + # T_w_eegoal: pin.SE3, + # p_basegoal: np.ndarray, + ): + # T_w_eegoal, p_basegoal = goal + super().__init__(args, robot, x0, goal) + + def constructTaskObjectiveFunction( + # self, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray + self, + goal, + ): + T_w_eegoal, p_basegoal = goal + super().constructTaskObjectiveFunction(T_w_eegoal) + for i in range(self.args.n_knots): + baseTranslationResidual = crocoddyl.ResidualModelFrameTranslation( + self.state, self.robot.base_frame_id, p_basegoal, self.state.nv + ) + baseTrackingCost = crocoddyl.CostModelResidual( + self.state, baseTranslationResidual + ) + self.runningCostModels[i].addCost( + "base_translation" + str(i), + baseTrackingCost, + self.args.base_translation_cost, + ) + self.terminalCostModel.addCost( + "base_translation" + str(self.args.n_knots), + baseTrackingCost, + self.args.base_translation_cost, + ) + + # there is nothing to update in a point-to-point task + def updateCosts(self, data): + pass + + def updateGoalInModels(self, goal) -> None: + # self, T_w_eegoal: pin.SE3, p_basegoal: np.ndarray) -> None: + T_w_eegoal, p_basegoal = goal + super().updateGoalInModels(T_w_eegoal) + for i, runningModel in enumerate(self.solver.problem.runningModels): + runningModel.differential.costs.costs[ + "base_translation" + str(i) + ].cost.residual.reference = p_basegoal + self.solver.problem.terminalModel.differential.costs.costs[ + "base_translation" + str(self.args.n_knots) + ].cost.residual.reference = p_basegoal diff --git a/python/smc/path_generation/path_math/cart_pulling_path_math.py b/python/smc/path_generation/path_math/cart_pulling_path_math.py new file mode 100644 index 0000000000000000000000000000000000000000..bedff3a280b5637668c749a5ebd910ec48ea2ce5 --- /dev/null +++ b/python/smc/path_generation/path_math/cart_pulling_path_math.py @@ -0,0 +1,130 @@ +from smc.path_generation.path_math.path_to_trajectory import path2D_timed +from smc.path_generation.path_math.path2d_to_6d import ( + path2D_to_SE3_fixed, +) + +import numpy as np +import pinocchio as pin +from argparse import Namespace +from collections import deque + + +def getCurrentPositionHandlebarInPastBasePath( + args: Namespace, p_base_current: np.ndarray, past_path2D_untimed: np.ndarray +) -> int: + """ + getCurrentPositionHandlebarInPastBasePath + ------------------------------------------- + 0. paths points are 2D [x,y] coordinates in the world frame + 1. past path goes from oldest entry to newest in indeces + 2. we want a point on the path that's args.base_to_handlebar_preferred_distance in arclength + 3. by minusing the whole path from the current point, we get relative distances of the path + points from the current point + 4. to make life easier, we linearly interpolate between the path point. + 5. we add the lengths of lines between path points until we reach args.base_to_handlebar_preferred_distance + 6. we return the index of the path point to make further math easier + + NOTE: this can be O(1) instead of O(n) but i can't be bothered + (i could save arclengths instead of re-calculating them all the time, but the cost + of the algo as it is now is not high enough to justify this optimization) + """ + arclength = 0.0 + path_relative_distances = np.linalg.norm( + p_base_current - past_path2D_untimed, axis=1 + ) + handlebar_path_index = ( + 0 # index of sought-after point, 0 is the furthest in the past + ) + # go through whole path backwards + for i in range(-2, -1 * len(past_path2D_untimed), -1): + if arclength > args.base_to_handlebar_preferred_distance: + handlebar_path_index = i + return handlebar_path_index + arclength += np.linalg.norm( + path_relative_distances[i - 1] - path_relative_distances[i] + ) + print( + "the size of the past path is too short to find the point of prefered distance on it!" + ) + print("i'll just give you the furtherst point, which is of distance", arclength) + return handlebar_path_index + + +def time_base_path( + args: Namespace, path2D_untimed_base, max_base_v: float +) -> np.ndarray: + # NOTE: this one is obtained as the future path from path planner + path2D_untimed_base = np.array(path2D_untimed_base).reshape((-1, 2)) + # ideally should be precomputed somewhere + path_base = path2D_timed(args, path2D_untimed_base, max_base_v) + # and it's of height 0 (i.e. the height of base's planar joint) + path_base = np.hstack((path_base, np.zeros((len(path_base), 1)))) + return path_base + + +# NOTE: extremely inefficient. +# there should be no copy-pasting of the whole path at every single point in time, +# instead all the computations should be cached. +# ideally you don't even cast the whole past_path into an array, but work on the queue, +# and just do the smallest update. +# this shouldn't be too difficult, it's just annoying. first we fix the logic, +# then we improve efficiency. +def construct_EE_path( + args: Namespace, + p_base_current: np.ndarray, + past_path2D_from_window: deque[np.ndarray], +) -> list[pin.SE3]: + """ + construct_EE_path + ----------------- + - path_past2D - should be this entry in past rolling window, turned into a numpy array and it's length should be args.past_window_size. + just in case, and for potential transferability, we will use len(path_past2D) here + + ################################################### + # construct timed SE3 path for the end-effector # + ################################################### + this works as follow: + 1) find the previous path point of arclength base_to_handlebar_preferred_distance. + first part of the path is from there to current base position, + second is just the current base's plan. + 2) construct timing on the first part. + 3) (optional) join that with the already timed second part. + --> not doing this because paths are super short and this won't ever happen with the current setup + 4) turn the timed 2D path into an SE3 trajectory + + (1) NOTE: BIG ASSUMPTION + let's say we're computing on time, and that's the time spacing + of previous path points. + this means you need to lower the control frequency argument + if you're not meeting deadlines. + if you really need timing information, you should actually + get it from ControlLoopManager instead of i, + but let's say this is better because you're forced to know + how fast you are instead of ducktaping around delays. + TODO: actually save timing, pass t instead of i to controlLoops + from controlLoopManager + NOTE: this might not working when rosified + """ + # i shouldn't need to copy-paste everything but what can you do + past_path2D = np.array(past_path2D_from_window).reshape((-1, 2)) + # step (1) + handlebar_path_index = getCurrentPositionHandlebarInPastBasePath( + args, p_base_current, past_path2D + ) + # cut of the past that's irrelevant + ee_path = past_path2D[handlebar_path_index:] + path_len = len(ee_path) + # step (2) + # look at note (1) + time_past = np.linspace(0.0, path_len * (1 / args.ctrl_freq), path_len) + s = np.linspace(0.0, args.n_knots * args.ocp_dt, args.n_knots) + ee_2Dtrajectory = np.hstack( + ( + np.interp(s, time_past, ee_path[:, 0]).reshape((-1, 1)), + np.interp(s, time_past, ee_path[:, 1]).reshape((-1, 1)), + ) + ) + + # step (4) + ee_SE3trajectory = path2D_to_SE3_fixed(ee_2Dtrajectory, args.handlebar_height) + return ee_SE3trajectory diff --git a/python/smc/path_generation/path_math/path2d_to_6d.py b/python/smc/path_generation/path_math/path2d_to_6d.py index 7687282479a8110a2370d3018a9804ebc24225f5..6b19bb23560b60978170785ec86c55901b50d350 100644 --- a/python/smc/path_generation/path_math/path2d_to_6d.py +++ b/python/smc/path_generation/path_math/path2d_to_6d.py @@ -2,7 +2,7 @@ import pinocchio as pin import numpy as np -def path2D_to_SE3_fixed(path2D: np.ndarray, path_height: float): +def path2D_to_SE3_fixed(path2D: np.ndarray, path_height: float) -> list[pin.SE3]: """ path2D_SE3 ---------- @@ -22,6 +22,8 @@ def path2D_to_SE3_fixed(path2D: np.ndarray, path_height: float): x_diff = x_i_plus_1 - x_i y_diff = y_i_plus_1 - y_i # elementwise arctan2 + # should be y first, then x + # thetas = np.arctan2(y_diff, x_diff) thetas = np.arctan2(x_diff, y_diff) ###################################### @@ -41,7 +43,7 @@ def path2D_to_SE3_fixed(path2D: np.ndarray, path_height: float): ] ) # rotation = pin.rpy.rpyToMatrix(0.0, 0.0, thetas[i]) - # rotation = pin.rpy.rpyToMatrix(np.pi/2, np.pi/2,0.0) @ rotation + # rotation = pin.rpy.rpyToMatrix(np.pi / 2, np.pi / 2, 0.0) @ rotation translation = np.array([path2D[i][0], path2D[i][1], path_height]) pathSE3.append(pin.SE3(rotation, translation)) pathSE3.append(pin.SE3(rotation, translation)) @@ -51,7 +53,7 @@ def path2D_to_SE3_fixed(path2D: np.ndarray, path_height: float): # NOTE: this is a complete hack, but it does not need to be def path2D_to_SE3_with_transition_from_current_pose( path2D: np.ndarray, path_height: float, T_w_current: pin.SE3 -): +) -> list[pin.SE3]: """ path2D_SE3 ----------