Skip to content
Snippets Groups Projects
Commit d44882a8 authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Corrected allocation order in encoder and decoder.

Fixed recursion bug in scheduler.
parent 2270c9ec
No related branches found
No related tags found
No related merge requests found
......@@ -37,6 +37,7 @@ struct sample_entry {
struct labcomm_decoder {
struct labcomm_reader *reader;
int reader_allocated;
struct labcomm_error_handler *error;
struct labcomm_memory *memory;
struct labcomm_scheduler *scheduler;
......@@ -63,6 +64,7 @@ struct labcomm_decoder *labcomm_decoder_new(
result->reader->count = 0;
result->reader->pos = 0;
result->reader->error = 0;
result->reader_allocated = 0;
result->error = error;
result->memory = memory;
result->scheduler = scheduler;
......@@ -264,10 +266,11 @@ static void call_handler(void *value, void *context)
labcomm_reader_end(wrap->reader, wrap->reader->action_context);
}
static void reader_alloc(struct labcomm_reader *reader)
static void reader_alloc(struct labcomm_decoder *d)
{
if (reader->data == NULL) {
labcomm_reader_alloc(reader, reader->action_context,
if (!d->reader_allocated) {
d->reader_allocated = 1;
labcomm_reader_alloc(d->reader, d->reader->action_context,
LABCOMM_VERSION);
}
}
......@@ -276,7 +279,7 @@ int labcomm_decoder_decode_one(struct labcomm_decoder *d)
{
int result, remote_index;
reader_alloc(d->reader);
reader_alloc(d);
remote_index = labcomm_read_packed32(d->reader);
if (d->reader->error < 0) {
result = d->reader->error;
......@@ -377,7 +380,7 @@ int labcomm_internal_decoder_register(
int local_index;
struct sample_entry *entry;
reader_alloc(d->reader);
reader_alloc(d);
local_index = labcomm_signature_local_index(signature);
if (local_index <= 0) { goto out; }
labcomm_reader_start(d->reader, d->reader->action_context,
......
......@@ -55,6 +55,8 @@ struct labcomm_encoder *labcomm_encoder_new(
result->memory = memory;
result->scheduler = scheduler;
LABCOMM_SIGNATURE_ARRAY_INIT(result->registered, int);
labcomm_writer_alloc(result->writer,
result->writer->action_context, LABCOMM_VERSION);
}
return result;
}
......@@ -78,9 +80,6 @@ int labcomm_internal_encoder_register(
index = labcomm_signature_local_index(signature);
labcomm_scheduler_writer_lock(e->scheduler);
if (e->writer->data == NULL) {
labcomm_writer_alloc(e->writer,e->writer->action_context, LABCOMM_VERSION);
}
if (signature->type != LABCOMM_SAMPLE) { goto out; }
if (index <= 0) { goto out; }
done = LABCOMM_SIGNATURE_ARRAY_REF(e->memory, e->registered, int, index);
......@@ -118,9 +117,6 @@ int labcomm_internal_encode(
index = labcomm_signature_local_index(signature);
labcomm_scheduler_writer_lock(e->scheduler);
if (e->writer->data == NULL) {
labcomm_writer_alloc(e->writer,e->writer->action_context, LABCOMM_VERSION);
}
result = labcomm_writer_start(e->writer, e->writer->action_context,
index, signature, value);
if (result == -EALREADY) { result = 0; goto no_end; }
......@@ -162,61 +158,11 @@ int labcomm_internal_encoder_ioctl(struct labcomm_encoder *encoder,
uint32_t action, va_list va)
{
int result = -ENOTSUP;
int index;
index = labcomm_signature_local_index(signature);
result = labcomm_writer_ioctl(encoder->writer,
encoder->writer->action_context,
-1, signature, action, va);
index, signature, action, va);
return result;
}
#if 0
static struct labcomm_encoder *enter_encoder(struct labcomm_encoder *e)
{
if (e->is_deferred) {
return e->is_deferred;
} else {
labcomm_lock_acquire(e->lock);
e->waiting++;
while (e->busy) { labcomm_lock_wait(e->lock, 10000000); }
e->busy = 1;
labcomm_lock_release(e->lock);
if (e->writer->data == NULL) {
labcomm_writer_alloc(e->writer,e->writer->action_context,
e, LABCOMM_VERSION, encoder_enqueue_action);
if (e->alloc_action) {
struct labcomm_encoder deferred;
struct encoder_alloc_action *p;
deferred.is_deferred = e;
p = e->alloc_action;
e->alloc_action = NULL;
while (p) {
struct encoder_alloc_action *tmp;
p->action(&deferred, p->context);
tmp = p;
p = p->next;
labcomm_memory_free(e->memory, 1, tmp);
}
}
}
}
return e;
}
static void leave_encoder(struct labcomm_encoder *e)
{
if (!e->is_deferred) {
labcomm_lock_acquire(e->lock); {
e->busy = 0;
e->waiting--;
if (e->waiting) {
labcomm_lock_notify(e->lock);
}
} labcomm_lock_release(e->lock);
}
}
#endif
......@@ -48,6 +48,7 @@ struct pthread_scheduler {
pthread_mutex_t writer_mutex;
pthread_mutex_t data_mutex;
pthread_cond_t data_cond;
int running_deferred;
struct pthread_deferred deferred;
struct pthread_deferred deferred_with_delay;
};
......@@ -137,6 +138,7 @@ static struct labcomm_time_action time_action = {
static int run_action(struct pthread_scheduler *scheduler,
struct pthread_deferred *element)
{
/* Called with data_lock held */
element->prev->next = element->next;
element->next->prev = element->prev;
labcomm_scheduler_data_unlock(&scheduler->scheduler);
......@@ -148,6 +150,9 @@ static int run_action(struct pthread_scheduler *scheduler,
static int run_deferred(struct pthread_scheduler *scheduler)
{
/* Called with data_lock held */
if (scheduler->running_deferred) { goto out; }
scheduler->running_deferred = 1;
while (!queue_empty(&scheduler->deferred)) {
run_action(scheduler, scheduler->deferred.next);
}
......@@ -160,6 +165,8 @@ static int run_deferred(struct pthread_scheduler *scheduler)
run_action(scheduler, scheduler->deferred_with_delay.next);
}
}
scheduler->running_deferred = 0;
out:
return 0;
}
......@@ -359,6 +366,7 @@ struct labcomm_scheduler *labcomm_pthread_scheduler_new(
if (pthread_cond_init(&scheduler->data_cond, NULL) != 0) {
goto destroy_data_mutex;
}
scheduler->running_deferred = 0;
scheduler->deferred.next = &scheduler->deferred;
scheduler->deferred.prev = &scheduler->deferred;
scheduler->deferred_with_delay.next = &scheduler->deferred_with_delay;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment