summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-10-11 14:42:16 +0200
committerSander Vrijders <sander@ouroboros.rocks>2020-10-11 14:47:52 +0200
commit3294170daecaf14a43e97d4e435d4ed0308444f8 (patch)
tree349075344cdc08681fa2491ab7bf0b4c32b730b7
parentda871be70c8039015edfe93d4581e3b9347ff882 (diff)
downloadouroboros-3294170daecaf14a43e97d4e435d4ed0308444f8.tar.gz
ouroboros-3294170daecaf14a43e97d4e435d4ed0308444f8.zip
lib: Add Rendez-Vous mechanism for flow control
This adds the rendez-vous mechanism to handle the case where the sending window is closed and window updates get lost. If the sending window is closed, the sender side will send an RDVS every DELT_RDV time (100ms), and give up after MAX_RDV time (1 second). Upon reception of a RDVS packet, a window update is sent immediately. We can make this much more configurable later on (build options for defaults, fccntl for runtime tuning). Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--src/lib/dev.c3
-rw-r--r--src/lib/frct.c156
-rw-r--r--src/lib/timerwheel.c2
3 files changed, 130 insertions, 31 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index ca004aa4..4b78c1db 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1058,8 +1058,7 @@ ssize_t flow_write(int fd,
return -EAGAIN;
idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
} else {
- while((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {
-
+ while ((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {
if (ret != -ETIMEDOUT)
return ret;
diff --git a/src/lib/frct.c b/src/lib/frct.c
index c0ae6bbb..919e2617 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -20,6 +20,9 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
+#define DELT_RDV (100 * MILLION) /* ns */
+#define MAX_RDV (1 * BILLION) /* ns */
+
#define FRCT_PCILEN (sizeof(struct frct_pci))
struct frct_cr {
@@ -39,6 +42,7 @@ struct frcti {
time_t mpl;
time_t a;
time_t r;
+ time_t rdv;
time_t srtt; /* Smoothed rtt */
time_t mdev; /* Deviation */
@@ -54,8 +58,8 @@ struct frcti {
pthread_rwlock_t lock;
bool open; /* Window open/closed */
- size_t wnd; /* Window size */
struct timespec t_wnd; /* Window closed time */
+ struct timespec t_rdvs; /* Last rendez-vous sent */
pthread_cond_t cond;
pthread_mutex_t mtx;
};
@@ -92,8 +96,10 @@ static bool after(uint32_t seq1,
return (int32_t)(seq2 - seq1) < 0;
}
-static int __send_ack(int fd,
- int ackno)
+static void __send_frct_pkt(int fd,
+ uint8_t flags,
+ uint32_t ackno,
+ uint32_t rwe)
{
struct shm_du_buff * sdb;
struct frct_pci * pci;
@@ -107,12 +113,14 @@ static int __send_ack(int fd,
idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb);
#endif
if (idx < 0)
- return -ENOMEM;
+ return;
pci = (struct frct_pci *) shm_du_buff_head(sdb);
memset(pci, 0, sizeof(*pci));
- pci->flags = FRCT_ACK;
+ *((uint32_t *) pci) = hton32(rwe);
+
+ pci->flags = flags;
pci->ackno = hton32(ackno);
f = &ai.flows[fd];
@@ -122,19 +130,18 @@ static int __send_ack(int fd,
if (shm_rbuff_write(f->tx_rb, idx)) {
#endif
ipcp_sdb_release(sdb);
- return -ENOMEM;
+ return;
}
shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
- return 0;
}
-static void frct_send_ack(struct frcti * frcti)
+static void send_frct_pkt(struct frcti * frcti)
{
struct timespec now;
time_t diff;
uint32_t ackno;
+ uint32_t rwe;
int fd;
assert(frcti);
@@ -146,8 +153,9 @@ static void frct_send_ack(struct frcti * frcti)
return;
}
- ackno = frcti->rcv_cr.lwe;
fd = frcti->fd;
+ ackno = frcti->rcv_cr.lwe;
+ rwe = frcti->rcv_cr.rwe;
clock_gettime(PTHREAD_COND_CLOCK, &now);
@@ -158,8 +166,7 @@ static void frct_send_ack(struct frcti * frcti)
if (diff > frcti->a || diff < DELT_ACK)
return;
- if (__send_ack(fd, ackno) < 0)
- return;
+ __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);
pthread_rwlock_wrlock(&frcti->lock);
@@ -169,14 +176,42 @@ static void frct_send_ack(struct frcti * frcti)
pthread_rwlock_unlock(&frcti->lock);
}
+static void __send_rdv(int fd)
+{
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct flow * f;
+
+ /* Raw calls needed to bypass frcti. */
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memset(pci, 0, sizeof(*pci));
+
+ pci->flags = FRCT_RDVS;
+
+ f = &ai.flows[fd];
+
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ ipcp_sdb_release(sdb);
+ return;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+}
+
static struct frcti * frcti_create(int fd)
{
- struct frcti * frcti;
- ssize_t idx;
- struct timespec now;
- time_t mpl;
- time_t a;
- time_t r;
+ struct frcti * frcti;
+ ssize_t idx;
+ struct timespec now;
+ time_t mpl;
+ time_t a;
+ time_t r;
+ pthread_condattr_t cattr;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
@@ -190,7 +225,12 @@ static struct frcti * frcti_create(int fd)
if (pthread_mutex_init(&frcti->mtx, NULL))
goto fail_mutex;
- if (pthread_cond_init(&frcti->cond, NULL))
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&frcti->cond, &cattr))
goto fail_cond;
for (idx = 0; idx < RQ_SIZE; ++idx)
@@ -201,6 +241,7 @@ static struct frcti * frcti_create(int fd)
frcti->mpl = mpl = DELT_MPL;
frcti->a = a = DELT_A;
frcti->r = r = DELT_R;
+ frcti->rdv = DELT_RDV;
frcti->fd = fd;
@@ -229,6 +270,8 @@ static struct frcti * frcti_create(int fd)
return frcti;
fail_cond:
+ pthread_condattr_destroy(&cattr);
+fail_cattr:
pthread_mutex_destroy(&frcti->mtx);
fail_mutex:
pthread_rwlock_destroy(&frcti->lock);
@@ -311,11 +354,30 @@ static bool __frcti_is_window_open(struct frcti * frcti)
ret = before(snd_cr->seqno, snd_cr->rwe);
if (!ret) {
+ struct timespec now;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
pthread_mutex_lock(&frcti->mtx);
if (frcti->open) {
- clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd);
- frcti->open = false;
+ frcti->open = false;
+ frcti->t_wnd = now;
+ frcti->t_rdvs = now;
+ } else {
+ time_t diff;
+ diff = ts_diff_ns(&frcti->t_wnd, &now);
+ if (diff > MAX_RDV) {
+ pthread_mutex_unlock(&frcti->mtx);
+ return false;
+ }
+
+ diff = ts_diff_ns(&frcti->t_rdvs, &now);
+ if (diff > frcti->rdv) {
+ frcti->t_rdvs = now;
+ __send_rdv(frcti->fd);
+ }
}
+
pthread_mutex_unlock(&frcti->mtx);
}
@@ -338,12 +400,17 @@ static int __frcti_window_wait(struct frcti * frcti,
}
while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
+ struct timespec now;
+
pthread_rwlock_unlock(&frcti->lock);
pthread_mutex_lock(&frcti->mtx);
if (frcti->open) {
- clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd);
- frcti->open = false;
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ frcti->t_wnd = now;
+ frcti->t_rdvs = now;
+ frcti->open = false;
}
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
@@ -353,7 +420,27 @@ static int __frcti_window_wait(struct frcti * frcti,
&frcti->mtx,
abstime);
- pthread_cleanup_pop(true);
+ pthread_cleanup_pop(false);
+
+ if (ret == -ETIMEDOUT) {
+ time_t diff;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&frcti->t_wnd, &now);
+ if (diff > MAX_RDV) {
+ pthread_mutex_unlock(&frcti->mtx);
+ return -ECONNRESET; /* write fails! */
+ }
+
+ diff = ts_diff_ns(&frcti->t_rdvs, &now);
+ if (diff > frcti->rdv) {
+ frcti->t_rdvs = now;
+ __send_rdv(frcti->fd);
+ }
+ }
+
+ pthread_mutex_unlock(&frcti->mtx);
pthread_rwlock_rdlock(&frcti->lock);
}
@@ -436,7 +523,7 @@ static time_t __frcti_dealloc(struct frcti * frcti)
pthread_rwlock_unlock(&frcti->lock);
if (fd != -1)
- __send_ack(fd, ackno);
+ __send_frct_pkt(fd, FRCT_ACK, ackno, 0);
return wait;
}
@@ -557,6 +644,7 @@ static void __frcti_rcv(struct frcti * frcti,
struct frct_cr * snd_cr;
uint32_t seqno;
uint32_t ackno;
+ uint32_t rwe;
int fd = -1;
assert(frcti);
@@ -583,6 +671,18 @@ static void __frcti_rcv(struct frcti * frcti,
}
}
+ /* For now, just send an immediate window update. */
+ if (pci->flags & FRCT_RDVS) {
+ fd = frcti->fd;
+ rwe = rcv_cr->rwe;
+ pthread_rwlock_unlock(&frcti->lock);
+
+ __send_frct_pkt(fd, FRCT_FC, 0, rwe);
+
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return;
+ }
+
if (pci->flags & FRCT_ACK) {
ackno = ntoh32(pci->ackno);
if (after(ackno, frcti->snd_cr.lwe))
@@ -618,13 +718,13 @@ static void __frcti_rcv(struct frcti * frcti,
goto drop_packet;
if (before(seqno, rcv_cr->lwe)) {
- rcv_cr->seqno = seqno;
+ rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */
goto drop_packet;
}
if (rcv_cr->cflags & FRCTFRTX) {
- if (!before(seqno, rcv_cr->rwe)) /* Out of window */
+ if (!before(seqno, rcv_cr->rwe)) /* Out of window. */
goto drop_packet;
if (!before(seqno, rcv_cr->lwe + RQ_SIZE))
@@ -652,7 +752,7 @@ static void __frcti_rcv(struct frcti * frcti,
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
- frct_send_ack(frcti);
+ send_frct_pkt(frcti);
shm_rdrbuff_remove(ai.rdrb, idx);
return;
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 4443832d..cd3074cb 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -294,7 +294,7 @@ static void timerwheel_move(void)
rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
if (f->flow_id == a->flow_id && f->frcti != NULL)
- frct_send_ack(a->frcti);
+ send_frct_pkt(a->frcti);
free(a);