diff --git a/lib/c/labcomm_decoder.c b/lib/c/labcomm_decoder.c index 175a382440e0840ac2fb2450f6163154cc09d342..bdf5f43c114a17f406ad5f874974a82cb554ae1d 100644 --- a/lib/c/labcomm_decoder.c +++ b/lib/c/labcomm_decoder.c @@ -37,6 +37,7 @@ struct sample_entry { struct labcomm_decoder { struct labcomm_reader *reader; + int reader_allocated; struct labcomm_error_handler *error; struct labcomm_memory *memory; struct labcomm_scheduler *scheduler; @@ -63,6 +64,7 @@ struct labcomm_decoder *labcomm_decoder_new( result->reader->count = 0; result->reader->pos = 0; result->reader->error = 0; + result->reader_allocated = 0; result->error = error; result->memory = memory; result->scheduler = scheduler; @@ -264,10 +266,11 @@ static void call_handler(void *value, void *context) labcomm_reader_end(wrap->reader, wrap->reader->action_context); } -static void reader_alloc(struct labcomm_reader *reader) +static void reader_alloc(struct labcomm_decoder *d) { - if (reader->data == NULL) { - labcomm_reader_alloc(reader, reader->action_context, + if (!d->reader_allocated) { + d->reader_allocated = 1; + labcomm_reader_alloc(d->reader, d->reader->action_context, LABCOMM_VERSION); } } @@ -276,7 +279,7 @@ int labcomm_decoder_decode_one(struct labcomm_decoder *d) { int result, remote_index; - reader_alloc(d->reader); + reader_alloc(d); remote_index = labcomm_read_packed32(d->reader); if (d->reader->error < 0) { result = d->reader->error; @@ -377,7 +380,7 @@ int labcomm_internal_decoder_register( int local_index; struct sample_entry *entry; - reader_alloc(d->reader); + reader_alloc(d); local_index = labcomm_signature_local_index(signature); if (local_index <= 0) { goto out; } labcomm_reader_start(d->reader, d->reader->action_context, diff --git a/lib/c/labcomm_encoder.c b/lib/c/labcomm_encoder.c index ba02ff1b93bc48b82555e6a0e41df61080c858a2..77ccc5ab8296c4eb0bc0bb4c83f093c59f40a4c1 100644 --- a/lib/c/labcomm_encoder.c +++ b/lib/c/labcomm_encoder.c @@ -55,6 +55,8 @@ struct labcomm_encoder *labcomm_encoder_new( result->memory = memory; result->scheduler = scheduler; LABCOMM_SIGNATURE_ARRAY_INIT(result->registered, int); + labcomm_writer_alloc(result->writer, + result->writer->action_context, LABCOMM_VERSION); } return result; } @@ -78,9 +80,6 @@ int labcomm_internal_encoder_register( index = labcomm_signature_local_index(signature); labcomm_scheduler_writer_lock(e->scheduler); - if (e->writer->data == NULL) { - labcomm_writer_alloc(e->writer,e->writer->action_context, LABCOMM_VERSION); - } if (signature->type != LABCOMM_SAMPLE) { goto out; } if (index <= 0) { goto out; } done = LABCOMM_SIGNATURE_ARRAY_REF(e->memory, e->registered, int, index); @@ -118,9 +117,6 @@ int labcomm_internal_encode( index = labcomm_signature_local_index(signature); labcomm_scheduler_writer_lock(e->scheduler); - if (e->writer->data == NULL) { - labcomm_writer_alloc(e->writer,e->writer->action_context, LABCOMM_VERSION); - } result = labcomm_writer_start(e->writer, e->writer->action_context, index, signature, value); if (result == -EALREADY) { result = 0; goto no_end; } @@ -162,61 +158,11 @@ int labcomm_internal_encoder_ioctl(struct labcomm_encoder *encoder, uint32_t action, va_list va) { int result = -ENOTSUP; - + int index; + + index = labcomm_signature_local_index(signature); result = labcomm_writer_ioctl(encoder->writer, encoder->writer->action_context, - -1, signature, action, va); + index, signature, action, va); return result; } - - - -#if 0 -static struct labcomm_encoder *enter_encoder(struct labcomm_encoder *e) -{ - if (e->is_deferred) { - return e->is_deferred; - } else { - labcomm_lock_acquire(e->lock); - e->waiting++; - while (e->busy) { labcomm_lock_wait(e->lock, 10000000); } - e->busy = 1; - labcomm_lock_release(e->lock); - - if (e->writer->data == NULL) { - labcomm_writer_alloc(e->writer,e->writer->action_context, - e, LABCOMM_VERSION, encoder_enqueue_action); - if (e->alloc_action) { - struct labcomm_encoder deferred; - struct encoder_alloc_action *p; - - deferred.is_deferred = e; - p = e->alloc_action; - e->alloc_action = NULL; - while (p) { - struct encoder_alloc_action *tmp; - - p->action(&deferred, p->context); - tmp = p; - p = p->next; - labcomm_memory_free(e->memory, 1, tmp); - } - } - } - } - return e; -} -static void leave_encoder(struct labcomm_encoder *e) -{ - if (!e->is_deferred) { - labcomm_lock_acquire(e->lock); { - e->busy = 0; - e->waiting--; - if (e->waiting) { - labcomm_lock_notify(e->lock); - } - } labcomm_lock_release(e->lock); - } -} - -#endif diff --git a/lib/c/labcomm_pthread_scheduler.c b/lib/c/labcomm_pthread_scheduler.c index 52ebed5dd05f717b5238ba876e2cb01b40bf7b5f..7bb809cdd36e9126afec329b9f55a255030c6f69 100644 --- a/lib/c/labcomm_pthread_scheduler.c +++ b/lib/c/labcomm_pthread_scheduler.c @@ -48,6 +48,7 @@ struct pthread_scheduler { pthread_mutex_t writer_mutex; pthread_mutex_t data_mutex; pthread_cond_t data_cond; + int running_deferred; struct pthread_deferred deferred; struct pthread_deferred deferred_with_delay; }; @@ -137,6 +138,7 @@ static struct labcomm_time_action time_action = { static int run_action(struct pthread_scheduler *scheduler, struct pthread_deferred *element) { + /* Called with data_lock held */ element->prev->next = element->next; element->next->prev = element->prev; labcomm_scheduler_data_unlock(&scheduler->scheduler); @@ -148,6 +150,9 @@ static int run_action(struct pthread_scheduler *scheduler, static int run_deferred(struct pthread_scheduler *scheduler) { + /* Called with data_lock held */ + if (scheduler->running_deferred) { goto out; } + scheduler->running_deferred = 1; while (!queue_empty(&scheduler->deferred)) { run_action(scheduler, scheduler->deferred.next); } @@ -160,6 +165,8 @@ static int run_deferred(struct pthread_scheduler *scheduler) run_action(scheduler, scheduler->deferred_with_delay.next); } } + scheduler->running_deferred = 0; +out: return 0; } @@ -359,6 +366,7 @@ struct labcomm_scheduler *labcomm_pthread_scheduler_new( if (pthread_cond_init(&scheduler->data_cond, NULL) != 0) { goto destroy_data_mutex; } + scheduler->running_deferred = 0; scheduler->deferred.next = &scheduler->deferred; scheduler->deferred.prev = &scheduler->deferred; scheduler->deferred_with_delay.next = &scheduler->deferred_with_delay;