diff options
| -rw-r--r-- | src/lib/dev.c | 3 | ||||
| -rw-r--r-- | src/lib/frct.c | 156 | ||||
| -rw-r--r-- | src/lib/timerwheel.c | 2 | 
3 files changed, 130 insertions, 31 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index ca004aa4..4b78c1db 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1058,8 +1058,7 @@ ssize_t flow_write(int          fd,                          return -EAGAIN;                  idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);          } else { -                while((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) { - +                while ((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {                          if (ret != -ETIMEDOUT)                                  return ret; diff --git a/src/lib/frct.c b/src/lib/frct.c index c0ae6bbb..919e2617 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -20,6 +20,9 @@   * Foundation, Inc., http://www.fsf.org/about/contact/.   */ +#define DELT_RDV      (100 * MILLION) /* ns */ +#define MAX_RDV         (1 * BILLION) /* ns */ +  #define FRCT_PCILEN    (sizeof(struct frct_pci))  struct frct_cr { @@ -39,6 +42,7 @@ struct frcti {          time_t            mpl;          time_t            a;          time_t            r; +        time_t            rdv;          time_t            srtt;        /* Smoothed rtt           */          time_t            mdev;        /* Deviation              */ @@ -54,8 +58,8 @@ struct frcti {          pthread_rwlock_t  lock;          bool              open;        /* Window open/closed     */ -        size_t            wnd;         /* Window size            */          struct timespec   t_wnd;       /* Window closed time     */ +        struct timespec   t_rdvs;      /* Last rendez-vous sent  */          pthread_cond_t    cond;          pthread_mutex_t   mtx;  }; @@ -92,8 +96,10 @@ static bool after(uint32_t seq1,          return (int32_t)(seq2 - seq1) < 0;  } -static int __send_ack(int fd, -                       int ackno) +static void __send_frct_pkt(int      fd, +                            uint8_t  flags, +                            uint32_t ackno, +                            uint32_t rwe)  {          struct shm_du_buff * sdb;          struct frct_pci *    pci; @@ -107,12 +113,14 @@ static int __send_ack(int fd,          idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb);  #endif          if (idx < 0) -                return -ENOMEM; +                return;          pci = (struct frct_pci *) shm_du_buff_head(sdb);          memset(pci, 0, sizeof(*pci)); -        pci->flags = FRCT_ACK; +        *((uint32_t *) pci) = hton32(rwe); + +        pci->flags = flags;          pci->ackno = hton32(ackno);          f = &ai.flows[fd]; @@ -122,19 +130,18 @@ static int __send_ack(int fd,          if (shm_rbuff_write(f->tx_rb, idx)) {  #endif                  ipcp_sdb_release(sdb); -                return -ENOMEM; +                return;          }          shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - -        return 0;  } -static void frct_send_ack(struct frcti * frcti) +static void send_frct_pkt(struct frcti * frcti)  {          struct timespec      now;          time_t               diff;          uint32_t             ackno; +        uint32_t             rwe;          int                  fd;          assert(frcti); @@ -146,8 +153,9 @@ static void frct_send_ack(struct frcti * frcti)                  return;          } -        ackno = frcti->rcv_cr.lwe;          fd    = frcti->fd; +        ackno = frcti->rcv_cr.lwe; +        rwe   = frcti->rcv_cr.rwe;          clock_gettime(PTHREAD_COND_CLOCK, &now); @@ -158,8 +166,7 @@ static void frct_send_ack(struct frcti * frcti)          if (diff > frcti->a || diff < DELT_ACK)                  return; -        if (__send_ack(fd, ackno) < 0) -                return; +        __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);          pthread_rwlock_wrlock(&frcti->lock); @@ -169,14 +176,42 @@ static void frct_send_ack(struct frcti * frcti)          pthread_rwlock_unlock(&frcti->lock);  } +static void __send_rdv(int fd) +{ +        struct shm_du_buff * sdb; +        struct frct_pci *    pci; +        ssize_t              idx; +        struct flow *        f; + +        /* Raw calls needed to bypass frcti. */ +        idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); +        if (idx < 0) +                return; + +        pci = (struct frct_pci *) shm_du_buff_head(sdb); +        memset(pci, 0, sizeof(*pci)); + +        pci->flags = FRCT_RDVS; + +        f = &ai.flows[fd]; + +        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { +                ipcp_sdb_release(sdb); +                return; +        } + +        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); +} +  static struct frcti * frcti_create(int fd)  { -        struct frcti *  frcti; -        ssize_t         idx; -        struct timespec now; -        time_t          mpl; -        time_t          a; -        time_t          r; +        struct frcti *     frcti; +        ssize_t            idx; +        struct timespec    now; +        time_t             mpl; +        time_t             a; +        time_t             r; +        pthread_condattr_t cattr;          frcti = malloc(sizeof(*frcti));          if (frcti == NULL) @@ -190,7 +225,12 @@ static struct frcti * frcti_create(int fd)          if (pthread_mutex_init(&frcti->mtx, NULL))                  goto fail_mutex; -        if (pthread_cond_init(&frcti->cond, NULL)) +        if (pthread_condattr_init(&cattr)) +                goto fail_cattr; +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&frcti->cond, &cattr))                  goto fail_cond;          for (idx = 0; idx < RQ_SIZE; ++idx) @@ -201,6 +241,7 @@ static struct frcti * frcti_create(int fd)          frcti->mpl = mpl = DELT_MPL;          frcti->a   = a   = DELT_A;          frcti->r   = r   = DELT_R; +        frcti->rdv = DELT_RDV;          frcti->fd  = fd; @@ -229,6 +270,8 @@ static struct frcti * frcti_create(int fd)          return frcti;   fail_cond: +        pthread_condattr_destroy(&cattr); +fail_cattr:          pthread_mutex_destroy(&frcti->mtx);   fail_mutex:          pthread_rwlock_destroy(&frcti->lock); @@ -311,11 +354,30 @@ static bool __frcti_is_window_open(struct frcti * frcti)                  ret = before(snd_cr->seqno, snd_cr->rwe);          if (!ret) { +                 struct timespec now; + +                clock_gettime(PTHREAD_COND_CLOCK, &now); +                  pthread_mutex_lock(&frcti->mtx);                  if (frcti->open) { -                        clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd); -                        frcti->open = false; +                        frcti->open   = false; +                        frcti->t_wnd  = now; +                        frcti->t_rdvs = now; +                } else { +                        time_t diff; +                        diff = ts_diff_ns(&frcti->t_wnd, &now); +                        if (diff > MAX_RDV) { +                                pthread_mutex_unlock(&frcti->mtx); +                                return false; +                        } + +                        diff = ts_diff_ns(&frcti->t_rdvs, &now); +                        if  (diff > frcti->rdv) { +                                frcti->t_rdvs = now; +                                __send_rdv(frcti->fd); +                        }                  } +                  pthread_mutex_unlock(&frcti->mtx);          } @@ -338,12 +400,17 @@ static int __frcti_window_wait(struct frcti *    frcti,          }          while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { +                struct timespec now; +                  pthread_rwlock_unlock(&frcti->lock);                  pthread_mutex_lock(&frcti->mtx);                  if (frcti->open) { -                        clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd); -                        frcti->open = false; +                        clock_gettime(PTHREAD_COND_CLOCK, &now); + +                        frcti->t_wnd  = now; +                        frcti->t_rdvs = now; +                        frcti->open   = false;                  }                  pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, @@ -353,7 +420,27 @@ static int __frcti_window_wait(struct frcti *    frcti,                                                &frcti->mtx,                                                abstime); -                pthread_cleanup_pop(true); +                pthread_cleanup_pop(false); + +                if (ret == -ETIMEDOUT) { +                        time_t diff; + +                        clock_gettime(PTHREAD_COND_CLOCK, &now); + +                        diff = ts_diff_ns(&frcti->t_wnd, &now); +                        if (diff > MAX_RDV) { +                                pthread_mutex_unlock(&frcti->mtx); +                                return -ECONNRESET; /* write fails! */ +                        } + +                        diff = ts_diff_ns(&frcti->t_rdvs, &now); +                        if  (diff > frcti->rdv) { +                                frcti->t_rdvs = now; +                                __send_rdv(frcti->fd); +                        } +                } + +                pthread_mutex_unlock(&frcti->mtx);                  pthread_rwlock_rdlock(&frcti->lock);          } @@ -436,7 +523,7 @@ static time_t __frcti_dealloc(struct frcti * frcti)          pthread_rwlock_unlock(&frcti->lock);          if (fd != -1) -                __send_ack(fd, ackno); +                __send_frct_pkt(fd, FRCT_ACK, ackno, 0);          return wait;  } @@ -557,6 +644,7 @@ static void __frcti_rcv(struct frcti *       frcti,          struct frct_cr *  snd_cr;          uint32_t          seqno;          uint32_t          ackno; +        uint32_t          rwe;          int               fd = -1;          assert(frcti); @@ -583,6 +671,18 @@ static void __frcti_rcv(struct frcti *       frcti,                  }          } +        /* For now, just send an immediate window update. */ +        if (pci->flags & FRCT_RDVS) { +                fd = frcti->fd; +                rwe = rcv_cr->rwe; +                pthread_rwlock_unlock(&frcti->lock); + +                __send_frct_pkt(fd, FRCT_FC, 0, rwe); + +                shm_rdrbuff_remove(ai.rdrb, idx); +                return; +        } +          if (pci->flags & FRCT_ACK) {                  ackno = ntoh32(pci->ackno);                  if (after(ackno, frcti->snd_cr.lwe)) @@ -618,13 +718,13 @@ static void __frcti_rcv(struct frcti *       frcti,                  goto drop_packet;          if (before(seqno, rcv_cr->lwe)) { -                rcv_cr->seqno = seqno; +                rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */                  goto drop_packet;          }          if (rcv_cr->cflags & FRCTFRTX) { -                if (!before(seqno, rcv_cr->rwe)) /* Out of window */ +                if (!before(seqno, rcv_cr->rwe)) /* Out of window. */                          goto drop_packet;                  if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) @@ -652,7 +752,7 @@ static void __frcti_rcv(struct frcti *       frcti,   drop_packet:          pthread_rwlock_unlock(&frcti->lock); -        frct_send_ack(frcti); +        send_frct_pkt(frcti);          shm_rdrbuff_remove(ai.rdrb, idx);          return; diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 4443832d..cd3074cb 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -294,7 +294,7 @@ static void timerwheel_move(void)                          rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;                          if (f->flow_id == a->flow_id && f->frcti != NULL) -                                frct_send_ack(a->frcti); +                                send_frct_pkt(a->frcti);                          free(a);  | 
