From 1e3a9e464cbb2f02c057e9f63c1f270ff27530f4 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sun, 20 Sep 2020 13:04:52 +0200 Subject: lib: Complete retransmission logic This completes the retransmission (automated repeat-request, ARQ) logic, sending (delayed) ACK messages when needed. On deallocation, flows will ACK try to retransmit any remaining unacknowledged messages (unless the FRCTFLINGER flag is turned off; this is on by default). Applications can safely shut down as soon as everything is ACK'd (i.e. the current Delta-t run is done). The activity timeout is now passed to the IPCP for it to sleep before completing deallocation (and releasing the flow_id). That should be moved to the IRMd in due time. The timerwheel is revised to be multi-level to reduce memory consumption. The resolution bumps by a factor of 1 << RXMQ_BUMP (16) and each level has RXMQ_SLOTS (1 << 8) slots. The lowest level has a resolution of (1 << RXMQ_RES) (20) ns, which is roughly a millisecond. Currently, 3 levels are defined, so the largest delay we can schedule at each level is: Level 0: 256ms Level 1: 4s Level 2: about a minute. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/frct.c | 244 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 147 insertions(+), 97 deletions(-) (limited to 'src/lib/frct.c') diff --git a/src/lib/frct.c b/src/lib/frct.c index 2bd126f4..c26910fa 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,7 +21,7 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL (60 * BILLION) /* ns */ +#define DELT_MPL (5 * BILLION) /* ns */ #define DELT_A (1 * BILLION) /* ns */ #define DELT_R (20 * BILLION) /* ns */ @@ -59,8 +59,6 @@ struct frcti { struct frct_cr snd_cr; struct frct_cr rcv_cr; - struct rxmwheel * rw; - ssize_t rq[RQ_SIZE]; pthread_rwlock_t lock; }; @@ -86,7 +84,84 @@ struct frct_pci { uint32_t ackno; } __attribute__((packed)); -#include +static bool before(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq1 - seq2) < 0; +} + +static bool after(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq2 - seq1) < 0; +} + +static void __send_ack(int fd, + int ackno) +{ + struct shm_du_buff * sdb; + struct frct_pci * pci; + ssize_t idx; + struct flow * f; + + /* Raw calls needed to bypass frcti. */ + idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); + if (idx < 0) + return; + + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memset(pci, 0, sizeof(*pci)); + + pci->flags = FRCT_ACK; + pci->ackno = hton32(ackno); + + f = &ai.flows[fd]; + + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { + ipcp_sdb_release(sdb); + return; + } + + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); +} + +static void frct_send_ack(struct frcti * frcti) +{ + struct timespec now; + time_t diff; + uint32_t ackno; + int fd; + + assert(frcti); + + pthread_rwlock_rdlock(&frcti->lock); + + if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + ackno = frcti->rcv_cr.lwe; + fd = frcti->fd; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + diff = ts_diff_ns(&frcti->rcv_cr.act, &now); + + pthread_rwlock_unlock(&frcti->lock); + + if (diff > frcti->a || diff < DELT_ACK) + return; + + __send_ack(fd, ackno); + + pthread_rwlock_wrlock(&frcti->lock); + + if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); +} static struct frcti * frcti_create(int fd) { @@ -123,14 +198,10 @@ static struct frcti * frcti_create(int fd) frcti->srtt = 0; /* Updated on first ACK */ frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */ frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */ - frcti->rw = NULL; if (ai.flows[fd].qs.loss == 0) { - frcti->snd_cr.cflags |= FRCTFRTX; + frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; frcti->rcv_cr.cflags |= FRCTFRTX; - frcti->rw = rxmwheel_create(); - if (frcti->rw == NULL) - goto fail_rw; } frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ @@ -141,8 +212,6 @@ static struct frcti * frcti_create(int fd) return frcti; - fail_rw: - pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); fail_malloc: @@ -151,24 +220,16 @@ static struct frcti * frcti_create(int fd) static void frcti_destroy(struct frcti * frcti) { - /* - * FIXME: In case of reliable transmission we should - * make sure everything we sent is acked. - */ - - if (frcti->rw != NULL) - rxmwheel_destroy(frcti->rw); - pthread_rwlock_destroy(&frcti->lock); free(frcti); } -static uint16_t frcti_getconf(struct frcti * frcti) +static uint16_t frcti_getflags(struct frcti * frcti) { uint16_t ret; - assert (frcti); + assert(frcti); pthread_rwlock_rdlock(&frcti->lock); @@ -179,6 +240,22 @@ static uint16_t frcti_getconf(struct frcti * frcti) return ret; } +static void frcti_setflags(struct frcti * frcti, + uint16_t flags) +{ + flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */ + + assert(frcti); + + pthread_rwlock_wrlock(&frcti->lock); + + frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */ + + frcti->snd_cr.cflags &= flags; + + pthread_rwlock_unlock(&frcti->lock); +} + #define frcti_queued_pdu(frcti) \ (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) @@ -189,8 +266,10 @@ static uint16_t frcti_getconf(struct frcti * frcti) (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) #define frcti_tick(frcti) \ - (frcti == NULL ? 0 : __frcti_tick(frcti)) + (frcti == NULL ? 0 : __frcti_tick()) +#define frcti_dealloc(frcti) \ + (frcti == NULL ? 0 : __frcti_dealloc(frcti)) static ssize_t __frcti_queued_pdu(struct frcti * frcti) { @@ -233,78 +312,41 @@ static ssize_t __frcti_pdu_ready(struct frcti * frcti) return idx; } -static bool before(uint32_t seq1, - uint32_t seq2) -{ - return (int32_t)(seq1 - seq2) < 0; -} - -static bool after(uint32_t seq1, - uint32_t seq2) -{ - return (int32_t)(seq2 - seq1) < 0; -} +#include -static void frct_send_ack(struct frcti * frcti) +/* + * Send a final ACK for everything that has not been ACK'd. + * If the flow should be kept active for retransmission, + * the returned time will be negative. + */ +static time_t __frcti_dealloc(struct frcti * frcti) { - struct shm_du_buff * sdb; - struct frct_pci * pci; - ssize_t idx; - struct timespec now; - time_t diff; - uint32_t ackno; - struct flow * f; + struct timespec now; + time_t wait; + int ackno; + int fd = -1; - assert(frcti); + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_rdlock(&frcti->lock); - if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { - pthread_rwlock_unlock(&frcti->lock); - return; - } - ackno = frcti->rcv_cr.lwe; + if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno) + fd = frcti->fd; - pthread_rwlock_unlock(&frcti->lock); + wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec, + frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec); - clock_gettime(PTHREAD_COND_CLOCK, &now); + if (frcti->snd_cr.cflags & FRCTFLINGER + && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno)) + wait = -wait; - diff = ts_diff_ns(&frcti->rcv_cr.act, &now); - - if (diff > frcti->a) - return; - - if (diff < DELT_ACK) - return; - - /* Raw calls needed to bypass frcti. */ - idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); - if (idx < 0) - return; - - pci = (struct frct_pci *) shm_du_buff_head(sdb); - memset(pci, 0, sizeof(*pci)); - - pci->flags = FRCT_ACK; - pci->ackno = hton32(ackno); - - f = &ai.flows[frcti->fd]; - - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { - pthread_rwlock_rdlock(&ai.lock); - ipcp_sdb_release(sdb); - return; - } - - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - - pthread_rwlock_wrlock(&frcti->lock); + pthread_rwlock_unlock(&frcti->lock); - if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) - frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + if (fd != -1) + __send_ack(fd, ackno); - pthread_rwlock_unlock(&frcti->lock); + return wait; } static int __frcti_snd(struct frcti * frcti, @@ -315,14 +357,14 @@ static int __frcti_snd(struct frcti * frcti, struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; + bool rtx; assert(frcti); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; - if (frcti->rw != NULL) - rxmwheel_move(frcti->rw); + timerwheel_move(); pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci == NULL) @@ -334,6 +376,8 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_wrlock(&frcti->lock); + rtx = snd_cr->cflags & FRCTFRTX; + pci->flags |= FRCT_DATA; /* Set DRF if there are no unacknowledged packets. */ @@ -351,7 +395,7 @@ static int __frcti_snd(struct frcti * frcti, seqno = snd_cr->seqno; pci->seqno = hton32(seqno); - if (!(snd_cr->cflags & FRCTFRTX)) { + if (!rtx) { snd_cr->lwe++; } else { if (!frcti->probe) { @@ -372,8 +416,8 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); - if (frcti->rw != NULL) - rxmwheel_add(frcti->rw, frcti, seqno, sdb); + if (rtx) + timerwheel_rxm(frcti, seqno, sdb); return 0; } @@ -387,12 +431,10 @@ static void rtt_estimator(struct frcti * frcti, if (srtt == 0) { /* first measurement */ srtt = mrtt; rttvar = mrtt >> 1; - } else { time_t delta = mrtt - srtt; srtt += (delta >> 3); - rttvar -= rttvar >> 2; - rttvar += ABS(delta) >> 2; + rttvar += (ABS(delta) - rttvar) >> 2; } frcti->srtt = MAX(1000U, srtt); @@ -401,12 +443,9 @@ static void rtt_estimator(struct frcti * frcti, frcti->srtt + (frcti->mdev << 1)); } -static void __frcti_tick(struct frcti * frcti) +static void __frcti_tick(void) { - if (frcti->rw != NULL) { - rxmwheel_move(frcti->rw); - frct_send_ack(frcti); - } + timerwheel_move(); } /* Always queues the next application packet on the RQ. */ @@ -420,6 +459,7 @@ static void __frcti_rcv(struct frcti * frcti, struct frct_cr * rcv_cr; uint32_t seqno; uint32_t ackno; + int fd = -1; assert(frcti); @@ -456,8 +496,10 @@ static void __frcti_rcv(struct frcti * frcti, if (!(pci->flags & FRCT_DATA)) goto drop_packet; - if (before(seqno, rcv_cr->lwe)) + if (before(seqno, rcv_cr->lwe)) { + rcv_cr->seqno = seqno; goto drop_packet; + } if (rcv_cr->cflags & FRCTFRTX) { if ((seqno - rcv_cr->lwe) >= RQ_SIZE) @@ -465,6 +507,8 @@ static void __frcti_rcv(struct frcti * frcti, if (frcti->rq[pos] != -1) goto drop_packet; /* Duplicate in rq. */ + + fd = frcti->fd; } else { rcv_cr->lwe = seqno; } @@ -475,10 +519,16 @@ static void __frcti_rcv(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); + if (fd != -1) + timerwheel_ack(fd, frcti); + return; drop_packet: pthread_rwlock_unlock(&frcti->lock); + + frct_send_ack(frcti); + shm_rdrbuff_remove(ai.rdrb, idx); return; } @@ -492,7 +542,7 @@ int frcti_filter(struct fqueue * fq) struct frcti * frcti; struct shm_rbuff * rb; - while(fq->next < fq->fqsize) { + while (fq->next < fq->fqsize) { if (fq->fqueue[fq->next + 1] != FLOW_PKT) return 1; -- cgit v1.2.3