Commit 40b94830 authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Changed decoder to call reader->start both at registration and

reception of remote declaration. Corrected twoway/decimating.c
to take advantage of this.
parent 688412ff
......@@ -85,6 +85,7 @@ int main(int argc, char *argv[])
struct labcomm_scheduler *scheduler;
struct labcomm_decoder *decoder;
struct labcomm_encoder *encoder;
struct labcomm_time *next;
int32_t i, j;
hostname = argv[1];
......@@ -157,12 +158,10 @@ int main(int argc, char *argv[])
labcomm_encoder_register_types_B(encoder);
labcomm_encoder_register_types_Terminate(encoder);
/* Sleep to make all remote types be known to introspecting and decimating
wrappers, this is a HACK! */
sleep(1);
err = labcomm_decoder_ioctl_types_Sum(decoder, SET_DECIMATION, 2);
err = labcomm_decoder_ioctl_types_Diff(decoder, SET_DECIMATION, 4);
next = labcomm_scheduler_now(scheduler);
for (i = 0 ; i < 4 ; i++) {
if (i == 2) {
labcomm_decoder_register_types_Product(decoder, handle_Product, NULL);
......@@ -171,7 +170,8 @@ int main(int argc, char *argv[])
printf("\nA=%d B=%d: ", i, j);
labcomm_encode_types_A(encoder, &i);
labcomm_encode_types_B(encoder, &j);
sleep(1);
labcomm_time_add_usec(next, 100000);
labcomm_scheduler_sleep(scheduler, next);
}
}
printf("\n");
......
......@@ -35,11 +35,12 @@ struct decimating_private {
int encoder_initialized;
struct labcomm_reader_action_context reader_action_context;
struct labcomm_writer_action_context writer_action_context;
LABCOMM_SIGNATURE_ARRAY_DEF(decimation,
LABCOMM_SIGNATURE_ARRAY_DEF(writer_decimation,
struct decimation {
int n;
int current;
});
LABCOMM_SIGNATURE_ARRAY_DEF(reader_decimation, int);
};
static void set_decimation(
......@@ -51,7 +52,7 @@ static void set_decimation(
labcomm_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->decimation,
decimating->writer_decimation,
struct decimation,
value->signature_index);
decimation->n = value->decimation;
......@@ -87,34 +88,81 @@ static void send_set_decimation(void *arg)
labcomm_memory_free(memory, 1, msg);
}
static void enqueue_decimation(struct decimating_private *decimating,
int remote_index,
int amount)
{
struct send_set_decimation *msg;
msg = labcomm_memory_alloc(decimating->memory, 1, sizeof(*msg));
if (msg) {
msg->decimating = decimating;
msg->set_decimation.decimation = amount;
msg->set_decimation.signature_index = remote_index;
labcomm_scheduler_enqueue(decimating->scheduler, 0,
send_set_decimation, msg);
}
}
static int wrap_reader_start(
struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
int local_index, int remote_index, struct labcomm_signature *signature,
void *value)
{
struct decimating_private *decimating = action_context->context;
if (value == NULL) {
int *decimation, amount;
labcomm_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->reader_decimation,
int,
local_index);
amount = *decimation;
labcomm_scheduler_data_unlock(decimating->scheduler);
if (remote_index != 0 && amount != 0) {
enqueue_decimation(decimating, remote_index, amount);
}
}
return labcomm_reader_start(r, action_context->next,
local_index, remote_index, signature, value);
}
static int wrap_reader_ioctl(
struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
int signature_index,
int local_index, int remote_index,
struct labcomm_signature *signature,
uint32_t action, va_list args)
{
struct decimating_private *decimating = action_context->context;
if (action == SET_DECIMATION) {
struct send_set_decimation *msg;
msg = labcomm_memory_alloc(decimating->memory, 1, sizeof(*msg));
if (msg) {
va_list va;
va_copy(va, args);
msg->decimating = decimating;
msg->set_decimation.decimation = va_arg(va, int);
msg->set_decimation.signature_index = signature_index;
va_end(va);
labcomm_scheduler_enqueue(decimating->scheduler, 0,
send_set_decimation, msg);
va_list va;
int amount;
int *decimation;
va_copy(va, args);
amount = va_arg(va, int);
va_end(va);
labcomm_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->reader_decimation,
int,
local_index);
*decimation = amount;
labcomm_scheduler_data_unlock(decimating->scheduler);
if (remote_index) {
enqueue_decimation(decimating, remote_index, amount);
}
} else {
return labcomm_reader_ioctl(r, action_context->next,
signature_index, signature, action, args);
local_index, remote_index, signature,
action, args);
}
return 0;
}
......@@ -122,7 +170,7 @@ static int wrap_reader_ioctl(
struct labcomm_reader_action decimating_reader_action = {
.alloc = wrap_reader_alloc,
.free = NULL,
.start = NULL,
.start = wrap_reader_start,
.end = NULL,
.fill = NULL,
.ioctl = wrap_reader_ioctl
......@@ -161,7 +209,7 @@ static int wrap_writer_start(
labcomm_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->decimation,
decimating->writer_decimation,
struct decimation, index);
decimation->current++;
if (decimation->current < decimation->n) {
......@@ -221,7 +269,8 @@ struct decimating *decimating_new(
result->error = error;
result->memory = memory;
result->scheduler = scheduler;
LABCOMM_SIGNATURE_ARRAY_INIT(result->decimation, struct decimation);
LABCOMM_SIGNATURE_ARRAY_INIT(result->writer_decimation, struct decimation);
LABCOMM_SIGNATURE_ARRAY_INIT(result->reader_decimation, int);
goto out_ok;
......
......@@ -20,6 +20,7 @@
*/
#include <arpa/inet.h>
#include <linux/tcp.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
......@@ -61,10 +62,6 @@ static void handle_B(int32_t *value, void *context)
int status;
client->B = *value;
client->Sum = client->A + client->B;
client->Diff = client->A - client->B;
labcomm_encode_types_Sum(client->encoder, &client->Sum);
labcomm_encode_types_Diff(client->encoder, &client->Diff);
status = labcomm_encoder_ioctl_types_Product(client->encoder, HAS_SIGNATURE);
switch (status) {
case introspecting_unregistered:
......@@ -77,6 +74,10 @@ static void handle_B(int32_t *value, void *context)
default:
break;
}
client->Sum = client->A + client->B;
client->Diff = client->A - client->B;
labcomm_encode_types_Sum(client->encoder, &client->Sum);
labcomm_encode_types_Diff(client->encoder, &client->Diff);
}
static void handle_Terminate(types_Terminate *value, void *context)
......@@ -183,6 +184,7 @@ int main(int argc, char *argv[])
}
while (1) {
struct client *client;
int nodelay;
client = malloc(sizeof(*client));
if (client == NULL) {
......@@ -198,6 +200,8 @@ int main(int argc, char *argv[])
result = errno;
goto failed_to_accept;
}
nodelay = 1;
setsockopt(client->fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
pthread_create(&client->main_thread, NULL, run_client, client);
}
......
......@@ -85,11 +85,12 @@ int labcomm_reader_fill(struct labcomm_reader *r,
int labcomm_reader_ioctl(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
int index,
int local_index, int remote_index,
struct labcomm_signature *signature,
uint32_t ioctl_action, va_list args)
{
UNWRAP(ioctl, r, action_context, index, signature, ioctl_action, args);
UNWRAP(ioctl, r, action_context,
local_index, remote_index, signature, ioctl_action, args);
}
int labcomm_writer_alloc(struct labcomm_writer *w,
......
......@@ -175,9 +175,11 @@ static int decode_typedef_or_sample(struct labcomm_decoder *d, int kind)
.pos = 0,
.error = 0,
};
struct labcomm_signature signature;
int remote_index, err;
struct labcomm_signature signature, *local_signature;
int remote_index, local_index, err;
local_signature = NULL;
local_index = 0;
labcomm_writer_alloc(&writer, writer.action_context, "");
labcomm_writer_start(&writer, writer.action_context, 0, NULL, NULL);
remote_index = labcomm_read_packed32(d->reader); //int
......@@ -219,6 +221,8 @@ static int decode_typedef_or_sample(struct labcomm_decoder *d, int kind)
bcmp((void*)s->signature->signature, (void*)signature.signature,
signature.size) == 0) {
s->remote_index = remote_index;
local_signature = s->signature;
local_index = i;
remote_to_local = LABCOMM_SIGNATURE_ARRAY_REF(d->memory,
d->remote_to_local, int,
remote_index);
......@@ -228,6 +232,12 @@ static int decode_typedef_or_sample(struct labcomm_decoder *d, int kind)
}
}
labcomm_scheduler_data_unlock(d->scheduler);
if (local_signature) {
labcomm_reader_start(d->reader, d->reader->action_context,
local_index, remote_index, local_signature,
NULL);
labcomm_reader_end(d->reader, d->reader->action_context);
}
}
#if 0
if (! entry) {
......@@ -344,7 +354,7 @@ int labcomm_decoder_ioctl(struct labcomm_decoder *d,
va_start(va, action);
result = labcomm_reader_ioctl(d->reader,
d->reader->action_context,
0, NULL, action, va);
0, 0, NULL, action, va);
va_end(va);
return result;
}
......@@ -363,12 +373,9 @@ int labcomm_internal_decoder_ioctl(struct labcomm_decoder *d,
struct sample_entry,
local_index)->remote_index;
labcomm_scheduler_data_unlock(d->scheduler);
if (remote_index == 0) {
result = -EAGAIN;
} else {
result = labcomm_reader_ioctl(d->reader, d->reader->action_context,
remote_index, signature, action, va);
}
result = labcomm_reader_ioctl(d->reader, d->reader->action_context,
local_index, remote_index,
signature, action, va);
return result;
}
......
......@@ -113,8 +113,13 @@ struct labcomm_reader_action {
*/
int (*free)(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context);
/* 'start' is called right after a sample has arrived. In the case of
a sample or typedef, 'value' == NULL.
/* 'start' is called at the following instances:
1. When a sample is registered
(local_index != 0, remote_index == 0, value == NULL)
2. When a sample definition is received
(local_index != 0, remote_index != 0, value == NULL)
3. When a sample is received
(local_index != 0, remote_index != 0, value != NULL)
*/
int (*start)(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
......@@ -127,7 +132,8 @@ struct labcomm_reader_action {
struct labcomm_reader_action_context *action_context);
int (*ioctl)(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
int index, struct labcomm_signature *signature,
int local_index, int remote_index,
struct labcomm_signature *signature,
uint32_t ioctl_action, va_list args);
};
......@@ -165,7 +171,8 @@ int labcomm_reader_fill(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context);
int labcomm_reader_ioctl(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
int index, struct labcomm_signature *signature,
int local_index, int remote_index,
struct labcomm_signature *signature,
uint32_t ioctl_action, va_list args);
/*
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment