From e00c9b13acad23e14df9d5cf4c7868dfd6e1bc55 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 8 Feb 2019 10:47:42 +0100 Subject: lib: Add initial rtt estimator to FRCT This adds a simple round-trip time estimator to FRCT. The estimate is a weighted average with deviation. The retransmission is scheduled after rtt + 2 times the deviation. A retransmit doubles the rtt estimate to avoid the no-update case when rtt suddenly increases. The rtt is estimated in microseconds and the granularity for retransmits is 256 microseconds. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/utils.h | 1 + src/lib/frct.c | 75 +++++++++++++++++++++++++++++++++++++++-------- src/lib/rxmwheel.c | 32 ++++++++++++++++---- 3 files changed, 90 insertions(+), 18 deletions(-) diff --git a/include/ouroboros/utils.h b/include/ouroboros/utils.h index f5b6686f..d40a1783 100644 --- a/include/ouroboros/utils.h +++ b/include/ouroboros/utils.h @@ -28,6 +28,7 @@ #define MIN(a,b) (((a) < (b)) ? (a) : (b)) #define MAX(a,b) (((a) > (b)) ? (a) : (b)) +#define ABS(a) ((a) > 0 ? (a) : -(a)) typedef struct { uint8_t * data; diff --git a/src/lib/frct.c b/src/lib/frct.c index 02c101c8..bd08d86a 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -50,7 +50,11 @@ struct frcti { time_t a; time_t r; - time_t rto; /* ms */ + time_t srtt_us; /* smoothed rtt */ + time_t mdev_us; /* mdev */ + uint32_t rttseq; + struct timespec t_probe; /* probe time */ + bool probe; /* probe active */ struct frct_cr snd_cr; struct frct_cr rcv_cr; @@ -110,8 +114,11 @@ static struct frcti * frcti_create(int fd) frcti->snd_cr.inact = 3 * delta_t; frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - /* Initial rto. FIXME: recalc using Karn algorithm. */ - frcti->rto = 120; + /* rtt estimator. rto is currently srtt + 2 * mdev */ + frcti->srtt_us = 0; /* updated on first ACK */ + frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */ + frcti->rttseq = 0; + frcti->probe = false; if (ai.flows[fd].spec.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; @@ -200,6 +207,18 @@ static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) return pci; } +static bool before(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq1 - seq2) < 0; +} + +static bool after(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq2 - seq1) < 0; +} + static int __frcti_snd(struct frcti * frcti, struct shm_du_buff * sdb) { @@ -219,7 +238,7 @@ static int __frcti_snd(struct frcti * frcti, if (pci == NULL) return -1; - clock_gettime(CLOCK_REALTIME_COARSE, &now); + clock_gettime(CLOCK_REALTIME, &now); pthread_rwlock_wrlock(&frcti->lock); @@ -244,12 +263,20 @@ static int __frcti_snd(struct frcti * frcti, pci->seqno = hton32(snd_cr->seqno); if (!(snd_cr->cflags & FRCTFRTX)) { snd_cr->lwe++; - } else if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { - rxmwheel_add(frcti, snd_cr->seqno, sdb); - if (rcv_cr->lwe <= rcv_cr->seqno) { - pci->flags |= FRCT_ACK; - pci->ackno = hton32(rcv_cr->seqno); - rcv_cr->lwe = rcv_cr->seqno; + } else { + if (!frcti->probe) { + frcti->rttseq = snd_cr->seqno; + frcti->t_probe = now; + frcti->probe = true; + } + + if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { + rxmwheel_add(frcti, snd_cr->seqno, sdb); + if (rcv_cr->lwe <= rcv_cr->seqno) { + pci->flags |= FRCT_ACK; + pci->ackno = hton32(rcv_cr->seqno); + rcv_cr->lwe = rcv_cr->seqno; + } } } @@ -261,6 +288,26 @@ static int __frcti_snd(struct frcti * frcti, return 0; } +static void rtt_estimator(struct frcti * frcti, + time_t mrtt_us) +{ + time_t srtt = frcti->srtt_us; + time_t mdev = frcti->mdev_us; + + if (srtt != 0) { + srtt -= (srtt >> 3); + srtt += mrtt_us >> 3; /* rtt = 7/8 rtt + 1/8 new */ + mdev -= (mdev >> 2); + mdev += ABS(srtt - mrtt_us) >> 2; + } else { + srtt = mrtt_us << 3; /* take the measured time to be rtt */ + mdev = mrtt_us >> 1; /* take half mrtt_us as deviation */ + } + + frcti->srtt_us = MAX(1U, srtt); + frcti->mdev_us = MAX(1U, mdev); +} + /* Returns 0 when idx contains a packet for the application. */ static int __frcti_rcv(struct frcti * frcti, struct shm_du_buff * sdb) @@ -280,7 +327,7 @@ static int __frcti_rcv(struct frcti * frcti, pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); - clock_gettime(CLOCK_REALTIME_COARSE, &now); + clock_gettime(CLOCK_REALTIME, &now); pthread_rwlock_wrlock(&frcti->lock); @@ -300,7 +347,7 @@ static int __frcti_rcv(struct frcti * frcti, if (seqno == rcv_cr->seqno) { ++rcv_cr->seqno; } else { /* Out of order. */ - if ((int32_t)(seqno - rcv_cr->seqno) < 0) + if (before(seqno, rcv_cr->seqno)) goto drop_packet; if (rcv_cr->cflags & FRCTFRTX) { @@ -321,6 +368,10 @@ static int __frcti_rcv(struct frcti * frcti, /* Check for duplicate (old) acks. */ if ((int32_t)(ackno - snd_cr->lwe) >= 0) snd_cr->lwe = ackno; + if (frcti->probe && after(ackno, frcti->rttseq)) { + rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now)); + frcti->probe = false; + } } rcv_cr->act = now.tv_sec; diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 395a5d8f..56ac7298 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -22,15 +22,16 @@ #include -#define RXMQ_S 12 /* defines #slots */ -#define RXMQ_M 15 /* defines max delay */ +#define RXMQ_S 16 /* defines #slots */ +#define RXMQ_M 24 /* defines max delay */ #define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ #define RXMQ_SLOTS (1 << RXMQ_S) #define RXMQ_MAX (1 << RXMQ_M) /* ms */ /* Small inacurracy to avoid slow division by MILLION. */ #define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) -#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) +#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10)) +#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) struct rxm { struct list_head next; @@ -109,6 +110,21 @@ static void rxmwheel_clear(int fd) pthread_mutex_unlock(&rw.lock); } +static void check_probe(struct frcti * frcti, + uint32_t seqno) +{ + /* disable rtt probe if this packet */ + + /* TODO: This should be locked, but lock reversal! */ + + if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { + /* Backoff to avoid never updating rtt */ + frcti->srtt_us <<= 1; + frcti->probe = false; + } +} + +#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1)) /* Return fd on r-timer expiry. */ static int rxmwheel_move(void) { @@ -148,6 +164,10 @@ static int rxmwheel_move(void) free(r); continue; } + + /* Disable using this seqno as rto probe. */ + check_probe(r->frcti, r->seqno); + /* Check for r-timer expiry. */ if (ts_to_ms(now) - r->t0 > r->frcti->r) { int fd = r->frcti->fd; @@ -201,7 +221,7 @@ static int rxmwheel_move(void) r->tail = shm_du_buff_tail(sdb); r->sdb = sdb; - newtime = ts_to_ms(now) + (f->frcti->rto << ++r->mul); + newtime = ts_to_us(now) + rto(f->frcti); rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[rslot]); @@ -231,7 +251,7 @@ static int rxmwheel_add(struct frcti * frcti, pthread_mutex_lock(&rw.lock); - r->t0 = ts_to_ms(now); + r->t0 = ts_to_us(now); r->mul = 0; r->seqno = seqno; r->sdb = sdb; @@ -239,7 +259,7 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; - slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1); + slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[slot]); -- cgit v1.2.3