diff options
-rw-r--r-- | include/ouroboros/frct_pci.h | 69 | ||||
-rw-r--r-- | include/ouroboros/hash.h | 8 | ||||
-rw-r--r-- | include/ouroboros/shm_flow_set.h | 2 | ||||
-rw-r--r-- | include/ouroboros/shm_rbuff.h | 2 | ||||
-rw-r--r-- | include/ouroboros/timerwheel.h | 45 | ||||
-rw-r--r-- | src/ipcpd/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/ipcpd/tests/CMakeLists.txt | 34 | ||||
-rw-r--r-- | src/ipcpd/timerwheel.h | 39 | ||||
-rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/lib/dev.c | 440 | ||||
-rw-r--r-- | src/lib/frct_pci.c | 105 | ||||
-rw-r--r-- | src/lib/hash.c | 40 | ||||
-rw-r--r-- | src/lib/shm_flow_set.c | 11 | ||||
-rw-r--r-- | src/lib/shm_rbuff_ll.c | 12 | ||||
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 12 | ||||
-rw-r--r-- | src/lib/tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/lib/tests/timerwheel_test.c (renamed from src/ipcpd/tests/timerwheel_test.c) | 14 | ||||
-rw-r--r-- | src/lib/timerwheel.c (renamed from src/ipcpd/timerwheel.c) | 147 |
18 files changed, 718 insertions, 269 deletions
diff --git a/include/ouroboros/frct_pci.h b/include/ouroboros/frct_pci.h new file mode 100644 index 00000000..3a93ac1c --- /dev/null +++ b/include/ouroboros/frct_pci.h @@ -0,0 +1,69 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Protocol Control Information of FRCT + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_FRCT_PCI_H +#define OUROBOROS_LIB_FRCT_PCI_H + +#include <ouroboros/shm_du_buff.h> + +#include <stdint.h> +#include <stdbool.h> + +struct frct_pci { + uint8_t type; + uint8_t flags; + uint64_t seqno; + uint64_t lwe; + uint64_t rwe; +}; + +enum pdu_types { + PDU_TYPE_DATA = 0x01, + PDU_TYPE_ACK = 0x02, + PDU_TYPE_FC = 0x04, + PDU_TYPE_ACK_AND_FC = (PDU_TYPE_ACK | PDU_TYPE_FC), + PDU_TYPE_CONFIG = 0x08, + PDU_TYPE_RENDEZ_VOUS = 0x10 +}; + +enum config_flags { + CONF_RESOURCE_CONTROL = 0x01, + CONF_RELIABLE = 0x02, + CONF_ERROR_CHECK = 0x04, + CONF_ORDERED = 0x08, + CONF_PARTIAL = 0x10 +}; + +enum data_flags { + FLAG_DATA_RUN = 0x01, + FLAG_MORE_FRAGMENTS = 0x02 +}; + +int frct_pci_ser(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check); + +int frct_pci_des(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check); + +#endif /* OUROBOROS_LIB_FRCT_PCI_H */ diff --git a/include/ouroboros/hash.h b/include/ouroboros/hash.h index db47c9fe..49160226 100644 --- a/include/ouroboros/hash.h +++ b/include/ouroboros/hash.h @@ -29,6 +29,7 @@ #include <gcrypt.h> #endif #include <stdint.h> +#include <stddef.h> /* Hash algorithms */ enum hash_algo { @@ -58,8 +59,13 @@ enum hash_algo { uint16_t hash_len(enum hash_algo algo); +void mem_hash(enum hash_algo algo, + void * dst, + const uint8_t * buf, + size_t len); + void str_hash(enum hash_algo algo, - void * buf, + void * dst, const char * str); #endif /* OUROBOROS_LIB_HASH_H */ diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h index 5c498336..be2f836b 100644 --- a/include/ouroboros/shm_flow_set.h +++ b/include/ouroboros/shm_flow_set.h @@ -58,6 +58,6 @@ void shm_flow_set_notify(struct shm_flow_set * set, ssize_t shm_flow_set_wait(const struct shm_flow_set * shm_set, size_t idx, int * fqueue, - const struct timespec * timeout); + const struct timespec * abstime); #endif /* OUROBOROS_SHM_FLOW_SET_H */ diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index 1557e50c..55d03b41 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -50,7 +50,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, ssize_t shm_rbuff_read(struct shm_rbuff * rb); ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, - const struct timespec * timeout); + const struct timespec * abstime); size_t shm_rbuff_queued(struct shm_rbuff * rb); diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h new file mode 100644 index 00000000..e259c855 --- /dev/null +++ b/include/ouroboros/timerwheel.h @@ -0,0 +1,45 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Ring buffer for incoming SDUs + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_TIMERWHEEL_H +#define OUROBOROS_LIB_TIMERWHEEL_H + +struct timerwheel; + +struct timerwheel * timerwheel_create(time_t resolution, + time_t max_delay); + +void timerwheel_destroy(struct timerwheel * tw); + +struct tw_f * timerwheel_start(struct timerwheel * tw, + void (* func)(void *), + void * arg, + time_t delay); /* ms */ + +int timerwheel_restart(struct timerwheel * tw, + struct tw_f * f, + time_t delay); /* ms */ + +void timerwheel_stop(struct timerwheel * tw, + struct tw_f * f); + +#endif /* OUROBOROS_LIB_TIMERWHEEL_H */ diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt index 0ead1fed..b2f350dd 100644 --- a/src/ipcpd/CMakeLists.txt +++ b/src/ipcpd/CMakeLists.txt @@ -2,13 +2,9 @@ set(IPCP_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/ipcp.c ${CMAKE_CURRENT_SOURCE_DIR}/shim-data.c - ${CMAKE_CURRENT_SOURCE_DIR}/timerwheel.c ) add_subdirectory(local) add_subdirectory(normal) add_subdirectory(shim-udp) add_subdirectory(shim-eth-llc) -if (NOT APPLE) - add_subdirectory(tests) -endif () diff --git a/src/ipcpd/tests/CMakeLists.txt b/src/ipcpd/tests/CMakeLists.txt deleted file mode 100644 index 9b5eeaa1..00000000 --- a/src/ipcpd/tests/CMakeLists.txt +++ /dev/null @@ -1,34 +0,0 @@ -get_filename_component(CURRENT_SOURCE_PARENT_DIR - ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) -get_filename_component(CURRENT_BINARY_PARENT_DIR - ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) - -include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -include_directories(${CMAKE_CURRENT_BINARY_DIR}) - -include_directories(${CURRENT_SOURCE_PARENT_DIR}) -include_directories(${CURRENT_BINARY_PARENT_DIR}) - -include_directories(${CMAKE_SOURCE_DIR}/include) -include_directories(${CMAKE_BINARY_DIR}/include) - -get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) -get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) - -create_test_sourcelist(${PARENT_DIR}_tests test_suite.c - # Add new tests here - timerwheel_test.c - ) - -add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) -target_link_libraries(${PARENT_DIR}_test ouroboros) - -add_dependencies(check ${PARENT_DIR}_test) - -set(tests_to_run ${${PARENT_DIR}_tests}) -remove(tests_to_run test_suite.c) - -foreach (test ${tests_to_run}) - get_filename_component(test_name ${test} NAME_WE) - add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) -endforeach (test) diff --git a/src/ipcpd/timerwheel.h b/src/ipcpd/timerwheel.h deleted file mode 100644 index 37a6d06a..00000000 --- a/src/ipcpd/timerwheel.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Ring buffer for incoming SDUs - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_TIMERWHEEL_H -#define OUROBOROS_IPCPD_TIMERWHEEL_H - -struct timerwheel; - -struct timerwheel * timerwheel_create(unsigned int resolution, - unsigned int max_delay); - -void timerwheel_destroy(struct timerwheel * tw); - -int timerwheel_add(struct timerwheel * tw, - void (* func)(void *), - void * arg, - size_t arg_len, - unsigned int delay); /* ms */ - -#endif /* OUROBOROS_IPCPD_TIMERWHEEL_H */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 550bbc08..728d975a 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -86,6 +86,7 @@ set(SOURCE_FILES cdap_req.c crc32.c dev.c + frct_pci.c hash.c hashtable.c irm.c @@ -104,6 +105,7 @@ set(SOURCE_FILES shm_rdrbuff.c sockets.c time_utils.c + timerwheel.c tpm.c utils.c ) diff --git a/src/lib/dev.c b/src/lib/dev.c index 9354855b..e81bf105 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -34,6 +34,8 @@ #include <ouroboros/utils.h> #include <ouroboros/fqueue.h> #include <ouroboros/qoscube.h> +#include <ouroboros/timerwheel.h> +#include <ouroboros/frct_pci.h> #include <stdlib.h> #include <string.h> @@ -41,8 +43,14 @@ #define BUF_SIZE 1500 +#define TW_ELEMENTS 6000 +#define TW_RESOLUTION 1 /* ms */ + +#define MPL 2000 /* ms */ + struct flow_set { size_t idx; + bool np1_set; }; struct fqueue { @@ -59,6 +67,26 @@ enum port_state { PORT_DESTROY }; +struct frcti { + bool used; + + struct tw_f * snd_inact; + bool snd_drf; + uint64_t snd_lwe; + uint64_t snd_rwe; + + struct tw_f * rcv_inact; + bool rcv_drf; + uint64_t rcv_lwe; + uint64_t rcv_rwe; + + bool resource_control; + bool reliable; + bool error_check; + bool ordered; + bool partial; +}; + struct port { int fd; @@ -89,10 +117,14 @@ struct { struct shm_rdrbuff * rdrb; struct shm_flow_set * fqset; + struct timerwheel * tw; + int tw_users; + struct bmp * fds; struct bmp * fqueues; struct flow * flows; struct port * ports; + struct frcti * frcti; pthread_rwlock_t lock; } ai; @@ -203,6 +235,242 @@ static int api_announce(char * ap_name) return ret; } +/* Call under flows lock */ +static int finalize_write(int fd, + size_t idx) +{ + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) + return -ENOTALLOC; + + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + + return 0; +} + +static int frcti_init(int fd) +{ + ai.frcti[fd].used = true; + + ai.frcti[fd].snd_drf = true; + ai.frcti[fd].snd_lwe = 0; + ai.frcti[fd].snd_rwe = 0; + + ai.frcti[fd].rcv_drf = true; + ai.frcti[fd].rcv_lwe = 0; + ai.frcti[fd].rcv_rwe = 0; + + return 0; +} + +static void frcti_fini(int fd) +{ + struct frcti * frcti; + + frcti = &(ai.frcti[fd]); + + frcti->used = false; + + /* FIXME: We actually need to wait until these timers become NULL. */ + if (frcti->snd_inact != NULL) + timerwheel_stop(ai.tw, frcti->snd_inact); + + if (frcti->rcv_inact != NULL) + timerwheel_stop(ai.tw, frcti->rcv_inact); +} + +static int frcti_configure(int fd, + qosspec_t * qos) +{ + /* FIXME: Send configuration message here to other side. */ + + (void) fd; + (void) qos; + + return 0; +} + +static void frcti_snd_inactivity(void * arg) +{ + struct frcti * frcti; + + pthread_rwlock_wrlock(&ai.lock); + + frcti = (struct frcti * ) arg; + + frcti->snd_drf = true; + frcti->snd_inact = NULL; + + pthread_rwlock_unlock(&ai.lock); +} + +/* Called under flows lock */ +static int frcti_write(int fd, + struct shm_du_buff * sdb) +{ + struct frcti * frcti; + struct frct_pci pci; + + memset(&pci, 0, sizeof(pci)); + + frcti = &(ai.frcti[fd]); + + /* + * Set the DRF in the first packet of a new run of SDUs, + * otherwise simply recharge the timer. + */ + if (frcti->snd_drf) { + frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity, + frcti, 2 * MPL); + if (frcti->snd_inact == NULL) + return -1; + + pci.flags |= FLAG_DATA_RUN; + frcti->snd_drf = false; + } else { + if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL)) + return -1; + } + + pci.seqno = frcti->snd_lwe++; + pci.type |= PDU_TYPE_DATA; + + if (frct_pci_ser(sdb, &pci, frcti->error_check)) + return -1; + + if (finalize_write(fd, shm_du_buff_get_idx(sdb))) + return -ENOTALLOC; + + return 0; +} + +static void frcti_rcv_inactivity(void * arg) +{ + struct frcti * frcti; + + pthread_rwlock_wrlock(&ai.lock); + + frcti = (struct frcti * ) arg; + + frcti->rcv_drf = true; + frcti->rcv_inact = NULL; + + pthread_rwlock_unlock(&ai.lock); +} + +static ssize_t frcti_read(int fd) +{ + ssize_t idx = -1; + struct timespec abstime; + struct frcti * frcti; + struct frct_pci pci; + struct shm_du_buff * sdb; + + pthread_rwlock_rdlock(&ai.lock); + + if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + pthread_rwlock_unlock(&ai.lock); + } else { + struct shm_rbuff * rb = ai.flows[fd].rx_rb; + bool timeo = ai.flows[fd].timesout; + struct timespec timeout = ai.flows[fd].rcv_timeo; + + pthread_rwlock_unlock(&ai.lock); + + if (timeo) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, &timeout, &abstime); + idx = shm_rbuff_read_b(rb, &abstime); + } else { + idx = shm_rbuff_read_b(rb, NULL); + } + } + + if (idx < 0) + return idx; + + pthread_rwlock_rdlock(&ai.lock); + + frcti = &(ai.frcti[fd]); + + sdb = shm_rdrbuff_get(ai.rdrb, idx); + + /* SDU may be corrupted. */ + if (frct_pci_des(sdb, &pci, frcti->error_check)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + /* We don't accept packets when there is no inactivity timer. */ + if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + /* + * If there is an inactivity timer and the DRF is set, + * reset the state of the connection. + */ + if (pci.flags & FLAG_DATA_RUN) { + frcti->rcv_drf = true; + if (frcti->rcv_inact != NULL) + timerwheel_stop(ai.tw, frcti->rcv_inact); + frcti->rcv_lwe = pci.seqno; + } + + /* + * Start receiver inactivity if this packet has the DRF, + * otherwise simply restart it. + */ + if (frcti->rcv_drf) { + frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity, + frcti, 3 * MPL); + if (frcti->rcv_inact == NULL) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + frcti->rcv_drf = false; + } else { + if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + } + + pthread_rwlock_unlock(&ai.lock); + + return idx; +} + +static int frcti_event_wait(struct flow_set * set, + struct fqueue * fq, + const struct timespec * timeout) +{ + int ret; + + assert(set); + assert(fq); + assert(timeout); + + /* + * FIXME: Return the fq only if a data SDU + * for the application is available. + */ + + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (ret == -ETIMEDOUT) { + fq->fqsize = 0; + return -ETIMEDOUT; + } + + return ret; +} + static void flow_clear(int fd) { assert(!(fd < 0)); @@ -230,6 +498,9 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); + if (ai.frcti[fd].used) + frcti_fini(fd); + flow_clear(fd); } @@ -316,8 +587,14 @@ int ouroboros_init(const char * ap_name) if (ai.flows == NULL) goto fail_flows; - for (i = 0; i < AP_MAX_FLOWS; ++i) + ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS); + if (ai.frcti == NULL) + goto fail_frcti; + + for (i = 0; i < AP_MAX_FLOWS; ++i) { flow_clear(i); + frcti_fini(i); + } ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); if (ai.ports == NULL) @@ -353,8 +630,15 @@ int ouroboros_init(const char * ap_name) if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; + ai.tw = timerwheel_create(TW_RESOLUTION, + TW_RESOLUTION * TW_ELEMENTS); + if (ai.tw == NULL) + goto fail_timerwheel; + return 0; + fail_timerwheel: + pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < IRMD_MAX_FLOWS; ++i) pthread_cond_destroy(&ai.ports[i].state_cond); @@ -366,11 +650,13 @@ int ouroboros_init(const char * ap_name) fail_ap_name: free(ai.ports); fail_ports: + free(ai.frcti); + fail_frcti: free(ai.flows); fail_flows: shm_rdrbuff_close(ai.rdrb); fail_rdrb: - shm_flow_set_destroy(ai.fqset); + shm_flow_set_destroy(ai.fqset); fail_fqset: bmp_destroy(ai.fqueues); fail_fqueues: @@ -409,6 +695,9 @@ void ouroboros_fini() shm_rdrbuff_close(ai.rdrb); + if (ai.tw != NULL) + timerwheel_destroy(ai.tw); + free(ai.flows); free(ai.ports); @@ -463,9 +752,15 @@ int flow_accept(qosspec_t * qs, if (fd < 0) return fd; + pthread_rwlock_wrlock(&ai.lock); + + frcti_init(fd); + if (qs != NULL) *qs = ai.flows[fd].spec; + pthread_rwlock_unlock(&ai.lock); + return fd; } @@ -505,7 +800,7 @@ int flow_alloc(const char * dst_name, return -EIRMD; } - if (recv_msg->result != 0) { + if (recv_msg->result != 0) { int res = recv_msg->result; irm_msg__free_unpacked(recv_msg, NULL); return res; @@ -520,6 +815,22 @@ int flow_alloc(const char * dst_name, irm_msg__free_unpacked(recv_msg, NULL); + if (fd < 0) + return fd; + + pthread_rwlock_wrlock(&ai.lock); + + frcti_init(fd); + + if (frcti_configure(fd, qs)) { + flow_fini(fd); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.lock); + return -1; + } + + pthread_rwlock_unlock(&ai.lock); + return fd; } @@ -720,34 +1031,31 @@ ssize_t flow_write(int fd, return idx; } - if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { - shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } } else { /* blocking */ - struct shm_rdrbuff * rdrb = ai.rdrb; - struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; - pthread_rwlock_unlock(&ai.lock); - assert(tx_rb); - - idx = shm_rdrbuff_write_b(rdrb, + idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, buf, count); - if (shm_rbuff_write(tx_rb, idx) < 0) { - shm_rdrbuff_remove(rdrb, idx); - return -ENOTALLOC; - } - pthread_rwlock_rdlock(&ai.lock); } - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + if (!ai.frcti[fd].used) { + if (finalize_write(fd, idx)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOTALLOC; + } + } else { + if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + } pthread_rwlock_unlock(&ai.lock); @@ -772,21 +1080,12 @@ ssize_t flow_read(int fd, return -ENOTALLOC; } - if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - pthread_rwlock_unlock(&ai.lock); - } else { - struct shm_rbuff * rb = ai.flows[fd].rx_rb; - bool timeo = ai.flows[fd].timesout; - struct timespec timeout = ai.flows[fd].rcv_timeo; - - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&ai.lock); - if (timeo) - idx = shm_rbuff_read_b(rb, &timeout); - else - idx = shm_rbuff_read_b(rb, NULL); - } + if (!ai.frcti[fd].used) + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + else + idx = frcti_read(fd); if (idx < 0) { assert(idx == -EAGAIN || idx == -ETIMEDOUT); @@ -823,6 +1122,8 @@ struct flow_set * flow_set_create() return NULL; } + set->np1_set = false; + pthread_rwlock_unlock(&ai.lock); return set; @@ -891,6 +1192,9 @@ int flow_set_add(struct flow_set * set, for (i = 0; i < sdus; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); + if (ai.frcti[fd].used) + set->np1_set = true; + pthread_rwlock_unlock(&ai.lock); return ret; @@ -960,7 +1264,8 @@ int flow_event_wait(struct flow_set * set, struct fqueue * fq, const struct timespec * timeout) { - ssize_t ret; + ssize_t ret; + struct timespec abstime; if (set == NULL || fq == NULL) return -EINVAL; @@ -970,7 +1275,17 @@ int flow_event_wait(struct flow_set * set, assert(!fq->next); - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + if (set->np1_set) + ret = frcti_event_wait(set, fq, &abstime); + else + ret = shm_flow_set_wait(ai.fqset, set->idx, + fq->fqueue, &abstime); + if (ret == -ETIMEDOUT) { fq->fqsize = 0; return -ETIMEDOUT; @@ -1132,9 +1447,8 @@ int ipcp_flow_read(int fd, { ssize_t idx = -1; int port_id = -1; - struct shm_rbuff * rb; - assert(fd >=0); + assert(fd >= 0); assert(sdb); pthread_rwlock_rdlock(&ai.lock); @@ -1144,11 +1458,13 @@ int ipcp_flow_read(int fd, return -ENOTALLOC; } - rb = ai.flows[fd].rx_rb; - pthread_rwlock_unlock(&ai.lock); - idx = shm_rbuff_read(rb); + if (!ai.frcti[fd].used) + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + else + idx = frcti_read(fd); + if (idx < 0) return idx; @@ -1160,8 +1476,6 @@ int ipcp_flow_read(int fd, int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - size_t idx; - if (sdb == NULL) return -EINVAL; @@ -1179,10 +1493,17 @@ int ipcp_flow_write(int fd, assert(ai.flows[fd].tx_rb); - idx = shm_du_buff_get_idx(sdb); - - shm_rbuff_write(ai.flows[fd].tx_rb, idx); - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + if (!ai.frcti[fd].used) { + if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { + pthread_rwlock_unlock(&ai.lock); + return -ENOTALLOC; + } + } else { + if (frcti_write(fd, sdb)) { + pthread_rwlock_unlock(&ai.lock); + return -1; + } + } pthread_rwlock_unlock(&ai.lock); @@ -1274,32 +1595,11 @@ int local_flow_write(int fd, return -ENOTALLOC; } - shm_rbuff_write(ai.flows[fd].tx_rb, idx); - - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - - pthread_rwlock_unlock(&ai.lock); - - return 0; -} - -int ipcp_read_shim(int fd, - struct shm_du_buff ** sdb) -{ - ssize_t idx; - - pthread_rwlock_rdlock(&ai.lock); - - assert(ai.flows[fd].rx_rb); - - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - if (idx < 0) { + if (finalize_write(fd, idx)) { pthread_rwlock_unlock(&ai.lock); - return -EAGAIN; + return -ENOTALLOC; } - *sdb = shm_rdrbuff_get(ai.rdrb, idx); - pthread_rwlock_unlock(&ai.lock); return 0; diff --git a/src/lib/frct_pci.c b/src/lib/frct_pci.c new file mode 100644 index 00000000..92cf8cd9 --- /dev/null +++ b/src/lib/frct_pci.c @@ -0,0 +1,105 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Protocol Control Information of FRCT + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/config.h> +#include <ouroboros/frct_pci.h> +#include <ouroboros/hash.h> +#include <ouroboros/errno.h> + +#include <assert.h> +#include <string.h> + +#define TYPE_SIZE 1 +#define SEQNO_SIZE 8 +#define FLAGS_SIZE 1 + +/* FIXME: Head size will differ on type */ +#define HEAD_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE + +int frct_pci_ser(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check) +{ + uint8_t * head; + uint8_t * tail; + + assert(sdb); + assert(pci); + + head = shm_du_buff_head_alloc(sdb, HEAD_SIZE); + if (head == NULL) + return -EPERM; + + memcpy(head, &pci->type, TYPE_SIZE); + memcpy(head + TYPE_SIZE, &pci->flags, FLAGS_SIZE); + memcpy(head + TYPE_SIZE + FLAGS_SIZE, &pci->seqno, SEQNO_SIZE); + + if (error_check) { + tail = shm_du_buff_tail_alloc(sdb, hash_len(HASH_CRC32)); + if (tail == NULL) { + shm_du_buff_head_release(sdb, HEAD_SIZE); + return -EPERM; + } + + *((uint32_t *) tail) = 0; + mem_hash(HASH_CRC32, (uint32_t *) tail, head, tail - head); + } + + return 0; +} + +int frct_pci_des(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check) +{ + uint8_t * head; + uint8_t * tail; + uint32_t crc; + + assert(sdb); + assert(pci); + + head = shm_du_buff_head(sdb); + + /* FIXME: Depending on the type a different deserialization */ + memcpy(&pci->type, head, TYPE_SIZE); + memcpy(&pci->flags, head + TYPE_SIZE, FLAGS_SIZE); + memcpy(&pci->seqno, head + TYPE_SIZE + FLAGS_SIZE, SEQNO_SIZE); + + if (error_check) { + tail = shm_du_buff_tail(sdb); + if (tail == NULL) + return -EPERM; + + mem_hash(HASH_CRC32, &crc, head, tail - head); + + /* Corrupted SDU */ + if (crc != 0) + return -1; + + shm_du_buff_tail_release(sdb, hash_len(HASH_CRC32)); + } + + shm_du_buff_head_release(sdb, HEAD_SIZE); + + return 0; +} diff --git a/src/lib/hash.c b/src/lib/hash.c index d8cabfd3..e062a0ad 100644 --- a/src/lib/hash.c +++ b/src/lib/hash.c @@ -64,45 +64,46 @@ uint16_t hash_len(enum hash_algo algo) #endif } -void str_hash(enum hash_algo algo, - void * buf, - const char * str) +void mem_hash(enum hash_algo algo, + void * dst, + const uint8_t * buf, + size_t len) { #ifdef HAVE_LIBGCRYPT - gcry_md_hash_buffer(algo, buf, str, strlen(str)); + gcry_md_hash_buffer(algo, dst, buf, len); #else struct sha3_ctx sha3_ctx; struct md5_ctx md5_ctx; switch (algo) { case HASH_CRC32: - memset(buf, 0, CRC32_HASH_LEN); - crc32((uint32_t *) buf, str, strlen(str)); + memset(dst, 0, CRC32_HASH_LEN); + crc32((uint32_t *) dst, buf, len); break; case HASH_MD5: rhash_md5_init(&md5_ctx); - rhash_md5_update(&md5_ctx, str, strlen(str)); - rhash_md5_final(&md5_ctx, (uint8_t *) buf); + rhash_md5_update(&md5_ctx, buf, len); + rhash_md5_final(&md5_ctx, (uint8_t *) dst); break; case HASH_SHA3_224: rhash_sha3_224_init(&sha3_ctx); - rhash_sha3_update(&sha3_ctx, str, strlen(str)); - rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); + rhash_sha3_update(&sha3_ctx, buf, len); + rhash_sha3_final(&sha3_ctx, (uint8_t *) dst); break; case HASH_SHA3_256: rhash_sha3_256_init(&sha3_ctx); - rhash_sha3_update(&sha3_ctx, str, strlen(str)); - rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); + rhash_sha3_update(&sha3_ctx, buf, len); + rhash_sha3_final(&sha3_ctx, (uint8_t *) dst); break; case HASH_SHA3_384: rhash_sha3_384_init(&sha3_ctx); - rhash_sha3_update(&sha3_ctx, str, strlen(str)); - rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); + rhash_sha3_update(&sha3_ctx, buf, len); + rhash_sha3_final(&sha3_ctx, (uint8_t *) dst); break; case HASH_SHA3_512: rhash_sha3_512_init(&sha3_ctx); - rhash_sha3_update(&sha3_ctx, str, strlen(str)); - rhash_sha3_final(&sha3_ctx, (uint8_t *) buf); + rhash_sha3_update(&sha3_ctx, buf, len); + rhash_sha3_final(&sha3_ctx, (uint8_t *) dst); break; default: assert(false); @@ -110,3 +111,10 @@ void str_hash(enum hash_algo algo, } #endif } + +void str_hash(enum hash_algo algo, + void * dst, + const char * str) +{ + return mem_hash(algo, dst, (const uint8_t *) str, strlen(str)); +} diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index cd6946d4..2f1d4e33 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -326,10 +326,9 @@ void shm_flow_set_notify(struct shm_flow_set * set, ssize_t shm_flow_set_wait(const struct shm_flow_set * set, size_t idx, int * fqueue, - const struct timespec * timeout) + const struct timespec * abstime) { ssize_t ret = 0; - struct timespec abstime; assert(set); assert(idx < AP_MAX_FQUEUES); @@ -341,19 +340,15 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set, if (pthread_mutex_lock(set->lock) == EOWNERDEAD) pthread_mutex_consistent(set->lock); #endif - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) set->lock); while (set->heads[idx] == 0 && ret != -ETIMEDOUT) { - if (timeout != NULL) + if (abstime != NULL) ret = -pthread_cond_timedwait(set->conds + idx, set->lock, - &abstime); + abstime); else ret = -pthread_cond_wait(set->conds + idx, set->lock); diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 33e236b0..b420b785 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -281,9 +281,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) } ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, - const struct timespec * timeout) + const struct timespec * abstime) { - struct timespec abstime; ssize_t idx = -1; assert(rb); @@ -293,11 +292,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (idx != -EAGAIN) return idx; - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else @@ -308,10 +302,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, (void *) rb->lock); while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { - if (timeout != NULL) + if (abstime != NULL) idx = -pthread_cond_timedwait(rb->add, rb->lock, - &abstime); + abstime); else idx = -pthread_cond_wait(rb->add, rb->lock); #ifdef HAVE_ROBUST_MUTEX diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 44001458..7dc5f5d9 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -284,18 +284,12 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) } ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, - const struct timespec * timeout) + const struct timespec * abstime) { - struct timespec abstime; ssize_t idx = -1; assert(rb); - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else @@ -306,10 +300,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, (void *) rb->lock); while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { - if (timeout != NULL) + if (abstime != NULL) idx = -pthread_cond_timedwait(rb->add, rb->lock, - &abstime); + abstime); else idx = -pthread_cond_wait(rb->add, rb->lock); #ifdef HAVE_ROBUST_MUTEX diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 41c2074a..fd3c1c6a 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -11,6 +11,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c rib_test.c sha3_test.c time_utils_test.c + timerwheel_test.c ) add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) diff --git a/src/ipcpd/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c index 6ba1b890..d9ca164e 100644 --- a/src/ipcpd/tests/timerwheel_test.c +++ b/src/lib/tests/timerwheel_test.c @@ -51,6 +51,9 @@ int timerwheel_test(int argc, char ** argv) int check_total = 0; int i; + int var = 5; + + struct tw_f * f; (void) argc; (void) argv; @@ -75,13 +78,12 @@ int timerwheel_test(int argc, char ** argv) for (i = 0; i < additions; ++i) { int delay = rand() % (resolution * elements); - int var = rand() % 5; check_total += var; - if (timerwheel_add(tw, - (void (*)(void *)) add, - (void *) &var, - sizeof(var), - delay)) { + f = timerwheel_start(tw, + (void (*)(void *)) add, + (void *) &var, + delay); + if (f == NULL) { printf("Failed to add function."); return -1; } diff --git a/src/ipcpd/timerwheel.c b/src/lib/timerwheel.c index 6086181a..7e2779d0 100644 --- a/src/ipcpd/timerwheel.c +++ b/src/lib/timerwheel.c @@ -6,17 +6,17 @@ * Dimitri Staessens <dimitri.staessens@ugent.be> * Sander Vrijders <sander.vrijders@ugent.be> * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. * - * This program is distributed in the hope that it will be useful, + * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software * Foundation, Inc., http://www.fsf.org/about/contact/. */ @@ -83,8 +83,6 @@ static void tw_el_fini(struct tw_el * e) list_for_each_safe(p, h, &e->funcs) { struct tw_f * f = list_entry(p, struct tw_f, next); list_del(&f->next); - if (f->arg != NULL) - free(f->arg); } } @@ -140,8 +138,6 @@ static void * worker(void * o) list_del(&f->next); pthread_mutex_unlock(&tw->lock); f->func(f->arg); - if (f->arg != NULL) - free(f->arg); free(f); pthread_mutex_lock(&tw->lock); @@ -194,8 +190,8 @@ static void * movement(void * o) return (void *) 0; } -struct timerwheel * timerwheel_create(unsigned int resolution, - unsigned int max_delay) +struct timerwheel * timerwheel_create(time_t resolution, + time_t max_delay) { struct timespec now = {0, 0}; struct timespec res_ts = {resolution / 1000, @@ -221,10 +217,8 @@ struct timerwheel * timerwheel_create(unsigned int resolution, tw->elements <<= 1; tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements); - if (tw->wheel == NULL) { - free(tw); - return NULL; - } + if (tw->wheel == NULL) + goto fail_wheel_malloc; tw->resolution = resolution; @@ -233,35 +227,20 @@ struct timerwheel * timerwheel_create(unsigned int resolution, list_head_init(&tw->wq); - if (pthread_mutex_init(&tw->lock, NULL)) { - free(tw->wheel); - free(tw); - return NULL; - } + if (pthread_mutex_init(&tw->lock, NULL)) + goto fail_lock_init; - if (pthread_mutex_init(&tw->s_lock, NULL)) { - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } + if (pthread_mutex_init(&tw->s_lock, NULL)) + goto fail_s_lock_init; + + if (pthread_condattr_init(&cattr)) + goto fail_cond_init; - if (pthread_condattr_init(&cattr)) { - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif - if (pthread_cond_init(&tw->work, &cattr)) { - pthread_mutex_destroy(&tw->s_lock); - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } + if (pthread_cond_init(&tw->work, &cattr)) + goto fail_cond_init; tw->pos = 0; tw->state = TW_RUNNING; @@ -275,27 +254,29 @@ struct timerwheel * timerwheel_create(unsigned int resolution, ts_add(&now, &res_ts, &now); } - if (pthread_create(&tw->worker, NULL, worker, (void *) tw)) { - pthread_cond_destroy(&tw->work); - pthread_mutex_destroy(&tw->s_lock); - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; - } + if (pthread_create(&tw->worker, NULL, worker, (void *) tw)) + goto fail_worker_create; if (pthread_create(&tw->ticker, NULL, movement, (void *) tw)) { tw_set_state(tw, TW_DESTROY); - pthread_join(tw->worker, NULL); - pthread_cond_destroy(&tw->work); - pthread_mutex_destroy(&tw->s_lock); - pthread_mutex_destroy(&tw->lock); - free(tw->wheel); - free(tw); - return NULL; + goto fail_ticker_create; } return tw; + + fail_ticker_create: + pthread_join(tw->worker, NULL); + fail_worker_create: + pthread_cond_destroy(&tw->work); + fail_cond_init: + pthread_mutex_destroy(&tw->s_lock); + fail_s_lock_init: + pthread_mutex_destroy(&tw->lock); + fail_lock_init: + free(tw->wheel); + fail_wheel_malloc: + free(tw); + return NULL; } void timerwheel_destroy(struct timerwheel * tw) @@ -318,8 +299,6 @@ void timerwheel_destroy(struct timerwheel * tw) list_for_each_safe(p, h, &tw->wq) { struct tw_f * f = list_entry(p, struct tw_f, next); list_del(&f->next); - if (f->arg != NULL) - free(f->arg); free(f); } @@ -333,30 +312,43 @@ void timerwheel_destroy(struct timerwheel * tw) free(tw); } -int timerwheel_add(struct timerwheel * tw, - void (* func)(void *), - void * arg, - size_t arg_len, - unsigned int delay) +struct tw_f * timerwheel_start(struct timerwheel * tw, + void (* func)(void *), + void * arg, + time_t delay) { int pos; struct tw_f * f = malloc(sizeof(*f)); if (f == NULL) - return -ENOMEM; + return NULL; f->func = func; - f->arg = malloc(arg_len); - if (f->arg == NULL) { - free(f); - return -ENOMEM; - } + f->arg = arg; + + assert(delay < tw->elements * tw->resolution); - memcpy(f->arg, arg, arg_len); + pthread_mutex_lock(&tw->lock); + + pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); + list_add(&f->next, &tw->wheel[pos].funcs); + + pthread_mutex_unlock(&tw->lock); + + return f; +} + +int timerwheel_restart(struct timerwheel * tw, + struct tw_f * f, + time_t delay) +{ + int pos; + assert(tw); assert(delay < tw->elements * tw->resolution); pthread_mutex_lock(&tw->lock); + list_del(&f->next); pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1); list_add(&f->next, &tw->wheel[pos].funcs); @@ -364,3 +356,16 @@ int timerwheel_add(struct timerwheel * tw, return 0; } + +void timerwheel_stop(struct timerwheel * tw, + struct tw_f * f) +{ + assert(tw); + + pthread_mutex_lock(&tw->lock); + + list_del(&f->next); + free(f); + + pthread_mutex_unlock(&tw->lock); +} |