diff options
Diffstat (limited to 'src/lib/timerwheel.c')
| -rw-r--r-- | src/lib/timerwheel.c | 480 |
1 files changed, 331 insertions, 149 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index ef8489bf..96f4ac47 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2018 + * Ouroboros - Copyright (C) 2016 - 2024 * * Timerwheel * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -20,213 +20,395 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#define _POSIX_C_SOURCE 200112L - -#include "config.h" - -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> #include <ouroboros/list.h> -#include <pthread.h> -#include <stdlib.h> -#include <assert.h> -#include <string.h> - -#define FRAC 10 /* accuracy of the timer */ - -#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1)); -#define tw_free(tw) (tw_used(tw) + 1 < tw->elements) -#define tw_empty(tw) (tw->head == tw->tail) - -struct tw_f { - struct list_head next; - void (* func)(void *); - void * arg; +/* Overflow limits range to about 6 hours. */ +#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) +#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES) +#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES) + +struct rxm { + struct list_head next; + uint32_t seqno; +#ifndef RXM_BUFFER_ON_HEAP + struct shm_du_buff * sdb; +#endif + struct frct_pci * pkt; + size_t len; + time_t t0; /* Time when original was sent (us). */ + struct frcti * frcti; + int fd; + int flow_id; /* Prevent rtx when fd reused. */ }; -struct tw_el { - struct list_head funcs; - struct timespec expiry; +struct ack { + struct list_head next; + struct frcti * frcti; + int fd; + int flow_id; }; -struct timerwheel { - struct tw_el * wheel; - - struct timespec intv; +struct { + /* + * At a 1 ms min resolution, every level bumps the + * resolution by a factor of 16. + */ + struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS]; - size_t pos; + struct list_head acks[ACKQ_SLOTS]; + bool map[ACKQ_SLOTS][PROG_MAX_FLOWS]; + size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */ + size_t prv_ack; /* Last processed ack slot. */ pthread_mutex_t lock; +} rw; - time_t resolution; - size_t elements; -}; - -static void tw_el_fini(struct tw_el * e) +static void timerwheel_fini(void) { + size_t i; + size_t j; struct list_head * p; struct list_head * h; - list_for_each_safe(p, h, &e->funcs) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); + pthread_mutex_lock(&rw.lock); + + for (i = 0; i < RXMQ_LVLS; ++i) { + for (j = 0; j < RXMQ_SLOTS; j++) { + list_for_each_safe(p, h, &rw.rxms[i][j]) { + struct rxm * rxm; + rxm = list_entry(p, struct rxm, next); + list_del(&rxm->next); +#ifdef RXM_BUFFER_ON_HEAP + free(rxm->pkt); +#else + shm_du_buff_ack(rxm->sdb); + ipcp_sdb_release(rxm->sdb); +#endif + free(rxm); + } + } } -} -void timerwheel_move(struct timerwheel * tw) -{ - struct timespec now = {0, 0}; - long ms = tw->resolution * tw->elements; - struct timespec total = {ms / 1000, - (ms % 1000) * MILLION}; - struct list_head * p; - struct list_head * h; + for (i = 0; i < ACKQ_SLOTS; ++i) { + list_for_each_safe(p, h, &rw.acks[i]) { + struct ack * a = list_entry(p, struct ack, next); + list_del(&a->next); + free(a); + } + } - clock_gettime(CLOCK_MONOTONIC, &now); + pthread_mutex_unlock(&rw.lock); - pthread_mutex_lock(&tw->lock); + pthread_mutex_destroy(&rw.lock); +} - while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) { - list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) { - struct tw_f * f = list_entry(p, struct tw_f, next); - list_del(&f->next); - f->func(f->arg); - free(f); - } +static int timerwheel_init(void) +{ + struct timespec now; + size_t i; + size_t j; + + if (pthread_mutex_init(&rw.lock, NULL)) + return -1; - ts_add(&tw->wheel[tw->pos].expiry, - &total, - &tw->wheel[tw->pos].expiry); + clock_gettime(PTHREAD_COND_CLOCK, &now); - tw->pos = (tw->pos + 1) & (tw->elements - 1); + for (i = 0; i < RXMQ_LVLS; ++i) { + rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1); + rw.prv_rxm[i] >>= (RXMQ_BUMP * i); + rw.prv_rxm[i] &= (RXMQ_SLOTS - 1); + for (j = 0; j < RXMQ_SLOTS; ++j) + list_head_init(&rw.rxms[i][j]); } - pthread_mutex_unlock(&tw->lock); + rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1); + for (i = 0; i < ACKQ_SLOTS; ++i) + list_head_init(&rw.acks[i]); + + return 0; } -struct timerwheel * timerwheel_create(time_t resolution, - time_t max_delay) +static void timerwheel_move(void) { - struct timespec now = {0, 0}; - struct timespec res_ts = {resolution / 1000, - (resolution % 1000) * MILLION}; - size_t i; - - struct timerwheel * tw; - - assert(resolution != 0); - - tw = malloc(sizeof(*tw)); - if (tw == NULL) - return NULL; + struct timespec now; + struct list_head * p; + struct list_head * h; + size_t rxm_slot; + size_t ack_slot; + size_t i; + size_t j; + + pthread_mutex_lock(&rw.lock); + + pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + rxm_slot = ts_to_rxm_slot(now); + + for (i = 0; i < RXMQ_LVLS; ++i) { + size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1); + j = rw.prv_rxm[i]; + if (j_max_slot < j) + j_max_slot += RXMQ_SLOTS; + while (j++ < j_max_slot) { + list_for_each_safe(p, h, + &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) { + struct rxm * r; + struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; + size_t slot; + size_t rslot; + ssize_t idx; + struct shm_du_buff * sdb; + struct frct_pci * pci; + struct flow * f; + uint32_t snd_lwe; + uint32_t rcv_lwe; + size_t lvl = 0; + + r = list_entry(p, struct rxm, next); + + list_del(&r->next); + + snd_cr = &r->frcti->snd_cr; + rcv_cr = &r->frcti->rcv_cr; + f = &ai.flows[r->fd]; +#ifndef RXM_BUFFER_ON_HEAP + shm_du_buff_ack(r->sdb); +#endif + if (f->frcti == NULL + || f->info.id != r->flow_id) + goto cleanup; + + pthread_rwlock_rdlock(&r->frcti->lock); + + snd_lwe = snd_cr->lwe; + rcv_lwe = rcv_cr->lwe; + + pthread_rwlock_unlock(&r->frcti->lock); + + /* Has been ack'd, remove. */ + if (before(r->seqno, snd_lwe)) + goto cleanup; + + /* Check for r-timer expiry. */ + if (ts_to_ns(now) - r->t0 > r->frcti->r) + goto flow_down; + + pthread_rwlock_wrlock(&r->frcti->lock); + + if (r->seqno == r->frcti->rttseq) { + r->frcti->rto += + r->frcti->rto >> RTO_DIV; + r->frcti->probe = false; + } +#ifdef PROC_FLOW_STATS + r->frcti->n_rtx++; +#endif + rslot = r->frcti->rto >> RXMQ_RES; + + pthread_rwlock_unlock(&r->frcti->lock); + + /* Schedule at least in the next time slot. */ + slot = ts_to_ns(now) >> RXMQ_RES; + + while (rslot >= RXMQ_SLOTS) { + ++lvl; + rslot >>= RXMQ_BUMP; + slot >>= RXMQ_BUMP; + } + + if (lvl >= RXMQ_LVLS) /* Can't reschedule */ + goto flow_down; + + rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1); +#ifdef RXM_BLOCKING + if (ipcp_sdb_reserve(&sdb, r->len) < 0) +#else + if (shm_rdrbuff_alloc(ai.rdrb, r->len, NULL, + &sdb) < 0) +#endif + goto reschedule; /* rdrbuff full */ + + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memcpy(pci, r->pkt, r->len); +#ifndef RXM_BUFFER_ON_HEAP + ipcp_sdb_release(r->sdb); + r->sdb = sdb; + r->pkt = pci; + shm_du_buff_wait_ack(sdb); +#endif + idx = shm_du_buff_get_idx(sdb); + + /* Retransmit the copy. */ + pci->ackno = hton32(rcv_lwe); +#ifdef RXM_BLOCKING + if (shm_rbuff_write_b(f->tx_rb, idx, NULL) < 0) +#else + if (shm_rbuff_write(f->tx_rb, idx) < 0) +#endif + goto flow_down; + shm_flow_set_notify(f->set, f->info.id, + FLOW_PKT); + reschedule: + list_add(&r->next, &rw.rxms[lvl][rslot]); + continue; + + flow_down: + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + cleanup: +#ifdef RXM_BUFFER_ON_HEAP + free(r->pkt); +#else + ipcp_sdb_release(r->sdb); +#endif + free(r); + } + } + rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1); + /* Move up a level in the wheel. */ + rxm_slot >>= RXMQ_BUMP; + } - if (pthread_mutex_init(&tw->lock, NULL)) - return NULL; + ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; - tw->elements = 1; + j = rw.prv_ack; - while (tw->elements < (size_t) max_delay / resolution) - tw->elements <<= 1; + if (ack_slot < j) + ack_slot += ACKQ_SLOTS; - tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements); - if (tw->wheel == NULL) - goto fail_wheel_malloc; + while (j++ < ack_slot) { + list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) { + struct ack * a; + struct flow * f; - tw->resolution = resolution; + a = list_entry(p, struct ack, next); - tw->intv.tv_sec = (tw->resolution / FRAC) / 1000; - tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION; + list_del(&a->next); - if (pthread_mutex_init(&tw->lock, NULL)) - goto fail_lock_init; + f = &ai.flows[a->fd]; - tw->pos = 0; + rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false; - clock_gettime(CLOCK_MONOTONIC, &now); - now.tv_nsec -= (now.tv_nsec % MILLION); + if (f->info.id == a->flow_id && f->frcti != NULL) + send_frct_pkt(a->frcti); - for (i = 0; i < tw->elements; ++i) { - list_head_init(&tw->wheel[i].funcs); - tw->wheel[i].expiry = now; - ts_add(&now, &res_ts, &now); + free(a); + } } - return tw; + rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1); - fail_lock_init: - free(tw->wheel); - fail_wheel_malloc: - free(tw); - return NULL; + pthread_cleanup_pop(true); } -void timerwheel_destroy(struct timerwheel * tw) +static int timerwheel_rxm(struct frcti * frcti, + uint32_t seqno, + struct shm_du_buff * sdb) { - unsigned long i; + struct timespec now; + struct rxm * r; + size_t slot; + size_t lvl = 0; + time_t rto_slot; + + r = malloc(sizeof(*r)); + if (r == NULL) + return -ENOMEM; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + r->t0 = ts_to_ns(now); + r->seqno = seqno; + r->frcti = frcti; + r->len = shm_du_buff_len(sdb); +#ifdef RXM_BUFFER_ON_HEAP + r->pkt = malloc(r->len); + if (r->pkt == NULL) { + free(r); + return -ENOMEM; + } + memcpy(r->pkt, shm_du_buff_head(sdb), r->len); +#else + r->sdb = sdb; + r->pkt = (struct frct_pci *) shm_du_buff_head(sdb); +#endif + pthread_rwlock_rdlock(&r->frcti->lock); - for (i = 0; i < tw->elements; ++i) - tw_el_fini(&tw->wheel[i]); + rto_slot = frcti->rto >> RXMQ_RES; + slot = r->t0 >> RXMQ_RES; - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); -} + r->fd = frcti->fd; + r->flow_id = ai.flows[r->fd].info.id; -struct tw_f * timerwheel_start(struct timerwheel * tw, - void (* func)(void *), - void * arg, - time_t delay) -{ - int pos; - struct tw_f * f = malloc(sizeof(*f)); - if (f == NULL) - return NULL; + pthread_rwlock_unlock(&r->frcti->lock); - f->func = func; - f->arg = arg; + while (rto_slot >= RXMQ_SLOTS) { + ++lvl; + rto_slot >>= RXMQ_BUMP; + slot >>= RXMQ_BUMP; + } - assert(delay < (time_t) tw->elements * tw->resolution); + if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */ +#ifdef RXM_BUFFER_ON_HEAP + free(r->pkt); +#endif + free(r); + return -EPERM; + } - pthread_mutex_lock(&tw->lock); + slot = (slot + rto_slot + 1) & (RXMQ_SLOTS - 1); - pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); - list_add(&f->next, &tw->wheel[pos].funcs); + pthread_mutex_lock(&rw.lock); - pthread_mutex_unlock(&tw->lock); + list_add_tail(&r->next, &rw.rxms[lvl][slot]); +#ifndef RXM_BUFFER_ON_HEAP + shm_du_buff_wait_ack(sdb); +#endif + pthread_mutex_unlock(&rw.lock); - return f; + return 0; } -int timerwheel_restart(struct timerwheel * tw, - struct tw_f * f, - time_t delay) +static int timerwheel_delayed_ack(int fd, + struct frcti * frcti) { - int pos; + struct timespec now; + struct ack * a; + size_t slot; - assert(tw); - assert(delay < (time_t) tw->elements * tw->resolution); + a = malloc(sizeof(*a)); + if (a == NULL) + return -ENOMEM; - pthread_mutex_lock(&tw->lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); - list_del(&f->next); - pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); - list_add(&f->next, &tw->wheel[pos].funcs); + pthread_rwlock_rdlock(&frcti->lock); - pthread_mutex_unlock(&tw->lock); + slot = (((ts_to_ns(now) + (TICTIME << 1)) >> ACKQ_RES) + 1) + & (ACKQ_SLOTS - 1); - return 0; -} + pthread_rwlock_unlock(&frcti->lock); -void timerwheel_stop(struct timerwheel * tw, - struct tw_f * f) -{ - assert(tw); + a->fd = fd; + a->frcti = frcti; + a->flow_id = ai.flows[fd].info.id; - pthread_mutex_lock(&tw->lock); + pthread_mutex_lock(&rw.lock); - list_del(&f->next); - free(f); + if (rw.map[slot][fd]) { + pthread_mutex_unlock(&rw.lock); + free(a); + return 0; + } + + rw.map[slot][fd] = true; + + list_add_tail(&a->next, &rw.acks[slot]); - pthread_mutex_unlock(&tw->lock); + pthread_mutex_unlock(&rw.lock); + + return 0; } |
