diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 21 | ||||
| -rw-r--r-- | src/lib/frct.c | 86 | ||||
| -rw-r--r-- | src/lib/rxmwheel.c | 159 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_pthr.c | 11 | 
4 files changed, 149 insertions, 128 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index 80d7e9ad..e8989a48 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -429,9 +429,6 @@ static void init(int     argc,          if (pthread_rwlock_init(&ai.lock, NULL))                  goto fail_lock; -        if (rxmwheel_init()) -                goto fail_rxmwheel; -          ai.fqset = shm_flow_set_open(getpid());          if (ai.fqset == NULL)                  goto fail_fqset; @@ -439,8 +436,6 @@ static void init(int     argc,          return;   fail_fqset: -        rxmwheel_fini(); - fail_rxmwheel:          pthread_rwlock_destroy(&ai.lock);   fail_lock:          for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -474,8 +469,6 @@ static void fini(void)          if (ai.fds == NULL)                  return; -        rxmwheel_fini(); -          if (ai.prog != NULL)                  free(ai.prog); @@ -1080,15 +1073,16 @@ ssize_t flow_read(int    fd,          flow = &ai.flows[fd]; +        clock_gettime(PTHREAD_COND_CLOCK, &abs); + +        pthread_rwlock_rdlock(&ai.lock); +          if (flow->part_idx == DONE_PART) { +                pthread_rwlock_unlock(&ai.lock);                  flow->part_idx = NO_PART;                  return 0;          } -        clock_gettime(PTHREAD_COND_CLOCK, &abs); - -        pthread_rwlock_rdlock(&ai.lock); -          if (flow->flow_id < 0) {                  pthread_rwlock_unlock(&ai.lock);                  return -ENOTALLOC; @@ -1141,8 +1135,13 @@ ssize_t flow_read(int    fd,          if (n <= (ssize_t) count) {                  memcpy(buf, packet, n);                  shm_rdrbuff_remove(ai.rdrb, idx); + +                pthread_rwlock_wrlock(&ai.lock); +                  flow->part_idx = (partrd && n == (ssize_t) count) ?                          DONE_PART : NO_PART; + +                pthread_rwlock_unlock(&ai.lock);                  return n;          } else {                  if (partrd) { diff --git a/src/lib/frct.c b/src/lib/frct.c index 0e9d64c7..3c180128 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,15 +21,12 @@   */  /* Default Delta-t parameters */ -#define DELT_MPL       60000  /* ms */ -#define DELT_A         3000   /* ms */ -#define DELT_R         20000  /* ms */ +#define DELT_MPL       (60 * MILLION) /* us */ +#define DELT_A          (1 * MILLION) /* us */ +#define DELT_R         (20 * MILLION) /* us */  #define RQ_SIZE        1024 -#define TW_ELEMENTS    6000 -#define TW_RESOLUTION  1     /* ms */ -  #define FRCT_PCILEN    (sizeof(struct frct_pci))  struct frct_cr { @@ -44,24 +41,26 @@ struct frct_cr {  };  struct frcti { -        int              fd; +        int               fd; + +        time_t            mpl; +        time_t            a; +        time_t            r; -        time_t           mpl; -        time_t           a; -        time_t           r; +        time_t            srtt_us;     /* smoothed rtt           */ +        time_t            mdev_us;     /* mdev                   */ +        time_t            rto;         /* retransmission timeout */ +        uint32_t          rttseq; +        struct timespec   t_probe;     /* probe time             */ +        bool              probe;       /* probe active           */ -        time_t           srtt_us;     /* smoothed rtt           */ -        time_t           mdev_us;     /* mdev                   */ -        time_t           rto;         /* retransmission timeout */ -        uint32_t         rttseq; -        struct timespec  t_probe;     /* probe time             */ -        bool             probe;       /* probe active           */ +        struct frct_cr    snd_cr; +        struct frct_cr    rcv_cr; -        struct frct_cr   snd_cr; -        struct frct_cr   rcv_cr; +        struct rxmwheel * rw; -        ssize_t          rq[RQ_SIZE]; -        pthread_rwlock_t lock; +        ssize_t           rq[RQ_SIZE]; +        pthread_rwlock_t  lock;  };  enum frct_flags { @@ -111,28 +110,35 @@ static struct frcti * frcti_create(int fd)          frcti->r   = DELT_R;          frcti->fd  = fd; -        delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; +        delta_t = frcti->mpl + frcti->a + frcti->r; -        frcti->snd_cr.inact  = 3 * delta_t; -        frcti->snd_cr.act    = now.tv_sec - (frcti->snd_cr.inact + 1); +        frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ +        frcti->snd_cr.act   = now.tv_sec - (frcti->snd_cr.inact + 1); -        frcti->rttseq        = 0; -        frcti->probe         = false; +        frcti->rttseq       = 0; +        frcti->probe        = false; -        frcti->srtt_us       = 0;      /* updated on first ACK */ -        frcti->mdev_us       = 10000;  /* initial rxm will be after 20 ms */ -        frcti->rto           = 20000;  /* initial rxm will be after 20 ms */ +        frcti->srtt_us      = 0;      /* updated on first ACK */ +        frcti->mdev_us      = 10000;  /* initial rxm will be after 20 ms */ +        frcti->rto          = 20000;  /* initial rxm will be after 20 ms */ +        frcti->rw           = NULL;          if (ai.flows[fd].qs.loss == 0) {                  frcti->snd_cr.cflags |= FRCTFRTX;                  frcti->rcv_cr.cflags |= FRCTFRTX; +                frcti->rw = rxmwheel_create(); +                if (frcti->rw == NULL) +                        goto fail_rw;          } -        frcti->rcv_cr.inact  = 2 * delta_t; -        frcti->rcv_cr.act    = now.tv_sec - (frcti->rcv_cr.inact + 1); +        frcti->rcv_cr.inact = 2 * delta_t /  MILLION; /* s */ +        frcti->rcv_cr.act   = now.tv_sec - (frcti->rcv_cr.inact + 1); +          return frcti; + fail_rw: +        pthread_rwlock_destroy(&frcti->lock);   fail_lock:          free(frcti);   fail_malloc: @@ -146,7 +152,8 @@ static void frcti_destroy(struct frcti * frcti)           * make sure everything we sent is acked.           */ -        rxmwheel_clear(frcti->fd); +        if (frcti->rw != NULL) +                rxmwheel_destroy(frcti->rw);          pthread_rwlock_destroy(&frcti->lock); @@ -229,13 +236,15 @@ static int __frcti_snd(struct frcti *       frcti,          struct timespec   now;          struct frct_cr *  snd_cr;          struct frct_cr *  rcv_cr; +        uint32_t          seqno;          assert(frcti);          snd_cr = &frcti->snd_cr;          rcv_cr = &frcti->rcv_cr; -        rxmwheel_move(); +        if (frcti->rw != NULL) +                rxmwheel_move(frcti->rw);          pci = frcti_alloc_head(sdb);          if (pci == NULL) @@ -259,7 +268,9 @@ static int __frcti_snd(struct frcti *       frcti,                  frcti->snd_cr.lwe = snd_cr->seqno - 1;          } -        pci->seqno = hton32(snd_cr->seqno); +        seqno = snd_cr->seqno; +        pci->seqno = hton32(seqno); +          if (!(snd_cr->cflags & FRCTFRTX)) {                  snd_cr->lwe++;          } else { @@ -269,8 +280,6 @@ static int __frcti_snd(struct frcti *       frcti,                          frcti->probe   = true;                  } -                rxmwheel_add(frcti, snd_cr->seqno, sdb); -                  if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {                          pci->flags |= FRCT_ACK;                          pci->ackno = hton32(rcv_cr->lwe); @@ -282,6 +291,9 @@ static int __frcti_snd(struct frcti *       frcti,          pthread_rwlock_unlock(&frcti->lock); +        if (frcti->rw != NULL) +                rxmwheel_add(frcti->rw, frcti, seqno, sdb); +          return 0;  } @@ -384,13 +396,13 @@ static int __frcti_rcv(struct frcti *       frcti,          if (ret == 0 && !(pci->flags & FRCT_DATA))                  shm_rdrbuff_remove(ai.rdrb, idx); -        rxmwheel_move(); +        if (frcti->rw != NULL) +                rxmwheel_move(frcti->rw);          return ret;   drop_packet:          pthread_rwlock_unlock(&frcti->lock);          shm_rdrbuff_remove(ai.rdrb, idx); -          return -EAGAIN;  } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 28cd78de..dbdd9377 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -29,7 +29,6 @@  #define RXMQ_MAX   (1 << RXMQ_M)      /* us                       */  /* Small inacurracy to avoid slow division by MILLION. */ -#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))  #define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10))  #define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) @@ -39,26 +38,28 @@ struct rxm {          struct shm_du_buff * sdb;          uint8_t *            head;          uint8_t *            tail; -        time_t               t0;    /* Time when original was sent (s).  */ +        time_t               t0;    /* Time when original was sent (us). */          size_t               mul;   /* RTO multiplier.                   */          struct frcti *       frcti;  }; -struct { +struct rxmwheel {          struct list_head wheel[RXMQ_SLOTS];          size_t           prv; /* Last processed slot. */          pthread_mutex_t  lock; -} rw; +}; -static void rxmwheel_fini(void) +static void rxmwheel_destroy(struct rxmwheel * rw)  {          size_t             i;          struct list_head * p;          struct list_head * h; +        pthread_mutex_destroy(&rw->lock); +          for (i = 0; i < RXMQ_SLOTS; ++i) { -                list_for_each_safe(p, h, &rw.wheel[i]) { +                list_for_each_safe(p, h, &rw->wheel[i]) {                          struct rxm * rxm = list_entry(p, struct rxm, next);                          list_del(&rxm->next);                          shm_du_buff_ack(rxm->sdb); @@ -68,66 +69,49 @@ static void rxmwheel_fini(void)          }  } -static int rxmwheel_init(void) +static struct rxmwheel * rxmwheel_create(void)  { -        struct timespec now; -        size_t          i; +        struct rxmwheel * rw; +        struct timespec   now; +        size_t            i; -        if (pthread_mutex_init(&rw.lock, NULL)) -                return -1; +        rw = malloc(sizeof(*rw)); +        if (rw == NULL) +                return NULL; + +        if (pthread_mutex_init(&rw->lock, NULL)) { +                free(rw); +                return NULL; +        }          clock_gettime(PTHREAD_COND_CLOCK, &now);          /* Mark the previous timeslot as the last one processed. */ -        rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); +        rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);          for (i = 0; i < RXMQ_SLOTS; ++i) -                list_head_init(&rw.wheel[i]); - -        return 0; -} - -static void rxmwheel_clear(int fd) -{ -        size_t i; +                list_head_init(&rw->wheel[i]); -        /* FIXME: Add list element to avoid looping over full rxmwheel. */ -        pthread_mutex_lock(&rw.lock); - -        for (i = 0; i < RXMQ_SLOTS; ++i) { -                struct list_head * p; -                struct list_head * h; - -                list_for_each_safe(p, h, &rw.wheel[i]) { -                        struct rxm * r = list_entry(p, struct rxm, next); -                        if (r->frcti->fd == fd) { -                                list_del(&r->next); -                                shm_du_buff_ack(r->sdb); -                                ipcp_sdb_release(r->sdb); -                                free(r); -                        } -                } -        } - -        pthread_mutex_unlock(&rw.lock); +        return rw;  }  static void check_probe(struct frcti * frcti,                          uint32_t       seqno)  { -        /* disable rtt probe if this packet */ +        /* Disable rtt probe on retransmitted packet! */ -        /* TODO: This should be locked, but lock reversal! */ +        pthread_rwlock_wrlock(&frcti->lock);          if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {                  /* Backoff to avoid never updating rtt */                  frcti->srtt_us += frcti->mdev_us;                  frcti->probe = false;          } + +        pthread_rwlock_unlock(&frcti->lock);  } -/* Return fd on r-timer expiry. */ -static int rxmwheel_move(void) +static void rxmwheel_move(struct rxmwheel * rw)  {          struct timespec    now;          struct list_head * p; @@ -135,19 +119,22 @@ static int rxmwheel_move(void)          size_t             slot;          size_t             i; -        pthread_mutex_lock(&rw.lock); +        pthread_mutex_lock(&rw->lock); + +        pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, +                             (void *) &rw->lock);          clock_gettime(PTHREAD_COND_CLOCK, &now);          slot = ts_to_slot(now); -        i = rw.prv; +        i = rw->prv;          if (slot < i)                  slot += RXMQ_SLOTS;          while (i++ < slot) { -                list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) { +                list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {                          struct rxm *         r;                          struct frct_cr *     snd_cr;                          struct frct_cr *     rcv_cr; @@ -156,42 +143,55 @@ static int rxmwheel_move(void)                          struct shm_du_buff * sdb;                          uint8_t *            head;                          struct flow *        f; +                        int                  fd; +                        uint32_t             snd_lwe; +                        uint32_t             rcv_lwe; +                        time_t               rto;                          r = list_entry(p, struct rxm, next); +                          list_del(&r->next);                          snd_cr = &r->frcti->snd_cr;                          rcv_cr = &r->frcti->rcv_cr; +                        fd     = r->frcti->fd; +                        f      = &ai.flows[fd];                          shm_du_buff_ack(r->sdb); +                        pthread_rwlock_rdlock(&r->frcti->lock); + +                        snd_lwe = snd_cr->lwe; +                        rcv_lwe = rcv_cr->lwe; +                        rto     = r->frcti->rto; + +                        pthread_rwlock_unlock(&r->frcti->lock); +                          /* Has been ack'd, remove. */ -                        if ((int) (r->seqno - snd_cr->lwe) < 0) { +                        if ((int) (r->seqno - snd_lwe) < 0) {                                  ipcp_sdb_release(r->sdb);                                  free(r);                                  continue;                          } -                        /* Disable using this seqno as rto probe. */ -                        check_probe(r->frcti, r->seqno); -                          /* Check for r-timer expiry. */ -                        if (ts_to_ms(now) - r->t0 > r->frcti->r) { -                                int fd = r->frcti->fd; -                                pthread_mutex_unlock(&rw.lock); +                        if (ts_to_us(now) - r->t0 > r->frcti->r) {                                  ipcp_sdb_release(r->sdb);                                  free(r); -                                return fd; +                                shm_rbuff_set_acl(ai.flows[fd].rx_rb, +                                                  ACL_FLOWDOWN); +                                shm_rbuff_set_acl(ai.flows[fd].tx_rb, +                                                  ACL_FLOWDOWN); +                                continue;                          }                          /* Copy the payload, safe rtx in other layers. */                          if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { -                                /* FIXME: reschedule send instead of failing? */ -                                int fd = r->frcti->fd; -                                pthread_mutex_unlock(&rw.lock);                                  ipcp_sdb_release(r->sdb);                                  free(r); -                                return fd; +                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); +                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); +                                continue;                          }                          idx = shm_du_buff_get_idx(sdb); @@ -202,20 +202,20 @@ static int rxmwheel_move(void)                          /* Release the old copy. */                          ipcp_sdb_release(r->sdb); +                        /* Disable using this seqno as rto probe. */ +                        check_probe(r->frcti, r->seqno); +                          /* Update ackno and make sure DRF is not set. */ -                        ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); +                        ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe);                          ((struct frct_pci *) head)->flags &= ~FRCT_DRF; -                        f = &ai.flows[r->frcti->fd]; - -                        /* Retransmit the copy. FIXME: cancel flow */ -                        if (shm_rbuff_write(f->tx_rb, idx)) { -                                int fd = r->frcti->fd; -                                pthread_mutex_unlock(&rw.lock); +                        /* Retransmit the copy. */ +                        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {                                  ipcp_sdb_release(sdb);                                  free(r); -                                /* FIXME: reschedule send? */ -                                return fd; +                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); +                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); +                                continue;                          }                          /* Reschedule. */ @@ -228,21 +228,20 @@ static int rxmwheel_move(void)                          r->sdb  = sdb;                          /* Schedule at least in the next time slot */ -                        rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1)) +                        rslot = (slot + MAX(rto >> RXMQ_R, 1))                                  & (RXMQ_SLOTS - 1); -                        list_add_tail(&r->next, &rw.wheel[rslot]); +                        list_add_tail(&r->next, &rw->wheel[rslot]);                  }          } -        rw.prv = slot & (RXMQ_SLOTS - 1); - -        pthread_mutex_unlock(&rw.lock); +        rw->prv = slot & (RXMQ_SLOTS - 1); -        return 0; +        pthread_cleanup_pop(true);  } -static int rxmwheel_add(struct frcti *       frcti, +static int rxmwheel_add(struct rxmwheel *    rw, +                        struct frcti *       frcti,                          uint32_t             seqno,                          struct shm_du_buff * sdb)  { @@ -254,8 +253,6 @@ static int rxmwheel_add(struct frcti *       frcti,          if (r == NULL)                  return -ENOMEM; -        pthread_mutex_lock(&rw.lock); -          clock_gettime(PTHREAD_COND_CLOCK, &now);          r->t0    = ts_to_us(now); @@ -266,13 +263,19 @@ static int rxmwheel_add(struct frcti *       frcti,          r->tail  = shm_du_buff_tail(sdb);          r->frcti = frcti; +        pthread_rwlock_rdlock(&r->frcti->lock); +          slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); -        list_add_tail(&r->next, &rw.wheel[slot]); +        pthread_rwlock_unlock(&r->frcti->lock); + +        pthread_mutex_lock(&rw->lock); + +        list_add_tail(&r->next, &rw->wheel[slot]);          shm_du_buff_wait_ack(sdb); -        pthread_mutex_unlock(&rw.lock); +        pthread_mutex_unlock(&rw->lock);          return 0;  } diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 00ffd583..91eb8b5f 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -109,7 +109,9 @@ int shm_rbuff_write_b(struct shm_rbuff *      rb,          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) rb->lock); -        while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { +        while (!shm_rbuff_free(rb) +               && ret != -ETIMEDOUT +               && !(*rb->acl & ACL_FLOWDOWN)) {                  if (abstime != NULL)                          ret = -pthread_cond_timedwait(rb->del,                                                        rb->lock, @@ -187,7 +189,9 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) rb->lock); -        while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { +        while (shm_rbuff_empty(rb) +               && (idx != -ETIMEDOUT) +               && !(*rb->acl & ACL_FLOWDOWN)) {                  if (abstime != NULL)                          idx = -pthread_cond_timedwait(rb->add,                                                        rb->lock, @@ -224,6 +228,9 @@ void shm_rbuff_set_acl(struct shm_rbuff * rb,  #endif          *rb->acl = (size_t) flags; +        pthread_cond_broadcast(rb->del); +        pthread_cond_broadcast(rb->add); +          pthread_mutex_unlock(rb->lock);  } | 
