From 635ffc5c1c8da07f3f34218280d6a25a29c78bc7 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Tue, 5 Jun 2018 13:39:40 +0200 Subject: lib: Simplify delta-t logic This revises the delta-t implementation to align with Watson's timer specifications. FRCT will never deliver out-of-order packets. A raw flow (without delta-t state machine) will be able to provide such a service. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- doc/man/fccntl.3 | 2 - include/ouroboros/fccntl.h | 3 +- src/lib/frct.c | 109 +++++++++++++++++++++++---------------------- 3 files changed, 56 insertions(+), 58 deletions(-) diff --git a/doc/man/fccntl.3 b/doc/man/fccntl.3 index bec506ec..543a6bb6 100644 --- a/doc/man/fccntl.3 +++ b/doc/man/fccntl.3 @@ -83,8 +83,6 @@ argument. Supported flags are: \fIFRCTFERRCHCK\fR - enable checksum (CRC32). -\fIFRCTFORDERING\fR - enable packet in-order delivery. - \fIFRCTFPARTIAL\fR - enable partial delivery. .RE diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h index 5c3fc064..bc7c7206 100644 --- a/include/ouroboros/fccntl.h +++ b/include/ouroboros/fccntl.h @@ -48,8 +48,7 @@ #define FRCTFRESCNTRL 00000001 /* Feedback from receiver */ #define FRCTFRTX 00000002 /* Reliable flow */ #define FRCTFERRCHCK 00000004 /* Check for errors */ -#define FRCTFORDERING 00000010 /* Ordered delivery */ -#define FRCTFPARTIAL 00000020 /* Allow partial delivery */ +#define FRCTFPARTIAL 00000010 /* Allow partial delivery */ /* Flow operations */ #define FLOWSRCVTIMEO 00000001 /* Set read timeout */ diff --git a/src/lib/frct.c b/src/lib/frct.c index 2eb79fb4..395fa2d8 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -56,7 +56,7 @@ struct frcti { struct frct_cr snd_cr; struct frct_cr rcv_cr; - size_t rq[RQ_SIZE]; + ssize_t rq[RQ_SIZE]; struct timespec rtt; @@ -107,20 +107,25 @@ static void frct_fini(void) static struct frcti * frcti_create(int fd) { - struct frcti * frcti; - time_t delta_t; - ssize_t idx; + struct frcti * frcti; + time_t delta_t; + ssize_t idx; + struct timespec now; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) goto fail_malloc; + memset(frcti, 0, sizeof(*frcti)); + if (pthread_rwlock_init(&frcti->lock, NULL)) goto fail_lock; for (idx = 0; idx < RQ_SIZE; ++idx) frcti->rq[idx] = -1; + clock_gettime(CLOCK_REALTIME_COARSE, &now); + frcti->mpl = DELT_MPL; frcti->a = DELT_A; frcti->r = DELT_R; @@ -128,23 +133,12 @@ static struct frcti * frcti_create(int fd) delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; - frcti->snd_cr.drf = true; frcti->snd_cr.conf = true; -#ifdef OUROBOROS_CONFIG_DEBUG - frcti->snd_cr.seqno = 0; -#else - random_buffer(&frcti->snd_cr.seqno, sizeof(frcti->snd_cr.seqno)); -#endif - frcti->snd_cr.lwe = 0; - frcti->snd_cr.rwe = 0; - frcti->snd_cr.cflags = 0; frcti->snd_cr.inact = 3 * delta_t + 1; + frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - frcti->rcv_cr.drf = true; - frcti->rcv_cr.lwe = 0; - frcti->rcv_cr.rwe = 0; - frcti->rcv_cr.cflags = 0; frcti->rcv_cr.inact = 2 * delta_t + 1; + frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); return frcti; @@ -226,10 +220,8 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) sdb = shm_rdrbuff_get(ai.rdrb, idx); pci = (struct frct_pci *) shm_du_buff_head(sdb) - 1; - if (pci->flags & FRCT_CFG) { - assert(pci->flags & FRCT_DRF); + if (pci->flags & FRCT_CFG) frcti->rcv_cr.cflags = pci->cflags; - } ++frcti->rcv_cr.lwe; frcti->rq[pos] = -1; @@ -286,23 +278,8 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_wrlock(&frcti->lock); - /* Check if sender is inactive. */ - if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact) - snd_cr->drf = true; - pci->flags |= FRCT_DATA; - /* Set the DRF in the first packet of a new run of SDUs. */ - if (snd_cr->drf) { - pci->flags |= FRCT_DRF; - if (snd_cr->conf) { - pci->flags |= FRCT_CFG; - pci->cflags = snd_cr->cflags; - } - } - - pci->seqno = hton32(snd_cr->seqno++); - if (snd_cr->cflags & FRCTFERRCHCK) { uint8_t * tail = shm_du_buff_tail_alloc(sdb, FRCT_CRCLEN); if (tail == NULL) { @@ -315,9 +292,33 @@ static int __frcti_snd(struct frcti * frcti, pci->flags |= FRCT_CRC; } - snd_cr->act = now.tv_sec; + /* Set DRF if there are no unacknowledged packets. */ + if (snd_cr->seqno == snd_cr->lwe) + pci->flags |= FRCT_DRF; + + if (snd_cr->conf) { + /* FIXME: This packet must be acked! */ + pci->flags |= FRCT_CFG; + pci->cflags = snd_cr->cflags; + } + + /* Choose a new sequence number if sender inactivity expired. */ + if (now.tv_sec - snd_cr->act > snd_cr->inact) { + /* There are no unacknowledged packets. */ + assert(snd_cr->seqno == snd_cr->lwe); +#ifdef OUROBOROS_CONFIG_DEBUG + frcti->snd_cr.seqno = 0; +#else + random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); +#endif + frcti->snd_cr.lwe = frcti->snd_cr.seqno; + } + + pci->seqno = hton32(snd_cr->seqno++); + if (!(snd_cr->cflags & FRCTFRTX)) + snd_cr->lwe++; - snd_cr->drf = false; + snd_cr->act = now.tv_sec; snd_cr->conf = false; pthread_rwlock_unlock(&frcti->lock); @@ -354,13 +355,10 @@ static int __frcti_rcv(struct frcti * frcti, goto drop_packet; } - /* Check if receiver inactivity is true. */ - if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact) - rcv_cr->drf = true; - seqno = ntoh32(pci->seqno); - if (rcv_cr->drf) { + /* Check if receiver inactivity is true. */ + if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { /* Inactive receiver, check for DRF. */ if (pci->flags & FRCT_DRF) /* New run. */ rcv_cr->lwe = seqno; @@ -368,24 +366,27 @@ static int __frcti_rcv(struct frcti * frcti, goto drop_packet; } - /* Queue the PDU if needed. */ - if (rcv_cr->cflags & FRCTFORDERING) { - if (seqno < rcv_cr->lwe || seqno > rcv_cr->lwe + RQ_SIZE) + if (seqno == rcv_cr->lwe) { + ++rcv_cr->lwe; + /* Check for online reconfiguration. */ + if (pci->flags & FRCT_CFG) + rcv_cr->cflags = pci->cflags; + } else { /* Out of order. */ + if (seqno < rcv_cr->lwe) /* Duplicate. */ goto drop_packet; - if (seqno == rcv_cr->lwe) { - ++rcv_cr->lwe; - /* Check for online reconfiguration. */ - if (pci->flags & FRCT_CFG) { - assert(pci->flags & FRCT_DRF); - rcv_cr->cflags = pci->cflags; - } + if (rcv_cr->cflags & FRCTFRTX) { + size_t pos = seqno & (RQ_SIZE - 1); + if (seqno > rcv_cr->lwe + RQ_SIZE || /* Out of range. */ + frcti->rq[pos] != -1) /* Duplicate in rq. */ + goto drop_packet; + /* Queue. */ + frcti->rq[pos] = idx; } else { - frcti->rq[seqno & (RQ_SIZE - 1)] = idx; + rcv_cr->lwe = seqno; } } - rcv_cr->drf = false; rcv_cr->act = now.tv_sec; if (!(pci->flags & FRCT_DATA)) -- cgit v1.2.3