summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-10-10 15:34:25 +0200
committerSander Vrijders <sander@ouroboros.rocks>2020-10-11 14:23:44 +0200
commit21dec0f30da7eecf965a3b088c9646029354b431 (patch)
treeb83883f2d0d1c4ec20390a8120ec79cc7c0cfcc1
parentbf736bebbe89618e23fc5cfb19cf049314cce03d (diff)
downloadouroboros-21dec0f30da7eecf965a3b088c9646029354b431.tar.gz
ouroboros-21dec0f30da7eecf965a3b088c9646029354b431.zip
lib: Send and receive window updates
This adds sending and receiving window updates for flow control. I used the 8 pad bits as part of the window update field, so it's 24 bits, allowing for ~16 million packets in flight. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--include/ouroboros/fccntl.h6
-rw-r--r--src/lib/CMakeLists.txt2
-rw-r--r--src/lib/config.h.in1
-rw-r--r--src/lib/frct.c44
4 files changed, 41 insertions, 12 deletions
diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h
index ccd74b6c..0ebc90f3 100644
--- a/include/ouroboros/fccntl.h
+++ b/include/ouroboros/fccntl.h
@@ -46,9 +46,9 @@
#define FLOWFINVALID (FLOWFWRONLY | FLOWFRDWR)
/* FRCT flags */
-#define FRCTFRESCNTRL 00000001 /* Feedback from receiver */
-#define FRCTFRTX 00000002 /* Reliable flow */
-#define FRCTFLINGER 00000004 /* Sent unsent data */
+#define FRCTFRTX 00000001 /* Reliable flow */
+#define FRCTFRESCNTL 00000002 /* Feedback from receiver */
+#define FRCTFLINGER 00000004 /* Send unsent data */
/* Flow operations */
#define FLOWSRCVTIMEO 00000001 /* Set read timeout */
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 5e60e375..02fca2d9 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -196,6 +196,8 @@ set(DELTA_T_ACK_DELAY 10 CACHE STRING
"Maximum time to wait before acknowledging a packet (ms)")
set(FRCT_REORDER_QUEUE_SIZE 256 CACHE STRING
"Size of the reordering queue, must be a power of 2")
+set(FRCT_START_WINDOW 64 CACHE STRING
+ "Start window, must be a power of 2")
set(FRCT_RTO_MIN 250 CACHE STRING
"Minimum Retransmission Timeout (RTO) for FRCT (us)")
set(FRCT_TICK_TIME 500 CACHE STRING
diff --git a/src/lib/config.h.in b/src/lib/config.h.in
index 38c364c6..36221b8c 100644
--- a/src/lib/config.h.in
+++ b/src/lib/config.h.in
@@ -75,6 +75,7 @@
#define DELT_ACK (@DELTA_T_ACK_DELAY@ * MILLION) /* ns */
#define RQ_SIZE (@FRCT_REORDER_QUEUE_SIZE@)
+#define START_WINDOW (@FRCT_START_WINDOW@)
#define RTO_MIN (@FRCT_RTO_MIN@ * 1000)
#define TICTIME (@FRCT_TICK_TIME@ * 1000) /* ns */
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 6ead72af..7f2d8fd3 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -67,8 +67,7 @@ enum frct_flags {
struct frct_pci {
uint8_t flags;
- uint8_t pad;
-
+ uint8_t pad; /* 24 bit window! */
uint16_t window;
uint32_t seqno;
@@ -205,6 +204,10 @@ static struct frcti * frcti_create(int fd)
frcti->rcv_cr.cflags |= FRCTFRTX;
}
+ frcti->snd_cr.cflags |= FRCTFRESCNTL;
+
+ frcti->snd_cr.rwe = START_WINDOW;
+
frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */
frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1);
@@ -244,13 +247,13 @@ static uint16_t frcti_getflags(struct frcti * frcti)
static void frcti_setflags(struct frcti * frcti,
uint16_t flags)
{
- flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */
+ flags |= FRCTFRTX; /* Should not be set by command */
assert(frcti);
pthread_rwlock_wrlock(&frcti->lock);
- frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */
+ frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */
frcti->snd_cr.cflags &= flags;
@@ -287,6 +290,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
idx = frcti->rq[pos];
if (idx != -1) {
++frcti->rcv_cr.lwe;
+ ++frcti->rcv_cr.rwe;
frcti->rq[pos] = -1;
}
@@ -390,12 +394,18 @@ static int __frcti_snd(struct frcti * frcti,
/* There are no unacknowledged packets. */
assert(snd_cr->seqno == snd_cr->lwe);
random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
- frcti->snd_cr.lwe = snd_cr->seqno - 1;
+ snd_cr->lwe = snd_cr->seqno - 1;
+ snd_cr->rwe = snd_cr->lwe + START_WINDOW;
}
seqno = snd_cr->seqno;
pci->seqno = hton32(seqno);
+ if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) {
+ pci->flags |= FRCT_FC;
+ *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF);
+ }
+
if (!rtx) {
snd_cr->lwe++;
} else {
@@ -405,7 +415,7 @@ static int __frcti_snd(struct frcti * frcti,
frcti->probe = true;
}
- if (now.tv_sec - rcv_cr->act.tv_sec <= rcv_cr->inact) {
+ if (now.tv_sec - rcv_cr->act.tv_sec <= frcti->a) {
pci->flags |= FRCT_ACK;
pci->ackno = hton32(rcv_cr->lwe);
rcv_cr->seqno = rcv_cr->lwe;
@@ -457,6 +467,7 @@ static void __frcti_rcv(struct frcti * frcti,
struct frct_pci * pci;
struct timespec now;
struct frct_cr * rcv_cr;
+ struct frct_cr * snd_cr;
uint32_t seqno;
uint32_t ackno;
int fd = -1;
@@ -464,6 +475,7 @@ static void __frcti_rcv(struct frcti * frcti,
assert(frcti);
rcv_cr = &frcti->rcv_cr;
+ snd_cr = &frcti->snd_cr;
clock_gettime(PTHREAD_COND_CLOCK, &now);
@@ -476,10 +488,12 @@ static void __frcti_rcv(struct frcti * frcti,
pthread_rwlock_wrlock(&frcti->lock);
if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) {
- if (pci->flags & FRCT_DRF) /* New run. */
+ if (pci->flags & FRCT_DRF) { /* New run. */
rcv_cr->lwe = seqno;
- else
+ rcv_cr->rwe = seqno + RQ_SIZE;
+ } else {
goto drop_packet;
+ }
}
if (pci->flags & FRCT_ACK) {
@@ -493,6 +507,14 @@ static void __frcti_rcv(struct frcti * frcti,
}
}
+ if (pci->flags & FRCT_FC) {
+ uint32_t rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF));
+ if (before(rwe, snd_cr->lwe & 0x00FFFFFF))
+ snd_cr->rwe += 0x01000000;
+
+ snd_cr->rwe = (snd_cr->rwe & 0xFF000000) + rwe;
+ }
+
if (!(pci->flags & FRCT_DATA))
goto drop_packet;
@@ -502,7 +524,11 @@ static void __frcti_rcv(struct frcti * frcti,
}
if (rcv_cr->cflags & FRCTFRTX) {
- if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
+
+ if (!before(seqno, rcv_cr->rwe)) /* Out of window */
+ goto drop_packet;
+
+ if (!before(seqno, rcv_cr->lwe + RQ_SIZE))
goto drop_packet; /* Out of rq. */
if (frcti->rq[pos] != -1)