summaryrefslogtreecommitdiff
path: root/src/lib/rxmwheel.c
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-09-20 13:04:52 +0200
committerSander Vrijders <sander@ouroboros.rocks>2020-09-25 11:52:51 +0200
commit1e3a9e464cbb2f02c057e9f63c1f270ff27530f4 (patch)
tree5193774eea2bb204d062cc47e178a3702d4790a4 /src/lib/rxmwheel.c
parent5f468ee5e02a0d63ed8ad7420ee1beda87e524d6 (diff)
downloadouroboros-1e3a9e464cbb2f02c057e9f63c1f270ff27530f4.tar.gz
ouroboros-1e3a9e464cbb2f02c057e9f63c1f270ff27530f4.zip
lib: Complete retransmission logic
This completes the retransmission (automated repeat-request, ARQ) logic, sending (delayed) ACK messages when needed. On deallocation, flows will ACK try to retransmit any remaining unacknowledged messages (unless the FRCTFLINGER flag is turned off; this is on by default). Applications can safely shut down as soon as everything is ACK'd (i.e. the current Delta-t run is done). The activity timeout is now passed to the IPCP for it to sleep before completing deallocation (and releasing the flow_id). That should be moved to the IRMd in due time. The timerwheel is revised to be multi-level to reduce memory consumption. The resolution bumps by a factor of 1 << RXMQ_BUMP (16) and each level has RXMQ_SLOTS (1 << 8) slots. The lowest level has a resolution of (1 << RXMQ_RES) (20) ns, which is roughly a millisecond. Currently, 3 levels are defined, so the largest delay we can schedule at each level is: Level 0: 256ms Level 1: 4s Level 2: about a minute. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r--src/lib/rxmwheel.c261
1 files changed, 0 insertions, 261 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
deleted file mode 100644
index 0572c7b7..00000000
--- a/src/lib/rxmwheel.c
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2020
- *
- * Timerwheel
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#include <ouroboros/list.h>
-
-#define RXMQ_S 14 /* defines #slots */
-#define RXMQ_M 34 /* defines max delay (ns) */
-#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (ns) */
-#define RXMQ_SLOTS (1 << RXMQ_S)
-#define RXMQ_MAX (1 << RXMQ_M) /* us */
-
-/* Overflow limits range to about 6 hours. */
-#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
-#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
-
-struct rxm {
- struct list_head next;
- uint32_t seqno;
- struct shm_du_buff * sdb;
- uint8_t * head;
- uint8_t * tail;
- time_t t0; /* Time when original was sent (us). */
- size_t mul; /* RTO multiplier. */
- struct frcti * frcti;
-};
-
-struct rxmwheel {
- struct list_head wheel[RXMQ_SLOTS];
-
- size_t prv; /* Last processed slot. */
- pthread_mutex_t lock;
-};
-
-static void rxmwheel_destroy(struct rxmwheel * rw)
-{
- size_t i;
- struct list_head * p;
- struct list_head * h;
-
- pthread_mutex_destroy(&rw->lock);
-
- for (i = 0; i < RXMQ_SLOTS; ++i) {
- list_for_each_safe(p, h, &rw->wheel[i]) {
- struct rxm * rxm = list_entry(p, struct rxm, next);
- list_del(&rxm->next);
- shm_du_buff_ack(rxm->sdb);
- ipcp_sdb_release(rxm->sdb);
- free(rxm);
- }
- }
-}
-
-static struct rxmwheel * rxmwheel_create(void)
-{
- struct rxmwheel * rw;
- struct timespec now;
- size_t i;
-
- rw = malloc(sizeof(*rw));
- if (rw == NULL)
- return NULL;
-
- if (pthread_mutex_init(&rw->lock, NULL)) {
- free(rw);
- return NULL;
- }
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- /* Mark the previous timeslot as the last one processed. */
- rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
-
- for (i = 0; i < RXMQ_SLOTS; ++i)
- list_head_init(&rw->wheel[i]);
-
- return rw;
-}
-
-static void rxmwheel_move(struct rxmwheel * rw)
-{
- struct timespec now;
- struct list_head * p;
- struct list_head * h;
- size_t slot;
- size_t i;
-
- pthread_mutex_lock(&rw->lock);
-
- pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
- (void *) &rw->lock);
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- slot = ts_to_slot(now);
-
- i = rw->prv;
-
- if (slot < i)
- slot += RXMQ_SLOTS;
-
- while (i++ < slot) {
- list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {
- struct rxm * r;
- struct frct_cr * snd_cr;
- struct frct_cr * rcv_cr;
- size_t rslot;
- ssize_t idx;
- struct shm_du_buff * sdb;
- uint8_t * head;
- struct flow * f;
- int fd;
- uint32_t snd_lwe;
- uint32_t rcv_lwe;
- time_t rto;
-
- r = list_entry(p, struct rxm, next);
-
- list_del(&r->next);
-
- snd_cr = &r->frcti->snd_cr;
- rcv_cr = &r->frcti->rcv_cr;
- fd = r->frcti->fd;
- f = &ai.flows[fd];
-
- shm_du_buff_ack(r->sdb);
-
- pthread_rwlock_wrlock(&r->frcti->lock);
-
- snd_lwe = snd_cr->lwe;
- rcv_lwe = rcv_cr->lwe;
- rto = r->frcti->rto;
- /* Assume last RTX is the one that's ACK'd. */
- if (r->frcti->probe
- && (r->frcti->rttseq + 1) == r->seqno)
- r->frcti->t_probe = now;
-
- pthread_rwlock_unlock(&r->frcti->lock);
-
- /* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_lwe) < 0) {
- ipcp_sdb_release(r->sdb);
- free(r);
- continue;
- }
-
- /* Check for r-timer expiry. */
- if (ts_to_ns(now) - r->t0 > r->frcti->r) {
- ipcp_sdb_release(r->sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
- continue;
- }
-
- /* Copy the payload, safe rtx in other layers. */
- if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
- ipcp_sdb_release(r->sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
- continue;
- }
-
- idx = shm_du_buff_get_idx(sdb);
-
- head = shm_du_buff_head(sdb);
- memcpy(head, r->head, r->tail - r->head);
-
- ipcp_sdb_release(r->sdb);
-
- ((struct frct_pci *) head)->ackno = hton32(rcv_lwe);
-
- /* Retransmit the copy. */
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
- ipcp_sdb_release(sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
- continue;
- }
-
- /* Reschedule. */
- shm_du_buff_wait_ack(sdb);
-
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
- r->head = head;
- r->tail = shm_du_buff_tail(sdb);
- r->sdb = sdb;
-
- /* Schedule at least in the next time slot */
- rslot = (slot + MAX((rto >> RXMQ_R), 1))
- & (RXMQ_SLOTS - 1);
-
- list_add_tail(&r->next, &rw->wheel[rslot]);
- }
- }
-
- rw->prv = slot & (RXMQ_SLOTS - 1);
-
- pthread_cleanup_pop(true);
-}
-
-static int rxmwheel_add(struct rxmwheel * rw,
- struct frcti * frcti,
- uint32_t seqno,
- struct shm_du_buff * sdb)
-{
- struct timespec now;
- struct rxm * r;
- size_t slot;
-
- r = malloc(sizeof(*r));
- if (r == NULL)
- return -ENOMEM;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- r->t0 = ts_to_ns(now);
- r->mul = 0;
- r->seqno = seqno;
- r->sdb = sdb;
- r->head = shm_du_buff_head(sdb);
- r->tail = shm_du_buff_tail(sdb);
- r->frcti = frcti;
-
- pthread_rwlock_rdlock(&r->frcti->lock);
-
- slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1);
-
- pthread_rwlock_unlock(&r->frcti->lock);
-
- pthread_mutex_lock(&rw->lock);
-
- list_add_tail(&r->next, &rw->wheel[slot]);
-
- shm_du_buff_wait_ack(sdb);
-
- pthread_mutex_unlock(&rw->lock);
-
- return 0;
-}