From 1e3a9e464cbb2f02c057e9f63c1f270ff27530f4 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sun, 20 Sep 2020 13:04:52 +0200 Subject: lib: Complete retransmission logic This completes the retransmission (automated repeat-request, ARQ) logic, sending (delayed) ACK messages when needed. On deallocation, flows will ACK try to retransmit any remaining unacknowledged messages (unless the FRCTFLINGER flag is turned off; this is on by default). Applications can safely shut down as soon as everything is ACK'd (i.e. the current Delta-t run is done). The activity timeout is now passed to the IPCP for it to sleep before completing deallocation (and releasing the flow_id). That should be moved to the IRMd in due time. The timerwheel is revised to be multi-level to reduce memory consumption. The resolution bumps by a factor of 1 << RXMQ_BUMP (16) and each level has RXMQ_SLOTS (1 << 8) slots. The lowest level has a resolution of (1 << RXMQ_RES) (20) ns, which is roughly a millisecond. Currently, 3 levels are defined, so the largest delay we can schedule at each level is: Level 0: 256ms Level 1: 4s Level 2: about a minute. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/rxmwheel.c | 261 ----------------------------------------------------- 1 file changed, 261 deletions(-) delete mode 100644 src/lib/rxmwheel.c (limited to 'src/lib/rxmwheel.c') diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c deleted file mode 100644 index 0572c7b7..00000000 --- a/src/lib/rxmwheel.c +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2020 - * - * Timerwheel - * - * Dimitri Staessens - * Sander Vrijders - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include - -#define RXMQ_S 14 /* defines #slots */ -#define RXMQ_M 34 /* defines max delay (ns) */ -#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (ns) */ -#define RXMQ_SLOTS (1 << RXMQ_S) -#define RXMQ_MAX (1 << RXMQ_M) /* us */ - -/* Overflow limits range to about 6 hours. */ -#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) -#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) - -struct rxm { - struct list_head next; - uint32_t seqno; - struct shm_du_buff * sdb; - uint8_t * head; - uint8_t * tail; - time_t t0; /* Time when original was sent (us). */ - size_t mul; /* RTO multiplier. */ - struct frcti * frcti; -}; - -struct rxmwheel { - struct list_head wheel[RXMQ_SLOTS]; - - size_t prv; /* Last processed slot. */ - pthread_mutex_t lock; -}; - -static void rxmwheel_destroy(struct rxmwheel * rw) -{ - size_t i; - struct list_head * p; - struct list_head * h; - - pthread_mutex_destroy(&rw->lock); - - for (i = 0; i < RXMQ_SLOTS; ++i) { - list_for_each_safe(p, h, &rw->wheel[i]) { - struct rxm * rxm = list_entry(p, struct rxm, next); - list_del(&rxm->next); - shm_du_buff_ack(rxm->sdb); - ipcp_sdb_release(rxm->sdb); - free(rxm); - } - } -} - -static struct rxmwheel * rxmwheel_create(void) -{ - struct rxmwheel * rw; - struct timespec now; - size_t i; - - rw = malloc(sizeof(*rw)); - if (rw == NULL) - return NULL; - - if (pthread_mutex_init(&rw->lock, NULL)) { - free(rw); - return NULL; - } - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - /* Mark the previous timeslot as the last one processed. */ - rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); - - for (i = 0; i < RXMQ_SLOTS; ++i) - list_head_init(&rw->wheel[i]); - - return rw; -} - -static void rxmwheel_move(struct rxmwheel * rw) -{ - struct timespec now; - struct list_head * p; - struct list_head * h; - size_t slot; - size_t i; - - pthread_mutex_lock(&rw->lock); - - pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, - (void *) &rw->lock); - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - slot = ts_to_slot(now); - - i = rw->prv; - - if (slot < i) - slot += RXMQ_SLOTS; - - while (i++ < slot) { - list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) { - struct rxm * r; - struct frct_cr * snd_cr; - struct frct_cr * rcv_cr; - size_t rslot; - ssize_t idx; - struct shm_du_buff * sdb; - uint8_t * head; - struct flow * f; - int fd; - uint32_t snd_lwe; - uint32_t rcv_lwe; - time_t rto; - - r = list_entry(p, struct rxm, next); - - list_del(&r->next); - - snd_cr = &r->frcti->snd_cr; - rcv_cr = &r->frcti->rcv_cr; - fd = r->frcti->fd; - f = &ai.flows[fd]; - - shm_du_buff_ack(r->sdb); - - pthread_rwlock_wrlock(&r->frcti->lock); - - snd_lwe = snd_cr->lwe; - rcv_lwe = rcv_cr->lwe; - rto = r->frcti->rto; - /* Assume last RTX is the one that's ACK'd. */ - if (r->frcti->probe - && (r->frcti->rttseq + 1) == r->seqno) - r->frcti->t_probe = now; - - pthread_rwlock_unlock(&r->frcti->lock); - - /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_lwe) < 0) { - ipcp_sdb_release(r->sdb); - free(r); - continue; - } - - /* Check for r-timer expiry. */ - if (ts_to_ns(now) - r->t0 > r->frcti->r) { - ipcp_sdb_release(r->sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - continue; - } - - /* Copy the payload, safe rtx in other layers. */ - if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - ipcp_sdb_release(r->sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - continue; - } - - idx = shm_du_buff_get_idx(sdb); - - head = shm_du_buff_head(sdb); - memcpy(head, r->head, r->tail - r->head); - - ipcp_sdb_release(r->sdb); - - ((struct frct_pci *) head)->ackno = hton32(rcv_lwe); - - /* Retransmit the copy. */ - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { - ipcp_sdb_release(sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - continue; - } - - /* Reschedule. */ - shm_du_buff_wait_ack(sdb); - - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - - r->head = head; - r->tail = shm_du_buff_tail(sdb); - r->sdb = sdb; - - /* Schedule at least in the next time slot */ - rslot = (slot + MAX((rto >> RXMQ_R), 1)) - & (RXMQ_SLOTS - 1); - - list_add_tail(&r->next, &rw->wheel[rslot]); - } - } - - rw->prv = slot & (RXMQ_SLOTS - 1); - - pthread_cleanup_pop(true); -} - -static int rxmwheel_add(struct rxmwheel * rw, - struct frcti * frcti, - uint32_t seqno, - struct shm_du_buff * sdb) -{ - struct timespec now; - struct rxm * r; - size_t slot; - - r = malloc(sizeof(*r)); - if (r == NULL) - return -ENOMEM; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - r->t0 = ts_to_ns(now); - r->mul = 0; - r->seqno = seqno; - r->sdb = sdb; - r->head = shm_du_buff_head(sdb); - r->tail = shm_du_buff_tail(sdb); - r->frcti = frcti; - - pthread_rwlock_rdlock(&r->frcti->lock); - - slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); - - pthread_rwlock_unlock(&r->frcti->lock); - - pthread_mutex_lock(&rw->lock); - - list_add_tail(&r->next, &rw->wheel[slot]); - - shm_du_buff_wait_ack(sdb); - - pthread_mutex_unlock(&rw->lock); - - return 0; -} -- cgit v1.2.3