From 26d4a6072cbf59708071dac8393c88ddacd69a37 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 14 Sep 2017 13:43:09 +0200 Subject: lib: Add reordering queue to FRCT This adds a reordering queue to FRCT so that SDUs can be delivered in-order when requested. --- include/ouroboros/rq.h | 47 +++++++++++++ src/lib/CMakeLists.txt | 1 + src/lib/dev.c | 70 +++++++++++++++---- src/lib/rq.c | 157 +++++++++++++++++++++++++++++++++++++++++++ src/lib/tests/CMakeLists.txt | 1 + src/lib/tests/rq_test.c | 115 +++++++++++++++++++++++++++++++ 6 files changed, 378 insertions(+), 13 deletions(-) create mode 100644 include/ouroboros/rq.h create mode 100644 src/lib/rq.c create mode 100644 src/lib/tests/rq_test.c diff --git a/include/ouroboros/rq.h b/include/ouroboros/rq.h new file mode 100644 index 00000000..7c024c11 --- /dev/null +++ b/include/ouroboros/rq.h @@ -0,0 +1,47 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Reordering queue + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_RQ_H +#define OUROBOROS_LIB_RQ_H + +#include +#include +#include + +struct rq * rq_create(int size); + +void rq_destroy(struct rq * rq); + +int rq_push(struct rq * rq, + uint64_t seqno, + size_t idx); + +uint64_t rq_peek(struct rq * rq); + +bool rq_is_empty(struct rq * rq); + +size_t rq_pop(struct rq * rq); + +bool rq_has(struct rq * rq, + uint64_t seqno); + +#endif /* OUROBOROS_LIB_RQ_H */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 5a09c52e..fd7ece83 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -163,6 +163,7 @@ set(SOURCE_FILES qos.c qoscube.c random.c + rq.c sha3.c shm_flow_set.c shm_rbuff.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 47fec48d..14ee31f4 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -50,7 +51,8 @@ #define TW_ELEMENTS 6000 #define TW_RESOLUTION 1 /* ms */ -#define MPL 2000 /* ms */ +#define MPL 2000 /* ms */ +#define RQ_SIZE 20 #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME @@ -89,6 +91,8 @@ struct frcti { uint16_t conf_flags; + struct rq * rq; + pthread_rwlock_t lock; }; @@ -269,6 +273,12 @@ static int frcti_init(int fd) frcti->rcv_lwe = 0; frcti->rcv_rwe = 0; + frcti->conf_flags = 0; + + frcti->rq = rq_create(RQ_SIZE); + if (frcti->rq == NULL) + return -1; + return 0; } @@ -285,6 +295,8 @@ static void frcti_fini(int fd) */ frcti_clear(fd); + + rq_destroy(ai.frcti[fd].rq); } static int frcti_send(int fd, @@ -382,6 +394,25 @@ static ssize_t frcti_read(int fd) struct frcti * frcti; struct frct_pci pci; struct shm_du_buff * sdb; + uint64_t seqno; + bool nxt_pdu = true; + + frcti = &(ai.frcti[fd]); + + /* See if we already have the next PDU */ + pthread_rwlock_wrlock(&frcti->lock); + + if (!rq_is_empty(frcti->rq)) { + seqno = rq_peek(frcti->rq); + if (seqno == frcti->rcv_lwe) { + frcti->rcv_lwe++; + idx = rq_pop(frcti->rq); + pthread_rwlock_unlock(&frcti->lock); + return idx; + } + } + + pthread_rwlock_unlock(&frcti->lock); do { struct timespec now; @@ -390,7 +421,7 @@ static ssize_t frcti_read(int fd) struct shm_rbuff * rb; bool noblock; - clock_gettime(PTHREAD_COND_CLOCK, &now); + clock_gettime(CLOCK_REALTIME_COARSE, &now); pthread_rwlock_rdlock(&ai.lock); @@ -404,18 +435,16 @@ static ssize_t frcti_read(int fd) pthread_rwlock_unlock(&ai.lock); - if (noblock) + if (noblock) { idx = shm_rbuff_read(rb); - else + } else { idx = shm_rbuff_read_b(rb, abstime); + clock_gettime(CLOCK_REALTIME_COARSE, &now); + } if (idx < 0) return idx; - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - frcti = &(ai.frcti[fd]); - sdb = shm_rdrbuff_get(ai.rdrb, idx); pthread_rwlock_wrlock(&frcti->lock); @@ -432,10 +461,11 @@ static ssize_t frcti_read(int fd) ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL) frcti->rcv_drf = true; - /* We don't accept packets when there is receiver inactivity. */ + /* When there is receiver inactivity queue the packet. */ if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { + if (rq_push(frcti->rq, pci.seqno, idx)) + shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&frcti->lock); - shm_rdrbuff_remove(ai.rdrb, idx); return -EAGAIN; } @@ -451,12 +481,26 @@ static ssize_t frcti_read(int fd) frcti->last_rcv = now; - pthread_rwlock_unlock(&frcti->lock); + nxt_pdu = true; - if (!(pci.type & PDU_TYPE_DATA)) + if (!(pci.type & PDU_TYPE_DATA)) { shm_rdrbuff_remove(ai.rdrb, idx); + nxt_pdu = false; + } + + if (frcti->conf_flags & FRCTFORDERING) { + if (pci.seqno != frcti->rcv_lwe) { + if (rq_push(frcti->rq, pci.seqno, idx)) + shm_rdrbuff_remove(ai.rdrb, idx); + nxt_pdu = false; + } else { + frcti->rcv_lwe++; + } + } + + pthread_rwlock_unlock(&frcti->lock); - } while (!(pci.type & PDU_TYPE_DATA)); + } while (!nxt_pdu); return idx; } diff --git a/src/lib/rq.c b/src/lib/rq.c new file mode 100644 index 00000000..bd0594b5 --- /dev/null +++ b/src/lib/rq.c @@ -0,0 +1,157 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Reordering queue + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include + +#include + +struct pdu { + uint64_t seqno; + size_t idx; +}; + +struct rq { + struct pdu * items; + int n_items; + int size; +}; + +struct rq * rq_create(int size) +{ + struct rq * rq; + + rq = malloc(sizeof(*rq)); + if (rq == NULL) + return NULL; + + rq->items = malloc(sizeof(struct pdu) * (size + 1)); + if (rq->items == NULL) { + free(rq); + return NULL; + } + + rq->size = size; + rq->n_items = 0; + + return rq; +} + +void rq_destroy(struct rq * rq) +{ + assert(rq); + + free(rq->items); + free(rq); +} + +int rq_push(struct rq * rq, + uint64_t seqno, + size_t idx) +{ + int i; + int j; + + assert(rq); + + /* Queue is full. */ + if (rq->n_items == rq->size) + return -1; + + i = ++rq->n_items; + j = i / 2; + while (i > 1 && rq->items[j].seqno > seqno) { + rq->items[i] = rq->items[j]; + i = j; + j = j / 2; + } + + rq->items[i].seqno = seqno; + rq->items[i].idx = idx; + + return 0; +} + +uint64_t rq_peek(struct rq * rq) +{ + assert(rq); + + return rq->items[1].seqno; +} + +bool rq_is_empty(struct rq * rq) +{ + assert(rq); + + return (rq->n_items == 0); +} + +size_t rq_pop(struct rq * rq) +{ + size_t idx; + int i; + int j; + int k; + + assert(rq); + + idx = rq->items[1].idx; + + rq->items[1] = rq->items[rq->n_items]; + rq->n_items--; + + i = 1; + while (true) { + k = i; + j = 2 * i; + + if (j <= rq->n_items && rq->items[j].seqno < rq->items[k].seqno) + k = j; + + if (j + 1 <= rq->n_items && + rq->items[j + 1].seqno < rq->items[k].seqno) + k = j + 1; + + if (k == i) + break; + + rq->items[i] = rq->items[k]; + i = k; + } + + rq->items[i] = rq->items[rq->n_items + 1]; + + return idx; +} + +bool rq_has(struct rq * rq, + uint64_t seqno) +{ + int i; + + assert(rq); + + for (i = 1; i <= rq->n_items; i++) + if (rq->items[i].seqno == seqno) + return true; + + return false; +} diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index a93bf321..0edd4a42 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -14,6 +14,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c crc32_test.c hashtable_test.c md5_test.c + rq_test.c sha3_test.c time_utils_test.c ${TIMERWHEEL_TEST} diff --git a/src/lib/tests/rq_test.c b/src/lib/tests/rq_test.c new file mode 100644 index 00000000..e2d0f435 --- /dev/null +++ b/src/lib/tests/rq_test.c @@ -0,0 +1,115 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Reordering queue test + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include + +#include + +#define Q_SIZE 5 + +int rq_test(int argc, + char ** argv) +{ + struct rq * q; + int i; + + (void) argc; + (void) argv; + + q = rq_create(Q_SIZE); + if (q == NULL) { + printf("Failed to create.\n"); + return -1; + } + + if (rq_push(q, 1, 1)) { + printf("Failed to insert.\n"); + return -1; + } + + if (!rq_has(q, 1)) { + printf("Inserted item not present.\n"); + return -1; + } + + if (rq_peek(q) != 1) { + printf("Inserted item not present.\n"); + return -1; + } + + if (rq_pop(q) != 1) { + printf("Bad pop.\n"); + return -1; + } + + if (rq_push(q, 3, 5)) { + printf("Failed to insert.\n"); + return -1; + } + + if (rq_push(q, 1, 3)) { + printf("Failed to insert.\n"); + return -1; + } + + if (rq_push(q, 2, 7)) { + printf("Failed to insert.\n"); + return -1; + } + + if (!rq_has(q, 3)) { + printf("Inserted item not present.\n"); + return -1; + } + + if (rq_has(q, 4)) { + printf("Item present that was not inserted.\n"); + return -1; + } + + if (rq_peek(q) != 1) { + printf("Inserted item not present.\n"); + return -1; + } + + if (rq_pop(q) != 3) { + printf("Bad pop.\n"); + return -1; + } + + if (rq_peek(q) != 2) { + printf("Inserted item not present.\n"); + return -1; + } + + if (rq_pop(q) != 7) { + printf("Bad pop.\n"); + return -1; + } + + for (i = 0; i < Q_SIZE + 1; i++) + rq_push(q, i, i); + + rq_destroy(q); + + return 0; +} -- cgit v1.2.3