diff options
34 files changed, 3274 insertions, 890 deletions
| diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c7266eb..85040496 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,6 +81,12 @@ add_custom_target(check COMMAND ${CMAKE_CTEST_COMMAND})  find_package(ProtobufC REQUIRED)  include_directories(${PROTOBUF_C_INCLUDE_DIRS}) +include(CheckSymbolExists) +list(APPEND CMAKE_REQUIRED_DEFINITIONS -D_POSIX_C_SOURCE=200809L) +list(APPEND CMAKE_REQUIRED_DEFINITIONS -D__XSI_VISIBLE=500) +list(APPEND CMAKE_REQUIRED_LIBRARIES pthread) +check_symbol_exists(pthread_mutexattr_setrobust pthread.h HAVE_ROBUST_MUTEX) +  add_subdirectory(src)  add_subdirectory(include)  add_subdirectory(doc) diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index bae2d89e..4c255da5 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -3,7 +3,8 @@   *   * Configuration information   * - *    Sander Vrijders <sander.vrijders@intec.ugent.be> + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be>   *   * This library is free software; you can redistribute it and/or   * modify it under the terms of the GNU Lesser General Public License @@ -35,6 +36,7 @@  #define IPCP_SHIM_ETH_LLC_EXEC "@IPCP_SHIM_ETH_LLC_TARGET@"  #define IPCP_NORMAL_EXEC       "@IPCP_NORMAL_TARGET@"  #define IPCP_LOCAL_EXEC        "@IPCP_LOCAL_TARGET@" +#cmakedefine HAVE_ROBUST_MUTEX  #define AP_MAX_FLOWS           2048  #define AP_RES_FDS             64  #define AP_MAX_FQUEUES         64 @@ -50,25 +52,21 @@  #define SHM_FLOW_SET_PREFIX    "/ouroboros.sets."  #define IRMD_MAX_FLOWS         4096  /* IRMD dynamic threadpooling */ -#define IRMD_MIN_AV_THREADS    16 -#define IRMD_MAX_AV_THREADS    64 -#define IRMD_MAX_THREADS       256 +#define IRMD_MIN_THREADS       16 +#define IRMD_ADD_THREADS       32  /* IPCP dynamic threadpooling */ -#define IPCP_MIN_AV_THREADS    4 -#define IPCP_MAX_AV_THREADS    32 -#define IPCP_MAX_THREADS       64 - +#define IPCP_MIN_THREADS       4 +#define IPCP_ADD_THREADS       16 +#define IPCP_SCHED_THREADS     8  #define IPCPD_MAX_CONNS        IRMD_MAX_FLOWS  #define PTHREAD_COND_CLOCK     CLOCK_MONOTONIC  #define PFT_SIZE               1 << 12  /* Timeout values */ -#define IRMD_TPM_TIMEOUT       1000 -#define IPCP_TPM_TIMEOUT       1000  #define IRMD_ACCEPT_TIMEOUT    100  #define IRMD_REQ_ARR_TIMEOUT   500  #define IRMD_FLOW_TIMEOUT      5000  #define IPCP_ACCEPT_TIMEOUT    100 -#define SOCKET_TIMEOUT         4000 +#define SOCKET_TIMEOUT         10000  #define CDAP_REPLY_TIMEOUT     1000  #define ENROLL_TIMEOUT         2000 diff --git a/include/ouroboros/endian.h b/include/ouroboros/endian.h index 16200028..873aff73 100644 --- a/include/ouroboros/endian.h +++ b/include/ouroboros/endian.h @@ -24,7 +24,7 @@  #ifndef OUROBOROS_ENDIAN_H  #define OUROBOROS_ENDIAN_H -#if defined(__linux__) || defined(__CYGWIN__) +#if defined(__linux__) || defined(__CYGWIN__) || defined(__MACH__)  #ifndef _BSD_SOURCE  #define _BSD_SOURCE diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index a4e94b89..3db2a0dd 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -24,10 +24,13 @@  #ifndef OUROBOROS_NP1_FLOW_H  #define OUROBOROS_NP1_FLOW_H +#include <ouroboros/qoscube.h> +  #include <unistd.h> -int  np1_flow_alloc(pid_t n_api, -                    int   port_id); +int  np1_flow_alloc(pid_t     n_api, +                    int       port_id, +                    qoscube_t qc);  int  np1_flow_resp(int port_id); diff --git a/include/ouroboros/sockets.h b/include/ouroboros/sockets.h index 0d65c15d..660709bf 100644 --- a/include/ouroboros/sockets.h +++ b/include/ouroboros/sockets.h @@ -30,9 +30,6 @@  typedef IpcpConfigMsg ipcp_config_msg_t;  typedef DifInfoMsg dif_info_msg_t; -#include "frct_enroll.pb-c.h" -typedef FrctEnrollMsg frct_enroll_msg_t; -  #include "irmd_messages.pb-c.h"  typedef IrmMsg irm_msg_t; diff --git a/src/lib/frct_enroll.proto b/include/ouroboros/tpm.h index 497d6acc..d34f06f3 100644 --- a/src/lib/frct_enroll.proto +++ b/include/ouroboros/tpm.h @@ -1,10 +1,10 @@  /*   * Ouroboros - Copyright (C) 2016 - 2017   * - * QoS messages + * Threadpool management   * - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - *    Sander Vrijders   <sander.vrijders@intec.ugent.be> + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be>   *   * This library is free software; you can redistribute it and/or   * modify it under the terms of the GNU Lesser General Public License @@ -21,12 +21,27 @@   * 02110-1301 USA   */ -syntax = "proto2"; +#ifndef OUROBOROS_LIB_TPM_H +#define OUROBOROS_LIB_TPM_H -message frct_enroll_msg { -        required bool resource_control = 1; -        required bool reliable         = 2; -        required bool error_check      = 3; -        required bool ordered          = 4; -        required bool partial          = 5; -}; +#include <stdbool.h> + +int  tpm_init(size_t    min, +              size_t    inc, +              void * (* func)(void *)); + +int  tpm_start(void); + +void tpm_stop(void); + +void tpm_fini(void); + +bool tpm_check(void); + +void tpm_exit(void); + +void tpm_dec(void); + +void tpm_inc(void); + +#endif /* OUROBOROS_LIB_TPM_H */ diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt index 00baa762..e0b375b6 100644 --- a/src/ipcpd/CMakeLists.txt +++ b/src/ipcpd/CMakeLists.txt @@ -8,7 +8,7 @@ set(IPCP_SOURCES  add_subdirectory(local)  add_subdirectory(normal)  add_subdirectory(shim-udp) -if (NOT APPLE) +if (NOT APPLE AND NOT CMAKE_SYSTEM_NAME STREQUAL GNU)    add_subdirectory(shim-eth-llc)  endif ()  add_subdirectory(tests) diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index fdd1edc4..48ff046c 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -31,6 +31,7 @@  #include <ouroboros/dev.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/np1_flow.h> +#include <ouroboros/tpm.h>  #include "ipcp.h" @@ -56,6 +57,8 @@ void ipcp_sig_handler(int         sig,                          if (ipcp_get_state() == IPCP_OPERATIONAL)                                  ipcp_set_state(IPCP_SHUTDOWN);                  } + +                tpm_stop();          default:                  return;          } @@ -87,51 +90,7 @@ void ipcp_hash_str(char *          buf,          buf[2 * i] = '\0';  } -static void thread_inc(void) -{ -        pthread_mutex_lock(&ipcpi.threads_lock); - -        ++ipcpi.threads; -        pthread_cond_signal(&ipcpi.threads_cond); - -        pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static void thread_dec(void) -{ -        pthread_mutex_lock(&ipcpi.threads_lock); - -        --ipcpi.threads; -        pthread_cond_signal(&ipcpi.threads_cond); - -        pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static bool thread_check(void) -{ -        int ret; - -        pthread_mutex_lock(&ipcpi.threads_lock); - -        ret = ipcpi.threads > ipcpi.max_threads; - -        pthread_mutex_unlock(&ipcpi.threads_lock); - -        return ret; -} - -static void thread_exit(ssize_t id) -{ -        pthread_mutex_lock(&ipcpi.threads_lock); -        bmp_release(ipcpi.thread_ids, id); - -        --ipcpi.threads; -        pthread_cond_signal(&ipcpi.threads_cond); - -        pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static void * ipcp_main_loop(void * o) +static void * mainloop(void * o)  {          int                 lsockfd;          uint8_t             buf[IPCP_MSG_BUF_SIZE]; @@ -147,7 +106,7 @@ static void * ipcp_main_loop(void * o)          struct timeval      ltv      = {(SOCKET_TIMEOUT / 1000),                                          (SOCKET_TIMEOUT % 1000) * 1000}; -        ssize_t             id       = (ssize_t)  o; +        (void)  o;          while (true) {  #ifdef __FreeBSD__ @@ -159,8 +118,8 @@ static void * ipcp_main_loop(void * o)                  if (ipcp_get_state() == IPCP_SHUTDOWN ||                      ipcp_get_state() == IPCP_NULL || -                    thread_check()) { -                        thread_exit(id); +                    tpm_check()) { +                        tpm_exit();                          break;                  } @@ -192,7 +151,7 @@ static void * ipcp_main_loop(void * o)                          continue;                  } -                thread_dec(); +                tpm_dec();                  switch (msg->code) {                  case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -260,7 +219,6 @@ static void * ipcp_main_loop(void * o)                          ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dst_name,                                                                  &info); -                          if (ret_msg.result == 0) {                                  ret_msg.dif_info = &dif_info;                                  dif_info.dir_hash_algo = info.dir_hash_algo; @@ -332,7 +290,9 @@ static void * ipcp_main_loop(void * o)                                  break;                          } -                        fd = np1_flow_alloc(msg->api, msg->port_id); +                        fd = np1_flow_alloc(msg->api, +                                            msg->port_id, +                                            msg->qoscube);                          if (fd < 0) {                                  log_err("Failed allocating fd on port_id %d.",                                          msg->port_id); @@ -409,7 +369,7 @@ static void * ipcp_main_loop(void * o)                  if (buffer.len == 0) {                          log_err("Failed to pack reply message");                          close(lsockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  } @@ -417,7 +377,7 @@ static void * ipcp_main_loop(void * o)                  if (buffer.data == NULL) {                          log_err("Failed to create reply buffer.");                          close(lsockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  } @@ -427,14 +387,14 @@ static void * ipcp_main_loop(void * o)                          log_err("Failed to send reply message");                          free(buffer.data);                          close(lsockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  }                  free(buffer.data);                  close(lsockfd); -                thread_inc(); +                tpm_inc();          }          return (void *) 0; @@ -497,15 +457,6 @@ int ipcp_init(int               argc,          ipcpi.state     = IPCP_NULL;          ipcpi.shim_data = NULL; -        ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS); -        if (ipcpi.threadpool == NULL) { -                ret = -ENOMEM; -                goto fail_thr; -        } - -        ipcpi.threads = 0; -        ipcpi.max_threads = IPCP_MIN_AV_THREADS; -          ipcpi.sock_path = ipcp_sock_path(getpid());          if (ipcpi.sock_path == NULL)                  goto fail_sock_path; @@ -527,11 +478,6 @@ int ipcp_init(int               argc,                  goto fail_state_mtx;          } -        if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) { -                log_err("Could not create mutex."); -                goto fail_thread_lock; -        } -          if (pthread_condattr_init(&cattr)) {                  log_err("Could not create condattr.");                  goto fail_cond_attr; @@ -545,17 +491,6 @@ int ipcp_init(int               argc,                  goto fail_state_cond;          } -        if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) { -                log_err("Could not init condvar."); -                goto fail_thread_cond; -        } - -        ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0); -        if (ipcpi.thread_ids == NULL) { -                log_err("Could not init condvar."); -                goto fail_bmp; -        } -          if (pthread_mutex_init(&ipcpi.alloc_lock, NULL)) {                  log_err("Failed to init mutex.");                  goto fail_alloc_lock; @@ -588,94 +523,21 @@ int ipcp_init(int               argc,   fail_alloc_cond:          pthread_mutex_destroy(&ipcpi.alloc_lock);   fail_alloc_lock: -        bmp_destroy(ipcpi.thread_ids); - fail_bmp: -        pthread_cond_destroy(&ipcpi.threads_cond); - fail_thread_cond:          pthread_cond_destroy(&ipcpi.state_cond);   fail_state_cond:          pthread_condattr_destroy(&cattr);   fail_cond_attr: -        pthread_mutex_destroy(&ipcpi.threads_lock); - fail_thread_lock:          pthread_mutex_destroy(&ipcpi.state_mtx);   fail_state_mtx:          close(ipcpi.sockfd);   fail_serv_sock:          free(ipcpi.sock_path);   fail_sock_path: -        free(ipcpi.threadpool); - fail_thr:          ouroboros_fini();          return ret;  } -void * threadpoolmgr(void * o) -{ -        pthread_attr_t  pattr; -        struct timespec dl; -        struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), -                              (IRMD_TPM_TIMEOUT % 1000) * MILLION}; -        (void) o; - -        if (pthread_attr_init(&pattr)) -                return (void *) -1; - -        pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); - -        while (true) { -                clock_gettime(PTHREAD_COND_CLOCK, &dl); -                ts_add(&dl, &to, &dl); - -                if (ipcp_get_state() == IPCP_SHUTDOWN || -                    ipcp_get_state() == IPCP_NULL) { -                        pthread_attr_destroy(&pattr); -                        log_dbg("Waiting for threads to exit."); -                        pthread_mutex_lock(&ipcpi.threads_lock); -                        while (ipcpi.threads > 0) -                                pthread_cond_wait(&ipcpi.threads_cond, -                                                  &ipcpi.threads_lock); -                        pthread_mutex_unlock(&ipcpi.threads_lock); - -                        log_dbg("Threadpool manager done."); -                        break; -                } - -                pthread_mutex_lock(&ipcpi.threads_lock); - -                if (ipcpi.threads < IPCP_MIN_AV_THREADS) { -                        log_dbg("Increasing threadpool."); -                        ipcpi.max_threads = IPCP_MAX_AV_THREADS; - -                        while (ipcpi.threads < ipcpi.max_threads) { -                                ssize_t id = bmp_allocate(ipcpi.thread_ids); -                                if (!bmp_is_id_valid(ipcpi.thread_ids, id)) { -                                        log_warn("IPCP threadpool exhausted."); -                                        break; -                                } - -                                if (pthread_create(&ipcpi.threadpool[id], -                                                   &pattr, ipcp_main_loop, -                                                   (void *) id)) -                                        log_warn("Failed to start new thread."); -                                else -                                        ++ipcpi.threads; -                        } -                } - -                if (pthread_cond_timedwait(&ipcpi.threads_cond, -                                           &ipcpi.threads_lock, -                                           &dl) == ETIMEDOUT) -                        if (ipcpi.threads > IPCP_MIN_AV_THREADS) -                                --ipcpi.max_threads; - -                pthread_mutex_unlock(&ipcpi.threads_lock); -        } - -        return (void *) 0; -} -  int ipcp_boot()  {          struct sigaction sig_act; @@ -700,9 +562,15 @@ int ipcp_boot()          pthread_sigmask(SIG_BLOCK, &sigset, NULL); -        ipcp_set_state(IPCP_INIT); +        if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) +                return -1; + +        if (tpm_start()) { +                tpm_fini(); +                return -1; +        } -        pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL); +        ipcp_set_state(IPCP_INIT);          pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); @@ -711,8 +579,7 @@ int ipcp_boot()  void ipcp_shutdown()  { -        pthread_join(ipcpi.tpm, NULL); - +        tpm_fini();          log_info("IPCP %d shutting down.", getpid());  } @@ -722,16 +589,11 @@ void ipcp_fini()          if (unlink(ipcpi.sock_path))                  log_warn("Could not unlink %s.", ipcpi.sock_path); -        bmp_destroy(ipcpi.thread_ids); -          free(ipcpi.sock_path); -        free(ipcpi.threadpool);          shim_data_destroy(ipcpi.shim_data);          pthread_cond_destroy(&ipcpi.state_cond); -        pthread_cond_destroy(&ipcpi.threads_cond); -        pthread_mutex_destroy(&ipcpi.threads_lock);          pthread_mutex_destroy(&ipcpi.state_mtx);          pthread_cond_destroy(&ipcpi.alloc_cond);          pthread_mutex_destroy(&ipcpi.alloc_lock); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 3f5e1bd6..fb69df5c 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -93,15 +93,6 @@ struct ipcp {          pthread_cond_t     alloc_cond;          pthread_mutex_t    alloc_lock; -        pthread_t *        threadpool; - -        struct bmp *       thread_ids; -        size_t             max_threads; -        size_t             threads; -        pthread_cond_t     threads_cond; -        pthread_mutex_t    threads_lock; - -        pthread_t          tpm;  } ipcpi;  int             ipcp_init(int               argc, diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 336b0e8f..8c2d4efc 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -15,6 +15,8 @@ include_directories(${CMAKE_BINARY_DIR}/include)  set(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")  protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS flow_alloc.proto) +protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto) +  # Add GPB sources of policies last  protobuf_generate_c(FSO_SRCS FSO_HDRS pol/fso.proto) @@ -22,6 +24,7 @@ set(SOURCE_FILES    # Add source files here    addr_auth.c    connmgr.c +  dht.c    dir.c    dt.c    dt_pci.c @@ -42,7 +45,7 @@ set(SOURCE_FILES    )  add_executable(ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES} -  ${FLOW_ALLOC_SRCS} ${FSO_SRCS}) +  ${FLOW_ALLOC_SRCS} ${FSO_SRCS} ${KAD_PROTO_SRCS})  target_link_libraries(ipcpd-normal LINK_PUBLIC ouroboros)  include(AddCompileFlags) @@ -53,3 +56,4 @@ endif (CMAKE_BUILD_TYPE MATCHES Debug)  install(TARGETS ipcpd-normal RUNTIME DESTINATION sbin)  add_subdirectory(pol/tests) +add_subdirectory(tests) diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c new file mode 100644 index 00000000..74618658 --- /dev/null +++ b/src/ipcpd/normal/dht.c @@ -0,0 +1,2383 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Distributed Hash Table based on Kademlia + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#define OUROBOROS_PREFIX "dht" + +#include <ouroboros/config.h> +#include <ouroboros/hash.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/list.h> +#include <ouroboros/random.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/utils.h> + +#include "dht.h" +#include "dt.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> + +#include "kademlia.pb-c.h" +typedef KadMsg kad_msg_t; +typedef KadContactMsg kad_contact_msg_t; + +#define DHT_MAX_REQS  2048 /* KAD recommends rnd(), bmp can be changed.  */ +#define KAD_ALPHA     3    /* Parallel factor, proven optimal value.     */ +#define KAD_K         8    /* Replication factor, MDHT value.            */ +#define KAD_T_REPL    900  /* Replication time, tied to k. MDHT value.   */ +#define KAD_T_REFR    900  /* Refresh time stale bucket, MDHT value.     */ +#define KAD_T_JOIN    6    /* Response time to wait for a join.          */ +#define KAD_T_RESP    2    /* Response time to wait for a response.      */ +#define KAD_R_PING    2    /* Ping retries before declaring peer dead.   */ +#define KAD_QUEER     15   /* Time to declare peer questionable.         */ +#define KAD_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */ + +enum dht_state { +        DHT_INIT = 0, +        DHT_RUNNING, +        DHT_SHUTDOWN, +}; + +enum kad_code { +        KAD_JOIN = 0, +        KAD_FIND_NODE, +        KAD_FIND_VALUE, +        /* Messages without a response below. */ +        KAD_STORE, +        KAD_RESPONSE +}; + +enum kad_req_state { +        REQ_NULL = 0, +        REQ_INIT, +        REQ_PENDING, +        REQ_RESPONSE, +        REQ_DONE, +        REQ_DESTROY +}; + +enum lookup_state { +        LU_NULL = 0, +        LU_INIT, +        LU_PENDING, +        LU_UPDATE, +        LU_COMPLETE, +        LU_DONE, +        LU_DESTROY +}; + +struct kad_req { +        struct list_head   next; + +        uint32_t           cookie; +        enum kad_code      code; +        uint8_t *          key; +        uint64_t           addr; + +        enum kad_req_state state; +        pthread_cond_t     cond; +        pthread_mutex_t    lock; + +        time_t             t_exp; +}; + +struct lookup { +        struct list_head  next; + +        uint8_t *         key; + +        struct list_head  contacts; +        size_t            n_contacts; + +        uint64_t *        addrs; +        size_t            n_addrs; + +        enum lookup_state state; +        pthread_cond_t    cond; +        pthread_mutex_t   lock; +}; + +struct val { +        struct list_head next; + +        uint64_t         addr; + +        time_t           t_exp; +        time_t           t_rep; +}; + +struct ref_entry { +        struct list_head next; + +        uint8_t *        key; + +        time_t           t_rep; +}; + +struct dht_entry { +        struct list_head next; + +        uint8_t *        key; +        size_t           n_vals; +        struct list_head vals; +}; + +struct contact { +        struct list_head next; + +        uint8_t *        id; +        uint64_t         addr; + +        size_t           fails; +        time_t           t_seen; +}; + +struct bucket { +        struct list_head contacts; +        size_t           n_contacts; + +        struct list_head alts; +        size_t           n_alts; + +        time_t           t_refr; + +        size_t           depth; +        uint8_t          mask; + +        struct bucket *  parent; +        struct bucket *  children[1L << KAD_BETA]; +}; + +struct dht { +        size_t           alpha; +        size_t           b; +        size_t           k; + +        time_t           t_expire; +        time_t           t_refresh; +        time_t           t_replic; +        time_t           t_repub; + +        uint8_t *        id; +        uint64_t         addr; + +        struct bucket *  buckets; + +        struct list_head entries; + +        struct list_head refs; + +        struct list_head lookups; + +        struct list_head requests; +        struct bmp *     cookies; + +        enum dht_state   state; +        pthread_mutex_t  mtx; + +        pthread_rwlock_t lock; + +        int              fd; + +        pthread_t        worker; +}; + +static uint8_t * dht_dup_key(const uint8_t * key, +                             size_t          len) +{ +        uint8_t * dup; + +        dup = malloc(sizeof(*dup) * len); +        if (dup == NULL) +                return NULL; + +        memcpy(dup, key, len); + +        return dup; +} + +static enum dht_state dht_get_state(struct dht * dht) +{ +        enum dht_state state; + +        pthread_mutex_lock(&dht->mtx); + +        state = dht->state; + +        pthread_mutex_unlock(&dht->mtx); + +        return state; +} + +static void dht_set_state(struct dht *   dht, +                          enum dht_state state) +{ +        pthread_mutex_lock(&dht->mtx); + +        dht->state = state; + +        pthread_mutex_unlock(&dht->mtx); +} + +static uint8_t * create_id(size_t len) +{ +        uint8_t * id; + +        id = malloc(len); +        if (id == NULL) +                return NULL; + +        if (random_buffer(id, len) < 0) { +                free(id); +                return NULL; +        } + +        return id; +} + +static struct kad_req * kad_req_create(struct dht * dht, +                                       kad_msg_t *  msg, +                                       uint64_t     addr) +{ +        struct kad_req *   req; +        pthread_condattr_t cattr; +        struct timespec    t; + +        req = malloc(sizeof(*req)); +        if (req == NULL) +                return NULL; + +        list_head_init(&req->next); + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        req->t_exp  = t.tv_sec + KAD_T_RESP; +        req->addr   = addr; +        req->state  = REQ_INIT; +        req->cookie = msg->cookie; +        req->code   = msg->code; +        req->key    = NULL; + +        if (msg->has_key) { +                req->key = dht_dup_key(msg->key.data, dht->b); +                if (req->key == NULL) { +                        free(req); +                        return NULL; +                } +        } + +        if (pthread_mutex_init(&req->lock, NULL)) { +                free(req->key); +                free(req); +                return NULL; +        } + +        pthread_condattr_init(&cattr); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + +        if (pthread_cond_init(&req->cond, &cattr)) { +                pthread_condattr_destroy(&cattr); +                pthread_mutex_destroy(&req->lock); +                free(req->key); +                free(req); +                return NULL; +        } + +        pthread_condattr_destroy(&cattr); + +        return req; +} + +static void kad_req_destroy(struct kad_req * req) +{ +        assert(req); + +        if (req->key != NULL) +                free(req->key); + +        pthread_mutex_lock(&req->lock); + +        switch (req->state) { +        case REQ_DESTROY: +                pthread_mutex_unlock(&req->lock); +                return; +        case REQ_PENDING: +                req->state = REQ_DESTROY; +                pthread_cond_signal(&req->cond); +                break; +        case REQ_INIT: +        case REQ_DONE: +                req->state = REQ_NULL; +                break; +        case REQ_RESPONSE: +        case REQ_NULL: +        default: +                break; +        } + +        while (req->state != REQ_NULL) +                pthread_cond_wait(&req->cond, &req->lock); + +        pthread_mutex_unlock(&req->lock); + +        pthread_cond_destroy(&req->cond); +        pthread_mutex_destroy(&req->lock); + +        free(req); +} + +static int kad_req_wait(struct kad_req * req, +                        time_t           t) +{ +        struct timespec timeo = {t, 0}; +        struct timespec abs; +        int ret = 0; + +        assert(req); + +        clock_gettime(PTHREAD_COND_CLOCK, &abs); + +        ts_add(&abs, &timeo, &abs); + +        pthread_mutex_lock(&req->lock); + +        req->state = REQ_PENDING; + +        while (req->state == REQ_PENDING && ret != -ETIMEDOUT) +                ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); + +        switch(req->state) { +        case REQ_DESTROY: +                ret = -1; +                req->state = REQ_NULL; +                pthread_cond_signal(&req->cond); +                break; +        case REQ_PENDING: /* ETIMEDOUT */ +        case REQ_RESPONSE: +                req->state = REQ_DONE; +                pthread_cond_signal(&req->cond); +                break; +        default: +                break; +        } + +        pthread_mutex_unlock(&req->lock); + +        return ret; +} + +static void kad_req_respond(struct kad_req * req) +{ +        pthread_mutex_lock(&req->lock); + +        req->state = REQ_RESPONSE; +        pthread_cond_signal(&req->cond); + +        pthread_mutex_unlock(&req->lock); +} + +static struct contact * contact_create(const uint8_t * id, +                                       size_t          len, +                                       uint64_t        addr) +{ +        struct contact * c; +        struct timespec  t; + +        c = malloc(sizeof(*c)); +        if (c == NULL) +                return NULL; + +        list_head_init(&c->next); + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        c->addr   = addr; +        c->fails  = 0; +        c->t_seen = t.tv_sec; +        c->id     = dht_dup_key(id, len); +        if (c->id == NULL) { +                free(c); +                return NULL; +        } + +        return c; +} + +static void contact_destroy(struct contact * c) +{ +        if (c != NULL) +                free(c->id); + +        free(c); +} + +static struct bucket * iter_bucket(struct bucket * b, +                                   const uint8_t * id) +{ +        uint8_t byte; +        uint8_t mask; + +        assert(b); + +        if (b->children[0] == NULL) +                return b; + +        byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + +        mask = ((1L << KAD_BETA) - 1) & 0xFF; + +        byte >>= (CHAR_BIT - KAD_BETA) - +                (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + +        return iter_bucket(b->children[(byte & mask)], id); +} + +static struct bucket * dht_get_bucket(struct dht *    dht, +                                      const uint8_t * id) +{ +        assert(dht->buckets); + +        return iter_bucket(dht->buckets, id); +} + +/* + * If someone builds a network where the n (n > k) closest nodes all + * have IDs starting with the same 64 bits: by all means, change this. + */ +static uint64_t dist(const uint8_t * src, +                     const uint8_t * dst) +{ +        return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); +} + +static size_t list_add_sorted(struct list_head * l, +                              struct contact *   c, +                              const uint8_t *    key) +{ +        struct list_head * p; + +        assert(l); +        assert(c); +        assert(key); +        assert(c->id); + +        list_for_each(p, l) { +                struct contact * e = list_entry(p, struct contact, next); +                if (dist(c->id, key) > dist(e->id, key)) +                        break; +        } + +        list_add_tail(&c->next, p); + +        return 1; +} + +static size_t dht_contact_list(struct dht *       dht, +                               struct list_head * l, +                               const uint8_t *    key) +{ +        struct list_head * p; +        struct bucket *    b; +        size_t             len = 0; +        size_t             i; +        struct timespec    t; + +        assert(l); +        assert(dht); +        assert(key); +        assert(list_is_empty(l)); + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        b = dht_get_bucket(dht, key); +        if (b == NULL) +                return 0; + +        b->t_refr = t.tv_sec + KAD_T_REFR; + +        if (b->n_contacts == dht->k || b->parent == NULL) { +                list_for_each(p, &b->contacts) { +                        struct contact * c; +                        c = list_entry(p, struct contact, next); +                        c = contact_create(c->id, dht->b, c->addr); +                        if (list_add_sorted(l, c, key) == 1) +                                if (++len > dht->k) +                                        break; +                } +        } else { +                struct bucket * d = b->parent; +                for (i = 0; i < (1L << KAD_BETA); ++i) { +                        list_for_each(p, &d->children[i]->contacts) { +                                struct contact * c; +                                c = list_entry(p, struct contact, next); +                                c = contact_create(c->id, dht->b, c->addr); +                                if (c == NULL) +                                        continue; +                                if (list_add_sorted(l, c, key) == 1) +                                        if (++len > dht->k) +                                                break; +                        } +                } +        } + +        assert(len == dht->k || b->parent == NULL); + +        return len; +} + +static struct lookup * lookup_create(struct dht *    dht, +                                     const uint8_t * id) +{ +        struct lookup *    lu; +        pthread_condattr_t cattr; + +        assert(dht); +        assert(id); + +        lu = malloc(sizeof(*lu)); +        if (lu == NULL) +                goto fail_malloc; + +        list_head_init(&lu->contacts); + +        lu->state   = LU_INIT; +        lu->addrs   = NULL; +        lu->n_addrs = 0; +        lu->key     = dht_dup_key(id, dht->b); +        if (lu->key == NULL) +                goto fail_id; + +        if (pthread_mutex_init(&lu->lock, NULL)) +                goto fail_mutex; + +        pthread_condattr_init(&cattr); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + +        if (pthread_cond_init(&lu->cond, &cattr)) +                goto fail_cond; + +        pthread_condattr_destroy(&cattr); + +        pthread_rwlock_wrlock(&dht->lock); + +        list_add(&lu->next, &dht->lookups); + +        lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + +        pthread_rwlock_unlock(&dht->lock); + +        return lu; + + fail_cond: +        pthread_condattr_destroy(&cattr); +        pthread_mutex_destroy(&lu->lock); + fail_mutex: +        free(lu->key); + fail_id: +        free(lu); + fail_malloc: +        return NULL; +} + +static void lookup_destroy(struct lookup * lu) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(lu); + +        pthread_mutex_lock(&lu->lock); + +        switch (lu->state) { +        case LU_DESTROY: +                pthread_mutex_unlock(&lu->lock); +                return; +        case LU_PENDING: +                lu->state = LU_DESTROY; +                pthread_cond_signal(&lu->cond); +                break; +        case LU_INIT: +        case LU_DONE: +        case LU_UPDATE: +        case LU_COMPLETE: +                lu->state = REQ_NULL; +                break; +        case LU_NULL: +        default: +                break; +        } + +        while (lu->state != LU_NULL) +                pthread_cond_wait(&lu->cond, &lu->lock); + +        if (lu->key != NULL) +                free(lu->key); +        if (lu->addrs != NULL) +                free(lu->addrs); + +        list_for_each_safe(p, h, &lu->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                list_del(&c->next); +                contact_destroy(c); +        } + +        pthread_mutex_unlock(&lu->lock); + +        pthread_cond_destroy(&lu->cond); +        pthread_mutex_destroy(&lu->lock); + +        free(lu); +} + +static void lookup_update(struct dht *    dht, +                          struct lookup * lu, +                          kad_msg_t *     msg) +{ +        struct list_head * p = NULL; +        struct contact *   c = NULL; +        size_t             n; +        size_t             pos = 0; + +        assert(lu); +        assert(msg); + +        if (dht_get_state(dht) != DHT_RUNNING) +                return; + +        pthread_mutex_lock(&lu->lock); + +        if (msg->n_addrs > 0) { +                if (lu->addrs == NULL) { +                        lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); +                        for (n = 0; n < msg->n_addrs; ++n) +                                lu->addrs[n] = msg->addrs[n]; +                        lu->n_addrs = msg->n_addrs; +                } +                lu->state = LU_COMPLETE; +                pthread_cond_signal(&lu->cond); +                pthread_mutex_unlock(&lu->lock); +                return; +        } + +        while (lu->state == LU_INIT) +                pthread_cond_wait(&lu->cond, &lu->lock); + +        for (n = 0; n < msg->n_contacts; ++n) { +                c = contact_create(msg->contacts[n]->id.data, +                                   dht->b, msg->contacts[n]->addr); +                if (c == NULL) +                        continue; + +                list_for_each(p, &lu->contacts) { +                        struct contact * e; +                        e = list_entry(p, struct contact, next); +                        if (!memcmp(e->id, c->id, dht->b)) { +                                contact_destroy(c); +                                c = NULL; +                                break; +                        } + +                        if (dist(c->id, lu->key) > dist(e->id, lu->key)) +                                break; +                        pos++; +                } + +                if (c == NULL) +                        continue; + +                if (lu->n_contacts < dht->k) { +                        list_add_tail(&c->next, p); +                        ++lu->n_contacts; +                } else if (pos == dht->k) { +                        contact_destroy(c); +                        continue; +                } else { +                        struct contact * d; +                        list_add_tail(&c->next, p); +                        d = list_last_entry(&lu->contacts, struct contact, next); +                        list_del(&d->next); +                        contact_destroy(d); +                } +        } + +        lu->state = LU_UPDATE; +        pthread_cond_signal(&lu->cond); +        pthread_mutex_unlock(&lu->lock); +        return; +} + +static ssize_t lookup_get_addrs(struct lookup * lu, +                                uint64_t *      addrs) +{ +        ssize_t n; + +        assert(lu); + +        pthread_mutex_lock(&lu->lock); + +        for (n = 0; (size_t) n < lu->n_addrs; ++n) +                addrs[n] = lu->addrs[n]; + +        assert((size_t) n == lu->n_addrs); + +        pthread_mutex_unlock(&lu->lock); + +        return n; +} + +static ssize_t lookup_contact_addrs(struct lookup * lu, +                                    uint64_t *      addrs) +{ +        struct list_head * p; +        ssize_t            n = 0; + +        assert(lu); +        assert(addrs); + +        pthread_mutex_lock(&lu->lock); + +        list_for_each(p, &lu->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                addrs[n] = c->addr; +                n++; +        } + +        pthread_mutex_unlock(&lu->lock); + +        return n; +} + +static void lookup_new_addrs(struct lookup * lu, +                                uint64_t *      addrs) +{ +        struct list_head * p; +        size_t             n = 0; + +        assert(lu); +        assert(addrs); + +        pthread_mutex_lock(&lu->lock); + +        /* Uses fails to check if the contact has been contacted. */ +        list_for_each(p, &lu->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                if (c->fails == 0) { +                        c->fails = 1; +                        addrs[n] = c->addr; +                        n++; +                } + +                if (n == KAD_ALPHA) +                        break; +        } + +        assert(n <= KAD_ALPHA); + +        addrs[n] = 0; + +        if (n == 0) +                lu->state = LU_DONE; + +        pthread_mutex_unlock(&lu->lock); +} + +static enum lookup_state lookup_wait(struct lookup * lu) +{ +        enum lookup_state state; + +        pthread_mutex_lock(&lu->lock); + +        lu->state = LU_PENDING; +        pthread_cond_signal(&lu->cond); + +        pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu); + +        while (lu->state == LU_PENDING) +                pthread_cond_wait(&lu->cond, &lu->lock); + +        pthread_cleanup_pop(false); + +        state = lu->state; + +        pthread_mutex_unlock(&lu->lock); + +        return state; +} + +static struct kad_req * dht_find_request(struct dht * dht, +                                         kad_msg_t *  msg) +{ +        struct list_head * p; + +        assert(dht); +        assert(msg); + +        list_for_each(p, &dht->requests) { +                struct kad_req * r = list_entry(p, struct kad_req, next); +                if (r->cookie == msg->cookie) +                        return r; +        } + +        return NULL; +} + +static struct lookup * dht_find_lookup(struct dht *    dht, +                                       const uint8_t * key) +{ +        struct list_head * p; + +        assert(dht); +        assert(key); + +        list_for_each(p, &dht->lookups) { +                struct lookup * l = list_entry(p, struct lookup, next); +                if (!memcmp(l->key, key, dht->b)) +                        return l; +        } + +        return NULL; +} + +static struct val * val_create(uint64_t addr, +                               time_t   exp) +{ +        struct val *    v; +        struct timespec t; + +        v = malloc(sizeof(*v)); +        if (v == NULL) +                return NULL; + +        list_head_init(&v->next); +        v->addr = addr; + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        v->t_exp = t.tv_sec + exp; +        v->t_rep = t.tv_sec + KAD_T_REPL; + +        return v; +} + +static void val_destroy(struct val * v) +{ +        assert(v); + +        free(v); +} + +static struct ref_entry * ref_entry_create(struct dht *    dht, +                                           const uint8_t * key) +{ +        struct ref_entry * e; +        struct timespec    t; + +        assert(dht); +        assert(key); + +        e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; + +        e->key = dht_dup_key(key, dht->b); +        if (e->key == NULL) { +                free(e); +                return NULL; +        } + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        e->t_rep = t.tv_sec + dht->t_repub; + +        return e; +} + +static void ref_entry_destroy(struct ref_entry * e) +{ +        free(e->key); +        free(e); +} + +static struct dht_entry * dht_entry_create(struct dht *    dht, +                                           const uint8_t * key) +{ +        struct dht_entry * e; + +        assert(dht); +        assert(key); + +        e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; + +        list_head_init(&e->next); +        list_head_init(&e->vals); + +        e->n_vals = 0; + +        e->key = dht_dup_key(key, dht->b); +        if (e->key == NULL) { +                free(e); +                return NULL; +        } + +        return e; +} + +static void dht_entry_destroy(struct dht_entry * e) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(e); + +        list_for_each_safe(p, h, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                list_del(&v->next); +                val_destroy(v); +        } + +        free(e->key); + +        free(e); +} + +static int dht_entry_add_addr(struct dht_entry * e, +                              uint64_t           addr, +                              time_t             exp) +{ +        struct list_head * p; +        struct val * val; +        struct timespec t; + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        list_for_each(p, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                if (v->addr == addr) { +                        if (v->t_exp < t.tv_sec + exp) { +                                v->t_exp = t.tv_sec + exp; +                                v->t_rep = t.tv_sec + KAD_T_REPL; +                        } + +                        return 0; +                } +        } + +        val = val_create(addr, exp); +        if (val == NULL) +                return -ENOMEM; + +        list_add(&val->next, &e->vals); +        ++e->n_vals; + +        return 0; +} + + +static void dht_entry_del_addr(struct dht_entry * e, +                               uint64_t           addr) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(e); + +        list_for_each_safe(p, h, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                if (v->addr == addr) { +                        list_del(&v->next); +                        val_destroy(v); +                        --e->n_vals; +                } +        } + +        if (e->n_vals == 0) { +                list_del(&e->next); +                dht_entry_destroy(e); +        } +} + +static uint64_t dht_entry_get_addr(struct dht *       dht, +                                   struct dht_entry * e) +{ +        struct list_head * p; + +        assert(e); +        assert(!list_is_empty(&e->vals)); + +        list_for_each(p, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                if (v->addr != dht->addr) +                        return v->addr; +        } + +        return 0; +} + +/* Forward declaration. */ +static struct lookup * kad_lookup(struct dht *    dht, +                                  const uint8_t * key, +                                  enum kad_code   code); + + +/* Build a refresh list. */ +static void bucket_refresh(struct dht *       dht, +                           struct bucket *    b, +                           time_t             t, +                           struct list_head * r) +{ +        size_t i; + +        if (*b->children != NULL) +                for (i = 0; i < (1L << KAD_BETA); ++i) +                        bucket_refresh(dht, b->children[i], t, r); + +        if (b->n_contacts == 0) +                return; + +        if (t > b->t_refr) { +                struct contact * c; +                struct contact * d; +                c = list_first_entry(&b->contacts, struct contact, next); +                d = contact_create(c->id, dht->b, c->addr); +                if (c != NULL) +                        list_add(&d->next, r); +                return; +        } +} + + +static struct bucket * bucket_create(void) +{ +        struct bucket * b; +        struct timespec t; +        size_t          i; + +        b = malloc(sizeof(*b)); +        if (b == NULL) +                return NULL; + +        list_head_init(&b->contacts); +        b->n_contacts = 0; + +        list_head_init(&b->alts); +        b->n_alts = 0; + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); +        b->t_refr = t.tv_sec + KAD_T_REFR; + +        for (i = 0; i < (1L << KAD_BETA); ++i) +                b->children[i]  = NULL; + +        b->parent = NULL; +        b->depth = 0; + +        return b; +} + +static void bucket_destroy(struct bucket * b) +{ +        struct list_head * p; +        struct list_head * h; +        size_t             i; + +        assert(b); + +        for (i = 0; i < (1L << KAD_BETA); ++i) +                if (b->children[i] != NULL) +                        bucket_destroy(b->children[i]); + +        list_for_each_safe(p, h, &b->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                list_del(&c->next); +                contact_destroy(c); +                --b->n_contacts; +        } + +        list_for_each_safe(p, h, &b->alts) { +                struct contact * c = list_entry(p, struct contact, next); +                list_del(&c->next); +                contact_destroy(c); +                --b->n_contacts; +        } + +        free(b); +} + +static bool bucket_has_id(struct bucket * b, +                          const uint8_t * id) +{ +        uint8_t mask; +        uint8_t byte; + +        if (b->depth == 0) +                return true; + +        byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + +        mask = ((1L << KAD_BETA) - 1) & 0xFF; + +        byte >>= (CHAR_BIT - KAD_BETA) - +                (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); + +        return ((byte & mask) == b->mask); +} + +static int split_bucket(struct bucket * b) +{ +        struct list_head * p; +        struct list_head * h; +        uint8_t mask = 0; +        size_t i; +        size_t c; + +        assert(b); +        assert(b->n_alts == 0); +        assert(b->n_contacts); +        assert(b->children[0] == NULL); + +        c = b->n_contacts; + +        for (i = 0; i < (1L << KAD_BETA); ++i) { +                b->children[i] = bucket_create(); +                if (b->children[i] == NULL) { +                        size_t j; +                        for (j = 0; j < i; ++j) +                                bucket_destroy(b->children[j]); +                        return -1; +                } + +                b->children[i]->depth  = b->depth + 1; +                b->children[i]->mask   = mask; +                b->children[i]->parent = b; + +                list_for_each_safe(p, h, &b->contacts) { +                        struct contact * c; +                        c = list_entry(p, struct contact, next); +                        if (bucket_has_id(b->children[i], c->id)) { +                                list_del(&c->next); +                                --b->n_contacts; +                                list_add(&c->next, &b->children[i]->contacts); +                                ++b->children[i]->n_contacts; +                        } +                } + +                mask++; +        } + +        for (i = 0; i < (1L << KAD_BETA); ++i) +                if (b->children[i]->n_contacts == c) +                        split_bucket(b->children[i]); + +        return 0; +} + +/* Locked externally to mandate update as (final) part of join transaction. */ +static int dht_update_bucket(struct dht *    dht, +                             const uint8_t * id, +                             uint64_t        addr) +{ +        struct list_head * p; +        struct list_head * h; +        struct bucket *    b; +        struct contact *   c; + +        assert(dht); + +        b = dht_get_bucket(dht, id); +        if (b == NULL) +                return -1; + +        c = contact_create(id, dht->b, addr); +        if (c == NULL) +                return -1; + +        list_for_each_safe(p, h, &b->contacts) { +                struct contact * d = list_entry(p, struct contact, next); +                if (d->addr == addr) { +                        list_del(&d->next); +                        contact_destroy(d); +                        --b->n_contacts; +                } +        } + +        if (b->n_contacts == dht->k) { +                if (bucket_has_id(b, dht->id)) { +                        list_add_tail(&c->next, &b->contacts); +                        ++b->n_contacts; +                        if (split_bucket(b)) { +                                list_del(&c->next); +                                contact_destroy(c); +                                --b->n_contacts; +                        } +                } else if (b->n_alts == dht->k) { +                        struct contact * d; +                        d = list_first_entry(&b->alts, struct contact, next); +                        list_del(&d->next); +                        contact_destroy(d); +                        list_add_tail(&c->next, &b->alts); +                } else { +                        list_add_tail(&c->next, &b->alts); +                        ++b->n_alts; +                } +        } else { +                list_add_tail(&c->next, &b->contacts); +                ++b->n_contacts; +        } + +        return 0; +} + +static int send_msg(struct dht * dht, +                    kad_msg_t *  msg, +                    uint64_t     addr) +{ +        struct shm_du_buff * sdb; +        struct kad_req *     req; +        size_t               len; +        int                  retr = 0; + +        if (msg->code == KAD_RESPONSE) +                retr = KAD_RESP_RETR; + +        pthread_rwlock_wrlock(&dht->lock); + +        if (dht->id != NULL) { +                msg->has_s_id = true; +                msg->s_id.data = dht->id; +                msg->s_id.len  = dht->b; +        } + +        msg->s_addr = dht->addr; + +        if (msg->code < KAD_STORE) { +                msg->cookie = bmp_allocate(dht->cookies); +                if (!bmp_is_id_valid(dht->cookies, msg->cookie)) +                        goto fail_bmp_alloc; +        } + +        len = kad_msg__get_packed_size(msg); +        if (len == 0) +                goto fail_msg; + +        if (ipcp_sdb_reserve(&sdb, len)) +                goto fail_msg; + +        kad_msg__pack(msg, shm_du_buff_head(sdb)); + +#ifndef __DHT_TEST__ +        while (retr >= 0) { +                if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb)) +                        retr--; +                else +                        break; +                sleep(1); +        } + +        if (retr < 0) +                goto fail_write; +#else +        (void) addr; +        (void) retr; +        ipcp_sdb_release(sdb); +#endif /* __DHT_TEST__ */ + +        if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) { +                req = kad_req_create(dht, msg, addr); +                if (req != NULL) +                        list_add(&req->next, &dht->requests); +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; + +#ifndef __DHT_TEST__ + fail_write: +        ipcp_sdb_release(sdb); +#endif + fail_msg: +        bmp_release(dht->cookies, msg->cookie); + fail_bmp_alloc: +        pthread_rwlock_unlock(&dht->lock); +        return -1; +} + +static struct dht_entry * dht_find_entry(struct dht *    dht, +                                         const uint8_t * key) +{ +        struct list_head * p; + +        list_for_each(p, &dht->entries) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                if (!memcmp(key, e->key, dht->b)) +                        return e; +        } + +        return NULL; +} + +static int kad_add(struct dht *              dht, +                   const kad_contact_msg_t * contacts, +                   ssize_t                   n, +                   time_t                    exp) +{ +        struct dht_entry * e; + +        pthread_rwlock_wrlock(&dht->lock); + +        while (n-- > 0) { +                if (contacts[n].id.len != dht->b) +                        log_warn("Bad key length in contact data."); + +                e = dht_find_entry(dht, contacts[n].id.data); +                if (e != NULL) { +                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) +                                goto fail; +                } else { +                        e = dht_entry_create(dht, contacts[n].id.data); +                        if (e == NULL) +                                goto fail; + +                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) { +                                dht_entry_destroy(e); +                                goto fail; +                        } + +                        list_add(&e->next, &dht->entries); +                } +        } + +        pthread_rwlock_unlock(&dht->lock); +        return 0; + + fail: +        pthread_rwlock_unlock(&dht->lock); +        return -ENOMEM; +} + +static int wait_resp(struct dht * dht, +                     kad_msg_t *  msg, +                     time_t       timeo) +{ +        struct kad_req * req; + +        assert(dht); +        assert(msg); + +        pthread_rwlock_rdlock(&dht->lock); + +        req = dht_find_request(dht, msg); +        if (req == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -EPERM; +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return kad_req_wait(req, timeo); +} + +static int kad_store(struct dht *    dht, +                     const uint8_t * key, +                     uint64_t        addr, +                     uint64_t        r_addr, +                     time_t          ttl) +{ +        kad_msg_t msg = KAD_MSG__INIT; +        kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; +        kad_contact_msg_t * cmsgp[1]; + +        cmsg.id.data = (uint8_t *) key; +        cmsg.id.len  = dht->b; +        cmsg.addr    = addr; + +        cmsgp[0] = &cmsg; + +        msg.code         = KAD_STORE; +        msg.has_t_expire = true; +        msg.t_expire     = ttl; +        msg.n_contacts   = 1; +        msg.contacts     = cmsgp; + +        if (send_msg(dht, &msg, r_addr)) +                return -1; + +        return 0; +} + +static ssize_t kad_find(struct dht *     dht, +                        const uint8_t *  key, +                        const uint64_t * addrs, +                        enum kad_code    code) +{ +        kad_msg_t msg  = KAD_MSG__INIT; +        ssize_t   sent = 0; + +        assert(dht); +        assert(key); + +        msg.code = code; + +        msg.has_key       = true; +        msg.key.data      = (uint8_t *) key; +        msg.key.len       = dht->b; + +        while (*addrs != 0) { +                if (*addrs != dht->addr) { +                        send_msg(dht, &msg, *addrs); +                        sent++; +                } +                ++addrs; +        } + +        return sent; +} + +static void lookup_set_state(struct lookup *   lu, +                             enum lookup_state state) +{ +        pthread_mutex_lock(&lu->lock); + +        lu->state = state; +        pthread_cond_signal(&lu->cond); + +        pthread_mutex_unlock(&lu->lock); +} + +static struct lookup * kad_lookup(struct dht *    dht, +                                  const uint8_t * id, +                                  enum kad_code   code) +{ +        uint64_t          addrs[KAD_ALPHA + 1]; +        enum lookup_state state; +        struct lookup *   lu; + +        lu = lookup_create(dht, id); +        if (lu == NULL) +                return NULL; + +        lookup_new_addrs(lu, addrs); + +        if (addrs[0] == 0) { +                pthread_rwlock_wrlock(&dht->lock); +                list_del(&lu->next); +                pthread_rwlock_unlock(&dht->lock); +                lookup_destroy(lu); +                return NULL; +        } + +        if (kad_find(dht, id, addrs, code) == 0) { +                pthread_rwlock_wrlock(&dht->lock); +                list_del(&lu->next); +                pthread_rwlock_unlock(&dht->lock); +                lu->state = LU_COMPLETE; +                return lu; +        } + +        while ((state = lookup_wait(lu)) != LU_COMPLETE) { +                switch (state) { +                case LU_UPDATE: +                        lookup_new_addrs(lu, addrs); +                        if (addrs[0] == 0) { +                                pthread_rwlock_wrlock(&dht->lock); +                                list_del(&lu->next); +                                pthread_rwlock_unlock(&dht->lock); +                                return lu; +                        } + +                        kad_find(dht, id, addrs, code); +                        break; +                case LU_DESTROY: +                        pthread_rwlock_wrlock(&dht->lock); +                        list_del(&lu->next); +                        pthread_rwlock_unlock(&dht->lock); +                        lookup_set_state(lu, LU_NULL); +                        return NULL; +                default: +                        break; +                }; +        } + +        assert(state = LU_COMPLETE); + +        pthread_rwlock_wrlock(&dht->lock); +        list_del(&lu->next); +        pthread_rwlock_unlock(&dht->lock); + +        return lu; +} + +static void kad_publish(struct dht *    dht, +                        const uint8_t * key, +                        uint64_t        addr, +                        time_t          exp) +{ +        struct lookup * lu; +        uint64_t        addrs[KAD_K]; +        ssize_t         n; + +        assert(dht); +        assert(key); + +        lu = kad_lookup(dht, key, KAD_FIND_NODE); +        if (lu == NULL) +                return; + +        n = lookup_contact_addrs(lu, addrs); + +        while (n-- > 0) { +                if (addrs[n] == dht->addr) { +                        kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; +                        msg.id.data = (uint8_t *) key; +                        msg.id.len  = dht->b; +                        msg.addr    = addr; +                        kad_add(dht, &msg, 1, exp); +                } else { +                        if (kad_store(dht, key, addr, addrs[n], dht->t_expire)) +                                log_warn("Failed to send store message."); +                } +        } + +        lookup_destroy(lu); +} + +static int kad_join(struct dht * dht, +                    uint64_t     addr) +{ +        kad_msg_t       msg = KAD_MSG__INIT; +        struct lookup * lu; + +        msg.code = KAD_JOIN; + +        msg.has_alpha       = true; +        msg.has_b           = true; +        msg.has_k           = true; +        msg.has_t_refresh   = true; +        msg.has_t_replicate = true; +        msg.alpha           = KAD_ALPHA; +        msg.b               = dht->b; +        msg.k               = KAD_K; +        msg.t_refresh       = KAD_T_REFR; +        msg.t_replicate     = KAD_T_REPL; + +        if (send_msg(dht, &msg, addr)) +                return -1; + +        if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) +                return -1; + +        dht->id = create_id(dht->b); +        if (dht->id == NULL) +                return -1; + +        pthread_rwlock_wrlock(&dht->lock); + +        dht_update_bucket(dht, dht->id, dht->addr); + +        pthread_rwlock_unlock(&dht->lock); + +        lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); +        if (lu != NULL) +                lookup_destroy(lu); + +        return 0; +} + +static void dht_dead_peer(struct dht * dht, +                          uint8_t *    key, +                          uint64_t     addr) +{ +        struct list_head * p; +        struct list_head * h; +        struct bucket *    b; + +        b = dht_get_bucket(dht, key); + +        list_for_each_safe(p, h, &b->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                if (b->n_contacts + b->n_alts <= dht->k) { +                        ++c->fails; +                        return; +                } + +                if (c->addr == addr) { +                        list_del(&c->next); +                        contact_destroy(c); +                        --b->n_contacts; +                        break; +                } +        } + +        while (b->n_contacts < dht->k && b->n_alts > 0) { +                struct contact * c; +                c = list_first_entry(&b->alts, struct contact, next); +                list_del(&c->next); +                --b->n_alts; +                list_add(&c->next, &b->contacts); +                ++b->n_contacts; +        } +} + +static int dht_del(struct dht *    dht, +                   const uint8_t * key, +                   uint64_t        addr) +{ +        struct dht_entry * e; + +        pthread_rwlock_wrlock(&dht->lock); + +        e = dht_find_entry(dht, key); +        if (e == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -EPERM; +        } + +        dht_entry_del_addr(e, addr); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; +} + +static buffer_t dht_retrieve(struct dht *    dht, +                             const uint8_t * key) +{ +        struct dht_entry * e; +        struct list_head * p; +        buffer_t           buf; +        uint64_t *         pos; + +        buf.len = 0; + +        pthread_rwlock_rdlock(&dht->lock); + +        e = dht_find_entry(dht, key); +        if (e == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return buf; +        } + +        buf.data = malloc(sizeof(dht->addr) * e->n_vals); +        if (buf.data == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return buf; +        } + +        buf.len = e->n_vals; + +        pos = (uint64_t *) buf.data;; + +        list_for_each(p, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                *pos++ = v->addr; +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return buf; +} + +static ssize_t dht_get_contacts(struct dht *          dht, +                                const uint8_t *       key, +                                kad_contact_msg_t *** msgs) +{ +        struct list_head   l; +        struct list_head * p; +        struct list_head * h; +        size_t             len; +        size_t             i = 0; + +        list_head_init(&l); + +        pthread_rwlock_rdlock(&dht->lock); + +        len = dht_contact_list(dht, &l, key); +        if (len == 0) +                return 0; + +        *msgs = malloc(len * sizeof(**msgs)); +        if (*msgs == NULL) +                return 0; + +        list_for_each_safe(p, h, &l) { +                struct contact * c = list_entry(p, struct contact, next); +                (*msgs)[i] = malloc(sizeof(***msgs)); +                if ((*msgs)[i] == NULL) { +                        pthread_rwlock_unlock(&dht->lock); +                        while (i > 0) +                                free(*msgs[--i]); +                        free(*msgs); +                        return 0; +                } + +                kad_contact_msg__init((*msgs)[i]); + +                (*msgs)[i]->id.data = c->id; +                (*msgs)[i]->id.len  = dht->b; +                (*msgs)[i++]->addr  = c->addr; +                list_del(&c->next); +                free(c); +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return i; +} + +static time_t gcd(time_t a, +                  time_t b) +{ +        if (a == 0) +                return b; + +        return gcd(b % a, a); +} + +static void * work(void * o) +{ +        struct dht *       dht; +        struct timespec    now; +        struct list_head * p; +        struct list_head * h; +        struct list_head   reflist; +        time_t             intv; +        struct lookup *    lu; + +        dht = (struct dht *) o; + +        intv = gcd(dht->t_expire, dht->t_repub); +        intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + +        list_head_init(&reflist); + +        while (true) { +                clock_gettime(CLOCK_REALTIME_COARSE, &now); + +                pthread_rwlock_wrlock(&dht->lock); + +                /* Republish registered hashes. */ +                list_for_each_safe(p, h, &dht->refs) { +                        struct ref_entry * e; +                        e = list_entry(p, struct ref_entry, next); +                        if (now.tv_sec > e->t_rep) { +                                kad_publish(dht, e->key, dht->addr, +                                            dht->t_expire); +                                e->t_rep = now.tv_sec + dht->t_repub; +                        } +                } + +                /* Remove stale entries and republish if necessary. */ +                list_for_each_safe(p, h, &dht->entries) { +                        struct list_head * p1; +                        struct list_head * h1; +                        struct dht_entry * e; +                        e = list_entry (p, struct dht_entry, next); +                        list_for_each_safe(p1, h1, &e->vals) { +                                struct val * v; +                                v = list_entry(p1, struct val, next); +                                if (now.tv_sec > v->t_exp) { +                                        list_del(&v->next); +                                        val_destroy(v); +                                 } + +                                if (now.tv_sec > v->t_rep) { +                                        kad_publish(dht, e->key, v->addr, +                                                    dht->t_expire - now.tv_sec); +                                        v->t_rep = now.tv_sec + dht->t_replic; +                                } +                        } +                } + +                /* Check the requests list for unresponsive nodes. */ +                list_for_each_safe(p, h, &dht->requests) { +                        struct kad_req * r; +                        r = list_entry(p, struct kad_req, next); +                        if (now.tv_sec > r->t_exp) { +                                list_del(&r->next); +                                bmp_release(dht->cookies, r->cookie); +                                dht_dead_peer(dht, r->key, r->addr); +                                kad_req_destroy(r); +                        } +                } + +                /* Refresh unaccessed buckets. */ +                bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + +                pthread_rwlock_unlock(&dht->lock); + +                list_for_each_safe(p, h, &reflist) { +                        struct contact * c; +                        c = list_entry(p, struct contact, next); +                        lu = kad_lookup(dht, c->id, KAD_FIND_NODE); +                        if (lu != NULL) +                                lookup_destroy(lu); +                        list_del(&c->next); +                        contact_destroy(c); +                } + +                sleep(intv); +        } + +        return (void *) 0; +} + +static int kad_handle_join_resp(struct dht *     dht, +                                struct kad_req * req, +                                kad_msg_t *      msg) +{ +        assert(dht); +        assert(req); +        assert(msg); + +        /* We might send version numbers later to warn of updates if needed. */ +        if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && +              msg->has_t_refresh && msg->has_t_replicate)) { +                log_warn("Join refused by remote."); +                return -1; +        } + +        if (msg->b < sizeof(uint64_t)) { +                log_err("Hash sizes less than 8 bytes unsupported."); +                return -1; +        } + +        pthread_rwlock_wrlock(&dht->lock); + +        dht->buckets = bucket_create(); +        if (dht->buckets == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -1; +        } + +        /* Likely corrupt packet. The member will refuse, we might here too. */ +        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) +                log_warn("Different kademlia parameters detected."); + +        if (msg->t_replicate != KAD_T_REPL) +                log_warn("Different kademlia replication time detected."); + +        if (msg->t_refresh != KAD_T_REFR) +                log_warn("Different kademlia refresh time detected."); + +        dht->k        = msg->k; +        dht->b        = msg->b; +        dht->t_expire = msg->t_expire; +        dht->t_repub  = MAX(1, dht->t_expire - 10); + +        if (pthread_create(&dht->worker, NULL, work, dht)) { +                bucket_destroy(dht->buckets); +                pthread_rwlock_unlock(&dht->lock); +                return -1; +        } + +        dht->state = DHT_RUNNING; + +        kad_req_respond(req); + +        dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + +        pthread_rwlock_unlock(&dht->lock); + +        log_dbg("Enrollment of DHT completed."); + +        return 0; +} + +static int kad_handle_find_resp(struct dht *     dht, +                                struct kad_req * req, +                                kad_msg_t *      msg) +{ +        struct lookup * lu; + +        assert(dht); +        assert(req); +        assert(msg); + +        pthread_rwlock_rdlock(&dht->lock); + +        lu = dht_find_lookup(dht, req->key); +        if (lu == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -1; +        } + +        lookup_update(dht, lu, msg); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; +} + +static void kad_handle_response(struct dht * dht, +                                kad_msg_t *  msg) +{ +        struct kad_req * req; + +        assert(dht); +        assert(msg); + +        pthread_rwlock_wrlock(&dht->lock); + +        req = dht_find_request(dht, msg); +        if (req == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return; +        } + +        bmp_release(dht->cookies, req->cookie); +        list_del(&req->next); + +        pthread_rwlock_unlock(&dht->lock); + +        switch(req->code) { +        case KAD_JOIN: +                if (kad_handle_join_resp(dht, req, msg)) +                        log_err("Enrollment of DHT failed."); +                break; +        case KAD_FIND_VALUE: +        case KAD_FIND_NODE: +                if (dht_get_state(dht) != DHT_RUNNING) +                        return; +                kad_handle_find_resp(dht, req, msg); +                break; +        default: +                break; +        } + +        kad_req_destroy(req); +} + +int dht_bootstrap(struct dht * dht, +                  size_t       b, +                  time_t       t_expire) +{ +        assert(dht); + +        pthread_rwlock_wrlock(&dht->lock); + +        dht->id = create_id(b); +        if (dht->id == NULL) +                goto fail_id; + +        dht->buckets = bucket_create(); +        if (dht->buckets == NULL) +                goto fail_buckets; + +        dht->buckets->depth = 0; +        dht->buckets->mask  = 0; + +        dht->b        = b / CHAR_BIT; +        dht->t_expire = MAX(2, t_expire); +        dht->t_repub  = MAX(1, t_expire - 10); +        dht->k        = KAD_K; + +        if (pthread_create(&dht->worker, NULL, work, dht)) +                goto fail_pthread_create; + +        dht->state = DHT_RUNNING; + +        dht_update_bucket(dht, dht->id, dht->addr); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; + + fail_pthread_create: +        bucket_destroy(dht->buckets); +        dht->buckets = NULL; + fail_buckets: +        free(dht->id); +        dht->id = NULL; + fail_id: +        pthread_rwlock_unlock(&dht->lock); +        return -1; +} + +int dht_enroll(struct dht * dht, +               uint64_t     addr) +{ +        assert(dht); + +        return kad_join(dht, addr); +} + +int dht_reg(struct dht *    dht, +            const uint8_t * key) +{ +        struct ref_entry * e; + +        assert(dht); +        assert(key); +        assert(dht->addr != 0); + +        if (dht_get_state(dht) != DHT_RUNNING) +                return -1; + +        e = ref_entry_create(dht, key); +        if (e == NULL) +                return -ENOMEM; + +        pthread_rwlock_wrlock(&dht->lock); + +        list_add(&e->next, &dht->refs); + +        pthread_rwlock_unlock(&dht->lock); + +        kad_publish(dht, key, dht->addr, dht->t_expire); + +        return 0; +} + +int dht_unreg(struct dht *    dht, +              const uint8_t * key) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(dht); +        assert(key); + +        if (dht_get_state(dht) != DHT_RUNNING) +                return -1; + +        pthread_rwlock_wrlock(&dht->lock); + +        list_for_each_safe(p, h, &dht->refs) { +                struct ref_entry * r = list_entry(p, struct ref_entry, next); +                if (!memcmp(key, r->key, dht-> b) ) { +                        list_del(&r->next); +                        ref_entry_destroy(r); +                } +        } + +        dht_del(dht, key, dht->addr); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; +} + +uint64_t dht_query(struct dht *    dht, +                   const uint8_t * key) +{ +        struct dht_entry * e; +        struct lookup *    lu; +        uint64_t           addrs[KAD_K]; +        size_t             n; + +        addrs[0] = 0; + +        pthread_rwlock_rdlock(&dht->lock); + +        e = dht_find_entry(dht, key); +        if (e != NULL) +                addrs[0] = dht_entry_get_addr(dht, e); + +        pthread_rwlock_unlock(&dht->lock); + +        if (addrs[0] != 0 && addrs[0] != dht->addr) +                return addrs[0]; + +        lu = kad_lookup(dht, key, KAD_FIND_VALUE); +        if (lu == NULL) +                return 0; + +        n = lookup_get_addrs(lu, addrs); +        if (n == 0) { +                lookup_destroy(lu); +                return 0; +        } + +        lookup_destroy(lu); + +        /* Current behaviour is anycast and return the first peer address. */ +        if (addrs[0] != dht->addr) +                return addrs[0]; + +        if (n > 1) +                return addrs[1]; + +        return 0; +} + +void dht_post_sdu(void *               ae, +                  struct shm_du_buff * sdb) +{ +        struct dht *         dht; +        kad_msg_t *          msg; +        kad_contact_msg_t ** cmsgs; +        kad_msg_t            resp_msg = KAD_MSG__INIT; +        uint64_t             addr; +        buffer_t             buf; +        size_t               i; + +        assert(ae); +        assert(sdb); + +        memset(&buf, 0, sizeof(buf)); + +        dht = (struct dht *) ae; + +        msg = kad_msg__unpack(NULL, +                              shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), +                              shm_du_buff_head(sdb)); + +        ipcp_sdb_release(sdb); + +        if (msg == NULL) { +                log_err("Failed to unpack message."); +                return; +        } + +        if (msg->has_key && msg->key.len != dht->b) { +                kad_msg__free_unpacked(msg, NULL); +                log_warn("Bad key in message."); +                return; +        } + +        if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) { +                kad_msg__free_unpacked(msg, NULL); +                log_warn("Bad source ID in message of type %d.", msg->code); +                return; +        } + +        if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) { +                kad_msg__free_unpacked(msg, NULL); +                return; +        } + +        addr = msg->s_addr; + +        resp_msg.code   = KAD_RESPONSE; +        resp_msg.cookie = msg->cookie; + +        switch(msg->code) { +        case KAD_JOIN: +                /* Refuse enrollee on check fails. */ +                if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { +                        log_warn("Parameter mismatch. " +                                 "DHT enrolment refused."); +                        break; +                } + +                if (msg->t_replicate != KAD_T_REPL) { +                        log_warn("Replication time mismatch. " +                                 "DHT enrolment refused."); + +                        break; +                } + +                if (msg->t_refresh != KAD_T_REFR) { +                        log_warn("Refresh time mismatch. " +                                 "DHT enrolment refused."); +                        break; +                } + +                resp_msg.has_alpha       = true; +                resp_msg.has_b           = true; +                resp_msg.has_k           = true; +                resp_msg.has_t_expire    = true; +                resp_msg.has_t_refresh   = true; +                resp_msg.has_t_replicate = true; +                resp_msg.alpha           = KAD_ALPHA; +                resp_msg.b               = dht->b; +                resp_msg.k               = KAD_K; +                resp_msg.t_expire        = dht->t_expire; +                resp_msg.t_refresh       = KAD_T_REFR; +                resp_msg.t_replicate     = KAD_T_REPL; +                break; +        case KAD_FIND_VALUE: +                buf = dht_retrieve(dht, msg->key.data); +                if (buf.len != 0) { +                        resp_msg.n_addrs = buf.len; +                        resp_msg.addrs   = (uint64_t *) buf.data; +                        break; +                } +                /* FALLTHRU */ +        case KAD_FIND_NODE: +                /* Return k closest contacts. */ +                resp_msg.n_contacts = +                        dht_get_contacts(dht, msg->key.data, &cmsgs); +                resp_msg.contacts = cmsgs; +                break; +        case KAD_STORE: +                if (msg->n_contacts < 1) { +                        log_warn("No contacts in store message."); +                        break; +                } + +                if (!msg->has_t_expire) { +                        log_warn("No expiry time in store message."); +                        break; +                } + +                kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); +                break; +        case KAD_RESPONSE: +                kad_handle_response(dht, msg); +                break; +        default: +                assert(false); +                break; +        } + +        if (msg->code != KAD_JOIN) { +                pthread_rwlock_wrlock(&dht->lock); +                if (dht_update_bucket(dht, msg->s_id.data, addr)) +                        log_warn("Failed to update bucket."); +                pthread_rwlock_unlock(&dht->lock); +        } + +        if (msg->code < KAD_STORE) { +                if (send_msg(dht, &resp_msg, addr)) +                        log_warn("Failed to send response."); +        } + +        kad_msg__free_unpacked(msg, NULL); + +        if (resp_msg.n_addrs > 0) +                free(resp_msg.addrs); + +        if (resp_msg.n_contacts == 0) +                return; + +        for (i = 0; i < resp_msg.n_contacts; ++i) +                kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); +        free(resp_msg.contacts); +} + +void dht_destroy(struct dht * dht) +{ +        struct list_head * p; +        struct list_head * h; + +        if (dht == NULL) +                return; + +        if (dht_get_state(dht) == DHT_RUNNING) +                dht_set_state(dht, DHT_SHUTDOWN); + +        pthread_rwlock_wrlock(&dht->lock); + +        list_for_each_safe(p, h, &dht->entries) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                list_del(&e->next); +                dht_entry_destroy(e); +        } + +        list_for_each_safe(p, h, &dht->requests) { +                struct kad_req * r = list_entry(p, struct kad_req, next); +                list_del(&r->next); +                free(r); +        } + +        list_for_each_safe(p, h, &dht->refs) { +                struct ref_entry * e = list_entry(p, struct ref_entry, next); +                list_del(&e->next); +                ref_entry_destroy(e); +        } + +        list_for_each_safe(p, h, &dht->lookups) { +                struct lookup * l = list_entry(p, struct lookup, next); +                list_del(&l->next); +                lookup_destroy(l); +        } + +        pthread_rwlock_unlock(&dht->lock); + +        if (dht_get_state(dht) == DHT_SHUTDOWN) { +                pthread_cancel(dht->worker); +                pthread_join(dht->worker, NULL); +        } + +        if (dht->buckets != NULL) +                bucket_destroy(dht->buckets); + +        bmp_destroy(dht->cookies); + +        pthread_mutex_destroy(&dht->mtx); + +        pthread_rwlock_destroy(&dht->lock); + +        free(dht->id); + +        free(dht); +} + +struct dht * dht_create(uint64_t addr) +{ +        struct dht * dht; + +        dht = malloc(sizeof(*dht)); +        if (dht == NULL) +                goto fail_malloc; + +        dht->buckets = NULL; + +        list_head_init(&dht->entries); +        list_head_init(&dht->requests); +        list_head_init(&dht->refs); +        list_head_init(&dht->lookups); + +        if (pthread_rwlock_init(&dht->lock, NULL)) +                goto fail_rwlock; + +        if (pthread_mutex_init(&dht->mtx, NULL)) +                goto fail_mutex; + +        dht->cookies = bmp_create(DHT_MAX_REQS, 1); +        if (dht->cookies == NULL) +                goto fail_bmp; + +        dht->b    = 0; +        dht->addr = addr; +        dht->id   = NULL; +#ifndef __DHT_TEST__ +        dht->fd   = dt_reg_ae(dht, &dht_post_sdu); +#endif /* __DHT_TEST__ */ + +        dht->state = DHT_INIT; + +        return dht; + + fail_bmp: +        pthread_mutex_destroy(&dht->mtx); + fail_mutex: +        pthread_rwlock_destroy(&dht->lock); + fail_rwlock: +        free(dht); + fail_malloc: +        return NULL; +} diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h new file mode 100644 index 00000000..5d7fc894 --- /dev/null +++ b/src/ipcpd/normal/dht.h @@ -0,0 +1,54 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Distributed Hash Table based on Kademlia + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_DHT_H +#define OUROBOROS_IPCPD_NORMAL_DHT_H + +#include <ouroboros/ipcp-dev.h> + +#include <stdint.h> +#include <sys/types.h> + +struct dht; + +struct dht * dht_create(uint64_t addr); + +int          dht_bootstrap(struct dht * dht, +                           size_t       b, +                           time_t       t_expire); + +int          dht_enroll(struct dht * dht, +                        uint64_t     addr); + +void         dht_destroy(struct dht * dht); + +int          dht_reg(struct dht *    dht, +                     const uint8_t * key); + +int          dht_unreg(struct dht *    dht, +                       const uint8_t * key); + +uint64_t     dht_query(struct dht *    dht, +                       const uint8_t * key); + +#endif /* OUROBOROS_IPCPD_NORMAL_DHT_H */ diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 5ea8a300..69b7e90e 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -20,129 +20,134 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ +#define OUROBOROS_PREFIX "directory" +  #include <ouroboros/config.h> +#include <ouroboros/endian.h>  #include <ouroboros/errno.h> +#include <ouroboros/logs.h>  #include <ouroboros/rib.h> +#include <ouroboros/utils.h>  #include "dir.h" +#include "dht.h"  #include "ipcp.h"  #include "ribconfig.h"  #include <stdlib.h>  #include <string.h>  #include <assert.h> +#include <inttypes.h> -static char dir_path[RIB_MAX_PATH_LEN + 1]; +#define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT) +#define ENROL_RETR 6 +#define ENROL_INTV 1 -static void dir_path_reset(void) { -        dir_path[strlen(DIR_PATH)]= '\0'; -        assert(strcmp(DIR_PATH, dir_path) == 0); -} +struct dht * dht; -int dir_init(void) +static uint64_t find_peer_addr(void)  { -        /* FIXME: set ribmgr dissemination here */ -        if (rib_add(RIB_ROOT, DIR_NAME)) -                return -1; +        ssize_t  i; +        char ** members; +        ssize_t n_members; +        size_t  reset; +        char    path[RIB_MAX_PATH_LEN + 1]; -        strcpy(dir_path, DIR_PATH); +        strcpy(path, MEMBERS_PATH); -        return 0; -} +        reset = strlen(path); -int dir_fini(void) -{ -        /* FIXME: remove ribmgr dissemination here*/ +        n_members = rib_children(path, &members); +        if (n_members == 1) { +                freepp(ssize_t, members, n_members); +                return 0; +        } + +        for (i = 0; i < n_members; ++i) { +                uint64_t addr; +                rib_path_append(path, members[i]); +                if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) { +                        log_err("Failed to read address from RIB."); +                        freepp(ssize_t, members, n_members); +                        return ipcpi.dt_addr; +                } + +                if (addr != ipcpi.dt_addr) { +                        freepp(ssize_t, members, n_members); +                        return addr; +                } + +                path[reset] = '\0'; +        } -        dir_path_reset(); -        rib_del(dir_path); +        freepp(ssize_t, members, n_members);          return 0;  } -int dir_reg(const uint8_t * hash) +int dir_init()  { -        char hashstr[ipcp_dir_hash_strlen() + 1]; -        int ret; - -        assert(hash); +        uint64_t addr; -        dir_path_reset(); - -        ipcp_hash_str(hashstr, hash); - -        ret = rib_add(dir_path, hashstr); -        if (ret == -ENOMEM) -                 return -ENOMEM; - -        rib_path_append(dir_path, hashstr); +        dht = dht_create(ipcpi.dt_addr); +        if (dht == NULL) +                return -ENOMEM; -        ret = rib_add(dir_path, ipcpi.name); -        if (ret == -EPERM) +        addr = find_peer_addr(); +        if (addr == ipcpi.dt_addr) { +                log_err("Failed to get peer address."); +                dht_destroy(dht);                  return -EPERM; -        if (ret == -ENOMEM) { -                if (rib_children(dir_path, NULL) == 0) -                        rib_del(dir_path); -                return -ENOMEM;          } -        return 0; -} - -int dir_unreg(const uint8_t * hash) -{ -        char hashstr[ipcp_dir_hash_strlen() + 1]; -        size_t len; - -        assert(hash); +        if (addr != 0) { +                size_t retr = 0; +                log_dbg("Enrolling directory with peer %" PRIu64 ".", addr); +                /* NOTE: we could try other members if dht_enroll times out. */ +                while (dht_enroll(dht, addr)) { +                        if (retr++ == ENROL_RETR) { +                                dht_destroy(dht); +                                return -EPERM; +                        } -        dir_path_reset(); +                        log_dbg("Directory enrollment failed, retrying..."); +                        sleep(ENROL_INTV); +                } -        ipcp_hash_str(hashstr, hash); +                log_dbg("Directory enrolled."); -        rib_path_append(dir_path, hashstr); - -        if (!rib_has(dir_path))                  return 0; +        } -        len = strlen(dir_path); - -        rib_path_append(dir_path, ipcpi.name); - -        rib_del(dir_path); +        log_dbg("Bootstrapping directory."); -        dir_path[len] = '\0'; +        /* TODO: get parameters for bootstrap from IRM tool. */ +        if (dht_bootstrap(dht, KAD_B, 86400)) { +                dht_destroy(dht); +                return -ENOMEM; +        } -        if (rib_children(dir_path, NULL) == 0) -                rib_del(dir_path); +        log_dbg("Directory bootstrapped.");          return 0;  } -int dir_query(const uint8_t * hash) +void dir_fini(void)  { -        char hashstr[ipcp_dir_hash_strlen() + 1]; -        size_t len; - -        dir_path_reset(); - -        ipcp_hash_str(hashstr, hash); - -        rib_path_append(dir_path, hashstr); - -        if (!rib_has(dir_path)) -                return -1; - -        /* FIXME: assert after local IPCP is deprecated */ -        len = strlen(dir_path); +        dht_destroy(dht); +} -        rib_path_append(dir_path, ipcpi.name); +int dir_reg(const uint8_t * hash) +{ +        return dht_reg(dht, hash); +} -        if (rib_has(dir_path)) { -                dir_path[len] = '\0'; -                if (rib_children(dir_path, NULL) == 1) -                        return -1; -        } +int dir_unreg(const uint8_t * hash) +{ +        return dht_unreg(dht, hash); +} -        return 0; +uint64_t dir_query(const uint8_t * hash) +{ +        return dht_query(dht, hash);  } diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h index 1b28a5c0..4091a3e8 100644 --- a/src/ipcpd/normal/dir.h +++ b/src/ipcpd/normal/dir.h @@ -23,14 +23,14 @@  #ifndef OUROBOROS_IPCPD_NORMAL_DIR_H  #define OUROBOROS_IPCPD_NORMAL_DIR_H -int dir_init(void); +int      dir_init(void); -int dir_fini(void); +void     dir_fini(void); -int dir_reg(const uint8_t * hash); +int      dir_reg(const uint8_t * hash); -int dir_unreg(const uint8_t * hash); +int      dir_unreg(const uint8_t * hash); -int dir_query(const uint8_t * hash); +uint64_t dir_query(const uint8_t * hash);  #endif /* OUROBOROS_IPCPD_NORMAL_DIR_H */ diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 1867c13b..5fcc5865 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -50,7 +50,7 @@  #include <assert.h>  struct ae_info { -        int    (*post_sdu)(void * ae, struct shm_du_buff * sdb); +        void   (* post_sdu)(void * ae, struct shm_du_buff * sdb);          void * ae;  }; @@ -131,11 +131,14 @@ static int sdu_handler(int                  fd,                          return 0;                  } -                if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) { +                if (dt.aes[dt_pci.fd].post_sdu == NULL) { +                        log_err("No registered AE on fd %d.", dt_pci.fd);                          ipcp_sdb_release(sdb); -                        return -1; +                        return -EPERM;                  } +                dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb); +                  return 0;          } @@ -295,7 +298,7 @@ void dt_stop(void)  }  int dt_reg_ae(void * ae, -              int (* func)(void * func, struct shm_du_buff *)) +              void (* func)(void * func, struct shm_du_buff *))  {          int res_fd; @@ -330,10 +333,11 @@ int dt_write_sdu(uint64_t             dst_addr,          struct dt_pci dt_pci;          assert(sdb); +        assert(dst_addr != ipcpi.dt_addr);          fd = pff_nhop(dt.pff[qc], dst_addr);          if (fd < 0) { -                log_err("Could not get nhop for addr %" PRIu64 ".", dst_addr); +                log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);                  return -1;          } diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index 0e1a8cc3..15ef51f0 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -38,7 +38,7 @@ int  dt_start(void);  void dt_stop(void);  int  dt_reg_ae(void * ae, -               int (* func)(void * ae, struct shm_du_buff * sdb)); +               void (* func)(void * ae, struct shm_du_buff * sdb));  int  dt_write_sdu(uint64_t             dst_addr,                    qoscube_t            qc, diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 40a680c3..704f4f16 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -30,6 +30,7 @@  #include <ouroboros/dev.h>  #include <ouroboros/ipcp-dev.h> +#include "dir.h"  #include "dt_pci.h"  #include "fa.h"  #include "sdu_sched.h" @@ -79,8 +80,8 @@ static void destroy_conn(int fd)          fa.r_addr[fd] = INVALID_ADDR;  } -static int fa_post_sdu(void *               ae, -                       struct shm_du_buff * sdb) +static void fa_post_sdu(void *               ae, +                        struct shm_du_buff * sdb)  {          struct timespec    ts  = {0, TIMEOUT * 1000};          struct timespec    abstime; @@ -98,9 +99,12 @@ static int fa_post_sdu(void *               ae,                                       shm_du_buff_tail(sdb) -                                       shm_du_buff_head(sdb),                                       shm_du_buff_head(sdb)); + +        ipcp_sdb_release(sdb); +          if (msg == NULL) {                  log_err("Failed to unpack flow alloc message."); -                return -1; +                return;          }          switch (msg->code) { @@ -113,7 +117,7 @@ static int fa_post_sdu(void *               ae,                          log_err("Bad flow request.");                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL); -                        return -1; +                        return;                  }                  while (ipcpi.alloc_id != -1 && @@ -128,7 +132,7 @@ static int fa_post_sdu(void *               ae,                          log_dbg("Won't allocate over non-operational IPCP.");                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL); -                        return -1; +                        return;                  }                  assert(ipcpi.alloc_id == -1); @@ -141,7 +145,7 @@ static int fa_post_sdu(void *               ae,                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL);                          log_err("Failed to get fd for flow."); -                        return -1; +                        return;                  }                  pthread_rwlock_wrlock(&fa.flows_lock); @@ -173,13 +177,10 @@ static int fa_post_sdu(void *               ae,          default:                  log_err("Got an unknown flow allocation message.");                  flow_alloc_msg__free_unpacked(msg, NULL); -                return -1; +                return;          }          flow_alloc_msg__free_unpacked(msg, NULL); -        ipcp_sdb_release(sdb); - -        return 0;  }  int fa_init(void) @@ -240,47 +241,10 @@ int fa_alloc(int             fd,               qoscube_t       qc)  {          flow_alloc_msg_t     msg = FLOW_ALLOC_MSG__INIT; -        char                 path[RIB_MAX_PATH_LEN + 1];          uint64_t             addr; -        ssize_t              ch; -        ssize_t              i; -        char **              children; -        char                 hashstr[ipcp_dir_hash_strlen() + 1]; -        char *               dst_ipcp = NULL;          struct shm_du_buff * sdb; -        ipcp_hash_str(hashstr, dst); - -        assert(strlen(hashstr) + strlen(DIR_PATH) + 1 -               < RIB_MAX_PATH_LEN); - -        strcpy(path, DIR_PATH); - -        rib_path_append(path, hashstr); - -        ch = rib_children(path, &children); -        if (ch <= 0) -                return -1; - -        for (i = 0; i < ch; ++i) -                if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) -                        dst_ipcp = children[i]; -                else -                        free(children[i]); - -        free(children); - -        if (dst_ipcp == NULL) -                return -1; - -        strcpy(path, MEMBERS_PATH); - -        rib_path_append(path, dst_ipcp); - -        free(dst_ipcp); - -        if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) -                return -1; +        addr = dir_query(dst);          msg.code         = FLOW_ALLOC_CODE__FLOW_REQ;          msg.has_hash     = true; diff --git a/src/ipcpd/normal/kademlia.proto b/src/ipcpd/normal/kademlia.proto new file mode 100644 index 00000000..0b7e8beb --- /dev/null +++ b/src/ipcpd/normal/kademlia.proto @@ -0,0 +1,46 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * KAD protocol + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +syntax = "proto2"; + +message kad_contact_msg { +        required bytes  id   = 1; +        required uint64 addr = 2; +}; + +message kad_msg { +        required uint32 code              =  1; +        required uint32 cookie            =  2; +        required uint64 s_addr            =  3; +        optional bytes  s_id              =  4; +        optional bytes  key               =  5; +        repeated uint64 addrs             =  6; +        repeated kad_contact_msg contacts =  7; +        // enrolment parameters +        optional uint32 alpha             =  8; +        optional uint32 b                 =  9; +        optional uint32 k                 = 10; +        optional uint32 t_expire          = 11; +        optional uint32 t_refresh         = 12; +        optional uint32 t_replicate       = 13; +};
\ No newline at end of file diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 8c28de78..f94c15de 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -116,11 +116,6 @@ static int boot_components(void)          log_dbg("Starting ribmgr."); -        if (dir_init()) { -                log_err("Failed to initialize directory."); -                goto fail_dir; -        } -          if (ribmgr_init()) {                  log_err("Failed to initialize RIB manager.");                  goto fail_ribmgr; @@ -148,6 +143,11 @@ static int boot_components(void)                  goto fail_fa_start;          } +        if (dir_init()) { +                log_err("Failed to initialize directory."); +                goto fail_dir; +        } +          if (enroll_start()) {                  log_err("Failed to start enroll.");                  goto fail_enroll_start; @@ -166,6 +166,8 @@ static int boot_components(void)          ipcp_set_state(IPCP_INIT);          enroll_stop();   fail_enroll_start: +        dir_fini(); + fail_dir:          fa_stop();   fail_fa_start:          dt_stop(); @@ -176,8 +178,6 @@ static int boot_components(void)   fail_dt:          ribmgr_fini();   fail_ribmgr: -        dir_fini(); - fail_dir:          addr_auth_fini();   fail_addr_auth:          free(ipcpi.dif_name); @@ -191,6 +191,8 @@ void shutdown_components(void)          enroll_stop(); +        dir_fini(); +          fa_stop();          dt_stop(); @@ -201,8 +203,6 @@ void shutdown_components(void)          ribmgr_fini(); -        dir_fini(); -          addr_auth_fini();          free(ipcpi.dif_name); @@ -227,10 +227,9 @@ static int normal_ipcp_enroll(const char *      dst,                  return -1;          } -        log_dbg("Enrolled with " HASH_FMT, HASH_VAL(dst)); +        log_dbg("Enrolled with %s.", dst);          info->dir_hash_algo = ipcpi.dir_hash_algo; -          strcpy(info->dif_name, ipcpi.dif_name);          return 0; @@ -347,12 +346,17 @@ static int normal_ipcp_bootstrap(const struct ipcp_config * conf)          return 0;  } +static int normal_ipcp_query(const uint8_t * dst) +{ +        return dir_query(dst) ? 0 : -1; +} +  static struct ipcp_ops normal_ops = {          .ipcp_bootstrap       = normal_ipcp_bootstrap,          .ipcp_enroll          = normal_ipcp_enroll,          .ipcp_reg             = dir_reg,          .ipcp_unreg           = dir_unreg, -        .ipcp_query           = dir_query, +        .ipcp_query           = normal_ipcp_query,          .ipcp_flow_alloc      = fa_alloc,          .ipcp_flow_alloc_resp = fa_alloc_resp,          .ipcp_flow_dealloc    = fa_dealloc diff --git a/src/ipcpd/normal/pol/flat.c b/src/ipcpd/normal/pol/flat.c index e709da7c..0907cf7a 100644 --- a/src/ipcpd/normal/pol/flat.c +++ b/src/ipcpd/normal/pol/flat.c @@ -56,7 +56,7 @@ static int addr_taken(char *  name,          char path[RIB_MAX_PATH_LEN + 1];          size_t reset; -        strcpy(path, "/" MEMBERS_NAME); +        strcpy(path, MEMBERS_PATH);          reset = strlen(path); @@ -102,7 +102,7 @@ uint64_t flat_address(void)          char ** members;          ssize_t n_members; -        strcpy(path, "/" MEMBERS_NAME); +        strcpy(path, MEMBERS_PATH);          if (!rib_has(path)) {                  log_err("Could not read members from RIB."); diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h index 31c79fbe..db1ff1bb 100644 --- a/src/ipcpd/normal/ribconfig.h +++ b/src/ipcpd/normal/ribconfig.h @@ -29,9 +29,7 @@  #define DLR          "/"  #define BOOT_NAME    "boot"  #define MEMBERS_NAME "members" -#define DIR_NAME     "directory"  #define ROUTING_NAME "fsdb" -#define DIR_PATH     DLR DIR_NAME  #define BOOT_PATH    DLR BOOT_NAME  #define MEMBERS_PATH DLR MEMBERS_NAME  #define ROUTING_PATH DLR ROUTING_NAME diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 266a628d..3beb917c 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -299,9 +299,8 @@ static void * sync_rib(void *o)                          rib_path_append(path, children[--ch]);                          free(children[ch]); -                        /* Only sync fsdb, members and directory */ +                        /* Only sync fsdb and members */                          if (strcmp(path, MEMBERS_PATH) == 0 -                            || strcmp(path, DIR_PATH) == 0                              || strcmp(path, ROUTING_PATH) == 0)                                  ribmgr_sync(path);                  } diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 63259430..a4b9e074 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -36,11 +36,19 @@  struct sdu_sched {          flow_set_t * set[QOS_CUBE_MAX]; -        fqueue_t *   fqs[QOS_CUBE_MAX];          next_sdu_t   callback; -        pthread_t    sdu_reader; +        pthread_t    sdu_readers[IPCP_SCHED_THREADS];  }; +static void cleanup_reader(void * o) +{ +        int         i; +        fqueue_t ** fqs = (fqueue_t **) o; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                fqueue_destroy(fqs[i]); +} +  static void * sdu_reader(void * o)  {          struct sdu_sched *   sched; @@ -49,14 +57,27 @@ static void * sdu_reader(void * o)          int                  fd;          int                  i = 0;          int                  ret; +        fqueue_t *           fqs[QOS_CUBE_MAX];          sched = (struct sdu_sched *) o; +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                fqs[i] = fqueue_create(); +                if (fqs[i] == NULL) { +                        int j; +                        for (j = 0; j < i; ++j) +                                fqueue_destroy(fqs[j]); +                        return (void *) -1; +                } +        } + +        pthread_cleanup_push(cleanup_reader, fqs); +          while (true) {                  /* FIXME: replace with scheduling policy call */                  i = (i + 1) % QOS_CUBE_MAX; -                ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout); +                ret = flow_event_wait(sched->set[i], fqs[i], &timeout);                  if (ret == -ETIMEDOUT)                          continue; @@ -65,7 +86,7 @@ static void * sdu_reader(void * o)                          continue;                  } -                while ((fd = fqueue_next(sched->fqs[i])) >= 0) { +                while ((fd = fqueue_next(fqs[i])) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  log_warn("Failed to read SDU from fd %d.", fd);                                  continue; @@ -78,6 +99,8 @@ static void * sdu_reader(void * o)                  }          } +        pthread_cleanup_pop(true); +          return (void *) 0;  } @@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)          sdu_sched = malloc(sizeof(*sdu_sched));          if (sdu_sched == NULL) -                return NULL; +                goto fail_malloc;          sdu_sched->callback = callback; @@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)                  if (sdu_sched->set[i] == NULL) {                          for (j = 0; j < i; ++j)                                  flow_set_destroy(sdu_sched->set[j]); -                        goto fail_sdu_sched; +                        goto fail_flow_set;                  }          } -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                sdu_sched->fqs[i] = fqueue_create(); -                if (sdu_sched->fqs[i] == NULL) { -                        for (j = 0; j < i; ++j) -                                fqueue_destroy(sdu_sched->fqs[j]); +        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { +                if (pthread_create(&sdu_sched->sdu_readers[i], NULL, +                                   sdu_reader, sdu_sched)) { +                        int j; +                        for (j = 0; j < i; ++j) { +                                pthread_cancel(sdu_sched->sdu_readers[j]); +                                pthread_join(sdu_sched->sdu_readers[j], NULL); +                        }                          goto fail_flow_set;                  }          } -        pthread_create(&sdu_sched->sdu_reader, -                       NULL, -                       sdu_reader, -                       (void *) sdu_sched); -          return sdu_sched;   fail_flow_set: -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                flow_set_destroy(sdu_sched->set[i]); - fail_sdu_sched:           free(sdu_sched); + fail_malloc:           return NULL;  } @@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)          assert(sdu_sched); -        pthread_cancel(sdu_sched->sdu_reader); - -        pthread_join(sdu_sched->sdu_reader, NULL); +        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { +                pthread_cancel(sdu_sched->sdu_readers[i]); +                pthread_join(sdu_sched->sdu_readers[i], NULL); +        } -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fqueue_destroy(sdu_sched->fqs[i]); +        for (i = 0; i < QOS_CUBE_MAX; ++i)                  flow_set_destroy(sdu_sched->set[i]); -        }          free(sdu_sched);  } diff --git a/src/ipcpd/normal/tests/CMakeLists.txt b/src/ipcpd/normal/tests/CMakeLists.txt new file mode 100644 index 00000000..d975caf6 --- /dev/null +++ b/src/ipcpd/normal/tests/CMakeLists.txt @@ -0,0 +1,37 @@ +get_filename_component(CURRENT_SOURCE_PARENT_DIR +  ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(CURRENT_BINARY_PARENT_DIR +  ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +include_directories(${CURRENT_SOURCE_PARENT_DIR}) +include_directories(${CURRENT_BINARY_PARENT_DIR}) + +include_directories(${CMAKE_SOURCE_DIR}/include) +include_directories(${CMAKE_BINARY_DIR}/include) + +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c +  # Add new tests here +  dht_test.c +) + +set_source_files_properties(${KAD_PROTO_SRCS} PROPERTIES GENERATED TRUE) + +add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests} +  ${KAD_PROTO_SRCS}) +target_link_libraries(${PARENT_DIR}_test ouroboros) + +add_dependencies(check ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +remove(tests_to_run test_suite.c) + +foreach (test ${tests_to_run}) +  get_filename_component(test_name ${test} NAME_WE) +  add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) +endforeach (test) diff --git a/src/ipcpd/normal/tests/dht_test.c b/src/ipcpd/normal/tests/dht_test.c new file mode 100644 index 00000000..861ae10a --- /dev/null +++ b/src/ipcpd/normal/tests/dht_test.c @@ -0,0 +1,99 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Unit tests of the DHT AE + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define __DHT_TEST__ + +#include "dht.c" + +#include <pthread.h> +#include <time.h> +#include <stdlib.h> +#include <stdio.h> + +#define KEY_LEN  32 + +#define EXP      86400 +#define CONTACTS 1000 + +int dht_test(int     argc, +             char ** argv) +{ +        struct dht * dht; +        uint64_t     addr = 0x0D1F; +        uint8_t      key[KEY_LEN]; +        size_t       i; + +        (void) argc; +        (void) argv; + +        dht = dht_create(addr); +        if (dht == NULL) { +                printf("Failed to create dht.\n"); +                return -1; +        } + +        dht_destroy(dht); + +        dht = dht_create(addr); +        if (dht == NULL) { +                printf("Failed to re-create dht.\n"); +                return -1; +        } + +        if (dht_bootstrap(dht, KEY_LEN, EXP)) { +                printf("Failed to bootstrap dht.\n"); +                dht_destroy(dht); +                return -1; +        } + +        dht_destroy(dht); + +        dht = dht_create(addr); +        if (dht == NULL) { +                printf("Failed to re-create dht.\n"); +                return -1; +        } + +        if (dht_bootstrap(dht, KEY_LEN, EXP)) { +                printf("Failed to bootstrap dht.\n"); +                dht_destroy(dht); +                return -1; +        } + +        for (i = 0; i < CONTACTS; ++i) { +                uint64_t addr; +                random_buffer(&addr, sizeof(addr)); +                random_buffer(key, KEY_LEN); +                pthread_rwlock_wrlock(&dht->lock); +                if (dht_update_bucket(dht, key, addr)) { +                        pthread_rwlock_unlock(&dht->lock); +                        printf("Failed to update bucket.\n"); +                        dht_destroy(dht); +                        return -1; +                } +                pthread_rwlock_unlock(&dht->lock); +        } + +        dht_destroy(dht); + +        return 0; +} diff --git a/src/irmd/main.c b/src/irmd/main.c index 912234d6..96b0b729 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -36,6 +36,7 @@  #include <ouroboros/bitmap.h>  #include <ouroboros/qos.h>  #include <ouroboros/time_utils.h> +#include <ouroboros/tpm.h>  #include <ouroboros/logs.h>  #include "utils.h" @@ -99,18 +100,9 @@ struct irm {          struct shm_rdrbuff * rdrb;         /* rdrbuff for SDUs           */          int                  sockfd;       /* UNIX socket                */ -        pthread_t *          threadpool;   /* pool of mainloop threads   */ - -        struct bmp *         thread_ids;   /* ids for mainloop threads   */ -        size_t               max_threads;  /* max threads set by tpm     */ -        size_t               threads;      /* available mainloop threads */ -        pthread_cond_t       threads_cond; /* signal thread entry/exit   */ -        pthread_mutex_t      threads_lock; /* mutex for threads/condvar  */ -          enum irm_state       state;        /* state of the irmd          */          pthread_rwlock_t     state_lock;   /* lock for the entire irmd   */ -        pthread_t            tpm;          /* threadpool manager         */          pthread_t            irm_sanitize; /* clean up irmd resources    */          pthread_t            shm_sanitize; /* keep track of rdrbuff use  */  } irmd; @@ -327,7 +319,7 @@ static pid_t create_ipcp(char *         name,                          break;          } -        list_add_tail(&tmp->next, &irmd.ipcps); +        list_add_tail(&tmp->next, p);          list_add(&api->next, &irmd.spawned_apis); @@ -490,7 +482,7 @@ static int enroll_ipcp(pid_t  api,          pthread_rwlock_unlock(&irmd.reg_lock);          if (ipcp_enroll(api, dst_name, &info) < 0) { -                log_err("Could not enroll IPCP."); +                log_err("Could not enroll IPCP %d.", api);                  return -1;          } @@ -1434,16 +1426,6 @@ static void irm_fini(void)          if (irmd_get_state() != IRMD_NULL)                  log_warn("Unsafe destroy."); -        pthread_mutex_lock(&irmd.threads_lock); - -        if (irmd.thread_ids != NULL) -                bmp_destroy(irmd.thread_ids); - -        pthread_mutex_unlock(&irmd.threads_lock); - -        if (irmd.threadpool != NULL) -                free(irmd.threadpool); -          pthread_rwlock_wrlock(&irmd.flows_lock);          if (irmd.port_ids != NULL) @@ -1517,8 +1499,8 @@ void irmd_sig_handler(int         sig,                  }                  log_info("IRMd shutting down..."); -                  irmd_set_state(IRMD_NULL); +                tpm_stop();                  break;          case SIGPIPE:                  log_dbg("Ignored SIGPIPE."); @@ -1700,55 +1682,11 @@ void * irm_sanitize(void * o)          }  } -static void thread_inc(void) -{ -        pthread_mutex_lock(&irmd.threads_lock); - -        ++irmd.threads; -        pthread_cond_signal(&irmd.threads_cond); - -        pthread_mutex_unlock(&irmd.threads_lock); -} - -static void thread_dec(void) -{ -        pthread_mutex_lock(&irmd.threads_lock); - -        --irmd.threads; -        pthread_cond_signal(&irmd.threads_cond); - -        pthread_mutex_unlock(&irmd.threads_lock); -} - -static bool thread_check(void) -{ -        int ret; - -        pthread_mutex_lock(&irmd.threads_lock); - -        ret = irmd.threads > irmd.max_threads; - -        pthread_mutex_unlock(&irmd.threads_lock); - -        return ret; -} - -static void thread_exit(ssize_t id) -{ -        pthread_mutex_lock(&irmd.threads_lock); -        bmp_release(irmd.thread_ids, id); - -        --irmd.threads; -        pthread_cond_signal(&irmd.threads_cond); - -        pthread_mutex_unlock(&irmd.threads_lock); -} -  void * mainloop(void * o)  {          uint8_t buf[IRM_MSG_BUF_SIZE]; -        ssize_t id = (ssize_t) o; +        (void) o;          while (true) {  #ifdef __FreeBSD__ @@ -1768,8 +1706,8 @@ void * mainloop(void * o)                  struct timeval    tv      = {(SOCKET_TIMEOUT / 1000),                                               (SOCKET_TIMEOUT % 1000) * 1000}; -                if (irmd_get_state() != IRMD_RUNNING || thread_check()) { -                        thread_exit(id); +                if (irmd_get_state() != IRMD_RUNNING || tpm_check()) { +                        tpm_exit();                          break;                  } @@ -1780,7 +1718,6 @@ void * mainloop(void * o)                  if (select(irmd.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0)                          continue;  #endif -                  cli_sockfd = accept(irmd.sockfd, 0, 0);                  if (cli_sockfd < 0)                          continue; @@ -1798,7 +1735,7 @@ void * mainloop(void * o)                  if (irmd_get_state() != IRMD_RUNNING) {                          close(cli_sockfd); -                        thread_exit(id); +                        tpm_exit();                          break;                  } @@ -1808,7 +1745,7 @@ void * mainloop(void * o)                          continue;                  } -                thread_dec(); +                tpm_dec();                  if (msg->has_timeo_sec) {                          assert(msg->has_timeo_nsec); @@ -1937,7 +1874,7 @@ void * mainloop(void * o)                  if (ret_msg.result == -EPIPE || !ret_msg.has_result) {                          close(cli_sockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  } @@ -1947,7 +1884,7 @@ void * mainloop(void * o)                          if (apis != NULL)                                  free(apis);                          close(cli_sockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  } @@ -1956,7 +1893,7 @@ void * mainloop(void * o)                          if (apis != NULL)                                  free(apis);                          close(cli_sockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  } @@ -1971,70 +1908,7 @@ void * mainloop(void * o)                  free(buffer.data);                  close(cli_sockfd); -                thread_inc(); -        } - -        return (void *) 0; -} - -void * threadpoolmgr(void * o) -{ -        pthread_attr_t  pattr; -        struct timespec dl; -        struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), -                              (IRMD_TPM_TIMEOUT % 1000) * MILLION}; -        (void) o; - -        if (pthread_attr_init(&pattr)) -                return (void *) -1; - -        pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); - -        while (true) { -                clock_gettime(PTHREAD_COND_CLOCK, &dl); -                ts_add(&dl, &to, &dl); - -                if (irmd_get_state() != IRMD_RUNNING) { -                        pthread_attr_destroy(&pattr); -                        log_dbg("Waiting for threads to exit."); -                        pthread_mutex_lock(&irmd.threads_lock); -                        while (irmd.threads > 0) -                                pthread_cond_wait(&irmd.threads_cond, -                                                  &irmd.threads_lock); -                        pthread_mutex_unlock(&irmd.threads_lock); -                        log_dbg("Threadpool manager done."); -                        break; -                } - -                pthread_mutex_lock(&irmd.threads_lock); - -                if (irmd.threads < IRMD_MIN_AV_THREADS) { -                        log_dbg("Increasing threadpool."); -                        irmd.max_threads = IRMD_MAX_AV_THREADS; - -                        while (irmd.threads < irmd.max_threads) { -                                ssize_t id = bmp_allocate(irmd.thread_ids); -                                if (!bmp_is_id_valid(irmd.thread_ids, id)) { -                                        log_warn("IRMd threadpool exhausted."); -                                        break; -                                } - -                                if (pthread_create(&irmd.threadpool[id], -                                                   &pattr, mainloop, -                                                   (void *) id)) -                                        log_warn("Failed to start new thread."); -                                else -                                        ++irmd.threads; -                        } -                } - -                if (pthread_cond_timedwait(&irmd.threads_cond, -                                           &irmd.threads_lock, -                                           &dl) == ETIMEDOUT) -                        if (irmd.threads > IRMD_MIN_AV_THREADS ) -                                --irmd.max_threads; - -                pthread_mutex_unlock(&irmd.threads_lock); +                tpm_inc();          }          return (void *) 0; @@ -2043,7 +1917,6 @@ void * threadpoolmgr(void * o)  static int irm_init(void)  {          struct stat st; -        pthread_condattr_t cattr;          struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),                                    (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -2066,24 +1939,6 @@ static int irm_init(void)                  goto fail_flows_lock;          } -        if (pthread_mutex_init(&irmd.threads_lock, NULL)) { -                log_err("Failed to initialize mutex."); -                goto fail_threads_lock; -        } - -        if (pthread_condattr_init(&cattr)) { -                log_err("Failed to initialize condattr."); -                goto fail_cattr; -        } - -#ifndef __APPLE__ -        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif -        if (pthread_cond_init(&irmd.threads_cond, &cattr)) { -                log_err("Failed to initialize cond."); -                goto fail_threads_cond; -        } -          list_head_init(&irmd.ipcps);          list_head_init(&irmd.api_table);          list_head_init(&irmd.apn_table); @@ -2097,18 +1952,6 @@ static int irm_init(void)                  goto fail_port_ids;          } -        irmd.thread_ids = bmp_create(IRMD_MAX_THREADS, 0); -        if (irmd.thread_ids == NULL) { -                log_err("Failed to thread thread_ids bitmap."); -                goto fail_thread_ids; -        } - -        irmd.threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS); -        if (irmd.threadpool == NULL) { -                log_err("Failed to malloc threadpool"); -                goto fail_thrpool; -        } -          if ((irmd.lf = lockfile_create()) == NULL) {                  if ((irmd.lf = lockfile_open()) == NULL) {                          log_err("Lockfile error."); @@ -2163,8 +2006,6 @@ static int irm_init(void)                  goto fail_rdrbuff;          } -        irmd.threads     = 0; -        irmd.max_threads = IRMD_MIN_AV_THREADS;          irmd.state       = IRMD_RUNNING;          log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2180,18 +2021,8 @@ fail_sock_path:  fail_stat:          lockfile_destroy(irmd.lf);  fail_lockfile: -        free(irmd.threadpool); -fail_thrpool: -        bmp_destroy(irmd.thread_ids); -fail_thread_ids:          bmp_destroy(irmd.port_ids);  fail_port_ids: -        pthread_cond_destroy(&irmd.threads_cond); -fail_threads_cond: -        pthread_condattr_destroy(&cattr); -fail_cattr: -        pthread_mutex_destroy(&irmd.threads_lock); -fail_threads_lock:          pthread_rwlock_destroy(&irmd.flows_lock);  fail_flows_lock:          pthread_rwlock_destroy(&irmd.reg_lock); @@ -2261,12 +2092,24 @@ int main(int     argc,                  exit(EXIT_FAILURE);          } -        pthread_create(&irmd.tpm, NULL, threadpoolmgr, NULL); +        if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) { +                log_fini(); +                exit(EXIT_FAILURE); +        } + +        if (tpm_start()) { +                tpm_fini(); +                log_fini(); +                exit(EXIT_FAILURE); +        }          pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL);          pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb); -        pthread_join(irmd.tpm, NULL); +        /* tpm_stop() called from sighandler */ + +        tpm_fini(); +          pthread_join(irmd.irm_sanitize, NULL);          pthread_join(irmd.shm_sanitize, NULL); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index e08869b8..fe4dd88c 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -11,7 +11,6 @@ protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS  protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS cdap.proto)  protobuf_generate_c(RO_PROTO_SRCS RO_PROTO_HDRS ro.proto)  protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto) -protobuf_generate_c(FRCT_ENROLL_SRCS FRCT_ENROLL_HDRS frct_enroll.proto)  if (NOT APPLE)    find_library(LIBRT_LIBRARIES rt) @@ -59,12 +58,13 @@ set(SOURCE_FILES    shm_rdrbuff.c    sockets.c    time_utils.c +  tpm.c    utils.c    )  add_library(ouroboros SHARED ${SOURCE_FILES} ${IRM_PROTO_SRCS}    ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS} -  ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS} ${FRCT_ENROLL_SRCS}) +  ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS})  include(AddCompileFlags)  if (CMAKE_BUILD_TYPE MATCHES Debug) @@ -77,9 +77,8 @@ else()    if (${LINUX_RND_HDR} STREQUAL "LINUX_RND_HDR-NOTFOUND")      find_package(OpenSSL)      if (NOT OPENSSL_FOUND) -      message(STATUS "No secure random generation, please install OpenSSL.") +      message(FATAL_ERROR "No secure random generation, please install libssl.")      else() -      message(STATUS "OpenSSL found")        include_directories($OPENSSL_INCLUDE_DIR})        add_compile_flags(ouroboros -DHAVE_OPENSSL)      endif() diff --git a/src/lib/dev.c b/src/lib/dev.c index 14971528..c8e43778 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -85,7 +85,6 @@ struct flow {  struct {          char *                ap_name; -        char *                daf_name;          pid_t                 api;          struct shm_rdrbuff *  rdrb; @@ -205,7 +204,7 @@ static int api_announce(char * ap_name)          return ret;  } -static void init_flow(int fd) +static void flow_clear(int fd)  {          assert(!(fd < 0)); @@ -216,9 +215,9 @@ static void init_flow(int fd)          ai.flows[fd].cube     = QOS_CUBE_BE;  } -static void reset_flow(int fd) +static void flow_fini(int fd)  { -        assert (!(fd < 0)); +        assert(!(fd < 0));          if (ai.flows[fd].port_id != -1)                  port_destroy(&ai.ports[ai.flows[fd].port_id]); @@ -232,7 +231,59 @@ static void reset_flow(int fd)          if (ai.flows[fd].set != NULL)                  shm_flow_set_close(ai.flows[fd].set); -        init_flow(fd); +        flow_clear(fd); +} + +static int flow_init(int       port_id, +                     pid_t     api, +                     qoscube_t qc) +{ +        int fd; + +        pthread_rwlock_wrlock(&ai.flows_lock); + +        fd = bmp_allocate(ai.fds); +        if (!bmp_is_id_valid(ai.fds, fd)) { +                pthread_rwlock_unlock(&ai.flows_lock); +                return -EBADF; +        } + +        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); +        if (ai.flows[fd].rx_rb == NULL) { +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                return -ENOMEM; +        } + +        ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                flow_fini(fd); +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                return -ENOMEM; +        } + +        ai.flows[fd].set = shm_flow_set_open(api); +        if (ai.flows[fd].set == NULL) { +                flow_fini(fd); +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                return -ENOMEM; +        } + +        ai.flows[fd].port_id = port_id; +        ai.flows[fd].oflags  = FLOW_O_DEFAULT; +        ai.flows[fd].api     = api; +        ai.flows[fd].cube    = qc; +        ai.flows[fd].spec    = qos_cube_to_spec(qc); + +        ai.ports[port_id].fd = fd; + +        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + +        pthread_rwlock_unlock(&ai.flows_lock); + +        return fd;  }  int ouroboros_init(const char * ap_name) @@ -242,7 +293,6 @@ int ouroboros_init(const char * ap_name)          assert(ai.ap_name == NULL);          ai.api = getpid(); -        ai.daf_name = NULL;          ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS);          if (ai.fds == NULL) @@ -279,7 +329,7 @@ int ouroboros_init(const char * ap_name)          }          for (i = 0; i < AP_MAX_FLOWS; ++i) -                init_flow(i); +                flow_clear(i);          ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);          if (ai.ports == NULL) { @@ -333,9 +383,6 @@ void ouroboros_fini()          shm_flow_set_destroy(ai.fqset); -        if (ai.daf_name != NULL) -                free(ai.daf_name); -          if (ai.ap_name != NULL)                  free(ai.ap_name); @@ -346,7 +393,7 @@ void ouroboros_fini()                          ssize_t idx;                          while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)                                  shm_rdrbuff_remove(ai.rdrb, idx); -                        reset_flow(i); +                        flow_fini(i);                  }          } @@ -368,13 +415,9 @@ void ouroboros_fini()  int flow_accept(qosspec_t *             qs,                  const struct timespec * timeo)  { -        irm_msg_t           msg      = IRM_MSG__INIT; -        irm_msg_t *         recv_msg = NULL; -        int                 fd       = -1; -        frct_enroll_msg_t * frct_enroll; -        qosspec_t           spec; -        uint8_t             data[BUF_SIZE]; -        ssize_t             n; +        irm_msg_t   msg      = IRM_MSG__INIT; +        irm_msg_t * recv_msg = NULL; +        int         fd       = -1;          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_api = true; @@ -408,83 +451,15 @@ int flow_accept(qosspec_t *             qs,                  return -EIRMD;          } -        pthread_rwlock_wrlock(&ai.flows_lock); - -        fd = bmp_allocate(ai.fds); -        if (!bmp_is_id_valid(ai.fds, fd)) { -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -EBADF; -        } - -        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); -        if (ai.flows[fd].rx_rb == NULL) { -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -ENOMEM; -        } - -        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -ENOMEM; -        } - -        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); -        if (ai.flows[fd].set == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -ENOMEM; -        } - -        ai.flows[fd].port_id = recv_msg->port_id; -        ai.flows[fd].oflags  = FLOW_O_DEFAULT; -        ai.flows[fd].api     = recv_msg->api; -        ai.flows[fd].cube    = recv_msg->qoscube; - -        assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT); - -        spec = qos_cube_to_spec(recv_msg->qoscube); - -        ai.ports[recv_msg->port_id].fd    = fd; -        ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - -        pthread_rwlock_unlock(&ai.flows_lock); +        fd = flow_init(recv_msg->port_id, recv_msg->api, recv_msg->qoscube);          irm_msg__free_unpacked(recv_msg, NULL); -        n = flow_read(fd, data, BUF_SIZE); -        if (n < 0) { -                flow_dealloc(fd); -                return n; -        } - -        frct_enroll = frct_enroll_msg__unpack(NULL, n, data); -        if (frct_enroll == NULL) { -                flow_dealloc(fd); -                return -1; -        } - -        spec.resource_control = frct_enroll->resource_control; -        spec.reliable = frct_enroll->reliable; -        spec.error_check = frct_enroll->error_check; -        spec.ordered = frct_enroll->ordered; -        spec.partial = frct_enroll->partial; - -        frct_enroll_msg__free_unpacked(frct_enroll, NULL); - -        pthread_rwlock_wrlock(&ai.flows_lock); -        ai.flows[fd].spec = spec; -        pthread_rwlock_unlock(&ai.flows_lock); +        if (fd < 0) +                return fd;          if (qs != NULL) -                *qs = spec; +                *qs = ai.flows[fd].spec;          return fd;  } @@ -493,14 +468,10 @@ int flow_alloc(const char *            dst_name,                 qosspec_t *             qs,                 const struct timespec * timeo)  { -        irm_msg_t         msg         = IRM_MSG__INIT; -        frct_enroll_msg_t frct_enroll = FRCT_ENROLL_MSG__INIT; -        irm_msg_t *       recv_msg    = NULL; -        qoscube_t         qc          = QOS_CUBE_BE; -        int               fd; -        ssize_t           len; -        uint8_t *         data; -        int               ret; +        irm_msg_t   msg      = IRM_MSG__INIT; +        irm_msg_t * recv_msg = NULL; +        qoscube_t   qc       = QOS_CUBE_BE; +        int         fd;          msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;          msg.dst_name    = (char *) dst_name; @@ -508,15 +479,8 @@ int flow_alloc(const char *            dst_name,          msg.has_qoscube = true;          msg.api         = ai.api; -        if (qs != NULL) { -                frct_enroll.resource_control = qs->resource_control; -                frct_enroll.reliable = qs->reliable; -                frct_enroll.error_check = qs->error_check; -                frct_enroll.ordered = qs->ordered; -                frct_enroll.partial = qs->partial; - +        if (qs != NULL)                  qc = qos_spec_to_cube(*qs); -        }          msg.qoscube = qc; @@ -547,78 +511,10 @@ int flow_alloc(const char *            dst_name,                  return -EIRMD;          } -        pthread_rwlock_wrlock(&ai.flows_lock); - -        fd = bmp_allocate(ai.fds); -        if (!bmp_is_id_valid(ai.fds, fd)) { -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -EBADF; -        } - -        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); -        if (ai.flows[fd].rx_rb == NULL) { -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -ENOMEM; -        } - -        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -ENOMEM; -        } - -        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); -        if (ai.flows[fd].set == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -ENOMEM; -        } - -        ai.flows[fd].port_id = recv_msg->port_id; -        ai.flows[fd].oflags  = FLOW_O_DEFAULT; -        ai.flows[fd].api     = recv_msg->api; -        ai.flows[fd].cube    = recv_msg->qoscube; - -        assert(ai.ports[recv_msg->port_id].state == PORT_INIT); - -        ai.ports[recv_msg->port_id].fd    = fd; -        ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; +        fd = flow_init(recv_msg->port_id, recv_msg->api, qc);          irm_msg__free_unpacked(recv_msg, NULL); -        pthread_rwlock_unlock(&ai.flows_lock); - -        len = frct_enroll_msg__get_packed_size(&frct_enroll); -        if (len < 0) { -                flow_dealloc(fd); -                return -1; -        } - -        data = malloc(len); -        if (data == NULL) { -                flow_dealloc(fd); -                return -ENOMEM; -        } - -        frct_enroll_msg__pack(&frct_enroll, data); - -        ret = flow_write(fd, data, len); -        if (ret < 0) { -                flow_dealloc(fd); -                free(data); -                return ret; -        } - -        free(data); -          return fd;  } @@ -657,7 +553,7 @@ int flow_dealloc(int fd)          pthread_rwlock_wrlock(&ai.flows_lock); -        reset_flow(fd); +        flow_fini(fd);          bmp_release(ai.fds, fd);          pthread_rwlock_unlock(&ai.flows_lock); @@ -1073,53 +969,11 @@ int flow_event_wait(struct flow_set *       set,  /* ipcp-dev functions */ -int np1_flow_alloc(pid_t n_api, -                   int   port_id) +int np1_flow_alloc(pid_t     n_api, +                   int       port_id, +                   qoscube_t qc)  { -        int fd; - -        pthread_rwlock_wrlock(&ai.flows_lock); - -        fd = bmp_allocate(ai.fds); -        if (!bmp_is_id_valid(ai.fds, fd)) { -                pthread_rwlock_unlock(&ai.flows_lock); -                return -1; -        } - -        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); -        if (ai.flows[fd].rx_rb == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                return -1; -        } - -        ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                return -1; -        } - -        ai.flows[fd].set = shm_flow_set_open(n_api); -        if (ai.flows[fd].set == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                return -1; -        } - -        ai.flows[fd].port_id = port_id; -        ai.flows[fd].oflags  = FLOW_O_DEFAULT; -        ai.flows[fd].api     = n_api; - -        ai.ports[port_id].fd    = fd; -        ai.ports[port_id].state = PORT_ID_ASSIGNED; - -        pthread_rwlock_unlock(&ai.flows_lock); - -        return fd; +        return flow_init(port_id, n_api, qc);  }  int np1_flow_dealloc(int port_id) @@ -1182,11 +1036,10 @@ int ipcp_create_r(pid_t api,  int ipcp_flow_req_arr(pid_t           api,                        const uint8_t * dst,                        size_t          len, -                      qoscube_t       cube) +                      qoscube_t       qc)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int port_id = -1;          int fd = -1;          if (dst == NULL) @@ -1199,88 +1052,24 @@ int ipcp_flow_req_arr(pid_t           api,          msg.hash.len    = len;          msg.hash.data   = (uint8_t *) dst;          msg.has_qoscube = true; -        msg.qoscube     = cube; - -        pthread_rwlock_wrlock(&ai.flows_lock); - -        fd = bmp_allocate(ai.fds); -        if (!bmp_is_id_valid(ai.fds, fd)) { -                pthread_rwlock_unlock(&ai.flows_lock); -                return -1; /* -ENOMOREFDS */ -        } - -        pthread_rwlock_unlock(&ai.flows_lock); +        msg.qoscube     = qc;          recv_msg = send_recv_irm_msg(&msg); -        pthread_rwlock_wrlock(&ai.flows_lock); - -        if (recv_msg == NULL) { -                ai.ports[fd].state = PORT_INIT; -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); +        if (recv_msg == NULL)                  return -EIRMD; -        }          if (!recv_msg->has_port_id || !recv_msg->has_api) { -                ai.ports[fd].state = PORT_INIT; -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          if (recv_msg->has_result && recv_msg->result) { -                ai.ports[fd].state = PORT_INIT; -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        port_id = recv_msg->port_id; -        if (port_id < 0) { -                ai.ports[fd].state = PORT_INIT; -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); -        if (ai.flows[fd].rx_rb == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); -        if (ai.flows[fd].set == NULL) { -                reset_flow(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        ai.flows[fd].port_id = port_id; -        ai.flows[fd].oflags  = FLOW_O_DEFAULT; - -        ai.ports[port_id].fd = fd; -        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); - -        pthread_rwlock_unlock(&ai.flows_lock); +        fd = flow_init(recv_msg->port_id, recv_msg->api, qc);          irm_msg__free_unpacked(recv_msg, NULL); diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 67abbb5b..7660b1dd 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -117,7 +117,7 @@ struct shm_flow_set * shm_flow_set_create()                  (set->fqueues + AP_MAX_FQUEUES * (SHM_BUFFER_SIZE));          pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX          pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);  #endif          pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -336,7 +336,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,          assert(idx < AP_MAX_FQUEUES);          assert(fqueue); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(set->lock);  #else          if (pthread_mutex_lock(set->lock) == EOWNERDEAD) @@ -358,7 +358,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,                  else                          ret = -pthread_cond_wait(set->conds + idx,                                                   set->lock); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX                  if (ret == -EOWNERDEAD)                          pthread_mutex_consistent(set->lock);  #endif diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 1e072b21..757f65c8 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -127,7 +127,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)          rb->del      = rb->add + 1;          pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX          pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);  #endif          pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -299,7 +299,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,                  ts_add(&abstime, timeout, &abstime);          } -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -315,7 +315,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,                                                        &abstime);                  else                          idx = -pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX                  if (idx == -EOWNERDEAD)                          pthread_mutex_consistent(rb->lock);  #endif @@ -356,7 +356,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)          if (shm_rbuff_empty(rb))                  return; -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -367,7 +367,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)                               (void *) rb->lock);          while (!shm_rbuff_empty(rb)) -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX                  pthread_cond_wait(rb->del, rb->lock);  #else                  if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index d3c1e143..1b9f07d1 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -124,7 +124,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)          rb->del      = rb->add + 1;          pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX          pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);  #endif          pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -231,7 +231,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)          assert(rb);          assert(idx < SHM_BUFFER_SIZE); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -264,7 +264,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)          assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -297,7 +297,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,                  ts_add(&abstime, timeout, &abstime);          } -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -313,7 +313,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,                                                        &abstime);                  else                          idx = -pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX                  if (idx == -EOWNERDEAD)                          pthread_mutex_consistent(rb->lock);  #endif @@ -334,7 +334,7 @@ void shm_rbuff_block(struct shm_rbuff * rb)  {          assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -349,7 +349,7 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)  {          assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -364,7 +364,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)  {          assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -376,7 +376,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)                               (void *) rb->lock);          while (!shm_rbuff_empty(rb)) -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX                  pthread_cond_wait(rb->del, rb->lock);  #else                  if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) @@ -391,7 +391,7 @@ size_t shm_rbuff_queued(struct shm_rbuff * rb)          assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rb->lock);  #else          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 1f999f93..d454fef8 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -192,7 +192,7 @@ struct shm_rdrbuff * shm_rdrbuff_create()          pthread_mutexattr_init(&mattr);          pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX          pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);  #endif          pthread_mutex_init(rdrb->lock, &mattr); @@ -274,7 +274,7 @@ int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,                  ts_add(&abstime, timeo, &abstime);          } -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rdrb->lock);  #else          if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) @@ -282,9 +282,9 @@ int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,  #endif          while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) { -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX                  if (pthread_cond_timedwait(rdrb->full, -                                           rdrb->lock +                                           rdrb->lock,                                             &abstime) == ETIMEDOUT) {                          pthread_mutex_unlock(rdrb->lock);                          return -ETIMEDOUT; @@ -358,7 +358,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,          if (sz > SHM_RDRB_BLOCK_SIZE)                  return -EMSGSIZE;  #endif -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rdrb->lock);  #else          if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) @@ -437,7 +437,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,          if (sz > SHM_RDRB_BLOCK_SIZE)                  return -EMSGSIZE;  #endif -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rdrb->lock);  #else          if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) @@ -535,7 +535,7 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,          assert(rdrb);          assert(idx < (SHM_BUFFER_SIZE)); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rdrb->lock);  #else          if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) diff --git a/src/lib/tests/time_utils_test.c b/src/lib/tests/time_utils_test.c index 6d1ae32f..86636c15 100644 --- a/src/lib/tests/time_utils_test.c +++ b/src/lib/tests/time_utils_test.c @@ -28,12 +28,12 @@  static void ts_print(struct timespec * s)  { -        printf("timespec is %zd:%zd.\n", s->tv_sec, s->tv_nsec); +        printf("timespec is %zd:%ld.\n", (ssize_t) s->tv_sec, s->tv_nsec);  }  static void tv_print(struct timeval * v)  { -        printf("timeval is %zd:%zd.\n", v->tv_sec, v->tv_usec); +        printf("timeval is %zd:%ld.\n", (ssize_t) v->tv_sec, v->tv_usec);  }  static void ts_init(struct timespec * s, diff --git a/src/lib/tpm.c b/src/lib/tpm.c new file mode 100644 index 00000000..8298eeb5 --- /dev/null +++ b/src/lib/tpm.c @@ -0,0 +1,266 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Threadpool management + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#include <ouroboros/config.h> +#include <ouroboros/errno.h> +#include <ouroboros/list.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/tpm.h> + +#include <pthread.h> +#include <stdlib.h> + +#define TPM_TIMEOUT 1000 + +struct pthr_el { +        struct list_head next; + +        bool             join; + +        pthread_t        thr; +}; + +enum tpm_state { +        TPM_NULL = 0, +        TPM_INIT, +        TPM_RUNNING +}; + +struct { +        size_t           min; +        size_t           inc; +        size_t           max; +        size_t           cur; + +        void * (* func)(void *); + +        struct list_head pool; + +        enum tpm_state   state; + +        pthread_cond_t   cond; +        pthread_mutex_t  lock; + +        pthread_t        mgr; +} tpm; + +static void tpm_join(void) +{ +        struct list_head * p; +        struct list_head * h; + +        list_for_each_safe(p, h, &tpm.pool) { +                struct pthr_el * e = list_entry(p, struct pthr_el, next); +                if (tpm.state != TPM_RUNNING) +                        while (!e->join) +                                pthread_cond_wait(&tpm.cond, &tpm.lock); + +                if (e->join) { +                        pthread_join(e->thr, NULL); +                        list_del(&e->next); +                        free(e); +                } +        } +} + +static void * tpmgr(void * o) +{ +        struct timespec dl; +        struct timespec to = {(TPM_TIMEOUT / 1000), +                              (TPM_TIMEOUT % 1000) * MILLION}; +        (void) o; + +        while (true) { +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); + +                pthread_mutex_lock(&tpm.lock); + +                tpm_join(); + +                if (tpm.state != TPM_RUNNING) { +                        tpm.max = 0; +                        tpm_join(); +                        pthread_mutex_unlock(&tpm.lock); +                        break; +                } + +                if (tpm.cur < tpm.min) { +                        tpm.max = tpm.inc; + +                        while (tpm.cur < tpm.max) { +                                struct pthr_el * e = malloc(sizeof(*e)); +                                if (e == NULL) +                                        break; + +                                e->join = false; + +                                if (pthread_create(&e->thr, NULL, +                                                   tpm.func, NULL)) { +                                        free(e); +                                } else { +                                        list_add(&e->next, &tpm.pool); +                                        ++tpm.cur; +                                } +                        } +                } + +                if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl) +                    == ETIMEDOUT) +                        if (tpm.cur > tpm.min ) +                                --tpm.max; + +                pthread_mutex_unlock(&tpm.lock); +        } + +        return (void *) 0; +} + +int tpm_init(size_t min, +             size_t inc, +             void * (* func)(void *)) +{ +        pthread_condattr_t cattr; + +        if (pthread_mutex_init(&tpm.lock, NULL)) +                goto fail_lock; + +        if (pthread_condattr_init(&cattr)) +                goto fail_cattr; + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&tpm.cond, &cattr)) +                goto fail_cond; + +        list_head_init(&tpm.pool); + +        pthread_condattr_destroy(&cattr); + +        tpm.state = TPM_INIT; +        tpm.func  = func; +        tpm.min   = min; +        tpm.inc   = inc; +        tpm.max   = 0; +        tpm.cur   = 0; + +        return 0; + + fail_cond: +        pthread_condattr_destroy(&cattr); + fail_cattr: +        pthread_mutex_destroy(&tpm.lock); + fail_lock: +        return -1; +} + +int tpm_start(void) +{ +        pthread_mutex_lock(&tpm.lock); + +        if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) { +                pthread_mutex_unlock(&tpm.lock); +                return -1; +        } + +        tpm.state = TPM_RUNNING; + +        pthread_mutex_unlock(&tpm.lock); + +        return 0; +} + +void tpm_stop(void) +{ +        pthread_mutex_lock(&tpm.lock); + +        tpm.state = TPM_NULL; + +        pthread_mutex_unlock(&tpm.lock); +} + +void tpm_fini(void) +{ +        pthread_join(tpm.mgr, NULL); + +        pthread_mutex_destroy(&tpm.lock); +        pthread_cond_destroy(&tpm.cond); +} + +bool tpm_check(void) +{ +        bool ret; + +        pthread_mutex_lock(&tpm.lock); + +        ret = tpm.cur > tpm.max; + +        pthread_mutex_unlock(&tpm.lock); + +        return ret; +} + +void tpm_inc(void) +{ +        pthread_mutex_lock(&tpm.lock); + +        ++tpm.cur; + +        pthread_mutex_unlock(&tpm.lock); +} + +void tpm_dec(void) +{ +        pthread_mutex_lock(&tpm.lock); + +        --tpm.cur; + +        pthread_cond_signal(&tpm.cond); + +        pthread_mutex_unlock(&tpm.lock); +} + +void tpm_exit(void) +{ +        struct list_head * p; +        pthread_t          id; + +        id = pthread_self(); + +        pthread_mutex_lock(&tpm.lock); + +        --tpm.cur; + +        list_for_each(p, &tpm.pool) { +                struct pthr_el * e = list_entry(p, struct pthr_el, next); +                if (e->thr == id) { +                        e->join = true; +                        break; +                } +        } + +        pthread_cond_signal(&tpm.cond); + +        pthread_mutex_unlock(&tpm.lock); +} | 
