From 25d1721e7dc9fa15c8a7c5513f30e636e9bda397 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 1 May 2020 18:23:58 +0200 Subject: lib: Create an rxmwheel per flow The single retransmission wheel caused locking headaches as the calls for different flows could block on the same rxmwheel. This stabilizes the stack, but if the rdrbuff gets full there can now be big delays. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/frct.c | 86 +++++++++++++++++++++++++++++++++------------------------- 1 file changed, 49 insertions(+), 37 deletions(-) (limited to 'src/lib/frct.c') diff --git a/src/lib/frct.c b/src/lib/frct.c index 0e9d64c7..3c180128 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,15 +21,12 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL 60000 /* ms */ -#define DELT_A 3000 /* ms */ -#define DELT_R 20000 /* ms */ +#define DELT_MPL (60 * MILLION) /* us */ +#define DELT_A (1 * MILLION) /* us */ +#define DELT_R (20 * MILLION) /* us */ #define RQ_SIZE 1024 -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ - #define FRCT_PCILEN (sizeof(struct frct_pci)) struct frct_cr { @@ -44,24 +41,26 @@ struct frct_cr { }; struct frcti { - int fd; + int fd; + + time_t mpl; + time_t a; + time_t r; - time_t mpl; - time_t a; - time_t r; + time_t srtt_us; /* smoothed rtt */ + time_t mdev_us; /* mdev */ + time_t rto; /* retransmission timeout */ + uint32_t rttseq; + struct timespec t_probe; /* probe time */ + bool probe; /* probe active */ - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ - time_t rto; /* retransmission timeout */ - uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ + struct frct_cr snd_cr; + struct frct_cr rcv_cr; - struct frct_cr snd_cr; - struct frct_cr rcv_cr; + struct rxmwheel * rw; - ssize_t rq[RQ_SIZE]; - pthread_rwlock_t lock; + ssize_t rq[RQ_SIZE]; + pthread_rwlock_t lock; }; enum frct_flags { @@ -111,28 +110,35 @@ static struct frcti * frcti_create(int fd) frcti->r = DELT_R; frcti->fd = fd; - delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; + delta_t = frcti->mpl + frcti->a + frcti->r; - frcti->snd_cr.inact = 3 * delta_t; - frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ + frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - frcti->rttseq = 0; - frcti->probe = false; + frcti->rttseq = 0; + frcti->probe = false; - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ - frcti->rto = 20000; /* initial rxm will be after 20 ms */ + frcti->srtt_us = 0; /* updated on first ACK */ + frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ + frcti->rto = 20000; /* initial rxm will be after 20 ms */ + frcti->rw = NULL; if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.cflags |= FRCTFRTX; + frcti->rw = rxmwheel_create(); + if (frcti->rw == NULL) + goto fail_rw; } - frcti->rcv_cr.inact = 2 * delta_t; - frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */ + frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + return frcti; + fail_rw: + pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); fail_malloc: @@ -146,7 +152,8 @@ static void frcti_destroy(struct frcti * frcti) * make sure everything we sent is acked. */ - rxmwheel_clear(frcti->fd); + if (frcti->rw != NULL) + rxmwheel_destroy(frcti->rw); pthread_rwlock_destroy(&frcti->lock); @@ -229,13 +236,15 @@ static int __frcti_snd(struct frcti * frcti, struct timespec now; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; + uint32_t seqno; assert(frcti); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; - rxmwheel_move(); + if (frcti->rw != NULL) + rxmwheel_move(frcti->rw); pci = frcti_alloc_head(sdb); if (pci == NULL) @@ -259,7 +268,9 @@ static int __frcti_snd(struct frcti * frcti, frcti->snd_cr.lwe = snd_cr->seqno - 1; } - pci->seqno = hton32(snd_cr->seqno); + seqno = snd_cr->seqno; + pci->seqno = hton32(seqno); + if (!(snd_cr->cflags & FRCTFRTX)) { snd_cr->lwe++; } else { @@ -269,8 +280,6 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } - rxmwheel_add(frcti, snd_cr->seqno, sdb); - if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); @@ -282,6 +291,9 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); + if (frcti->rw != NULL) + rxmwheel_add(frcti->rw, frcti, seqno, sdb); + return 0; } @@ -384,13 +396,13 @@ static int __frcti_rcv(struct frcti * frcti, if (ret == 0 && !(pci->flags & FRCT_DATA)) shm_rdrbuff_remove(ai.rdrb, idx); - rxmwheel_move(); + if (frcti->rw != NULL) + rxmwheel_move(frcti->rw); return ret; drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; } -- cgit v1.2.3