summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ouroboros/shm_du_buff.h4
-rw-r--r--include/ouroboros/timerwheel.h47
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/dev.c15
-rw-r--r--src/lib/frct.c85
-rw-r--r--src/lib/rxmwheel.c251
-rw-r--r--src/lib/shm_rdrbuff.c45
-rw-r--r--src/lib/tests/CMakeLists.txt7
-rw-r--r--src/lib/tests/timerwheel_test.c104
-rw-r--r--src/lib/timerwheel.c229
10 files changed, 338 insertions, 450 deletions
diff --git a/include/ouroboros/shm_du_buff.h b/include/ouroboros/shm_du_buff.h
index 31090cd3..066898df 100644
--- a/include/ouroboros/shm_du_buff.h
+++ b/include/ouroboros/shm_du_buff.h
@@ -49,4 +49,8 @@ uint8_t * shm_du_buff_tail_release(struct shm_du_buff * sdb,
void shm_du_buff_truncate(struct shm_du_buff * sdb,
size_t len);
+int shm_du_buff_wait_ack(struct shm_du_buff * sdb);
+
+int shm_du_buff_ack(struct shm_du_buff * sdb);
+
#endif /* OUROBOROS_SHM_DU_BUFF_H */
diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h
deleted file mode 100644
index 34994185..00000000
--- a/include/ouroboros/timerwheel.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2018
- *
- * Timerwheel
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_LIB_TIMERWHEEL_H
-#define OUROBOROS_LIB_TIMERWHEEL_H
-
-struct timerwheel;
-
-struct timerwheel * timerwheel_create(time_t resolution,
- time_t max_delay);
-
-void timerwheel_destroy(struct timerwheel * tw);
-
-struct tw_f * timerwheel_start(struct timerwheel * tw,
- void (* func)(void *),
- void * arg,
- time_t delay); /* ms */
-
-int timerwheel_restart(struct timerwheel * tw,
- struct tw_f * f,
- time_t delay); /* ms */
-
-void timerwheel_stop(struct timerwheel * tw,
- struct tw_f * f);
-
-void timerwheel_move(struct timerwheel * tw);
-
-#endif /* OUROBOROS_LIB_TIMERWHEEL_H */
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 5dd37346..42164fac 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -183,7 +183,6 @@ set(SOURCE_FILES_DEV
# Add source files here
cacep.c
dev.c
- timerwheel.c
)
set(SOURCE_FILES_IRM
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 456019d2..6dbb925e 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -44,7 +44,6 @@
#include <ouroboros/shm_rbuff.h>
#include <ouroboros/utils.h>
#include <ouroboros/fqueue.h>
-#include <ouroboros/timerwheel.h>
#include <stdlib.h>
#include <string.h>
@@ -89,6 +88,9 @@ struct port {
pthread_cond_t state_cond;
};
+#define frcti_to_flow(frcti) \
+ ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti)))
+
struct flow {
struct shm_rbuff * rx_rb;
struct shm_rbuff * tx_rb;
@@ -402,12 +404,12 @@ static void init(int argc,
if (pthread_rwlock_init(&ai.lock, NULL))
goto fail_lock;
- if (frct_init())
- goto fail_frct;
+ if (rxmwheel_init())
+ goto fail_rxmwheel;
return;
- fail_frct:
+ fail_rxmwheel:
pthread_rwlock_destroy(&ai.lock);
fail_lock:
for (i = 0; i < SYS_MAX_FLOWS; ++i)
@@ -443,7 +445,7 @@ static void fini(void)
if (ai.fds == NULL)
return;
- frct_fini();
+ rxmwheel_fini();
if (ai.prog != NULL)
free(ai.prog);
@@ -469,9 +471,6 @@ static void fini(void)
shm_rdrbuff_close(ai.rdrb);
- if (ai.tw != NULL)
- timerwheel_destroy(ai.tw);
-
free(ai.flows);
free(ai.ports);
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 200a9fe7..6041279a 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -39,8 +39,8 @@ struct frct_cr {
uint8_t cflags;
uint32_t seqno;
- time_t act;
- time_t inact;
+ time_t act; /* s */
+ time_t inact; /* s */
};
struct frcti {
@@ -50,24 +50,19 @@ struct frcti {
time_t a;
time_t r;
+ time_t rto; /* ms */
+
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
ssize_t rq[RQ_SIZE];
-
- struct timespec rtt;
-
pthread_rwlock_t lock;
};
-struct {
- struct timerwheel * tw;
-} frct;
-
enum frct_flags {
FRCT_DATA = 0x01, /* PDU carries data */
FRCT_DRF = 0x02, /* Data run flag */
- FRCT_ACK = 0x03, /* ACK field valid */
+ FRCT_ACK = 0x04, /* ACK field valid */
FRCT_FC = 0x08, /* FC window valid */
FRCT_RDVZ = 0x10, /* Rendez-vous */
FRCT_MFGM = 0x20, /* More fragments */
@@ -83,21 +78,7 @@ struct frct_pci {
uint32_t ackno;
} __attribute__((packed));
-static int frct_init(void)
-{
- frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS);
- if (frct.tw == NULL)
- return -1;
-
- return 0;
-}
-
-static void frct_fini(void)
-{
- assert(frct.tw);
-
- timerwheel_destroy(frct.tw);
-}
+#include <rxmwheel.c>
static struct frcti * frcti_create(int fd)
{
@@ -129,9 +110,13 @@ static struct frcti * frcti_create(int fd)
frcti->snd_cr.inact = 3 * delta_t;
frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
+ /* Initial rto. FIXME: recalc using Karn algorithm. */
+ frcti->rto = 120;
- if (ai.flows[fd].spec.loss == 0)
+ if (ai.flows[fd].spec.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX;
+ frcti->rcv_cr.cflags |= FRCTFRTX;
+ }
frcti->rcv_cr.inact = 2 * delta_t;
frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1);
@@ -151,6 +136,8 @@ static void frcti_destroy(struct frcti * frcti)
* make sure everything we sent is acked.
*/
+ rxmwheel_clear(frcti->fd);
+
pthread_rwlock_destroy(&frcti->lock);
free(frcti);
@@ -190,10 +177,10 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
/* See if we already have the next PDU. */
pthread_rwlock_wrlock(&frcti->lock);
- pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+ pos = frcti->rcv_cr.seqno & (RQ_SIZE - 1);
idx = frcti->rq[pos];
if (idx != -1) {
- ++frcti->rcv_cr.lwe;
+ ++frcti->rcv_cr.seqno;
frcti->rq[pos] = -1;
}
@@ -204,7 +191,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)
{
- struct frct_pci * pci = NULL;
+ struct frct_pci * pci;
pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
if (pci != NULL)
@@ -219,10 +206,14 @@ static int __frcti_snd(struct frcti * frcti,
struct frct_pci * pci;
struct timespec now;
struct frct_cr * snd_cr;
+ struct frct_cr * rcv_cr;
assert(frcti);
snd_cr = &frcti->snd_cr;
+ rcv_cr = &frcti->rcv_cr;
+
+ rxmwheel_move();
pci = frcti_alloc_head(sdb);
if (pci == NULL)
@@ -247,16 +238,22 @@ static int __frcti_snd(struct frcti * frcti,
#else
random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
#endif
- frcti->snd_cr.lwe = snd_cr->seqno;
+ frcti->snd_cr.lwe = snd_cr->seqno - 1;
}
- pci->seqno = hton32(snd_cr->seqno++);
- if (!(snd_cr->cflags & FRCTFRTX))
- snd_cr->lwe++;
- else
- /* TODO: update on ACK */
+ pci->seqno = hton32(snd_cr->seqno);
+ if (!(snd_cr->cflags & FRCTFRTX)) {
snd_cr->lwe++;
+ } else if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
+ rxmwheel_add(frcti, snd_cr->seqno, sdb);
+ if (rcv_cr->lwe <= rcv_cr->seqno) {
+ pci->flags |= FRCT_ACK;
+ pci->ackno = hton32(rcv_cr->seqno);
+ rcv_cr->lwe = rcv_cr->seqno;
+ }
+ }
+ snd_cr->seqno++;
snd_cr->act = now.tv_sec;
pthread_rwlock_unlock(&frcti->lock);
@@ -271,6 +268,7 @@ static int __frcti_rcv(struct frcti * frcti,
ssize_t idx;
struct frct_pci * pci;
struct timespec now;
+ struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
uint32_t seqno;
int ret = 0;
@@ -278,6 +276,7 @@ static int __frcti_rcv(struct frcti * frcti,
assert(frcti);
rcv_cr = &frcti->rcv_cr;
+ snd_cr = &frcti->snd_cr;
pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
@@ -301,22 +300,29 @@ static int __frcti_rcv(struct frcti * frcti,
if (seqno == rcv_cr->seqno) {
++rcv_cr->seqno;
} else { /* Out of order. */
- if ((int32_t)(seqno - rcv_cr->seqno) < 0) /* Duplicate. */
+ if ((int32_t)(seqno - rcv_cr->seqno) < 0)
goto drop_packet;
if (rcv_cr->cflags & FRCTFRTX) {
size_t pos = seqno & (RQ_SIZE - 1);
- if ((seqno - rcv_cr->seqno) > RQ_SIZE /* Out of rq. */
+ if ((seqno - rcv_cr->lwe) > RQ_SIZE /* Out of rq. */
|| frcti->rq[pos] != -1) /* Duplicate in rq. */
goto drop_packet;
/* Queue. */
frcti->rq[pos] = idx;
ret = -EAGAIN;
} else {
- rcv_cr->seqno = seqno;
+ rcv_cr->seqno = seqno + 1;
}
}
+ if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) {
+ uint32_t ackno = ntoh32(pci->ackno);
+ /* Check for duplicate (old) acks. */
+ if ((int32_t)(ackno - snd_cr->lwe) >= 0)
+ snd_cr->lwe = ackno;
+ }
+
rcv_cr->act = now.tv_sec;
pthread_rwlock_unlock(&frcti->lock);
@@ -324,10 +330,13 @@ static int __frcti_rcv(struct frcti * frcti,
if (!(pci->flags & FRCT_DATA))
shm_rdrbuff_remove(ai.rdrb, idx);
+ rxmwheel_move();
+
return ret;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
shm_rdrbuff_remove(ai.rdrb, idx);
+ rxmwheel_move();
return -EAGAIN;
}
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
new file mode 100644
index 00000000..69151801
--- /dev/null
+++ b/src/lib/rxmwheel.c
@@ -0,0 +1,251 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2018
+ *
+ * Timerwheel
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/list.h>
+
+#define RXMQ_S 12 /* defines #slots */
+#define RXMQ_M 15 /* defines max delay */
+#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */
+#define RXMQ_SLOTS (1 << RXMQ_S)
+#define RXMQ_MAX (1 << RXMQ_M) /* ms */
+
+/* Small inacurracy to avoid slow division by MILLION. */
+#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))
+#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
+
+struct rxm {
+ struct list_head next;
+ uint32_t seqno;
+ struct shm_du_buff * sdb;
+ uint8_t * head;
+ uint8_t * tail;
+ time_t t0; /* Time when original was sent (s). */
+ size_t mul; /* RTO multiplier. */
+ struct frcti * frcti;
+};
+
+struct {
+ struct list_head wheel[RXMQ_SLOTS];
+
+ size_t prv; /* Last processed slot. */
+ pthread_mutex_t lock;
+} rw;
+
+static void rxmwheel_fini(void)
+{
+ size_t i;
+ struct list_head * p;
+ struct list_head * h;
+
+ for (i = 0; i < RXMQ_SLOTS; ++i) {
+ list_for_each_safe(p, h, &rw.wheel[i]) {
+ struct rxm * rxm = list_entry(p, struct rxm, next);
+ list_del(&rxm->next);
+ free(rxm);
+ }
+ }
+}
+
+static int rxmwheel_init(void)
+{
+ struct timespec now;
+ size_t i;
+
+ if (pthread_mutex_init(&rw.lock, NULL))
+ return -1;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ /* Mark the previous timeslot as the last one processed. */
+ rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
+
+ for (i = 0; i < RXMQ_SLOTS; ++i)
+ list_head_init(&rw.wheel[i]);
+
+ return 0;
+}
+
+static void rxmwheel_clear(int fd)
+{
+ size_t i;
+
+ /* FIXME: Add list element to avoid looping over full rxmwheel */
+ pthread_mutex_lock(&rw.lock);
+
+ for (i = 0; i < RXMQ_SLOTS; ++i) {
+ struct list_head * p;
+ struct list_head * h;
+
+ list_for_each_safe(p, h, &rw.wheel[i]) {
+ struct rxm * r = list_entry(p, struct rxm, next);
+ if (r->frcti->fd == fd) {
+ list_del(&r->next);
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&rw.lock);
+}
+
+/* Return fd on r-timer expiry. */
+static int rxmwheel_move(void)
+{
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+ size_t slot;
+ size_t i;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ slot = ts_to_slot(now);
+
+ pthread_mutex_lock(&rw.lock);
+
+ for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) {
+ list_for_each_safe(p, h, &rw.wheel[i]) {
+ struct rxm * r;
+ struct frct_cr * snd_cr;
+ struct frct_cr * rcv_cr;
+ size_t rslot;
+ time_t newtime;
+ ssize_t idx;
+ struct shm_du_buff * sdb;
+ uint8_t * head;
+ struct flow * f;
+
+ r = list_entry(p, struct rxm, next);
+ list_del(&r->next);
+
+ snd_cr = &r->frcti->snd_cr;
+ rcv_cr = &r->frcti->rcv_cr;
+ /* Has been ack'd, remove. */
+ if ((int) (r->seqno - snd_cr->lwe) <= 0) {
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ continue;
+ }
+ /* Check for r-timer expiry. */
+ if (ts_to_ms(now) - r->t0 > r->frcti->r) {
+ int fd = r->frcti->fd;
+ pthread_mutex_unlock(&rw.lock);
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ return fd;
+ }
+
+ /* Copy the payload, safe rtx in other layers. */
+ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
+ /* FIXME: reschedule send? */
+ int fd = r->frcti->fd;
+ pthread_mutex_unlock(&rw.lock);
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ return fd;
+ }
+
+ idx = shm_du_buff_get_idx(sdb);
+
+ head = shm_du_buff_head(sdb);
+ memcpy(head, r->head, r->tail - r->head);
+
+ /* Release the old copy */
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+
+ /* Update ackno and make sure DRF is not set*/
+ ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe);
+ ((struct frct_pci *) head)->flags &= ~FRCT_DRF;
+
+ f = &ai.flows[r->frcti->fd];
+
+ /* Retransmit the copy. */
+ if (shm_rbuff_write(f->tx_rb, idx)) {
+ ipcp_sdb_release(sdb);
+ free(r);
+ /* FIXME: reschedule send? */
+ continue;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+
+ /* Reschedule. */
+ shm_du_buff_wait_ack(sdb);
+
+ r->head = head;
+ r->tail = shm_du_buff_tail(sdb);
+ r->sdb = sdb;
+
+ newtime = ts_to_ms(now) + (f->frcti->rto << ++r->mul);
+ rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1);
+
+ list_add_tail(&r->next, &rw.wheel[rslot]);
+ }
+ }
+
+ rw.prv = slot;
+
+ pthread_mutex_unlock(&rw.lock);
+
+ return 0;
+}
+
+static int rxmwheel_add(struct frcti * frcti,
+ uint32_t seqno,
+ struct shm_du_buff * sdb)
+{
+ struct timespec now;
+ struct rxm * r;
+ size_t slot;
+
+ r = malloc(sizeof(*r));
+ if (r == NULL)
+ return -ENOMEM;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_mutex_lock(&rw.lock);
+
+ r->t0 = ts_to_ms(now);
+ r->mul = 0;
+ r->seqno = seqno;
+ r->sdb = sdb;
+ r->head = shm_du_buff_head(sdb);
+ r->tail = shm_du_buff_tail(sdb);
+ r->frcti = frcti;
+
+ slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1);
+
+ list_add_tail(&r->next, &rw.wheel[slot]);
+
+ pthread_mutex_unlock(&rw.lock);
+
+ shm_du_buff_wait_ack(sdb);
+
+ return 0;
+}
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index a94d4169..e9ef9222 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -65,11 +65,6 @@
#define shm_rdrb_empty(rdrb) \
(*rdrb->tail == *rdrb->head)
-enum shm_du_buff_flags {
- SDB_VALID = 0,
- SDB_NULL
-};
-
struct shm_du_buff {
size_t size;
#ifdef SHM_RDRB_MULTI_BLOCK
@@ -77,7 +72,7 @@ struct shm_du_buff {
#endif
size_t du_head;
size_t du_tail;
- size_t flags;
+ size_t refs;
size_t idx;
};
@@ -95,11 +90,11 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)
#ifdef SHM_RDRB_MULTI_BLOCK
struct shm_du_buff * sdb;
while (!shm_rdrb_empty(rdrb) &&
- (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL)
+ (sdb = get_tail_ptr(rdrb))->refs == 0)
*rdrb->tail = (*rdrb->tail + sdb->blocks)
& ((SHM_BUFFER_SIZE) - 1);
#else
- while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL)
+ while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0)
*rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);
#endif
pthread_cond_broadcast(rdrb->healthy);
@@ -107,7 +102,7 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)
static void sanitize(struct shm_rdrbuff * rdrb)
{
- get_head_ptr(rdrb)->flags = SDB_NULL;
+ --get_head_ptr(rdrb)->refs;
garbage_collect(rdrb);
pthread_mutex_consistent(rdrb->lock);
}
@@ -330,7 +325,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
sdb = get_head_ptr(rdrb);
sdb->size = 0;
sdb->blocks = padblocks;
- sdb->flags = SDB_NULL;
+ sdb->refs = 0;
sdb->du_head = 0;
sdb->du_tail = 0;
sdb->idx = *rdrb->head;
@@ -339,7 +334,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
}
#endif
sdb = get_head_ptr(rdrb);
- sdb->flags = SDB_VALID;
+ sdb->refs = 1;
sdb->idx = *rdrb->head;
#ifdef SHM_RDRB_MULTI_BLOCK
sdb->blocks = blocks;
@@ -423,7 +418,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
sdb = get_head_ptr(rdrb);
sdb->size = 0;
sdb->blocks = padblocks;
- sdb->flags = SDB_NULL;
+ sdb->refs = 0;
sdb->du_head = 0;
sdb->du_tail = 0;
sdb->idx = *rdrb->head;
@@ -432,7 +427,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
}
#endif
sdb = get_head_ptr(rdrb);
- sdb->flags = SDB_VALID;
+ sdb->refs = 1;
sdb->idx = *rdrb->head;
#ifdef SHM_RDRB_MULTI_BLOCK
sdb->blocks = blocks;
@@ -486,6 +481,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb,
int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
size_t idx)
{
+ struct shm_du_buff * sdb;
+
assert(rdrb);
assert(idx < (SHM_BUFFER_SIZE));
@@ -497,10 +494,13 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
#endif
assert(!shm_rdrb_empty(rdrb));
- idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL;
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
- if (idx == *rdrb->tail)
- garbage_collect(rdrb);
+ if (sdb->refs == 1) { /* only stack needs it, can be removed */
+ sdb->refs = 0;
+ if (idx == *rdrb->tail)
+ garbage_collect(rdrb);
+ }
pthread_mutex_unlock(rdrb->lock);
@@ -592,3 +592,16 @@ void shm_du_buff_truncate(struct shm_du_buff * sdb,
sdb->du_tail = sdb->du_head + len;
}
+
+int shm_du_buff_wait_ack(struct shm_du_buff * sdb)
+{
+ __sync_add_and_fetch(&sdb->refs, 1);
+
+ return 0;
+}
+
+int shm_du_buff_ack(struct shm_du_buff * sdb)
+{
+ __sync_sub_and_fetch(&sdb->refs, 1);
+ return 0;
+}
diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt
index 7d5b3651..cc51c203 100644
--- a/src/lib/tests/CMakeLists.txt
+++ b/src/lib/tests/CMakeLists.txt
@@ -1,12 +1,6 @@
get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
-if (NOT (APPLE OR GNU))
- set(TIMERWHEEL_TEST "timerwheel_test.c")
-else ()
- set(TIMERWHEEL_TEST "")
-endif ()
-
create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
# Add new tests here
bitmap_test.c
@@ -16,7 +10,6 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
md5_test.c
sha3_test.c
time_utils_test.c
- ${TIMERWHEEL_TEST}
)
add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests})
diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c
deleted file mode 100644
index 0ec98316..00000000
--- a/src/lib/tests/timerwheel_test.c
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2018
- *
- * Test of the timer wheel
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#include "timerwheel.c"
-
-#include <pthread.h>
-#include <time.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#define MAX_ELEMENTS 100
-#define MAX_RESOLUTION 10 /* ms */
-#define MAX_ADDITIONS 1000
-
-int total;
-
-int add(void * o)
-{
- total += *((int *) o);
- return 0;
-}
-
-int timerwheel_test(int argc, char ** argv)
-{
- struct timerwheel * tw;
- long resolution;
- long elements;
- struct timespec wait;
-
- int additions;
-
- int check_total = 0;
-
- int i;
- int var = 5;
-
- struct tw_f * f;
-
- (void) argc;
- (void) argv;
-
- total = 0;
-
- srand(time(NULL));
-
- resolution = rand() % (MAX_RESOLUTION - 1) + 1;
- elements = rand() % (MAX_ELEMENTS - 10) + 10;
-
- tw = timerwheel_create(resolution, resolution * elements);
- if (tw == NULL) {
- printf("Failed to create timerwheel.\n");
- return -1;
- }
-
- wait.tv_sec = (resolution * elements) / 1000;
- wait.tv_nsec = ((resolution * elements) % 1000) * MILLION;
-
- additions = rand() % MAX_ADDITIONS + 1000;
-
- for (i = 0; i < additions; ++i) {
- int delay = rand() % (resolution * elements);
- check_total += var;
- f = timerwheel_start(tw,
- (void (*)(void *)) add,
- (void *) &var,
- delay);
- if (f == NULL) {
- printf("Failed to add function.");
- return -1;
- }
- }
-
- nanosleep(&wait, NULL);
-
- timerwheel_move(tw);
-
- timerwheel_destroy(tw);
-
- if (total != check_total) {
- printf("Totals do not match: %d and %d.\n", total, check_total);
- return -1;
- }
-
- return 0;
-}
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
deleted file mode 100644
index 0feda642..00000000
--- a/src/lib/timerwheel.c
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2018
- *
- * Timerwheel
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#define _POSIX_C_SOURCE 200112L
-
-#include "config.h"
-
-#include <ouroboros/time_utils.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/list.h>
-
-#include <pthread.h>
-#include <stdlib.h>
-#include <assert.h>
-#include <string.h>
-
-#define FRAC 10 /* accuracy of the timer */
-
-#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1));
-#define tw_free(tw) (tw_used(tw) + 1 < tw->elements)
-#define tw_empty(tw) (tw->head == tw->tail)
-
-struct tw_f {
- struct list_head next;
- void (* func)(void *);
- void * arg;
-};
-
-struct tw_el {
- struct list_head funcs;
- struct timespec expiry;
-};
-
-struct timerwheel {
- struct tw_el * wheel;
-
- struct timespec intv;
-
- size_t pos;
-
- pthread_mutex_t lock;
-
- time_t resolution;
- size_t elements;
-};
-
-static void tw_el_fini(struct tw_el * e)
-{
- struct list_head * p;
- struct list_head * h;
-
- list_for_each_safe(p, h, &e->funcs) {
- struct tw_f * f = list_entry(p, struct tw_f, next);
- list_del(&f->next);
- }
-}
-
-void timerwheel_move(struct timerwheel * tw)
-{
- struct timespec now = {0, 0};
- long ms = tw->resolution * tw->elements;
- struct timespec total = {ms / 1000,
- (ms % 1000) * MILLION};
- struct list_head * p;
- struct list_head * h;
-
- clock_gettime(CLOCK_MONOTONIC, &now);
-
- pthread_mutex_lock(&tw->lock);
-
- while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) {
- list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) {
- struct tw_f * f = list_entry(p, struct tw_f, next);
- list_del(&f->next);
- f->func(f->arg);
- free(f);
- }
-
- ts_add(&tw->wheel[tw->pos].expiry,
- &total,
- &tw->wheel[tw->pos].expiry);
-
- tw->pos = (tw->pos + 1) & (tw->elements - 1);
- }
-
- pthread_mutex_unlock(&tw->lock);
-}
-
-struct timerwheel * timerwheel_create(time_t resolution,
- time_t max_delay)
-{
- struct timespec now = {0, 0};
- struct timespec res_ts = {resolution / 1000,
- (resolution % 1000) * MILLION};
- size_t i;
-
- struct timerwheel * tw;
-
- assert(resolution != 0);
-
- tw = malloc(sizeof(*tw));
- if (tw == NULL)
- return NULL;
-
- tw->elements = 1;
-
- while (tw->elements < (size_t) max_delay / resolution)
- tw->elements <<= 1;
-
- tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements);
- if (tw->wheel == NULL)
- goto fail_wheel_malloc;
-
- tw->resolution = resolution;
-
- tw->intv.tv_sec = (tw->resolution / FRAC) / 1000;
- tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION;
-
- if (pthread_mutex_init(&tw->lock, NULL))
- goto fail_lock_init;
-
- tw->pos = 0;
-
- clock_gettime(CLOCK_MONOTONIC, &now);
- now.tv_nsec -= (now.tv_nsec % MILLION);
-
- for (i = 0; i < tw->elements; ++i) {
- list_head_init(&tw->wheel[i].funcs);
- tw->wheel[i].expiry = now;
- ts_add(&now, &res_ts, &now);
- }
-
- return tw;
-
- fail_lock_init:
- free(tw->wheel);
- fail_wheel_malloc:
- free(tw);
- return NULL;
-}
-
-void timerwheel_destroy(struct timerwheel * tw)
-{
- unsigned long i;
-
- for (i = 0; i < tw->elements; ++i)
- tw_el_fini(&tw->wheel[i]);
-
- pthread_mutex_destroy(&tw->lock);
- free(tw->wheel);
- free(tw);
-}
-
-struct tw_f * timerwheel_start(struct timerwheel * tw,
- void (* func)(void *),
- void * arg,
- time_t delay)
-{
- int pos;
- struct tw_f * f = malloc(sizeof(*f));
- if (f == NULL)
- return NULL;
-
- f->func = func;
- f->arg = arg;
-
- assert(delay < (time_t) tw->elements * tw->resolution);
-
- pthread_mutex_lock(&tw->lock);
-
- pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1);
- list_add(&f->next, &tw->wheel[pos].funcs);
-
- pthread_mutex_unlock(&tw->lock);
-
- return f;
-}
-
-int timerwheel_restart(struct timerwheel * tw,
- struct tw_f * f,
- time_t delay)
-{
- int pos;
-
- assert(tw);
- assert(delay < (time_t) tw->elements * tw->resolution);
-
- pthread_mutex_lock(&tw->lock);
-
- list_del(&f->next);
- pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1);
- list_add(&f->next, &tw->wheel[pos].funcs);
-
- pthread_mutex_unlock(&tw->lock);
-
- return 0;
-}
-
-void timerwheel_stop(struct timerwheel * tw,
- struct tw_f * f)
-{
- assert(tw);
-
- pthread_mutex_lock(&tw->lock);
-
- list_del(&f->next);
- free(f);
-
- pthread_mutex_unlock(&tw->lock);
-}