From 26d4a6072cbf59708071dac8393c88ddacd69a37 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 14 Sep 2017 13:43:09 +0200 Subject: lib: Add reordering queue to FRCT This adds a reordering queue to FRCT so that SDUs can be delivered in-order when requested. --- src/lib/dev.c | 70 ++++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 13 deletions(-) (limited to 'src/lib/dev.c') diff --git a/src/lib/dev.c b/src/lib/dev.c index 47fec48d..14ee31f4 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -50,7 +51,8 @@ #define TW_ELEMENTS 6000 #define TW_RESOLUTION 1 /* ms */ -#define MPL 2000 /* ms */ +#define MPL 2000 /* ms */ +#define RQ_SIZE 20 #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME @@ -89,6 +91,8 @@ struct frcti { uint16_t conf_flags; + struct rq * rq; + pthread_rwlock_t lock; }; @@ -269,6 +273,12 @@ static int frcti_init(int fd) frcti->rcv_lwe = 0; frcti->rcv_rwe = 0; + frcti->conf_flags = 0; + + frcti->rq = rq_create(RQ_SIZE); + if (frcti->rq == NULL) + return -1; + return 0; } @@ -285,6 +295,8 @@ static void frcti_fini(int fd) */ frcti_clear(fd); + + rq_destroy(ai.frcti[fd].rq); } static int frcti_send(int fd, @@ -382,6 +394,25 @@ static ssize_t frcti_read(int fd) struct frcti * frcti; struct frct_pci pci; struct shm_du_buff * sdb; + uint64_t seqno; + bool nxt_pdu = true; + + frcti = &(ai.frcti[fd]); + + /* See if we already have the next PDU */ + pthread_rwlock_wrlock(&frcti->lock); + + if (!rq_is_empty(frcti->rq)) { + seqno = rq_peek(frcti->rq); + if (seqno == frcti->rcv_lwe) { + frcti->rcv_lwe++; + idx = rq_pop(frcti->rq); + pthread_rwlock_unlock(&frcti->lock); + return idx; + } + } + + pthread_rwlock_unlock(&frcti->lock); do { struct timespec now; @@ -390,7 +421,7 @@ static ssize_t frcti_read(int fd) struct shm_rbuff * rb; bool noblock; - clock_gettime(PTHREAD_COND_CLOCK, &now); + clock_gettime(CLOCK_REALTIME_COARSE, &now); pthread_rwlock_rdlock(&ai.lock); @@ -404,18 +435,16 @@ static ssize_t frcti_read(int fd) pthread_rwlock_unlock(&ai.lock); - if (noblock) + if (noblock) { idx = shm_rbuff_read(rb); - else + } else { idx = shm_rbuff_read_b(rb, abstime); + clock_gettime(CLOCK_REALTIME_COARSE, &now); + } if (idx < 0) return idx; - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - frcti = &(ai.frcti[fd]); - sdb = shm_rdrbuff_get(ai.rdrb, idx); pthread_rwlock_wrlock(&frcti->lock); @@ -432,10 +461,11 @@ static ssize_t frcti_read(int fd) ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL) frcti->rcv_drf = true; - /* We don't accept packets when there is receiver inactivity. */ + /* When there is receiver inactivity queue the packet. */ if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { + if (rq_push(frcti->rq, pci.seqno, idx)) + shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&frcti->lock); - shm_rdrbuff_remove(ai.rdrb, idx); return -EAGAIN; } @@ -451,12 +481,26 @@ static ssize_t frcti_read(int fd) frcti->last_rcv = now; - pthread_rwlock_unlock(&frcti->lock); + nxt_pdu = true; - if (!(pci.type & PDU_TYPE_DATA)) + if (!(pci.type & PDU_TYPE_DATA)) { shm_rdrbuff_remove(ai.rdrb, idx); + nxt_pdu = false; + } + + if (frcti->conf_flags & FRCTFORDERING) { + if (pci.seqno != frcti->rcv_lwe) { + if (rq_push(frcti->rq, pci.seqno, idx)) + shm_rdrbuff_remove(ai.rdrb, idx); + nxt_pdu = false; + } else { + frcti->rcv_lwe++; + } + } + + pthread_rwlock_unlock(&frcti->lock); - } while (!(pci.type & PDU_TYPE_DATA)); + } while (!nxt_pdu); return idx; } -- cgit v1.2.3