Diffstat (limited to 'src/lib/timerwheel.c')
-rw-r--r--  src/lib/timerwheel.c  480
1 file changed, 331 insertions, 149 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index ef8489bf..96f4ac47 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2018
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Timerwheel
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -20,213 +20,395 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#define _POSIX_C_SOURCE 200112L
-
-#include "config.h"
-
-#include <ouroboros/time_utils.h>
-#include <ouroboros/errno.h>
#include <ouroboros/list.h>
-#include <pthread.h>
-#include <stdlib.h>
-#include <assert.h>
-#include <string.h>
-
-#define FRAC 10 /* accuracy of the timer */
-
-#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1));
-#define tw_free(tw) (tw_used(tw) + 1 < tw->elements)
-#define tw_empty(tw) (tw->head == tw->tail)
-
-struct tw_f {
- struct list_head next;
- void (* func)(void *);
- void * arg;
+/* Overflow limits range to about 6 hours. */
+#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
+#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)
+#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES)
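+/*
+ * These map a timestamp to a wheel slot by dropping its
+ * sub-resolution bits; callers mask the result with
+ * (RXMQ_SLOTS - 1) or (ACKQ_SLOTS - 1) to index the ring.
+ */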
+
+struct rxm {
+ struct list_head next;
+ uint32_t seqno;
+#ifndef RXM_BUFFER_ON_HEAP
+ struct shm_du_buff * sdb;
+#endif
+ struct frct_pci * pkt;
+ size_t len;
+ time_t t0; /* Time when original was sent (ns). */
+ struct frcti * frcti;
+ int fd;
+ int flow_id; /* Prevent rtx when fd reused. */
};
-struct tw_el {
- struct list_head funcs;
- struct timespec expiry;
+struct ack {
+ struct list_head next;
+ struct frcti * frcti;
+ int fd;
+ int flow_id;
};
-struct timerwheel {
- struct tw_el * wheel;
-
- struct timespec intv;
+struct {
+ /*
+ * At a 1 ms min resolution, every level bumps the
+ * resolution by a factor of 16.
+ */
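+ /*
+ * A timeout that spans more than RXMQ_SLOTS slots at one level
+ * is bumped to the next, coarser level (see timerwheel_rxm).
+ */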
+ struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS];
- size_t pos;
+ struct list_head acks[ACKQ_SLOTS];
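+ /* At most one pending delayed ack per flow per slot. */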
+ bool map[ACKQ_SLOTS][PROG_MAX_FLOWS];
+ size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */
+ size_t prv_ack; /* Last processed ack slot. */
pthread_mutex_t lock;
+} rw;
- time_t resolution;
- size_t elements;
-};
-
-static void tw_el_fini(struct tw_el * e)
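+/* Flush and free everything still scheduled on the wheel. */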
+static void timerwheel_fini(void)
{
+ size_t i;
+ size_t j;
struct list_head * p;
struct list_head * h;
- list_for_each_safe(p, h, &e->funcs) {
- struct tw_f * f = list_entry(p, struct tw_f, next);
- list_del(&f->next);
+ pthread_mutex_lock(&rw.lock);
+
+ for (i = 0; i < RXMQ_LVLS; ++i) {
+ for (j = 0; j < RXMQ_SLOTS; j++) {
+ list_for_each_safe(p, h, &rw.rxms[i][j]) {
+ struct rxm * rxm;
+ rxm = list_entry(p, struct rxm, next);
+ list_del(&rxm->next);
+#ifdef RXM_BUFFER_ON_HEAP
+ free(rxm->pkt);
+#else
+ shm_du_buff_ack(rxm->sdb);
+ ipcp_sdb_release(rxm->sdb);
+#endif
+ free(rxm);
+ }
+ }
}
-}
-void timerwheel_move(struct timerwheel * tw)
-{
- struct timespec now = {0, 0};
- long ms = tw->resolution * tw->elements;
- struct timespec total = {ms / 1000,
- (ms % 1000) * MILLION};
- struct list_head * p;
- struct list_head * h;
+ for (i = 0; i < ACKQ_SLOTS; ++i) {
+ list_for_each_safe(p, h, &rw.acks[i]) {
+ struct ack * a = list_entry(p, struct ack, next);
+ list_del(&a->next);
+ free(a);
+ }
+ }
- clock_gettime(CLOCK_MONOTONIC, &now);
+ pthread_mutex_unlock(&rw.lock);
- pthread_mutex_lock(&tw->lock);
+ pthread_mutex_destroy(&rw.lock);
+}
- while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) {
- list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) {
- struct tw_f * f = list_entry(p, struct tw_f, next);
- list_del(&f->next);
- f->func(f->arg);
- free(f);
- }
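+/* Initialise empty slot lists; processing starts just before now. */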
+static int timerwheel_init(void)
+{
+ struct timespec now;
+ size_t i;
+ size_t j;
+
+ if (pthread_mutex_init(&rw.lock, NULL))
+ return -1;
- ts_add(&tw->wheel[tw->pos].expiry,
- &total,
- &tw->wheel[tw->pos].expiry);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- tw->pos = (tw->pos + 1) & (tw->elements - 1);
+ for (i = 0; i < RXMQ_LVLS; ++i) {
+ rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1);
+ rw.prv_rxm[i] >>= (RXMQ_BUMP * i);
+ rw.prv_rxm[i] &= (RXMQ_SLOTS - 1);
+ for (j = 0; j < RXMQ_SLOTS; ++j)
+ list_head_init(&rw.rxms[i][j]);
}
- pthread_mutex_unlock(&tw->lock);
+ rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1);
+ for (i = 0; i < ACKQ_SLOTS; ++i)
+ list_head_init(&rw.acks[i]);
+
+ return 0;
}
-struct timerwheel * timerwheel_create(time_t resolution,
- time_t max_delay)
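+/* Process every wheel slot that has come due since the last call. */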
+static void timerwheel_move(void)
{
- struct timespec now = {0, 0};
- struct timespec res_ts = {resolution / 1000,
- (resolution % 1000) * MILLION};
- size_t i;
-
- struct timerwheel * tw;
-
- assert(resolution != 0);
-
- tw = malloc(sizeof(*tw));
- if (tw == NULL)
- return NULL;
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+ size_t rxm_slot;
+ size_t ack_slot;
+ size_t i;
+ size_t j;
+
+ pthread_mutex_lock(&rw.lock);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ rxm_slot = ts_to_rxm_slot(now);
+
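+ /*
+ * For each level, walk all slots between the last one processed
+ * and the slot for the current time, handling the queued rxms.
+ */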
+ for (i = 0; i < RXMQ_LVLS; ++i) {
+ size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
+ j = rw.prv_rxm[i];
+ if (j_max_slot < j)
+ j_max_slot += RXMQ_SLOTS;
+ while (j++ < j_max_slot) {
+ list_for_each_safe(p, h,
+ &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
+ struct rxm * r;
+ struct frct_cr * snd_cr;
+ struct frct_cr * rcv_cr;
+ size_t slot;
+ size_t rslot;
+ ssize_t idx;
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ struct flow * f;
+ uint32_t snd_lwe;
+ uint32_t rcv_lwe;
+ size_t lvl = 0;
+
+ r = list_entry(p, struct rxm, next);
+
+ list_del(&r->next);
+
+ snd_cr = &r->frcti->snd_cr;
+ rcv_cr = &r->frcti->rcv_cr;
+ f = &ai.flows[r->fd];
+#ifndef RXM_BUFFER_ON_HEAP
+ shm_du_buff_ack(r->sdb);
+#endif
+ if (f->frcti == NULL
+ || f->info.id != r->flow_id)
+ goto cleanup;
+
+ pthread_rwlock_rdlock(&r->frcti->lock);
+
+ snd_lwe = snd_cr->lwe;
+ rcv_lwe = rcv_cr->lwe;
+
+ pthread_rwlock_unlock(&r->frcti->lock);
+
+ /* Has been ack'd, remove. */
+ if (before(r->seqno, snd_lwe))
+ goto cleanup;
+
+ /* Check for r-timer expiry. */
+ if (ts_to_ns(now) - r->t0 > r->frcti->r)
+ goto flow_down;
+
+ pthread_rwlock_wrlock(&r->frcti->lock);
+
+ if (r->seqno == r->frcti->rttseq) {
+ r->frcti->rto +=
+ r->frcti->rto >> RTO_DIV;
+ r->frcti->probe = false;
+ }
+#ifdef PROC_FLOW_STATS
+ r->frcti->n_rtx++;
+#endif
+ rslot = r->frcti->rto >> RXMQ_RES;
+
+ pthread_rwlock_unlock(&r->frcti->lock);
+
+ /* Schedule at least in the next time slot. */
+ slot = ts_to_ns(now) >> RXMQ_RES;
+
+ while (rslot >= RXMQ_SLOTS) {
+ ++lvl;
+ rslot >>= RXMQ_BUMP;
+ slot >>= RXMQ_BUMP;
+ }
+
+ if (lvl >= RXMQ_LVLS) /* Can't reschedule */
+ goto flow_down;
+
+ rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1);
+#ifdef RXM_BLOCKING
+ if (ipcp_sdb_reserve(&sdb, r->len) < 0)
+#else
+ if (shm_rdrbuff_alloc(ai.rdrb, r->len, NULL,
+ &sdb) < 0)
+#endif
+ goto reschedule; /* rdrbuff full */
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memcpy(pci, r->pkt, r->len);
+#ifndef RXM_BUFFER_ON_HEAP
+ ipcp_sdb_release(r->sdb);
+ r->sdb = sdb;
+ r->pkt = pci;
+ shm_du_buff_wait_ack(sdb);
+#endif
+ idx = shm_du_buff_get_idx(sdb);
+
+ /* Retransmit the copy. */
+ pci->ackno = hton32(rcv_lwe);
+#ifdef RXM_BLOCKING
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL) < 0)
+#else
+ if (shm_rbuff_write(f->tx_rb, idx) < 0)
+#endif
+ goto flow_down;
+ shm_flow_set_notify(f->set, f->info.id,
+ FLOW_PKT);
+ reschedule:
+ list_add(&r->next, &rw.rxms[lvl][rslot]);
+ continue;
+
+ flow_down:
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ cleanup:
+#ifdef RXM_BUFFER_ON_HEAP
+ free(r->pkt);
+#else
+ ipcp_sdb_release(r->sdb);
+#endif
+ free(r);
+ }
+ }
+ rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1);
+ /* Move up a level in the wheel. */
+ rxm_slot >>= RXMQ_BUMP;
+ }
- if (pthread_mutex_init(&tw->lock, NULL))
- return NULL;
+ ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1);
- tw->elements = 1;
+ j = rw.prv_ack;
- while (tw->elements < (size_t) max_delay / resolution)
- tw->elements <<= 1;
+ if (ack_slot < j)
+ ack_slot += ACKQ_SLOTS;
- tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements);
- if (tw->wheel == NULL)
- goto fail_wheel_malloc;
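+ /* Send the delayed acks that came due since the last pass. */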
+ while (j++ < ack_slot) {
+ list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) {
+ struct ack * a;
+ struct flow * f;
- tw->resolution = resolution;
+ a = list_entry(p, struct ack, next);
- tw->intv.tv_sec = (tw->resolution / FRAC) / 1000;
- tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION;
+ list_del(&a->next);
- if (pthread_mutex_init(&tw->lock, NULL))
- goto fail_lock_init;
+ f = &ai.flows[a->fd];
- tw->pos = 0;
+ rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
- clock_gettime(CLOCK_MONOTONIC, &now);
- now.tv_nsec -= (now.tv_nsec % MILLION);
+ if (f->info.id == a->flow_id && f->frcti != NULL)
+ send_frct_pkt(a->frcti);
- for (i = 0; i < tw->elements; ++i) {
- list_head_init(&tw->wheel[i].funcs);
- tw->wheel[i].expiry = now;
- ts_add(&now, &res_ts, &now);
+ free(a);
+ }
}
- return tw;
+ rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1);
- fail_lock_init:
- free(tw->wheel);
- fail_wheel_malloc:
- free(tw);
- return NULL;
+ pthread_cleanup_pop(true);
}
-void timerwheel_destroy(struct timerwheel * tw)
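+/*
+ * Register a sent packet on the wheel so it is retransmitted
+ * if no ack arrives before its RTO expires.
+ */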
+static int timerwheel_rxm(struct frcti * frcti,
+ uint32_t seqno,
+ struct shm_du_buff * sdb)
{
- unsigned long i;
+ struct timespec now;
+ struct rxm * r;
+ size_t slot;
+ size_t lvl = 0;
+ time_t rto_slot;
+
+ r = malloc(sizeof(*r));
+ if (r == NULL)
+ return -ENOMEM;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ r->t0 = ts_to_ns(now);
+ r->seqno = seqno;
+ r->frcti = frcti;
+ r->len = shm_du_buff_len(sdb);
+#ifdef RXM_BUFFER_ON_HEAP
+ r->pkt = malloc(r->len);
+ if (r->pkt == NULL) {
+ free(r);
+ return -ENOMEM;
+ }
+ memcpy(r->pkt, shm_du_buff_head(sdb), r->len);
+#else
+ r->sdb = sdb;
+ r->pkt = (struct frct_pci *) shm_du_buff_head(sdb);
+#endif
+ pthread_rwlock_rdlock(&r->frcti->lock);
- for (i = 0; i < tw->elements; ++i)
- tw_el_fini(&tw->wheel[i]);
+ rto_slot = frcti->rto >> RXMQ_RES;
+ slot = r->t0 >> RXMQ_RES;
- pthread_mutex_destroy(&tw->lock);
- free(tw->wheel);
- free(tw);
-}
+ r->fd = frcti->fd;
+ r->flow_id = ai.flows[r->fd].info.id;
-struct tw_f * timerwheel_start(struct timerwheel * tw,
- void (* func)(void *),
- void * arg,
- time_t delay)
-{
- int pos;
- struct tw_f * f = malloc(sizeof(*f));
- if (f == NULL)
- return NULL;
+ pthread_rwlock_unlock(&r->frcti->lock);
- f->func = func;
- f->arg = arg;
+ while (rto_slot >= RXMQ_SLOTS) {
+ ++lvl;
+ rto_slot >>= RXMQ_BUMP;
+ slot >>= RXMQ_BUMP;
+ }
- assert(delay < (time_t) tw->elements * tw->resolution);
+ if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */
+#ifdef RXM_BUFFER_ON_HEAP
+ free(r->pkt);
+#endif
+ free(r);
+ return -EPERM;
+ }
- pthread_mutex_lock(&tw->lock);
+ slot = (slot + rto_slot + 1) & (RXMQ_SLOTS - 1);
- pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1);
- list_add(&f->next, &tw->wheel[pos].funcs);
+ pthread_mutex_lock(&rw.lock);
- pthread_mutex_unlock(&tw->lock);
+ list_add_tail(&r->next, &rw.rxms[lvl][slot]);
+#ifndef RXM_BUFFER_ON_HEAP
+ shm_du_buff_wait_ack(sdb);
+#endif
+ pthread_mutex_unlock(&rw.lock);
- return f;
+ return 0;
}
-int timerwheel_restart(struct timerwheel * tw,
- struct tw_f * f,
- time_t delay)
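+/*
+ * Schedule a delayed ack on a flow; at most one ack is queued
+ * per flow in any given ack slot.
+ */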
+static int timerwheel_delayed_ack(int fd,
+ struct frcti * frcti)
{
- int pos;
+ struct timespec now;
+ struct ack * a;
+ size_t slot;
- assert(tw);
- assert(delay < (time_t) tw->elements * tw->resolution);
+ a = malloc(sizeof(*a));
+ if (a == NULL)
+ return -ENOMEM;
- pthread_mutex_lock(&tw->lock);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- list_del(&f->next);
- pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1);
- list_add(&f->next, &tw->wheel[pos].funcs);
+ pthread_rwlock_rdlock(&frcti->lock);
- pthread_mutex_unlock(&tw->lock);
+ slot = (((ts_to_ns(now) + (TICTIME << 1)) >> ACKQ_RES) + 1)
+ & (ACKQ_SLOTS - 1);
- return 0;
-}
+ pthread_rwlock_unlock(&frcti->lock);
-void timerwheel_stop(struct timerwheel * tw,
- struct tw_f * f)
-{
- assert(tw);
+ a->fd = fd;
+ a->frcti = frcti;
+ a->flow_id = ai.flows[fd].info.id;
- pthread_mutex_lock(&tw->lock);
+ pthread_mutex_lock(&rw.lock);
- list_del(&f->next);
- free(f);
+ if (rw.map[slot][fd]) {
+ pthread_mutex_unlock(&rw.lock);
+ free(a);
+ return 0;
+ }
+
+ rw.map[slot][fd] = true;
+
+ list_add_tail(&a->next, &rw.acks[slot]);
- pthread_mutex_unlock(&tw->lock);
+ pthread_mutex_unlock(&rw.lock);
+
+ return 0;
}