From 21dec0f30da7eecf965a3b088c9646029354b431 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sat, 10 Oct 2020 15:34:25 +0200 Subject: lib: Send and receive window updates This adds sending and receiving window updates for flow control. I used the 8 pad bits as part of the window update field, so it's 24 bits, allowing for ~16 million packets in flight. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/fccntl.h | 6 +++--- src/lib/CMakeLists.txt | 2 ++ src/lib/config.h.in | 1 + src/lib/frct.c | 44 +++++++++++++++++++++++++++++++++++--------- 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h index ccd74b6c..0ebc90f3 100644 --- a/include/ouroboros/fccntl.h +++ b/include/ouroboros/fccntl.h @@ -46,9 +46,9 @@ #define FLOWFINVALID (FLOWFWRONLY | FLOWFRDWR) /* FRCT flags */ -#define FRCTFRESCNTRL 00000001 /* Feedback from receiver */ -#define FRCTFRTX 00000002 /* Reliable flow */ -#define FRCTFLINGER 00000004 /* Sent unsent data */ +#define FRCTFRTX 00000001 /* Reliable flow */ +#define FRCTFRESCNTL 00000002 /* Feedback from receiver */ +#define FRCTFLINGER 00000004 /* Send unsent data */ /* Flow operations */ #define FLOWSRCVTIMEO 00000001 /* Set read timeout */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 5e60e375..02fca2d9 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -196,6 +196,8 @@ set(DELTA_T_ACK_DELAY 10 CACHE STRING "Maximum time to wait before acknowledging a packet (ms)") set(FRCT_REORDER_QUEUE_SIZE 256 CACHE STRING "Size of the reordering queue, must be a power of 2") +set(FRCT_START_WINDOW 64 CACHE STRING + "Start window, must be a power of 2") set(FRCT_RTO_MIN 250 CACHE STRING "Minimum Retransmission Timeout (RTO) for FRCT (us)") set(FRCT_TICK_TIME 500 CACHE STRING diff --git a/src/lib/config.h.in b/src/lib/config.h.in index 38c364c6..36221b8c 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -75,6 +75,7 @@ #define DELT_ACK (@DELTA_T_ACK_DELAY@ * MILLION) /* ns */ #define RQ_SIZE (@FRCT_REORDER_QUEUE_SIZE@) +#define START_WINDOW (@FRCT_START_WINDOW@) #define RTO_MIN (@FRCT_RTO_MIN@ * 1000) #define TICTIME (@FRCT_TICK_TIME@ * 1000) /* ns */ diff --git a/src/lib/frct.c b/src/lib/frct.c index 6ead72af..7f2d8fd3 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -67,8 +67,7 @@ enum frct_flags { struct frct_pci { uint8_t flags; - uint8_t pad; - + uint8_t pad; /* 24 bit window! */ uint16_t window; uint32_t seqno; @@ -205,6 +204,10 @@ static struct frcti * frcti_create(int fd) frcti->rcv_cr.cflags |= FRCTFRTX; } + frcti->snd_cr.cflags |= FRCTFRESCNTL; + + frcti->snd_cr.rwe = START_WINDOW; + frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1); @@ -244,13 +247,13 @@ static uint16_t frcti_getflags(struct frcti * frcti) static void frcti_setflags(struct frcti * frcti, uint16_t flags) { - flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */ + flags |= FRCTFRTX; /* Should not be set by command */ assert(frcti); pthread_rwlock_wrlock(&frcti->lock); - frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */ + frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */ frcti->snd_cr.cflags &= flags; @@ -287,6 +290,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) idx = frcti->rq[pos]; if (idx != -1) { ++frcti->rcv_cr.lwe; + ++frcti->rcv_cr.rwe; frcti->rq[pos] = -1; } @@ -390,12 +394,18 @@ static int __frcti_snd(struct frcti * frcti, /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); - frcti->snd_cr.lwe = snd_cr->seqno - 1; + snd_cr->lwe = snd_cr->seqno - 1; + snd_cr->rwe = snd_cr->lwe + START_WINDOW; } seqno = snd_cr->seqno; pci->seqno = hton32(seqno); + if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) { + pci->flags |= FRCT_FC; + *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF); + } + if (!rtx) { snd_cr->lwe++; } else { @@ -405,7 +415,7 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } - if (now.tv_sec - rcv_cr->act.tv_sec <= rcv_cr->inact) { + if (now.tv_sec - rcv_cr->act.tv_sec <= frcti->a) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); rcv_cr->seqno = rcv_cr->lwe; @@ -457,6 +467,7 @@ static void __frcti_rcv(struct frcti * frcti, struct frct_pci * pci; struct timespec now; struct frct_cr * rcv_cr; + struct frct_cr * snd_cr; uint32_t seqno; uint32_t ackno; int fd = -1; @@ -464,6 +475,7 @@ static void __frcti_rcv(struct frcti * frcti, assert(frcti); rcv_cr = &frcti->rcv_cr; + snd_cr = &frcti->snd_cr; clock_gettime(PTHREAD_COND_CLOCK, &now); @@ -476,10 +488,12 @@ static void __frcti_rcv(struct frcti * frcti, pthread_rwlock_wrlock(&frcti->lock); if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) { - if (pci->flags & FRCT_DRF) /* New run. */ + if (pci->flags & FRCT_DRF) { /* New run. */ rcv_cr->lwe = seqno; - else + rcv_cr->rwe = seqno + RQ_SIZE; + } else { goto drop_packet; + } } if (pci->flags & FRCT_ACK) { @@ -493,6 +507,14 @@ static void __frcti_rcv(struct frcti * frcti, } } + if (pci->flags & FRCT_FC) { + uint32_t rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); + if (before(rwe, snd_cr->lwe & 0x00FFFFFF)) + snd_cr->rwe += 0x01000000; + + snd_cr->rwe = (snd_cr->rwe & 0xFF000000) + rwe; + } + if (!(pci->flags & FRCT_DATA)) goto drop_packet; @@ -502,7 +524,11 @@ static void __frcti_rcv(struct frcti * frcti, } if (rcv_cr->cflags & FRCTFRTX) { - if ((seqno - rcv_cr->lwe) >= RQ_SIZE) + + if (!before(seqno, rcv_cr->rwe)) /* Out of window */ + goto drop_packet; + + if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) goto drop_packet; /* Out of rq. */ if (frcti->rq[pos] != -1) -- cgit v1.2.3