diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2020-05-01 18:23:58 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2020-05-02 11:34:28 +0200 | 
| commit | 25d1721e7dc9fa15c8a7c5513f30e636e9bda397 (patch) | |
| tree | 9a012ab53513ffc78bf122e448045cc6084c13a4 /src/lib | |
| parent | 6415d0f683dbe5f20d4d00c74bf75a795753f444 (diff) | |
| download | ouroboros-25d1721e7dc9fa15c8a7c5513f30e636e9bda397.tar.gz ouroboros-25d1721e7dc9fa15c8a7c5513f30e636e9bda397.zip  | |
lib: Create an rxmwheel per flow
The single retransmission wheel caused locking headaches as the calls
for different flows could block on the same rxmwheel. This stabilizes
the stack, but if the rdrbuff gets full there can now be big delays.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 21 | ||||
| -rw-r--r-- | src/lib/frct.c | 86 | ||||
| -rw-r--r-- | src/lib/rxmwheel.c | 159 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_pthr.c | 11 | 
4 files changed, 149 insertions, 128 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 80d7e9ad..e8989a48 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -429,9 +429,6 @@ static void init(int     argc,          if (pthread_rwlock_init(&ai.lock, NULL))                  goto fail_lock; -        if (rxmwheel_init()) -                goto fail_rxmwheel; -          ai.fqset = shm_flow_set_open(getpid());          if (ai.fqset == NULL)                  goto fail_fqset; @@ -439,8 +436,6 @@ static void init(int     argc,          return;   fail_fqset: -        rxmwheel_fini(); - fail_rxmwheel:          pthread_rwlock_destroy(&ai.lock);   fail_lock:          for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -474,8 +469,6 @@ static void fini(void)          if (ai.fds == NULL)                  return; -        rxmwheel_fini(); -          if (ai.prog != NULL)                  free(ai.prog); @@ -1080,15 +1073,16 @@ ssize_t flow_read(int    fd,          flow = &ai.flows[fd]; +        clock_gettime(PTHREAD_COND_CLOCK, &abs); + +        pthread_rwlock_rdlock(&ai.lock); +          if (flow->part_idx == DONE_PART) { +                pthread_rwlock_unlock(&ai.lock);                  flow->part_idx = NO_PART;                  return 0;          } -        clock_gettime(PTHREAD_COND_CLOCK, &abs); - -        pthread_rwlock_rdlock(&ai.lock); -          if (flow->flow_id < 0) {                  pthread_rwlock_unlock(&ai.lock);                  return -ENOTALLOC; @@ -1141,8 +1135,13 @@ ssize_t flow_read(int    fd,          if (n <= (ssize_t) count) {                  memcpy(buf, packet, n);                  shm_rdrbuff_remove(ai.rdrb, idx); + +                pthread_rwlock_wrlock(&ai.lock); +                  flow->part_idx = (partrd && n == (ssize_t) count) ?                          DONE_PART : NO_PART; + +                pthread_rwlock_unlock(&ai.lock);                  return n;          } else {                  if (partrd) { diff --git a/src/lib/frct.c b/src/lib/frct.c index 0e9d64c7..3c180128 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,15 +21,12 @@   */  /* Default Delta-t parameters */ -#define DELT_MPL       60000  /* ms */ -#define DELT_A         3000   /* ms */ -#define DELT_R         20000  /* ms */ +#define DELT_MPL       (60 * MILLION) /* us */ +#define DELT_A          (1 * MILLION) /* us */ +#define DELT_R         (20 * MILLION) /* us */  #define RQ_SIZE        1024 -#define TW_ELEMENTS    6000 -#define TW_RESOLUTION  1     /* ms */ -  #define FRCT_PCILEN    (sizeof(struct frct_pci))  struct frct_cr { @@ -44,24 +41,26 @@ struct frct_cr {  };  struct frcti { -        int              fd; +        int               fd; + +        time_t            mpl; +        time_t            a; +        time_t            r; -        time_t           mpl; -        time_t           a; -        time_t           r; +        time_t            srtt_us;     /* smoothed rtt           */ +        time_t            mdev_us;     /* mdev                   */ +        time_t            rto;         /* retransmission timeout */ +        uint32_t          rttseq; +        struct timespec   t_probe;     /* probe time             */ +        bool              probe;       /* probe active           */ -        time_t           srtt_us;     /* smoothed rtt           */ -        time_t           mdev_us;     /* mdev                   */ -        time_t           rto;         /* retransmission timeout */ -        uint32_t         rttseq; -        struct timespec  t_probe;     /* probe time             */ -        bool             probe;       /* probe active           */ +        struct frct_cr    snd_cr; +        struct frct_cr    rcv_cr; -        struct frct_cr   snd_cr; -        struct frct_cr   rcv_cr; +        struct rxmwheel * rw; -        ssize_t          rq[RQ_SIZE]; -        pthread_rwlock_t lock; +        ssize_t           rq[RQ_SIZE]; +        pthread_rwlock_t  lock;  };  enum frct_flags { @@ -111,28 +110,35 @@ static struct frcti * frcti_create(int fd)          frcti->r   = DELT_R;          frcti->fd  = fd; -        delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; +        delta_t = frcti->mpl + frcti->a + frcti->r; -        frcti->snd_cr.inact  = 3 * delta_t; -        frcti->snd_cr.act    = now.tv_sec - (frcti->snd_cr.inact + 1); +        frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ +        frcti->snd_cr.act   = now.tv_sec - (frcti->snd_cr.inact + 1); -        frcti->rttseq        = 0; -        frcti->probe         = false; +        frcti->rttseq       = 0; +        frcti->probe        = false; -        frcti->srtt_us       = 0;      /* updated on first ACK */ -        frcti->mdev_us       = 10000;  /* initial rxm will be after 20 ms */ -        frcti->rto           = 20000;  /* initial rxm will be after 20 ms */ +        frcti->srtt_us      = 0;      /* updated on first ACK */ +        frcti->mdev_us      = 10000;  /* initial rxm will be after 20 ms */ +        frcti->rto          = 20000;  /* initial rxm will be after 20 ms */ +        frcti->rw           = NULL;          if (ai.flows[fd].qs.loss == 0) {                  frcti->snd_cr.cflags |= FRCTFRTX;                  frcti->rcv_cr.cflags |= FRCTFRTX; +                frcti->rw = rxmwheel_create(); +                if (frcti->rw == NULL) +                        goto fail_rw;          } -        frcti->rcv_cr.inact  = 2 * delta_t; -        frcti->rcv_cr.act    = now.tv_sec - (frcti->rcv_cr.inact + 1); +        frcti->rcv_cr.inact = 2 * delta_t /  MILLION; /* s */ +        frcti->rcv_cr.act   = now.tv_sec - (frcti->rcv_cr.inact + 1); +          return frcti; + fail_rw: +        pthread_rwlock_destroy(&frcti->lock);   fail_lock:          free(frcti);   fail_malloc: @@ -146,7 +152,8 @@ static void frcti_destroy(struct frcti * frcti)           * make sure everything we sent is acked.           */ -        rxmwheel_clear(frcti->fd); +        if (frcti->rw != NULL) +                rxmwheel_destroy(frcti->rw);          pthread_rwlock_destroy(&frcti->lock); @@ -229,13 +236,15 @@ static int __frcti_snd(struct frcti *       frcti,          struct timespec   now;          struct frct_cr *  snd_cr;          struct frct_cr *  rcv_cr; +        uint32_t          seqno;          assert(frcti);          snd_cr = &frcti->snd_cr;          rcv_cr = &frcti->rcv_cr; -        rxmwheel_move(); +        if (frcti->rw != NULL) +                rxmwheel_move(frcti->rw);          pci = frcti_alloc_head(sdb);          if (pci == NULL) @@ -259,7 +268,9 @@ static int __frcti_snd(struct frcti *       frcti,                  frcti->snd_cr.lwe = snd_cr->seqno - 1;          } -        pci->seqno = hton32(snd_cr->seqno); +        seqno = snd_cr->seqno; +        pci->seqno = hton32(seqno); +          if (!(snd_cr->cflags & FRCTFRTX)) {                  snd_cr->lwe++;          } else { @@ -269,8 +280,6 @@ static int __frcti_snd(struct frcti *       frcti,                          frcti->probe   = true;                  } -                rxmwheel_add(frcti, snd_cr->seqno, sdb); -                  if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {                          pci->flags |= FRCT_ACK;                          pci->ackno = hton32(rcv_cr->lwe); @@ -282,6 +291,9 @@ static int __frcti_snd(struct frcti *       frcti,          pthread_rwlock_unlock(&frcti->lock); +        if (frcti->rw != NULL) +                rxmwheel_add(frcti->rw, frcti, seqno, sdb); +          return 0;  } @@ -384,13 +396,13 @@ static int __frcti_rcv(struct frcti *       frcti,          if (ret == 0 && !(pci->flags & FRCT_DATA))                  shm_rdrbuff_remove(ai.rdrb, idx); -        rxmwheel_move(); +        if (frcti->rw != NULL) +                rxmwheel_move(frcti->rw);          return ret;   drop_packet:          pthread_rwlock_unlock(&frcti->lock);          shm_rdrbuff_remove(ai.rdrb, idx); -          return -EAGAIN;  } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 28cd78de..dbdd9377 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -29,7 +29,6 @@  #define RXMQ_MAX   (1 << RXMQ_M)      /* us                       */  /* Small inacurracy to avoid slow division by MILLION. */ -#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))  #define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10))  #define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) @@ -39,26 +38,28 @@ struct rxm {          struct shm_du_buff * sdb;          uint8_t *            head;          uint8_t *            tail; -        time_t               t0;    /* Time when original was sent (s).  */ +        time_t               t0;    /* Time when original was sent (us). */          size_t               mul;   /* RTO multiplier.                   */          struct frcti *       frcti;  }; -struct { +struct rxmwheel {          struct list_head wheel[RXMQ_SLOTS];          size_t           prv; /* Last processed slot. */          pthread_mutex_t  lock; -} rw; +}; -static void rxmwheel_fini(void) +static void rxmwheel_destroy(struct rxmwheel * rw)  {          size_t             i;          struct list_head * p;          struct list_head * h; +        pthread_mutex_destroy(&rw->lock); +          for (i = 0; i < RXMQ_SLOTS; ++i) { -                list_for_each_safe(p, h, &rw.wheel[i]) { +                list_for_each_safe(p, h, &rw->wheel[i]) {                          struct rxm * rxm = list_entry(p, struct rxm, next);                          list_del(&rxm->next);                          shm_du_buff_ack(rxm->sdb); @@ -68,66 +69,49 @@ static void rxmwheel_fini(void)          }  } -static int rxmwheel_init(void) +static struct rxmwheel * rxmwheel_create(void)  { -        struct timespec now; -        size_t          i; +        struct rxmwheel * rw; +        struct timespec   now; +        size_t            i; -        if (pthread_mutex_init(&rw.lock, NULL)) -                return -1; +        rw = malloc(sizeof(*rw)); +        if (rw == NULL) +                return NULL; + +        if (pthread_mutex_init(&rw->lock, NULL)) { +                free(rw); +                return NULL; +        }          clock_gettime(PTHREAD_COND_CLOCK, &now);          /* Mark the previous timeslot as the last one processed. */ -        rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); +        rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);          for (i = 0; i < RXMQ_SLOTS; ++i) -                list_head_init(&rw.wheel[i]); - -        return 0; -} - -static void rxmwheel_clear(int fd) -{ -        size_t i; +                list_head_init(&rw->wheel[i]); -        /* FIXME: Add list element to avoid looping over full rxmwheel. */ -        pthread_mutex_lock(&rw.lock); - -        for (i = 0; i < RXMQ_SLOTS; ++i) { -                struct list_head * p; -                struct list_head * h; - -                list_for_each_safe(p, h, &rw.wheel[i]) { -                        struct rxm * r = list_entry(p, struct rxm, next); -                        if (r->frcti->fd == fd) { -                                list_del(&r->next); -                                shm_du_buff_ack(r->sdb); -                                ipcp_sdb_release(r->sdb); -                                free(r); -                        } -                } -        } - -        pthread_mutex_unlock(&rw.lock); +        return rw;  }  static void check_probe(struct frcti * frcti,                          uint32_t       seqno)  { -        /* disable rtt probe if this packet */ +        /* Disable rtt probe on retransmitted packet! */ -        /* TODO: This should be locked, but lock reversal! */ +        pthread_rwlock_wrlock(&frcti->lock);          if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {                  /* Backoff to avoid never updating rtt */                  frcti->srtt_us += frcti->mdev_us;                  frcti->probe = false;          } + +        pthread_rwlock_unlock(&frcti->lock);  } -/* Return fd on r-timer expiry. */ -static int rxmwheel_move(void) +static void rxmwheel_move(struct rxmwheel * rw)  {          struct timespec    now;          struct list_head * p; @@ -135,19 +119,22 @@ static int rxmwheel_move(void)          size_t             slot;          size_t             i; -        pthread_mutex_lock(&rw.lock); +        pthread_mutex_lock(&rw->lock); + +        pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, +                             (void *) &rw->lock);          clock_gettime(PTHREAD_COND_CLOCK, &now);          slot = ts_to_slot(now); -        i = rw.prv; +        i = rw->prv;          if (slot < i)                  slot += RXMQ_SLOTS;          while (i++ < slot) { -                list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) { +                list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {                          struct rxm *         r;                          struct frct_cr *     snd_cr;                          struct frct_cr *     rcv_cr; @@ -156,42 +143,55 @@ static int rxmwheel_move(void)                          struct shm_du_buff * sdb;                          uint8_t *            head;                          struct flow *        f; +                        int                  fd; +                        uint32_t             snd_lwe; +                        uint32_t             rcv_lwe; +                        time_t               rto;                          r = list_entry(p, struct rxm, next); +                          list_del(&r->next);                          snd_cr = &r->frcti->snd_cr;                          rcv_cr = &r->frcti->rcv_cr; +                        fd     = r->frcti->fd; +                        f      = &ai.flows[fd];                          shm_du_buff_ack(r->sdb); +                        pthread_rwlock_rdlock(&r->frcti->lock); + +                        snd_lwe = snd_cr->lwe; +                        rcv_lwe = rcv_cr->lwe; +                        rto     = r->frcti->rto; + +                        pthread_rwlock_unlock(&r->frcti->lock); +                          /* Has been ack'd, remove. */ -                        if ((int) (r->seqno - snd_cr->lwe) < 0) { +                        if ((int) (r->seqno - snd_lwe) < 0) {                                  ipcp_sdb_release(r->sdb);                                  free(r);                                  continue;                          } -                        /* Disable using this seqno as rto probe. */ -                        check_probe(r->frcti, r->seqno); -                          /* Check for r-timer expiry. */ -                        if (ts_to_ms(now) - r->t0 > r->frcti->r) { -                                int fd = r->frcti->fd; -                                pthread_mutex_unlock(&rw.lock); +                        if (ts_to_us(now) - r->t0 > r->frcti->r) {                                  ipcp_sdb_release(r->sdb);                                  free(r); -                                return fd; +                                shm_rbuff_set_acl(ai.flows[fd].rx_rb, +                                                  ACL_FLOWDOWN); +                                shm_rbuff_set_acl(ai.flows[fd].tx_rb, +                                                  ACL_FLOWDOWN); +                                continue;                          }                          /* Copy the payload, safe rtx in other layers. */                          if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { -                                /* FIXME: reschedule send instead of failing? */ -                                int fd = r->frcti->fd; -                                pthread_mutex_unlock(&rw.lock);                                  ipcp_sdb_release(r->sdb);                                  free(r); -                                return fd; +                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); +                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); +                                continue;                          }                          idx = shm_du_buff_get_idx(sdb); @@ -202,20 +202,20 @@ static int rxmwheel_move(void)                          /* Release the old copy. */                          ipcp_sdb_release(r->sdb); +                        /* Disable using this seqno as rto probe. */ +                        check_probe(r->frcti, r->seqno); +                          /* Update ackno and make sure DRF is not set. */ -                        ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); +                        ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe);                          ((struct frct_pci *) head)->flags &= ~FRCT_DRF; -                        f = &ai.flows[r->frcti->fd]; - -                        /* Retransmit the copy. FIXME: cancel flow */ -                        if (shm_rbuff_write(f->tx_rb, idx)) { -                                int fd = r->frcti->fd; -                                pthread_mutex_unlock(&rw.lock); +                        /* Retransmit the copy. */ +                        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {                                  ipcp_sdb_release(sdb);                                  free(r); -                                /* FIXME: reschedule send? */ -                                return fd; +                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); +                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); +                                continue;                          }                          /* Reschedule. */ @@ -228,21 +228,20 @@ static int rxmwheel_move(void)                          r->sdb  = sdb;                          /* Schedule at least in the next time slot */ -                        rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1)) +                        rslot = (slot + MAX(rto >> RXMQ_R, 1))                                  & (RXMQ_SLOTS - 1); -                        list_add_tail(&r->next, &rw.wheel[rslot]); +                        list_add_tail(&r->next, &rw->wheel[rslot]);                  }          } -        rw.prv = slot & (RXMQ_SLOTS - 1); - -        pthread_mutex_unlock(&rw.lock); +        rw->prv = slot & (RXMQ_SLOTS - 1); -        return 0; +        pthread_cleanup_pop(true);  } -static int rxmwheel_add(struct frcti *       frcti, +static int rxmwheel_add(struct rxmwheel *    rw, +                        struct frcti *       frcti,                          uint32_t             seqno,                          struct shm_du_buff * sdb)  { @@ -254,8 +253,6 @@ static int rxmwheel_add(struct frcti *       frcti,          if (r == NULL)                  return -ENOMEM; -        pthread_mutex_lock(&rw.lock); -          clock_gettime(PTHREAD_COND_CLOCK, &now);          r->t0    = ts_to_us(now); @@ -266,13 +263,19 @@ static int rxmwheel_add(struct frcti *       frcti,          r->tail  = shm_du_buff_tail(sdb);          r->frcti = frcti; +        pthread_rwlock_rdlock(&r->frcti->lock); +          slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); -        list_add_tail(&r->next, &rw.wheel[slot]); +        pthread_rwlock_unlock(&r->frcti->lock); + +        pthread_mutex_lock(&rw->lock); + +        list_add_tail(&r->next, &rw->wheel[slot]);          shm_du_buff_wait_ack(sdb); -        pthread_mutex_unlock(&rw.lock); +        pthread_mutex_unlock(&rw->lock);          return 0;  } diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 00ffd583..91eb8b5f 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -109,7 +109,9 @@ int shm_rbuff_write_b(struct shm_rbuff *      rb,          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) rb->lock); -        while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { +        while (!shm_rbuff_free(rb) +               && ret != -ETIMEDOUT +               && !(*rb->acl & ACL_FLOWDOWN)) {                  if (abstime != NULL)                          ret = -pthread_cond_timedwait(rb->del,                                                        rb->lock, @@ -187,7 +189,9 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) rb->lock); -        while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { +        while (shm_rbuff_empty(rb) +               && (idx != -ETIMEDOUT) +               && !(*rb->acl & ACL_FLOWDOWN)) {                  if (abstime != NULL)                          idx = -pthread_cond_timedwait(rb->add,                                                        rb->lock, @@ -224,6 +228,9 @@ void shm_rbuff_set_acl(struct shm_rbuff * rb,  #endif          *rb->acl = (size_t) flags; +        pthread_cond_broadcast(rb->del); +        pthread_cond_broadcast(rb->add); +          pthread_mutex_unlock(rb->lock);  }  | 
