From 9b3081f9b096a20fbb90b31b4ae05e5f6a14ee97 Mon Sep 17 00:00:00 2001
From: Anders Blomdell <anders.blomdell@control.lth.se>
Date: Thu, 13 Jun 2013 19:14:48 +0200
Subject: [PATCH] Added locking to labcomm_internal_encoder_register and
 labcomm_internal_encode, this triggered lots of changes to handling of
 stacked writer allocation. Twoway example is not finished yet.

---
 compiler/C_CodeGen.jrag                      |  12 +-
 examples/twoway/Makefile                     |  12 +-
 examples/twoway/client.c                     |  35 +-
 examples/twoway/decimating.c                 |  62 ++-
 examples/twoway/introspection_messages.lc    |  14 -
 examples/twoway/server.c                     |  34 +-
 examples/twoway/types.lc                     |   1 +
 lib/c/Makefile                               |   2 +-
 lib/c/labcomm.c                              | 515 +++++++++++--------
 lib/c/labcomm.h                              |   6 +
 lib/c/labcomm_dynamic_buffer_writer.c        |   4 +-
 lib/c/labcomm_fd_reader.c                    |  12 +-
 lib/c/labcomm_fd_writer.c                    |   4 +-
 lib/c/labcomm_ioctl.h                        |  11 +-
 lib/c/labcomm_private.h                      |  81 ++-
 lib/c/labcomm_pthread_mutex_lock.c           | 101 +++-
 lib/c/test/test_labcomm_generated_encoding.c |   4 +-
 17 files changed, 587 insertions(+), 323 deletions(-)
 delete mode 100644 examples/twoway/introspection_messages.lc

