diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2020-10-10 15:34:27 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2020-10-11 14:25:18 +0200 | 
| commit | da871be70c8039015edfe93d4581e3b9347ff882 (patch) | |
| tree | 5e42185b84bb6dfcb1d35abb91c79914680437f9 /src/lib | |
| parent | ec242f45e6980fb5b0139d3429a88795b82f0c13 (diff) | |
| download | ouroboros-da871be70c8039015edfe93d4581e3b9347ff882.tar.gz ouroboros-da871be70c8039015edfe93d4581e3b9347ff882.zip | |
lib: Block on closed flow control window
If the sending window for flow control is closed, the sending
application will now block until the window opens. Beware that until
the rendez-vous mechanism is implemented, shutting down a server while
the client is sending (with non-timed-out blocking write) will cause
the client to hang indefinitely because its window will close.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 38 | ||||
| -rw-r--r-- | src/lib/frct.c | 109 | 
2 files changed, 129 insertions, 18 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index 3bc060bf..ca004aa4 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1017,6 +1017,8 @@ ssize_t flow_write(int          fd,          int                  flags;          struct timespec      abs;          struct timespec *    abstime = NULL; +        struct timespec      tic = {0, TICTIME}; +        struct timespec      tictime;          struct shm_du_buff * sdb;          uint8_t *            ptr; @@ -1037,6 +1039,8 @@ ssize_t flow_write(int          fd,                  return -ENOTALLOC;          } +        ts_add(&tic, &abs, &tictime); +          if (ai.flows[fd].snd_timesout) {                  ts_add(&abs, &flow->snd_timeo, &abs);                  abstime = &abs; @@ -1049,18 +1053,26 @@ ssize_t flow_write(int          fd,          if ((flags & FLOWFACCMODE) == FLOWFRDONLY)                  return -EPERM; -        /* TODO: partial writes. */ -        if (flags & FLOWFWNOBLOCK) -                idx = shm_rdrbuff_alloc(ai.rdrb, -                                        count, -                                        &ptr, -                                        &sdb); -        else  /* Blocking. */ -                idx = shm_rdrbuff_alloc_b(ai.rdrb, -                                          count, -                                          &ptr, -                                          &sdb, -                                          abstime); +        if (flags & FLOWFWNOBLOCK) { +                if (!frcti_is_window_open(flow->frcti)) +                        return -EAGAIN; +                idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb); +        } else { +                while((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) { + +                        if (ret != -ETIMEDOUT) +                                return ret; + +                        if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0) +                                return -ETIMEDOUT; + +                        frcti_tick(flow->frcti); + +                        ts_add(&tictime, &tic, &tictime); +                } +                idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime); +        } +          if (idx < 0)                  return idx; @@ -1160,7 +1172,7 @@ ssize_t flow_read(int    fd,                                          return idx;                                  if (abstime != NULL -                                    && ts_diff_ns(&tictime, &abs) < 0) +                                    && ts_diff_ns(&tictime, &abs) <= 0)                                          return -ETIMEDOUT;                                  ts_add(&tictime, &tic, &tictime); diff --git a/src/lib/frct.c b/src/lib/frct.c index 7f2d8fd3..c0ae6bbb 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -52,6 +52,12 @@ struct frcti {          ssize_t           rq[RQ_SIZE];          pthread_rwlock_t  lock; + +        bool              open;        /* Window open/closed     */ +        size_t            wnd;         /* Window size            */ +        struct timespec   t_wnd;       /* Window closed time     */ +        pthread_cond_t    cond; +        pthread_mutex_t   mtx;  };  enum frct_flags { @@ -181,6 +187,12 @@ static struct frcti * frcti_create(int fd)          if (pthread_rwlock_init(&frcti->lock, NULL))                  goto fail_lock; +        if (pthread_mutex_init(&frcti->mtx, NULL)) +                goto fail_mutex; + +        if (pthread_cond_init(&frcti->cond, NULL)) +                goto fail_cond; +          for (idx = 0; idx < RQ_SIZE; ++idx)                  frcti->rq[idx] = -1; @@ -216,6 +228,10 @@ static struct frcti * frcti_create(int fd)          return frcti; + fail_cond: +        pthread_mutex_destroy(&frcti->mtx); + fail_mutex: +        pthread_rwlock_destroy(&frcti->lock);   fail_lock:          free(frcti);   fail_malloc: @@ -224,6 +240,8 @@ static struct frcti * frcti_create(int fd)  static void frcti_destroy(struct frcti * frcti)  { +        pthread_cond_destroy(&frcti->cond); +        pthread_mutex_destroy(&frcti->mtx);          pthread_rwlock_destroy(&frcti->lock);          free(frcti); @@ -275,6 +293,75 @@ static void frcti_setflags(struct frcti * frcti,  #define frcti_dealloc(frcti)                            \          (frcti == NULL ? 0 : __frcti_dealloc(frcti)) +#define frcti_is_window_open(frcti)                     \ +        (frcti == NULL ? true : __frcti_is_window_open(frcti)) + +#define frcti_window_wait(frcti, abstime)               \ +        (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime)) + + +static bool __frcti_is_window_open(struct frcti * frcti) +{ +        struct frct_cr * snd_cr = &frcti->snd_cr; +        int ret                 = true; + +        pthread_rwlock_rdlock(&frcti->lock); + +        if (snd_cr->cflags & FRCTFRESCNTL) +                ret = before(snd_cr->seqno, snd_cr->rwe); + +        if (!ret) { +                pthread_mutex_lock(&frcti->mtx); +                if (frcti->open) { +                        clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd); +                        frcti->open = false; +                } +                pthread_mutex_unlock(&frcti->mtx); +        } + +        pthread_rwlock_unlock(&frcti->lock); + +        return ret; +} + +static int __frcti_window_wait(struct frcti *    frcti, +                               struct timespec * abstime) +{ +        struct frct_cr * snd_cr = &frcti->snd_cr; +        int ret                 = 0; + +        pthread_rwlock_rdlock(&frcti->lock); + +        if (!(snd_cr->cflags & FRCTFRESCNTL)) { +                pthread_rwlock_unlock(&frcti->lock); +                return 0; +        } + +        while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { +                pthread_rwlock_unlock(&frcti->lock); +                pthread_mutex_lock(&frcti->mtx); + +                if (frcti->open) { +                        clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd); +                        frcti->open = false; +                } + +                pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                                     (void *) &frcti->mtx); + +                ret = -pthread_cond_timedwait(&frcti->cond, +                                              &frcti->mtx, +                                              abstime); + +                pthread_cleanup_pop(true); +                pthread_rwlock_rdlock(&frcti->lock); +        } + +        pthread_rwlock_unlock(&frcti->lock); + +        return ret; +} +  static ssize_t __frcti_queued_pdu(struct frcti * frcti)  {          ssize_t idx; @@ -373,7 +460,7 @@ static int __frcti_snd(struct frcti *       frcti,          pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);          if (pci == NULL) -                return -1; +                return -ENOMEM;          memset(pci, 0, sizeof(*pci)); @@ -508,11 +595,23 @@ static void __frcti_rcv(struct frcti *       frcti,          }          if (pci->flags & FRCT_FC) { -                uint32_t rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); -                if (before(rwe, snd_cr->lwe & 0x00FFFFFF)) -                        snd_cr->rwe += 0x01000000; +                uint32_t rwe; + +                rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); +                rwe |= snd_cr->rwe & 0xFF000000; + +                /* Rollover for 24 bit */ +                if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF) +                        rwe += 0x01000000; -                snd_cr->rwe = (snd_cr->rwe & 0xFF000000) + rwe; +                snd_cr->rwe = rwe; + +                pthread_mutex_lock(&frcti->mtx); +                if (!frcti->open) { +                        frcti->open = true; +                        pthread_cond_broadcast(&frcti->cond); +                } +                pthread_mutex_unlock(&frcti->mtx);          }          if (!(pci->flags & FRCT_DATA)) | 
