From 21dec0f30da7eecf965a3b088c9646029354b431 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sat, 10 Oct 2020 15:34:25 +0200 Subject: lib: Send and receive window updates This adds sending and receiving window updates for flow control. I used the 8 pad bits as part of the window update field, so it's 24 bits, allowing for ~16 million packets in flight. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/CMakeLists.txt | 2 ++ src/lib/config.h.in | 1 + src/lib/frct.c | 44 +++++++++++++++++++++++++++++++++++--------- 3 files changed, 38 insertions(+), 9 deletions(-) (limited to 'src/lib') diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 5e60e375..02fca2d9 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -196,6 +196,8 @@ set(DELTA_T_ACK_DELAY 10 CACHE STRING "Maximum time to wait before acknowledging a packet (ms)") set(FRCT_REORDER_QUEUE_SIZE 256 CACHE STRING "Size of the reordering queue, must be a power of 2") +set(FRCT_START_WINDOW 64 CACHE STRING + "Start window, must be a power of 2") set(FRCT_RTO_MIN 250 CACHE STRING "Minimum Retransmission Timeout (RTO) for FRCT (us)") set(FRCT_TICK_TIME 500 CACHE STRING diff --git a/src/lib/config.h.in b/src/lib/config.h.in index 38c364c6..36221b8c 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -75,6 +75,7 @@ #define DELT_ACK (@DELTA_T_ACK_DELAY@ * MILLION) /* ns */ #define RQ_SIZE (@FRCT_REORDER_QUEUE_SIZE@) +#define START_WINDOW (@FRCT_START_WINDOW@) #define RTO_MIN (@FRCT_RTO_MIN@ * 1000) #define TICTIME (@FRCT_TICK_TIME@ * 1000) /* ns */ diff --git a/src/lib/frct.c b/src/lib/frct.c index 6ead72af..7f2d8fd3 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -67,8 +67,7 @@ enum frct_flags { struct frct_pci { uint8_t flags; - uint8_t pad; - + uint8_t pad; /* 24 bit window! */ uint16_t window; uint32_t seqno; @@ -205,6 +204,10 @@ static struct frcti * frcti_create(int fd) frcti->rcv_cr.cflags |= FRCTFRTX; } + frcti->snd_cr.cflags |= FRCTFRESCNTL; + + frcti->snd_cr.rwe = START_WINDOW; + frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1); @@ -244,13 +247,13 @@ static uint16_t frcti_getflags(struct frcti * frcti) static void frcti_setflags(struct frcti * frcti, uint16_t flags) { - flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */ + flags |= FRCTFRTX; /* Should not be set by command */ assert(frcti); pthread_rwlock_wrlock(&frcti->lock); - frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */ + frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */ frcti->snd_cr.cflags &= flags; @@ -287,6 +290,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) idx = frcti->rq[pos]; if (idx != -1) { ++frcti->rcv_cr.lwe; + ++frcti->rcv_cr.rwe; frcti->rq[pos] = -1; } @@ -390,12 +394,18 @@ static int __frcti_snd(struct frcti * frcti, /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); - frcti->snd_cr.lwe = snd_cr->seqno - 1; + snd_cr->lwe = snd_cr->seqno - 1; + snd_cr->rwe = snd_cr->lwe + START_WINDOW; } seqno = snd_cr->seqno; pci->seqno = hton32(seqno); + if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) { + pci->flags |= FRCT_FC; + *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF); + } + if (!rtx) { snd_cr->lwe++; } else { @@ -405,7 +415,7 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } - if (now.tv_sec - rcv_cr->act.tv_sec <= rcv_cr->inact) { + if (now.tv_sec - rcv_cr->act.tv_sec <= frcti->a) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); rcv_cr->seqno = rcv_cr->lwe; @@ -457,6 +467,7 @@ static void __frcti_rcv(struct frcti * frcti, struct frct_pci * pci; struct timespec now; struct frct_cr * rcv_cr; + struct frct_cr * snd_cr; uint32_t seqno; uint32_t ackno; int fd = -1; @@ -464,6 +475,7 @@ static void __frcti_rcv(struct frcti * frcti, assert(frcti); rcv_cr = &frcti->rcv_cr; + snd_cr = &frcti->snd_cr; clock_gettime(PTHREAD_COND_CLOCK, &now); @@ -476,10 +488,12 @@ static void __frcti_rcv(struct frcti * frcti, pthread_rwlock_wrlock(&frcti->lock); if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) { - if (pci->flags & FRCT_DRF) /* New run. */ + if (pci->flags & FRCT_DRF) { /* New run. */ rcv_cr->lwe = seqno; - else + rcv_cr->rwe = seqno + RQ_SIZE; + } else { goto drop_packet; + } } if (pci->flags & FRCT_ACK) { @@ -493,6 +507,14 @@ static void __frcti_rcv(struct frcti * frcti, } } + if (pci->flags & FRCT_FC) { + uint32_t rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); + if (before(rwe, snd_cr->lwe & 0x00FFFFFF)) + snd_cr->rwe += 0x01000000; + + snd_cr->rwe = (snd_cr->rwe & 0xFF000000) + rwe; + } + if (!(pci->flags & FRCT_DATA)) goto drop_packet; @@ -502,7 +524,11 @@ static void __frcti_rcv(struct frcti * frcti, } if (rcv_cr->cflags & FRCTFRTX) { - if ((seqno - rcv_cr->lwe) >= RQ_SIZE) + + if (!before(seqno, rcv_cr->rwe)) /* Out of window */ + goto drop_packet; + + if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) goto drop_packet; /* Out of rq. */ if (frcti->rq[pos] != -1) -- cgit v1.2.3