Skip to content
Snippets Groups Projects
Verified Commit d24cef3c authored by Anton Tetov Johansson's avatar Anton Tetov Johansson
Browse files

modernization of package

parent b292aede
No related branches found
No related tags found
No related merge requests found
......@@ -40,5 +40,5 @@ python -m abb_egm_pyclient.run 6510 joint 30 0 0 20 10 0 0
Or import the client from your own code:
```python
from abb_egm_pyclient import EGMClient
from abb_egm_pyclient.egm_client import EGMClient
```
from .egm_client import EGMClient # noqa: F401,F403
import logging
DEFAULT_UDP_PORT = 6510
logging.getLogger()
import socket
import logging
from typing import Any, Tuple
from typing import Any, Optional, Tuple
from abb_egm_pyclient import DEFAULT_UDP_PORT
from abb_egm_pyclient.atomic_counter import AtomicCounter
from abb_egm_pyclient.egm_pb2 import EgmRobot, EgmSensor, EgmHeader
try:
from abb_egm_pyclient.egm_pb2 import EgmRobot, EgmSensor, EgmHeader # type: ignore
except ImportError:
raise ImportWarning(
"abb_egm_pyclient.egm_pb2 not found, have you extracted the protobufs?"
)
import numpy as np
import numpy.typing as npt
log = logging.getLogger(__name__)
......@@ -19,7 +24,7 @@ class EGMClientException(Exception):
class EGMClientNotInitializedException(EGMClientException):
"""When user tries to access attributes set in self.recieve_from_robot()"""
"""When method is used before initialization"""
pass
......@@ -38,20 +43,20 @@ class EGMClient:
socket
UDP socket used for communication
robot_controller_address
IP address to controller, comes from first packet recieved
IP address to controller, comes from first packet received
send_counter
An atomic counter used for sequence numbers for outbound packages
"""
def __init__(self, port=6510):
def __init__(self, port: int = DEFAULT_UDP_PORT):
self.socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
self.robot_controller_address = None
self.send_counter = AtomicCounter()
self.socket.bind(("", port))
def _get_last_packet(self) -> Tuple[bytes, Any]:
last_recieved = (None, None)
def _get_last_packet(self) -> Tuple[Optional[bytes], Any]:
last_received = (None, None)
self.socket.setblocking(False)
......@@ -59,11 +64,11 @@ class EGMClient:
try:
data, addr = self.socket.recvfrom(2048) # read from socket
if data:
last_recieved = (data, addr) # store last data and addr
last_received = (data, addr) # store last data and addr
except socket.error as err:
# EWOULDBLOCK: recvfrom would block since socket buffer is empty
if err.args[0] == socket.EWOULDBLOCK:
if last_recieved[0]: # if last data got picked up
if last_received[0]: # if last data got picked up
break
else:
continue # else wait for data
......@@ -72,7 +77,7 @@ class EGMClient:
raise err # if there is another error, raise exception
self.socket.setblocking(True)
return last_recieved
return last_received
def receive_msg(self) -> EgmRobot:
"""Get latest UDP packet from IRC5."""
......@@ -130,8 +135,6 @@ class EGMClient:
if len(external_joints) > 0:
msg.planned.externalJoints.joints.extend(external_joints)
self.send_msg(msg)
def send_planned_frame(
......
import argparse
try:
from abb_egm_pyclient.run.print_egm_feedback import print_egm_feedback
from abb_egm_pyclient.run.send_configuration import send_configuration
from abb_egm_pyclient.run.send_pose import send_pose
from abb_egm_pyclient import DEFAULT_UDP_PORT
except ImportError:
raise ImportWarning("abb_egm_pyclient not found, have you installed the package?")
import logging
log = logging.getLogger(__name__)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run one of the example EGM scripts.")
parser.add_argument("port", type=int, help="UDP port")
parser.add_argument(
"--port", "-p", type=int, help="UDP port", default=DEFAULT_UDP_PORT
)
subparsers = parser.add_subparsers(help="sub-command help", dest="subparser_name")
......@@ -17,22 +28,22 @@ if __name__ == "__main__":
parser_print = subparsers.add_parser(
"print",
help="print messages recieved from the EGM interface on the controller",
help="print messages received from the EGM interface on the controller",
)
parser_joint = subparsers.add_parser(
"joint",
help="print messages recieved from the EGM interface on the controller",
help="send joint configuration to the robot",
)
parser_joint.add_argument(
"joint_values",
nargs="*",
nargs="+",
type=float,
)
parser_pose = subparsers.add_parser(
"pose",
help="print messages recieved from the EGM interface on the controller",
help="send pose to the robot",
)
parser_pose.add_argument(
"pose_values", nargs=6, type=float, metavar=("x", "y", "z", "rx", "ry", "rz")
......@@ -40,17 +51,17 @@ if __name__ == "__main__":
args = parser.parse_args()
print(args.__dict__)
log.debug(f"args: {args}")
if args.subparser_name == "print":
print_egm_feedback(args.port)
print_egm_feedback(port=args.port)
if args.subparser_name == "joint":
if len(args.joint_values) not in (6, 7):
raise RuntimeError("Incorrect number of joint values.")
send_configuration(args.port, args.joint_values)
send_configuration(args.joint_values, port=args.port)
if args.subparser_name == "pose":
send_configuration(args.port, *args.pose_values)
send_configuration(args.pose_values, port=args.port)
func = func_sel_mapping[args.func_selection]
#!/usr/bin/env python
import logging
try:
from abb_egm_pyclient import EGMClient
except ImportError:
raise ImportWarning("abb_egm not found, have you installed the package?")
DEFAULT_UDP_PORT = 6510
from abb_egm_pyclient.egm_client import EGMClient
from abb_egm_pyclient import DEFAULT_UDP_PORT
log = logging.getLogger("egm_client")
log = logging.getLogger(__name__)
def print_egm_feedback(port: int) -> None:
def print_egm_feedback(port: int = DEFAULT_UDP_PORT) -> None:
"""Print EGM feedback.
Parameters
......
#!/usr/bin/env python
import sys
from typing import Sequence
from typing import List, Sequence
import time
import numpy as np
import numpy.typing as npt
from abb_egm_pyclient import EGMClient
from abb_egm_pyclient.egm_client import EGMClient
from abb_egm_pyclient import DEFAULT_UDP_PORT
DEFAULT_UDP_PORT = 6510
def send_configuration(port: int, target_conf: Sequence[float], joint_vel=1.0) -> None:
def send_configuration(
target_conf: Sequence[float], port: int = DEFAULT_UDP_PORT, joint_vel=1.0
) -> None:
"""Move robot to target configuration
Parameters
......@@ -27,7 +26,7 @@ def send_configuration(port: int, target_conf: Sequence[float], joint_vel=1.0) -
egm_client = EGMClient(port=port)
pb_robot_msg = egm_client.receive_msg()
start_conf: Sequence[float] = pb_robot_msg.feedBack.joints.joints
start_conf: List[float] = pb_robot_msg.feedBack.joints.joints
if len(start_conf) == len(target_conf) - 1:
start_conf.append(pb_robot_msg.feedBack.externalJoints.joints[0])
......@@ -47,8 +46,6 @@ def send_configuration(port: int, target_conf: Sequence[float], joint_vel=1.0) -
print(f"Current configuration {cur_configuration}")
new_conf = start_conf_arr
conf = []
for start_pos, delta in zip(start_conf, deltas):
abs_change = n * delta / num_msgs
......
#!/usr/bin/env python
"""Send pose example."""
from typing import List
from abb_egm_pyclient import DEFAULT_UDP_PORT
def send_pose(port, x, y, z, rx, ry, rz):
def send_pose(
target_pose: List[float],
port: int = DEFAULT_UDP_PORT,
):
raise NotImplementedError("Planned example")
name: abb-egm-examples
name: abb_egm_pyclient
channels:
- conda-forge
dependencies:
- python >=3.7, <3.10
- protobuf
- numpy
- python >=3.7, <3.13
- protobuf
- ruff
[project]
name = "abb_egm_pyclient"
version = "0.1.0"
description = "Python client to send EGM messages to ABB robots"
authors = [
{ name = "Anton Tetov", email = "anton@tetov.se" },
]
readme = "README.md"
requires-python = ">= 3.7"
license = {file = "LICENSE"}
urls = { "Repository" = "https://gitlab.control.lth.se/robotlab/abb_egm_pyclient/"}
dependencies = ["numpy", "protobuf"]
optional-dependencies = { "dev" = ["ruff"] }
[build-system]
requires = ["setuptools >= 40.9.0", "wheel"]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[tool.isort]
profile = "black"
[tool.mypy]
plugins = "numpy.typing.mypy_plugin"
[metadata]
name = abb_egm_pyclient
version = 0.1.0
[options]
packages = abb_egm_pyclient
python_requires = >= 3.7
install_requires =
protobuf
[options.extras_require]
dev =
black
flake8
isort >= 5.0.0
mypy
[flake8]
max-line-length = 88
extend-ignore = E203
import setuptools
setuptools.setup()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment