diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/lib/dev.c | 15 | ||||
| -rw-r--r-- | src/lib/frct.c | 63 | ||||
| -rw-r--r-- | src/lib/rxmwheel.c | 252 | ||||
| -rw-r--r-- | src/lib/shm_rdrbuff.c | 45 | ||||
| -rw-r--r-- | src/lib/tests/CMakeLists.txt | 7 | ||||
| -rw-r--r-- | src/lib/tests/timerwheel_test.c | 104 | ||||
| -rw-r--r-- | src/lib/timerwheel.c | 229 | 
8 files changed, 321 insertions, 395 deletions
| diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 414a7203..a6dc00c6 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -183,7 +183,6 @@ set(SOURCE_FILES_DEV    # Add source files here    cacep.c    dev.c -  timerwheel.c    )  set(SOURCE_FILES_IRM diff --git a/src/lib/dev.c b/src/lib/dev.c index a7b342ba..2a5c3f83 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -44,7 +44,6 @@  #include <ouroboros/shm_rbuff.h>  #include <ouroboros/utils.h>  #include <ouroboros/fqueue.h> -#include <ouroboros/timerwheel.h>  #include <stdlib.h>  #include <string.h> @@ -89,6 +88,9 @@ struct port {          pthread_cond_t  state_cond;  }; +#define frcti_to_flow(frcti) \ +        ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti))) +  struct flow {          struct shm_rbuff *    rx_rb;          struct shm_rbuff *    tx_rb; @@ -399,12 +401,12 @@ static void init(int     argc,          if (pthread_rwlock_init(&ai.lock, NULL))                  goto fail_lock; -        if (frct_init()) -                goto fail_frct; +        if (rxmwheel_init()) +                goto fail_rxmwheel;          return; - fail_frct: + fail_rxmwheel:          pthread_rwlock_destroy(&ai.lock);   fail_lock:          for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -440,7 +442,7 @@ static void fini(void)          if (ai.fds == NULL)                  return; -        frct_fini(); +        rxmwheel_fini();          if (ai.prog != NULL)                  free(ai.prog); @@ -466,9 +468,6 @@ static void fini(void)          shm_rdrbuff_close(ai.rdrb); -        if (ai.tw != NULL) -                timerwheel_destroy(ai.tw); -          free(ai.flows);          free(ai.ports); diff --git a/src/lib/frct.c b/src/lib/frct.c index 200a9fe7..db3572e3 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -39,8 +39,9 @@ struct frct_cr {          uint8_t  cflags;          uint32_t seqno; -        time_t   act; -        time_t   inact; +        time_t   rto;     /* ms */ +        time_t   act;     /* s */ +        time_t   inact;   /* s */  };  struct frcti { @@ -54,16 +55,9 @@ struct frcti {          struct frct_cr   rcv_cr;          ssize_t          rq[RQ_SIZE]; - -        struct timespec  rtt; -          pthread_rwlock_t lock;  }; -struct { -        struct timerwheel * tw; -} frct; -  enum frct_flags {          FRCT_DATA = 0x01, /* PDU carries data */          FRCT_DRF  = 0x02, /* Data run flag    */ @@ -83,21 +77,7 @@ struct frct_pci {          uint32_t ackno;  } __attribute__((packed)); -static int frct_init(void) -{ -        frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); -        if (frct.tw == NULL) -                return -1; - -        return 0; -} - -static void frct_fini(void) -{ -        assert(frct.tw); - -        timerwheel_destroy(frct.tw); -} +#include <rxmwheel.c>  static struct frcti * frcti_create(int fd)  { @@ -129,6 +109,8 @@ static struct frcti * frcti_create(int fd)          frcti->snd_cr.inact  = 3 * delta_t;          frcti->snd_cr.act    = now.tv_sec - (frcti->snd_cr.inact + 1); +        /* Initial rto. FIXME: recalc using Karn algorithm. */ +        frcti->snd_cr.rto    = 120;          if (ai.flows[fd].spec.loss == 0)                  frcti->snd_cr.cflags |= FRCTFRTX; @@ -151,6 +133,8 @@ static void frcti_destroy(struct frcti * frcti)           * make sure everything we sent is acked.           */ +        rxmwheel_clear(frcti->fd); +          pthread_rwlock_destroy(&frcti->lock);          free(frcti); @@ -204,7 +188,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)  static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)  { -        struct frct_pci * pci = NULL; +        struct frct_pci * pci;          pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);          if (pci != NULL) @@ -219,10 +203,14 @@ static int __frcti_snd(struct frcti *       frcti,          struct frct_pci * pci;          struct timespec   now;          struct frct_cr *  snd_cr; +        struct frct_cr *  rcv_cr;          assert(frcti);          snd_cr = &frcti->snd_cr; +        rcv_cr = &frcti->rcv_cr; + +        rxmwheel_move();          pci = frcti_alloc_head(sdb);          if (pci == NULL) @@ -250,13 +238,16 @@ static int __frcti_snd(struct frcti *       frcti,                  frcti->snd_cr.lwe = snd_cr->seqno;          } -        pci->seqno = hton32(snd_cr->seqno++); -        if (!(snd_cr->cflags & FRCTFRTX)) -                snd_cr->lwe++; -        else -                /* TODO: update on ACK */ +        pci->seqno = hton32(snd_cr->seqno); +        if (!(snd_cr->cflags & FRCTFRTX)) {                  snd_cr->lwe++; +        } else if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { +                rxmwheel_add(frcti, snd_cr->seqno, sdb); +                pci->flags |= FRCT_ACK; +                pci->ackno = hton32(rcv_cr->lwe); +        } +        snd_cr->seqno++;          snd_cr->act  = now.tv_sec;          pthread_rwlock_unlock(&frcti->lock); @@ -271,6 +262,7 @@ static int __frcti_rcv(struct frcti *       frcti,          ssize_t           idx;          struct frct_pci * pci;          struct timespec   now; +        struct frct_cr *  snd_cr;          struct frct_cr *  rcv_cr;          uint32_t          seqno;          int               ret = 0; @@ -278,6 +270,7 @@ static int __frcti_rcv(struct frcti *       frcti,          assert(frcti);          rcv_cr = &frcti->rcv_cr; +        snd_cr = &frcti->snd_cr;          pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); @@ -317,6 +310,13 @@ static int __frcti_rcv(struct frcti *       frcti,                  }          } +        if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) { +                uint32_t ackno = ntoh32(pci->ackno); +                /* Check for duplicate (old) acks. */ +                if ((int32_t)(ackno - snd_cr->lwe) >= 0) +                        snd_cr->lwe = ackno; +        } +          rcv_cr->act = now.tv_sec;          pthread_rwlock_unlock(&frcti->lock); @@ -324,10 +324,13 @@ static int __frcti_rcv(struct frcti *       frcti,          if (!(pci->flags & FRCT_DATA))                  shm_rdrbuff_remove(ai.rdrb, idx); +        rxmwheel_move(); +          return ret;   drop_packet:          pthread_rwlock_unlock(&frcti->lock);          shm_rdrbuff_remove(ai.rdrb, idx); +        rxmwheel_move();          return -EAGAIN;  } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c new file mode 100644 index 00000000..697c6a48 --- /dev/null +++ b/src/lib/rxmwheel.c @@ -0,0 +1,252 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Timerwheel + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/list.h> + +#define RXMQ_S     12                 /* defines #slots     */ +#define RXMQ_M     15                 /* defines max delay  */ +#define RXMQ_R     (RXMQ_M - RXMQ_S)  /* defines resolution */ +#define RXMQ_SLOTS (1 << RXMQ_S) +#define RXMQ_MAX   (1 << RXMQ_M)      /* ms                 */ + +/* Small inacurracy to avoid slow division by MILLION. */ +#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) +#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) + +struct rxm { +        struct list_head     next; +        uint32_t             seqno; +        struct shm_du_buff * sdb; +        uint8_t *            head; +        uint8_t *            tail; +        time_t               t0;    /* Time when original was sent (s).  */ +        size_t               mul;   /* RTO multiplier.                   */ +        struct frcti *       frcti; +}; + +struct { +        struct list_head wheel[RXMQ_SLOTS]; + +        size_t           prv; /* Last processed slot. */ +        pthread_mutex_t  lock; +} rw; + +static void rxmwheel_fini(void) +{ +        size_t             i; +        struct list_head * p; +        struct list_head * h; + +        for (i = 0; i < RXMQ_SLOTS; ++i) { +                list_for_each_safe(p, h, &rw.wheel[i]) { +                        struct rxm * rxm = list_entry(p, struct rxm, next); +                        list_del(&rxm->next); +                        free(rxm); +                } +        } +} + +static int rxmwheel_init(void) +{ +        struct timespec now; +        size_t          i; + +        if (pthread_mutex_init(&rw.lock, NULL)) +                return -1; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        /* Mark the previous timeslot as the last one processed. */ +        rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); + +        for (i = 0; i < RXMQ_SLOTS; ++i) +                list_head_init(&rw.wheel[i]); + +        return 0; +} + +static void rxmwheel_clear(int fd) +{ +        size_t i; + +        /* FIXME: Add list element to avoid looping over full rxmwheel */ +        pthread_mutex_lock(&rw.lock); + +        for (i = 0; i < RXMQ_SLOTS; ++i) { +                struct list_head * p; +                struct list_head * h; + +                list_for_each_safe(p, h, &rw.wheel[i]) { +                        struct rxm * r = list_entry(p, struct rxm, next); +                        if (r->frcti->fd == fd) { +                                list_del(&r->next); +                                shm_du_buff_ack(r->sdb); +                                ipcp_sdb_release(r->sdb); +                                free(r); +                        } +                } +        } + +        pthread_mutex_unlock(&rw.lock); +} + +/* Return fd on r-timer expiry. */ +static int rxmwheel_move(void) +{ +        struct timespec    now; +        struct list_head * p; +        struct list_head * h; +        size_t             slot; +        size_t             i; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        slot = ts_to_slot(now); + +        pthread_mutex_lock(&rw.lock); + +        for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { +                list_for_each_safe(p, h, &rw.wheel[i]) { +                        struct rxm *         r; +                        struct frct_cr *     snd_cr; +                        struct frct_cr *     rcv_cr; +                        size_t               rslot; +                        time_t               newtime; +                        ssize_t              idx; +                        struct shm_du_buff * sdb; +                        uint8_t *            head; +                        struct flow *        f; + +                        r = list_entry(p, struct rxm, next); +                        list_del(&r->next); + +                        snd_cr = &r->frcti->snd_cr; +                        rcv_cr = &r->frcti->rcv_cr; +                        /* Has been ack'd, remove. */ +                        if ((int) (r->seqno - snd_cr->lwe) <= 0) { +                                shm_du_buff_ack(r->sdb); +                                ipcp_sdb_release(r->sdb); +                                free(r); +                                continue; +                        } +                        /* Check for r-timer expiry. */ +                        if (ts_to_ms(now) - r->t0 > r->frcti->r) { +                                int fd = r->frcti->fd; +                                pthread_mutex_unlock(&rw.lock); +                                shm_du_buff_ack(r->sdb); +                                ipcp_sdb_release(r->sdb); +                                free(r); +                                return fd; +                        } + +                        /* Copy the payload, safe rtx in other layers. */ +                        if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { +                                /* FIXME: reschedule send? */ +                                int fd = r->frcti->fd; +                                pthread_mutex_unlock(&rw.lock); +                                shm_du_buff_ack(r->sdb); +                                ipcp_sdb_release(r->sdb); +                                free(r); +                                return fd; +                        } + +                        idx = shm_du_buff_get_idx(sdb); + +                        head = shm_du_buff_head(sdb); +                        memcpy(head, r->head, r->tail - r->head); + +                        /* Release the old copy */ +                        shm_du_buff_ack(r->sdb); +                        ipcp_sdb_release(r->sdb); + +                        /* Update ackno and make sure DRF is not set*/ +                        ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); +                        ((struct frct_pci *) head)->flags &= ~FRCT_DRF; + +                        f = &ai.flows[r->frcti->fd]; + +                        /* Retransmit the copy. */ +                        if (shm_rbuff_write(f->tx_rb, idx)) { +                                ipcp_sdb_release(sdb); +                                free(r); +                                /* FIXME: reschedule send? */ +                                continue; +                        } + +                        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + +                        /* Reschedule. */ +                        shm_du_buff_wait_ack(sdb); + +                        r->head = head; +                        r->tail = shm_du_buff_tail(sdb); +                        r->sdb  = sdb; + +                        newtime = ts_to_ms(now) + (snd_cr->rto << ++r->mul); +                        rslot   = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); + +                        list_add_tail(&r->next, &rw.wheel[rslot]); +                } +        } + +        rw.prv = slot; + +        pthread_mutex_unlock(&rw.lock); + +        return 0; +} + +static int rxmwheel_add(struct frcti *       frcti, +                        uint32_t             seqno, +                        struct shm_du_buff * sdb) +{ +        struct timespec now; +        struct rxm *    r; +        size_t          slot; + +        r = malloc(sizeof(*r)); +        if (r == NULL) +                return -ENOMEM; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        pthread_mutex_lock(&rw.lock); + +        r->t0    = ts_to_ms(now); +        r->mul   = 0; +        r->seqno = seqno; +        r->sdb   = sdb; +        r->head  = shm_du_buff_head(sdb); +        r->tail  = shm_du_buff_tail(sdb); +        r->frcti = frcti; + +        slot = ((r->t0 + frcti->snd_cr.rto) >> RXMQ_R) +                & (RXMQ_SLOTS - 1); + +        list_add_tail(&r->next, &rw.wheel[slot]); + +        pthread_mutex_unlock(&rw.lock); + +        shm_du_buff_wait_ack(sdb); + +        return 0; +} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 8bbdda46..31d9f2b6 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -65,11 +65,6 @@  #define shm_rdrb_empty(rdrb)                                                   \          (*rdrb->tail == *rdrb->head) -enum shm_du_buff_flags { -        SDB_VALID = 0, -        SDB_NULL -}; -  struct shm_du_buff {          size_t size;  #ifdef SHM_RDRB_MULTI_BLOCK @@ -77,7 +72,7 @@ struct shm_du_buff {  #endif          size_t du_head;          size_t du_tail; -        size_t flags; +        size_t refs;          size_t idx;  }; @@ -96,11 +91,11 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)  #ifdef SHM_RDRB_MULTI_BLOCK          struct shm_du_buff * sdb;          while (!shm_rdrb_empty(rdrb) && -               (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL) +               (sdb = get_tail_ptr(rdrb))->refs == 0)                  *rdrb->tail = (*rdrb->tail + sdb->blocks)                          & ((SHM_BUFFER_SIZE) - 1);  #else -        while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL) +        while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0)                  *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);  #endif          pthread_cond_broadcast(rdrb->healthy); @@ -108,7 +103,7 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)  static void sanitize(struct shm_rdrbuff * rdrb)  { -        get_head_ptr(rdrb)->flags = SDB_NULL; +        --get_head_ptr(rdrb)->refs;          garbage_collect(rdrb);          pthread_mutex_consistent(rdrb->lock);  } @@ -338,7 +333,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,                  sdb = get_head_ptr(rdrb);                  sdb->size    = 0;                  sdb->blocks  = padblocks; -                sdb->flags   = SDB_NULL; +                sdb->refs    = 0;                  sdb->du_head = 0;                  sdb->du_tail = 0;                  sdb->idx     = *rdrb->head; @@ -347,7 +342,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,          }  #endif          sdb        = get_head_ptr(rdrb); -        sdb->flags = SDB_VALID; +        sdb->refs  = 1;          sdb->idx   = *rdrb->head;  #ifdef SHM_RDRB_MULTI_BLOCK          sdb->blocks  = blocks; @@ -434,7 +429,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff *    rdrb,                          sdb = get_head_ptr(rdrb);                          sdb->size    = 0;                          sdb->blocks  = padblocks; -                        sdb->flags   = SDB_NULL; +                        sdb->refs    = 0;                          sdb->du_head = 0;                          sdb->du_tail = 0;                          sdb->idx     = *rdrb->head; @@ -443,7 +438,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff *    rdrb,                  }  #endif                  sdb        = get_head_ptr(rdrb); -                sdb->flags = SDB_VALID; +                sdb->refs  = 1;                  sdb->idx   = *rdrb->head;  #ifdef SHM_RDRB_MULTI_BLOCK                  sdb->blocks  = blocks; @@ -497,6 +492,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb,  int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,                         size_t               idx)  { +        struct shm_du_buff * sdb; +          assert(rdrb);          assert(idx < (SHM_BUFFER_SIZE)); @@ -508,10 +505,13 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,  #endif          assert(!shm_rdrb_empty(rdrb)); -        idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; +        sdb = idx_to_du_buff_ptr(rdrb, idx); -        if (idx == *rdrb->tail) -                garbage_collect(rdrb); +        if (sdb->refs == 1) { /* only stack needs it, can be removed */ +                sdb->refs = 0; +                if (idx == *rdrb->tail) +                        garbage_collect(rdrb); +        }          pthread_mutex_unlock(rdrb->lock); @@ -603,3 +603,16 @@ void shm_du_buff_truncate(struct shm_du_buff * sdb,          sdb->du_tail = sdb->du_head + len;  } + +int shm_du_buff_wait_ack(struct shm_du_buff * sdb) +{ +        __sync_add_and_fetch(&sdb->refs, 1); + +        return 0; +} + +int shm_du_buff_ack(struct shm_du_buff * sdb) +{ +        __sync_sub_and_fetch(&sdb->refs, 1); +        return 0; +} diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 7d5b3651..cc51c203 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -1,12 +1,6 @@  get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)  get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) -if (NOT (APPLE OR GNU)) -  set(TIMERWHEEL_TEST "timerwheel_test.c") -else () -  set(TIMERWHEEL_TEST "") -endif () -  create_test_sourcelist(${PARENT_DIR}_tests test_suite.c    # Add new tests here    bitmap_test.c @@ -16,7 +10,6 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c    md5_test.c    sha3_test.c    time_utils_test.c -  ${TIMERWHEEL_TEST}    )  add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c deleted file mode 100644 index 0ec98316..00000000 --- a/src/lib/tests/timerwheel_test.c +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Test of the timer wheel - * - *    Dimitri Staessens <dimitri.staessens@ugent.be> - *    Sander Vrijders   <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include "timerwheel.c" - -#include <pthread.h> -#include <time.h> -#include <stdlib.h> -#include <stdio.h> - -#define MAX_ELEMENTS   100 -#define MAX_RESOLUTION 10  /* ms */ -#define MAX_ADDITIONS  1000 - -int total; - -int add(void * o) -{ -        total += *((int *) o); -        return 0; -} - -int timerwheel_test(int argc, char ** argv) -{ -        struct timerwheel * tw; -        long resolution; -        long elements; -        struct timespec wait; - -        int additions; - -        int check_total = 0; - -        int i; -        int var = 5; - -        struct tw_f * f; - -        (void) argc; -        (void) argv; - -        total = 0; - -        srand(time(NULL)); - -        resolution = rand() % (MAX_RESOLUTION - 1) + 1; -        elements = rand() % (MAX_ELEMENTS - 10) + 10; - -        tw = timerwheel_create(resolution, resolution * elements); -        if (tw == NULL) { -                printf("Failed to create timerwheel.\n"); -                return -1; -        } - -        wait.tv_sec = (resolution * elements) / 1000; -        wait.tv_nsec = ((resolution * elements) % 1000) * MILLION; - -        additions = rand() % MAX_ADDITIONS + 1000; - -        for (i = 0; i < additions; ++i) { -                int delay = rand() % (resolution * elements); -                check_total += var; -                f = timerwheel_start(tw, -                                     (void (*)(void *)) add, -                                     (void *) &var, -                                     delay); -                if (f == NULL) { -                        printf("Failed to add function."); -                        return -1; -                } -        } - -        nanosleep(&wait, NULL); - -        timerwheel_move(tw); - -        timerwheel_destroy(tw); - -        if (total != check_total) { -                printf("Totals do not match: %d and %d.\n", total, check_total); -                return -1; -        } - -        return 0; -} diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c deleted file mode 100644 index 0feda642..00000000 --- a/src/lib/timerwheel.c +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Timerwheel - * - *    Dimitri Staessens <dimitri.staessens@ugent.be> - *    Sander Vrijders   <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 200112L - -#include "config.h" - -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> -#include <ouroboros/list.h> - -#include <pthread.h> -#include <stdlib.h> -#include <assert.h> -#include <string.h> - -#define FRAC 10 /* accuracy of the timer */ - -#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1)); -#define tw_free(tw) (tw_used(tw) + 1 < tw->elements) -#define tw_empty(tw) (tw->head == tw->tail) - -struct tw_f { -        struct list_head next; -        void (* func)(void *); -        void * arg; -}; - -struct tw_el { -        struct list_head funcs; -        struct timespec  expiry; -}; - -struct timerwheel { -        struct tw_el *   wheel; - -        struct timespec  intv; - -        size_t           pos; - -        pthread_mutex_t  lock; - -        time_t           resolution; -        size_t           elements; -}; - -static void tw_el_fini(struct tw_el * e) -{ -        struct list_head * p; -        struct list_head * h; - -        list_for_each_safe(p, h, &e->funcs) { -                struct tw_f * f = list_entry(p, struct tw_f, next); -                list_del(&f->next); -        } -} - -void timerwheel_move(struct timerwheel * tw) -{ -        struct timespec now = {0, 0}; -        long ms = tw->resolution * tw->elements; -        struct timespec total = {ms / 1000, -                                 (ms % 1000) * MILLION}; -        struct list_head * p; -        struct list_head * h; - -        clock_gettime(CLOCK_MONOTONIC, &now); - -        pthread_mutex_lock(&tw->lock); - -        while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) { -                list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) { -                        struct tw_f * f = list_entry(p, struct tw_f, next); -                        list_del(&f->next); -                        f->func(f->arg); -                        free(f); -                } - -                ts_add(&tw->wheel[tw->pos].expiry, -                       &total, -                       &tw->wheel[tw->pos].expiry); - -                tw->pos = (tw->pos + 1) & (tw->elements - 1); -        } - -        pthread_mutex_unlock(&tw->lock); -} - -struct timerwheel * timerwheel_create(time_t resolution, -                                      time_t max_delay) -{ -        struct timespec now = {0, 0}; -        struct timespec res_ts = {resolution / 1000, -                                  (resolution % 1000) * MILLION}; -        size_t i; - -        struct timerwheel * tw; - -        assert(resolution != 0); - -        tw = malloc(sizeof(*tw)); -        if (tw == NULL) -                return NULL; - -        tw->elements = 1; - -        while (tw->elements < (size_t) max_delay / resolution) -                tw->elements <<= 1; - -        tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements); -        if (tw->wheel == NULL) -                goto fail_wheel_malloc; - -        tw->resolution = resolution; - -        tw->intv.tv_sec = (tw->resolution / FRAC) / 1000; -        tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION; - -        if (pthread_mutex_init(&tw->lock, NULL)) -                goto fail_lock_init; - -        tw->pos = 0; - -        clock_gettime(CLOCK_MONOTONIC, &now); -        now.tv_nsec -= (now.tv_nsec % MILLION); - -        for (i = 0; i < tw->elements; ++i) { -                list_head_init(&tw->wheel[i].funcs); -                tw->wheel[i].expiry = now; -                ts_add(&now, &res_ts, &now); -        } - -        return tw; - - fail_lock_init: -         free(tw->wheel); - fail_wheel_malloc: -         free(tw); -         return NULL; -} - -void timerwheel_destroy(struct timerwheel * tw) -{ -        unsigned long i; - -        for (i = 0; i < tw->elements; ++i) -                tw_el_fini(&tw->wheel[i]); - -        pthread_mutex_destroy(&tw->lock); -        free(tw->wheel); -        free(tw); -} - -struct tw_f * timerwheel_start(struct timerwheel * tw, -                               void (* func)(void *), -                               void *              arg, -                               time_t              delay) -{ -        int pos; -        struct tw_f * f = malloc(sizeof(*f)); -        if (f == NULL) -                return NULL; - -        f->func = func; -        f->arg = arg; - -        assert(delay < (time_t) tw->elements * tw->resolution); - -        pthread_mutex_lock(&tw->lock); - -        pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); -        list_add(&f->next, &tw->wheel[pos].funcs); - -        pthread_mutex_unlock(&tw->lock); - -        return f; -} - -int timerwheel_restart(struct timerwheel * tw, -                       struct tw_f *       f, -                       time_t              delay) -{ -        int pos; - -        assert(tw); -        assert(delay < (time_t) tw->elements * tw->resolution); - -        pthread_mutex_lock(&tw->lock); - -        list_del(&f->next); -        pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); -        list_add(&f->next, &tw->wheel[pos].funcs); - -        pthread_mutex_unlock(&tw->lock); - -        return 0; -} - -void timerwheel_stop(struct timerwheel * tw, -                     struct tw_f *       f) -{ -        assert(tw); - -        pthread_mutex_lock(&tw->lock); - -        list_del(&f->next); -        free(f); - -        pthread_mutex_unlock(&tw->lock); -} | 
