diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2020-04-29 21:26:13 +0200 |
---|---|---|
committer | Sander Vrijders <sander@ouroboros.rocks> | 2020-04-30 11:54:44 +0200 |
commit | f6ae2ac9f0735846c58e0e953c01c3d834dd7d08 (patch) | |
tree | 541c1fea8b2b10d8abeac53f2814990126bf4efc /src | |
parent | 04a3795a70d6deb4840b30f5889e41ed42c85a6e (diff) | |
download | ouroboros-f6ae2ac9f0735846c58e0e953c01c3d834dd7d08.tar.gz ouroboros-f6ae2ac9f0735846c58e0e953c01c3d834dd7d08.zip |
lib: Stabilize FRCP under packet loss conditions0.17.3
There were a bunch of bugs in FRCP that urgently needed fixing. Now
data QoS is usable even with heavy packet loss (within some
parameters). The current RTT estimator is the IETF one. It should be
updated to the improved one used in the Linux kernel once the A-timer
(ACKs without data) and graceful shutdown are implemented.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src')
-rw-r--r-- | src/lib/frct.c | 91 | ||||
-rw-r--r-- | src/lib/rxmwheel.c | 36 |
2 files changed, 68 insertions, 59 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index e4b858d0..bc07be5a 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,9 +21,9 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL 60000 /* ms */ -#define DELT_A 0 /* ms */ -#define DELT_R 2000 /* ms */ +#define DELT_MPL 60000 /* ms */ +#define DELT_A 3000 /* ms */ +#define DELT_R 20000 /* ms */ #define RQ_SIZE 64 @@ -50,11 +50,12 @@ struct frcti { time_t a; time_t r; - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ + time_t srtt_us; /* smoothed rtt */ + time_t mdev_us; /* mdev */ + time_t rto; /* retransmission timeout */ uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ + struct timespec t_probe; /* probe time */ + bool probe; /* probe active */ struct frct_cr snd_cr; struct frct_cr rcv_cr; @@ -114,12 +115,14 @@ static struct frcti * frcti_create(int fd) frcti->snd_cr.inact = 3 * delta_t; frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - /* rtt estimator. rto is currently srtt + 2 * mdev */ - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */ + frcti->rttseq = 0; frcti->probe = false; + frcti->srtt_us = 0; /* updated on first ACK */ + frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */ + frcti->rto = 200000; /* initial rxm will be after 200 ms */ + if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.cflags |= FRCTFRTX; @@ -184,10 +187,10 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) /* See if we already have the next PDU. */ pthread_rwlock_wrlock(&frcti->lock); - pos = frcti->rcv_cr.seqno & (RQ_SIZE - 1); + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); idx = frcti->rq[pos]; if (idx != -1) { - ++frcti->rcv_cr.seqno; + ++frcti->rcv_cr.lwe; frcti->rq[pos] = -1; } @@ -252,11 +255,7 @@ static int __frcti_snd(struct frcti * frcti, if (now.tv_sec - snd_cr->act > snd_cr->inact) { /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); -#ifdef CONFIG_OUROBOROS_DEBUG - snd_cr->seqno = 0; -#else random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); -#endif frcti->snd_cr.lwe = snd_cr->seqno - 1; } @@ -270,13 +269,11 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } + rxmwheel_add(frcti, snd_cr->seqno, sdb); + if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { - rxmwheel_add(frcti, snd_cr->seqno, sdb); - if (rcv_cr->lwe <= rcv_cr->seqno) { - pci->flags |= FRCT_ACK; - pci->ackno = hton32(rcv_cr->seqno); - rcv_cr->lwe = rcv_cr->seqno; - } + pci->flags |= FRCT_ACK; + pci->ackno = hton32(rcv_cr->lwe); } } @@ -291,21 +288,23 @@ static int __frcti_snd(struct frcti * frcti, static void rtt_estimator(struct frcti * frcti, time_t mrtt_us) { - time_t srtt = frcti->srtt_us; - time_t mdev = frcti->mdev_us; - - if (srtt != 0) { - srtt -= (srtt >> 3); - srtt += mrtt_us >> 3; /* rtt = 7/8 rtt + 1/8 new */ - mdev -= (mdev >> 2); - mdev += ABS(srtt - mrtt_us) >> 2; + time_t srtt = frcti->srtt_us; + time_t rttvar = frcti->mdev_us; + + if (srtt == 0) { /* first measurement */ + srtt = mrtt_us; + rttvar = mrtt_us >> 1; + } else { - srtt = mrtt_us << 3; /* take the measured time to be rtt */ - mdev = mrtt_us >> 1; /* take half mrtt_us as deviation */ + time_t delta = mrtt_us - srtt; + srtt += (delta >> 3); + rttvar -= rttvar >> 2; + rttvar += ABS(delta) >> 2; } - frcti->srtt_us = MAX(1U, srtt); - frcti->mdev_us = MAX(1U, mdev); + frcti->srtt_us = MAX(1U, srtt); + frcti->mdev_us = MAX(1U, rttvar); + frcti->rto = srtt + (rttvar >> 2); } /* Returns 0 when idx contains a packet for the application. */ @@ -339,35 +338,39 @@ static int __frcti_rcv(struct frcti * frcti, if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { /* Inactive receiver, check for DRF. */ if (pci->flags & FRCT_DRF) /* New run. */ - rcv_cr->seqno = seqno; + rcv_cr->lwe = seqno; else goto drop_packet; } - if (seqno == rcv_cr->seqno) { - ++rcv_cr->seqno; + if (seqno == rcv_cr->lwe) { + ++rcv_cr->lwe; } else { /* Out of order. */ - if (before(seqno, rcv_cr->seqno)) + if (before(seqno, rcv_cr->lwe) ) goto drop_packet; if (rcv_cr->cflags & FRCTFRTX) { size_t pos = seqno & (RQ_SIZE - 1); - if ((seqno - rcv_cr->lwe) > RQ_SIZE /* Out of rq. */ - || frcti->rq[pos] != -1) /* Duplicate in rq. */ - goto drop_packet; + if ((seqno - rcv_cr->lwe) >= RQ_SIZE) + goto drop_packet; /* Out of rq. */ + + if (frcti->rq[pos] != -1) + goto drop_packet; /* Duplicate in rq */ + /* Queue. */ frcti->rq[pos] = idx; ret = -EAGAIN; } else { - rcv_cr->seqno = seqno + 1; + rcv_cr->lwe = seqno + 1; } } if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) { uint32_t ackno = ntoh32(pci->ackno); /* Check for duplicate (old) acks. */ - if ((int32_t)(ackno - snd_cr->lwe) >= 0) + if ((int32_t)(ackno - snd_cr->lwe) > 0) snd_cr->lwe = ackno; + if (frcti->probe && after(ackno, frcti->rttseq)) { rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now)); frcti->probe = false; @@ -378,7 +381,7 @@ static int __frcti_rcv(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); - if (!(pci->flags & FRCT_DATA)) + if (ret == 0 && !(pci->flags & FRCT_DATA)) shm_rdrbuff_remove(ai.rdrb, idx); rxmwheel_move(); diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index ce7ef8e4..3f01a0d3 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -22,11 +22,11 @@ #include <ouroboros/list.h> -#define RXMQ_S 16 /* defines #slots */ -#define RXMQ_M 24 /* defines max delay */ -#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ +#define RXMQ_S 16 /* defines #slots */ +#define RXMQ_M 24 /* defines max delay (us) */ +#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (us) */ #define RXMQ_SLOTS (1 << RXMQ_S) -#define RXMQ_MAX (1 << RXMQ_M) /* ms */ +#define RXMQ_MAX (1 << RXMQ_M) /* us */ /* Small inacurracy to avoid slow division by MILLION. */ #define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) @@ -119,12 +119,11 @@ static void check_probe(struct frcti * frcti, if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { /* Backoff to avoid never updating rtt */ - frcti->srtt_us <<= 1; + frcti->srtt_us += frcti->mdev_us; frcti->probe = false; } } -#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1)) /* Return fd on r-timer expiry. */ static int rxmwheel_move(void) { @@ -136,12 +135,17 @@ static int rxmwheel_move(void) clock_gettime(PTHREAD_COND_CLOCK, &now); + pthread_mutex_lock(&rw.lock); + slot = ts_to_slot(now); - pthread_mutex_lock(&rw.lock); + i = rw.prv; - for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { - list_for_each_safe(p, h, &rw.wheel[i]) { + if (slot < i) + slot += RXMQ_SLOTS; + + while (i++ < slot) { + list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; @@ -158,7 +162,7 @@ static int rxmwheel_move(void) snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_cr->lwe) <= 0) { + if ((int) (r->seqno - snd_cr->lwe) < 0) { shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); @@ -180,7 +184,7 @@ static int rxmwheel_move(void) /* Copy the payload, safe rtx in other layers. */ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - /* FIXME: reschedule send? */ + /* FIXME: reschedule send instead of failing? */ int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); shm_du_buff_ack(r->sdb); @@ -206,10 +210,12 @@ static int rxmwheel_move(void) /* Retransmit the copy. */ if (shm_rbuff_write(f->tx_rb, idx)) { + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); ipcp_sdb_release(sdb); free(r); /* FIXME: reschedule send? */ - continue; + return fd; } shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); @@ -221,14 +227,14 @@ static int rxmwheel_move(void) r->tail = shm_du_buff_tail(sdb); r->sdb = sdb; - newtime = ts_to_us(now) + rto(f->frcti); + newtime = ts_to_us(now) + f->frcti->rto; rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[rslot]); } } - rw.prv = slot; + rw.prv = slot & (RXMQ_SLOTS - 1); pthread_mutex_unlock(&rw.lock); @@ -259,7 +265,7 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; - slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1); + slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[slot]); |