diff options
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/lib/dev.c | 15 | ||||
-rw-r--r-- | src/lib/frct.c | 85 | ||||
-rw-r--r-- | src/lib/rxmwheel.c | 251 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 45 | ||||
-rw-r--r-- | src/lib/tests/CMakeLists.txt | 7 | ||||
-rw-r--r-- | src/lib/tests/timerwheel_test.c | 104 | ||||
-rw-r--r-- | src/lib/timerwheel.c | 229 |
8 files changed, 334 insertions, 403 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 5dd37346..42164fac 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -183,7 +183,6 @@ set(SOURCE_FILES_DEV # Add source files here cacep.c dev.c - timerwheel.c ) set(SOURCE_FILES_IRM diff --git a/src/lib/dev.c b/src/lib/dev.c index 456019d2..6dbb925e 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -44,7 +44,6 @@ #include <ouroboros/shm_rbuff.h> #include <ouroboros/utils.h> #include <ouroboros/fqueue.h> -#include <ouroboros/timerwheel.h> #include <stdlib.h> #include <string.h> @@ -89,6 +88,9 @@ struct port { pthread_cond_t state_cond; }; +#define frcti_to_flow(frcti) \ + ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti))) + struct flow { struct shm_rbuff * rx_rb; struct shm_rbuff * tx_rb; @@ -402,12 +404,12 @@ static void init(int argc, if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; - if (frct_init()) - goto fail_frct; + if (rxmwheel_init()) + goto fail_rxmwheel; return; - fail_frct: + fail_rxmwheel: pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -443,7 +445,7 @@ static void fini(void) if (ai.fds == NULL) return; - frct_fini(); + rxmwheel_fini(); if (ai.prog != NULL) free(ai.prog); @@ -469,9 +471,6 @@ static void fini(void) shm_rdrbuff_close(ai.rdrb); - if (ai.tw != NULL) - timerwheel_destroy(ai.tw); - free(ai.flows); free(ai.ports); diff --git a/src/lib/frct.c b/src/lib/frct.c index 200a9fe7..6041279a 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -39,8 +39,8 @@ struct frct_cr { uint8_t cflags; uint32_t seqno; - time_t act; - time_t inact; + time_t act; /* s */ + time_t inact; /* s */ }; struct frcti { @@ -50,24 +50,19 @@ struct frcti { time_t a; time_t r; + time_t rto; /* ms */ + struct frct_cr snd_cr; struct frct_cr rcv_cr; ssize_t rq[RQ_SIZE]; - - struct timespec rtt; - pthread_rwlock_t lock; }; -struct { - struct timerwheel * tw; -} frct; - enum frct_flags { FRCT_DATA = 0x01, /* PDU carries data */ FRCT_DRF = 0x02, /* Data run flag */ - FRCT_ACK = 0x03, /* ACK field valid */ + FRCT_ACK = 0x04, /* ACK field valid */ FRCT_FC = 0x08, /* FC window valid */ FRCT_RDVZ = 0x10, /* Rendez-vous */ FRCT_MFGM = 0x20, /* More fragments */ @@ -83,21 +78,7 @@ struct frct_pci { uint32_t ackno; } __attribute__((packed)); -static int frct_init(void) -{ - frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); - if (frct.tw == NULL) - return -1; - - return 0; -} - -static void frct_fini(void) -{ - assert(frct.tw); - - timerwheel_destroy(frct.tw); -} +#include <rxmwheel.c> static struct frcti * frcti_create(int fd) { @@ -129,9 +110,13 @@ static struct frcti * frcti_create(int fd) frcti->snd_cr.inact = 3 * delta_t; frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + /* Initial rto. FIXME: recalc using Karn algorithm. */ + frcti->rto = 120; - if (ai.flows[fd].spec.loss == 0) + if (ai.flows[fd].spec.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; + frcti->rcv_cr.cflags |= FRCTFRTX; + } frcti->rcv_cr.inact = 2 * delta_t; frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); @@ -151,6 +136,8 @@ static void frcti_destroy(struct frcti * frcti) * make sure everything we sent is acked. */ + rxmwheel_clear(frcti->fd); + pthread_rwlock_destroy(&frcti->lock); free(frcti); @@ -190,10 +177,10 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) /* See if we already have the next PDU. */ pthread_rwlock_wrlock(&frcti->lock); - pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); + pos = frcti->rcv_cr.seqno & (RQ_SIZE - 1); idx = frcti->rq[pos]; if (idx != -1) { - ++frcti->rcv_cr.lwe; + ++frcti->rcv_cr.seqno; frcti->rq[pos] = -1; } @@ -204,7 +191,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) { - struct frct_pci * pci = NULL; + struct frct_pci * pci; pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci != NULL) @@ -219,10 +206,14 @@ static int __frcti_snd(struct frcti * frcti, struct frct_pci * pci; struct timespec now; struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; assert(frcti); snd_cr = &frcti->snd_cr; + rcv_cr = &frcti->rcv_cr; + + rxmwheel_move(); pci = frcti_alloc_head(sdb); if (pci == NULL) @@ -247,16 +238,22 @@ static int __frcti_snd(struct frcti * frcti, #else random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); #endif - frcti->snd_cr.lwe = snd_cr->seqno; + frcti->snd_cr.lwe = snd_cr->seqno - 1; } - pci->seqno = hton32(snd_cr->seqno++); - if (!(snd_cr->cflags & FRCTFRTX)) - snd_cr->lwe++; - else - /* TODO: update on ACK */ + pci->seqno = hton32(snd_cr->seqno); + if (!(snd_cr->cflags & FRCTFRTX)) { snd_cr->lwe++; + } else if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { + rxmwheel_add(frcti, snd_cr->seqno, sdb); + if (rcv_cr->lwe <= rcv_cr->seqno) { + pci->flags |= FRCT_ACK; + pci->ackno = hton32(rcv_cr->seqno); + rcv_cr->lwe = rcv_cr->seqno; + } + } + snd_cr->seqno++; snd_cr->act = now.tv_sec; pthread_rwlock_unlock(&frcti->lock); @@ -271,6 +268,7 @@ static int __frcti_rcv(struct frcti * frcti, ssize_t idx; struct frct_pci * pci; struct timespec now; + struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; int ret = 0; @@ -278,6 +276,7 @@ static int __frcti_rcv(struct frcti * frcti, assert(frcti); rcv_cr = &frcti->rcv_cr; + snd_cr = &frcti->snd_cr; pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); @@ -301,22 +300,29 @@ static int __frcti_rcv(struct frcti * frcti, if (seqno == rcv_cr->seqno) { ++rcv_cr->seqno; } else { /* Out of order. */ - if ((int32_t)(seqno - rcv_cr->seqno) < 0) /* Duplicate. */ + if ((int32_t)(seqno - rcv_cr->seqno) < 0) goto drop_packet; if (rcv_cr->cflags & FRCTFRTX) { size_t pos = seqno & (RQ_SIZE - 1); - if ((seqno - rcv_cr->seqno) > RQ_SIZE /* Out of rq. */ + if ((seqno - rcv_cr->lwe) > RQ_SIZE /* Out of rq. */ || frcti->rq[pos] != -1) /* Duplicate in rq. */ goto drop_packet; /* Queue. */ frcti->rq[pos] = idx; ret = -EAGAIN; } else { - rcv_cr->seqno = seqno; + rcv_cr->seqno = seqno + 1; } } + if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) { + uint32_t ackno = ntoh32(pci->ackno); + /* Check for duplicate (old) acks. */ + if ((int32_t)(ackno - snd_cr->lwe) >= 0) + snd_cr->lwe = ackno; + } + rcv_cr->act = now.tv_sec; pthread_rwlock_unlock(&frcti->lock); @@ -324,10 +330,13 @@ static int __frcti_rcv(struct frcti * frcti, if (!(pci->flags & FRCT_DATA)) shm_rdrbuff_remove(ai.rdrb, idx); + rxmwheel_move(); + return ret; drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); + rxmwheel_move(); return -EAGAIN; } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c new file mode 100644 index 00000000..69151801 --- /dev/null +++ b/src/lib/rxmwheel.c @@ -0,0 +1,251 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Timerwheel + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/list.h> + +#define RXMQ_S 12 /* defines #slots */ +#define RXMQ_M 15 /* defines max delay */ +#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ +#define RXMQ_SLOTS (1 << RXMQ_S) +#define RXMQ_MAX (1 << RXMQ_M) /* ms */ + +/* Small inacurracy to avoid slow division by MILLION. */ +#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) +#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) + +struct rxm { + struct list_head next; + uint32_t seqno; + struct shm_du_buff * sdb; + uint8_t * head; + uint8_t * tail; + time_t t0; /* Time when original was sent (s). */ + size_t mul; /* RTO multiplier. */ + struct frcti * frcti; +}; + +struct { + struct list_head wheel[RXMQ_SLOTS]; + + size_t prv; /* Last processed slot. */ + pthread_mutex_t lock; +} rw; + +static void rxmwheel_fini(void) +{ + size_t i; + struct list_head * p; + struct list_head * h; + + for (i = 0; i < RXMQ_SLOTS; ++i) { + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * rxm = list_entry(p, struct rxm, next); + list_del(&rxm->next); + free(rxm); + } + } +} + +static int rxmwheel_init(void) +{ + struct timespec now; + size_t i; + + if (pthread_mutex_init(&rw.lock, NULL)) + return -1; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + /* Mark the previous timeslot as the last one processed. */ + rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); + + for (i = 0; i < RXMQ_SLOTS; ++i) + list_head_init(&rw.wheel[i]); + + return 0; +} + +static void rxmwheel_clear(int fd) +{ + size_t i; + + /* FIXME: Add list element to avoid looping over full rxmwheel */ + pthread_mutex_lock(&rw.lock); + + for (i = 0; i < RXMQ_SLOTS; ++i) { + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * r = list_entry(p, struct rxm, next); + if (r->frcti->fd == fd) { + list_del(&r->next); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + } + } + } + + pthread_mutex_unlock(&rw.lock); +} + +/* Return fd on r-timer expiry. */ +static int rxmwheel_move(void) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; + size_t slot; + size_t i; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + slot = ts_to_slot(now); + + pthread_mutex_lock(&rw.lock); + + for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * r; + struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; + size_t rslot; + time_t newtime; + ssize_t idx; + struct shm_du_buff * sdb; + uint8_t * head; + struct flow * f; + + r = list_entry(p, struct rxm, next); + list_del(&r->next); + + snd_cr = &r->frcti->snd_cr; + rcv_cr = &r->frcti->rcv_cr; + /* Has been ack'd, remove. */ + if ((int) (r->seqno - snd_cr->lwe) <= 0) { + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + continue; + } + /* Check for r-timer expiry. */ + if (ts_to_ms(now) - r->t0 > r->frcti->r) { + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + return fd; + } + + /* Copy the payload, safe rtx in other layers. */ + if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { + /* FIXME: reschedule send? */ + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + return fd; + } + + idx = shm_du_buff_get_idx(sdb); + + head = shm_du_buff_head(sdb); + memcpy(head, r->head, r->tail - r->head); + + /* Release the old copy */ + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + + /* Update ackno and make sure DRF is not set*/ + ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); + ((struct frct_pci *) head)->flags &= ~FRCT_DRF; + + f = &ai.flows[r->frcti->fd]; + + /* Retransmit the copy. */ + if (shm_rbuff_write(f->tx_rb, idx)) { + ipcp_sdb_release(sdb); + free(r); + /* FIXME: reschedule send? */ + continue; + } + + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + + /* Reschedule. */ + shm_du_buff_wait_ack(sdb); + + r->head = head; + r->tail = shm_du_buff_tail(sdb); + r->sdb = sdb; + + newtime = ts_to_ms(now) + (f->frcti->rto << ++r->mul); + rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); + + list_add_tail(&r->next, &rw.wheel[rslot]); + } + } + + rw.prv = slot; + + pthread_mutex_unlock(&rw.lock); + + return 0; +} + +static int rxmwheel_add(struct frcti * frcti, + uint32_t seqno, + struct shm_du_buff * sdb) +{ + struct timespec now; + struct rxm * r; + size_t slot; + + r = malloc(sizeof(*r)); + if (r == NULL) + return -ENOMEM; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_mutex_lock(&rw.lock); + + r->t0 = ts_to_ms(now); + r->mul = 0; + r->seqno = seqno; + r->sdb = sdb; + r->head = shm_du_buff_head(sdb); + r->tail = shm_du_buff_tail(sdb); + r->frcti = frcti; + + slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1); + + list_add_tail(&r->next, &rw.wheel[slot]); + + pthread_mutex_unlock(&rw.lock); + + shm_du_buff_wait_ack(sdb); + + return 0; +} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index a94d4169..e9ef9222 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -65,11 +65,6 @@ #define shm_rdrb_empty(rdrb) \ (*rdrb->tail == *rdrb->head) -enum shm_du_buff_flags { - SDB_VALID = 0, - SDB_NULL -}; - struct shm_du_buff { size_t size; #ifdef SHM_RDRB_MULTI_BLOCK @@ -77,7 +72,7 @@ struct shm_du_buff { #endif size_t du_head; size_t du_tail; - size_t flags; + size_t refs; size_t idx; }; @@ -95,11 +90,11 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) #ifdef SHM_RDRB_MULTI_BLOCK struct shm_du_buff * sdb; while (!shm_rdrb_empty(rdrb) && - (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL) + (sdb = get_tail_ptr(rdrb))->refs == 0) *rdrb->tail = (*rdrb->tail + sdb->blocks) & ((SHM_BUFFER_SIZE) - 1); #else - while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL) + while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0) *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); #endif pthread_cond_broadcast(rdrb->healthy); @@ -107,7 +102,7 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) static void sanitize(struct shm_rdrbuff * rdrb) { - get_head_ptr(rdrb)->flags = SDB_NULL; + --get_head_ptr(rdrb)->refs; garbage_collect(rdrb); pthread_mutex_consistent(rdrb->lock); } @@ -330,7 +325,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->flags = SDB_NULL; + sdb->refs = 0; sdb->du_head = 0; sdb->du_tail = 0; sdb->idx = *rdrb->head; @@ -339,7 +334,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, } #endif sdb = get_head_ptr(rdrb); - sdb->flags = SDB_VALID; + sdb->refs = 1; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; @@ -423,7 +418,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->flags = SDB_NULL; + sdb->refs = 0; sdb->du_head = 0; sdb->du_tail = 0; sdb->idx = *rdrb->head; @@ -432,7 +427,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, } #endif sdb = get_head_ptr(rdrb); - sdb->flags = SDB_VALID; + sdb->refs = 1; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; @@ -486,6 +481,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, size_t idx) { + struct shm_du_buff * sdb; + assert(rdrb); assert(idx < (SHM_BUFFER_SIZE)); @@ -497,10 +494,13 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, #endif assert(!shm_rdrb_empty(rdrb)); - idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; + sdb = idx_to_du_buff_ptr(rdrb, idx); - if (idx == *rdrb->tail) - garbage_collect(rdrb); + if (sdb->refs == 1) { /* only stack needs it, can be removed */ + sdb->refs = 0; + if (idx == *rdrb->tail) + garbage_collect(rdrb); + } pthread_mutex_unlock(rdrb->lock); @@ -592,3 +592,16 @@ void shm_du_buff_truncate(struct shm_du_buff * sdb, sdb->du_tail = sdb->du_head + len; } + +int shm_du_buff_wait_ack(struct shm_du_buff * sdb) +{ + __sync_add_and_fetch(&sdb->refs, 1); + + return 0; +} + +int shm_du_buff_ack(struct shm_du_buff * sdb) +{ + __sync_sub_and_fetch(&sdb->refs, 1); + return 0; +} diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 7d5b3651..cc51c203 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -1,12 +1,6 @@ get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) -if (NOT (APPLE OR GNU)) - set(TIMERWHEEL_TEST "timerwheel_test.c") -else () - set(TIMERWHEEL_TEST "") -endif () - create_test_sourcelist(${PARENT_DIR}_tests test_suite.c # Add new tests here bitmap_test.c @@ -16,7 +10,6 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c md5_test.c sha3_test.c time_utils_test.c - ${TIMERWHEEL_TEST} ) add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c deleted file mode 100644 index 0ec98316..00000000 --- a/src/lib/tests/timerwheel_test.c +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Test of the timer wheel - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include "timerwheel.c" - -#include <pthread.h> -#include <time.h> -#include <stdlib.h> -#include <stdio.h> - -#define MAX_ELEMENTS 100 -#define MAX_RESOLUTION 10 /* ms */ -#define MAX_ADDITIONS 1000 - -int total; - -int add(void * o) -{ - total += *((int *) o); - return 0; -} - -int timerwheel_test(int argc, char ** argv) -{ - struct timerwheel * tw; - long resolution; - long elements; - struct timespec wait; - - int additions; - - int check_total = 0; - - int i; - int var = 5; - - struct tw_f * f; - - (void) argc; - (void) argv; - - total = 0; - - srand(time(NULL)); - - resolution = rand() % (MAX_RESOLUTION - 1) + 1; - elements = rand() % (MAX_ELEMENTS - 10) + 10; - - tw = timerwheel_create(resolution, resolution * elements); - if (tw == NULL) { - printf("Failed to create timerwheel.\n"); - return -1; - } - - wait.tv_sec = (resolution * elements) / 1000; - wait.tv_nsec = ((resolution * elements) % 1000) * MILLION; - - additions = rand() % MAX_ADDITIONS + 1000; - - for (i = 0; i < additions; ++i) { - int delay = rand() % (resolution * elements); - check_total += var; - f = timerwheel_start(tw, - (void (*)(void *)) add, - (void *) &var, - delay); - if (f == NULL) { - printf("Failed to add function."); - return -1; - } - } - - nanosleep(&wait, NULL); - - timerwheel_move(tw); - - timerwheel_destroy(tw); - - if (total != check_total) { - printf("Totals do not match: %d and %d.\n", total, check_total); - return -1; - } - - return 0; -} diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c deleted file mode 100644 index 0feda642..00000000 --- a/src/lib/timerwheel.c +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Timerwheel - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 200112L - -#include "config.h" - -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> -#include <ouroboros/list.h> - -#include <pthread.h> -#include <stdlib.h> -#include <assert.h> -#include <string.h> - -#define FRAC 10 /* accuracy of the timer */ - -#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1)); -#define tw_free(tw) (tw_used(tw) + 1 < tw->elements) -#define tw_empty(tw) (tw->head == tw->tail) - -struct tw_f { - struct list_head next; - void (* func)(void *); - void * arg; -}; - -struct tw_el { - struct list_head funcs; - struct timespec expiry; -}; - -struct timerwheel { - struct tw_el * wheel; - - struct timespec intv; - - size_t pos; - - pthread_mutex_t lock; - - time_t resolution; - size_t elements; -}; - -static void tw_el_fini(struct tw_el * e) -{ - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &e->funcs) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - } -} - -void timerwheel_move(struct timerwheel * tw) -{ - struct timespec now = {0, 0}; - long ms = tw->resolution * tw->elements; - struct timespec total = {ms / 1000, - (ms % 1000) * MILLION}; - struct list_head * p; - struct list_head * h; - - clock_gettime(CLOCK_MONOTONIC, &now); - - pthread_mutex_lock(&tw->lock); - - while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) { - list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - f->func(f->arg); - free(f); - } - - ts_add(&tw->wheel[tw->pos].expiry, - &total, - &tw->wheel[tw->pos].expiry); - - tw->pos = (tw->pos + 1) & (tw->elements - 1); - } - - pthread_mutex_unlock(&tw->lock); -} - -struct timerwheel * timerwheel_create(time_t resolution, - time_t max_delay) -{ - struct timespec now = {0, 0}; - struct timespec res_ts = {resolution / 1000, - (resolution % 1000) * MILLION}; - size_t i; - - struct timerwheel * tw; - - assert(resolution != 0); - - tw = malloc(sizeof(*tw)); - if (tw == NULL) - return NULL; - - tw->elements = 1; - - while (tw->elements < (size_t) max_delay / resolution) - tw->elements <<= 1; - - tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements); - if (tw->wheel == NULL) - goto fail_wheel_malloc; - - tw->resolution = resolution; - - tw->intv.tv_sec = (tw->resolution / FRAC) / 1000; - tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION; - - if (pthread_mutex_init(&tw->lock, NULL)) - goto fail_lock_init; - - tw->pos = 0; - - clock_gettime(CLOCK_MONOTONIC, &now); - now.tv_nsec -= (now.tv_nsec % MILLION); - - for (i = 0; i < tw->elements; ++i) { - list_head_init(&tw->wheel[i].funcs); - tw->wheel[i].expiry = now; - ts_add(&now, &res_ts, &now); - } - - return tw; - - fail_lock_init: - free(tw->wheel); - fail_wheel_malloc: - free(tw); - return NULL; -} - -void timerwheel_destroy(struct timerwheel * tw) -{ - unsigned long i; - - for (i = 0; i < tw->elements; ++i) - tw_el_fini(&tw->wheel[i]); - - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); -} - -struct tw_f * timerwheel_start(struct timerwheel * tw, - void (* func)(void *), - void * arg, - time_t delay) -{ - int pos; - struct tw_f * f = malloc(sizeof(*f)); - if (f == NULL) - return NULL; - - f->func = func; - f->arg = arg; - - assert(delay < (time_t) tw->elements * tw->resolution); - - pthread_mutex_lock(&tw->lock); - - pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); - list_add(&f->next, &tw->wheel[pos].funcs); - - pthread_mutex_unlock(&tw->lock); - - return f; -} - -int timerwheel_restart(struct timerwheel * tw, - struct tw_f * f, - time_t delay) -{ - int pos; - - assert(tw); - assert(delay < (time_t) tw->elements * tw->resolution); - - pthread_mutex_lock(&tw->lock); - - list_del(&f->next); - pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); - list_add(&f->next, &tw->wheel[pos].funcs); - - pthread_mutex_unlock(&tw->lock); - - return 0; -} - -void timerwheel_stop(struct timerwheel * tw, - struct tw_f * f) -{ - assert(tw); - - pthread_mutex_lock(&tw->lock); - - list_del(&f->next); - free(f); - - pthread_mutex_unlock(&tw->lock); -} |