summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt6
-rw-r--r--include/ouroboros/config.h.in20
-rw-r--r--include/ouroboros/endian.h2
-rw-r--r--include/ouroboros/np1_flow.h7
-rw-r--r--include/ouroboros/sockets.h3
-rw-r--r--include/ouroboros/tpm.h (renamed from src/lib/frct_enroll.proto)37
-rw-r--r--src/ipcpd/CMakeLists.txt2
-rw-r--r--src/ipcpd/ipcp.c186
-rw-r--r--src/ipcpd/ipcp.h9
-rw-r--r--src/ipcpd/normal/CMakeLists.txt6
-rw-r--r--src/ipcpd/normal/dht.c2383
-rw-r--r--src/ipcpd/normal/dht.h54
-rw-r--r--src/ipcpd/normal/dir.c167
-rw-r--r--src/ipcpd/normal/dir.h10
-rw-r--r--src/ipcpd/normal/dt.c14
-rw-r--r--src/ipcpd/normal/dt.h2
-rw-r--r--src/ipcpd/normal/fa.c60
-rw-r--r--src/ipcpd/normal/kademlia.proto46
-rw-r--r--src/ipcpd/normal/main.c28
-rw-r--r--src/ipcpd/normal/pol/flat.c4
-rw-r--r--src/ipcpd/normal/ribconfig.h2
-rw-r--r--src/ipcpd/normal/ribmgr.c3
-rw-r--r--src/ipcpd/normal/sdu_sched.c68
-rw-r--r--src/ipcpd/normal/tests/CMakeLists.txt37
-rw-r--r--src/ipcpd/normal/tests/dht_test.c99
-rw-r--r--src/irmd/main.c211
-rw-r--r--src/lib/CMakeLists.txt7
-rw-r--r--src/lib/dev.c371
-rw-r--r--src/lib/shm_flow_set.c6
-rw-r--r--src/lib/shm_rbuff_ll.c10
-rw-r--r--src/lib/shm_rbuff_pthr.c20
-rw-r--r--src/lib/shm_rdrbuff.c14
-rw-r--r--src/lib/tests/time_utils_test.c4
-rw-r--r--src/lib/tpm.c266
34 files changed, 3274 insertions, 890 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3c7266eb..85040496 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -81,6 +81,12 @@ add_custom_target(check COMMAND ${CMAKE_CTEST_COMMAND})
find_package(ProtobufC REQUIRED)
include_directories(${PROTOBUF_C_INCLUDE_DIRS})
+include(CheckSymbolExists)
+list(APPEND CMAKE_REQUIRED_DEFINITIONS -D_POSIX_C_SOURCE=200809L)
+list(APPEND CMAKE_REQUIRED_DEFINITIONS -D__XSI_VISIBLE=500)
+list(APPEND CMAKE_REQUIRED_LIBRARIES pthread)
+check_symbol_exists(pthread_mutexattr_setrobust pthread.h HAVE_ROBUST_MUTEX)
+
add_subdirectory(src)
add_subdirectory(include)
add_subdirectory(doc)
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index bae2d89e..4c255da5 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -3,7 +3,8 @@
*
* Configuration information
*
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -35,6 +36,7 @@
#define IPCP_SHIM_ETH_LLC_EXEC "@IPCP_SHIM_ETH_LLC_TARGET@"
#define IPCP_NORMAL_EXEC "@IPCP_NORMAL_TARGET@"
#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@"
+#cmakedefine HAVE_ROBUST_MUTEX
#define AP_MAX_FLOWS 2048
#define AP_RES_FDS 64
#define AP_MAX_FQUEUES 64
@@ -50,25 +52,21 @@
#define SHM_FLOW_SET_PREFIX "/ouroboros.sets."
#define IRMD_MAX_FLOWS 4096
/* IRMD dynamic threadpooling */
-#define IRMD_MIN_AV_THREADS 16
-#define IRMD_MAX_AV_THREADS 64
-#define IRMD_MAX_THREADS 256
+#define IRMD_MIN_THREADS 16
+#define IRMD_ADD_THREADS 32
/* IPCP dynamic threadpooling */
-#define IPCP_MIN_AV_THREADS 4
-#define IPCP_MAX_AV_THREADS 32
-#define IPCP_MAX_THREADS 64
-
+#define IPCP_MIN_THREADS 4
+#define IPCP_ADD_THREADS 16
+#define IPCP_SCHED_THREADS 8
#define IPCPD_MAX_CONNS IRMD_MAX_FLOWS
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
#define PFT_SIZE 1 << 12
/* Timeout values */
-#define IRMD_TPM_TIMEOUT 1000
-#define IPCP_TPM_TIMEOUT 1000
#define IRMD_ACCEPT_TIMEOUT 100
#define IRMD_REQ_ARR_TIMEOUT 500
#define IRMD_FLOW_TIMEOUT 5000
#define IPCP_ACCEPT_TIMEOUT 100
-#define SOCKET_TIMEOUT 4000
+#define SOCKET_TIMEOUT 10000
#define CDAP_REPLY_TIMEOUT 1000
#define ENROLL_TIMEOUT 2000
diff --git a/include/ouroboros/endian.h b/include/ouroboros/endian.h
index 16200028..873aff73 100644
--- a/include/ouroboros/endian.h
+++ b/include/ouroboros/endian.h
@@ -24,7 +24,7 @@
#ifndef OUROBOROS_ENDIAN_H
#define OUROBOROS_ENDIAN_H
-#if defined(__linux__) || defined(__CYGWIN__)
+#if defined(__linux__) || defined(__CYGWIN__) || defined(__MACH__)
#ifndef _BSD_SOURCE
#define _BSD_SOURCE
diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h
index a4e94b89..3db2a0dd 100644
--- a/include/ouroboros/np1_flow.h
+++ b/include/ouroboros/np1_flow.h
@@ -24,10 +24,13 @@
#ifndef OUROBOROS_NP1_FLOW_H
#define OUROBOROS_NP1_FLOW_H
+#include <ouroboros/qoscube.h>
+
#include <unistd.h>
-int np1_flow_alloc(pid_t n_api,
- int port_id);
+int np1_flow_alloc(pid_t n_api,
+ int port_id,
+ qoscube_t qc);
int np1_flow_resp(int port_id);
diff --git a/include/ouroboros/sockets.h b/include/ouroboros/sockets.h
index 0d65c15d..660709bf 100644
--- a/include/ouroboros/sockets.h
+++ b/include/ouroboros/sockets.h
@@ -30,9 +30,6 @@
typedef IpcpConfigMsg ipcp_config_msg_t;
typedef DifInfoMsg dif_info_msg_t;
-#include "frct_enroll.pb-c.h"
-typedef FrctEnrollMsg frct_enroll_msg_t;
-
#include "irmd_messages.pb-c.h"
typedef IrmMsg irm_msg_t;
diff --git a/src/lib/frct_enroll.proto b/include/ouroboros/tpm.h
index 497d6acc..d34f06f3 100644
--- a/src/lib/frct_enroll.proto
+++ b/include/ouroboros/tpm.h
@@ -1,10 +1,10 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * QoS messages
+ * Threadpool management
*
- * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -21,12 +21,27 @@
* 02110-1301 USA
*/
-syntax = "proto2";
+#ifndef OUROBOROS_LIB_TPM_H
+#define OUROBOROS_LIB_TPM_H
-message frct_enroll_msg {
- required bool resource_control = 1;
- required bool reliable = 2;
- required bool error_check = 3;
- required bool ordered = 4;
- required bool partial = 5;
-};
+#include <stdbool.h>
+
+int tpm_init(size_t min,
+ size_t inc,
+ void * (* func)(void *));
+
+int tpm_start(void);
+
+void tpm_stop(void);
+
+void tpm_fini(void);
+
+bool tpm_check(void);
+
+void tpm_exit(void);
+
+void tpm_dec(void);
+
+void tpm_inc(void);
+
+#endif /* OUROBOROS_LIB_TPM_H */
diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt
index 00baa762..e0b375b6 100644
--- a/src/ipcpd/CMakeLists.txt
+++ b/src/ipcpd/CMakeLists.txt
@@ -8,7 +8,7 @@ set(IPCP_SOURCES
add_subdirectory(local)
add_subdirectory(normal)
add_subdirectory(shim-udp)
-if (NOT APPLE)
+if (NOT APPLE AND NOT CMAKE_SYSTEM_NAME STREQUAL GNU)
add_subdirectory(shim-eth-llc)
endif ()
add_subdirectory(tests)
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index fdd1edc4..48ff046c 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -31,6 +31,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
+#include <ouroboros/tpm.h>
#include "ipcp.h"
@@ -56,6 +57,8 @@ void ipcp_sig_handler(int sig,
if (ipcp_get_state() == IPCP_OPERATIONAL)
ipcp_set_state(IPCP_SHUTDOWN);
}
+
+ tpm_stop();
default:
return;
}
@@ -87,51 +90,7 @@ void ipcp_hash_str(char * buf,
buf[2 * i] = '\0';
}
-static void thread_inc(void)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- ++ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static void thread_dec(void)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- --ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static bool thread_check(void)
-{
- int ret;
-
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- ret = ipcpi.threads > ipcpi.max_threads;
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-
- return ret;
-}
-
-static void thread_exit(ssize_t id)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
- bmp_release(ipcpi.thread_ids, id);
-
- --ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static void * ipcp_main_loop(void * o)
+static void * mainloop(void * o)
{
int lsockfd;
uint8_t buf[IPCP_MSG_BUF_SIZE];
@@ -147,7 +106,7 @@ static void * ipcp_main_loop(void * o)
struct timeval ltv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
- ssize_t id = (ssize_t) o;
+ (void) o;
while (true) {
#ifdef __FreeBSD__
@@ -159,8 +118,8 @@ static void * ipcp_main_loop(void * o)
if (ipcp_get_state() == IPCP_SHUTDOWN ||
ipcp_get_state() == IPCP_NULL ||
- thread_check()) {
- thread_exit(id);
+ tpm_check()) {
+ tpm_exit();
break;
}
@@ -192,7 +151,7 @@ static void * ipcp_main_loop(void * o)
continue;
}
- thread_dec();
+ tpm_dec();
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
@@ -260,7 +219,6 @@ static void * ipcp_main_loop(void * o)
ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dst_name,
&info);
-
if (ret_msg.result == 0) {
ret_msg.dif_info = &dif_info;
dif_info.dir_hash_algo = info.dir_hash_algo;
@@ -332,7 +290,9 @@ static void * ipcp_main_loop(void * o)
break;
}
- fd = np1_flow_alloc(msg->api, msg->port_id);
+ fd = np1_flow_alloc(msg->api,
+ msg->port_id,
+ msg->qoscube);
if (fd < 0) {
log_err("Failed allocating fd on port_id %d.",
msg->port_id);
@@ -409,7 +369,7 @@ static void * ipcp_main_loop(void * o)
if (buffer.len == 0) {
log_err("Failed to pack reply message");
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -417,7 +377,7 @@ static void * ipcp_main_loop(void * o)
if (buffer.data == NULL) {
log_err("Failed to create reply buffer.");
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -427,14 +387,14 @@ static void * ipcp_main_loop(void * o)
log_err("Failed to send reply message");
free(buffer.data);
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
free(buffer.data);
close(lsockfd);
- thread_inc();
+ tpm_inc();
}
return (void *) 0;
@@ -497,15 +457,6 @@ int ipcp_init(int argc,
ipcpi.state = IPCP_NULL;
ipcpi.shim_data = NULL;
- ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS);
- if (ipcpi.threadpool == NULL) {
- ret = -ENOMEM;
- goto fail_thr;
- }
-
- ipcpi.threads = 0;
- ipcpi.max_threads = IPCP_MIN_AV_THREADS;
-
ipcpi.sock_path = ipcp_sock_path(getpid());
if (ipcpi.sock_path == NULL)
goto fail_sock_path;
@@ -527,11 +478,6 @@ int ipcp_init(int argc,
goto fail_state_mtx;
}
- if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) {
- log_err("Could not create mutex.");
- goto fail_thread_lock;
- }
-
if (pthread_condattr_init(&cattr)) {
log_err("Could not create condattr.");
goto fail_cond_attr;
@@ -545,17 +491,6 @@ int ipcp_init(int argc,
goto fail_state_cond;
}
- if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) {
- log_err("Could not init condvar.");
- goto fail_thread_cond;
- }
-
- ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0);
- if (ipcpi.thread_ids == NULL) {
- log_err("Could not init condvar.");
- goto fail_bmp;
- }
-
if (pthread_mutex_init(&ipcpi.alloc_lock, NULL)) {
log_err("Failed to init mutex.");
goto fail_alloc_lock;
@@ -588,94 +523,21 @@ int ipcp_init(int argc,
fail_alloc_cond:
pthread_mutex_destroy(&ipcpi.alloc_lock);
fail_alloc_lock:
- bmp_destroy(ipcpi.thread_ids);
- fail_bmp:
- pthread_cond_destroy(&ipcpi.threads_cond);
- fail_thread_cond:
pthread_cond_destroy(&ipcpi.state_cond);
fail_state_cond:
pthread_condattr_destroy(&cattr);
fail_cond_attr:
- pthread_mutex_destroy(&ipcpi.threads_lock);
- fail_thread_lock:
pthread_mutex_destroy(&ipcpi.state_mtx);
fail_state_mtx:
close(ipcpi.sockfd);
fail_serv_sock:
free(ipcpi.sock_path);
fail_sock_path:
- free(ipcpi.threadpool);
- fail_thr:
ouroboros_fini();
return ret;
}
-void * threadpoolmgr(void * o)
-{
- pthread_attr_t pattr;
- struct timespec dl;
- struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
- (IRMD_TPM_TIMEOUT % 1000) * MILLION};
- (void) o;
-
- if (pthread_attr_init(&pattr))
- return (void *) -1;
-
- pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
-
- while (true) {
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
- if (ipcp_get_state() == IPCP_SHUTDOWN ||
- ipcp_get_state() == IPCP_NULL) {
- pthread_attr_destroy(&pattr);
- log_dbg("Waiting for threads to exit.");
- pthread_mutex_lock(&ipcpi.threads_lock);
- while (ipcpi.threads > 0)
- pthread_cond_wait(&ipcpi.threads_cond,
- &ipcpi.threads_lock);
- pthread_mutex_unlock(&ipcpi.threads_lock);
-
- log_dbg("Threadpool manager done.");
- break;
- }
-
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- if (ipcpi.threads < IPCP_MIN_AV_THREADS) {
- log_dbg("Increasing threadpool.");
- ipcpi.max_threads = IPCP_MAX_AV_THREADS;
-
- while (ipcpi.threads < ipcpi.max_threads) {
- ssize_t id = bmp_allocate(ipcpi.thread_ids);
- if (!bmp_is_id_valid(ipcpi.thread_ids, id)) {
- log_warn("IPCP threadpool exhausted.");
- break;
- }
-
- if (pthread_create(&ipcpi.threadpool[id],
- &pattr, ipcp_main_loop,
- (void *) id))
- log_warn("Failed to start new thread.");
- else
- ++ipcpi.threads;
- }
- }
-
- if (pthread_cond_timedwait(&ipcpi.threads_cond,
- &ipcpi.threads_lock,
- &dl) == ETIMEDOUT)
- if (ipcpi.threads > IPCP_MIN_AV_THREADS)
- --ipcpi.max_threads;
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
- }
-
- return (void *) 0;
-}
-
int ipcp_boot()
{
struct sigaction sig_act;
@@ -700,9 +562,15 @@ int ipcp_boot()
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
- ipcp_set_state(IPCP_INIT);
+ if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop))
+ return -1;
+
+ if (tpm_start()) {
+ tpm_fini();
+ return -1;
+ }
- pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL);
+ ipcp_set_state(IPCP_INIT);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
@@ -711,8 +579,7 @@ int ipcp_boot()
void ipcp_shutdown()
{
- pthread_join(ipcpi.tpm, NULL);
-
+ tpm_fini();
log_info("IPCP %d shutting down.", getpid());
}
@@ -722,16 +589,11 @@ void ipcp_fini()
if (unlink(ipcpi.sock_path))
log_warn("Could not unlink %s.", ipcpi.sock_path);
- bmp_destroy(ipcpi.thread_ids);
-
free(ipcpi.sock_path);
- free(ipcpi.threadpool);
shim_data_destroy(ipcpi.shim_data);
pthread_cond_destroy(&ipcpi.state_cond);
- pthread_cond_destroy(&ipcpi.threads_cond);
- pthread_mutex_destroy(&ipcpi.threads_lock);
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_cond_destroy(&ipcpi.alloc_cond);
pthread_mutex_destroy(&ipcpi.alloc_lock);
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 3f5e1bd6..fb69df5c 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -93,15 +93,6 @@ struct ipcp {
pthread_cond_t alloc_cond;
pthread_mutex_t alloc_lock;
- pthread_t * threadpool;
-
- struct bmp * thread_ids;
- size_t max_threads;
- size_t threads;
- pthread_cond_t threads_cond;
- pthread_mutex_t threads_lock;
-
- pthread_t tpm;
} ipcpi;
int ipcp_init(int argc,
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 336b0e8f..8c2d4efc 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -15,6 +15,8 @@ include_directories(${CMAKE_BINARY_DIR}/include)
set(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")
protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS flow_alloc.proto)
+protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto)
+
# Add GPB sources of policies last
protobuf_generate_c(FSO_SRCS FSO_HDRS pol/fso.proto)
@@ -22,6 +24,7 @@ set(SOURCE_FILES
# Add source files here
addr_auth.c
connmgr.c
+ dht.c
dir.c
dt.c
dt_pci.c
@@ -42,7 +45,7 @@ set(SOURCE_FILES
)
add_executable(ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES}
- ${FLOW_ALLOC_SRCS} ${FSO_SRCS})
+ ${FLOW_ALLOC_SRCS} ${FSO_SRCS} ${KAD_PROTO_SRCS})
target_link_libraries(ipcpd-normal LINK_PUBLIC ouroboros)
include(AddCompileFlags)
@@ -53,3 +56,4 @@ endif (CMAKE_BUILD_TYPE MATCHES Debug)
install(TARGETS ipcpd-normal RUNTIME DESTINATION sbin)
add_subdirectory(pol/tests)
+add_subdirectory(tests)
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
new file mode 100644
index 00000000..74618658
--- /dev/null
+++ b/src/ipcpd/normal/dht.c
@@ -0,0 +1,2383 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Distributed Hash Table based on Kademlia
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#define OUROBOROS_PREFIX "dht"
+
+#include <ouroboros/config.h>
+#include <ouroboros/hash.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/list.h>
+#include <ouroboros/random.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/utils.h>
+
+#include "dht.h"
+#include "dt.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <inttypes.h>
+
+#include "kademlia.pb-c.h"
+typedef KadMsg kad_msg_t;
+typedef KadContactMsg kad_contact_msg_t;
+
+#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */
+#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */
+#define KAD_K 8 /* Replication factor, MDHT value. */
+#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */
+#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */
+#define KAD_T_JOIN 6 /* Response time to wait for a join. */
+#define KAD_T_RESP 2 /* Response time to wait for a response. */
+#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */
+#define KAD_QUEER 15 /* Time to declare peer questionable. */
+#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */
+#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */
+
+enum dht_state {
+ DHT_INIT = 0,
+ DHT_RUNNING,
+ DHT_SHUTDOWN,
+};
+
+enum kad_code {
+ KAD_JOIN = 0,
+ KAD_FIND_NODE,
+ KAD_FIND_VALUE,
+ /* Messages without a response below. */
+ KAD_STORE,
+ KAD_RESPONSE
+};
+
+enum kad_req_state {
+ REQ_NULL = 0,
+ REQ_INIT,
+ REQ_PENDING,
+ REQ_RESPONSE,
+ REQ_DONE,
+ REQ_DESTROY
+};
+
+enum lookup_state {
+ LU_NULL = 0,
+ LU_INIT,
+ LU_PENDING,
+ LU_UPDATE,
+ LU_COMPLETE,
+ LU_DONE,
+ LU_DESTROY
+};
+
+struct kad_req {
+ struct list_head next;
+
+ uint32_t cookie;
+ enum kad_code code;
+ uint8_t * key;
+ uint64_t addr;
+
+ enum kad_req_state state;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+
+ time_t t_exp;
+};
+
+struct lookup {
+ struct list_head next;
+
+ uint8_t * key;
+
+ struct list_head contacts;
+ size_t n_contacts;
+
+ uint64_t * addrs;
+ size_t n_addrs;
+
+ enum lookup_state state;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+};
+
+struct val {
+ struct list_head next;
+
+ uint64_t addr;
+
+ time_t t_exp;
+ time_t t_rep;
+};
+
+struct ref_entry {
+ struct list_head next;
+
+ uint8_t * key;
+
+ time_t t_rep;
+};
+
+struct dht_entry {
+ struct list_head next;
+
+ uint8_t * key;
+ size_t n_vals;
+ struct list_head vals;
+};
+
+struct contact {
+ struct list_head next;
+
+ uint8_t * id;
+ uint64_t addr;
+
+ size_t fails;
+ time_t t_seen;
+};
+
+struct bucket {
+ struct list_head contacts;
+ size_t n_contacts;
+
+ struct list_head alts;
+ size_t n_alts;
+
+ time_t t_refr;
+
+ size_t depth;
+ uint8_t mask;
+
+ struct bucket * parent;
+ struct bucket * children[1L << KAD_BETA];
+};
+
+struct dht {
+ size_t alpha;
+ size_t b;
+ size_t k;
+
+ time_t t_expire;
+ time_t t_refresh;
+ time_t t_replic;
+ time_t t_repub;
+
+ uint8_t * id;
+ uint64_t addr;
+
+ struct bucket * buckets;
+
+ struct list_head entries;
+
+ struct list_head refs;
+
+ struct list_head lookups;
+
+ struct list_head requests;
+ struct bmp * cookies;
+
+ enum dht_state state;
+ pthread_mutex_t mtx;
+
+ pthread_rwlock_t lock;
+
+ int fd;
+
+ pthread_t worker;
+};
+
+static uint8_t * dht_dup_key(const uint8_t * key,
+ size_t len)
+{
+ uint8_t * dup;
+
+ dup = malloc(sizeof(*dup) * len);
+ if (dup == NULL)
+ return NULL;
+
+ memcpy(dup, key, len);
+
+ return dup;
+}
+
+static enum dht_state dht_get_state(struct dht * dht)
+{
+ enum dht_state state;
+
+ pthread_mutex_lock(&dht->mtx);
+
+ state = dht->state;
+
+ pthread_mutex_unlock(&dht->mtx);
+
+ return state;
+}
+
+static void dht_set_state(struct dht * dht,
+ enum dht_state state)
+{
+ pthread_mutex_lock(&dht->mtx);
+
+ dht->state = state;
+
+ pthread_mutex_unlock(&dht->mtx);
+}
+
+static uint8_t * create_id(size_t len)
+{
+ uint8_t * id;
+
+ id = malloc(len);
+ if (id == NULL)
+ return NULL;
+
+ if (random_buffer(id, len) < 0) {
+ free(id);
+ return NULL;
+ }
+
+ return id;
+}
+
+static struct kad_req * kad_req_create(struct dht * dht,
+ kad_msg_t * msg,
+ uint64_t addr)
+{
+ struct kad_req * req;
+ pthread_condattr_t cattr;
+ struct timespec t;
+
+ req = malloc(sizeof(*req));
+ if (req == NULL)
+ return NULL;
+
+ list_head_init(&req->next);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ req->t_exp = t.tv_sec + KAD_T_RESP;
+ req->addr = addr;
+ req->state = REQ_INIT;
+ req->cookie = msg->cookie;
+ req->code = msg->code;
+ req->key = NULL;
+
+ if (msg->has_key) {
+ req->key = dht_dup_key(msg->key.data, dht->b);
+ if (req->key == NULL) {
+ free(req);
+ return NULL;
+ }
+ }
+
+ if (pthread_mutex_init(&req->lock, NULL)) {
+ free(req->key);
+ free(req);
+ return NULL;
+ }
+
+ pthread_condattr_init(&cattr);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+
+ if (pthread_cond_init(&req->cond, &cattr)) {
+ pthread_condattr_destroy(&cattr);
+ pthread_mutex_destroy(&req->lock);
+ free(req->key);
+ free(req);
+ return NULL;
+ }
+
+ pthread_condattr_destroy(&cattr);
+
+ return req;
+}
+
+static void kad_req_destroy(struct kad_req * req)
+{
+ assert(req);
+
+ if (req->key != NULL)
+ free(req->key);
+
+ pthread_mutex_lock(&req->lock);
+
+ switch (req->state) {
+ case REQ_DESTROY:
+ pthread_mutex_unlock(&req->lock);
+ return;
+ case REQ_PENDING:
+ req->state = REQ_DESTROY;
+ pthread_cond_signal(&req->cond);
+ break;
+ case REQ_INIT:
+ case REQ_DONE:
+ req->state = REQ_NULL;
+ break;
+ case REQ_RESPONSE:
+ case REQ_NULL:
+ default:
+ break;
+ }
+
+ while (req->state != REQ_NULL)
+ pthread_cond_wait(&req->cond, &req->lock);
+
+ pthread_mutex_unlock(&req->lock);
+
+ pthread_cond_destroy(&req->cond);
+ pthread_mutex_destroy(&req->lock);
+
+ free(req);
+}
+
+static int kad_req_wait(struct kad_req * req,
+ time_t t)
+{
+ struct timespec timeo = {t, 0};
+ struct timespec abs;
+ int ret = 0;
+
+ assert(req);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+ ts_add(&abs, &timeo, &abs);
+
+ pthread_mutex_lock(&req->lock);
+
+ req->state = REQ_PENDING;
+
+ while (req->state == REQ_PENDING && ret != -ETIMEDOUT)
+ ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs);
+
+ switch(req->state) {
+ case REQ_DESTROY:
+ ret = -1;
+ req->state = REQ_NULL;
+ pthread_cond_signal(&req->cond);
+ break;
+ case REQ_PENDING: /* ETIMEDOUT */
+ case REQ_RESPONSE:
+ req->state = REQ_DONE;
+ pthread_cond_signal(&req->cond);
+ break;
+ default:
+ break;
+ }
+
+ pthread_mutex_unlock(&req->lock);
+
+ return ret;
+}
+
+static void kad_req_respond(struct kad_req * req)
+{
+ pthread_mutex_lock(&req->lock);
+
+ req->state = REQ_RESPONSE;
+ pthread_cond_signal(&req->cond);
+
+ pthread_mutex_unlock(&req->lock);
+}
+
+static struct contact * contact_create(const uint8_t * id,
+ size_t len,
+ uint64_t addr)
+{
+ struct contact * c;
+ struct timespec t;
+
+ c = malloc(sizeof(*c));
+ if (c == NULL)
+ return NULL;
+
+ list_head_init(&c->next);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ c->addr = addr;
+ c->fails = 0;
+ c->t_seen = t.tv_sec;
+ c->id = dht_dup_key(id, len);
+ if (c->id == NULL) {
+ free(c);
+ return NULL;
+ }
+
+ return c;
+}
+
+static void contact_destroy(struct contact * c)
+{
+ if (c != NULL)
+ free(c->id);
+
+ free(c);
+}
+
+static struct bucket * iter_bucket(struct bucket * b,
+ const uint8_t * id)
+{
+ uint8_t byte;
+ uint8_t mask;
+
+ assert(b);
+
+ if (b->children[0] == NULL)
+ return b;
+
+ byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+
+ mask = ((1L << KAD_BETA) - 1) & 0xFF;
+
+ byte >>= (CHAR_BIT - KAD_BETA) -
+ (((b->depth) * KAD_BETA) & (CHAR_BIT - 1));
+
+ return iter_bucket(b->children[(byte & mask)], id);
+}
+
+static struct bucket * dht_get_bucket(struct dht * dht,
+ const uint8_t * id)
+{
+ assert(dht->buckets);
+
+ return iter_bucket(dht->buckets, id);
+}
+
+/*
+ * If someone builds a network where the n (n > k) closest nodes all
+ * have IDs starting with the same 64 bits: by all means, change this.
+ */
+static uint64_t dist(const uint8_t * src,
+ const uint8_t * dst)
+{
+ return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst));
+}
+
+static size_t list_add_sorted(struct list_head * l,
+ struct contact * c,
+ const uint8_t * key)
+{
+ struct list_head * p;
+
+ assert(l);
+ assert(c);
+ assert(key);
+ assert(c->id);
+
+ list_for_each(p, l) {
+ struct contact * e = list_entry(p, struct contact, next);
+ if (dist(c->id, key) > dist(e->id, key))
+ break;
+ }
+
+ list_add_tail(&c->next, p);
+
+ return 1;
+}
+
+static size_t dht_contact_list(struct dht * dht,
+ struct list_head * l,
+ const uint8_t * key)
+{
+ struct list_head * p;
+ struct bucket * b;
+ size_t len = 0;
+ size_t i;
+ struct timespec t;
+
+ assert(l);
+ assert(dht);
+ assert(key);
+ assert(list_is_empty(l));
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ b = dht_get_bucket(dht, key);
+ if (b == NULL)
+ return 0;
+
+ b->t_refr = t.tv_sec + KAD_T_REFR;
+
+ if (b->n_contacts == dht->k || b->parent == NULL) {
+ list_for_each(p, &b->contacts) {
+ struct contact * c;
+ c = list_entry(p, struct contact, next);
+ c = contact_create(c->id, dht->b, c->addr);
+ if (list_add_sorted(l, c, key) == 1)
+ if (++len > dht->k)
+ break;
+ }
+ } else {
+ struct bucket * d = b->parent;
+ for (i = 0; i < (1L << KAD_BETA); ++i) {
+ list_for_each(p, &d->children[i]->contacts) {
+ struct contact * c;
+ c = list_entry(p, struct contact, next);
+ c = contact_create(c->id, dht->b, c->addr);
+ if (c == NULL)
+ continue;
+ if (list_add_sorted(l, c, key) == 1)
+ if (++len > dht->k)
+ break;
+ }
+ }
+ }
+
+ assert(len == dht->k || b->parent == NULL);
+
+ return len;
+}
+
+static struct lookup * lookup_create(struct dht * dht,
+ const uint8_t * id)
+{
+ struct lookup * lu;
+ pthread_condattr_t cattr;
+
+ assert(dht);
+ assert(id);
+
+ lu = malloc(sizeof(*lu));
+ if (lu == NULL)
+ goto fail_malloc;
+
+ list_head_init(&lu->contacts);
+
+ lu->state = LU_INIT;
+ lu->addrs = NULL;
+ lu->n_addrs = 0;
+ lu->key = dht_dup_key(id, dht->b);
+ if (lu->key == NULL)
+ goto fail_id;
+
+ if (pthread_mutex_init(&lu->lock, NULL))
+ goto fail_mutex;
+
+ pthread_condattr_init(&cattr);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+
+ if (pthread_cond_init(&lu->cond, &cattr))
+ goto fail_cond;
+
+ pthread_condattr_destroy(&cattr);
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ list_add(&lu->next, &dht->lookups);
+
+ lu->n_contacts = dht_contact_list(dht, &lu->contacts, id);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return lu;
+
+ fail_cond:
+ pthread_condattr_destroy(&cattr);
+ pthread_mutex_destroy(&lu->lock);
+ fail_mutex:
+ free(lu->key);
+ fail_id:
+ free(lu);
+ fail_malloc:
+ return NULL;
+}
+
+static void lookup_destroy(struct lookup * lu)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(lu);
+
+ pthread_mutex_lock(&lu->lock);
+
+ switch (lu->state) {
+ case LU_DESTROY:
+ pthread_mutex_unlock(&lu->lock);
+ return;
+ case LU_PENDING:
+ lu->state = LU_DESTROY;
+ pthread_cond_signal(&lu->cond);
+ break;
+ case LU_INIT:
+ case LU_DONE:
+ case LU_UPDATE:
+ case LU_COMPLETE:
+ lu->state = REQ_NULL;
+ break;
+ case LU_NULL:
+ default:
+ break;
+ }
+
+ while (lu->state != LU_NULL)
+ pthread_cond_wait(&lu->cond, &lu->lock);
+
+ if (lu->key != NULL)
+ free(lu->key);
+ if (lu->addrs != NULL)
+ free(lu->addrs);
+
+ list_for_each_safe(p, h, &lu->contacts) {
+ struct contact * c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ contact_destroy(c);
+ }
+
+ pthread_mutex_unlock(&lu->lock);
+
+ pthread_cond_destroy(&lu->cond);
+ pthread_mutex_destroy(&lu->lock);
+
+ free(lu);
+}
+
+static void lookup_update(struct dht * dht,
+ struct lookup * lu,
+ kad_msg_t * msg)
+{
+ struct list_head * p = NULL;
+ struct contact * c = NULL;
+ size_t n;
+ size_t pos = 0;
+
+ assert(lu);
+ assert(msg);
+
+ if (dht_get_state(dht) != DHT_RUNNING)
+ return;
+
+ pthread_mutex_lock(&lu->lock);
+
+ if (msg->n_addrs > 0) {
+ if (lu->addrs == NULL) {
+ lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs);
+ for (n = 0; n < msg->n_addrs; ++n)
+ lu->addrs[n] = msg->addrs[n];
+ lu->n_addrs = msg->n_addrs;
+ }
+ lu->state = LU_COMPLETE;
+ pthread_cond_signal(&lu->cond);
+ pthread_mutex_unlock(&lu->lock);
+ return;
+ }
+
+ while (lu->state == LU_INIT)
+ pthread_cond_wait(&lu->cond, &lu->lock);
+
+ for (n = 0; n < msg->n_contacts; ++n) {
+ c = contact_create(msg->contacts[n]->id.data,
+ dht->b, msg->contacts[n]->addr);
+ if (c == NULL)
+ continue;
+
+ list_for_each(p, &lu->contacts) {
+ struct contact * e;
+ e = list_entry(p, struct contact, next);
+ if (!memcmp(e->id, c->id, dht->b)) {
+ contact_destroy(c);
+ c = NULL;
+ break;
+ }
+
+ if (dist(c->id, lu->key) > dist(e->id, lu->key))
+ break;
+ pos++;
+ }
+
+ if (c == NULL)
+ continue;
+
+ if (lu->n_contacts < dht->k) {
+ list_add_tail(&c->next, p);
+ ++lu->n_contacts;
+ } else if (pos == dht->k) {
+ contact_destroy(c);
+ continue;
+ } else {
+ struct contact * d;
+ list_add_tail(&c->next, p);
+ d = list_last_entry(&lu->contacts, struct contact, next);
+ list_del(&d->next);
+ contact_destroy(d);
+ }
+ }
+
+ lu->state = LU_UPDATE;
+ pthread_cond_signal(&lu->cond);
+ pthread_mutex_unlock(&lu->lock);
+ return;
+}
+
+static ssize_t lookup_get_addrs(struct lookup * lu,
+ uint64_t * addrs)
+{
+ ssize_t n;
+
+ assert(lu);
+
+ pthread_mutex_lock(&lu->lock);
+
+ for (n = 0; (size_t) n < lu->n_addrs; ++n)
+ addrs[n] = lu->addrs[n];
+
+ assert((size_t) n == lu->n_addrs);
+
+ pthread_mutex_unlock(&lu->lock);
+
+ return n;
+}
+
+static ssize_t lookup_contact_addrs(struct lookup * lu,
+ uint64_t * addrs)
+{
+ struct list_head * p;
+ ssize_t n = 0;
+
+ assert(lu);
+ assert(addrs);
+
+ pthread_mutex_lock(&lu->lock);
+
+ list_for_each(p, &lu->contacts) {
+ struct contact * c = list_entry(p, struct contact, next);
+ addrs[n] = c->addr;
+ n++;
+ }
+
+ pthread_mutex_unlock(&lu->lock);
+
+ return n;
+}
+
+static void lookup_new_addrs(struct lookup * lu,
+ uint64_t * addrs)
+{
+ struct list_head * p;
+ size_t n = 0;
+
+ assert(lu);
+ assert(addrs);
+
+ pthread_mutex_lock(&lu->lock);
+
+ /* Uses fails to check if the contact has been contacted. */
+ list_for_each(p, &lu->contacts) {
+ struct contact * c = list_entry(p, struct contact, next);
+ if (c->fails == 0) {
+ c->fails = 1;
+ addrs[n] = c->addr;
+ n++;
+ }
+
+ if (n == KAD_ALPHA)
+ break;
+ }
+
+ assert(n <= KAD_ALPHA);
+
+ addrs[n] = 0;
+
+ if (n == 0)
+ lu->state = LU_DONE;
+
+ pthread_mutex_unlock(&lu->lock);
+}
+
+static enum lookup_state lookup_wait(struct lookup * lu)
+{
+ enum lookup_state state;
+
+ pthread_mutex_lock(&lu->lock);
+
+ lu->state = LU_PENDING;
+ pthread_cond_signal(&lu->cond);
+
+ pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu);
+
+ while (lu->state == LU_PENDING)
+ pthread_cond_wait(&lu->cond, &lu->lock);
+
+ pthread_cleanup_pop(false);
+
+ state = lu->state;
+
+ pthread_mutex_unlock(&lu->lock);
+
+ return state;
+}
+
+static struct kad_req * dht_find_request(struct dht * dht,
+ kad_msg_t * msg)
+{
+ struct list_head * p;
+
+ assert(dht);
+ assert(msg);
+
+ list_for_each(p, &dht->requests) {
+ struct kad_req * r = list_entry(p, struct kad_req, next);
+ if (r->cookie == msg->cookie)
+ return r;
+ }
+
+ return NULL;
+}
+
+static struct lookup * dht_find_lookup(struct dht * dht,
+ const uint8_t * key)
+{
+ struct list_head * p;
+
+ assert(dht);
+ assert(key);
+
+ list_for_each(p, &dht->lookups) {
+ struct lookup * l = list_entry(p, struct lookup, next);
+ if (!memcmp(l->key, key, dht->b))
+ return l;
+ }
+
+ return NULL;
+}
+
+static struct val * val_create(uint64_t addr,
+ time_t exp)
+{
+ struct val * v;
+ struct timespec t;
+
+ v = malloc(sizeof(*v));
+ if (v == NULL)
+ return NULL;
+
+ list_head_init(&v->next);
+ v->addr = addr;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ v->t_exp = t.tv_sec + exp;
+ v->t_rep = t.tv_sec + KAD_T_REPL;
+
+ return v;
+}
+
+static void val_destroy(struct val * v)
+{
+ assert(v);
+
+ free(v);
+}
+
+static struct ref_entry * ref_entry_create(struct dht * dht,
+ const uint8_t * key)
+{
+ struct ref_entry * e;
+ struct timespec t;
+
+ assert(dht);
+ assert(key);
+
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ e->key = dht_dup_key(key, dht->b);
+ if (e->key == NULL) {
+ free(e);
+ return NULL;
+ }
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ e->t_rep = t.tv_sec + dht->t_repub;
+
+ return e;
+}
+
+static void ref_entry_destroy(struct ref_entry * e)
+{
+ free(e->key);
+ free(e);
+}
+
+static struct dht_entry * dht_entry_create(struct dht * dht,
+ const uint8_t * key)
+{
+ struct dht_entry * e;
+
+ assert(dht);
+ assert(key);
+
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ list_head_init(&e->next);
+ list_head_init(&e->vals);
+
+ e->n_vals = 0;
+
+ e->key = dht_dup_key(key, dht->b);
+ if (e->key == NULL) {
+ free(e);
+ return NULL;
+ }
+
+ return e;
+}
+
+static void dht_entry_destroy(struct dht_entry * e)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(e);
+
+ list_for_each_safe(p, h, &e->vals) {
+ struct val * v = list_entry(p, struct val, next);
+ list_del(&v->next);
+ val_destroy(v);
+ }
+
+ free(e->key);
+
+ free(e);
+}
+
+static int dht_entry_add_addr(struct dht_entry * e,
+ uint64_t addr,
+ time_t exp)
+{
+ struct list_head * p;
+ struct val * val;
+ struct timespec t;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ list_for_each(p, &e->vals) {
+ struct val * v = list_entry(p, struct val, next);
+ if (v->addr == addr) {
+ if (v->t_exp < t.tv_sec + exp) {
+ v->t_exp = t.tv_sec + exp;
+ v->t_rep = t.tv_sec + KAD_T_REPL;
+ }
+
+ return 0;
+ }
+ }
+
+ val = val_create(addr, exp);
+ if (val == NULL)
+ return -ENOMEM;
+
+ list_add(&val->next, &e->vals);
+ ++e->n_vals;
+
+ return 0;
+}
+
+
+static void dht_entry_del_addr(struct dht_entry * e,
+ uint64_t addr)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(e);
+
+ list_for_each_safe(p, h, &e->vals) {
+ struct val * v = list_entry(p, struct val, next);
+ if (v->addr == addr) {
+ list_del(&v->next);
+ val_destroy(v);
+ --e->n_vals;
+ }
+ }
+
+ if (e->n_vals == 0) {
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ }
+}
+
+static uint64_t dht_entry_get_addr(struct dht * dht,
+ struct dht_entry * e)
+{
+ struct list_head * p;
+
+ assert(e);
+ assert(!list_is_empty(&e->vals));
+
+ list_for_each(p, &e->vals) {
+ struct val * v = list_entry(p, struct val, next);
+ if (v->addr != dht->addr)
+ return v->addr;
+ }
+
+ return 0;
+}
+
+/* Forward declaration. */
+static struct lookup * kad_lookup(struct dht * dht,
+ const uint8_t * key,
+ enum kad_code code);
+
+
+/* Build a refresh list. */
+static void bucket_refresh(struct dht * dht,
+ struct bucket * b,
+ time_t t,
+ struct list_head * r)
+{
+ size_t i;
+
+ if (*b->children != NULL)
+ for (i = 0; i < (1L << KAD_BETA); ++i)
+ bucket_refresh(dht, b->children[i], t, r);
+
+ if (b->n_contacts == 0)
+ return;
+
+ if (t > b->t_refr) {
+ struct contact * c;
+ struct contact * d;
+ c = list_first_entry(&b->contacts, struct contact, next);
+ d = contact_create(c->id, dht->b, c->addr);
+ if (c != NULL)
+ list_add(&d->next, r);
+ return;
+ }
+}
+
+
+static struct bucket * bucket_create(void)
+{
+ struct bucket * b;
+ struct timespec t;
+ size_t i;
+
+ b = malloc(sizeof(*b));
+ if (b == NULL)
+ return NULL;
+
+ list_head_init(&b->contacts);
+ b->n_contacts = 0;
+
+ list_head_init(&b->alts);
+ b->n_alts = 0;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+ b->t_refr = t.tv_sec + KAD_T_REFR;
+
+ for (i = 0; i < (1L << KAD_BETA); ++i)
+ b->children[i] = NULL;
+
+ b->parent = NULL;
+ b->depth = 0;
+
+ return b;
+}
+
+static void bucket_destroy(struct bucket * b)
+{
+ struct list_head * p;
+ struct list_head * h;
+ size_t i;
+
+ assert(b);
+
+ for (i = 0; i < (1L << KAD_BETA); ++i)
+ if (b->children[i] != NULL)
+ bucket_destroy(b->children[i]);
+
+ list_for_each_safe(p, h, &b->contacts) {
+ struct contact * c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ contact_destroy(c);
+ --b->n_contacts;
+ }
+
+ list_for_each_safe(p, h, &b->alts) {
+ struct contact * c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ contact_destroy(c);
+ --b->n_contacts;
+ }
+
+ free(b);
+}
+
+static bool bucket_has_id(struct bucket * b,
+ const uint8_t * id)
+{
+ uint8_t mask;
+ uint8_t byte;
+
+ if (b->depth == 0)
+ return true;
+
+ byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+
+ mask = ((1L << KAD_BETA) - 1) & 0xFF;
+
+ byte >>= (CHAR_BIT - KAD_BETA) -
+ (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1));
+
+ return ((byte & mask) == b->mask);
+}
+
+static int split_bucket(struct bucket * b)
+{
+ struct list_head * p;
+ struct list_head * h;
+ uint8_t mask = 0;
+ size_t i;
+ size_t c;
+
+ assert(b);
+ assert(b->n_alts == 0);
+ assert(b->n_contacts);
+ assert(b->children[0] == NULL);
+
+ c = b->n_contacts;
+
+ for (i = 0; i < (1L << KAD_BETA); ++i) {
+ b->children[i] = bucket_create();
+ if (b->children[i] == NULL) {
+ size_t j;
+ for (j = 0; j < i; ++j)
+ bucket_destroy(b->children[j]);
+ return -1;
+ }
+
+ b->children[i]->depth = b->depth + 1;
+ b->children[i]->mask = mask;
+ b->children[i]->parent = b;
+
+ list_for_each_safe(p, h, &b->contacts) {
+ struct contact * c;
+ c = list_entry(p, struct contact, next);
+ if (bucket_has_id(b->children[i], c->id)) {
+ list_del(&c->next);
+ --b->n_contacts;
+ list_add(&c->next, &b->children[i]->contacts);
+ ++b->children[i]->n_contacts;
+ }
+ }
+
+ mask++;
+ }
+
+ for (i = 0; i < (1L << KAD_BETA); ++i)
+ if (b->children[i]->n_contacts == c)
+ split_bucket(b->children[i]);
+
+ return 0;
+}
+
+/* Locked externally to mandate update as (final) part of join transaction. */
+static int dht_update_bucket(struct dht * dht,
+ const uint8_t * id,
+ uint64_t addr)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct bucket * b;
+ struct contact * c;
+
+ assert(dht);
+
+ b = dht_get_bucket(dht, id);
+ if (b == NULL)
+ return -1;
+
+ c = contact_create(id, dht->b, addr);
+ if (c == NULL)
+ return -1;
+
+ list_for_each_safe(p, h, &b->contacts) {
+ struct contact * d = list_entry(p, struct contact, next);
+ if (d->addr == addr) {
+ list_del(&d->next);
+ contact_destroy(d);
+ --b->n_contacts;
+ }
+ }
+
+ if (b->n_contacts == dht->k) {
+ if (bucket_has_id(b, dht->id)) {
+ list_add_tail(&c->next, &b->contacts);
+ ++b->n_contacts;
+ if (split_bucket(b)) {
+ list_del(&c->next);
+ contact_destroy(c);
+ --b->n_contacts;
+ }
+ } else if (b->n_alts == dht->k) {
+ struct contact * d;
+ d = list_first_entry(&b->alts, struct contact, next);
+ list_del(&d->next);
+ contact_destroy(d);
+ list_add_tail(&c->next, &b->alts);
+ } else {
+ list_add_tail(&c->next, &b->alts);
+ ++b->n_alts;
+ }
+ } else {
+ list_add_tail(&c->next, &b->contacts);
+ ++b->n_contacts;
+ }
+
+ return 0;
+}
+
+static int send_msg(struct dht * dht,
+ kad_msg_t * msg,
+ uint64_t addr)
+{
+ struct shm_du_buff * sdb;
+ struct kad_req * req;
+ size_t len;
+ int retr = 0;
+
+ if (msg->code == KAD_RESPONSE)
+ retr = KAD_RESP_RETR;
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ if (dht->id != NULL) {
+ msg->has_s_id = true;
+ msg->s_id.data = dht->id;
+ msg->s_id.len = dht->b;
+ }
+
+ msg->s_addr = dht->addr;
+
+ if (msg->code < KAD_STORE) {
+ msg->cookie = bmp_allocate(dht->cookies);
+ if (!bmp_is_id_valid(dht->cookies, msg->cookie))
+ goto fail_bmp_alloc;
+ }
+
+ len = kad_msg__get_packed_size(msg);
+ if (len == 0)
+ goto fail_msg;
+
+ if (ipcp_sdb_reserve(&sdb, len))
+ goto fail_msg;
+
+ kad_msg__pack(msg, shm_du_buff_head(sdb));
+
+#ifndef __DHT_TEST__
+ while (retr >= 0) {
+ if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb))
+ retr--;
+ else
+ break;
+ sleep(1);
+ }
+
+ if (retr < 0)
+ goto fail_write;
+#else
+ (void) addr;
+ (void) retr;
+ ipcp_sdb_release(sdb);
+#endif /* __DHT_TEST__ */
+
+ if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) {
+ req = kad_req_create(dht, msg, addr);
+ if (req != NULL)
+ list_add(&req->next, &dht->requests);
+ }
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return 0;
+
+#ifndef __DHT_TEST__
+ fail_write:
+ ipcp_sdb_release(sdb);
+#endif
+ fail_msg:
+ bmp_release(dht->cookies, msg->cookie);
+ fail_bmp_alloc:
+ pthread_rwlock_unlock(&dht->lock);
+ return -1;
+}
+
+static struct dht_entry * dht_find_entry(struct dht * dht,
+ const uint8_t * key)
+{
+ struct list_head * p;
+
+ list_for_each(p, &dht->entries) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ if (!memcmp(key, e->key, dht->b))
+ return e;
+ }
+
+ return NULL;
+}
+
+static int kad_add(struct dht * dht,
+ const kad_contact_msg_t * contacts,
+ ssize_t n,
+ time_t exp)
+{
+ struct dht_entry * e;
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ while (n-- > 0) {
+ if (contacts[n].id.len != dht->b)
+ log_warn("Bad key length in contact data.");
+
+ e = dht_find_entry(dht, contacts[n].id.data);
+ if (e != NULL) {
+ if (dht_entry_add_addr(e, contacts[n].addr, exp))
+ goto fail;
+ } else {
+ e = dht_entry_create(dht, contacts[n].id.data);
+ if (e == NULL)
+ goto fail;
+
+ if (dht_entry_add_addr(e, contacts[n].addr, exp)) {
+ dht_entry_destroy(e);
+ goto fail;
+ }
+
+ list_add(&e->next, &dht->entries);
+ }
+ }
+
+ pthread_rwlock_unlock(&dht->lock);
+ return 0;
+
+ fail:
+ pthread_rwlock_unlock(&dht->lock);
+ return -ENOMEM;
+}
+
+static int wait_resp(struct dht * dht,
+ kad_msg_t * msg,
+ time_t timeo)
+{
+ struct kad_req * req;
+
+ assert(dht);
+ assert(msg);
+
+ pthread_rwlock_rdlock(&dht->lock);
+
+ req = dht_find_request(dht, msg);
+ if (req == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ return -EPERM;
+ }
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return kad_req_wait(req, timeo);
+}
+
+static int kad_store(struct dht * dht,
+ const uint8_t * key,
+ uint64_t addr,
+ uint64_t r_addr,
+ time_t ttl)
+{
+ kad_msg_t msg = KAD_MSG__INIT;
+ kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT;
+ kad_contact_msg_t * cmsgp[1];
+
+ cmsg.id.data = (uint8_t *) key;
+ cmsg.id.len = dht->b;
+ cmsg.addr = addr;
+
+ cmsgp[0] = &cmsg;
+
+ msg.code = KAD_STORE;
+ msg.has_t_expire = true;
+ msg.t_expire = ttl;
+ msg.n_contacts = 1;
+ msg.contacts = cmsgp;
+
+ if (send_msg(dht, &msg, r_addr))
+ return -1;
+
+ return 0;
+}
+
+static ssize_t kad_find(struct dht * dht,
+ const uint8_t * key,
+ const uint64_t * addrs,
+ enum kad_code code)
+{
+ kad_msg_t msg = KAD_MSG__INIT;
+ ssize_t sent = 0;
+
+ assert(dht);
+ assert(key);
+
+ msg.code = code;
+
+ msg.has_key = true;
+ msg.key.data = (uint8_t *) key;
+ msg.key.len = dht->b;
+
+ while (*addrs != 0) {
+ if (*addrs != dht->addr) {
+ send_msg(dht, &msg, *addrs);
+ sent++;
+ }
+ ++addrs;
+ }
+
+ return sent;
+}
+
+static void lookup_set_state(struct lookup * lu,
+ enum lookup_state state)
+{
+ pthread_mutex_lock(&lu->lock);
+
+ lu->state = state;
+ pthread_cond_signal(&lu->cond);
+
+ pthread_mutex_unlock(&lu->lock);
+}
+
+static struct lookup * kad_lookup(struct dht * dht,
+ const uint8_t * id,
+ enum kad_code code)
+{
+ uint64_t addrs[KAD_ALPHA + 1];
+ enum lookup_state state;
+ struct lookup * lu;
+
+ lu = lookup_create(dht, id);
+ if (lu == NULL)
+ return NULL;
+
+ lookup_new_addrs(lu, addrs);
+
+ if (addrs[0] == 0) {
+ pthread_rwlock_wrlock(&dht->lock);
+ list_del(&lu->next);
+ pthread_rwlock_unlock(&dht->lock);
+ lookup_destroy(lu);
+ return NULL;
+ }
+
+ if (kad_find(dht, id, addrs, code) == 0) {
+ pthread_rwlock_wrlock(&dht->lock);
+ list_del(&lu->next);
+ pthread_rwlock_unlock(&dht->lock);
+ lu->state = LU_COMPLETE;
+ return lu;
+ }
+
+ while ((state = lookup_wait(lu)) != LU_COMPLETE) {
+ switch (state) {
+ case LU_UPDATE:
+ lookup_new_addrs(lu, addrs);
+ if (addrs[0] == 0) {
+ pthread_rwlock_wrlock(&dht->lock);
+ list_del(&lu->next);
+ pthread_rwlock_unlock(&dht->lock);
+ return lu;
+ }
+
+ kad_find(dht, id, addrs, code);
+ break;
+ case LU_DESTROY:
+ pthread_rwlock_wrlock(&dht->lock);
+ list_del(&lu->next);
+ pthread_rwlock_unlock(&dht->lock);
+ lookup_set_state(lu, LU_NULL);
+ return NULL;
+ default:
+ break;
+ };
+ }
+
+ assert(state = LU_COMPLETE);
+
+ pthread_rwlock_wrlock(&dht->lock);
+ list_del(&lu->next);
+ pthread_rwlock_unlock(&dht->lock);
+
+ return lu;
+}
+
+static void kad_publish(struct dht * dht,
+ const uint8_t * key,
+ uint64_t addr,
+ time_t exp)
+{
+ struct lookup * lu;
+ uint64_t addrs[KAD_K];
+ ssize_t n;
+
+ assert(dht);
+ assert(key);
+
+ lu = kad_lookup(dht, key, KAD_FIND_NODE);
+ if (lu == NULL)
+ return;
+
+ n = lookup_contact_addrs(lu, addrs);
+
+ while (n-- > 0) {
+ if (addrs[n] == dht->addr) {
+ kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;
+ msg.id.data = (uint8_t *) key;
+ msg.id.len = dht->b;
+ msg.addr = addr;
+ kad_add(dht, &msg, 1, exp);
+ } else {
+ if (kad_store(dht, key, addr, addrs[n], dht->t_expire))
+ log_warn("Failed to send store message.");
+ }
+ }
+
+ lookup_destroy(lu);
+}
+
+static int kad_join(struct dht * dht,
+ uint64_t addr)
+{
+ kad_msg_t msg = KAD_MSG__INIT;
+ struct lookup * lu;
+
+ msg.code = KAD_JOIN;
+
+ msg.has_alpha = true;
+ msg.has_b = true;
+ msg.has_k = true;
+ msg.has_t_refresh = true;
+ msg.has_t_replicate = true;
+ msg.alpha = KAD_ALPHA;
+ msg.b = dht->b;
+ msg.k = KAD_K;
+ msg.t_refresh = KAD_T_REFR;
+ msg.t_replicate = KAD_T_REPL;
+
+ if (send_msg(dht, &msg, addr))
+ return -1;
+
+ if (wait_resp(dht, &msg, KAD_T_JOIN) < 0)
+ return -1;
+
+ dht->id = create_id(dht->b);
+ if (dht->id == NULL)
+ return -1;
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ dht_update_bucket(dht, dht->id, dht->addr);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
+ if (lu != NULL)
+ lookup_destroy(lu);
+
+ return 0;
+}
+
+static void dht_dead_peer(struct dht * dht,
+ uint8_t * key,
+ uint64_t addr)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct bucket * b;
+
+ b = dht_get_bucket(dht, key);
+
+ list_for_each_safe(p, h, &b->contacts) {
+ struct contact * c = list_entry(p, struct contact, next);
+ if (b->n_contacts + b->n_alts <= dht->k) {
+ ++c->fails;
+ return;
+ }
+
+ if (c->addr == addr) {
+ list_del(&c->next);
+ contact_destroy(c);
+ --b->n_contacts;
+ break;
+ }
+ }
+
+ while (b->n_contacts < dht->k && b->n_alts > 0) {
+ struct contact * c;
+ c = list_first_entry(&b->alts, struct contact, next);
+ list_del(&c->next);
+ --b->n_alts;
+ list_add(&c->next, &b->contacts);
+ ++b->n_contacts;
+ }
+}
+
+static int dht_del(struct dht * dht,
+ const uint8_t * key,
+ uint64_t addr)
+{
+ struct dht_entry * e;
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ e = dht_find_entry(dht, key);
+ if (e == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ return -EPERM;
+ }
+
+ dht_entry_del_addr(e, addr);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return 0;
+}
+
+static buffer_t dht_retrieve(struct dht * dht,
+ const uint8_t * key)
+{
+ struct dht_entry * e;
+ struct list_head * p;
+ buffer_t buf;
+ uint64_t * pos;
+
+ buf.len = 0;
+
+ pthread_rwlock_rdlock(&dht->lock);
+
+ e = dht_find_entry(dht, key);
+ if (e == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ return buf;
+ }
+
+ buf.data = malloc(sizeof(dht->addr) * e->n_vals);
+ if (buf.data == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ return buf;
+ }
+
+ buf.len = e->n_vals;
+
+ pos = (uint64_t *) buf.data;;
+
+ list_for_each(p, &e->vals) {
+ struct val * v = list_entry(p, struct val, next);
+ *pos++ = v->addr;
+ }
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return buf;
+}
+
+static ssize_t dht_get_contacts(struct dht * dht,
+ const uint8_t * key,
+ kad_contact_msg_t *** msgs)
+{
+ struct list_head l;
+ struct list_head * p;
+ struct list_head * h;
+ size_t len;
+ size_t i = 0;
+
+ list_head_init(&l);
+
+ pthread_rwlock_rdlock(&dht->lock);
+
+ len = dht_contact_list(dht, &l, key);
+ if (len == 0)
+ return 0;
+
+ *msgs = malloc(len * sizeof(**msgs));
+ if (*msgs == NULL)
+ return 0;
+
+ list_for_each_safe(p, h, &l) {
+ struct contact * c = list_entry(p, struct contact, next);
+ (*msgs)[i] = malloc(sizeof(***msgs));
+ if ((*msgs)[i] == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ while (i > 0)
+ free(*msgs[--i]);
+ free(*msgs);
+ return 0;
+ }
+
+ kad_contact_msg__init((*msgs)[i]);
+
+ (*msgs)[i]->id.data = c->id;
+ (*msgs)[i]->id.len = dht->b;
+ (*msgs)[i++]->addr = c->addr;
+ list_del(&c->next);
+ free(c);
+ }
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return i;
+}
+
+static time_t gcd(time_t a,
+ time_t b)
+{
+ if (a == 0)
+ return b;
+
+ return gcd(b % a, a);
+}
+
+static void * work(void * o)
+{
+ struct dht * dht;
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+ struct list_head reflist;
+ time_t intv;
+ struct lookup * lu;
+
+ dht = (struct dht *) o;
+
+ intv = gcd(dht->t_expire, dht->t_repub);
+ intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2;
+
+ list_head_init(&reflist);
+
+ while (true) {
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ /* Republish registered hashes. */
+ list_for_each_safe(p, h, &dht->refs) {
+ struct ref_entry * e;
+ e = list_entry(p, struct ref_entry, next);
+ if (now.tv_sec > e->t_rep) {
+ kad_publish(dht, e->key, dht->addr,
+ dht->t_expire);
+ e->t_rep = now.tv_sec + dht->t_repub;
+ }
+ }
+
+ /* Remove stale entries and republish if necessary. */
+ list_for_each_safe(p, h, &dht->entries) {
+ struct list_head * p1;
+ struct list_head * h1;
+ struct dht_entry * e;
+ e = list_entry (p, struct dht_entry, next);
+ list_for_each_safe(p1, h1, &e->vals) {
+ struct val * v;
+ v = list_entry(p1, struct val, next);
+ if (now.tv_sec > v->t_exp) {
+ list_del(&v->next);
+ val_destroy(v);
+ }
+
+ if (now.tv_sec > v->t_rep) {
+ kad_publish(dht, e->key, v->addr,
+ dht->t_expire - now.tv_sec);
+ v->t_rep = now.tv_sec + dht->t_replic;
+ }
+ }
+ }
+
+ /* Check the requests list for unresponsive nodes. */
+ list_for_each_safe(p, h, &dht->requests) {
+ struct kad_req * r;
+ r = list_entry(p, struct kad_req, next);
+ if (now.tv_sec > r->t_exp) {
+ list_del(&r->next);
+ bmp_release(dht->cookies, r->cookie);
+ dht_dead_peer(dht, r->key, r->addr);
+ kad_req_destroy(r);
+ }
+ }
+
+ /* Refresh unaccessed buckets. */
+ bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ list_for_each_safe(p, h, &reflist) {
+ struct contact * c;
+ c = list_entry(p, struct contact, next);
+ lu = kad_lookup(dht, c->id, KAD_FIND_NODE);
+ if (lu != NULL)
+ lookup_destroy(lu);
+ list_del(&c->next);
+ contact_destroy(c);
+ }
+
+ sleep(intv);
+ }
+
+ return (void *) 0;
+}
+
+static int kad_handle_join_resp(struct dht * dht,
+ struct kad_req * req,
+ kad_msg_t * msg)
+{
+ assert(dht);
+ assert(req);
+ assert(msg);
+
+ /* We might send version numbers later to warn of updates if needed. */
+ if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire &&
+ msg->has_t_refresh && msg->has_t_replicate)) {
+ log_warn("Join refused by remote.");
+ return -1;
+ }
+
+ if (msg->b < sizeof(uint64_t)) {
+ log_err("Hash sizes less than 8 bytes unsupported.");
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ dht->buckets = bucket_create();
+ if (dht->buckets == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ return -1;
+ }
+
+ /* Likely corrupt packet. The member will refuse, we might here too. */
+ if (msg->alpha != KAD_ALPHA || msg->k != KAD_K)
+ log_warn("Different kademlia parameters detected.");
+
+ if (msg->t_replicate != KAD_T_REPL)
+ log_warn("Different kademlia replication time detected.");
+
+ if (msg->t_refresh != KAD_T_REFR)
+ log_warn("Different kademlia refresh time detected.");
+
+ dht->k = msg->k;
+ dht->b = msg->b;
+ dht->t_expire = msg->t_expire;
+ dht->t_repub = MAX(1, dht->t_expire - 10);
+
+ if (pthread_create(&dht->worker, NULL, work, dht)) {
+ bucket_destroy(dht->buckets);
+ pthread_rwlock_unlock(&dht->lock);
+ return -1;
+ }
+
+ dht->state = DHT_RUNNING;
+
+ kad_req_respond(req);
+
+ dht_update_bucket(dht, msg->s_id.data, msg->s_addr);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ log_dbg("Enrollment of DHT completed.");
+
+ return 0;
+}
+
+static int kad_handle_find_resp(struct dht * dht,
+ struct kad_req * req,
+ kad_msg_t * msg)
+{
+ struct lookup * lu;
+
+ assert(dht);
+ assert(req);
+ assert(msg);
+
+ pthread_rwlock_rdlock(&dht->lock);
+
+ lu = dht_find_lookup(dht, req->key);
+ if (lu == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ return -1;
+ }
+
+ lookup_update(dht, lu, msg);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return 0;
+}
+
+static void kad_handle_response(struct dht * dht,
+ kad_msg_t * msg)
+{
+ struct kad_req * req;
+
+ assert(dht);
+ assert(msg);
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ req = dht_find_request(dht, msg);
+ if (req == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ return;
+ }
+
+ bmp_release(dht->cookies, req->cookie);
+ list_del(&req->next);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ switch(req->code) {
+ case KAD_JOIN:
+ if (kad_handle_join_resp(dht, req, msg))
+ log_err("Enrollment of DHT failed.");
+ break;
+ case KAD_FIND_VALUE:
+ case KAD_FIND_NODE:
+ if (dht_get_state(dht) != DHT_RUNNING)
+ return;
+ kad_handle_find_resp(dht, req, msg);
+ break;
+ default:
+ break;
+ }
+
+ kad_req_destroy(req);
+}
+
+int dht_bootstrap(struct dht * dht,
+ size_t b,
+ time_t t_expire)
+{
+ assert(dht);
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ dht->id = create_id(b);
+ if (dht->id == NULL)
+ goto fail_id;
+
+ dht->buckets = bucket_create();
+ if (dht->buckets == NULL)
+ goto fail_buckets;
+
+ dht->buckets->depth = 0;
+ dht->buckets->mask = 0;
+
+ dht->b = b / CHAR_BIT;
+ dht->t_expire = MAX(2, t_expire);
+ dht->t_repub = MAX(1, t_expire - 10);
+ dht->k = KAD_K;
+
+ if (pthread_create(&dht->worker, NULL, work, dht))
+ goto fail_pthread_create;
+
+ dht->state = DHT_RUNNING;
+
+ dht_update_bucket(dht, dht->id, dht->addr);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return 0;
+
+ fail_pthread_create:
+ bucket_destroy(dht->buckets);
+ dht->buckets = NULL;
+ fail_buckets:
+ free(dht->id);
+ dht->id = NULL;
+ fail_id:
+ pthread_rwlock_unlock(&dht->lock);
+ return -1;
+}
+
+int dht_enroll(struct dht * dht,
+ uint64_t addr)
+{
+ assert(dht);
+
+ return kad_join(dht, addr);
+}
+
+int dht_reg(struct dht * dht,
+ const uint8_t * key)
+{
+ struct ref_entry * e;
+
+ assert(dht);
+ assert(key);
+ assert(dht->addr != 0);
+
+ if (dht_get_state(dht) != DHT_RUNNING)
+ return -1;
+
+ e = ref_entry_create(dht, key);
+ if (e == NULL)
+ return -ENOMEM;
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ list_add(&e->next, &dht->refs);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ kad_publish(dht, key, dht->addr, dht->t_expire);
+
+ return 0;
+}
+
+int dht_unreg(struct dht * dht,
+ const uint8_t * key)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(dht);
+ assert(key);
+
+ if (dht_get_state(dht) != DHT_RUNNING)
+ return -1;
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ list_for_each_safe(p, h, &dht->refs) {
+ struct ref_entry * r = list_entry(p, struct ref_entry, next);
+ if (!memcmp(key, r->key, dht-> b) ) {
+ list_del(&r->next);
+ ref_entry_destroy(r);
+ }
+ }
+
+ dht_del(dht, key, dht->addr);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return 0;
+}
+
+uint64_t dht_query(struct dht * dht,
+ const uint8_t * key)
+{
+ struct dht_entry * e;
+ struct lookup * lu;
+ uint64_t addrs[KAD_K];
+ size_t n;
+
+ addrs[0] = 0;
+
+ pthread_rwlock_rdlock(&dht->lock);
+
+ e = dht_find_entry(dht, key);
+ if (e != NULL)
+ addrs[0] = dht_entry_get_addr(dht, e);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ if (addrs[0] != 0 && addrs[0] != dht->addr)
+ return addrs[0];
+
+ lu = kad_lookup(dht, key, KAD_FIND_VALUE);
+ if (lu == NULL)
+ return 0;
+
+ n = lookup_get_addrs(lu, addrs);
+ if (n == 0) {
+ lookup_destroy(lu);
+ return 0;
+ }
+
+ lookup_destroy(lu);
+
+ /* Current behaviour is anycast and return the first peer address. */
+ if (addrs[0] != dht->addr)
+ return addrs[0];
+
+ if (n > 1)
+ return addrs[1];
+
+ return 0;
+}
+
+void dht_post_sdu(void * ae,
+ struct shm_du_buff * sdb)
+{
+ struct dht * dht;
+ kad_msg_t * msg;
+ kad_contact_msg_t ** cmsgs;
+ kad_msg_t resp_msg = KAD_MSG__INIT;
+ uint64_t addr;
+ buffer_t buf;
+ size_t i;
+
+ assert(ae);
+ assert(sdb);
+
+ memset(&buf, 0, sizeof(buf));
+
+ dht = (struct dht *) ae;
+
+ msg = kad_msg__unpack(NULL,
+ shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
+ shm_du_buff_head(sdb));
+
+ ipcp_sdb_release(sdb);
+
+ if (msg == NULL) {
+ log_err("Failed to unpack message.");
+ return;
+ }
+
+ if (msg->has_key && msg->key.len != dht->b) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_warn("Bad key in message.");
+ return;
+ }
+
+ if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_warn("Bad source ID in message of type %d.", msg->code);
+ return;
+ }
+
+ if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) {
+ kad_msg__free_unpacked(msg, NULL);
+ return;
+ }
+
+ addr = msg->s_addr;
+
+ resp_msg.code = KAD_RESPONSE;
+ resp_msg.cookie = msg->cookie;
+
+ switch(msg->code) {
+ case KAD_JOIN:
+ /* Refuse enrollee on check fails. */
+ if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
+ log_warn("Parameter mismatch. "
+ "DHT enrolment refused.");
+ break;
+ }
+
+ if (msg->t_replicate != KAD_T_REPL) {
+ log_warn("Replication time mismatch. "
+ "DHT enrolment refused.");
+
+ break;
+ }
+
+ if (msg->t_refresh != KAD_T_REFR) {
+ log_warn("Refresh time mismatch. "
+ "DHT enrolment refused.");
+ break;
+ }
+
+ resp_msg.has_alpha = true;
+ resp_msg.has_b = true;
+ resp_msg.has_k = true;
+ resp_msg.has_t_expire = true;
+ resp_msg.has_t_refresh = true;
+ resp_msg.has_t_replicate = true;
+ resp_msg.alpha = KAD_ALPHA;
+ resp_msg.b = dht->b;
+ resp_msg.k = KAD_K;
+ resp_msg.t_expire = dht->t_expire;
+ resp_msg.t_refresh = KAD_T_REFR;
+ resp_msg.t_replicate = KAD_T_REPL;
+ break;
+ case KAD_FIND_VALUE:
+ buf = dht_retrieve(dht, msg->key.data);
+ if (buf.len != 0) {
+ resp_msg.n_addrs = buf.len;
+ resp_msg.addrs = (uint64_t *) buf.data;
+ break;
+ }
+ /* FALLTHRU */
+ case KAD_FIND_NODE:
+ /* Return k closest contacts. */
+ resp_msg.n_contacts =
+ dht_get_contacts(dht, msg->key.data, &cmsgs);
+ resp_msg.contacts = cmsgs;
+ break;
+ case KAD_STORE:
+ if (msg->n_contacts < 1) {
+ log_warn("No contacts in store message.");
+ break;
+ }
+
+ if (!msg->has_t_expire) {
+ log_warn("No expiry time in store message.");
+ break;
+ }
+
+ kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire);
+ break;
+ case KAD_RESPONSE:
+ kad_handle_response(dht, msg);
+ break;
+ default:
+ assert(false);
+ break;
+ }
+
+ if (msg->code != KAD_JOIN) {
+ pthread_rwlock_wrlock(&dht->lock);
+ if (dht_update_bucket(dht, msg->s_id.data, addr))
+ log_warn("Failed to update bucket.");
+ pthread_rwlock_unlock(&dht->lock);
+ }
+
+ if (msg->code < KAD_STORE) {
+ if (send_msg(dht, &resp_msg, addr))
+ log_warn("Failed to send response.");
+ }
+
+ kad_msg__free_unpacked(msg, NULL);
+
+ if (resp_msg.n_addrs > 0)
+ free(resp_msg.addrs);
+
+ if (resp_msg.n_contacts == 0)
+ return;
+
+ for (i = 0; i < resp_msg.n_contacts; ++i)
+ kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL);
+ free(resp_msg.contacts);
+}
+
+void dht_destroy(struct dht * dht)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ if (dht == NULL)
+ return;
+
+ if (dht_get_state(dht) == DHT_RUNNING)
+ dht_set_state(dht, DHT_SHUTDOWN);
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ list_for_each_safe(p, h, &dht->entries) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ }
+
+ list_for_each_safe(p, h, &dht->requests) {
+ struct kad_req * r = list_entry(p, struct kad_req, next);
+ list_del(&r->next);
+ free(r);
+ }
+
+ list_for_each_safe(p, h, &dht->refs) {
+ struct ref_entry * e = list_entry(p, struct ref_entry, next);
+ list_del(&e->next);
+ ref_entry_destroy(e);
+ }
+
+ list_for_each_safe(p, h, &dht->lookups) {
+ struct lookup * l = list_entry(p, struct lookup, next);
+ list_del(&l->next);
+ lookup_destroy(l);
+ }
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ if (dht_get_state(dht) == DHT_SHUTDOWN) {
+ pthread_cancel(dht->worker);
+ pthread_join(dht->worker, NULL);
+ }
+
+ if (dht->buckets != NULL)
+ bucket_destroy(dht->buckets);
+
+ bmp_destroy(dht->cookies);
+
+ pthread_mutex_destroy(&dht->mtx);
+
+ pthread_rwlock_destroy(&dht->lock);
+
+ free(dht->id);
+
+ free(dht);
+}
+
+struct dht * dht_create(uint64_t addr)
+{
+ struct dht * dht;
+
+ dht = malloc(sizeof(*dht));
+ if (dht == NULL)
+ goto fail_malloc;
+
+ dht->buckets = NULL;
+
+ list_head_init(&dht->entries);
+ list_head_init(&dht->requests);
+ list_head_init(&dht->refs);
+ list_head_init(&dht->lookups);
+
+ if (pthread_rwlock_init(&dht->lock, NULL))
+ goto fail_rwlock;
+
+ if (pthread_mutex_init(&dht->mtx, NULL))
+ goto fail_mutex;
+
+ dht->cookies = bmp_create(DHT_MAX_REQS, 1);
+ if (dht->cookies == NULL)
+ goto fail_bmp;
+
+ dht->b = 0;
+ dht->addr = addr;
+ dht->id = NULL;
+#ifndef __DHT_TEST__
+ dht->fd = dt_reg_ae(dht, &dht_post_sdu);
+#endif /* __DHT_TEST__ */
+
+ dht->state = DHT_INIT;
+
+ return dht;
+
+ fail_bmp:
+ pthread_mutex_destroy(&dht->mtx);
+ fail_mutex:
+ pthread_rwlock_destroy(&dht->lock);
+ fail_rwlock:
+ free(dht);
+ fail_malloc:
+ return NULL;
+}
diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h
new file mode 100644
index 00000000..5d7fc894
--- /dev/null
+++ b/src/ipcpd/normal/dht.h
@@ -0,0 +1,54 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Distributed Hash Table based on Kademlia
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_DHT_H
+#define OUROBOROS_IPCPD_NORMAL_DHT_H
+
+#include <ouroboros/ipcp-dev.h>
+
+#include <stdint.h>
+#include <sys/types.h>
+
+struct dht;
+
+struct dht * dht_create(uint64_t addr);
+
+int dht_bootstrap(struct dht * dht,
+ size_t b,
+ time_t t_expire);
+
+int dht_enroll(struct dht * dht,
+ uint64_t addr);
+
+void dht_destroy(struct dht * dht);
+
+int dht_reg(struct dht * dht,
+ const uint8_t * key);
+
+int dht_unreg(struct dht * dht,
+ const uint8_t * key);
+
+uint64_t dht_query(struct dht * dht,
+ const uint8_t * key);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_DHT_H */
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index 5ea8a300..69b7e90e 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -20,129 +20,134 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
+#define OUROBOROS_PREFIX "directory"
+
#include <ouroboros/config.h>
+#include <ouroboros/endian.h>
#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
#include <ouroboros/rib.h>
+#include <ouroboros/utils.h>
#include "dir.h"
+#include "dht.h"
#include "ipcp.h"
#include "ribconfig.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
+#include <inttypes.h>
-static char dir_path[RIB_MAX_PATH_LEN + 1];
+#define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT)
+#define ENROL_RETR 6
+#define ENROL_INTV 1
-static void dir_path_reset(void) {
- dir_path[strlen(DIR_PATH)]= '\0';
- assert(strcmp(DIR_PATH, dir_path) == 0);
-}
+struct dht * dht;
-int dir_init(void)
+static uint64_t find_peer_addr(void)
{
- /* FIXME: set ribmgr dissemination here */
- if (rib_add(RIB_ROOT, DIR_NAME))
- return -1;
+ ssize_t i;
+ char ** members;
+ ssize_t n_members;
+ size_t reset;
+ char path[RIB_MAX_PATH_LEN + 1];
- strcpy(dir_path, DIR_PATH);
+ strcpy(path, MEMBERS_PATH);
- return 0;
-}
+ reset = strlen(path);
-int dir_fini(void)
-{
- /* FIXME: remove ribmgr dissemination here*/
+ n_members = rib_children(path, &members);
+ if (n_members == 1) {
+ freepp(ssize_t, members, n_members);
+ return 0;
+ }
+
+ for (i = 0; i < n_members; ++i) {
+ uint64_t addr;
+ rib_path_append(path, members[i]);
+ if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) {
+ log_err("Failed to read address from RIB.");
+ freepp(ssize_t, members, n_members);
+ return ipcpi.dt_addr;
+ }
+
+ if (addr != ipcpi.dt_addr) {
+ freepp(ssize_t, members, n_members);
+ return addr;
+ }
+
+ path[reset] = '\0';
+ }
- dir_path_reset();
- rib_del(dir_path);
+ freepp(ssize_t, members, n_members);
return 0;
}
-int dir_reg(const uint8_t * hash)
+int dir_init()
{
- char hashstr[ipcp_dir_hash_strlen() + 1];
- int ret;
-
- assert(hash);
+ uint64_t addr;
- dir_path_reset();
-
- ipcp_hash_str(hashstr, hash);
-
- ret = rib_add(dir_path, hashstr);
- if (ret == -ENOMEM)
- return -ENOMEM;
-
- rib_path_append(dir_path, hashstr);
+ dht = dht_create(ipcpi.dt_addr);
+ if (dht == NULL)
+ return -ENOMEM;
- ret = rib_add(dir_path, ipcpi.name);
- if (ret == -EPERM)
+ addr = find_peer_addr();
+ if (addr == ipcpi.dt_addr) {
+ log_err("Failed to get peer address.");
+ dht_destroy(dht);
return -EPERM;
- if (ret == -ENOMEM) {
- if (rib_children(dir_path, NULL) == 0)
- rib_del(dir_path);
- return -ENOMEM;
}
- return 0;
-}
-
-int dir_unreg(const uint8_t * hash)
-{
- char hashstr[ipcp_dir_hash_strlen() + 1];
- size_t len;
-
- assert(hash);
+ if (addr != 0) {
+ size_t retr = 0;
+ log_dbg("Enrolling directory with peer %" PRIu64 ".", addr);
+ /* NOTE: we could try other members if dht_enroll times out. */
+ while (dht_enroll(dht, addr)) {
+ if (retr++ == ENROL_RETR) {
+ dht_destroy(dht);
+ return -EPERM;
+ }
- dir_path_reset();
+ log_dbg("Directory enrollment failed, retrying...");
+ sleep(ENROL_INTV);
+ }
- ipcp_hash_str(hashstr, hash);
+ log_dbg("Directory enrolled.");
- rib_path_append(dir_path, hashstr);
-
- if (!rib_has(dir_path))
return 0;
+ }
- len = strlen(dir_path);
-
- rib_path_append(dir_path, ipcpi.name);
-
- rib_del(dir_path);
+ log_dbg("Bootstrapping directory.");
- dir_path[len] = '\0';
+ /* TODO: get parameters for bootstrap from IRM tool. */
+ if (dht_bootstrap(dht, KAD_B, 86400)) {
+ dht_destroy(dht);
+ return -ENOMEM;
+ }
- if (rib_children(dir_path, NULL) == 0)
- rib_del(dir_path);
+ log_dbg("Directory bootstrapped.");
return 0;
}
-int dir_query(const uint8_t * hash)
+void dir_fini(void)
{
- char hashstr[ipcp_dir_hash_strlen() + 1];
- size_t len;
-
- dir_path_reset();
-
- ipcp_hash_str(hashstr, hash);
-
- rib_path_append(dir_path, hashstr);
-
- if (!rib_has(dir_path))
- return -1;
-
- /* FIXME: assert after local IPCP is deprecated */
- len = strlen(dir_path);
+ dht_destroy(dht);
+}
- rib_path_append(dir_path, ipcpi.name);
+int dir_reg(const uint8_t * hash)
+{
+ return dht_reg(dht, hash);
+}
- if (rib_has(dir_path)) {
- dir_path[len] = '\0';
- if (rib_children(dir_path, NULL) == 1)
- return -1;
- }
+int dir_unreg(const uint8_t * hash)
+{
+ return dht_unreg(dht, hash);
+}
- return 0;
+uint64_t dir_query(const uint8_t * hash)
+{
+ return dht_query(dht, hash);
}
diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h
index 1b28a5c0..4091a3e8 100644
--- a/src/ipcpd/normal/dir.h
+++ b/src/ipcpd/normal/dir.h
@@ -23,14 +23,14 @@
#ifndef OUROBOROS_IPCPD_NORMAL_DIR_H
#define OUROBOROS_IPCPD_NORMAL_DIR_H
-int dir_init(void);
+int dir_init(void);
-int dir_fini(void);
+void dir_fini(void);
-int dir_reg(const uint8_t * hash);
+int dir_reg(const uint8_t * hash);
-int dir_unreg(const uint8_t * hash);
+int dir_unreg(const uint8_t * hash);
-int dir_query(const uint8_t * hash);
+uint64_t dir_query(const uint8_t * hash);
#endif /* OUROBOROS_IPCPD_NORMAL_DIR_H */
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 1867c13b..5fcc5865 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -50,7 +50,7 @@
#include <assert.h>
struct ae_info {
- int (*post_sdu)(void * ae, struct shm_du_buff * sdb);
+ void (* post_sdu)(void * ae, struct shm_du_buff * sdb);
void * ae;
};
@@ -131,11 +131,14 @@ static int sdu_handler(int fd,
return 0;
}
- if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) {
+ if (dt.aes[dt_pci.fd].post_sdu == NULL) {
+ log_err("No registered AE on fd %d.", dt_pci.fd);
ipcp_sdb_release(sdb);
- return -1;
+ return -EPERM;
}
+ dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb);
+
return 0;
}
@@ -295,7 +298,7 @@ void dt_stop(void)
}
int dt_reg_ae(void * ae,
- int (* func)(void * func, struct shm_du_buff *))
+ void (* func)(void * func, struct shm_du_buff *))
{
int res_fd;
@@ -330,10 +333,11 @@ int dt_write_sdu(uint64_t dst_addr,
struct dt_pci dt_pci;
assert(sdb);
+ assert(dst_addr != ipcpi.dt_addr);
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
- log_err("Could not get nhop for addr %" PRIu64 ".", dst_addr);
+ log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
return -1;
}
diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h
index 0e1a8cc3..15ef51f0 100644
--- a/src/ipcpd/normal/dt.h
+++ b/src/ipcpd/normal/dt.h
@@ -38,7 +38,7 @@ int dt_start(void);
void dt_stop(void);
int dt_reg_ae(void * ae,
- int (* func)(void * ae, struct shm_du_buff * sdb));
+ void (* func)(void * ae, struct shm_du_buff * sdb));
int dt_write_sdu(uint64_t dst_addr,
qoscube_t qc,
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 40a680c3..704f4f16 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -30,6 +30,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
+#include "dir.h"
#include "dt_pci.h"
#include "fa.h"
#include "sdu_sched.h"
@@ -79,8 +80,8 @@ static void destroy_conn(int fd)
fa.r_addr[fd] = INVALID_ADDR;
}
-static int fa_post_sdu(void * ae,
- struct shm_du_buff * sdb)
+static void fa_post_sdu(void * ae,
+ struct shm_du_buff * sdb)
{
struct timespec ts = {0, TIMEOUT * 1000};
struct timespec abstime;
@@ -98,9 +99,12 @@ static int fa_post_sdu(void * ae,
shm_du_buff_tail(sdb) -
shm_du_buff_head(sdb),
shm_du_buff_head(sdb));
+
+ ipcp_sdb_release(sdb);
+
if (msg == NULL) {
log_err("Failed to unpack flow alloc message.");
- return -1;
+ return;
}
switch (msg->code) {
@@ -113,7 +117,7 @@ static int fa_post_sdu(void * ae,
log_err("Bad flow request.");
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ return;
}
while (ipcpi.alloc_id != -1 &&
@@ -128,7 +132,7 @@ static int fa_post_sdu(void * ae,
log_dbg("Won't allocate over non-operational IPCP.");
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ return;
}
assert(ipcpi.alloc_id == -1);
@@ -141,7 +145,7 @@ static int fa_post_sdu(void * ae,
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
log_err("Failed to get fd for flow.");
- return -1;
+ return;
}
pthread_rwlock_wrlock(&fa.flows_lock);
@@ -173,13 +177,10 @@ static int fa_post_sdu(void * ae,
default:
log_err("Got an unknown flow allocation message.");
flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ return;
}
flow_alloc_msg__free_unpacked(msg, NULL);
- ipcp_sdb_release(sdb);
-
- return 0;
}
int fa_init(void)
@@ -240,47 +241,10 @@ int fa_alloc(int fd,
qoscube_t qc)
{
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- char path[RIB_MAX_PATH_LEN + 1];
uint64_t addr;
- ssize_t ch;
- ssize_t i;
- char ** children;
- char hashstr[ipcp_dir_hash_strlen() + 1];
- char * dst_ipcp = NULL;
struct shm_du_buff * sdb;
- ipcp_hash_str(hashstr, dst);
-
- assert(strlen(hashstr) + strlen(DIR_PATH) + 1
- < RIB_MAX_PATH_LEN);
-
- strcpy(path, DIR_PATH);
-
- rib_path_append(path, hashstr);
-
- ch = rib_children(path, &children);
- if (ch <= 0)
- return -1;
-
- for (i = 0; i < ch; ++i)
- if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0)
- dst_ipcp = children[i];
- else
- free(children[i]);
-
- free(children);
-
- if (dst_ipcp == NULL)
- return -1;
-
- strcpy(path, MEMBERS_PATH);
-
- rib_path_append(path, dst_ipcp);
-
- free(dst_ipcp);
-
- if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr))
- return -1;
+ addr = dir_query(dst);
msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
msg.has_hash = true;
diff --git a/src/ipcpd/normal/kademlia.proto b/src/ipcpd/normal/kademlia.proto
new file mode 100644
index 00000000..0b7e8beb
--- /dev/null
+++ b/src/ipcpd/normal/kademlia.proto
@@ -0,0 +1,46 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * KAD protocol
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+syntax = "proto2";
+
+message kad_contact_msg {
+ required bytes id = 1;
+ required uint64 addr = 2;
+};
+
+message kad_msg {
+ required uint32 code = 1;
+ required uint32 cookie = 2;
+ required uint64 s_addr = 3;
+ optional bytes s_id = 4;
+ optional bytes key = 5;
+ repeated uint64 addrs = 6;
+ repeated kad_contact_msg contacts = 7;
+ // enrolment parameters
+ optional uint32 alpha = 8;
+ optional uint32 b = 9;
+ optional uint32 k = 10;
+ optional uint32 t_expire = 11;
+ optional uint32 t_refresh = 12;
+ optional uint32 t_replicate = 13;
+}; \ No newline at end of file
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 8c28de78..f94c15de 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -116,11 +116,6 @@ static int boot_components(void)
log_dbg("Starting ribmgr.");
- if (dir_init()) {
- log_err("Failed to initialize directory.");
- goto fail_dir;
- }
-
if (ribmgr_init()) {
log_err("Failed to initialize RIB manager.");
goto fail_ribmgr;
@@ -148,6 +143,11 @@ static int boot_components(void)
goto fail_fa_start;
}
+ if (dir_init()) {
+ log_err("Failed to initialize directory.");
+ goto fail_dir;
+ }
+
if (enroll_start()) {
log_err("Failed to start enroll.");
goto fail_enroll_start;
@@ -166,6 +166,8 @@ static int boot_components(void)
ipcp_set_state(IPCP_INIT);
enroll_stop();
fail_enroll_start:
+ dir_fini();
+ fail_dir:
fa_stop();
fail_fa_start:
dt_stop();
@@ -176,8 +178,6 @@ static int boot_components(void)
fail_dt:
ribmgr_fini();
fail_ribmgr:
- dir_fini();
- fail_dir:
addr_auth_fini();
fail_addr_auth:
free(ipcpi.dif_name);
@@ -191,6 +191,8 @@ void shutdown_components(void)
enroll_stop();
+ dir_fini();
+
fa_stop();
dt_stop();
@@ -201,8 +203,6 @@ void shutdown_components(void)
ribmgr_fini();
- dir_fini();
-
addr_auth_fini();
free(ipcpi.dif_name);
@@ -227,10 +227,9 @@ static int normal_ipcp_enroll(const char * dst,
return -1;
}
- log_dbg("Enrolled with " HASH_FMT, HASH_VAL(dst));
+ log_dbg("Enrolled with %s.", dst);
info->dir_hash_algo = ipcpi.dir_hash_algo;
-
strcpy(info->dif_name, ipcpi.dif_name);
return 0;
@@ -347,12 +346,17 @@ static int normal_ipcp_bootstrap(const struct ipcp_config * conf)
return 0;
}
+static int normal_ipcp_query(const uint8_t * dst)
+{
+ return dir_query(dst) ? 0 : -1;
+}
+
static struct ipcp_ops normal_ops = {
.ipcp_bootstrap = normal_ipcp_bootstrap,
.ipcp_enroll = normal_ipcp_enroll,
.ipcp_reg = dir_reg,
.ipcp_unreg = dir_unreg,
- .ipcp_query = dir_query,
+ .ipcp_query = normal_ipcp_query,
.ipcp_flow_alloc = fa_alloc,
.ipcp_flow_alloc_resp = fa_alloc_resp,
.ipcp_flow_dealloc = fa_dealloc
diff --git a/src/ipcpd/normal/pol/flat.c b/src/ipcpd/normal/pol/flat.c
index e709da7c..0907cf7a 100644
--- a/src/ipcpd/normal/pol/flat.c
+++ b/src/ipcpd/normal/pol/flat.c
@@ -56,7 +56,7 @@ static int addr_taken(char * name,
char path[RIB_MAX_PATH_LEN + 1];
size_t reset;
- strcpy(path, "/" MEMBERS_NAME);
+ strcpy(path, MEMBERS_PATH);
reset = strlen(path);
@@ -102,7 +102,7 @@ uint64_t flat_address(void)
char ** members;
ssize_t n_members;
- strcpy(path, "/" MEMBERS_NAME);
+ strcpy(path, MEMBERS_PATH);
if (!rib_has(path)) {
log_err("Could not read members from RIB.");
diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h
index 31c79fbe..db1ff1bb 100644
--- a/src/ipcpd/normal/ribconfig.h
+++ b/src/ipcpd/normal/ribconfig.h
@@ -29,9 +29,7 @@
#define DLR "/"
#define BOOT_NAME "boot"
#define MEMBERS_NAME "members"
-#define DIR_NAME "directory"
#define ROUTING_NAME "fsdb"
-#define DIR_PATH DLR DIR_NAME
#define BOOT_PATH DLR BOOT_NAME
#define MEMBERS_PATH DLR MEMBERS_NAME
#define ROUTING_PATH DLR ROUTING_NAME
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 266a628d..3beb917c 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -299,9 +299,8 @@ static void * sync_rib(void *o)
rib_path_append(path, children[--ch]);
free(children[ch]);
- /* Only sync fsdb, members and directory */
+ /* Only sync fsdb and members */
if (strcmp(path, MEMBERS_PATH) == 0
- || strcmp(path, DIR_PATH) == 0
|| strcmp(path, ROUTING_PATH) == 0)
ribmgr_sync(path);
}
diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c
index 63259430..a4b9e074 100644
--- a/src/ipcpd/normal/sdu_sched.c
+++ b/src/ipcpd/normal/sdu_sched.c
@@ -36,11 +36,19 @@
struct sdu_sched {
flow_set_t * set[QOS_CUBE_MAX];
- fqueue_t * fqs[QOS_CUBE_MAX];
next_sdu_t callback;
- pthread_t sdu_reader;
+ pthread_t sdu_readers[IPCP_SCHED_THREADS];
};
+static void cleanup_reader(void * o)
+{
+ int i;
+ fqueue_t ** fqs = (fqueue_t **) o;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ fqueue_destroy(fqs[i]);
+}
+
static void * sdu_reader(void * o)
{
struct sdu_sched * sched;
@@ -49,14 +57,27 @@ static void * sdu_reader(void * o)
int fd;
int i = 0;
int ret;
+ fqueue_t * fqs[QOS_CUBE_MAX];
sched = (struct sdu_sched *) o;
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fqs[i] = fqueue_create();
+ if (fqs[i] == NULL) {
+ int j;
+ for (j = 0; j < i; ++j)
+ fqueue_destroy(fqs[j]);
+ return (void *) -1;
+ }
+ }
+
+ pthread_cleanup_push(cleanup_reader, fqs);
+
while (true) {
/* FIXME: replace with scheduling policy call */
i = (i + 1) % QOS_CUBE_MAX;
- ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout);
+ ret = flow_event_wait(sched->set[i], fqs[i], &timeout);
if (ret == -ETIMEDOUT)
continue;
@@ -65,7 +86,7 @@ static void * sdu_reader(void * o)
continue;
}
- while ((fd = fqueue_next(sched->fqs[i])) >= 0) {
+ while ((fd = fqueue_next(fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
log_warn("Failed to read SDU from fd %d.", fd);
continue;
@@ -78,6 +99,8 @@ static void * sdu_reader(void * o)
}
}
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
@@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)
sdu_sched = malloc(sizeof(*sdu_sched));
if (sdu_sched == NULL)
- return NULL;
+ goto fail_malloc;
sdu_sched->callback = callback;
@@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)
if (sdu_sched->set[i] == NULL) {
for (j = 0; j < i; ++j)
flow_set_destroy(sdu_sched->set[j]);
- goto fail_sdu_sched;
+ goto fail_flow_set;
}
}
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- sdu_sched->fqs[i] = fqueue_create();
- if (sdu_sched->fqs[i] == NULL) {
- for (j = 0; j < i; ++j)
- fqueue_destroy(sdu_sched->fqs[j]);
+ for (i = 0; i < IPCP_SCHED_THREADS; ++i) {
+ if (pthread_create(&sdu_sched->sdu_readers[i], NULL,
+ sdu_reader, sdu_sched)) {
+ int j;
+ for (j = 0; j < i; ++j) {
+ pthread_cancel(sdu_sched->sdu_readers[j]);
+ pthread_join(sdu_sched->sdu_readers[j], NULL);
+ }
goto fail_flow_set;
}
}
- pthread_create(&sdu_sched->sdu_reader,
- NULL,
- sdu_reader,
- (void *) sdu_sched);
-
return sdu_sched;
fail_flow_set:
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- flow_set_destroy(sdu_sched->set[i]);
- fail_sdu_sched:
free(sdu_sched);
+ fail_malloc:
return NULL;
}
@@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)
assert(sdu_sched);
- pthread_cancel(sdu_sched->sdu_reader);
-
- pthread_join(sdu_sched->sdu_reader, NULL);
+ for (i = 0; i < IPCP_SCHED_THREADS; ++i) {
+ pthread_cancel(sdu_sched->sdu_readers[i]);
+ pthread_join(sdu_sched->sdu_readers[i], NULL);
+ }
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fqueue_destroy(sdu_sched->fqs[i]);
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
flow_set_destroy(sdu_sched->set[i]);
- }
free(sdu_sched);
}
diff --git a/src/ipcpd/normal/tests/CMakeLists.txt b/src/ipcpd/normal/tests/CMakeLists.txt
new file mode 100644
index 00000000..d975caf6
--- /dev/null
+++ b/src/ipcpd/normal/tests/CMakeLists.txt
@@ -0,0 +1,37 @@
+get_filename_component(CURRENT_SOURCE_PARENT_DIR
+ ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(CURRENT_BINARY_PARENT_DIR
+ ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR})
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+include_directories(${CURRENT_SOURCE_PARENT_DIR})
+include_directories(${CURRENT_BINARY_PARENT_DIR})
+
+include_directories(${CMAKE_SOURCE_DIR}/include)
+include_directories(${CMAKE_BINARY_DIR}/include)
+
+get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
+
+create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
+ # Add new tests here
+ dht_test.c
+)
+
+set_source_files_properties(${KAD_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+
+add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}
+ ${KAD_PROTO_SRCS})
+target_link_libraries(${PARENT_DIR}_test ouroboros)
+
+add_dependencies(check ${PARENT_DIR}_test)
+
+set(tests_to_run ${${PARENT_DIR}_tests})
+remove(tests_to_run test_suite.c)
+
+foreach (test ${tests_to_run})
+ get_filename_component(test_name ${test} NAME_WE)
+ add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name})
+endforeach (test)
diff --git a/src/ipcpd/normal/tests/dht_test.c b/src/ipcpd/normal/tests/dht_test.c
new file mode 100644
index 00000000..861ae10a
--- /dev/null
+++ b/src/ipcpd/normal/tests/dht_test.c
@@ -0,0 +1,99 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Unit tests of the DHT AE
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define __DHT_TEST__
+
+#include "dht.c"
+
+#include <pthread.h>
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#define KEY_LEN 32
+
+#define EXP 86400
+#define CONTACTS 1000
+
+int dht_test(int argc,
+ char ** argv)
+{
+ struct dht * dht;
+ uint64_t addr = 0x0D1F;
+ uint8_t key[KEY_LEN];
+ size_t i;
+
+ (void) argc;
+ (void) argv;
+
+ dht = dht_create(addr);
+ if (dht == NULL) {
+ printf("Failed to create dht.\n");
+ return -1;
+ }
+
+ dht_destroy(dht);
+
+ dht = dht_create(addr);
+ if (dht == NULL) {
+ printf("Failed to re-create dht.\n");
+ return -1;
+ }
+
+ if (dht_bootstrap(dht, KEY_LEN, EXP)) {
+ printf("Failed to bootstrap dht.\n");
+ dht_destroy(dht);
+ return -1;
+ }
+
+ dht_destroy(dht);
+
+ dht = dht_create(addr);
+ if (dht == NULL) {
+ printf("Failed to re-create dht.\n");
+ return -1;
+ }
+
+ if (dht_bootstrap(dht, KEY_LEN, EXP)) {
+ printf("Failed to bootstrap dht.\n");
+ dht_destroy(dht);
+ return -1;
+ }
+
+ for (i = 0; i < CONTACTS; ++i) {
+ uint64_t addr;
+ random_buffer(&addr, sizeof(addr));
+ random_buffer(key, KEY_LEN);
+ pthread_rwlock_wrlock(&dht->lock);
+ if (dht_update_bucket(dht, key, addr)) {
+ pthread_rwlock_unlock(&dht->lock);
+ printf("Failed to update bucket.\n");
+ dht_destroy(dht);
+ return -1;
+ }
+ pthread_rwlock_unlock(&dht->lock);
+ }
+
+ dht_destroy(dht);
+
+ return 0;
+}
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 912234d6..96b0b729 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -36,6 +36,7 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/qos.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
#include <ouroboros/logs.h>
#include "utils.h"
@@ -99,18 +100,9 @@ struct irm {
struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */
int sockfd; /* UNIX socket */
- pthread_t * threadpool; /* pool of mainloop threads */
-
- struct bmp * thread_ids; /* ids for mainloop threads */
- size_t max_threads; /* max threads set by tpm */
- size_t threads; /* available mainloop threads */
- pthread_cond_t threads_cond; /* signal thread entry/exit */
- pthread_mutex_t threads_lock; /* mutex for threads/condvar */
-
enum irm_state state; /* state of the irmd */
pthread_rwlock_t state_lock; /* lock for the entire irmd */
- pthread_t tpm; /* threadpool manager */
pthread_t irm_sanitize; /* clean up irmd resources */
pthread_t shm_sanitize; /* keep track of rdrbuff use */
} irmd;
@@ -327,7 +319,7 @@ static pid_t create_ipcp(char * name,
break;
}
- list_add_tail(&tmp->next, &irmd.ipcps);
+ list_add_tail(&tmp->next, p);
list_add(&api->next, &irmd.spawned_apis);
@@ -490,7 +482,7 @@ static int enroll_ipcp(pid_t api,
pthread_rwlock_unlock(&irmd.reg_lock);
if (ipcp_enroll(api, dst_name, &info) < 0) {
- log_err("Could not enroll IPCP.");
+ log_err("Could not enroll IPCP %d.", api);
return -1;
}
@@ -1434,16 +1426,6 @@ static void irm_fini(void)
if (irmd_get_state() != IRMD_NULL)
log_warn("Unsafe destroy.");
- pthread_mutex_lock(&irmd.threads_lock);
-
- if (irmd.thread_ids != NULL)
- bmp_destroy(irmd.thread_ids);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-
- if (irmd.threadpool != NULL)
- free(irmd.threadpool);
-
pthread_rwlock_wrlock(&irmd.flows_lock);
if (irmd.port_ids != NULL)
@@ -1517,8 +1499,8 @@ void irmd_sig_handler(int sig,
}
log_info("IRMd shutting down...");
-
irmd_set_state(IRMD_NULL);
+ tpm_stop();
break;
case SIGPIPE:
log_dbg("Ignored SIGPIPE.");
@@ -1700,55 +1682,11 @@ void * irm_sanitize(void * o)
}
}
-static void thread_inc(void)
-{
- pthread_mutex_lock(&irmd.threads_lock);
-
- ++irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
-static void thread_dec(void)
-{
- pthread_mutex_lock(&irmd.threads_lock);
-
- --irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
-static bool thread_check(void)
-{
- int ret;
-
- pthread_mutex_lock(&irmd.threads_lock);
-
- ret = irmd.threads > irmd.max_threads;
-
- pthread_mutex_unlock(&irmd.threads_lock);
-
- return ret;
-}
-
-static void thread_exit(ssize_t id)
-{
- pthread_mutex_lock(&irmd.threads_lock);
- bmp_release(irmd.thread_ids, id);
-
- --irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
void * mainloop(void * o)
{
uint8_t buf[IRM_MSG_BUF_SIZE];
- ssize_t id = (ssize_t) o;
+ (void) o;
while (true) {
#ifdef __FreeBSD__
@@ -1768,8 +1706,8 @@ void * mainloop(void * o)
struct timeval tv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
- if (irmd_get_state() != IRMD_RUNNING || thread_check()) {
- thread_exit(id);
+ if (irmd_get_state() != IRMD_RUNNING || tpm_check()) {
+ tpm_exit();
break;
}
@@ -1780,7 +1718,6 @@ void * mainloop(void * o)
if (select(irmd.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0)
continue;
#endif
-
cli_sockfd = accept(irmd.sockfd, 0, 0);
if (cli_sockfd < 0)
continue;
@@ -1798,7 +1735,7 @@ void * mainloop(void * o)
if (irmd_get_state() != IRMD_RUNNING) {
close(cli_sockfd);
- thread_exit(id);
+ tpm_exit();
break;
}
@@ -1808,7 +1745,7 @@ void * mainloop(void * o)
continue;
}
- thread_dec();
+ tpm_dec();
if (msg->has_timeo_sec) {
assert(msg->has_timeo_nsec);
@@ -1937,7 +1874,7 @@ void * mainloop(void * o)
if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1947,7 +1884,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1956,7 +1893,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1971,70 +1908,7 @@ void * mainloop(void * o)
free(buffer.data);
close(cli_sockfd);
- thread_inc();
- }
-
- return (void *) 0;
-}
-
-void * threadpoolmgr(void * o)
-{
- pthread_attr_t pattr;
- struct timespec dl;
- struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
- (IRMD_TPM_TIMEOUT % 1000) * MILLION};
- (void) o;
-
- if (pthread_attr_init(&pattr))
- return (void *) -1;
-
- pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
-
- while (true) {
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
- if (irmd_get_state() != IRMD_RUNNING) {
- pthread_attr_destroy(&pattr);
- log_dbg("Waiting for threads to exit.");
- pthread_mutex_lock(&irmd.threads_lock);
- while (irmd.threads > 0)
- pthread_cond_wait(&irmd.threads_cond,
- &irmd.threads_lock);
- pthread_mutex_unlock(&irmd.threads_lock);
- log_dbg("Threadpool manager done.");
- break;
- }
-
- pthread_mutex_lock(&irmd.threads_lock);
-
- if (irmd.threads < IRMD_MIN_AV_THREADS) {
- log_dbg("Increasing threadpool.");
- irmd.max_threads = IRMD_MAX_AV_THREADS;
-
- while (irmd.threads < irmd.max_threads) {
- ssize_t id = bmp_allocate(irmd.thread_ids);
- if (!bmp_is_id_valid(irmd.thread_ids, id)) {
- log_warn("IRMd threadpool exhausted.");
- break;
- }
-
- if (pthread_create(&irmd.threadpool[id],
- &pattr, mainloop,
- (void *) id))
- log_warn("Failed to start new thread.");
- else
- ++irmd.threads;
- }
- }
-
- if (pthread_cond_timedwait(&irmd.threads_cond,
- &irmd.threads_lock,
- &dl) == ETIMEDOUT)
- if (irmd.threads > IRMD_MIN_AV_THREADS )
- --irmd.max_threads;
-
- pthread_mutex_unlock(&irmd.threads_lock);
+ tpm_inc();
}
return (void *) 0;
@@ -2043,7 +1917,6 @@ void * threadpoolmgr(void * o)
static int irm_init(void)
{
struct stat st;
- pthread_condattr_t cattr;
struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
(IRMD_ACCEPT_TIMEOUT % 1000) * 1000};
@@ -2066,24 +1939,6 @@ static int irm_init(void)
goto fail_flows_lock;
}
- if (pthread_mutex_init(&irmd.threads_lock, NULL)) {
- log_err("Failed to initialize mutex.");
- goto fail_threads_lock;
- }
-
- if (pthread_condattr_init(&cattr)) {
- log_err("Failed to initialize condattr.");
- goto fail_cattr;
- }
-
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- if (pthread_cond_init(&irmd.threads_cond, &cattr)) {
- log_err("Failed to initialize cond.");
- goto fail_threads_cond;
- }
-
list_head_init(&irmd.ipcps);
list_head_init(&irmd.api_table);
list_head_init(&irmd.apn_table);
@@ -2097,18 +1952,6 @@ static int irm_init(void)
goto fail_port_ids;
}
- irmd.thread_ids = bmp_create(IRMD_MAX_THREADS, 0);
- if (irmd.thread_ids == NULL) {
- log_err("Failed to thread thread_ids bitmap.");
- goto fail_thread_ids;
- }
-
- irmd.threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS);
- if (irmd.threadpool == NULL) {
- log_err("Failed to malloc threadpool");
- goto fail_thrpool;
- }
-
if ((irmd.lf = lockfile_create()) == NULL) {
if ((irmd.lf = lockfile_open()) == NULL) {
log_err("Lockfile error.");
@@ -2163,8 +2006,6 @@ static int irm_init(void)
goto fail_rdrbuff;
}
- irmd.threads = 0;
- irmd.max_threads = IRMD_MIN_AV_THREADS;
irmd.state = IRMD_RUNNING;
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2180,18 +2021,8 @@ fail_sock_path:
fail_stat:
lockfile_destroy(irmd.lf);
fail_lockfile:
- free(irmd.threadpool);
-fail_thrpool:
- bmp_destroy(irmd.thread_ids);
-fail_thread_ids:
bmp_destroy(irmd.port_ids);
fail_port_ids:
- pthread_cond_destroy(&irmd.threads_cond);
-fail_threads_cond:
- pthread_condattr_destroy(&cattr);
-fail_cattr:
- pthread_mutex_destroy(&irmd.threads_lock);
-fail_threads_lock:
pthread_rwlock_destroy(&irmd.flows_lock);
fail_flows_lock:
pthread_rwlock_destroy(&irmd.reg_lock);
@@ -2261,12 +2092,24 @@ int main(int argc,
exit(EXIT_FAILURE);
}
- pthread_create(&irmd.tpm, NULL, threadpoolmgr, NULL);
+ if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) {
+ log_fini();
+ exit(EXIT_FAILURE);
+ }
+
+ if (tpm_start()) {
+ tpm_fini();
+ log_fini();
+ exit(EXIT_FAILURE);
+ }
pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL);
pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb);
- pthread_join(irmd.tpm, NULL);
+ /* tpm_stop() called from sighandler */
+
+ tpm_fini();
+
pthread_join(irmd.irm_sanitize, NULL);
pthread_join(irmd.shm_sanitize, NULL);
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index e08869b8..fe4dd88c 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -11,7 +11,6 @@ protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS
protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS cdap.proto)
protobuf_generate_c(RO_PROTO_SRCS RO_PROTO_HDRS ro.proto)
protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto)
-protobuf_generate_c(FRCT_ENROLL_SRCS FRCT_ENROLL_HDRS frct_enroll.proto)
if (NOT APPLE)
find_library(LIBRT_LIBRARIES rt)
@@ -59,12 +58,13 @@ set(SOURCE_FILES
shm_rdrbuff.c
sockets.c
time_utils.c
+ tpm.c
utils.c
)
add_library(ouroboros SHARED ${SOURCE_FILES} ${IRM_PROTO_SRCS}
${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS}
- ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS} ${FRCT_ENROLL_SRCS})
+ ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS})
include(AddCompileFlags)
if (CMAKE_BUILD_TYPE MATCHES Debug)
@@ -77,9 +77,8 @@ else()
if (${LINUX_RND_HDR} STREQUAL "LINUX_RND_HDR-NOTFOUND")
find_package(OpenSSL)
if (NOT OPENSSL_FOUND)
- message(STATUS "No secure random generation, please install OpenSSL.")
+ message(FATAL_ERROR "No secure random generation, please install libssl.")
else()
- message(STATUS "OpenSSL found")
include_directories($OPENSSL_INCLUDE_DIR})
add_compile_flags(ouroboros -DHAVE_OPENSSL)
endif()
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 14971528..c8e43778 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -85,7 +85,6 @@ struct flow {
struct {
char * ap_name;
- char * daf_name;
pid_t api;
struct shm_rdrbuff * rdrb;
@@ -205,7 +204,7 @@ static int api_announce(char * ap_name)
return ret;
}
-static void init_flow(int fd)
+static void flow_clear(int fd)
{
assert(!(fd < 0));
@@ -216,9 +215,9 @@ static void init_flow(int fd)
ai.flows[fd].cube = QOS_CUBE_BE;
}
-static void reset_flow(int fd)
+static void flow_fini(int fd)
{
- assert (!(fd < 0));
+ assert(!(fd < 0));
if (ai.flows[fd].port_id != -1)
port_destroy(&ai.ports[ai.flows[fd].port_id]);
@@ -232,7 +231,59 @@ static void reset_flow(int fd)
if (ai.flows[fd].set != NULL)
shm_flow_set_close(ai.flows[fd].set);
- init_flow(fd);
+ flow_clear(fd);
+}
+
+static int flow_init(int port_id,
+ pid_t api,
+ qoscube_t qc)
+{
+ int fd;
+
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ return -EBADF;
+ }
+
+ ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ return -ENOMEM;
+ }
+
+ ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
+ flow_fini(fd);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ return -ENOMEM;
+ }
+
+ ai.flows[fd].set = shm_flow_set_open(api);
+ if (ai.flows[fd].set == NULL) {
+ flow_fini(fd);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ return -ENOMEM;
+ }
+
+ ai.flows[fd].port_id = port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = api;
+ ai.flows[fd].cube = qc;
+ ai.flows[fd].spec = qos_cube_to_spec(qc);
+
+ ai.ports[port_id].fd = fd;
+
+ port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+
+ return fd;
}
int ouroboros_init(const char * ap_name)
@@ -242,7 +293,6 @@ int ouroboros_init(const char * ap_name)
assert(ai.ap_name == NULL);
ai.api = getpid();
- ai.daf_name = NULL;
ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS);
if (ai.fds == NULL)
@@ -279,7 +329,7 @@ int ouroboros_init(const char * ap_name)
}
for (i = 0; i < AP_MAX_FLOWS; ++i)
- init_flow(i);
+ flow_clear(i);
ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
if (ai.ports == NULL) {
@@ -333,9 +383,6 @@ void ouroboros_fini()
shm_flow_set_destroy(ai.fqset);
- if (ai.daf_name != NULL)
- free(ai.daf_name);
-
if (ai.ap_name != NULL)
free(ai.ap_name);
@@ -346,7 +393,7 @@ void ouroboros_fini()
ssize_t idx;
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- reset_flow(i);
+ flow_fini(i);
}
}
@@ -368,13 +415,9 @@ void ouroboros_fini()
int flow_accept(qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int fd = -1;
- frct_enroll_msg_t * frct_enroll;
- qosspec_t spec;
- uint8_t data[BUF_SIZE];
- ssize_t n;
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ int fd = -1;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_api = true;
@@ -408,83 +451,15 @@ int flow_accept(qosspec_t * qs,
return -EIRMD;
}
- pthread_rwlock_wrlock(&ai.flows_lock);
-
- fd = bmp_allocate(ai.fds);
- if (!bmp_is_id_valid(ai.fds, fd)) {
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EBADF;
- }
-
- ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);
- if (ai.flows[fd].rx_rb == NULL) {
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -ENOMEM;
- }
-
- ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);
- if (ai.flows[fd].tx_rb == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -ENOMEM;
- }
-
- ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
- if (ai.flows[fd].set == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -ENOMEM;
- }
-
- ai.flows[fd].port_id = recv_msg->port_id;
- ai.flows[fd].oflags = FLOW_O_DEFAULT;
- ai.flows[fd].api = recv_msg->api;
- ai.flows[fd].cube = recv_msg->qoscube;
-
- assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT);
-
- spec = qos_cube_to_spec(recv_msg->qoscube);
-
- ai.ports[recv_msg->port_id].fd = fd;
- ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
-
- pthread_rwlock_unlock(&ai.flows_lock);
+ fd = flow_init(recv_msg->port_id, recv_msg->api, recv_msg->qoscube);
irm_msg__free_unpacked(recv_msg, NULL);
- n = flow_read(fd, data, BUF_SIZE);
- if (n < 0) {
- flow_dealloc(fd);
- return n;
- }
-
- frct_enroll = frct_enroll_msg__unpack(NULL, n, data);
- if (frct_enroll == NULL) {
- flow_dealloc(fd);
- return -1;
- }
-
- spec.resource_control = frct_enroll->resource_control;
- spec.reliable = frct_enroll->reliable;
- spec.error_check = frct_enroll->error_check;
- spec.ordered = frct_enroll->ordered;
- spec.partial = frct_enroll->partial;
-
- frct_enroll_msg__free_unpacked(frct_enroll, NULL);
-
- pthread_rwlock_wrlock(&ai.flows_lock);
- ai.flows[fd].spec = spec;
- pthread_rwlock_unlock(&ai.flows_lock);
+ if (fd < 0)
+ return fd;
if (qs != NULL)
- *qs = spec;
+ *qs = ai.flows[fd].spec;
return fd;
}
@@ -493,14 +468,10 @@ int flow_alloc(const char * dst_name,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- frct_enroll_msg_t frct_enroll = FRCT_ENROLL_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- qoscube_t qc = QOS_CUBE_BE;
- int fd;
- ssize_t len;
- uint8_t * data;
- int ret;
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ qoscube_t qc = QOS_CUBE_BE;
+ int fd;
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst_name = (char *) dst_name;
@@ -508,15 +479,8 @@ int flow_alloc(const char * dst_name,
msg.has_qoscube = true;
msg.api = ai.api;
- if (qs != NULL) {
- frct_enroll.resource_control = qs->resource_control;
- frct_enroll.reliable = qs->reliable;
- frct_enroll.error_check = qs->error_check;
- frct_enroll.ordered = qs->ordered;
- frct_enroll.partial = qs->partial;
-
+ if (qs != NULL)
qc = qos_spec_to_cube(*qs);
- }
msg.qoscube = qc;
@@ -547,78 +511,10 @@ int flow_alloc(const char * dst_name,
return -EIRMD;
}
- pthread_rwlock_wrlock(&ai.flows_lock);
-
- fd = bmp_allocate(ai.fds);
- if (!bmp_is_id_valid(ai.fds, fd)) {
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EBADF;
- }
-
- ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);
- if (ai.flows[fd].rx_rb == NULL) {
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -ENOMEM;
- }
-
- ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);
- if (ai.flows[fd].tx_rb == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -ENOMEM;
- }
-
- ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
- if (ai.flows[fd].set == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -ENOMEM;
- }
-
- ai.flows[fd].port_id = recv_msg->port_id;
- ai.flows[fd].oflags = FLOW_O_DEFAULT;
- ai.flows[fd].api = recv_msg->api;
- ai.flows[fd].cube = recv_msg->qoscube;
-
- assert(ai.ports[recv_msg->port_id].state == PORT_INIT);
-
- ai.ports[recv_msg->port_id].fd = fd;
- ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
+ fd = flow_init(recv_msg->port_id, recv_msg->api, qc);
irm_msg__free_unpacked(recv_msg, NULL);
- pthread_rwlock_unlock(&ai.flows_lock);
-
- len = frct_enroll_msg__get_packed_size(&frct_enroll);
- if (len < 0) {
- flow_dealloc(fd);
- return -1;
- }
-
- data = malloc(len);
- if (data == NULL) {
- flow_dealloc(fd);
- return -ENOMEM;
- }
-
- frct_enroll_msg__pack(&frct_enroll, data);
-
- ret = flow_write(fd, data, len);
- if (ret < 0) {
- flow_dealloc(fd);
- free(data);
- return ret;
- }
-
- free(data);
-
return fd;
}
@@ -657,7 +553,7 @@ int flow_dealloc(int fd)
pthread_rwlock_wrlock(&ai.flows_lock);
- reset_flow(fd);
+ flow_fini(fd);
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
@@ -1073,53 +969,11 @@ int flow_event_wait(struct flow_set * set,
/* ipcp-dev functions */
-int np1_flow_alloc(pid_t n_api,
- int port_id)
+int np1_flow_alloc(pid_t n_api,
+ int port_id,
+ qoscube_t qc)
{
- int fd;
-
- pthread_rwlock_wrlock(&ai.flows_lock);
-
- fd = bmp_allocate(ai.fds);
- if (!bmp_is_id_valid(ai.fds, fd)) {
- pthread_rwlock_unlock(&ai.flows_lock);
- return -1;
- }
-
- ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
- if (ai.flows[fd].rx_rb == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- return -1;
- }
-
- ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id);
- if (ai.flows[fd].tx_rb == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- return -1;
- }
-
- ai.flows[fd].set = shm_flow_set_open(n_api);
- if (ai.flows[fd].set == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- return -1;
- }
-
- ai.flows[fd].port_id = port_id;
- ai.flows[fd].oflags = FLOW_O_DEFAULT;
- ai.flows[fd].api = n_api;
-
- ai.ports[port_id].fd = fd;
- ai.ports[port_id].state = PORT_ID_ASSIGNED;
-
- pthread_rwlock_unlock(&ai.flows_lock);
-
- return fd;
+ return flow_init(port_id, n_api, qc);
}
int np1_flow_dealloc(int port_id)
@@ -1182,11 +1036,10 @@ int ipcp_create_r(pid_t api,
int ipcp_flow_req_arr(pid_t api,
const uint8_t * dst,
size_t len,
- qoscube_t cube)
+ qoscube_t qc)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int port_id = -1;
int fd = -1;
if (dst == NULL)
@@ -1199,88 +1052,24 @@ int ipcp_flow_req_arr(pid_t api,
msg.hash.len = len;
msg.hash.data = (uint8_t *) dst;
msg.has_qoscube = true;
- msg.qoscube = cube;
-
- pthread_rwlock_wrlock(&ai.flows_lock);
-
- fd = bmp_allocate(ai.fds);
- if (!bmp_is_id_valid(ai.fds, fd)) {
- pthread_rwlock_unlock(&ai.flows_lock);
- return -1; /* -ENOMOREFDS */
- }
-
- pthread_rwlock_unlock(&ai.flows_lock);
+ msg.qoscube = qc;
recv_msg = send_recv_irm_msg(&msg);
- pthread_rwlock_wrlock(&ai.flows_lock);
-
- if (recv_msg == NULL) {
- ai.ports[fd].state = PORT_INIT;
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
+ if (recv_msg == NULL)
return -EIRMD;
- }
if (!recv_msg->has_port_id || !recv_msg->has_api) {
- ai.ports[fd].state = PORT_INIT;
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
if (recv_msg->has_result && recv_msg->result) {
- ai.ports[fd].state = PORT_INIT;
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- port_id = recv_msg->port_id;
- if (port_id < 0) {
- ai.ports[fd].state = PORT_INIT;
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
- if (ai.flows[fd].rx_rb == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id);
- if (ai.flows[fd].tx_rb == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
- if (ai.flows[fd].set == NULL) {
- reset_flow(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ai.flows[fd].port_id = port_id;
- ai.flows[fd].oflags = FLOW_O_DEFAULT;
-
- ai.ports[port_id].fd = fd;
- port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
-
- pthread_rwlock_unlock(&ai.flows_lock);
+ fd = flow_init(recv_msg->port_id, recv_msg->api, qc);
irm_msg__free_unpacked(recv_msg, NULL);
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index 67abbb5b..7660b1dd 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -117,7 +117,7 @@ struct shm_flow_set * shm_flow_set_create()
(set->fqueues + AP_MAX_FQUEUES * (SHM_BUFFER_SIZE));
pthread_mutexattr_init(&mattr);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
#endif
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -336,7 +336,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
assert(idx < AP_MAX_FQUEUES);
assert(fqueue);
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(set->lock);
#else
if (pthread_mutex_lock(set->lock) == EOWNERDEAD)
@@ -358,7 +358,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
else
ret = -pthread_cond_wait(set->conds + idx,
set->lock);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
if (ret == -EOWNERDEAD)
pthread_mutex_consistent(set->lock);
#endif
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c
index 1e072b21..757f65c8 100644
--- a/src/lib/shm_rbuff_ll.c
+++ b/src/lib/shm_rbuff_ll.c
@@ -127,7 +127,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
rb->del = rb->add + 1;
pthread_mutexattr_init(&mattr);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
#endif
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -299,7 +299,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
ts_add(&abstime, timeout, &abstime);
}
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -315,7 +315,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
&abstime);
else
idx = -pthread_cond_wait(rb->add, rb->lock);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
if (idx == -EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
@@ -356,7 +356,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
if (shm_rbuff_empty(rb))
return;
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -367,7 +367,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
(void *) rb->lock);
while (!shm_rbuff_empty(rb))
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_cond_wait(rb->del, rb->lock);
#else
if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD)
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index d3c1e143..1b9f07d1 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -124,7 +124,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
rb->del = rb->add + 1;
pthread_mutexattr_init(&mattr);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
#endif
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -231,7 +231,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -264,7 +264,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
assert(rb);
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -297,7 +297,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
ts_add(&abstime, timeout, &abstime);
}
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -313,7 +313,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
&abstime);
else
idx = -pthread_cond_wait(rb->add, rb->lock);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
if (idx == -EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
@@ -334,7 +334,7 @@ void shm_rbuff_block(struct shm_rbuff * rb)
{
assert(rb);
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -349,7 +349,7 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
{
assert(rb);
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -364,7 +364,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
{
assert(rb);
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -376,7 +376,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
(void *) rb->lock);
while (!shm_rbuff_empty(rb))
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_cond_wait(rb->del, rb->lock);
#else
if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD)
@@ -391,7 +391,7 @@ size_t shm_rbuff_queued(struct shm_rbuff * rb)
assert(rb);
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rb->lock);
#else
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 1f999f93..d454fef8 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -192,7 +192,7 @@ struct shm_rdrbuff * shm_rdrbuff_create()
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
#endif
pthread_mutex_init(rdrb->lock, &mattr);
@@ -274,7 +274,7 @@ int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,
ts_add(&abstime, timeo, &abstime);
}
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
@@ -282,9 +282,9 @@ int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,
#endif
while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) {
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
if (pthread_cond_timedwait(rdrb->full,
- rdrb->lock
+ rdrb->lock,
&abstime) == ETIMEDOUT) {
pthread_mutex_unlock(rdrb->lock);
return -ETIMEDOUT;
@@ -358,7 +358,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
if (sz > SHM_RDRB_BLOCK_SIZE)
return -EMSGSIZE;
#endif
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
@@ -437,7 +437,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
if (sz > SHM_RDRB_BLOCK_SIZE)
return -EMSGSIZE;
#endif
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
@@ -535,7 +535,7 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
assert(rdrb);
assert(idx < (SHM_BUFFER_SIZE));
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
diff --git a/src/lib/tests/time_utils_test.c b/src/lib/tests/time_utils_test.c
index 6d1ae32f..86636c15 100644
--- a/src/lib/tests/time_utils_test.c
+++ b/src/lib/tests/time_utils_test.c
@@ -28,12 +28,12 @@
static void ts_print(struct timespec * s)
{
- printf("timespec is %zd:%zd.\n", s->tv_sec, s->tv_nsec);
+ printf("timespec is %zd:%ld.\n", (ssize_t) s->tv_sec, s->tv_nsec);
}
static void tv_print(struct timeval * v)
{
- printf("timeval is %zd:%zd.\n", v->tv_sec, v->tv_usec);
+ printf("timeval is %zd:%ld.\n", (ssize_t) v->tv_sec, v->tv_usec);
}
static void ts_init(struct timespec * s,
diff --git a/src/lib/tpm.c b/src/lib/tpm.c
new file mode 100644
index 00000000..8298eeb5
--- /dev/null
+++ b/src/lib/tpm.c
@@ -0,0 +1,266 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Threadpool management
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/list.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
+
+#include <pthread.h>
+#include <stdlib.h>
+
+#define TPM_TIMEOUT 1000
+
+struct pthr_el {
+ struct list_head next;
+
+ bool join;
+
+ pthread_t thr;
+};
+
+enum tpm_state {
+ TPM_NULL = 0,
+ TPM_INIT,
+ TPM_RUNNING
+};
+
+struct {
+ size_t min;
+ size_t inc;
+ size_t max;
+ size_t cur;
+
+ void * (* func)(void *);
+
+ struct list_head pool;
+
+ enum tpm_state state;
+
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+
+ pthread_t mgr;
+} tpm;
+
+static void tpm_join(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ list_for_each_safe(p, h, &tpm.pool) {
+ struct pthr_el * e = list_entry(p, struct pthr_el, next);
+ if (tpm.state != TPM_RUNNING)
+ while (!e->join)
+ pthread_cond_wait(&tpm.cond, &tpm.lock);
+
+ if (e->join) {
+ pthread_join(e->thr, NULL);
+ list_del(&e->next);
+ free(e);
+ }
+ }
+}
+
+static void * tpmgr(void * o)
+{
+ struct timespec dl;
+ struct timespec to = {(TPM_TIMEOUT / 1000),
+ (TPM_TIMEOUT % 1000) * MILLION};
+ (void) o;
+
+ while (true) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_mutex_lock(&tpm.lock);
+
+ tpm_join();
+
+ if (tpm.state != TPM_RUNNING) {
+ tpm.max = 0;
+ tpm_join();
+ pthread_mutex_unlock(&tpm.lock);
+ break;
+ }
+
+ if (tpm.cur < tpm.min) {
+ tpm.max = tpm.inc;
+
+ while (tpm.cur < tpm.max) {
+ struct pthr_el * e = malloc(sizeof(*e));
+ if (e == NULL)
+ break;
+
+ e->join = false;
+
+ if (pthread_create(&e->thr, NULL,
+ tpm.func, NULL)) {
+ free(e);
+ } else {
+ list_add(&e->next, &tpm.pool);
+ ++tpm.cur;
+ }
+ }
+ }
+
+ if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl)
+ == ETIMEDOUT)
+ if (tpm.cur > tpm.min )
+ --tpm.max;
+
+ pthread_mutex_unlock(&tpm.lock);
+ }
+
+ return (void *) 0;
+}
+
+int tpm_init(size_t min,
+ size_t inc,
+ void * (* func)(void *))
+{
+ pthread_condattr_t cattr;
+
+ if (pthread_mutex_init(&tpm.lock, NULL))
+ goto fail_lock;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&tpm.cond, &cattr))
+ goto fail_cond;
+
+ list_head_init(&tpm.pool);
+
+ pthread_condattr_destroy(&cattr);
+
+ tpm.state = TPM_INIT;
+ tpm.func = func;
+ tpm.min = min;
+ tpm.inc = inc;
+ tpm.max = 0;
+ tpm.cur = 0;
+
+ return 0;
+
+ fail_cond:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(&tpm.lock);
+ fail_lock:
+ return -1;
+}
+
+int tpm_start(void)
+{
+ pthread_mutex_lock(&tpm.lock);
+
+ if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) {
+ pthread_mutex_unlock(&tpm.lock);
+ return -1;
+ }
+
+ tpm.state = TPM_RUNNING;
+
+ pthread_mutex_unlock(&tpm.lock);
+
+ return 0;
+}
+
+void tpm_stop(void)
+{
+ pthread_mutex_lock(&tpm.lock);
+
+ tpm.state = TPM_NULL;
+
+ pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_fini(void)
+{
+ pthread_join(tpm.mgr, NULL);
+
+ pthread_mutex_destroy(&tpm.lock);
+ pthread_cond_destroy(&tpm.cond);
+}
+
+bool tpm_check(void)
+{
+ bool ret;
+
+ pthread_mutex_lock(&tpm.lock);
+
+ ret = tpm.cur > tpm.max;
+
+ pthread_mutex_unlock(&tpm.lock);
+
+ return ret;
+}
+
+void tpm_inc(void)
+{
+ pthread_mutex_lock(&tpm.lock);
+
+ ++tpm.cur;
+
+ pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_dec(void)
+{
+ pthread_mutex_lock(&tpm.lock);
+
+ --tpm.cur;
+
+ pthread_cond_signal(&tpm.cond);
+
+ pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_exit(void)
+{
+ struct list_head * p;
+ pthread_t id;
+
+ id = pthread_self();
+
+ pthread_mutex_lock(&tpm.lock);
+
+ --tpm.cur;
+
+ list_for_each(p, &tpm.pool) {
+ struct pthr_el * e = list_entry(p, struct pthr_el, next);
+ if (e->thr == id) {
+ e->join = true;
+ break;
+ }
+ }
+
+ pthread_cond_signal(&tpm.cond);
+
+ pthread_mutex_unlock(&tpm.lock);
+}