summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-10-10 15:34:27 +0200
committerSander Vrijders <sander@ouroboros.rocks>2020-10-11 14:25:18 +0200
commitda871be70c8039015edfe93d4581e3b9347ff882 (patch)
tree5e42185b84bb6dfcb1d35abb91c79914680437f9
parentec242f45e6980fb5b0139d3429a88795b82f0c13 (diff)
downloadouroboros-da871be70c8039015edfe93d4581e3b9347ff882.tar.gz
ouroboros-da871be70c8039015edfe93d4581e3b9347ff882.zip
lib: Block on closed flow control window
If the sending window for flow control is closed, the sending application will now block until the window opens. Beware that until the rendez-vous mechanism is implemented, shutting down a server while the client is sending (with non-timed-out blocking write) will cause the client to hang indefinitely because its window will close. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--src/lib/dev.c38
-rw-r--r--src/lib/frct.c109
2 files changed, 129 insertions, 18 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 3bc060bf..ca004aa4 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1017,6 +1017,8 @@ ssize_t flow_write(int fd,
int flags;
struct timespec abs;
struct timespec * abstime = NULL;
+ struct timespec tic = {0, TICTIME};
+ struct timespec tictime;
struct shm_du_buff * sdb;
uint8_t * ptr;
@@ -1037,6 +1039,8 @@ ssize_t flow_write(int fd,
return -ENOTALLOC;
}
+ ts_add(&tic, &abs, &tictime);
+
if (ai.flows[fd].snd_timesout) {
ts_add(&abs, &flow->snd_timeo, &abs);
abstime = &abs;
@@ -1049,18 +1053,26 @@ ssize_t flow_write(int fd,
if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
return -EPERM;
- /* TODO: partial writes. */
- if (flags & FLOWFWNOBLOCK)
- idx = shm_rdrbuff_alloc(ai.rdrb,
- count,
- &ptr,
- &sdb);
- else /* Blocking. */
- idx = shm_rdrbuff_alloc_b(ai.rdrb,
- count,
- &ptr,
- &sdb,
- abstime);
+ if (flags & FLOWFWNOBLOCK) {
+ if (!frcti_is_window_open(flow->frcti))
+ return -EAGAIN;
+ idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
+ } else {
+ while((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {
+
+ if (ret != -ETIMEDOUT)
+ return ret;
+
+ if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0)
+ return -ETIMEDOUT;
+
+ frcti_tick(flow->frcti);
+
+ ts_add(&tictime, &tic, &tictime);
+ }
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
+ }
+
if (idx < 0)
return idx;
@@ -1160,7 +1172,7 @@ ssize_t flow_read(int fd,
return idx;
if (abstime != NULL
- && ts_diff_ns(&tictime, &abs) < 0)
+ && ts_diff_ns(&tictime, &abs) <= 0)
return -ETIMEDOUT;
ts_add(&tictime, &tic, &tictime);
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 7f2d8fd3..c0ae6bbb 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -52,6 +52,12 @@ struct frcti {
ssize_t rq[RQ_SIZE];
pthread_rwlock_t lock;
+
+ bool open; /* Window open/closed */
+ size_t wnd; /* Window size */
+ struct timespec t_wnd; /* Window closed time */
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
};
enum frct_flags {
@@ -181,6 +187,12 @@ static struct frcti * frcti_create(int fd)
if (pthread_rwlock_init(&frcti->lock, NULL))
goto fail_lock;
+ if (pthread_mutex_init(&frcti->mtx, NULL))
+ goto fail_mutex;
+
+ if (pthread_cond_init(&frcti->cond, NULL))
+ goto fail_cond;
+
for (idx = 0; idx < RQ_SIZE; ++idx)
frcti->rq[idx] = -1;
@@ -216,6 +228,10 @@ static struct frcti * frcti_create(int fd)
return frcti;
+ fail_cond:
+ pthread_mutex_destroy(&frcti->mtx);
+ fail_mutex:
+ pthread_rwlock_destroy(&frcti->lock);
fail_lock:
free(frcti);
fail_malloc:
@@ -224,6 +240,8 @@ static struct frcti * frcti_create(int fd)
static void frcti_destroy(struct frcti * frcti)
{
+ pthread_cond_destroy(&frcti->cond);
+ pthread_mutex_destroy(&frcti->mtx);
pthread_rwlock_destroy(&frcti->lock);
free(frcti);
@@ -275,6 +293,75 @@ static void frcti_setflags(struct frcti * frcti,
#define frcti_dealloc(frcti) \
(frcti == NULL ? 0 : __frcti_dealloc(frcti))
+#define frcti_is_window_open(frcti) \
+ (frcti == NULL ? true : __frcti_is_window_open(frcti))
+
+#define frcti_window_wait(frcti, abstime) \
+ (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime))
+
+
+static bool __frcti_is_window_open(struct frcti * frcti)
+{
+ struct frct_cr * snd_cr = &frcti->snd_cr;
+ int ret = true;
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ if (snd_cr->cflags & FRCTFRESCNTL)
+ ret = before(snd_cr->seqno, snd_cr->rwe);
+
+ if (!ret) {
+ pthread_mutex_lock(&frcti->mtx);
+ if (frcti->open) {
+ clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd);
+ frcti->open = false;
+ }
+ pthread_mutex_unlock(&frcti->mtx);
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return ret;
+}
+
+static int __frcti_window_wait(struct frcti * frcti,
+ struct timespec * abstime)
+{
+ struct frct_cr * snd_cr = &frcti->snd_cr;
+ int ret = 0;
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ if (!(snd_cr->cflags & FRCTFRESCNTL)) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return 0;
+ }
+
+ while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
+ pthread_rwlock_unlock(&frcti->lock);
+ pthread_mutex_lock(&frcti->mtx);
+
+ if (frcti->open) {
+ clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd);
+ frcti->open = false;
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) &frcti->mtx);
+
+ ret = -pthread_cond_timedwait(&frcti->cond,
+ &frcti->mtx,
+ abstime);
+
+ pthread_cleanup_pop(true);
+ pthread_rwlock_rdlock(&frcti->lock);
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return ret;
+}
+
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
ssize_t idx;
@@ -373,7 +460,7 @@ static int __frcti_snd(struct frcti * frcti,
pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
if (pci == NULL)
- return -1;
+ return -ENOMEM;
memset(pci, 0, sizeof(*pci));
@@ -508,11 +595,23 @@ static void __frcti_rcv(struct frcti * frcti,
}
if (pci->flags & FRCT_FC) {
- uint32_t rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF));
- if (before(rwe, snd_cr->lwe & 0x00FFFFFF))
- snd_cr->rwe += 0x01000000;
+ uint32_t rwe;
+
+ rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF));
+ rwe |= snd_cr->rwe & 0xFF000000;
+
+ /* Rollover for 24 bit */
+ if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF)
+ rwe += 0x01000000;
- snd_cr->rwe = (snd_cr->rwe & 0xFF000000) + rwe;
+ snd_cr->rwe = rwe;
+
+ pthread_mutex_lock(&frcti->mtx);
+ if (!frcti->open) {
+ frcti->open = true;
+ pthread_cond_broadcast(&frcti->cond);
+ }
+ pthread_mutex_unlock(&frcti->mtx);
}
if (!(pci->flags & FRCT_DATA))