Commit bb66c0de authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Changed memory handling to a pluggable architecture.

parent 0e50a779
......@@ -538,7 +538,9 @@ aspect C_Decoder {
}
public void VariableArrayType.C_emitDecoderArrayAllocate(C_env env) {
env.print(env.qualid + ".a = malloc(sizeof(" + env.qualid + ".a[0])");
env.print(env.qualid +
".a = labcomm_memory_alloc(r->memory, 1, sizeof(" +
env.qualid + ".a[0])");
for (int i = 0 ; i < getNumExp() ; i++) {
env.print(" * " + getExp(i).C_getLimit(env, i));
}
......@@ -555,7 +557,8 @@ aspect C_Decoder {
public void PrimType.C_emitDecoderDeallocation(C_env env) {
if (C_isDynamic()) {
env.println("free(" + env.qualid + ");");
env.println("labcomm_memory_free(r->memory, 1, " +
env.qualid + ");");
}
}
......@@ -600,7 +603,8 @@ aspect C_Decoder {
public void VariableArrayType.C_emitDecoderDeallocation(C_env env) {
super.C_emitDecoderDeallocation(env);
env.println("free(" + env.qualid + ".a);");
env.println("labcomm_memory_free(r->memory, 1, " +
env.qualid + ".a);");
}
public void Field.C_emitDecoderDeallocation(C_env env) {
......
......@@ -56,7 +56,9 @@ int main(int argc, char *argv[]) {
char *filename = argv[1];
printf("C decoder reading from %s\n", filename);
fd = open(filename, O_RDONLY);
decoder = labcomm_decoder_new(labcomm_fd_reader_new(fd, 1), NULL);
decoder = labcomm_decoder_new(labcomm_fd_reader_new(
labcomm_default_memory, fd, 1),
NULL, labcomm_default_memory);
if (!decoder) {
printf("Failed to allocate decoder %s:%d\n", __FUNCTION__, __LINE__);
return 1;
......
......@@ -12,7 +12,9 @@ int main(int argc, char *argv[]) {
char *filename = argv[1];
printf("C encoder writing to %s\n", filename);
fd = open(filename, O_WRONLY|O_CREAT|O_TRUNC, 0644);
encoder = labcomm_encoder_new(labcomm_fd_writer_new(fd, 1), NULL);
encoder = labcomm_encoder_new(labcomm_fd_writer_new(
labcomm_default_memory, fd, 1),
NULL, labcomm_default_memory);
labcomm_encoder_register_simple_theTwoInts(encoder);
labcomm_encoder_register_simple_anotherTwoInts(encoder);
labcomm_encoder_register_simple_IntString(encoder);
......
......@@ -42,11 +42,11 @@ gen/introspecting.o: introspecting.h
gen/introspecting.o: gen/introspecting_messages.h
gen/client.o: decimating.h
gen/client.o: gen/types.h
gen/client: gen/types.o
gen/client: gen/decimating.o
gen/client: gen/decimating_messages.o
gen/client: gen/introspecting.o
gen/client: gen/introspecting_messages.o
gen/client: gen/types.o
gen/server: gen/types.o
gen/server: gen/decimating.o
gen/server: gen/decimating_messages.o
......
......@@ -53,6 +53,8 @@ static void *run_decoder(void *context)
struct labcomm_decoder *decoder = context;
int result;
labcomm_decoder_register_types_Sum(decoder, handle_Sum, NULL);
labcomm_decoder_register_types_Diff(decoder, handle_Diff, NULL);
do {
result = labcomm_decoder_decode_one(decoder);
} while (result >= 0);
......@@ -112,34 +114,35 @@ int main(int argc, char *argv[])
nodelay = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
lock = labcomm_pthread_mutex_lock_new();
decimating = decimating_new(labcomm_fd_reader_new(fd, 1),
labcomm_fd_writer_new(fd, 0),
lock);
lock = labcomm_pthread_mutex_lock_new(labcomm_default_memory);
decimating = decimating_new(labcomm_fd_reader_new(labcomm_default_memory,
fd, 1),
labcomm_fd_writer_new(labcomm_default_memory,
fd, 0),
lock,
labcomm_default_memory);
if (decimating == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
introspecting = introspecting_new(decimating->reader,
decimating->writer,
lock);
lock,
labcomm_default_memory);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
decoder = labcomm_decoder_new(introspecting->reader, lock);
encoder = labcomm_encoder_new(introspecting->writer, lock);
decoder = labcomm_decoder_new(introspecting->reader, lock,
labcomm_default_memory);
encoder = labcomm_encoder_new(introspecting->writer, lock,
labcomm_default_memory);
pthread_t rdt;
labcomm_decoder_register_types_Sum(decoder, handle_Sum, NULL);
labcomm_decoder_register_types_Diff(decoder, handle_Diff, NULL);
pthread_create(&rdt, NULL, run_decoder, decoder);
labcomm_encoder_register_types_A(encoder);
labcomm_encoder_register_types_B(encoder);
labcomm_encoder_register_types_Terminate(encoder);
usleep(100000);
err = labcomm_decoder_ioctl_types_Sum(decoder, SET_DECIMATION, 2);
err = labcomm_decoder_ioctl_types_Diff(decoder, SET_DECIMATION, 4);
......@@ -151,7 +154,7 @@ int main(int argc, char *argv[])
sleep(1);
}
}
labcomm_encode_types_Terminate(encoder, NULL);
labcomm_encode_types_Terminate(encoder, LABCOMM_VOID);
out:
return 0;
......
......@@ -29,8 +29,8 @@
struct decimating_private {
struct decimating decimating;
struct labcomm_memory *memory;
struct labcomm_lock *lock;
struct labcomm_encoder *encoder;
int encoder_initialized;
struct labcomm_reader_action_context reader_action_context;
struct labcomm_writer_action_context writer_action_context;
......@@ -48,7 +48,8 @@ static void set_decimation(
struct decimating_private *decimating = context;
struct decimation *decimation;
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->decimation,
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->decimation,
struct decimation,
value->signature_index);
decimation->n = value->decimation;
......@@ -92,7 +93,7 @@ static int wrap_reader_ioctl(
decimation.signature_index = signature_index;
va_end(va);
return labcomm_encode_decimating_messages_set_decimation(
decimating->encoder, &decimation);
decimating->decimating.writer->encoder, &decimation);
} else {
return labcomm_reader_ioctl(r, action_context->next,
signature_index, signature, action, args);
......@@ -123,11 +124,8 @@ static int wrap_writer_alloc(
labcomm_encoder_enqueue enqueue)
{
int result;
struct decimating_private *decimating = action_context->context;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
/* Stash away encoder for later use */
decimating->encoder = encoder;
result = labcomm_writer_alloc(w, action_context->next,
encoder, labcomm_version, enqueue);
enqueue(encoder, register_signatures, NULL);
......@@ -144,7 +142,8 @@ static int wrap_writer_start(
struct decimating_private *decimating = action_context->context;
struct decimation *decimation;
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->decimation,
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->memory,
decimating->decimation,
struct decimation, index);
decimation->current++;
if (decimation->current < decimation->n) {
......@@ -168,7 +167,8 @@ struct labcomm_writer_action decimating_writer_action = {
extern struct decimating *decimating_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock)
struct labcomm_lock *lock,
struct labcomm_memory *memory)
{
struct decimating_private *result;
......@@ -194,8 +194,7 @@ extern struct decimating *decimating_new(
/* Init other fields */
result->lock = lock;
result->encoder = NULL;
result->encoder_initialized = 0;
result->memory = memory;
LABCOMM_SIGNATURE_ARRAY_INIT(result->decimation, struct decimation);
goto out_ok;
......
......@@ -12,7 +12,8 @@ struct decimating {
extern struct decimating *decimating_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock);
struct labcomm_lock *lock,
struct labcomm_memory *memory);
#define SET_DECIMATION LABCOMM_IOSW('d',0,int)
......
......@@ -27,19 +27,28 @@
#include "introspecting.h"
#include "gen/introspecting_messages.h"
enum status {unknown, unhandled, unregistered, registered};
struct introspecting_private {
struct introspecting introspecting;
struct labcomm_lock *lock;
struct labcomm_memory *memory;
struct labcomm_encoder *encoder;
int encoder_initialized;
struct labcomm_decoder *decoder;
int decoder_initialized;
struct labcomm_lock *lock;
struct labcomm_reader_action_context reader_action_context;
struct labcomm_writer_action_context writer_action_context;
LABCOMM_SIGNATURE_ARRAY_DEF(introspection,
struct introspection {
int has_got_response;
int has_handler;
LABCOMM_SIGNATURE_ARRAY_DEF(remote,
struct remote {
char *name;
int size;
uint8_t *signature;
});
LABCOMM_SIGNATURE_ARRAY_DEF(local,
struct local {
enum status status;
struct labcomm_signature *signature;
});
};
......@@ -48,16 +57,37 @@ static void handles_signature(
void * context)
{
fprintf(stderr, "### %s %x %s\n", __FUNCTION__, value->index, value->name);
/*
struct introspecting_private *introspecting = context;
struct introspection *introspection;
introspection = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->introspection,
struct introspection,
value->index);
introspection->has_handler = value->has_handler;
*/
struct remote *remote;
remote = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->remote,
struct remote,
value->index);
remote->name = strdup(value->name);
remote->signature = malloc(value->signature.n_0);
if (remote->signature) {
int i;
memcpy(remote->signature, value->signature.a, value->signature.n_0);
remote->size = value->signature.n_0;
LABCOMM_SIGNATURE_ARRAY_FOREACH(introspecting->local, struct local, i) {
struct local *l;
l = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->local, struct local, i);
if (l->signature &&
l->status == unhandled &&
l->signature->name &&
strcmp(remote->name, l->signature->name) == 0 &&
remote->size == l->signature->size &&
memcmp(l->signature->signature, remote->signature,
l->signature->size) == 0) {
fprintf(stderr, "OK %s %x %x\n", __FUNCTION__, value->index, i);
l->status = unregistered;
}
}
}
}
static int wrap_reader_alloc(
......@@ -87,10 +117,19 @@ static int wrap_reader_start(
void *value)
{
struct introspecting_private *introspecting = action_context->context;
if (value == NULL && introspecting->decoder_initialized) {
int result;
result = labcomm_reader_start(r, action_context->next, index,
signature, value);
if (value == NULL) {
introspecting_messages_handles_signature handles_signature;
labcomm_lock_acquire(introspecting->lock);
while (introspecting->encoder == NULL) {
/* Wait for the encoder to become functional */
labcomm_lock_wait(introspecting->lock, 1000000);
}
labcomm_lock_release(introspecting->lock);
handles_signature.index = index;
handles_signature.name = signature->name;
handles_signature.signature.n_0 = signature->size;
......@@ -98,7 +137,7 @@ static int wrap_reader_start(
labcomm_encode_introspecting_messages_handles_signature(
introspecting->encoder, &handles_signature);
}
return labcomm_reader_start(r, action_context->next, index, signature, value);
return result;
}
void encode_handles_signature(
......@@ -146,7 +185,10 @@ static int wrap_writer_alloc(
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
/* Stash away encoder for later use */
labcomm_lock_acquire(introspecting->lock);
introspecting->encoder = encoder;
labcomm_lock_notify(introspecting->lock);
labcomm_lock_release(introspecting->lock);
result = labcomm_writer_alloc(w, action_context->next,
encoder, labcomm_version, enqueue);
enqueue(encoder, register_signatures, NULL);
......@@ -159,9 +201,39 @@ static int wrap_writer_start(
int index, struct labcomm_signature *signature,
void *value)
{
// struct introspecting_private *introspecting = action_context->context;
fprintf(stderr, "%s %p\n", __FUNCTION__, value);
struct introspecting_private *introspecting = action_context->context;
struct local *local;
fprintf(stderr, "%s %x %s\n", __FUNCTION__, index, signature->name);
local = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->local,
struct local,
index);
if (local->signature == NULL) {
local->signature = signature;
}
if (local->status == unknown) {
int i;
int found = 0;
LABCOMM_SIGNATURE_ARRAY_FOREACH(introspecting->remote, struct remote, i) {
struct remote *r;
r = LABCOMM_SIGNATURE_ARRAY_REF(introspecting->memory,
introspecting->remote, struct remote, i);
if (r->name &&
strcmp(signature->name, r->name) == 0 &&
r->size == signature->size &&
memcmp(signature->signature, r->signature, signature->size) == 0) {
fprintf(stderr, "OK %s %x %x\n", __FUNCTION__, index, i);
found = i;
}
}
if (found == 0) {
local->status = unhandled;
}
fprintf(stderr, "Found: %d\n", found);
}
return labcomm_writer_start(w, action_context->next, index, signature, value);
}
......@@ -177,7 +249,8 @@ struct labcomm_writer_action introspecting_writer_action = {
extern struct introspecting *introspecting_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock)
struct labcomm_lock *lock,
struct labcomm_memory *memory)
{
struct introspecting_private *result;
......@@ -202,12 +275,14 @@ extern struct introspecting *introspecting_new(
result->introspecting.writer = writer;
/* Init other fields */
result->lock = lock;
result->memory = memory;
result->encoder = NULL;
result->encoder_initialized = 0;
result->decoder = NULL;
result->decoder_initialized = 0;
result->lock = lock;
LABCOMM_SIGNATURE_ARRAY_INIT(result->introspection, struct introspection);
LABCOMM_SIGNATURE_ARRAY_INIT(result->remote, struct remote);
LABCOMM_SIGNATURE_ARRAY_INIT(result->local, struct local);
goto out_ok;
......
......@@ -36,7 +36,8 @@ struct introspecting {
extern struct introspecting *introspecting_new(
struct labcomm_reader *reader,
struct labcomm_writer *writer,
struct labcomm_lock *lock);
struct labcomm_lock *lock,
struct labcomm_memory *memory);
#define HAS_SIGNATURE LABCOMM_IOS('i',2)
......
......@@ -68,13 +68,18 @@ static void handle_Terminate(types_Terminate *value, void *context)
exit(0);
}
static void *run_decoder(void *context)
static void *run_decoder(void *arg)
{
struct labcomm_decoder *decoder = context;
struct client *client = arg;
int result;
labcomm_decoder_register_types_A(client->decoder, handle_A, client);
labcomm_decoder_register_types_B(client->decoder, handle_B, client);
labcomm_decoder_register_types_Terminate(client->decoder, handle_Terminate,
NULL);
do {
result = labcomm_decoder_decode_one(decoder);
result = labcomm_decoder_decode_one(client->decoder);
} while (result >= 0);
return NULL;
}
......@@ -89,30 +94,31 @@ static void *run_client(void *arg)
printf("Client start\n");
client->A = 0;
client->B = 0;
lock = labcomm_pthread_mutex_lock_new();
decimating = decimating_new(labcomm_fd_reader_new(client->fd, 1),
labcomm_fd_writer_new(client->fd, 0),
lock);
lock = labcomm_pthread_mutex_lock_new(labcomm_default_memory);
decimating = decimating_new(labcomm_fd_reader_new(labcomm_default_memory,
client->fd, 1),
labcomm_fd_writer_new(labcomm_default_memory,
client->fd, 0),
lock,
labcomm_default_memory);
if (decimating == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
introspecting = introspecting_new(decimating->reader,
decimating->writer,
lock);
lock,
labcomm_default_memory);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
client->decoder = labcomm_decoder_new(introspecting->reader, lock);
client->encoder = labcomm_encoder_new(introspecting->writer, lock);
client->decoder = labcomm_decoder_new(introspecting->reader, lock,
labcomm_default_memory);
client->encoder = labcomm_encoder_new(introspecting->writer, lock,
labcomm_default_memory);
pthread_t rdt;
labcomm_decoder_register_types_A(client->decoder, handle_A, client);
labcomm_decoder_register_types_B(client->decoder, handle_B, client);
labcomm_decoder_register_types_Terminate(client->decoder, handle_Terminate,
NULL);
pthread_create(&rdt, NULL, run_decoder, client->decoder);
pthread_create(&rdt, NULL, run_decoder, client);
labcomm_encoder_register_types_Sum(client->encoder);
labcomm_encoder_register_types_Diff(client->encoder);
pthread_join(rdt, NULL);
......
......@@ -8,6 +8,7 @@ LDFLAGS=-L.
LDLIBS_TEST=-lcunit -llabcomm -Tlabcomm.linkscript
OBJS= labcomm.o \
labcomm_memory.o labcomm_default_memory.o \
labcomm_dynamic_buffer_writer.o labcomm_fd_reader.o labcomm_fd_writer.o \
labcomm_pthread_mutex_lock.o
......
......@@ -43,6 +43,7 @@ struct labcomm_decoder {
void *context;
struct labcomm_reader *reader;
struct labcomm_lock *lock;
struct labcomm_memory *memory;
labcomm_error_handler_callback on_error;
labcomm_handle_new_datatype_callback on_new_datatype;
LABCOMM_SIGNATURE_ARRAY_DEF(local_to_remote, int);
......@@ -53,6 +54,7 @@ struct labcomm_encoder {
void *context;
struct labcomm_writer *writer;
struct labcomm_lock *lock;
struct labcomm_memory *memory;
struct labcomm_encoder *is_deferred;
int busy;
int waiting;
......@@ -347,13 +349,16 @@ static int get_encoder_index(
struct labcomm_encoder *labcomm_encoder_new(
struct labcomm_writer *writer,
struct labcomm_lock *lock)
struct labcomm_lock *lock,
struct labcomm_memory *memory)
{
struct labcomm_encoder *result = malloc(sizeof(*result));
struct labcomm_encoder *result;
result = labcomm_memory_alloc(memory, 0, sizeof(*result));
if (result) {
struct labcomm_encoder_context *context;
context = malloc(sizeof(*context));
context = labcomm_memory_alloc(memory, 0, sizeof(*context));
#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
context->sample = NULL;
context->index = LABCOMM_USER;
......@@ -362,12 +367,14 @@ struct labcomm_encoder *labcomm_encoder_new(
#endif
result->context = context;
result->writer = writer;
result->writer->encoder = result;
result->writer->data = NULL;
result->writer->data_size = 0;
result->writer->count = 0;
result->writer->pos = 0;
result->writer->error = 0;
result->lock = lock;
result->memory = memory;
result->is_deferred = NULL;
result->busy = 0;
result->waiting = 0;
......@@ -386,8 +393,7 @@ static int encoder_enqueue_action(
int result;
struct encoder_alloc_action *element, **next;
fprintf(stderr, "%s %p\n", __FUNCTION__, action);
element = malloc(sizeof(*action));
element = labcomm_memory_alloc(encoder->memory, 0, sizeof(*action));
if (element == NULL) {
result = -ENOMEM;
goto out;
......@@ -427,11 +433,10 @@ static struct labcomm_encoder *enter_encoder(struct labcomm_encoder *e)
while (p) {
struct encoder_alloc_action *tmp;
fprintf(stderr, "RUN %p", p->action);
p->action(&deferred, p->context);
tmp = p;
p = p->next;
free(tmp);
labcomm_memory_free(e->memory, 1, tmp);
}
}
}
......@@ -464,7 +469,8 @@ int labcomm_internal_encoder_register(
index = get_encoder_index(e, signature);
if (signature->type == LABCOMM_SAMPLE) {
if (index > 0) {
int *registered = LABCOMM_SIGNATURE_ARRAY_REF(e->registered, int, index);
int *registered = LABCOMM_SIGNATURE_ARRAY_REF(e->memory, e->registered,
int, index);
if (! *registered) {
int err;
......@@ -473,6 +479,8 @@ int labcomm_internal_encoder_register(
index, signature, NULL);
if (err == -EALREADY) {
result = 0;
} else if (err != 0) {
result = err;
} else if (err == 0) {
int i;
......@@ -525,24 +533,25 @@ no_end:
void labcomm_encoder_free(struct labcomm_encoder* e)
{
struct labcomm_encoder_context *context;
struct labcomm_memory * memory = e->memory;
context = (struct labcomm_encoder_context *) e->context;
labcomm_writer_free(e->writer, e->writer->action_context);
LABCOMM_SIGNATURE_ARRAY_FREE(e->registered, int);
LABCOMM_SIGNATURE_ARRAY_FREE(e->memory, e->registered, int);
#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
struct labcomm_sample_entry *entry = context->sample;
struct labcomm_sample_entry *entry_next;
while (entry != NULL) {
entry_next = entry->next;
free(entry);
labcomm_memory_free(memory, 0, entry);
entry = entry_next;
}
#else
free(context->by_section);
labcomm_memory_free(memory, 0, context->by_section);
#endif
free(e->context);
free(e);
labcomm_memory_free(memory, 0, e->context);
labcomm_memory_free(memory, 0, e);
}
int labcomm_encoder_ioctl(struct labcomm_encoder *encoder,
......@@ -629,7 +638,7 @@ static void collect_flat_signature(
for (i = 0 ; i < fields ; i++) {
char *name = labcomm_read_string(decoder->reader);
labcomm_write_string(writer, name);
free(name);
labcomm_memory_free(decoder->memory, 1, name);
collect_flat_signature(decoder, writer);