Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • anders_blomdell/labcomm
  • klaren/labcomm
  • tommyo/labcomm
  • erikj/labcomm
  • sven/labcomm
5 results
Show changes
Showing
with 1324 additions and 1637 deletions
package labcommTCPtest.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import se.lth.control.labcomm2014.DecoderChannel;
import se.lth.control.labcomm2014.EncoderChannel;
import labcommTCPtest.gen.FooSample;
import labcommTCPtest.gen.FooSample.Handler;
public class TestServer implements Handler {
private OutputStream out;
private InputStream in;
public static void main(String a[]) {
try {
ServerSocket ss = new ServerSocket(9999);
Socket s = ss.accept();
TestServer ts = new TestServer(s);
ts.runOne();
} catch (IOException e) {
e.printStackTrace();
}
}
public TestServer(Socket s) throws IOException {
out = s.getOutputStream();
in = s.getInputStream();
}
public void runOne() {
try {
DecoderChannel c = new DecoderChannel(in);
FooSample.register(c,this);
c.runOne();
} catch (Exception e) {
e.printStackTrace();
}
}
public void handle_FooSample(FooSample sample) throws Exception {
EncoderChannel e = new EncoderChannel(out );
FooSample.register(e);
System.out.println("TestServer.handle_FooSample: "+sample.s);
int tmp[] = new int[2*sample.a.length];
for (int i = 0; i < sample.a.length;i++) {
tmp[2*i] = tmp[2*i+1] = sample.a[i];
}
sample.s = "double!";
sample.x *= 2;
sample.y *= 2;
sample.a = tmp;
sample.t *= 2;
sample.d *= 2;
FooSample.encode(e, sample);
}
}
#!/bin/sh
PYTHONPATH=../../lib/python ./example_tcp_client_decoder.py $@
import socket
import types
def flush(self):
pass
socket.socket.write = socket.socket.send
socket.socket.read = socket.socket.recv
socket.socket.flush = flush
# class rwsocket(socket.socket):
# write=socket.socket.send
# read=socket.socket.recv
# def flush(self):
# pass
# def accept(self):
# sock, addr = super(rwsocket, self).accept()
# # sock.write = types.MethodType(self.write, sock, sock.__class__)
# # sock.read = types.MethodType(self.write, sock, sock.__class__)
# sock.__class__ = rwsocket
# return (sock, addr)
sample struct {
string s;
int x;
int y;
int a[_];
long t;
double d;
} FooSample;
gen
UNAME_S=$(shell uname -s)
TARGETS=client server
LABCOMM_JAR=../../compiler/labComm.jar
LABCOMM=java -jar $(LABCOMM_JAR)
LABCOMM_JAR=../../compiler/labcomm2014_compiler.jar
LABCOMM=java -jar $(LABCOMM_JAR)
#include ../../lib/c/os_compat.mk
CFLAGS=-O3 -g -Wall -Werror -I../../lib/c -I. -lpthread
CFLAGS=-O3 -g -Wall -Werror -I../../lib/c/2014 -I. -Wno-unused-function
ifeq ($(UNAME_S),Darwin)
CFLAGS+=-DLABCOMM_COMPAT=\"labcomm2014_compat_osx.h\" -DLABCOMM_OS_DARWIN=1
else
CFLAGS+=-Wno-tautological-compare
endif
all: $(TARGETS:%=gen/%)
test: all
LD_LIBRARY_PATH=../../lib/c ./gen/server 2000 &
LD_LIBRARY_PATH=../../lib/c ./gen/client localhost 2000
gen/.dir:
mkdir -p $@
.PRECIOUS: gen/%.o
gen/%.o: gen/%.c | gen/.dir
$(CC) $(CFLAGS) -c -o $@ $<
$(CC) $(CFLAGS) -c -o $@ $<
gen/%.o: %.c | gen/.dir
$(CC) $(CFLAGS) -c -o $@ $<
$(CC) $(CFLAGS) -c -o $@ $<
.PRECIOUS: gen/%.c gen/%.h
gen/%.c gen/%.h: %.lc | gen/.dir
$(LABCOMM) --c=gen/$*.c --h=gen/$*.h $<
gen/client: client.c gen/types.o gen/decimating.o gen/decimating_messages.o
$(CC) -o $@ $(CFLAGS) $^ -lpthread -L../../lib/c -llabcomm
gen/client: client.c
$(CC) -o $@ $(CFLAGS) $^ -lpthread \
-L../../lib/c -llabcomm2014
gen/server: server.c gen/types.o gen/decimating.o gen/decimating_messages.o
$(CC) -o $@ $(CFLAGS) $^ -lpthread -L../../lib/c -llabcomm
gen/server: server.c
$(CC) -o $@ $(CFLAGS) $^ -lpthread \
-L../../lib/c -llabcomm2014
.PHONY: clean
clean:
.PHONY: clean distclean
clean distclean:
rm -rf gen
gen/decimating.o: decimating.h
gen/decimating.o: gen/decimating_messages.h
gen/introspecting.o: introspecting.h
gen/introspecting.o: gen/introspecting_messages.h
gen/client.o: decimating.h
gen/client.o: gen/types.h
gen/client: gen/decimating.o
gen/client: gen/decimating_messages.o
gen/client: gen/introspecting.o
gen/client: gen/introspecting_messages.o
gen/client: gen/types.o
gen/server: gen/types.o
gen/server: gen/decimating.o
gen/server: gen/decimating_messages.o
gen/server: gen/introspecting.o
gen/server: gen/introspecting_messages.o
/*
client.c -- LabComm example of using stacked readers/writers.
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <arpa/inet.h>
#ifndef LABCOMM_OS_DARWIN
#include <linux/tcp.h>
#else
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif
#include <netdb.h>
#include <pthread.h>
#include <stdio.h>
......@@ -8,33 +35,41 @@
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <labcomm.h>
#include <labcomm_fd_reader.h>
#include <labcomm_fd_writer.h>
#include <labcomm_pthread_mutex_lock.h>
#include <unistd.h>
#include <labcomm2014.h>
#include <labcomm2014_fd_reader.h>
#include <labcomm2014_fd_writer.h>
#include <labcomm2014_default_error_handler.h>
#include <labcomm2014_default_memory.h>
#include <labcomm2014_pthread_scheduler.h>
#include "decimating.h"
#include "introspecting.h"
#include "gen/types.h"
static void handle_A(int32_t *value, void *context)
static void handle_Sum(int32_t *value, void *context)
{
printf("A+B=%d ", *value);
}
static void handle_B(int32_t *value, void *context)
static void handle_Diff(int32_t *value, void *context)
{
printf("A-B=%d ", *value);
}
static void handle_C(int32_t *value, void *context)
static void handle_Product(int32_t *value, void *context)
{
printf("A*B=%d ", *value);
}
static void *run_decoder(void *context)
{
struct labcomm_decoder *decoder = context;
struct labcomm2014_decoder *decoder = context;
int result;
labcomm2014_decoder_register_types_Sum(decoder, handle_Sum, NULL);
labcomm2014_decoder_register_types_Diff(decoder, handle_Diff, NULL);
do {
result = labcomm_decoder_decode_one(decoder);
printf("Got index %d", result);
result = labcomm2014_decoder_decode_one(decoder);
} while (result >= 0);
return NULL;
}
......@@ -49,11 +84,14 @@ int main(int argc, char *argv[])
struct sockaddr_in to;
int nodelay;
struct decimating *decimating;
struct introspecting *introspecting;
char *hostname;
int port;
struct labcomm_lock *lock;
struct labcomm_decoder *decoder;
struct labcomm_encoder *encoder;
struct labcomm2014_scheduler *scheduler;
struct labcomm2014_decoder *decoder;
struct labcomm2014_encoder *encoder;
struct labcomm2014_time *next;
int32_t i, j;
hostname = argv[1];
port = atoi(argv[2]);
......@@ -90,341 +128,61 @@ int main(int argc, char *argv[])
nodelay = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
lock = labcomm_pthread_mutex_lock_new();
decimating = decimating_new(labcomm_fd_reader_new(fd, 1),
labcomm_fd_writer_new(fd, 0),
lock);
scheduler = labcomm2014_pthread_scheduler_new(labcomm2014_default_memory);
decimating = decimating_new(labcomm2014_fd_reader_new(labcomm2014_default_memory,
fd, 1),
labcomm2014_fd_writer_new(labcomm2014_default_memory,
fd, 0),
labcomm2014_default_error_handler,
labcomm2014_default_memory,
scheduler);
if (decimating == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
decoder = labcomm_decoder_new(decimating->reader, lock);
encoder = labcomm_encoder_new(decimating->writer, lock);
introspecting = introspecting_new(decimating->reader,
decimating->writer,
labcomm2014_default_error_handler,
labcomm2014_default_memory,
scheduler);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
decoder = labcomm2014_decoder_new(introspecting->reader,
labcomm2014_default_error_handler,
labcomm2014_default_memory,
scheduler);
encoder = labcomm2014_encoder_new(introspecting->writer,
labcomm2014_default_error_handler,
labcomm2014_default_memory,
scheduler);
pthread_t rdt;
labcomm_decoder_register_types_A(decoder, handle_A, NULL);
labcomm_decoder_register_types_B(decoder, handle_B, NULL);
labcomm_decoder_register_types_C(decoder, handle_C, NULL);
pthread_create(&rdt, NULL, run_decoder, decoder);
labcomm_encoder_register_types_A(encoder);
out:
return 0;
}
labcomm2014_encoder_register_types_A(encoder);
labcomm2014_encoder_register_types_B(encoder);
labcomm2014_encoder_register_types_Terminate(encoder);
#if 0
err = labcomm2014_decoder_ioctl_types_Sum(decoder, SET_DECIMATION, 2);
err = labcomm2014_decoder_ioctl_types_Diff(decoder, SET_DECIMATION, 4);
#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <linux/tcp.h>
#include "labcomm.h"
#include "orca_client.h"
#include "orca_messages.h"
orca_client_t *orca_client_new_tcp(
char *hostname,
int port)
{
orca_client_t *result;
struct hostent *host;
int OK;
result = malloc(sizeof(orca_client_t));
OK = result != 0;
if (OK) {
result->encoder = 0;
result->decoder = 0;
result->directory.input.n_0 = 0;
result->directory.input.a = 0;
result->directory.output.n_0 = 0;
result->directory.output.a = 0;
result->directory.parameter.n_0 = 0;
result->directory.parameter.a = 0;
result->directory.log.n_0 = 0;
result->directory.log.a = 0;
result->fd = socket(PF_INET, SOCK_STREAM, 0);
if (result->fd < 0) {
fprintf(stderr, "failed to create socket\n");
OK = 0;
}
}
if (OK) {
struct sockaddr_in adr;
int err;
adr.sin_family = AF_INET;
adr.sin_port = 0;
adr.sin_addr.s_addr = INADDR_ANY;
err = bind(result->fd, (struct sockaddr*)&adr, sizeof(adr));
if (err != 0) {
fprintf(stderr, "failed to bind socket\n");
OK = 0;
next = labcomm2014_scheduler_now(scheduler);
for (i = 0 ; i < 4 ; i++) {
if (i == 2) {
labcomm2014_decoder_register_types_Product(decoder, handle_Product, NULL);
}
}
if (OK) {
host = gethostbyname(hostname);
if (!host) {
fprintf(stderr, "failed to lookup %s\n", hostname);
OK = 0;
for (j = 0 ; j < 4 ; j++) {
printf("\nA=%d B=%d: ", i, j);
labcomm2014_encode_types_A(encoder, &i);
labcomm2014_encode_types_B(encoder, &j);
labcomm2014_time_add_usec(next, 100000);
labcomm2014_scheduler_sleep(scheduler, next);
}
}
if (OK) {
struct sockaddr_in to;
int err;
to.sin_family = AF_INET;
to.sin_port = htons(port);
bcopy((char*)host->h_addr, (char*)&to.sin_addr, host->h_length);
err = connect(result->fd, (struct sockaddr*)&to, sizeof(to));
if (err != 0) {
fprintf(stderr, "failed to connect %d@%s\n", port, hostname);
OK = 0;
}
}
if (OK) {
int nodelay = 1;
setsockopt(result->fd, IPPROTO_TCP, TCP_NODELAY,
&nodelay, sizeof(nodelay));
}
if (OK) {
result->encoder = labcomm_encoder_new(fd_writer, &result->fd);
if (!result->encoder) {
fprintf(stderr, "failed to allocate encoder\n");
OK = 0;
} else {
labcomm_encoder_register_orca_messages_select_input(result->encoder);
labcomm_encoder_register_orca_messages_select_output(result->encoder);
labcomm_encoder_register_orca_messages_select_parameter(result->encoder);
labcomm_encoder_register_orca_messages_select_log(result->encoder);
}
}
if (OK) {
result->decoder = labcomm_decoder_new(fd_reader, &result->fd);
if (!result->decoder) {
fprintf(stderr, "failed to allocate encoder\n");
OK = 0;
} else {
labcomm_decoder_register_orca_messages_directory(result->decoder,
directory_handler,
result);
labcomm_decoder_decode_one(result->decoder);
}
}
if (!OK && result) {
orca_client_free_tcp(result);
result = 0;
}
return result;
}
void orca_client_free_tcp(
struct orca_client *client)
{
close(client->fd);
fprintf(stderr, "IMPLEMENT %s\n", __FUNCTION__);
}
orca_internal_channel_t *select_tcp(
struct orca_client *client,
void (*encode)(struct labcomm_encoder *e, orca_messages_select_t *v),
orca_messages_connection_list_t *list,
orca_client_selection_t *selection,
direction_t direction,
int decimation)
{
orca_internal_channel_t *result;
int i, j, n;
n = 0;
for (i = 0 ; i < selection->n_0 ; i++) {
for (j = 0 ; j < list->n_0 ; j++) {
if (strcmp(selection->a[i], list->a[j].name) == 0) {
n++;
break;
}
}
}
fprintf(stderr, "%d matches\n", n);
if (n) {
int OK, server, port, fd;
OK = 1;
server = socket(PF_INET, SOCK_STREAM, 0);
if (server < 0) {
fprintf(stderr, "failed to create socket\n");
OK = 0;
}
if (OK) {
struct sockaddr_in adr;
int err;
adr.sin_family = AF_INET;
adr.sin_port = htons(0);
adr.sin_addr.s_addr = INADDR_ANY;
err = bind(server, (struct sockaddr*)&adr, sizeof(adr));
if (err != 0) {
fprintf(stderr, "failed to bind socket\n");
OK = 0;
}
}
if (OK) {
int err;
err = listen(server, 1);
if (err != 0) {
fprintf(stderr, "failed to listen on socket\n");
OK = 0;
}
}
if (OK) {
struct sockaddr_in adr;
unsigned int adrlen;
int err;
adrlen = sizeof(adr);
err = getsockname(server, (struct sockaddr*)&adr, &adrlen);
if (err != 0) {
OK = 0;
} else {
port = ntohs(adr.sin_port);
}
}
if (OK) {
orca_messages_select_t select;
select.port = port;
select.decimation = decimation;
select.list.n_0 = n;
select.list.a = malloc(select.list.n_0 * sizeof(*select.list.a));
if (select.list.a) {
n = 0;
for (i = 0 ; i < selection->n_0 ; i++) {
for (j = 0 ; j < list->n_0 ; j++) {
if (strcmp(selection->a[i], list->a[j].name) == 0) {
fprintf(stderr, "selection->a[%d] = %s %d\n",
n,
selection->a[i],
list->a[j].index);
select.list.a[n] = list->a[j].index;
n++;
break;
}
}
}
encode(client->encoder, &select);
}
free(select.list.a);
}
if (OK) {
struct sockaddr_in adr;
unsigned int adrlen;
adr.sin_family = AF_INET;
adr.sin_port = 0;
adr.sin_addr.s_addr = INADDR_ANY;
fprintf(stderr, "Restrict accept: %s %d\n", __FILE__, __LINE__);
//adr.sin_addr = client->remote.sin_addr;
adrlen = sizeof(adr);
fd = accept(server, (struct sockaddr*)&adr, &adrlen);
if (fd < 0) {
OK = 0;
}
}
if (OK) {
result = malloc(sizeof(orca_internal_channel_t));
if (! result) {
OK = 0;
}
}
if (OK) {
result->fd = fd;
result->channel.context = result;
if (direction == direction_read || direction == direction_read_write) {
result->channel.decoder = labcomm_decoder_new(fd_reader, &result->fd);
} else {
result->channel.decoder = 0;
}
if (direction == direction_write || direction == direction_read_write) {
result->channel.encoder = labcomm_encoder_new(fd_writer, &result->fd);
} else {
result->channel.encoder = 0;
}
}
fprintf(stderr, "CONNECTED %p %p\n",
result->channel.encoder, result->channel.decoder);
close(server);
}
return result;
}
orca_client_channel_t *orca_client_select_input_tcp(
struct orca_client *client,
orca_client_selection_t *selection)
{
orca_internal_channel_t *channel;
channel = select_tcp(client, labcomm_encode_orca_messages_select_input,
&client->directory.input, selection,
direction_write, 1);
return &channel->channel;
}
orca_client_channel_t *orca_client_select_output_tcp(
struct orca_client *client,
orca_client_selection_t *selection)
{
orca_internal_channel_t *channel;
channel = select_tcp(client, labcomm_encode_orca_messages_select_output,
&client->directory.output, selection,
direction_read, 1);
return &channel->channel;
}
orca_client_channel_t *orca_client_select_parameter_tcp(
struct orca_client *client,
orca_client_selection_t *selection)
{
orca_internal_channel_t *channel;
channel = select_tcp(client, labcomm_encode_orca_messages_select_parameter,
&client->directory.parameter, selection,
direction_read_write, 1);
return &channel->channel;
}
orca_client_channel_t *orca_client_select_log_tcp(
struct orca_client *client,
orca_client_selection_t *selection,
int decimation)
{
orca_internal_channel_t *channel;
printf("\n");
labcomm2014_encode_types_Terminate(encoder);
out:
return 0;
channel = select_tcp(client, labcomm_encode_orca_messages_select_log,
&client->directory.log, selection,
direction_read, decimation);
return &channel->channel;
}
#endif
/*
decimating.c -- LabComm example of a twoway stacked decimation
reader/writer.
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include "labcomm_private.h"
#include "labcomm2014_private.h"
#include "decimating.h"
#include "gen/decimating_messages.h"
struct decimating_private {
struct decimating decimating;
struct labcomm_encoder *encoder;
struct labcomm_decoder *decoder;
struct orig_reader {
void *context;
const struct labcomm_reader_action *action;
} orig_reader;
struct orig_writer {
void *context;
const struct labcomm_writer_action *action;
} orig_writer;
struct labcomm2014_error_handler *error;
struct labcomm2014_memory *memory;
struct labcomm2014_scheduler *scheduler;
int encoder_initialized;
struct labcomm2014_reader_action_context reader_action_context;
struct labcomm2014_writer_action_context writer_action_context;
LABCOMM_SIGNATURE_ARRAY_DEF(writer_decimation,
struct decimation {
int n;
int current;
});
LABCOMM_SIGNATURE_ARRAY_DEF(reader_decimation, int);
};
static void set_decimation(
decimating_messages_set_decimation *value,
void * context)
{
fprintf(stderr, "%s\n", __FUNCTION__);
struct decimating_private *decimating = context;
struct decimation *decimation;
labcomm2014_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->writer_decimation,
struct decimation,
value->signature_index);
decimation->n = value->decimation;
decimation->current = 0;
labcomm2014_scheduler_data_unlock(decimating->scheduler);
}
static int wrap_reader_alloc(struct labcomm_reader *r, void *context,
struct labcomm_decoder *decoder,
char *labcomm_version)
static int wrap_reader_alloc(
struct labcomm2014_reader *r,
struct labcomm2014_reader_action_context *action_context)
{
int result;
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
fprintf(stderr, "%s\n", __FUNCTION__);
result = orig_reader->action->alloc(r, orig_reader->context,
decoder, labcomm_version);
labcomm_decoder_register_decimating_messages_set_decimation(decoder,
set_decimation,
decimating);
return result;
struct decimating_private *decimating = action_context->context;
labcomm2014_decoder_register_decimating_messages_set_decimation(
r->decoder, set_decimation, decimating);
return labcomm2014_reader_alloc(r, action_context->next);
}
static int wrap_reader_free(struct labcomm_reader *r, void *context)
struct send_set_decimation {
struct decimating_private *decimating;
decimating_messages_set_decimation set_decimation;
};
static void send_set_decimation(void *arg)
{
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
struct send_set_decimation *msg = arg;
struct labcomm2014_memory *memory = msg->decimating->memory;
fprintf(stderr, "%s\n", __FUNCTION__);
return orig_reader->action->free(r, orig_reader->context);
labcomm2014_encode_decimating_messages_set_decimation(
msg->decimating->decimating.writer->encoder, &msg->set_decimation);
labcomm2014_memory_free(memory, 1, msg);
}
static int wrap_reader_start(struct labcomm_reader *r, void *context)
static void enqueue_decimation(struct decimating_private *decimating,
int remote_index,
int amount)
{
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
struct send_set_decimation *msg;
msg = labcomm2014_memory_alloc(decimating->memory, 1, sizeof(*msg));
if (msg) {
msg->decimating = decimating;
msg->set_decimation.decimation = amount;
msg->set_decimation.signature_index = remote_index;
labcomm2014_scheduler_enqueue(decimating->scheduler, 0,
send_set_decimation, msg);
}
}
fprintf(stderr, "%s\n", __FUNCTION__);
return orig_reader->action->start(r, orig_reader->context);
static int wrap_reader_start(
struct labcomm2014_reader *r,
struct labcomm2014_reader_action_context *action_context,
int local_index, int remote_index, const struct labcomm2014_signature *signature,
void *value)
{
struct decimating_private *decimating = action_context->context;
if (value == NULL) {
int *decimation, amount;
labcomm2014_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->reader_decimation,
int,
local_index);
amount = *decimation;
labcomm2014_scheduler_data_unlock(decimating->scheduler);
if (remote_index != 0 && amount != 0) {
enqueue_decimation(decimating, remote_index, amount);
}
}
return labcomm2014_reader_start(r, action_context->next,
local_index, remote_index, signature, value);
}
static int wrap_reader_end(struct labcomm_reader *r, void *context)
static int wrap_reader_ioctl(
struct labcomm2014_reader *r,
struct labcomm2014_reader_action_context *action_context,
int local_index, int remote_index,
const struct labcomm2014_signature *signature,
uint32_t action, va_list args)
{
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
struct decimating_private *decimating = action_context->context;
if (action == SET_DECIMATION) {
va_list va;
int amount;
int *decimation;
fprintf(stderr, "%s\n", __FUNCTION__);
return orig_reader->action->end(r, orig_reader->context);
va_copy(va, args);
amount = va_arg(va, int);
va_end(va);
labcomm2014_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->reader_decimation,
int,
local_index);
*decimation = amount;
labcomm2014_scheduler_data_unlock(decimating->scheduler);
if (remote_index) {
enqueue_decimation(decimating, remote_index, amount);
}
} else {
return labcomm2014_reader_ioctl(r, action_context->next,
local_index, remote_index, signature,
action, args);
}
return 0;
}
static int wrap_reader_fill(struct labcomm_reader *r, void *context)
struct labcomm2014_reader_action decimating_reader_action = {
.alloc = wrap_reader_alloc,
.free = NULL,
.start = wrap_reader_start,
.end = NULL,
.fill = NULL,
.ioctl = wrap_reader_ioctl
};
static void register_signatures(void *context)
{
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
int result;
fprintf(stderr, "%s\n", __FUNCTION__);
fprintf(stderr, "%d\n", decimating->decimating.reader->pos);
result = orig_reader->action->fill(r, orig_reader->context);
fprintf(stderr, "%d %d\n", decimating->decimating.reader->pos, result);
return result;
}
labcomm2014_encoder_register_decimating_messages_set_decimation(
decimating->decimating.writer->encoder);
}
static int wrap_reader_ioctl(struct labcomm_reader *r, void *context,
int action,
struct labcomm_signature *signature, va_list args)
static int wrap_writer_alloc(
struct labcomm2014_writer *w,
struct labcomm2014_writer_action_context *action_context)
{
struct decimating_private *decimating = context;
struct orig_reader *orig_reader = &decimating->orig_reader;
struct decimating_private *decimating = action_context->context;
labcomm2014_scheduler_enqueue(decimating->scheduler,
0, register_signatures, decimating);
return labcomm2014_writer_alloc(w, action_context->next);
}
fprintf(stderr, "%s\n", __FUNCTION__);
if (orig_reader->action->ioctl == NULL) {
return -ENOTSUP;
static int wrap_writer_start(
struct labcomm2014_writer *w,
struct labcomm2014_writer_action_context *action_context,
int index, const struct labcomm2014_signature *signature,
void *value)
{
struct decimating_private *decimating = action_context->context;
struct decimation *decimation;
int result;
if (index < LABCOMM_USER) {
result = 0;
} else {
return orig_reader->action->ioctl(r, orig_reader->context,
action, signature, args);
labcomm2014_scheduler_data_lock(decimating->scheduler);
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->writer_decimation,
struct decimation, index);
decimation->current++;
if (decimation->current < decimation->n) {
result = -EALREADY;
} else {
decimation->current = 0;
result = 0;
}
labcomm2014_scheduler_data_unlock(decimating->scheduler);
}
if (result == 0) {
result = labcomm2014_writer_start(w, action_context->next,
index, signature, value);
}
return result;
}
struct labcomm_reader_action decimating_reader_action = {
.alloc = wrap_reader_alloc,
.free = wrap_reader_free,
.start = wrap_reader_start,
.end = wrap_reader_end,
.fill = wrap_reader_fill,
.ioctl = wrap_reader_ioctl
struct labcomm2014_writer_action decimating_writer_action = {
.alloc = wrap_writer_alloc,
.free = NULL,
.start = wrap_writer_start,
.end = NULL,
.flush = NULL,
.ioctl = NULL
};
extern struct decimating *decimating_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock)
struct decimating *decimating_new(
struct labcomm2014_reader *reader,
struct labcomm2014_writer *writer,
struct labcomm2014_error_handler *error,
struct labcomm2014_memory *memory,
struct labcomm2014_scheduler *scheduler)
{
struct decimating_private *result;
......@@ -120,11 +250,27 @@ extern struct decimating *decimating_new(
goto out_fail;
}
result->orig_reader.context = reader->context;
result->orig_reader.action = reader->action;
reader->context = result;
reader->action = &decimating_reader_action;
/* Wrap reader and writer */
result->reader_action_context.next = reader->action_context;
result->reader_action_context.action = &decimating_reader_action;
result->reader_action_context.context = result;
reader->action_context = &result->reader_action_context;
result->writer_action_context.next = writer->action_context;
result->writer_action_context.action = &decimating_writer_action;
result->writer_action_context.context = result;
writer->action_context = &result->writer_action_context;
/* Init visible result struct */
result->decimating.reader = reader;
result->decimating.writer = writer;
/* Init other fields */
result->error = error;
result->memory = memory;
result->scheduler = scheduler;
LABCOMM_SIGNATURE_ARRAY_INIT(result->writer_decimation, struct decimation);
LABCOMM_SIGNATURE_ARRAY_INIT(result->reader_decimation, int);
goto out_ok;
......
#ifndef __DECIMATING_H__
#define __DECIMATING_H__
#include <labcomm.h>
#include <labcomm_ioctl.h>
#include <labcomm_fd_reader.h>
#include <labcomm_fd_writer.h>
#include <labcomm2014.h>
#include <labcomm2014_ioctl.h>
struct decimating {
struct labcomm_reader *reader;
struct labcomm_writer *writer;
struct labcomm2014_reader *reader;
struct labcomm2014_writer *writer;
};
extern struct decimating *decimating_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock);
struct labcomm2014_reader *reader,
struct labcomm2014_writer *writer,
struct labcomm2014_error_handler *error,
struct labcomm2014_memory *memory,
struct labcomm2014_scheduler *scheduler);
#define SET_DECIMATION LABCOMM_IOSW('d',0,int)
#define GET_DECIMATION LABCOMM_IOSR('d',1,int)
#endif
sample struct {
int decimation;
byte signature[_];
} set_decimation;
\ No newline at end of file
int signature_index;
} set_decimation;
/*
introspecting.c -- LabComm example of a twoway stacked introspection
reader/writer.
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include "labcomm2014_private.h"
#include "introspecting.h"
#include "gen/introspecting_messages.h"
struct introspecting_private {
struct introspecting introspecting;
struct labcomm2014_error_handler *error;
struct labcomm2014_memory *memory;
struct labcomm2014_scheduler *scheduler;
struct labcomm2014_reader_action_context reader_action_context;
struct labcomm2014_writer_action_context writer_action_context;
LABCOMM_SIGNATURE_ARRAY_DEF(remote,
struct remote {
char *name;
int size;
uint8_t *signature;
});
LABCOMM_SIGNATURE_ARRAY_DEF(local,
struct local {
enum introspecting_status status;
const struct labcomm2014_signature *signature;
});
};
static struct local *get_local(struct introspecting_private *introspecting,
int index,
const struct labcomm2014_signature *signature)
{
/* Called with data_lock held */
struct local *local;
local = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->local,
struct local,
index);
if (local->signature == NULL) {
local->signature = signature;
local->status = introspecting_unknown;
}
if (local->status == introspecting_unknown) {
int i;
local->status = introspecting_unhandled;
LABCOMM_SIGNATURE_ARRAY_FOREACH(introspecting->remote, struct remote, i) {
struct remote *r;
r = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->remote, struct remote, i);
if (r->name &&
strcmp(signature->name, r->name) == 0 &&
r->size == signature->size &&
memcmp(signature->signature, r->signature, signature->size) == 0) {
local->status = introspecting_unregistered;
break;
}
}
}
return local;
}
static void handles_signature(
introspecting_messages_handles_signature *value,
void * context)
{
struct introspecting_private *introspecting = context;
struct remote *remote;
labcomm2014_scheduler_data_lock(introspecting->scheduler);
remote = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->remote,
struct remote,
value->index);
remote->name = strdup(value->name);
remote->signature = malloc(value->signature.n_0);
if (remote->signature) {
int i;
memcpy(remote->signature, value->signature.a, value->signature.n_0);
remote->size = value->signature.n_0;
LABCOMM_SIGNATURE_ARRAY_FOREACH(introspecting->local, struct local, i) {
struct local *l;
l = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->local, struct local, i);
if (l->signature &&
l->status == introspecting_unhandled &&
l->signature->name &&
strcmp(remote->name, l->signature->name) == 0 &&
remote->size == l->signature->size &&
memcmp(l->signature->signature, remote->signature,
l->signature->size) == 0) {
l->status = introspecting_unregistered;
}
}
}
labcomm2014_scheduler_data_unlock(introspecting->scheduler);
}
static int wrap_reader_alloc(
struct labcomm2014_reader *r,
struct labcomm2014_reader_action_context *action_context)
{
struct introspecting_private *introspecting = action_context->context;
labcomm2014_decoder_register_introspecting_messages_handles_signature(
introspecting->introspecting.reader->decoder,
handles_signature, introspecting);
return labcomm2014_reader_alloc(r, action_context->next);
}
struct handles_signature {
struct introspecting_private *introspecting;
int index;
const struct labcomm2014_signature *signature;
};
static void send_handles_signature(void *arg)
{
struct handles_signature *h = arg;
introspecting_messages_handles_signature handles_signature;
handles_signature.index = h->index;
handles_signature.name = h->signature->name;
handles_signature.signature.n_0 = h->signature->size;
handles_signature.signature.a = h->signature->signature;
labcomm2014_encode_introspecting_messages_handles_signature(
h->introspecting->introspecting.writer->encoder, &handles_signature);
}
static int wrap_reader_start(
struct labcomm2014_reader *r,
struct labcomm2014_reader_action_context *action_context,
int local_index, int remote_index, const struct labcomm2014_signature *signature,
void *value)
{
struct introspecting_private *introspecting = action_context->context;
if (value == NULL) {
struct handles_signature *handles_signature;
handles_signature = labcomm2014_memory_alloc(introspecting->memory, 1,
sizeof(*handles_signature));
handles_signature->introspecting = introspecting;
handles_signature->index = local_index;
handles_signature->signature = signature;
labcomm2014_scheduler_enqueue(introspecting->scheduler,
0, send_handles_signature, handles_signature);
}
return labcomm2014_reader_start(r, action_context->next,
local_index, remote_index, signature, value);
}
void encode_handles_signature(
struct labcomm2014_encoder *encoder,
void *context)
{
const struct labcomm2014_signature *signature = context;
introspecting_messages_handles_signature handles_signature;
int index = 0;
handles_signature.index = index;
handles_signature.name = signature->name;
handles_signature.signature.n_0 = signature->size;
handles_signature.signature.a = signature->signature;
labcomm2014_encode_introspecting_messages_handles_signature(
NULL, &handles_signature);
}
struct labcomm2014_reader_action introspecting_reader_action = {
.alloc = wrap_reader_alloc,
.free = NULL,
.start = wrap_reader_start,
.end = NULL,
.fill = NULL,
.ioctl = NULL
};
static void register_encoder_signatures(void *context)
{
struct introspecting_private *introspecting = context;
labcomm2014_encoder_register_introspecting_messages_handles_signature(
introspecting->introspecting.writer->encoder);
}
static int wrap_writer_alloc(
struct labcomm2014_writer *w,
struct labcomm2014_writer_action_context *action_context)
{
struct introspecting_private *introspecting = action_context->context;
labcomm2014_scheduler_enqueue(introspecting->scheduler,
0, register_encoder_signatures, introspecting);
return labcomm2014_writer_alloc(w, action_context->next);
}
static int wrap_writer_start(
struct labcomm2014_writer *w,
struct labcomm2014_writer_action_context *action_context,
int index, const struct labcomm2014_signature *signature,
void *value)
{
struct introspecting_private *introspecting = action_context->context;
if (index >= LABCOMM_USER && value == NULL) {
struct local *local;
labcomm2014_scheduler_data_lock(introspecting->scheduler);
local = get_local(introspecting, index, signature);
local->status = introspecting_registered;
labcomm2014_scheduler_data_unlock(introspecting->scheduler);
}
return labcomm2014_writer_start(w, action_context->next, index, signature, value);
}
static int wrap_writer_ioctl(
struct labcomm2014_writer *w,
struct labcomm2014_writer_action_context *action_context,
int index, const struct labcomm2014_signature *signature,
uint32_t ioctl_action, va_list args)
{
struct introspecting_private *introspecting = action_context->context;
switch (ioctl_action) {
case HAS_SIGNATURE: {
struct local *local;
int result;
labcomm2014_scheduler_data_lock(introspecting->scheduler);
local = get_local(introspecting, index, signature);
result = local->status;
labcomm2014_scheduler_data_unlock(introspecting->scheduler);
return result;
}
default: {
return labcomm2014_writer_ioctl(w, action_context->next, index, signature,
ioctl_action, args);
} break;
}
}
struct labcomm2014_writer_action introspecting_writer_action = {
.alloc = wrap_writer_alloc,
.free = NULL,
.start = wrap_writer_start,
.end = NULL,
.flush = NULL,
.ioctl = wrap_writer_ioctl
};
extern struct introspecting *introspecting_new(
struct labcomm2014_reader *reader,
struct labcomm2014_writer *writer,
struct labcomm2014_error_handler *error,
struct labcomm2014_memory *memory,
struct labcomm2014_scheduler *scheduler)
{
struct introspecting_private *result;
result = malloc(sizeof(*result));
if (result == NULL) {
goto out_fail;
}
/* Wrap reader and writer */
result->reader_action_context.next = reader->action_context;
result->reader_action_context.action = &introspecting_reader_action;
result->reader_action_context.context = result;
reader->action_context = &result->reader_action_context;
result->writer_action_context.next = writer->action_context;
result->writer_action_context.action = &introspecting_writer_action;
result->writer_action_context.context = result;
writer->action_context = &result->writer_action_context;
/* Init visible result struct */
result->introspecting.reader = reader;
result->introspecting.writer = writer;
/* Init other fields */
result->error = error;
result->memory = memory;
result->scheduler = scheduler;
LABCOMM_SIGNATURE_ARRAY_INIT(result->remote, struct remote);
LABCOMM_SIGNATURE_ARRAY_INIT(result->local, struct local);
goto out_ok;
out_fail:
return NULL;
out_ok:
return &result->introspecting;
}
/*
introspecting.h -- LabComm example of a twoway stacked introspection
reader/writer.
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __INTROSPECTING_H__
#define __INTROSPECTING_H__
#include <labcomm2014.h>
#include <labcomm2014_ioctl.h>
#include <labcomm2014_fd_reader.h>
#include <labcomm2014_fd_writer.h>
struct introspecting {
struct labcomm2014_reader *reader;
struct labcomm2014_writer *writer;
};
extern struct introspecting *introspecting_new(
struct labcomm2014_reader *reader,
struct labcomm2014_writer *writer,
struct labcomm2014_error_handler *error,
struct labcomm2014_memory *memory,
struct labcomm2014_scheduler *scheduler);
#define HAS_SIGNATURE LABCOMM_IOS('i',2)
enum introspecting_status { introspecting_unknown,
introspecting_unhandled,
introspecting_unregistered,
introspecting_registered };
#endif
sample struct {
int index;
string name;
byte signature[_];
} handles_signature;
typedef struct {
int index;
byte signature[_];
} has_signature_request;
typedef struct {
int index;
boolean result;
} has_signature_response;
sample has_signature_request encoder_has_signature_request;
sample has_signature_response encoder_has_signature_response;
sample has_signature_request decoder_has_signature_request;
sample has_signature_response decoder_has_signature_response;
\ No newline at end of file
/*
server.c -- LabComm example of using stacked readers/writers.
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <arpa/inet.h>
#ifndef LABCOMM_OS_DARWIN
#include <linux/tcp.h>
#else
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <labcomm2014_default_error_handler.h>
#include <labcomm2014_default_memory.h>
#include <labcomm2014_pthread_scheduler.h>
#include <labcomm2014_fd_reader.h>
#include <labcomm2014_fd_writer.h>
#include "decimating.h"
#include "introspecting.h"
#include "gen/types.h"
struct client {
int fd;
pthread_t main_thread;
pthread_t decoder_thread;
struct sockaddr_in adr;
unsigned int adrlen;
int32_t A, B, Sum, Diff, Product;
struct labcomm2014_decoder *decoder;
struct labcomm2014_encoder *encoder;
struct labcomm2014_scheduler *scheduler;
};
static void *run_client(void *arg)
static void handle_A(int32_t *value, void *context)
{
struct client *client = context;
client->A = *value;
}
static void handle_B(int32_t *value, void *context)
{
struct client *client = context;
int status;
client->B = *value;
status = labcomm2014_encoder_ioctl_types_Product(client->encoder, HAS_SIGNATURE);
switch (status) {
case introspecting_unregistered:
labcomm2014_encoder_register_types_Product(client->encoder);
/* fall through */
case introspecting_registered:
client->Product = client->A * client->B;
labcomm2014_encode_types_Product(client->encoder, &client->Product);
break;
default:
break;
}
client->Sum = client->A + client->B;
client->Diff = client->A - client->B;
labcomm2014_encode_types_Sum(client->encoder, &client->Sum);
labcomm2014_encode_types_Diff(client->encoder, &client->Diff);
}
static void handle_Terminate(types_Terminate *value, void *context)
{
exit(0);
}
static void *run_decoder(void *arg)
{
struct client *client = arg;
int result;
labcomm2014_decoder_register_types_A(client->decoder, handle_A, client);
labcomm2014_decoder_register_types_B(client->decoder, handle_B, client);
labcomm2014_decoder_register_types_Terminate(client->decoder, handle_Terminate,
NULL);
do {
result = labcomm2014_decoder_decode_one(client->decoder);
} while (result >= 0);
labcomm2014_scheduler_wakeup(client->scheduler);
return NULL;
}
static void *run_client(void *arg)
{
struct client *client = arg;
struct decimating *decimating;
struct introspecting *introspecting;
printf("Client start\n");
client->A = 0;
client->B = 0;
client->scheduler = labcomm2014_pthread_scheduler_new(labcomm2014_default_memory);
decimating = decimating_new(labcomm2014_fd_reader_new(labcomm2014_default_memory,
client->fd, 1),
labcomm2014_fd_writer_new(labcomm2014_default_memory,
client->fd, 0),
labcomm2014_default_error_handler,
labcomm2014_default_memory,
client->scheduler);
if (decimating == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
introspecting = introspecting_new(decimating->reader,
decimating->writer,
labcomm2014_default_error_handler,
labcomm2014_default_memory,
client->scheduler);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
client->decoder = labcomm2014_decoder_new(introspecting->reader,
labcomm2014_default_error_handler,
labcomm2014_default_memory,
client->scheduler);
client->encoder = labcomm2014_encoder_new(introspecting->writer,
labcomm2014_default_error_handler,
labcomm2014_default_memory,
client->scheduler);
pthread_t rdt;
pthread_create(&rdt, NULL, run_decoder, client);
labcomm2014_encoder_register_types_Sum(client->encoder);
labcomm2014_encoder_register_types_Diff(client->encoder);
labcomm2014_scheduler_sleep(client->scheduler, NULL);
printf("Awoken\n");
pthread_join(rdt, NULL);
out:
printf("Client end\n");
shutdown(client->fd, SHUT_RDWR);
close(client->fd);
free(client);
return NULL;
}
......@@ -52,6 +189,7 @@ int main(int argc, char *argv[])
}
while (1) {
struct client *client;
int nodelay;
client = malloc(sizeof(*client));
if (client == NULL) {
......@@ -67,6 +205,8 @@ int main(int argc, char *argv[])
result = errno;
goto failed_to_accept;
}
nodelay = 1;
setsockopt(client->fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
pthread_create(&client->main_thread, NULL, run_client, client);
}
......@@ -80,1188 +220,3 @@ no_server_socket:
}
#if 0
#include <errno.h>
#include <netinet/in.h>
#include <unistd.h>
#include <linux/tcp.h>
#include "labcomm.h"
#include "labcomm_private.h"
#include "orca_messages.h"
#include "orca_server.h"
#if 0
static long long rdtscll()
{
long long result;
__asm__ __volatile__("rdtsc" : "=A" (result));
return result;
}
static long long epoch;
static void *my_malloc(int size, int line) {
void *p = malloc(size);
if (!epoch) {
epoch = rdtscll();
}
fprintf(stderr, "malloc: %16.16Ld %p %d %d\n",
rdtscll() - epoch, p, size, line);
fflush(stderr);
return p;
}
static void my_free(void *p, int line) {
fprintf(stderr, "free: %16.16Ld %p %d\n",
rdtscll() - epoch, p, line);
fflush(stderr);
free(p);
}
#define malloc(size) my_malloc(size, __LINE__)
#define free(p) my_free(p, __LINE__)
#endif
typedef enum {
input_kind = 0,
output_kind = 1,
parameter_kind = 2,
log_kind = 3
} kind_t;
typedef struct sample {
struct orca_server *server;
labcomm_signature_t *signature;
labcomm_encode_typecast_t encode;
labcomm_decoder_typecast_t decode;
labcomm_handler_typecast_t handle;
void *handle_context;
struct {
int n_0;
struct connection **a;
} connection;
} sample_t;
typedef enum {
input_t = 0,
output_t = 1,
parameter_t = 2,
log_t = 3
} channel_kind_t;
typedef struct {
struct orca_server *context;
labcomm_decoder_t decoder;
labcomm_encoder_t encoder;
orca_channel_t orca_channel;
struct {
int n_0;
struct sample **a;
} sample;
} channel_t;
typedef struct orca_server {
orca_lock_t *lock;
orca_server_context_t *context;
int input_buffer_size;
int output_buffer_size;
int parameter_buffer_size;
int log_buffer_size;
struct {
int n_0;
struct connection **a;
} connection;
channel_t channel[4];
} orca_server_t;
typedef struct connection {
void *context;
orca_server_t *server;
struct {
int current;
int max;
} decimation;
struct labcomm_encoder *encoder;
struct labcomm_decoder *decoder;
struct {
int n_0;
struct sample **a;
} sample;
} connection_t;
static sample_t *sample_by_signature(
orca_lock_t *lock,
channel_t *channel,
labcomm_signature_t *signature,
int create)
{
sample_t *result = 0;
int i;
if (lock) { lock->lock(lock); }
for (i = 0 ; !result && i < channel->sample.n_0 ; i++) {
if (channel->sample.a[i]->signature == signature) {
result = channel->sample.a[i];
}
}
if (lock) { lock->unlock(lock); }
if (!result && create) {
int n;
sample_t **new_a, **old_a;
sample_t *new_sample;
new_a = 0;
old_a = 0;
new_sample = 0;
if (lock) { lock->lock(lock); }
while (1) {
n = channel->sample.n_0 + 1;
// Allocation without holding lock
if (lock) { lock->unlock(lock); }
// Free list from previous iteration
if (new_a) { free(new_a); }
// Try new allocation
if (!new_sample) { new_sample = malloc(sizeof(sample_t)); }
new_a = malloc(n * sizeof(*new_a));
if (lock) { lock->lock(lock); }
// Break if list length is unchanged
if (n == channel->sample.n_0 + 1) {
break;
}
}
if (new_sample && new_a) {
result = new_sample;
old_a = channel->sample.a;
channel->sample.n_0 = n;
for (i = 0 ; i < n - 1 ; i++) {
new_a[i] = old_a[i];
}
new_a[n - 1] = result;
channel->sample.a = new_a;
result->server = channel->context;
result->signature = signature;
result->connection.n_0 = 0;
result->connection.a = 0;
result->encode = 0;
result->decode = 0;
result->handle = 0;
result->handle_context = 0;
new_a = 0;
new_sample = 0;
}
if (lock) { lock->unlock(lock); }
if (old_a) { free(old_a); }
if (new_a) { free(new_a); }
if (new_sample) { free(new_sample); }
}
return result;
}
static void do_decoder_register(
struct labcomm_decoder *d,
labcomm_signature_t *signature,
labcomm_decoder_typecast_t decode,
labcomm_handler_typecast_t handle,
void *handle_context)
{
channel_t *channel = d->context;
orca_server_t *server = channel->context;
sample_t *sample;
sample = sample_by_signature(server->lock, channel, signature, 1);
if (sample) {
server->lock->lock(server->lock);
sample->decode = decode;
sample->handle = handle;
sample->handle_context = handle_context;
server->lock->unlock(server->lock);
}
}
static void do_encoder_register(
struct labcomm_encoder *e,
labcomm_signature_t *signature,
labcomm_encode_typecast_t encode)
{
channel_t *channel = e->context;
orca_server_t *server = channel->context;
sample_t *sample;
sample = sample_by_signature(server->lock, channel, signature, 1);
if (sample) {
server->lock->lock(server->lock);
sample->encode = encode;
server->lock->unlock(server->lock);
}
}
static void do_encode(
struct labcomm_encoder *encoder,
labcomm_signature_t *signature,
void *value)
{
channel_t *channel = encoder->context;
sample_t *sample;
// Lock should be held when this is called !!
sample = sample_by_signature(0, channel, signature, 0);
if (sample && sample->connection.n_0) {
int i, size, available;
size = signature->encoded_size(value);
for (i = 0 ; i < sample->connection.n_0 ; i++) {
connection_t *c = sample->connection.a[i];
if (c->decimation.current == 0) {
labcomm_writer_t *writer = &c->encoder->writer;
available = writer->write(writer, labcomm_writer_available);
if (available >= size) {
labcomm_internal_encode(sample->connection.a[i]->encoder,
signature,
value);
}
}
}
}
}
static void connect_connection_and_server(
connection_t *connection,
orca_server_t *server)
{
int n, i;
connection_t **new_a, **old_a;
new_a = 0;
old_a = 0;
server->lock->lock(server->lock);
while (1) {
n = server->connection.n_0 + 1;
// Allocation without holding lock
server->lock->unlock(server->lock);
// Free list from previous iteration
if (new_a) { free(new_a); }
// Try new allocation
new_a = malloc(n * sizeof(*new_a));
// Reclaim lock
server->lock->lock(server->lock);
// Break if list length is the desired one
if (n == server->connection.n_0 + 1) {
break;
}
}
if (new_a) {
old_a = server->connection.a;
server->connection.n_0 = n;
for (i = 0 ; i < n - 1 ; i++) {
new_a[i] = old_a[i];
}
new_a[n - 1] = connection;
server->connection.a = new_a;
new_a = 0;
}
server->lock->unlock(server->lock);
if (old_a) { free(old_a); }
if (new_a) { free(new_a); }
}
static void disconnect_connection_and_server(
connection_t *connection)
{
orca_server_t *server = connection->server;
int i, j;
server->lock->lock(server->lock);
for (i = 0, j = 0 ; i < server->connection.n_0 ; i++) {
if (server->connection.a[i] != connection) {
server->connection.a[j] = server->connection.a[i];
j++;
}
}
server->connection.n_0 = j;
server->lock->unlock(server->lock);
}
static void connect_channel_and_sample(
connection_t *connection,
sample_t *sample)
{
orca_lock_t *lock;
int connection_n, sample_n;
sample_t **new_sample_a, **old_sample_a;
connection_t **new_connection_a, **old_connection_a;
lock = connection->server->lock;
old_sample_a = 0;
old_connection_a = 0;
new_sample_a = 0;
new_connection_a = 0;
lock->lock(lock);
while (1) {
connection_n = sample->connection.n_0 + 1;
sample_n = connection->sample.n_0 + 1;
lock->unlock(lock);
// Free result from previous iteration
if (new_connection_a) { free(new_connection_a); }
if (new_sample_a) { free(new_sample_a); }
// Try new allocation
new_sample_a = malloc(sample_n * sizeof(*new_sample_a));
new_connection_a = malloc(connection_n * sizeof(*new_connection_a));
lock->lock(lock);
// Break if list length are unchanged
if (connection_n == sample->connection.n_0 + 1 &&
sample_n == connection->sample.n_0 + 1) {
break;
}
}
if (new_sample_a && new_connection_a) {
int i;
old_connection_a = sample->connection.a;
sample->connection.a = new_connection_a;
sample->connection.n_0 = connection_n;
old_sample_a = connection->sample.a;
connection->sample.a = new_sample_a;
connection->sample.n_0 = sample_n;
for (i = 0 ; i < connection_n - 1 ; i++) {
new_connection_a[i] = old_connection_a[i];
}
new_connection_a[connection_n - 1] = connection;
for (i = 0 ; i < sample_n - 1 ; i++) {
new_sample_a[i] = old_sample_a[i];
}
new_sample_a[sample_n - 1] = sample;
new_sample_a = 0;
new_connection_a = 0;
}
lock->unlock(lock);
if (new_connection_a) { free(new_connection_a); }
if (old_connection_a) { free(old_connection_a); }
if (new_sample_a) { free(new_sample_a); }
if (old_sample_a) { free(old_sample_a); }
}
static void disconnect_channel_and_sample(
connection_t *connection)
{
orca_lock_t *lock;
int i;
void *a;
lock = connection->server->lock;
lock->lock(lock);
for (i = 0 ; i < connection->sample.n_0 ; i++) {
int j, k;
sample_t *sample = connection->sample.a[i];
for (j = 0, k = 0 ; j < sample->connection.n_0 ; j++) {
if (sample->connection.a[j] != connection) {
sample->connection.a[k] = sample->connection.a[j];
k++;
}
}
sample->connection.n_0 = k;
}
a = connection->sample.a;
connection->sample.a = 0;
connection->sample.n_0 = 0;
lock->unlock(lock);
free(a);
}
orca_server_t *orca_server_new(
orca_lock_t *lock,
orca_server_context_t *context,
int input_buffer_size,
int output_buffer_size,
int parameter_buffer_size,
int log_buffer_size)
{
orca_server_t *result;
result = malloc(sizeof(orca_server_t));
if (result) {
channel_kind_t sc;
result->lock = lock;
result->context = context;
result->input_buffer_size = input_buffer_size;
result->output_buffer_size = output_buffer_size;
result->parameter_buffer_size = parameter_buffer_size;
result->log_buffer_size = log_buffer_size;
result->connection.n_0 = 0;
result->connection.a = 0;
for (sc = input_t ; sc <= log_t ; sc++) {
result->channel[sc].context = result;
result->channel[sc].decoder.context = &result->channel[sc];
result->channel[sc].decoder.reader.read = 0;
result->channel[sc].decoder.do_register = do_decoder_register;
result->channel[sc].decoder.do_decode_one = 0;
result->channel[sc].encoder.context = &result->channel[sc];
result->channel[sc].encoder.writer.write = 0;
result->channel[sc].encoder.do_register = do_encoder_register;
result->channel[sc].encoder.do_encode = do_encode;
result->channel[sc].orca_channel.decoder = &result->channel[sc].decoder ;
result->channel[sc].orca_channel.encoder = &result->channel[sc].encoder ;
result->channel[sc].sample.n_0 = 0;
result->channel[sc].sample.a = 0;
}
}
return result;
}
void orca_server_free(
struct orca_server *server)
{
channel_kind_t sc;
// What should we do about open connections?
for (sc = input_t ; sc <= log_t ; sc++) {
int i;
for (i = 0 ; i < server->channel[sc].sample.n_0 ; i++) {
free(server->channel[sc].sample.a[i]);
}
free(server->channel[sc].sample.a);
}
free(server);
printf("CHECK implementation %s\n", __FUNCTION__);
}
void orca_server_next_sample(
struct orca_server *server)
{
int i;
for (i = 0 ; i < server->connection.n_0 ; i++) {
connection_t *c = server->connection.a[i];
int max = c->decimation.max;
if (max) {
c->decimation.current++;
if (c->decimation.current >= max) {
c->decimation.current = 0;
}
}
}
}
orca_channel_t *orca_server_get_input_channel(
struct orca_server *server)
{
return &server->channel[input_t].orca_channel;
}
orca_channel_t *orca_server_get_output_channel(
struct orca_server *server)
{
return &server->channel[output_t].orca_channel;
}
orca_channel_t *orca_server_get_parameter_channel(
struct orca_server *server)
{
return &server->channel[parameter_t].orca_channel;
}
orca_channel_t *orca_server_get_log_channel(
struct orca_server *server)
{
return &server->channel[log_t].orca_channel;
}
//
// TCP specific stuff
//
typedef struct {
orca_server_t *server;
int fd;
struct sockaddr_in remote;
} tcp_setup_connection_t;
typedef struct {
int fd;
int write_pos;
int established;
int buffer_size;
} tcp_nonblocking_t;
typedef struct {
int closed;
int use_count;
connection_t connection;
int fd;
tcp_nonblocking_t write_context;
} tcp_connection_t;
static int nonblocking_fd_writer(
labcomm_writer_t *w,
labcomm_writer_action_t action)
{
int result = 0;
tcp_nonblocking_t *context = w->context;
switch (action) {
case labcomm_writer_alloc: {
int size = context->buffer_size;
w->data = malloc(size);
if (! w->data) {
result = -ENOMEM;
w->data_size = 0;
w->count = 0;
w->pos = 0;
} else {
w->data_size = size;
w->count = size;
w->pos = 0;
}
} break;
case labcomm_writer_free: {
free(w->data);
w->data = 0;
w->data_size = 0;
w->count = 0;
w->pos = 0;
} break;
case labcomm_writer_start: {
if (!context->established) {
w->pos = 0;
}
} break;
case labcomm_writer_continue: {
if (!context->established) {
result = write(context->fd, w->data, w->pos);
w->pos = 0;
} else {
printf("Buffer overrun %s\n", __FUNCTION__);
exit(1);
}
} break;
case labcomm_writer_end: {
if (!context->established) {
result = write(context->fd, w->data, w->pos);
w->pos = 0;
}
} break;
case labcomm_writer_available: {
result = w->count - w->pos;
} break;
}
return result;
}
static int fd_writer(
labcomm_writer_t *w,
labcomm_writer_action_t action)
{
int result = 0;
int *fd = w->context;
switch (action) {
case labcomm_writer_alloc: {
w->data = malloc(1000);
if (! w->data) {
result = -ENOMEM;
w->data_size = 0;
w->count = 0;
w->pos = 0;
} else {
w->data_size = 1000;
w->count = 1000;
w->pos = 0;
}
} break;
case labcomm_writer_free: {
free(w->data);
w->data = 0;
w->data_size = 0;
w->count = 0;
w->pos = 0;
} break;
case labcomm_writer_start: {
w->pos = 0;
} break;
case labcomm_writer_continue: {
result = write(*fd, w->data, w->pos);
w->pos = 0;
} break;
case labcomm_writer_end: {
result = write(*fd, w->data, w->pos);
w->pos = 0;
} break;
case labcomm_writer_available: {
result = w->count - w->pos;
} break;
}
return result;
}
static int fd_reader(
labcomm_reader_t *r,
labcomm_reader_action_t action)
{
int result = -EINVAL;
int *fd = r->context;
switch (action) {
case labcomm_reader_alloc: {
r->data = malloc(1000);
if (r->data) {
r->data_size = 1000;
result = r->data_size;
} else {
r->data_size = 0;
result = -ENOMEM;
}
r->count = 0;
r->pos = 0;
} break;
case labcomm_reader_start:
case labcomm_reader_continue: {
if (r->pos < r->count) {
result = r->count - r->pos;
} else {
int err;
r->pos = 0;
err = read(*fd, r->data, r->data_size);
if (err <= 0) {
r->count = 0;
result = -1;
} else {
r->count = err;
result = r->count - r->pos;
}
}
} break;
case labcomm_reader_end: {
result = 0;
} break;
case labcomm_reader_free: {
free(r->data);
r->data = 0;
r->data_size = 0;
r->count = 0;
r->pos = 0;
result = 0;
} break;
}
return result;
}
static tcp_connection_t *do_tcp_connect(
tcp_setup_connection_t *connection,
int port,
int buffer_size,
int decimation)
{
tcp_connection_t *result;
int fd, OK;
OK = 1;
if (OK) {
fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd < 0) {
printf("failed to create socket\n");
OK = 0;
}
}
if (OK) {
int reuse;
reuse = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
}
if (OK) {
struct sockaddr_in adr;
int err;
adr.sin_family = AF_INET;
adr.sin_port = 0;
adr.sin_addr.s_addr = INADDR_ANY;
err = bind(fd, (struct sockaddr*)&adr, sizeof(adr));
if (err != 0) {
printf("failed to bind socket\n");
OK = 0;
}
}
if (OK) {
struct sockaddr_in to;
int err;
to.sin_family = AF_INET;
to.sin_port = htons(port);
to.sin_addr = connection->remote.sin_addr;
err = connect(fd, (struct sockaddr*)&to, sizeof(to));
if (err != 0) {
printf("failed to connect %d@%s\n", port, inet_ntoa(to.sin_addr));
printf("%d %d %x\n", port, htons(port), port);
OK = 0;
}
}
if (OK) {
int nodelay = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
}
if (!OK && fd >= 0) {
close(fd);
fd = -1;
}
if (OK) {
result = malloc(sizeof(*result));
if (!result) {
OK = 0;
}
}
if (!OK) {
result = 0;
} else {
result->closed = 0;
result->use_count = 0;
result->connection.context = result;
result->connection.server = connection->server;
result->connection.decimation.current = decimation;
result->connection.decimation.max = decimation;
result->connection.encoder = 0;
result->connection.decoder = 0;
result->connection.sample.n_0 = 0;
result->connection.sample.a = 0;
result->fd = fd;
result->write_context.fd = fd;
result->write_context.write_pos = 0;
result->write_context.established = 0;
result->write_context.buffer_size = buffer_size;
}
return result;
}
static void do_tcp_disconnect(tcp_connection_t *tcp_connection)
{
orca_lock_t *lock = tcp_connection->connection.server->lock;
int use_count;
shutdown(tcp_connection->fd, SHUT_RDWR);
close(tcp_connection->fd);
lock->lock(lock);
tcp_connection->closed = 1;
tcp_connection->use_count--;
use_count = tcp_connection->use_count;
lock->unlock(lock);
printf("%s use %d\n", __FUNCTION__, use_count);
if (use_count == 0) {
disconnect_channel_and_sample(&tcp_connection->connection);
disconnect_connection_and_server(&tcp_connection->connection);
if (tcp_connection->connection.decoder) {
labcomm_decoder_free(tcp_connection->connection.decoder);
}
if (tcp_connection->connection.encoder) {
labcomm_encoder_free(tcp_connection->connection.encoder);
}
free(tcp_connection);
}
}
static void lock_and_handle(
void *value,
void *context)
{
sample_t *sample = context;
orca_lock_t *lock = sample->server->lock;
lock->lock(lock);
sample->handle(value, sample->handle_context);
lock->unlock(lock);
}
static int register_select_handler_tcp(
channel_t *channel,
orca_messages_select_t *v,
tcp_connection_t *tcp_connection)
{
int i;
int n = 0;
for (i = 0 ; i < v->list.n_0 ; i++) {
if (v->list.a[i] < channel->sample.n_0) {
sample_t *sample = channel->sample.a[v->list.a[i]];
n++;
if (tcp_connection->connection.decoder) {
labcomm_internal_decoder_register(tcp_connection->connection.decoder,
sample->signature,
sample->decode,
lock_and_handle,
sample);
}
if (tcp_connection->connection.encoder) {
labcomm_internal_encoder_register(tcp_connection->connection.encoder,
sample->signature,
sample->encode);
}
}
}
// Switch to non-blocking writes
tcp_connection->write_context.established = 1;
for (i = 0 ; i < v->list.n_0 ; i++) {
if (v->list.a[i] < channel->sample.n_0) {
sample_t *sample = channel->sample.a[v->list.a[i]];
connect_channel_and_sample(&tcp_connection->connection, sample);
}
}
return n;
}
static void *decoder_run_main(void *context)
{
tcp_connection_t *tcp_connection = context;
printf("Start %s %p\n", __FUNCTION__, context);
labcomm_decoder_run(tcp_connection->connection.decoder);
do_tcp_disconnect(tcp_connection);
printf("Finish %s %p\n", __FUNCTION__, context);
return 0;
}
static void *tcp_deferred_write_main(void *context)
{
tcp_connection_t *tcp_connection = context;
orca_server_t *server = tcp_connection->connection.server;
orca_lock_t *lock = tcp_connection->connection.server->lock;
printf("Start %s %p\n", __FUNCTION__, context);
while (1) {
int start, end;
server->context->await_deferred_write(server->context);
start = tcp_connection->write_context.write_pos;
end = tcp_connection->connection.encoder->writer.pos;
if (tcp_connection->closed) {
break;
} else if (end - start > 0) {
int err;
err = write(tcp_connection->write_context.fd,
&tcp_connection->connection.encoder->writer.data[start],
end - start);
if (err < 0) {
break;
} else {
tcp_connection->write_context.write_pos += err;
lock->lock(lock);
if (tcp_connection->connection.encoder->writer.pos ==
tcp_connection->write_context.write_pos) {
tcp_connection->connection.encoder->writer.pos = 0;
tcp_connection->write_context.write_pos = 0;
}
lock->unlock(lock);
}
}
}
do_tcp_disconnect(tcp_connection);
printf("Finish %s %p\n", __FUNCTION__, context);
return 0;
}
static void select_input_handler_tcp(
orca_messages_select_input *v,
void *context)
{
tcp_setup_connection_t *tcp_setup_connection = context;
orca_server_t *server = tcp_setup_connection->server;
tcp_connection_t *tcp_connection;
tcp_connection = do_tcp_connect(tcp_setup_connection, v->port,
server->input_buffer_size, 0);
if (tcp_connection) {
tcp_connection->use_count += 1;
tcp_connection->connection.decoder =
labcomm_decoder_new(fd_reader, &tcp_connection->fd);
connect_connection_and_server(&tcp_connection->connection, server);
if (register_select_handler_tcp(&server->channel[input_t],
v, tcp_connection)) {
server->context->spawn_task(decoder_run_main, tcp_connection);
} else {
do_tcp_disconnect(tcp_connection);
}
}
}
static void select_output_handler_tcp(
orca_messages_select_output *v,
void *context)
{
tcp_setup_connection_t *tcp_setup_connection = context;
orca_server_t *server = tcp_setup_connection->server;
tcp_connection_t *tcp_connection;
tcp_connection = do_tcp_connect(tcp_setup_connection, v->port,
server->output_buffer_size, v->decimation);
if (tcp_connection) {
tcp_connection->use_count += 1;
tcp_connection->connection.encoder =
labcomm_encoder_new(nonblocking_fd_writer,
&tcp_connection->write_context);
connect_connection_and_server(&tcp_connection->connection, server);
if (register_select_handler_tcp(&server->channel[output_t],
v, tcp_connection)) {
server->context->spawn_task(tcp_deferred_write_main, tcp_connection);
} else {
do_tcp_disconnect(tcp_connection);
}
}
}
static void select_parameter_handler_tcp(
orca_messages_select_parameter *v,
void *context)
{
tcp_setup_connection_t *tcp_setup_connection = context;
orca_server_t *server = tcp_setup_connection->server;
tcp_connection_t *tcp_connection;
tcp_connection = do_tcp_connect(tcp_setup_connection, v->port,
server->parameter_buffer_size, 0);
if (tcp_connection) {
tcp_connection->use_count += 2;
tcp_connection->connection.decoder =
labcomm_decoder_new(fd_reader, &tcp_connection->fd);
tcp_connection->connection.encoder =
labcomm_encoder_new(nonblocking_fd_writer,
&tcp_connection->write_context);
connect_connection_and_server(&tcp_connection->connection, server);
if (register_select_handler_tcp(&server->channel[parameter_t],
v, tcp_connection)) {
server->context->spawn_task(decoder_run_main, tcp_connection);
server->context->spawn_task(tcp_deferred_write_main, tcp_connection);
// Force current values to be sent to all clients
{
int i;
channel_t *channel = &server->channel[parameter_t];
for (i = 0 ; i < v->list.n_0 ; i++) {
if (v->list.a[i] < channel->sample.n_0) {
sample_t *sample = channel->sample.a[v->list.a[i]];
lock_and_handle(0, sample);
}
}
}
} else {
do_tcp_disconnect(tcp_connection);
}
}
}
static void select_log_handler_tcp(
orca_messages_select_log *v,
void *context)
{
tcp_setup_connection_t *tcp_setup_connection = context;
orca_server_t *server = tcp_setup_connection->server;
tcp_connection_t *tcp_connection;
tcp_connection = do_tcp_connect(tcp_setup_connection, v->port,
server->log_buffer_size, v->decimation);
if (tcp_connection) {
tcp_connection->use_count += 1;
tcp_connection->connection.encoder =
labcomm_encoder_new(nonblocking_fd_writer,
&tcp_connection->write_context);
connect_connection_and_server(&tcp_connection->connection, server);
if (register_select_handler_tcp(&server->channel[log_t],
v, tcp_connection)) {
server->context->spawn_task(tcp_deferred_write_main, tcp_connection);
} else {
do_tcp_disconnect(tcp_connection);
}
}
}
static void make_connection_list(
orca_lock_t *lock,
orca_messages_connection_list_t *dest,
channel_t *src)
{
int i;
dest->n_0 = src->sample.n_0;
dest->a = malloc(dest->n_0 * sizeof(*dest->a));
if (dest->a) {
lock->lock(lock);
for (i = 0 ; i < dest->n_0 ; i++) {
dest->a[i].index = i;
dest->a[i].name = src->sample.a[i]->signature->name;
dest->a[i].signature.n_0 = src->sample.a[i]->signature->size;
dest->a[i].signature.a = src->sample.a[i]->signature->signature;
}
lock->unlock(lock);
}
}
static void encode_orca_directory(
struct labcomm_encoder *encoder,
orca_server_t *server)
{
orca_messages_directory directory;
make_connection_list(server->lock,
&directory.input,
&server->channel[input_t]);
make_connection_list(server->lock,
&directory.output,
&server->channel[output_t]);
make_connection_list(server->lock,
&directory.parameter,
&server->channel[parameter_t]);
make_connection_list(server->lock,
&directory.log,
&server->channel[log_t]);
labcomm_encode_orca_messages_directory(encoder, &directory);
free(directory.input.a);
free(directory.output.a);
free(directory.parameter.a);
free(directory.log.a);
}
static void *tcp_setup_connection_main(void *argument)
{
tcp_setup_connection_t *tcp_setup_connection = argument;
struct labcomm_encoder *encoder;
struct labcomm_decoder *decoder;
encoder = labcomm_encoder_new(fd_writer, &tcp_setup_connection->fd);
decoder = labcomm_decoder_new(fd_reader, &tcp_setup_connection->fd);
labcomm_encoder_register_orca_messages_directory(encoder);
labcomm_decoder_register_orca_messages_select_input(
decoder, select_input_handler_tcp, tcp_setup_connection);
labcomm_decoder_register_orca_messages_select_output(
decoder, select_output_handler_tcp, tcp_setup_connection);
labcomm_decoder_register_orca_messages_select_parameter(
decoder, select_parameter_handler_tcp, tcp_setup_connection);
labcomm_decoder_register_orca_messages_select_log(
decoder, select_log_handler_tcp, tcp_setup_connection);
encode_orca_directory(encoder, tcp_setup_connection->server);
labcomm_decoder_run(decoder);
shutdown(tcp_setup_connection->fd, SHUT_RDWR);
close(tcp_setup_connection->fd);
labcomm_encoder_free(encoder);
labcomm_decoder_free(decoder);
free(tcp_setup_connection);
return 0;
}
int orca_server_run_tcp_cb(
struct orca_server *server,
int port,
int backlog,
void (*cb)(struct orca_server *server, char *message, int err))
{
int result;
int server_fd;
if (cb) { cb(server, "Listening for client connections", 0); }
while (1) {
struct sockaddr_in adr;
unsigned int adrlen;
int fd;
tcp_setup_connection_t *tcp_setup_connection;
adr.sin_family = AF_INET;
adr.sin_port = 0;
adr.sin_addr.s_addr = INADDR_ANY;
adrlen = sizeof(adr);
fd = accept(server_fd, (struct sockaddr*)&adr, &adrlen);
if (fd < 0) {
if (cb) { cb(server, "Failed to accept on socket", errno); }
result = -errno;
goto failed_to_accept;
}
tcp_setup_connection = malloc(sizeof(tcp_setup_connection_t));
if (!tcp_setup_connection) {
cb(server, "Failed to allocate client memeory", -ENOMEM);
shutdown(fd, SHUT_RDWR);
} else {
if (cb) {
char buf[128];
sprintf(buf, "Got server connection: %s %x",
inet_ntoa(adr.sin_addr), ntohs(adr.sin_port));
cb(server, buf, errno);
}
tcp_setup_connection->fd = fd;
tcp_setup_connection->remote = adr;
tcp_setup_connection->server = server;
server->context->spawn_task(tcp_setup_connection_main,
tcp_setup_connection);
}
}
failed_to_accept:
failed_to_listen:
failed_to_bind:
close(server_fd);
no_server_socket:
return result;
}
int orca_server_run_tcp(
struct orca_server *server,
int port,
int backlog)
{
return orca_server_run_tcp_cb(server, port, backlog, NULL);
}
#endif
sample int A;
sample int B;
sample int Sum;
sample int Diff;
sample int Product;
sample void Terminate;
gen
labcomm2014.dll
ExampleDecoder.exe
ExampleEncoder.exe
encoded_data_c
encoded_data_cs
encoded_data_j
encoded_data_p
example_decoder
example_encoder
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import se.lth.control.labcomm2014.DecoderChannel;
import se.lth.control.labcomm2014.TypeDef;
import se.lth.control.labcomm2014.TypeDefParser;
import se.lth.control.labcomm2014.SigTypeDef;
//import se.lth.control.labcomm2014.TypeBinding;
public class Decoder
implements twoLines.Handler,
// TypeDef.Handler,
// TypeBinding.Handler,
TypeDefParser.TypeDefListener,
twoInts.Handler,
theFirstInt.Handler,
theSecondInt.Handler,
doavoid.Handler,
intAndRef.Handler
{
private DecoderChannel decoder;
private TypeDefParser tdp;
public Decoder(InputStream in)
throws Exception
{
decoder = new DecoderChannel(in);
doavoid.register(decoder, this);
twoInts.register(decoder, this);
twoLines.register(decoder, this);
theFirstInt.register(decoder, this);
theSecondInt.register(decoder, this);
intAndRef.register(decoder, this);
doavoid.registerSampleRef(decoder);
this.tdp = TypeDefParser.registerTypeDefParser(decoder);
// TypeDef.register(decoder, this);
// TypeBinding.register(decoder, this);
tdp.addListener(this);
try {
System.out.println("Running decoder.");
decoder.run();
} catch (java.io.EOFException e) {
System.out.println("Decoder reached end of file.");
}
}
private String genPoint(point p) {
return "("+p.x.val+", "+p.y.val+")";
}
private String genLine(line l) {
return "Line from "+genPoint(l.start)+" to "+genPoint(l.end);
}
// public void handle_TypeDef(TypeDef d) throws java.io.IOException {
// System.out.println("Got TypeDef: "+d.getName()+"("+d.getIndex()+")");
// }
//
// public void handle_TypeBinding(TypeBinding d) throws java.io.IOException {
// System.out.println("Got TypeBinding: "+d.getSampleIndex()+" --> "+d.getTypeIndex()+"");
// }
public void onTypeDef(SigTypeDef d) {
System.out.println("ontype_def: ");
if(d != null) {
System.out.print((d.isSampleDef()?"sample ":"typedef ")+d);
System.out.println(" "+d.getName()+";");
} else {
System.out.println(" null???");
}
//for(byte b: d.getSignature()) {
// System.out.print(Integer.toHexString(b)+" ");
//}
//System.out.println();
//try {
// tdp.parseSignature(d.getIndex());
//} catch(IOException ex) { ex.printStackTrace();}
}
public void handle_doavoid() throws java.io.IOException {
System.out.println("Got a void.");
}
public void handle_twoInts(twoInts d) throws java.io.IOException {
System.out.print("Got twoInts: ");
System.out.println(d.a +", "+d.b);
}
public void handle_theFirstInt(int d) throws java.io.IOException {
System.out.println("Got theFirstInt: "+d);
}
public void handle_theSecondInt(int d) throws java.io.IOException {
System.out.println("Got theSecondInt: "+d);
}
public void handle_intAndRef(intAndRef d) throws java.io.IOException {
System.out.println("Got intAndRef: "+d.x+", "+d.reference);
}
public void handle_twoLines(twoLines d) throws java.io.IOException {
System.out.print("Got twoLines: ");
System.out.println("Line l1: "+genLine(d.l1));
System.out.println(" Line l2: "+genLine(d.l2));
}
public static void main(String[] arg) throws Exception {
Decoder example = new Decoder(
new FileInputStream(new File(arg[0]))
);
}
}
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import se.lth.control.labcomm2014.EncoderChannel;
/**
* Simple encoder
*/
public class Encoder
{
EncoderChannel encoder;
public Encoder(OutputStream out)
throws Exception
{
encoder = new EncoderChannel(out);
doavoid.register(encoder);
twoInts.register(encoder);
twoLines.register(encoder);
theFirstInt.register(encoder);
theSecondInt.register(encoder);
intAndRef.register(encoder);
doavoid.registerSampleRef(encoder);
}
public void doEncode() throws java.io.IOException {
System.out.println("Encoding doavoid");
doavoid.encode(encoder);
intAndRef iar = new intAndRef();
iar.x = 17;
iar.reference = doavoid.class;
System.out.println("Encoding intAndRef");
intAndRef.encode(encoder, iar);
twoInts ti = new twoInts();
ti.a = 12;
ti.b = 21;
System.out.println("Encoding twoInts");
twoInts.encode(encoder, ti);
System.out.println("Encoding the Ints");
theFirstInt.encode(encoder, 71);
theSecondInt.encode(encoder, 24);
twoLines x = new twoLines();
line l1 = new line();
point p11 = new point();
coord c11x = new coord();
coord c11y = new coord();
c11x.val = 11;
c11y.val = 99;
p11.x = c11x;
p11.y = c11y;
l1.start = p11;
point p12 = new point();
coord c12x = new coord();
coord c12y = new coord();
c12x.val = 22;
c12y.val = 88;
p12.x = c12x;
p12.y = c12y;
l1.end = p12;
line l2 = new line();
point p21 = new point();
coord c21x = new coord();
coord c21y = new coord();
c21x.val = 17;
c21y.val = 42;
p21.x = c21x;
p21.y = c21y;
l2.start = p21;
point p22 = new point();
coord c22x = new coord();
coord c22y = new coord();
c22x.val = 13;
c22y.val = 37;
p22.x = c22x;
p22.y = c22y;
l2.end = p22;
foo f = new foo();
f.a = 10;
f.b = 20;
f.c = false;
x.l1 = l1;
x.l2 = l2;
x.f = f;
System.out.println("Encoding theTwoLines");
twoLines.encode(encoder, x);
}
public static void main(String[] arg) throws Exception {
FileOutputStream fos = new FileOutputStream(arg[0]);
Encoder example = new Encoder(fos);
example.doEncode();
fos.close();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using se.lth.control.labcomm2014;
namespace user_types
{
class Decoder : twoLines.Handler,
twoInts.Handler,
theFirstInt.Handler,
theSecondInt.Handler,
doavoid.Handler,
intAndRef.Handler
{
DecoderChannel dec;
public Decoder(Stream stream)
{
dec = new DecoderChannel(stream);
twoLines.register(dec, this);
twoInts.register(dec, this);
theFirstInt.register(dec, this);
theSecondInt.register(dec, this);
doavoid.register(dec, this);
intAndRef.register(dec, this);
doavoid.registerSampleRef(dec);
try
{
Console.WriteLine("Running decoder.");
dec.run();
}
catch (EndOfStreamException)
{
Console.WriteLine("EOF reached");
}
}
private string genPoint(point p)
{
return "(" + p.x.val + ", " + p.y.val + ")";
}
private String genLine(line l)
{
return "Line from " + genPoint(l.start) + " to " + genPoint(l.end);
}
public void handle(twoLines d)
{
Console.WriteLine("Got twoLines: ");
Console.WriteLine("Line l1: "+genLine(d.l1));
Console.WriteLine("Line l2: " + genLine(d.l2));
}
public void handle(twoInts d)
{
Console.WriteLine("Got twoInts: ");
Console.WriteLine("a: "+d.a);
Console.WriteLine("b: "+d.b);
}
void theFirstInt.Handler.handle(int d)
{
Console.WriteLine("Got theFirstInt: "+d);
}
void theSecondInt.Handler.handle(int d)
{
Console.WriteLine("Got theSecondInt: "+d);
}
void doavoid.Handler.handle()
{
Console.WriteLine("Got a void.");
}
void intAndRef.Handler.handle(intAndRef d)
{
Console.WriteLine("Got intAndRef: "+d.x+" : "+d.reference);
}
static void Main(string[] args)
{
new Decoder(new FileStream(args[0], FileMode.Open));
}
}
}