From d8cab5f2bb29bd00abe6dbeb63a8cb68e3425a6f Mon Sep 17 00:00:00 2001 From: Anders Blomdell <anders.blomdell@control.lth.se> Date: Fri, 28 Jun 2013 19:37:35 +0200 Subject: [PATCH] Twoway example completed. --- examples/twoway/client.c | 19 +++++- examples/twoway/decimating.c | 74 +++++++++++++------- examples/twoway/introspecting.c | 116 ++++++++++++++++++++------------ examples/twoway/introspecting.h | 5 +- examples/twoway/server.c | 31 ++++++--- 5 files changed, 165 insertions(+), 80 deletions(-) diff --git a/examples/twoway/client.c b/examples/twoway/client.c index 656a71f..3714abf 100644 --- a/examples/twoway/client.c +++ b/examples/twoway/client.c @@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <errno.h> #include <arpa/inet.h> #include <linux/tcp.h> #include <netdb.h> @@ -42,12 +43,17 @@ static void handle_Sum(int32_t *value, void *context) { - printf("A+B=%d\n", *value); + printf("A+B=%d ", *value); } static void handle_Diff(int32_t *value, void *context) { - printf("A-B=%d\n", *value); + printf("A-B=%d ", *value); +} + +static void handle_Product(int32_t *value, void *context) +{ + printf("A*B=%d ", *value); } static void *run_decoder(void *context) @@ -151,17 +157,24 @@ int main(int argc, char *argv[]) labcomm_encoder_register_types_B(encoder); labcomm_encoder_register_types_Terminate(encoder); + /* Sleep to make all remote types be known to introspecting and decimating + wrappers, this is a HACK! */ + sleep(1); err = labcomm_decoder_ioctl_types_Sum(decoder, SET_DECIMATION, 2); err = labcomm_decoder_ioctl_types_Diff(decoder, SET_DECIMATION, 4); for (i = 0 ; i < 4 ; i++) { + if (i == 2) { + labcomm_decoder_register_types_Product(decoder, handle_Product, NULL); + } for (j = 0 ; j < 4 ; j++) { - printf("A=%d B=%d\n", i, j); + printf("\nA=%d B=%d: ", i, j); labcomm_encode_types_A(encoder, &i); labcomm_encode_types_B(encoder, &j); sleep(1); } } + printf("\n"); labcomm_encode_types_Terminate(encoder, LABCOMM_VOID); out: return 0; diff --git a/examples/twoway/decimating.c b/examples/twoway/decimating.c index 717a8e7..3b3cce3 100644 --- a/examples/twoway/decimating.c +++ b/examples/twoway/decimating.c @@ -49,12 +49,14 @@ static void set_decimation( struct decimating_private *decimating = context; struct decimation *decimation; + labcomm_scheduler_data_lock(decimating->scheduler); decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory, decimating->decimation, struct decimation, value->signature_index); decimation->n = value->decimation; decimation->current = 0; + labcomm_scheduler_data_unlock(decimating->scheduler); } static int wrap_reader_alloc( @@ -62,18 +64,27 @@ static int wrap_reader_alloc( struct labcomm_reader_action_context *action_context, char *labcomm_version) { - int result; - struct decimating_private *decimating = action_context->context; - fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__); - /* Stash away decoder for later use */ - result = labcomm_reader_alloc(r, action_context->next, labcomm_version); - fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__); labcomm_decoder_register_decimating_messages_set_decimation( r->decoder, set_decimation, decimating); - fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__); - return result; + return labcomm_reader_alloc(r, action_context->next, labcomm_version); +} + +struct send_set_decimation { + struct decimating_private *decimating; + decimating_messages_set_decimation set_decimation; + +}; + +static void send_set_decimation(void *arg) +{ + struct send_set_decimation *msg = arg; + struct labcomm_memory *memory = msg->decimating->memory; + + labcomm_encode_decimating_messages_set_decimation( + msg->decimating->decimating.writer->encoder, &msg->set_decimation); + labcomm_memory_free(memory, 1, msg); } static int wrap_reader_ioctl( @@ -86,20 +97,26 @@ static int wrap_reader_ioctl( struct decimating_private *decimating = action_context->context; if (action == SET_DECIMATION) { - decimating_messages_set_decimation decimation; - va_list va; - - va_copy(va, args); - decimation.decimation = va_arg(va, int); - decimation.signature_index = signature_index; - va_end(va); - return labcomm_encode_decimating_messages_set_decimation( - decimating->decimating.writer->encoder, &decimation); + struct send_set_decimation *msg; + + msg = labcomm_memory_alloc(decimating->memory, 1, sizeof(*msg)); + if (msg) { + va_list va; + + va_copy(va, args); + msg->decimating = decimating; + msg->set_decimation.decimation = va_arg(va, int); + msg->set_decimation.signature_index = signature_index; + va_end(va); + + labcomm_scheduler_enqueue(decimating->scheduler, 0, + send_set_decimation, msg); + } } else { return labcomm_reader_ioctl(r, action_context->next, signature_index, signature, action, args); } - + return 0; } struct labcomm_reader_action decimating_reader_action = { @@ -125,14 +142,10 @@ static int wrap_writer_alloc( char *labcomm_version) { struct decimating_private *decimating = action_context->context; - int result; - fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__); - result = labcomm_writer_alloc(w, action_context->next, labcomm_version); labcomm_scheduler_enqueue(decimating->scheduler, 0, register_signatures, decimating); - - return result; + return labcomm_writer_alloc(w, action_context->next, labcomm_version); } static int wrap_writer_start( @@ -143,18 +156,27 @@ static int wrap_writer_start( { struct decimating_private *decimating = action_context->context; struct decimation *decimation; + int result; + + labcomm_scheduler_data_lock(decimating->scheduler); decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory, decimating->decimation, struct decimation, index); decimation->current++; if (decimation->current < decimation->n) { - return -EALREADY; + result = -EALREADY; } else { decimation->current = 0; - return labcomm_writer_start(w, action_context->next, - index, signature, value); + result = 0; + } + labcomm_scheduler_data_unlock(decimating->scheduler); + + if (result == 0) { + result = labcomm_writer_start(w, action_context->next, + index, signature, value); } + return result; } struct labcomm_writer_action decimating_writer_action = { diff --git a/examples/twoway/introspecting.c b/examples/twoway/introspecting.c index cb17246..a1e1028 100644 --- a/examples/twoway/introspecting.c +++ b/examples/twoway/introspecting.c @@ -27,7 +27,6 @@ #include "introspecting.h" #include "gen/introspecting_messages.h" -enum status {unknown, unhandled, unregistered, registered}; struct introspecting_private { struct introspecting introspecting; struct labcomm_error_handler *error; @@ -44,19 +43,55 @@ struct introspecting_private { }); LABCOMM_SIGNATURE_ARRAY_DEF(local, struct local { - enum status status; + enum introspecting_status status; struct labcomm_signature *signature; }); }; +static struct local *get_local(struct introspecting_private *introspecting, + int index, + struct labcomm_signature *signature) +{ + /* Called with data_lock held */ + struct local *local; + + local = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory, + introspecting->local, + struct local, + index); + if (local->signature == NULL) { + local->signature = signature; + local->status = introspecting_unknown; + } + if (local->status == introspecting_unknown) { + int i; + + local->status = introspecting_unhandled; + LABCOMM_SIGNATURE_ARRAY_FOREACH(introspecting->remote, struct remote, i) { + struct remote *r; + + r = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory, + introspecting->remote, struct remote, i); + if (r->name && + strcmp(signature->name, r->name) == 0 && + r->size == signature->size && + memcmp(signature->signature, r->signature, signature->size) == 0) { + local->status = introspecting_unregistered; + break; + } + } + } + return local; +} + static void handles_signature( introspecting_messages_handles_signature *value, void * context) { - fprintf(stderr, "### %s %x %s\n", __FUNCTION__, value->index, value->name); struct introspecting_private *introspecting = context; struct remote *remote; + labcomm_scheduler_data_lock(introspecting->scheduler); remote = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory, introspecting->remote, struct remote, @@ -74,17 +109,17 @@ static void handles_signature( l = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory, introspecting->local, struct local, i); if (l->signature && - l->status == unhandled && + l->status == introspecting_unhandled && l->signature->name && strcmp(remote->name, l->signature->name) == 0 && remote->size == l->signature->size && memcmp(l->signature->signature, remote->signature, l->signature->size) == 0) { - fprintf(stderr, "OK %s %x %x\n", __FUNCTION__, value->index, i); - l->status = unregistered; + l->status = introspecting_unregistered; } } } + labcomm_scheduler_data_unlock(introspecting->scheduler); } static int wrap_reader_alloc( @@ -92,15 +127,12 @@ static int wrap_reader_alloc( struct labcomm_reader_action_context *action_context, char *labcomm_version) { - int result; struct introspecting_private *introspecting = action_context->context; - fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__); - result = labcomm_reader_alloc(r, action_context->next, labcomm_version); labcomm_decoder_register_introspecting_messages_handles_signature( introspecting->introspecting.reader->decoder, handles_signature, introspecting); - return result; + return labcomm_reader_alloc(r, action_context->next, labcomm_version); } struct handles_signature { @@ -140,7 +172,6 @@ static int wrap_reader_start( handles_signature->signature = signature; labcomm_scheduler_enqueue(introspecting->scheduler, 0, send_handles_signature, handles_signature); - } return labcomm_reader_start(r, action_context->next, local_index, remote_index, signature, value); @@ -154,7 +185,6 @@ static int wrap_reader_start( introspecting_messages_handles_signature handles_signature; int index = 0; - fprintf(stderr, "## Handles %x %s\n", index, signature->name); handles_signature.index = index; handles_signature.name = signature->name; handles_signature.signature.n_0 = signature->size; @@ -188,7 +218,6 @@ static int wrap_writer_alloc( { struct introspecting_private *introspecting = action_context->context; - fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__); labcomm_scheduler_enqueue(introspecting->scheduler, 0, register_encoder_signatures, introspecting); return labcomm_writer_alloc(w, action_context->next, labcomm_version); @@ -201,39 +230,42 @@ static int wrap_writer_start( void *value) { struct introspecting_private *introspecting = action_context->context; - struct local *local; - fprintf(stderr, "%s %x %s\n", __FUNCTION__, index, signature->name); - local = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory, - introspecting->local, - struct local, - index); - if (local->signature == NULL) { - local->signature = signature; + if (value == NULL) { + struct local *local; + + labcomm_scheduler_data_lock(introspecting->scheduler); + local = get_local(introspecting, index, signature); + local->status = introspecting_registered; + labcomm_scheduler_data_unlock(introspecting->scheduler); } - if (local->status == unknown) { - int i; - int found = 0; + return labcomm_writer_start(w, action_context->next, index, signature, value); +} - LABCOMM_SIGNATURE_ARRAY_FOREACH(introspecting->remote, struct remote, i) { - struct remote *r; - - r = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory, - introspecting->remote, struct remote, i); - if (r->name && - strcmp(signature->name, r->name) == 0 && - r->size == signature->size && - memcmp(signature->signature, r->signature, signature->size) == 0) { - fprintf(stderr, "OK %s %x %x\n", __FUNCTION__, index, i); - found = i; - } - } - if (found == 0) { - local->status = unhandled; +static int wrap_writer_ioctl( + struct labcomm_writer *w, + struct labcomm_writer_action_context *action_context, + int index, struct labcomm_signature *signature, + uint32_t ioctl_action, va_list args) +{ + struct introspecting_private *introspecting = action_context->context; + + switch (ioctl_action) { + case HAS_SIGNATURE: { + struct local *local; + int result; + + labcomm_scheduler_data_lock(introspecting->scheduler); + local = get_local(introspecting, index, signature); + result = local->status; + labcomm_scheduler_data_unlock(introspecting->scheduler); + return result; } - fprintf(stderr, "Found: %d\n", found); + default: { + return labcomm_writer_ioctl(w, action_context->next, index, signature, + ioctl_action, args); + } break; } - return labcomm_writer_start(w, action_context->next, index, signature, value); } struct labcomm_writer_action introspecting_writer_action = { @@ -242,7 +274,7 @@ struct labcomm_writer_action introspecting_writer_action = { .start = wrap_writer_start, .end = NULL, .flush = NULL, - .ioctl = NULL + .ioctl = wrap_writer_ioctl }; extern struct introspecting *introspecting_new( diff --git a/examples/twoway/introspecting.h b/examples/twoway/introspecting.h index 894b998..984670d 100644 --- a/examples/twoway/introspecting.h +++ b/examples/twoway/introspecting.h @@ -41,5 +41,8 @@ extern struct introspecting *introspecting_new( struct labcomm_scheduler *scheduler); #define HAS_SIGNATURE LABCOMM_IOS('i',2) - +enum introspecting_status { introspecting_unknown, + introspecting_unhandled, + introspecting_unregistered, + introspecting_registered }; #endif diff --git a/examples/twoway/server.c b/examples/twoway/server.c index 66c7c6a..a8f3806 100644 --- a/examples/twoway/server.c +++ b/examples/twoway/server.c @@ -42,9 +42,10 @@ struct client { pthread_t decoder_thread; struct sockaddr_in adr; unsigned int adrlen; - int32_t A, B, Sum, Diff; + int32_t A, B, Sum, Diff, Product; struct labcomm_decoder *decoder; struct labcomm_encoder *encoder; + struct labcomm_scheduler *scheduler; }; static void handle_A(int32_t *value, void *context) @@ -57,12 +58,25 @@ static void handle_A(int32_t *value, void *context) static void handle_B(int32_t *value, void *context) { struct client *client = context; + int status; client->B = *value; client->Sum = client->A + client->B; client->Diff = client->A - client->B; labcomm_encode_types_Sum(client->encoder, &client->Sum); labcomm_encode_types_Diff(client->encoder, &client->Diff); + status = labcomm_encoder_ioctl_types_Product(client->encoder, HAS_SIGNATURE); + switch (status) { + case introspecting_unregistered: + labcomm_encoder_register_types_Product(client->encoder); + /* fall through */ + case introspecting_registered: + client->Product = client->A * client->B; + labcomm_encode_types_Product(client->encoder, &client->Product); + break; + default: + break; + } } static void handle_Terminate(types_Terminate *value, void *context) @@ -75,7 +89,6 @@ static void *run_decoder(void *arg) struct client *client = arg; int result; - labcomm_decoder_register_types_A(client->decoder, handle_A, client); labcomm_decoder_register_types_B(client->decoder, handle_B, client); labcomm_decoder_register_types_Terminate(client->decoder, handle_Terminate, @@ -83,6 +96,7 @@ static void *run_decoder(void *arg) do { result = labcomm_decoder_decode_one(client->decoder); } while (result >= 0); + labcomm_scheduler_wakeup(client->scheduler); return NULL; } @@ -91,19 +105,18 @@ static void *run_client(void *arg) struct client *client = arg; struct decimating *decimating; struct introspecting *introspecting; - struct labcomm_scheduler *scheduler; printf("Client start\n"); client->A = 0; client->B = 0; - scheduler = labcomm_pthread_scheduler_new(labcomm_default_memory); + client->scheduler = labcomm_pthread_scheduler_new(labcomm_default_memory); decimating = decimating_new(labcomm_fd_reader_new(labcomm_default_memory, client->fd, 1), labcomm_fd_writer_new(labcomm_default_memory, client->fd, 0), labcomm_default_error_handler, labcomm_default_memory, - scheduler); + client->scheduler); if (decimating == NULL) { /* Warning: might leak reader and writer at this point */ goto out; @@ -112,7 +125,7 @@ static void *run_client(void *arg) decimating->writer, labcomm_default_error_handler, labcomm_default_memory, - scheduler); + client->scheduler); if (introspecting == NULL) { /* Warning: might leak reader and writer at this point */ goto out; @@ -120,15 +133,17 @@ static void *run_client(void *arg) client->decoder = labcomm_decoder_new(introspecting->reader, labcomm_default_error_handler, labcomm_default_memory, - scheduler); + client->scheduler); client->encoder = labcomm_encoder_new(introspecting->writer, labcomm_default_error_handler, labcomm_default_memory, - scheduler); + client->scheduler); pthread_t rdt; pthread_create(&rdt, NULL, run_decoder, client); labcomm_encoder_register_types_Sum(client->encoder); labcomm_encoder_register_types_Diff(client->encoder); + labcomm_scheduler_sleep(client->scheduler, NULL); + printf("Awoken\n"); pthread_join(rdt, NULL); out: printf("Client end\n"); -- GitLab