From b935f2ace642983631f29a8fe8c1d597d7517582 Mon Sep 17 00:00:00 2001 From: m-guberina <gubi.guberina@gmail.com> Date: Thu, 13 Feb 2025 23:57:41 +0100 Subject: [PATCH] worked on merging different croco ocp's. one abstraction away from looking good and deleting literally 100 lines of nearly identical copy-pasted code --- python/examples/__init__.py | 0 python/examples/crocoddyl_mpc.py | 35 +- python/examples/crocoddyl_ocp_clik.py | 32 +- python/smc/control/__init__.py | 1 + .../smc/control/optimal_control/__init__.py | 4 +- .../control/optimal_control/crocoddyl_mpc.py | 89 +- .../crocoddyl_optimal_control.py | 847 ++++++++---------- .../{get_ocp_args.py => util.py} | 0 python/smc/multiprocessing/__init__.py | 1 + 9 files changed, 423 insertions(+), 586 deletions(-) create mode 100644 python/examples/__init__.py create mode 100644 python/smc/control/__init__.py rename python/smc/control/optimal_control/{get_ocp_args.py => util.py} (100%) create mode 100644 python/smc/multiprocessing/__init__.py diff --git a/python/examples/__init__.py b/python/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/examples/crocoddyl_mpc.py b/python/examples/crocoddyl_mpc.py index 2fc0d60..e6921bb 100644 --- a/python/examples/crocoddyl_mpc.py +++ b/python/examples/crocoddyl_mpc.py @@ -1,21 +1,13 @@ -# PYTHON_ARGCOMPLETE_OK import numpy as np -import time -import argparse -from functools import partial -from smc.managers import getMinimalArgParser, ControlLoopManager, RobotManager -from smc.optimal_control.crocoddyl_optimal_control import ( +from smc import getMinimalArgParser, getRobotFromArgs +from smc.control.optimal_control.crocoddyl_optimal_control import ( createCrocoIKOCP, solveCrocoOCP, ) -from smc.optimal_control.get_ocp_args import get_OCP_args -from smc.optimal_control.crocoddyl_mpc import CrocoIKMPC -from smc.basics.basics import followKinematicJointTrajP -from smc.util.logging_utils import LogManager -from smc.visualize.visualize import plotFromDict -from smc.clik.clik import getClikArgs +from smc.control.optimal_control.util import get_OCP_args +from smc.control.optimal_control.crocoddyl_mpc import CrocoIKMPC +from smc.control.cartesian_space import getClikArgs import pinocchio as pin -import crocoddyl import importlib.util import argcomplete @@ -34,7 +26,7 @@ if __name__ == "__main__": import mim_solvers args = get_args() - robot = RobotManager(args) + robot = getRobotFromArgs(args) # TODO: put this back for nicer demos # Mgoal = robot.defineGoalPointCLI() Mgoal = pin.SE3.Random() @@ -47,7 +39,7 @@ if __name__ == "__main__": Mgoal = (Mgoal_left, Mgoal_right) robot.Mgoal = Mgoal.copy() - if args.visualize_manipulator: + if args.visualizer: # TODO document this somewhere robot.visualizer_manager.sendCommand({"Mgoal": Mgoal}) # create and solve the optimal control problem of @@ -55,7 +47,7 @@ if __name__ == "__main__": # reference is position and velocity reference (as a dictionary), # while solver is a crocoddyl object containing a lot more information # starting state - x0 = np.concatenate([robot.getQ(), robot.getQd()]) + x0 = np.concatenate([robot.q, robot.v]) problem = createCrocoIKOCP(args, robot, x0, Mgoal) # NOTE: this might be useful if we solve with a large time horizon, @@ -79,15 +71,14 @@ if __name__ == "__main__": print("final position:") print(robot.getT_w_e()) - if args.save_log: - robot.log_manager.plotAllControlLoops() - - if not args.pinocchio_only: + if args.real: robot.stopRobot() - if args.visualize_manipulator: + if args.visualizer: 
robot.killManipulatorVisualizer() if args.save_log: - robot.log_manager.saveLog() + robot._log_manager.saveLog() + robot._log_manager.plotAllControlLoops() + # loop_manager.stopHandler(None, None) diff --git a/python/examples/crocoddyl_ocp_clik.py b/python/examples/crocoddyl_ocp_clik.py index 9ff0822..32bef5c 100644 --- a/python/examples/crocoddyl_ocp_clik.py +++ b/python/examples/crocoddyl_ocp_clik.py @@ -1,18 +1,14 @@ # PYTHON_ARGCOMPLETE_OK -import numpy as np -import time -import argparse -from functools import partial -from smc.managers import getMinimalArgParser, ControlLoopManager, RobotManager -from smc.optimal_control.crocoddyl_optimal_control import ( +from smc.control.optimal_control.util import get_OCP_args +from smc import getMinimalArgParser, getRobotFromArgs +from smc.control.optimal_control import ( createCrocoIKOCP, solveCrocoOCP, ) -from smc.optimal_control.get_ocp_args import get_OCP_args -from smc.basics.basics import followKinematicJointTrajP -from smc.util.logging_utils import LogManager -from smc.visualize.visualize import plotFromDict +from smc.control.joint_space import followKinematicJointTrajP + import pinocchio as pin +import numpy as np import crocoddyl import argcomplete @@ -27,12 +23,12 @@ def get_args(): if __name__ == "__main__": args = get_args() - robot = RobotManager(args) + robot = getRobotFromArgs(args) # TODO: put this back for nicer demos # Mgoal = robot.defineGoalPointCLI() Mgoal = pin.SE3.Random() - if args.visualize_manipulator: + if args.visualizer: # TODO document this somewhere robot.visualizer_manager.sendCommand({"Mgoal": Mgoal}) @@ -41,7 +37,7 @@ if __name__ == "__main__": # reference is position and velocity reference (as a dictionary), # while solver is a crocoddyl object containing a lot more information # starting state - x0 = np.concatenate([robot.getQ(), robot.getQd()]) + x0 = np.concatenate([robot.q, robot.v]) problem = createCrocoIKOCP(args, robot, x0, Mgoal) # this shouldn't really depend on x0 but i can't be bothered reference, solver = solveCrocoOCP(args, robot, problem, x0) @@ -57,14 +53,14 @@ if __name__ == "__main__": followKinematicJointTrajP(args, robot, reference) print("final position:") - print(robot.getT_w_e()) + print(robot.T_w_e) - if not args.pinocchio_only: + if args.real: robot.stopRobot() - if args.visualize_manipulator: + if args.visualizer: robot.killManipulatorVisualizer() if args.save_log: - robot.log_manager.saveLog() - robot.log_manager.plotAllControlLoops() + robot._log_manager.saveLog() + robot._log_manager.plotAllControlLoops() diff --git a/python/smc/control/__init__.py b/python/smc/control/__init__.py new file mode 100644 index 0000000..3f61207 --- /dev/null +++ b/python/smc/control/__init__.py @@ -0,0 +1 @@ +from .control_loop_manager import ControlLoopManager diff --git a/python/smc/control/optimal_control/__init__.py b/python/smc/control/optimal_control/__init__.py index 1feaec8..927eb07 100644 --- a/python/smc/control/optimal_control/__init__.py +++ b/python/smc/control/optimal_control/__init__.py @@ -1,5 +1,6 @@ import importlib.util -#if importlib.util.find_spec('casadi'): + +# if importlib.util.find_spec('casadi'): # import pinocchio as pin # if int(pin.__version__[0]) < 3: # print("you need to have pinocchio version 3.0.0 or greater to use pinocchio.casadi!") @@ -7,4 +8,3 @@ import importlib.util # from .create_pinocchio_casadi_ocp import * from .crocoddyl_mpc import * from .crocoddyl_optimal_control import * -from .get_ocp_args import * diff --git 
a/python/smc/control/optimal_control/crocoddyl_mpc.py b/python/smc/control/optimal_control/crocoddyl_mpc.py index 972fadc..b415f3b 100644 --- a/python/smc/control/optimal_control/crocoddyl_mpc.py +++ b/python/smc/control/optimal_control/crocoddyl_mpc.py @@ -1,14 +1,12 @@ -from ur_simple_control.managers import ( - getMinimalArgParser, - ControlLoopManager, - RobotManager, - ProcessManager, -) -from ur_simple_control.optimal_control.crocoddyl_optimal_control import ( +from smc.robots.interfaces.single_arm_interface import SingleArmInterface +from smc.control.control_loop_manager import ControlLoopManager +from smc.multiprocessing.process_manager import ProcessManager +from smc.control.optimal_control.crocoddyl_optimal_control import ( createCrocoIKOCP, createCrocoEEPathFollowingOCP, createBaseAndEEPathFollowingOCP, ) + import pinocchio as pin import crocoddyl import numpy as np @@ -21,30 +19,17 @@ if importlib.util.find_spec("mim_solvers"): # TODO: put others here too if importlib.util.find_spec("shapely"): - from ur_simple_control.path_generation.planner import ( + from smc.path_generation.planner import ( path2D_timed, pathPointFromPathParam, path2D_to_SE3, ) - -# solve ocp in side process -# this is that function -# no it can't be a control loop then -# the actual control loop is trajectory following, -# but it continuously checks whether it can get the new path -# the path needs to be time-index because it's solved on old data -# (it takes time to solve the problem, but the horizon is then longer than thing). - -# actually, before that, try solving the mpc at 500Hz with a short horizon -# and a small number of iterations, that might be feasible too, -# we need to see, there's no way to know. -# but doing it that way is certainly much easier to implement -# and probably better. 
-# i'm pretty sure that's how croco devs & mim_people do mpc - - -def CrocoIKMPCControlLoop(args, robot: RobotManager, solver, goal, i, past_data): +def CrocoIKMPCControlLoop(args, robot: SingleArmInterface, solver, T_w_goal, i, past_data): + """ + CrocoIKMPCControlLoop + --------------------- + """ breakFlag = False log_item = {} save_past_dict = {} @@ -52,14 +37,14 @@ def CrocoIKMPCControlLoop(args, robot: RobotManager, solver, goal, i, past_data) # check for goal if robot.robot_name == "yumi": goal_left, goal_right = goal - SEerror = robot.getT_w_e().actInv(robot.Mgoal) + SEerror = robot.T_w_e.actInv(T_w_goal) err_vector = pin.log6(SEerror).vector if np.linalg.norm(err_vector) < robot.args.goal_error: # print("Convergence achieved, reached destionation!") breakFlag = True # set initial state from sensor - x0 = np.concatenate([robot.getQ(), robot.getQd()]) + x0 = np.concatenate([robot.q, robot.v]) solver.problem.x0 = x0 # warmstart solver with previous solution xs_init = list(solver.xs[1:]) + [solver.xs[-1]] @@ -70,7 +55,7 @@ def CrocoIKMPCControlLoop(args, robot: RobotManager, solver, goal, i, past_data) xs = np.array(solver.xs) us = np.array(solver.us) vel_cmds = xs[1, robot.model.nq :] - robot.sendQd(vel_cmds) + robot.sendVelocityCommand(vel_cmds) log_item["qs"] = x0[: robot.model.nq] log_item["dqs"] = x0[robot.model.nv :] @@ -80,7 +65,7 @@ def CrocoIKMPCControlLoop(args, robot: RobotManager, solver, goal, i, past_data) return breakFlag, save_past_dict, log_item -def CrocoIKMPC(args, robot, goal, run=True): +def CrocoIKMPC(args, robot, T_w_goal, run=True): """ IKMPC ----- @@ -89,8 +74,8 @@ def CrocoIKMPC(args, robot, goal, run=True): a dynamics level, and velocities we command are actually extracted from the state x(q,dq) """ - x0 = np.concatenate([robot.getQ(), robot.getQd()]) - problem = createCrocoIKOCP(args, robot, x0, goal) + x0 = np.concatenate([robot.q, robot.v]) + problem = createCrocoIKOCP(args, robot, x0, T_w_goal) if args.solver == "boxfddp": solver = crocoddyl.SolverBoxFDDP(problem) if args.solver == "csqp": @@ -103,7 +88,7 @@ def CrocoIKMPC(args, robot, goal, run=True): us_init = solver.problem.quasiStatic([x0] * solver.problem.T) solver.solve(xs_init, us_init, args.max_solver_iter) - controlLoop = partial(CrocoIKMPCControlLoop, args, robot, solver) + controlLoop = partial(CrocoIKMPCControlLoop, args, robot, solver, T_w_goal) log_item = { "qs": np.zeros(robot.model.nq), "dqs": np.zeros(robot.model.nq), @@ -122,7 +107,7 @@ def CrocoIKMPC(args, robot, goal, run=True): def CrocoEndEffectorPathFollowingMPCControlLoop( args, - robot: RobotManager, + robot: SingleArmInterface, solver: crocoddyl.SolverBoxFDDP, path_planner: ProcessManager, i, @@ -141,7 +126,7 @@ def CrocoEndEffectorPathFollowingMPCControlLoop( log_item = {} save_past_dict = {} - q = robot.getQ() + q = robot.q T_w_e = robot.getT_w_e() p = T_w_e.translation[:2] @@ -156,9 +141,9 @@ def CrocoEndEffectorPathFollowingMPCControlLoop( if data == None: if args.debug_prints: print("CTRL: got no path so not i won't move") - robot.sendQd(np.zeros(robot.model.nv)) + robot.sendVelocityCommand(np.zeros(robot.model.nv)) log_item["qs"] = q.reshape((robot.model.nq,)) - log_item["dqs"] = robot.getQd().reshape((robot.model.nv,)) + log_item["dqs"] = robot.v.reshape((robot.model.nv,)) return breakFlag, save_past_dict, log_item if data == "done": @@ -180,7 +165,7 @@ def CrocoEndEffectorPathFollowingMPCControlLoop( if i % 20 == 0: robot.visualizer_manager.sendCommand({"frame_path": pathSE3}) - x0 = np.concatenate([robot.getQ(), 
robot.getQd()]) + x0 = np.concatenate([robot.q, robot.v]) solver.problem.x0 = x0 # warmstart solver with previous solution xs_init = list(solver.xs[1:]) + [solver.xs[-1]] @@ -204,10 +189,10 @@ def CrocoEndEffectorPathFollowingMPCControlLoop( us = np.array(solver.us) vel_cmds = xs[1, robot.model.nq :] - robot.sendQd(vel_cmds) + robot.sendVelocityCommand(vel_cmds) log_item["qs"] = q.reshape((robot.model.nq,)) - log_item["dqs"] = robot.getQd().reshape((robot.model.nv,)) + log_item["dqs"] = robot.v.reshape((robot.model.nv,)) return breakFlag, save_past_dict, log_item @@ -230,7 +215,7 @@ def CrocoEndEffectorPathFollowingMPC(args, robot, x0, path_planner): # technically should be done in controlloop because now # it's solved 2 times before the first command, # but we don't have time for details rn - x0 = np.concatenate([robot.getQ(), robot.getQd()]) + x0 = np.concatenate([robot.q, robot.v]) xs_init = [x0] * (solver.problem.T + 1) us_init = solver.problem.quasiStatic([x0] * solver.problem.T) solver.solve(xs_init, us_init, args.max_solver_iter) @@ -246,9 +231,11 @@ def CrocoEndEffectorPathFollowingMPC(args, robot, x0, path_planner): loop_manager.run() +# TODO: the robot put in is a whole body robot, +# which you need to implement def BaseAndEEPathFollowingMPCControlLoop( args, - robot: RobotManager, + robot, solver: crocoddyl.SolverBoxFDDP, path_planner: ProcessManager, grasp_pose, @@ -269,7 +256,7 @@ def BaseAndEEPathFollowingMPCControlLoop( log_item = {} save_past_dict = {} - q = robot.getQ() + q = robot.q T_w_e = robot.getT_w_e() p = q[:2] # NOTE: this is the actual position, not what the path suggested @@ -289,16 +276,16 @@ def BaseAndEEPathFollowingMPCControlLoop( if data == None: if args.debug_prints: print("CTRL: got no path so not i won't move") - robot.sendQd(np.zeros(robot.model.nv)) + robot.sendVelocityCommand(np.zeros(robot.model.nv)) log_item["qs"] = q.reshape((robot.model.nq,)) - log_item["dqs"] = robot.getQd().reshape((robot.model.nv,)) + log_item["dqs"] = robot.v.reshape((robot.model.nv,)) return breakFlag, save_past_dict, log_item if data == "done": breakFlag = True - robot.sendQd(np.zeros(robot.model.nv)) + robot.sendVelocityCommand(np.zeros(robot.model.nv)) log_item["qs"] = q.reshape((robot.model.nq,)) - log_item["dqs"] = robot.getQd().reshape((robot.model.nv,)) + log_item["dqs"] = robot.v.reshape((robot.model.nv,)) return breakFlag, save_past_dict, log_item ########################################## @@ -371,7 +358,7 @@ def BaseAndEEPathFollowingMPCControlLoop( robot.visualizer_manager.sendCommand({"path": path_base}) robot.visualizer_manager.sendCommand({"frame_path": pathSE3_handlebar}) - x0 = np.concatenate([robot.getQ(), robot.getQd()]) + x0 = np.concatenate([robot.q, robot.v]) solver.problem.x0 = x0 # warmstart solver with previous solution xs_init = list(solver.xs[1:]) + [solver.xs[-1]] @@ -417,10 +404,10 @@ def BaseAndEEPathFollowingMPCControlLoop( us = np.array(solver.us) vel_cmds = xs[1, robot.model.nq :] - robot.sendQd(vel_cmds) + robot.sendVelocityCommand(vel_cmds) log_item["qs"] = q.reshape((robot.model.nq,)) - log_item["dqs"] = robot.getQd().reshape((robot.model.nv,)) + log_item["dqs"] = robot.v.reshape((robot.model.nv,)) return breakFlag, save_past_dict, log_item @@ -434,7 +421,7 @@ def BaseAndEEPathFollowingMPC(args, robot, path_planner): are actually extracted from the state x(q,dq). 
""" - x0 = np.concatenate([robot.getQ(), robot.getQd()]) + x0 = np.concatenate([robot.q, robot.v]) problem = createBaseAndEEPathFollowingOCP(args, robot, x0) if args.solver == "boxfddp": solver = crocoddyl.SolverBoxFDDP(problem) diff --git a/python/smc/control/optimal_control/crocoddyl_optimal_control.py b/python/smc/control/optimal_control/crocoddyl_optimal_control.py index 834466a..2b01dee 100644 --- a/python/smc/control/optimal_control/crocoddyl_optimal_control.py +++ b/python/smc/control/optimal_control/crocoddyl_optimal_control.py @@ -2,8 +2,10 @@ from smc.control.joint_space.joint_space_trajectory_following import ( followKinematicJointTrajP, ) from smc.visualization.plotters import plotFromDict -from smc.robots.utils import getMinimalArgParser +from smc import getMinimalArgParser from smc.robots.robotmanager_abstract import AbstractRobotManager +from smc.robots.interfaces.single_arm_interface import SingleArmInterface +from smc.robots.interfaces.dual_arm_interface import DualArmInterface import numpy as np import pinocchio as pin @@ -12,276 +14,327 @@ from argparse import Namespace import example_robot_data import time import importlib.util +import abc if importlib.util.find_spec("mim_solvers"): import mim_solvers -def createCrocoIKOCP(args: Namespace, robot: AbstractRobotManager, x0, goal: pin.SE3): - if robot.robot_name == "yumi": - goal_l, goal_r = goal - # create torque bounds which correspond to percentage - # of maximum allowed acceleration - # NOTE: idk if this makes any sense, but let's go for it - # print(robot.model.effortLimit) - # exit() - # robot.model.effortLimit = robot.model.effortLimit * (args.acceleration / robot.MAX_ACCELERATION) - # robot.model.effortLimit = robot.model.effortLimit * 0.5 - # robot.data = robot.model.createData() - # TODO: make it underactuated in mobile base's y-direction - state = crocoddyl.StateMultibody(robot.model) - # command input IS torque - # TODO: consider ActuationModelFloatingBaseTpl for heron - # TODO: create a different actuation model (or whatever) - # for the mobile base - basically just remove the y movement in the base - # and update the corresponding derivates to 0 - # there's python examples for this, ex. acrobot. - # you might want to implement the entire action model too idk what's really necessary here - actuation = crocoddyl.ActuationModelFull(state) - - # we will be summing 4 different costs - # first 3 are for tracking, state and control regulation - runningCostModel = crocoddyl.CostModelSum(state) - terminalCostModel = crocoddyl.CostModelSum(state) - # cost 1) u residual (actuator cost) - uResidual = crocoddyl.ResidualModelControl(state, state.nv) - uRegCost = crocoddyl.CostModelResidual(state, uResidual) - # cost 2) x residual (overall amount of movement) - xResidual = crocoddyl.ResidualModelState(state, x0, state.nv) - xRegCost = crocoddyl.CostModelResidual(state, xResidual) - # cost 3) distance to goal -> SE(3) error - # TODO: make this follow a path. - # to do so, you need to implement a residualmodel for that in crocoddyl. 
- # there's an example of exending crocoddyl in colmpc repo - # (look at that to see how to compile, make python bindings etc) - if robot.robot_name != "yumi": - framePlacementResidual = crocoddyl.ResidualModelFramePlacement( - state, - # TODO: check if this is the frame we actually want (ee tip) - # the id is an integer and that's what api wants - robot.model.getFrameId("tool0"), - goal.copy(), - state.nv, - ) - goalTrackingCost = crocoddyl.CostModelResidual(state, framePlacementResidual) - # cost 4) final ee velocity is 0 (can't directly formulate joint velocity, - # because that's a part of the state, and we don't know final joint positions) - frameVelocityResidual = crocoddyl.ResidualModelFrameVelocity( - state, - robot.model.getFrameId("tool0"), - pin.Motion(np.zeros(6)), - pin.ReferenceFrame.WORLD, - ) - frameVelocityCost = crocoddyl.CostModelResidual(state, frameVelocityResidual) - runningCostModel.addCost("gripperPose", goalTrackingCost, 1e2) - terminalCostModel.addCost("gripperPose", goalTrackingCost, 1e2) - terminalCostModel.addCost("velFinal", frameVelocityCost, 1e3) - else: - framePlacementResidual_l = crocoddyl.ResidualModelFramePlacement( - state, - # TODO: check if this is the frame we actually want (ee tip) - # the id is an integer and that's what api wants - robot.model.getFrameId("robl_joint_7"), - goal_l.copy(), - state.nv, - ) - framePlacementResidual_r = crocoddyl.ResidualModelFramePlacement( - state, - # TODO: check if this is the frame we actually want (ee tip) - # the id is an integer and that's what api wants - robot.model.getFrameId("robr_joint_7"), - goal_r.copy(), - state.nv, - ) - goalTrackingCost_l = crocoddyl.CostModelResidual( - state, framePlacementResidual_l - ) - goalTrackingCost_r = crocoddyl.CostModelResidual( - state, framePlacementResidual_r +class CrocoOCP(abc.ABC): + """ + CrocoOCP + ---------- + General info: + - torque inputs are assumed - we already solve for state, which includes velocity commands, and we send that if the robot accepts vel cmds + State model : StateMultibody + I'm not even sure what else is there to choose lol. + Actuation model : ActuatioModelFull + We do underactuation via a constraint at the moment. This is solely + because coding up a proper underactuated model is annoying, + and dealing with this is a TODO for later. Having that said, + input constraints are necessary either way. + # TODO: consider ActuationModelFloatingBaseTpl for heron + # TODO: create a different actuation model (or whatever) + # for the mobile base - basically just remove the y movement in the base + # and update the corresponding derivates to 0 + # there's python examples for this, ex. acrobot. + # you might want to implement the entire action model too idk what's really necessary here + Action model : DifferentialActionModelFreeFwdDynamics + We need to create an action model for running and terminal knots. The + forward dynamics (computed using ABA) are implemented + inside DifferentialActionModelFreeFwdDynamics. 
+ """ + + def __init__( + self, args: Namespace, robot: AbstractRobotManager, x0: np.ndarray, solver: str + ): + # TODO: declare attributes here + self.x0 = x0 + self.constructCrocoModels() + self.constructRegulationCosts() + self.constructStateConstraints() + self.constructInputConstraints(solver) + self.problem = crocoddyl.ShootingProblem( + self.x0, [self.runningModel] * args.n_knots, self.terminalModel ) - frameVelocityResidual_l = crocoddyl.ResidualModelFrameVelocity( - state, - robot.model.getFrameId("robl_joint_7"), - pin.Motion(np.zeros(6)), - pin.ReferenceFrame.WORLD, + + def constructCrocoModels(self): + self.state = crocoddyl.StateMultibody(self.robot.model) + # NOTE: atm we just set effort level in that direction to 0 + self.actuation = crocoddyl.ActuationModelFull(self.state) + + # we will be summing 4 different costs + # first 3 are for tracking, state and control regulation + self.runningCostModel = crocoddyl.CostModelSum(self.state) + self.terminalCostModel = crocoddyl.CostModelSum(self.state) + + def constructRegulationCosts(self): + # cost 1) u residual (actuator cost) + self.uResidual = crocoddyl.ResidualModelControl(self.state, self.state.nv) + self.uRegCost = crocoddyl.CostModelResidual(self.state, self.uResidual) + # cost 2) x residual (overall amount of movement) + self.xResidual = crocoddyl.ResidualModelState( + self.state, self.x0, self.state.nv ) - frameVelocityResidual_r = crocoddyl.ResidualModelFrameVelocity( - state, - robot.model.getFrameId("robr_joint_7"), - pin.Motion(np.zeros(6)), - pin.ReferenceFrame.WORLD, + self.xRegCost = crocoddyl.CostModelResidual(self.state, self.xResidual) + + # put these costs into the running costs + self.runningCostModel.addCost("xReg", self.xRegCost, 1e-3) + self.runningCostModel.addCost("uReg", self.uRegCost, 1e-3) + # and add the terminal cost, which is the distance to the goal + # NOTE: shouldn't there be a final velocity = 0 here? + self.terminalCostModel.addCost("uReg", self.uRegCost, 1e3) + + def constructStateConstraints(self) -> None: + """ + constructStateConstraints + ------------------------- + """ + # the 4th cost is for defining bounds via costs + # NOTE: could have gotten the same info from state.lb and state.ub. + # the first state is unlimited there idk what that means really, + # but the arm's base isn't doing a full rotation anyway, let alone 2 or more + xlb = np.concatenate( + [self.robot.model.lowerPositionLimit, -1 * self.robot.model.velocityLimit] ) - frameVelocityCost_l = crocoddyl.CostModelResidual( - state, frameVelocityResidual_l + xub = np.concatenate( + [self.robot.model.upperPositionLimit, self.robot.model.velocityLimit] ) - frameVelocityCost_r = crocoddyl.CostModelResidual( - state, frameVelocityResidual_r - ) - runningCostModel.addCost("gripperPose_l", goalTrackingCost_l, 1e2) - runningCostModel.addCost("gripperPose_r", goalTrackingCost_r, 1e2) - terminalCostModel.addCost("gripperPose_l", goalTrackingCost_l, 1e2) - terminalCostModel.addCost("gripperPose_r", goalTrackingCost_r, 1e2) - terminalCostModel.addCost("velFinal_l", frameVelocityCost_l, 1e3) - terminalCostModel.addCost("velFinal_r", frameVelocityCost_r, 1e3) - - # put these costs into the running costs - runningCostModel.addCost("xReg", xRegCost, 1e-3) - runningCostModel.addCost("uReg", uRegCost, 1e-3) - # and add the terminal cost, which is the distance to the goal - # NOTE: shouldn't there be a final velocity = 0 here? 
- terminalCostModel.addCost("uReg", uRegCost, 1e3) - - ###################################################################### - # state constraints # - ################################################# - # - added to costs via barrier functions for fddp - # - added as actual constraints via crocoddyl.constraintmanager for csqp - ########################################################################### - - # the 4th cost is for defining bounds via costs - # NOTE: could have gotten the same info from state.lb and state.ub. - # the first state is unlimited there idk what that means really, - # but the arm's base isn't doing a full rotation anyway, let alone 2 or more - xlb = np.concatenate( - [robot.model.lowerPositionLimit, -1 * robot.model.velocityLimit] - ) - xub = np.concatenate([robot.model.upperPositionLimit, robot.model.velocityLimit]) - # we have no limits on the mobile base. - # the mobile base is a planar joint. - # since it is represented as [x,y,cos(theta),sin(theta)], there is no point - # to limiting cos(theta),sin(theta) even if we wanted limits, - # because we would then limit theta, or limit ct and st jointly. - # in any event, xlb and xub are 1 number too long -- - # the residual has to be [x,y,theta] because it is in the tangent space - - # the difference between two points on a manifold in pinocchio is defined - # as the velocity which if parallel transported for 1 unit of "time" - # from one to point to the other. - # point activation input and the residual need to be of the same length obviously, - # and this should be 2 * model.nv the way things are defined here. - - if ( - robot.robot_name == "heron" - or robot.robot_name == "heronros" - or robot.robot_name == "mirros" - or robot.robot_name == "yumi" - ): - xlb = xlb[1:] - xub = xub[1:] - # TODO: make these constraints-turned-to-objectives for end-effector following - # the handlebar position - if args.solver == "boxfddp": + # NOTE: in an ideal universe this is handled elsewhere + # we have no limits on the mobile base. + # the mobile base is a planar joint. + # since it is represented as [x,y,cos(theta),sin(theta)], there is no point + # to limiting cos(theta),sin(theta) even if we wanted limits, + # because we would then limit theta, or limit ct and st jointly. + # in any event, xlb and xub are 1 number too long -- + # the residual has to be [x,y,theta] because it is in the tangent space - + # the difference between two points on a manifold in pinocchio is defined + # as the velocity which if parallel transported for 1 unit of "time" + # from one to point to the other. + # point activation input and the residual need to be of the same length obviously, + # and this should be 2 * model.nv the way things are defined here. 
+ + # TODO: replace this with subclass or whatever call + # to check if the robot has mobilebase interface + if ( + self.robot.robot_name == "heron" + or self.robot.robot_name == "heronros" + or self.robot.robot_name == "mirros" + or self.robot.robot_name == "yumi" + ): + xlb = xlb[1:] + xub = xub[1:] + + def constructInputConstraints(self) -> None: + if solver == "boxFDDP": + self.inputConstraintsViaBarriersInObjectiveFunction() + if solver == "csqp": + self.boxInputConstraintsAsActualConstraints() + + # NOTE: used by BoxFDDP + def inputConstraintsViaBarriersInObjectiveFunction(self) -> None: bounds = crocoddyl.ActivationBounds(xlb, xub, 1.0) - xLimitResidual = crocoddyl.ResidualModelState(state, x0, state.nv) + xLimitResidual = crocoddyl.ResidualModelState( + self.state, self.x0, self.state.nv + ) xLimitActivation = crocoddyl.ActivationModelQuadraticBarrier(bounds) - limitCost = crocoddyl.CostModelResidual(state, xLimitActivation, xLimitResidual) - runningCostModel.addCost("limitCost", limitCost, 1e3) - terminalCostModel.addCost("limitCost", limitCost, 1e3) - - # TODO: try using crocoddyl.ConstraintModelResidual - # instead to create actual box constraints (inequality constraints) - # TODO: same goes for control input - # NOTE: i'm not sure whether any solver uses these tho lel - # --> you only do that for mim_solvers' csqp! - - if args.solver == "csqp": - # this just store all the constraints - constraints = crocoddyl.ConstraintModelManager(state, robot.model.nv) - u_constraint = crocoddyl.ConstraintModelResidual( - state, - uResidual, - -1 * robot.model.effortLimit * 0.1, - robot.model.effortLimit * 0.1, + limitCost = crocoddyl.CostModelResidual( + self.state, xLimitActivation, xLimitResidual ) - constraints.addConstraint("u_box_constraint", u_constraint) - x_constraint = crocoddyl.ConstraintModelResidual(state, xResidual, xlb, xub) - constraints.addConstraint("x_box_constraint", x_constraint) + self.runningCostModel.addCost("limitCost", limitCost, 1e3) + self.terminalCostModel.addCost("limitCost", limitCost, 1e3) - # Next, we need to create an action model for running and terminal knots. The - # forward dynamics (computed using ABA) are implemented - # inside DifferentialActionModelFreeFwdDynamics. 
- if args.solver == "boxfddp": - runningModel = crocoddyl.IntegratedActionModelEuler( + self.runningModel = crocoddyl.IntegratedActionModelEuler( crocoddyl.DifferentialActionModelFreeInvDynamics( - state, actuation, runningCostModel + self.state, self.actuation, self.runningCostModel ), args.ocp_dt, ) - runningModel.u_lb = -1 * robot.model.effortLimit * 0.1 - runningModel.u_ub = robot.model.effortLimit * 0.1 - terminalModel = crocoddyl.IntegratedActionModelEuler( + self.runningModel.u_lb = -1 * self.robot.model.effortLimit * 0.1 + self.runningModel.u_ub = self.robot.model.effortLimit * 0.1 + self.terminalModel = crocoddyl.IntegratedActionModelEuler( crocoddyl.DifferentialActionModelFreeInvDynamics( - state, actuation, terminalCostModel + self.state, self.actuation, self.terminalCostModel ), 0.0, ) - terminalModel.u_lb = -1 * robot.model.effortLimit * 0.1 - terminalModel.u_ub = robot.model.effortLimit * 0.1 - if args.solver == "csqp": - runningModel = crocoddyl.IntegratedActionModelEuler( + self.terminalModel.u_lb = -1 * self.robot.model.effortLimit * 0.1 + self.terminalModel.u_ub = self.robot.model.effortLimit * 0.1 + + # NOTE: used by csqp + def boxInputConstraintsAsActualConstraints(self) -> None: + # ConstraintModelManager just stores constraints + constraints = crocoddyl.ConstraintModelManager(self.state, self.robot.model.nv) + u_constraint = crocoddyl.ConstraintModelResidual( + self.state, + self.uResidual, + -1 * self.robot.model.effortLimit * 0.1, + self.robot.model.effortLimit * 0.1, + ) + constraints.addConstraint("u_box_constraint", u_constraint) + x_constraint = crocoddyl.ConstraintModelResidual( + self.state, self.xResidual, xlb, xub + ) + constraints.addConstraint("x_box_constraint", x_constraint) + self.runningModel = crocoddyl.IntegratedActionModelEuler( crocoddyl.DifferentialActionModelFreeInvDynamics( - state, actuation, runningCostModel, constraints + self.state, self.actuation, self.runningCostModel, constraints ), args.ocp_dt, ) - terminalModel = crocoddyl.IntegratedActionModelEuler( + self.terminalModel = crocoddyl.IntegratedActionModelEuler( crocoddyl.DifferentialActionModelFreeInvDynamics( - state, actuation, terminalCostModel, constraints + self.state, self.actuation, self.terminalCostModel, constraints ), 0.0, ) - # now we define the problem - problem = crocoddyl.ShootingProblem( - x0, [runningModel] * args.n_knots, terminalModel - ) - return problem - + @abc.abstractmethod + def constructTaskObjectiveFunction(self, goal) -> None: ... + + # NOTE: used by boxfddp + def createCrocoSolver(self, args, robot, x0): + # just for reference can try + # solver = crocoddyl.SolverIpopt(problem) + self.solver = crocoddyl.SolverBoxFDDP(self.problem) + # TODO: make these callbacks optional + self.solver.setCallbacks( + [crocoddyl.CallbackVerbose(), crocoddyl.CallbackLogger()] + ) -# this shouldn't really depend on x0 but i can't be bothered -def solveCrocoOCP(args, robot, problem, x0): - # and the solver - # TODO try out the following solvers from mim_solvers: - # - csqp - # - stagewise qp - # both of these have generic inequalities you can put in. 
- # and both are basically QP versions of iLQR if i'm not wrong - # (i have no idea tho) - if args.solver == "boxfddp": - solver = crocoddyl.SolverBoxFDDP(problem) - if args.solver == "csqp": - solver = mim_solvers.SolverCSQP(problem) - # solver = mim_solvers.SolverSQP(problem) - # solver = crocoddyl.SolverIpopt(problem) - # TODO: remove / place elsewhere once it works (make it an argument) - if args.solver == "boxfddp": - solver.setCallbacks([crocoddyl.CallbackVerbose(), crocoddyl.CallbackLogger()]) - if args.solver == "csqp": - solver.setCallbacks( + # NOTE: used by csqp + def createMimSolver(self, args, robot, x0): + # TODO try out the following solvers from mim_solvers: + # - csqp + # - stagewise qp + # and the solver + # both of these have generic inequalities you can put in. + # and both are basically QP versions of iLQR if i'm not wrong + # (i have no idea tho) + # solver = mim_solvers.SolverSQP(problem) + self.solver = mim_solvers.SolverCSQP(self.problem) + # TODO: make these callbacks optional + self.solver.setCallbacks( [mim_solvers.CallbackVerbose(), mim_solvers.CallbackLogger()] ) - # run solve - # NOTE: there are some possible parameters here that I'm not using - xs = [x0] * (solver.problem.T + 1) - us = solver.problem.quasiStatic([x0] * solver.problem.T) - start = time.time() - solver.solve(xs, us, 500, False, 1e-9) - end = time.time() - print("solved in:", end - start, "seconds") + # this shouldn't really depend on x0 but i can't be bothered + def solveOCP(self, args, robot, x0): + # run solve + # NOTE: there are some possible parameters here that I'm not using + xs = [x0] * (self.solver.problem.T + 1) + us = self.solver.problem.quasiStatic([x0] * self.solver.problem.T) + + start = time.time() + solver.solve(xs, us, 500, False, 1e-9) + end = time.time() + print("solved in:", end - start, "seconds") + + # solver.solve() + # get reference (state trajectory) + # we aren't using controls because we only have velocity inputs + xs = np.array(solver.xs) + qs = xs[:, : robot.model.nq] + vs = xs[:, robot.model.nq :] + reference = {"qs": qs, "vs": vs, "dt": args.ocp_dt} + return reference, solver + + +class SingleArmIKOCP(CrocoOCP): + def __init__( + args: Namespace, robot: SingleArmInterface, x0: np.ndarray, T_w_goal: pin.SE3 + ): + super().__init__(args, robot, x0) + self.constructTaskObjectiveFunction(T_w_goal) + + def constructTaskObjectiveFunction(self, T_w_goal): + # cost 3) distance to goal -> SE(3) error + # TODO: make this follow a path. + # to do so, you need to implement a residualmodel for that in crocoddyl. 
+ # there's an example of exending crocoddyl in colmpc repo + # (look at that to see how to compile, make python bindings etc) + framePlacementResidual = crocoddyl.ResidualModelFramePlacement( + self.state, + # TODO: check if this is the frame we actually want (ee tip) + # the id is an integer and that's what api wants + robot.model.getFrameId("tool0"), + T_w_goal.copy(), + self.state.nv, + ) + goalTrackingCost = crocoddyl.CostModelResidual( + self.state, framePlacementResidual + ) + # cost 4) final ee velocity is 0 (can't directly formulate joint velocity, + # because that's a part of the state, and we don't know final joint positions) + frameVelocityResidual = crocoddyl.ResidualModelFrameVelocity( + self.state, + robot.model.getFrameId("tool0"), + pin.Motion(np.zeros(6)), + pin.ReferenceFrame.WORLD, + ) + frameVelocityCost = crocoddyl.CostModelResidual( + self.state, frameVelocityResidual + ) + self.runningCostModel.addCost("gripperPose", goalTrackingCost, 1e2) + self.terminalCostModel.addCost("gripperPose", goalTrackingCost, 1e2) + self.terminalCostModel.addCost("velFinal", frameVelocityCost, 1e3) + - # solver.solve() - # get reference (state trajectory) - # we aren't using controls because we only have velocity inputs - xs = np.array(solver.xs) - qs = xs[:, : robot.model.nq] - vs = xs[:, robot.model.nq :] - reference = {"qs": qs, "vs": vs, "dt": args.ocp_dt} - return reference, solver +def DualArmIKOCP(CrocoOCP): + def __init__( + args: Namespace, robot: DualArmInterface, x0: np.ndarray, goal: pin.SE3 + ): + super().__init__(args, robot, x0) + + def constructTaskObjectiveFunction(self, goal): + T_w_lgoal, T_w_rgoal = goal + framePlacementResidual_l = crocoddyl.ResidualModelFramePlacement( + self.state, + self.robot.model.l_ee_frame_id, + T_w_lgoal.copy(), + self.state.nv, + ) + framePlacementResidual_r = crocoddyl.ResidualModelFramePlacement( + self.state, + self.robot.model.r_ee_frame_id, + T_w_rgoal.copy(), + self.state.nv, + ) + goalTrackingCost_l = crocoddyl.CostModelResidual( + self.state, framePlacementResidual_l + ) + goalTrackingCost_r = crocoddyl.CostModelResidual( + self.state, framePlacementResidual_r + ) + frameVelocityResidual_l = crocoddyl.ResidualModelFrameVelocity( + self.state, + self.robot.model.l_ee_frame_id, + pin.Motion(np.zeros(6)), + pin.ReferenceFrame.WORLD, + ) + frameVelocityResidual_r = crocoddyl.ResidualModelFrameVelocity( + self.state, + self.robot.model.r_ee_frame_id, + pin.Motion(np.zeros(6)), + pin.ReferenceFrame.WORLD, + ) + frameVelocityCost_l = crocoddyl.CostModelResidual( + self.state, frameVelocityResidual_l + ) + frameVelocityCost_r = crocoddyl.CostModelResidual( + self.state, frameVelocityResidual_r + ) + self.runningCostModel.addCost("gripperPose_l", goalTrackingCost_l, 1e2) + self.runningCostModel.addCost("gripperPose_r", goalTrackingCost_r, 1e2) + self.terminalCostModel.addCost("gripperPose_l", goalTrackingCost_l, 1e2) + self.terminalCostModel.addCost("gripperPose_r", goalTrackingCost_r, 1e2) + self.terminalCostModel.addCost("velFinal_l", frameVelocityCost_l, 1e3) + self.terminalCostModel.addCost("velFinal_r", frameVelocityCost_r, 1e3) -def createCrocoEEPathFollowingOCP(args, robot: AbstractRobotManager, x0): +class CrocoEEPathFollowingOCP(CrocoOCP): """ createCrocoEEPathFollowingOCP ------------------------------- @@ -289,162 +342,83 @@ def createCrocoEEPathFollowingOCP(args, robot: AbstractRobotManager, x0): it is instantiated to just to stay at the current position. 
NOTE: the path MUST be time indexed with the SAME time used between the knots """ - T_w_e = robot.getT_w_e() - path = [T_w_e] * args.n_knots - # underactuation is done by setting max-torque to 0 in the robot model, - # and since we torques are constrained we're good - state = crocoddyl.StateMultibody(robot.model) - actuation = crocoddyl.ActuationModelFull(state) - - # we will be summing 4 different costs - # first 3 are for tracking, state and control regulation - terminalCostModel = crocoddyl.CostModelSum(state) - # cost 1) u residual (actuator cost) - uResidual = crocoddyl.ResidualModelControl(state, state.nv) - uRegCost = crocoddyl.CostModelResidual(state, uResidual) - # cost 2) x residual (overall amount of movement) - xResidual = crocoddyl.ResidualModelState(state, x0, state.nv) - xRegCost = crocoddyl.CostModelResidual(state, xResidual) - # cost 4) final ee velocity is 0 (can't directly formulate joint velocity, - # because that's a part of the state, and we don't know final joint positions) - # frameVelocityResidual = crocoddyl.ResidualModelFrameVelocity( - # state, - # robot.model.getFrameId("tool0"), - # pin.Motion(np.zeros(6)), - # pin.ReferenceFrame.WORLD) - # frameVelocityCost = crocoddyl.CostModelResidual(state, frameVelocityResidual) - - # put these costs into the running costs - # we put this one in later - # and add the terminal cost, which is the distance to the goal - # NOTE: shouldn't there be a final velocity = 0 here? - terminalCostModel.addCost("uReg", uRegCost, 1e3) - # terminalCostModel.addCost("velFinal", frameVelocityCost, 1e3) - - ###################################################################### - # state constraints # - ################################################# - # - added to costs via barrier functions for fddp - # - added as actual constraints via crocoddyl.constraintmanager for csqp - ########################################################################### - - # the 4th cost is for defining bounds via costs - # NOTE: could have gotten the same info from state.lb and state.ub. - # the first state is unlimited there idk what that means really, - # but the arm's base isn't doing a full rotation anyway, let alone 2 or more - xlb = np.concatenate( - [robot.model.lowerPositionLimit, -1 * robot.model.velocityLimit] - ) - xub = np.concatenate([robot.model.upperPositionLimit, robot.model.velocityLimit]) - - # we have no limits on the mobile base. - # the mobile base is a planar joint. - # since it is represented as [x,y,cos(theta),sin(theta)], there is no point - # to limiting cos(theta),sin(theta) even if we wanted limits, - # because we would then limit theta, or limit ct and st jointly. - # in any event, xlb and xub are 1 number too long -- - # the residual has to be [x,y,theta] because it is in the tangent space - - # the difference between two points on a manifold in pinocchio is defined - # as the velocity which if parallel transported for 1 unit of "time" - # from one to point to the other. - # point activation input and the residual need to be of the same length obviously, - # and this should be 2 * model.nv the way things are defined here. 
- if ( - robot.robot_name == "heron" - or robot.robot_name == "heronros" - or robot.robot_name == "mirros" - ): - xlb = xlb[1:] - xub = xub[1:] - - # TODO: make these constraints-turned-to-objectives for end-effector following - # the handlebar position - if args.solver == "boxfddp": - bounds = crocoddyl.ActivationBounds(xlb, xub, 1.0) - xLimitResidual = crocoddyl.ResidualModelState(state, x0, state.nv) - xLimitActivation = crocoddyl.ActivationModelQuadraticBarrier(bounds) - - limitCost = crocoddyl.CostModelResidual(state, xLimitActivation, xLimitResidual) - terminalCostModel.addCost("limitCost", limitCost, 1e3) - - # csqp actually allows us to put in hard constraints so we do that - if args.solver == "csqp": - # this just store all the constraints - constraints = crocoddyl.ConstraintModelManager(state, robot.model.nv) - u_constraint = crocoddyl.ConstraintModelResidual( - state, - uResidual, - -1 * robot.model.effortLimit * 0.1, - robot.model.effortLimit * 0.1, - ) - constraints.addConstraint("u_box_constraint", u_constraint) - x_constraint = crocoddyl.ConstraintModelResidual(state, xResidual, xlb, xub) - constraints.addConstraint("x_box_constraint", x_constraint) - - # Next, we need to create an action model for running and terminal knots. The - # forward dynamics (computed using ABA) are implemented - # inside DifferentialActionModelFreeFwdDynamics. - - runningModels = [] - for i in range(args.n_knots): - runningCostModel = crocoddyl.CostModelSum(state) - runningCostModel.addCost("xReg", xRegCost, 1e-3) - runningCostModel.addCost("uReg", uRegCost, 1e-3) - if args.solver == "boxfddp": - runningCostModel.addCost("limitCost", limitCost, 1e3) - framePlacementResidual = crocoddyl.ResidualModelFramePlacement( - state, - robot.model.getFrameId("tool0"), - path[i], # this better be done with the same time steps as the knots - # NOTE: this should be done outside of this function to clarity - state.nv, - ) - goalTrackingCost = crocoddyl.CostModelResidual(state, framePlacementResidual) - runningCostModel.addCost("gripperPose" + str(i), goalTrackingCost, 1e2) - # runningCostModel.addCost("gripperPose", goalTrackingCost, 1e2) + def __init__(self, args: Namespace, robot: SingleArmInterface, x0: np.ndarray): + super().__init__(self, args, robot, x0) + + def constructTaskObjectiveFunction(self, goal): + T_w_e = robot.T_w_e + path = [T_w_e] * args.n_knots + + # TODO: you need to find a way to make this mesh + # with the generic construction. 
+ # the only difference is that we construct SLIGHTLY DIFFERENT running models here + # instead of multiplying the same one + def constructPathFollowingRunningModels(self): + runningModels = [] + for i in range(args.n_knots): + runningCostModel = crocoddyl.CostModelSum(state) + runningCostModel.addCost("xReg", xRegCost, 1e-3) + runningCostModel.addCost("uReg", uRegCost, 1e-3) + if args.solver == "boxfddp": + runningCostModel.addCost("limitCost", limitCost, 1e3) + framePlacementResidual = crocoddyl.ResidualModelFramePlacement( + state, + robot.model.getFrameId("tool0"), + path[i], # this better be done with the same time steps as the knots + # NOTE: this should be done outside of this function to clarity + state.nv, + ) + goalTrackingCost = crocoddyl.CostModelResidual( + state, framePlacementResidual + ) + runningCostModel.addCost("gripperPose" + str(i), goalTrackingCost, 1e2) + # runningCostModel.addCost("gripperPose", goalTrackingCost, 1e2) + + if args.solver == "boxfddp": + runningModel = crocoddyl.IntegratedActionModelEuler( + crocoddyl.DifferentialActionModelFreeInvDynamics( + state, actuation, runningCostModel + ), + args.ocp_dt, + ) + runningModel.u_lb = -1 * robot.model.effortLimit * 0.1 + runningModel.u_ub = robot.model.effortLimit * 0.1 + if args.solver == "csqp": + runningModel = crocoddyl.IntegratedActionModelEuler( + crocoddyl.DifferentialActionModelFreeInvDynamics( + state, actuation, runningCostModel, constraints + ), + args.ocp_dt, + ) + runningModels.append(runningModel) + + # terminal model if args.solver == "boxfddp": - runningModel = crocoddyl.IntegratedActionModelEuler( + terminalModel = crocoddyl.IntegratedActionModelEuler( crocoddyl.DifferentialActionModelFreeInvDynamics( - state, actuation, runningCostModel + state, actuation, terminalCostModel ), - args.ocp_dt, + 0.0, ) - runningModel.u_lb = -1 * robot.model.effortLimit * 0.1 - runningModel.u_ub = robot.model.effortLimit * 0.1 + terminalModel.u_lb = -1 * robot.model.effortLimit * 0.1 + terminalModel.u_ub = robot.model.effortLimit * 0.1 if args.solver == "csqp": - runningModel = crocoddyl.IntegratedActionModelEuler( + terminalModel = crocoddyl.IntegratedActionModelEuler( crocoddyl.DifferentialActionModelFreeInvDynamics( - state, actuation, runningCostModel, constraints + state, actuation, terminalCostModel, constraints ), - args.ocp_dt, + 0.0, ) - runningModels.append(runningModel) - # terminal model - if args.solver == "boxfddp": - terminalModel = crocoddyl.IntegratedActionModelEuler( - crocoddyl.DifferentialActionModelFreeInvDynamics( - state, actuation, terminalCostModel - ), - 0.0, - ) - terminalModel.u_lb = -1 * robot.model.effortLimit * 0.1 - terminalModel.u_ub = robot.model.effortLimit * 0.1 - if args.solver == "csqp": - terminalModel = crocoddyl.IntegratedActionModelEuler( - crocoddyl.DifferentialActionModelFreeInvDynamics( - state, actuation, terminalCostModel, constraints - ), - 0.0, + terminalCostModel.addCost( + "gripperPose" + str(args.n_knots), goalTrackingCost, 1e2 ) + # terminalCostModel.addCost("gripperPose", goalTrackingCost, 1e2) - terminalCostModel.addCost("gripperPose" + str(args.n_knots), goalTrackingCost, 1e2) - # terminalCostModel.addCost("gripperPose", goalTrackingCost, 1e2) - - # now we define the problem - problem = crocoddyl.ShootingProblem(x0, runningModels, terminalModel) - return problem + # now we define the problem + problem = crocoddyl.ShootingProblem(x0, runningModels, terminalModel) + return problem def createBaseAndEEPathFollowingOCP(args, robot: AbstractRobotManager, x0): @@ 
-463,86 +437,9 @@ def createBaseAndEEPathFollowingOCP(args, robot: AbstractRobotManager, x0): path_ee = [T_w_e_left] * args.n_knots # TODO: have a different reference for each arm path_base = [np.append(x0[:2], 0.0)] * args.n_knots - # underactuation is done by setting max-torque to 0 in the robot model, - # and since we torques are constrained we're good - state = crocoddyl.StateMultibody(robot.model) - actuation = crocoddyl.ActuationModelFull(state) - - # we will be summing 6 different costs - # first 3 are for tracking, state and control regulation, - # the others depend on the path and will be defined later - terminalCostModel = crocoddyl.CostModelSum(state) - # cost 1) u residual (actuator cost) - uResidual = crocoddyl.ResidualModelControl(state, state.nv) - uRegCost = crocoddyl.CostModelResidual(state, uResidual) - # cost 2) x residual (overall amount of movement) - xResidual = crocoddyl.ResidualModelState(state, x0, state.nv) - xRegCost = crocoddyl.CostModelResidual(state, xResidual) - - # put these costs into the running costs - # we put this one in later - # and add the terminal cost, which is the distance to the goal - terminalCostModel.addCost("uReg", uRegCost, 1e3) - - ###################################################################### - # state constraints # - ################################################# - # - added to costs via barrier functions for fddp (4th cost function) - # - added as actual constraints via crocoddyl.constraintmanager for csqp - ########################################################################### - xlb = np.concatenate( - [robot.model.lowerPositionLimit, -1 * robot.model.velocityLimit] - ) - xub = np.concatenate([robot.model.upperPositionLimit, robot.model.velocityLimit]) - # we have no limits on the mobile base. - # the mobile base is a planar joint. - # since it is represented as [x,y,cos(theta),sin(theta)], there is no point - # to limiting cos(theta),sin(theta) even if we wanted limits, - # because we would then limit theta, or limit ct and st jointly. - # in any event, xlb and xub are 1 number too long -- - # the residual has to be [x,y,theta] because it is in the tangent space - - # the difference between two points on a manifold in pinocchio is defined - # as the velocity which if parallel transported for 1 unit of "time" - # from one to point to the other. - # point activation input and the residual need to be of the same length obviously, - # and this should be 2 * model.nv the way things are defined here. 
- - if ( - robot.robot_name == "heron" - or robot.robot_name == "heronros" - or robot.robot_name == "mirros" - or robot.robot_name == "yumi" - ): - xlb = xlb[1:] - xub = xub[1:] - # TODO: make these constraints-turned-to-objectives for end-effector following - # the handlebar position - if args.solver == "boxfddp": - bounds = crocoddyl.ActivationBounds(xlb, xub, 1.0) - xLimitResidual = crocoddyl.ResidualModelState(state, x0, state.nv) - xLimitActivation = crocoddyl.ActivationModelQuadraticBarrier(bounds) - - limitCost = crocoddyl.CostModelResidual(state, xLimitActivation, xLimitResidual) - terminalCostModel.addCost("limitCost", limitCost, 1e3) - - # csqp actually allows us to put in hard constraints so we do that - if args.solver == "csqp": - # this just store all the constraints - constraints = crocoddyl.ConstraintModelManager(state, robot.model.nv) - u_constraint = crocoddyl.ConstraintModelResidual( - state, - uResidual, - -1 * robot.model.effortLimit * 0.1, - robot.model.effortLimit * 0.1, - ) - constraints.addConstraint("u_box_constraint", u_constraint) - x_constraint = crocoddyl.ConstraintModelResidual(state, xResidual, xlb, xub) - constraints.addConstraint("x_box_constraint", x_constraint) - - # Next, we need to create an action model for running and terminal knots. The - # forward dynamics (computed using ABA) are implemented - # inside DifferentialActionModelFreeFwdDynamics. + # TODO: MAKE A RUNNING MODEL FUNCTION HERE WHICH + # PLAYS BALL WITH GENERAL CONSTRUCTION runningModels = [] for i in range(args.n_knots): runningCostModel = crocoddyl.CostModelSum(state) @@ -572,7 +469,7 @@ def createBaseAndEEPathFollowingOCP(args, robot: AbstractRobotManager, x0): else: l_eePoseResidual = crocoddyl.ResidualModelFramePlacement( state, - robot.model.getFrameId("robl_joint_7"), + robot.model.l_ee_frame_id, # MASSIVE TODO: actually put in reference for left arm path_ee[i], state.nv, @@ -583,7 +480,7 @@ def createBaseAndEEPathFollowingOCP(args, robot: AbstractRobotManager, x0): ) r_eePoseResidual = crocoddyl.ResidualModelFramePlacement( state, - robot.model.getFrameId("robr_joint_7"), + robot.model.r_ee_frame_id, # MASSIVE TODO: actually put in reference for left arm path_ee[i], state.nv, @@ -657,39 +554,3 @@ def createBaseAndEEPathFollowingOCP(args, robot: AbstractRobotManager, x0): # now we define the problem problem = crocoddyl.ShootingProblem(x0, runningModels, terminalModel) return problem - - -if __name__ == "__main__": - parser = getMinimalArgParser() - parser = get_OCP_args(parser) - args = parser.parse_args() - ex_robot = example_robot_data.load("ur5_gripper") - robot = AbstractRobotManager(args) - # TODO: remove once things work - robot.max_qd = 3.14 - print("velocity limits", robot.model.velocityLimit) - robot.q = pin.randomConfiguration(robot.model) - robot.q[0] = 0.1 - robot.q[1] = 0.1 - print(robot.q) - goal = pin.SE3.Random() - goal.translation = np.random.random(3) * 0.4 - reference, solver = CrocoIKOCP(args, robot, goal) - # we only work within -pi - pi (or 0-2pi idk anymore) - # reference['qs'] = reference['qs'] % (2*np.pi) - np.pi - if args.solver == "boxfddp": - log = solver.getCallbacks()[1] - crocoddyl.plotOCSolution(log.xs, log.us, figIndex=1, show=True) - crocoddyl.plotConvergence( - log.costs, log.pregs, log.dregs, log.grads, log.stops, log.steps, figIndex=2 - ) - followKinematicJointTrajP(args, robot, reference) - reference.pop("dt") - plotFromDict(reference, args.n_knots + 1, args) - print("achieved result", robot.getT_w_e()) - display = 
crocoddyl.MeshcatDisplay(ex_robot)
- display.rate = -1
- display.freq = 1
- while True:
- display.displayFromSolver(solver)
- time.sleep(1.0)
diff --git a/python/smc/control/optimal_control/get_ocp_args.py b/python/smc/control/optimal_control/util.py
similarity index 100%
rename from python/smc/control/optimal_control/get_ocp_args.py
rename to python/smc/control/optimal_control/util.py
diff --git a/python/smc/multiprocessing/__init__.py b/python/smc/multiprocessing/__init__.py
new file mode 100644
index 0000000..bf5f08d
--- /dev/null
+++ b/python/smc/multiprocessing/__init__.py
@@ -0,0 +1 @@
+from .process_manager import ProcessManager
--
GitLab
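
For reference, the class-based CrocoOCP construction above packages the same ingredients the old createCrocoIKOCP assembled by hand. Below is a minimal, self-contained sketch of that single-arm IK OCP flow (assuming a plain Pinocchio arm such as example_robot_data's "ur5_gripper" with a "tool0" frame; the weights, knot count and dt are illustrative, not fixed by this patch):

import crocoddyl
import example_robot_data
import numpy as np
import pinocchio as pin

robot = example_robot_data.load("ur5_gripper")
model = robot.model
q0 = pin.neutral(model)
x0 = np.concatenate([q0, np.zeros(model.nv)])
T_w_goal = pin.SE3.Random()

state = crocoddyl.StateMultibody(model)
actuation = crocoddyl.ActuationModelFull(state)
runningCost = crocoddyl.CostModelSum(state)
terminalCost = crocoddyl.CostModelSum(state)

# regulation costs: penalize deviation from x0 and control effort
xRegCost = crocoddyl.CostModelResidual(
    state, crocoddyl.ResidualModelState(state, x0, state.nv))
uRegCost = crocoddyl.CostModelResidual(
    state, crocoddyl.ResidualModelControl(state, state.nv))
runningCost.addCost("xReg", xRegCost, 1e-3)
runningCost.addCost("uReg", uRegCost, 1e-3)

# task cost: SE(3) error of the end-effector frame w.r.t. the goal pose
goalCost = crocoddyl.CostModelResidual(
    state,
    crocoddyl.ResidualModelFramePlacement(
        state, model.getFrameId("tool0"), T_w_goal, state.nv))
runningCost.addCost("gripperPose", goalCost, 1e2)
terminalCost.addCost("gripperPose", goalCost, 1e2)

# terminal cost: end-effector velocity should be zero at the end
velCost = crocoddyl.CostModelResidual(
    state,
    crocoddyl.ResidualModelFrameVelocity(
        state, model.getFrameId("tool0"),
        pin.Motion(np.zeros(6)), pin.ReferenceFrame.WORLD))
terminalCost.addCost("velFinal", velCost, 1e3)

dt = 1e-2       # illustrative knot spacing
n_knots = 100   # illustrative horizon length
runningModel = crocoddyl.IntegratedActionModelEuler(
    crocoddyl.DifferentialActionModelFreeInvDynamics(state, actuation, runningCost), dt)
terminalModel = crocoddyl.IntegratedActionModelEuler(
    crocoddyl.DifferentialActionModelFreeInvDynamics(state, actuation, terminalCost), 0.0)
problem = crocoddyl.ShootingProblem(x0, [runningModel] * n_knots, terminalModel)

solver = crocoddyl.SolverBoxFDDP(problem)
xs = [x0] * (problem.T + 1)
us = problem.quasiStatic([x0] * problem.T)
solver.solve(xs, us, 500, False, 1e-9)

# the reference trajectory is read off the solved state trajectory
qs = np.array(solver.xs)[:, :model.nq]   # joint-position reference
vs = np.array(solver.xs)[:, model.nq:]   # joint-velocity reference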
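
The MPC side reuses the same problem in a receding-horizon loop, as CrocoIKMPCControlLoop does above: overwrite the initial state with the latest measurement, shift the previous solution by one knot as a warm start, run a small number of iterations, and command the velocity part of the second state. A sketch of one such step (the xs_init[0] overwrite and the control-trajectory shift are assumed, since the hunk above truncates that part; robot is any interface exposing q, v and sendVelocityCommand as in this patch):

import numpy as np

def mpc_step(robot, solver, max_solver_iter):
    # set the initial state of the shooting problem from the latest sensor data
    x0 = np.concatenate([robot.q, robot.v])
    solver.problem.x0 = x0
    # warm start: shift the previous solution forward by one knot
    xs_init = list(solver.xs[1:]) + [solver.xs[-1]]
    xs_init[0] = x0
    us_init = list(solver.us[1:]) + [solver.us[-1]]
    solver.solve(xs_init, us_init, max_solver_iter)
    # command the velocity of the next knot (controls are not sent directly)
    vel_cmds = np.array(solver.xs)[1, robot.model.nq:]
    robot.sendVelocityCommand(vel_cmds)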
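
Finally, the two solver branches treat limits differently: BoxFDDP only accepts box bounds on the control (u_lb/u_ub), so state limits are softened into a quadratic-barrier cost, while mim_solvers' CSQP accepts them as hard box constraints. A sketch of the two helpers constructInputConstraints is meant to dispatch to (argument names are illustrative; the residuals and limits are the ones built in CrocoOCP above):

import crocoddyl

def state_limit_barrier_cost(state, x0, xlb, xub):
    # BoxFDDP branch: position/velocity limits as a quadratic-barrier cost
    bounds = crocoddyl.ActivationBounds(xlb, xub, 1.0)
    xLimitResidual = crocoddyl.ResidualModelState(state, x0, state.nv)
    xLimitActivation = crocoddyl.ActivationModelQuadraticBarrier(bounds)
    return crocoddyl.CostModelResidual(state, xLimitActivation, xLimitResidual)

def box_constraints(state, nv, uResidual, xResidual, effortLimit, xlb, xub):
    # CSQP branch: the same limits as hard box constraints on u and x
    constraints = crocoddyl.ConstraintModelManager(state, nv)
    constraints.addConstraint(
        "u_box_constraint",
        crocoddyl.ConstraintModelResidual(
            state, uResidual, -0.1 * effortLimit, 0.1 * effortLimit))
    constraints.addConstraint(
        "x_box_constraint",
        crocoddyl.ConstraintModelResidual(state, xResidual, xlb, xub))
    return constraints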