Skip to content
Snippets Groups Projects
Commit f00568a0 authored by Anders Blomdell's avatar Anders Blomdell
Browse files

First version of scheduler interface.

parent bb66c0de
No related branches found
No related tags found
No related merge requests found
......@@ -33,3 +33,4 @@ lib/c/test/test_labcomm_basic_type_encoding
lib/c/test/test_labcomm_generated_encoding
lib/java/se/lth/control/labcomm/WriterWrapper.class
lib/c/liblabcomm.so.1
lib/c/test/test_labcomm_pthread_scheduler
......@@ -5,18 +5,21 @@ CC=gcc
CFLAGS=-g -Wall -Werror -O3 -I. -Itest
LDFLAGS=-L.
#LDLIBS_TEST=-Tlabcomm.linkscript -lcunit -llabcomm
LDLIBS_TEST=-lcunit -llabcomm -Tlabcomm.linkscript
LDLIBS_TEST=-lcunit -llabcomm -Tlabcomm.linkscript -lrt
OBJS= labcomm.o \
labcomm_memory.o labcomm_default_memory.o \
OBJS=labcomm_memory.o labcomm_default_memory.o \
labcomm_time.o labcomm_scheduler.o \
labcomm.o \
labcomm_dynamic_buffer_writer.o labcomm_fd_reader.o labcomm_fd_writer.o \
labcomm_pthread_scheduler.o \
labcomm_pthread_mutex_lock.o
#FIXME: labcomm_mem_reader.o labcomm_mem_writer.o
LABCOMM_JAR=../../compiler/labComm.jar
LABCOMM=java -jar $(LABCOMM_JAR)
TESTS=test_labcomm_basic_type_encoding test_labcomm_generated_encoding
TESTS=test_labcomm_basic_type_encoding test_labcomm_generated_encoding \
test_labcomm_pthread_scheduler
#
#FIXME: test_labcomm test_labcomm_errors
TEST_DIR=test
......@@ -70,6 +73,7 @@ run-test-%: $(TEST_DIR)/% | $(TEST_DIR)
$(TEST_DIR)/%.o: $(TEST_DIR)/%.c
$(CC) $(CFLAGS) -o $@ -c $<
.PRECIOUS: $(TEST_DIR)/%
$(TEST_DIR)/%: $(TEST_DIR)/%.o liblabcomm.a
$(CC) $(LDFLAGS) -o $@ $^ $(LDLIBS) $(LDLIBS_TEST)
......
......@@ -90,6 +90,9 @@ int labcomm_lock_acquire(struct labcomm_lock *lock);
int labcomm_lock_release(struct labcomm_lock *lock);
int labcomm_lock_wait(struct labcomm_lock *lock, useconds_t usec);
int labcomm_lock_notify(struct labcomm_lock *lock);
int labcomm_lock_sleep_epoch(struct labcomm_lock *lock);
int labcomm_lock_sleep_add(struct labcomm_lock *lock, useconds_t usec);
int labcomm_lock_sleep(struct labcomm_lock *lock);
/*
* Dynamic memory handling
......
/*
labcomm_pthread_scheduler.c -- labcomm pthread based task coordination
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include "labcomm.h"
#include "labcomm_scheduler.h"
#include "labcomm_scheduler_private.h"
#include "labcomm_pthread_scheduler.h"
struct pthread_time {
struct labcomm_time time;
struct labcomm_memory *memory;
struct timespec abstime;
};
struct pthread_deferred {
struct pthread_deferred *next;
struct pthread_deferred *prev;
struct timespec when;
void (*action)(void *context);
void *context;
};
struct pthread_scheduler {
struct labcomm_scheduler scheduler;
struct labcomm_memory *memory;
int wakeup;
pthread_mutex_t writer_mutex;
pthread_mutex_t data_mutex;
pthread_cond_t data_cond;
struct pthread_deferred deferred;
struct pthread_deferred deferred_with_delay;
};
static struct labcomm_time_action time_action;
static int queue_empty(struct pthread_deferred *queue)
{
return queue->next == queue;
}
static void timespec_add_usec(struct timespec *t, useconds_t usec)
{
time_t sec = usec / 1000000;
long nsec = (usec % 1000000) * 1000;
t->tv_nsec += nsec;
t->tv_sec += sec + t->tv_nsec / 1000000000;
t->tv_nsec %= 1000000000;
}
static int timespec_compare(struct timespec *t1, struct timespec *t2)
{
if (t1->tv_sec == t2->tv_sec && t1->tv_nsec == t2->tv_nsec) {
return 0;
} else if (t1->tv_sec == 0 && t1->tv_nsec == 0) {
/* t1 is at end of time */
return 1;
} else if (t2->tv_sec == 0 && t2->tv_nsec == 0) {
/* t2 is at end of time */
return -1;
} else if (t1->tv_sec < t2->tv_sec) {
return -1;
} else if (t1->tv_sec == t2->tv_sec) {
if (t1->tv_nsec < t2->tv_nsec) {
return -1;
} else if (t1->tv_nsec == t2->tv_nsec) {
return 0;
} else {
return 1;
}
} else {
return 1;
}
}
static struct labcomm_time *time_new(struct labcomm_memory *memory)
{
struct pthread_time *time;
time = labcomm_memory_alloc(memory, 0, sizeof(*time));
if (time == NULL) {
return NULL;
} else {
time->time.action = &time_action;
time->time.context = time;
time->memory = memory;
clock_gettime(CLOCK_REALTIME, &time->abstime);
return &time->time;
}
}
static int time_free(struct labcomm_time *t)
{
struct pthread_time *time = t->context;
struct labcomm_memory *memory = time->memory;
labcomm_memory_free(memory, 0, time);
return 0;
}
static int time_add_usec(struct labcomm_time *t, useconds_t usec)
{
struct pthread_time *time = t->context;
timespec_add_usec(&time->abstime, usec);
return 0;
}
static struct labcomm_time_action time_action = {
.free = time_free,
.add_usec = time_add_usec
};
static int run_action(struct pthread_scheduler *scheduler,
struct pthread_deferred *element)
{
element->prev->next = element->next;
element->next->prev = element->prev;
labcomm_scheduler_data_unlock(&scheduler->scheduler);
element->action(element->context);
labcomm_memory_free(scheduler->memory, 1, element);
labcomm_scheduler_data_lock(&scheduler->scheduler);
return 0;
}
static int run_deferred(struct pthread_scheduler *scheduler)
{
while (!queue_empty(&scheduler->deferred)) {
run_action(scheduler, scheduler->deferred.next);
}
if (!queue_empty(&scheduler->deferred_with_delay)) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
while (timespec_compare(&scheduler->deferred_with_delay.next->when,
&now) <= 0) {
run_action(scheduler, scheduler->deferred_with_delay.next);
}
}
return 0;
}
static int scheduler_free(struct labcomm_scheduler *s)
{
struct pthread_scheduler *scheduler = s->context;
struct labcomm_memory *memory = scheduler->memory;
labcomm_memory_free(memory, 0, scheduler);
return 0;
}
static int scheduler_writer_lock(struct labcomm_scheduler *s)
{
struct pthread_scheduler *scheduler = s->context;
labcomm_scheduler_data_lock(&scheduler->scheduler);
run_deferred(scheduler); /* Run deferred tasks before taking lock */
labcomm_scheduler_data_unlock(&scheduler->scheduler);
if (pthread_mutex_lock(&scheduler->writer_mutex) != 0) {
return -errno;
}
return 0;
}
static int scheduler_writer_unlock(struct labcomm_scheduler *s)
{
struct pthread_scheduler *scheduler = s->context;
if (pthread_mutex_unlock(&scheduler->writer_mutex) != 0) {
return -errno;
}
labcomm_scheduler_data_lock(&scheduler->scheduler);
run_deferred(scheduler); /* Run deferred tasks after releasing lock */
labcomm_scheduler_data_unlock(&scheduler->scheduler);
return 0;
}
static int scheduler_data_lock(struct labcomm_scheduler *s)
{
struct pthread_scheduler *scheduler = s->context;
if (pthread_mutex_lock(&scheduler->data_mutex) != 0) {
perror("Failed to lock data_mutex");
exit(1);
}
return 0;
}
static int scheduler_data_unlock(struct labcomm_scheduler *s)
{
struct pthread_scheduler *scheduler = s->context;
if (pthread_mutex_unlock(&scheduler->data_mutex) != 0) {
perror("Failed to unlock data_mutex");
exit(1);
}
return 0;
}
static struct labcomm_time *scheduler_now(struct labcomm_scheduler *s)
{
struct pthread_scheduler *scheduler = s->context;
return time_new(scheduler->memory);
}
static int scheduler_sleep(struct labcomm_scheduler *s,
struct labcomm_time *t)
{
struct pthread_scheduler *scheduler = s->context;
struct pthread_time *time = t?t->context:NULL;
labcomm_scheduler_data_lock(&scheduler->scheduler);
while (1) {
struct timespec *wakeup, now;
/* Run deferred tasks before sleeping */
run_deferred(scheduler);
clock_gettime(CLOCK_REALTIME, &now);
if (scheduler->wakeup ||
(time && timespec_compare(&time->abstime, &now) <= 0)) {
/* Done waiting */
scheduler->wakeup = 0;
break;
}
wakeup = NULL;
if (!queue_empty(&scheduler->deferred_with_delay)) {
wakeup = &scheduler->deferred_with_delay.next->when;
if (time && timespec_compare(&time->abstime, wakeup) < 0) {
wakeup = &time->abstime;
}
} else if (time) {
wakeup = &time->abstime;
}
if (wakeup) {
pthread_cond_timedwait(&scheduler->data_cond,
&scheduler->data_mutex,
wakeup);
} else {
pthread_cond_wait(&scheduler->data_cond,
&scheduler->data_mutex);
}
}
labcomm_scheduler_data_unlock(&scheduler->scheduler);
return 0;
}
static int scheduler_wakeup(struct labcomm_scheduler *s)
{
struct pthread_scheduler *scheduler = s->context;
labcomm_scheduler_data_lock(&scheduler->scheduler);
scheduler->wakeup = 1;
pthread_cond_signal(&scheduler->data_cond);
labcomm_scheduler_data_unlock(&scheduler->scheduler);
return 0;
}
static int scheduler_enqueue(struct labcomm_scheduler *s,
useconds_t delay,
void (*deferred)(void *context),
void *context)
{
struct pthread_scheduler *scheduler = s->context;
int result = 0;
struct pthread_deferred *element, *insert_before;
element = labcomm_memory_alloc(scheduler->memory, 1, sizeof(*element));
if (element == NULL) {
result = -ENOMEM;
goto out;
}
element->action = deferred;
element->context = context;
labcomm_scheduler_data_lock(&scheduler->scheduler);
if (delay == 0) {
insert_before = &scheduler->deferred;
} else {
clock_gettime(CLOCK_REALTIME, &element->when);
timespec_add_usec(&element->when, delay);
for (insert_before = scheduler->deferred_with_delay.next ;
timespec_compare(&element->when, &insert_before->when) >= 0 ;
insert_before = insert_before->next) {
}
}
element->next = insert_before;
element->prev = insert_before->prev;
element->prev->next = element;
element->next->prev = element;
pthread_cond_signal(&scheduler->data_cond);
labcomm_scheduler_data_unlock(&scheduler->scheduler);
out:
return result;
}
const struct labcomm_scheduler_action scheduler_action = {
.free = scheduler_free,
.writer_lock = scheduler_writer_lock,
.writer_unlock = scheduler_writer_unlock,
.data_lock = scheduler_data_lock,
.data_unlock = scheduler_data_unlock,
.now = scheduler_now,
.sleep = scheduler_sleep,
.wakeup = scheduler_wakeup,
.enqueue = scheduler_enqueue
};
struct labcomm_scheduler *labcomm_pthread_scheduler_new(
struct labcomm_memory *memory)
{
struct labcomm_scheduler *result = NULL;
struct pthread_scheduler *scheduler;
scheduler = labcomm_memory_alloc(memory, 0, sizeof(*scheduler));
if (scheduler == NULL) {
goto out;
} else {
scheduler->scheduler.action = &scheduler_action;
scheduler->scheduler.context = scheduler;
scheduler->wakeup = 0;
scheduler->memory = memory;
if (pthread_mutex_init(&scheduler->writer_mutex, NULL) != 0) {
goto free_scheduler;
}
if (pthread_mutex_init(&scheduler->data_mutex, NULL) != 0) {
goto destroy_writer_mutex;
}
if (pthread_cond_init(&scheduler->data_cond, NULL) != 0) {
goto destroy_data_mutex;
}
scheduler->deferred.next = &scheduler->deferred;
scheduler->deferred.prev = &scheduler->deferred;
scheduler->deferred_with_delay.next = &scheduler->deferred_with_delay;
scheduler->deferred_with_delay.prev = &scheduler->deferred_with_delay;
scheduler->deferred_with_delay.when.tv_sec = 0;
scheduler->deferred_with_delay.when.tv_nsec = 0;
result = &scheduler->scheduler;
goto out;
}
destroy_data_mutex:
pthread_mutex_destroy(&scheduler->data_mutex);
destroy_writer_mutex:
pthread_mutex_destroy(&scheduler->writer_mutex);
free_scheduler:
labcomm_memory_free(memory, 0, scheduler);
out:
return result;
}
/*
labcomm_pthread_scheduler.h -- labcomm pthread based task coordination
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _LABCOMM_PTHREAD_SCHEDULER_H_
#define _LABCOMM_PTHREAD_SCHEDULER_H_
#include "labcomm.h"
struct labcomm_scheduler *labcomm_pthread_scheduler_new(
struct labcomm_memory *memory);
#endif
/*
labcomm_scheduler.c -- labcomm task coordination
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include "labcomm_scheduler_private.h"
#define SCHEDULER_scheduler(scheduler, ...) scheduler
#define SCHEDULER(func, ...) \
if (SCHEDULER_scheduler(__VA_ARGS__) && \
SCHEDULER_scheduler(__VA_ARGS__)->action->func) { \
return SCHEDULER_scheduler(__VA_ARGS__)->action->func(__VA_ARGS__); \
} \
return -ENOSYS;
int labcomm_scheduler_free(struct labcomm_scheduler *s)
{
SCHEDULER(free, s);
}
int labcomm_scheduler_writer_lock(struct labcomm_scheduler *s)
{
SCHEDULER(writer_lock, s);
}
int labcomm_scheduler_writer_unlock(struct labcomm_scheduler *s)
{
SCHEDULER(writer_unlock, s);
}
int labcomm_scheduler_data_lock(struct labcomm_scheduler *s)
{
SCHEDULER(data_lock, s);
}
int labcomm_scheduler_data_unlock(struct labcomm_scheduler *s)
{
SCHEDULER(data_unlock, s);
}
struct labcomm_time *labcomm_scheduler_now(struct labcomm_scheduler *s)
{
if (s && s->action->now) {
return s->action->now(s);
}
return NULL;
}
int labcomm_scheduler_sleep(struct labcomm_scheduler *s,
struct labcomm_time *wakeup)
{
SCHEDULER(sleep, s, wakeup);
}
int labcomm_scheduler_wakeup(struct labcomm_scheduler *s)
{
SCHEDULER(wakeup, s);
}
int labcomm_scheduler_enqueue(struct labcomm_scheduler *s,
useconds_t delay,
void (*func)(void *context),
void *context)
{
SCHEDULER(enqueue, s, delay, func, context);
}
/*
labcomm_scheduler.h -- labcomm task coordination
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _LABCOMM_SCHEDULER_H_
#define _LABCOMM_SCHEDULER_H_
#include <unistd.h>
struct labcomm_time;
int labcomm_time_free(struct labcomm_time *t);
int labcomm_time_add_usec(struct labcomm_time *t, useconds_t usec);
struct labcomm_scheduler;
int labcomm_scheduler_free(struct labcomm_scheduler *s);
/* Lock and event handling */
int labcomm_scheduler_writer_lock(struct labcomm_scheduler *s);
int labcomm_scheduler_writer_unlock(struct labcomm_scheduler *s);
int labcomm_scheduler_data_lock(struct labcomm_scheduler *s);
int labcomm_scheduler_data_unlock(struct labcomm_scheduler *s);
/* Time handling */
struct labcomm_time *labcomm_scheduler_now(struct labcomm_scheduler *s);
int labcomm_scheduler_sleep(struct labcomm_scheduler *s,
struct labcomm_time *wakeup);
int labcomm_scheduler_wakeup(struct labcomm_scheduler *s);
/* Deferred action handling */
int labcomm_scheduler_enqueue(struct labcomm_scheduler *s,
useconds_t delay,
void (*deferred)(void *context),
void *context);
#endif
/*
labcomm_scheduler.h -- labcomm task coordination, semi-private part
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _LABCOMM_SCHEDULER_PRIVATE_H_
#define _LABCOMM_SCHEDULER_PRIVATE_H_
#include <unistd.h>
#include "labcomm_scheduler.h"
struct labcomm_time {
const struct labcomm_time_action {
int (*free)(struct labcomm_time *t);
int (*add_usec)(struct labcomm_time *t, useconds_t usec);
} *action;
void *context;
};
struct labcomm_scheduler {
const struct labcomm_scheduler_action {
int (*free)(struct labcomm_scheduler *s);
int (*writer_lock)(struct labcomm_scheduler *s);
int (*writer_unlock)(struct labcomm_scheduler *s);
int (*data_lock)(struct labcomm_scheduler *s);
int (*data_unlock)(struct labcomm_scheduler *s);
struct labcomm_time *(*now)(struct labcomm_scheduler *s);
int (*sleep)(struct labcomm_scheduler *s,
struct labcomm_time *wakeup);
int (*wakeup)(struct labcomm_scheduler *s);
int (*enqueue)(struct labcomm_scheduler *s,
useconds_t delay,
void (*deferred)(void *context),
void *context);
} *action;
void *context;
};
#endif
/*
labcomm_time.c -- labcomm time handling
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include "labcomm_scheduler_private.h"
#define TIME_time(time, ...) time
#define TIME(func, ...) \
if (TIME_time(__VA_ARGS__) && \
TIME_time(__VA_ARGS__)->action->func) { \
return TIME_time(__VA_ARGS__)->action->func(__VA_ARGS__); \
} \
return -ENOSYS;
int labcomm_time_free(struct labcomm_time *s)
{
TIME(free, s);
}
int labcomm_time_add_usec(struct labcomm_time *s, useconds_t usec)
{
TIME(add_usec, s, usec);
}
/*
test_labcomm_pthread_scheduler.c -- test labcomm pthread based task
coordination
Copyright 2013 Anders Blomdell <anders.blomdell@control.lth.se>
This file is part of LabComm.
LabComm is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
LabComm is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include "labcomm_scheduler.h"
#include "labcomm_pthread_scheduler.h"
struct func_arg {
struct labcomm_scheduler *scheduler;
int i;
};
static void func(void *arg)
{
struct func_arg *func_arg = arg;
printf("%p %d\n", arg, func_arg->i);
if (func_arg->i == 999) {
labcomm_scheduler_wakeup(func_arg->scheduler);
}
}
void enqueue(struct labcomm_scheduler *scheduler,
int first, int last)
{
int i;
for (i = first ; i <= last ; i++) {
struct func_arg *tmp = malloc(sizeof(*tmp));
tmp->scheduler = scheduler;
tmp->i = i;
labcomm_scheduler_enqueue(scheduler, i*1000000, func, tmp);
}
}
int main(int argc, char *argv[])
{
struct labcomm_scheduler *scheduler;
struct labcomm_time *time;
scheduler = labcomm_pthread_scheduler_new(labcomm_default_memory);
enqueue(scheduler, 0, 5);
enqueue(scheduler, 0, 1);
enqueue(scheduler, 1, 3);
enqueue(scheduler, 7, 10);
{
struct func_arg *tmp = malloc(sizeof(*tmp));
tmp->scheduler = scheduler;
tmp->i = 999;
labcomm_scheduler_enqueue(scheduler, 6000000, func, tmp);
}
time = labcomm_scheduler_now(scheduler);
labcomm_time_add_usec(time, 12*1000000);
labcomm_scheduler_sleep(scheduler, NULL);
labcomm_scheduler_sleep(scheduler, time);
return 0;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment