diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/lib/dev.c | 440 | ||||
| -rw-r--r-- | src/lib/frct_pci.c | 105 | ||||
| -rw-r--r-- | src/lib/hash.c | 40 | ||||
| -rw-r--r-- | src/lib/shm_flow_set.c | 11 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_ll.c | 12 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_pthr.c | 12 | ||||
| -rw-r--r-- | src/lib/tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/lib/tests/timerwheel_test.c | 106 | ||||
| -rw-r--r-- | src/lib/timerwheel.c | 371 | 
10 files changed, 988 insertions, 112 deletions
| diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 550bbc08..728d975a 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -86,6 +86,7 @@ set(SOURCE_FILES    cdap_req.c    crc32.c    dev.c +  frct_pci.c    hash.c    hashtable.c    irm.c @@ -104,6 +105,7 @@ set(SOURCE_FILES    shm_rdrbuff.c    sockets.c    time_utils.c +  timerwheel.c    tpm.c    utils.c    ) diff --git a/src/lib/dev.c b/src/lib/dev.c index 9354855b..e81bf105 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -34,6 +34,8 @@  #include <ouroboros/utils.h>  #include <ouroboros/fqueue.h>  #include <ouroboros/qoscube.h> +#include <ouroboros/timerwheel.h> +#include <ouroboros/frct_pci.h>  #include <stdlib.h>  #include <string.h> @@ -41,8 +43,14 @@  #define BUF_SIZE 1500 +#define TW_ELEMENTS   6000 +#define TW_RESOLUTION 1   /* ms */ + +#define MPL 2000 /* ms */ +  struct flow_set {          size_t idx; +        bool   np1_set;  };  struct fqueue { @@ -59,6 +67,26 @@ enum port_state {          PORT_DESTROY  }; +struct frcti { +        bool          used; + +        struct tw_f * snd_inact; +        bool          snd_drf; +        uint64_t      snd_lwe; +        uint64_t      snd_rwe; + +        struct tw_f * rcv_inact; +        bool          rcv_drf; +        uint64_t      rcv_lwe; +        uint64_t      rcv_rwe; + +        bool          resource_control; +        bool          reliable; +        bool          error_check; +        bool          ordered; +        bool          partial; +}; +  struct port {          int             fd; @@ -89,10 +117,14 @@ struct {          struct shm_rdrbuff *  rdrb;          struct shm_flow_set * fqset; +        struct timerwheel *   tw; +        int                   tw_users; +          struct bmp *          fds;          struct bmp *          fqueues;          struct flow *         flows;          struct port *         ports; +        struct frcti *        frcti;          pthread_rwlock_t      lock;  } ai; @@ -203,6 +235,242 @@ static int api_announce(char * ap_name)          return ret;  } +/* Call under flows lock */ +static int finalize_write(int    fd, +                          size_t idx) +{ +        if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) +                return -ENOTALLOC; + +        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + +        return 0; +} + +static int frcti_init(int fd) +{ +        ai.frcti[fd].used = true; + +        ai.frcti[fd].snd_drf = true; +        ai.frcti[fd].snd_lwe = 0; +        ai.frcti[fd].snd_rwe = 0; + +        ai.frcti[fd].rcv_drf = true; +        ai.frcti[fd].rcv_lwe = 0; +        ai.frcti[fd].rcv_rwe = 0; + +        return 0; +} + +static void frcti_fini(int fd) +{ +        struct frcti * frcti; + +        frcti = &(ai.frcti[fd]); + +        frcti->used = false; + +        /* FIXME: We actually need to wait until these timers become NULL. */ +        if (frcti->snd_inact != NULL) +                timerwheel_stop(ai.tw, frcti->snd_inact); + +        if (frcti->rcv_inact != NULL) +                timerwheel_stop(ai.tw, frcti->rcv_inact); +} + +static int frcti_configure(int         fd, +                           qosspec_t * qos) +{ +        /* FIXME: Send configuration message here to other side. */ + +        (void) fd; +        (void) qos; + +        return 0; +} + +static void frcti_snd_inactivity(void * arg) +{ +        struct frcti *  frcti; + +        pthread_rwlock_wrlock(&ai.lock); + +        frcti = (struct frcti * ) arg; + +        frcti->snd_drf = true; +        frcti->snd_inact = NULL; + +        pthread_rwlock_unlock(&ai.lock); +} + +/* Called under flows lock */ +static int frcti_write(int                  fd, +                       struct shm_du_buff * sdb) +{ +        struct frcti *  frcti; +        struct frct_pci pci; + +        memset(&pci, 0, sizeof(pci)); + +        frcti = &(ai.frcti[fd]); + +        /* +         * Set the DRF in the first packet of a new run of SDUs, +         * otherwise simply recharge the timer. +         */ +        if (frcti->snd_drf) { +                frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity, +                                                    frcti, 2 * MPL); +                if (frcti->snd_inact == NULL) +                        return -1; + +                pci.flags |= FLAG_DATA_RUN; +                frcti->snd_drf = false; +        } else { +                if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL)) +                        return -1; +        } + +        pci.seqno = frcti->snd_lwe++; +        pci.type |= PDU_TYPE_DATA; + +        if (frct_pci_ser(sdb, &pci, frcti->error_check)) +                return -1; + +        if (finalize_write(fd, shm_du_buff_get_idx(sdb))) +                return -ENOTALLOC; + +        return 0; +} + +static void frcti_rcv_inactivity(void * arg) +{ +        struct frcti *  frcti; + +        pthread_rwlock_wrlock(&ai.lock); + +        frcti = (struct frcti * ) arg; + +        frcti->rcv_drf = true; +        frcti->rcv_inact = NULL; + +        pthread_rwlock_unlock(&ai.lock); +} + +static ssize_t frcti_read(int fd) +{ +        ssize_t              idx = -1; +        struct timespec      abstime; +        struct frcti *       frcti; +        struct frct_pci      pci; +        struct shm_du_buff * sdb; + +        pthread_rwlock_rdlock(&ai.lock); + +        if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { +                idx = shm_rbuff_read(ai.flows[fd].rx_rb); +                pthread_rwlock_unlock(&ai.lock); +        } else { +                struct shm_rbuff * rb   = ai.flows[fd].rx_rb; +                bool timeo = ai.flows[fd].timesout; +                struct timespec timeout = ai.flows[fd].rcv_timeo; + +                pthread_rwlock_unlock(&ai.lock); + +                if (timeo) { +                        clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                        ts_add(&abstime, &timeout, &abstime); +                        idx = shm_rbuff_read_b(rb, &abstime); +                } else { +                        idx = shm_rbuff_read_b(rb, NULL); +                } +        } + +        if (idx < 0) +                return idx; + +        pthread_rwlock_rdlock(&ai.lock); + +        frcti = &(ai.frcti[fd]); + +        sdb = shm_rdrbuff_get(ai.rdrb, idx); + +        /* SDU may be corrupted. */ +        if (frct_pci_des(sdb, &pci, frcti->error_check)) { +                pthread_rwlock_unlock(&ai.lock); +                shm_rdrbuff_remove(ai.rdrb, idx); +                return -1; +        } + +        /* We don't accept packets when there is no inactivity timer. */ +        if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { +                pthread_rwlock_unlock(&ai.lock); +                shm_rdrbuff_remove(ai.rdrb, idx); +                return -1; +        } + +        /* +         * If there is an inactivity timer and the DRF is set, +         * reset the state of the connection. +         */ +        if (pci.flags & FLAG_DATA_RUN) { +                frcti->rcv_drf = true; +                if (frcti->rcv_inact != NULL) +                        timerwheel_stop(ai.tw, frcti->rcv_inact); +                frcti->rcv_lwe = pci.seqno; +        } + +        /* +         * Start receiver inactivity if this packet has the DRF, +         * otherwise simply restart it. +         */ +        if (frcti->rcv_drf) { +                frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity, +                                                    frcti, 3 * MPL); +                if (frcti->rcv_inact == NULL) { +                        pthread_rwlock_unlock(&ai.lock); +                        shm_rdrbuff_remove(ai.rdrb, idx); +                        return -1; +                } + +                frcti->rcv_drf = false; +        } else { +                if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) { +                        pthread_rwlock_unlock(&ai.lock); +                        shm_rdrbuff_remove(ai.rdrb, idx); +                        return -1; +                } +        } + +        pthread_rwlock_unlock(&ai.lock); + +        return idx; +} + +static int frcti_event_wait(struct flow_set *       set, +                            struct fqueue *         fq, +                            const struct timespec * timeout) +{ +        int ret; + +        assert(set); +        assert(fq); +        assert(timeout); + +        /* +         * FIXME: Return the fq only if a data SDU +         * for the application is available. +         */ + +        ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); +        if (ret == -ETIMEDOUT) { +                fq->fqsize = 0; +                return -ETIMEDOUT; +        } + +        return ret; +} +  static void flow_clear(int fd)  {          assert(!(fd < 0)); @@ -230,6 +498,9 @@ static void flow_fini(int fd)          if (ai.flows[fd].set != NULL)                  shm_flow_set_close(ai.flows[fd].set); +        if (ai.frcti[fd].used) +                frcti_fini(fd); +          flow_clear(fd);  } @@ -316,8 +587,14 @@ int ouroboros_init(const char * ap_name)          if (ai.flows == NULL)                  goto fail_flows; -        for (i = 0; i < AP_MAX_FLOWS; ++i) +        ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS); +        if (ai.frcti == NULL) +                goto fail_frcti; + +        for (i = 0; i < AP_MAX_FLOWS; ++i) {                  flow_clear(i); +                frcti_fini(i); +        }          ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);          if (ai.ports == NULL) @@ -353,8 +630,15 @@ int ouroboros_init(const char * ap_name)          if (pthread_rwlock_init(&ai.lock, NULL))                  goto fail_lock; +        ai.tw = timerwheel_create(TW_RESOLUTION, +                                  TW_RESOLUTION * TW_ELEMENTS); +        if (ai.tw == NULL) +                goto fail_timerwheel; +          return 0; + fail_timerwheel: +        pthread_rwlock_destroy(&ai.lock);   fail_lock:          for (i = 0; i < IRMD_MAX_FLOWS; ++i)                  pthread_cond_destroy(&ai.ports[i].state_cond); @@ -366,11 +650,13 @@ int ouroboros_init(const char * ap_name)   fail_ap_name:          free(ai.ports);   fail_ports: +        free(ai.frcti); + fail_frcti:          free(ai.flows);   fail_flows:          shm_rdrbuff_close(ai.rdrb);   fail_rdrb: -      shm_flow_set_destroy(ai.fqset); +        shm_flow_set_destroy(ai.fqset);   fail_fqset:          bmp_destroy(ai.fqueues);   fail_fqueues: @@ -409,6 +695,9 @@ void ouroboros_fini()          shm_rdrbuff_close(ai.rdrb); +        if (ai.tw != NULL) +                timerwheel_destroy(ai.tw); +          free(ai.flows);          free(ai.ports); @@ -463,9 +752,15 @@ int flow_accept(qosspec_t *             qs,          if (fd < 0)                  return fd; +        pthread_rwlock_wrlock(&ai.lock); + +        frcti_init(fd); +          if (qs != NULL)                  *qs = ai.flows[fd].spec; +        pthread_rwlock_unlock(&ai.lock); +          return fd;  } @@ -505,7 +800,7 @@ int flow_alloc(const char *            dst_name,                  return -EIRMD;          } -        if (recv_msg->result !=  0) { +        if (recv_msg->result != 0) {                  int res =  recv_msg->result;                  irm_msg__free_unpacked(recv_msg, NULL);                  return res; @@ -520,6 +815,22 @@ int flow_alloc(const char *            dst_name,          irm_msg__free_unpacked(recv_msg, NULL); +        if (fd < 0) +                return fd; + +        pthread_rwlock_wrlock(&ai.lock); + +        frcti_init(fd); + +        if (frcti_configure(fd, qs)) { +                flow_fini(fd); +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.lock); +                return -1; +        } + +        pthread_rwlock_unlock(&ai.lock); +          return fd;  } @@ -720,34 +1031,31 @@ ssize_t flow_write(int          fd,                          return idx;                  } -                if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { -                        shm_rdrbuff_remove(ai.rdrb, idx); -                        pthread_rwlock_unlock(&ai.lock); -                        return -ENOTALLOC; -                }          } else { /* blocking */ -                struct shm_rdrbuff * rdrb = ai.rdrb; -                struct shm_rbuff * tx_rb  = ai.flows[fd].tx_rb; -                  pthread_rwlock_unlock(&ai.lock); -                assert(tx_rb); - -                idx = shm_rdrbuff_write_b(rdrb, +                idx = shm_rdrbuff_write_b(ai.rdrb,                                            DU_BUFF_HEADSPACE,                                            DU_BUFF_TAILSPACE,                                            buf,                                            count); -                if (shm_rbuff_write(tx_rb, idx) < 0) { -                        shm_rdrbuff_remove(rdrb, idx); -                        return -ENOTALLOC; -                } -                  pthread_rwlock_rdlock(&ai.lock);          } -        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); +        if (!ai.frcti[fd].used) { +                if (finalize_write(fd, idx)) { +                        pthread_rwlock_unlock(&ai.lock); +                        shm_rdrbuff_remove(ai.rdrb, idx); +                        return -ENOTALLOC; +                } +        } else { +                if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) { +                        pthread_rwlock_unlock(&ai.lock); +                        shm_rdrbuff_remove(ai.rdrb, idx); +                        return -1; +                } +        }          pthread_rwlock_unlock(&ai.lock); @@ -772,21 +1080,12 @@ ssize_t flow_read(int    fd,                  return -ENOTALLOC;          } -        if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_rbuff_read(ai.flows[fd].rx_rb); -                pthread_rwlock_unlock(&ai.lock); -        } else { -                struct shm_rbuff * rb   = ai.flows[fd].rx_rb; -                bool timeo = ai.flows[fd].timesout; -                struct timespec timeout = ai.flows[fd].rcv_timeo; - -                pthread_rwlock_unlock(&ai.lock); +        pthread_rwlock_unlock(&ai.lock); -                if (timeo) -                        idx = shm_rbuff_read_b(rb, &timeout); -                else -                        idx = shm_rbuff_read_b(rb, NULL); -        } +        if (!ai.frcti[fd].used) +                idx = shm_rbuff_read(ai.flows[fd].rx_rb); +        else +                idx = frcti_read(fd);          if (idx < 0) {                  assert(idx == -EAGAIN || idx == -ETIMEDOUT); @@ -823,6 +1122,8 @@ struct flow_set * flow_set_create()                  return NULL;          } +        set->np1_set = false; +          pthread_rwlock_unlock(&ai.lock);          return set; @@ -891,6 +1192,9 @@ int flow_set_add(struct flow_set * set,          for (i = 0; i < sdus; i++)                  shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); +        if (ai.frcti[fd].used) +                set->np1_set = true; +          pthread_rwlock_unlock(&ai.lock);          return ret; @@ -960,7 +1264,8 @@ int flow_event_wait(struct flow_set *       set,                      struct fqueue *         fq,                      const struct timespec * timeout)  { -        ssize_t ret; +        ssize_t         ret; +        struct timespec abstime;          if (set == NULL || fq == NULL)                  return -EINVAL; @@ -970,7 +1275,17 @@ int flow_event_wait(struct flow_set *       set,          assert(!fq->next); -        ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        if (set->np1_set) +                ret = frcti_event_wait(set, fq, &abstime); +        else +                ret = shm_flow_set_wait(ai.fqset, set->idx, +                                        fq->fqueue, &abstime); +          if (ret == -ETIMEDOUT) {                  fq->fqsize = 0;                  return -ETIMEDOUT; @@ -1132,9 +1447,8 @@ int ipcp_flow_read(int                   fd,  {          ssize_t idx = -1;          int port_id = -1; -        struct shm_rbuff * rb; -        assert(fd >=0); +        assert(fd >= 0);          assert(sdb);          pthread_rwlock_rdlock(&ai.lock); @@ -1144,11 +1458,13 @@ int ipcp_flow_read(int                   fd,                  return -ENOTALLOC;          } -        rb = ai.flows[fd].rx_rb; -          pthread_rwlock_unlock(&ai.lock); -        idx = shm_rbuff_read(rb); +        if (!ai.frcti[fd].used) +                idx = shm_rbuff_read(ai.flows[fd].rx_rb); +        else +                idx = frcti_read(fd); +          if (idx < 0)                  return idx; @@ -1160,8 +1476,6 @@ int ipcp_flow_read(int                   fd,  int ipcp_flow_write(int                  fd,                      struct shm_du_buff * sdb)  { -        size_t idx; -          if (sdb == NULL)                  return -EINVAL; @@ -1179,10 +1493,17 @@ int ipcp_flow_write(int                  fd,          assert(ai.flows[fd].tx_rb); -        idx = shm_du_buff_get_idx(sdb); - -        shm_rbuff_write(ai.flows[fd].tx_rb, idx); -        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); +        if (!ai.frcti[fd].used) { +                if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { +                        pthread_rwlock_unlock(&ai.lock); +                        return -ENOTALLOC; +                } +        } else { +                if (frcti_write(fd, sdb)) { +                        pthread_rwlock_unlock(&ai.lock); +                        return -1; +                } +        }          pthread_rwlock_unlock(&ai.lock); @@ -1274,32 +1595,11 @@ int local_flow_write(int    fd,                  return -ENOTALLOC;          } -        shm_rbuff_write(ai.flows[fd].tx_rb, idx); - -        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - -        pthread_rwlock_unlock(&ai.lock); - -        return 0; -} - -int ipcp_read_shim(int                   fd, -                   struct shm_du_buff ** sdb) -{ -        ssize_t idx; - -        pthread_rwlock_rdlock(&ai.lock); - -        assert(ai.flows[fd].rx_rb); - -        idx = shm_rbuff_read(ai.flows[fd].rx_rb); -        if (idx < 0) { +        if (finalize_write(fd, idx)) {                  pthread_rwlock_unlock(&ai.lock); -                return -EAGAIN; +                return -ENOTALLOC;          } -        *sdb = shm_rdrbuff_get(ai.rdrb, idx); -          pthread_rwlock_unlock(&ai.lock);          return 0; diff --git a/src/lib/frct_pci.c b/src/lib/frct_pci.c new file mode 100644 index 00000000..92cf8cd9 --- /dev/null +++ b/src/lib/frct_pci.c @@ -0,0 +1,105 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Protocol Control Information of FRCT + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/config.h> +#include <ouroboros/frct_pci.h> +#include <ouroboros/hash.h> +#include <ouroboros/errno.h> + +#include <assert.h> +#include <string.h> + +#define TYPE_SIZE  1 +#define SEQNO_SIZE 8 +#define FLAGS_SIZE 1 + +/* FIXME: Head size will differ on type */ +#define HEAD_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE + +int frct_pci_ser(struct shm_du_buff * sdb, +                 struct frct_pci *    pci, +                 bool                 error_check) +{ +        uint8_t * head; +        uint8_t * tail; + +        assert(sdb); +        assert(pci); + +        head = shm_du_buff_head_alloc(sdb, HEAD_SIZE); +        if (head == NULL) +                return -EPERM; + +        memcpy(head, &pci->type, TYPE_SIZE); +        memcpy(head + TYPE_SIZE, &pci->flags, FLAGS_SIZE); +        memcpy(head + TYPE_SIZE + FLAGS_SIZE, &pci->seqno, SEQNO_SIZE); + +        if (error_check) { +                tail = shm_du_buff_tail_alloc(sdb, hash_len(HASH_CRC32)); +                if (tail == NULL) { +                        shm_du_buff_head_release(sdb, HEAD_SIZE); +                        return -EPERM; +                } + +                *((uint32_t *) tail) = 0; +                mem_hash(HASH_CRC32, (uint32_t *) tail, head, tail - head); +        } + +        return 0; +} + +int frct_pci_des(struct shm_du_buff * sdb, +                 struct frct_pci *    pci, +                 bool                 error_check) +{ +        uint8_t * head; +        uint8_t * tail; +        uint32_t  crc; + +        assert(sdb); +        assert(pci); + +        head = shm_du_buff_head(sdb); + +         /* FIXME: Depending on the type a different deserialization */ +        memcpy(&pci->type, head, TYPE_SIZE); +        memcpy(&pci->flags, head + TYPE_SIZE, FLAGS_SIZE); +        memcpy(&pci->seqno, head + TYPE_SIZE + FLAGS_SIZE, SEQNO_SIZE); + +        if (error_check) { +                tail = shm_du_buff_tail(sdb); +                if (tail == NULL) +                        return -EPERM; + +                mem_hash(HASH_CRC32, &crc, head, tail - head); + +                /* Corrupted SDU */ +                if (crc != 0) +                        return -1; + +                shm_du_buff_tail_release(sdb, hash_len(HASH_CRC32)); +        } + +        shm_du_buff_head_release(sdb, HEAD_SIZE); + +        return 0; +} diff --git a/src/lib/hash.c b/src/lib/hash.c index d8cabfd3..e062a0ad 100644 --- a/src/lib/hash.c +++ b/src/lib/hash.c @@ -64,45 +64,46 @@ uint16_t hash_len(enum hash_algo algo)  #endif  } -void str_hash(enum hash_algo algo, -              void *         buf, -              const char *   str) +void mem_hash(enum hash_algo  algo, +              void *          dst, +              const uint8_t * buf, +              size_t          len)  {  #ifdef HAVE_LIBGCRYPT -        gcry_md_hash_buffer(algo, buf, str, strlen(str)); +        gcry_md_hash_buffer(algo, dst, buf, len);  #else          struct sha3_ctx sha3_ctx;          struct md5_ctx md5_ctx;          switch (algo) {          case HASH_CRC32: -                memset(buf, 0, CRC32_HASH_LEN); -                crc32((uint32_t *) buf, str, strlen(str)); +                memset(dst, 0, CRC32_HASH_LEN); +                crc32((uint32_t *) dst, buf, len);                  break;          case HASH_MD5:                  rhash_md5_init(&md5_ctx); -                rhash_md5_update(&md5_ctx, str, strlen(str)); -                rhash_md5_final(&md5_ctx, (uint8_t *) buf); +                rhash_md5_update(&md5_ctx, buf, len); +                rhash_md5_final(&md5_ctx, (uint8_t *) dst);                  break;          case HASH_SHA3_224:                  rhash_sha3_224_init(&sha3_ctx); -                rhash_sha3_update(&sha3_ctx, str, strlen(str)); -                rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); +                rhash_sha3_update(&sha3_ctx, buf, len); +                rhash_sha3_final(&sha3_ctx, (uint8_t *) dst);                  break;          case HASH_SHA3_256:                  rhash_sha3_256_init(&sha3_ctx); -                rhash_sha3_update(&sha3_ctx, str, strlen(str)); -                rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); +                rhash_sha3_update(&sha3_ctx, buf, len); +                rhash_sha3_final(&sha3_ctx, (uint8_t *) dst);                  break;          case HASH_SHA3_384:                  rhash_sha3_384_init(&sha3_ctx); -                rhash_sha3_update(&sha3_ctx, str, strlen(str)); -                rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); +                rhash_sha3_update(&sha3_ctx, buf, len); +                rhash_sha3_final(&sha3_ctx, (uint8_t *) dst);                  break;          case HASH_SHA3_512:                  rhash_sha3_512_init(&sha3_ctx); -                rhash_sha3_update(&sha3_ctx, str, strlen(str)); -                rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); +                rhash_sha3_update(&sha3_ctx, buf, len); +                rhash_sha3_final(&sha3_ctx, (uint8_t *) dst);                  break;          default:                  assert(false); @@ -110,3 +111,10 @@ void str_hash(enum hash_algo algo,          }  #endif  } + +void str_hash(enum hash_algo algo, +              void *         dst, +              const char *   str) +{ +        return mem_hash(algo, dst, (const uint8_t *) str, strlen(str)); +} diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index cd6946d4..2f1d4e33 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -326,10 +326,9 @@ void shm_flow_set_notify(struct shm_flow_set * set,  ssize_t shm_flow_set_wait(const struct shm_flow_set * set,                            size_t                      idx,                            int *                       fqueue, -                          const struct timespec *     timeout) +                          const struct timespec *     abstime)  {          ssize_t ret = 0; -        struct timespec abstime;          assert(set);          assert(idx < AP_MAX_FQUEUES); @@ -341,19 +340,15 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,          if (pthread_mutex_lock(set->lock) == EOWNERDEAD)                  pthread_mutex_consistent(set->lock);  #endif -        if (timeout != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        }          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) set->lock);          while (set->heads[idx] == 0 && ret != -ETIMEDOUT) { -                if (timeout != NULL) +                if (abstime != NULL)                          ret = -pthread_cond_timedwait(set->conds + idx,                                                        set->lock, -                                                      &abstime); +                                                      abstime);                  else                          ret = -pthread_cond_wait(set->conds + idx,                                                   set->lock); diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 33e236b0..b420b785 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -281,9 +281,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)  }  ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb, -                         const struct timespec * timeout) +                         const struct timespec * abstime)  { -        struct timespec abstime;          ssize_t idx = -1;          assert(rb); @@ -293,11 +292,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          if (idx != -EAGAIN)                  return idx; -        if (timeout != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        } -  #ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else @@ -308,10 +302,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,                               (void *) rb->lock);          while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { -                if (timeout != NULL) +                if (abstime != NULL)                          idx = -pthread_cond_timedwait(rb->add,                                                        rb->lock, -                                                      &abstime); +                                                      abstime);                  else                          idx = -pthread_cond_wait(rb->add, rb->lock);  #ifdef HAVE_ROBUST_MUTEX diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 44001458..7dc5f5d9 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -284,18 +284,12 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)  }  ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb, -                         const struct timespec * timeout) +                         const struct timespec * abstime)  { -        struct timespec abstime;          ssize_t idx = -1;          assert(rb); -        if (timeout != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        } -  #ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else @@ -306,10 +300,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,                               (void *) rb->lock);          while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { -                if (timeout != NULL) +                if (abstime != NULL)                          idx = -pthread_cond_timedwait(rb->add,                                                        rb->lock, -                                                      &abstime); +                                                      abstime);                  else                          idx = -pthread_cond_wait(rb->add, rb->lock);  #ifdef HAVE_ROBUST_MUTEX diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 41c2074a..fd3c1c6a 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -11,6 +11,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c    rib_test.c    sha3_test.c    time_utils_test.c +  timerwheel_test.c    )  add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c new file mode 100644 index 00000000..d9ca164e --- /dev/null +++ b/src/lib/tests/timerwheel_test.c @@ -0,0 +1,106 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Test of the timer wheel + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "timerwheel.c" + +#include <pthread.h> +#include <time.h> +#include <stdlib.h> +#include <stdio.h> + +#define MAX_ELEMENTS   100 +#define MAX_RESOLUTION 10  /* ms */ +#define MAX_ADDITIONS  1000 + +int total; + +int add(void * o) +{ +        total += *((int *) o); +        return 0; +} + +int timerwheel_test(int argc, char ** argv) +{ +        struct timerwheel * tw; +        long resolution; +        long elements; +        struct timespec wait; + +        int additions; + +        int check_total = 0; + +        int i; +        int var = 5; + +        struct tw_f * f; + +        (void) argc; +        (void) argv; + +        total = 0; + +        srand(time(NULL)); + +        resolution = rand() % (MAX_RESOLUTION - 1) + 1; +        elements = rand() % (MAX_ELEMENTS - 10) + 10; + +        tw = timerwheel_create(resolution, resolution * elements); +        if (tw == NULL) { +                printf("Failed to create timerwheel.\n"); +                return -1; +        } + +        wait.tv_sec = (resolution * elements) / 1000; +        wait.tv_nsec = ((resolution * elements) % 1000) * MILLION; + +        additions = rand() % MAX_ADDITIONS + 1000; + +        for (i = 0; i < additions; ++i) { +                int delay = rand() % (resolution * elements); +                check_total += var; +                f = timerwheel_start(tw, +                                     (void (*)(void *)) add, +                                     (void *) &var, +                                     delay); +                if (f == NULL) { +                        printf("Failed to add function."); +                        return -1; +                } +        } + +        nanosleep(&wait, NULL); + +        /* On some systems and VMs, the scheduler may be too slow. */ +        if (total != check_total) +                nanosleep(&wait, NULL); + +        timerwheel_destroy(tw); + +        if (total != check_total) { +                printf("Totals do not match.\n"); +                return -1; +        } + +        return 0; +} diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c new file mode 100644 index 00000000..7e2779d0 --- /dev/null +++ b/src/lib/timerwheel.c @@ -0,0 +1,371 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Timerwheel + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/config.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/errno.h> +#include <ouroboros/list.h> + +#include <pthread.h> +#include <stdlib.h> +#include <assert.h> +#include <string.h> + +#define FRAC 10 /* accuracy of the timer */ + +#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1)); +#define tw_free(tw) (tw_used(tw) + 1 < tw->elements) +#define tw_empty(tw) (tw->head == tw->tail) + +enum tw_state { +        TW_NULL = 0, +        TW_RUNNING, +        TW_DESTROY +}; + +struct tw_f { +        struct list_head next; +        void (* func)(void *); +        void * arg; +}; + +struct tw_el { +        struct list_head funcs; +        struct timespec  expiry; +}; + +struct timerwheel { +        struct tw_el *   wheel; + +        struct timespec  intv; + +        size_t           pos; + +        struct list_head wq; + +        pthread_cond_t   work; +        pthread_mutex_t  lock; + +        int              resolution; +        unsigned int     elements; + +        enum tw_state    state; +        pthread_mutex_t  s_lock; + +        pthread_t        ticker; +        pthread_t        worker; +}; + +static void tw_el_fini(struct tw_el * e) +{ +        struct list_head * p; +        struct list_head * h; + +        list_for_each_safe(p, h, &e->funcs) { +                struct tw_f * f = list_entry(p, struct tw_f, next); +                list_del(&f->next); +        } +} + +static enum tw_state tw_get_state(struct timerwheel * tw) +{ +        enum tw_state state; + +        assert(tw); + +        pthread_mutex_lock(&tw->s_lock); + +        state = tw->state; + +        pthread_mutex_unlock(&tw->s_lock); + +        return state; +} + +static void tw_set_state(struct timerwheel * tw, enum tw_state state) +{ +        assert(tw); +        assert(state != TW_NULL); + +        pthread_mutex_lock(&tw->s_lock); + +        tw->state = state; + +        pthread_mutex_unlock(&tw->s_lock); +} + +static void * worker(void * o) +{ +        struct list_head * p; +        struct list_head * h; + +        struct timerwheel * tw = (struct timerwheel *) o; +        struct timespec dl; +        struct timespec now; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        ts_add(&now, &tw->intv, &dl); + +        pthread_mutex_lock(&tw->lock); + +        while (tw_get_state(tw) == TW_RUNNING) { +                if (pthread_cond_timedwait(&tw->work, &tw->lock, &dl) +                    == ETIMEDOUT) +                        ts_add(&dl, &tw->intv, &dl); + +                list_for_each_safe(p, h, &tw->wq) { +                        struct tw_f * f = list_entry(p, struct tw_f, next); +                        list_del(&f->next); +                        pthread_mutex_unlock(&tw->lock); +                        f->func(f->arg); +                        free(f); + +                        pthread_mutex_lock(&tw->lock); +                } +        } + +        pthread_mutex_unlock(&tw->lock); + +        return (void *) o; +} + +static void * movement(void * o) +{ +        struct timerwheel * tw = (struct timerwheel *) o; +        struct timespec now = {0, 0}; +        long ms = tw->resolution * tw->elements; +        struct timespec total = {ms / 1000, +                                 (ms % 1000) * MILLION}; +        struct list_head * p; +        struct list_head * h; + +        while (tw_get_state(tw) == TW_RUNNING) { +                clock_gettime(CLOCK_MONOTONIC, &now); + +                pthread_mutex_lock(&tw->lock); + +                if (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) < 0) { +                        pthread_mutex_unlock(&tw->lock); +                        nanosleep(&tw->intv, NULL); +                        continue; +                } + +                list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) { +                        struct tw_f * f = list_entry(p, struct tw_f, next); +                        list_del(&f->next); +                        list_add(&f->next, &tw->wq); +                } + +                ts_add(&tw->wheel[tw->pos].expiry, +                       &total, +                       &tw->wheel[tw->pos].expiry); + +                tw->pos = (tw->pos + 1) & (tw->elements - 1); + +                pthread_cond_signal(&tw->work); + +                pthread_mutex_unlock(&tw->lock); +        } + +        return (void *) 0; +} + +struct timerwheel * timerwheel_create(time_t resolution, +                                      time_t max_delay) +{ +        struct timespec now = {0, 0}; +        struct timespec res_ts = {resolution / 1000, +                                  (resolution % 1000) * MILLION}; +        unsigned long i; + +        struct timerwheel * tw; + +        pthread_condattr_t cattr; + +        assert(resolution != 0); + +        tw = malloc(sizeof(*tw)); +        if (tw == NULL) +                return NULL; + +        if (pthread_mutex_init(&tw->lock, NULL)) +                return NULL; + +        tw->elements = 1; + +        while (tw->elements < max_delay / resolution) +                tw->elements <<= 1; + +        tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements); +        if (tw->wheel == NULL) +                goto fail_wheel_malloc; + +        tw->resolution = resolution; + +        tw->intv.tv_sec = (tw->resolution / FRAC) / 1000; +        tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION; + +        list_head_init(&tw->wq); + +        if (pthread_mutex_init(&tw->lock, NULL)) +                goto fail_lock_init; + +        if (pthread_mutex_init(&tw->s_lock, NULL)) +                goto fail_s_lock_init; + +        if (pthread_condattr_init(&cattr)) +                goto fail_cond_init; + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&tw->work, &cattr)) +                goto fail_cond_init; + +        tw->pos = 0; +        tw->state = TW_RUNNING; + +        clock_gettime(CLOCK_MONOTONIC, &now); +        now.tv_nsec -= (now.tv_nsec % MILLION); + +        for (i = 0; i < tw->elements; ++i) { +                list_head_init(&tw->wheel[i].funcs); +                tw->wheel[i].expiry = now; +                ts_add(&now, &res_ts, &now); +        } + +        if (pthread_create(&tw->worker, NULL, worker, (void *) tw)) +                goto fail_worker_create; + +        if (pthread_create(&tw->ticker, NULL, movement, (void *) tw)) { +                tw_set_state(tw, TW_DESTROY); +                goto fail_ticker_create; +        } + +        return tw; + + fail_ticker_create: +         pthread_join(tw->worker, NULL); + fail_worker_create: +         pthread_cond_destroy(&tw->work); + fail_cond_init: +         pthread_mutex_destroy(&tw->s_lock); + fail_s_lock_init: +         pthread_mutex_destroy(&tw->lock); + fail_lock_init: +         free(tw->wheel); + fail_wheel_malloc: +         free(tw); +         return NULL; +} + +void timerwheel_destroy(struct timerwheel * tw) +{ +        unsigned long i; + +        struct list_head * p; +        struct list_head * h; + +        tw_set_state(tw, TW_DESTROY); + +        pthread_join(tw->ticker, NULL); +        pthread_join(tw->worker, NULL); + +        for (i = 0; i < tw->elements; ++i) +                tw_el_fini(&tw->wheel[i]); + +        pthread_mutex_lock(&tw->lock); + +        list_for_each_safe(p, h, &tw->wq) { +                struct tw_f * f = list_entry(p, struct tw_f, next); +                list_del(&f->next); +                free(f); +        } + +        pthread_mutex_unlock(&tw->lock); + +        pthread_cond_destroy(&tw->work); +        pthread_mutex_destroy(&tw->lock); +        pthread_mutex_destroy(&tw->s_lock); + +        free(tw->wheel); +        free(tw); +} + +struct tw_f * timerwheel_start(struct timerwheel * tw, +                               void (* func)(void *), +                               void *              arg, +                               time_t              delay) +{ +        int pos; +        struct tw_f * f = malloc(sizeof(*f)); +        if (f == NULL) +                return NULL; + +        f->func = func; +        f->arg = arg; + +        assert(delay < tw->elements * tw->resolution); + +        pthread_mutex_lock(&tw->lock); + +        pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); +        list_add(&f->next, &tw->wheel[pos].funcs); + +        pthread_mutex_unlock(&tw->lock); + +        return f; +} + +int timerwheel_restart(struct timerwheel * tw, +                       struct tw_f *       f, +                       time_t              delay) +{ +        int pos; + +        assert(tw); +        assert(delay < tw->elements * tw->resolution); + +        pthread_mutex_lock(&tw->lock); + +        list_del(&f->next); +        pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); +        list_add(&f->next, &tw->wheel[pos].funcs); + +        pthread_mutex_unlock(&tw->lock); + +        return 0; +} + +void timerwheel_stop(struct timerwheel * tw, +                     struct tw_f *       f) +{ +        assert(tw); + +        pthread_mutex_lock(&tw->lock); + +        list_del(&f->next); +        free(f); + +        pthread_mutex_unlock(&tw->lock); +} | 
