diff options
author | Dimitri Staessens <dimitri.staessens@ugent.be> | 2018-07-27 00:18:20 +0200 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2018-07-27 08:53:17 +0200 |
commit | bfc29ca20406ccd69363b0f9796987534318e7ae (patch) | |
tree | ac41f39e11cfe989daa10277f14a9597bb5cc3b8 | |
parent | 1c7dcc2d37dc5a41379ca08b523bda58a51f11de (diff) | |
download | ouroboros-bfc29ca20406ccd69363b0f9796987534318e7ae.tar.gz ouroboros-bfc29ca20406ccd69363b0f9796987534318e7ae.zip |
lib: Support for rudimentary retransmission
This adds rudimentary support for sending and processing
acknowledgments and doing retransmission.
It replaces the generic timerwheel with a specific one for
retransmission. This is currently a fixed wheel allowing
retransmissions to be scheduled up to about 32 seconds into the
future. It currently has an 8ms resolution. This could be made
configurable in the future. Failures of the flow (i.e. rtx not
working) are indicated by the rxmwheel_move() function returning a fd.
This is currently not yet handled (maybe just setting the state of the
flow to FLOWDOWN is a better solution).
The shm_rdrbuff tracks the number of users of a du_buff. One user is
the full stack, each retransmission will increment the refs counter
(which effectively acts as a semaphore). The refs counter is
decremented when a packet is acked. The du_buff is only allowed to be
removed if there is only one user left (the "stack").
When a packet is retransmitted, it is copied in the rdrbuff. This is
to ensure integrity of the packet when multiple layers do
retransmission and it is passed down the stack again.
Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be>
Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
-rw-r--r-- | include/ouroboros/shm_du_buff.h | 4 | ||||
-rw-r--r-- | include/ouroboros/timerwheel.h | 47 | ||||
-rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/lib/dev.c | 15 | ||||
-rw-r--r-- | src/lib/frct.c | 73 | ||||
-rw-r--r-- | src/lib/rxmwheel.c | 252 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 45 | ||||
-rw-r--r-- | src/lib/tests/CMakeLists.txt | 7 | ||||
-rw-r--r-- | src/lib/tests/timerwheel_test.c | 104 | ||||
-rw-r--r-- | src/lib/timerwheel.c | 232 |
10 files changed, 330 insertions, 450 deletions
diff --git a/include/ouroboros/shm_du_buff.h b/include/ouroboros/shm_du_buff.h index 31090cd3..066898df 100644 --- a/include/ouroboros/shm_du_buff.h +++ b/include/ouroboros/shm_du_buff.h @@ -49,4 +49,8 @@ uint8_t * shm_du_buff_tail_release(struct shm_du_buff * sdb, void shm_du_buff_truncate(struct shm_du_buff * sdb, size_t len); +int shm_du_buff_wait_ack(struct shm_du_buff * sdb); + +int shm_du_buff_ack(struct shm_du_buff * sdb); + #endif /* OUROBOROS_SHM_DU_BUFF_H */ diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h deleted file mode 100644 index 231e8103..00000000 --- a/include/ouroboros/timerwheel.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Ring buffer for incoming SDUs - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_LIB_TIMERWHEEL_H -#define OUROBOROS_LIB_TIMERWHEEL_H - -struct timerwheel; - -struct timerwheel * timerwheel_create(time_t resolution, - time_t max_delay); - -void timerwheel_destroy(struct timerwheel * tw); - -struct tw_f * timerwheel_start(struct timerwheel * tw, - void (* func)(void *), - void * arg, - time_t delay); /* ms */ - -int timerwheel_restart(struct timerwheel * tw, - struct tw_f * f, - time_t delay); /* ms */ - -void timerwheel_stop(struct timerwheel * tw, - struct tw_f * f); - -void timerwheel_move(struct timerwheel * tw); - -#endif /* OUROBOROS_LIB_TIMERWHEEL_H */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index e7e07802..47e93d61 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -179,7 +179,6 @@ set(SOURCE_FILES_DEV # Add source files here cacep.c dev.c - timerwheel.c ) set(SOURCE_FILES_IRM diff --git a/src/lib/dev.c b/src/lib/dev.c index dd908f78..e69fec26 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -40,7 +40,6 @@ #include <ouroboros/utils.h> #include <ouroboros/fqueue.h> #include <ouroboros/qoscube.h> -#include <ouroboros/timerwheel.h> #include <stdlib.h> #include <string.h> @@ -83,6 +82,9 @@ struct port { pthread_cond_t state_cond; }; +#define frcti_to_flow(frcti) \ + ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti))) + struct flow { struct shm_rbuff * rx_rb; struct shm_rbuff * tx_rb; @@ -396,12 +398,12 @@ static void init(int argc, if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; - if (frct_init()) - goto fail_frct; + if (rxmwheel_init()) + goto fail_rxmwheel; return; - fail_frct: + fail_rxmwheel: pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -437,7 +439,7 @@ static void fini(void) if (ai.fds == NULL) return; - frct_fini(); + rxmwheel_fini(); if (ai.prog != NULL) free(ai.prog); @@ -463,9 +465,6 @@ static void fini(void) shm_rdrbuff_close(ai.rdrb); - if (ai.tw != NULL) - timerwheel_destroy(ai.tw); - free(ai.flows); free(ai.ports); diff --git a/src/lib/frct.c b/src/lib/frct.c index 296d5b2c..0f3173c5 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -42,8 +42,9 @@ struct frct_cr { bool conf; uint8_t cflags; - time_t act; - time_t inact; + time_t rto; /* ms */ + time_t act; /* s */ + time_t inact; /* s */ }; struct frcti { @@ -57,16 +58,9 @@ struct frcti { struct frct_cr rcv_cr; ssize_t rq[RQ_SIZE]; - - struct timespec rtt; - pthread_rwlock_t lock; }; -struct { - struct timerwheel * tw; -} frct; - enum frct_flags { FRCT_DATA = 0x01, /* PDU carries data */ FRCT_DRF = 0x02, /* Data run flag */ @@ -89,21 +83,7 @@ struct frct_pci { uint32_t ackno; } __attribute__((packed)); -static int frct_init(void) -{ - frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); - if (frct.tw == NULL) - return -1; - - return 0; -} - -static void frct_fini(void) -{ - assert(frct.tw); - - timerwheel_destroy(frct.tw); -} +#include <rxmwheel.c> static struct frcti * frcti_create(int fd, qoscube_t qc) @@ -140,6 +120,8 @@ static struct frcti * frcti_create(int fd, frcti->snd_cr.conf = true; frcti->snd_cr.inact = 3 * delta_t + 1; frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + /* Initial rto. FIXME: recalc using Karn algorithm. */ + frcti->snd_cr.rto = 120; frcti->rcv_cr.inact = 2 * delta_t + 1; frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); @@ -159,6 +141,8 @@ static void frcti_destroy(struct frcti * frcti) * make sure everything is acked. */ + rxmwheel_clear(frcti->fd); + pthread_rwlock_destroy(&frcti->lock); free(frcti); @@ -254,7 +238,7 @@ static void frct_add_crc(uint8_t * head, static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) { - struct frct_pci * pci = NULL; + struct frct_pci * pci; pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci != NULL) @@ -269,10 +253,14 @@ static int __frcti_snd(struct frcti * frcti, struct frct_pci * pci; struct timespec now; struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; assert(frcti); snd_cr = &frcti->snd_cr; + rcv_cr = &frcti->rcv_cr; + + rxmwheel_move(); pci = frcti_alloc_head(sdb); if (pci == NULL) @@ -310,7 +298,7 @@ static int __frcti_snd(struct frcti * frcti, if (now.tv_sec - snd_cr->act > snd_cr->inact) { /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); -#ifdef OUROBOROS_CONFIG_DEBUG +#ifdef CONFIG_OUROBOROS_DEBUG frcti->snd_cr.seqno = 0; #else random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); @@ -318,13 +306,16 @@ static int __frcti_snd(struct frcti * frcti, frcti->snd_cr.lwe = frcti->snd_cr.seqno; } - pci->seqno = hton32(snd_cr->seqno++); - if (!(snd_cr->cflags & FRCTFRTX)) - snd_cr->lwe++; - else - /* TODO: update on ACK */ + pci->seqno = hton32(snd_cr->seqno); + if (!(snd_cr->cflags & FRCTFRTX)) { snd_cr->lwe++; + } else if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { + rxmwheel_add(frcti, snd_cr->seqno, sdb); + pci->flags |= FRCT_ACK; + pci->ackno = hton32(rcv_cr->lwe); + } + snd_cr->seqno++; snd_cr->act = now.tv_sec; snd_cr->conf = false; @@ -340,6 +331,7 @@ static int __frcti_rcv(struct frcti * frcti, ssize_t idx; struct frct_pci * pci; struct timespec now; + struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; int ret = 0; @@ -347,6 +339,7 @@ static int __frcti_rcv(struct frcti * frcti, assert(frcti); rcv_cr = &frcti->rcv_cr; + snd_cr = &frcti->snd_cr; pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); @@ -369,18 +362,18 @@ static int __frcti_rcv(struct frcti * frcti, if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { /* Inactive receiver, check for DRF. */ if (pci->flags & FRCT_DRF) /* New run. */ - rcv_cr->lwe = seqno; + rcv_cr->lwe = seqno - 1; else goto drop_packet; } - if (seqno == rcv_cr->lwe) { - ++rcv_cr->lwe; + if (seqno == rcv_cr->lwe + 1) { + rcv_cr->lwe = seqno; /* Check for online reconfiguration. */ if (pci->flags & FRCT_CFG) rcv_cr->cflags = pci->cflags; } else { /* Out of order. */ - if ((int32_t)(seqno - rcv_cr->lwe) < 0) /* Duplicate. */ + if ((int32_t)(seqno - rcv_cr->lwe) <= 0) /* Duplicate. */ goto drop_packet; if (rcv_cr->cflags & FRCTFRTX) { @@ -396,6 +389,13 @@ static int __frcti_rcv(struct frcti * frcti, } } + if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) { + uint32_t ackno = ntoh32(pci->ackno); + /* Check for duplicate (old) acks. */ + if ((int32_t)(ackno - snd_cr->lwe) >= 0) + snd_cr->lwe = ackno; + } + rcv_cr->act = now.tv_sec; if (!(pci->flags & FRCT_DATA)) @@ -403,10 +403,13 @@ static int __frcti_rcv(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); + rxmwheel_move(); + return ret; drop_packet: shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&frcti->lock); + rxmwheel_move(); return -EAGAIN; } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c new file mode 100644 index 00000000..e5891081 --- /dev/null +++ b/src/lib/rxmwheel.c @@ -0,0 +1,252 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Timerwheel + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/list.h> + +#define RXMQ_S 12 /* defines #slots */ +#define RXMQ_M 15 /* defines max delay */ +#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ +#define RXMQ_SLOTS (1 << RXMQ_S) +#define RXMQ_MAX (1 << RXMQ_M) /* ms */ + +/* Small inacurracy to avoid slow division by MILLION. */ +#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) +#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) + +struct rxm { + struct list_head next; + uint32_t seqno; + struct shm_du_buff * sdb; + uint8_t * head; + uint8_t * tail; + time_t t0; /* Time when original was sent (s). */ + size_t mul; /* RTO multiplier. */ + struct frcti * frcti; +}; + +struct { + struct list_head wheel[RXMQ_SLOTS]; + + size_t prv; /* Last processed slot. */ + pthread_mutex_t lock; +} rw; + +static void rxmwheel_fini(void) +{ + size_t i; + struct list_head * p; + struct list_head * h; + + for (i = 0; i < RXMQ_SLOTS; ++i) { + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * rxm = list_entry(p, struct rxm, next); + list_del(&rxm->next); + free(rxm); + } + } +} + +static int rxmwheel_init(void) +{ + struct timespec now; + size_t i; + + if (pthread_mutex_init(&rw.lock, NULL)) + return -1; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + /* Mark the previous timeslot as the last one processed. */ + rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); + + for (i = 0; i < RXMQ_SLOTS; ++i) + list_head_init(&rw.wheel[i]); + + return 0; +} + +static void rxmwheel_clear(int fd) +{ + size_t i; + + /* FIXME: Add list element to avoid looping over full rxmwheel */ + pthread_mutex_lock(&rw.lock); + + for (i = 0; i < RXMQ_SLOTS; ++i) { + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * r = list_entry(p, struct rxm, next); + if (r->frcti->fd == fd) { + list_del(&r->next); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + } + } + } + + pthread_mutex_unlock(&rw.lock); +} + +/* Return fd on r-timer expiry. */ +static int rxmwheel_move(void) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; + size_t slot; + size_t i; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + slot = ts_to_slot(now); + + pthread_mutex_lock(&rw.lock); + + for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * r; + struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; + size_t rslot; + time_t newtime; + ssize_t idx; + struct shm_du_buff * sdb; + uint8_t * head; + struct flow * f; + + r = list_entry(p, struct rxm, next); + list_del(&r->next); + + snd_cr = &r->frcti->snd_cr; + rcv_cr = &r->frcti->rcv_cr; + /* Has been ack'd, remove. */ + if ((int) (r->seqno - snd_cr->lwe) <= 0) { + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + continue; + } + /* Check for r-timer expiry. */ + if (ts_to_ms(now) - r->t0 > r->frcti->r) { + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + return fd; + } + + /* Copy the payload, safe rtx in other layers. */ + if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { + /* FIXME: reschedule send? */ + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + return fd; + } + + idx = shm_du_buff_get_idx(sdb); + + head = shm_du_buff_head(sdb); + memcpy(head, r->head, r->tail - r->head); + + /* Release the old copy */ + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + + /* Update ackno and make sure DRF is not set*/ + ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); + ((struct frct_pci *) head)->flags &= ~FRCT_DRF; + + f = &ai.flows[r->frcti->fd]; + + /* Retransmit the copy. */ + if (shm_rbuff_write(f->tx_rb, idx)) { + ipcp_sdb_release(sdb); + free(r); + /* FIXME: reschedule send? */ + continue; + } + + shm_flow_set_notify(f->set, f->port_id, FLOW_PKT); + + /* Reschedule. */ + shm_du_buff_wait_ack(sdb); + + r->head = head; + r->tail = shm_du_buff_tail(sdb); + r->sdb = sdb; + + newtime = ts_to_ms(now) + (snd_cr->rto << ++r->mul); + rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); + + list_add_tail(&r->next, &rw.wheel[rslot]); + } + } + + rw.prv = slot; + + pthread_mutex_unlock(&rw.lock); + + return 0; +} + +static int rxmwheel_add(struct frcti * frcti, + uint32_t seqno, + struct shm_du_buff * sdb) +{ + struct timespec now; + struct rxm * r; + size_t slot; + + r = malloc(sizeof(*r)); + if (r == NULL) + return -ENOMEM; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_mutex_lock(&rw.lock); + + r->t0 = ts_to_ms(now); + r->mul = 0; + r->seqno = seqno; + r->sdb = sdb; + r->head = shm_du_buff_head(sdb); + r->tail = shm_du_buff_tail(sdb); + r->frcti = frcti; + + slot = ((r->t0 + frcti->snd_cr.rto) >> RXMQ_R) + & (RXMQ_SLOTS - 1); + + list_add_tail(&r->next, &rw.wheel[slot]); + + pthread_mutex_unlock(&rw.lock); + + shm_du_buff_wait_ack(sdb); + + return 0; +} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 5ae2085d..182ad084 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -65,11 +65,6 @@ #define shm_rdrb_empty(rdrb) \ (*rdrb->tail == *rdrb->head) -enum shm_du_buff_flags { - SDB_VALID = 0, - SDB_NULL -}; - struct shm_du_buff { size_t size; #ifdef SHM_RDRB_MULTI_BLOCK @@ -77,7 +72,7 @@ struct shm_du_buff { #endif size_t du_head; size_t du_tail; - size_t flags; + size_t refs; size_t idx; }; @@ -96,11 +91,11 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) #ifdef SHM_RDRB_MULTI_BLOCK struct shm_du_buff * sdb; while (!shm_rdrb_empty(rdrb) && - (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL) + (sdb = get_tail_ptr(rdrb))->refs == 0) *rdrb->tail = (*rdrb->tail + sdb->blocks) & ((SHM_BUFFER_SIZE) - 1); #else - while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL) + while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0) *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); #endif pthread_cond_broadcast(rdrb->healthy); @@ -108,7 +103,7 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) static void sanitize(struct shm_rdrbuff * rdrb) { - get_head_ptr(rdrb)->flags = SDB_NULL; + --get_head_ptr(rdrb)->refs; garbage_collect(rdrb); pthread_mutex_consistent(rdrb->lock); } @@ -338,7 +333,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->flags = SDB_NULL; + sdb->refs = 0; sdb->du_head = 0; sdb->du_tail = 0; sdb->idx = *rdrb->head; @@ -347,7 +342,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, } #endif sdb = get_head_ptr(rdrb); - sdb->flags = SDB_VALID; + sdb->refs = 1; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; @@ -434,7 +429,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->flags = SDB_NULL; + sdb->refs = 0; sdb->du_head = 0; sdb->du_tail = 0; sdb->idx = *rdrb->head; @@ -443,7 +438,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, } #endif sdb = get_head_ptr(rdrb); - sdb->flags = SDB_VALID; + sdb->refs = 1; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; @@ -497,6 +492,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, size_t idx) { + struct shm_du_buff * sdb; + assert(rdrb); assert(idx < (SHM_BUFFER_SIZE)); @@ -508,10 +505,13 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, #endif assert(!shm_rdrb_empty(rdrb)); - idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; + sdb = idx_to_du_buff_ptr(rdrb, idx); - if (idx == *rdrb->tail) - garbage_collect(rdrb); + if (sdb->refs == 1) { /* only stack needs it, can be removed */ + sdb->refs = 0; + if (idx == *rdrb->tail) + garbage_collect(rdrb); + } pthread_mutex_unlock(rdrb->lock); @@ -603,3 +603,16 @@ void shm_du_buff_truncate(struct shm_du_buff * sdb, sdb->du_tail = sdb->du_head + len; } + +int shm_du_buff_wait_ack(struct shm_du_buff * sdb) +{ + __sync_add_and_fetch(&sdb->refs, 1); + + return 0; +} + +int shm_du_buff_ack(struct shm_du_buff * sdb) +{ + __sync_sub_and_fetch(&sdb->refs, 1); + return 0; +} diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 7d5b3651..cc51c203 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -1,12 +1,6 @@ get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) -if (NOT (APPLE OR GNU)) - set(TIMERWHEEL_TEST "timerwheel_test.c") -else () - set(TIMERWHEEL_TEST "") -endif () - create_test_sourcelist(${PARENT_DIR}_tests test_suite.c # Add new tests here bitmap_test.c @@ -16,7 +10,6 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c md5_test.c sha3_test.c time_utils_test.c - ${TIMERWHEEL_TEST} ) add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c deleted file mode 100644 index 0ec98316..00000000 --- a/src/lib/tests/timerwheel_test.c +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Test of the timer wheel - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include "timerwheel.c" - -#include <pthread.h> -#include <time.h> -#include <stdlib.h> -#include <stdio.h> - -#define MAX_ELEMENTS 100 -#define MAX_RESOLUTION 10 /* ms */ -#define MAX_ADDITIONS 1000 - -int total; - -int add(void * o) -{ - total += *((int *) o); - return 0; -} - -int timerwheel_test(int argc, char ** argv) -{ - struct timerwheel * tw; - long resolution; - long elements; - struct timespec wait; - - int additions; - - int check_total = 0; - - int i; - int var = 5; - - struct tw_f * f; - - (void) argc; - (void) argv; - - total = 0; - - srand(time(NULL)); - - resolution = rand() % (MAX_RESOLUTION - 1) + 1; - elements = rand() % (MAX_ELEMENTS - 10) + 10; - - tw = timerwheel_create(resolution, resolution * elements); - if (tw == NULL) { - printf("Failed to create timerwheel.\n"); - return -1; - } - - wait.tv_sec = (resolution * elements) / 1000; - wait.tv_nsec = ((resolution * elements) % 1000) * MILLION; - - additions = rand() % MAX_ADDITIONS + 1000; - - for (i = 0; i < additions; ++i) { - int delay = rand() % (resolution * elements); - check_total += var; - f = timerwheel_start(tw, - (void (*)(void *)) add, - (void *) &var, - delay); - if (f == NULL) { - printf("Failed to add function."); - return -1; - } - } - - nanosleep(&wait, NULL); - - timerwheel_move(tw); - - timerwheel_destroy(tw); - - if (total != check_total) { - printf("Totals do not match: %d and %d.\n", total, check_total); - return -1; - } - - return 0; -} diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c deleted file mode 100644 index ef8489bf..00000000 --- a/src/lib/timerwheel.c +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Timerwheel - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 200112L - -#include "config.h" - -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> -#include <ouroboros/list.h> - -#include <pthread.h> -#include <stdlib.h> -#include <assert.h> -#include <string.h> - -#define FRAC 10 /* accuracy of the timer */ - -#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1)); -#define tw_free(tw) (tw_used(tw) + 1 < tw->elements) -#define tw_empty(tw) (tw->head == tw->tail) - -struct tw_f { - struct list_head next; - void (* func)(void *); - void * arg; -}; - -struct tw_el { - struct list_head funcs; - struct timespec expiry; -}; - -struct timerwheel { - struct tw_el * wheel; - - struct timespec intv; - - size_t pos; - - pthread_mutex_t lock; - - time_t resolution; - size_t elements; -}; - -static void tw_el_fini(struct tw_el * e) -{ - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &e->funcs) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - } -} - -void timerwheel_move(struct timerwheel * tw) -{ - struct timespec now = {0, 0}; - long ms = tw->resolution * tw->elements; - struct timespec total = {ms / 1000, - (ms % 1000) * MILLION}; - struct list_head * p; - struct list_head * h; - - clock_gettime(CLOCK_MONOTONIC, &now); - - pthread_mutex_lock(&tw->lock); - - while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) { - list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - f->func(f->arg); - free(f); - } - - ts_add(&tw->wheel[tw->pos].expiry, - &total, - &tw->wheel[tw->pos].expiry); - - tw->pos = (tw->pos + 1) & (tw->elements - 1); - } - - pthread_mutex_unlock(&tw->lock); -} - -struct timerwheel * timerwheel_create(time_t resolution, - time_t max_delay) -{ - struct timespec now = {0, 0}; - struct timespec res_ts = {resolution / 1000, - (resolution % 1000) * MILLION}; - size_t i; - - struct timerwheel * tw; - - assert(resolution != 0); - - tw = malloc(sizeof(*tw)); - if (tw == NULL) - return NULL; - - if (pthread_mutex_init(&tw->lock, NULL)) - return NULL; - - tw->elements = 1; - - while (tw->elements < (size_t) max_delay / resolution) - tw->elements <<= 1; - - tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements); - if (tw->wheel == NULL) - goto fail_wheel_malloc; - - tw->resolution = resolution; - - tw->intv.tv_sec = (tw->resolution / FRAC) / 1000; - tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION; - - if (pthread_mutex_init(&tw->lock, NULL)) - goto fail_lock_init; - - tw->pos = 0; - - clock_gettime(CLOCK_MONOTONIC, &now); - now.tv_nsec -= (now.tv_nsec % MILLION); - - for (i = 0; i < tw->elements; ++i) { - list_head_init(&tw->wheel[i].funcs); - tw->wheel[i].expiry = now; - ts_add(&now, &res_ts, &now); - } - - return tw; - - fail_lock_init: - free(tw->wheel); - fail_wheel_malloc: - free(tw); - return NULL; -} - -void timerwheel_destroy(struct timerwheel * tw) -{ - unsigned long i; - - for (i = 0; i < tw->elements; ++i) - tw_el_fini(&tw->wheel[i]); - - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); -} - -struct tw_f * timerwheel_start(struct timerwheel * tw, - void (* func)(void *), - void * arg, - time_t delay) -{ - int pos; - struct tw_f * f = malloc(sizeof(*f)); - if (f == NULL) - return NULL; - - f->func = func; - f->arg = arg; - - assert(delay < (time_t) tw->elements * tw->resolution); - - pthread_mutex_lock(&tw->lock); - - pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); - list_add(&f->next, &tw->wheel[pos].funcs); - - pthread_mutex_unlock(&tw->lock); - - return f; -} - -int timerwheel_restart(struct timerwheel * tw, - struct tw_f * f, - time_t delay) -{ - int pos; - - assert(tw); - assert(delay < (time_t) tw->elements * tw->resolution); - - pthread_mutex_lock(&tw->lock); - - list_del(&f->next); - pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); - list_add(&f->next, &tw->wheel[pos].funcs); - - pthread_mutex_unlock(&tw->lock); - - return 0; -} - -void timerwheel_stop(struct timerwheel * tw, - struct tw_f * f) -{ - assert(tw); - - pthread_mutex_lock(&tw->lock); - - list_del(&f->next); - free(f); - - pthread_mutex_unlock(&tw->lock); -} |