diff options
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r-- | src/lib/frct.c | 286 |
1 files changed, 145 insertions, 141 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index a1e792af..c6fef35c 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024 * * Flow and Retransmission Control * @@ -20,6 +20,8 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#include <ouroboros/endian.h> + #define DELT_RDV (100 * MILLION) /* ns */ #define MAX_RDV (1 * BILLION) /* ns */ @@ -52,10 +54,20 @@ struct frcti { uint32_t rttseq; struct timespec t_probe; /* Probe time */ bool probe; /* Probe active */ - +#ifdef PROC_FLOW_STATS + size_t n_rtx; /* Number of rxm packets */ + size_t n_prb; /* Number of rtt probes */ + size_t n_rtt; /* Number of estimates */ + size_t n_dup; /* Duplicates received */ + size_t n_dak; /* Delayed ACKs received */ + size_t n_rdv; /* Number of rdv packets */ + size_t n_out; /* Packets out of window */ + size_t n_rqo; /* Packets out of rqueue */ +#endif struct frct_cr snd_cr; struct frct_cr rcv_cr; + ssize_t rq[RQ_SIZE]; pthread_rwlock_t lock; @@ -130,7 +142,15 @@ static int frct_rib_read(const char * path, "Receiver left window edge: %20u\n" "Receiver right window edge: %20u\n" "Receiver inactive (ns): %20ld\n" - "Receiver last ack: %20u\n", + "Receiver last ack: %20u\n" + "Number of pkt retransmissions: %20zu\n" + "Number of rtt probes: %20zu\n" + "Number of rtt estimates: %20zu\n" + "Number of duplicates received: %20zu\n" + "Number of delayed acks received: %20zu\n" + "Number of rendez-vous sent: %20zu\n" + "Number of packets out of window: %20zu\n" + "Number of packets out of rqueue: %20zu\n", frcti->mpl, frcti->a, frcti->r, @@ -144,7 +164,15 @@ static int frct_rib_read(const char * path, frcti->rcv_cr.lwe, frcti->rcv_cr.rwe, ts_diff_ns(&frcti->rcv_cr.act, &now), - frcti->rcv_cr.seqno); + frcti->rcv_cr.seqno, + frcti->n_rtx, + frcti->n_prb, + frcti->n_rtt, + frcti->n_dup, + frcti->n_dak, + frcti->n_rdv, + frcti->n_out, + frcti->n_rqo); pthread_rwlock_unlock(&flow->frcti->lock); @@ -156,10 +184,19 @@ static int frct_rib_read(const char * path, static int frct_rib_readdir(char *** buf) { *buf = malloc(sizeof(**buf)); + if (*buf == NULL) + goto fail_malloc; (*buf)[0] = strdup("frct"); + if ((*buf)[0] == NULL) + goto fail_strdup; return 1; + + fail_strdup: + free(*buf); + fail_malloc: + return -ENOMEM; } static int frct_rib_getattr(const char * path, @@ -168,7 +205,7 @@ static int frct_rib_getattr(const char * path, (void) path; (void) attr; - attr->size = 1024; + attr->size = 1189; attr->mtime = 0; return 0; @@ -223,16 +260,24 @@ static void __send_frct_pkt(int fd, pci->ackno = hton32(ackno); f = &ai.flows[fd]; + + if (crypt_encrypt(&f->crypt, sdb) < 0) + goto fail; + #ifdef RXM_BLOCKING - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) #else - if (shm_rbuff_write(f->tx_rb, idx)) { + if (shm_rbuff_write(f->tx_rb, idx)) #endif - ipcp_sdb_release(sdb); - return; - } + goto fail; + + shm_flow_set_notify(f->set, f->info.id, FLOW_PKT); + + return; - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + fail: + ipcp_sdb_release(sdb); + return; } static void send_frct_pkt(struct frcti * frcti) @@ -245,9 +290,11 @@ static void send_frct_pkt(struct frcti * frcti) assert(frcti); - pthread_rwlock_rdlock(&frcti->lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_wrlock(&frcti->lock); - if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { + if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) { pthread_rwlock_unlock(&frcti->lock); return; } @@ -256,65 +303,46 @@ static void send_frct_pkt(struct frcti * frcti) ackno = frcti->rcv_cr.lwe; rwe = frcti->rcv_cr.rwe; - clock_gettime(PTHREAD_COND_CLOCK, &now); - diff = ts_diff_ns(&frcti->rcv_cr.act, &now); - - pthread_rwlock_unlock(&frcti->lock); - - if (diff > frcti->a || diff < DELT_ACK) + if (diff > frcti->a) { + pthread_rwlock_unlock(&frcti->lock); return; + } - __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); - - pthread_rwlock_wrlock(&frcti->lock); + diff = ts_diff_ns(&frcti->snd_cr.act, &now); + if (diff < TICTIME) { + pthread_rwlock_unlock(&frcti->lock); + return; + } - if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) - frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; pthread_rwlock_unlock(&frcti->lock); + __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); } static void __send_rdv(int fd) { - struct shm_du_buff * sdb; - struct frct_pci * pci; - ssize_t idx; - struct flow * f; - - /* Raw calls needed to bypass frcti. */ - idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); - if (idx < 0) - return; - - pci = (struct frct_pci *) shm_du_buff_head(sdb); - memset(pci, 0, sizeof(*pci)); - - pci->flags = FRCT_RDVS; - - f = &ai.flows[fd]; - - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { - ipcp_sdb_release(sdb); - return; - } - - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + __send_frct_pkt(fd, FRCT_RDVS, 0, 0); } -static struct frcti * frcti_create(int fd) +static struct frcti * frcti_create(int fd, + time_t a, + time_t r, + time_t mpl) { struct frcti * frcti; ssize_t idx; struct timespec now; - time_t mpl; - time_t a; - time_t r; pthread_condattr_t cattr; #ifdef PROC_FLOW_STATS char frctstr[FRCT_NAME_STRLEN + 1]; #endif + mpl *= BILLION; + a *= BILLION; + r *= BILLION; + frcti = malloc(sizeof(*frcti)); if (frcti == NULL) goto fail_malloc; @@ -347,9 +375,9 @@ static struct frcti * frcti_create(int fd) clock_gettime(PTHREAD_COND_CLOCK, &now); - frcti->mpl = mpl = DELT_MPL; - frcti->a = a = DELT_A; - frcti->r = r = DELT_R; + frcti->mpl = mpl; + frcti->a = a; + frcti->r = r; frcti->rdv = DELT_RDV; frcti->fd = fd; @@ -358,10 +386,19 @@ static struct frcti * frcti_create(int fd) frcti->probe = false; frcti->srtt = 0; /* Updated on first ACK */ - frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */ - frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */ - - if (ai.flows[fd].qs.loss == 0) { + frcti->mdev = 10 * MILLION; /* Updated on first ACK */ + frcti->rto = BILLION; /* Initial rxm will be after 1 s */ +#ifdef PROC_FLOW_STATS + frcti->n_rtx = 0; + frcti->n_prb = 0; + frcti->n_rtt = 0; + frcti->n_dup = 0; + frcti->n_dak = 0; + frcti->n_rdv = 0; + frcti->n_out = 0; + frcti->n_rqo = 0; +#endif + if (ai.flows[fd].info.qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; frcti->rcv_cr.cflags |= FRCTFRTX; } @@ -448,9 +485,6 @@ static void frcti_setflags(struct frcti * frcti, #define frcti_rcv(frcti, sdb) \ (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) -#define frcti_tick(frcti) \ - (frcti == NULL ? 0 : __frcti_tick()) - #define frcti_dealloc(frcti) \ (frcti == NULL ? 0 : __frcti_dealloc(frcti)) @@ -494,6 +528,10 @@ static bool __frcti_is_window_open(struct frcti * frcti) if (diff > frcti->rdv) { frcti->t_rdvs = now; __send_rdv(frcti->fd); +#ifdef PROC_FLOW_STATS + frcti->n_rdv++; +#endif + } } @@ -520,7 +558,6 @@ static int __frcti_window_wait(struct frcti * frcti, while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { struct timespec now; - pthread_rwlock_unlock(&frcti->lock); pthread_mutex_lock(&frcti->mtx); @@ -534,9 +571,7 @@ static int __frcti_window_wait(struct frcti * frcti, pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx); - ret = -pthread_cond_timedwait(&frcti->cond, - &frcti->mtx, - abstime); + ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime); pthread_cleanup_pop(false); @@ -658,6 +693,7 @@ static int __frcti_snd(struct frcti * frcti, bool rtx; assert(frcti); + assert(shm_du_buff_len(sdb) != 0); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; @@ -706,9 +742,11 @@ static int __frcti_snd(struct frcti * frcti, frcti->rttseq = snd_cr->seqno; frcti->t_probe = now; frcti->probe = true; +#ifdef PROC_FLOW_STATS + frcti->n_prb++; +#endif } - - if (now.tv_sec - rcv_cr->act.tv_sec <= frcti->a) { + if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); rcv_cr->seqno = rcv_cr->lwe; @@ -738,17 +776,19 @@ static void rtt_estimator(struct frcti * frcti, } else { time_t delta = mrtt - srtt; srtt += (delta >> 3); - rttvar += (ABS(delta) - rttvar) >> 2; + delta = (ABS(delta) - rttvar) >> 2; +#ifdef FRCT_LINUX_RTT_ESTIMATOR + if (delta < 0) + delta >>= 3; +#endif + rttvar += delta; } - - frcti->srtt = MAX(1000U, srtt); - frcti->mdev = MAX(100U, rttvar); - frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1)); -} - -static void __frcti_tick(void) -{ - timerwheel_move(); +#ifdef PROC_FLOW_STATS + frcti->n_rtt++; +#endif + frcti->srtt = MAX(1000L, srtt); + frcti->mdev = MAX(100L, rttvar); + frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL)); } /* Always queues the next application packet on the RQ. */ @@ -785,11 +825,14 @@ static void __frcti_rcv(struct frcti * frcti, if (pci->flags & FRCT_DRF) { /* New run. */ rcv_cr->lwe = seqno; rcv_cr->rwe = seqno + RQ_SIZE; - } else { + rcv_cr->seqno = seqno; + } else if (pci->flags & FRCT_DATA) { goto drop_packet; } } + rcv_cr->act = now; + /* For now, just send an immediate window update. */ if (pci->flags & FRCT_RDVS) { fd = frcti->fd; @@ -808,6 +851,10 @@ static void __frcti_rcv(struct frcti * frcti, frcti->snd_cr.lwe = ackno; if (frcti->probe && after(ackno, frcti->rttseq)) { +#ifdef PROC_FLOW_STATS + if (!(pci->flags & FRCT_DATA)) + frcti->n_dak++; +#endif rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now)); frcti->probe = false; } @@ -838,20 +885,33 @@ static void __frcti_rcv(struct frcti * frcti, if (before(seqno, rcv_cr->lwe)) { rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */ +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif goto drop_packet; } if (rcv_cr->cflags & FRCTFRTX) { - if (!before(seqno, rcv_cr->rwe)) /* Out of window. */ + if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */ +#ifdef PROC_FLOW_STATS + frcti->n_out++; +#endif goto drop_packet; + } - if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) + if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) { +#ifdef PROC_FLOW_STATS + frcti->n_rqo++; +#endif goto drop_packet; /* Out of rq. */ - - if (frcti->rq[pos] != -1) + } + if (frcti->rq[pos] != -1) { +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif goto drop_packet; /* Duplicate in rq. */ - + } fd = frcti->fd; } else { rcv_cr->lwe = seqno; @@ -859,72 +919,16 @@ static void __frcti_rcv(struct frcti * frcti, frcti->rq[pos] = idx; - rcv_cr->act = now; - pthread_rwlock_unlock(&frcti->lock); if (fd != -1) - timerwheel_ack(fd, frcti); + timerwheel_delayed_ack(fd, frcti); return; drop_packet: pthread_rwlock_unlock(&frcti->lock); - - send_frct_pkt(frcti); - shm_rdrbuff_remove(ai.rdrb, idx); + send_frct_pkt(frcti); return; } - -/* Filter fqueue events for non-data packets */ -int frcti_filter(struct fqueue * fq) -{ - struct shm_du_buff * sdb; - int fd; - ssize_t idx; - struct frcti * frcti; - struct shm_rbuff * rb; - - while (fq->next < fq->fqsize) { - if (fq->fqueue[fq->next + 1] != FLOW_PKT) - return 1; - - pthread_rwlock_rdlock(&ai.lock); - - fd = ai.ports[fq->fqueue[fq->next]].fd; - rb = ai.flows[fd].rx_rb; - frcti = ai.flows[fd].frcti; - - if (frcti == NULL) { - pthread_rwlock_unlock(&ai.lock); - return 1; - } - - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&ai.lock); - return 1; - } - - idx = shm_rbuff_read(rb); - if (idx < 0) { - pthread_rwlock_unlock(&ai.lock); - return 0; - } - - sdb = shm_rdrbuff_get(ai.rdrb, idx); - - __frcti_rcv(frcti, sdb); - - if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&ai.lock); - return 1; - } - - pthread_rwlock_unlock(&ai.lock); - - fq->next += 2; - } - - return fq->next < fq->fqsize; -} |