/* * Ouroboros - Copyright (C) 2016 - 2019 * * Timerwheel * * Dimitri Staessens * Sander Vrijders * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * version 2.1 as published by the Free Software Foundation. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., http://www.fsf.org/about/contact/. */ #include #define RXMQ_S 16 /* defines #slots */ #define RXMQ_M 24 /* defines max delay */ #define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ #define RXMQ_SLOTS (1 << RXMQ_S) #define RXMQ_MAX (1 << RXMQ_M) /* ms */ /* Small inacurracy to avoid slow division by MILLION. */ #define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) #define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10)) #define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) struct rxm { struct list_head next; uint32_t seqno; struct shm_du_buff * sdb; uint8_t * head; uint8_t * tail; time_t t0; /* Time when original was sent (s). */ size_t mul; /* RTO multiplier. */ struct frcti * frcti; }; struct { struct list_head wheel[RXMQ_SLOTS]; size_t prv; /* Last processed slot. */ pthread_mutex_t lock; } rw; static void rxmwheel_fini(void) { size_t i; struct list_head * p; struct list_head * h; for (i = 0; i < RXMQ_SLOTS; ++i) { list_for_each_safe(p, h, &rw.wheel[i]) { struct rxm * rxm = list_entry(p, struct rxm, next); list_del(&rxm->next); free(rxm); } } } static int rxmwheel_init(void) { struct timespec now; size_t i; if (pthread_mutex_init(&rw.lock, NULL)) return -1; clock_gettime(PTHREAD_COND_CLOCK, &now); /* Mark the previous timeslot as the last one processed. */ rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); for (i = 0; i < RXMQ_SLOTS; ++i) list_head_init(&rw.wheel[i]); return 0; } static void rxmwheel_clear(int fd) { size_t i; /* FIXME: Add list element to avoid looping over full rxmwheel. */ pthread_mutex_lock(&rw.lock); for (i = 0; i < RXMQ_SLOTS; ++i) { struct list_head * p; struct list_head * h; list_for_each_safe(p, h, &rw.wheel[i]) { struct rxm * r = list_entry(p, struct rxm, next); if (r->frcti->fd == fd) { list_del(&r->next); shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); } } } pthread_mutex_unlock(&rw.lock); } static void check_probe(struct frcti * frcti, uint32_t seqno) { /* disable rtt probe if this packet */ /* TODO: This should be locked, but lock reversal! */ if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { /* Backoff to avoid never updating rtt */ frcti->srtt_us <<= 1; frcti->probe = false; } } #define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1)) /* Return fd on r-timer expiry. */ static int rxmwheel_move(void) { struct timespec now; struct list_head * p; struct list_head * h; size_t slot; size_t i; clock_gettime(PTHREAD_COND_CLOCK, &now); slot = ts_to_slot(now); pthread_mutex_lock(&rw.lock); for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { list_for_each_safe(p, h, &rw.wheel[i]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; size_t rslot; time_t newtime; ssize_t idx; struct shm_du_buff * sdb; uint8_t * head; struct flow * f; r = list_entry(p, struct rxm, next); list_del(&r->next); snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; /* Has been ack'd, remove. */ if ((int) (r->seqno - snd_cr->lwe) <= 0) { shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); continue; } /* Disable using this seqno as rto probe. */ check_probe(r->frcti, r->seqno); /* Check for r-timer expiry. */ if (ts_to_ms(now) - r->t0 > r->frcti->r) { int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); return fd; } /* Copy the payload, safe rtx in other layers. */ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { /* FIXME: reschedule send? */ int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); return fd; } idx = shm_du_buff_get_idx(sdb); head = shm_du_buff_head(sdb); memcpy(head, r->head, r->tail - r->head); /* Release the old copy. */ shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); /* Update ackno and make sure DRF is not set. */ ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); ((struct frct_pci *) head)->flags &= ~FRCT_DRF; f = &ai.flows[r->frcti->fd]; /* Retransmit the copy. */ if (shm_rbuff_write(f->tx_rb, idx)) { ipcp_sdb_release(sdb); free(r); /* FIXME: reschedule send? */ continue; } shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); /* Reschedule. */ shm_du_buff_wait_ack(sdb); r->head = head; r->tail = shm_du_buff_tail(sdb); r->sdb = sdb; newtime = ts_to_us(now) + rto(f->frcti); rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[rslot]); } } rw.prv = slot; pthread_mutex_unlock(&rw.lock); return 0; } static int rxmwheel_add(struct frcti * frcti, uint32_t seqno, struct shm_du_buff * sdb) { struct timespec now; struct rxm * r; size_t slot; r = malloc(sizeof(*r)); if (r == NULL) return -ENOMEM; clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_mutex_lock(&rw.lock); r->t0 = ts_to_us(now); r->mul = 0; r->seqno = seqno; r->sdb = sdb; r->head = shm_du_buff_head(sdb); r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[slot]); pthread_mutex_unlock(&rw.lock); shm_du_buff_wait_ack(sdb); return 0; }