Commit d8cab5f2 authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Twoway example completed.

parent d44882a8
......@@ -19,6 +19,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <arpa/inet.h>
#include <linux/tcp.h>
#include <netdb.h>
......@@ -42,12 +43,17 @@
static void handle_Sum(int32_t *value, void *context)
{
printf("A+B=%d\n", *value);
printf("A+B=%d ", *value);
}
static void handle_Diff(int32_t *value, void *context)
{
printf("A-B=%d\n", *value);
printf("A-B=%d ", *value);
}
static void handle_Product(int32_t *value, void *context)
{
printf("A*B=%d ", *value);
}
static void *run_decoder(void *context)
......@@ -151,17 +157,24 @@ int main(int argc, char *argv[])
labcomm_encoder_register_types_B(encoder);
labcomm_encoder_register_types_Terminate(encoder);
/* Sleep to make all remote types be known to introspecting and decimating
wrappers, this is a HACK! */
sleep(1);
err = labcomm_decoder_ioctl_types_Sum(decoder, SET_DECIMATION, 2);
err = labcomm_decoder_ioctl_types_Diff(decoder, SET_DECIMATION, 4);
for (i = 0 ; i < 4 ; i++) {
if (i == 2) {
labcomm_decoder_register_types_Product(decoder, handle_Product, NULL);
}
for (j = 0 ; j < 4 ; j++) {
printf("A=%d B=%d\n", i, j);
printf("\nA=%d B=%d: ", i, j);
labcomm_encode_types_A(encoder, &i);
labcomm_encode_types_B(encoder, &j);
sleep(1);
}
}
printf("\n");
labcomm_encode_types_Terminate(encoder, LABCOMM_VOID);
out:
return 0;
......
......@@ -49,12 +49,14 @@ static void set_decimation(
struct decimating_private *decimating = context;
struct decimation *decimation;
labcomm_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->decimation,
struct decimation,
value->signature_index);
decimation->n = value->decimation;
decimation->current = 0;
labcomm_scheduler_data_unlock(decimating->scheduler);
}
static int wrap_reader_alloc(
......@@ -62,18 +64,27 @@ static int wrap_reader_alloc(
struct labcomm_reader_action_context *action_context,
char *labcomm_version)
{
int result;
struct decimating_private *decimating = action_context->context;
fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__);
/* Stash away decoder for later use */
result = labcomm_reader_alloc(r, action_context->next, labcomm_version);
fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__);
labcomm_decoder_register_decimating_messages_set_decimation(
r->decoder, set_decimation, decimating);
fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__);
return result;
return labcomm_reader_alloc(r, action_context->next, labcomm_version);
}
struct send_set_decimation {
struct decimating_private *decimating;
decimating_messages_set_decimation set_decimation;
};
static void send_set_decimation(void *arg)
{
struct send_set_decimation *msg = arg;
struct labcomm_memory *memory = msg->decimating->memory;
labcomm_encode_decimating_messages_set_decimation(
msg->decimating->decimating.writer->encoder, &msg->set_decimation);
labcomm_memory_free(memory, 1, msg);
}
static int wrap_reader_ioctl(
......@@ -86,20 +97,26 @@ static int wrap_reader_ioctl(
struct decimating_private *decimating = action_context->context;
if (action == SET_DECIMATION) {
decimating_messages_set_decimation decimation;
va_list va;
va_copy(va, args);
decimation.decimation = va_arg(va, int);
decimation.signature_index = signature_index;
va_end(va);
return labcomm_encode_decimating_messages_set_decimation(
decimating->decimating.writer->encoder, &decimation);
struct send_set_decimation *msg;
msg = labcomm_memory_alloc(decimating->memory, 1, sizeof(*msg));
if (msg) {
va_list va;
va_copy(va, args);
msg->decimating = decimating;
msg->set_decimation.decimation = va_arg(va, int);
msg->set_decimation.signature_index = signature_index;
va_end(va);
labcomm_scheduler_enqueue(decimating->scheduler, 0,
send_set_decimation, msg);
}
} else {
return labcomm_reader_ioctl(r, action_context->next,
signature_index, signature, action, args);
}
return 0;
}
struct labcomm_reader_action decimating_reader_action = {
......@@ -125,14 +142,10 @@ static int wrap_writer_alloc(
char *labcomm_version)
{
struct decimating_private *decimating = action_context->context;
int result;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
result = labcomm_writer_alloc(w, action_context->next, labcomm_version);
labcomm_scheduler_enqueue(decimating->scheduler,
0, register_signatures, decimating);
return result;
return labcomm_writer_alloc(w, action_context->next, labcomm_version);
}
static int wrap_writer_start(
......@@ -143,18 +156,27 @@ static int wrap_writer_start(
{
struct decimating_private *decimating = action_context->context;
struct decimation *decimation;
int result;
labcomm_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->decimation,
struct decimation, index);
decimation->current++;
if (decimation->current < decimation->n) {
return -EALREADY;
result = -EALREADY;
} else {
decimation->current = 0;
return labcomm_writer_start(w, action_context->next,
index, signature, value);
result = 0;
}
labcomm_scheduler_data_unlock(decimating->scheduler);
if (result == 0) {
result = labcomm_writer_start(w, action_context->next,
index, signature, value);
}
return result;
}
struct labcomm_writer_action decimating_writer_action = {
......
......@@ -27,7 +27,6 @@
#include "introspecting.h"
#include "gen/introspecting_messages.h"
enum status {unknown, unhandled, unregistered, registered};
struct introspecting_private {
struct introspecting introspecting;
struct labcomm_error_handler *error;
......@@ -44,19 +43,55 @@ struct introspecting_private {
});
LABCOMM_SIGNATURE_ARRAY_DEF(local,
struct local {
enum status status;
enum introspecting_status status;
struct labcomm_signature *signature;
});
};
static struct local *get_local(struct introspecting_private *introspecting,
int index,
struct labcomm_signature *signature)
{
/* Called with data_lock held */
struct local *local;
local = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->local,
struct local,
index);
if (local->signature == NULL) {
local->signature = signature;
local->status = introspecting_unknown;
}
if (local->status == introspecting_unknown) {
int i;
local->status = introspecting_unhandled;
LABCOMM_SIGNATURE_ARRAY_FOREACH(introspecting->remote, struct remote, i) {
struct remote *r;
r = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->remote, struct remote, i);
if (r->name &&
strcmp(signature->name, r->name) == 0 &&
r->size == signature->size &&
memcmp(signature->signature, r->signature, signature->size) == 0) {
local->status = introspecting_unregistered;
break;
}
}
}
return local;
}
static void handles_signature(
introspecting_messages_handles_signature *value,
void * context)
{
fprintf(stderr, "### %s %x %s\n", __FUNCTION__, value->index, value->name);
struct introspecting_private *introspecting = context;
struct remote *remote;
labcomm_scheduler_data_lock(introspecting->scheduler);
remote = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->remote,
struct remote,
......@@ -74,17 +109,17 @@ static void handles_signature(
l = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->local, struct local, i);
if (l->signature &&
l->status == unhandled &&
l->status == introspecting_unhandled &&
l->signature->name &&
strcmp(remote->name, l->signature->name) == 0 &&
remote->size == l->signature->size &&
memcmp(l->signature->signature, remote->signature,
l->signature->size) == 0) {
fprintf(stderr, "OK %s %x %x\n", __FUNCTION__, value->index, i);
l->status = unregistered;
l->status = introspecting_unregistered;
}
}
}
labcomm_scheduler_data_unlock(introspecting->scheduler);
}
static int wrap_reader_alloc(
......@@ -92,15 +127,12 @@ static int wrap_reader_alloc(
struct labcomm_reader_action_context *action_context,
char *labcomm_version)
{
int result;
struct introspecting_private *introspecting = action_context->context;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
result = labcomm_reader_alloc(r, action_context->next, labcomm_version);
labcomm_decoder_register_introspecting_messages_handles_signature(
introspecting->introspecting.reader->decoder,
handles_signature, introspecting);
return result;
return labcomm_reader_alloc(r, action_context->next, labcomm_version);
}
struct handles_signature {
......@@ -140,7 +172,6 @@ static int wrap_reader_start(
handles_signature->signature = signature;
labcomm_scheduler_enqueue(introspecting->scheduler,
0, send_handles_signature, handles_signature);
}
return labcomm_reader_start(r, action_context->next,
local_index, remote_index, signature, value);
......@@ -154,7 +185,6 @@ static int wrap_reader_start(
introspecting_messages_handles_signature handles_signature;
int index = 0;
fprintf(stderr, "## Handles %x %s\n", index, signature->name);
handles_signature.index = index;
handles_signature.name = signature->name;
handles_signature.signature.n_0 = signature->size;
......@@ -188,7 +218,6 @@ static int wrap_writer_alloc(
{
struct introspecting_private *introspecting = action_context->context;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
labcomm_scheduler_enqueue(introspecting->scheduler,
0, register_encoder_signatures, introspecting);
return labcomm_writer_alloc(w, action_context->next, labcomm_version);
......@@ -201,39 +230,42 @@ static int wrap_writer_start(
void *value)
{
struct introspecting_private *introspecting = action_context->context;
struct local *local;
fprintf(stderr, "%s %x %s\n", __FUNCTION__, index, signature->name);
local = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->local,
struct local,
index);
if (local->signature == NULL) {
local->signature = signature;
if (value == NULL) {
struct local *local;
labcomm_scheduler_data_lock(introspecting->scheduler);
local = get_local(introspecting, index, signature);
local->status = introspecting_registered;
labcomm_scheduler_data_unlock(introspecting->scheduler);
}
if (local->status == unknown) {
int i;
int found = 0;
return labcomm_writer_start(w, action_context->next, index, signature, value);
}
LABCOMM_SIGNATURE_ARRAY_FOREACH(introspecting->remote, struct remote, i) {
struct remote *r;
r = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->remote, struct remote, i);
if (r->name &&
strcmp(signature->name, r->name) == 0 &&
r->size == signature->size &&
memcmp(signature->signature, r->signature, signature->size) == 0) {
fprintf(stderr, "OK %s %x %x\n", __FUNCTION__, index, i);
found = i;
}
}
if (found == 0) {
local->status = unhandled;
static int wrap_writer_ioctl(
struct labcomm_writer *w,
struct labcomm_writer_action_context *action_context,
int index, struct labcomm_signature *signature,
uint32_t ioctl_action, va_list args)
{
struct introspecting_private *introspecting = action_context->context;
switch (ioctl_action) {
case HAS_SIGNATURE: {
struct local *local;
int result;
labcomm_scheduler_data_lock(introspecting->scheduler);
local = get_local(introspecting, index, signature);
result = local->status;
labcomm_scheduler_data_unlock(introspecting->scheduler);
return result;
}
fprintf(stderr, "Found: %d\n", found);
default: {
return labcomm_writer_ioctl(w, action_context->next, index, signature,
ioctl_action, args);
} break;
}
return labcomm_writer_start(w, action_context->next, index, signature, value);
}
struct labcomm_writer_action introspecting_writer_action = {
......@@ -242,7 +274,7 @@ struct labcomm_writer_action introspecting_writer_action = {
.start = wrap_writer_start,
.end = NULL,
.flush = NULL,
.ioctl = NULL
.ioctl = wrap_writer_ioctl
};
extern struct introspecting *introspecting_new(
......
......@@ -41,5 +41,8 @@ extern struct introspecting *introspecting_new(
struct labcomm_scheduler *scheduler);
#define HAS_SIGNATURE LABCOMM_IOS('i',2)
enum introspecting_status { introspecting_unknown,
introspecting_unhandled,
introspecting_unregistered,
introspecting_registered };
#endif
......@@ -42,9 +42,10 @@ struct client {
pthread_t decoder_thread;
struct sockaddr_in adr;
unsigned int adrlen;
int32_t A, B, Sum, Diff;
int32_t A, B, Sum, Diff, Product;
struct labcomm_decoder *decoder;
struct labcomm_encoder *encoder;
struct labcomm_scheduler *scheduler;
};
static void handle_A(int32_t *value, void *context)
......@@ -57,12 +58,25 @@ static void handle_A(int32_t *value, void *context)
static void handle_B(int32_t *value, void *context)
{
struct client *client = context;
int status;
client->B = *value;
client->Sum = client->A + client->B;
client->Diff = client->A - client->B;
labcomm_encode_types_Sum(client->encoder, &client->Sum);
labcomm_encode_types_Diff(client->encoder, &client->Diff);
status = labcomm_encoder_ioctl_types_Product(client->encoder, HAS_SIGNATURE);
switch (status) {
case introspecting_unregistered:
labcomm_encoder_register_types_Product(client->encoder);
/* fall through */
case introspecting_registered:
client->Product = client->A * client->B;
labcomm_encode_types_Product(client->encoder, &client->Product);
break;
default:
break;
}
}
static void handle_Terminate(types_Terminate *value, void *context)
......@@ -75,7 +89,6 @@ static void *run_decoder(void *arg)
struct client *client = arg;
int result;
labcomm_decoder_register_types_A(client->decoder, handle_A, client);
labcomm_decoder_register_types_B(client->decoder, handle_B, client);
labcomm_decoder_register_types_Terminate(client->decoder, handle_Terminate,
......@@ -83,6 +96,7 @@ static void *run_decoder(void *arg)
do {
result = labcomm_decoder_decode_one(client->decoder);
} while (result >= 0);
labcomm_scheduler_wakeup(client->scheduler);
return NULL;
}
......@@ -91,19 +105,18 @@ static void *run_client(void *arg)
struct client *client = arg;
struct decimating *decimating;
struct introspecting *introspecting;
struct labcomm_scheduler *scheduler;
printf("Client start\n");
client->A = 0;
client->B = 0;
scheduler = labcomm_pthread_scheduler_new(labcomm_default_memory);
client->scheduler = labcomm_pthread_scheduler_new(labcomm_default_memory);
decimating = decimating_new(labcomm_fd_reader_new(labcomm_default_memory,
client->fd, 1),
labcomm_fd_writer_new(labcomm_default_memory,
client->fd, 0),
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
client->scheduler);
if (decimating == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
......@@ -112,7 +125,7 @@ static void *run_client(void *arg)
decimating->writer,
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
client->scheduler);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
......@@ -120,15 +133,17 @@ static void *run_client(void *arg)
client->decoder = labcomm_decoder_new(introspecting->reader,
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
client->scheduler);
client->encoder = labcomm_encoder_new(introspecting->writer,
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
client->scheduler);
pthread_t rdt;
pthread_create(&rdt, NULL, run_decoder, client);
labcomm_encoder_register_types_Sum(client->encoder);
labcomm_encoder_register_types_Diff(client->encoder);
labcomm_scheduler_sleep(client->scheduler, NULL);
printf("Awoken\n");
pthread_join(rdt, NULL);
out:
printf("Client end\n");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment