Commit d5d9ee8c authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Twoway stacking example added.

Labcomm cleanup (still more to do)
parent 975762f6
.PHONY: all
all:
echo To be done...
echo $(MAKE) -C twoway e
$(MAKE) -C twoway all
.PHONY: test
test:
echo More to be done...
cd simple ; sh compile.sh && sh run.sh
$(MAKE) -C twoway test
.PHONY: clean distclean
clean distclean:
echo To be done...
$(MAKE) -C twoway clean
......@@ -6,6 +6,10 @@ CFLAGS=-O3 -g -Wall -Werror -I../../lib/c -I. -lpthread
all: $(TARGETS:%=gen/%)
test: all
LD_LIBRARY_PATH=../../lib/c ./gen/server 2000 &
LD_LIBRARY_PATH=../../lib/c ./gen/client localhost 2000
gen/.dir:
mkdir -p $@
......@@ -20,11 +24,13 @@ gen/%.o: %.c | gen/.dir
gen/%.c gen/%.h: %.lc | gen/.dir
$(LABCOMM) --c=gen/$*.c --h=gen/$*.h $<
gen/client: client.c gen/types.o gen/decimating.o gen/decimating_messages.o
$(CC) -o $@ $(CFLAGS) $^ -lpthread -L../../lib/c -llabcomm
gen/client: client.c
$(CC) -o $@ $(CFLAGS) $^ -lpthread \
-L../../lib/c -llabcomm -Tlabcomm.linkscript
gen/server: server.c gen/types.o gen/decimating.o gen/decimating_messages.o
$(CC) -o $@ $(CFLAGS) $^ -lpthread -L../../lib/c -llabcomm
gen/server: server.c gen/types.o gen/decimating.o
$(CC) -o $@ $(CFLAGS) $^ -lpthread \
-L../../lib/c -llabcomm -Tlabcomm.linkscript
.PHONY: clean
clean:
......@@ -34,4 +40,7 @@ gen/decimating.o: decimating.h
gen/decimating.o: gen/decimating_messages.h
gen/client.o: decimating.h
gen/client.o: gen/types.h
gen/client: gen/decimating.o
gen/client: gen/decimating_messages.o
gen/client: gen/types.o
gen/server: gen/decimating_messages.o
......@@ -8,6 +8,7 @@
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <labcomm.h>
#include <labcomm_fd_reader.h>
#include <labcomm_fd_writer.h>
......@@ -15,16 +16,14 @@
#include "decimating.h"
#include "gen/types.h"
static void handle_A(int32_t *value, void *context)
{
}
static void handle_B(int32_t *value, void *context)
static void handle_Sum(int32_t *value, void *context)
{
printf("A+B=%d\n", *value);
}
static void handle_C(int32_t *value, void *context)
static void handle_Diff(int32_t *value, void *context)
{
printf("A-B=%d\n", *value);
}
static void *run_decoder(void *context)
......@@ -34,7 +33,6 @@ static void *run_decoder(void *context)
do {
result = labcomm_decoder_decode_one(decoder);
printf("Got index %d", result);
} while (result >= 0);
return NULL;
}
......@@ -54,6 +52,7 @@ int main(int argc, char *argv[])
struct labcomm_lock *lock;
struct labcomm_decoder *decoder;
struct labcomm_encoder *encoder;
int32_t i, j;
hostname = argv[1];
port = atoi(argv[2]);
......@@ -101,330 +100,29 @@ int main(int argc, char *argv[])
encoder = labcomm_encoder_new(decimating->writer, lock);
pthread_t rdt;
labcomm_decoder_register_types_A(decoder, handle_A, NULL);
labcomm_decoder_register_types_B(decoder, handle_B, NULL);
labcomm_decoder_register_types_C(decoder, handle_C, NULL);
labcomm_decoder_register_types_Sum(decoder, handle_Sum, NULL);
labcomm_decoder_register_types_Diff(decoder, handle_Diff, NULL);
pthread_create(&rdt, NULL, run_decoder, decoder);
labcomm_encoder_register_types_A(encoder);
out:
return 0;
}
#if 0
#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <linux/tcp.h>
#include "labcomm.h"
#include "orca_client.h"
#include "orca_messages.h"
orca_client_t *orca_client_new_tcp(
char *hostname,
int port)
{
orca_client_t *result;
struct hostent *host;
int OK;
result = malloc(sizeof(orca_client_t));
OK = result != 0;
labcomm_encoder_register_types_B(encoder);
labcomm_encoder_register_types_Terminate(encoder);
if (OK) {
result->encoder = 0;
result->decoder = 0;
result->directory.input.n_0 = 0;
result->directory.input.a = 0;
result->directory.output.n_0 = 0;
result->directory.output.a = 0;
result->directory.parameter.n_0 = 0;
result->directory.parameter.a = 0;
result->directory.log.n_0 = 0;
result->directory.log.a = 0;
result->fd = socket(PF_INET, SOCK_STREAM, 0);
if (result->fd < 0) {
fprintf(stderr, "failed to create socket\n");
OK = 0;
}
}
if (OK) {
struct sockaddr_in adr;
int err;
adr.sin_family = AF_INET;
adr.sin_port = 0;
adr.sin_addr.s_addr = INADDR_ANY;
err = bind(result->fd, (struct sockaddr*)&adr, sizeof(adr));
if (err != 0) {
fprintf(stderr, "failed to bind socket\n");
OK = 0;
}
}
if (OK) {
host = gethostbyname(hostname);
if (!host) {
fprintf(stderr, "failed to lookup %s\n", hostname);
OK = 0;
}
}
if (OK) {
struct sockaddr_in to;
int err;
to.sin_family = AF_INET;
to.sin_port = htons(port);
bcopy((char*)host->h_addr, (char*)&to.sin_addr, host->h_length);
err = connect(result->fd, (struct sockaddr*)&to, sizeof(to));
if (err != 0) {
fprintf(stderr, "failed to connect %d@%s\n", port, hostname);
OK = 0;
}
}
if (OK) {
int nodelay = 1;
setsockopt(result->fd, IPPROTO_TCP, TCP_NODELAY,
&nodelay, sizeof(nodelay));
}
usleep(100000);
if (OK) {
result->encoder = labcomm_encoder_new(fd_writer, &result->fd);
if (!result->encoder) {
fprintf(stderr, "failed to allocate encoder\n");
OK = 0;
} else {
labcomm_encoder_register_orca_messages_select_input(result->encoder);
labcomm_encoder_register_orca_messages_select_output(result->encoder);
labcomm_encoder_register_orca_messages_select_parameter(result->encoder);
labcomm_encoder_register_orca_messages_select_log(result->encoder);
}
}
err = labcomm_decoder_ioctl_types_Sum(decoder, SET_DECIMATION, 2);
err = labcomm_decoder_ioctl_types_Diff(decoder, SET_DECIMATION, 4);
if (OK) {
result->decoder = labcomm_decoder_new(fd_reader, &result->fd);
if (!result->decoder) {
fprintf(stderr, "failed to allocate encoder\n");
OK = 0;
} else {
labcomm_decoder_register_orca_messages_directory(result->decoder,
directory_handler,
result);
labcomm_decoder_decode_one(result->decoder);
for (i = 0 ; i < 4 ; i++) {
for (j = 0 ; j < 4 ; j++) {
printf("A=%d B=%d\n", i, j);
labcomm_encode_types_A(encoder, &i);
labcomm_encode_types_B(encoder, &j);
sleep(1);
}
}
if (!OK && result) {
orca_client_free_tcp(result);
result = 0;
}
return result;
}
void orca_client_free_tcp(
struct orca_client *client)
{
close(client->fd);
fprintf(stderr, "IMPLEMENT %s\n", __FUNCTION__);
}
orca_internal_channel_t *select_tcp(
struct orca_client *client,
void (*encode)(struct labcomm_encoder *e, orca_messages_select_t *v),
orca_messages_connection_list_t *list,
orca_client_selection_t *selection,
direction_t direction,
int decimation)
{
orca_internal_channel_t *result;
int i, j, n;
n = 0;
for (i = 0 ; i < selection->n_0 ; i++) {
for (j = 0 ; j < list->n_0 ; j++) {
if (strcmp(selection->a[i], list->a[j].name) == 0) {
n++;
break;
}
}
}
fprintf(stderr, "%d matches\n", n);
if (n) {
int OK, server, port, fd;
OK = 1;
server = socket(PF_INET, SOCK_STREAM, 0);
if (server < 0) {
fprintf(stderr, "failed to create socket\n");
OK = 0;
}
if (OK) {
struct sockaddr_in adr;
int err;
adr.sin_family = AF_INET;
adr.sin_port = htons(0);
adr.sin_addr.s_addr = INADDR_ANY;
err = bind(server, (struct sockaddr*)&adr, sizeof(adr));
if (err != 0) {
fprintf(stderr, "failed to bind socket\n");
OK = 0;
}
}
if (OK) {
int err;
err = listen(server, 1);
if (err != 0) {
fprintf(stderr, "failed to listen on socket\n");
OK = 0;
}
}
if (OK) {
struct sockaddr_in adr;
unsigned int adrlen;
int err;
adrlen = sizeof(adr);
err = getsockname(server, (struct sockaddr*)&adr, &adrlen);
if (err != 0) {
OK = 0;
} else {
port = ntohs(adr.sin_port);
}
}
if (OK) {
orca_messages_select_t select;
select.port = port;
select.decimation = decimation;
select.list.n_0 = n;
select.list.a = malloc(select.list.n_0 * sizeof(*select.list.a));
if (select.list.a) {
n = 0;
for (i = 0 ; i < selection->n_0 ; i++) {
for (j = 0 ; j < list->n_0 ; j++) {
if (strcmp(selection->a[i], list->a[j].name) == 0) {
fprintf(stderr, "selection->a[%d] = %s %d\n",
n,
selection->a[i],
list->a[j].index);
select.list.a[n] = list->a[j].index;
n++;
break;
}
}
}
encode(client->encoder, &select);
}
free(select.list.a);
}
if (OK) {
struct sockaddr_in adr;
unsigned int adrlen;
adr.sin_family = AF_INET;
adr.sin_port = 0;
adr.sin_addr.s_addr = INADDR_ANY;
fprintf(stderr, "Restrict accept: %s %d\n", __FILE__, __LINE__);
//adr.sin_addr = client->remote.sin_addr;
adrlen = sizeof(adr);
fd = accept(server, (struct sockaddr*)&adr, &adrlen);
if (fd < 0) {
OK = 0;
}
}
if (OK) {
result = malloc(sizeof(orca_internal_channel_t));
if (! result) {
OK = 0;
}
}
if (OK) {
result->fd = fd;
result->channel.context = result;
if (direction == direction_read || direction == direction_read_write) {
result->channel.decoder = labcomm_decoder_new(fd_reader, &result->fd);
} else {
result->channel.decoder = 0;
}
if (direction == direction_write || direction == direction_read_write) {
result->channel.encoder = labcomm_encoder_new(fd_writer, &result->fd);
} else {
result->channel.encoder = 0;
}
}
fprintf(stderr, "CONNECTED %p %p\n",
result->channel.encoder, result->channel.decoder);
close(server);
}
return result;
}
orca_client_channel_t *orca_client_select_input_tcp(
struct orca_client *client,
orca_client_selection_t *selection)
{
orca_internal_channel_t *channel;
channel = select_tcp(client, labcomm_encode_orca_messages_select_input,
&client->directory.input, selection,
direction_write, 1);
return &channel->channel;
}
orca_client_channel_t *orca_client_select_output_tcp(
struct orca_client *client,
orca_client_selection_t *selection)
{
orca_internal_channel_t *channel;
channel = select_tcp(client, labcomm_encode_orca_messages_select_output,
&client->directory.output, selection,
direction_read, 1);
return &channel->channel;
}
orca_client_channel_t *orca_client_select_parameter_tcp(
struct orca_client *client,
orca_client_selection_t *selection)
{
orca_internal_channel_t *channel;
channel = select_tcp(client, labcomm_encode_orca_messages_select_parameter,
&client->directory.parameter, selection,
direction_read_write, 1);
return &channel->channel;
}
orca_client_channel_t *orca_client_select_log_tcp(
struct orca_client *client,
orca_client_selection_t *selection,
int decimation)
{
orca_internal_channel_t *channel;
labcomm_encode_types_Terminate(encoder, NULL);
out:
return 0;
channel = select_tcp(client, labcomm_encode_orca_messages_select_log,
&client->directory.log, selection,
direction_read, decimation);
return &channel->channel;
}
#endif
......@@ -8,7 +8,9 @@
struct decimating_private {
struct decimating decimating;
struct labcomm_encoder *encoder;
int encoder_initialized;
struct labcomm_decoder *decoder;
int decoder_initialized;
struct orig_reader {
void *context;
const struct labcomm_reader_action *action;
......@@ -17,13 +19,25 @@ struct decimating_private {
void *context;
const struct labcomm_writer_action *action;
} orig_writer;
LABCOMM_SIGNATURE_ARRAY_DEF(decimation,
struct decimation {
int n;
int current;
});
};
static void set_decimation(
decimating_messages_set_decimation *value,
void * context)
{
fprintf(stderr, "%s\n", __FUNCTION__);
struct decimating_private *decimating = context;
struct decimation *decimation;
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->decimation,
struct decimation,
value->signature_index);
decimation->n = value->decimation;
decimation->current = 0;
}
static int wrap_reader_alloc(struct labcomm_reader *r, void *context,
......@@ -34,12 +48,10 @@ static int wrap_reader_alloc(struct labcomm_reader *r, void *context,
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
fprintf(stderr, "%s\n", __FUNCTION__);
/* Stash away decoder for later use */
decimating->decoder = decoder;
result = orig_reader->action->alloc(r, orig_reader->context,
decoder, labcomm_version);
labcomm_decoder_register_decimating_messages_set_decimation(decoder,
set_decimation,
decimating);
return result;
}
......@@ -48,7 +60,6 @@ static int wrap_reader_free(struct labcomm_reader *r, void *context)
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
fprintf(stderr, "%s\n", __FUNCTION__);
return orig_reader->action->free(r, orig_reader->context);
}
......@@ -57,7 +68,11 @@ static int wrap_reader_start(struct labcomm_reader *r, void *context)
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
fprintf(stderr, "%s\n", __FUNCTION__);
if (! decimating->decoder_initialized) {
decimating->decoder_initialized = 1;
labcomm_decoder_register_decimating_messages_set_decimation(
decimating->decoder, set_decimation, decimating);
}
return orig_reader->action->start(r, orig_reader->context);
}
......@@ -66,7 +81,6 @@ static int wrap_reader_end(struct labcomm_reader *r, void *context)
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
fprintf(stderr, "%s\n", __FUNCTION__);
return orig_reader->action->end(r, orig_reader->context);
}
......@@ -76,27 +90,33 @@ static int wrap_reader_fill(struct labcomm_reader *r, void *context)
struct orig_reader *orig_reader = &decimating->orig_reader;
int result;
fprintf(stderr, "%s\n", __FUNCTION__);
fprintf(stderr, "%d\n", decimating->decimating.reader->pos);
result = orig_reader->action->fill(r, orig_reader->context);
fprintf(stderr, "%d %d\n", decimating->decimating.reader->pos, result);
return result;
}
static int wrap_reader_ioctl(struct labcomm_reader *r, void *context,
int action,
struct labcomm_signature *signature, va_list args)
int signature_index,
struct labcomm_signature *signature,
uint32_t action, va_list args)
{
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
fprintf(stderr, "%s\n", __FUNCTION__);
if (orig_reader->action->ioctl == NULL) {
return -ENOTSUP;
if (action == SET_DECIMATION) {
decimating_messages_set_decimation decimation;
va_list va;
va_copy(va, args);
decimation.decimation = va_arg(va, int);
decimation.signature_index = signature_index;
va_end(va);
return labcomm_encode_decimating_messages_set_decimation(
decimating->encoder, &decimation);
} else {
return orig_reader->action->ioctl(r, orig_reader->context,
action, signature, args);
signature_index, signature, action, args);
}
}
struct labcomm_reader_action decimating_reader_action = {
......@@ -108,6 +128,90 @@ struct labcomm_reader_action decimating_reader_action = {
.ioctl = wrap_reader_ioctl
};
int wrap_writer_alloc(struct labcomm_writer *w, void *context,
struct labcomm_encoder *encoder, char *labcomm_version)
{
struct decimating_private *decimating = context;
struct orig_writer *orig_writer = &decimating->orig_writer;
/* Stash away encoder for later use */
decimating->encoder = encoder;
return orig_writer->action->alloc(w, orig_writer->context,
encoder, labcomm_version);
}
int wrap_writer_free(struct labcomm_writer *w, void *context)
{
struct decimating_private *decimating = context;
struct orig_writer *orig_writer = &decimating->orig_writer;
return orig_writer->action->free(w, orig_writer->context);
}
int wrap_writer_start(struct labcomm_writer *w, void *context,
struct labcomm_encoder *encoder,
int index, struct labcomm_signature *signature,
void *value)
{
struct decimating_private *decimating = context;
struct orig_writer *orig_writer = &decimating->orig_writer;
struct decimation *decimation;
if (! decimating->encoder_initialized) {
decimating->encoder_initialized = 1;
labcomm_encoder_register_decimating_messages_set_decimation(
decimating->encoder);
}
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->decimation,
struct decimation, index);
decimation->current++;
if (decimation->current < decimation->n) {
return -EALREADY;
} else {
decimation->current = 0;
return orig_writer->action->start(w, orig_writer->context,
encoder, index, signature, value);
}
}
int wrap_writer_end(struct labcomm_writer *w, void *context)
{
struct decimating_private *decimating = context;
struct orig_writer *orig_writer = &decimating->orig_writer;
return orig_writer->action->end(w, orig_writer->context);
}
int wrap_writer_flush(struct labcomm_writer *w, void *context)
{
struct decimating_private *decimating = context;
struct orig_writer *orig_writer = &decimating->orig_writer;
return orig_writer->action->flush(w, orig_writer->context);
}
int wrap_writer_ioctl(struct labcomm_writer *w, void *context,
int signature_index,
struct labcomm_signature *signature,
uint32_t action, va_list args)
{