From fb4ad5affebf503d84ed5763419fccdbcc27dee9 Mon Sep 17 00:00:00 2001 From: m-guberina <gubi.guberina@gmail.com> Date: Tue, 11 Mar 2025 14:27:10 +0100 Subject: [PATCH] i am a moron with frames. there goes 4 hours on me being a moron. but its good now --- examples/ros/dummy_cmds_pub.py | 184 ------------ examples/ros/myumi_cart_pulling.py | 119 ++++++++ examples/ros/navigation_example.py | 254 +---------------- .../ros/{ => old_and_irrelevant}/ros2_clik.py | 0 .../smc_heron_node.py} | 0 .../smc_yumi_challenge.py | 0 examples/ros/smc_yumi_node.py | 267 ++++++++++++++++++ .../cartesian_space_trajectory_following.py | 3 + .../smc/robots/implementations/mobile_yumi.py | 1 + .../meshcat_viewer_wrapper/visualizer.py | 2 +- 10 files changed, 393 insertions(+), 437 deletions(-) create mode 100644 examples/ros/myumi_cart_pulling.py rename examples/ros/{ => old_and_irrelevant}/ros2_clik.py (100%) rename examples/ros/{smc_node.py => old_and_irrelevant/smc_heron_node.py} (100%) rename examples/ros/{ => old_and_irrelevant}/smc_yumi_challenge.py (100%) create mode 100644 examples/ros/smc_yumi_node.py diff --git a/examples/ros/dummy_cmds_pub.py b/examples/ros/dummy_cmds_pub.py index 2a42f46..0a111d8 100755 --- a/examples/ros/dummy_cmds_pub.py +++ b/examples/ros/dummy_cmds_pub.py @@ -46,190 +46,6 @@ import pinocchio as pin import numpy as np -# you will need to manually set everything here :( -def get_args(): - parser = getMinimalArgParser() - parser.description = "Run closed loop inverse kinematics \ - of various kinds. Make sure you know what the goal is before you run!" - parser = getClikArgs(parser) - parser = get_OCP_args(parser) - parser.add_argument( - "--ros-namespace", - type=str, - default="maskinn", - help="you MUST put in ros namespace you're using", - ) - parser.add_argument( - "--unreal", - action=argparse.BooleanOptionalAction, - default=False, - help="publish or not", - ) - parser.add_argument( - "--sim", - type=float, - help="if in sim you need to set sim time, otherwise not", - default=3.0, - ) - # parser.add_argument('--ros-args', action='extend', nargs="+") - # parser.add_argument('-r', action='extend', nargs="+") - parser.add_argument("--ros-args", action="append", nargs="*") - parser.add_argument("-r", action="append", nargs="*") - parser.add_argument("-p", action="append", nargs="*") - args = parser.parse_args() - args.robot = "myumi" - args.plotter = False - # args.plotter = True - args.real = True - args.ctrl_freq = 50 - # NOTE: does not work due to ctrl-c being overriden - args.save_log = True - args.run_name = "pls" - args.index_runs = True - return args - - -class DummyNode(Node): - def __init__( - self, args, robot: RealMobileYumiRobotManager, loop_manager: ControlLoopManager - ): - super().__init__("dummy_cmds_pub_node") - self.set_parameters( - [rclpy.parameter.Parameter("use_sim_time", rclpy.Parameter.Type.BOOL, True)] - ) - self.wait_for_sim_time() - self.robot = robot - self.loop_manager = loop_manager - self.args = args - # give me the latest thing even if there wasn't an update - qos_prof = rclpy.qos.QoSProfile( - reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, - durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, - history=rclpy.qos.HistoryPolicy.KEEP_LAST, - depth=1, - ) - - self._cb = ReentrantCallbackGroup() - - self._ns = args.ros_namespace - # self._ns = get_rosified_name(self.get_namespace()) - - self.get_logger().info(f"### Starting dummy example under namespace {self._ns}") - - self.empty_msg = JointState() - for i in range(29): - self.empty_msg.velocity.append(0.0) - - # self._T = 1.0 - # self._A = 0.2 - # self._t = 0.0 - self._dt = 1 / self.args.ctrl_freq - - self._pub_timer = self.create_timer(self._dt, self.send_cmd) - - ######################################################## - # connect to smc - ########################################################### - - # self.sub_amcl = self.create_subscription(PoseWithCovarianceStamped, '/amcl_pose', self.pose_callback, qos_prof) - - # this qos is incompatible - # self.sub_joint_states = self.create_subscription(JointState, f"{self._ns}/joint_states", self.callback_arms_state, qos_prof) - - qos_prof2 = rclpy.qos.QoSProfile( - # reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, - # durability = rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, - # history = rclpy.qos.HistoryPolicy.KEEP_LAST, - depth=1 - ) - self.sub_joint_states = self.create_subscription( - JointState, - f"{self._ns}/platform/joint_states", - self.callback_arms_state, - qos_prof2, - ) - self._cmd_pub = self.create_publisher( - JointState, f"{self._ns}/platform/joints_cmd", 1 - ) - self.robot.set_publisher_joints_cmd(self._cmd_pub) - self.sub_base_odom = self.create_subscription( - Odometry, f"{self._ns}/platform/odometry", self.callback_base_odom, 1 - ) - self.get_logger().info(f"{self._ns}/platform/joints_cmd") - self.get_logger().info(f"{self._ns}/platform/odometry") - self.get_logger().info(f"{self._ns}/platform/joint_states") - - ########## - # base is a twist which is constructed from the first 3 msg.velocities - # 0 -> twist.linear.x - # 1 -> twist.linear.y - # 2 -> twist.angular.z - # left arm indeces are 15-21 (last included) - # right arm indeces are 22-28 (last included) - - def send_cmd(self): - breakFlag = self.loop_manager.run_one_iter(self.loop_manager.current_iteration) - if breakFlag: - self.get_logger().info("control loop finished") - - msg = self.empty_msg - for i in range(3): - msg.velocity[i] = self.robot._v[i] - for i in range(15, 29): - msg.velocity[i] = self.robot._v[i - 12] - - # msg.header.stamp = Time().to_msg() - ## msg.header.stamp = self.get_clock().now().to_msg() - msg.header.stamp = self.get_clock().now().to_msg() - # msg.velocity[21] = 1.0 - # msg.velocity[28] = 1.0 - self._cmd_pub.publish(msg) - - # self.get_logger().info("SEND_JOINTS_CMD ON TIMER" + str(msg)) - # self.get_logger().info(str(Time())) - - def callback_base_odom(self, msg: Odometry): - # self.robot._v[0] = msg.twist.twist.linear.x - # self.robot._v[1] = msg.twist.twist.linear.y - ## TODO: check that z can be used as cos(theta) and w as sin(theta) - ## (they could be defined some other way or theta could be offset of something) - # self.robot._v[2] = msg.twist.twist.angular.z - # self.robot._v[3] = 0 # for consistency - costh2 = msg.pose.pose.orientation.w - sinth2 = np.linalg.norm( - [ - msg.pose.pose.orientation.x, - msg.pose.pose.orientation.y, - msg.pose.pose.orientation.z, - ] - ) - th = 2 * np.arctan2(sinth2, costh2) - if np.linalg.norm(self.init_odom) < 1e-4: - self.init_odom[0] = msg.pose.pose.position.x - self.init_odom[1] = msg.pose.pose.position.y - self.init_odom[2] = th - if self.args.unreal: - self.robot._q[0] = msg.pose.pose.position.x - self.init_odom[0] - self.robot._q[1] = msg.pose.pose.position.y - self.init_odom[1] - self.robot._q[2] = np.cos(th - self.init_odom[2]) - self.robot._q[3] = np.sin(th - self.init_odom[2]) - # self.get_logger().info("ODOM CALLBACK" + str(msg)) - - def callback_arms_state(self, msg: JointState): - if self.loop_manager.current_iteration < 200 or self.args.unreal: - self.robot._q[-14:-7] = msg.position[-14:-7] - self.robot._q[-7:] = msg.position[-7:] - # self.get_logger().info("JOINT_STATE CALLBACK" + str(msg)) - # self.get_logger().info(str(self.loop_manager.current_iteration)) - - def wait_for_sim_time(self): - """Wait for the /clock topic to start publishing.""" - self.get_logger().info("Waiting for simulated time to be active...") - while not self.get_clock().now().nanoseconds > 0: - rclpy.spin_once(self, timeout_sec=0.1) - self.get_logger().info("Simulated time is now active!") - - # def main(args=None): def main(args=None): # evil and makes args unusable but what can you do diff --git a/examples/ros/myumi_cart_pulling.py b/examples/ros/myumi_cart_pulling.py new file mode 100644 index 0000000..073bc7b --- /dev/null +++ b/examples/ros/myumi_cart_pulling.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +import rclpy +from rclpy.executors import MultiThreadedExecutor +from smc_yumi_node import get_args, SMCMobileYuMiNode + +import numpy as np +import argparse +from smc.robots.implementations.mobile_yumi import RealMobileYumiRobotManager +from smc import getMinimalArgParser +from smc.control.control_loop_manager import ControlLoopManager +from smc.control.cartesian_space import getClikArgs, getIKSolver +from smc.control.joint_space.joint_space_point_to_point import moveJP +from smc.control.cartesian_space.cartesian_space_point_to_point import ( + controlLoopClik, + moveL, + moveLDualArm, + moveL, +) +from smc.control.optimal_control.croco_point_to_point.mpc.base_reference_mpc import ( + CrocoBaseP2PMPC, +) +from smc.control.optimal_control.croco_path_following.mpc.base_reference_mpc import ( + CrocoBasePathFollowingMPC, +) +from smc.util.draw_path import drawPath +from smc.control.cartesian_space.cartesian_space_trajectory_following import ( + cartesianPathFollowingWithPlanner, +) + +import pinocchio as pin +import matplotlib +from functools import partial + +import numpy as np + + +def fixedPathPlanner( + path_parameter: list[int], path2D: np.ndarray, p_base: np.ndarray +) -> np.ndarray: + """ + fixedPathPlanner + ---------------- + we can assume robot is following this path and that it started the following controller at the first point. + to make the controller make sense (without a sense of timing), we need to find the closest point on the path, + and delete the already traversed portion of the path. + the assumption is that the controller constructs a trajectory from the path, starting with the first point. + + the cleanest way to get this done is associate robot's current position with the path parameter s, + but i'll just implement it as quickly as possible, not in the most correct or the most efficient manner. + + NOTE: path is (N,3) in shape, last column is 0, because subotimal code structure + """ + + p_base_3 = np.array([p_base[0], p_base[1], 0.0]) + distances = np.linalg.norm(p_base_3 - path2D, axis=1) + index = np.argmin(distances) + if (index - path_parameter[0]) > 0 and (index - path_parameter[0]) < 10: + path_parameter[0] += 1 + path2D = path2D[path_parameter[0] :] + return path2D + + +# def main(args=None): +def main(args=None): + # evil and makes args unusable but what can you do + args_smc = get_args() + assert args_smc.robot == "myumi" + robot = RealMobileYumiRobotManager(args_smc) + # you can't do terminal input with ros + # goal = robot.defineGoalPointCLI() + T_w_absgoal = pin.SE3.Identity() + T_w_absgoal.translation = np.array([0.5, 0.0, 0.5]) + T_absgoal_l = pin.SE3.Identity() + T_absgoal_l.translation = np.array([0.0, 0.2, 0.0]) + T_absgoal_r = pin.SE3.Identity() + T_absgoal_r.translation = np.array([0.0, -0.2, 0.0]) + # loop_manager = CrocoIKMPC(args, robot, goal, run=False) + # robot._mode = robot.control_mode.upper_body + # loop_manager = moveJP(robot._comfy_configuration, args_smc, robot, run=False) + # loop_manager = moveLDualArm( + # args_smc, robot, T_w_absgoal, T_absgoal_l, T_absgoal_r, run=False + # ) + # loop_manager = moveL(args_smc, robot, T_w_absgoal, run=False) + robot._mode = robot.control_mode.whole_body + robot.setInitialPose() + robot._step() + if not args_smc.draw_new: + pixel_path_file_path = "./parameters/path_in_pixels.csv" + path2D = np.genfromtxt(pixel_path_file_path, delimiter=",") + else: + matplotlib.use("tkagg") + path2D = drawPath(args_smc) + matplotlib.use("qtagg") + path2D[:, 0] = path2D[:, 0] * args_smc.map_width + path2D[:, 1] = path2D[:, 1] * args_smc.map_height + path2D = np.hstack((path2D, np.zeros((len(path2D), 1)))) + robot.updateViz({"fixed_path": path2D}) + path_planner = partial(fixedPathPlanner, [0], path2D) + + # x0 = np.concatenate([robot.q, robot.v]) + # loop_manager = CrocoBasePathFollowingMPC( + # args_smc, robot, x0, path_planner, run=False + # ) + robot._mode = robot.control_mode.base_only + loop_manager = cartesianPathFollowingWithPlanner( + args_smc, robot, path_planner, 0.0, run=False + ) + + rclpy.init(args=args) + + executor = MultiThreadedExecutor() + node = SMCMobileYuMiNode(args_smc, robot, loop_manager) + executor.add_node(node) + executor.spin() + + +if __name__ == "__main__": + main() diff --git a/examples/ros/navigation_example.py b/examples/ros/navigation_example.py index f1c225a..c7439db 100644 --- a/examples/ros/navigation_example.py +++ b/examples/ros/navigation_example.py @@ -1,27 +1,9 @@ #!/usr/bin/env python3 -# -# Copyright (c) 2024, ABB Schweiz AG -# All rights reserved. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR -# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND -# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY -# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import rclpy -from rclpy.node import Node from rclpy.executors import MultiThreadedExecutor -from geometry_msgs.msg import Twist, PoseWithCovarianceStamped -from sensor_msgs.msg import JointState -from nav_msgs.msg import Odometry -from abb_python_utilities.names import get_rosified_name -from rclpy.callback_groups import ReentrantCallbackGroup -from rclpy.time import Time +from smc_yumi_node import get_args, SMCMobileYuMiNode import numpy as np import argparse @@ -43,8 +25,6 @@ from smc.control.optimal_control.croco_path_following.mpc.base_reference_mpc imp CrocoBasePathFollowingMPC, ) from smc.util.draw_path import drawPath -from smc.control.optimal_control.util import get_OCP_args -from smc.robots.utils import getRobotFromArgs from smc.control.cartesian_space.cartesian_space_trajectory_following import ( cartesianPathFollowingWithPlanner, ) @@ -82,236 +62,6 @@ def fixedPathPlanner( return path2D -# you will need to manually set everything here :( -def get_args(): - parser = getMinimalArgParser() - parser.description = "Run closed loop inverse kinematics \ - of various kinds. Make sure you know what the goal is before you run!" - parser = getClikArgs(parser) - parser = get_OCP_args(parser) - parser.add_argument( - "--ros-namespace", - type=str, - default="maskinn", - help="you MUST put in ros namespace you're using", - ) - parser.add_argument( - "--unreal", - action=argparse.BooleanOptionalAction, - default=False, - help="publish or not", - ) - parser.add_argument( - "--draw-new", - action=argparse.BooleanOptionalAction, - help="are you drawing a new path or reusing the previous one", - default=False, - ) - parser.add_argument( - "--map-width", - type=float, - help="width of the map in meters (x-axis) - only used for drawing of the path", - default=3.0, - ) - parser.add_argument( - "--map-height", - type=float, - help="height of the map in meters (y-axis) - only used for drawing of the path", - default=3.0, - ) - parser.add_argument( - "--sim", - action=argparse.BooleanOptionalAction, - help="if in sim you need to set sim time, otherwise not", - default=True, - ) - # parser.add_argument('--ros-args', action='extend', nargs="+") - # parser.add_argument('-r', action='extend', nargs="+") - parser.add_argument("--ros-args", action="append", nargs="*") - parser.add_argument("-r", action="append", nargs="*") - parser.add_argument("-p", action="append", nargs="*") - args = parser.parse_args() - args.robot = "myumi" - args.plotter = False - # args.plotter = True - args.real = True - args.ctrl_freq = 50 - # NOTE: does not work due to ctrl-c being overriden - args.save_log = True - args.run_name = "nav_pls" - args.index_runs = True - return args - - -class DummyNode(Node): - def __init__( - self, args, robot: RealMobileYumiRobotManager, loop_manager: ControlLoopManager - ): - super().__init__("dummy_cmds_pub_node") - if args.sim: - self.set_parameters( - [ - rclpy.parameter.Parameter( - "use_sim_time", rclpy.Parameter.Type.BOOL, True - ) - ] - ) - self.wait_for_sim_time() - self.robot = robot - self.loop_manager = loop_manager - self.args = args - # give me the latest thing even if there wasn't an update - qos_prof = rclpy.qos.QoSProfile( - reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, - durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, - history=rclpy.qos.HistoryPolicy.KEEP_LAST, - depth=1, - ) - - self._cb = ReentrantCallbackGroup() - - self._ns = args.ros_namespace - # self._ns = get_rosified_name(self.get_namespace()) - - self.get_logger().info(f"### Starting dummy example under namespace {self._ns}") - - self.empty_msg = JointState() - for i in range(29): - self.empty_msg.velocity.append(0.0) - - # self._T = 1.0 - # self._A = 0.2 - # self._t = 0.0 - self._dt = 1 / self.args.ctrl_freq - - self._pub_timer = self.create_timer(self._dt, self.send_cmd) - - ######################################################## - # connect to smc - ########################################################### - - # self.sub_amcl = self.create_subscription(PoseWithCovarianceStamped, '/amcl_pose', self.pose_callback, qos_prof) - - # this qos is incompatible - # self.sub_joint_states = self.create_subscription(JointState, f"{self._ns}/joint_states", self.callback_arms_state, qos_prof) - - qos_prof2 = rclpy.qos.QoSProfile( - # reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, - # durability = rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, - # history = rclpy.qos.HistoryPolicy.KEEP_LAST, - depth=1 - ) - self.sub_joint_states = self.create_subscription( - JointState, - f"{self._ns}/platform/joint_states", - self.callback_arms_state, - qos_prof2, - ) - self._cmd_pub = self.create_publisher( - JointState, f"{self._ns}/platform/joints_cmd", 1 - ) - self.get_logger().info(f"{self._ns}/platform/joints_cmd") - # self.robot.set_publisher_joints_cmd(self._cmd_pub) - self.sub_base_odom = self.create_subscription( - Odometry, f"{self._ns}/platform/odometry", self.callback_base_odom, 1 - ) - self.init_odom = np.zeros(3) - - ########## - # base is a twist which is constructed from the first 3 msg.velocities - # 0 -> twist.linear.x - # 1 -> twist.linear.y - # 2 -> twist.angular.z - # left arm indeces are 15-21 (last included) - # right arm indeces are 22-28 (last included) - - def send_cmd(self): - # self.get_logger().info(str(self.robot._q)) - if np.linalg.norm(self.init_odom) < 1e-4: - return - else: - breakFlag = self.loop_manager.run_one_iter( - self.loop_manager.current_iteration - ) - # self.get_logger().info("TIMER_CMD") - - if self.args.unreal: - msg = self.empty_msg - msg.header.stamp = Time().to_msg() - - # TEST - # msg.velocity[2] = ( - # np.sin(self.loop_manager.current_iteration / (self.args.ctrl_freq * 3)) - # / 10 - # ) - # REAL - for i in range(3): - msg.velocity[i] = self.robot._v_cmd[i] - for i in range(15, 29): - msg.velocity[i] = self.robot._v_cmd[i - 12] - self.get_logger().info(str(msg)) - self.get_logger().info(str(self.robot._v_cmd)) - self._cmd_pub.publish(msg) - - def callback_base_odom(self, msg: Odometry): - # self.robot._v[0] = msg.twist.twist.linear.x - # self.robot._v[1] = msg.twist.twist.linear.y - ## TODO: check that z can be used as cos(theta) and w as sin(theta) - ## (they could be defined some other way or theta could be offset of something) - # self.robot._v[2] = msg.twist.twist.angular.z - # self.robot._v[3] = 0 # for consistency - costh2 = msg.pose.pose.orientation.w - sinth2 = np.linalg.norm( - [ - msg.pose.pose.orientation.x, - msg.pose.pose.orientation.y, - msg.pose.pose.orientation.z, - ] - ) - th = 2 * np.arctan2(sinth2, costh2) - if ( - np.linalg.norm(self.init_odom) < 1e-4 - ) and self.loop_manager.current_iteration < 10: - self.init_odom[0] = msg.pose.pose.position.x - self.init_odom[1] = msg.pose.pose.position.y - self.init_odom[2] = th - if self.args.unreal: - T_odom = np.zeros((3, 3)) - T_odom[0, 0] = np.cos(self.init_odom[2]) - T_odom[0, 1] = -1 * np.sin(self.init_odom[2]) - T_odom[0, 2] = self.init_odom[0] - T_odom[1, 0] = np.sin(self.init_odom[2]) - T_odom[1, 1] = np.cos(self.init_odom[2]) - T_odom[1, 2] = self.init_odom[1] - T_odom[2, 2] = 1.0 - p_odom = np.array([msg.pose.pose.position.x, msg.pose.pose.position.y, 1]) - T_inv_odom = np.zeros((3, 3)) - T_inv_odom[:2, :2] = T_odom[:2, :2].T - T_inv_odom[:2, 2] = (-1 * T_odom[:2, :2].T) @ T_odom[:2, 2] - T_inv_odom[2, 2] = 0.0 - p_ctrl = T_inv_odom @ p_odom - self.robot._q[0] = p_ctrl[0] - self.robot._q[1] = p_ctrl[1] - self.robot._q[2] = np.cos(th - self.init_odom[2]) - self.robot._q[3] = np.sin(th - self.init_odom[2]) - # self.get_logger().info("CALLBACK_ODOM") - self.get_logger().info(str(self.robot._q[:4])) - # self.get_logger().info(str(self.init_odom)) - - def callback_arms_state(self, msg: JointState): - if self.loop_manager.current_iteration < 200 or self.args.unreal: - self.robot._q[-14:-7] = msg.position[-14:-7] - self.robot._q[-7:] = msg.position[-7:] - # self.get_logger().info("CALLBACK_JOINT_STATE") - - def wait_for_sim_time(self): - """Wait for the /clock topic to start publishing.""" - self.get_logger().info("Waiting for simulated time to be active...") - while not self.get_clock().now().nanoseconds > 0: - rclpy.spin_once(self, timeout_sec=0.1) - self.get_logger().info("Simulated time is now active!") - - # def main(args=None): def main(args=None): # evil and makes args unusable but what can you do @@ -361,7 +111,7 @@ def main(args=None): rclpy.init(args=args) executor = MultiThreadedExecutor() - node = DummyNode(args_smc, robot, loop_manager) + node = SMCMobileYuMiNode(args_smc, robot, loop_manager) executor.add_node(node) executor.spin() diff --git a/examples/ros/ros2_clik.py b/examples/ros/old_and_irrelevant/ros2_clik.py similarity index 100% rename from examples/ros/ros2_clik.py rename to examples/ros/old_and_irrelevant/ros2_clik.py diff --git a/examples/ros/smc_node.py b/examples/ros/old_and_irrelevant/smc_heron_node.py similarity index 100% rename from examples/ros/smc_node.py rename to examples/ros/old_and_irrelevant/smc_heron_node.py diff --git a/examples/ros/smc_yumi_challenge.py b/examples/ros/old_and_irrelevant/smc_yumi_challenge.py similarity index 100% rename from examples/ros/smc_yumi_challenge.py rename to examples/ros/old_and_irrelevant/smc_yumi_challenge.py diff --git a/examples/ros/smc_yumi_node.py b/examples/ros/smc_yumi_node.py new file mode 100644 index 0000000..dfa1958 --- /dev/null +++ b/examples/ros/smc_yumi_node.py @@ -0,0 +1,267 @@ +from smc.robots.implementations.mobile_yumi import RealMobileYumiRobotManager +from smc.control.control_loop_manager import ControlLoopManager +from smc import getMinimalArgParser + +import rclpy +from rclpy.time import Time +from rclpy.node import Node +from rclpy.executors import MultiThreadedExecutor +from geometry_msgs.msg import Twist, PoseWithCovarianceStamped +from sensor_msgs.msg import JointState +from nav_msgs.msg import Odometry +from abb_python_utilities.names import get_rosified_name +from rclpy.callback_groups import ReentrantCallbackGroup +from smc.control.cartesian_space import getClikArgs +from smc.control.optimal_control.util import get_OCP_args + +import numpy as np +import argparse + + +def get_args(): + parser = getMinimalArgParser() + parser.description = "Run closed loop inverse kinematics \ + of various kinds. Make sure you know what the goal is before you run!" + parser = getClikArgs(parser) + parser = get_OCP_args(parser) + parser.add_argument( + "--ros-namespace", + type=str, + default="maskinn", + help="you MUST put in ros namespace you're using", + ) + parser.add_argument( + "--unreal", + action=argparse.BooleanOptionalAction, + default=False, + help="publish or not", + ) + parser.add_argument( + "--draw-new", + action=argparse.BooleanOptionalAction, + help="are you drawing a new path or reusing the previous one", + default=False, + ) + parser.add_argument( + "--map-width", + type=float, + help="width of the map in meters (x-axis) - only used for drawing of the path", + default=3.0, + ) + parser.add_argument( + "--map-height", + type=float, + help="height of the map in meters (y-axis) - only used for drawing of the path", + default=3.0, + ) + parser.add_argument( + "--sim", + action=argparse.BooleanOptionalAction, + help="if in sim you need to set sim time, otherwise not", + default=True, + ) + # parser.add_argument('--ros-args', action='extend', nargs="+") + # parser.add_argument('-r', action='extend', nargs="+") + parser.add_argument("--ros-args", action="append", nargs="*") + parser.add_argument("-r", action="append", nargs="*") + parser.add_argument("-p", action="append", nargs="*") + args = parser.parse_args() + args.robot = "myumi" + args.plotter = False + # args.plotter = True + args.real = True + args.ctrl_freq = 50 + # NOTE: does not work due to ctrl-c being overriden + args.save_log = True + args.run_name = "nav_pls" + args.index_runs = True + return args + + +class SMCMobileYuMiNode(Node): + def __init__( + self, args, robot: RealMobileYumiRobotManager, loop_manager: ControlLoopManager + ): + super().__init__("SMCMobileYuMiNode") + if args.sim: + self.set_parameters( + [ + rclpy.parameter.Parameter( + "use_sim_time", rclpy.Parameter.Type.BOOL, True + ) + ] + ) + self.wait_for_sim_time() + self.robot = robot + self.loop_manager = loop_manager + self.args = args + # give me the latest thing even if there wasn't an update + # qos_prof = rclpy.qos.QoSProfile( + # reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, + # durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, + # history=rclpy.qos.HistoryPolicy.KEEP_LAST, + # depth=1, + # ) + + self._cb = ReentrantCallbackGroup() + + self._ns = args.ros_namespace + # self._ns = get_rosified_name(self.get_namespace()) + + self.get_logger().info( + f"### Starting smc mobile yumi node example under namespace {self._ns}" + ) + + self.empty_msg = JointState() + for _ in range(29): + self.empty_msg.velocity.append(0.0) + + self._dt = 1 / self.args.ctrl_freq + + self._pub_timer = self.create_timer(self._dt, self.send_cmd) + self.current_iteration = 0 + + ######################################################## + # connect to smc + ########################################################### + + # self.sub_amcl = self.create_subscription(PoseWithCovarianceStamped, '/amcl_pose', self.pose_callback, qos_prof) + + # this qos is incompatible + # self.sub_joint_states = self.create_subscription(JointState, f"{self._ns}/joint_states", self.callback_arms_state, qos_prof) + + qos_prof2 = rclpy.qos.QoSProfile( + # reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, + # durability = rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, + # history = rclpy.qos.HistoryPolicy.KEEP_LAST, + depth=1 + ) + self.sub_joint_states = self.create_subscription( + JointState, + f"{self._ns}/platform/joint_states", + self.callback_arms_state, + qos_prof2, + ) + self._cmd_pub = self.create_publisher( + JointState, f"{self._ns}/platform/joints_cmd", 1 + ) + # self.get_logger().info(f"{self._ns}/platform/joints_cmd") + # self.robot.set_publisher_joints_cmd(self._cmd_pub) + self.sub_base_odom = self.create_subscription( + Odometry, f"{self._ns}/platform/odometry", self.callback_base_odom, 1 + ) + self.odom_initialized = False + self.init_odom = np.zeros(3) + + ########## + # base is a twist which is constructed from the first 3 msg.velocities + # 0 -> twist.linear.x + # 1 -> twist.linear.y + # 2 -> twist.angular.z + # left arm indeces are 15-21 (last included) + # right arm indeces are 22-28 (last included) + + def send_cmd(self): + # self.get_logger().info("TIMER_CMD") + self.current_iteration += 1 + breakFlag = self.loop_manager.run_one_iter(self.loop_manager.current_iteration) + + if not self.odom_initialized: + self.get_logger().info( + "odom not initialized, hence not publishing anything!" + ) + + # self.get_logger().info("current iteration: " + str(self.current_iteration)) + self.get_logger().info(str(self.robot._v_cmd)) + if self.args.unreal: + msg = self.empty_msg + msg.header.stamp = Time().to_msg() + + # TEST + # msg.velocity[0] = ( + # np.sin(self.loop_manager.current_iteration / (self.args.ctrl_freq * 2)) + # / 6 + # ) + # msg.velocity[1] = ( + # np.sin(self.loop_manager.current_iteration / (self.args.ctrl_freq * 2)) + # / 6 + # ) + # msg.velocity[2] = ( + # -1 + # * np.sin( + # self.loop_manager.current_iteration / (self.args.ctrl_freq * 2) + # ) + # / 6 + # ) + # REAL + for i in range(3): + msg.velocity[i] = self.robot._v_cmd[i] + for i in range(15, 29): + msg.velocity[i] = self.robot._v_cmd[i - 12] + # self.get_logger().info(str(self.robot._q)) + self._cmd_pub.publish(msg) + + def callback_base_odom(self, msg: Odometry): + # self.robot._v[0] = msg.twist.twist.linear.x + # self.robot._v[1] = msg.twist.twist.linear.y + ## TODO: check that z can be used as cos(theta) and w as sin(theta) + ## (they could be defined some other way or theta could be offset of something) + # self.robot._v[2] = msg.twist.twist.angular.z + # self.robot._v[3] = 0 # for consistency + costh2 = msg.pose.pose.orientation.w + sinth2 = np.linalg.norm( + [ + msg.pose.pose.orientation.x, + msg.pose.pose.orientation.y, + msg.pose.pose.orientation.z, + ] + ) + th = 2 * np.arctan2(sinth2, costh2) + if not self.odom_initialized: + self.init_odom[0] = msg.pose.pose.position.x + self.init_odom[1] = msg.pose.pose.position.y + self.init_odom[2] = th + self.odom_initialized = True + self.get_logger().info(str(self.init_odom)) + if self.args.unreal and self.odom_initialized: + T_odom = np.zeros((3, 3)) + T_odom[0, 0] = np.cos(self.init_odom[2]) + T_odom[0, 1] = -1 * np.sin(self.init_odom[2]) + T_odom[0, 2] = self.init_odom[0] + T_odom[1, 0] = np.sin(self.init_odom[2]) + T_odom[1, 1] = np.cos(self.init_odom[2]) + T_odom[1, 2] = self.init_odom[1] + T_odom[2, 2] = 1.0 + p_odom = np.array( + [ + msg.pose.pose.position.x - self.init_odom[0], + msg.pose.pose.position.y - self.init_odom[1], + ] + ) + # T_inv_odom = np.zeros((3, 3)) + # T_inv_odom[:2, :2] = T_odom[:2, :2].T + # T_inv_odom[:2, 2] = (-1 * T_odom[:2, :2].T) @ T_odom[:2, 2] + # T_inv_odom[2, 2] = 1.0 + # p_ctrl = T_inv_odom @ p_odom + # p_ctrl = (T_odom @ p_odom)[:2] + p_ctrl = T_odom[:2, :2] @ p_odom + self.robot._q[0] = p_ctrl[0] + self.robot._q[1] = p_ctrl[1] + self.robot._q[2] = np.cos(self.init_odom[2] - th) + self.robot._q[3] = np.sin(self.init_odom[2] - th) + # self.get_logger().info("CALLBACK_ODOM") + # self.get_logger().info(str(self.robot._q[:4])) + # self.get_logger().info(str(self.init_odom)) + + def callback_arms_state(self, msg: JointState): + if self.current_iteration < 200 or self.args.unreal: + self.robot._q[-14:-7] = msg.position[-14:-7] + self.robot._q[-7:] = msg.position[-7:] + # self.get_logger().info("CALLBACK_JOINT_STATE") + + def wait_for_sim_time(self): + """Wait for the /clock topic to start publishing.""" + self.get_logger().info("Waiting for simulated time to be active...") + while not self.get_clock().now().nanoseconds > 0: + rclpy.spin_once(self, timeout_sec=0.1) + self.get_logger().info("Simulated time is now active!") diff --git a/python/smc/control/cartesian_space/cartesian_space_trajectory_following.py b/python/smc/control/cartesian_space/cartesian_space_trajectory_following.py index 5b8b677..807d79e 100644 --- a/python/smc/control/cartesian_space/cartesian_space_trajectory_following.py +++ b/python/smc/control/cartesian_space/cartesian_space_trajectory_following.py @@ -69,6 +69,9 @@ def cartesianPathFollowingControlLoop( if t % int(np.ceil(args.ctrl_freq / 25)) == 0: robot.visualizer_manager.sendCommand({"frame_path": path[:20]}) + # v_cmd[0] = 1.0 + # v_cmd[1] = 1.0 + # v_cmd[2] = -1.0 return ( v_cmd, {}, diff --git a/python/smc/robots/implementations/mobile_yumi.py b/python/smc/robots/implementations/mobile_yumi.py index 602fdee..3ba4a66 100644 --- a/python/smc/robots/implementations/mobile_yumi.py +++ b/python/smc/robots/implementations/mobile_yumi.py @@ -147,6 +147,7 @@ class RealMobileYumiRobotManager(AbstractMobileYuMiRobotManager): else: self._v = v + self._v_cmd = v # NOTE: we update joint angles here, and _updateQ does nothing (there is no communication) self._q = pin.integrate(self.model, self._q, v * self._dt) diff --git a/python/smc/visualization/meshcat_viewer_wrapper/visualizer.py b/python/smc/visualization/meshcat_viewer_wrapper/visualizer.py index b947cba..5d9112b 100644 --- a/python/smc/visualization/meshcat_viewer_wrapper/visualizer.py +++ b/python/smc/visualization/meshcat_viewer_wrapper/visualizer.py @@ -142,7 +142,7 @@ class MeshcatVisualizer(PMV): self, name, path: list[pin.SE3], radius=5e-3, color=[1, 0, 0, 0.8] ): self.n_frame_path_points = len(path) if len(path) < 5 else 5 - index_step = len(path) // 5 + index_step = len(path) // 5 if len(path) > 5 else 1 for i, point in enumerate(path): if (i % index_step != 0) and name != "fixed_frame_path": continue -- GitLab