diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/frct.c | 91 | ||||
| -rw-r--r-- | src/lib/rxmwheel.c | 36 | 
2 files changed, 68 insertions, 59 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index e4b858d0..bc07be5a 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,9 +21,9 @@   */  /* Default Delta-t parameters */ -#define DELT_MPL       60000 /* ms */ -#define DELT_A         0     /* ms */ -#define DELT_R         2000  /* ms */ +#define DELT_MPL       60000  /* ms */ +#define DELT_A         3000   /* ms */ +#define DELT_R         20000  /* ms */  #define RQ_SIZE        64 @@ -50,11 +50,12 @@ struct frcti {          time_t           a;          time_t           r; -        time_t           srtt_us;     /* smoothed rtt */ -        time_t           mdev_us;     /* mdev         */ +        time_t           srtt_us;     /* smoothed rtt           */ +        time_t           mdev_us;     /* mdev                   */ +        time_t           rto;         /* retransmission timeout */          uint32_t         rttseq; -        struct timespec  t_probe;     /* probe time   */ -        bool             probe;       /* probe active */ +        struct timespec  t_probe;     /* probe time             */ +        bool             probe;       /* probe active           */          struct frct_cr   snd_cr;          struct frct_cr   rcv_cr; @@ -114,12 +115,14 @@ static struct frcti * frcti_create(int fd)          frcti->snd_cr.inact  = 3 * delta_t;          frcti->snd_cr.act    = now.tv_sec - (frcti->snd_cr.inact + 1); -        /* rtt estimator. rto is currently srtt + 2 * mdev */ -        frcti->srtt_us       = 0;      /* updated on first ACK */ -        frcti->mdev_us       = 100000; /* initial rxm will be after 200 ms */ +          frcti->rttseq        = 0;          frcti->probe         = false; +        frcti->srtt_us       = 0;       /* updated on first ACK */ +        frcti->mdev_us       = 100000;  /* initial rxm will be after 200 ms */ +        frcti->rto           = 200000;  /* initial rxm will be after 200 ms */ +          if (ai.flows[fd].qs.loss == 0) {                  frcti->snd_cr.cflags |= FRCTFRTX;                  frcti->rcv_cr.cflags |= FRCTFRTX; @@ -184,10 +187,10 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)          /* See if we already have the next PDU. */          pthread_rwlock_wrlock(&frcti->lock); -        pos = frcti->rcv_cr.seqno & (RQ_SIZE - 1); +        pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);          idx = frcti->rq[pos];          if (idx != -1) { -                ++frcti->rcv_cr.seqno; +                ++frcti->rcv_cr.lwe;                  frcti->rq[pos] = -1;          } @@ -252,11 +255,7 @@ static int __frcti_snd(struct frcti *       frcti,          if (now.tv_sec - snd_cr->act > snd_cr->inact) {                  /* There are no unacknowledged packets. */                  assert(snd_cr->seqno == snd_cr->lwe); -#ifdef CONFIG_OUROBOROS_DEBUG -                snd_cr->seqno = 0; -#else                  random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); -#endif                  frcti->snd_cr.lwe = snd_cr->seqno - 1;          } @@ -270,13 +269,11 @@ static int __frcti_snd(struct frcti *       frcti,                          frcti->probe   = true;                  } +                rxmwheel_add(frcti, snd_cr->seqno, sdb); +                  if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { -                        rxmwheel_add(frcti, snd_cr->seqno, sdb); -                        if (rcv_cr->lwe <= rcv_cr->seqno) { -                                pci->flags |= FRCT_ACK; -                                pci->ackno = hton32(rcv_cr->seqno); -                                rcv_cr->lwe = rcv_cr->seqno; -                        } +                        pci->flags |= FRCT_ACK; +                        pci->ackno = hton32(rcv_cr->lwe);                  }          } @@ -291,21 +288,23 @@ static int __frcti_snd(struct frcti *       frcti,  static void rtt_estimator(struct frcti * frcti,                            time_t         mrtt_us)  { -        time_t srtt = frcti->srtt_us; -        time_t mdev = frcti->mdev_us; - -        if (srtt != 0) { -                srtt -= (srtt >> 3); -                srtt += mrtt_us >> 3;   /* rtt = 7/8 rtt + 1/8 new */ -                mdev -=  (mdev >> 2); -                mdev += ABS(srtt - mrtt_us) >> 2; +        time_t srtt     = frcti->srtt_us; +        time_t rttvar   = frcti->mdev_us; + +        if (srtt == 0) { /* first measurement */ +                srtt   = mrtt_us; +                rttvar = mrtt_us >> 1; +          } else { -                srtt = mrtt_us << 3;    /* take the measured time to be rtt */ -                mdev = mrtt_us >> 1;    /* take half mrtt_us as deviation */ +                time_t delta = mrtt_us - srtt; +                srtt += (delta >> 3); +                rttvar -= rttvar >> 2; +                rttvar += ABS(delta) >> 2;          } -        frcti->srtt_us = MAX(1U, srtt); -        frcti->mdev_us = MAX(1U, mdev); +        frcti->srtt_us     = MAX(1U, srtt); +        frcti->mdev_us     = MAX(1U, rttvar); +        frcti->rto         = srtt + (rttvar >> 2);  }  /* Returns 0 when idx contains a packet for the application. */ @@ -339,35 +338,39 @@ static int __frcti_rcv(struct frcti *       frcti,          if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {                  /* Inactive receiver, check for DRF. */                  if (pci->flags & FRCT_DRF) /* New run. */ -                        rcv_cr->seqno = seqno; +                        rcv_cr->lwe = seqno;                  else                          goto drop_packet;          } -        if (seqno == rcv_cr->seqno) { -                ++rcv_cr->seqno; +        if (seqno == rcv_cr->lwe) { +                ++rcv_cr->lwe;          } else { /* Out of order. */ -                if (before(seqno, rcv_cr->seqno)) +                if (before(seqno, rcv_cr->lwe) )                          goto drop_packet;                  if (rcv_cr->cflags & FRCTFRTX) {                          size_t pos = seqno & (RQ_SIZE - 1); -                        if ((seqno - rcv_cr->lwe) > RQ_SIZE /* Out of rq. */ -                            || frcti->rq[pos] != -1) /* Duplicate in rq. */ -                                goto drop_packet; +                        if ((seqno - rcv_cr->lwe) >= RQ_SIZE) +                                goto drop_packet; /* Out of rq. */ + +                        if (frcti->rq[pos] != -1) +                                goto drop_packet; /* Duplicate in rq */ +                          /* Queue. */                          frcti->rq[pos] = idx;                          ret = -EAGAIN;                  } else { -                        rcv_cr->seqno = seqno + 1; +                        rcv_cr->lwe = seqno + 1;                  }          }          if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) {                  uint32_t ackno = ntoh32(pci->ackno);                  /* Check for duplicate (old) acks. */ -                if ((int32_t)(ackno - snd_cr->lwe) >= 0) +                if ((int32_t)(ackno - snd_cr->lwe) > 0)                          snd_cr->lwe = ackno; +                  if (frcti->probe && after(ackno, frcti->rttseq)) {                          rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now));                          frcti->probe = false; @@ -378,7 +381,7 @@ static int __frcti_rcv(struct frcti *       frcti,          pthread_rwlock_unlock(&frcti->lock); -        if (!(pci->flags & FRCT_DATA)) +        if (ret == 0 && !(pci->flags & FRCT_DATA))                  shm_rdrbuff_remove(ai.rdrb, idx);          rxmwheel_move(); diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index ce7ef8e4..3f01a0d3 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -22,11 +22,11 @@  #include <ouroboros/list.h> -#define RXMQ_S     16                 /* defines #slots     */ -#define RXMQ_M     24                 /* defines max delay  */ -#define RXMQ_R     (RXMQ_M - RXMQ_S)  /* defines resolution */ +#define RXMQ_S     16                 /* defines #slots           */ +#define RXMQ_M     24                 /* defines max delay  (us)  */ +#define RXMQ_R     (RXMQ_M - RXMQ_S)  /* defines resolution (us)  */  #define RXMQ_SLOTS (1 << RXMQ_S) -#define RXMQ_MAX   (1 << RXMQ_M)      /* ms                 */ +#define RXMQ_MAX   (1 << RXMQ_M)      /* us                       */  /* Small inacurracy to avoid slow division by MILLION. */  #define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) @@ -119,12 +119,11 @@ static void check_probe(struct frcti * frcti,          if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {                  /* Backoff to avoid never updating rtt */ -                frcti->srtt_us <<= 1; +                frcti->srtt_us += frcti->mdev_us;                  frcti->probe = false;          }  } -#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1))  /* Return fd on r-timer expiry. */  static int rxmwheel_move(void)  { @@ -136,12 +135,17 @@ static int rxmwheel_move(void)          clock_gettime(PTHREAD_COND_CLOCK, &now); +        pthread_mutex_lock(&rw.lock); +          slot = ts_to_slot(now); -        pthread_mutex_lock(&rw.lock); +        i = rw.prv; -        for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { -                list_for_each_safe(p, h, &rw.wheel[i]) { +        if (slot < i) +                slot += RXMQ_SLOTS; + +        while (i++ < slot) { +                list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) {                          struct rxm *         r;                          struct frct_cr *     snd_cr;                          struct frct_cr *     rcv_cr; @@ -158,7 +162,7 @@ static int rxmwheel_move(void)                          snd_cr = &r->frcti->snd_cr;                          rcv_cr = &r->frcti->rcv_cr;                          /* Has been ack'd, remove. */ -                        if ((int) (r->seqno - snd_cr->lwe) <= 0) { +                        if ((int) (r->seqno - snd_cr->lwe) < 0) {                                  shm_du_buff_ack(r->sdb);                                  ipcp_sdb_release(r->sdb);                                  free(r); @@ -180,7 +184,7 @@ static int rxmwheel_move(void)                          /* Copy the payload, safe rtx in other layers. */                          if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { -                                /* FIXME: reschedule send? */ +                                /* FIXME: reschedule send instead of failing? */                                  int fd = r->frcti->fd;                                  pthread_mutex_unlock(&rw.lock);                                  shm_du_buff_ack(r->sdb); @@ -206,10 +210,12 @@ static int rxmwheel_move(void)                          /* Retransmit the copy. */                          if (shm_rbuff_write(f->tx_rb, idx)) { +                                int fd = r->frcti->fd; +                                pthread_mutex_unlock(&rw.lock);                                  ipcp_sdb_release(sdb);                                  free(r);                                  /* FIXME: reschedule send? */ -                                continue; +                                return fd;                          }                          shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); @@ -221,14 +227,14 @@ static int rxmwheel_move(void)                          r->tail = shm_du_buff_tail(sdb);                          r->sdb  = sdb; -                        newtime = ts_to_us(now) + rto(f->frcti); +                        newtime = ts_to_us(now) + f->frcti->rto;                          rslot   = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1);                          list_add_tail(&r->next, &rw.wheel[rslot]);                  }          } -        rw.prv = slot; +        rw.prv = slot & (RXMQ_SLOTS - 1);          pthread_mutex_unlock(&rw.lock); @@ -259,7 +265,7 @@ static int rxmwheel_add(struct frcti *       frcti,          r->tail  = shm_du_buff_tail(sdb);          r->frcti = frcti; -        slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1); +        slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1);          list_add_tail(&r->next, &rw.wheel[slot]);  | 
