diff --git a/README.md b/README.md index 20d02101648073c00e3820ae9ef16be972f3b101..2c2fbb4c26ed313cccad78ad4066f63f03aaf38c 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,36 @@ # ABB EGM examples +## Setup + +### 1. Install Anaconda + +[Installation](https://docs.anaconda.com/anaconda/install/index.html) guide. + +### 2. Create an anaconda environment + ```bash conda env create -f environment.yml -conda activate abb_egm_examples +conda activate abb-egm-examples +``` + +### 3. Install package + +```bash +cd /path/to/this/directory +conda activate abm-egm-examples +pip install -e . +``` + +## Usage + +Run the `EGM_test.mod` module on the robot controller. Remember to change +UCDevice argument in EGMSetupUC invocation. + +And on your computer: + +```bash +conda activate abm-egm-examples + +python examples/print_egm_feedback.py +python examples/send_configuration.py ``` diff --git a/abb_egm_client/atomic_counter.py b/abb_egm_client/atomic_counter.py index a78a1dab8a435918a087297dfbc5efd8e09756a3..c28f1023a06909255b838999bf123350b248d3ee 100644 --- a/abb_egm_client/atomic_counter.py +++ b/abb_egm_client/atomic_counter.py @@ -10,13 +10,11 @@ class AtomicCounter: >>> counter = AtomicCounter() >>> counter.inc() 1 - >>> counter.inc(num=4) - 5 >>> counter = AtomicCounter(42.5) >>> counter.value 42.5 - >>> counter.inc(num=0.5) - 43.0 + >>> counter.inc() + 43.5 >>> counter = AtomicCounter() >>> def incrementor(): ... for i in range(100000): @@ -37,16 +35,10 @@ class AtomicCounter: self._value = initial self._lock = threading.Lock() - def inc(self, num=1): + def inc(self): """Atomically increment the counter by num and return the new value""" with self._lock: - self._value += num - return self._value - - def dec(self, num=1): - """Atomically decrement the counter by num and return the new value""" - with self._lock: - self._value -= num + self._value += 1 return self._value @property diff --git a/abb_egm_client/egm_client.py b/abb_egm_client/egm_client.py index 295fd0e9fd347ec26e7a951d2c83d14e0ce682cb..1dc36ff2f2901e8eaddf0530d041c43d5f5f8e55 100644 --- a/abb_egm_client/egm_client.py +++ b/abb_egm_client/egm_client.py @@ -1,5 +1,6 @@ import socket import logging +from typing import Any, Sequence, Tuple from abb_egm_client.atomic_counter import AtomicCounter @@ -22,6 +23,24 @@ class EGMClientNotInitializedException(EGMClientException): class EGMClient: + """Communication client to ABB EGM interface. + + Parameter + --------- + port + Port number, same as defined in Controller > Configuration + > Communication > Transmission Protocol > {name of your UDP configuration} + + Attributes + ---------- + socket + UDP socket used for communication + robot_controller_address + IP address to controller, comes from first packet recieved + send_counter + An atomic counter used for sequence numbers for outbound packages + """ + def __init__(self, port=6510): self.socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) self.robot_controller_address = None @@ -29,21 +48,7 @@ class EGMClient: self.socket.bind(("", port)) - # def _ensure_socket_ready(self, timeout=1): - # # try: - # # except select.error as err: - # # if err.args[0] == EINTR: - # # return False - # # else: - # # raise - # result = select.select([self.socket], [], [self.socket], timeout) - - # print(result) - - # # if len(result[0]) == 0 and len(result[2]) == 0: - # # raise Exception("error in ensure_socket_ready") - - def _get_last_packet(self): + def _get_last_packet(self) -> Tuple[bytes, Any]: last_recieved = (None, None) self.socket.setblocking(False) @@ -67,7 +72,8 @@ class EGMClient: self.socket.setblocking(True) return last_recieved - def receive_from_robot(self): + def receive_msg(self) -> EgmRobot: + """Get latest UDP packet from IRC5.""" data, addr = self._get_last_packet() @@ -78,11 +84,12 @@ class EGMClient: # use the message description from the proto file to create a python # class from incoming data msg = EgmRobot() - msg.ParseFromString(data) + msg.ParseFromString(data) return msg - def send_msg(self, msg): + def send_msg(self, msg: Any) -> None: + """Send a protobuf message to the robot controller.""" if not self.robot_controller_address: raise EGMClientNotInitializedException( "You need to start communication with controller." @@ -90,22 +97,46 @@ class EGMClient: self.socket.sendto(msg.SerializeToString(), self.robot_controller_address) - def _create_sensor_msg(self, type="MSGTYPE_CORRECTION"): + def _create_sensor_msg(self, type="MSGTYPE_CORRECTION") -> EgmSensor: + """Create EgmSensor message with sequence number + + type + EgmSensor message type + """ msg = EgmSensor() msg.header.mtype = EgmHeader.MessageType.Value(type) msg.header.seqno = self.send_counter.inc() return msg - def send_planned_configuration(self, configuration): - msg = self._create_sensor_msg_type_correction() + def send_planned_configuration(self, configuration: Sequence[float]) -> None: + """Send target configuration to robot controller. + + configuration + List of joint position (angles) in degrees or radians, depending on + RobotWare version. + """ + msg = self._create_sensor_msg() msg.planned.joints.joints.extend(configuration) self.send_msg(msg) - def send_planned_frame(self, x, y, z, rx, ry, rz): - msg = self._create_sensor_msg_type_correction() + def send_planned_frame( + self, x: float, y: float, z: float, rx: float, ry: float, rz: float + ) -> None: + """Send target frame to robot controller. + + x + y + z + Cartesian coordinates in mm(?) + rx + ry + rz + Euler angles in (?) + """ + msg = self._create_sensor_msg() msg.planned.cartesian.pos.x = x msg.planned.cartesian.pos.y = y diff --git a/environment.yml b/environment.yml index f2cdd35a8caf6dedaaef849bb0404b418e600d6b..98a803d09d0ffc8f2bc8a47860184a734aa83769 100644 --- a/environment.yml +++ b/environment.yml @@ -3,5 +3,4 @@ channels: - conda-forge dependencies: - python >=3.9, <3.10 - - numpy - protobuf diff --git a/examples/print_egm_feedback.py b/examples/print_egm_feedback.py index b076a6157c7c5d5bbe2587ea8874639af3966185..f8d86b0174962056dcb602787b8381a052bccb33 100644 --- a/examples/print_egm_feedback.py +++ b/examples/print_egm_feedback.py @@ -24,7 +24,7 @@ def print_egm_feedback() -> None: while True: try: - msg = egm_client.receive_from_robot() + msg = egm_client.receive_msg() print(f"Sequence number: {msg.header.seqno}") print(f"Body: {msg.feedBack}") diff --git a/examples/send_configuration.py b/examples/send_configuration.py new file mode 100644 index 0000000000000000000000000000000000000000..f5f66fb0f4e20b8bbe5c9923442a51315e361de3 --- /dev/null +++ b/examples/send_configuration.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +import logging +import sys +import math +from typing import Sequence + +try: + from abb_egm_client import EGMClient +except ImportError: + raise ImportWarning("abb_egm not found, have you installed the package?") + +UDP_PORT = 6510 + + +def constrain(val: float, minimum: float, maximum: float) -> bool: + """Constrain a value between given bounds. + + Parameters + ---------- + val + Value to constrain + minimum + Minimum bound + maximum + Maximum bound + """ + return sorted([minimum, val, maximum])[1] + + +def send_configuration( + target_configuration: Sequence[float], step_size=1, tolerance=0.5 +) -> None: + """Move robot to target configuration + + Parameters + ---------- + target_configuration + List of joint positions in degrees + step_size + Degrees of rotation per message + tolerance + Absolute tolerance used to check for convergence + """ + + def converged_predicate(val: float) -> bool: + return math.isclose(val, 0, abs_tol=tolerance) + + egm_client = EGMClient(port=UDP_PORT) + + if not target_configuration: + target_configuration = [18, 45, 0, 20, 45, 0] + + while True: + pb_robot_msg = egm_client.receive_msg() + + cur_configuration = pb_robot_msg.feedBack.joints.joints + + signed_deltas = [t - c for t, c in zip(target_configuration, cur_configuration)] + + steps = [constrain(d, -step_size, step_size) for d in signed_deltas] + + if all(map(converged_predicate, steps)): + logging.info(f"Joint positions converged at {cur_configuration}") + return + + new_configuration = [cur + step for cur, step in zip(cur_configuration, steps)] + + logging.info(f"Sending {new_configuration}") + egm_client.send_planned_configuration(new_configuration) + + +if __name__ == "__main__": + target_configuration = None + print(sys.argv) + + if len(sys.argv) > 1: + if len(sys.argv) == 7: # 7 args including path to this file + target_configuration = [float(n) for n in sys.argv[1:]] + else: + raise RuntimeError("Wrong number of arguments, need six joint values") + + send_configuration(target_configuration) diff --git a/examples/send_frame.py b/examples/send_frame.py new file mode 100644 index 0000000000000000000000000000000000000000..82d32bc7aa2c5914f179f78fe7b6e565092f5f10 --- /dev/null +++ b/examples/send_frame.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python +"""Send frame example.""" + +raise NotImplementedError("Planned example")