diff --git a/compiler/C_CodeGen.jrag b/compiler/C_CodeGen.jrag
index 428657f..786f5cc 100644
--- a/compiler/C_CodeGen.jrag
+++ b/compiler/C_CodeGen.jrag
@@ -339,7 +339,7 @@ aspect C_Declarations {
   }
 
   public void SampleDecl.C_emitDecoderDeclaration(C_env env) {
-    env.println("void labcomm_decoder_register_" + 
+    env.println("int labcomm_decoder_register_" + 
 		env.prefix + getName() + "(");
     env.indent();
     env.println("struct labcomm_decoder *d,");
@@ -366,7 +366,7 @@ aspect C_Declarations {
   }
 
   public void SampleDecl.C_emitEncoderDeclaration(C_env env) {
-    env.println("void labcomm_encoder_register_" + 
+    env.println("int labcomm_encoder_register_" + 
 		env.prefix + getName() + "(");
     env.indent();
     env.println("struct labcomm_encoder *e);");
@@ -617,7 +617,7 @@ aspect C_Decoder {
   }
 
   public void SampleDecl.C_emitDecoderRegisterHandler(C_env env) {
-    env.println("void labcomm_decoder_register_" + 
+    env.println("int labcomm_decoder_register_" + 
 		env.prefix + getName() + "(");
     env.indent();
     env.println("struct labcomm_decoder *d,");
@@ -632,7 +632,7 @@ aspect C_Decoder {
     env.println(")");
     env.println("{");
     env.indent();
-    env.println("labcomm_internal_decoder_register(");
+    env.println("return labcomm_internal_decoder_register(");
     env.indent();
     env.println("d,");
     env.println("&labcomm_signature_" + env.prefix + getName() + ",");
@@ -804,7 +804,7 @@ aspect C_Encoder {
   }
 
   public void SampleDecl.C_emitEncoderRegisterHandler(C_env env) {
-    env.println("void labcomm_encoder_register_" + 
+    env.println("int labcomm_encoder_register_" + 
 		env.prefix + getName() + "(");
     env.indent();
     env.println("struct labcomm_encoder *e");
@@ -812,7 +812,7 @@ aspect C_Encoder {
     env.println(")");
     env.println("{");
     env.indent();
-    env.println("labcomm_internal_encoder_register(");
+    env.println("return labcomm_internal_encoder_register(");
     env.indent();
     env.println("e,");
     env.println("&labcomm_signature_" + env.prefix + getName() + ",");
diff --git a/examples/twoway/Makefile b/examples/twoway/Makefile
index 646d31c..5689df1 100644
--- a/examples/twoway/Makefile
+++ b/examples/twoway/Makefile
@@ -28,7 +28,7 @@ gen/client: client.c
 	$(CC) -o $@ $(CFLAGS) $^ -lpthread \
 		-L../../lib/c -llabcomm -Tlabcomm.linkscript
 
-gen/server: server.c gen/types.o gen/decimating.o
+gen/server: server.c 
 	$(CC) -o $@ $(CFLAGS) $^ -lpthread \
 		-L../../lib/c -llabcomm -Tlabcomm.linkscript
 
@@ -38,9 +38,17 @@ clean:
 
 gen/decimating.o: decimating.h
 gen/decimating.o: gen/decimating_messages.h
+gen/introspecting.o: introspecting.h
+gen/introspecting.o: gen/introspecting_messages.h
 gen/client.o: decimating.h
 gen/client.o: gen/types.h
+gen/client: gen/types.o
 gen/client: gen/decimating.o 
 gen/client: gen/decimating_messages.o
-gen/client: gen/types.o
+gen/client: gen/introspecting.o
+gen/client: gen/introspecting_messages.o
+gen/server: gen/types.o
+gen/server: gen/decimating.o
 gen/server: gen/decimating_messages.o
+gen/server: gen/introspecting.o
+gen/server: gen/introspecting_messages.o
diff --git a/examples/twoway/client.c b/examples/twoway/client.c
index 27b1acf..ec373b1 100644
--- a/examples/twoway/client.c
+++ b/examples/twoway/client.c
@@ -1,3 +1,24 @@
+/*
+  client.c -- LabComm example of using stacked readers/writers.
+
+  Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
+
+  This file is part of LabComm.
+
+  LabComm is free software: you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation, either version 3 of the License, or
+  (at your option) any later version.
+
+  LabComm is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
 #include <arpa/inet.h>
 #include <linux/tcp.h>
 #include <netdb.h>
@@ -14,6 +35,7 @@
 #include <labcomm_fd_writer.h>
 #include <labcomm_pthread_mutex_lock.h>
 #include "decimating.h"
+#include "introspecting.h"
 #include "gen/types.h"
 
 static void handle_Sum(int32_t *value, void *context)
@@ -47,6 +69,7 @@ int main(int argc, char *argv[])
   struct sockaddr_in to;
   int nodelay;
   struct decimating *decimating;
+  struct introspecting *introspecting;
   char *hostname;
   int port;
   struct labcomm_lock *lock;
@@ -94,10 +117,18 @@ int main(int argc, char *argv[])
 			      labcomm_fd_writer_new(fd, 0),
 			      lock);
   if (decimating == NULL) {
+    /* Warning: might leak reader and writer at this point */
+    goto out;
+  }
+  introspecting = introspecting_new(decimating->reader,
+				    decimating->writer,
+				    lock);
+  if (introspecting == NULL) {
+    /* Warning: might leak reader and writer at this point */
     goto out;
   }
-  decoder = labcomm_decoder_new(decimating->reader, lock);
-  encoder = labcomm_encoder_new(decimating->writer, lock);
+  decoder = labcomm_decoder_new(introspecting->reader, lock);
+  encoder = labcomm_encoder_new(introspecting->writer, lock);
   pthread_t rdt;
   
   labcomm_decoder_register_types_Sum(decoder, handle_Sum, NULL);
diff --git a/examples/twoway/decimating.c b/examples/twoway/decimating.c
index e80e7ad..9ce2b34 100644
--- a/examples/twoway/decimating.c
+++ b/examples/twoway/decimating.c
@@ -29,10 +29,9 @@
 
 struct decimating_private {
   struct decimating decimating;
+  struct labcomm_lock *lock;
   struct labcomm_encoder *encoder;
   int encoder_initialized;
-  struct labcomm_decoder *decoder;
-  int decoder_initialized;
   struct labcomm_reader_action_context reader_action_context;
   struct labcomm_writer_action_context writer_action_context;
   LABCOMM_SIGNATURE_ARRAY_DEF(decimation, 
@@ -62,26 +61,17 @@ static int wrap_reader_alloc(
   struct labcomm_decoder *decoder,
   char *labcomm_version)
 {
-  struct decimating_private *decimating = action_context->context;
-
-  /* Stash away decoder for later use */
-  decimating->decoder = decoder;
-  return labcomm_reader_alloc(r, action_context->next, 
-			      decoder, labcomm_version);
-}
+  int result;
 
-static int wrap_reader_start(
-  struct labcomm_reader *r, 
-  struct labcomm_reader_action_context *action_context)
-{
   struct decimating_private *decimating = action_context->context;
-
-  if (! decimating->decoder_initialized) {
-    decimating->decoder_initialized = 1;
-    labcomm_decoder_register_decimating_messages_set_decimation(
-      decimating->decoder, set_decimation, decimating);
-  }
-  return labcomm_reader_start(r, action_context->next);
+  
+  fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
+  /* Stash away decoder for later use */
+  result = labcomm_reader_alloc(r, action_context->next, 
+				decoder, labcomm_version);
+  labcomm_decoder_register_decimating_messages_set_decimation(
+    decoder, set_decimation, decimating);
+  return result;
 }
 
 static int wrap_reader_ioctl(
@@ -113,40 +103,47 @@ static int wrap_reader_ioctl(
 struct labcomm_reader_action decimating_reader_action = {
   .alloc = wrap_reader_alloc,
   .free = NULL,
-  .start = wrap_reader_start,
+  .start = NULL,
   .end = NULL,
   .fill = NULL,
   .ioctl = wrap_reader_ioctl
 };
 
+static void register_signatures(struct labcomm_encoder *encoder,
+				void *context)
+{
+  labcomm_encoder_register_decimating_messages_set_decimation(
+    encoder);
+}
+
 static int wrap_writer_alloc(
   struct labcomm_writer *w, 
   struct labcomm_writer_action_context *action_context, 
-  struct labcomm_encoder *encoder, char *labcomm_version)
+  struct labcomm_encoder *encoder, char *labcomm_version,
+  labcomm_encoder_enqueue enqueue)
 {
+  int result;
   struct decimating_private *decimating = action_context->context;
 
+  fprintf(stderr, "%s %s\n", __FILE__, __FUNCTION__);
   /* Stash away encoder for later use */
   decimating->encoder = encoder;
-  return labcomm_writer_alloc(w, action_context->next,
-			      encoder, labcomm_version);
+  result = labcomm_writer_alloc(w, action_context->next,
+				encoder, labcomm_version, enqueue);
+  enqueue(encoder, register_signatures, NULL);
+
+  return result;
 }
 
 static int wrap_writer_start(
   struct labcomm_writer *w, 
   struct labcomm_writer_action_context *action_context, 
-  struct labcomm_encoder *encoder,
   int index, struct labcomm_signature *signature,
   void *value)
 {
   struct decimating_private *decimating = action_context->context;
   struct decimation *decimation;
 
-  if (! decimating->encoder_initialized) {
-    decimating->encoder_initialized = 1;
-    labcomm_encoder_register_decimating_messages_set_decimation(
-      decimating->encoder);
-  }
   decimation = LABCOMM_SIGNATURE_ARRAY_REF(decimating->decimation, 
 					   struct decimation, index);
   decimation->current++;
@@ -155,7 +152,7 @@ static int wrap_writer_start(
   } else {
     decimation->current = 0;
     return labcomm_writer_start(w, action_context->next,
-				encoder, index, signature, value);
+				index, signature, value);
   }
 }
 
@@ -196,10 +193,9 @@ extern struct decimating *decimating_new(
   result->decimating.writer = writer;
 
   /* Init other fields */
+  result->lock = lock;
   result->encoder = NULL;
   result->encoder_initialized = 0;
-  result->decoder = NULL;
-  result->decoder_initialized = 0;
   LABCOMM_SIGNATURE_ARRAY_INIT(result->decimation, struct decimation);
 
   goto out_ok;
diff --git a/examples/twoway/introspection_messages.lc b/examples/twoway/introspection_messages.lc
deleted file mode 100644
index 8f14fb3..0000000
--- a/examples/twoway/introspection_messages.lc
+++ /dev/null
@@ -1,14 +0,0 @@
-typedef struct {
-  int index;
-  byte signature[_];
-} has_signature_request;
-
-typedef struct {
-  int index;
-  boolean result;
-} has_signature_response;
-
-sample has_signature_request encoder_has_signature_request;
-sample has_signature_response encoder_has_signature_response;
-sample has_signature_request decoder_has_signature_request;
-sample has_signature_response decoder_has_signature_response;
\ No newline at end of file
diff --git a/examples/twoway/server.c b/examples/twoway/server.c
index 2d22e7b..9a980cb 100644
--- a/examples/twoway/server.c
+++ b/examples/twoway/server.c
@@ -1,3 +1,24 @@
+/*
+  server.c -- LabComm example of using stacked readers/writers.
+
+  Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
+
+  This file is part of LabComm.
+
+  LabComm is free software: you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation, either version 3 of the License, or
+  (at your option) any later version.
+
+  LabComm is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
 #include <arpa/inet.h>
 #include <errno.h>
 #include <pthread.h>
@@ -10,6 +31,7 @@
 #include <labcomm_fd_reader.h>
 #include <labcomm_fd_writer.h>
 #include "decimating.h"
+#include "introspecting.h"
 #include "gen/types.h"
 
 struct client {
@@ -61,6 +83,7 @@ static void *run_client(void *arg)
 {
   struct client *client = arg;
   struct decimating *decimating;
+  struct introspecting *introspecting;
   struct labcomm_lock *lock;
 
   printf("Client start\n");
@@ -74,8 +97,15 @@ static void *run_client(void *arg)
     /* Warning: might leak reader and writer at this point */
     goto out;
   }
-  client->decoder = labcomm_decoder_new(decimating->reader, lock);
-  client->encoder = labcomm_encoder_new(decimating->writer, lock);
+  introspecting = introspecting_new(decimating->reader,
+				    decimating->writer,
+				    lock);
+  if (introspecting == NULL) {
+    /* Warning: might leak reader and writer at this point */
+    goto out;
+  }
+  client->decoder = labcomm_decoder_new(introspecting->reader, lock);
+  client->encoder = labcomm_encoder_new(introspecting->writer, lock);
   pthread_t rdt;
   
   labcomm_decoder_register_types_A(client->decoder, handle_A, client);
diff --git a/examples/twoway/types.lc b/examples/twoway/types.lc
index 58313c3..54c03ec 100644
--- a/examples/twoway/types.lc
+++ b/examples/twoway/types.lc
@@ -2,4 +2,5 @@ sample int A;
 sample int B;
 sample int Sum;
 sample int Diff;
+sample int Product;
 sample void Terminate;
\ No newline at end of file
diff --git a/lib/c/Makefile b/lib/c/Makefile
index 50325b6..bb68b64 100644
--- a/lib/c/Makefile
+++ b/lib/c/Makefile
@@ -45,7 +45,7 @@ liblabcomm.a: $(OBJS)
 	ar -r liblabcomm.a $^
 
 liblabcomm.so.1: $(OBJS:%.o=%.pic.o)
-	gcc -shared -Wl,-soname,$@ -o $@ $^ -lc
+	gcc -shared -Wl,-soname,$@ -o $@ $^ -lc -lrt
 
 labcomm.o : labcomm.c labcomm.h  labcomm_private.h
 
diff --git a/lib/c/labcomm.c b/lib/c/labcomm.c
index 7aec84c..c5e7bfb 100644
--- a/lib/c/labcomm.c
+++ b/lib/c/labcomm.c
@@ -53,6 +53,14 @@ struct labcomm_encoder {
   void *context;
   struct labcomm_writer *writer;
   struct labcomm_lock *lock;
+  struct labcomm_encoder *is_deferred;
+  int busy;
+  int waiting;
+  struct encoder_alloc_action {
+    struct encoder_alloc_action *next;
+    void (*action)(struct labcomm_encoder *encoder, void *context);
+    void *context;
+  } *alloc_action;
   labcomm_error_handler_callback on_error;
   LABCOMM_SIGNATURE_ARRAY_DEF(registered, int);
 };
@@ -67,32 +75,60 @@ struct labcomm_sample_entry {
   void *context;
 };
 
-#ifndef LABCOMM_ENCODER_LINEAR_SEARCH
 extern  struct labcomm_signature labcomm_first_signature;
 extern  struct labcomm_signature labcomm_last_signature;
-#endif
 
 struct labcomm_encoder_context {
-#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
-  struct labcomm_sample_entry *sample;
-  int index;
-#else
   struct labcomm_sample_entry *by_section;
-#endif
 };
 
 struct labcomm_decoder_context {
   struct labcomm_sample_entry *sample;
 };
 
+/* Lock wrappers */
+#define CONDCALL_lock(lock, ...) lock
+#define CONDCALL(func, ...)					\
+  if (CONDCALL_lock(__VA_ARGS__) &&				\
+      CONDCALL_lock(__VA_ARGS__)->action->func) {		\
+  return CONDCALL_lock(__VA_ARGS__)->action->func(__VA_ARGS__);	\
+  }								\
+  return -ENOSYS;
+
+int labcomm_lock_free(struct labcomm_lock *lock) {
+  CONDCALL(free, lock);
+}
+
+int labcomm_lock_acquire(struct labcomm_lock *lock)
+{
+  CONDCALL(acquire, lock);
+}
+
+int labcomm_lock_release(struct labcomm_lock *lock)
+{
+  CONDCALL(release, lock);
+}
+
+int labcomm_lock_wait(struct labcomm_lock *lock, useconds_t usec){
+  CONDCALL(wait, lock, usec);
+}
+
+int labcomm_lock_notify(struct labcomm_lock *lock)
+{
+  CONDCALL(notify, lock);
+}
+
+#undef CONDCALL
+#undef CONDCALL_lock
+
 /* Unwrapping reader/writer functions */
-#define UNWRAP_ac(func, rw, ac, ...) ac
+#define UNWRAP_ac(rw, ac, ...) ac
 #define UNWRAP(func, ...)	     \
   while (1) {								\
-    if (UNWRAP_ac(func, __VA_ARGS__)->action->func) {			\
-      return UNWRAP_ac(func, __VA_ARGS__)->action->func(__VA_ARGS__); }	\
-    if (UNWRAP_ac(func, __VA_ARGS__)->next == NULL) { return -ENOSYS; }	\
-    UNWRAP_ac(func, __VA_ARGS__) = UNWRAP_ac(func, __VA_ARGS__)->next;	\
+    if (UNWRAP_ac(__VA_ARGS__)->action->func) {				\
+      return UNWRAP_ac(__VA_ARGS__)->action->func(__VA_ARGS__); }	\
+    if (UNWRAP_ac(__VA_ARGS__)->next == NULL) { return -ENOSYS; }	\
+    UNWRAP_ac( __VA_ARGS__) = UNWRAP_ac(__VA_ARGS__)->next;		\
   }
 
 int labcomm_reader_alloc(struct labcomm_reader *r, 
@@ -110,9 +146,11 @@ int labcomm_reader_free(struct labcomm_reader *r,
 }
 
 int labcomm_reader_start(struct labcomm_reader *r, 
-                         struct labcomm_reader_action_context *action_context)
+                         struct labcomm_reader_action_context *action_context,
+			 int index, struct labcomm_signature *signature,
+			 void *value)
 {
-  UNWRAP(start, r, action_context);
+  UNWRAP(start, r, action_context, index, signature, value);
 }
 
 int labcomm_reader_end(struct labcomm_reader *r, 
@@ -139,9 +177,10 @@ int labcomm_reader_ioctl(struct labcomm_reader *r,
 int labcomm_writer_alloc(struct labcomm_writer *w, 
                          struct labcomm_writer_action_context *action_context, 
                          struct labcomm_encoder *encoder, 
-                         char *labcomm_version)
+                         char *labcomm_version,
+			 labcomm_encoder_enqueue enqueue)
 {
-  UNWRAP(alloc, w, action_context, encoder, labcomm_version);
+  UNWRAP(alloc, w, action_context, encoder, labcomm_version, enqueue);
 }
 
 int labcomm_writer_free(struct labcomm_writer *w, 
@@ -152,11 +191,10 @@ int labcomm_writer_free(struct labcomm_writer *w,
 
 int labcomm_writer_start(struct labcomm_writer *w, 
                          struct labcomm_writer_action_context *action_context,
-                         struct labcomm_encoder *encoder,
                          int index, struct labcomm_signature *signature,
                          void *value)
 {
-  UNWRAP(start, w, action_context, encoder, index, signature, value);
+  UNWRAP(start, w, action_context, index, signature, value);
 }
 
 int labcomm_writer_end(struct labcomm_writer *w, 
@@ -180,6 +218,8 @@ int labcomm_writer_ioctl(struct labcomm_writer *w,
   UNWRAP(ioctl, w, action_context, index, signature, ioctl_action, args);
 } 
 
+#undef UNWRAP
+#undef UNWRAP_ac
 
 
 
@@ -287,27 +327,6 @@ static struct labcomm_sample_entry *get_sample_by_index(
   return p;
 }
 
-#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
-
-static int get_encoder_index_by_search(
-  struct labcomm_encoder *e,
-  struct labcomm_signature *s)
-{
-  int result = 0;
-  struct labcomm_encoder_context *context = e->context;
-  struct labcomm_sample_entry *sample = context->sample;
-  while (sample) {
-    if (sample->signature == s) { break; }
-    sample = sample->next;
-  }
-  if (sample) {
-    result = sample->index;
-  }
-  return result;
-}
-
-#else
-
 static int get_local_index(
   struct labcomm_signature *s)
 {
@@ -319,84 +338,13 @@ static int get_local_index(
   return result;
 }
 
-#endif
 static int get_encoder_index(
   struct labcomm_encoder *e,
   struct labcomm_signature *s)
 {
-#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
-  return get_encoder_index_by_search(e, s);
-#else
   return get_local_index(s);
-#endif
-}
-
-static void labcomm_encode_signature(struct labcomm_encoder *e,
-				     struct labcomm_signature *signature) 
-{
-  int i, index;
-
-  index = get_encoder_index(e, signature);
-  labcomm_writer_start(e->writer, e->writer->action_context, 
-		       e, index, signature, NULL);
-  labcomm_write_packed32(e->writer, signature->type);
-  labcomm_write_packed32(e->writer, index);
-
-  labcomm_write_string(e->writer, signature->name);
-  for (i = 0 ; i < signature->size ; i++) {
-    if (e->writer->pos >= e->writer->count) {
-      labcomm_writer_flush(e->writer, e->writer->action_context);
-    }
-    e->writer->data[e->writer->pos] = signature->signature[i];
-    e->writer->pos++;
-  }
-  labcomm_writer_end(e->writer, e->writer->action_context);
 }
 
-#ifdef LABCOMM_ENCODER_LINEAR_SEARCH
-static int encoder_add_signature_by_search(struct labcomm_encoder *e,
-					   struct labcomm_signature *signature,
-					   labcomm_encoder_function encode)
-{
-  int result;
-  struct labcomm_encoder_context *context = e->context;
-  struct labcomm_sample_entry *sample;
-
-  sample = (struct labcomm_sample_entry *)malloc(sizeof(*sample));
-  if (sample == NULL) {
-    result = -ENOMEM;
-  } else {
-    sample->next = context->sample;
-    sample->index = context->index;
-    sample->signature = signature;
-    sample->encode = encode;
-    context->index++;
-    context->sample = sample;
-    result = sample->index;
-  }
-  return result;
-}
-#endif
-
-/*
-static struct labcomm_sample_entry *encoder_get_sample_by_signature_address(
-  struct labcomm_encoder *encoder,
-  struct labcomm_signature *s)
-{
-  struct labcomm_sample_entry *result = NULL;
-  struct labcomm_encoder_context *context = encoder->context;
-  
-#ifndef LABCOMM_ENCODER_LINEAR_SEARCH
-  if (&labcomm_first_signature <= s && s <= &labcomm_last_signature) {
-    result = &context->by_section[s - &labcomm_first_signature];
-  }
-#else
-  result = get_sample_by_signature_address(context->sample, s);
-#endif
-  return result;
-}
-*/
-						    
 struct labcomm_encoder *labcomm_encoder_new(
   struct labcomm_writer *writer,
   struct labcomm_lock *lock)
@@ -414,58 +362,154 @@ struct labcomm_encoder *labcomm_encoder_new(
 #endif
     result->context = context;
     result->writer = writer;
-    result->writer->data = 0;
+    result->writer->data = NULL;
     result->writer->data_size = 0;
     result->writer->count = 0;
     result->writer->pos = 0;
     result->writer->error = 0;
     result->lock = lock;
+    result->is_deferred = NULL;
+    result->busy = 0;
+    result->waiting = 0;
+    result->alloc_action = NULL;
     result->on_error = on_error_fprintf;
     LABCOMM_SIGNATURE_ARRAY_INIT(result->registered, int);
-    labcomm_writer_alloc(result->writer,result->writer->action_context,
-			 result, LABCOMM_VERSION);
   }
   return result;
 }
 
-void labcomm_internal_encoder_register(
-  struct labcomm_encoder *e,
+static int encoder_enqueue_action(
+  struct labcomm_encoder *encoder, 
+  void (*action)(struct labcomm_encoder *encoder, void *context),
+  void *context)
+{
+  int result;
+  struct encoder_alloc_action *element, **next;
+
+  fprintf(stderr, "%s %p\n", __FUNCTION__, action);
+  element = malloc(sizeof(*action));
+  if (element == NULL) {
+    result = -ENOMEM;
+    goto out;
+  }
+  element->next = NULL;
+  element->action = action;
+  element->context = context;
+  for (next = &encoder->alloc_action ; *next ; next = &(*next)->next) {
+  }
+  *next = element;
+  result = 0;
+out:
+  return result;
+}
+
+static struct labcomm_encoder *enter_encoder(struct labcomm_encoder *e)
+{
+  if (e->is_deferred) {
+    return e->is_deferred;
+  } else {
+    labcomm_lock_acquire(e->lock); 
+    e->waiting++;
+    while (e->busy) { labcomm_lock_wait(e->lock, 10000000); }
+    e->busy = 1;
+    labcomm_lock_release(e->lock);
+    
+    if (e->writer->data == NULL) {
+      labcomm_writer_alloc(e->writer,e->writer->action_context,
+			   e, LABCOMM_VERSION, encoder_enqueue_action);
+      if (e->alloc_action) {
+	struct labcomm_encoder deferred;
+	struct encoder_alloc_action *p;
+
+	deferred.is_deferred = e;
+	p = e->alloc_action;
+	e->alloc_action = NULL;
+	while (p) {
+	  struct encoder_alloc_action *tmp;
+
+	  fprintf(stderr, "RUN %p", p->action);
+	  p->action(&deferred, p->context);
+	  tmp = p;
+	  p = p->next;
+	  free(tmp);
+	}
+      }
+    }
+  }
+  return e;
+}
+static void leave_encoder(struct labcomm_encoder *e)
+{
+  if (!e->is_deferred) {
+    labcomm_lock_acquire(e->lock); {
+      e->busy = 0;
+      e->waiting--;
+      if (e->waiting) {
+	labcomm_lock_notify(e->lock);
+      }
+    } labcomm_lock_release(e->lock);
+  }
+}
+
+int labcomm_internal_encoder_register(
+  struct labcomm_encoder *encoder,
   struct labcomm_signature *signature,
   labcomm_encoder_function encode)
 {
+  int result = -EINVAL;
+  struct labcomm_encoder *e;
+  int index;
+
+  e = enter_encoder(encoder);
+  index = get_encoder_index(e, signature);
   if (signature->type == LABCOMM_SAMPLE) {
-    int index = get_local_index(signature);
     if (index > 0) {
       int *registered = LABCOMM_SIGNATURE_ARRAY_REF(e->registered, int, index);
       if (! *registered) {
-	struct labcomm_ioctl_register_signature ioctl_data;
 	int err;
-
+	
 	*registered = 1;	
-	ioctl_data.index = index;
-	ioctl_data.signature = signature;	
-	err = labcomm_encoder_ioctl(e, LABCOMM_IOCTL_REGISTER_SIGNATURE,
-				    &ioctl_data);
-	if (err != 0) {
-	  labcomm_encode_signature(e, signature);
+	err = labcomm_writer_start(e->writer, e->writer->action_context, 
+				   index, signature, NULL);
+	if (err == -EALREADY) {
+	  result = 0;
+	} else if (err == 0) {
+	  int i;
+
+	  labcomm_write_packed32(e->writer, signature->type);
+	  labcomm_write_packed32(e->writer, index);
+	  labcomm_write_string(e->writer, signature->name);
+	  for (i = 0 ; i < signature->size ; i++) {
+	    if (e->writer->pos >= e->writer->count) {
+	      labcomm_writer_flush(e->writer, e->writer->action_context);
+	    }
+	    e->writer->data[e->writer->pos] = signature->signature[i];
+	    e->writer->pos++;
+	  }
+	  labcomm_writer_end(e->writer, e->writer->action_context);
+	  result = e->writer->error;
 	}
       }
     }
   }
+  leave_encoder(encoder);
+  return result;
 }
 
 int labcomm_internal_encode(
-  struct labcomm_encoder *e,
+  struct labcomm_encoder *encoder,
   struct labcomm_signature *signature,
   labcomm_encoder_function encode,
   void *value)
 {
   int result;
+  struct labcomm_encoder *e;
   int index;
 
+  e = enter_encoder(encoder);
   index = get_encoder_index(e, signature);
   result = labcomm_writer_start(e->writer, e->writer->action_context, 
-				    e, index, signature, value);
+				index, signature, value);
   if (result == -EALREADY) { result = 0; goto no_end; }
   if (result != 0) { goto out; }
   result = labcomm_write_packed32(e->writer, index);
@@ -474,6 +518,7 @@ int labcomm_internal_encode(
 out:
   labcomm_writer_end(e->writer, e->writer->action_context);
 no_end:
+  leave_encoder(encoder);
   return result;
 }
 
@@ -630,7 +675,7 @@ struct labcomm_decoder *labcomm_decoder_new(
   return result;
 }
 
-void labcomm_internal_decoder_register(
+int labcomm_internal_decoder_register(
   struct labcomm_decoder *d,
   struct labcomm_signature *signature,
   labcomm_decoder_function type_decoder,
@@ -651,11 +696,116 @@ void labcomm_internal_decoder_register(
   sample->decoder = type_decoder;
   sample->handler = handler;
   sample->context = handler_context;
+
+  return 0;
 }
 
-int labcomm_decoder_decode_one(struct labcomm_decoder *d)
+static int decode_typedef_or_sample(struct labcomm_decoder *d, int kind)
 {
   int result;
+  struct labcomm_decoder_context *context = d->context;
+
+  /* TODO: should the labcomm_dynamic_buffer_writer be 
+     a permanent part of labcomm_decoder? */
+  struct labcomm_writer_action_context action_context = {
+    .next = NULL,
+    .action = labcomm_dynamic_buffer_writer_action,
+    .context = NULL
+  };
+  struct labcomm_writer writer = {
+    .action_context = &action_context,
+    .data = NULL,
+    .data_size = 0,
+    .count = 0,
+    .pos = 0,
+    .error = 0,
+  };
+  struct labcomm_signature signature;
+  struct labcomm_sample_entry *entry = NULL;
+  int remote_index, err;
+      
+  labcomm_writer_alloc(&writer, writer.action_context, NULL, "", NULL);
+  labcomm_writer_start(&writer, writer.action_context, 0, NULL, NULL);
+  remote_index = labcomm_read_packed32(d->reader); //int
+  signature.name = labcomm_read_string(d->reader);
+  signature.type = kind;
+  collect_flat_signature(d, &writer);
+  labcomm_writer_end(&writer, writer.action_context);
+  err = writer_ioctl(&writer, 
+		     LABCOMM_IOCTL_WRITER_GET_BYTES_WRITTEN,
+		     &signature.size);
+  if (err < 0) {
+    printf("Failed to get size: %s\n", strerror(-err));
+    result = -ENOENT;
+    goto free_signature_name;
+  }
+  err = writer_ioctl(&writer, 
+		     LABCOMM_IOCTL_WRITER_GET_BYTE_POINTER,
+		     &signature.signature);
+  if (err < 0) {
+    printf("Failed to get pointer: %s\n", strerror(-err));
+    result = -ENOENT;
+    goto free_signature_name;
+  }
+  entry = get_sample_by_signature_value(context->sample, &signature);
+  if (! entry) {
+    fprintf(stderr, "%d %s\n", remote_index, signature.name);
+    /* Unknown datatype, bail out */
+    d->on_new_datatype(d, &signature);
+    result = -ENOENT;
+  } else if (entry->index && entry->index != remote_index) {
+    d->on_error(LABCOMM_ERROR_DEC_INDEX_MISMATCH, 5, 
+		"%s(): index mismatch '%s' (id=0x%x != 0x%x)\n", 
+		__FUNCTION__, signature.name, entry->index, remote_index);
+    result = -ENOENT;
+  } else {
+    int local_index;
+    int *local_to_remote, *remote_to_local;
+    // TODO unnessesary, since entry->index == index in above if statement
+    entry->index = remote_index;
+    local_index = get_local_index(entry->signature);
+    local_to_remote = LABCOMM_SIGNATURE_ARRAY_REF(d->local_to_remote, int,
+						  local_index);
+    *local_to_remote = remote_index;
+    remote_to_local = LABCOMM_SIGNATURE_ARRAY_REF(d->remote_to_local, int,
+						  remote_index);
+    *remote_to_local = local_index;
+    result = remote_index;
+    labcomm_reader_start(d->reader, d->reader->action_context,
+			 entry->index, entry->signature,
+			 NULL);
+    labcomm_reader_end(d->reader, d->reader->action_context);
+  }
+free_signature_name:
+  free(signature.name);
+  labcomm_writer_free(&writer, writer.action_context);
+  if (!entry) {
+    // No handler for found type, bail out (after cleanup)
+    result = -ENOENT;
+  }
+  return result;
+}
+
+struct call_handler_context {
+  struct labcomm_sample_entry *entry;
+  struct labcomm_reader *reader;
+};
+
+static void call_handler(void *value, void *context)
+{
+  struct call_handler_context *wrap = context;
+
+  labcomm_reader_start(wrap->reader, wrap->reader->action_context,
+		       wrap->entry->index, wrap->entry->signature,
+		       value);
+  wrap->entry->handler(value, wrap->entry->context);
+  labcomm_reader_end(wrap->reader, wrap->reader->action_context);
+}
+
+int labcomm_decoder_decode_one(struct labcomm_decoder *d)
+{
+  int result, index;
+  struct labcomm_decoder_context *context = d->context;
   
   if (d->reader->data == NULL) {
     result = labcomm_reader_alloc(d->reader, d->reader->action_context,
@@ -664,96 +814,33 @@ int labcomm_decoder_decode_one(struct labcomm_decoder *d)
       goto out;
     }
   }
-  result = labcomm_reader_start(d->reader, d->reader->action_context);
-  if (result > 0) {
-    struct labcomm_decoder_context *context = d->context;
+  index = labcomm_read_packed32(d->reader);
+  if (d->reader->error < 0) {
+    result = d->reader->error;
+    goto out;
+  }
+  if (index == LABCOMM_TYPEDEF || index == LABCOMM_SAMPLE) {
+    result = decode_typedef_or_sample(d, index); 
+  } else {
+    struct labcomm_sample_entry *entry;
     
-    result = labcomm_read_packed32(d->reader);
-    if (result == LABCOMM_TYPEDEF || result == LABCOMM_SAMPLE) {
-      /* TODO: should the labcomm_dynamic_buffer_writer be 
-	 a permanent part of labcomm_decoder? */
-      struct labcomm_writer_action_context action_context = {
-	.next = NULL,
-	.action = labcomm_dynamic_buffer_writer_action,
-	.context = NULL
-      };
-      struct labcomm_writer writer = {
-	.action_context = &action_context,
-	.data = NULL,
-	.data_size = 0,
-	.count = 0,
-	.pos = 0,
-	.error = 0,
-      };
-      struct labcomm_signature signature;
-      struct labcomm_sample_entry *entry = NULL;
-      int index, err;
-      
-      labcomm_writer_alloc(&writer, writer.action_context, NULL, "");
-      labcomm_writer_start(&writer, writer.action_context, NULL, 0, NULL, NULL);
-      index = labcomm_read_packed32(d->reader); //int
-      signature.name = labcomm_read_string(d->reader);
-      signature.type = result;
-      collect_flat_signature(d, &writer);
-      labcomm_writer_end(&writer, writer.action_context);
-      err = writer_ioctl(&writer, 
-			 LABCOMM_IOCTL_WRITER_GET_BYTES_WRITTEN,
-			 &signature.size);
-      if (err < 0) {
-	printf("Failed to get size: %s\n", strerror(-err));
-	goto free_signature_name;
-      }
-      err = writer_ioctl(&writer, 
-			 LABCOMM_IOCTL_WRITER_GET_BYTE_POINTER,
-			 &signature.signature);
-      if (err < 0) {
-	printf("Failed to get pointer: %s\n", strerror(-err));
-	goto free_signature_name;
-      }
-      entry = get_sample_by_signature_value(context->sample, &signature);
-      if (! entry) {
-	/* Unknown datatype, bail out */
-	d->on_new_datatype(d, &signature);
-      } else if (entry->index && entry->index != index) {
-	d->on_error(LABCOMM_ERROR_DEC_INDEX_MISMATCH, 5, 
-		    "%s(): index mismatch '%s' (id=0x%x != 0x%x)\n", 
-		    __FUNCTION__, signature.name, entry->index, index);
-      } else {
-	int local_index;
-	int *local_to_remote, *remote_to_local;
-	// TODO unnessesary, since entry->index == index in above if statement
-	entry->index = index;
-	local_index = get_local_index(entry->signature);
-	local_to_remote = LABCOMM_SIGNATURE_ARRAY_REF(d->local_to_remote, int,
-						      local_index);
-	remote_to_local = LABCOMM_SIGNATURE_ARRAY_REF(d->remote_to_local, int,
-						      index);
-	*local_to_remote = index;
-	*remote_to_local = local_index;
-      }
-    free_signature_name:
-      free(signature.name);
-      labcomm_writer_free(&writer, writer.action_context);
-      if (!entry) {
-	// No handler for found type, bail out (after cleanup)
-	result = -ENOENT;
-      }
+    entry = get_sample_by_index(context->sample, index);
+    if (!entry) {
+      // printf("Error: %s: type not found (id=0x%x)\n",
+      //__FUNCTION__, result);
+      d->on_error(LABCOMM_ERROR_DEC_TYPE_NOT_FOUND, 3, 
+		  "%s(): type not found (id=0x%x)\n", 
+		  __FUNCTION__, index);
+      result = -ENOENT;
     } else {
-      struct labcomm_sample_entry *entry;
-      
-      entry = get_sample_by_index(context->sample, result);
-      if (!entry) {
-	// printf("Error: %s: type not found (id=0x%x)\n",
-	//__FUNCTION__, result);
-	d->on_error(LABCOMM_ERROR_DEC_TYPE_NOT_FOUND, 3, 
-		    "%s(): type not found (id=0x%x)\n", __FUNCTION__, result);
-	result = -ENOENT;
-      } else {
-	entry->decoder(d->reader, entry->handler, entry->context);
-      }
+      struct call_handler_context wrap = {
+	.entry = entry,
+	.reader = d->reader
+      };
+      entry->decoder(d->reader, call_handler, &wrap);
+      result = index;
     }
   }
-  labcomm_reader_end(d->reader, d->reader->action_context);
 out:
   return result;
 }
diff --git a/lib/c/labcomm.h b/lib/c/labcomm.h
index f7b273c..1e4f303 100644
--- a/lib/c/labcomm.h
+++ b/lib/c/labcomm.h
@@ -84,6 +84,12 @@ void labcomm_decoder_register_new_datatype_handler(struct labcomm_decoder *d,
  */
 struct labcomm_lock;
 
+int labcomm_lock_free(struct labcomm_lock *lock);
+int labcomm_lock_lock(struct labcomm_lock *lock);
+int labcomm_lock_unlock(struct labcomm_lock *lock);
+int labcomm_lock_wait(struct labcomm_lock *lock, useconds_t usec);
+int labcomm_lock_notify_all(struct labcomm_lock *lock);
+
 /*
  * Decoder
  */
diff --git a/lib/c/labcomm_dynamic_buffer_writer.c b/lib/c/labcomm_dynamic_buffer_writer.c
index cca4df2..f3619b0 100644
--- a/lib/c/labcomm_dynamic_buffer_writer.c
+++ b/lib/c/labcomm_dynamic_buffer_writer.c
@@ -30,7 +30,8 @@
 static int dyn_alloc(struct labcomm_writer *w, 
 		     struct labcomm_writer_action_context *action_context,
 		     struct labcomm_encoder *encoder,
-		     char *labcomm_version)
+		     char *labcomm_version,
+		     labcomm_encoder_enqueue enqueue)
 {
   w->data_size = 1000;
   w->count = w->data_size;
@@ -59,7 +60,6 @@ static int dyn_free(struct labcomm_writer *w,
 
 static int dyn_start(struct labcomm_writer *w, 
 		     struct labcomm_writer_action_context *action_context,
-		     struct labcomm_encoder *encoder,
 		     int index,
 		     struct labcomm_signature *signature,
 		     void *value)
diff --git a/lib/c/labcomm_fd_reader.c b/lib/c/labcomm_fd_reader.c
index 154b0a8..70c1df9 100644
--- a/lib/c/labcomm_fd_reader.c
+++ b/lib/c/labcomm_fd_reader.c
@@ -112,15 +112,11 @@ static int fd_fill(struct labcomm_reader *r,
 }
 
 static int fd_start(struct labcomm_reader *r,
-		    struct labcomm_reader_action_context *action_context)
+		    struct labcomm_reader_action_context *action_context,
+		    int index, struct labcomm_signature *signature,
+		    void *value)
 {
-  int available;
-
-  available = r->count - r->pos;
-  if (available == 0) {
-    available = fd_fill(r, action_context);
-  }
-  return available;
+  return 0;
 }
 
 static int fd_end(struct labcomm_reader *r, 
diff --git a/lib/c/labcomm_fd_writer.c b/lib/c/labcomm_fd_writer.c
index 1ff8b22..476052c 100644
--- a/lib/c/labcomm_fd_writer.c
+++ b/lib/c/labcomm_fd_writer.c
@@ -42,7 +42,8 @@ static int fd_flush(struct labcomm_writer *w,
 static int fd_alloc(struct labcomm_writer *w, 
 		    struct labcomm_writer_action_context *action_context, 
 		    struct labcomm_encoder *encoder,
-		    char *version)
+		    char *version,
+		    labcomm_encoder_enqueue enqueue)
 {
   w->data = malloc(BUFFER_SIZE);
   if (! w->data) {
@@ -82,7 +83,6 @@ static int fd_free(struct labcomm_writer *w,
 
 static int fd_start(struct labcomm_writer *w, 
 		    struct labcomm_writer_action_context *action_context,
-		    struct labcomm_encoder *encoder,
 		    int index,
 		    struct labcomm_signature *signature,
 		    void *value)
diff --git a/lib/c/labcomm_ioctl.h b/lib/c/labcomm_ioctl.h
index d332314..6f35ffe 100644
--- a/lib/c/labcomm_ioctl.h
+++ b/lib/c/labcomm_ioctl.h
@@ -81,16 +81,9 @@
 #define LABCOMM_IOSWN(type,nr,nargs)					\
   LABCOMM_IOC(LABCOMM_IOC_USESIG,LABCOMM_IOC_WRITE,type,nr,nargs)
 
-struct labcomm_ioctl_register_signature {
-  int index;
-  struct labcomm_signature *signature;
-};
-
-#define LABCOMM_IOCTL_REGISTER_SIGNATURE \
-  LABCOMM_IOW(0,1,struct labcomm_ioctl_register_signature)
 #define LABCOMM_IOCTL_WRITER_GET_BYTES_WRITTEN \
-  LABCOMM_IOR(0,2,int)
+  LABCOMM_IOR(0,1,int)
 #define LABCOMM_IOCTL_WRITER_GET_BYTE_POINTER \
-  LABCOMM_IOR(0,3,void*)
+  LABCOMM_IOR(0,2,void*)
 
 #endif
diff --git a/lib/c/labcomm_private.h b/lib/c/labcomm_private.h
index 3755cc4..8632dc0 100644
--- a/lib/c/labcomm_private.h
+++ b/lib/c/labcomm_private.h
@@ -33,6 +33,7 @@
 #include <stdint.h>
 #include <stdlib.h>
 #include <string.h>
+#include <unistd.h>
 #include "labcomm.h"
 
 /*
@@ -69,17 +70,19 @@
 /*
  * Semi private lock declarations
  */
+struct labcomm_lock;
+
 struct labcomm_lock_action {
-  int (*alloc)(void *context);
-  int (*free)(void *context);
-  int (*read_lock)(void *context);
-  int (*read_unlock)(void *context);
-  int (*write_lock)(void *context);
-  int (*write_unlock)(void *context);
+  int (*free)(struct labcomm_lock *lock);
+  int (*acquire)(struct labcomm_lock *lock);
+  int (*release)(struct labcomm_lock *lock);
+  int (*wait)(struct labcomm_lock *lock, useconds_t usec);
+  int (*notify)(struct labcomm_lock *lock);
 };
 
 struct labcomm_lock {
-  const struct labcomm_lock_action action;
+  const struct labcomm_lock_action *action;
+  void *context;
 };
 
 /*
@@ -95,20 +98,41 @@ typedef void (*labcomm_decoder_function)(
 struct labcomm_reader_action_context;
 
 struct labcomm_reader_action {
+  /* 'alloc' is called at the first invocation of 'labcomm_decoder_decode_one' 
+     on the decoder containing the reader. If 'labcomm_version' != NULL
+     and non-empty the transport layer may use it to ensure that
+     compatible versions are used.
+
+     Returned value:
+       >  0    Number of bytes allocated for buffering
+       <= 0    Error
+  */
   int (*alloc)(struct labcomm_reader *r, 
 	       struct labcomm_reader_action_context *action_context, 
 	       struct labcomm_decoder *decoder, char *labcomm_version);
+  /* 'free' returns the resources claimed by 'alloc' and might have other
+     reader specific side-effects as well.
+
+     Returned value:
+       == 0    Success
+       != 0    Error
+  */
   int (*free)(struct labcomm_reader *r, 
 	      struct labcomm_reader_action_context *action_context);
+  /* 'start' is called right after a sample has arrived. In the case of 
+     a sample or typedef, 'value' == NULL.
+   */
   int (*start)(struct labcomm_reader *r, 
-	       struct labcomm_reader_action_context *action_context);
+	       struct labcomm_reader_action_context *action_context,
+	       int index, struct labcomm_signature *signature,
+	       void *value);
   int (*end)(struct labcomm_reader *r, 
 	     struct labcomm_reader_action_context *action_context);
   int (*fill)(struct labcomm_reader *r, 
 	      struct labcomm_reader_action_context *action_context);
   int (*ioctl)(struct labcomm_reader *r, 
 	       struct labcomm_reader_action_context *action_context,
-	       int signature_index, struct labcomm_signature *signature, 
+	       int index, struct labcomm_signature *signature, 
 	       uint32_t ioctl_action, va_list args);
 };
 
@@ -134,22 +158,23 @@ int labcomm_reader_alloc(struct labcomm_reader *r,
 int labcomm_reader_free(struct labcomm_reader *r, 
 			struct labcomm_reader_action_context *action_context);
 int labcomm_reader_start(struct labcomm_reader *r, 
-			 struct labcomm_reader_action_context *action_context);
+			 struct labcomm_reader_action_context *action_context,
+			 int index, struct labcomm_signature *signature,
+			 void *value);
 int labcomm_reader_end(struct labcomm_reader *r, 
 		       struct labcomm_reader_action_context *action_context);
 int labcomm_reader_fill(struct labcomm_reader *r, 
 			struct labcomm_reader_action_context *action_context);
 int labcomm_reader_ioctl(struct labcomm_reader *r, 
 			 struct labcomm_reader_action_context *action_context,
-			 int signature_index, 
-			 struct labcomm_signature *signature, 
+			 int index, struct labcomm_signature *signature, 
 			 uint32_t ioctl_action, va_list args);
 
 /*
  * Non typesafe registration function to be called from
  * generated labcomm_decoder_register_* functions.
  */
-void labcomm_internal_decoder_register(
+int labcomm_internal_decoder_register(
   struct labcomm_decoder *d, 
   struct labcomm_signature *s, 
   labcomm_decoder_function decoder,
@@ -248,17 +273,31 @@ static inline char *labcomm_read_string(struct labcomm_reader *r)
 typedef int (*labcomm_encoder_function)(
   struct labcomm_writer *,
   void *value);
+typedef int (*labcomm_encoder_enqueue)(
+  struct labcomm_encoder *encoder, 
+  void (*action)(struct labcomm_encoder *encoder, 
+		 void *context),
+  void *context);
 struct labcomm_writer_action_context;
 
 struct labcomm_writer_action {
   int (*alloc)(struct labcomm_writer *w, 
 	       struct labcomm_writer_action_context *action_context, 
-	       struct labcomm_encoder *encoder, char *labcomm_version);
+	       struct labcomm_encoder *encoder, char *labcomm_version,
+	       labcomm_encoder_enqueue enqueue);
   int (*free)(struct labcomm_writer *w, 
 	      struct labcomm_writer_action_context *action_context);
+  /* 'start' is called right before a sample is to be sent. In the 
+     case of a sample or typedef, 'value' == NULL.
+
+     Returned value:
+       == 0          Success -> continue sending the sample
+       == -EALREADY  Success -> silently skip sending the sample,
+                                'end' will not be called
+       < 0           Error
+   */
   int (*start)(struct labcomm_writer *w, 
 	       struct labcomm_writer_action_context *action_context,
-	       struct labcomm_encoder *encoder,
 	       int index, struct labcomm_signature *signature,
 	       void *value);
   int (*end)(struct labcomm_writer *w, 
@@ -267,7 +306,7 @@ struct labcomm_writer_action {
 	       struct labcomm_writer_action_context *action_context); 
   int (*ioctl)(struct labcomm_writer *w, 
 	       struct labcomm_writer_action_context *action_context, 
-	       int signature_index, struct labcomm_signature *signature, 
+	       int index, struct labcomm_signature *signature, 
 	       uint32_t ioctl_action, va_list args);
 };
 
@@ -289,12 +328,12 @@ struct labcomm_writer {
 int labcomm_writer_alloc(struct labcomm_writer *w, 
 			 struct labcomm_writer_action_context *action_context, 
 			 struct labcomm_encoder *encoder, 
-			 char *labcomm_version);
+			 char *labcomm_version,
+			 labcomm_encoder_enqueue enqueue);
 int labcomm_writer_free(struct labcomm_writer *w, 
 			struct labcomm_writer_action_context *action_context);
 int labcomm_writer_start(struct labcomm_writer *w, 
 			 struct labcomm_writer_action_context *action_context,
-			 struct labcomm_encoder *encoder,
 			 int index, struct labcomm_signature *signature,
 			 void *value);
 int labcomm_writer_end(struct labcomm_writer *w, 
@@ -303,11 +342,10 @@ int labcomm_writer_flush(struct labcomm_writer *w,
 			 struct labcomm_writer_action_context *action_context); 
 int labcomm_writer_ioctl(struct labcomm_writer *w, 
 			 struct labcomm_writer_action_context *action_context, 
-			 int signature_index, 
-			 struct labcomm_signature *signature, 
+			 int index, struct labcomm_signature *signature, 
 			 uint32_t ioctl_action, va_list args);
 
-void labcomm_internal_encoder_register(
+int labcomm_internal_encoder_register(
   struct labcomm_encoder *encoder, 
   struct labcomm_signature *signature, 
   labcomm_encoder_function encode);
@@ -318,7 +356,6 @@ int labcomm_internal_encode(
   labcomm_encoder_function encode,
   void *value);
 
-
 int labcomm_internal_encoder_ioctl(struct labcomm_encoder *encoder, 
 				   struct labcomm_signature *signature,
 				   uint32_t ioctl_action, va_list args);
diff --git a/lib/c/labcomm_pthread_mutex_lock.c b/lib/c/labcomm_pthread_mutex_lock.c
index f8c58ef..66c8ac3 100644
--- a/lib/c/labcomm_pthread_mutex_lock.c
+++ b/lib/c/labcomm_pthread_mutex_lock.c
@@ -1,15 +1,108 @@
+#include <errno.h>
+#include <pthread.h>
 #include "labcomm.h"
 #include "labcomm_private.h"
 
+struct labcomm_pthread_mutex_lock {
+  struct labcomm_lock lock;
+  pthread_mutex_t mutex;
+  pthread_cond_t cond;
+};
+
+int do_free(struct labcomm_lock *l)
+{
+  struct labcomm_pthread_mutex_lock *lock = l->context;
+  
+  pthread_cond_destroy(&lock->cond);
+  pthread_mutex_destroy(&lock->mutex);
+  free(lock);
+  return 0;
+}
+
+int do_acquire(struct labcomm_lock *l)
+{
+  struct labcomm_pthread_mutex_lock *lock = l->context;
+  
+  if (pthread_mutex_lock(&lock->mutex) != 0) {
+    return -errno;
+  }
+  return 0;
+}
+
+int do_release(struct labcomm_lock *l)
+{
+  struct labcomm_pthread_mutex_lock *lock = l->context;
+  
+  if (pthread_mutex_unlock(&lock->mutex) != 0) {
+    return -errno;
+  }
+  return 0;
+}
+
+int do_wait(struct labcomm_lock *l, useconds_t usec)
+{
+  struct labcomm_pthread_mutex_lock *lock = l->context;
+  
+  if (usec <= 0) {
+    if (pthread_cond_wait(&lock->cond, &lock->mutex) != 0) {
+      return -errno;
+    }
+  } else {
+    struct timespec abstime;
+    time_t sec = usec / 1000000;
+    long nsec = (usec % 1000000) * 1000;
+
+    clock_gettime(CLOCK_REALTIME, &abstime);
+    abstime.tv_nsec += nsec;
+    abstime.tv_sec += sec + abstime.tv_nsec / 1000000000;
+    abstime.tv_nsec %= 1000000000;
+    if (pthread_cond_timedwait(&lock->cond, &lock->mutex, &abstime) != 0) {
+      return -errno;
+    }
+  }
+  return 0;
+}
+
+int do_notify(struct labcomm_lock *l)
+{
+  struct labcomm_pthread_mutex_lock *lock = l->context;
+  if (pthread_cond_broadcast(&lock->cond) != 0) {
+    return -errno;
+  }
+  return 0;
+}
+
+static struct labcomm_lock_action action = {
+  .free = do_free,
+  .acquire = do_acquire,
+  .release = do_release,
+  .wait = do_wait,
+  .notify = do_notify
+};
+
 struct labcomm_lock *labcomm_pthread_mutex_lock_new()
 {
-  struct labcomm_lock *result;
+  struct labcomm_lock *result = NULL;
+  struct labcomm_pthread_mutex_lock *lock;
 
-  result = malloc(sizeof(*result));
-  if (result == NULL) {
+  lock = malloc(sizeof(*lock));
+  if (lock == NULL) {
     goto out;
   }
-  
+  if (pthread_mutex_init(&lock->mutex, NULL) != 0) {
+    goto free_lock;
+  }
+  if (pthread_cond_init(&lock->cond, NULL) != 0) {
+    goto destroy_mutex;
+  }
+  lock->lock.action = &action;
+  lock->lock.context = lock;
+  result = &lock->lock;
+  goto out;
+destroy_mutex:
+  pthread_mutex_destroy(&lock->mutex);
+free_lock:
+  free(lock);
 out:
   return result;
 
diff --git a/lib/c/test/test_labcomm_generated_encoding.c b/lib/c/test/test_labcomm_generated_encoding.c
index 773268e..32a5a99 100644
--- a/lib/c/test/test_labcomm_generated_encoding.c
+++ b/lib/c/test/test_labcomm_generated_encoding.c
@@ -36,7 +36,8 @@ static int buf_writer_alloc(
   struct labcomm_writer *w, 
   struct labcomm_writer_action_context *action_context,
   struct labcomm_encoder *encoder,
-  char *labcomm_version)
+  char *labcomm_version,
+  labcomm_encoder_enqueue enqueue)
 {
   writer = w; /* Hack */
   w->data_size = sizeof(buffer);
@@ -57,7 +58,6 @@ static int buf_writer_free(
 static int buf_writer_start(
   struct labcomm_writer *w,
   struct labcomm_writer_action_context *action_context,
-  struct labcomm_encoder *encoder,
   int index,
   struct labcomm_signature *signature,
   void *value)
-- 
GitLab