From 396e376d7edf491b9f431a807ef73881c5f0caa1 Mon Sep 17 00:00:00 2001 From: m-guberina <gubi.guberina@gmail.com> Date: Mon, 10 Mar 2025 16:39:18 +0100 Subject: [PATCH] first day in the lab - robot moves - control is horrible. rest of the day is for hacking whatever remotely works, tomorrow is about running it and live-patching. it's doable --- examples/ros/dummy_cmds_pub.py | 154 ++++++++------ examples/ros/navigation_example.py | 310 +++++++++++++++++++++++++++++ examples/ros/stop_robot.py | 70 +++++++ 3 files changed, 474 insertions(+), 60 deletions(-) create mode 100644 examples/ros/navigation_example.py create mode 100644 examples/ros/stop_robot.py diff --git a/examples/ros/dummy_cmds_pub.py b/examples/ros/dummy_cmds_pub.py index 0c4653e..cd73370 100755 --- a/examples/ros/dummy_cmds_pub.py +++ b/examples/ros/dummy_cmds_pub.py @@ -24,20 +24,19 @@ from rclpy.callback_groups import ReentrantCallbackGroup import numpy as np import argparse -from smc.robots.interfaces.whole_body_dual_arm_interface import ( - DualArmWholeBodyInterface, -) +from smc.robots.implementations.mobile_yumi import RealMobileYumiRobotManager from smc import getMinimalArgParser from smc.control.control_loop_manager import ControlLoopManager -from smc.control.cartesian_space import getClikArgs +from smc.control.cartesian_space import getClikArgs, getIKSolver +from smc.control.joint_space.joint_space_point_to_point import moveJP from smc.control.cartesian_space.cartesian_space_point_to_point import ( - getClikController, controlLoopClik, moveL, - compliantMoveL, - controlLoopCompliantClik, - invKinmQP, moveLDualArm, + moveL, +) +from smc.control.optimal_control.croco_point_to_point.mpc.base_reference_mpc import ( + CrocoBaseP2PMPC, ) from smc.control.optimal_control.util import get_OCP_args from smc.robots.utils import getRobotFromArgs @@ -59,6 +58,12 @@ def get_args(): default="maskinn", help="you MUST put in ros namespace you're using", ) + parser.add_argument( + "--unreal", + action=argparse.BooleanOptionalAction, + default=False, + help="publish or not", + ) # parser.add_argument('--ros-args', action='extend', nargs="+") # parser.add_argument('-r', action='extend', nargs="+") parser.add_argument("--ros-args", action="append", nargs="*") @@ -66,23 +71,37 @@ def get_args(): parser.add_argument("-p", action="append", nargs="*") args = parser.parse_args() args.robot = "myumi" - args.save_log = True - args.plotter = True + args.plotter = False + # args.plotter = True args.real = True + args.ctrl_freq = 50 + # NOTE: does not work due to ctrl-c being overriden + args.save_log = True + args.run_name = "pls" + args.index_runs = True return args class DummyNode(Node): def __init__( - self, args, robot: DualArmWholeBodyInterface, loop_manager: ControlLoopManager + self, args, robot: RealMobileYumiRobotManager, loop_manager: ControlLoopManager ): super().__init__("dummy_cmds_pub_node") + self.robot = robot + self.loop_manager = loop_manager + self.args = args + # give me the latest thing even if there wasn't an update + qos_prof = rclpy.qos.QoSProfile( + reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, + durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, + history=rclpy.qos.HistoryPolicy.KEEP_LAST, + depth=1, + ) self._cb = ReentrantCallbackGroup() - self._ns = get_rosified_name(self.get_namespace()) - - self._cmd_pub = self.create_publisher(JointState, f"{self._ns}/joints_cmd", 1) + self._ns = args.ros_namespace + # self._ns = get_rosified_name(self.get_namespace()) self.get_logger().info(f"### Starting dummy example under namespace {self._ns}") @@ -90,10 +109,10 @@ class DummyNode(Node): for i in range(29): self.empty_msg.velocity.append(0.0) - self._T = 1.0 - self._A = 0.2 - self._t = 0.0 - self._dt = 0.004 + # self._T = 1.0 + # self._A = 0.2 + # self._t = 0.0 + self._dt = 1 / self.args.ctrl_freq self._pub_timer = self.create_timer(self._dt, self.send_cmd) @@ -101,19 +120,6 @@ class DummyNode(Node): # connect to smc ########################################################### - robot.set_publisher_joints_cmd(self._cmd_pub) - self.robot = robot - self.loop_manager = loop_manager - self.args = args - # give me the latest thing even if there wasn't an update - qos_prof = rclpy.qos.QoSProfile( - reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, - durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, - history=rclpy.qos.HistoryPolicy.KEEP_LAST, - depth=1, - ) - - # self.sub_base_odom = self.create_subscription(Odometry, f"{self._ns}platform/odometry", self.callback_base_odom, qos_prof) # self.sub_amcl = self.create_subscription(PoseWithCovarianceStamped, '/amcl_pose', self.pose_callback, qos_prof) # this qos is incompatible @@ -126,7 +132,18 @@ class DummyNode(Node): depth=1 ) self.sub_joint_states = self.create_subscription( - JointState, f"{self._ns}/joint_states", self.callback_arms_state, qos_prof2 + JointState, + f"{self._ns}/platform/joint_states", + self.callback_arms_state, + qos_prof2, + ) + self._cmd_pub = self.create_publisher( + JointState, f"{self._ns}/platform/joints_cmd", 1 + ) + self.robot.set_publisher_joints_cmd(self._cmd_pub) + self._cmd_pub = self.create_publisher(JointState, f"{self._ns}joints_cmd", 1) + self.sub_base_odom = self.create_subscription( + Odometry, f"{self._ns}/platform/odometry", self.callback_base_odom, 1 ) ########## @@ -138,32 +155,44 @@ class DummyNode(Node): # right arm indeces are 22-28 (last included) def send_cmd(self): - breakFlag = self.loop_manager.run_one_iter(0) + breakFlag = self.loop_manager.run_one_iter(self.loop_manager.current_iteration) + # msg = self.empty_msg - # msg.header.stamp = self.get_clock().now().to_msg() - # msg.velocity[0] = self._A * np.sin(2 * np.pi / self._T * self._t) - # msg.velocity[1] = self._A * np.sin(2 * np.pi / self._T * self._t) - # msg.velocity[2] = self._A * np.sin(2 * np.pi / self._T * self._t) + # for i in range(3): + # msg.velocity[i] = self.robot._v[i] + # for i in range(15, 29): + # msg.velocity[i] = self.robot._v[i - 12] + # self.get_logger().info(str(self.robot._v)) # self._cmd_pub.publish(msg) - # self._t += self._dt def callback_base_odom(self, msg: Odometry): - self.robot._v[0] = msg.twist.twist.linear.x - self.robot._v[1] = msg.twist.twist.linear.y - # TODO: check that z can be used as cos(theta) and w as sin(theta) - # (they could be defined some other way or theta could be offset of something) - self.robot._v[2] = msg.twist.twist.angular.z - self.robot._v[3] = 0 # for consistency - print("received /odom") + # self.robot._v[0] = msg.twist.twist.linear.x + # self.robot._v[1] = msg.twist.twist.linear.y + ## TODO: check that z can be used as cos(theta) and w as sin(theta) + ## (they could be defined some other way or theta could be offset of something) + # self.robot._v[2] = msg.twist.twist.angular.z + # self.robot._v[3] = 0 # for consistency + costh2 = msg.pose.pose.orientation.w + sinth2 = np.linalg.norm( + [ + msg.pose.pose.orientation.x, + msg.pose.pose.orientation.y, + msg.pose.pose.orientation.z, + ] + ) + th = 2 * np.arctan2(sinth2, costh2) + if self.args.unreal: + self.robot._q[0] = msg.pose.pose.position.x + self.robot._q[1] = msg.pose.pose.position.y + self.robot._q[2] = np.cos(th) + self.robot._q[3] = np.sin(th) def callback_arms_state(self, msg: JointState): - self.robot._q[0] = 0.0 - self.robot._q[1] = 0.0 - self.robot._q[2] = 1.0 - self.robot._q[3] = 0.0 - self.robot._q[-14:-7] = msg.position[-14:-7] - self.robot._q[-7:] = msg.position[-7:] - self.get_logger().info("callback arms state") + if self.loop_manager.current_iteration < 200 or self.args.unreal: + self.robot._q[-14:-7] = msg.position[-14:-7] + self.robot._q[-7:] = msg.position[-7:] + # self.get_logger().info(str(msg)) + # self.get_logger().info(str(self.loop_manager.current_iteration)) # def main(args=None): @@ -171,24 +200,29 @@ def main(args=None): # evil and makes args unusable but what can you do args_smc = get_args() assert args_smc.robot == "myumi" - robot = getRobotFromArgs(args_smc) + robot = RealMobileYumiRobotManager(args_smc) # you can't do terminal input with ros # goal = robot.defineGoalPointCLI() T_w_absgoal = pin.SE3.Identity() - T_w_absgoal.translation = np.array([0.3, 0.0, 0.5]) + T_w_absgoal.translation = np.array([0.5, 0.0, 0.5]) T_absgoal_l = pin.SE3.Identity() - T_absgoal_l.translation = np.array([-0.15, 0.0, 0.0]) + T_absgoal_l.translation = np.array([0.0, 0.2, 0.0]) T_absgoal_r = pin.SE3.Identity() - T_absgoal_r.translation = np.array([0.15, 0.0, 0.0]) + T_absgoal_r.translation = np.array([0.0, -0.2, 0.0]) # loop_manager = CrocoIKMPC(args, robot, goal, run=False) - loop_manager = moveLDualArm( - args_smc, robot, T_w_absgoal, T_absgoal_l, T_absgoal_r, run=False - ) + # robot._mode = robot.control_mode.upper_body + # loop_manager = moveJP(robot._comfy_configuration, args_smc, robot, run=False) + # loop_manager = moveLDualArm( + # args_smc, robot, T_w_absgoal, T_absgoal_l, T_absgoal_r, run=False + # ) + # loop_manager = moveL(args_smc, robot, T_w_absgoal, run=False) + robot._mode = robot.control_mode.whole_body + loop_manager = CrocoBaseP2PMPC(args_smc, robot, np.zeros(3), run=False) rclpy.init(args=args) executor = MultiThreadedExecutor() - node = DummyNode(args, robot, loop_manager) + node = DummyNode(args_smc, robot, loop_manager) executor.add_node(node) executor.spin() diff --git a/examples/ros/navigation_example.py b/examples/ros/navigation_example.py new file mode 100644 index 0000000..52ae099 --- /dev/null +++ b/examples/ros/navigation_example.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2024, ABB Schweiz AG +# All rights reserved. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY +# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import rclpy +from rclpy.node import Node +from rclpy.executors import MultiThreadedExecutor +from geometry_msgs.msg import Twist, PoseWithCovarianceStamped +from sensor_msgs.msg import JointState +from nav_msgs.msg import Odometry +from abb_python_utilities.names import get_rosified_name +from rclpy.callback_groups import ReentrantCallbackGroup + +import numpy as np +import argparse +from smc.robots.implementations.mobile_yumi import RealMobileYumiRobotManager +from smc import getMinimalArgParser +from smc.control.control_loop_manager import ControlLoopManager +from smc.control.cartesian_space import getClikArgs, getIKSolver +from smc.control.joint_space.joint_space_point_to_point import moveJP +from smc.control.cartesian_space.cartesian_space_point_to_point import ( + controlLoopClik, + moveL, + moveLDualArm, + moveL, +) +from smc.control.optimal_control.croco_point_to_point.mpc.base_reference_mpc import ( + CrocoBaseP2PMPC, +) +from smc.control.optimal_control.croco_path_following.mpc.base_reference_mpc import ( + CrocoBasePathFollowingMPC, +) +from smc.util.draw_path import drawPath +from smc.control.optimal_control.util import get_OCP_args +from smc.robots.utils import getRobotFromArgs +import pinocchio as pin +import matplotlib +from functools import partial + +import numpy as np + + +def fixedPathPlanner( + path_parameter: list[int], path2D: np.ndarray, p_base: np.ndarray +) -> np.ndarray: + """ + fixedPathPlanner + ---------------- + we can assume robot is following this path and that it started the following controller at the first point. + to make the controller make sense (without a sense of timing), we need to find the closest point on the path, + and delete the already traversed portion of the path. + the assumption is that the controller constructs a trajectory from the path, starting with the first point. + + the cleanest way to get this done is associate robot's current position with the path parameter s, + but i'll just implement it as quickly as possible, not in the most correct or the most efficient manner. + + NOTE: path is (N,3) in shape, last column is 0, because subotimal code structure + """ + + p_base_3 = np.array([p_base[0], p_base[1], 0.0]) + distances = np.linalg.norm(p_base_3 - path2D, axis=1) + index = np.argmin(distances) + if (index - path_parameter[0]) > 0 and (index - path_parameter[0]) < 10: + path_parameter[0] += 1 + path2D = path2D[path_parameter[0] :] + return path2D + + +# you will need to manually set everything here :( +def get_args(): + parser = getMinimalArgParser() + parser.description = "Run closed loop inverse kinematics \ + of various kinds. Make sure you know what the goal is before you run!" + parser = getClikArgs(parser) + parser = get_OCP_args(parser) + parser.add_argument( + "--ros-namespace", + type=str, + default="maskinn", + help="you MUST put in ros namespace you're using", + ) + parser.add_argument( + "--unreal", + action=argparse.BooleanOptionalAction, + default=False, + help="publish or not", + ) + parser.add_argument( + "--draw-new", + action=argparse.BooleanOptionalAction, + help="are you drawing a new path or reusing the previous one", + default=False, + ) + parser.add_argument( + "--map-width", + type=float, + help="width of the map in meters (x-axis) - only used for drawing of the path", + default=3.0, + ) + parser.add_argument( + "--map-height", + type=float, + help="height of the map in meters (y-axis) - only used for drawing of the path", + default=3.0, + ) + # parser.add_argument('--ros-args', action='extend', nargs="+") + # parser.add_argument('-r', action='extend', nargs="+") + parser.add_argument("--ros-args", action="append", nargs="*") + parser.add_argument("-r", action="append", nargs="*") + parser.add_argument("-p", action="append", nargs="*") + args = parser.parse_args() + args.robot = "myumi" + args.plotter = False + # args.plotter = True + args.real = True + args.ctrl_freq = 50 + # NOTE: does not work due to ctrl-c being overriden + args.save_log = True + args.run_name = "pls" + args.index_runs = True + return args + + +class DummyNode(Node): + def __init__( + self, args, robot: RealMobileYumiRobotManager, loop_manager: ControlLoopManager + ): + super().__init__("dummy_cmds_pub_node") + self.robot = robot + self.loop_manager = loop_manager + self.args = args + # give me the latest thing even if there wasn't an update + qos_prof = rclpy.qos.QoSProfile( + reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, + durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, + history=rclpy.qos.HistoryPolicy.KEEP_LAST, + depth=1, + ) + + self._cb = ReentrantCallbackGroup() + + self._ns = args.ros_namespace + # self._ns = get_rosified_name(self.get_namespace()) + + self.get_logger().info(f"### Starting dummy example under namespace {self._ns}") + + self.empty_msg = JointState() + for i in range(29): + self.empty_msg.velocity.append(0.0) + + # self._T = 1.0 + # self._A = 0.2 + # self._t = 0.0 + self._dt = 1 / self.args.ctrl_freq + + self._pub_timer = self.create_timer(self._dt, self.send_cmd) + + ######################################################## + # connect to smc + ########################################################### + + # self.sub_amcl = self.create_subscription(PoseWithCovarianceStamped, '/amcl_pose', self.pose_callback, qos_prof) + + # this qos is incompatible + # self.sub_joint_states = self.create_subscription(JointState, f"{self._ns}/joint_states", self.callback_arms_state, qos_prof) + + qos_prof2 = rclpy.qos.QoSProfile( + # reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, + # durability = rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, + # history = rclpy.qos.HistoryPolicy.KEEP_LAST, + depth=1 + ) + self.sub_joint_states = self.create_subscription( + JointState, + f"{self._ns}/platform/joint_states", + self.callback_arms_state, + qos_prof2, + ) + self._cmd_pub = self.create_publisher( + JointState, f"{self._ns}/platform/joints_cmd", 1 + ) + self.robot.set_publisher_joints_cmd(self._cmd_pub) + self._cmd_pub = self.create_publisher(JointState, f"{self._ns}joints_cmd", 1) + self.sub_base_odom = self.create_subscription( + Odometry, f"{self._ns}/platform/odometry", self.callback_base_odom, 1 + ) + self.init_odom = np.zeros(3) + + ########## + # base is a twist which is constructed from the first 3 msg.velocities + # 0 -> twist.linear.x + # 1 -> twist.linear.y + # 2 -> twist.angular.z + # left arm indeces are 15-21 (last included) + # right arm indeces are 22-28 (last included) + + def send_cmd(self): + # self.get_logger().info(str(self.robot._q)) + if np.linalg.norm(self.init_odom) < 1e-4: + return + else: + breakFlag = self.loop_manager.run_one_iter( + self.loop_manager.current_iteration + ) + + # msg = self.empty_msg + # for i in range(3): + # msg.velocity[i] = self.robot._v[i] + # for i in range(15, 29): + # msg.velocity[i] = self.robot._v[i - 12] + # self.get_logger().info(str(self.robot._v)) + # self._cmd_pub.publish(msg) + + def callback_base_odom(self, msg: Odometry): + # self.robot._v[0] = msg.twist.twist.linear.x + # self.robot._v[1] = msg.twist.twist.linear.y + ## TODO: check that z can be used as cos(theta) and w as sin(theta) + ## (they could be defined some other way or theta could be offset of something) + # self.robot._v[2] = msg.twist.twist.angular.z + # self.robot._v[3] = 0 # for consistency + costh2 = msg.pose.pose.orientation.w + sinth2 = np.linalg.norm( + [ + msg.pose.pose.orientation.x, + msg.pose.pose.orientation.y, + msg.pose.pose.orientation.z, + ] + ) + th = 2 * np.arctan2(sinth2, costh2) + if np.linalg.norm(self.init_odom) < 1e-4: + self.init_odom[0] = msg.pose.pose.position.x + self.init_odom[1] = msg.pose.pose.position.y + self.init_odom[2] = th + if self.args.unreal: + self.robot._q[0] = msg.pose.pose.position.x - self.init_odom[0] + self.robot._q[1] = msg.pose.pose.position.y - self.init_odom[1] + self.robot._q[2] = np.cos(th - self.init_odom[2]) + self.robot._q[3] = np.sin(th - self.init_odom[2]) + + def callback_arms_state(self, msg: JointState): + if self.args.unreal: + if self.loop_manager.current_iteration < 200 or self.args.unreal: + self.robot._q[-14:-7] = msg.position[-14:-7] + self.robot._q[-7:] = msg.position[-7:] + # self.get_logger().info(str(self.loop_manager.current_iteration)) + + +# def main(args=None): +def main(args=None): + # evil and makes args unusable but what can you do + args_smc = get_args() + assert args_smc.robot == "myumi" + robot = RealMobileYumiRobotManager(args_smc) + # you can't do terminal input with ros + # goal = robot.defineGoalPointCLI() + T_w_absgoal = pin.SE3.Identity() + T_w_absgoal.translation = np.array([0.5, 0.0, 0.5]) + T_absgoal_l = pin.SE3.Identity() + T_absgoal_l.translation = np.array([0.0, 0.2, 0.0]) + T_absgoal_r = pin.SE3.Identity() + T_absgoal_r.translation = np.array([0.0, -0.2, 0.0]) + # loop_manager = CrocoIKMPC(args, robot, goal, run=False) + # robot._mode = robot.control_mode.upper_body + # loop_manager = moveJP(robot._comfy_configuration, args_smc, robot, run=False) + # loop_manager = moveLDualArm( + # args_smc, robot, T_w_absgoal, T_absgoal_l, T_absgoal_r, run=False + # ) + # loop_manager = moveL(args_smc, robot, T_w_absgoal, run=False) + robot._mode = robot.control_mode.whole_body + robot.setInitialPose() + robot._step() + if not args_smc.draw_new: + pixel_path_file_path = "./parameters/path_in_pixels.csv" + path2D = np.genfromtxt(pixel_path_file_path, delimiter=",") + else: + matplotlib.use("tkagg") + path2D = drawPath(args_smc) + matplotlib.use("qtagg") + path2D[:, 0] = path2D[:, 0] * args_smc.map_width + path2D[:, 1] = path2D[:, 1] * args_smc.map_height + path2D = np.hstack((path2D, np.zeros((len(path2D), 1)))) + robot.updateViz({"fixed_path": path2D}) + path_planner = partial(fixedPathPlanner, [0], path2D) + + x0 = np.concatenate([robot.q, robot.v]) + loop_manager = CrocoBasePathFollowingMPC( + args_smc, robot, x0, path_planner, run=False + ) + + rclpy.init(args=args) + + executor = MultiThreadedExecutor() + node = DummyNode(args_smc, robot, loop_manager) + executor.add_node(node) + executor.spin() + + +if __name__ == "__main__": + main() diff --git a/examples/ros/stop_robot.py b/examples/ros/stop_robot.py new file mode 100644 index 0000000..5561e75 --- /dev/null +++ b/examples/ros/stop_robot.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2024, ABB Schweiz AG +# All rights reserved. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY +# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import rclpy +from rclpy.node import Node +from rclpy.executors import MultiThreadedExecutor +from geometry_msgs.msg import Twist, PoseWithCovarianceStamped +from sensor_msgs.msg import JointState +from nav_msgs.msg import Odometry +from abb_python_utilities.names import get_rosified_name +from rclpy.callback_groups import ReentrantCallbackGroup + + +class DummyNode(Node): + def __init__(self): + super().__init__("dummy_cmds_pub_node") + # give me the latest thing even if there wasn't an update + qos_prof = rclpy.qos.QoSProfile( + reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE, + durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, + history=rclpy.qos.HistoryPolicy.KEEP_LAST, + depth=1, + ) + + self._cb = ReentrantCallbackGroup() + + self._ns = "myumi_001" + + self.get_logger().info(f"### Starting dummy example under namespace {self._ns}") + + self._cmd_pub = self.create_publisher( + JointState, f"{self._ns}/platform/joints_cmd", 1 + ) + self.empty_msg = JointState() + for i in range(29): + self.empty_msg.velocity.append(0.0) + + self._dt = 1 / 50 + + self._pub_timer = self.create_timer(self._dt, self.send_cmd) + + def send_cmd(self): + msg = self.empty_msg + self._cmd_pub.publish(msg) + + +# def main(args=None): +def main(args=None): + rclpy.init(args=args) + + executor = MultiThreadedExecutor() + node = DummyNode() + executor.add_node(node) + executor.spin() + + +if __name__ == "__main__": + main() -- GitLab