diff --git a/examples/send_configuration.py b/examples/send_configuration.py index f5f66fb0f4e20b8bbe5c9923442a51315e361de3..1feddc0f6b392c6d8e1dde713402a0a22745ea07 100644 --- a/examples/send_configuration.py +++ b/examples/send_configuration.py @@ -1,8 +1,8 @@ #!/usr/bin/env python -import logging import sys -import math from typing import Sequence +import time + try: from abb_egm_client import EGMClient @@ -12,71 +12,74 @@ except ImportError: UDP_PORT = 6510 -def constrain(val: float, minimum: float, maximum: float) -> bool: - """Constrain a value between given bounds. - - Parameters - ---------- - val - Value to constrain - minimum - Minimum bound - maximum - Maximum bound - """ - return sorted([minimum, val, maximum])[1] - - -def send_configuration( - target_configuration: Sequence[float], step_size=1, tolerance=0.5 -) -> None: +def send_configuration(target_conf: Sequence[float], joint_vel=1.0) -> None: """Move robot to target configuration Parameters ---------- target_configuration List of joint positions in degrees - step_size - Degrees of rotation per message - tolerance - Absolute tolerance used to check for convergence + joint_vel + Max joint velocity in degrees/s for all joints """ - - def converged_predicate(val: float) -> bool: - return math.isclose(val, 0, abs_tol=tolerance) + # send rate in hz + rate = 10 egm_client = EGMClient(port=UDP_PORT) - if not target_configuration: - target_configuration = [18, 45, 0, 20, 45, 0] + pb_robot_msg = egm_client.receive_msg() + start_conf = pb_robot_msg.feedBack.joints.joints - while True: - pb_robot_msg = egm_client.receive_msg() + deltas = [ + target_pos - start_pos for start_pos, target_pos in zip(start_conf, target_conf) + ] + + max_delta = max([abs(d) for d in deltas]) - cur_configuration = pb_robot_msg.feedBack.joints.joints + num_msgs = round(max_delta / joint_vel * rate) + + for n in range(num_msgs): + pb_robot_msg = egm_client.receive_msg() + cur_configuration: Sequence[float] = pb_robot_msg.feedBack.joints.joints - signed_deltas = [t - c for t, c in zip(target_configuration, cur_configuration)] + print(f"Current configuration {cur_configuration}") - steps = [constrain(d, -step_size, step_size) for d in signed_deltas] + conf = [] + for start_pos, delta in zip(start_conf, deltas): + abs_change = n * delta / num_msgs + if delta < 0: + conf.append(start_pos - abs_change) + else: + conf.append(start_pos + abs_change) - if all(map(converged_predicate, steps)): - logging.info(f"Joint positions converged at {cur_configuration}") - return + egm_client.send_planned_configuration(conf) - new_configuration = [cur + step for cur, step in zip(cur_configuration, steps)] + time.sleep(1 / rate) - logging.info(f"Sending {new_configuration}") - egm_client.send_planned_configuration(new_configuration) + n = 0 + while n < 10: + pb_robot_msg = egm_client.receive_msg() + if pb_robot_msg.mciConvergenceMet: + print("Joint positions converged.") + break + else: + print("Waiting for convergence.") + n += 1 + time.sleep(1) + else: + raise TimeoutError(f"Joint positions did not converge after {n} s timeout.") if __name__ == "__main__": target_configuration = None - print(sys.argv) - if len(sys.argv) > 1: - if len(sys.argv) == 7: # 7 args including path to this file - target_configuration = [float(n) for n in sys.argv[1:]] - else: - raise RuntimeError("Wrong number of arguments, need six joint values") + args = sys.argv[1:] # first argument is just path to file + + if len(args) == 0: + target_configuration = [30.0, 30.0, 40.0, 30.0, 10.0, 15.0] + elif len(args) in (6, 7): # check for six to seven joint values + target_configuration = [float(n) for n in args] + else: + raise RuntimeError("Wrong number of arguments, need six or seven joint values") send_configuration(target_configuration)