From 4d9c4025222e19dac9a90cabe8bd886e47959ad6 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 17 Aug 2017 16:56:00 +0200 Subject: lib: Add basic FRCT mechanisms This adds the basic FRCT mechanisms to the library. Upon flow alloc or accept an FRCT instance is now created and used when reading or writing to the flow. The timerwheel has been refactored to allow recharging timers and removing them and is now part of the library. The first SDU sent over the connection has the DRF set and this initializes the connection. Sender and receiver inactivity timers are added. --- src/ipcpd/CMakeLists.txt | 4 - src/ipcpd/tests/CMakeLists.txt | 34 --- src/ipcpd/tests/timerwheel_test.c | 104 --------- src/ipcpd/timerwheel.c | 366 ------------------------------- src/ipcpd/timerwheel.h | 39 ---- src/lib/CMakeLists.txt | 2 + src/lib/dev.c | 440 ++++++++++++++++++++++++++++++++------ src/lib/frct_pci.c | 105 +++++++++ src/lib/hash.c | 40 ++-- src/lib/shm_flow_set.c | 11 +- src/lib/shm_rbuff_ll.c | 12 +- src/lib/shm_rbuff_pthr.c | 12 +- src/lib/tests/CMakeLists.txt | 1 + src/lib/tests/timerwheel_test.c | 106 +++++++++ src/lib/timerwheel.c | 371 ++++++++++++++++++++++++++++++++ 15 files changed, 988 insertions(+), 659 deletions(-) delete mode 100644 src/ipcpd/tests/CMakeLists.txt delete mode 100644 src/ipcpd/tests/timerwheel_test.c delete mode 100644 src/ipcpd/timerwheel.c delete mode 100644 src/ipcpd/timerwheel.h create mode 100644 src/lib/frct_pci.c create mode 100644 src/lib/tests/timerwheel_test.c create mode 100644 src/lib/timerwheel.c (limited to 'src') diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt index 0ead1fed..b2f350dd 100644 --- a/src/ipcpd/CMakeLists.txt +++ b/src/ipcpd/CMakeLists.txt @@ -2,13 +2,9 @@ set(IPCP_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/ipcp.c ${CMAKE_CURRENT_SOURCE_DIR}/shim-data.c - ${CMAKE_CURRENT_SOURCE_DIR}/timerwheel.c ) add_subdirectory(local) add_subdirectory(normal) add_subdirectory(shim-udp) add_subdirectory(shim-eth-llc) -if (NOT APPLE) - add_subdirectory(tests) -endif () diff --git a/src/ipcpd/tests/CMakeLists.txt b/src/ipcpd/tests/CMakeLists.txt deleted file mode 100644 index 9b5eeaa1..00000000 --- a/src/ipcpd/tests/CMakeLists.txt +++ /dev/null @@ -1,34 +0,0 @@ -get_filename_component(CURRENT_SOURCE_PARENT_DIR - ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) -get_filename_component(CURRENT_BINARY_PARENT_DIR - ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) - -include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -include_directories(${CMAKE_CURRENT_BINARY_DIR}) - -include_directories(${CURRENT_SOURCE_PARENT_DIR}) -include_directories(${CURRENT_BINARY_PARENT_DIR}) - -include_directories(${CMAKE_SOURCE_DIR}/include) -include_directories(${CMAKE_BINARY_DIR}/include) - -get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) -get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) - -create_test_sourcelist(${PARENT_DIR}_tests test_suite.c - # Add new tests here - timerwheel_test.c - ) - -add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) -target_link_libraries(${PARENT_DIR}_test ouroboros) - -add_dependencies(check ${PARENT_DIR}_test) - -set(tests_to_run ${${PARENT_DIR}_tests}) -remove(tests_to_run test_suite.c) - -foreach (test ${tests_to_run}) - get_filename_component(test_name ${test} NAME_WE) - add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) -endforeach (test) diff --git a/src/ipcpd/tests/timerwheel_test.c b/src/ipcpd/tests/timerwheel_test.c deleted file mode 100644 index 6ba1b890..00000000 --- a/src/ipcpd/tests/timerwheel_test.c +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Test of the timer wheel - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include "timerwheel.c" - -#include -#include -#include -#include - -#define MAX_ELEMENTS 100 -#define MAX_RESOLUTION 10 /* ms */ -#define MAX_ADDITIONS 1000 - -int total; - -int add(void * o) -{ - total += *((int *) o); - return 0; -} - -int timerwheel_test(int argc, char ** argv) -{ - struct timerwheel * tw; - long resolution; - long elements; - struct timespec wait; - - int additions; - - int check_total = 0; - - int i; - - (void) argc; - (void) argv; - - total = 0; - - srand(time(NULL)); - - resolution = rand() % (MAX_RESOLUTION - 1) + 1; - elements = rand() % (MAX_ELEMENTS - 10) + 10; - - tw = timerwheel_create(resolution, resolution * elements); - if (tw == NULL) { - printf("Failed to create timerwheel.\n"); - return -1; - } - - wait.tv_sec = (resolution * elements) / 1000; - wait.tv_nsec = ((resolution * elements) % 1000) * MILLION; - - additions = rand() % MAX_ADDITIONS + 1000; - - for (i = 0; i < additions; ++i) { - int delay = rand() % (resolution * elements); - int var = rand() % 5; - check_total += var; - if (timerwheel_add(tw, - (void (*)(void *)) add, - (void *) &var, - sizeof(var), - delay)) { - printf("Failed to add function."); - return -1; - } - } - - nanosleep(&wait, NULL); - - /* On some systems and VMs, the scheduler may be too slow. */ - if (total != check_total) - nanosleep(&wait, NULL); - - timerwheel_destroy(tw); - - if (total != check_total) { - printf("Totals do not match.\n"); - return -1; - } - - return 0; -} diff --git a/src/ipcpd/timerwheel.c b/src/ipcpd/timerwheel.c deleted file mode 100644 index 6086181a..00000000 --- a/src/ipcpd/timerwheel.c +++ /dev/null @@ -1,366 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Timerwheel - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include -#include -#include -#include - -#include -#include -#include -#include - -#define FRAC 10 /* accuracy of the timer */ - -#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1)); -#define tw_free(tw) (tw_used(tw) + 1 < tw->elements) -#define tw_empty(tw) (tw->head == tw->tail) - -enum tw_state { - TW_NULL = 0, - TW_RUNNING, - TW_DESTROY -}; - -struct tw_f { - struct list_head next; - void (* func)(void *); - void * arg; -}; - -struct tw_el { - struct list_head funcs; - struct timespec expiry; -}; - -struct timerwheel { - struct tw_el * wheel; - - struct timespec intv; - - size_t pos; - - struct list_head wq; - - pthread_cond_t work; - pthread_mutex_t lock; - - int resolution; - unsigned int elements; - - enum tw_state state; - pthread_mutex_t s_lock; - - pthread_t ticker; - pthread_t worker; -}; - -static void tw_el_fini(struct tw_el * e) -{ - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &e->funcs) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - if (f->arg != NULL) - free(f->arg); - } -} - -static enum tw_state tw_get_state(struct timerwheel * tw) -{ - enum tw_state state; - - assert(tw); - - pthread_mutex_lock(&tw->s_lock); - - state = tw->state; - - pthread_mutex_unlock(&tw->s_lock); - - return state; -} - -static void tw_set_state(struct timerwheel * tw, enum tw_state state) -{ - assert(tw); - assert(state != TW_NULL); - - pthread_mutex_lock(&tw->s_lock); - - tw->state = state; - - pthread_mutex_unlock(&tw->s_lock); -} - -static void * worker(void * o) -{ - struct list_head * p; - struct list_head * h; - - struct timerwheel * tw = (struct timerwheel *) o; - struct timespec dl; - struct timespec now; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - ts_add(&now, &tw->intv, &dl); - - pthread_mutex_lock(&tw->lock); - - while (tw_get_state(tw) == TW_RUNNING) { - if (pthread_cond_timedwait(&tw->work, &tw->lock, &dl) - == ETIMEDOUT) - ts_add(&dl, &tw->intv, &dl); - - list_for_each_safe(p, h, &tw->wq) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - pthread_mutex_unlock(&tw->lock); - f->func(f->arg); - if (f->arg != NULL) - free(f->arg); - free(f); - - pthread_mutex_lock(&tw->lock); - } - } - - pthread_mutex_unlock(&tw->lock); - - return (void *) o; -} - -static void * movement(void * o) -{ - struct timerwheel * tw = (struct timerwheel *) o; - struct timespec now = {0, 0}; - long ms = tw->resolution * tw->elements; - struct timespec total = {ms / 1000, - (ms % 1000) * MILLION}; - struct list_head * p; - struct list_head * h; - - while (tw_get_state(tw) == TW_RUNNING) { - clock_gettime(CLOCK_MONOTONIC, &now); - - pthread_mutex_lock(&tw->lock); - - if (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) < 0) { - pthread_mutex_unlock(&tw->lock); - nanosleep(&tw->intv, NULL); - continue; - } - - list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - list_add(&f->next, &tw->wq); - } - - ts_add(&tw->wheel[tw->pos].expiry, - &total, - &tw->wheel[tw->pos].expiry); - - tw->pos = (tw->pos + 1) & (tw->elements - 1); - - pthread_cond_signal(&tw->work); - - pthread_mutex_unlock(&tw->lock); - } - - return (void *) 0; -} - -struct timerwheel * timerwheel_create(unsigned int resolution, - unsigned int max_delay) -{ - struct timespec now = {0, 0}; - struct timespec res_ts = {resolution / 1000, - (resolution % 1000) * MILLION}; - unsigned long i; - - struct timerwheel * tw; - - pthread_condattr_t cattr; - - assert(resolution != 0); - - tw = malloc(sizeof(*tw)); - if (tw == NULL) - return NULL; - - if (pthread_mutex_init(&tw->lock, NULL)) - return NULL; - - tw->elements = 1; - - while (tw->elements < max_delay / resolution) - tw->elements <<= 1; - - tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements); - if (tw->wheel == NULL) { - free(tw); - return NULL; - } - - tw->resolution = resolution; - - tw->intv.tv_sec = (tw->resolution / FRAC) / 1000; - tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION; - - list_head_init(&tw->wq); - - if (pthread_mutex_init(&tw->lock, NULL)) { - free(tw->wheel); - free(tw); - return NULL; - } - - if (pthread_mutex_init(&tw->s_lock, NULL)) { - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } - - if (pthread_condattr_init(&cattr)) { - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - if (pthread_cond_init(&tw->work, &cattr)) { - pthread_mutex_destroy(&tw->s_lock); - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } - - tw->pos = 0; - tw->state = TW_RUNNING; - - clock_gettime(CLOCK_MONOTONIC, &now); - now.tv_nsec -= (now.tv_nsec % MILLION); - - for (i = 0; i < tw->elements; ++i) { - list_head_init(&tw->wheel[i].funcs); - tw->wheel[i].expiry = now; - ts_add(&now, &res_ts, &now); - } - - if (pthread_create(&tw->worker, NULL, worker, (void *) tw)) { - pthread_cond_destroy(&tw->work); - pthread_mutex_destroy(&tw->s_lock); - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } - - if (pthread_create(&tw->ticker, NULL, movement, (void *) tw)) { - tw_set_state(tw, TW_DESTROY); - pthread_join(tw->worker, NULL); - pthread_cond_destroy(&tw->work); - pthread_mutex_destroy(&tw->s_lock); - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } - - return tw; -} - -void timerwheel_destroy(struct timerwheel * tw) -{ - unsigned long i; - - struct list_head * p; - struct list_head * h; - - tw_set_state(tw, TW_DESTROY); - - pthread_join(tw->ticker, NULL); - pthread_join(tw->worker, NULL); - - for (i = 0; i < tw->elements; ++i) - tw_el_fini(&tw->wheel[i]); - - pthread_mutex_lock(&tw->lock); - - list_for_each_safe(p, h, &tw->wq) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - if (f->arg != NULL) - free(f->arg); - free(f); - } - - pthread_mutex_unlock(&tw->lock); - - pthread_cond_destroy(&tw->work); - pthread_mutex_destroy(&tw->lock); - pthread_mutex_destroy(&tw->s_lock); - - free(tw->wheel); - free(tw); -} - -int timerwheel_add(struct timerwheel * tw, - void (* func)(void *), - void * arg, - size_t arg_len, - unsigned int delay) -{ - int pos; - struct tw_f * f = malloc(sizeof(*f)); - if (f == NULL) - return -ENOMEM; - - f->func = func; - f->arg = malloc(arg_len); - if (f->arg == NULL) { - free(f); - return -ENOMEM; - } - - memcpy(f->arg, arg, arg_len); - - assert(delay < tw->elements * tw->resolution); - - pthread_mutex_lock(&tw->lock); - - pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); - list_add(&f->next, &tw->wheel[pos].funcs); - - pthread_mutex_unlock(&tw->lock); - - return 0; -} diff --git a/src/ipcpd/timerwheel.h b/src/ipcpd/timerwheel.h deleted file mode 100644 index 37a6d06a..00000000 --- a/src/ipcpd/timerwheel.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Ring buffer for incoming SDUs - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_TIMERWHEEL_H -#define OUROBOROS_IPCPD_TIMERWHEEL_H - -struct timerwheel; - -struct timerwheel * timerwheel_create(unsigned int resolution, - unsigned int max_delay); - -void timerwheel_destroy(struct timerwheel * tw); - -int timerwheel_add(struct timerwheel * tw, - void (* func)(void *), - void * arg, - size_t arg_len, - unsigned int delay); /* ms */ - -#endif /* OUROBOROS_IPCPD_TIMERWHEEL_H */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 550bbc08..728d975a 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -86,6 +86,7 @@ set(SOURCE_FILES cdap_req.c crc32.c dev.c + frct_pci.c hash.c hashtable.c irm.c @@ -104,6 +105,7 @@ set(SOURCE_FILES shm_rdrbuff.c sockets.c time_utils.c + timerwheel.c tpm.c utils.c ) diff --git a/src/lib/dev.c b/src/lib/dev.c index 9354855b..e81bf105 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -34,6 +34,8 @@ #include #include #include +#include +#include #include #include @@ -41,8 +43,14 @@ #define BUF_SIZE 1500 +#define TW_ELEMENTS 6000 +#define TW_RESOLUTION 1 /* ms */ + +#define MPL 2000 /* ms */ + struct flow_set { size_t idx; + bool np1_set; }; struct fqueue { @@ -59,6 +67,26 @@ enum port_state { PORT_DESTROY }; +struct frcti { + bool used; + + struct tw_f * snd_inact; + bool snd_drf; + uint64_t snd_lwe; + uint64_t snd_rwe; + + struct tw_f * rcv_inact; + bool rcv_drf; + uint64_t rcv_lwe; + uint64_t rcv_rwe; + + bool resource_control; + bool reliable; + bool error_check; + bool ordered; + bool partial; +}; + struct port { int fd; @@ -89,10 +117,14 @@ struct { struct shm_rdrbuff * rdrb; struct shm_flow_set * fqset; + struct timerwheel * tw; + int tw_users; + struct bmp * fds; struct bmp * fqueues; struct flow * flows; struct port * ports; + struct frcti * frcti; pthread_rwlock_t lock; } ai; @@ -203,6 +235,242 @@ static int api_announce(char * ap_name) return ret; } +/* Call under flows lock */ +static int finalize_write(int fd, + size_t idx) +{ + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) + return -ENOTALLOC; + + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + + return 0; +} + +static int frcti_init(int fd) +{ + ai.frcti[fd].used = true; + + ai.frcti[fd].snd_drf = true; + ai.frcti[fd].snd_lwe = 0; + ai.frcti[fd].snd_rwe = 0; + + ai.frcti[fd].rcv_drf = true; + ai.frcti[fd].rcv_lwe = 0; + ai.frcti[fd].rcv_rwe = 0; + + return 0; +} + +static void frcti_fini(int fd) +{ + struct frcti * frcti; + + frcti = &(ai.frcti[fd]); + + frcti->used = false; + + /* FIXME: We actually need to wait until these timers become NULL. */ + if (frcti->snd_inact != NULL) + timerwheel_stop(ai.tw, frcti->snd_inact); + + if (frcti->rcv_inact != NULL) + timerwheel_stop(ai.tw, frcti->rcv_inact); +} + +static int frcti_configure(int fd, + qosspec_t * qos) +{ + /* FIXME: Send configuration message here to other side. */ + + (void) fd; + (void) qos; + + return 0; +} + +static void frcti_snd_inactivity(void * arg) +{ + struct frcti * frcti; + + pthread_rwlock_wrlock(&ai.lock); + + frcti = (struct frcti * ) arg; + + frcti->snd_drf = true; + frcti->snd_inact = NULL; + + pthread_rwlock_unlock(&ai.lock); +} + +/* Called under flows lock */ +static int frcti_write(int fd, + struct shm_du_buff * sdb) +{ + struct frcti * frcti; + struct frct_pci pci; + + memset(&pci, 0, sizeof(pci)); + + frcti = &(ai.frcti[fd]); + + /* + * Set the DRF in the first packet of a new run of SDUs, + * otherwise simply recharge the timer. + */ + if (frcti->snd_drf) { + frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity, + frcti, 2 * MPL); + if (frcti->snd_inact == NULL) + return -1; + + pci.flags |= FLAG_DATA_RUN; + frcti->snd_drf = false; + } else { + if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL)) + return -1; + } + + pci.seqno = frcti->snd_lwe++; + pci.type |= PDU_TYPE_DATA; + + if (frct_pci_ser(sdb, &pci, frcti->error_check)) + return -1; + + if (finalize_write(fd, shm_du_buff_get_idx(sdb))) + return -ENOTALLOC; + + return 0; +} + +static void frcti_rcv_inactivity(void * arg) +{ + struct frcti * frcti; + + pthread_rwlock_wrlock(&ai.lock); + + frcti = (struct frcti * ) arg; + + frcti->rcv_drf = true; + frcti->rcv_inact = NULL; + + pthread_rwlock_unlock(&ai.lock); +} + +static ssize_t frcti_read(int fd) +{ + ssize_t idx = -1; + struct timespec abstime; + struct frcti * frcti; + struct frct_pci pci; + struct shm_du_buff * sdb; + + pthread_rwlock_rdlock(&ai.lock); + + if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + pthread_rwlock_unlock(&ai.lock); + } else { + struct shm_rbuff * rb = ai.flows[fd].rx_rb; + bool timeo = ai.flows[fd].timesout; + struct timespec timeout = ai.flows[fd].rcv_timeo; + + pthread_rwlock_unlock(&ai.lock); + + if (timeo) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, &timeout, &abstime); + idx = shm_rbuff_read_b(rb, &abstime); + } else { + idx = shm_rbuff_read_b(rb, NULL); + } + } + + if (idx < 0) + return idx; + + pthread_rwlock_rdlock(&ai.lock); + + frcti = &(ai.frcti[fd]); + + sdb = shm_rdrbuff_get(ai.rdrb, idx); + + /* SDU may be corrupted. */ + if (frct_pci_des(sdb, &pci, frcti->error_check)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + /* We don't accept packets when there is no inactivity timer. */ + if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + /* + * If there is an inactivity timer and the DRF is set, + * reset the state of the connection. + */ + if (pci.flags & FLAG_DATA_RUN) { + frcti->rcv_drf = true; + if (frcti->rcv_inact != NULL) + timerwheel_stop(ai.tw, frcti->rcv_inact); + frcti->rcv_lwe = pci.seqno; + } + + /* + * Start receiver inactivity if this packet has the DRF, + * otherwise simply restart it. + */ + if (frcti->rcv_drf) { + frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity, + frcti, 3 * MPL); + if (frcti->rcv_inact == NULL) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + frcti->rcv_drf = false; + } else { + if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + } + + pthread_rwlock_unlock(&ai.lock); + + return idx; +} + +static int frcti_event_wait(struct flow_set * set, + struct fqueue * fq, + const struct timespec * timeout) +{ + int ret; + + assert(set); + assert(fq); + assert(timeout); + + /* + * FIXME: Return the fq only if a data SDU + * for the application is available. + */ + + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (ret == -ETIMEDOUT) { + fq->fqsize = 0; + return -ETIMEDOUT; + } + + return ret; +} + static void flow_clear(int fd) { assert(!(fd < 0)); @@ -230,6 +498,9 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); + if (ai.frcti[fd].used) + frcti_fini(fd); + flow_clear(fd); } @@ -316,8 +587,14 @@ int ouroboros_init(const char * ap_name) if (ai.flows == NULL) goto fail_flows; - for (i = 0; i < AP_MAX_FLOWS; ++i) + ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS); + if (ai.frcti == NULL) + goto fail_frcti; + + for (i = 0; i < AP_MAX_FLOWS; ++i) { flow_clear(i); + frcti_fini(i); + } ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); if (ai.ports == NULL) @@ -353,8 +630,15 @@ int ouroboros_init(const char * ap_name) if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; + ai.tw = timerwheel_create(TW_RESOLUTION, + TW_RESOLUTION * TW_ELEMENTS); + if (ai.tw == NULL) + goto fail_timerwheel; + return 0; + fail_timerwheel: + pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < IRMD_MAX_FLOWS; ++i) pthread_cond_destroy(&ai.ports[i].state_cond); @@ -366,11 +650,13 @@ int ouroboros_init(const char * ap_name) fail_ap_name: free(ai.ports); fail_ports: + free(ai.frcti); + fail_frcti: free(ai.flows); fail_flows: shm_rdrbuff_close(ai.rdrb); fail_rdrb: - shm_flow_set_destroy(ai.fqset); + shm_flow_set_destroy(ai.fqset); fail_fqset: bmp_destroy(ai.fqueues); fail_fqueues: @@ -409,6 +695,9 @@ void ouroboros_fini() shm_rdrbuff_close(ai.rdrb); + if (ai.tw != NULL) + timerwheel_destroy(ai.tw); + free(ai.flows); free(ai.ports); @@ -463,9 +752,15 @@ int flow_accept(qosspec_t * qs, if (fd < 0) return fd; + pthread_rwlock_wrlock(&ai.lock); + + frcti_init(fd); + if (qs != NULL) *qs = ai.flows[fd].spec; + pthread_rwlock_unlock(&ai.lock); + return fd; } @@ -505,7 +800,7 @@ int flow_alloc(const char * dst_name, return -EIRMD; } - if (recv_msg->result != 0) { + if (recv_msg->result != 0) { int res = recv_msg->result; irm_msg__free_unpacked(recv_msg, NULL); return res; @@ -520,6 +815,22 @@ int flow_alloc(const char * dst_name, irm_msg__free_unpacked(recv_msg, NULL); + if (fd < 0) + return fd; + + pthread_rwlock_wrlock(&ai.lock); + + frcti_init(fd); + + if (frcti_configure(fd, qs)) { + flow_fini(fd); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.lock); + return -1; + } + + pthread_rwlock_unlock(&ai.lock); + return fd; } @@ -720,34 +1031,31 @@ ssize_t flow_write(int fd, return idx; } - if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { - shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } } else { /* blocking */ - struct shm_rdrbuff * rdrb = ai.rdrb; - struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; - pthread_rwlock_unlock(&ai.lock); - assert(tx_rb); - - idx = shm_rdrbuff_write_b(rdrb, + idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, buf, count); - if (shm_rbuff_write(tx_rb, idx) < 0) { - shm_rdrbuff_remove(rdrb, idx); - return -ENOTALLOC; - } - pthread_rwlock_rdlock(&ai.lock); } - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + if (!ai.frcti[fd].used) { + if (finalize_write(fd, idx)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOTALLOC; + } + } else { + if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + } pthread_rwlock_unlock(&ai.lock); @@ -772,21 +1080,12 @@ ssize_t flow_read(int fd, return -ENOTALLOC; } - if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - pthread_rwlock_unlock(&ai.lock); - } else { - struct shm_rbuff * rb = ai.flows[fd].rx_rb; - bool timeo = ai.flows[fd].timesout; - struct timespec timeout = ai.flows[fd].rcv_timeo; - - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&ai.lock); - if (timeo) - idx = shm_rbuff_read_b(rb, &timeout); - else - idx = shm_rbuff_read_b(rb, NULL); - } + if (!ai.frcti[fd].used) + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + else + idx = frcti_read(fd); if (idx < 0) { assert(idx == -EAGAIN || idx == -ETIMEDOUT); @@ -823,6 +1122,8 @@ struct flow_set * flow_set_create() return NULL; } + set->np1_set = false; + pthread_rwlock_unlock(&ai.lock); return set; @@ -891,6 +1192,9 @@ int flow_set_add(struct flow_set * set, for (i = 0; i < sdus; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); + if (ai.frcti[fd].used) + set->np1_set = true; + pthread_rwlock_unlock(&ai.lock); return ret; @@ -960,7 +1264,8 @@ int flow_event_wait(struct flow_set * set, struct fqueue * fq, const struct timespec * timeout) { - ssize_t ret; + ssize_t ret; + struct timespec abstime; if (set == NULL || fq == NULL) return -EINVAL; @@ -970,7 +1275,17 @@ int flow_event_wait(struct flow_set * set, assert(!fq->next); - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + if (set->np1_set) + ret = frcti_event_wait(set, fq, &abstime); + else + ret = shm_flow_set_wait(ai.fqset, set->idx, + fq->fqueue, &abstime); + if (ret == -ETIMEDOUT) { fq->fqsize = 0; return -ETIMEDOUT; @@ -1132,9 +1447,8 @@ int ipcp_flow_read(int fd, { ssize_t idx = -1; int port_id = -1; - struct shm_rbuff * rb; - assert(fd >=0); + assert(fd >= 0); assert(sdb); pthread_rwlock_rdlock(&ai.lock); @@ -1144,11 +1458,13 @@ int ipcp_flow_read(int fd, return -ENOTALLOC; } - rb = ai.flows[fd].rx_rb; - pthread_rwlock_unlock(&ai.lock); - idx = shm_rbuff_read(rb); + if (!ai.frcti[fd].used) + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + else + idx = frcti_read(fd); + if (idx < 0) return idx; @@ -1160,8 +1476,6 @@ int ipcp_flow_read(int fd, int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - size_t idx; - if (sdb == NULL) return -EINVAL; @@ -1179,10 +1493,17 @@ int ipcp_flow_write(int fd, assert(ai.flows[fd].tx_rb); - idx = shm_du_buff_get_idx(sdb); - - shm_rbuff_write(ai.flows[fd].tx_rb, idx); - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + if (!ai.frcti[fd].used) { + if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { + pthread_rwlock_unlock(&ai.lock); + return -ENOTALLOC; + } + } else { + if (frcti_write(fd, sdb)) { + pthread_rwlock_unlock(&ai.lock); + return -1; + } + } pthread_rwlock_unlock(&ai.lock); @@ -1274,32 +1595,11 @@ int local_flow_write(int fd, return -ENOTALLOC; } - shm_rbuff_write(ai.flows[fd].tx_rb, idx); - - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - - pthread_rwlock_unlock(&ai.lock); - - return 0; -} - -int ipcp_read_shim(int fd, - struct shm_du_buff ** sdb) -{ - ssize_t idx; - - pthread_rwlock_rdlock(&ai.lock); - - assert(ai.flows[fd].rx_rb); - - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - if (idx < 0) { + if (finalize_write(fd, idx)) { pthread_rwlock_unlock(&ai.lock); - return -EAGAIN; + return -ENOTALLOC; } - *sdb = shm_rdrbuff_get(ai.rdrb, idx); - pthread_rwlock_unlock(&ai.lock); return 0; diff --git a/src/lib/frct_pci.c b/src/lib/frct_pci.c new file mode 100644 index 00000000..92cf8cd9 --- /dev/null +++ b/src/lib/frct_pci.c @@ -0,0 +1,105 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Protocol Control Information of FRCT + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include +#include +#include +#include + +#include +#include + +#define TYPE_SIZE 1 +#define SEQNO_SIZE 8 +#define FLAGS_SIZE 1 + +/* FIXME: Head size will differ on type */ +#define HEAD_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE + +int frct_pci_ser(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check) +{ + uint8_t * head; + uint8_t * tail; + + assert(sdb); + assert(pci); + + head = shm_du_buff_head_alloc(sdb, HEAD_SIZE); + if (head == NULL) + return -EPERM; + + memcpy(head, &pci->type, TYPE_SIZE); + memcpy(head + TYPE_SIZE, &pci->flags, FLAGS_SIZE); + memcpy(head + TYPE_SIZE + FLAGS_SIZE, &pci->seqno, SEQNO_SIZE); + + if (error_check) { + tail = shm_du_buff_tail_alloc(sdb, hash_len(HASH_CRC32)); + if (tail == NULL) { + shm_du_buff_head_release(sdb, HEAD_SIZE); + return -EPERM; + } + + *((uint32_t *) tail) = 0; + mem_hash(HASH_CRC32, (uint32_t *) tail, head, tail - head); + } + + return 0; +} + +int frct_pci_des(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check) +{ + uint8_t * head; + uint8_t * tail; + uint32_t crc; + + assert(sdb); + assert(pci); + + head = shm_du_buff_head(sdb); + + /* FIXME: Depending on the type a different deserialization */ + memcpy(&pci->type, head, TYPE_SIZE); + memcpy(&pci->flags, head + TYPE_SIZE, FLAGS_SIZE); + memcpy(&pci->seqno, head + TYPE_SIZE + FLAGS_SIZE, SEQNO_SIZE); + + if (error_check) { + tail = shm_du_buff_tail(sdb); + if (tail == NULL) + return -EPERM; + + mem_hash(HASH_CRC32, &crc, head, tail - head); + + /* Corrupted SDU */ + if (crc != 0) + return -1; + + shm_du_buff_tail_release(sdb, hash_len(HASH_CRC32)); + } + + shm_du_buff_head_release(sdb, HEAD_SIZE); + + return 0; +} diff --git a/src/lib/hash.c b/src/lib/hash.c index d8cabfd3..e062a0ad 100644 --- a/src/lib/hash.c +++ b/src/lib/hash.c @@ -64,45 +64,46 @@ uint16_t hash_len(enum hash_algo algo) #endif } -void str_hash(enum hash_algo algo, - void * buf, - const char * str) +void mem_hash(enum hash_algo algo, + void * dst, + const uint8_t * buf, + size_t len) { #ifdef HAVE_LIBGCRYPT - gcry_md_hash_buffer(algo, buf, str, strlen(str)); + gcry_md_hash_buffer(algo, dst, buf, len); #else struct sha3_ctx sha3_ctx; struct md5_ctx md5_ctx; switch (algo) { case HASH_CRC32: - memset(buf, 0, CRC32_HASH_LEN); - crc32((uint32_t *) buf, str, strlen(str)); + memset(dst, 0, CRC32_HASH_LEN); + crc32((uint32_t *) dst, buf, len); break; case HASH_MD5: rhash_md5_init(&md5_ctx); - rhash_md5_update(&md5_ctx, str, strlen(str)); - rhash_md5_final(&md5_ctx, (uint8_t *) buf); + rhash_md5_update(&md5_ctx, buf, len); + rhash_md5_final(&md5_ctx, (uint8_t *) dst); break; case HASH_SHA3_224: rhash_sha3_224_init(&sha3_ctx); - rhash_sha3_update(&sha3_ctx, str, strlen(str)); - rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); + rhash_sha3_update(&sha3_ctx, buf, len); + rhash_sha3_final(&sha3_ctx, (uint8_t *) dst); break; case HASH_SHA3_256: rhash_sha3_256_init(&sha3_ctx); - rhash_sha3_update(&sha3_ctx, str, strlen(str)); - rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); + rhash_sha3_update(&sha3_ctx, buf, len); + rhash_sha3_final(&sha3_ctx, (uint8_t *) dst); break; case HASH_SHA3_384: rhash_sha3_384_init(&sha3_ctx); - rhash_sha3_update(&sha3_ctx, str, strlen(str)); - rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); + rhash_sha3_update(&sha3_ctx, buf, len); + rhash_sha3_final(&sha3_ctx, (uint8_t *) dst); break; case HASH_SHA3_512: rhash_sha3_512_init(&sha3_ctx); - rhash_sha3_update(&sha3_ctx, str, strlen(str)); - rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); + rhash_sha3_update(&sha3_ctx, buf, len); + rhash_sha3_final(&sha3_ctx, (uint8_t *) dst); break; default: assert(false); @@ -110,3 +111,10 @@ void str_hash(enum hash_algo algo, } #endif } + +void str_hash(enum hash_algo algo, + void * dst, + const char * str) +{ + return mem_hash(algo, dst, (const uint8_t *) str, strlen(str)); +} diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index cd6946d4..2f1d4e33 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -326,10 +326,9 @@ void shm_flow_set_notify(struct shm_flow_set * set, ssize_t shm_flow_set_wait(const struct shm_flow_set * set, size_t idx, int * fqueue, - const struct timespec * timeout) + const struct timespec * abstime) { ssize_t ret = 0; - struct timespec abstime; assert(set); assert(idx < AP_MAX_FQUEUES); @@ -341,19 +340,15 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set, if (pthread_mutex_lock(set->lock) == EOWNERDEAD) pthread_mutex_consistent(set->lock); #endif - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) set->lock); while (set->heads[idx] == 0 && ret != -ETIMEDOUT) { - if (timeout != NULL) + if (abstime != NULL) ret = -pthread_cond_timedwait(set->conds + idx, set->lock, - &abstime); + abstime); else ret = -pthread_cond_wait(set->conds + idx, set->lock); diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 33e236b0..b420b785 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -281,9 +281,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) } ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, - const struct timespec * timeout) + const struct timespec * abstime) { - struct timespec abstime; ssize_t idx = -1; assert(rb); @@ -293,11 +292,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (idx != -EAGAIN) return idx; - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else @@ -308,10 +302,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, (void *) rb->lock); while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { - if (timeout != NULL) + if (abstime != NULL) idx = -pthread_cond_timedwait(rb->add, rb->lock, - &abstime); + abstime); else idx = -pthread_cond_wait(rb->add, rb->lock); #ifdef HAVE_ROBUST_MUTEX diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 44001458..7dc5f5d9 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -284,18 +284,12 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) } ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, - const struct timespec * timeout) + const struct timespec * abstime) { - struct timespec abstime; ssize_t idx = -1; assert(rb); - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else @@ -306,10 +300,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, (void *) rb->lock); while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { - if (timeout != NULL) + if (abstime != NULL) idx = -pthread_cond_timedwait(rb->add, rb->lock, - &abstime); + abstime); else idx = -pthread_cond_wait(rb->add, rb->lock); #ifdef HAVE_ROBUST_MUTEX diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 41c2074a..fd3c1c6a 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -11,6 +11,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c rib_test.c sha3_test.c time_utils_test.c + timerwheel_test.c ) add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c new file mode 100644 index 00000000..d9ca164e --- /dev/null +++ b/src/lib/tests/timerwheel_test.c @@ -0,0 +1,106 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Test of the timer wheel + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "timerwheel.c" + +#include +#include +#include +#include + +#define MAX_ELEMENTS 100 +#define MAX_RESOLUTION 10 /* ms */ +#define MAX_ADDITIONS 1000 + +int total; + +int add(void * o) +{ + total += *((int *) o); + return 0; +} + +int timerwheel_test(int argc, char ** argv) +{ + struct timerwheel * tw; + long resolution; + long elements; + struct timespec wait; + + int additions; + + int check_total = 0; + + int i; + int var = 5; + + struct tw_f * f; + + (void) argc; + (void) argv; + + total = 0; + + srand(time(NULL)); + + resolution = rand() % (MAX_RESOLUTION - 1) + 1; + elements = rand() % (MAX_ELEMENTS - 10) + 10; + + tw = timerwheel_create(resolution, resolution * elements); + if (tw == NULL) { + printf("Failed to create timerwheel.\n"); + return -1; + } + + wait.tv_sec = (resolution * elements) / 1000; + wait.tv_nsec = ((resolution * elements) % 1000) * MILLION; + + additions = rand() % MAX_ADDITIONS + 1000; + + for (i = 0; i < additions; ++i) { + int delay = rand() % (resolution * elements); + check_total += var; + f = timerwheel_start(tw, + (void (*)(void *)) add, + (void *) &var, + delay); + if (f == NULL) { + printf("Failed to add function."); + return -1; + } + } + + nanosleep(&wait, NULL); + + /* On some systems and VMs, the scheduler may be too slow. */ + if (total != check_total) + nanosleep(&wait, NULL); + + timerwheel_destroy(tw); + + if (total != check_total) { + printf("Totals do not match.\n"); + return -1; + } + + return 0; +} diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c new file mode 100644 index 00000000..7e2779d0 --- /dev/null +++ b/src/lib/timerwheel.c @@ -0,0 +1,371 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Timerwheel + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include + +#define FRAC 10 /* accuracy of the timer */ + +#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1)); +#define tw_free(tw) (tw_used(tw) + 1 < tw->elements) +#define tw_empty(tw) (tw->head == tw->tail) + +enum tw_state { + TW_NULL = 0, + TW_RUNNING, + TW_DESTROY +}; + +struct tw_f { + struct list_head next; + void (* func)(void *); + void * arg; +}; + +struct tw_el { + struct list_head funcs; + struct timespec expiry; +}; + +struct timerwheel { + struct tw_el * wheel; + + struct timespec intv; + + size_t pos; + + struct list_head wq; + + pthread_cond_t work; + pthread_mutex_t lock; + + int resolution; + unsigned int elements; + + enum tw_state state; + pthread_mutex_t s_lock; + + pthread_t ticker; + pthread_t worker; +}; + +static void tw_el_fini(struct tw_el * e) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &e->funcs) { + struct tw_f * f = list_entry(p, struct tw_f, next); + list_del(&f->next); + } +} + +static enum tw_state tw_get_state(struct timerwheel * tw) +{ + enum tw_state state; + + assert(tw); + + pthread_mutex_lock(&tw->s_lock); + + state = tw->state; + + pthread_mutex_unlock(&tw->s_lock); + + return state; +} + +static void tw_set_state(struct timerwheel * tw, enum tw_state state) +{ + assert(tw); + assert(state != TW_NULL); + + pthread_mutex_lock(&tw->s_lock); + + tw->state = state; + + pthread_mutex_unlock(&tw->s_lock); +} + +static void * worker(void * o) +{ + struct list_head * p; + struct list_head * h; + + struct timerwheel * tw = (struct timerwheel *) o; + struct timespec dl; + struct timespec now; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + ts_add(&now, &tw->intv, &dl); + + pthread_mutex_lock(&tw->lock); + + while (tw_get_state(tw) == TW_RUNNING) { + if (pthread_cond_timedwait(&tw->work, &tw->lock, &dl) + == ETIMEDOUT) + ts_add(&dl, &tw->intv, &dl); + + list_for_each_safe(p, h, &tw->wq) { + struct tw_f * f = list_entry(p, struct tw_f, next); + list_del(&f->next); + pthread_mutex_unlock(&tw->lock); + f->func(f->arg); + free(f); + + pthread_mutex_lock(&tw->lock); + } + } + + pthread_mutex_unlock(&tw->lock); + + return (void *) o; +} + +static void * movement(void * o) +{ + struct timerwheel * tw = (struct timerwheel *) o; + struct timespec now = {0, 0}; + long ms = tw->resolution * tw->elements; + struct timespec total = {ms / 1000, + (ms % 1000) * MILLION}; + struct list_head * p; + struct list_head * h; + + while (tw_get_state(tw) == TW_RUNNING) { + clock_gettime(CLOCK_MONOTONIC, &now); + + pthread_mutex_lock(&tw->lock); + + if (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) < 0) { + pthread_mutex_unlock(&tw->lock); + nanosleep(&tw->intv, NULL); + continue; + } + + list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) { + struct tw_f * f = list_entry(p, struct tw_f, next); + list_del(&f->next); + list_add(&f->next, &tw->wq); + } + + ts_add(&tw->wheel[tw->pos].expiry, + &total, + &tw->wheel[tw->pos].expiry); + + tw->pos = (tw->pos + 1) & (tw->elements - 1); + + pthread_cond_signal(&tw->work); + + pthread_mutex_unlock(&tw->lock); + } + + return (void *) 0; +} + +struct timerwheel * timerwheel_create(time_t resolution, + time_t max_delay) +{ + struct timespec now = {0, 0}; + struct timespec res_ts = {resolution / 1000, + (resolution % 1000) * MILLION}; + unsigned long i; + + struct timerwheel * tw; + + pthread_condattr_t cattr; + + assert(resolution != 0); + + tw = malloc(sizeof(*tw)); + if (tw == NULL) + return NULL; + + if (pthread_mutex_init(&tw->lock, NULL)) + return NULL; + + tw->elements = 1; + + while (tw->elements < max_delay / resolution) + tw->elements <<= 1; + + tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements); + if (tw->wheel == NULL) + goto fail_wheel_malloc; + + tw->resolution = resolution; + + tw->intv.tv_sec = (tw->resolution / FRAC) / 1000; + tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION; + + list_head_init(&tw->wq); + + if (pthread_mutex_init(&tw->lock, NULL)) + goto fail_lock_init; + + if (pthread_mutex_init(&tw->s_lock, NULL)) + goto fail_s_lock_init; + + if (pthread_condattr_init(&cattr)) + goto fail_cond_init; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&tw->work, &cattr)) + goto fail_cond_init; + + tw->pos = 0; + tw->state = TW_RUNNING; + + clock_gettime(CLOCK_MONOTONIC, &now); + now.tv_nsec -= (now.tv_nsec % MILLION); + + for (i = 0; i < tw->elements; ++i) { + list_head_init(&tw->wheel[i].funcs); + tw->wheel[i].expiry = now; + ts_add(&now, &res_ts, &now); + } + + if (pthread_create(&tw->worker, NULL, worker, (void *) tw)) + goto fail_worker_create; + + if (pthread_create(&tw->ticker, NULL, movement, (void *) tw)) { + tw_set_state(tw, TW_DESTROY); + goto fail_ticker_create; + } + + return tw; + + fail_ticker_create: + pthread_join(tw->worker, NULL); + fail_worker_create: + pthread_cond_destroy(&tw->work); + fail_cond_init: + pthread_mutex_destroy(&tw->s_lock); + fail_s_lock_init: + pthread_mutex_destroy(&tw->lock); + fail_lock_init: + free(tw->wheel); + fail_wheel_malloc: + free(tw); + return NULL; +} + +void timerwheel_destroy(struct timerwheel * tw) +{ + unsigned long i; + + struct list_head * p; + struct list_head * h; + + tw_set_state(tw, TW_DESTROY); + + pthread_join(tw->ticker, NULL); + pthread_join(tw->worker, NULL); + + for (i = 0; i < tw->elements; ++i) + tw_el_fini(&tw->wheel[i]); + + pthread_mutex_lock(&tw->lock); + + list_for_each_safe(p, h, &tw->wq) { + struct tw_f * f = list_entry(p, struct tw_f, next); + list_del(&f->next); + free(f); + } + + pthread_mutex_unlock(&tw->lock); + + pthread_cond_destroy(&tw->work); + pthread_mutex_destroy(&tw->lock); + pthread_mutex_destroy(&tw->s_lock); + + free(tw->wheel); + free(tw); +} + +struct tw_f * timerwheel_start(struct timerwheel * tw, + void (* func)(void *), + void * arg, + time_t delay) +{ + int pos; + struct tw_f * f = malloc(sizeof(*f)); + if (f == NULL) + return NULL; + + f->func = func; + f->arg = arg; + + assert(delay < tw->elements * tw->resolution); + + pthread_mutex_lock(&tw->lock); + + pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); + list_add(&f->next, &tw->wheel[pos].funcs); + + pthread_mutex_unlock(&tw->lock); + + return f; +} + +int timerwheel_restart(struct timerwheel * tw, + struct tw_f * f, + time_t delay) +{ + int pos; + + assert(tw); + assert(delay < tw->elements * tw->resolution); + + pthread_mutex_lock(&tw->lock); + + list_del(&f->next); + pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); + list_add(&f->next, &tw->wheel[pos].funcs); + + pthread_mutex_unlock(&tw->lock); + + return 0; +} + +void timerwheel_stop(struct timerwheel * tw, + struct tw_f * f) +{ + assert(tw); + + pthread_mutex_lock(&tw->lock); + + list_del(&f->next); + free(f); + + pthread_mutex_unlock(&tw->lock); +} -- cgit v1.2.3