Commit 2270c9ec authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Major code reorganization.

New scheduler abstraction in place.
parent f00568a0
......@@ -2,6 +2,9 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <labcomm_fd_reader.h>
#include <labcomm_default_error_handler.h>
#include <labcomm_default_memory.h>
#include <labcomm_default_scheduler.h>
#include "gen/simple.h"
#include <stdio.h>
......@@ -58,7 +61,9 @@ int main(int argc, char *argv[]) {
fd = open(filename, O_RDONLY);
decoder = labcomm_decoder_new(labcomm_fd_reader_new(
labcomm_default_memory, fd, 1),
NULL, labcomm_default_memory);
labcomm_default_error_handler,
labcomm_default_memory,
labcomm_default_scheduler);
if (!decoder) {
printf("Failed to allocate decoder %s:%d\n", __FUNCTION__, __LINE__);
return 1;
......
......@@ -2,6 +2,9 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <labcomm_fd_writer.h>
#include <labcomm_default_error_handler.h>
#include <labcomm_default_memory.h>
#include <labcomm_default_scheduler.h>
#include "gen/simple.h"
#include <stdio.h>
......@@ -14,7 +17,9 @@ int main(int argc, char *argv[]) {
fd = open(filename, O_WRONLY|O_CREAT|O_TRUNC, 0644);
encoder = labcomm_encoder_new(labcomm_fd_writer_new(
labcomm_default_memory, fd, 1),
NULL, labcomm_default_memory);
labcomm_default_error_handler,
labcomm_default_memory,
labcomm_default_scheduler);
labcomm_encoder_register_simple_theTwoInts(encoder);
labcomm_encoder_register_simple_anotherTwoInts(encoder);
labcomm_encoder_register_simple_IntString(encoder);
......
......@@ -33,7 +33,9 @@
#include <labcomm.h>
#include <labcomm_fd_reader.h>
#include <labcomm_fd_writer.h>
#include <labcomm_pthread_mutex_lock.h>
#include <labcomm_default_error_handler.h>
#include <labcomm_default_memory.h>
#include <labcomm_pthread_scheduler.h>
#include "decimating.h"
#include "introspecting.h"
#include "gen/types.h"
......@@ -74,7 +76,7 @@ int main(int argc, char *argv[])
struct introspecting *introspecting;
char *hostname;
int port;
struct labcomm_lock *lock;
struct labcomm_scheduler *scheduler;
struct labcomm_decoder *decoder;
struct labcomm_encoder *encoder;
int32_t i, j;
......@@ -114,29 +116,35 @@ int main(int argc, char *argv[])
nodelay = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
lock = labcomm_pthread_mutex_lock_new(labcomm_default_memory);
scheduler = labcomm_pthread_scheduler_new(labcomm_default_memory);
decimating = decimating_new(labcomm_fd_reader_new(labcomm_default_memory,
fd, 1),
labcomm_fd_writer_new(labcomm_default_memory,
fd, 0),
lock,
labcomm_default_memory);
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
if (decimating == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
introspecting = introspecting_new(decimating->reader,
decimating->writer,
lock,
labcomm_default_memory);
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
decoder = labcomm_decoder_new(introspecting->reader, lock,
labcomm_default_memory);
encoder = labcomm_encoder_new(introspecting->writer, lock,
labcomm_default_memory);
decoder = labcomm_decoder_new(introspecting->reader,
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
encoder = labcomm_encoder_new(introspecting->writer,
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
pthread_t rdt;
pthread_create(&rdt, NULL, run_decoder, decoder);
labcomm_encoder_register_types_A(encoder);
......
......@@ -29,8 +29,9 @@
struct decimating_private {
struct decimating decimating;
struct labcomm_error_handler *error;
struct labcomm_memory *memory;
struct labcomm_lock *lock;
struct labcomm_scheduler *scheduler;
int encoder_initialized;
struct labcomm_reader_action_context reader_action_context;
struct labcomm_writer_action_context writer_action_context;
......@@ -59,19 +60,19 @@ static void set_decimation(
static int wrap_reader_alloc(
struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
struct labcomm_decoder *decoder,
char *labcomm_version)
{
int result;
struct decimating_private *decimating = action_context->context;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__);
/* Stash away decoder for later use */
result = labcomm_reader_alloc(r, action_context->next,
decoder, labcomm_version);
result = labcomm_reader_alloc(r, action_context->next, labcomm_version);
fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__);
labcomm_decoder_register_decimating_messages_set_decimation(
decoder, set_decimation, decimating);
r->decoder, set_decimation, decimating);
fprintf(stderr, "%s %s %d\n", __FILE__, __FUNCTION__, __LINE__);
return result;
}
......@@ -110,25 +111,26 @@ struct labcomm_reader_action decimating_reader_action = {
.ioctl = wrap_reader_ioctl
};
static void register_signatures(struct labcomm_encoder *encoder,
void *context)
static void register_signatures(void *context)
{
struct decimating_private *decimating = context;
labcomm_encoder_register_decimating_messages_set_decimation(
encoder);
decimating->decimating.writer->encoder);
}
static int wrap_writer_alloc(
struct labcomm_writer *w,
struct labcomm_writer_action_context *action_context,
struct labcomm_encoder *encoder, char *labcomm_version,
labcomm_encoder_enqueue enqueue)
char *labcomm_version)
{
struct decimating_private *decimating = action_context->context;
int result;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
result = labcomm_writer_alloc(w, action_context->next,
encoder, labcomm_version, enqueue);
enqueue(encoder, register_signatures, NULL);
result = labcomm_writer_alloc(w, action_context->next, labcomm_version);
labcomm_scheduler_enqueue(decimating->scheduler,
0, register_signatures, decimating);
return result;
}
......@@ -164,11 +166,12 @@ struct labcomm_writer_action decimating_writer_action = {
.ioctl = NULL
};
extern struct decimating *decimating_new(
struct decimating *decimating_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock,
struct labcomm_memory *memory)
struct labcomm_error_handler *error,
struct labcomm_memory *memory,
struct labcomm_scheduler *scheduler)
{
struct decimating_private *result;
......@@ -193,8 +196,9 @@ extern struct decimating *decimating_new(
result->decimating.writer = writer;
/* Init other fields */
result->lock = lock;
result->error = error;
result->memory = memory;
result->scheduler = scheduler;
LABCOMM_SIGNATURE_ARRAY_INIT(result->decimation, struct decimation);
goto out_ok;
......
......@@ -12,8 +12,9 @@ struct decimating {
extern struct decimating *decimating_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock,
struct labcomm_memory *memory);
struct labcomm_error_handler *error,
struct labcomm_memory *memory,
struct labcomm_scheduler *scheduler);
#define SET_DECIMATION LABCOMM_IOSW('d',0,int)
......
......@@ -30,13 +30,10 @@
enum status {unknown, unhandled, unregistered, registered};
struct introspecting_private {
struct introspecting introspecting;
struct labcomm_lock *lock;
struct labcomm_error_handler *error;
struct labcomm_memory *memory;
struct labcomm_scheduler *scheduler;
struct labcomm_encoder *encoder;
int encoder_initialized;
struct labcomm_decoder *decoder;
int decoder_initialized;
struct labcomm_reader_action_context reader_action_context;
struct labcomm_writer_action_context writer_action_context;
LABCOMM_SIGNATURE_ARRAY_DEF(remote,
......@@ -93,51 +90,60 @@ static void handles_signature(
static int wrap_reader_alloc(
struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
struct labcomm_decoder *decoder,
char *labcomm_version)
{
int result;
struct introspecting_private *introspecting = action_context->context;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
/* Stash away decoder for later use */
introspecting->decoder = decoder;
result = labcomm_reader_alloc(r, action_context->next,
decoder, labcomm_version);
result = labcomm_reader_alloc(r, action_context->next, labcomm_version);
labcomm_decoder_register_introspecting_messages_handles_signature(
introspecting->decoder, handles_signature, introspecting);
introspecting->decoder_initialized = 1;
introspecting->introspecting.reader->decoder,
handles_signature, introspecting);
return result;
}
struct handles_signature {
struct introspecting_private *introspecting;
int index;
struct labcomm_signature *signature;
};
static void send_handles_signature(void *arg)
{
struct handles_signature *h = arg;
introspecting_messages_handles_signature handles_signature;
handles_signature.index = h->index;
handles_signature.name = h->signature->name;
handles_signature.signature.n_0 = h->signature->size;
handles_signature.signature.a = h->signature->signature;
labcomm_encode_introspecting_messages_handles_signature(
h->introspecting->introspecting.writer->encoder, &handles_signature);
}
static int wrap_reader_start(
struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
int index, struct labcomm_signature *signature,
int local_index, int remote_index, struct labcomm_signature *signature,
void *value)
{
struct introspecting_private *introspecting = action_context->context;
int result;
result = labcomm_reader_start(r, action_context->next, index,
signature, value);
if (value == NULL) {
introspecting_messages_handles_signature handles_signature;
struct handles_signature *handles_signature;
handles_signature = labcomm_memory_alloc(introspecting->memory, 1,
sizeof(*handles_signature));
handles_signature->introspecting = introspecting;
handles_signature->index = local_index;
handles_signature->signature = signature;
labcomm_scheduler_enqueue(introspecting->scheduler,
0, send_handles_signature, handles_signature);
labcomm_lock_acquire(introspecting->lock);
while (introspecting->encoder == NULL) {
/* Wait for the encoder to become functional */
labcomm_lock_wait(introspecting->lock, 1000000);
}
labcomm_lock_release(introspecting->lock);
handles_signature.index = index;
handles_signature.name = signature->name;
handles_signature.signature.n_0 = signature->size;
handles_signature.signature.a = signature->signature;
labcomm_encode_introspecting_messages_handles_signature(
introspecting->encoder, &handles_signature);
}
return result;
return labcomm_reader_start(r, action_context->next,
local_index, remote_index, signature, value);
}
void encode_handles_signature(
......@@ -167,32 +173,25 @@ struct labcomm_reader_action introspecting_reader_action = {
.ioctl = NULL
};
static void register_signatures(struct labcomm_encoder *encoder,
void *context)
static void register_encoder_signatures(void *context)
{
struct introspecting_private *introspecting = context;
labcomm_encoder_register_introspecting_messages_handles_signature(
encoder);
introspecting->introspecting.writer->encoder);
}
static int wrap_writer_alloc(
struct labcomm_writer *w,
struct labcomm_writer_action_context *action_context,
struct labcomm_encoder *encoder, char *labcomm_version,
labcomm_encoder_enqueue enqueue)
char *labcomm_version)
{
int result;
struct introspecting_private *introspecting = action_context->context;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
/* Stash away encoder for later use */
labcomm_lock_acquire(introspecting->lock);
introspecting->encoder = encoder;
labcomm_lock_notify(introspecting->lock);
labcomm_lock_release(introspecting->lock);
result = labcomm_writer_alloc(w, action_context->next,
encoder, labcomm_version, enqueue);
enqueue(encoder, register_signatures, NULL);
return result;
labcomm_scheduler_enqueue(introspecting->scheduler,
0, register_encoder_signatures, introspecting);
return labcomm_writer_alloc(w, action_context->next, labcomm_version);
}
static int wrap_writer_start(
......@@ -249,8 +248,9 @@ struct labcomm_writer_action introspecting_writer_action = {
extern struct introspecting *introspecting_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock,
struct labcomm_memory *memory)
struct labcomm_error_handler *error,
struct labcomm_memory *memory,
struct labcomm_scheduler *scheduler)
{
struct introspecting_private *result;
......@@ -275,12 +275,9 @@ extern struct introspecting *introspecting_new(
result->introspecting.writer = writer;
/* Init other fields */
result->lock = lock;
result->error = error;
result->memory = memory;
result->encoder = NULL;
result->encoder_initialized = 0;
result->decoder = NULL;
result->decoder_initialized = 0;
result->scheduler = scheduler;
LABCOMM_SIGNATURE_ARRAY_INIT(result->remote, struct remote);
LABCOMM_SIGNATURE_ARRAY_INIT(result->local, struct local);
......
......@@ -36,8 +36,9 @@ struct introspecting {
extern struct introspecting *introspecting_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock,
struct labcomm_memory *memory);
struct labcomm_error_handler *error,
struct labcomm_memory *memory,
struct labcomm_scheduler *scheduler);
#define HAS_SIGNATURE LABCOMM_IOS('i',2)
......
......@@ -27,7 +27,9 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <labcomm_pthread_mutex_lock.h>
#include <labcomm_default_error_handler.h>
#include <labcomm_default_memory.h>
#include <labcomm_pthread_scheduler.h>
#include <labcomm_fd_reader.h>
#include <labcomm_fd_writer.h>
#include "decimating.h"
......@@ -89,34 +91,40 @@ static void *run_client(void *arg)
struct client *client = arg;
struct decimating *decimating;
struct introspecting *introspecting;
struct labcomm_lock *lock;
struct labcomm_scheduler *scheduler;
printf("Client start\n");
client->A = 0;
client->B = 0;
lock = labcomm_pthread_mutex_lock_new(labcomm_default_memory);
scheduler = labcomm_pthread_scheduler_new(labcomm_default_memory);
decimating = decimating_new(labcomm_fd_reader_new(labcomm_default_memory,
client->fd, 1),
labcomm_fd_writer_new(labcomm_default_memory,
client->fd, 0),
lock,
labcomm_default_memory);
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
if (decimating == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
introspecting = introspecting_new(decimating->reader,
decimating->writer,
lock,
labcomm_default_memory);
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
client->decoder = labcomm_decoder_new(introspecting->reader, lock,
labcomm_default_memory);
client->encoder = labcomm_encoder_new(introspecting->writer, lock,
labcomm_default_memory);
client->decoder = labcomm_decoder_new(introspecting->reader,
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
client->encoder = labcomm_encoder_new(introspecting->writer,
labcomm_default_error_handler,
labcomm_default_memory,
scheduler);
pthread_t rdt;
pthread_create(&rdt, NULL, run_decoder, client);
labcomm_encoder_register_types_Sum(client->encoder);
......
......@@ -7,12 +7,16 @@ LDFLAGS=-L.
#LDLIBS_TEST=-Tlabcomm.linkscript -lcunit -llabcomm
LDLIBS_TEST=-lcunit -llabcomm -Tlabcomm.linkscript -lrt
OBJS=labcomm_memory.o labcomm_default_memory.o \
OBJS=labcomm_memory.o \
labcomm_default_error_handler.o \
labcomm_default_memory.o \
labcomm_default_scheduler.o \
labcomm_time.o labcomm_scheduler.o \
labcomm_encoder.o labcomm_decoder.o \
labcomm.o \
labcomm_dynamic_buffer_writer.o labcomm_fd_reader.o labcomm_fd_writer.o \
labcomm_pthread_scheduler.o \
labcomm_pthread_mutex_lock.o
labcomm_signature_gnu_ld_tricks.o
#FIXME: labcomm_mem_reader.o labcomm_mem_writer.o
LABCOMM_JAR=../../compiler/labComm.jar
......
This diff is collapsed.
......@@ -26,7 +26,8 @@
#include <stdarg.h>
#include <stdint.h>
#include <unistd.h>
#include <labcomm_error.h>
#include "labcomm_error.h"
#include "labcomm_scheduler.h"
/* Forward declaration */
struct labcomm_encoder;
......@@ -79,21 +80,6 @@ typedef int (*labcomm_handle_new_datatype_callback)(
void labcomm_decoder_register_new_datatype_handler(struct labcomm_decoder *d,
labcomm_handle_new_datatype_callback on_new_datatype);
/*
* Locking support (optional), if not used only a single thread
* may access an encoder or decoder at the same time.
*/
struct labcomm_lock;
int labcomm_lock_free(struct labcomm_lock *lock);
int labcomm_lock_acquire(struct labcomm_lock *lock);
int labcomm_lock_release(struct labcomm_lock *lock);
int labcomm_lock_wait(struct labcomm_lock *lock, useconds_t usec);
int labcomm_lock_notify(struct labcomm_lock *lock);
int labcomm_lock_sleep_epoch(struct labcomm_lock *lock);
int labcomm_lock_sleep_add(struct labcomm_lock *lock, useconds_t usec);
int labcomm_lock_sleep(struct labcomm_lock *lock);
/*
* Dynamic memory handling
* lifetime == 0 memory that will live for as long as the
......@@ -109,8 +95,6 @@ void *labcomm_memory_realloc(struct labcomm_memory *m, int lifetime,
void *ptr, size_t size);
void labcomm_memory_free(struct labcomm_memory *m, int lifetime, void *ptr);
extern struct labcomm_memory *labcomm_default_memory;
/*
* Decoder
*/
......@@ -118,14 +102,15 @@ struct labcomm_reader;
struct labcomm_decoder *labcomm_decoder_new(
struct labcomm_reader *reader,
struct labcomm_lock *lock,
struct labcomm_memory *memory);
struct labcomm_error_handler *error,
struct labcomm_memory *memory,
struct labcomm_scheduler *scheduler);
void labcomm_decoder_free(
struct labcomm_decoder *decoder);
int labcomm_decoder_decode_one(
struct labcomm_decoder *decoder);
void labcomm_decoder_run(
struct labcomm_decoder *decoder);
void labcomm_decoder_free(
struct labcomm_decoder *decoder);
/* See labcomm_ioctl.h for predefined ioctl_action values */
int labcomm_decoder_ioctl(struct labcomm_decoder *decoder,
......@@ -139,8 +124,9 @@ struct labcomm_writer;
struct labcomm_encoder *labcomm_encoder_new(
struct labcomm_writer *writer,
struct labcomm_lock *lock,
struct labcomm_memory *memory);
struct labcomm_error_handler *error,
struct labcomm_memory *memory,
struct labcomm_scheduler *scheduler);
void labcomm_encoder_free(
struct labcomm_encoder *encoder);
......
/*
labcomm_decoder.c -- runtime for handling decoding of labcomm samples.
Copyright 2006-2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define LABCOMM_VERSION "LabComm2013"
#include <errno.h>
#include "labcomm.h"
#include "labcomm_private.h"
#include "labcomm_signature.h"
#include "labcomm_ioctl.h"
#include "labcomm_dynamic_buffer_writer.h"
struct sample_entry {
int remote_index;
struct labcomm_signature *signature;
labcomm_decoder_function decode;
labcomm_handler_function handler;
void *context;
};
struct labcomm_decoder {
struct labcomm_reader *reader;
struct labcomm_error_handler *error;
struct labcomm_memory *memory;
struct labcomm_scheduler *scheduler;
labcomm_error_handler_callback on_error;
labcomm_handle_new_datatype_callback on_new_datatype;
LABCOMM_SIGNATURE_ARRAY_DEF(local, struct sample_entry);
LABCOMM_SIGNATURE_ARRAY_DEF(remote_to_local, int);
};
struct labcomm_decoder *labcomm_decoder_new(
struct labcomm_reader *reader,
struct labcomm_error_handler *error,
struct labcomm_memory *memory,
struct labcomm_scheduler *scheduler)
{
struct labcomm_decoder *result;
result = labcomm_memory_alloc(memory, 0, sizeof(*result));
if (result) {
result->reader = reader;
result->reader->decoder = result;
result->reader->data = 0;
result->reader->data_size = 0;
result->reader->count = 0;
result->reader->pos = 0;
result->reader->error = 0;
result->error = error;