diff options
| -rw-r--r-- | include/ouroboros/utils.h | 1 | ||||
| -rw-r--r-- | src/lib/frct.c | 75 | ||||
| -rw-r--r-- | src/lib/rxmwheel.c | 32 | 
3 files changed, 90 insertions, 18 deletions
diff --git a/include/ouroboros/utils.h b/include/ouroboros/utils.h index f5b6686f..d40a1783 100644 --- a/include/ouroboros/utils.h +++ b/include/ouroboros/utils.h @@ -28,6 +28,7 @@  #define MIN(a,b) (((a) < (b)) ? (a) : (b))  #define MAX(a,b) (((a) > (b)) ? (a) : (b)) +#define ABS(a)   ((a) > 0 ? (a) : -(a))  typedef struct {          uint8_t * data; diff --git a/src/lib/frct.c b/src/lib/frct.c index 02c101c8..bd08d86a 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -50,7 +50,11 @@ struct frcti {          time_t           a;          time_t           r; -        time_t           rto; /* ms */ +        time_t           srtt_us;     /* smoothed rtt */ +        time_t           mdev_us;     /* mdev         */ +        uint32_t         rttseq; +        struct timespec  t_probe;     /* probe time   */ +        bool             probe;       /* probe active */          struct frct_cr   snd_cr;          struct frct_cr   rcv_cr; @@ -110,8 +114,11 @@ static struct frcti * frcti_create(int fd)          frcti->snd_cr.inact  = 3 * delta_t;          frcti->snd_cr.act    = now.tv_sec - (frcti->snd_cr.inact + 1); -        /* Initial rto. FIXME: recalc using Karn algorithm. */ -        frcti->rto           = 120; +        /* rtt estimator. rto is currently srtt + 2 * mdev */ +        frcti->srtt_us       = 0;      /* updated on first ACK */ +        frcti->mdev_us       = 100000; /* initial rxm will be after 200 ms */ +        frcti->rttseq        = 0; +        frcti->probe         = false;          if (ai.flows[fd].spec.loss == 0) {                  frcti->snd_cr.cflags |= FRCTFRTX; @@ -200,6 +207,18 @@ static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)          return pci;  } +static bool before(uint32_t seq1, +                   uint32_t seq2) +{ +        return (int32_t)(seq1 - seq2) < 0; +} + +static bool after(uint32_t seq1, +                  uint32_t seq2) +{ +        return (int32_t)(seq2 - seq1) < 0; +} +  static int __frcti_snd(struct frcti *       frcti,                         struct shm_du_buff * sdb)  { @@ -219,7 +238,7 @@ static int __frcti_snd(struct frcti *       frcti,          if (pci == NULL)                  return -1; -        clock_gettime(CLOCK_REALTIME_COARSE, &now); +        clock_gettime(CLOCK_REALTIME, &now);          pthread_rwlock_wrlock(&frcti->lock); @@ -244,12 +263,20 @@ static int __frcti_snd(struct frcti *       frcti,          pci->seqno = hton32(snd_cr->seqno);          if (!(snd_cr->cflags & FRCTFRTX)) {                  snd_cr->lwe++; -        } else if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { -                rxmwheel_add(frcti, snd_cr->seqno, sdb); -                if (rcv_cr->lwe <= rcv_cr->seqno) { -                        pci->flags |= FRCT_ACK; -                        pci->ackno = hton32(rcv_cr->seqno); -                        rcv_cr->lwe = rcv_cr->seqno; +        } else { +                if (!frcti->probe) { +                        frcti->rttseq  = snd_cr->seqno; +                        frcti->t_probe = now; +                        frcti->probe   = true; +                } + +                if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { +                        rxmwheel_add(frcti, snd_cr->seqno, sdb); +                        if (rcv_cr->lwe <= rcv_cr->seqno) { +                                pci->flags |= FRCT_ACK; +                                pci->ackno = hton32(rcv_cr->seqno); +                                rcv_cr->lwe = rcv_cr->seqno; +                        }                  }          } @@ -261,6 +288,26 @@ static int __frcti_snd(struct frcti *       frcti,          return 0;  } +static void rtt_estimator(struct frcti * frcti, +                          time_t         mrtt_us) +{ +        time_t srtt = frcti->srtt_us; +        time_t mdev = frcti->mdev_us; + +        if (srtt != 0) { +                srtt -= (srtt >> 3); +                srtt += mrtt_us >> 3;   /* rtt = 7/8 rtt + 1/8 new */ +                mdev -=  (mdev >> 2); +                mdev += ABS(srtt - mrtt_us) >> 2; +        } else { +                srtt = mrtt_us << 3;    /* take the measured time to be rtt */ +                mdev = mrtt_us >> 1;    /* take half mrtt_us as deviation */ +        } + +        frcti->srtt_us = MAX(1U, srtt); +        frcti->mdev_us = MAX(1U, mdev); +} +  /* Returns 0 when idx contains a packet for the application. */  static int __frcti_rcv(struct frcti *       frcti,                         struct shm_du_buff * sdb) @@ -280,7 +327,7 @@ static int __frcti_rcv(struct frcti *       frcti,          pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); -        clock_gettime(CLOCK_REALTIME_COARSE, &now); +        clock_gettime(CLOCK_REALTIME, &now);          pthread_rwlock_wrlock(&frcti->lock); @@ -300,7 +347,7 @@ static int __frcti_rcv(struct frcti *       frcti,          if (seqno == rcv_cr->seqno) {                  ++rcv_cr->seqno;          } else { /* Out of order. */ -                if ((int32_t)(seqno - rcv_cr->seqno) < 0) +                if (before(seqno, rcv_cr->seqno))                          goto drop_packet;                  if (rcv_cr->cflags & FRCTFRTX) { @@ -321,6 +368,10 @@ static int __frcti_rcv(struct frcti *       frcti,                  /* Check for duplicate (old) acks. */                  if ((int32_t)(ackno - snd_cr->lwe) >= 0)                          snd_cr->lwe = ackno; +                if (frcti->probe && after(ackno, frcti->rttseq)) { +                        rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now)); +                        frcti->probe = false; +                }          }          rcv_cr->act = now.tv_sec; diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 395a5d8f..56ac7298 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -22,15 +22,16 @@  #include <ouroboros/list.h> -#define RXMQ_S     12                 /* defines #slots     */ -#define RXMQ_M     15                 /* defines max delay  */ +#define RXMQ_S     16                 /* defines #slots     */ +#define RXMQ_M     24                 /* defines max delay  */  #define RXMQ_R     (RXMQ_M - RXMQ_S)  /* defines resolution */  #define RXMQ_SLOTS (1 << RXMQ_S)  #define RXMQ_MAX   (1 << RXMQ_M)      /* ms                 */  /* Small inacurracy to avoid slow division by MILLION. */  #define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) -#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) +#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10)) +#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))  struct rxm {          struct list_head     next; @@ -109,6 +110,21 @@ static void rxmwheel_clear(int fd)          pthread_mutex_unlock(&rw.lock);  } +static void check_probe(struct frcti * frcti, +                        uint32_t       seqno) +{ +        /* disable rtt probe if this packet */ + +        /* TODO: This should be locked, but lock reversal! */ + +        if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { +                /* Backoff to avoid never updating rtt */ +                frcti->srtt_us <<= 1; +                frcti->probe = false; +        } +} + +#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1))  /* Return fd on r-timer expiry. */  static int rxmwheel_move(void)  { @@ -148,6 +164,10 @@ static int rxmwheel_move(void)                                  free(r);                                  continue;                          } + +                        /* Disable using this seqno as rto probe. */ +                        check_probe(r->frcti, r->seqno); +                          /* Check for r-timer expiry. */                          if (ts_to_ms(now) - r->t0 > r->frcti->r) {                                  int fd = r->frcti->fd; @@ -201,7 +221,7 @@ static int rxmwheel_move(void)                          r->tail = shm_du_buff_tail(sdb);                          r->sdb  = sdb; -                        newtime = ts_to_ms(now) + (f->frcti->rto << ++r->mul); +                        newtime = ts_to_us(now) + rto(f->frcti);                          rslot   = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1);                          list_add_tail(&r->next, &rw.wheel[rslot]); @@ -231,7 +251,7 @@ static int rxmwheel_add(struct frcti *       frcti,          pthread_mutex_lock(&rw.lock); -        r->t0    = ts_to_ms(now); +        r->t0    = ts_to_us(now);          r->mul   = 0;          r->seqno = seqno;          r->sdb   = sdb; @@ -239,7 +259,7 @@ static int rxmwheel_add(struct frcti *       frcti,          r->tail  = shm_du_buff_tail(sdb);          r->frcti = frcti; -        slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1); +        slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1);          list_add_tail(&r->next, &rw.wheel[slot]);  | 
