From d5d9ee8c921ff5311f2d6b9dbd6e170484441772 Mon Sep 17 00:00:00 2001 From: Anders Blomdell <anders.blomdell@control.lth.se> Date: Mon, 10 Jun 2013 18:46:39 +0200 Subject: [PATCH] Twoway stacking example added. Labcomm cleanup (still more to do) --- examples/Makefile | 7 +- examples/twoway/Makefile | 17 +- examples/twoway/client.c | 346 +---- examples/twoway/decimating.c | 154 ++- examples/twoway/decimating.h | 3 - examples/twoway/decimating_messages.lc | 2 +- examples/twoway/server.c | 1261 +----------------- examples/twoway/types.lc | 5 + lib/c/Makefile | 3 + lib/c/labcomm.c | 84 +- lib/c/labcomm.h | 5 +- lib/c/labcomm_dynamic_buffer_writer.c | 2 +- lib/c/labcomm_fd_reader.c | 2 +- lib/c/labcomm_fd_writer.c | 2 +- lib/c/labcomm_private.h | 8 +- lib/c/test/test_labcomm_generated_encoding.c | 2 +- test/Makefile | 7 +- 17 files changed, 336 insertions(+), 1574 deletions(-) create mode 100644 examples/twoway/types.lc diff --git a/examples/Makefile b/examples/Makefile index 6ec4b20..30ecece 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -1,10 +1,15 @@ +.PHONY: all all: echo To be done... - echo $(MAKE) -C twoway e + $(MAKE) -C twoway all +.PHONY: test test: echo More to be done... cd simple ; sh compile.sh && sh run.sh + $(MAKE) -C twoway test +.PHONY: clean distclean clean distclean: echo To be done... + $(MAKE) -C twoway clean diff --git a/examples/twoway/Makefile b/examples/twoway/Makefile index 79f0dac..646d31c 100644 --- a/examples/twoway/Makefile +++ b/examples/twoway/Makefile @@ -6,6 +6,10 @@ CFLAGS=-O3 -g -Wall -Werror -I../../lib/c -I. -lpthread all: $(TARGETS:%=gen/%) +test: all + LD_LIBRARY_PATH=../../lib/c ./gen/server 2000 & + LD_LIBRARY_PATH=../../lib/c ./gen/client localhost 2000 + gen/.dir: mkdir -p $@ @@ -20,11 +24,13 @@ gen/%.o: %.c | gen/.dir gen/%.c gen/%.h: %.lc | gen/.dir $(LABCOMM) --c=gen/$*.c --h=gen/$*.h $< -gen/client: client.c gen/types.o gen/decimating.o gen/decimating_messages.o - $(CC) -o $@ $(CFLAGS) $^ -lpthread -L../../lib/c -llabcomm +gen/client: client.c + $(CC) -o $@ $(CFLAGS) $^ -lpthread \ + -L../../lib/c -llabcomm -Tlabcomm.linkscript -gen/server: server.c gen/types.o gen/decimating.o gen/decimating_messages.o - $(CC) -o $@ $(CFLAGS) $^ -lpthread -L../../lib/c -llabcomm +gen/server: server.c gen/types.o gen/decimating.o + $(CC) -o $@ $(CFLAGS) $^ -lpthread \ + -L../../lib/c -llabcomm -Tlabcomm.linkscript .PHONY: clean clean: @@ -34,4 +40,7 @@ gen/decimating.o: decimating.h gen/decimating.o: gen/decimating_messages.h gen/client.o: decimating.h gen/client.o: gen/types.h +gen/client: gen/decimating.o gen/client: gen/decimating_messages.o +gen/client: gen/types.o +gen/server: gen/decimating_messages.o diff --git a/examples/twoway/client.c b/examples/twoway/client.c index 9d1e250..27b1acf 100644 --- a/examples/twoway/client.c +++ b/examples/twoway/client.c @@ -8,6 +8,7 @@ #include <sys/select.h> #include <sys/socket.h> #include <sys/types.h> +#include <unistd.h> #include <labcomm.h> #include <labcomm_fd_reader.h> #include <labcomm_fd_writer.h> @@ -15,16 +16,14 @@ #include "decimating.h" #include "gen/types.h" -static void handle_A(int32_t *value, void *context) -{ -} - -static void handle_B(int32_t *value, void *context) +static void handle_Sum(int32_t *value, void *context) { + printf("A+B=%d\n", *value); } -static void handle_C(int32_t *value, void *context) +static void handle_Diff(int32_t *value, void *context) { + printf("A-B=%d\n", *value); } static void *run_decoder(void *context) @@ -34,7 +33,6 @@ static void *run_decoder(void *context) do { result = labcomm_decoder_decode_one(decoder); - printf("Got index %d", result); } while (result >= 0); return NULL; } @@ -54,6 +52,7 @@ int main(int argc, char *argv[]) struct labcomm_lock *lock; struct labcomm_decoder *decoder; struct labcomm_encoder *encoder; + int32_t i, j; hostname = argv[1]; port = atoi(argv[2]); @@ -101,330 +100,29 @@ int main(int argc, char *argv[]) encoder = labcomm_encoder_new(decimating->writer, lock); pthread_t rdt; - labcomm_decoder_register_types_A(decoder, handle_A, NULL); - labcomm_decoder_register_types_B(decoder, handle_B, NULL); - labcomm_decoder_register_types_C(decoder, handle_C, NULL); + labcomm_decoder_register_types_Sum(decoder, handle_Sum, NULL); + labcomm_decoder_register_types_Diff(decoder, handle_Diff, NULL); pthread_create(&rdt, NULL, run_decoder, decoder); labcomm_encoder_register_types_A(encoder); - -out: - return 0; - -} - -#if 0 - -#include <arpa/inet.h> -#include <errno.h> -#include <netdb.h> -#include <string.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <unistd.h> -#include <linux/tcp.h> -#include "labcomm.h" -#include "orca_client.h" -#include "orca_messages.h" - -orca_client_t *orca_client_new_tcp( - char *hostname, - int port) -{ - orca_client_t *result; - struct hostent *host; - int OK; - - result = malloc(sizeof(orca_client_t)); - OK = result != 0; + labcomm_encoder_register_types_B(encoder); + labcomm_encoder_register_types_Terminate(encoder); - if (OK) { - result->encoder = 0; - result->decoder = 0; - result->directory.input.n_0 = 0; - result->directory.input.a = 0; - result->directory.output.n_0 = 0; - result->directory.output.a = 0; - result->directory.parameter.n_0 = 0; - result->directory.parameter.a = 0; - result->directory.log.n_0 = 0; - result->directory.log.a = 0; - result->fd = socket(PF_INET, SOCK_STREAM, 0); - if (result->fd < 0) { - fprintf(stderr, "failed to create socket\n"); - OK = 0; - } - } - - if (OK) { - struct sockaddr_in adr; - int err; - - adr.sin_family = AF_INET; - adr.sin_port = 0; - adr.sin_addr.s_addr = INADDR_ANY; - err = bind(result->fd, (struct sockaddr*)&adr, sizeof(adr)); - if (err != 0) { - fprintf(stderr, "failed to bind socket\n"); - OK = 0; - } - } - - if (OK) { - host = gethostbyname(hostname); - if (!host) { - fprintf(stderr, "failed to lookup %s\n", hostname); - OK = 0; - } - } - - if (OK) { - struct sockaddr_in to; - int err; - - to.sin_family = AF_INET; - to.sin_port = htons(port); - bcopy((char*)host->h_addr, (char*)&to.sin_addr, host->h_length); - err = connect(result->fd, (struct sockaddr*)&to, sizeof(to)); - if (err != 0) { - fprintf(stderr, "failed to connect %d@%s\n", port, hostname); - OK = 0; - } - } - - if (OK) { - int nodelay = 1; - setsockopt(result->fd, IPPROTO_TCP, TCP_NODELAY, - &nodelay, sizeof(nodelay)); - } + usleep(100000); - if (OK) { - result->encoder = labcomm_encoder_new(fd_writer, &result->fd); - if (!result->encoder) { - fprintf(stderr, "failed to allocate encoder\n"); - OK = 0; - } else { - labcomm_encoder_register_orca_messages_select_input(result->encoder); - labcomm_encoder_register_orca_messages_select_output(result->encoder); - labcomm_encoder_register_orca_messages_select_parameter(result->encoder); - labcomm_encoder_register_orca_messages_select_log(result->encoder); - } - } + err = labcomm_decoder_ioctl_types_Sum(decoder, SET_DECIMATION, 2); + err = labcomm_decoder_ioctl_types_Diff(decoder, SET_DECIMATION, 4); - if (OK) { - result->decoder = labcomm_decoder_new(fd_reader, &result->fd); - if (!result->decoder) { - fprintf(stderr, "failed to allocate encoder\n"); - OK = 0; - } else { - labcomm_decoder_register_orca_messages_directory(result->decoder, - directory_handler, - result); - labcomm_decoder_decode_one(result->decoder); + for (i = 0 ; i < 4 ; i++) { + for (j = 0 ; j < 4 ; j++) { + printf("A=%d B=%d\n", i, j); + labcomm_encode_types_A(encoder, &i); + labcomm_encode_types_B(encoder, &j); + sleep(1); } } - - if (!OK && result) { - orca_client_free_tcp(result); - result = 0; - } - - return result; -} - -void orca_client_free_tcp( - struct orca_client *client) -{ - close(client->fd); - fprintf(stderr, "IMPLEMENT %s\n", __FUNCTION__); -} - -orca_internal_channel_t *select_tcp( - struct orca_client *client, - void (*encode)(struct labcomm_encoder *e, orca_messages_select_t *v), - orca_messages_connection_list_t *list, - orca_client_selection_t *selection, - direction_t direction, - int decimation) -{ - orca_internal_channel_t *result; - int i, j, n; - - n = 0; - for (i = 0 ; i < selection->n_0 ; i++) { - for (j = 0 ; j < list->n_0 ; j++) { - if (strcmp(selection->a[i], list->a[j].name) == 0) { - n++; - break; - } - } - } - fprintf(stderr, "%d matches\n", n); - if (n) { - int OK, server, port, fd; - - OK = 1; - server = socket(PF_INET, SOCK_STREAM, 0); - if (server < 0) { - fprintf(stderr, "failed to create socket\n"); - OK = 0; - } - if (OK) { - struct sockaddr_in adr; - int err; - - adr.sin_family = AF_INET; - adr.sin_port = htons(0); - adr.sin_addr.s_addr = INADDR_ANY; - err = bind(server, (struct sockaddr*)&adr, sizeof(adr)); - if (err != 0) { - fprintf(stderr, "failed to bind socket\n"); - OK = 0; - } - } - - if (OK) { - int err; - - err = listen(server, 1); - if (err != 0) { - fprintf(stderr, "failed to listen on socket\n"); - OK = 0; - } - } - - if (OK) { - struct sockaddr_in adr; - unsigned int adrlen; - int err; - - adrlen = sizeof(adr); - err = getsockname(server, (struct sockaddr*)&adr, &adrlen); - if (err != 0) { - OK = 0; - } else { - port = ntohs(adr.sin_port); - } - } - - if (OK) { - orca_messages_select_t select; - - select.port = port; - select.decimation = decimation; - select.list.n_0 = n; - select.list.a = malloc(select.list.n_0 * sizeof(*select.list.a)); - if (select.list.a) { - n = 0; - for (i = 0 ; i < selection->n_0 ; i++) { - for (j = 0 ; j < list->n_0 ; j++) { - if (strcmp(selection->a[i], list->a[j].name) == 0) { - fprintf(stderr, "selection->a[%d] = %s %d\n", - n, - selection->a[i], - list->a[j].index); - select.list.a[n] = list->a[j].index; - n++; - break; - } - } - } - encode(client->encoder, &select); - } - free(select.list.a); - } - - if (OK) { - struct sockaddr_in adr; - unsigned int adrlen; - - adr.sin_family = AF_INET; - adr.sin_port = 0; - adr.sin_addr.s_addr = INADDR_ANY; - fprintf(stderr, "Restrict accept: %s %d\n", __FILE__, __LINE__); - //adr.sin_addr = client->remote.sin_addr; - adrlen = sizeof(adr); - fd = accept(server, (struct sockaddr*)&adr, &adrlen); - if (fd < 0) { - OK = 0; - } - } - - if (OK) { - result = malloc(sizeof(orca_internal_channel_t)); - if (! result) { - OK = 0; - } - } - - if (OK) { - result->fd = fd; - result->channel.context = result; - if (direction == direction_read || direction == direction_read_write) { - result->channel.decoder = labcomm_decoder_new(fd_reader, &result->fd); - } else { - result->channel.decoder = 0; - } - if (direction == direction_write || direction == direction_read_write) { - result->channel.encoder = labcomm_encoder_new(fd_writer, &result->fd); - } else { - result->channel.encoder = 0; - } - } - fprintf(stderr, "CONNECTED %p %p\n", - result->channel.encoder, result->channel.decoder); - close(server); - } - return result; -} - - -orca_client_channel_t *orca_client_select_input_tcp( - struct orca_client *client, - orca_client_selection_t *selection) -{ - orca_internal_channel_t *channel; - - channel = select_tcp(client, labcomm_encode_orca_messages_select_input, - &client->directory.input, selection, - direction_write, 1); - return &channel->channel; -} - -orca_client_channel_t *orca_client_select_output_tcp( - struct orca_client *client, - orca_client_selection_t *selection) -{ - orca_internal_channel_t *channel; - - channel = select_tcp(client, labcomm_encode_orca_messages_select_output, - &client->directory.output, selection, - direction_read, 1); - return &channel->channel; -} - -orca_client_channel_t *orca_client_select_parameter_tcp( - struct orca_client *client, - orca_client_selection_t *selection) -{ - orca_internal_channel_t *channel; - - channel = select_tcp(client, labcomm_encode_orca_messages_select_parameter, - &client->directory.parameter, selection, - direction_read_write, 1); - return &channel->channel; -} - -orca_client_channel_t *orca_client_select_log_tcp( - struct orca_client *client, - orca_client_selection_t *selection, - int decimation) -{ - orca_internal_channel_t *channel; + labcomm_encode_types_Terminate(encoder, NULL); +out: + return 0; - channel = select_tcp(client, labcomm_encode_orca_messages_select_log, - &client->directory.log, selection, - direction_read, decimation); - return &channel->channel; } -#endif diff --git a/examples/twoway/decimating.c b/examples/twoway/decimating.c index 1be0f40..769dd30 100644 --- a/examples/twoway/decimating.c +++ b/examples/twoway/decimating.c @@ -8,7 +8,9 @@ struct decimating_private { struct decimating decimating; struct labcomm_encoder *encoder; + int encoder_initialized; struct labcomm_decoder *decoder; + int decoder_initialized; struct orig_reader { void *context; const struct labcomm_reader_action *action; @@ -17,13 +19,25 @@ struct decimating_private { void *context; const struct labcomm_writer_action *action; } orig_writer; + LABCOMM_SIGNATURE_ARRAY_DEF(decimation, + struct decimation { + int n; + int current; + }); }; static void set_decimation( decimating_messages_set_decimation *value, void * context) { - fprintf(stderr, "%s\n", __FUNCTION__); + struct decimating_private *decimating = context; + struct decimation *decimation; + + decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->decimation, + struct decimation, + value->signature_index); + decimation->n = value->decimation; + decimation->current = 0; } static int wrap_reader_alloc(struct labcomm_reader *r, void *context, @@ -34,12 +48,10 @@ static int wrap_reader_alloc(struct labcomm_reader *r, void *context, struct decimating_private *decimating = context; struct orig_reader *orig_reader = &decimating->orig_reader; - fprintf(stderr, "%s\n", __FUNCTION__); + /* Stash away decoder for later use */ + decimating->decoder = decoder; result = orig_reader->action->alloc(r, orig_reader->context, decoder, labcomm_version); - labcomm_decoder_register_decimating_messages_set_decimation(decoder, - set_decimation, - decimating); return result; } @@ -48,7 +60,6 @@ static int wrap_reader_free(struct labcomm_reader *r, void *context) struct decimating_private *decimating = context; struct orig_reader *orig_reader = &decimating->orig_reader; - fprintf(stderr, "%s\n", __FUNCTION__); return orig_reader->action->free(r, orig_reader->context); } @@ -57,7 +68,11 @@ static int wrap_reader_start(struct labcomm_reader *r, void *context) struct decimating_private *decimating = context; struct orig_reader *orig_reader = &decimating->orig_reader; - fprintf(stderr, "%s\n", __FUNCTION__); + if (! decimating->decoder_initialized) { + decimating->decoder_initialized = 1; + labcomm_decoder_register_decimating_messages_set_decimation( + decimating->decoder, set_decimation, decimating); + } return orig_reader->action->start(r, orig_reader->context); } @@ -66,7 +81,6 @@ static int wrap_reader_end(struct labcomm_reader *r, void *context) struct decimating_private *decimating = context; struct orig_reader *orig_reader = &decimating->orig_reader; - fprintf(stderr, "%s\n", __FUNCTION__); return orig_reader->action->end(r, orig_reader->context); } @@ -76,27 +90,33 @@ static int wrap_reader_fill(struct labcomm_reader *r, void *context) struct orig_reader *orig_reader = &decimating->orig_reader; int result; - fprintf(stderr, "%s\n", __FUNCTION__); - fprintf(stderr, "%d\n", decimating->decimating.reader->pos); result = orig_reader->action->fill(r, orig_reader->context); - fprintf(stderr, "%d %d\n", decimating->decimating.reader->pos, result); return result; } static int wrap_reader_ioctl(struct labcomm_reader *r, void *context, - int action, - struct labcomm_signature *signature, va_list args) + int signature_index, + struct labcomm_signature *signature, + uint32_t action, va_list args) { struct decimating_private *decimating = context; struct orig_reader *orig_reader = &decimating->orig_reader; - fprintf(stderr, "%s\n", __FUNCTION__); - if (orig_reader->action->ioctl == NULL) { - return -ENOTSUP; + if (action == SET_DECIMATION) { + decimating_messages_set_decimation decimation; + va_list va; + + va_copy(va, args); + decimation.decimation = va_arg(va, int); + decimation.signature_index = signature_index; + va_end(va); + return labcomm_encode_decimating_messages_set_decimation( + decimating->encoder, &decimation); } else { return orig_reader->action->ioctl(r, orig_reader->context, - action, signature, args); + signature_index, signature, action, args); } + } struct labcomm_reader_action decimating_reader_action = { @@ -108,6 +128,90 @@ struct labcomm_reader_action decimating_reader_action = { .ioctl = wrap_reader_ioctl }; +int wrap_writer_alloc(struct labcomm_writer *w, void *context, + struct labcomm_encoder *encoder, char *labcomm_version) +{ + struct decimating_private *decimating = context; + struct orig_writer *orig_writer = &decimating->orig_writer; + + /* Stash away encoder for later use */ + decimating->encoder = encoder; + return orig_writer->action->alloc(w, orig_writer->context, + encoder, labcomm_version); +} + +int wrap_writer_free(struct labcomm_writer *w, void *context) +{ + struct decimating_private *decimating = context; + struct orig_writer *orig_writer = &decimating->orig_writer; + + return orig_writer->action->free(w, orig_writer->context); +} + +int wrap_writer_start(struct labcomm_writer *w, void *context, + struct labcomm_encoder *encoder, + int index, struct labcomm_signature *signature, + void *value) +{ + struct decimating_private *decimating = context; + struct orig_writer *orig_writer = &decimating->orig_writer; + struct decimation *decimation; + + if (! decimating->encoder_initialized) { + decimating->encoder_initialized = 1; + labcomm_encoder_register_decimating_messages_set_decimation( + decimating->encoder); + } + decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->decimation, + struct decimation, index); + decimation->current++; + if (decimation->current < decimation->n) { + return -EALREADY; + } else { + decimation->current = 0; + return orig_writer->action->start(w, orig_writer->context, + encoder, index, signature, value); + } +} + +int wrap_writer_end(struct labcomm_writer *w, void *context) +{ + struct decimating_private *decimating = context; + struct orig_writer *orig_writer = &decimating->orig_writer; + + return orig_writer->action->end(w, orig_writer->context); +} + +int wrap_writer_flush(struct labcomm_writer *w, void *context) +{ + struct decimating_private *decimating = context; + struct orig_writer *orig_writer = &decimating->orig_writer; + + return orig_writer->action->flush(w, orig_writer->context); +} + +int wrap_writer_ioctl(struct labcomm_writer *w, void *context, + int signature_index, + struct labcomm_signature *signature, + uint32_t action, va_list args) +{ + struct decimating_private *decimating = context; + struct orig_writer *orig_writer = &decimating->orig_writer; + + return orig_writer->action->ioctl(w, orig_writer->context, + signature_index, signature, action, args); +} + + +struct labcomm_writer_action decimating_writer_action = { + .alloc = wrap_writer_alloc, + .free = wrap_writer_free, + .start = wrap_writer_start, + .end = wrap_writer_end, + .flush = wrap_writer_flush, + .ioctl = wrap_writer_ioctl +}; + extern struct decimating *decimating_new( struct labcomm_reader *reader, struct labcomm_writer *writer, @@ -120,11 +224,27 @@ extern struct decimating *decimating_new( goto out_fail; } + /* Wrap reader and writer */ result->orig_reader.context = reader->context; result->orig_reader.action = reader->action; reader->context = result; reader->action = &decimating_reader_action; + + result->orig_writer.context = writer->context; + result->orig_writer.action = writer->action; + writer->context = result; + writer->action = &decimating_writer_action; + + /* Init visible result struct */ result->decimating.reader = reader; + result->decimating.writer = writer; + + /* Init other fields */ + result->encoder = NULL; + result->encoder_initialized = 0; + result->decoder = NULL; + result->decoder_initialized = 0; + LABCOMM_SIGNATURE_ARRAY_INIT(result->decimation, struct decimation); goto out_ok; diff --git a/examples/twoway/decimating.h b/examples/twoway/decimating.h index 767d1fa..ca539c0 100644 --- a/examples/twoway/decimating.h +++ b/examples/twoway/decimating.h @@ -3,8 +3,6 @@ #include <labcomm.h> #include <labcomm_ioctl.h> -#include <labcomm_fd_reader.h> -#include <labcomm_fd_writer.h> struct decimating { struct labcomm_reader *reader; @@ -17,6 +15,5 @@ extern struct decimating *decimating_new( struct labcomm_lock *lock); #define SET_DECIMATION LABCOMM_IOSW('d',0,int) -#define GET_DECIMATION LABCOMM_IOSR('d',1,int) #endif diff --git a/examples/twoway/decimating_messages.lc b/examples/twoway/decimating_messages.lc index 00de820..8763873 100644 --- a/examples/twoway/decimating_messages.lc +++ b/examples/twoway/decimating_messages.lc @@ -1,4 +1,4 @@ sample struct { int decimation; - byte signature[_]; + int signature_index; } set_decimation; \ No newline at end of file diff --git a/examples/twoway/server.c b/examples/twoway/server.c index 4d6e22d..2d22e7b 100644 --- a/examples/twoway/server.c +++ b/examples/twoway/server.c @@ -2,23 +2,95 @@ #include <errno.h> #include <pthread.h> #include <stdlib.h> +#include <stdio.h> #include <sys/socket.h> #include <sys/types.h> +#include <unistd.h> +#include <labcomm_pthread_mutex_lock.h> +#include <labcomm_fd_reader.h> +#include <labcomm_fd_writer.h> #include "decimating.h" - +#include "gen/types.h" + struct client { int fd; pthread_t main_thread; pthread_t decoder_thread; struct sockaddr_in adr; unsigned int adrlen; + int32_t A, B, Sum, Diff; + struct labcomm_decoder *decoder; + struct labcomm_encoder *encoder; }; +static void handle_A(int32_t *value, void *context) +{ + struct client *client = context; + + client->A = *value; +} + +static void handle_B(int32_t *value, void *context) +{ + struct client *client = context; + + client->B = *value; + client->Sum = client->A + client->B; + client->Diff = client->A - client->B; + labcomm_encode_types_Sum(client->encoder, &client->Sum); + labcomm_encode_types_Diff(client->encoder, &client->Diff); +} + +static void handle_Terminate(types_Terminate *value, void *context) +{ + exit(0); +} + +static void *run_decoder(void *context) +{ + struct labcomm_decoder *decoder = context; + int result; + + do { + result = labcomm_decoder_decode_one(decoder); + } while (result >= 0); + return NULL; +} + static void *run_client(void *arg) { struct client *client = arg; - + struct decimating *decimating; + struct labcomm_lock *lock; + + printf("Client start\n"); + client->A = 0; + client->B = 0; + lock = labcomm_pthread_mutex_lock_new(); + decimating = decimating_new(labcomm_fd_reader_new(client->fd, 1), + labcomm_fd_writer_new(client->fd, 0), + lock); + if (decimating == NULL) { + /* Warning: might leak reader and writer at this point */ + goto out; + } + client->decoder = labcomm_decoder_new(decimating->reader, lock); + client->encoder = labcomm_encoder_new(decimating->writer, lock); + pthread_t rdt; + + labcomm_decoder_register_types_A(client->decoder, handle_A, client); + labcomm_decoder_register_types_B(client->decoder, handle_B, client); + labcomm_decoder_register_types_Terminate(client->decoder, handle_Terminate, + NULL); + pthread_create(&rdt, NULL, run_decoder, client->decoder); + labcomm_encoder_register_types_Sum(client->encoder); + labcomm_encoder_register_types_Diff(client->encoder); + pthread_join(rdt, NULL); +out: + printf("Client end\n"); shutdown(client->fd, SHUT_RDWR); + close(client->fd); + free(client); return NULL; } @@ -80,1188 +152,3 @@ no_server_socket: } -#if 0 -#include <errno.h> -#include <netinet/in.h> -#include <unistd.h> -#include <linux/tcp.h> -#include "labcomm.h" -#include "labcomm_private.h" -#include "orca_messages.h" -#include "orca_server.h" - -#if 0 -static long long rdtscll() -{ - long long result; - __asm__ __volatile__("rdtsc" : "=A" (result)); - return result; -} -static long long epoch; - -static void *my_malloc(int size, int line) { - void *p = malloc(size); - if (!epoch) { - epoch = rdtscll(); - } - fprintf(stderr, "malloc: %16.16Ld %p %d %d\n", - rdtscll() - epoch, p, size, line); - fflush(stderr); - return p; -} - -static void my_free(void *p, int line) { - fprintf(stderr, "free: %16.16Ld %p %d\n", - rdtscll() - epoch, p, line); - fflush(stderr); - free(p); -} - - -#define malloc(size) my_malloc(size, __LINE__) -#define free(p) my_free(p, __LINE__) -#endif - -typedef enum { - input_kind = 0, - output_kind = 1, - parameter_kind = 2, - log_kind = 3 -} kind_t; - -typedef struct sample { - struct orca_server *server; - labcomm_signature_t *signature; - labcomm_encode_typecast_t encode; - labcomm_decoder_typecast_t decode; - labcomm_handler_typecast_t handle; - void *handle_context; - struct { - int n_0; - struct connection **a; - } connection; -} sample_t; - -typedef enum { - input_t = 0, - output_t = 1, - parameter_t = 2, - log_t = 3 -} channel_kind_t; - -typedef struct { - struct orca_server *context; - labcomm_decoder_t decoder; - labcomm_encoder_t encoder; - orca_channel_t orca_channel; - struct { - int n_0; - struct sample **a; - } sample; -} channel_t; - -typedef struct orca_server { - orca_lock_t *lock; - orca_server_context_t *context; - int input_buffer_size; - int output_buffer_size; - int parameter_buffer_size; - int log_buffer_size; - struct { - int n_0; - struct connection **a; - } connection; - channel_t channel[4]; -} orca_server_t; - -typedef struct connection { - void *context; - orca_server_t *server; - struct { - int current; - int max; - } decimation; - struct labcomm_encoder *encoder; - struct labcomm_decoder *decoder; - struct { - int n_0; - struct sample **a; - } sample; -} connection_t; - -static sample_t *sample_by_signature( - orca_lock_t *lock, - channel_t *channel, - labcomm_signature_t *signature, - int create) -{ - sample_t *result = 0; - int i; - - if (lock) { lock->lock(lock); } - for (i = 0 ; !result && i < channel->sample.n_0 ; i++) { - if (channel->sample.a[i]->signature == signature) { - result = channel->sample.a[i]; - } - } - if (lock) { lock->unlock(lock); } - - if (!result && create) { - int n; - sample_t **new_a, **old_a; - sample_t *new_sample; - - new_a = 0; - old_a = 0; - new_sample = 0; - - if (lock) { lock->lock(lock); } - while (1) { - n = channel->sample.n_0 + 1; - - // Allocation without holding lock - if (lock) { lock->unlock(lock); } - - // Free list from previous iteration - if (new_a) { free(new_a); } - - // Try new allocation - if (!new_sample) { new_sample = malloc(sizeof(sample_t)); } - new_a = malloc(n * sizeof(*new_a)); - if (lock) { lock->lock(lock); } - - // Break if list length is unchanged - if (n == channel->sample.n_0 + 1) { - break; - } - } - - if (new_sample && new_a) { - result = new_sample; - old_a = channel->sample.a; - channel->sample.n_0 = n; - for (i = 0 ; i < n - 1 ; i++) { - new_a[i] = old_a[i]; - } - new_a[n - 1] = result; - channel->sample.a = new_a; - - result->server = channel->context; - result->signature = signature; - result->connection.n_0 = 0; - result->connection.a = 0; - result->encode = 0; - result->decode = 0; - result->handle = 0; - result->handle_context = 0; - new_a = 0; - new_sample = 0; - } - if (lock) { lock->unlock(lock); } - if (old_a) { free(old_a); } - if (new_a) { free(new_a); } - if (new_sample) { free(new_sample); } - } - return result; -} - -static void do_decoder_register( - struct labcomm_decoder *d, - labcomm_signature_t *signature, - labcomm_decoder_typecast_t decode, - labcomm_handler_typecast_t handle, - void *handle_context) -{ - channel_t *channel = d->context; - orca_server_t *server = channel->context; - sample_t *sample; - - sample = sample_by_signature(server->lock, channel, signature, 1); - if (sample) { - server->lock->lock(server->lock); - sample->decode = decode; - sample->handle = handle; - sample->handle_context = handle_context; - server->lock->unlock(server->lock); - } -} - -static void do_encoder_register( - struct labcomm_encoder *e, - labcomm_signature_t *signature, - labcomm_encode_typecast_t encode) -{ - channel_t *channel = e->context; - orca_server_t *server = channel->context; - sample_t *sample; - - sample = sample_by_signature(server->lock, channel, signature, 1); - if (sample) { - server->lock->lock(server->lock); - sample->encode = encode; - server->lock->unlock(server->lock); - } -} - -static void do_encode( - struct labcomm_encoder *encoder, - labcomm_signature_t *signature, - void *value) -{ - channel_t *channel = encoder->context; - sample_t *sample; - - // Lock should be held when this is called !! - sample = sample_by_signature(0, channel, signature, 0); - if (sample && sample->connection.n_0) { - int i, size, available; - - size = signature->encoded_size(value); - for (i = 0 ; i < sample->connection.n_0 ; i++) { - connection_t *c = sample->connection.a[i]; - - if (c->decimation.current == 0) { - labcomm_writer_t *writer = &c->encoder->writer; - - available = writer->write(writer, labcomm_writer_available); - if (available >= size) { - labcomm_internal_encode(sample->connection.a[i]->encoder, - signature, - value); - } - } - } - } -} - -static void connect_connection_and_server( - connection_t *connection, - orca_server_t *server) -{ - int n, i; - connection_t **new_a, **old_a; - - new_a = 0; - old_a = 0; - server->lock->lock(server->lock); - while (1) { - n = server->connection.n_0 + 1; - - // Allocation without holding lock - server->lock->unlock(server->lock); - - // Free list from previous iteration - if (new_a) { free(new_a); } - - // Try new allocation - new_a = malloc(n * sizeof(*new_a)); - - // Reclaim lock - server->lock->lock(server->lock); - - // Break if list length is the desired one - if (n == server->connection.n_0 + 1) { - break; - } - } - - if (new_a) { - old_a = server->connection.a; - server->connection.n_0 = n; - for (i = 0 ; i < n - 1 ; i++) { - new_a[i] = old_a[i]; - } - new_a[n - 1] = connection; - server->connection.a = new_a; - new_a = 0; - } - server->lock->unlock(server->lock); - - if (old_a) { free(old_a); } - if (new_a) { free(new_a); } -} - -static void disconnect_connection_and_server( - connection_t *connection) -{ - orca_server_t *server = connection->server; - int i, j; - - server->lock->lock(server->lock); - for (i = 0, j = 0 ; i < server->connection.n_0 ; i++) { - if (server->connection.a[i] != connection) { - server->connection.a[j] = server->connection.a[i]; - j++; - } - } - server->connection.n_0 = j; - server->lock->unlock(server->lock); -} - -static void connect_channel_and_sample( - connection_t *connection, - sample_t *sample) -{ - orca_lock_t *lock; - int connection_n, sample_n; - sample_t **new_sample_a, **old_sample_a; - connection_t **new_connection_a, **old_connection_a; - - lock = connection->server->lock; - - old_sample_a = 0; - old_connection_a = 0; - new_sample_a = 0; - new_connection_a = 0; - - lock->lock(lock); - while (1) { - connection_n = sample->connection.n_0 + 1; - sample_n = connection->sample.n_0 + 1; - - lock->unlock(lock); - - // Free result from previous iteration - if (new_connection_a) { free(new_connection_a); } - if (new_sample_a) { free(new_sample_a); } - - // Try new allocation - new_sample_a = malloc(sample_n * sizeof(*new_sample_a)); - new_connection_a = malloc(connection_n * sizeof(*new_connection_a)); - - lock->lock(lock); - - // Break if list length are unchanged - if (connection_n == sample->connection.n_0 + 1 && - sample_n == connection->sample.n_0 + 1) { - break; - } - } - if (new_sample_a && new_connection_a) { - int i; - - old_connection_a = sample->connection.a; - sample->connection.a = new_connection_a; - sample->connection.n_0 = connection_n; - - old_sample_a = connection->sample.a; - connection->sample.a = new_sample_a; - connection->sample.n_0 = sample_n; - - for (i = 0 ; i < connection_n - 1 ; i++) { - new_connection_a[i] = old_connection_a[i]; - } - new_connection_a[connection_n - 1] = connection; - - for (i = 0 ; i < sample_n - 1 ; i++) { - new_sample_a[i] = old_sample_a[i]; - } - new_sample_a[sample_n - 1] = sample; - - new_sample_a = 0; - new_connection_a = 0; - } - lock->unlock(lock); - - if (new_connection_a) { free(new_connection_a); } - if (old_connection_a) { free(old_connection_a); } - if (new_sample_a) { free(new_sample_a); } - if (old_sample_a) { free(old_sample_a); } -} - -static void disconnect_channel_and_sample( - connection_t *connection) -{ - orca_lock_t *lock; - int i; - void *a; - - lock = connection->server->lock; - lock->lock(lock); - for (i = 0 ; i < connection->sample.n_0 ; i++) { - int j, k; - sample_t *sample = connection->sample.a[i]; - - for (j = 0, k = 0 ; j < sample->connection.n_0 ; j++) { - if (sample->connection.a[j] != connection) { - sample->connection.a[k] = sample->connection.a[j]; - k++; - } - } - sample->connection.n_0 = k; - } - a = connection->sample.a; - connection->sample.a = 0; - connection->sample.n_0 = 0; - lock->unlock(lock); - free(a); -} - -orca_server_t *orca_server_new( - orca_lock_t *lock, - orca_server_context_t *context, - int input_buffer_size, - int output_buffer_size, - int parameter_buffer_size, - int log_buffer_size) -{ - orca_server_t *result; - - result = malloc(sizeof(orca_server_t)); - if (result) { - channel_kind_t sc; - - result->lock = lock; - result->context = context; - result->input_buffer_size = input_buffer_size; - result->output_buffer_size = output_buffer_size; - result->parameter_buffer_size = parameter_buffer_size; - result->log_buffer_size = log_buffer_size; - result->connection.n_0 = 0; - result->connection.a = 0; - - for (sc = input_t ; sc <= log_t ; sc++) { - result->channel[sc].context = result; - - result->channel[sc].decoder.context = &result->channel[sc]; - result->channel[sc].decoder.reader.read = 0; - result->channel[sc].decoder.do_register = do_decoder_register; - result->channel[sc].decoder.do_decode_one = 0; - - result->channel[sc].encoder.context = &result->channel[sc]; - result->channel[sc].encoder.writer.write = 0; - result->channel[sc].encoder.do_register = do_encoder_register; - result->channel[sc].encoder.do_encode = do_encode; - - result->channel[sc].orca_channel.decoder = &result->channel[sc].decoder ; - result->channel[sc].orca_channel.encoder = &result->channel[sc].encoder ; - - result->channel[sc].sample.n_0 = 0; - result->channel[sc].sample.a = 0; - } - } - return result; -} - -void orca_server_free( - struct orca_server *server) -{ - channel_kind_t sc; - - // What should we do about open connections? - for (sc = input_t ; sc <= log_t ; sc++) { - int i; - - for (i = 0 ; i < server->channel[sc].sample.n_0 ; i++) { - free(server->channel[sc].sample.a[i]); - } - free(server->channel[sc].sample.a); - } - free(server); - printf("CHECK implementation %s\n", __FUNCTION__); -} - -void orca_server_next_sample( - struct orca_server *server) -{ - int i; - - for (i = 0 ; i < server->connection.n_0 ; i++) { - connection_t *c = server->connection.a[i]; - int max = c->decimation.max; - - if (max) { - c->decimation.current++; - if (c->decimation.current >= max) { - c->decimation.current = 0; - } - } - } -} - -orca_channel_t *orca_server_get_input_channel( - struct orca_server *server) -{ - return &server->channel[input_t].orca_channel; -} - -orca_channel_t *orca_server_get_output_channel( - struct orca_server *server) -{ - return &server->channel[output_t].orca_channel; -} - -orca_channel_t *orca_server_get_parameter_channel( - struct orca_server *server) -{ - return &server->channel[parameter_t].orca_channel; -} - -orca_channel_t *orca_server_get_log_channel( - struct orca_server *server) -{ - return &server->channel[log_t].orca_channel; -} - -// -// TCP specific stuff -// - -typedef struct { - orca_server_t *server; - int fd; - struct sockaddr_in remote; -} tcp_setup_connection_t; - -typedef struct { - int fd; - int write_pos; - int established; - int buffer_size; -} tcp_nonblocking_t; - -typedef struct { - int closed; - int use_count; - connection_t connection; - int fd; - tcp_nonblocking_t write_context; -} tcp_connection_t; - -static int nonblocking_fd_writer( - labcomm_writer_t *w, - labcomm_writer_action_t action) -{ - int result = 0; - tcp_nonblocking_t *context = w->context; - - switch (action) { - case labcomm_writer_alloc: { - int size = context->buffer_size; - w->data = malloc(size); - if (! w->data) { - result = -ENOMEM; - w->data_size = 0; - w->count = 0; - w->pos = 0; - } else { - w->data_size = size; - w->count = size; - w->pos = 0; - } - } break; - case labcomm_writer_free: { - free(w->data); - w->data = 0; - w->data_size = 0; - w->count = 0; - w->pos = 0; - } break; - case labcomm_writer_start: { - if (!context->established) { - w->pos = 0; - } - } break; - case labcomm_writer_continue: { - if (!context->established) { - result = write(context->fd, w->data, w->pos); - w->pos = 0; - } else { - printf("Buffer overrun %s\n", __FUNCTION__); - exit(1); - } - } break; - case labcomm_writer_end: { - if (!context->established) { - result = write(context->fd, w->data, w->pos); - w->pos = 0; - } - } break; - case labcomm_writer_available: { - result = w->count - w->pos; - } break; - } - return result; -} - -static int fd_writer( - labcomm_writer_t *w, - labcomm_writer_action_t action) -{ - int result = 0; - int *fd = w->context; - - switch (action) { - case labcomm_writer_alloc: { - w->data = malloc(1000); - if (! w->data) { - result = -ENOMEM; - w->data_size = 0; - w->count = 0; - w->pos = 0; - } else { - w->data_size = 1000; - w->count = 1000; - w->pos = 0; - } - } break; - case labcomm_writer_free: { - free(w->data); - w->data = 0; - w->data_size = 0; - w->count = 0; - w->pos = 0; - } break; - case labcomm_writer_start: { - w->pos = 0; - } break; - case labcomm_writer_continue: { - result = write(*fd, w->data, w->pos); - w->pos = 0; - } break; - case labcomm_writer_end: { - result = write(*fd, w->data, w->pos); - w->pos = 0; - } break; - case labcomm_writer_available: { - result = w->count - w->pos; - } break; - } - return result; -} - -static int fd_reader( - labcomm_reader_t *r, - labcomm_reader_action_t action) -{ - int result = -EINVAL; - int *fd = r->context; - - switch (action) { - case labcomm_reader_alloc: { - r->data = malloc(1000); - if (r->data) { - r->data_size = 1000; - result = r->data_size; - } else { - r->data_size = 0; - result = -ENOMEM; - } - r->count = 0; - r->pos = 0; - } break; - case labcomm_reader_start: - case labcomm_reader_continue: { - if (r->pos < r->count) { - result = r->count - r->pos; - } else { - int err; - - r->pos = 0; - err = read(*fd, r->data, r->data_size); - if (err <= 0) { - r->count = 0; - result = -1; - } else { - r->count = err; - result = r->count - r->pos; - } - } - } break; - case labcomm_reader_end: { - result = 0; - } break; - case labcomm_reader_free: { - free(r->data); - r->data = 0; - r->data_size = 0; - r->count = 0; - r->pos = 0; - result = 0; - } break; - } - return result; -} - -static tcp_connection_t *do_tcp_connect( - tcp_setup_connection_t *connection, - int port, - int buffer_size, - int decimation) -{ - tcp_connection_t *result; - int fd, OK; - - OK = 1; - if (OK) { - fd = socket(PF_INET, SOCK_STREAM, 0); - if (fd < 0) { - printf("failed to create socket\n"); - OK = 0; - } - } - - if (OK) { - int reuse; - - reuse = 1; - setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); - } - - if (OK) { - struct sockaddr_in adr; - int err; - - adr.sin_family = AF_INET; - adr.sin_port = 0; - adr.sin_addr.s_addr = INADDR_ANY; - err = bind(fd, (struct sockaddr*)&adr, sizeof(adr)); - if (err != 0) { - printf("failed to bind socket\n"); - OK = 0; - } - } - - if (OK) { - struct sockaddr_in to; - int err; - - to.sin_family = AF_INET; - to.sin_port = htons(port); - to.sin_addr = connection->remote.sin_addr; - err = connect(fd, (struct sockaddr*)&to, sizeof(to)); - if (err != 0) { - printf("failed to connect %d@%s\n", port, inet_ntoa(to.sin_addr)); - printf("%d %d %x\n", port, htons(port), port); - OK = 0; - } - } - - if (OK) { - int nodelay = 1; - setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); - } - - if (!OK && fd >= 0) { - close(fd); - fd = -1; - } - - if (OK) { - result = malloc(sizeof(*result)); - if (!result) { - OK = 0; - } - } - - if (!OK) { - result = 0; - } else { - result->closed = 0; - result->use_count = 0; - result->connection.context = result; - result->connection.server = connection->server; - result->connection.decimation.current = decimation; - result->connection.decimation.max = decimation; - result->connection.encoder = 0; - result->connection.decoder = 0; - result->connection.sample.n_0 = 0; - result->connection.sample.a = 0; - result->fd = fd; - result->write_context.fd = fd; - result->write_context.write_pos = 0; - result->write_context.established = 0; - result->write_context.buffer_size = buffer_size; - } - - return result; -} - -static void do_tcp_disconnect(tcp_connection_t *tcp_connection) -{ - orca_lock_t *lock = tcp_connection->connection.server->lock; - int use_count; - - shutdown(tcp_connection->fd, SHUT_RDWR); - close(tcp_connection->fd); - - lock->lock(lock); - tcp_connection->closed = 1; - tcp_connection->use_count--; - use_count = tcp_connection->use_count; - lock->unlock(lock); - - printf("%s use %d\n", __FUNCTION__, use_count); - if (use_count == 0) { - disconnect_channel_and_sample(&tcp_connection->connection); - disconnect_connection_and_server(&tcp_connection->connection); - if (tcp_connection->connection.decoder) { - labcomm_decoder_free(tcp_connection->connection.decoder); - } - if (tcp_connection->connection.encoder) { - labcomm_encoder_free(tcp_connection->connection.encoder); - } - free(tcp_connection); - } -} - -static void lock_and_handle( - void *value, - void *context) -{ - sample_t *sample = context; - orca_lock_t *lock = sample->server->lock; - - lock->lock(lock); - sample->handle(value, sample->handle_context); - lock->unlock(lock); -} - -static int register_select_handler_tcp( - channel_t *channel, - orca_messages_select_t *v, - tcp_connection_t *tcp_connection) -{ - int i; - int n = 0; - - for (i = 0 ; i < v->list.n_0 ; i++) { - if (v->list.a[i] < channel->sample.n_0) { - sample_t *sample = channel->sample.a[v->list.a[i]]; - - n++; - if (tcp_connection->connection.decoder) { - labcomm_internal_decoder_register(tcp_connection->connection.decoder, - sample->signature, - sample->decode, - lock_and_handle, - sample); - } - if (tcp_connection->connection.encoder) { - labcomm_internal_encoder_register(tcp_connection->connection.encoder, - sample->signature, - sample->encode); - } - } - } - // Switch to non-blocking writes - tcp_connection->write_context.established = 1; - for (i = 0 ; i < v->list.n_0 ; i++) { - if (v->list.a[i] < channel->sample.n_0) { - sample_t *sample = channel->sample.a[v->list.a[i]]; - connect_channel_and_sample(&tcp_connection->connection, sample); - } - } - return n; -} - -static void *decoder_run_main(void *context) -{ - tcp_connection_t *tcp_connection = context; - - printf("Start %s %p\n", __FUNCTION__, context); - labcomm_decoder_run(tcp_connection->connection.decoder); - do_tcp_disconnect(tcp_connection); - - printf("Finish %s %p\n", __FUNCTION__, context); - return 0; -} - -static void *tcp_deferred_write_main(void *context) -{ - tcp_connection_t *tcp_connection = context; - orca_server_t *server = tcp_connection->connection.server; - orca_lock_t *lock = tcp_connection->connection.server->lock; - - printf("Start %s %p\n", __FUNCTION__, context); - while (1) { - int start, end; - - server->context->await_deferred_write(server->context); - start = tcp_connection->write_context.write_pos; - end = tcp_connection->connection.encoder->writer.pos; - if (tcp_connection->closed) { - break; - } else if (end - start > 0) { - int err; - err = write(tcp_connection->write_context.fd, - &tcp_connection->connection.encoder->writer.data[start], - end - start); - if (err < 0) { - break; - } else { - tcp_connection->write_context.write_pos += err; - lock->lock(lock); - if (tcp_connection->connection.encoder->writer.pos == - tcp_connection->write_context.write_pos) { - tcp_connection->connection.encoder->writer.pos = 0; - tcp_connection->write_context.write_pos = 0; - } - lock->unlock(lock); - } - } - } - do_tcp_disconnect(tcp_connection); - - printf("Finish %s %p\n", __FUNCTION__, context); - return 0; -} - - -static void select_input_handler_tcp( - orca_messages_select_input *v, - void *context) -{ - tcp_setup_connection_t *tcp_setup_connection = context; - orca_server_t *server = tcp_setup_connection->server; - tcp_connection_t *tcp_connection; - - tcp_connection = do_tcp_connect(tcp_setup_connection, v->port, - server->input_buffer_size, 0); - if (tcp_connection) { - tcp_connection->use_count += 1; - tcp_connection->connection.decoder = - labcomm_decoder_new(fd_reader, &tcp_connection->fd); - connect_connection_and_server(&tcp_connection->connection, server); - if (register_select_handler_tcp(&server->channel[input_t], - v, tcp_connection)) { - server->context->spawn_task(decoder_run_main, tcp_connection); - } else { - do_tcp_disconnect(tcp_connection); - } - } -} - -static void select_output_handler_tcp( - orca_messages_select_output *v, - void *context) -{ - tcp_setup_connection_t *tcp_setup_connection = context; - orca_server_t *server = tcp_setup_connection->server; - tcp_connection_t *tcp_connection; - - tcp_connection = do_tcp_connect(tcp_setup_connection, v->port, - server->output_buffer_size, v->decimation); - if (tcp_connection) { - tcp_connection->use_count += 1; - tcp_connection->connection.encoder = - labcomm_encoder_new(nonblocking_fd_writer, - &tcp_connection->write_context); - connect_connection_and_server(&tcp_connection->connection, server); - if (register_select_handler_tcp(&server->channel[output_t], - v, tcp_connection)) { - server->context->spawn_task(tcp_deferred_write_main, tcp_connection); - } else { - do_tcp_disconnect(tcp_connection); - } - } -} - -static void select_parameter_handler_tcp( - orca_messages_select_parameter *v, - void *context) -{ - tcp_setup_connection_t *tcp_setup_connection = context; - orca_server_t *server = tcp_setup_connection->server; - tcp_connection_t *tcp_connection; - - tcp_connection = do_tcp_connect(tcp_setup_connection, v->port, - server->parameter_buffer_size, 0); - if (tcp_connection) { - tcp_connection->use_count += 2; - - tcp_connection->connection.decoder = - labcomm_decoder_new(fd_reader, &tcp_connection->fd); - tcp_connection->connection.encoder = - labcomm_encoder_new(nonblocking_fd_writer, - &tcp_connection->write_context); - connect_connection_and_server(&tcp_connection->connection, server); - if (register_select_handler_tcp(&server->channel[parameter_t], - v, tcp_connection)) { - server->context->spawn_task(decoder_run_main, tcp_connection); - server->context->spawn_task(tcp_deferred_write_main, tcp_connection); - // Force current values to be sent to all clients - { - int i; - channel_t *channel = &server->channel[parameter_t]; - - for (i = 0 ; i < v->list.n_0 ; i++) { - if (v->list.a[i] < channel->sample.n_0) { - sample_t *sample = channel->sample.a[v->list.a[i]]; - lock_and_handle(0, sample); - } - } - } - } else { - do_tcp_disconnect(tcp_connection); - } - } -} - -static void select_log_handler_tcp( - orca_messages_select_log *v, - void *context) -{ - tcp_setup_connection_t *tcp_setup_connection = context; - orca_server_t *server = tcp_setup_connection->server; - tcp_connection_t *tcp_connection; - - tcp_connection = do_tcp_connect(tcp_setup_connection, v->port, - server->log_buffer_size, v->decimation); - if (tcp_connection) { - tcp_connection->use_count += 1; - tcp_connection->connection.encoder = - labcomm_encoder_new(nonblocking_fd_writer, - &tcp_connection->write_context); - connect_connection_and_server(&tcp_connection->connection, server); - if (register_select_handler_tcp(&server->channel[log_t], - v, tcp_connection)) { - server->context->spawn_task(tcp_deferred_write_main, tcp_connection); - } else { - do_tcp_disconnect(tcp_connection); - } - } -} - -static void make_connection_list( - orca_lock_t *lock, - orca_messages_connection_list_t *dest, - channel_t *src) -{ - int i; - - dest->n_0 = src->sample.n_0; - dest->a = malloc(dest->n_0 * sizeof(*dest->a)); - if (dest->a) { - lock->lock(lock); - for (i = 0 ; i < dest->n_0 ; i++) { - dest->a[i].index = i; - dest->a[i].name = src->sample.a[i]->signature->name; - dest->a[i].signature.n_0 = src->sample.a[i]->signature->size; - dest->a[i].signature.a = src->sample.a[i]->signature->signature; - } - lock->unlock(lock); - } -} - -static void encode_orca_directory( - struct labcomm_encoder *encoder, - orca_server_t *server) -{ - orca_messages_directory directory; - - make_connection_list(server->lock, - &directory.input, - &server->channel[input_t]); - - make_connection_list(server->lock, - &directory.output, - &server->channel[output_t]); - - make_connection_list(server->lock, - &directory.parameter, - &server->channel[parameter_t]); - - make_connection_list(server->lock, - &directory.log, - &server->channel[log_t]); - - labcomm_encode_orca_messages_directory(encoder, &directory); - - free(directory.input.a); - free(directory.output.a); - free(directory.parameter.a); - free(directory.log.a); -} - -static void *tcp_setup_connection_main(void *argument) -{ - tcp_setup_connection_t *tcp_setup_connection = argument; - struct labcomm_encoder *encoder; - struct labcomm_decoder *decoder; - - encoder = labcomm_encoder_new(fd_writer, &tcp_setup_connection->fd); - decoder = labcomm_decoder_new(fd_reader, &tcp_setup_connection->fd); - labcomm_encoder_register_orca_messages_directory(encoder); - labcomm_decoder_register_orca_messages_select_input( - decoder, select_input_handler_tcp, tcp_setup_connection); - labcomm_decoder_register_orca_messages_select_output( - decoder, select_output_handler_tcp, tcp_setup_connection); - labcomm_decoder_register_orca_messages_select_parameter( - decoder, select_parameter_handler_tcp, tcp_setup_connection); - labcomm_decoder_register_orca_messages_select_log( - decoder, select_log_handler_tcp, tcp_setup_connection); - encode_orca_directory(encoder, tcp_setup_connection->server); - labcomm_decoder_run(decoder); - - shutdown(tcp_setup_connection->fd, SHUT_RDWR); - close(tcp_setup_connection->fd); - labcomm_encoder_free(encoder); - labcomm_decoder_free(decoder); - free(tcp_setup_connection); - return 0; -} - -int orca_server_run_tcp_cb( - struct orca_server *server, - int port, - int backlog, - void (*cb)(struct orca_server *server, char *message, int err)) -{ - int result; - int server_fd; - - - - if (cb) { cb(server, "Listening for client connections", 0); } - while (1) { - struct sockaddr_in adr; - unsigned int adrlen; - int fd; - tcp_setup_connection_t *tcp_setup_connection; - - adr.sin_family = AF_INET; - adr.sin_port = 0; - adr.sin_addr.s_addr = INADDR_ANY; - adrlen = sizeof(adr); - fd = accept(server_fd, (struct sockaddr*)&adr, &adrlen); - if (fd < 0) { - if (cb) { cb(server, "Failed to accept on socket", errno); } - result = -errno; - goto failed_to_accept; - } - tcp_setup_connection = malloc(sizeof(tcp_setup_connection_t)); - if (!tcp_setup_connection) { - cb(server, "Failed to allocate client memeory", -ENOMEM); - shutdown(fd, SHUT_RDWR); - } else { - if (cb) { - char buf[128]; - - sprintf(buf, "Got server connection: %s %x", - inet_ntoa(adr.sin_addr), ntohs(adr.sin_port)); - cb(server, buf, errno); - } - tcp_setup_connection->fd = fd; - tcp_setup_connection->remote = adr; - tcp_setup_connection->server = server; - server->context->spawn_task(tcp_setup_connection_main, - tcp_setup_connection); - } - } -failed_to_accept: -failed_to_listen: -failed_to_bind: - close(server_fd); -no_server_socket: - return result; -} - -int orca_server_run_tcp( - struct orca_server *server, - int port, - int backlog) -{ - return orca_server_run_tcp_cb(server, port, backlog, NULL); -} -#endif diff --git a/examples/twoway/types.lc b/examples/twoway/types.lc new file mode 100644 index 0000000..58313c3 --- /dev/null +++ b/examples/twoway/types.lc @@ -0,0 +1,5 @@ +sample int A; +sample int B; +sample int Sum; +sample int Diff; +sample void Terminate; \ No newline at end of file diff --git a/lib/c/Makefile b/lib/c/Makefile index 284ce91..a22d14c 100644 --- a/lib/c/Makefile +++ b/lib/c/Makefile @@ -115,3 +115,6 @@ $(TEST_DIR)/test_labcomm_basic_type_encoding.o: labcomm_private.h $(TEST_DIR)/test_labcomm_generated_encoding.o : $(TEST_DIR)/gen/generated_encoding.h $(TEST_DIR)/test_labcomm_generated_encoding : $(TEST_DIR)/gen/generated_encoding.o $(TEST_DIR)/test_labcomm: $(TEST_GEN_DIR)/test_sample.o +labcomm_fd_reader.o: labcomm_private.h +labcomm_fd_writer.o: labcomm_private.h +labcomm_dynamic_buffer_writer.o: labcomm_private.h diff --git a/lib/c/labcomm.c b/lib/c/labcomm.c index 6d10055..801c5f7 100644 --- a/lib/c/labcomm.c +++ b/lib/c/labcomm.c @@ -43,6 +43,8 @@ struct labcomm_decoder { struct labcomm_lock *lock; labcomm_error_handler_callback on_error; labcomm_handle_new_datatype_callback on_new_datatype; + LABCOMM_SIGNATURE_ARRAY_DEF(local_to_remote, int); + LABCOMM_SIGNATURE_ARRAY_DEF(remote_to_local, int); }; struct labcomm_encoder { @@ -117,7 +119,7 @@ void labcomm_decoder_register_new_datatype_handler(struct labcomm_decoder *d, la int on_new_datatype(struct labcomm_decoder *d, struct labcomm_signature *sig) { - d->on_error(LABCOMM_ERROR_DEC_UNKNOWN_DATATYPE, 4, "%s(): unknown datatype '%s'\n", __FUNCTION__, sig->name); + d->on_error(LABCOMM_ERROR_DEC_UNKNOWN_DATATYPE, 4, "%s(): unknown datatype '%s'\n", __FUNCTION__, sig->name); return 0; } @@ -381,6 +383,8 @@ void labcomm_encoder_free(struct labcomm_encoder* e) context = (struct labcomm_encoder_context *) e->context; e->writer->action->free(e->writer, e->writer->context); + LABCOMM_SIGNATURE_ARRAY_FREE(e->registered, int); + #ifdef LABCOMM_ENCODER_LINEAR_SEARCH struct labcomm_sample_entry *entry = context->sample; struct labcomm_sample_entry *entry_next; @@ -397,8 +401,8 @@ void labcomm_encoder_free(struct labcomm_encoder* e) } int labcomm_encoder_ioctl(struct labcomm_encoder *encoder, - int action, - ...) + uint32_t action, + ...) { int result; va_list va; @@ -423,7 +427,7 @@ out: } static int labcomm_writer_ioctl(struct labcomm_writer *writer, - int action, + uint32_t action, ...) { int result; @@ -448,7 +452,7 @@ out: int labcomm_internal_encoder_ioctl(struct labcomm_encoder *encoder, struct labcomm_signature *signature, - int action, va_list va) + uint32_t action, va_list va) { int result = -ENOTSUP; @@ -530,6 +534,8 @@ struct labcomm_decoder *labcomm_decoder_new( result->lock = lock; result->on_error = on_error_fprintf; result->on_new_datatype = on_new_datatype; + LABCOMM_SIGNATURE_ARRAY_INIT(result->local_to_remote, int); + LABCOMM_SIGNATURE_ARRAY_INIT(result->remote_to_local, int); } return result; } @@ -619,8 +625,17 @@ int labcomm_decoder_decode_one(struct labcomm_decoder *d) "%s(): index mismatch '%s' (id=0x%x != 0x%x)\n", __FUNCTION__, signature.name, entry->index, index); } else { + int local_index; + int *local_to_remote, *remote_to_local; // TODO unnessesary, since entry->index == index in above if statement entry->index = index; + local_index = get_local_index(entry->signature); + local_to_remote = LABCOMM_SIGNATURE_ARRAY_REF(d->local_to_remote, int, + local_index); + remote_to_local = LABCOMM_SIGNATURE_ARRAY_REF(d->remote_to_local, int, + index); + *local_to_remote = index; + *remote_to_local = local_index; } free_signature_name: free(signature.name); @@ -636,7 +651,8 @@ int labcomm_decoder_decode_one(struct labcomm_decoder *d) if (!entry) { // printf("Error: %s: type not found (id=0x%x)\n", //__FUNCTION__, result); - d->on_error(LABCOMM_ERROR_DEC_TYPE_NOT_FOUND, 3, "%s(): type not found (id=0x%x)\n", __FUNCTION__, result); + d->on_error(LABCOMM_ERROR_DEC_TYPE_NOT_FOUND, 3, + "%s(): type not found (id=0x%x)\n", __FUNCTION__, result); result = -ENOENT; } else { entry->decoder(d->reader, entry->handler, entry->context); @@ -661,6 +677,8 @@ void labcomm_decoder_free(struct labcomm_decoder* d) struct labcomm_sample_entry *entry_next; d->reader->action->free(d->reader, d->reader->context); + LABCOMM_SIGNATURE_ARRAY_FREE(d->local_to_remote, int); + LABCOMM_SIGNATURE_ARRAY_FREE(d->remote_to_local, int); while (entry != NULL) { entry_next = entry->next; free(entry); @@ -672,37 +690,57 @@ void labcomm_decoder_free(struct labcomm_decoder* d) } int labcomm_decoder_ioctl(struct labcomm_decoder *decoder, - int action, + uint32_t action, ...) { - int result = -ENOTSUP; - - if (decoder->reader->action->ioctl != NULL) { - va_list va; + int result; + va_list va; - va_start(va, action); - result = decoder->reader->action->ioctl(decoder->reader, - decoder->reader->context, - 0, NULL, action, va); - va_end(va); - } + va_start(va, action); + result = decoder->reader->action->ioctl(decoder->reader, + decoder->reader->context, + 0, NULL, action, va); + va_end(va); return result; } int labcomm_internal_decoder_ioctl(struct labcomm_decoder *decoder, struct labcomm_signature *signature, - int action, va_list va) + uint32_t action, va_list va) { - int result = -ENOTSUP; - - if (decoder->reader->action->ioctl != NULL) { + int result; + int local_index, *remote_index; + + local_index = get_local_index(signature); + remote_index = LABCOMM_SIGNATURE_ARRAY_REF(decoder->local_to_remote, int, + local_index); + if (*remote_index == 0) { + result = -EAGAIN; + } else { result = decoder->reader->action->ioctl(decoder->reader, decoder->reader->context, - -1, signature, action, va); + *remote_index, signature, + action, va); } return result; } +#if 0 +static void dump(void *p, int size, int first, int last) +{ + int i, j; + + printf("%d %d (%p): ", first, last, p); + for (i = first ; i < last ; i++) { + for (j = 0 ; j < size ; j++) { + printf("%2.2d", ((char*)p)[(i-first)*size + j]); + } + printf(" "); + } + printf("\n"); +} +#endif + void *labcomm_signature_array_ref(int *first, int *last, void **data, int size, int index) { @@ -728,9 +766,11 @@ void *labcomm_signature_array_ref(int *first, int *last, void **data, old_data, (old_last - old_first) * size); } +// dump(old_data, size, old_first, old_last); free(old_data); } if (*data) { +// dump(*data, size, *first, *last); return *data + (index - *first) * size; } else { return NULL; diff --git a/lib/c/labcomm.h b/lib/c/labcomm.h index 3b4a25c..5592b2c 100644 --- a/lib/c/labcomm.h +++ b/lib/c/labcomm.h @@ -2,6 +2,7 @@ #define _LABCOMM_H_ #include <stdarg.h> +#include <stdint.h> #include <unistd.h> #include <labcomm_error.h> @@ -78,7 +79,7 @@ void labcomm_decoder_free( /* See labcomm_ioctl.h for predefined ioctl_action values */ int labcomm_decoder_ioctl(struct labcomm_decoder *decoder, - int ioctl_action, + uint32_t ioctl_action, ...); /* @@ -94,7 +95,7 @@ void labcomm_encoder_free( /* See labcomm_ioctl.h for predefined ioctl_action values */ int labcomm_encoder_ioctl(struct labcomm_encoder *encoder, - int ioctl_action, + uint32_t ioctl_action, ...); #endif diff --git a/lib/c/labcomm_dynamic_buffer_writer.c b/lib/c/labcomm_dynamic_buffer_writer.c index c3ef5cd..9588807 100644 --- a/lib/c/labcomm_dynamic_buffer_writer.c +++ b/lib/c/labcomm_dynamic_buffer_writer.c @@ -79,7 +79,7 @@ static int dyn_flush(struct labcomm_writer *w, void *context) static int dyn_ioctl(struct labcomm_writer *w, void *context, int signature_index, struct labcomm_signature *signature, - int action, va_list arg) + uint32_t action, va_list arg) { int result = -ENOTSUP; switch (action) { diff --git a/lib/c/labcomm_fd_reader.c b/lib/c/labcomm_fd_reader.c index 3aa0d63..8edbdcd 100644 --- a/lib/c/labcomm_fd_reader.c +++ b/lib/c/labcomm_fd_reader.c @@ -105,7 +105,7 @@ static int fd_end(struct labcomm_reader *r, void *context) static int fd_ioctl(struct labcomm_reader *r, void *context, int signature_index, struct labcomm_signature *signature, - int action, va_list args) + uint32_t action, va_list args) { return -ENOTSUP; } diff --git a/lib/c/labcomm_fd_writer.c b/lib/c/labcomm_fd_writer.c index c2e1b12..b712ffb 100644 --- a/lib/c/labcomm_fd_writer.c +++ b/lib/c/labcomm_fd_writer.c @@ -92,7 +92,7 @@ static int fd_flush(struct labcomm_writer *w, void *context) static int fd_ioctl(struct labcomm_writer *w, void *context, int signature_index, struct labcomm_signature *signature, - int action, va_list args) + uint32_t action, va_list args) { return -ENOTSUP; } diff --git a/lib/c/labcomm_private.h b/lib/c/labcomm_private.h index 975391f..0bd9781 100644 --- a/lib/c/labcomm_private.h +++ b/lib/c/labcomm_private.h @@ -79,7 +79,7 @@ struct labcomm_reader_action { int (*fill)(struct labcomm_reader *r, void *context); int (*ioctl)(struct labcomm_reader *r, void *context, int signature_index, struct labcomm_signature *signature, - int ioctl_action, va_list args); + uint32_t ioctl_action, va_list args); }; struct labcomm_reader { @@ -107,7 +107,7 @@ void labcomm_internal_decoder_register( int labcomm_internal_decoder_ioctl(struct labcomm_decoder *decoder, struct labcomm_signature *signature, - int ioctl_action, va_list args); + uint32_t ioctl_action, va_list args); #if __BYTE_ORDER == __LITTLE_ENDIAN @@ -212,7 +212,7 @@ struct labcomm_writer_action { int (*flush)(struct labcomm_writer *w, void *context); int (*ioctl)(struct labcomm_writer *w, void *context, int signature_index, struct labcomm_signature *signature, - int ioctl_action, va_list args); + uint32_t ioctl_action, va_list args); }; struct labcomm_writer { @@ -239,7 +239,7 @@ int labcomm_internal_encode( int labcomm_internal_encoder_ioctl(struct labcomm_encoder *encoder, struct labcomm_signature *signature, - int ioctl_action, va_list args); + uint32_t ioctl_action, va_list args); #if __BYTE_ORDER == __LITTLE_ENDIAN diff --git a/lib/c/test/test_labcomm_generated_encoding.c b/lib/c/test/test_labcomm_generated_encoding.c index 4e1ff10..6185ff5 100644 --- a/lib/c/test/test_labcomm_generated_encoding.c +++ b/lib/c/test/test_labcomm_generated_encoding.c @@ -54,7 +54,7 @@ static int buf_writer_flush(struct labcomm_writer *w, void *context) static int buf_writer_ioctl( struct labcomm_writer *w, void *context, int signature_index, struct labcomm_signature *signature, - int action, va_list arg) + uint32_t action, va_list arg) { int result = -ENOTSUP; switch (action) { diff --git a/test/Makefile b/test/Makefile index 75a98a2..b006c9e 100644 --- a/test/Makefile +++ b/test/Makefile @@ -51,12 +51,9 @@ gen/%/c_relay.c: gen/%/typeinfo relay_gen_c.py Makefile .PRECIOUS: gen/%/c_relay gen/%/c_relay: gen/%/c_relay.c gen/%/c_code.c Makefile - $(CC) $(CFLAGS) -o $@ $< -I../lib/c -I. \ - -DLABCOMM_ENCODER_LINEAR_SEARCH \ + $(CC) $(CFLAGS) -o $@ $< -I../lib/c -I. -L..//lib/c \ gen/$*/c_code.c \ - ../lib/c/labcomm.c \ - ../lib/c/labcomm_fd_*.c \ - ../lib/c/labcomm_dynamic_buffer_writer.c + -llabcomm -Tlabcomm.linkscript # C# relay test rules .PRECIOUS: gen/%/cs_code.cs -- GitLab