diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/frct.c | 109 | 
1 files changed, 55 insertions, 54 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index 2eb79fb4..395fa2d8 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -56,7 +56,7 @@ struct frcti {          struct frct_cr   snd_cr;          struct frct_cr   rcv_cr; -        size_t           rq[RQ_SIZE]; +        ssize_t          rq[RQ_SIZE];          struct timespec  rtt; @@ -107,20 +107,25 @@ static void frct_fini(void)  static struct frcti * frcti_create(int fd)  { -        struct frcti * frcti; -        time_t         delta_t; -        ssize_t        idx; +        struct frcti *  frcti; +        time_t          delta_t; +        ssize_t         idx; +        struct timespec now;          frcti = malloc(sizeof(*frcti));          if (frcti == NULL)                  goto fail_malloc; +        memset(frcti, 0, sizeof(*frcti)); +          if (pthread_rwlock_init(&frcti->lock, NULL))                  goto fail_lock;          for (idx = 0; idx < RQ_SIZE; ++idx)                  frcti->rq[idx] = -1; +        clock_gettime(CLOCK_REALTIME_COARSE, &now); +          frcti->mpl = DELT_MPL;          frcti->a   = DELT_A;          frcti->r   = DELT_R; @@ -128,23 +133,12 @@ static struct frcti * frcti_create(int fd)          delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; -        frcti->snd_cr.drf    = true;          frcti->snd_cr.conf   = true; -#ifdef OUROBOROS_CONFIG_DEBUG -        frcti->snd_cr.seqno  = 0; -#else -        random_buffer(&frcti->snd_cr.seqno, sizeof(frcti->snd_cr.seqno)); -#endif -        frcti->snd_cr.lwe    = 0; -        frcti->snd_cr.rwe    = 0; -        frcti->snd_cr.cflags = 0;          frcti->snd_cr.inact  = 3 * delta_t + 1; +        frcti->snd_cr.act    = now.tv_sec - (frcti->snd_cr.inact + 1); -        frcti->rcv_cr.drf    = true; -        frcti->rcv_cr.lwe    = 0; -        frcti->rcv_cr.rwe    = 0; -        frcti->rcv_cr.cflags = 0;          frcti->rcv_cr.inact  = 2 * delta_t + 1; +        frcti->rcv_cr.act    = now.tv_sec - (frcti->rcv_cr.inact + 1);          return frcti; @@ -226,10 +220,8 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)                  sdb = shm_rdrbuff_get(ai.rdrb, idx);                  pci = (struct frct_pci *) shm_du_buff_head(sdb) - 1; -                if (pci->flags & FRCT_CFG) { -                        assert(pci->flags & FRCT_DRF); +                if (pci->flags & FRCT_CFG)                          frcti->rcv_cr.cflags = pci->cflags; -                }                  ++frcti->rcv_cr.lwe;                  frcti->rq[pos] = -1; @@ -286,23 +278,8 @@ static int __frcti_snd(struct frcti *       frcti,          pthread_rwlock_wrlock(&frcti->lock); -        /* Check if sender is inactive. */ -        if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact) -                snd_cr->drf = true; -          pci->flags |= FRCT_DATA; -        /* Set the DRF in the first packet of a new run of SDUs. */ -        if (snd_cr->drf) { -                pci->flags |= FRCT_DRF; -                if (snd_cr->conf) { -                        pci->flags |= FRCT_CFG; -                        pci->cflags = snd_cr->cflags; -                } -        } - -        pci->seqno = hton32(snd_cr->seqno++); -          if (snd_cr->cflags & FRCTFERRCHCK) {                  uint8_t * tail = shm_du_buff_tail_alloc(sdb, FRCT_CRCLEN);                  if (tail == NULL) { @@ -315,9 +292,33 @@ static int __frcti_snd(struct frcti *       frcti,                  pci->flags |= FRCT_CRC;          } -        snd_cr->act = now.tv_sec; +        /* Set DRF if there are no unacknowledged packets. */ +        if (snd_cr->seqno == snd_cr->lwe) +                pci->flags |= FRCT_DRF; + +        if (snd_cr->conf) { +                /* FIXME: This packet must be acked! */ +                pci->flags |= FRCT_CFG; +                pci->cflags = snd_cr->cflags; +        } + +        /* Choose a new sequence number if sender inactivity expired. */ +        if (now.tv_sec - snd_cr->act > snd_cr->inact) { +                /* There are no unacknowledged packets. */ +                assert(snd_cr->seqno == snd_cr->lwe); +#ifdef OUROBOROS_CONFIG_DEBUG +                frcti->snd_cr.seqno = 0; +#else +                random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); +#endif +                frcti->snd_cr.lwe = frcti->snd_cr.seqno; +        } + +        pci->seqno = hton32(snd_cr->seqno++); +        if (!(snd_cr->cflags & FRCTFRTX)) +                snd_cr->lwe++; -        snd_cr->drf  = false; +        snd_cr->act  = now.tv_sec;          snd_cr->conf = false;          pthread_rwlock_unlock(&frcti->lock); @@ -354,13 +355,10 @@ static int __frcti_rcv(struct frcti *       frcti,                          goto drop_packet;          } -        /* Check if receiver inactivity is true. */ -        if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact) -                rcv_cr->drf = true; -          seqno = ntoh32(pci->seqno); -        if (rcv_cr->drf) { +        /* Check if receiver inactivity is true. */ +        if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {                  /* Inactive receiver, check for DRF. */                  if (pci->flags & FRCT_DRF) /* New run. */                          rcv_cr->lwe = seqno; @@ -368,24 +366,27 @@ static int __frcti_rcv(struct frcti *       frcti,                          goto drop_packet;          } -        /* Queue the PDU if needed. */ -        if (rcv_cr->cflags & FRCTFORDERING) { -                if (seqno < rcv_cr->lwe || seqno > rcv_cr->lwe + RQ_SIZE) +        if (seqno == rcv_cr->lwe) { +                ++rcv_cr->lwe; +                /* Check for online reconfiguration. */ +                if (pci->flags & FRCT_CFG) +                        rcv_cr->cflags = pci->cflags; +        } else { /* Out of order. */ +                if (seqno < rcv_cr->lwe) /* Duplicate. */                          goto drop_packet; -                if (seqno == rcv_cr->lwe) { -                        ++rcv_cr->lwe; -                        /* Check for online reconfiguration. */ -                        if (pci->flags & FRCT_CFG) { -                                assert(pci->flags & FRCT_DRF); -                                rcv_cr->cflags = pci->cflags; -                        } +                if (rcv_cr->cflags & FRCTFRTX) { +                        size_t pos = seqno & (RQ_SIZE - 1); +                        if (seqno > rcv_cr->lwe + RQ_SIZE || /* Out of range. */ +                            frcti->rq[pos] != -1) /* Duplicate in rq. */ +                                goto drop_packet; +                        /* Queue. */ +                        frcti->rq[pos] = idx;                  } else { -                        frcti->rq[seqno & (RQ_SIZE - 1)] = idx; +                        rcv_cr->lwe = seqno;                  }          } -        rcv_cr->drf = false;          rcv_cr->act = now.tv_sec;          if (!(pci->flags & FRCT_DATA))  | 
