Commit 9b3081f9 authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Added locking to labcomm_internal_encoder_register and

labcomm_internal_encode, this triggered lots of changes to handling of 
stacked writer allocation. Twoway example is not finished yet.
parent 1112f3ba
......@@ -339,7 +339,7 @@ aspect C_Declarations {
}
public void SampleDecl.C_emitDecoderDeclaration(C_env env) {
env.println("void labcomm_decoder_register_" +
env.println("int labcomm_decoder_register_" +
env.prefix + getName() + "(");
env.indent();
env.println("struct labcomm_decoder *d,");
......@@ -366,7 +366,7 @@ aspect C_Declarations {
}
public void SampleDecl.C_emitEncoderDeclaration(C_env env) {
env.println("void labcomm_encoder_register_" +
env.println("int labcomm_encoder_register_" +
env.prefix + getName() + "(");
env.indent();
env.println("struct labcomm_encoder *e);");
......@@ -617,7 +617,7 @@ aspect C_Decoder {
}
public void SampleDecl.C_emitDecoderRegisterHandler(C_env env) {
env.println("void labcomm_decoder_register_" +
env.println("int labcomm_decoder_register_" +
env.prefix + getName() + "(");
env.indent();
env.println("struct labcomm_decoder *d,");
......@@ -632,7 +632,7 @@ aspect C_Decoder {
env.println(")");
env.println("{");
env.indent();
env.println("labcomm_internal_decoder_register(");
env.println("return labcomm_internal_decoder_register(");
env.indent();
env.println("d,");
env.println("&labcomm_signature_" + env.prefix + getName() + ",");
......@@ -804,7 +804,7 @@ aspect C_Encoder {
}
public void SampleDecl.C_emitEncoderRegisterHandler(C_env env) {
env.println("void labcomm_encoder_register_" +
env.println("int labcomm_encoder_register_" +
env.prefix + getName() + "(");
env.indent();
env.println("struct labcomm_encoder *e");
......@@ -812,7 +812,7 @@ aspect C_Encoder {
env.println(")");
env.println("{");
env.indent();
env.println("labcomm_internal_encoder_register(");
env.println("return labcomm_internal_encoder_register(");
env.indent();
env.println("e,");
env.println("&labcomm_signature_" + env.prefix + getName() + ",");
......
......@@ -28,7 +28,7 @@ gen/client: client.c
$(CC) -o $@ $(CFLAGS) $^ -lpthread \
-L../../lib/c -llabcomm -Tlabcomm.linkscript
gen/server: server.c gen/types.o gen/decimating.o
gen/server: server.c
$(CC) -o $@ $(CFLAGS) $^ -lpthread \
-L../../lib/c -llabcomm -Tlabcomm.linkscript
......@@ -38,9 +38,17 @@ clean:
gen/decimating.o: decimating.h
gen/decimating.o: gen/decimating_messages.h
gen/introspecting.o: introspecting.h
gen/introspecting.o: gen/introspecting_messages.h
gen/client.o: decimating.h
gen/client.o: gen/types.h
gen/client: gen/types.o
gen/client: gen/decimating.o
gen/client: gen/decimating_messages.o
gen/client: gen/types.o
gen/client: gen/introspecting.o
gen/client: gen/introspecting_messages.o
gen/server: gen/types.o
gen/server: gen/decimating.o
gen/server: gen/decimating_messages.o
gen/server: gen/introspecting.o
gen/server: gen/introspecting_messages.o
/*
client.c -- LabComm example of using stacked readers/writers.
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <arpa/inet.h>
#include <linux/tcp.h>
#include <netdb.h>
......@@ -14,6 +35,7 @@
#include <labcomm_fd_writer.h>
#include <labcomm_pthread_mutex_lock.h>
#include "decimating.h"
#include "introspecting.h"
#include "gen/types.h"
static void handle_Sum(int32_t *value, void *context)
......@@ -47,6 +69,7 @@ int main(int argc, char *argv[])
struct sockaddr_in to;
int nodelay;
struct decimating *decimating;
struct introspecting *introspecting;
char *hostname;
int port;
struct labcomm_lock *lock;
......@@ -94,10 +117,18 @@ int main(int argc, char *argv[])
labcomm_fd_writer_new(fd, 0),
lock);
if (decimating == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
introspecting = introspecting_new(decimating->reader,
decimating->writer,
lock);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
decoder = labcomm_decoder_new(decimating->reader, lock);
encoder = labcomm_encoder_new(decimating->writer, lock);
decoder = labcomm_decoder_new(introspecting->reader, lock);
encoder = labcomm_encoder_new(introspecting->writer, lock);
pthread_t rdt;
labcomm_decoder_register_types_Sum(decoder, handle_Sum, NULL);
......
......@@ -29,10 +29,9 @@
struct decimating_private {
struct decimating decimating;
struct labcomm_lock *lock;
struct labcomm_encoder *encoder;
int encoder_initialized;
struct labcomm_decoder *decoder;
int decoder_initialized;
struct labcomm_reader_action_context reader_action_context;
struct labcomm_writer_action_context writer_action_context;
LABCOMM_SIGNATURE_ARRAY_DEF(decimation,
......@@ -62,26 +61,17 @@ static int wrap_reader_alloc(
struct labcomm_decoder *decoder,
char *labcomm_version)
{
int result;
struct decimating_private *decimating = action_context->context;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
/* Stash away decoder for later use */
decimating->decoder = decoder;
return labcomm_reader_alloc(r, action_context->next,
result = labcomm_reader_alloc(r, action_context->next,
decoder, labcomm_version);
}
static int wrap_reader_start(
struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context)
{
struct decimating_private *decimating = action_context->context;
if (! decimating->decoder_initialized) {
decimating->decoder_initialized = 1;
labcomm_decoder_register_decimating_messages_set_decimation(
decimating->decoder, set_decimation, decimating);
}
return labcomm_reader_start(r, action_context->next);
decoder, set_decimation, decimating);
return result;
}
static int wrap_reader_ioctl(
......@@ -113,40 +103,47 @@ static int wrap_reader_ioctl(
struct labcomm_reader_action decimating_reader_action = {
.alloc = wrap_reader_alloc,
.free = NULL,
.start = wrap_reader_start,
.start = NULL,
.end = NULL,
.fill = NULL,
.ioctl = wrap_reader_ioctl
};
static void register_signatures(struct labcomm_encoder *encoder,
void *context)
{
labcomm_encoder_register_decimating_messages_set_decimation(
encoder);
}
static int wrap_writer_alloc(
struct labcomm_writer *w,
struct labcomm_writer_action_context *action_context,
struct labcomm_encoder *encoder, char *labcomm_version)
struct labcomm_encoder *encoder, char *labcomm_version,
labcomm_encoder_enqueue enqueue)
{
int result;
struct decimating_private *decimating = action_context->context;
fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
/* Stash away encoder for later use */
decimating->encoder = encoder;
return labcomm_writer_alloc(w, action_context->next,
encoder, labcomm_version);
result = labcomm_writer_alloc(w, action_context->next,
encoder, labcomm_version, enqueue);
enqueue(encoder, register_signatures, NULL);
return result;
}
static int wrap_writer_start(
struct labcomm_writer *w,
struct labcomm_writer_action_context *action_context,
struct labcomm_encoder *encoder,
int index, struct labcomm_signature *signature,
void *value)
{
struct decimating_private *decimating = action_context->context;
struct decimation *decimation;
if (! decimating->encoder_initialized) {
decimating->encoder_initialized = 1;
labcomm_encoder_register_decimating_messages_set_decimation(
decimating->encoder);
}
decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->decimation,
struct decimation, index);
decimation->current++;
......@@ -155,7 +152,7 @@ static int wrap_writer_start(
} else {
decimation->current = 0;
return labcomm_writer_start(w, action_context->next,
encoder, index, signature, value);
index, signature, value);
}
}
......@@ -196,10 +193,9 @@ extern struct decimating *decimating_new(
result->decimating.writer = writer;
/* Init other fields */
result->lock = lock;
result->encoder = NULL;
result->encoder_initialized = 0;
result->decoder = NULL;
result->decoder_initialized = 0;
LABCOMM_SIGNATURE_ARRAY_INIT(result->decimation, struct decimation);
goto out_ok;
......
typedef struct {
int index;
byte signature[_];
} has_signature_request;
typedef struct {
int index;
boolean result;
} has_signature_response;
sample has_signature_request encoder_has_signature_request;
sample has_signature_response encoder_has_signature_response;
sample has_signature_request decoder_has_signature_request;
sample has_signature_response decoder_has_signature_response;
\ No newline at end of file
/*
server.c -- LabComm example of using stacked readers/writers.
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <arpa/inet.h>
#include <errno.h>
#include <pthread.h>
......@@ -10,6 +31,7 @@
#include <labcomm_fd_reader.h>
#include <labcomm_fd_writer.h>
#include "decimating.h"
#include "introspecting.h"
#include "gen/types.h"
struct client {
......@@ -61,6 +83,7 @@ static void *run_client(void *arg)
{
struct client *client = arg;
struct decimating *decimating;
struct introspecting *introspecting;
struct labcomm_lock *lock;
printf("Client start\n");
......@@ -74,8 +97,15 @@ static void *run_client(void *arg)
/* Warning: might leak reader and writer at this point */
goto out;
}
client->decoder = labcomm_decoder_new(decimating->reader, lock);
client->encoder = labcomm_encoder_new(decimating->writer, lock);
introspecting = introspecting_new(decimating->reader,
decimating->writer,
lock);
if (introspecting == NULL) {
/* Warning: might leak reader and writer at this point */
goto out;
}
client->decoder = labcomm_decoder_new(introspecting->reader, lock);
client->encoder = labcomm_encoder_new(introspecting->writer, lock);
pthread_t rdt;
labcomm_decoder_register_types_A(client->decoder, handle_A, client);
......
......@@ -2,4 +2,5 @@ sample int A;
sample int B;
sample int Sum;
sample int Diff;
sample int Product;
sample void Terminate;
\ No newline at end of file
......@@ -45,7 +45,7 @@ liblabcomm.a: $(OBJS)
ar -r liblabcomm.a $^
liblabcomm.so.1: $(OBJS:%.o=%.pic.o)
gcc -shared -Wl,-soname,$@ -o $@ $^ -lc
gcc -shared -Wl,-soname,$@ -o $@ $^ -lc -lrt
labcomm.o : labcomm.c labcomm.h labcomm_private.h
......
......@@ -53,6 +53,14 @@ struct labcomm_encoder {
void *context;
struct labcomm_writer *writer;
struct labcomm_lock *lock;
struct labcomm_encoder *is_deferred;
int busy;
int waiting;
struct encoder_alloc_action {
struct encoder_alloc_action *next;
void (*action)(struct labcomm_encoder *encoder, void *context);
void *context;
} *alloc_action;
labcomm_error_handler_callback on_error;
LABCOMM_SIGNATURE_ARRAY_DEF(registered, int);
};
......@@ -67,32 +75,60 @@ struct labcomm_sample_entry {
void *context;
};
#ifndef LABCOMM_ENCODER_LINEAR_SEARCH
extern struct labcomm_signature labcomm_first_signature;
extern struct labcomm_signature labcomm_last_signature;
#endif
struct labcomm_encoder_context {
#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
struct labcomm_sample_entry *sample;
int index;
#else
struct labcomm_sample_entry *by_section;
#endif
};
struct labcomm_decoder_context {
struct labcomm_sample_entry *sample;
};
/* Lock wrappers */
#define CONDCALL_lock(lock, ...) lock
#define CONDCALL(func, ...) \
if (CONDCALL_lock(__VA_ARGS__) && \
CONDCALL_lock(__VA_ARGS__)->action->func) { \
return CONDCALL_lock(__VA_ARGS__)->action->func(__VA_ARGS__); \
} \
return -ENOSYS;
int labcomm_lock_free(struct labcomm_lock *lock) {
CONDCALL(free, lock);
}
int labcomm_lock_acquire(struct labcomm_lock *lock)
{
CONDCALL(acquire, lock);
}
int labcomm_lock_release(struct labcomm_lock *lock)
{
CONDCALL(release, lock);
}
int labcomm_lock_wait(struct labcomm_lock *lock, useconds_t usec){
CONDCALL(wait, lock, usec);
}
int labcomm_lock_notify(struct labcomm_lock *lock)
{
CONDCALL(notify, lock);
}
#undef CONDCALL
#undef CONDCALL_lock
/* Unwrapping reader/writer functions */
#define UNWRAP_ac(func, rw, ac, ...) ac
#define UNWRAP_ac(rw, ac, ...) ac
#define UNWRAP(func, ...) \
while (1) { \
if (UNWRAP_ac(func, __VA_ARGS__)->action->func) { \
return UNWRAP_ac(func, __VA_ARGS__)->action->func(__VA_ARGS__); } \
if (UNWRAP_ac(func, __VA_ARGS__)->next == NULL) { return -ENOSYS; } \
UNWRAP_ac(func, __VA_ARGS__) = UNWRAP_ac(func, __VA_ARGS__)->next; \
if (UNWRAP_ac(__VA_ARGS__)->action->func) { \
return UNWRAP_ac(__VA_ARGS__)->action->func(__VA_ARGS__); } \
if (UNWRAP_ac(__VA_ARGS__)->next == NULL) { return -ENOSYS; } \
UNWRAP_ac( __VA_ARGS__) = UNWRAP_ac(__VA_ARGS__)->next; \
}
int labcomm_reader_alloc(struct labcomm_reader *r,
......@@ -110,9 +146,11 @@ int labcomm_reader_free(struct labcomm_reader *r,
}
int labcomm_reader_start(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context)
struct labcomm_reader_action_context *action_context,
int index, struct labcomm_signature *signature,
void *value)
{
UNWRAP(start, r, action_context);
UNWRAP(start, r, action_context, index, signature, value);
}
int labcomm_reader_end(struct labcomm_reader *r,
......@@ -139,9 +177,10 @@ int labcomm_reader_ioctl(struct labcomm_reader *r,
int labcomm_writer_alloc(struct labcomm_writer *w,
struct labcomm_writer_action_context *action_context,
struct labcomm_encoder *encoder,
char *labcomm_version)
char *labcomm_version,
labcomm_encoder_enqueue enqueue)
{
UNWRAP(alloc, w, action_context, encoder, labcomm_version);
UNWRAP(alloc, w, action_context, encoder, labcomm_version, enqueue);
}
int labcomm_writer_free(struct labcomm_writer *w,
......@@ -152,11 +191,10 @@ int labcomm_writer_free(struct labcomm_writer *w,
int labcomm_writer_start(struct labcomm_writer *w,
struct labcomm_writer_action_context *action_context,
struct labcomm_encoder *encoder,
int index, struct labcomm_signature *signature,
void *value)
{
UNWRAP(start, w, action_context, encoder, index, signature, value);
UNWRAP(start, w, action_context, index, signature, value);
}
int labcomm_writer_end(struct labcomm_writer *w,
......@@ -180,6 +218,8 @@ int labcomm_writer_ioctl(struct labcomm_writer *w,
UNWRAP(ioctl, w, action_context, index, signature, ioctl_action, args);
}
#undef UNWRAP
#undef UNWRAP_ac
......@@ -287,27 +327,6 @@ static struct labcomm_sample_entry *get_sample_by_index(
return p;
}
#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
static int get_encoder_index_by_search(
struct labcomm_encoder *e,
struct labcomm_signature *s)
{
int result = 0;
struct labcomm_encoder_context *context = e->context;
struct labcomm_sample_entry *sample = context->sample;
while (sample) {
if (sample->signature == s) { break; }
sample = sample->next;
}
if (sample) {
result = sample->index;
}
return result;
}
#else
static int get_local_index(
struct labcomm_signature *s)
{
......@@ -319,83 +338,12 @@ static int get_local_index(
return result;
}
#endif
static int get_encoder_index(
struct labcomm_encoder *e,
struct labcomm_signature *s)
{
#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
return get_encoder_index_by_search(e, s);
#else
return get_local_index(s);
#endif
}
static void labcomm_encode_signature(struct labcomm_encoder *e,
struct labcomm_signature *signature)
{
int i, index;
index = get_encoder_index(e, signature);
labcomm_writer_start(e->writer, e->writer->action_context,
e, index, signature, NULL);
labcomm_write_packed32(e->writer, signature->type);
labcomm_write_packed32(e->writer, index);
labcomm_write_string(e->writer, signature->name);
for (i = 0 ; i < signature->size ; i++) {
if (e->writer->pos >= e->writer->count) {
labcomm_writer_flush(e->writer, e->writer->action_context);
}
e->writer->data[e->writer->pos] = signature->signature[i];
e->writer->pos++;
}
labcomm_writer_end(e->writer, e->writer->action_context);
}
#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
static int encoder_add_signature_by_search(struct labcomm_encoder *e,
struct labcomm_signature *signature,
labcomm_encoder_function encode)
{
int result;
struct labcomm_encoder_context *context = e->context;
struct labcomm_sample_entry *sample;
sample = (struct labcomm_sample_entry *)malloc(sizeof(*sample));
if (sample == NULL) {
result = -ENOMEM;
} else {
sample->next = context->sample;
sample->index = context->index;
sample->signature = signature;
sample->encode = encode;
context->index++;
context->sample = sample;
result = sample->index;
}
return result;
}
#endif
/*
static struct labcomm_sample_entry *encoder_get_sample_by_signature_address(
struct labcomm_encoder *encoder,
struct labcomm_signature *s)
{
struct labcomm_sample_entry *result = NULL;
struct labcomm_encoder_context *context = encoder->context;
#ifndef LABCOMM_ENCODER_LINEAR_SEARCH
if (&labcomm_first_signature <= s && s <= &labcomm_last_signature) {
result = &context->by_section[s - &labcomm_first_signature];
}
#else
result = get_sample_by_signature_address(context->sample, s);
#endif
return result;
}
*/
struct labcomm_encoder *labcomm_encoder_new(
struct labcomm_writer *writer,
......@@ -414,58 +362,154 @@ struct labcomm_encoder *labcomm_encoder_new(
#endif
result->context = context;
result->writer = writer;
result->writer->data = 0;
result->writer->data = NULL;
result->writer->data_size = 0;
result->writer->count = 0;
result->writer->pos = 0;
result->writer->error = 0;
result->lock = lock;
result->is_deferred = NULL;
result->busy = 0;
result->waiting = 0;
result->alloc_action = NULL;
result->on_error = on_error_fprintf;
LABCOMM_SIGNATURE_ARRAY_INIT(result->registered, int);
labcomm_writer_alloc(result->writer,result->writer->action_context,
result, LABCOMM_VERSION);
}
return result;
}
void labcomm_internal_encoder_register(
struct labcomm_encoder *e,
static int encoder_enqueue_action(
struct labcomm_encoder *encoder,
void (*action)(struct labcomm_encoder *encoder, void *context),
void *context)
{
int result;
struct encoder_alloc_action *element, **next;
fprintf(stderr, "%s %p\n", __FUNCTION__, action);
element = malloc(sizeof(*action));
if (element == NULL) {
result = -ENOMEM;