From 40b94830de62330c5fee20b46002d538ea186948 Mon Sep 17 00:00:00 2001 From: Anders Blomdell <anders.blomdell@control.lth.se> Date: Fri, 28 Jun 2013 20:53:55 +0200 Subject: [PATCH] Changed decoder to call reader->start both at registration and reception of remote declaration. Corrected twoway/decimating.c to take advantage of this. --- examples/twoway/client.c | 8 ++-- examples/twoway/decimating.c | 91 +++++++++++++++++++++++++++--------- examples/twoway/server.c | 12 +++-- lib/c/labcomm.c | 5 +- lib/c/labcomm_decoder.c | 27 +++++++---- lib/c/labcomm_private.h | 15 ++++-- 6 files changed, 113 insertions(+), 45 deletions(-) diff --git a/examples/twoway/client.c b/examples/twoway/client.c index 3714abf..484b20e 100644 --- a/examples/twoway/client.c +++ b/examples/twoway/client.c @@ -85,6 +85,7 @@ int main(int argc, char *argv[]) struct labcomm_scheduler *scheduler; struct labcomm_decoder *decoder; struct labcomm_encoder *encoder; + struct labcomm_time *next; int32_t i, j; hostname = argv[1]; @@ -157,12 +158,10 @@ int main(int argc, char *argv[]) labcomm_encoder_register_types_B(encoder); labcomm_encoder_register_types_Terminate(encoder); - /* Sleep to make all remote types be known to introspecting and decimating - wrappers, this is a HACK! */ - sleep(1); err = labcomm_decoder_ioctl_types_Sum(decoder, SET_DECIMATION, 2); err = labcomm_decoder_ioctl_types_Diff(decoder, SET_DECIMATION, 4); + next = labcomm_scheduler_now(scheduler); for (i = 0 ; i < 4 ; i++) { if (i == 2) { labcomm_decoder_register_types_Product(decoder, handle_Product, NULL); @@ -171,7 +170,8 @@ int main(int argc, char *argv[]) printf("\nA=%d B=%d: ", i, j); labcomm_encode_types_A(encoder, &i); labcomm_encode_types_B(encoder, &j); - sleep(1); + labcomm_time_add_usec(next, 100000); + labcomm_scheduler_sleep(scheduler, next); } } printf("\n"); diff --git a/examples/twoway/decimating.c b/examples/twoway/decimating.c index 3b3cce3..dbcc80c 100644 --- a/examples/twoway/decimating.c +++ b/examples/twoway/decimating.c @@ -35,11 +35,12 @@ struct decimating_private { int encoder_initialized; struct labcomm_reader_action_context reader_action_context; struct labcomm_writer_action_context writer_action_context; - LABCOMM_SIGNATURE_ARRAY_DEF(decimation, + LABCOMM_SIGNATURE_ARRAY_DEF(writer_decimation, struct decimation { int n; int current; }); + LABCOMM_SIGNATURE_ARRAY_DEF(reader_decimation, int); }; static void set_decimation( @@ -51,7 +52,7 @@ static void set_decimation( labcomm_scheduler_data_lock(decimating->scheduler); decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory, - decimating->decimation, + decimating->writer_decimation, struct decimation, value->signature_index); decimation->n = value->decimation; @@ -87,34 +88,81 @@ static void send_set_decimation(void *arg) labcomm_memory_free(memory, 1, msg); } +static void enqueue_decimation(struct decimating_private *decimating, + int remote_index, + int amount) +{ + struct send_set_decimation *msg; + msg = labcomm_memory_alloc(decimating->memory, 1, sizeof(*msg)); + if (msg) { + msg->decimating = decimating; + msg->set_decimation.decimation = amount; + msg->set_decimation.signature_index = remote_index; + + labcomm_scheduler_enqueue(decimating->scheduler, 0, + send_set_decimation, msg); + } +} + +static int wrap_reader_start( + struct labcomm_reader *r, + struct labcomm_reader_action_context *action_context, + int local_index, int remote_index, struct labcomm_signature *signature, + void *value) +{ + struct decimating_private *decimating = action_context->context; + + if (value == NULL) { + int *decimation, amount; + + labcomm_scheduler_data_lock(decimating->scheduler); + decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory, + decimating->reader_decimation, + int, + local_index); + amount = *decimation; + labcomm_scheduler_data_unlock(decimating->scheduler); + if (remote_index != 0 && amount != 0) { + enqueue_decimation(decimating, remote_index, amount); + } + } + return labcomm_reader_start(r, action_context->next, + local_index, remote_index, signature, value); +} + static int wrap_reader_ioctl( struct labcomm_reader *r, struct labcomm_reader_action_context *action_context, - int signature_index, + int local_index, int remote_index, struct labcomm_signature *signature, uint32_t action, va_list args) { struct decimating_private *decimating = action_context->context; if (action == SET_DECIMATION) { - struct send_set_decimation *msg; - - msg = labcomm_memory_alloc(decimating->memory, 1, sizeof(*msg)); - if (msg) { - va_list va; - - va_copy(va, args); - msg->decimating = decimating; - msg->set_decimation.decimation = va_arg(va, int); - msg->set_decimation.signature_index = signature_index; - va_end(va); - - labcomm_scheduler_enqueue(decimating->scheduler, 0, - send_set_decimation, msg); + va_list va; + int amount; + int *decimation; + + va_copy(va, args); + amount = va_arg(va, int); + va_end(va); + + labcomm_scheduler_data_lock(decimating->scheduler); + decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory, + decimating->reader_decimation, + int, + local_index); + *decimation = amount; + labcomm_scheduler_data_unlock(decimating->scheduler); + if (remote_index) { + enqueue_decimation(decimating, remote_index, amount); } + } else { return labcomm_reader_ioctl(r, action_context->next, - signature_index, signature, action, args); + local_index, remote_index, signature, + action, args); } return 0; } @@ -122,7 +170,7 @@ static int wrap_reader_ioctl( struct labcomm_reader_action decimating_reader_action = { .alloc = wrap_reader_alloc, .free = NULL, - .start = NULL, + .start = wrap_reader_start, .end = NULL, .fill = NULL, .ioctl = wrap_reader_ioctl @@ -161,7 +209,7 @@ static int wrap_writer_start( labcomm_scheduler_data_lock(decimating->scheduler); decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory, - decimating->decimation, + decimating->writer_decimation, struct decimation, index); decimation->current++; if (decimation->current < decimation->n) { @@ -221,7 +269,8 @@ struct decimating *decimating_new( result->error = error; result->memory = memory; result->scheduler = scheduler; - LABCOMM_SIGNATURE_ARRAY_INIT(result->decimation, struct decimation); + LABCOMM_SIGNATURE_ARRAY_INIT(result->writer_decimation, struct decimation); + LABCOMM_SIGNATURE_ARRAY_INIT(result->reader_decimation, int); goto out_ok; diff --git a/examples/twoway/server.c b/examples/twoway/server.c index a8f3806..3a34efa 100644 --- a/examples/twoway/server.c +++ b/examples/twoway/server.c @@ -20,6 +20,7 @@ */ #include <arpa/inet.h> +#include <linux/tcp.h> #include <errno.h> #include <pthread.h> #include <stdlib.h> @@ -61,10 +62,6 @@ static void handle_B(int32_t *value, void *context) int status; client->B = *value; - client->Sum = client->A + client->B; - client->Diff = client->A - client->B; - labcomm_encode_types_Sum(client->encoder, &client->Sum); - labcomm_encode_types_Diff(client->encoder, &client->Diff); status = labcomm_encoder_ioctl_types_Product(client->encoder, HAS_SIGNATURE); switch (status) { case introspecting_unregistered: @@ -77,6 +74,10 @@ static void handle_B(int32_t *value, void *context) default: break; } + client->Sum = client->A + client->B; + client->Diff = client->A - client->B; + labcomm_encode_types_Sum(client->encoder, &client->Sum); + labcomm_encode_types_Diff(client->encoder, &client->Diff); } static void handle_Terminate(types_Terminate *value, void *context) @@ -183,6 +184,7 @@ int main(int argc, char *argv[]) } while (1) { struct client *client; + int nodelay; client = malloc(sizeof(*client)); if (client == NULL) { @@ -198,6 +200,8 @@ int main(int argc, char *argv[]) result = errno; goto failed_to_accept; } + nodelay = 1; + setsockopt(client->fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); pthread_create(&client->main_thread, NULL, run_client, client); } diff --git a/lib/c/labcomm.c b/lib/c/labcomm.c index 1721289..0e983b1 100644 --- a/lib/c/labcomm.c +++ b/lib/c/labcomm.c @@ -85,11 +85,12 @@ int labcomm_reader_fill(struct labcomm_reader *r, int labcomm_reader_ioctl(struct labcomm_reader *r, struct labcomm_reader_action_context *action_context, - int index, + int local_index, int remote_index, struct labcomm_signature *signature, uint32_t ioctl_action, va_list args) { - UNWRAP(ioctl, r, action_context, index, signature, ioctl_action, args); + UNWRAP(ioctl, r, action_context, + local_index, remote_index, signature, ioctl_action, args); } int labcomm_writer_alloc(struct labcomm_writer *w, diff --git a/lib/c/labcomm_decoder.c b/lib/c/labcomm_decoder.c index 7d657d7..9a82859 100644 --- a/lib/c/labcomm_decoder.c +++ b/lib/c/labcomm_decoder.c @@ -175,9 +175,11 @@ static int decode_typedef_or_sample(struct labcomm_decoder *d, int kind) .pos = 0, .error = 0, }; - struct labcomm_signature signature; - int remote_index, err; - + struct labcomm_signature signature, *local_signature; + int remote_index, local_index, err; + + local_signature = NULL; + local_index = 0; labcomm_writer_alloc(&writer, writer.action_context, ""); labcomm_writer_start(&writer, writer.action_context, 0, NULL, NULL); remote_index = labcomm_read_packed32(d->reader); //int @@ -219,6 +221,8 @@ static int decode_typedef_or_sample(struct labcomm_decoder *d, int kind) bcmp((void*)s->signature->signature, (void*)signature.signature, signature.size) == 0) { s->remote_index = remote_index; + local_signature = s->signature; + local_index = i; remote_to_local = LABCOMM_SIGNATURE_ARRAY_REF(d->memory, d->remote_to_local, int, remote_index); @@ -228,6 +232,12 @@ static int decode_typedef_or_sample(struct labcomm_decoder *d, int kind) } } labcomm_scheduler_data_unlock(d->scheduler); + if (local_signature) { + labcomm_reader_start(d->reader, d->reader->action_context, + local_index, remote_index, local_signature, + NULL); + labcomm_reader_end(d->reader, d->reader->action_context); + } } #if 0 if (! entry) { @@ -344,7 +354,7 @@ int labcomm_decoder_ioctl(struct labcomm_decoder *d, va_start(va, action); result = labcomm_reader_ioctl(d->reader, d->reader->action_context, - 0, NULL, action, va); + 0, 0, NULL, action, va); va_end(va); return result; } @@ -363,12 +373,9 @@ int labcomm_internal_decoder_ioctl(struct labcomm_decoder *d, struct sample_entry, local_index)->remote_index; labcomm_scheduler_data_unlock(d->scheduler); - if (remote_index == 0) { - result = -EAGAIN; - } else { - result = labcomm_reader_ioctl(d->reader, d->reader->action_context, - remote_index, signature, action, va); - } + result = labcomm_reader_ioctl(d->reader, d->reader->action_context, + local_index, remote_index, + signature, action, va); return result; } diff --git a/lib/c/labcomm_private.h b/lib/c/labcomm_private.h index e953f02..6ea5b43 100644 --- a/lib/c/labcomm_private.h +++ b/lib/c/labcomm_private.h @@ -113,8 +113,13 @@ struct labcomm_reader_action { */ int (*free)(struct labcomm_reader *r, struct labcomm_reader_action_context *action_context); - /* 'start' is called right after a sample has arrived. In the case of - a sample or typedef, 'value' == NULL. + /* 'start' is called at the following instances: + 1. When a sample is registered + (local_index != 0, remote_index == 0, value == NULL) + 2. When a sample definition is received + (local_index != 0, remote_index != 0, value == NULL) + 3. When a sample is received + (local_index != 0, remote_index != 0, value != NULL) */ int (*start)(struct labcomm_reader *r, struct labcomm_reader_action_context *action_context, @@ -127,7 +132,8 @@ struct labcomm_reader_action { struct labcomm_reader_action_context *action_context); int (*ioctl)(struct labcomm_reader *r, struct labcomm_reader_action_context *action_context, - int index, struct labcomm_signature *signature, + int local_index, int remote_index, + struct labcomm_signature *signature, uint32_t ioctl_action, va_list args); }; @@ -165,7 +171,8 @@ int labcomm_reader_fill(struct labcomm_reader *r, struct labcomm_reader_action_context *action_context); int labcomm_reader_ioctl(struct labcomm_reader *r, struct labcomm_reader_action_context *action_context, - int index, struct labcomm_signature *signature, + int local_index, int remote_index, + struct labcomm_signature *signature, uint32_t ioctl_action, va_list args); /* -- GitLab