summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ouroboros/frct_pci.h69
-rw-r--r--include/ouroboros/hash.h8
-rw-r--r--include/ouroboros/shm_flow_set.h2
-rw-r--r--include/ouroboros/shm_rbuff.h2
-rw-r--r--include/ouroboros/timerwheel.h45
-rw-r--r--src/ipcpd/CMakeLists.txt4
-rw-r--r--src/ipcpd/tests/CMakeLists.txt34
-rw-r--r--src/ipcpd/timerwheel.h39
-rw-r--r--src/lib/CMakeLists.txt2
-rw-r--r--src/lib/dev.c440
-rw-r--r--src/lib/frct_pci.c105
-rw-r--r--src/lib/hash.c40
-rw-r--r--src/lib/shm_flow_set.c11
-rw-r--r--src/lib/shm_rbuff_ll.c12
-rw-r--r--src/lib/shm_rbuff_pthr.c12
-rw-r--r--src/lib/tests/CMakeLists.txt1
-rw-r--r--src/lib/tests/timerwheel_test.c (renamed from src/ipcpd/tests/timerwheel_test.c)14
-rw-r--r--src/lib/timerwheel.c (renamed from src/ipcpd/timerwheel.c)147
18 files changed, 718 insertions, 269 deletions
diff --git a/include/ouroboros/frct_pci.h b/include/ouroboros/frct_pci.h
new file mode 100644
index 00000000..3a93ac1c
--- /dev/null
+++ b/include/ouroboros/frct_pci.h
@@ -0,0 +1,69 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Protocol Control Information of FRCT
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_LIB_FRCT_PCI_H
+#define OUROBOROS_LIB_FRCT_PCI_H
+
+#include <ouroboros/shm_du_buff.h>
+
+#include <stdint.h>
+#include <stdbool.h>
+
+struct frct_pci {
+ uint8_t type;
+ uint8_t flags;
+ uint64_t seqno;
+ uint64_t lwe;
+ uint64_t rwe;
+};
+
+enum pdu_types {
+ PDU_TYPE_DATA = 0x01,
+ PDU_TYPE_ACK = 0x02,
+ PDU_TYPE_FC = 0x04,
+ PDU_TYPE_ACK_AND_FC = (PDU_TYPE_ACK | PDU_TYPE_FC),
+ PDU_TYPE_CONFIG = 0x08,
+ PDU_TYPE_RENDEZ_VOUS = 0x10
+};
+
+enum config_flags {
+ CONF_RESOURCE_CONTROL = 0x01,
+ CONF_RELIABLE = 0x02,
+ CONF_ERROR_CHECK = 0x04,
+ CONF_ORDERED = 0x08,
+ CONF_PARTIAL = 0x10
+};
+
+enum data_flags {
+ FLAG_DATA_RUN = 0x01,
+ FLAG_MORE_FRAGMENTS = 0x02
+};
+
+int frct_pci_ser(struct shm_du_buff * sdb,
+ struct frct_pci * pci,
+ bool error_check);
+
+int frct_pci_des(struct shm_du_buff * sdb,
+ struct frct_pci * pci,
+ bool error_check);
+
+#endif /* OUROBOROS_LIB_FRCT_PCI_H */
diff --git a/include/ouroboros/hash.h b/include/ouroboros/hash.h
index db47c9fe..49160226 100644
--- a/include/ouroboros/hash.h
+++ b/include/ouroboros/hash.h
@@ -29,6 +29,7 @@
#include <gcrypt.h>
#endif
#include <stdint.h>
+#include <stddef.h>
/* Hash algorithms */
enum hash_algo {
@@ -58,8 +59,13 @@ enum hash_algo {
uint16_t hash_len(enum hash_algo algo);
+void mem_hash(enum hash_algo algo,
+ void * dst,
+ const uint8_t * buf,
+ size_t len);
+
void str_hash(enum hash_algo algo,
- void * buf,
+ void * dst,
const char * str);
#endif /* OUROBOROS_LIB_HASH_H */
diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h
index 5c498336..be2f836b 100644
--- a/include/ouroboros/shm_flow_set.h
+++ b/include/ouroboros/shm_flow_set.h
@@ -58,6 +58,6 @@ void shm_flow_set_notify(struct shm_flow_set * set,
ssize_t shm_flow_set_wait(const struct shm_flow_set * shm_set,
size_t idx,
int * fqueue,
- const struct timespec * timeout);
+ const struct timespec * abstime);
#endif /* OUROBOROS_SHM_FLOW_SET_H */
diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h
index 1557e50c..55d03b41 100644
--- a/include/ouroboros/shm_rbuff.h
+++ b/include/ouroboros/shm_rbuff.h
@@ -50,7 +50,7 @@ int shm_rbuff_write(struct shm_rbuff * rb,
ssize_t shm_rbuff_read(struct shm_rbuff * rb);
ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
- const struct timespec * timeout);
+ const struct timespec * abstime);
size_t shm_rbuff_queued(struct shm_rbuff * rb);
diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h
new file mode 100644
index 00000000..e259c855
--- /dev/null
+++ b/include/ouroboros/timerwheel.h
@@ -0,0 +1,45 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Ring buffer for incoming SDUs
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_LIB_TIMERWHEEL_H
+#define OUROBOROS_LIB_TIMERWHEEL_H
+
+struct timerwheel;
+
+struct timerwheel * timerwheel_create(time_t resolution,
+ time_t max_delay);
+
+void timerwheel_destroy(struct timerwheel * tw);
+
+struct tw_f * timerwheel_start(struct timerwheel * tw,
+ void (* func)(void *),
+ void * arg,
+ time_t delay); /* ms */
+
+int timerwheel_restart(struct timerwheel * tw,
+ struct tw_f * f,
+ time_t delay); /* ms */
+
+void timerwheel_stop(struct timerwheel * tw,
+ struct tw_f * f);
+
+#endif /* OUROBOROS_LIB_TIMERWHEEL_H */
diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt
index 0ead1fed..b2f350dd 100644
--- a/src/ipcpd/CMakeLists.txt
+++ b/src/ipcpd/CMakeLists.txt
@@ -2,13 +2,9 @@ set(IPCP_SOURCES
# Add source files here
${CMAKE_CURRENT_SOURCE_DIR}/ipcp.c
${CMAKE_CURRENT_SOURCE_DIR}/shim-data.c
- ${CMAKE_CURRENT_SOURCE_DIR}/timerwheel.c
)
add_subdirectory(local)
add_subdirectory(normal)
add_subdirectory(shim-udp)
add_subdirectory(shim-eth-llc)
-if (NOT APPLE)
- add_subdirectory(tests)
-endif ()
diff --git a/src/ipcpd/tests/CMakeLists.txt b/src/ipcpd/tests/CMakeLists.txt
deleted file mode 100644
index 9b5eeaa1..00000000
--- a/src/ipcpd/tests/CMakeLists.txt
+++ /dev/null
@@ -1,34 +0,0 @@
-get_filename_component(CURRENT_SOURCE_PARENT_DIR
- ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
-get_filename_component(CURRENT_BINARY_PARENT_DIR
- ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY)
-
-include_directories(${CMAKE_CURRENT_SOURCE_DIR})
-include_directories(${CMAKE_CURRENT_BINARY_DIR})
-
-include_directories(${CURRENT_SOURCE_PARENT_DIR})
-include_directories(${CURRENT_BINARY_PARENT_DIR})
-
-include_directories(${CMAKE_SOURCE_DIR}/include)
-include_directories(${CMAKE_BINARY_DIR}/include)
-
-get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
-get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
-
-create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
- # Add new tests here
- timerwheel_test.c
- )
-
-add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests})
-target_link_libraries(${PARENT_DIR}_test ouroboros)
-
-add_dependencies(check ${PARENT_DIR}_test)
-
-set(tests_to_run ${${PARENT_DIR}_tests})
-remove(tests_to_run test_suite.c)
-
-foreach (test ${tests_to_run})
- get_filename_component(test_name ${test} NAME_WE)
- add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name})
-endforeach (test)
diff --git a/src/ipcpd/timerwheel.h b/src/ipcpd/timerwheel.h
deleted file mode 100644
index 37a6d06a..00000000
--- a/src/ipcpd/timerwheel.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Ring buffer for incoming SDUs
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_IPCPD_TIMERWHEEL_H
-#define OUROBOROS_IPCPD_TIMERWHEEL_H
-
-struct timerwheel;
-
-struct timerwheel * timerwheel_create(unsigned int resolution,
- unsigned int max_delay);
-
-void timerwheel_destroy(struct timerwheel * tw);
-
-int timerwheel_add(struct timerwheel * tw,
- void (* func)(void *),
- void * arg,
- size_t arg_len,
- unsigned int delay); /* ms */
-
-#endif /* OUROBOROS_IPCPD_TIMERWHEEL_H */
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 550bbc08..728d975a 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -86,6 +86,7 @@ set(SOURCE_FILES
cdap_req.c
crc32.c
dev.c
+ frct_pci.c
hash.c
hashtable.c
irm.c
@@ -104,6 +105,7 @@ set(SOURCE_FILES
shm_rdrbuff.c
sockets.c
time_utils.c
+ timerwheel.c
tpm.c
utils.c
)
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 9354855b..e81bf105 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -34,6 +34,8 @@
#include <ouroboros/utils.h>
#include <ouroboros/fqueue.h>
#include <ouroboros/qoscube.h>
+#include <ouroboros/timerwheel.h>
+#include <ouroboros/frct_pci.h>
#include <stdlib.h>
#include <string.h>
@@ -41,8 +43,14 @@
#define BUF_SIZE 1500
+#define TW_ELEMENTS 6000
+#define TW_RESOLUTION 1 /* ms */
+
+#define MPL 2000 /* ms */
+
struct flow_set {
size_t idx;
+ bool np1_set;
};
struct fqueue {
@@ -59,6 +67,26 @@ enum port_state {
PORT_DESTROY
};
+struct frcti {
+ bool used;
+
+ struct tw_f * snd_inact;
+ bool snd_drf;
+ uint64_t snd_lwe;
+ uint64_t snd_rwe;
+
+ struct tw_f * rcv_inact;
+ bool rcv_drf;
+ uint64_t rcv_lwe;
+ uint64_t rcv_rwe;
+
+ bool resource_control;
+ bool reliable;
+ bool error_check;
+ bool ordered;
+ bool partial;
+};
+
struct port {
int fd;
@@ -89,10 +117,14 @@ struct {
struct shm_rdrbuff * rdrb;
struct shm_flow_set * fqset;
+ struct timerwheel * tw;
+ int tw_users;
+
struct bmp * fds;
struct bmp * fqueues;
struct flow * flows;
struct port * ports;
+ struct frcti * frcti;
pthread_rwlock_t lock;
} ai;
@@ -203,6 +235,242 @@ static int api_announce(char * ap_name)
return ret;
}
+/* Call under flows lock */
+static int finalize_write(int fd,
+ size_t idx)
+{
+ if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0)
+ return -ENOTALLOC;
+
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+
+ return 0;
+}
+
+static int frcti_init(int fd)
+{
+ ai.frcti[fd].used = true;
+
+ ai.frcti[fd].snd_drf = true;
+ ai.frcti[fd].snd_lwe = 0;
+ ai.frcti[fd].snd_rwe = 0;
+
+ ai.frcti[fd].rcv_drf = true;
+ ai.frcti[fd].rcv_lwe = 0;
+ ai.frcti[fd].rcv_rwe = 0;
+
+ return 0;
+}
+
+static void frcti_fini(int fd)
+{
+ struct frcti * frcti;
+
+ frcti = &(ai.frcti[fd]);
+
+ frcti->used = false;
+
+ /* FIXME: We actually need to wait until these timers become NULL. */
+ if (frcti->snd_inact != NULL)
+ timerwheel_stop(ai.tw, frcti->snd_inact);
+
+ if (frcti->rcv_inact != NULL)
+ timerwheel_stop(ai.tw, frcti->rcv_inact);
+}
+
+static int frcti_configure(int fd,
+ qosspec_t * qos)
+{
+ /* FIXME: Send configuration message here to other side. */
+
+ (void) fd;
+ (void) qos;
+
+ return 0;
+}
+
+static void frcti_snd_inactivity(void * arg)
+{
+ struct frcti * frcti;
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ frcti = (struct frcti * ) arg;
+
+ frcti->snd_drf = true;
+ frcti->snd_inact = NULL;
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+/* Called under flows lock */
+static int frcti_write(int fd,
+ struct shm_du_buff * sdb)
+{
+ struct frcti * frcti;
+ struct frct_pci pci;
+
+ memset(&pci, 0, sizeof(pci));
+
+ frcti = &(ai.frcti[fd]);
+
+ /*
+ * Set the DRF in the first packet of a new run of SDUs,
+ * otherwise simply recharge the timer.
+ */
+ if (frcti->snd_drf) {
+ frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity,
+ frcti, 2 * MPL);
+ if (frcti->snd_inact == NULL)
+ return -1;
+
+ pci.flags |= FLAG_DATA_RUN;
+ frcti->snd_drf = false;
+ } else {
+ if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL))
+ return -1;
+ }
+
+ pci.seqno = frcti->snd_lwe++;
+ pci.type |= PDU_TYPE_DATA;
+
+ if (frct_pci_ser(sdb, &pci, frcti->error_check))
+ return -1;
+
+ if (finalize_write(fd, shm_du_buff_get_idx(sdb)))
+ return -ENOTALLOC;
+
+ return 0;
+}
+
+static void frcti_rcv_inactivity(void * arg)
+{
+ struct frcti * frcti;
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ frcti = (struct frcti * ) arg;
+
+ frcti->rcv_drf = true;
+ frcti->rcv_inact = NULL;
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+static ssize_t frcti_read(int fd)
+{
+ ssize_t idx = -1;
+ struct timespec abstime;
+ struct frcti * frcti;
+ struct frct_pci pci;
+ struct shm_du_buff * sdb;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+ pthread_rwlock_unlock(&ai.lock);
+ } else {
+ struct shm_rbuff * rb = ai.flows[fd].rx_rb;
+ bool timeo = ai.flows[fd].timesout;
+ struct timespec timeout = ai.flows[fd].rcv_timeo;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ if (timeo) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, &timeout, &abstime);
+ idx = shm_rbuff_read_b(rb, &abstime);
+ } else {
+ idx = shm_rbuff_read_b(rb, NULL);
+ }
+ }
+
+ if (idx < 0)
+ return idx;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ frcti = &(ai.frcti[fd]);
+
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+ /* SDU may be corrupted. */
+ if (frct_pci_des(sdb, &pci, frcti->error_check)) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -1;
+ }
+
+ /* We don't accept packets when there is no inactivity timer. */
+ if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -1;
+ }
+
+ /*
+ * If there is an inactivity timer and the DRF is set,
+ * reset the state of the connection.
+ */
+ if (pci.flags & FLAG_DATA_RUN) {
+ frcti->rcv_drf = true;
+ if (frcti->rcv_inact != NULL)
+ timerwheel_stop(ai.tw, frcti->rcv_inact);
+ frcti->rcv_lwe = pci.seqno;
+ }
+
+ /*
+ * Start receiver inactivity if this packet has the DRF,
+ * otherwise simply restart it.
+ */
+ if (frcti->rcv_drf) {
+ frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity,
+ frcti, 3 * MPL);
+ if (frcti->rcv_inact == NULL) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -1;
+ }
+
+ frcti->rcv_drf = false;
+ } else {
+ if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -1;
+ }
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ return idx;
+}
+
+static int frcti_event_wait(struct flow_set * set,
+ struct fqueue * fq,
+ const struct timespec * timeout)
+{
+ int ret;
+
+ assert(set);
+ assert(fq);
+ assert(timeout);
+
+ /*
+ * FIXME: Return the fq only if a data SDU
+ * for the application is available.
+ */
+
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout);
+ if (ret == -ETIMEDOUT) {
+ fq->fqsize = 0;
+ return -ETIMEDOUT;
+ }
+
+ return ret;
+}
+
static void flow_clear(int fd)
{
assert(!(fd < 0));
@@ -230,6 +498,9 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL)
shm_flow_set_close(ai.flows[fd].set);
+ if (ai.frcti[fd].used)
+ frcti_fini(fd);
+
flow_clear(fd);
}
@@ -316,8 +587,14 @@ int ouroboros_init(const char * ap_name)
if (ai.flows == NULL)
goto fail_flows;
- for (i = 0; i < AP_MAX_FLOWS; ++i)
+ ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS);
+ if (ai.frcti == NULL)
+ goto fail_frcti;
+
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
flow_clear(i);
+ frcti_fini(i);
+ }
ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
if (ai.ports == NULL)
@@ -353,8 +630,15 @@ int ouroboros_init(const char * ap_name)
if (pthread_rwlock_init(&ai.lock, NULL))
goto fail_lock;
+ ai.tw = timerwheel_create(TW_RESOLUTION,
+ TW_RESOLUTION * TW_ELEMENTS);
+ if (ai.tw == NULL)
+ goto fail_timerwheel;
+
return 0;
+ fail_timerwheel:
+ pthread_rwlock_destroy(&ai.lock);
fail_lock:
for (i = 0; i < IRMD_MAX_FLOWS; ++i)
pthread_cond_destroy(&ai.ports[i].state_cond);
@@ -366,11 +650,13 @@ int ouroboros_init(const char * ap_name)
fail_ap_name:
free(ai.ports);
fail_ports:
+ free(ai.frcti);
+ fail_frcti:
free(ai.flows);
fail_flows:
shm_rdrbuff_close(ai.rdrb);
fail_rdrb:
- shm_flow_set_destroy(ai.fqset);
+ shm_flow_set_destroy(ai.fqset);
fail_fqset:
bmp_destroy(ai.fqueues);
fail_fqueues:
@@ -409,6 +695,9 @@ void ouroboros_fini()
shm_rdrbuff_close(ai.rdrb);
+ if (ai.tw != NULL)
+ timerwheel_destroy(ai.tw);
+
free(ai.flows);
free(ai.ports);
@@ -463,9 +752,15 @@ int flow_accept(qosspec_t * qs,
if (fd < 0)
return fd;
+ pthread_rwlock_wrlock(&ai.lock);
+
+ frcti_init(fd);
+
if (qs != NULL)
*qs = ai.flows[fd].spec;
+ pthread_rwlock_unlock(&ai.lock);
+
return fd;
}
@@ -505,7 +800,7 @@ int flow_alloc(const char * dst_name,
return -EIRMD;
}
- if (recv_msg->result != 0) {
+ if (recv_msg->result != 0) {
int res = recv_msg->result;
irm_msg__free_unpacked(recv_msg, NULL);
return res;
@@ -520,6 +815,22 @@ int flow_alloc(const char * dst_name,
irm_msg__free_unpacked(recv_msg, NULL);
+ if (fd < 0)
+ return fd;
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ frcti_init(fd);
+
+ if (frcti_configure(fd, qs)) {
+ flow_fini(fd);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.lock);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
return fd;
}
@@ -720,34 +1031,31 @@ ssize_t flow_write(int fd,
return idx;
}
- if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {
- shm_rdrbuff_remove(ai.rdrb, idx);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
} else { /* blocking */
- struct shm_rdrbuff * rdrb = ai.rdrb;
- struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
-
pthread_rwlock_unlock(&ai.lock);
- assert(tx_rb);
-
- idx = shm_rdrbuff_write_b(rdrb,
+ idx = shm_rdrbuff_write_b(ai.rdrb,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
buf,
count);
- if (shm_rbuff_write(tx_rb, idx) < 0) {
- shm_rdrbuff_remove(rdrb, idx);
- return -ENOTALLOC;
- }
-
pthread_rwlock_rdlock(&ai.lock);
}
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ if (!ai.frcti[fd].used) {
+ if (finalize_write(fd, idx)) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -ENOTALLOC;
+ }
+ } else {
+ if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -1;
+ }
+ }
pthread_rwlock_unlock(&ai.lock);
@@ -772,21 +1080,12 @@ ssize_t flow_read(int fd,
return -ENOTALLOC;
}
- if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_rbuff_read(ai.flows[fd].rx_rb);
- pthread_rwlock_unlock(&ai.lock);
- } else {
- struct shm_rbuff * rb = ai.flows[fd].rx_rb;
- bool timeo = ai.flows[fd].timesout;
- struct timespec timeout = ai.flows[fd].rcv_timeo;
-
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&ai.lock);
- if (timeo)
- idx = shm_rbuff_read_b(rb, &timeout);
- else
- idx = shm_rbuff_read_b(rb, NULL);
- }
+ if (!ai.frcti[fd].used)
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+ else
+ idx = frcti_read(fd);
if (idx < 0) {
assert(idx == -EAGAIN || idx == -ETIMEDOUT);
@@ -823,6 +1122,8 @@ struct flow_set * flow_set_create()
return NULL;
}
+ set->np1_set = false;
+
pthread_rwlock_unlock(&ai.lock);
return set;
@@ -891,6 +1192,9 @@ int flow_set_add(struct flow_set * set,
for (i = 0; i < sdus; i++)
shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
+ if (ai.frcti[fd].used)
+ set->np1_set = true;
+
pthread_rwlock_unlock(&ai.lock);
return ret;
@@ -960,7 +1264,8 @@ int flow_event_wait(struct flow_set * set,
struct fqueue * fq,
const struct timespec * timeout)
{
- ssize_t ret;
+ ssize_t ret;
+ struct timespec abstime;
if (set == NULL || fq == NULL)
return -EINVAL;
@@ -970,7 +1275,17 @@ int flow_event_wait(struct flow_set * set,
assert(!fq->next);
- ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout);
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ if (set->np1_set)
+ ret = frcti_event_wait(set, fq, &abstime);
+ else
+ ret = shm_flow_set_wait(ai.fqset, set->idx,
+ fq->fqueue, &abstime);
+
if (ret == -ETIMEDOUT) {
fq->fqsize = 0;
return -ETIMEDOUT;
@@ -1132,9 +1447,8 @@ int ipcp_flow_read(int fd,
{
ssize_t idx = -1;
int port_id = -1;
- struct shm_rbuff * rb;
- assert(fd >=0);
+ assert(fd >= 0);
assert(sdb);
pthread_rwlock_rdlock(&ai.lock);
@@ -1144,11 +1458,13 @@ int ipcp_flow_read(int fd,
return -ENOTALLOC;
}
- rb = ai.flows[fd].rx_rb;
-
pthread_rwlock_unlock(&ai.lock);
- idx = shm_rbuff_read(rb);
+ if (!ai.frcti[fd].used)
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+ else
+ idx = frcti_read(fd);
+
if (idx < 0)
return idx;
@@ -1160,8 +1476,6 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
- size_t idx;
-
if (sdb == NULL)
return -EINVAL;
@@ -1179,10 +1493,17 @@ int ipcp_flow_write(int fd,
assert(ai.flows[fd].tx_rb);
- idx = shm_du_buff_get_idx(sdb);
-
- shm_rbuff_write(ai.flows[fd].tx_rb, idx);
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ if (!ai.frcti[fd].used) {
+ if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOTALLOC;
+ }
+ } else {
+ if (frcti_write(fd, sdb)) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -1;
+ }
+ }
pthread_rwlock_unlock(&ai.lock);
@@ -1274,32 +1595,11 @@ int local_flow_write(int fd,
return -ENOTALLOC;
}
- shm_rbuff_write(ai.flows[fd].tx_rb, idx);
-
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
-
- pthread_rwlock_unlock(&ai.lock);
-
- return 0;
-}
-
-int ipcp_read_shim(int fd,
- struct shm_du_buff ** sdb)
-{
- ssize_t idx;
-
- pthread_rwlock_rdlock(&ai.lock);
-
- assert(ai.flows[fd].rx_rb);
-
- idx = shm_rbuff_read(ai.flows[fd].rx_rb);
- if (idx < 0) {
+ if (finalize_write(fd, idx)) {
pthread_rwlock_unlock(&ai.lock);
- return -EAGAIN;
+ return -ENOTALLOC;
}
- *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
pthread_rwlock_unlock(&ai.lock);
return 0;
diff --git a/src/lib/frct_pci.c b/src/lib/frct_pci.c
new file mode 100644
index 00000000..92cf8cd9
--- /dev/null
+++ b/src/lib/frct_pci.c
@@ -0,0 +1,105 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Protocol Control Information of FRCT
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/frct_pci.h>
+#include <ouroboros/hash.h>
+#include <ouroboros/errno.h>
+
+#include <assert.h>
+#include <string.h>
+
+#define TYPE_SIZE 1
+#define SEQNO_SIZE 8
+#define FLAGS_SIZE 1
+
+/* FIXME: Head size will differ on type */
+#define HEAD_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE
+
+int frct_pci_ser(struct shm_du_buff * sdb,
+ struct frct_pci * pci,
+ bool error_check)
+{
+ uint8_t * head;
+ uint8_t * tail;
+
+ assert(sdb);
+ assert(pci);
+
+ head = shm_du_buff_head_alloc(sdb, HEAD_SIZE);
+ if (head == NULL)
+ return -EPERM;
+
+ memcpy(head, &pci->type, TYPE_SIZE);
+ memcpy(head + TYPE_SIZE, &pci->flags, FLAGS_SIZE);
+ memcpy(head + TYPE_SIZE + FLAGS_SIZE, &pci->seqno, SEQNO_SIZE);
+
+ if (error_check) {
+ tail = shm_du_buff_tail_alloc(sdb, hash_len(HASH_CRC32));
+ if (tail == NULL) {
+ shm_du_buff_head_release(sdb, HEAD_SIZE);
+ return -EPERM;
+ }
+
+ *((uint32_t *) tail) = 0;
+ mem_hash(HASH_CRC32, (uint32_t *) tail, head, tail - head);
+ }
+
+ return 0;
+}
+
+int frct_pci_des(struct shm_du_buff * sdb,
+ struct frct_pci * pci,
+ bool error_check)
+{
+ uint8_t * head;
+ uint8_t * tail;
+ uint32_t crc;
+
+ assert(sdb);
+ assert(pci);
+
+ head = shm_du_buff_head(sdb);
+
+ /* FIXME: Depending on the type a different deserialization */
+ memcpy(&pci->type, head, TYPE_SIZE);
+ memcpy(&pci->flags, head + TYPE_SIZE, FLAGS_SIZE);
+ memcpy(&pci->seqno, head + TYPE_SIZE + FLAGS_SIZE, SEQNO_SIZE);
+
+ if (error_check) {
+ tail = shm_du_buff_tail(sdb);
+ if (tail == NULL)
+ return -EPERM;
+
+ mem_hash(HASH_CRC32, &crc, head, tail - head);
+
+ /* Corrupted SDU */
+ if (crc != 0)
+ return -1;
+
+ shm_du_buff_tail_release(sdb, hash_len(HASH_CRC32));
+ }
+
+ shm_du_buff_head_release(sdb, HEAD_SIZE);
+
+ return 0;
+}
diff --git a/src/lib/hash.c b/src/lib/hash.c
index d8cabfd3..e062a0ad 100644
--- a/src/lib/hash.c
+++ b/src/lib/hash.c
@@ -64,45 +64,46 @@ uint16_t hash_len(enum hash_algo algo)
#endif
}
-void str_hash(enum hash_algo algo,
- void * buf,
- const char * str)
+void mem_hash(enum hash_algo algo,
+ void * dst,
+ const uint8_t * buf,
+ size_t len)
{
#ifdef HAVE_LIBGCRYPT
- gcry_md_hash_buffer(algo, buf, str, strlen(str));
+ gcry_md_hash_buffer(algo, dst, buf, len);
#else
struct sha3_ctx sha3_ctx;
struct md5_ctx md5_ctx;
switch (algo) {
case HASH_CRC32:
- memset(buf, 0, CRC32_HASH_LEN);
- crc32((uint32_t *) buf, str, strlen(str));
+ memset(dst, 0, CRC32_HASH_LEN);
+ crc32((uint32_t *) dst, buf, len);
break;
case HASH_MD5:
rhash_md5_init(&md5_ctx);
- rhash_md5_update(&md5_ctx, str, strlen(str));
- rhash_md5_final(&md5_ctx, (uint8_t *) buf);
+ rhash_md5_update(&md5_ctx, buf, len);
+ rhash_md5_final(&md5_ctx, (uint8_t *) dst);
break;
case HASH_SHA3_224:
rhash_sha3_224_init(&sha3_ctx);
- rhash_sha3_update(&sha3_ctx, str, strlen(str));
- rhash_sha3_final(&sha3_ctx, (uint8_t *) buf);
+ rhash_sha3_update(&sha3_ctx, buf, len);
+ rhash_sha3_final(&sha3_ctx, (uint8_t *) dst);
break;
case HASH_SHA3_256:
rhash_sha3_256_init(&sha3_ctx);
- rhash_sha3_update(&sha3_ctx, str, strlen(str));
- rhash_sha3_final(&sha3_ctx, (uint8_t *) buf);
+ rhash_sha3_update(&sha3_ctx, buf, len);
+ rhash_sha3_final(&sha3_ctx, (uint8_t *) dst);
break;
case HASH_SHA3_384:
rhash_sha3_384_init(&sha3_ctx);
- rhash_sha3_update(&sha3_ctx, str, strlen(str));
- rhash_sha3_final(&sha3_ctx, (uint8_t *) buf);
+ rhash_sha3_update(&sha3_ctx, buf, len);
+ rhash_sha3_final(&sha3_ctx, (uint8_t *) dst);
break;
case HASH_SHA3_512:
rhash_sha3_512_init(&sha3_ctx);
- rhash_sha3_update(&sha3_ctx, str, strlen(str));
- rhash_sha3_final(&sha3_ctx, (uint8_t *) buf);
+ rhash_sha3_update(&sha3_ctx, buf, len);
+ rhash_sha3_final(&sha3_ctx, (uint8_t *) dst);
break;
default:
assert(false);
@@ -110,3 +111,10 @@ void str_hash(enum hash_algo algo,
}
#endif
}
+
+void str_hash(enum hash_algo algo,
+ void * dst,
+ const char * str)
+{
+ return mem_hash(algo, dst, (const uint8_t *) str, strlen(str));
+}
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index cd6946d4..2f1d4e33 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -326,10 +326,9 @@ void shm_flow_set_notify(struct shm_flow_set * set,
ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
size_t idx,
int * fqueue,
- const struct timespec * timeout)
+ const struct timespec * abstime)
{
ssize_t ret = 0;
- struct timespec abstime;
assert(set);
assert(idx < AP_MAX_FQUEUES);
@@ -341,19 +340,15 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
if (pthread_mutex_lock(set->lock) == EOWNERDEAD)
pthread_mutex_consistent(set->lock);
#endif
- if (timeout != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeout, &abstime);
- }
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) set->lock);
while (set->heads[idx] == 0 && ret != -ETIMEDOUT) {
- if (timeout != NULL)
+ if (abstime != NULL)
ret = -pthread_cond_timedwait(set->conds + idx,
set->lock,
- &abstime);
+ abstime);
else
ret = -pthread_cond_wait(set->conds + idx,
set->lock);
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c
index 33e236b0..b420b785 100644
--- a/src/lib/shm_rbuff_ll.c
+++ b/src/lib/shm_rbuff_ll.c
@@ -281,9 +281,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
}
ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
- const struct timespec * timeout)
+ const struct timespec * abstime)
{
- struct timespec abstime;
ssize_t idx = -1;
assert(rb);
@@ -293,11 +292,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
if (idx != -EAGAIN)
return idx;
- if (timeout != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeout, &abstime);
- }
-
#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
@@ -308,10 +302,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
(void *) rb->lock);
while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) {
- if (timeout != NULL)
+ if (abstime != NULL)
idx = -pthread_cond_timedwait(rb->add,
rb->lock,
- &abstime);
+ abstime);
else
idx = -pthread_cond_wait(rb->add, rb->lock);
#ifdef HAVE_ROBUST_MUTEX
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index 44001458..7dc5f5d9 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -284,18 +284,12 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
}
ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
- const struct timespec * timeout)
+ const struct timespec * abstime)
{
- struct timespec abstime;
ssize_t idx = -1;
assert(rb);
- if (timeout != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeout, &abstime);
- }
-
#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
@@ -306,10 +300,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
(void *) rb->lock);
while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) {
- if (timeout != NULL)
+ if (abstime != NULL)
idx = -pthread_cond_timedwait(rb->add,
rb->lock,
- &abstime);
+ abstime);
else
idx = -pthread_cond_wait(rb->add, rb->lock);
#ifdef HAVE_ROBUST_MUTEX
diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt
index 41c2074a..fd3c1c6a 100644
--- a/src/lib/tests/CMakeLists.txt
+++ b/src/lib/tests/CMakeLists.txt
@@ -11,6 +11,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
rib_test.c
sha3_test.c
time_utils_test.c
+ timerwheel_test.c
)
add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests})
diff --git a/src/ipcpd/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c
index 6ba1b890..d9ca164e 100644
--- a/src/ipcpd/tests/timerwheel_test.c
+++ b/src/lib/tests/timerwheel_test.c
@@ -51,6 +51,9 @@ int timerwheel_test(int argc, char ** argv)
int check_total = 0;
int i;
+ int var = 5;
+
+ struct tw_f * f;
(void) argc;
(void) argv;
@@ -75,13 +78,12 @@ int timerwheel_test(int argc, char ** argv)
for (i = 0; i < additions; ++i) {
int delay = rand() % (resolution * elements);
- int var = rand() % 5;
check_total += var;
- if (timerwheel_add(tw,
- (void (*)(void *)) add,
- (void *) &var,
- sizeof(var),
- delay)) {
+ f = timerwheel_start(tw,
+ (void (*)(void *)) add,
+ (void *) &var,
+ delay);
+ if (f == NULL) {
printf("Failed to add function.");
return -1;
}
diff --git a/src/ipcpd/timerwheel.c b/src/lib/timerwheel.c
index 6086181a..7e2779d0 100644
--- a/src/ipcpd/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -6,17 +6,17 @@
* Dimitri Staessens <dimitri.staessens@ugent.be>
* Sander Vrijders <sander.vrijders@ugent.be>
*
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
*
- * This program is distributed in the hope that it will be useful,
+ * This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
*
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
@@ -83,8 +83,6 @@ static void tw_el_fini(struct tw_el * e)
list_for_each_safe(p, h, &e->funcs) {
struct tw_f * f = list_entry(p, struct tw_f, next);
list_del(&f->next);
- if (f->arg != NULL)
- free(f->arg);
}
}
@@ -140,8 +138,6 @@ static void * worker(void * o)
list_del(&f->next);
pthread_mutex_unlock(&tw->lock);
f->func(f->arg);
- if (f->arg != NULL)
- free(f->arg);
free(f);
pthread_mutex_lock(&tw->lock);
@@ -194,8 +190,8 @@ static void * movement(void * o)
return (void *) 0;
}
-struct timerwheel * timerwheel_create(unsigned int resolution,
- unsigned int max_delay)
+struct timerwheel * timerwheel_create(time_t resolution,
+ time_t max_delay)
{
struct timespec now = {0, 0};
struct timespec res_ts = {resolution / 1000,
@@ -221,10 +217,8 @@ struct timerwheel * timerwheel_create(unsigned int resolution,
tw->elements <<= 1;
tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements);
- if (tw->wheel == NULL) {
- free(tw);
- return NULL;
- }
+ if (tw->wheel == NULL)
+ goto fail_wheel_malloc;
tw->resolution = resolution;
@@ -233,35 +227,20 @@ struct timerwheel * timerwheel_create(unsigned int resolution,
list_head_init(&tw->wq);
- if (pthread_mutex_init(&tw->lock, NULL)) {
- free(tw->wheel);
- free(tw);
- return NULL;
- }
+ if (pthread_mutex_init(&tw->lock, NULL))
+ goto fail_lock_init;
- if (pthread_mutex_init(&tw->s_lock, NULL)) {
- pthread_mutex_destroy(&tw->lock);
- free(tw->wheel);
- free(tw);
- return NULL;
- }
+ if (pthread_mutex_init(&tw->s_lock, NULL))
+ goto fail_s_lock_init;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_cond_init;
- if (pthread_condattr_init(&cattr)) {
- pthread_mutex_destroy(&tw->lock);
- free(tw->wheel);
- free(tw);
- return NULL;
- }
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- if (pthread_cond_init(&tw->work, &cattr)) {
- pthread_mutex_destroy(&tw->s_lock);
- pthread_mutex_destroy(&tw->lock);
- free(tw->wheel);
- free(tw);
- return NULL;
- }
+ if (pthread_cond_init(&tw->work, &cattr))
+ goto fail_cond_init;
tw->pos = 0;
tw->state = TW_RUNNING;
@@ -275,27 +254,29 @@ struct timerwheel * timerwheel_create(unsigned int resolution,
ts_add(&now, &res_ts, &now);
}
- if (pthread_create(&tw->worker, NULL, worker, (void *) tw)) {
- pthread_cond_destroy(&tw->work);
- pthread_mutex_destroy(&tw->s_lock);
- pthread_mutex_destroy(&tw->lock);
- free(tw->wheel);
- free(tw);
- return NULL;
- }
+ if (pthread_create(&tw->worker, NULL, worker, (void *) tw))
+ goto fail_worker_create;
if (pthread_create(&tw->ticker, NULL, movement, (void *) tw)) {
tw_set_state(tw, TW_DESTROY);
- pthread_join(tw->worker, NULL);
- pthread_cond_destroy(&tw->work);
- pthread_mutex_destroy(&tw->s_lock);
- pthread_mutex_destroy(&tw->lock);
- free(tw->wheel);
- free(tw);
- return NULL;
+ goto fail_ticker_create;
}
return tw;
+
+ fail_ticker_create:
+ pthread_join(tw->worker, NULL);
+ fail_worker_create:
+ pthread_cond_destroy(&tw->work);
+ fail_cond_init:
+ pthread_mutex_destroy(&tw->s_lock);
+ fail_s_lock_init:
+ pthread_mutex_destroy(&tw->lock);
+ fail_lock_init:
+ free(tw->wheel);
+ fail_wheel_malloc:
+ free(tw);
+ return NULL;
}
void timerwheel_destroy(struct timerwheel * tw)
@@ -318,8 +299,6 @@ void timerwheel_destroy(struct timerwheel * tw)
list_for_each_safe(p, h, &tw->wq) {
struct tw_f * f = list_entry(p, struct tw_f, next);
list_del(&f->next);
- if (f->arg != NULL)
- free(f->arg);
free(f);
}
@@ -333,30 +312,43 @@ void timerwheel_destroy(struct timerwheel * tw)
free(tw);
}
-int timerwheel_add(struct timerwheel * tw,
- void (* func)(void *),
- void * arg,
- size_t arg_len,
- unsigned int delay)
+struct tw_f * timerwheel_start(struct timerwheel * tw,
+ void (* func)(void *),
+ void * arg,
+ time_t delay)
{
int pos;
struct tw_f * f = malloc(sizeof(*f));
if (f == NULL)
- return -ENOMEM;
+ return NULL;
f->func = func;
- f->arg = malloc(arg_len);
- if (f->arg == NULL) {
- free(f);
- return -ENOMEM;
- }
+ f->arg = arg;
+
+ assert(delay < tw->elements * tw->resolution);
- memcpy(f->arg, arg, arg_len);
+ pthread_mutex_lock(&tw->lock);
+
+ pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1);
+ list_add(&f->next, &tw->wheel[pos].funcs);
+
+ pthread_mutex_unlock(&tw->lock);
+
+ return f;
+}
+
+int timerwheel_restart(struct timerwheel * tw,
+ struct tw_f * f,
+ time_t delay)
+{
+ int pos;
+ assert(tw);
assert(delay < tw->elements * tw->resolution);
pthread_mutex_lock(&tw->lock);
+ list_del(&f->next);
pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1);
list_add(&f->next, &tw->wheel[pos].funcs);
@@ -364,3 +356,16 @@ int timerwheel_add(struct timerwheel * tw,
return 0;
}
+
+void timerwheel_stop(struct timerwheel * tw,
+ struct tw_f * f)
+{
+ assert(tw);
+
+ pthread_mutex_lock(&tw->lock);
+
+ list_del(&f->next);
+ free(f);
+
+ pthread_mutex_unlock(&tw->lock);
+}