diff --git a/python/ur_simple_control/managers.py b/python/ur_simple_control/managers.py index ab9d336d4227e03147191d29e843d00b3ab8a4b1..7aa7927b31edc3b9d113ba2009de4468cfd1a6be 100644 --- a/python/ur_simple_control/managers.py +++ b/python/ur_simple_control/managers.py @@ -1173,6 +1173,7 @@ class ProcessManager: # we're in python, and NOW do we get picky with copy-pasting??????? if comm_direction == 3: # "sending" the command via shared memory + # TODO: the name should be random and send over as function argument shm_name = "command" self.shm_cmd = shared_memory.SharedMemory(shm_name, create=True, size=init_command.nbytes) self.shared_command = np.ndarray(init_command.shape, dtype=init_command.dtype, buffer=self.shm_cmd.buf) @@ -1189,6 +1190,18 @@ class ProcessManager: # the process has to create its shared memory self.side_process = Process(target=side_function, args=(args, init_command, shm_name, self.shared_command_lock, self.shm_data,)) + if comm_direction == 4: + from ur_simple_control.networking.util import DictPb2EncoderDecoder + self.encoder_decoder = DictPb2EncoderDecoder() + self.msg_code = self.encoder_decoder.dictToMsgCode(init_command) + # TODO: the name should be random and send over as function argument + shm_name = "client_socket" + # NOTE: size is max size of the recv buffer too, + # and the everything blows up if you manage to fill it atm + self.shm_msg = shared_memory.SharedMemory(shm_name, create=True, size=1024) + self.lock = Lock() + self.side_process = Process(target=side_function, + args=(args, init_command, shm_name, self.lock)) if type(side_function) == partial: self.side_process.name = side_function.func.__name__ else: @@ -1225,8 +1238,9 @@ class ProcessManager: do you need to pollute half of robotmanager or whatever else to deal with this. """ - if self.command_queue.qsize() < 5: - self.command_queue.put_nowait(command) + if self.comm_direction != 3: + if self.command_queue.qsize() < 5: + self.command_queue.put_nowait(command) if self.comm_direction == 3: assert type(command) == np.ndarray @@ -1237,13 +1251,20 @@ class ProcessManager: def getData(self): - if self.comm_direction != 3: + if self.comm_direction < 3: if not self.data_queue.empty(): self.latest_data = self.data_queue.get_nowait() if self.comm_direction == 3: self.shared_command_lock.acquire() + # here we should only copy, release the lock, then deserialize self.latest_data = pickle.loads(self.shm_data.buf) self.shared_command_lock.release() + if self.comm_direction == 4: + self.lock.acquire() + #data_copy = copy.deepcopy(self.shm_msg.buf) + self.latest_data = self.encoder_decoder.serializedPb2MsgToDict(self.shm_msg.buf, self.msg_code) + self.lock.release() + #self.latest_data = self.encoder_decoder.serializedPb2MsgToDict(data_copy, self.msg_code) return copy.deepcopy(self.latest_data) def terminateProcess(self): diff --git a/python/ur_simple_control/networking/__init__.py b/python/ur_simple_control/networking/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..df760a3a595a49ad125a6ccf171c1ef6c3c71419 100644 --- a/python/ur_simple_control/networking/__init__.py +++ b/python/ur_simple_control/networking/__init__.py @@ -0,0 +1,3 @@ +from client import * +from server import * +from util import * diff --git a/python/ur_simple_control/networking/client.py b/python/ur_simple_control/networking/client.py index 6ad0507e2394507d8fa4d83e1b16b1eb2ad0c29c..201dd32e1ecbc80c7c519329152e0ad0af2f0f39 100644 --- a/python/ur_simple_control/networking/client.py +++ b/python/ur_simple_control/networking/client.py @@ -1,67 +1,64 @@ import socket -import time -import message_specs_pb2 +from multiprocessing import shared_memory +from google.protobuf.internal.encoder import _VarintBytes +from google.protobuf.internal.decoder import _DecodeVarint32 -# TODO: finish -def client(args, init_command, shm_name, shared_data_lock, shm_data): +def client(args, init_command, shm_name, lock): """ - server + client ------- connects to a server, then receives messages from it. offers latest message via shared memory + the message is not deserialized here because it needs to + be serialized to be put into shared memory anyway. + so better this than to deserialize and the serialize + again differently. + + to use this, you comm_direction = 4 in processmanager + ex. host = "127.0.0.1" ex. host_port = 7777 """ - shm = shared_memory.SharedMemory(name=shm_name) - p_shared = np.ndarray((2,), dtype=np.float64, buffer=shm.buf) - lock.acquire() - p[:] = p_shared[:] - lock.release() - - -host = "127.0.0.1" -host_port = 7777 -host_addr = (host, host_port) -s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -s.connect(host_addr) + def parse_message(msg_raw): + pos, next_pos = 0, 0 + msg_len = len(msg_raw) + msg_in_bytes = b"" + while True: + next_pos, pos = _DecodeVarint32(msg_raw, pos) + # TODO: either save the message chunk, or save how many initial bytes to ignore in the next message + if pos + next_pos > msg_len: + print("NETWORKING CLIENT: BUFFER OVERFLOW, DROPPING MSG!") + break + # NOTE: we only keep the latest message, we're not going to do anything with the missed message. + # the assumption is that we only get the same kind of message from a sensor or something similar, + # not files or whatever. + # hence we only deserialize the last message + if pos >= len(msg_raw): + # why i am encoding the length back in ??? + msg_in_bytes = ( + _VarintBytes(pos + next_pos) + msg_raw[pos : pos + next_pos] + ) + break + return msg_in_bytes + shm = shared_memory.SharedMemory(name=shm_name) + host_addr = (args.host_addr, args.host_port) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(host_addr) + if args.debug_prints: + print("NETWORKING CLIENT: connected to server") -def parse_message(msg_raw): - parsed_messages = [] - offset = 0 - while True: - pb2_msg = message_specs_pb2.iteration_n_time() - pb2_msg.ParseFromString(msg_raw[offset:]) - parsed_messages.append(pb2_msg) - offset += pb2_msg.ByteSize() - if offset >= len(msg_raw): - break - return parsed_messages - - -for i in range(100000): - # TODO: receive only in multiples of n_bytes of your message - msg_raw = s.recv(1024) - print("=" * 20) - print("message length", len(msg_raw)) - if len(msg_raw) == 0: - print("iteration:", i, "got nothing, going for next recv") - continue - # pb2_msg = message_specs_pb2.iteration_n_time() - # pb2_msg.ParseFromString(msg_raw) - parsed_messages = parse_message(msg_raw) - for j in range(len(parsed_messages)): - print("-------------------") - my_time = time.perf_counter() - diff = my_time - parsed_messages[j].time_sent - print("client iteration:", i) - print("server iteration:", parsed_messages[j].iteration_number) - print("client time:", my_time) - print("server time:", parsed_messages[j].time_sent) - print("time between sending and receiveing:", diff) - -s.close() -print("client out") + try: + while True: + msg_raw = s.recv(1024) + msg_in_bytes = parse_message(msg_raw) + lock.acquire() + shm.buf[: len(msg_in_bytes)] = msg_in_bytes + lock.release() + except KeyboardInterrupt: + s.close() + if args.debug_prints: + print("NETWORKING_CLIENT: caught KeyboardInterrupt, i'm out") diff --git a/python/ur_simple_control/networking/message_specs.proto b/python/ur_simple_control/networking/message_specs.proto index a45edd8f1f7c56cf45c69a93e7a52a40e952f51b..730a91bd92879b11b587317246bd08038e3fd900 100644 --- a/python/ur_simple_control/networking/message_specs.proto +++ b/python/ur_simple_control/networking/message_specs.proto @@ -4,6 +4,6 @@ message joint_angles { repeated double q = 1; } // graziano's message message wrenches { - repeated double wrench_mine = 1; + repeated double wrench = 1; repeated double wrench_estimate = 2; } diff --git a/python/ur_simple_control/networking/server.py b/python/ur_simple_control/networking/server.py index a7818fc807240636f5d937a2f8e9e7c78dd9fdbb..13c2340897df10303c5c2afb16df7c7ef3040bbf 100644 --- a/python/ur_simple_control/networking/server.py +++ b/python/ur_simple_control/networking/server.py @@ -1,10 +1,11 @@ import socket import time import message_specs_pb2 +from google.protobuf.internal.encoder import _VarintBytes +from ur_simple_control.networking.util import DictPb2EncoderDecoder -# TODO: finish -def server(args, init_command, shm_name, shared_data_lock, shm_data): +def server(args, init_command, queue): """ server ------- @@ -12,21 +13,31 @@ def server(args, init_command, shm_name, shared_data_lock, shm_data): ex. host = "127.0.0.1" ex. host_port = 7777 + + use comm_direction = 0 in processmanager for this """ + encoder_decoder = DictPb2EncoderDecoder() host_addr = (args.host, args.port) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(host_addr) + if args.debug_prints: + print("NETWORKING_SERVER: server listening on", host_addr) s.listen() - print("server listening on", host_addr) comm_socket, comm_addr = s.accept() - print("accepted a client", comm_addr) - for i in range(100000): - time.sleep(0.00001) - pb2_msg = message_specs_pb2.iteration_n_time() - pb2_msg.iteration_number = i - pb2_msg.time_sent = time.perf_counter() - # msg = pickle.dumps((i, time.time())) - comm_socket.send(pb2_msg.SerializeToString()) + # we're only accepting a single connection s.close() + if args.debug_prints: + print("NETWORKING_SERVER: accepted a client", comm_addr) + try: + while True: + # the construction of the message should happen in processmanager + msg_dict = queue.get() + if msg_dict == "befree": + if args.debug_prints: + print("NETWORKING_SERVER: got befree, networking server out") + break + comm_socket.send(encoder_decoder.dictToSerializedPb2Msg(msg_dict)) + except KeyboardInterrupt: + if args.debug_prints: + print("NETWORKING_SERVER: caugth KeyboardInterrupt, networking server out") comm_socket.close() - print("server exited") diff --git a/python/ur_simple_control/networking/util.py b/python/ur_simple_control/networking/util.py new file mode 100644 index 0000000000000000000000000000000000000000..46397f38eb81cfd2858fbf167b401080aa81ad8f --- /dev/null +++ b/python/ur_simple_control/networking/util.py @@ -0,0 +1,134 @@ +import message_specs_pb2 +from google.protobuf.internal.encoder import _VarintBytes +from google.protobuf.internal.decoder import _DecodeVarint32 + +""" +to make the data API uniform across the library (dictionaries), +we need to have a lookup function between these dictionaries +and messages send with protobuf. +i don't want to see protobuf construction in my control loop aight. + +this is done as follows. +there is a map between a binary number and messages. +the binary number is of fixed length, and each bit represents +a boolean for whether a specific quantity is in the keys of the dictionary. +it is initialized to all zeros (false) +when a dictionary is inputed, the function goes over every key and sets +the corresponding value. +of course for every combination there is a pre-defined message in protobuf. +(well it won't be, that's a todo because it might be better +to have a big message with optional fields or something). +the output is a message with the message values put in. + +is this the best way of dealing with enabling unifromness in the API? +is this the best way to construct protobuf messages? +i don't know, but it solves my most important problem - i don't +want to look at protobuf in the rest of the code +""" + + +class DictPb2EncoderDecoder: + def __init__(self): + # TODO: fill this in with more messages + # int_to_message = {0: message_specs_pb2.joint_angles, 6: message_specs_pb2.wrenches} + self.key_to_index = {"q": 0, "wrench": 1, "wrench_estimate": 2} + # if you want to embed message codes into messages, + # you'll need to use this + # stupidest possible solution + # TODO: implement own bidirectional dict for instead of this + # self.index_to_key = {self.key_to_index[key]: key for key in self.key_to_index} + + def dictToMsgCode(self, dict_msg): + """ + you give me a dict that you want to send, + i give you it's code which internally corresponds + to the pb2 message which the given dict should form + """ + msg_code = 0b000 + for key in dict_msg: + # 1 << x gets you a binary starting with 1 followed by x zeros + # | is binary or + # because i can't just flip a bit because python. + # this certainly isn't the most efficient way to go about it, + # but what can you do + msg_code = msg_code | (1 << self.key_to_index[key]) + return msg_code + + def dictToSerializedPb2Msg(self, dict_msg): + """ + dictToPb2Msg + ------------ + takes dict, finds it's code and the corresponding pb2 message, + fills in the values, serializes + the message and prepends its length. + + NOTE: possible extension: prepend msg_code as well, + that way you can send different messages + you could also just have multiple sockets, all sending the same + message. ask someone who actually knows network programming + what makes more sense + """ + # pb2_msg = int_to_message[msg_code] + msg_code = self.dictToMsgCode(dict_msg) + if msg_code == 0: + pb2_msg = message_specs_pb2.joint_angles() + # if i could go from string to class atribute somehow + # that'd be amazing + # TODO: see if that's possible + pb2_msg.q = dict_msg["q"] + if msg_code == 6: + pb2_msg = message_specs_pb2.wrenches() + # if i could go from string to class atribute somehow + # that'd be amazing + # TODO: see if that's possible + pb2_msg.wrench = dict_msg["wrench"] + pb2_msg.wrench_estimate = dict_msg["wrench_estimate"] + + # NOTE: possible extension: + # prepend the message code as well so that i can send different + # messages over the same socket. + # might come in handy and it comes for the price of one int per message, + # which sounds like a good deal to me + # msg_serialized = _VarintBytes(msg_code) + pb2_msg.SerializeToString() + # --> not bothering with it until necessary + msg_length = pb2_msg.ByteSize() + msg_serialized = pb2_msg.SerializeToString() + # NOTE: protobuf is not self-delimiting so we have to prepend + # the length of each message to know how to parse multiple + # messages if they get buffered at the client (likely occurance) + msg = _VarintBytes(msg_length) + msg_serialized + return msg + + def serializedPb2MsgToDict(self, pb2_msg_in_bytes, msg_code): + """ + pb2MsgToDict + ------------ + input is pb2 msg in bytes prepended with msg length and + msg_code, in that order, both are integers + + atm assumes only one message type will be received, + and that you have to pass as an argument (it's the one + you get when you call dictToMsgCode). + + alternatively, as an extension, + the msg_code is embeded in the message and then you can send + different messages over the same socket. + """ + dict_msg = {} + if msg_code == 0: + pb2_msg = message_specs_pb2.joint_angles() + pb2_msg.ParseFromString(pb2_msg_in_bytes) + # if i could go from string to class atribute somehow + # that'd be amazing + # TODO: see if that's possible + dict_msg["q"] = pb2_msg.q + if msg_code == 6: + pb2_msg = message_specs_pb2.wrenches() + pb2_msg.ParseFromString(pb2_msg_in_bytes) + # if i could go from string to class atribute somehow + # that'd be amazing + # TODO: see if that's possible + dict_msg["wrench"] = pb2_msg.wrench + dict_msg["wrench_estimate"] = pb2_msg.wrench_estimate + + return dict_msg