diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2020-10-10 15:34:27 +0200 |
---|---|---|
committer | Sander Vrijders <sander@ouroboros.rocks> | 2020-10-11 14:25:18 +0200 |
commit | da871be70c8039015edfe93d4581e3b9347ff882 (patch) | |
tree | 5e42185b84bb6dfcb1d35abb91c79914680437f9 | |
parent | ec242f45e6980fb5b0139d3429a88795b82f0c13 (diff) | |
download | ouroboros-da871be70c8039015edfe93d4581e3b9347ff882.tar.gz ouroboros-da871be70c8039015edfe93d4581e3b9347ff882.zip |
lib: Block on closed flow control window
If the sending window for flow control is closed, the sending
application will now block until the window opens. Beware that until
the rendez-vous mechanism is implemented, shutting down a server while
the client is sending (with non-timed-out blocking write) will cause
the client to hang indefinitely because its window will close.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r-- | src/lib/dev.c | 38 | ||||
-rw-r--r-- | src/lib/frct.c | 109 |
2 files changed, 129 insertions, 18 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 3bc060bf..ca004aa4 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1017,6 +1017,8 @@ ssize_t flow_write(int fd, int flags; struct timespec abs; struct timespec * abstime = NULL; + struct timespec tic = {0, TICTIME}; + struct timespec tictime; struct shm_du_buff * sdb; uint8_t * ptr; @@ -1037,6 +1039,8 @@ ssize_t flow_write(int fd, return -ENOTALLOC; } + ts_add(&tic, &abs, &tictime); + if (ai.flows[fd].snd_timesout) { ts_add(&abs, &flow->snd_timeo, &abs); abstime = &abs; @@ -1049,18 +1053,26 @@ ssize_t flow_write(int fd, if ((flags & FLOWFACCMODE) == FLOWFRDONLY) return -EPERM; - /* TODO: partial writes. */ - if (flags & FLOWFWNOBLOCK) - idx = shm_rdrbuff_alloc(ai.rdrb, - count, - &ptr, - &sdb); - else /* Blocking. */ - idx = shm_rdrbuff_alloc_b(ai.rdrb, - count, - &ptr, - &sdb, - abstime); + if (flags & FLOWFWNOBLOCK) { + if (!frcti_is_window_open(flow->frcti)) + return -EAGAIN; + idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb); + } else { + while((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) { + + if (ret != -ETIMEDOUT) + return ret; + + if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0) + return -ETIMEDOUT; + + frcti_tick(flow->frcti); + + ts_add(&tictime, &tic, &tictime); + } + idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime); + } + if (idx < 0) return idx; @@ -1160,7 +1172,7 @@ ssize_t flow_read(int fd, return idx; if (abstime != NULL - && ts_diff_ns(&tictime, &abs) < 0) + && ts_diff_ns(&tictime, &abs) <= 0) return -ETIMEDOUT; ts_add(&tictime, &tic, &tictime); diff --git a/src/lib/frct.c b/src/lib/frct.c index 7f2d8fd3..c0ae6bbb 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -52,6 +52,12 @@ struct frcti { ssize_t rq[RQ_SIZE]; pthread_rwlock_t lock; + + bool open; /* Window open/closed */ + size_t wnd; /* Window size */ + struct timespec t_wnd; /* Window closed time */ + pthread_cond_t cond; + pthread_mutex_t mtx; }; enum frct_flags { @@ -181,6 +187,12 @@ static struct frcti * frcti_create(int fd) if (pthread_rwlock_init(&frcti->lock, NULL)) goto fail_lock; + if (pthread_mutex_init(&frcti->mtx, NULL)) + goto fail_mutex; + + if (pthread_cond_init(&frcti->cond, NULL)) + goto fail_cond; + for (idx = 0; idx < RQ_SIZE; ++idx) frcti->rq[idx] = -1; @@ -216,6 +228,10 @@ static struct frcti * frcti_create(int fd) return frcti; + fail_cond: + pthread_mutex_destroy(&frcti->mtx); + fail_mutex: + pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); fail_malloc: @@ -224,6 +240,8 @@ static struct frcti * frcti_create(int fd) static void frcti_destroy(struct frcti * frcti) { + pthread_cond_destroy(&frcti->cond); + pthread_mutex_destroy(&frcti->mtx); pthread_rwlock_destroy(&frcti->lock); free(frcti); @@ -275,6 +293,75 @@ static void frcti_setflags(struct frcti * frcti, #define frcti_dealloc(frcti) \ (frcti == NULL ? 0 : __frcti_dealloc(frcti)) +#define frcti_is_window_open(frcti) \ + (frcti == NULL ? true : __frcti_is_window_open(frcti)) + +#define frcti_window_wait(frcti, abstime) \ + (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime)) + + +static bool __frcti_is_window_open(struct frcti * frcti) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + int ret = true; + + pthread_rwlock_rdlock(&frcti->lock); + + if (snd_cr->cflags & FRCTFRESCNTL) + ret = before(snd_cr->seqno, snd_cr->rwe); + + if (!ret) { + pthread_mutex_lock(&frcti->mtx); + if (frcti->open) { + clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd); + frcti->open = false; + } + pthread_mutex_unlock(&frcti->mtx); + } + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} + +static int __frcti_window_wait(struct frcti * frcti, + struct timespec * abstime) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + int ret = 0; + + pthread_rwlock_rdlock(&frcti->lock); + + if (!(snd_cr->cflags & FRCTFRESCNTL)) { + pthread_rwlock_unlock(&frcti->lock); + return 0; + } + + while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { + pthread_rwlock_unlock(&frcti->lock); + pthread_mutex_lock(&frcti->mtx); + + if (frcti->open) { + clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd); + frcti->open = false; + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) &frcti->mtx); + + ret = -pthread_cond_timedwait(&frcti->cond, + &frcti->mtx, + abstime); + + pthread_cleanup_pop(true); + pthread_rwlock_rdlock(&frcti->lock); + } + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} + static ssize_t __frcti_queued_pdu(struct frcti * frcti) { ssize_t idx; @@ -373,7 +460,7 @@ static int __frcti_snd(struct frcti * frcti, pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci == NULL) - return -1; + return -ENOMEM; memset(pci, 0, sizeof(*pci)); @@ -508,11 +595,23 @@ static void __frcti_rcv(struct frcti * frcti, } if (pci->flags & FRCT_FC) { - uint32_t rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); - if (before(rwe, snd_cr->lwe & 0x00FFFFFF)) - snd_cr->rwe += 0x01000000; + uint32_t rwe; + + rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); + rwe |= snd_cr->rwe & 0xFF000000; + + /* Rollover for 24 bit */ + if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF) + rwe += 0x01000000; - snd_cr->rwe = (snd_cr->rwe & 0xFF000000) + rwe; + snd_cr->rwe = rwe; + + pthread_mutex_lock(&frcti->mtx); + if (!frcti->open) { + frcti->open = true; + pthread_cond_broadcast(&frcti->cond); + } + pthread_mutex_unlock(&frcti->mtx); } if (!(pci->flags & FRCT_DATA)) |