diff options
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r-- | src/lib/rxmwheel.c | 271 |
1 files changed, 0 insertions, 271 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c deleted file mode 100644 index ce7ef8e4..00000000 --- a/src/lib/rxmwheel.c +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2020 - * - * Timerwheel - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include <ouroboros/list.h> - -#define RXMQ_S 16 /* defines #slots */ -#define RXMQ_M 24 /* defines max delay */ -#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ -#define RXMQ_SLOTS (1 << RXMQ_S) -#define RXMQ_MAX (1 << RXMQ_M) /* ms */ - -/* Small inacurracy to avoid slow division by MILLION. */ -#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) -#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10)) -#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) - -struct rxm { - struct list_head next; - uint32_t seqno; - struct shm_du_buff * sdb; - uint8_t * head; - uint8_t * tail; - time_t t0; /* Time when original was sent (s). */ - size_t mul; /* RTO multiplier. */ - struct frcti * frcti; -}; - -struct { - struct list_head wheel[RXMQ_SLOTS]; - - size_t prv; /* Last processed slot. */ - pthread_mutex_t lock; -} rw; - -static void rxmwheel_fini(void) -{ - size_t i; - struct list_head * p; - struct list_head * h; - - for (i = 0; i < RXMQ_SLOTS; ++i) { - list_for_each_safe(p, h, &rw.wheel[i]) { - struct rxm * rxm = list_entry(p, struct rxm, next); - list_del(&rxm->next); - free(rxm); - } - } -} - -static int rxmwheel_init(void) -{ - struct timespec now; - size_t i; - - if (pthread_mutex_init(&rw.lock, NULL)) - return -1; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - /* Mark the previous timeslot as the last one processed. */ - rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); - - for (i = 0; i < RXMQ_SLOTS; ++i) - list_head_init(&rw.wheel[i]); - - return 0; -} - -static void rxmwheel_clear(int fd) -{ - size_t i; - - /* FIXME: Add list element to avoid looping over full rxmwheel. */ - pthread_mutex_lock(&rw.lock); - - for (i = 0; i < RXMQ_SLOTS; ++i) { - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &rw.wheel[i]) { - struct rxm * r = list_entry(p, struct rxm, next); - if (r->frcti->fd == fd) { - list_del(&r->next); - shm_du_buff_ack(r->sdb); - ipcp_sdb_release(r->sdb); - free(r); - } - } - } - - pthread_mutex_unlock(&rw.lock); -} - -static void check_probe(struct frcti * frcti, - uint32_t seqno) -{ - /* disable rtt probe if this packet */ - - /* TODO: This should be locked, but lock reversal! */ - - if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { - /* Backoff to avoid never updating rtt */ - frcti->srtt_us <<= 1; - frcti->probe = false; - } -} - -#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1)) -/* Return fd on r-timer expiry. */ -static int rxmwheel_move(void) -{ - struct timespec now; - struct list_head * p; - struct list_head * h; - size_t slot; - size_t i; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - slot = ts_to_slot(now); - - pthread_mutex_lock(&rw.lock); - - for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { - list_for_each_safe(p, h, &rw.wheel[i]) { - struct rxm * r; - struct frct_cr * snd_cr; - struct frct_cr * rcv_cr; - size_t rslot; - time_t newtime; - ssize_t idx; - struct shm_du_buff * sdb; - uint8_t * head; - struct flow * f; - - r = list_entry(p, struct rxm, next); - list_del(&r->next); - - snd_cr = &r->frcti->snd_cr; - rcv_cr = &r->frcti->rcv_cr; - /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_cr->lwe) <= 0) { - shm_du_buff_ack(r->sdb); - ipcp_sdb_release(r->sdb); - free(r); - continue; - } - - /* Disable using this seqno as rto probe. */ - check_probe(r->frcti, r->seqno); - - /* Check for r-timer expiry. */ - if (ts_to_ms(now) - r->t0 > r->frcti->r) { - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); - shm_du_buff_ack(r->sdb); - ipcp_sdb_release(r->sdb); - free(r); - return fd; - } - - /* Copy the payload, safe rtx in other layers. */ - if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - /* FIXME: reschedule send? */ - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); - shm_du_buff_ack(r->sdb); - ipcp_sdb_release(r->sdb); - free(r); - return fd; - } - - idx = shm_du_buff_get_idx(sdb); - - head = shm_du_buff_head(sdb); - memcpy(head, r->head, r->tail - r->head); - - /* Release the old copy. */ - shm_du_buff_ack(r->sdb); - ipcp_sdb_release(r->sdb); - - /* Update ackno and make sure DRF is not set. */ - ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); - ((struct frct_pci *) head)->flags &= ~FRCT_DRF; - - f = &ai.flows[r->frcti->fd]; - - /* Retransmit the copy. */ - if (shm_rbuff_write(f->tx_rb, idx)) { - ipcp_sdb_release(sdb); - free(r); - /* FIXME: reschedule send? */ - continue; - } - - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - - /* Reschedule. */ - shm_du_buff_wait_ack(sdb); - - r->head = head; - r->tail = shm_du_buff_tail(sdb); - r->sdb = sdb; - - newtime = ts_to_us(now) + rto(f->frcti); - rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); - - list_add_tail(&r->next, &rw.wheel[rslot]); - } - } - - rw.prv = slot; - - pthread_mutex_unlock(&rw.lock); - - return 0; -} - -static int rxmwheel_add(struct frcti * frcti, - uint32_t seqno, - struct shm_du_buff * sdb) -{ - struct timespec now; - struct rxm * r; - size_t slot; - - r = malloc(sizeof(*r)); - if (r == NULL) - return -ENOMEM; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_mutex_lock(&rw.lock); - - r->t0 = ts_to_us(now); - r->mul = 0; - r->seqno = seqno; - r->sdb = sdb; - r->head = shm_du_buff_head(sdb); - r->tail = shm_du_buff_tail(sdb); - r->frcti = frcti; - - slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1); - - list_add_tail(&r->next, &rw.wheel[slot]); - - pthread_mutex_unlock(&rw.lock); - - shm_du_buff_wait_ack(sdb); - - return 0; -} |