diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/lib/dev.c | 70 | ||||
| -rw-r--r-- | src/lib/rq.c | 157 | ||||
| -rw-r--r-- | src/lib/tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/lib/tests/rq_test.c | 115 | 
5 files changed, 331 insertions, 13 deletions
| diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 5a09c52e..fd7ece83 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -163,6 +163,7 @@ set(SOURCE_FILES    qos.c    qoscube.c    random.c +  rq.c    sha3.c    shm_flow_set.c    shm_rbuff.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 47fec48d..14ee31f4 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -39,6 +39,7 @@  #include <ouroboros/qoscube.h>  #include <ouroboros/timerwheel.h>  #include <ouroboros/frct_pci.h> +#include <ouroboros/rq.h>  #include <stdlib.h>  #include <string.h> @@ -50,7 +51,8 @@  #define TW_ELEMENTS   6000  #define TW_RESOLUTION 1   /* ms */ -#define MPL 2000 /* ms */ +#define MPL            2000 /* ms */ +#define RQ_SIZE        20  #ifndef CLOCK_REALTIME_COARSE  #define CLOCK_REALTIME_COARSE CLOCK_REALTIME @@ -89,6 +91,8 @@ struct frcti {          uint16_t         conf_flags; +        struct rq *      rq; +          pthread_rwlock_t lock;  }; @@ -269,6 +273,12 @@ static int frcti_init(int fd)          frcti->rcv_lwe = 0;          frcti->rcv_rwe = 0; +        frcti->conf_flags = 0; + +        frcti->rq = rq_create(RQ_SIZE); +        if (frcti->rq == NULL) +                return -1; +          return 0;  } @@ -285,6 +295,8 @@ static void frcti_fini(int fd)           */          frcti_clear(fd); + +        rq_destroy(ai.frcti[fd].rq);  }  static int frcti_send(int                  fd, @@ -382,6 +394,25 @@ static ssize_t frcti_read(int fd)          struct frcti *       frcti;          struct frct_pci      pci;          struct shm_du_buff * sdb; +        uint64_t             seqno; +        bool                 nxt_pdu = true; + +        frcti = &(ai.frcti[fd]); + +        /* See if we already have the next PDU */ +        pthread_rwlock_wrlock(&frcti->lock); + +        if (!rq_is_empty(frcti->rq)) { +                seqno = rq_peek(frcti->rq); +                if (seqno == frcti->rcv_lwe) { +                        frcti->rcv_lwe++; +                        idx = rq_pop(frcti->rq); +                        pthread_rwlock_unlock(&frcti->lock); +                        return idx; +                } +        } + +        pthread_rwlock_unlock(&frcti->lock);          do {                  struct timespec    now; @@ -390,7 +421,7 @@ static ssize_t frcti_read(int fd)                  struct shm_rbuff * rb;                  bool               noblock; -                clock_gettime(PTHREAD_COND_CLOCK, &now); +                clock_gettime(CLOCK_REALTIME_COARSE, &now);                  pthread_rwlock_rdlock(&ai.lock); @@ -404,18 +435,16 @@ static ssize_t frcti_read(int fd)                  pthread_rwlock_unlock(&ai.lock); -                if (noblock) +                if (noblock) {                          idx = shm_rbuff_read(rb); -                else +                } else {                          idx = shm_rbuff_read_b(rb, abstime); +                        clock_gettime(CLOCK_REALTIME_COARSE, &now); +                }                  if (idx < 0)                          return idx; -                clock_gettime(CLOCK_REALTIME_COARSE, &now); - -                frcti = &(ai.frcti[fd]); -                  sdb = shm_rdrbuff_get(ai.rdrb, idx);                  pthread_rwlock_wrlock(&frcti->lock); @@ -432,10 +461,11 @@ static ssize_t frcti_read(int fd)                      ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)                          frcti->rcv_drf = true; -                /* We don't accept packets when there is receiver inactivity. */ +                /* When there is receiver inactivity queue the packet. */                  if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { +                        if (rq_push(frcti->rq, pci.seqno, idx)) +                                shm_rdrbuff_remove(ai.rdrb, idx);                          pthread_rwlock_unlock(&frcti->lock); -                        shm_rdrbuff_remove(ai.rdrb, idx);                          return -EAGAIN;                  } @@ -451,12 +481,26 @@ static ssize_t frcti_read(int fd)                  frcti->last_rcv = now; -                pthread_rwlock_unlock(&frcti->lock); +                nxt_pdu = true; -                if (!(pci.type & PDU_TYPE_DATA)) +                if (!(pci.type & PDU_TYPE_DATA)) {                          shm_rdrbuff_remove(ai.rdrb, idx); +                        nxt_pdu = false; +                } + +                if (frcti->conf_flags & FRCTFORDERING) { +                        if (pci.seqno != frcti->rcv_lwe) { +                                if (rq_push(frcti->rq, pci.seqno, idx)) +                                        shm_rdrbuff_remove(ai.rdrb, idx); +                                nxt_pdu = false; +                        } else { +                                frcti->rcv_lwe++; +                        } +                } + +                pthread_rwlock_unlock(&frcti->lock); -        } while (!(pci.type & PDU_TYPE_DATA)); +        } while (!nxt_pdu);          return idx;  } diff --git a/src/lib/rq.c b/src/lib/rq.c new file mode 100644 index 00000000..bd0594b5 --- /dev/null +++ b/src/lib/rq.c @@ -0,0 +1,157 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Reordering queue + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/rq.h> + +#include <assert.h> + +struct pdu { +        uint64_t seqno; +        size_t   idx; +}; + +struct rq { +        struct pdu * items; +        int          n_items; +        int          size; +}; + +struct rq * rq_create(int size) +{ +        struct rq * rq; + +        rq = malloc(sizeof(*rq)); +        if (rq == NULL) +                return NULL; + +        rq->items = malloc(sizeof(struct pdu) * (size + 1)); +        if (rq->items == NULL) { +                free(rq); +                return NULL; +        } + +        rq->size = size; +        rq->n_items = 0; + +        return rq; +} + +void rq_destroy(struct rq * rq) +{ +        assert(rq); + +        free(rq->items); +        free(rq); +} + +int rq_push(struct rq * rq, +            uint64_t    seqno, +            size_t      idx) +{ +        int i; +        int j; + +        assert(rq); + +        /* Queue is full. */ +        if (rq->n_items == rq->size) +                return -1; + +        i = ++rq->n_items; +        j = i / 2; +        while (i > 1 && rq->items[j].seqno > seqno) { +                rq->items[i] = rq->items[j]; +                i = j; +                j = j / 2; +        } + +        rq->items[i].seqno = seqno; +        rq->items[i].idx = idx; + +        return 0; +} + +uint64_t rq_peek(struct rq * rq) +{ +        assert(rq); + +        return rq->items[1].seqno; +} + +bool rq_is_empty(struct rq * rq) +{ +        assert(rq); + +        return (rq->n_items == 0); +} + +size_t rq_pop(struct rq * rq) +{ +        size_t idx; +        int    i; +        int    j; +        int    k; + +        assert(rq); + +        idx = rq->items[1].idx; + +        rq->items[1] = rq->items[rq->n_items]; +        rq->n_items--; + +        i = 1; +        while (true) { +                k = i; +                j = 2 * i; + +                if (j <= rq->n_items && rq->items[j].seqno < rq->items[k].seqno) +                        k = j; + +                if (j + 1 <= rq->n_items && +                    rq->items[j + 1].seqno < rq->items[k].seqno) +                        k = j + 1; + +                if (k == i) +                        break; + +                rq->items[i] = rq->items[k]; +                i = k; +        } + +        rq->items[i] = rq->items[rq->n_items + 1]; + +        return idx; +} + +bool rq_has(struct rq * rq, +            uint64_t    seqno) +{ +        int i; + +        assert(rq); + +        for (i = 1; i <= rq->n_items; i++) +                if (rq->items[i].seqno == seqno) +                        return true; + +        return false; +} diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index a93bf321..0edd4a42 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -14,6 +14,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c    crc32_test.c    hashtable_test.c    md5_test.c +  rq_test.c    sha3_test.c    time_utils_test.c    ${TIMERWHEEL_TEST} diff --git a/src/lib/tests/rq_test.c b/src/lib/tests/rq_test.c new file mode 100644 index 00000000..e2d0f435 --- /dev/null +++ b/src/lib/tests/rq_test.c @@ -0,0 +1,115 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Reordering queue test + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/rq.h> + +#include <stdio.h> + +#define Q_SIZE 5 + +int rq_test(int     argc, +            char ** argv) +{ +        struct rq * q; +        int         i; + +        (void) argc; +        (void) argv; + +        q = rq_create(Q_SIZE); +        if (q == NULL) { +                printf("Failed to create.\n"); +                return -1; +        } + +        if (rq_push(q, 1, 1)) { +                printf("Failed to insert.\n"); +                return -1; +        } + +        if (!rq_has(q, 1)) { +                printf("Inserted item not present.\n"); +                return -1; +        } + +        if (rq_peek(q) != 1) { +                printf("Inserted item not present.\n"); +                return -1; +        } + +        if (rq_pop(q) != 1) { +                printf("Bad pop.\n"); +                return -1; +        } + +        if (rq_push(q, 3, 5)) { +                printf("Failed to insert.\n"); +                return -1; +        } + +        if (rq_push(q, 1, 3)) { +                printf("Failed to insert.\n"); +                return -1; +        } + +        if (rq_push(q, 2, 7)) { +                printf("Failed to insert.\n"); +                return -1; +        } + +        if (!rq_has(q, 3)) { +                printf("Inserted item not present.\n"); +                return -1; +        } + +        if (rq_has(q, 4)) { +                printf("Item present that was not inserted.\n"); +                return -1; +        } + +        if (rq_peek(q) != 1) { +                printf("Inserted item not present.\n"); +                return -1; +        } + +        if (rq_pop(q) != 3) { +                printf("Bad pop.\n"); +                return -1; +        } + +        if (rq_peek(q) != 2) { +                printf("Inserted item not present.\n"); +                return -1; +        } + +        if (rq_pop(q) != 7) { +                printf("Bad pop.\n"); +                return -1; +        } + +        for (i = 0; i < Q_SIZE + 1; i++) +                rq_push(q, i, i); + +        rq_destroy(q); + +        return 0; +} | 
