From 409304aa0ff2551309b33e7ada04b3c837e02875 Mon Sep 17 00:00:00 2001 From: m-guberina <gubi.guberina@gmail.com> Date: Sat, 1 Mar 2025 00:41:50 +0100 Subject: [PATCH] getting real close to FULLY finalize refactoring --- examples/cartesian_space/test_by_running.sh | 37 ++++ examples/test_by_running.sh | 14 +- .../cartesian_space_compliant_control.py | 75 ++++--- .../cartesian_space_trajectory_following.py | 2 +- .../smc/control/cartesian_space/ik_solvers.py | 61 +++--- python/smc/control/control_loop_manager.py | 8 +- .../generic_control_loop.py | 11 +- .../path_following_template.py | 2 +- .../controller_templates/point_to_point.py | 17 +- python/smc/control/freedrive.py | 17 +- .../joint_space/joint_space_point_to_point.py | 81 +++---- .../path_following_croco_ocp.py | 1 - ...r_abstract.py => abstract_robotmanager.py} | 65 +++--- ....py => abstract_simulated_robotmanager.py} | 3 +- .../smc/robots/grippers/abstract_gripper.py | 3 + .../grippers/robotiq/robotiq_gripper.py | 89 +++++--- .../grippers/rs485_robotiq/rs485_robotiq.py | 134 ++++++++++++ python/smc/robots/implementations/__init__.py | 19 +- python/smc/robots/implementations/heron.py | 31 +-- python/smc/robots/implementations/mir.py | 28 ++- .../smc/robots/implementations/mobile_yumi.py | 38 +++- python/smc/robots/implementations/ur5e.py | 31 +-- python/smc/robots/implementations/yumi.py | 30 +-- .../robots/interfaces/dual_arm_interface.py | 65 ++++-- .../interfaces/mobile_base_interface.py | 2 +- .../robots/interfaces/single_arm_interface.py | 2 +- .../robots/interfaces/whole_body_interface.py | 197 ++++++++++++++++-- python/smc/robots/utils.py | 7 + 28 files changed, 712 insertions(+), 358 deletions(-) create mode 100755 examples/cartesian_space/test_by_running.sh rename python/smc/robots/{robotmanager_abstract.py => abstract_robotmanager.py} (92%) rename python/smc/robots/{abstract_simulated_robot.py => abstract_simulated_robotmanager.py} (96%) create mode 100644 python/smc/robots/grippers/rs485_robotiq/rs485_robotiq.py diff --git a/examples/cartesian_space/test_by_running.sh b/examples/cartesian_space/test_by_running.sh new file mode 100755 index 0000000..cae0cf4 --- /dev/null +++ b/examples/cartesian_space/test_by_running.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# the idea here is to run all the runnable things +# and test for syntax errors + +################### +# classic cliks # +################### +# damped pseudoinverse arm +runnable="clik_point_to_point.py --randomly-generate-goal --ctrl-freq=-1 --no-plotter --no-visualizer --max-iterations=2000" +echo $runnable +python $runnable + +# QP arm +runnable="clik_point_to_point.py --randomly-generate-goal --ik-solver=invKinmQP --ctrl-freq=-1 --no-visualizer --no-plotter --max-iterations=2000" +echo $runnable +python $runnable + +# damped pseudoinverse single arm whole body mobile +runnable="clik_point_to_point.py --robot=heron --randomly-generate-goal --ctrl-freq=-1 --no-visualizer --no-plotter --max-iterations=2000" +echo $runnable +python $runnable + + +# QP whole body mobile +runnable="clik_point_to_point.py --robot=heron --randomly-generate-goal --ik-solver=invKinmQP --ctrl-freq=-1 --no-visualizer --no-plotter --max-iterations=2000" +echo $runnable +python $runnable + +# dual arm pseudoinverse +runnable="dual_arm_clik.py --robot=yumi --randomly-generate-goal --ctrl-freq=-1 --no-visualizer --no-plotter --max-iterations=2000" +echo $runnable +python $runnable + +# dual arm QP +runnable="dual_arm_clik.py --robot=yumi --randomly-generate-goal --ik-solver=invKinmQP --ctrl-freq=-1 --no-visualizer --no-plotter --max-iterations=2000" +echo $runnable +python $runnable diff --git a/examples/test_by_running.sh b/examples/test_by_running.sh index f001ebe..dfe4fde 100755 --- a/examples/test_by_running.sh +++ b/examples/test_by_running.sh @@ -6,7 +6,7 @@ # camera # ############ # the only one where you check plotter -runnable="camera_no_lag.py --max-iterations=1500 --no-visualize-manipulator" +runnable="./vision/camera_no_lag.py --max-iterations=1500 --no-visualize-manipulator" echo $runnable python $runnable echo "==========================================================" @@ -16,22 +16,22 @@ echo "==========================================================" ################### # the only one where you check visualizer # damped pseudoinverse arm -runnable="clik.py --randomly-generate-goal" +runnable="./cartesian_space/clik_point_to_point.py --randomly-generate-goal" echo $runnable python $runnable # damped pseudoinverse whole body mobile -runnable="clik.py --robot=heron --randomly-generate-goal --fast-simulation --no-visualize-manipulator --no-real-time-plotting --max-iterations=2000" +runnable="./cartesian_space/clik_point_to_point.py --robot=heron --randomly-generate-goal --fast-simulation --no-visualize-manipulator --no-real-time-plotting --max-iterations=2000" echo $runnable python $runnable # QP arm -runnable="clik.py --randomly-generate-goal --clik-controller=invKinmQP --fast-simulation --no-visualize-manipulator --no-real-time-plotting --max-iterations=2000" +runnable="./cartesian_space/clik_point_to_point.py --randomly-generate-goal --clik-controller=invKinmQP --fast-simulation --no-visualize-manipulator --no-real-time-plotting --max-iterations=2000" echo $runnable python $runnable # QP whole body mobile -runnable="clik.py --robot=heron --randomly-generate-goal --clik-controller=invKinmQP --fast-simulation --no-visualize-manipulator --no-real-time-plotting --max-iterations=2000" +runnable="./cartesian_space/clik_point_to_point.py --robot=heron --randomly-generate-goal --clik-controller=invKinmQP --fast-simulation --no-visualize-manipulator --no-real-time-plotting --max-iterations=2000" echo $runnable python $runnable @@ -46,7 +46,7 @@ python $runnable #python challenge_main.py #python comparing_logs_example.py #python crocoddyl_mpc.py -#python crocoddyl_ocp_clik.py +#python crocoddyl_ocp_./cartesian_space/clik_point_to_point.py #python data #python drawing_from_input_drawing.py #python force_control_test.py @@ -65,7 +65,7 @@ python $runnable #python point_impedance_control.py #python pushing_via_friction_cones.py #python __pycache__ -#python ros2_clik.py +#python ros2_./cartesian_space/clik_point_to_point.py #python smc_node.py #python test_by_running.sh #python test_crocoddyl_opt_ctrl.py diff --git a/python/smc/control/cartesian_space/cartesian_space_compliant_control.py b/python/smc/control/cartesian_space/cartesian_space_compliant_control.py index 745d9ed..8947a3c 100644 --- a/python/smc/control/cartesian_space/cartesian_space_compliant_control.py +++ b/python/smc/control/cartesian_space/cartesian_space_compliant_control.py @@ -1,4 +1,5 @@ from smc.control.control_loop_manager import ControlLoopManager +from smc.control.controller_templates.point_to_point import EEP2PCtrlLoopTemplate from smc.robots.interfaces.force_torque_sensor_interface import ( ForceTorqueOnSingleArmWrist, ) @@ -8,28 +9,34 @@ import pinocchio as pin import numpy as np import copy from argparse import Namespace +from collections import deque def controlLoopCompliantClik( - args, robot: ForceTorqueOnSingleArmWrist, ik_solver, T_w_goal: pin.SE3, i, past_data -): + # J err_vec v_cmd + ik_solver: Callable[[np.ndarray, np.ndarray], np.ndarray], + T_w_goal: pin.SE3, + args: Namespace, + robot: ForceTorqueOnSingleArmWrist, + i: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[np.ndarray, dict[str, np.ndarray], dict[str, np.ndarray]]: """ controlLoopCompliantClik --------------- - generic control loop for clik (handling error to final point etc). - in some version of the universe this could be extended to a generic - point-to-point motion control loop. + CLIK with compliance in all directions """ - breakFlag = False + save_past_item = {} log_item = {} - save_past_dict = {} - q = robot.q T_w_e = robot.T_w_e wrench = robot.wrench + # we need to overcome noise if we want to converge if np.linalg.norm(wrench) < args.minimum_detectable_force_norm: wrench = np.zeros(6) - save_past_dict["wrench"] = copy.deepcopy(wrench) + save_past_item["wrench"] = copy.deepcopy(wrench) + + # low pass filter for wrenches wrench = args.beta * wrench + (1 - args.beta) * np.average( np.array(past_data["wrench"]), axis=0 ) @@ -38,42 +45,22 @@ def controlLoopCompliantClik( else: Z = np.diag(np.array([0.0, 0.0, 1.0, 0.0, 0.0, 0.0])) wrench = Z @ wrench - # first check whether we're at the goal + SEerror = T_w_e.actInv(T_w_goal) err_vector = pin.log6(SEerror).vector - if np.linalg.norm(err_vector) < robot.args.goal_error: - breakFlag = True - J = pin.computeFrameJacobian(robot.model, robot.data, q, robot.ee_frame_id) - # compute the joint velocities. - # just plug and play different ones + + J = robot.getJacobian() v_cartesian_body_cmd = args.kp * err_vector + args.alpha * wrench v_cmd = ik_solver(J, v_cartesian_body_cmd) - # qd = ( - # J.T - # @ np.linalg.inv(J @ J.T + np.eye(J.shape[0], J.shape[0]) * args.tikhonov_damp) - # @ err_vector - # ) - # tau = J.T @ wrench - # tau = tau[:6].reshape((6,1)) - # qd += args.alpha * tau - robot.sendVelocityCommand(v_cmd) - log_item["qs"] = q.reshape((robot.model.nq,)) - # get actual velocity, not the commanded one. - # although that would be fun to compare i guess - # TODO: have both, but call the compute control signal like it should be - log_item["dqs"] = robot.v - log_item["wrench"] = robot.wrench.reshape((6,)) - log_item["wrench_used"] = wrench.reshape((6,)) - # log_item["tau"] = tau.reshape((robot.model.nv,)) - # we're not saving here, but need to respect the API, - # hence the empty dict - return breakFlag, save_past_dict, log_item + log_item["wrench"] = robot.wrench + log_item["wrench_used"] = wrench + return v_cmd, save_past_item, log_item # add a threshold for the wrench def compliantMoveL( - args: Namespace, robot: ForceTorqueOnSingleArmWrist, T_w_goal: pin.SE3, run=True + T_w_goal: pin.SE3, args: Namespace, robot: ForceTorqueOnSingleArmWrist, run=True ): """ compliantMoveL @@ -83,16 +70,24 @@ def compliantMoveL( send a SE3 object as goal point. if you don't care about rotation, make it np.zeros((3,3)) """ - # assert type(goal_point) == pin.pinocchio_pywrap.SE3 + assert type(T_w_goal) is pin.SE3 ik_solver = getIKSolver(args, robot) - controlLoop = partial(controlLoopCompliantClik, args, robot, ik_solver, T_w_goal) + controlLoop = partial( + EEP2PCtrlLoopTemplate, + ik_solver, + T_w_goal, + controlLoopCompliantClik, + args, + robot, + ) # we're not using any past data or logging, hence the empty arguments log_item = { "qs": np.zeros(robot.model.nq), + "err_norm": np.zeros(1), + "dqs": np.zeros(robot.nv), + "dqs_cmd": np.zeros(robot.nv), "wrench": np.zeros(6), "wrench_used": np.zeros(6), - # "tau": np.zeros(robot.model.nv), - "dqs": np.zeros(robot.model.nv), } save_past_dict = { "wrench": np.zeros(6), diff --git a/python/smc/control/cartesian_space/cartesian_space_trajectory_following.py b/python/smc/control/cartesian_space/cartesian_space_trajectory_following.py index 2283451..cefb4bb 100644 --- a/python/smc/control/cartesian_space/cartesian_space_trajectory_following.py +++ b/python/smc/control/cartesian_space/cartesian_space_trajectory_following.py @@ -1,5 +1,5 @@ from smc.control.control_loop_manager import ControlLoopManager -from smc.robots.robotmanager_abstract import AbstractRobotManager +from smc.robots.abstract_robotmanager import AbstractRobotManager from smc.multiprocessing.process_manager import ProcessManager from functools import partial import pinocchio as pin diff --git a/python/smc/control/cartesian_space/ik_solvers.py b/python/smc/control/cartesian_space/ik_solvers.py index 01f7244..464f26e 100644 --- a/python/smc/control/cartesian_space/ik_solvers.py +++ b/python/smc/control/cartesian_space/ik_solvers.py @@ -1,7 +1,9 @@ from typing import Callable import numpy as np from qpsolvers import solve_qp -import proxsuite + +# TODO: if importlib.files ... to make it an optional import +from proxsuite import proxqp from functools import partial from argparse import Namespace @@ -37,24 +39,21 @@ def getIKSolver( if args.ik_solver == "invKinmQP": lb = -1 * np.array(robot.model.velocityLimit, dtype="double") # we do additional clipping - lb = np.clip(lb, -1 * robot.max_qd, robot.max_qd) + lb = np.clip(lb, -1 * robot._max_v, robot._max_v) ub = np.array(robot.model.velocityLimit, dtype="double") - ub = np.clip(ub, -1 * robot.max_qd, robot.max_qd) + ub = np.clip(ub, -1 * robot._max_v, robot._max_v) return partial(invKinmQP, lb=lb, ub=ub) if args.ik_solver == "QPproxsuite": - H = np.eye(robot.model.nv) - g = np.zeros(robot.model.nv) - G = np.eye(robot.model.nv) - A = np.eye(6, robot.model.nv) + H = np.eye(robot.nv) + g = np.zeros(robot.nv) + G = np.eye(robot.nv) + A = np.eye(6, robot.nv) b = np.ones(6) * 0.1 - # proxsuite does lb <= Cx <= ub - C = np.eye(robot.model.nv) - lb = -1 * np.array(robot.model.velocityLimit, dtype="double") - # we do additional clipping - lb = np.clip(lb, -1 * robot.max_qd, robot.max_qd) - ub = np.array(robot.model.velocityLimit, dtype="double") - ub = np.clip(ub, -1 * robot.max_qd, robot.max_qd) - qp = proxsuite.proxqp.dense.QP(robot.model.nv, 6, robot.model.nv) + # proxqp does lb <= Cx <= ub + C = np.eye(robot.nv) + lb = -1 * robot._max_v.copy() + ub = robot._max_v.copy() + qp = proxqp.dense.QP(robot.model.nv, 6, robot.model.nv) qp.init(H, g, A, b, G, lb, ub) qp.solve() return partial(QPproxsuite, qp, lb, ub) @@ -70,7 +69,9 @@ def getIKSolver( return partial(dampedPseudoinverse, args.tikhonov_damp) -def dampedPseudoinverse(tikhonov_damp, J, err_vector): +def dampedPseudoinverse( + tikhonov_damp: float, J: np.ndarray, err_vector: np.ndarray +) -> np.ndarray: qd = ( J.T @ np.linalg.inv(J @ J.T + np.eye(J.shape[0], J.shape[0]) * tikhonov_damp) @@ -79,7 +80,7 @@ def dampedPseudoinverse(tikhonov_damp, J, err_vector): return qd -def jacobianTranspose(J, err_vector): +def jacobianTranspose(J: np.ndarray, err_vector: np.ndarray) -> np.ndarray: qd = J.T @ err_vector return qd @@ -87,21 +88,23 @@ def jacobianTranspose(J, err_vector): # TODO: put something into q of the QP # also, put in lb and ub # this one is with qpsolvers -def invKinmQP(J, err_vector, lb=None, ub=None, past_qd=None): +def invKinmQP( + J: np.ndarray, err_vector: np.ndarray, lb=None, ub=None, past_qd=None +) -> np.ndarray: """ invKinmQP --------- generic QP: minimize 1/2 x^T P x + q^T x subject to - G x \leq h + G x \\leq h A x = b lb <= x <= ub inverse kinematics QP: minimize 1/2 qd^T P qd + q^T qd (optional secondary objective) subject to - G qd \leq h (optional) + G qd \\leq h (optional) J qd = b (mandatory) lb <= qd <= ub (optional) """ @@ -140,17 +143,21 @@ def invKinmQP(J, err_vector, lb=None, ub=None, past_qd=None): return qd -def QPproxsuite(qp, lb, ub, J, err_vector): - # proxsuite does lb <= Cx <= ub - qp.settings.initial_guess = ( - proxsuite.proxqp.InitialGuess.WARM_START_WITH_PREVIOUS_RESULT - ) +def QPproxsuite( + qp: proxqp.dense.QP, + lb: np.ndarray, + ub: np.ndarray, + J: np.ndarray, + err_vector: np.ndarray, +) -> np.ndarray: + # proxqp does lb <= Cx <= ub + qp.settings.initial_guess = proxqp.InitialGuess.WARM_START_WITH_PREVIOUS_RESULT # qp.update(g=q, A=A, b=h, l=lb, u=ub) qp.update(A=J, b=err_vector) qp.solve() qd = qp.results.x - if qp.results.info.status == proxsuite.proxqp.PROXQP_PRIMAL_INFEASIBLE: + if qp.results.info.status == proxqp.PROXQP_PRIMAL_INFEASIBLE: # if np.abs(qp.results.info.duality_gap) > 0.1: print("didn't solve shit") qd = None @@ -214,7 +221,7 @@ Tgn = Tdiffq(costManipulability.calc,q) """ -def QPManipMax(J, err_vector, lb=None, ub=None): +def QPManipMax(J: np.ndarray, err_vector: np.ndarray, lb=None, ub=None) -> np.ndarray: """ invKinmQP --------- diff --git a/python/smc/control/control_loop_manager.py b/python/smc/control/control_loop_manager.py index 9951ff1..4faca5c 100644 --- a/python/smc/control/control_loop_manager.py +++ b/python/smc/control/control_loop_manager.py @@ -59,9 +59,9 @@ class ControlLoopManager: self, robot_manager: AbstractRobotManager, controlLoop: partial, - args, - save_past_item, - log_item, + args: Namespace, + save_past_item: dict[str, np.ndarray], + log_item: dict[str, np.ndarray], ): signal.signal(signal.SIGINT, self.stopHandler) self.pid = getpid() @@ -74,7 +74,7 @@ class ControlLoopManager: self.past_data: dict[str, deque[np.ndarray]] = {} # NOTE: viz update rate is a magic number that seems to work fine and i don't have # any plans to make it smarter - if args.viz_update_rate < 0: + if args.viz_update_rate < 0 and args.ctrl_freq > 0: self.viz_update_rate: int = int(np.ceil(self.args.ctrl_freq / 25)) else: self.viz_update_rate: int = args.viz_update_rate diff --git a/python/smc/control/controller_templates/generic_control_loop.py b/python/smc/control/controller_templates/generic_control_loop.py index f9b5838..5422547 100644 --- a/python/smc/control/controller_templates/generic_control_loop.py +++ b/python/smc/control/controller_templates/generic_control_loop.py @@ -1,8 +1,9 @@ +from smc.robots.abstract_robotmanager import AbstractRobotManager + from argparse import Namespace from typing import Any, Callable import numpy as np - -from smc.robots.robotmanager_abstract import AbstractRobotManager +from collections import deque global control_loop_return control_loop_return = tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]] @@ -11,18 +12,18 @@ control_loop_return = tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]] def GenericControlLoop( X: Any, control_loop: Callable[ - [Any, Namespace, AbstractRobotManager, dict[str, np.ndarray], float], + [Any, Namespace, AbstractRobotManager, float, dict[str, np.ndarray]], control_loop_return, ], args: Namespace, robot: AbstractRobotManager, - past_data: dict[str, np.ndarray], t: int, # will be float eventually + past_data: dict[str, deque[np.ndarray]], ) -> control_loop_return: breakFlag = False log_item = {} save_past_item = {} - control_loop(X, args, robot, past_data, t) + control_loop(X, args, robot, t, past_data) return breakFlag, log_item, save_past_item diff --git a/python/smc/control/controller_templates/path_following_template.py b/python/smc/control/controller_templates/path_following_template.py index 746a159..4df88cd 100644 --- a/python/smc/control/controller_templates/path_following_template.py +++ b/python/smc/control/controller_templates/path_following_template.py @@ -1,4 +1,4 @@ -from smc.robots.robotmanager_abstract import AbstractRobotManager +from smc.robots.abstract_robotmanager import AbstractRobotManager from smc.multiprocessing.process_manager import ProcessManager from pinocchio import SE3, log6 diff --git a/python/smc/control/controller_templates/point_to_point.py b/python/smc/control/controller_templates/point_to_point.py index 8c319c0..828b217 100644 --- a/python/smc/control/controller_templates/point_to_point.py +++ b/python/smc/control/controller_templates/point_to_point.py @@ -48,12 +48,13 @@ def EEP2PCtrlLoopTemplate( T_w_e = robot.T_w_e SEerror = T_w_e.actInv(T_w_goal) err_vector = log6(SEerror).vector - if np.linalg.norm(err_vector) < robot.args.goal_error: + err_vector_norm = np.linalg.norm(err_vector) + if err_vector_norm < robot.args.goal_error: breakFlag = True log_item["qs"] = robot.q - log_item["err_norm"] = np.linalg.norm(err_vector).reshape((1,)) + log_item["err_norm"] = err_vector_norm.reshape((1,)) log_item["dqs"] = robot.v - log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) + log_item["dqs_cmd"] = v_cmd.reshape((robot.nv,)) return breakFlag, save_past_item, log_item @@ -105,17 +106,19 @@ def DualEEP2PCtrlLoopTemplate( err_vector_left = log6(SEerror_left).vector err_vector_right = log6(SEerror_right).vector + err_vector_left_norm = np.linalg.norm(err_vector_left) + err_vector_right_norm = np.linalg.norm(err_vector_right) - if (np.linalg.norm(err_vector_left) < robot.args.goal_error) and ( - np.linalg.norm(err_vector_right) < robot.args.goal_error + if (err_vector_left_norm < robot.args.goal_error) and ( + err_vector_right_norm < robot.args.goal_error ): breakFlag = True log_item["qs"] = robot.q log_item["dqs"] = robot.v log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,)) - log_item["l_err_norm"] = np.linalg.norm(err_vector_left) - log_item["r_err_norm"] = np.linalg.norm(err_vector_right) + log_item["l_err_norm"] = err_vector_left_norm + log_item["r_err_norm"] = err_vector_right_norm return breakFlag, save_past_item, log_item diff --git a/python/smc/control/freedrive.py b/python/smc/control/freedrive.py index d7f6e7d..5628e7a 100644 --- a/python/smc/control/freedrive.py +++ b/python/smc/control/freedrive.py @@ -4,12 +4,19 @@ from smc.control.control_loop_manager import ControlLoopManager import numpy as np from functools import partial import threading -import queue +from queue import Queue +from argparse import Namespace +from collections import deque def freedriveControlLoop( - args, robot: SingleArmInterface, com_queue, pose_n_q_dict, i, past_data -): + args: Namespace, + robot: SingleArmInterface, + com_queue: Queue, + pose_n_q_dict: dict[str, list], + i: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]: """ controlLoopFreedrive ----------------------------- @@ -68,10 +75,10 @@ def freedriveUntilKeyboard(args, robot: SingleArmInterface): # all controlLoopManager goodies) log_item = {"qs": np.zeros((robot.model.nq,)), "dqs": np.zeros((robot.model.nv,))} save_past_dict = {} - # use queue.queue because we're doing this on a + # use Queue because we're doing this on a # threading level, not multiprocess level # infinite size (won't need more than 1 but who cares) - com_queue = queue.Queue() + com_queue = Queue() # we're passing pose_n_q_list by reference # (python default for mutables) pose_n_q_dict = {"T_w_es": [], "qs": []} diff --git a/python/smc/control/joint_space/joint_space_point_to_point.py b/python/smc/control/joint_space/joint_space_point_to_point.py index 1f54db2..d506184 100644 --- a/python/smc/control/joint_space/joint_space_point_to_point.py +++ b/python/smc/control/joint_space/joint_space_point_to_point.py @@ -1,38 +1,31 @@ -from smc.robots.robotmanager_abstract import AbstractRobotManager +from smc.robots.abstract_robotmanager import AbstractRobotManager from smc.control.control_loop_manager import ControlLoopManager import numpy as np from functools import partial +from collections import deque +from argparse import Namespace -def moveJControlLoop(q_desired, robot: AbstractRobotManager, i, past_data): +def moveJControlLoop( + q_desired: np.ndarray, + args: Namespace, + robot: AbstractRobotManager, + i: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]: """ moveJControlLoop --------------- most basic P control for joint space point-to-point motion, actual control loop. """ breakFlag = False - save_past_dict = {} - # you don't even need forward kinematics for this lmao q = robot.q - # TODO: be more intelligent with qs - q = q[:6] - q_desired = q_desired[:6] q_error = q_desired - q - - # STOP MUCH BEFORE YOU NEED TO FOR THE DEMO - # EVEN THIS MIGHT BE TOO MUCH - # TODO fix later obviously if np.linalg.norm(q_error) < 1e-3: breakFlag = True - # stupid hack, for the love of god remove this - # but it should be small enough lel - # there. fixed. tko radi taj i grijesi, al jebemu zivot sta je to bilo K = 120 - # print(q_error) - # TODO: you should clip this qd = K * q_error * robot.dt - # qd = np.clip(qd, robot.acceleration, robot.acceleration) robot.sendVelocityCommand(qd) return breakFlag, {}, {} @@ -41,7 +34,7 @@ def moveJControlLoop(q_desired, robot: AbstractRobotManager, i, past_data): # fix this by tuning or whatever else. # MOVEL works just fine, so apply whatever's missing for there here # and that's it. -def moveJP(args, robot, q_desired): +def moveJP(q_desired: np.ndarray, args: Namespace, robot: AbstractRobotManager) -> None: """ moveJP --------------- @@ -49,7 +42,7 @@ def moveJP(args, robot, q_desired): just starts the control loop without any logging. """ assert type(q_desired) == np.ndarray - controlLoop = partial(moveJControlLoop, q_desired, robot) + controlLoop = partial(moveJControlLoop, q_desired, args, robot) # we're not using any past data or logging, hence the empty arguments loop_manager = ControlLoopManager(robot, controlLoop, args, {}, {}) loop_manager.run() @@ -59,7 +52,13 @@ def moveJP(args, robot, q_desired): print("MoveJP done: convergence achieved, reached destionation!") -def moveJPIControlLoop(q_desired, robot: AbstractRobotManager, i, past_data): +def moveJPIControlLoop( + q_desired: np.ndarray, + args: Namespace, + robot: AbstractRobotManager, + i: int, + past_data: dict[str, deque[np.ndarray]], +) -> tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]: """ PID control for joint space point-to-point motion with approximated joint velocities. """ @@ -74,13 +73,7 @@ def moveJPIControlLoop(q_desired, robot: AbstractRobotManager, i, past_data): # ================================ # Current Joint Positions # ================================ - q = robot.q # Get current joint positions (first 6 joints) - - # ================================ - # Retrieve Previous States - # ================================ - q_prev = past_data["q_prev"][-1] # Previous joint positions - e_prev = past_data["e_prev"][-1] # Previous position error + q = robot.q # ================================ # Compute Position Error @@ -91,7 +84,7 @@ def moveJPIControlLoop(q_desired, robot: AbstractRobotManager, i, past_data): # Check for Convergence # ================================ if np.linalg.norm(q_error) < 1e-3 and np.linalg.norm(robot.v) < 1e-3: - breakFlag = True # Convergence achieved + breakFlag = True # ================================ # Update Integral of Error @@ -121,7 +114,6 @@ def moveJPIControlLoop(q_desired, robot: AbstractRobotManager, i, past_data): # Compute Control Input (Joint Velocities) # ================================ v_cmd = Kp * q_error + Ki * integral_error - # qd[5]=qd[5]*10 # ================================ # Send Joint Velocities to the Robot @@ -135,43 +127,30 @@ def moveJPIControlLoop(q_desired, robot: AbstractRobotManager, i, past_data): return breakFlag, save_past_dict, log_item -def moveJPI(args, robot, q_desired): +def moveJPI( + q_desired: np.ndarray, args: Namespace, robot: AbstractRobotManager +) -> None: assert isinstance(q_desired, np.ndarray) - controlLoop = partial(moveJPIControlLoop, q_desired, robot) + controlLoop = partial(moveJPIControlLoop, q_desired, args, robot) - # ================================ - # Initialization - # ================================ - # Get initial joint positions - initial_q = robot.getQ()[:6] - - # Initialize past data for control loop + initial_q = robot.q save_past_dict = { - "integral_error": np.zeros(robot.model.nq)[:6], + "integral_error": np.zeros(robot.nq), "q_prev": initial_q, - "e_prev": q_desired - - initial_q, # Initial position error (may need to be q_desired - initial_q) + "e_prev": q_desired - initial_q, } - # Initialize log item (if logging is needed) log_item = { "qs": np.zeros(6), "dqs": np.zeros(6), - "integral_error": np.zeros(robot.model.nq)[:6], - "e_prev": q_desired - - initial_q, # Initial position error (may need to be q_desired - initial_q) + "integral_error": np.zeros(robot.nq), + "e_prev": q_desired - initial_q, } - # ================================ - # Create and Run Control Loop Manager - # ================================ loop_manager = ControlLoopManager( robot, controlLoop, args, save_past_dict, log_item ) loop_manager.run() - # ================================ - # Debug Printing - # ================================ if args.debug_prints: print("MoveJPI done: convergence achieved, reached destination!") diff --git a/python/smc/control/optimal_control/path_following_croco_ocp.py b/python/smc/control/optimal_control/path_following_croco_ocp.py index 092f783..bf1d815 100644 --- a/python/smc/control/optimal_control/path_following_croco_ocp.py +++ b/python/smc/control/optimal_control/path_following_croco_ocp.py @@ -1,7 +1,6 @@ # TODO: make a bundle method which solves and immediately follows the traj. from smc.control.optimal_control.abstract_croco_ocp import CrocoOCP from smc.robots.interfaces.mobile_base_interface import MobileBaseInterface -from smc.robots.robotmanager_abstract import AbstractRobotManager from smc.robots.interfaces.single_arm_interface import SingleArmInterface from smc.robots.interfaces.dual_arm_interface import DualArmInterface diff --git a/python/smc/robots/robotmanager_abstract.py b/python/smc/robots/abstract_robotmanager.py similarity index 92% rename from python/smc/robots/robotmanager_abstract.py rename to python/smc/robots/abstract_robotmanager.py index ffe9976..93af007 100644 --- a/python/smc/robots/robotmanager_abstract.py +++ b/python/smc/robots/abstract_robotmanager.py @@ -2,7 +2,7 @@ from smc.visualization.visualizer import manipulatorVisualizer from smc.logging.logger import Logger from smc.multiprocessing.process_manager import ProcessManager import abc -import argparse +from argparse import Namespace import pinocchio as pin import numpy as np from functools import partial @@ -20,46 +20,25 @@ class AbstractRobotManager(abc.ABC): - provide functions every robot has to have like getQ """ - # weird ass pythonic way to declare an abstract atribute: - # you make the property function abstract, - # and then implement that. - # the property function consists of: - # - the class attribute - # - setter of the class attribute - # - getter of the class attribute - # - deleter of the class attribute - # since we only want the class attribute which won't be changed, - # we don't explicitely declare @property.setter and the rest. - # if you don't implement the following for attributes declared this way: - # @property - # def attribute(self): - # return self.attribute - # then you will get a TypeError, and that's what we want - - # errors if you didn't implement these attributes. - - @property - @abc.abstractmethod - def model(self) -> pin.Model: ... - - @property - @abc.abstractmethod - def data(self) -> pin.Data: ... - @property - @abc.abstractmethod - def visual_model(self) -> pin.pinocchio_pywrap_default.GeometryModel: ... - @property - @abc.abstractmethod - def collision_model(self) -> pin.pinocchio_pywrap_default.GeometryModel: ... - - def __init__(self, args: argparse.Namespace): - self.args = args + def __init__(self, args: Namespace): + self._model: pin.Model + self._data: pin.Data + self._visual_model: pin.pinocchio_pywrap_default.GeometryModel + self._collision_model: pin.pinocchio_pywrap_default.GeometryModel + self.args: Namespace = args if args.debug_prints: print("AbstractRobotManager init") #################################################################### # robot-related attributes # #################################################################### self.robot_name: str = args.robot # const - self._dt = 1 / self.args.ctrl_freq # [s] + if self.args.ctrl_freq > 0: + self._dt = 1 / self.args.ctrl_freq # [s] + else: + # if we're going as fast as possible, we are in sim. + # if we're in sim, we need to know for long to integrate, + # and we set this to 1e-3 because there's absolutely no need to be more precise than that + self._dt = 1e-3 self._t: float = 0.0 # NOTE: _MAX_ACCELERATION probably should be an array, but since UR robots only accept a float, # we go for a float and pretend the every vector element is this 1 number @@ -120,6 +99,22 @@ class AbstractRobotManager(abc.ABC): "sorry, almost done - look at utils to get 90% of the solution!" ) + @property + def model(self) -> pin.Model: + return self._model + + @property + def data(self) -> pin.Data: + return self._data + + @property + def visual_model(self) -> pin.pinocchio_pywrap_default.GeometryModel: + return self._visual_model + + @property + def collision_model(self) -> pin.pinocchio_pywrap_default.GeometryModel: + return self._collision_model + @abc.abstractmethod def setInitialPose(self) -> None: """ diff --git a/python/smc/robots/abstract_simulated_robot.py b/python/smc/robots/abstract_simulated_robotmanager.py similarity index 96% rename from python/smc/robots/abstract_simulated_robot.py rename to python/smc/robots/abstract_simulated_robotmanager.py index be9392a..2024e95 100644 --- a/python/smc/robots/abstract_simulated_robot.py +++ b/python/smc/robots/abstract_simulated_robotmanager.py @@ -8,7 +8,8 @@ import numpy as np # it could be in pybullet, mujoco or wherever else class AbstractSimulatedRobotManager(AbstractRealRobotManager): def __init__(self, args): - print("simulated robot init") + if args.debug_prints: + print("AbstractSimulatedRobotManager init") super().__init__(args) # NOTE: can be override by any particular robot diff --git a/python/smc/robots/grippers/abstract_gripper.py b/python/smc/robots/grippers/abstract_gripper.py index 0c254dc..a261f05 100644 --- a/python/smc/robots/grippers/abstract_gripper.py +++ b/python/smc/robots/grippers/abstract_gripper.py @@ -15,6 +15,9 @@ class AbstractGripper(abc.ABC): There are also the classic expected open(), close(), isGripping() quality-of-life functions. """ + def connect(self): + pass + @abc.abstractmethod def move(self, position, speed=None, gripping_force=None): pass diff --git a/python/smc/robots/grippers/robotiq/robotiq_gripper.py b/python/smc/robots/grippers/robotiq/robotiq_gripper.py index 2ce85ba..db7dcd4 100644 --- a/python/smc/robots/grippers/robotiq/robotiq_gripper.py +++ b/python/smc/robots/grippers/robotiq/robotiq_gripper.py @@ -12,27 +12,33 @@ class RobotiqGripper(AbstractGripper): """ Communicates with the gripper directly, via socket with string commands, leveraging string names for variables. """ + # WRITE VARIABLES (CAN ALSO READ) - ACT = 'ACT' # act : activate (1 while activated, can be reset to clear fault status) - GTO = 'GTO' # gto : go to (will perform go to with the actions set in pos, for, spe) - ATR = 'ATR' # atr : auto-release (emergency slow move) - ADR = 'ADR' # adr : auto-release direction (open(1) or close(0) during auto-release) - FOR = 'FOR' # for : force (0-255) - SPE = 'SPE' # spe : speed (0-255) - POS = 'POS' # pos : position (0-255), 0 = open + # act : activate (1 while activated, can be reset to clear fault status) + ACT = "ACT" + # gto : go to (will perform go to with the actions set in pos, for, spe) + GTO = "GTO" + # atr : auto-release (emergency slow move) + ATR = "ATR" + # adr : auto-release direction (open(1) or close(0) during auto-release) + ADR = "ADR" + FOR = "FOR" # for : force (0-255) + SPE = "SPE" # spe : speed (0-255) + POS = "POS" # pos : position (0-255), 0 = open # READ VARIABLES - STA = 'STA' # status (0 = is reset, 1 = activating, 3 = active) - PRE = 'PRE' # position request (echo of last commanded position) - OBJ = 'OBJ' # object detection (0 = moving, 1 = outer grip, 2 = inner grip, 3 = no object at rest) - FLT = 'FLT' # fault (0=ok, see manual for errors if not zero) + STA = "STA" # status (0 = is reset, 1 = activating, 3 = active) + PRE = "PRE" # position request (echo of last commanded position) + OBJ = "OBJ" # object detection (0 = moving, 1 = outer grip, 2 = inner grip, 3 = no object at rest) + FLT = "FLT" # fault (0=ok, see manual for errors if not zero) - ENCODING = 'UTF-8' # ASCII and UTF-8 both seem to work + ENCODING = "UTF-8" # ASCII and UTF-8 both seem to work class GripperStatus(Enum): """ Gripper status reported by the gripper. The integer values have to match what the gripper sends. """ + RESET = 0 ACTIVATING = 1 # UNUSED = 2 # This value is currently not used by the gripper firmware @@ -43,6 +49,7 @@ class RobotiqGripper(AbstractGripper): Object status reported by the gripper. The integer values have to match what the gripper sends. """ + MOVING = 0 STOPPED_OUTER_OBJECT = 1 STOPPED_INNER_OBJECT = 2 @@ -81,15 +88,15 @@ class RobotiqGripper(AbstractGripper): Sends the appropriate command via socket to set the value of n variables, and waits for its 'ack' response. :param var_dict: Dictionary of variables to set (variable_name, value). - :return: True on successful reception of ack, - false if no ack was received, indicating the set may not + :return: True on successful reception of ack, + false if no ack was received, indicating the set may not have been effective. """ # construct unique command cmd = "SET" for variable, value in var_dict.items(): cmd += f" {variable} {str(value)}" - cmd += '\n' # new line is required for the command to finish + cmd += "\n" # new line is required for the command to finish # atomic commands send/rcv with self.command_lock: self.socket.sendall(cmd.encode(self.ENCODING)) @@ -102,7 +109,7 @@ class RobotiqGripper(AbstractGripper): variable, and waits for its 'ack' response. :param variable: Variable to set. :param value: Value to set for the variable. - :return: True on successful reception of ack, + :return: True on successful reception of ack, false if no ack was received, indicating the set may not have been effective. """ @@ -127,13 +134,15 @@ class RobotiqGripper(AbstractGripper): # 2 bytes, instead of an integer. We assume integer here var_name, value_str = data.decode(self.ENCODING).split() if var_name != variable: - raise ValueError(f"Unexpected response {data} ({data.decode(self.ENCODING)}): does not match '{variable}'") + raise ValueError( + f"Unexpected response {data} ({data.decode(self.ENCODING)}): does not match '{variable}'" + ) value = int(value_str) return value @staticmethod def _is_ack(data: str): - return data == b'ack' + return data == b"ack" def _reset(self): """ @@ -154,12 +163,11 @@ class RobotiqGripper(AbstractGripper): """ self._set_var(self.ACT, 0) self._set_var(self.ATR, 0) - while (not self._get_var(self.ACT) == 0 or not self._get_var(self.STA) == 0): + while not self._get_var(self.ACT) == 0 or not self._get_var(self.STA) == 0: self._set_var(self.ACT, 0) self._set_var(self.ATR, 0) time.sleep(0.5) - def activate(self, auto_calibrate: bool = True): """ Resets the activation flag in the gripper, and sets it back to one, @@ -194,12 +202,12 @@ class RobotiqGripper(AbstractGripper): """ if not self.is_active(): self._reset() - while (not self._get_var(self.ACT) == 0 or not self._get_var(self.STA) == 0): + while not self._get_var(self.ACT) == 0 or not self._get_var(self.STA) == 0: time.sleep(0.01) self._set_var(self.ACT, 1) time.sleep(1.0) - while (not self._get_var(self.ACT) == 1 or not self._get_var(self.STA) == 3): + while not self._get_var(self.ACT) == 1 or not self._get_var(self.STA) == 3: time.sleep(0.01) # auto-calibrate position range if desired @@ -211,7 +219,9 @@ class RobotiqGripper(AbstractGripper): Returns whether the gripper is active. """ status = self._get_var(self.STA) - return RobotiqGripper.GripperStatus(status) == RobotiqGripper.GripperStatus.ACTIVE + return ( + RobotiqGripper.GripperStatus(status) == RobotiqGripper.GripperStatus.ACTIVE + ) def get_min_position(self) -> int: """ @@ -267,21 +277,29 @@ class RobotiqGripper(AbstractGripper): raise RuntimeError(f"Calibration failed opening to start: {str(status)}") # try to close as far as possible, and record the number - (position, status) = self.move_and_wait_for_pos(self.get_closed_position(), 64, 1) + (position, status) = self.move_and_wait_for_pos( + self.get_closed_position(), 64, 1 + ) if RobotiqGripper.ObjectStatus(status) != RobotiqGripper.ObjectStatus.AT_DEST: - raise RuntimeError(f"Calibration failed because of an object: {str(status)}") + raise RuntimeError( + f"Calibration failed because of an object: {str(status)}" + ) assert position <= self._max_position self._max_position = position # try to open as far as possible, and record the number (position, status) = self.move_and_wait_for_pos(self.get_open_position(), 64, 1) if RobotiqGripper.ObjectStatus(status) != RobotiqGripper.ObjectStatus.AT_DEST: - raise RuntimeError(f"Calibration failed because of an object: {str(status)}") + raise RuntimeError( + f"Calibration failed because of an object: {str(status)}" + ) assert position >= self._min_position self._min_position = position if log: - print(f"Gripper auto-calibrated to [{self.get_min_position()}, {self.get_max_position()}]") + print( + f"Gripper auto-calibrated to [{self.get_min_position()}, {self.get_max_position()}]" + ) def move(self, position: int, speed=255, gripping_force=0) -> Tuple[bool, int]: """ @@ -302,7 +320,14 @@ class RobotiqGripper(AbstractGripper): clip_for = clip_val(self._min_force, gripping_force, self._max_force) # moves to the given position with the given speed and gripping_force - var_dict = OrderedDict([(self.POS, clip_pos), (self.SPE, clip_spe), (self.FOR, clip_for), (self.GTO, 1)]) + var_dict = OrderedDict( + [ + (self.POS, clip_pos), + (self.SPE, clip_spe), + (self.FOR, clip_for), + (self.GTO, 1), + ] + ) return self._set_vars(var_dict), clip_pos def open(self): @@ -311,7 +336,9 @@ class RobotiqGripper(AbstractGripper): def close(self): self.move(255, 255, 0) - def move_and_wait_for_pos(self, position: int, speed: int, force: int) -> Tuple[int, ObjectStatus]: # noqa + def move_and_wait_for_pos( + self, position: int, speed: int, force: int + ) -> Tuple[int, ObjectStatus]: # noqa """ Sends commands to start moving towards the given position, with the specified speed and force, and then waits for the move to complete. @@ -335,7 +362,9 @@ class RobotiqGripper(AbstractGripper): # wait until not moving cur_obj = self._get_var(self.OBJ) - while RobotiqGripper.ObjectStatus(cur_obj) == RobotiqGripper.ObjectStatus.MOVING: + while ( + RobotiqGripper.ObjectStatus(cur_obj) == RobotiqGripper.ObjectStatus.MOVING + ): cur_obj = self._get_var(self.OBJ) # report the actual position and the object status diff --git a/python/smc/robots/grippers/rs485_robotiq/rs485_robotiq.py b/python/smc/robots/grippers/rs485_robotiq/rs485_robotiq.py new file mode 100644 index 0000000..e40f457 --- /dev/null +++ b/python/smc/robots/grippers/rs485_robotiq/rs485_robotiq.py @@ -0,0 +1,134 @@ +from ur_simple_control.util.grippers.abstract_gripper import AbstractGripper + +import socket +import time +import struct +import threading + + +class RobotiqHand(AbstractGripper): + def __init__(self): + self.so = None + self._cont = False + self._sem = None + self._heartbeat_th = None + self._max_position = 255 + self._min_position = 0 + + def _heartbeat_worker(self): + while self._cont: + self.status() + time.sleep(0.5) + + def connect(self, ip, port): + self.so = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.so.connect((ip, port)) + self._cont = True + self._sem = threading.Semaphore(1) + self._heartbeat_th = threading.Thread(target=self._heartbeat_worker) + self._heartbeat_th.start() + + def disconnect(self): + self._cont = False + self._heartbeat_th.join() + self._heartbeat_th = None + self.so.close() + self.so = None + self._sem = None + + def _calc_crc(self, command): + crc_registor = 0xFFFF + for data_byte in command: + tmp = crc_registor ^ data_byte + for _ in range(8): + if tmp & 1 == 1: + tmp = tmp >> 1 + tmp = 0xA001 ^ tmp + else: + tmp = tmp >> 1 + crc_registor = tmp + crc = bytearray(struct.pack("<H", crc_registor)) + return crc + + def send_command(self, command): + with self._sem: + crc = self._calc_crc(command) + data = command + crc + self.so.sendall(data) + time.sleep(0.001) + data = self.so.recv(1024) + return bytearray(data) + + def status(self): + command = bytearray(b"\x09\x03\x07\xd0\x00\x03") + return self.send_command(command) + + def reset(self): + command = bytearray(b"\x09\x10\x03\xe8\x00\x03\x06\x00\x00\x00\x00\x00\x00") + return self.send_command(command) + + def activate(self): + command = bytearray(b"\x09\x10\x03\xe8\x00\x03\x06\x01\x00\x00\x00\x00\x00") + return self.send_command(command) + + def wait_activate_complete(self): + while True: + data = self.status() + if data[5] != 0x00: + return data[3] + if data[3] == 0x31 and data[7] < 4: + return data[3] + + def adjust(self): + self.move(255, 64, 1) + (status, position, force) = self.wait_move_complete() + self._max_position = position + self.move(0, 64, 1) + (status, position, force) = self.wait_move_complete() + self._min_position = position + + def get_position_mm(self, position): + if position > self._max_position: + position = self._max_position + elif position < self._min_position: + position = self._min_position + position_mm = ( + 85.0 + * (self._max_position - position) + / (self._max_position - self._min_position) + ) + # print 'max=%d, min=%d, pos=%d pos_mm=%.1f' % (self._max_position, self._min_position, position, position_mm) + return position_mm + + def get_force_mA(self, force): + return 10.0 * force + + # position: 0x00...open, 0xff...close + # speed: 0x00...minimum, 0xff...maximum + # force: 0x00...minimum, 0xff...maximum + def move(self, position, speed, force): + # print('move hand') + command = bytearray(b"\x09\x10\x03\xe8\x00\x03\x06\x09\x00\x00\x00\x00\x00") + command[10] = position + command[11] = speed + command[12] = force + return self.send_command(command) + + # result: (status, position, force) + def wait_move_complete(self): + while True: + data = self.status() + if data[5] != 0x00: + return (-1, data[7], data[8]) + if data[3] == 0x79: + return (2, data[7], data[8]) + if data[3] == 0xB9: + return (1, data[7], data[8]) + if data[3] == 0xF9: + return (0, data[7], data[8]) + + def open(self): + self.move(0, 255, 255) + + def close(self): + self.move(255, 255, 0) diff --git a/python/smc/robots/implementations/__init__.py b/python/smc/robots/implementations/__init__.py index 98afce5..3871613 100644 --- a/python/smc/robots/implementations/__init__.py +++ b/python/smc/robots/implementations/__init__.py @@ -1,10 +1,17 @@ -# from .heron import RobotManagerHeron, SimulatedHeronRobotManager -from .heron import * -from .mir import * -from .yumi import * -from .mobile_yumi import * -from .ur5e import RobotManagerUR5e, SimulatedUR5eRobotManager +# from .mir import AbstractMirRobotManager +from .heron import SimulatedHeronRobotManager +from .mobile_yumi import SimulatedMobileYuMiRobotManager +from .ur5e import SimulatedUR5eRobotManager +from .yumi import SimulatedYuMiRobotManager + from importlib.util import find_spec if find_spec("rtde_control"): from .ur5e import RealUR5eRobotManager + +if find_spec("rclpy"): + from .yumi import RealYuMiRobotManager + from .mobile_yumi import RealMobileYumiRobotManager + +if find_spec("rclpy") and find_spec("rtde_control"): + from .heron import RealHeronRobotManager diff --git a/python/smc/robots/implementations/heron.py b/python/smc/robots/implementations/heron.py index 45ef20f..c8fd585 100644 --- a/python/smc/robots/implementations/heron.py +++ b/python/smc/robots/implementations/heron.py @@ -1,40 +1,21 @@ -from smc.robots.abstract_simulated_robot import AbstractSimulatedRobotManager +from smc.robots.abstract_simulated_robotmanager import AbstractSimulatedRobotManager from smc.robots.interfaces.force_torque_sensor_interface import ( ForceTorqueOnSingleArmWrist, ) from smc.robots.interfaces.mobile_base_interface import ( - MobileBaseInterface, get_mobile_base_model, ) from smc.robots.interfaces.whole_body_interface import SingleArmWholeBodyInterface from smc.robots.implementations.ur5e import get_model from argparse import Namespace -import time import numpy as np import pinocchio as pin -import hppfcl as fcl -class RobotManagerHeron( - ForceTorqueOnSingleArmWrist, SingleArmWholeBodyInterface, MobileBaseInterface +class AbstractHeronRobotManager( + ForceTorqueOnSingleArmWrist, SingleArmWholeBodyInterface ): - @property - def model(self): - return self._model - - @property - def data(self): - return self._data - - @property - def visual_model(self): - return self._visual_model - - @property - def collision_model(self): - return self._collision_model - def __init__(self, args): if args.debug_prints: print("RobotManagerHeron init") @@ -50,7 +31,9 @@ class RobotManagerHeron( super().__init__(args) -class SimulatedHeronRobotManager(AbstractSimulatedRobotManager, RobotManagerHeron): +class SimulatedHeronRobotManager( + AbstractSimulatedRobotManager, AbstractHeronRobotManager +): def __init__(self, args: Namespace): if args.debug_prints: print("SimulatedRobotManagerHeron init") @@ -72,7 +55,7 @@ class SimulatedHeronRobotManager(AbstractSimulatedRobotManager, RobotManagerHero self._wrench_bias = np.zeros(6) -class RealHeronRobotManager(RobotManagerHeron): +class RealHeronRobotManager(AbstractHeronRobotManager): def __init__(self, args): pass diff --git a/python/smc/robots/implementations/mir.py b/python/smc/robots/implementations/mir.py index 58944c1..acb22f5 100644 --- a/python/smc/robots/implementations/mir.py +++ b/python/smc/robots/implementations/mir.py @@ -1,12 +1,12 @@ -from smc.robots.robotmanager_abstract import AbstractRobotManager +from smc.robots.abstract_robotmanager import AbstractRobotManager -class MirRobotManager(AbstractRobotManager): +class AbstractMirRobotManager(AbstractRobotManager): pass -class RealMirRobotManager(MirRobotManager): - def sendVelocityCommand(self, v): +class RealMirRobotManager(AbstractMirRobotManager): + def sendVelocityCommandToReal(self, v): """ sendVelocityCommand ----- @@ -15,30 +15,26 @@ class RealMirRobotManager(MirRobotManager): here because this things depend on the arguments which are manager here (hence the class name RobotManager) """ - # y-direction is not possible on heron - qd[1] = 0 + # y-direction is not possible on mir + v[1] = 0 cmd_msg = msg.Twist() - cmd_msg.linear.x = qd[0] - cmd_msg.angular.z = qd[2] + cmd_msg.linear.x = v[0] + cmd_msg.angular.z = v[2] # print("about to publish", cmd_msg) self.publisher_vel_base.publish(cmd_msg) # good to keep because updating is slow otherwise # it's not correct, but it's more correct than not updating # self.q = pin.integrate(self.model, self.q, qd * self.dt) + raise NotImplementedError def stopRobot(self): - print("stopping via freedrive lel") - self._rtde_control.freedriveMode() - time.sleep(0.5) - self._rtde_control.endFreedriveMode() + raise NotImplementedError def setFreedrive(self): - self._rtde_control.freedriveMode() - raise NotImplementedError("freedrive function only written for ur5e") + raise NotImplementedError def unSetFreedrive(self): - self._rtde_control.endFreedriveMode() - raise NotImplementedError("freedrive function only written for ur5e") + raise NotImplementedError def mir_approximation(): diff --git a/python/smc/robots/implementations/mobile_yumi.py b/python/smc/robots/implementations/mobile_yumi.py index 40adca6..96f3499 100644 --- a/python/smc/robots/implementations/mobile_yumi.py +++ b/python/smc/robots/implementations/mobile_yumi.py @@ -1,25 +1,45 @@ -from smc.robots.robotmanager_abstract import AbstractRobotManager -from smc.robots.interfaces.dual_arm_interface import DualArmInterface +from smc.robots.abstract_simulated_robotmanager import AbstractSimulatedRobotManager +from smc.robots.interfaces.whole_body_interface import DualArmWholeBodyInterface from smc.robots.interfaces.mobile_base_interface import ( - MobileBaseInterface, get_mobile_base_model, ) from smc.robots.implementations.yumi import get_yumi_model import pinocchio as pin +from argparse import Namespace -class MobileYumiRobotManager(DualArmInterface): +class AbstractMobileYumiRobotManager(DualArmWholeBodyInterface): + def __init__(self, args): + if args.debug_prints: + print("MobileYuMiRobotManager init") self.args = args - self.model, self.collision_model, self.visual_model, self.data = ( - get_yumi_model() + self._model, self._collision_model, self._visual_model, self._data = ( + get_mobile_yumi_model() ) - self.r_ee_frame_id = self.model.getFrameId("robr_joint_7") - self.l_ee_frame_id = self.model.getFrameId("robl_joint_7") + + self._l_ee_frame_id = self.model.getFrameId("robl_tool0") + self._r_ee_frame_id = self.model.getFrameId("robr_tool0") + self._MAX_ACCELERATION = 1.7 # const + self._MAX_QD = 3.14 # const + + self._mode: DualArmWholeBodyInterface.control_mode = ( + DualArmWholeBodyInterface.control_mode.whole_body + ) + super().__init__(args) + + +class SimulatedMobileYuMiRobotManager( + AbstractSimulatedRobotManager, AbstractMobileYuMiRobotManager +): + def __init__(self, args: Namespace): + if args.debug_prints: + print("SimulatedMobileYuMiRobotManager init") + super().__init__(args) -class RealMobileYumiRobotManager(MobileYumiRobotManager): +class RealMobileYumiRobotManager(AbstractMobileYumiRobotManager): def __init__(self, args): super().__init__(args) diff --git a/python/smc/robots/implementations/ur5e.py b/python/smc/robots/implementations/ur5e.py index 7a3ec98..ea619bd 100644 --- a/python/smc/robots/implementations/ur5e.py +++ b/python/smc/robots/implementations/ur5e.py @@ -3,8 +3,8 @@ from smc.robots.interfaces.force_torque_sensor_interface import ( ) from smc.robots.grippers.robotiq.robotiq_gripper import RobotiqGripper from smc.robots.grippers.on_robot.twofg import TwoFG -from smc.robots.robotmanager_abstract import AbstractRealRobotManager -from smc.robots.abstract_simulated_robot import AbstractSimulatedRobotManager +from smc.robots.abstract_robotmanager import AbstractRealRobotManager +from smc.robots.abstract_simulated_robotmanager import AbstractSimulatedRobotManager import numpy as np import pinocchio as pin @@ -21,24 +21,7 @@ if find_spec("rtde_control"): # NOTE: this one assumes a jaw gripper -class RobotManagerUR5e(ForceTorqueOnSingleArmWrist): - - @property - def model(self): - return self._model - - @property - def data(self): - return self._data - - @property - def visual_model(self): - return self._visual_model - - @property - def collision_model(self): - return self._collision_model - +class AbstractUR5eRobotManager(ForceTorqueOnSingleArmWrist): def __init__(self, args): if args.debug_prints: print("RobotManagerUR5e init") @@ -49,7 +32,9 @@ class RobotManagerUR5e(ForceTorqueOnSingleArmWrist): super().__init__(args) -class SimulatedUR5eRobotManager(AbstractSimulatedRobotManager, RobotManagerUR5e): +class SimulatedUR5eRobotManager( + AbstractSimulatedRobotManager, AbstractUR5eRobotManager +): def __init__(self, args): if args.debug_prints: print("SimulatedRobotManagerUR5e init") @@ -71,7 +56,7 @@ class SimulatedUR5eRobotManager(AbstractSimulatedRobotManager, RobotManagerUR5e) self._wrench_bias = np.zeros(6) -class RealUR5eRobotManager(RobotManagerUR5e, AbstractRealRobotManager): +class RealUR5eRobotManager(AbstractUR5eRobotManager, AbstractRealRobotManager): def __init__(self, args: argparse.Namespace): if args.debug_prints: print("RealUR5eRobotManager init") @@ -185,7 +170,7 @@ class RealUR5eRobotManager(RobotManagerUR5e, AbstractRealRobotManager): raise NotImplementedError("freedrive function only written for ur5e") -def get_model()-> tuple[pin.Model, pin.GeometryModel, pin.GeometryModel, pin.Data]: +def get_model() -> tuple[pin.Model, pin.GeometryModel, pin.GeometryModel, pin.Data]: urdf_path_relative = files("smc.robots.robot_descriptions.urdf").joinpath( "ur5e_with_robotiq_hande_FIXED_PATHS.urdf" diff --git a/python/smc/robots/implementations/yumi.py b/python/smc/robots/implementations/yumi.py index be195f2..82194c4 100644 --- a/python/smc/robots/implementations/yumi.py +++ b/python/smc/robots/implementations/yumi.py @@ -1,4 +1,5 @@ -from smc.robots.abstract_simulated_robot import AbstractSimulatedRobotManager +from smc.robots.abstract_robotmanager import AbstractRealRobotManager +from smc.robots.abstract_simulated_robotmanager import AbstractSimulatedRobotManager from smc.robots.interfaces.dual_arm_interface import DualArmInterface from argparse import Namespace @@ -8,26 +9,9 @@ import pinocchio as pin class AbstractYuMiRobotManager(DualArmInterface): - - @property - def model(self): - return self._model - - @property - def data(self): - return self._data - - @property - def visual_model(self): - return self._visual_model - - @property - def collision_model(self): - return self._collision_model - def __init__(self, args): if args.debug_prints: - print("RobotManagerYuMi init") + print("YuMiRobotManager init") self._model, self._collision_model, self._visual_model, self._data = ( get_yumi_model() ) @@ -36,7 +20,9 @@ class AbstractYuMiRobotManager(DualArmInterface): self._MAX_ACCELERATION = 1.7 # const self._MAX_QD = 3.14 # const - self._mode: DualArmInterface.control_mode = DualArmInterface.control_mode.both + self._mode: DualArmInterface.control_mode = ( + DualArmInterface.control_mode.both_arms + ) super().__init__(args) @@ -49,6 +35,10 @@ class SimulatedYuMiRobotManager( super().__init__(args) +class RealYuMiRobotManager(AbstractRealRobotManager, AbstractYuMiRobotManager): + pass + + def get_yumi_model() -> ( tuple[pin.Model, pin.GeometryModel, pin.GeometryModel, pin.Data] ): diff --git a/python/smc/robots/interfaces/dual_arm_interface.py b/python/smc/robots/interfaces/dual_arm_interface.py index 468ec8d..5bd709a 100644 --- a/python/smc/robots/interfaces/dual_arm_interface.py +++ b/python/smc/robots/interfaces/dual_arm_interface.py @@ -1,6 +1,5 @@ from smc.robots.interfaces.single_arm_interface import SingleArmInterface -import abc import numpy as np import pinocchio as pin from argparse import Namespace @@ -16,7 +15,9 @@ class DualArmInterface(SingleArmInterface): - r stands for right """ - control_mode = Enum("mode", [("both", 3), ("left", 4), ("right", 5)]) + control_mode = Enum( + "mode", [("both_arms", 3), ("left_arm_only", 4), ("right_arm_only", 5)] + ) # NOTE: you need to fill in the specific names from the urdf here for your robot # (better than putting in a magic number) @@ -46,41 +47,41 @@ class DualArmInterface(SingleArmInterface): @property def q(self) -> np.ndarray: # NOTE: left should be on the left side of the joint values array - if self._mode.value == 4: + if self._mode == self.control_mode.left_arm_only: return self._q[: self.model.nq // 2] - # NOTE: right should be on the left side of the joint values array - if self._mode.value == 5: + # NOTE: right should be on the right side of the joint values array + if self._mode == self.control_mode.right_arm_only: return self._q[self.model.nq // 2 :] return self._q.copy() @property def nq(self): - if self._mode.value == 4: + if self._mode == self.control_mode.left_arm_only: return self.model.nq // 2 - if self._mode.value == 5: + if self._mode == self.control_mode.right_arm_only: return self.model.nq // 2 return self.model.nq @property def v(self) -> np.ndarray: - if self._mode.value == 4: + if self._mode == self.control_mode.left_arm_only: return self._v[: self.model.nv // 2] - if self._mode.value == 5: - return self._v[: self.model.nv // 2] + if self._mode == self.control_mode.right_arm_only: + return self._v[self.model.nv // 2 :] return self._v.copy() @property def nv(self): - if self._mode.value == 4: + if self._mode == self.control_mode.left_arm_only: return self.model.nv // 2 - if self._mode.value == 5: + if self._mode == self.control_mode.right_arm_only: return self.model.nv // 2 return self.model.nv @@ -170,7 +171,7 @@ class DualArmInterface(SingleArmInterface): # TODO: put back in when python3.12 will be safe to use # @override - def forwardKinematics(self): + def forwardKinematics(self) -> None: pin.forwardKinematics( self.model, self.data, @@ -189,16 +190,40 @@ class DualArmInterface(SingleArmInterface): # the other arm filled with zeros J_left = pin.computeFrameJacobian( self.model, self.data, self._q, self._l_ee_frame_id - )[:, : self.model.nq // 2] - if self._mode.value == 4: + )[:, : self.model.nv // 2] + if self._mode == self.control_mode.left_arm_only: return J_left J_right = pin.computeFrameJacobian( self.model, self.data, self._q, self._r_ee_frame_id - )[:, self.model.nq // 2 :] - if self._mode.value == 5: + )[:, self.model.nv // 2 :] + if self._mode == self.control_mode.right_arm_only: return J_right - J = np.zeros((12, self.nq)) - J[:6, : self.model.nq // 2] = J_left - J[6:, self.model.nq // 2 :] = J_right + + J = np.zeros((12, self.nv)) + J[:6, : self.model.nv // 2] = J_left + J[6:, self.model.nv // 2 :] = J_right return J + + # TODO: put back in when python3.12 will be safe to use + # @override + def sendVelocityCommand(self, v) -> None: + """ + sendVelocityCommand + ------------------- + 1) saturate the command to comply with hardware limits or smaller limits you've set + 2) send it via the particular robot's particular communication interface + """ + assert type(v) == np.ndarray + + if self._mode == self.control_mode.left_arm_only: + v = np.zeros(self.model.nv) + v[3 : self.model.nv // 2] = v + + if self._mode == self.control_mode.right_arm_only: + v = np.zeros(self.model.nv) + v[self.model.nv // 2 :] = v + + assert len(v) == self.model.nv + v = np.clip(v, -1 * self._max_v, self._max_v) + self.sendVelocityCommandToReal(v) diff --git a/python/smc/robots/interfaces/mobile_base_interface.py b/python/smc/robots/interfaces/mobile_base_interface.py index 1ad00cb..56bd0dc 100644 --- a/python/smc/robots/interfaces/mobile_base_interface.py +++ b/python/smc/robots/interfaces/mobile_base_interface.py @@ -1,4 +1,4 @@ -from smc.robots.robotmanager_abstract import AbstractRobotManager +from smc.robots.abstract_robotmanager import AbstractRobotManager import numpy as np import pinocchio as pin diff --git a/python/smc/robots/interfaces/single_arm_interface.py b/python/smc/robots/interfaces/single_arm_interface.py index ce49432..4f03771 100644 --- a/python/smc/robots/interfaces/single_arm_interface.py +++ b/python/smc/robots/interfaces/single_arm_interface.py @@ -1,6 +1,6 @@ import numpy as np import pinocchio as pin -from smc.robots.robotmanager_abstract import AbstractRobotManager +from smc.robots.abstract_robotmanager import AbstractRobotManager class SingleArmInterface(AbstractRobotManager): diff --git a/python/smc/robots/interfaces/whole_body_interface.py b/python/smc/robots/interfaces/whole_body_interface.py index e88c1d9..4179f0e 100644 --- a/python/smc/robots/interfaces/whole_body_interface.py +++ b/python/smc/robots/interfaces/whole_body_interface.py @@ -44,46 +44,40 @@ class SingleArmWholeBodyInterface(SingleArmInterface, MobileBaseInterface): # @override @property def q(self) -> np.ndarray: - # 3if self._mode.value == 1: - # 3 return self._q.copy() - - if self._mode.value == 2: + if self._mode == self.control_mode.base_only: return self._q[:4] - if self._mode.value == 3: + if self._mode == self.control_mode.arm_only: return self._q[4:] return self._q.copy() @property def nq(self): - if self._mode.value == 2: + if self._mode == self.control_mode.base_only: return 4 - if self._mode.value == 3: + if self._mode == self.control_mode.arm_only: return self.model.nq - 4 return self.model.nq @property def v(self) -> np.ndarray: - # 3if self._mode.value == 1: - # 3 return self._q.copy() - - if self._mode.value == 2: + if self._mode == self.control_mode.base_only: return self._v[:3] - if self._mode.value == 3: + if self._mode == self.control_mode.arm_only: return self._v[3:] return self._v.copy() @property def nv(self): - if self._mode.value == 2: + if self._mode == self.control_mode.base_only: return 3 - if self._mode.value == 3: + if self._mode == self.control_mode.arm_only: return self.model.nv - 3 return self.model.nv @@ -107,13 +101,10 @@ class SingleArmWholeBodyInterface(SingleArmInterface, MobileBaseInterface): J_full = pin.computeFrameJacobian( self.model, self.data, self._q, self._ee_frame_id ) - # if self._mode.value == 1: - # return J_full - - if self._mode.value == 2: + if self._mode == self.control_mode.base_only: return J_full[:, :3] - if self._mode.value == 3: + if self._mode == self.control_mode.arm_only: return J_full[:, 3:] return J_full @@ -127,16 +118,176 @@ class SingleArmWholeBodyInterface(SingleArmInterface, MobileBaseInterface): 1) saturate the command to comply with hardware limits or smaller limits you've set 2) send it via the particular robot's particular communication interface """ - if self._mode.value == 2: + assert type(v) == np.ndarray + + if self._mode == self.control_mode.base_only: v = np.hstack((v, np.zeros(self.model.nv - 3))) - if self._mode.value == 3: + if self._mode == self.control_mode.arm_only: v = np.hstack((np.zeros(3), v)) - assert type(v) == np.ndarray + assert len(v) == self.model.nv v = np.clip(v, -1 * self._max_v, self._max_v) self.sendVelocityCommandToReal(v) class DualArmWholeBodyInterface(DualArmInterface, MobileBaseInterface): - pass + control_mode = Enum( + "mode", + [ + ("whole_body", 1), + ("base_only", 2), + ("both_arms_only", 3), + ("left_arm_only", 4), + ("right_arm_only", 5), + ], + ) + + def __init__(self, args: Namespace): + if args.debug_prints: + print("DualArmWholeBodyInterface init") + self._mode: DualArmWholeBodyInterface.control_mode + super().__init__(args) + + # TODO: put back in when python3.12 will be safe to use + # @override + @property + def q(self) -> np.ndarray: + if self._mode == self.control_mode.base_only: + return self._q[:4] + + if self._mode == self.control_mode.both_arms_only: + return self._q[4:] + + # NOTE: left should be on the left side of the joint values array + if self._mode == self.control_mode.left_arm_only: + return self._q[4 : (self.model.nq - 4) // 2] + + # NOTE: right should be on the right side of the joint values array + if self._mode == self.control_mode.right_arm_only: + return self._q[4 + (self.model.nq - 4) // 2 :] + + return self._q.copy() + + @property + def nq(self): + if self._mode == self.control_mode.base_only: + return 4 + + if self._mode == self.control_mode.both_arms_only: + return self.model.nq - 4 + + if self._mode == self.control_mode.left_arm_only: + return (self.model.nq - 4) // 2 + + if self._mode == self.control_mode.right_arm_only: + return (self.model.nq - 4) // 2 + return self.model.nq + + @property + def v(self) -> np.ndarray: + if self._mode == self.control_mode.base_only: + return self._v[:3] + + if self._mode == self.control_mode.both_arms_only: + return self._v[3:] + + if self._mode == self.control_mode.left_arm_only: + return self._v[3 : (self.model.nv - 3) // 2] + + if self._mode == self.control_mode.right_arm_only: + return self._v[3 + (self.model.nv - 3) // 2 :] + + return self._v.copy() + + @property + def nv(self) -> int: + if self._mode == self.control_mode.base_only: + return 3 + + if self._mode == self.control_mode.both_arms_only: + return self.model.nv - 3 + + if self._mode == self.control_mode.left_arm_only: + return (self.model.nv - 3) // 2 + + if self._mode == self.control_mode.right_arm_only: + return (self.model.nv - 3) // 2 + + return self.model.nv + + # TODO: put back in when python3.12 will be safe to use + # @override + def forwardKinematics(self) -> None: + pin.forwardKinematics( + self.model, + self.data, + self._q, + ) + pin.updateFramePlacement(self.model, self.data, self._base_frame_id) + self._T_w_b = self.data.oMf[self._base_frame_id].copy() + pin.updateFramePlacement(self.model, self.data, self._l_ee_frame_id) + pin.updateFramePlacement(self.model, self.data, self._r_ee_frame_id) + self._T_w_l = self.data.oMf[self._l_ee_frame_id].copy() + self._T_w_r = self.data.oMf[self._r_ee_frame_id].copy() + self.T_w_abs # will update _T_w_abs in the process + + # TODO: put back in when python3.12 will be safe to use + # @override + def getJacobian(self) -> np.ndarray: + J_left = pin.computeFrameJacobian( + self.model, self.data, self._q, self._l_ee_frame_id + )[:, 3 : (self.model.nv - 3) // 2] + if self._mode == self.control_mode.left_arm_only: + return J_left + + # the base jacobian can be extracted from either left or right frame - + # since it's a body jacobian both have to be the same at the base. + # for efficiency of course it would be best to construct it in place, + # but who cares if it runs on time either way. + if self._mode == self.control_mode.base_only: + return J_left[:, :3] + + J_right = pin.computeFrameJacobian( + self.model, self.data, self._q, self._r_ee_frame_id + )[:, 3 + (self.model.nv - 3) // 2 :] + if self._mode == self.control_mode.right_arm_only: + return J_right + + J_full = np.zeros((12, self.nv)) + J_full[:6, : (self.model.nv - 3) // 2] = J_left + J_full[6:, (self.model.nv - 3) // 2 :] = J_right + + if self._mode == self.control_mode.both_arms_only: + return J_full[:, 3:] + + return J_full + + # TODO: put back in when python3.12 will be safe to use + # @override + def sendVelocityCommand(self, v) -> None: + """ + sendVelocityCommand + ------------------- + 1) saturate the command to comply with hardware limits or smaller limits you've set + 2) send it via the particular robot's particular communication interface + """ + assert type(v) == np.ndarray + if self._mode == self.control_mode.base_only: + v = np.hstack((v, np.zeros(self.model.nv - 3))) + + if self._mode == self.control_mode.both_arms_only: + v = np.hstack((np.zeros(3), v)) + + if self._mode == self.control_mode.left_arm_only: + v = np.zeros(self.model.nv) + v[3 : (self.model.nv - 3) // 2] = v + + if self._mode == self.control_mode.right_arm_only: + v = np.zeros(self.model.nv) + v[3 + (self.model.nv - 3) // 2 :] = v + + assert len(v) == self.model.nv + v = np.clip(v, -1 * self._max_v, self._max_v) + + self.sendVelocityCommandToReal(v) diff --git a/python/smc/robots/utils.py b/python/smc/robots/utils.py index f55a561..066d96e 100644 --- a/python/smc/robots/utils.py +++ b/python/smc/robots/utils.py @@ -267,6 +267,13 @@ def getRobotFromArgs(args: argparse.Namespace) -> AbstractRobotManager: # return RealYuMiRobotManager(args) else: return SimulatedYuMiRobotManager(args) + if args.robot == "myumi": + if args.real: + pass + # TODO: finish it + # return RealMobileYuMiRobotManager(args) + else: + return SimulatedMobileYuMiRobotManager(args) raise NotImplementedError( f"robot {args.robot} is not supported! run the script you ran with --help to see what's available" ) -- GitLab