diff options
Diffstat (limited to 'src/ipcpd')
26 files changed, 5289 insertions, 2220 deletions
diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c index 0f2bf26c..ebdb182c 100644 --- a/src/ipcpd/broadcast/main.c +++ b/src/ipcpd/broadcast/main.c @@ -155,7 +155,7 @@ static int broadcast_ipcp_enroll(const char * dst, return -1; } -static int broadcast_ipcp_bootstrap(const struct ipcp_config * conf) +static int broadcast_ipcp_bootstrap(struct ipcp_config * conf) { assert(conf); assert(conf->type == THIS_TYPE); @@ -207,7 +207,7 @@ static int broadcast_ipcp_join(int fd, { struct conn conn; time_t mpl = IPCP_BROADCAST_MPL; - buffer_t data = {0, NULL}; + buffer_t data = BUF_INIT; (void) qs; diff --git a/src/ipcpd/common/connmgr.c b/src/ipcpd/common/connmgr.c index aa9f043e..eed6238e 100644 --- a/src/ipcpd/common/connmgr.c +++ b/src/ipcpd/common/connmgr.c @@ -489,9 +489,6 @@ int connmgr_alloc(enum comp_id id, switch (id) { case COMPID_DT: notifier_event(NOTIFY_DT_CONN_ADD, conn); -#if defined(BUILD_IPCP_UNICAST) && defined(IPCP_CONN_WAIT_DIR) - dir_wait_running(); -#endif break; case COMPID_MGMT: notifier_event(NOTIFY_MGMT_CONN_ADD, conn); diff --git a/src/ipcpd/common/enroll.c b/src/ipcpd/common/enroll.c index 3fc999af..4b437b27 100644 --- a/src/ipcpd/common/enroll.c +++ b/src/ipcpd/common/enroll.c @@ -43,6 +43,10 @@ #include <string.h> #include <pthread.h> +#ifdef __APPLE__ +#define llabs labs +#endif + #define ENROLL_COMP "Enrollment" #define ENROLL_PROTO "OEP" /* Ouroboros enrollment protocol */ #define ENROLL_WARN_TIME_OFFSET 20 @@ -60,6 +64,13 @@ struct { pthread_t listener; } enroll; +#ifdef DEBUG_PROTO_OEP + + +#endif + + + static void * enroll_handle(void * o) { struct enroll_req req; @@ -107,8 +118,6 @@ static void * enroll_handle(void * o) log_info_id(req.id, "Handling incoming enrollment."); - /* TODO: authentication, timezone handling (UTC). */ - ack.result = -100; clock_gettime(CLOCK_REALTIME, &resp.t); @@ -242,7 +251,7 @@ int enroll_boot(struct conn * conn, rtt.tv_sec = resp.t.tv_sec; rtt.tv_nsec = resp.t.tv_nsec; - if (labs(ts_diff_ms(&t0, &rtt)) - delta_t > ENROLL_WARN_TIME_OFFSET) + if (llabs(ts_diff_ms(&t0, &rtt)) - delta_t > ENROLL_WARN_TIME_OFFSET) log_warn_id(id, "Clock offset above threshold."); return 0; diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index fe4f5fd2..b3a118ac 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -41,8 +41,6 @@ #define IPCP_LINUX_SLACK_NS @IPCP_LINUX_TIMERSLACK_NS@ -#cmakedefine IPCP_DEBUG_LOCAL - /* unicast IPCP */ #define QOS_PRIO_BE @IPCP_QOS_CUBE_BE_PRIO@ #define QOS_PRIO_VIDEO @IPCP_QOS_CUBE_VIDEO_PRIO@ @@ -56,6 +54,12 @@ #cmakedefine IPCP_CONN_WAIT_DIR #cmakedefine DISABLE_CORE_LOCK #cmakedefine IPCP_FLOW_STATS +#cmakedefine IPCP_DEBUG_LOCAL +#ifdef CONFIG_OUROBOROS_DEBUG +#cmakedefine DEBUG_PROTO_DHT +#cmakedefine DEBUG_PROTO_OEP +#cmakedefine DEBUG_PROTO_LS +#endif /* udp */ #cmakedefine HAVE_DDNS diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 850231cd..1028ee03 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1487,7 +1487,7 @@ static int eth_init_raw_socket(struct ifreq * ifr) } #endif -static int eth_ipcp_bootstrap(const struct ipcp_config * conf) +static int eth_ipcp_bootstrap(struct ipcp_config * conf) { struct ifreq ifr; int i; diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index bc16e2f0..ab38f5d0 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -68,6 +68,34 @@ #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif +static char * ipcp_type_str[] = { + "local", + "unicast", + "broadcast", + "eth-llc", + "eth-dix", + "udp" +}; + +static char * dir_hash_str[] = { + "SHA3-224", + "SHA3-256", + "SHA3-384", + "SHA3-512", + "CRC32", + "MD5" +}; + +static char * ipcp_state_str[] = { + "null", + "init", + "boot", + "bootstrapped", + "enrolled", + "operational", + "shutdown" +}; + struct { pid_t irmd_pid; char * name; @@ -102,13 +130,6 @@ struct { pthread_t acceptor; } ipcpd; -char * info[LAYER_NAME_SIZE + 1] = { - "_state", - "_type", - "_layer", - NULL -}; - struct cmd { struct list_head next; @@ -127,6 +148,11 @@ const char * ipcp_get_name(void) return ipcpd.name; } +void ipcp_set_dir_hash_algo(enum hash_algo algo) +{ + ipcpd.dir_hash_algo = algo; +} + size_t ipcp_dir_hash_len(void) { return hash_len(ipcpd.dir_hash_algo); @@ -158,6 +184,13 @@ void ipcp_hash_str(char * buf, buf[2 * i] = '\0'; } +static const char * info[] = { + "_state", + "_type", + "_layer", + NULL +}; + static int ipcp_rib_read(const char * path, char * buf, size_t len) @@ -223,7 +256,7 @@ static int ipcp_rib_readdir(char *** buf) *buf = malloc(sizeof(**buf) * i); if (*buf == NULL) - goto fail; + goto fail_entries; i = 0; @@ -236,12 +269,12 @@ static int ipcp_rib_readdir(char *** buf) return i; fail_dup: - while (i > 0) - free((*buf)[--i]); - fail: + while (i-- > 0) + free((*buf)[i]); + fail_entries: free(*buf); - return -1; + return -ENOMEM; } static int ipcp_rib_getattr(const char * path, @@ -402,20 +435,24 @@ static void free_msg(void * o) static void do_bootstrap(ipcp_config_msg_t * conf_msg, ipcp_msg_t * ret_msg) { - struct ipcp_config conf; + struct ipcp_config conf; + struct layer_info * info; log_info("Bootstrapping..."); if (ipcpd.ops->ipcp_bootstrap == NULL) { - log_err("Bootstrap unsupported."); + log_err("Failed to Bootstrap: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_INIT) { - log_err("IPCP in wrong state."); + + log_err("Failed to bootstrap: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_INIT]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } conf = ipcp_config_msg_to_s(conf_msg); @@ -431,15 +468,24 @@ static void do_bootstrap(ipcp_config_msg_t * conf_msg, } ret_msg->result = ipcpd.ops->ipcp_bootstrap(&conf); - if (ret_msg->result == 0) { - enum pol_dir_hash algo = conf.layer_info.dir_hash_algo; - strcpy(ipcpd.layer_name, conf.layer_info.name); - ipcpd.dir_hash_algo = (enum hash_algo) algo; - ret_msg->layer_info = layer_info_s_to_msg(&conf.layer_info); - ipcp_set_state(IPCP_OPERATIONAL); + if (ret_msg->result < 0) { + log_err("Failed to bootstrap IPCP."); + return; } - finish: - log_info("Finished bootstrapping: %d.", ret_msg->result); + + info = &conf.layer_info; + + strcpy(ipcpd.layer_name, info->name); + ipcpd.dir_hash_algo = (enum hash_algo) info->dir_hash_algo; + ret_msg->layer_info = layer_info_s_to_msg(info); + + log_info("Finished bootstrapping in %s.", info->name); + log_info(" type: %s", ipcp_type_str[ipcpd.type]); + log_info(" hash: %s [%zd bytes]", + dir_hash_str[ipcpd.dir_hash_algo], + ipcp_dir_hash_len()); + + ipcp_set_state(IPCP_OPERATIONAL); } static void do_enroll(const char * dst, @@ -450,26 +496,35 @@ static void do_enroll(const char * dst, log_info("Enrolling with %s...", dst); if (ipcpd.ops->ipcp_enroll == NULL) { - log_err("Enroll unsupported."); + log_err("Failed to enroll: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_INIT) { - log_err("IPCP in wrong state."); + log_err("Failed to enroll: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_INIT]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_enroll(dst, &info); - if (ret_msg->result == 0) { - strcpy(ipcpd.layer_name, info.name); - ipcpd.dir_hash_algo = (enum hash_algo) info.dir_hash_algo; - ret_msg->layer_info = layer_info_s_to_msg(&info); - ipcp_set_state(IPCP_OPERATIONAL); + if (ret_msg->result < 0) { + log_err("Failed to bootstrap IPCP."); + return; } - finish: - log_info("Finished enrolling with %s: %d.", dst, ret_msg->result); + + strcpy(ipcpd.layer_name, info.name); + ipcpd.dir_hash_algo = (enum hash_algo) info.dir_hash_algo; + ret_msg->layer_info = layer_info_s_to_msg(&info); + ipcp_set_state(IPCP_OPERATIONAL); + + log_info("Finished enrolling with %s in layer %s.", dst, info.name); + log_info(" type: %s", ipcp_type_str[ipcpd.type]); + log_info(" hash: %s [%zd bytes]", + dir_hash_str[ipcpd.dir_hash_algo], + ipcp_dir_hash_len()); } static void do_connect(const char * dst, @@ -480,14 +535,14 @@ static void do_connect(const char * dst, log_info("Connecting %s to %s...", comp, dst); if (ipcpd.ops->ipcp_connect == NULL) { - log_err("Connect unsupported."); + log_err("Failed to connect: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_connect(dst, comp, qs); - finish: - log_info("Finished connecting: %d.", ret_msg->result); + + log_info("Finished connecting."); } static void do_disconnect(const char * dst, @@ -497,16 +552,14 @@ static void do_disconnect(const char * dst, log_info("Disconnecting %s from %s...", comp, dst); if (ipcpd.ops->ipcp_disconnect == NULL) { - log_err("Disconnect unsupported."); + log_err("Failed to disconnect: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_disconnect(dst, comp); - finish: - log_info("Finished disconnecting %s from %s: %d.", - comp, dst, ret_msg->result); + log_info("Finished disconnecting %s from %s.", comp, dst); } static void do_reg(const uint8_t * hash, @@ -516,15 +569,14 @@ static void do_reg(const uint8_t * hash, log_info("Registering " HASH_FMT32 "...", HASH_VAL32(hash)); if (ipcpd.ops->ipcp_reg == NULL) { - log_err("Registration unsupported."); + log_err("Failed to register: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_reg(hash); - finish: - log_info("Finished registering " HASH_FMT32 " : %d.", - HASH_VAL32(hash), ret_msg->result); + + log_info("Finished registering " HASH_FMT32 ".", HASH_VAL32(hash)); } static void do_unreg(const uint8_t * hash, @@ -533,15 +585,14 @@ static void do_unreg(const uint8_t * hash, log_info("Unregistering " HASH_FMT32 "...", HASH_VAL32(hash)); if (ipcpd.ops->ipcp_unreg == NULL) { - log_err("Unregistration unsupported."); + log_err("Failed to unregister: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_unreg(hash); - finish: - log_info("Finished unregistering " HASH_FMT32 ": %d.", - HASH_VAL32(hash), ret_msg->result); + + log_info("Finished unregistering " HASH_FMT32 ".", HASH_VAL32(hash)); } static void do_query(const uint8_t * hash, @@ -550,13 +601,15 @@ static void do_query(const uint8_t * hash, /* TODO: Log this operation when IRMd has internal caches. */ if (ipcpd.ops->ipcp_query == NULL) { - log_err("Directory query unsupported."); + log_err("Failed to query: operation unsupported."); ret_msg->result = -ENOTSUP; return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_dbg("Failed to query: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; return; } @@ -577,15 +630,17 @@ static void do_flow_alloc(pid_t pid, flow_id, pid, HASH_VAL32(dst)); if (ipcpd.ops->ipcp_flow_alloc == NULL) { - log_err("Flow allocation unsupported."); + log_err("Flow allocation failed: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_err("Failed to enroll: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } fd = np1_flow_alloc(pid, flow_id); @@ -593,13 +648,13 @@ static void do_flow_alloc(pid_t pid, log_err("Failed allocating n + 1 fd on flow_id %d: %d", flow_id, fd); ret_msg->result = -EFLOWDOWN; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_flow_alloc(fd, dst, qs, data); - finish: - log_info("Finished allocating flow %d to " HASH_FMT32 ": %d.", - flow_id, HASH_VAL32(dst), ret_msg->result); + + log_info("Finished allocating flow %d to " HASH_FMT32 ".", + flow_id, HASH_VAL32(dst)); } @@ -614,26 +669,28 @@ static void do_flow_join(pid_t pid, log_info("Joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); if (ipcpd.ops->ipcp_flow_join == NULL) { - log_err("Broadcast unsupported."); + log_err("Failed to join: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_err("Failed to join: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } fd = np1_flow_alloc(pid, flow_id); if (fd < 0) { log_err("Failed allocating n + 1 fd on flow_id %d.", flow_id); ret_msg->result = -1; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_flow_join(fd, dst, qs); - finish: + log_info("Finished joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); } @@ -647,15 +704,20 @@ static void do_flow_alloc_resp(int resp, log_info("Responding %d to alloc on flow_id %d.", resp, flow_id); if (ipcpd.ops->ipcp_flow_alloc_resp == NULL) { - log_err("Flow_alloc_resp unsupported."); + log_err("Failed to respond on flow %d: operation unsupported.", + flow_id); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_err("Failed to respond to flow %d:" + "IPCP in state <%s>, need <%s>.", + flow_id, + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } if (resp == 0) { @@ -663,13 +725,13 @@ static void do_flow_alloc_resp(int resp, if (fd < 0) { log_warn("Flow_id %d is not known.", flow_id); ret_msg->result = -1; - goto finish; + return; } } ret_msg->result = ipcpd.ops->ipcp_flow_alloc_resp(fd, resp, data); - finish: - log_info("Finished responding to allocation request: %d", + + log_info("Finished responding %d to allocation request.", ret_msg->result); } @@ -682,28 +744,29 @@ static void do_flow_dealloc(int flow_id, log_info("Deallocating flow %d.", flow_id); if (ipcpd.ops->ipcp_flow_dealloc == NULL) { - log_err("Flow deallocation unsupported."); + log_err("Failed to dealloc: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_err("Failed to enroll: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } fd = np1_flow_dealloc(flow_id, timeo_sec); if (fd < 0) { log_warn("Could not deallocate flow_id %d.", flow_id); ret_msg->result = -1; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_flow_dealloc(fd); - finish: - log_info("Finished deallocating flow %d: %d.", - flow_id, ret_msg->result); + + log_info("Finished deallocating flow %d.", flow_id); } static void * mainloop(void * o) @@ -888,8 +951,8 @@ int ipcp_init(int argc, log_init(log); - ipcpd.state = IPCP_NULL; - ipcpd.type = type; + ipcpd.state = IPCP_NULL; + ipcpd.type = type; #if defined (__linux__) prctl(PR_SET_TIMERSLACK, IPCP_LINUX_SLACK_NS, 0, 0, 0); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 3ec6401b..2c41f5b9 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -37,7 +37,7 @@ #define ipcp_dir_hash_strlen() (ipcp_dir_hash_len() * 2) struct ipcp_ops { - int (* ipcp_bootstrap)(const struct ipcp_config * conf); + int (* ipcp_bootstrap)(struct ipcp_config * conf); int (* ipcp_enroll)(const char * dst, struct layer_info * info); @@ -88,6 +88,9 @@ enum ipcp_type ipcp_get_type(void); const char * ipcp_get_name(void); +/* TODO: Only specify hash algorithm in directory policy */ +void ipcp_set_dir_hash_algo(enum hash_algo algo); + void ipcp_set_state(enum ipcp_state state); enum ipcp_state ipcp_get_state(void); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 054602e5..ffa6dc5a 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -137,7 +137,7 @@ static void * local_ipcp_packet_loop(void * o) return (void *) 0; } -static int local_ipcp_bootstrap(const struct ipcp_config * conf) +static int local_ipcp_bootstrap(struct ipcp_config * conf) { assert(conf); diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 3cfcf2cc..5f770a61 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -584,7 +584,7 @@ static const char * inet4_ntop(const void * addr, return inet_ntop(AF_INET, addr, buf, INET_ADDRSTRLEN); } -static int udp_ipcp_bootstrap(const struct ipcp_config * conf) +static int udp_ipcp_bootstrap(struct ipcp_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; @@ -664,14 +664,14 @@ static int udp_ipcp_bootstrap(const struct ipcp_config * conf) return 0; fail_packet_writer: - while (i > 0) { - pthread_cancel(udp_data.packet_writer[--i]); + while (i-- > 0) { + pthread_cancel(udp_data.packet_writer[i]); pthread_join(udp_data.packet_writer[i], NULL); } i = IPCP_UDP_RD_THR; fail_packet_reader: - while (i > 0) { - pthread_cancel(udp_data.packet_reader[--i]); + while (i-- > 0) { + pthread_cancel(udp_data.packet_reader[i]); pthread_join(udp_data.packet_reader[i], NULL); } pthread_cancel(udp_data.mgmt_handler); diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt index b0dd3acc..7d2a765b 100644 --- a/src/ipcpd/unicast/CMakeLists.txt +++ b/src/ipcpd/unicast/CMakeLists.txt @@ -31,7 +31,7 @@ if (HAVE_FUSE) endif () endif () -set(SOURCE_FILES +set(IPCP_UNICAST_SOURCE_FILES # Add source files here addr-auth.c ca.c @@ -56,7 +56,7 @@ set(SOURCE_FILES routing/graph.c ) -add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES} ${COMMON_SOURCES} +add_executable(ipcpd-unicast ${IPCP_UNICAST_SOURCE_FILES} ${IPCP_SOURCES} ${COMMON_SOURCES} ${DHT_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS}) target_link_libraries(ipcpd-unicast LINK_PUBLIC ouroboros-dev) diff --git a/src/ipcpd/unicast/addr-auth.h b/src/ipcpd/unicast/addr-auth.h index e119dff3..b453cba5 100644 --- a/src/ipcpd/unicast/addr-auth.h +++ b/src/ipcpd/unicast/addr-auth.h @@ -27,6 +27,14 @@ #include <stdint.h> +#define ADDR_FMT32 "%02x:%02x:%02x:%02x" +#define ADDR_VAL32(a) \ + ((uint8_t *) a)[0], ((uint8_t *) a)[1], \ + ((uint8_t *) a)[2], ((uint8_t *) a)[3] + +#define ADDR_FMT64 ADDR_FMT32 ":" ADDR_FMT32 +#define ADDR_VAL64(a) ADDR_VAL32(a), ADDR_VAL32(a + 4) + int addr_auth_init(enum pol_addr_auth type, const void * info); diff --git a/src/ipcpd/unicast/addr-auth/flat.c b/src/ipcpd/unicast/addr-auth/flat.c index dfdeccd6..34ca1cef 100644 --- a/src/ipcpd/unicast/addr-auth/flat.c +++ b/src/ipcpd/unicast/addr-auth/flat.c @@ -31,10 +31,11 @@ #include <ouroboros/logs.h> #include <ouroboros/random.h> +#include "addr-auth.h" #include "ipcp.h" #include "flat.h" -#define NAME_LEN 8 +#define NAME_LEN 8 #define INVALID_ADDRESS 0 struct { @@ -63,6 +64,9 @@ int flat_init(const void * info) while (flat.addr == INVALID_ADDRESS) random_buffer(&flat.addr,sizeof(flat.addr)); #endif + log_dbg("Flat address initialized to " ADDR_FMT32 ".", + ADDR_VAL32((uint8_t *) &flat.addr)); + return 0; } diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c index 287eaf41..1fcc9bb2 100644 --- a/src/ipcpd/unicast/ca.c +++ b/src/ipcpd/unicast/ca.c @@ -49,7 +49,6 @@ int ca_init(enum pol_cong_avoid pol) return 0; } - void ca_fini(void) { ca.ops = NULL; diff --git a/src/ipcpd/unicast/dir.c b/src/ipcpd/unicast/dir.c index e0cb09fc..2b305626 100644 --- a/src/ipcpd/unicast/dir.c +++ b/src/ipcpd/unicast/dir.c @@ -44,50 +44,57 @@ struct { struct dir_ops * ops; - void * dir; -} dirmgr; +} dir; -int dir_init(void) +int dir_init(struct dir_config * conf) { - dirmgr.ops = &dht_dir_ops; + void * cfg; - dirmgr.dir = dirmgr.ops->create(); - if (dirmgr.dir == NULL) { - dirmgr.ops = NULL; - return -ENOMEM; + assert(conf != NULL); + + switch (conf->pol) { + case DIR_DHT: + log_info("Using DHT policy."); + dir.ops = &dht_dir_ops; + cfg = &conf->dht; + break; + default: /* DIR_INVALID */ + log_err("Invalid directory policy %d.", conf->pol); + return -EINVAL; } - return 0; + assert(dir.ops->init != NULL); + + return dir.ops->init(cfg); } void dir_fini(void) { - dirmgr.ops->destroy(dirmgr.dir); - dirmgr.ops = NULL; - dirmgr.dir = NULL; + dir.ops->fini(); + dir.ops = NULL; } -int dir_bootstrap(void) +int dir_start(void) { - return dirmgr.ops->bootstrap(dirmgr.dir); + return dir.ops->start(); } -int dir_reg(const uint8_t * hash) +void dir_stop(void) { - return dirmgr.ops->reg(dirmgr.dir, hash); + dir.ops->stop(); } -int dir_unreg(const uint8_t * hash) +int dir_reg(const uint8_t * hash) { - return dirmgr.ops->unreg(dirmgr.dir, hash); + return dir.ops->reg(hash); } -uint64_t dir_query(const uint8_t * hash) +int dir_unreg(const uint8_t * hash) { - return dirmgr.ops->query(dirmgr.dir, hash); + return dir.ops->unreg(hash); } -int dir_wait_running(void) +uint64_t dir_query(const uint8_t * hash) { - return dirmgr.ops->wait_running(dirmgr.dir); + return dir.ops->query(hash); } diff --git a/src/ipcpd/unicast/dir.h b/src/ipcpd/unicast/dir.h index b261ea2c..dbfde19f 100644 --- a/src/ipcpd/unicast/dir.h +++ b/src/ipcpd/unicast/dir.h @@ -25,11 +25,14 @@ #include <inttypes.h> -int dir_init(void); +/* may update the config! */ +int dir_init(struct dir_config * conf); void dir_fini(void); -int dir_bootstrap(void); +int dir_start(void); + +void dir_stop(void); int dir_reg(const uint8_t * hash); @@ -37,6 +40,4 @@ int dir_unreg(const uint8_t * hash); uint64_t dir_query(const uint8_t * hash); -int dir_wait_running(void); - #endif /* OUROBOROS_IPCPD_UNICAST_DIR_H */ diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index da39e567..c7205505 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -4,7 +4,6 @@ * Distributed Hash Table based on Kademlia * * Dimitri Staessens <dimitri@ouroboros.rocks> - * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -20,10 +19,12 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L +#if !defined (__DHT_TEST__) + #if defined(__linux__) || defined(__CYGWIN__) + #define _DEFAULT_SOURCE + #else + #define _POSIX_C_SOURCE 200112L + #endif #endif #include "config.h" @@ -40,6 +41,7 @@ #include <ouroboros/list.h> #include <ouroboros/notifier.h> #include <ouroboros/random.h> +#include <ouroboros/rib.h> #include <ouroboros/time.h> #include <ouroboros/tpm.h> #include <ouroboros/utils.h> @@ -59,143 +61,153 @@ #include <limits.h> #include "dht.pb-c.h" -typedef DhtMsg dht_msg_t; -typedef DhtContactMsg dht_contact_msg_t; +typedef DhtMsg dht_msg_t; +typedef DhtContactMsg dht_contact_msg_t; +typedef DhtStoreMsg dht_store_msg_t; +typedef DhtFindReqMsg dht_find_req_msg_t; +typedef DhtFindNodeRspMsg dht_find_node_rsp_msg_t; +typedef DhtFindValueRspMsg dht_find_value_rsp_msg_t; +typedef ProtobufCBinaryData binary_data_t; #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ -#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ -#define KAD_K 8 /* Replication factor, MDHT value. */ -#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ -#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 8 /* Response time to wait for a join. */ -#define KAD_T_RESP 5 /* Response time to wait for a response. */ -#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ -#define KAD_QUEER 15 /* Time to declare peer questionable. */ -#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ -#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ -#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define DHT_MAX_REQS 128 /* KAD recommends rnd(), bmp can be changed. */ +#define DHT_WARN_REQS 100 /* Warn if number of requests exceeds this. */ +#define DHT_MAX_VALS 8 /* Max number of values to return for a key. */ +#define DHT_T_CACHE 60 /* Max cache time for values (s) */ +#define DHT_T_RESP 2 /* Response time to wait for a response (s). */ +#define DHT_N_REPUB 5 /* Republish if expiry within n replications. */ +#define DHT_R_PING 2 /* Ping retries before declaring peer dead. */ +#define DHT_QUEER 15 /* Time to declare peer questionable. */ +#define DHT_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define DHT_RESP_RETR 6 /* Number of retries on sending a response. */ #define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ -#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ +#define DHT_INVALID 0 /* Invalid cookie value. */ -enum dht_state { - DHT_INIT = 0, - DHT_SHUTDOWN, - DHT_JOINING, - DHT_RUNNING, -}; +#define KEY_FMT "K<" HASH_FMT64 ">" +#define KEY_VAL(key) HASH_VAL64(key) -enum kad_code { - KAD_JOIN = 0, - KAD_FIND_NODE, - KAD_FIND_VALUE, - /* Messages without a response below. */ - KAD_STORE, - KAD_RESPONSE -}; +#define VAL_FMT "V<" HASH_FMT64 ">" +#define VAL_VAL(val) HASH_VAL64((val).data) -enum kad_req_state { - REQ_NULL = 0, - REQ_INIT, - REQ_PENDING, - REQ_RESPONSE, - REQ_DONE, - REQ_DESTROY -}; +#define KV_FMT "<" HASH_FMT64 ", " HASH_FMT64 ">" +#define KV_VAL(key, val) HASH_VAL64(key), HASH_VAL64((val).data) -enum lookup_state { - LU_NULL = 0, - LU_INIT, - LU_PENDING, - LU_UPDATE, - LU_COMPLETE, - LU_DESTROY -}; +#define PEER_FMT "[" HASH_FMT64 "|" ADDR_FMT32 "]" +#define PEER_VAL(id, addr) HASH_VAL64(id), ADDR_VAL32(&(addr)) -struct kad_req { - struct list_head next; +#define DHT_CODE(msg) dht_code_str[(msg)->code] - uint32_t cookie; - enum kad_code code; - uint8_t * key; - uint64_t addr; +#define TX_HDR_FMT "%s --> " PEER_FMT +#define TX_HDR_VAL(msg, id, addr) DHT_CODE(msg), PEER_VAL(id, addr) - enum kad_req_state state; - pthread_cond_t cond; - pthread_mutex_t lock; +#define RX_HDR_FMT "%s <-- " PEER_FMT +#define RX_HDR_VAL(msg) DHT_CODE(msg), \ + PEER_VAL(msg->src->id.data, msg->src->addr) - time_t t_exp; +#define CK_FMT "|" HASH_FMT64 "|" +#define CK_VAL(cookie) HASH_VAL64(&(cookie)) + +#define IS_REQUEST(code) \ + (code == DHT_FIND_NODE_REQ || code == DHT_FIND_VALUE_REQ) + +enum dht_code { + DHT_STORE, + DHT_FIND_NODE_REQ, + DHT_FIND_NODE_RSP, + DHT_FIND_VALUE_REQ, + DHT_FIND_VALUE_RSP }; -struct cookie_el { - struct list_head next; +const char * dht_code_str[] = { + "DHT_STORE", + "DHT_FIND_NODE_REQ", + "DHT_FIND_NODE_RSP", + "DHT_FIND_VALUE_REQ", + "DHT_FIND_VALUE_RSP" +}; - uint32_t cookie; +enum dht_state { + DHT_NULL = 0, + DHT_INIT, + DHT_RUNNING }; -struct lookup { - struct list_head next; +struct val_entry { + struct list_head next; - struct list_head cookies; + buffer_t val; - uint8_t * key; + time_t t_exp; /* Expiry time */ + time_t t_repl; /* Last replication time */ +}; + +struct dht_entry { + struct list_head next; - struct list_head contacts; - size_t n_contacts; + uint8_t * key; - uint64_t * addrs; - size_t n_addrs; + struct { + struct list_head list; + size_t len; + } vals; /* We don't own these, only replicate */ - enum lookup_state state; - pthread_cond_t cond; - pthread_mutex_t lock; + struct { + struct list_head list; + size_t len; + } lvals; /* We own these, must be republished */ }; -struct val { +struct contact { struct list_head next; + uint8_t * id; uint64_t addr; - time_t t_exp; - time_t t_rep; + size_t fails; + time_t t_seen; }; -struct ref_entry { +struct peer_entry { struct list_head next; - uint8_t * key; + uint64_t cookie; + uint8_t * id; + uint64_t addr; + enum dht_code code; - time_t t_rep; + time_t t_sent; }; -struct dht_entry { +struct dht_req { struct list_head next; uint8_t * key; - size_t n_vals; - struct list_head vals; -}; - -struct contact { - struct list_head next; + time_t t_exp; - uint8_t * id; - uint64_t addr; + struct { + struct list_head list; + size_t len; + } peers; - size_t fails; - time_t t_seen; + struct { + struct list_head list; + size_t len; + } cache; }; struct bucket { - struct list_head contacts; - size_t n_contacts; + struct { + struct list_head list; + size_t len; + } contacts; - struct list_head alts; - size_t n_alts; + struct { + struct list_head list; + size_t len; + } alts; time_t t_refr; @@ -203,1093 +215,1461 @@ struct bucket { uint8_t mask; struct bucket * parent; - struct bucket * children[1L << KAD_BETA]; + struct bucket * children[1L << DHT_BETA]; }; struct cmd { - struct list_head next; - - struct shm_du_buff * sdb; + struct list_head next; + buffer_t cbuf; }; struct dir_ops dht_dir_ops = { - .create = dht_create, - .destroy = dht_destroy, - .bootstrap = dht_bootstrap, - .reg = dht_reg, - .unreg = dht_unreg, - .query = dht_query, - .wait_running = dht_wait_running + .init = (int (*)(void *)) dht_init, + .fini = dht_fini, + .start = dht_start, + .stop = dht_stop, + .reg = dht_reg, + .unreg = dht_unreg, + .query = dht_query }; -struct dht { - size_t alpha; - size_t b; - size_t k; +struct { + struct { /* Kademlia parameters */ + uint32_t alpha; /* Number of concurrent requests */ + size_t k; /* Number of replicas to store */ + time_t t_expire; /* Expiry time for values (s) */ + time_t t_refresh; /* Refresh time for contacts (s) */ + time_t t_repl; /* Replication time for values (s) */ + }; - time_t t_expire; - time_t t_refresh; - time_t t_replic; - time_t t_repub; + buffer_t id; - uint8_t * id; - uint64_t addr; + time_t t0; /* Creation time */ + uint64_t addr; /* Our own address */ + uint64_t peer; /* Enrollment peer address */ + uint64_t magic; /* Magic cookie for retransmit */ - struct bucket * buckets; + uint64_t eid; /* Entity ID */ - struct list_head entries; + struct tpm * tpm; + pthread_t worker; - struct list_head refs; + enum dht_state state; - struct list_head lookups; + struct { + struct { + struct bucket * root; + } contacts; + + struct { + struct list_head list; + size_t len; + size_t vals; + size_t lvals; + } kv; + + pthread_rwlock_t lock; + } db; + + struct { + struct list_head list; + size_t len; + pthread_cond_t cond; + pthread_mutex_t mtx; + } reqs; + + struct { + struct list_head list; + pthread_cond_t cond; + pthread_mutex_t mtx; + } cmds; +} dht; + + +/* DHT RIB */ + +static const char * dht_dir[] = { + "database", + "stats", + NULL +}; - struct list_head requests; - struct bmp * cookies; +const char * dht_stats = \ + "DHT: " HASH_FMT64 "\n" + " Created: %s\n" + " Address: " ADDR_FMT32 "\n" + " Kademlia parameters:\n" + " Number of concurrent requests (alpha): %10zu\n" + " Number of replicas (k): %10zu\n" + " Expiry time for values (s): %10ld\n" + " Refresh time for contacts (s): %10ld\n" + " Replication time for values (s): %10ld\n" + " Number of keys: %10zu\n" + " Number of local values: %10zu\n" + " Number of non-local values: %10zu\n"; - enum dht_state state; - struct list_head cmds; - pthread_cond_t cond; - pthread_mutex_t mtx; +static int dht_rib_statfile(char * buf, + size_t len) +{ + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + size_t keys; + size_t vals; + size_t lvals; - pthread_rwlock_t lock; + assert(buf != NULL); + assert(len > 0); - uint64_t eid; + pthread_rwlock_rdlock(&dht.db.lock); - struct tpm * tpm; + keys = dht.db.kv.len; + lvals = dht.db.kv.lvals; + vals = dht.db.kv.vals; - pthread_t worker; -}; + pthread_rwlock_unlock(&dht.db.lock); -struct join_info { - struct dht * dht; - uint64_t addr; -}; + tm = gmtime(&dht.t0); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); -struct packet_info { - struct dht * dht; - struct shm_du_buff * sdb; -}; + snprintf(buf, len, dht_stats, + HASH_VAL64(dht.id.data), + tmstr, + ADDR_VAL32(&dht.addr), + dht.alpha, dht.k, + dht.t_expire, dht.t_refresh, dht.t_repl, + keys, vals, lvals); + + return strlen(buf); +} -static uint8_t * dht_dup_key(const uint8_t * key, - size_t len) +static size_t dht_db_file_len(void) { - uint8_t * dup; + size_t sz; + size_t vals; - dup = malloc(sizeof(*dup) * len); - if (dup == NULL) - return NULL; + sz = 18; /* DHT database + 2 * \n */ - memcpy(dup, key, len); + pthread_rwlock_rdlock(&dht.db.lock); - return dup; -} + if (dht.db.kv.len == 0) { + pthread_rwlock_unlock(&dht.db.lock); + sz += 14; /* No entries */ + return sz; + } -static enum dht_state dht_get_state(struct dht * dht) -{ - enum dht_state state; + sz += 39 * 3 + 1; /* tally + extra newline */ + sz += dht.db.kv.len * (25 + 19 + 23 + 1); - pthread_mutex_lock(&dht->mtx); + vals = dht.db.kv.vals + dht.db.kv.lvals; - state = dht->state; + sz += vals * (48 + 2 * RIB_TM_STRLEN); - pthread_mutex_unlock(&dht->mtx); + pthread_rwlock_unlock(&dht.db.lock); - return state; + return sz; } -static int dht_set_state(struct dht * dht, - enum dht_state state) +static int dht_rib_dbfile(char * buf, + size_t len) { - pthread_mutex_lock(&dht->mtx); + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + char exstr[RIB_TM_STRLEN]; + size_t i = 0; + struct list_head * p; - if (state == DHT_JOINING && dht->state != DHT_INIT) { - pthread_mutex_unlock(&dht->mtx); - return -1; + assert(buf != NULL); + assert(len > 0); + + pthread_rwlock_rdlock(&dht.db.lock); + + if (dht.db.kv.len == 0) { + i += snprintf(buf, len, " No entries.\n"); + pthread_rwlock_unlock(&dht.db.lock); + return i; } - dht->state = state; + i += snprintf(buf + i, len - i, "DHT database:\n\n"); + i += snprintf(buf + i, len - i, + "Number of keys: %10zu\n" + "Number of local values: %10zu\n" + "Number of non-local values: %10zu\n\n", + dht.db.kv.len, dht.db.kv.vals, dht.db.kv.lvals); + + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + struct list_head * h; + + i += snprintf(buf + i, len - i, "Key: " KEY_FMT "\n", + KEY_VAL(e->key)); + i += snprintf(buf + i, len - i, " Local entries:\n"); + + list_for_each(h, &e->vals.list) { + struct val_entry * v; + + v = list_entry(h, struct val_entry, next); + + tm = gmtime(&v->t_repl); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + + tm = gmtime(&v->t_exp); + strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm); + + i += snprintf(buf + i, len - i, + " " VAL_FMT + ", t_replicated=%.*s, t_expire=%.*s\n", + VAL_VAL(v->val), + RIB_TM_STRLEN, tmstr, + RIB_TM_STRLEN, exstr); + } - pthread_cond_broadcast(&dht->cond); + i += snprintf(buf + i, len - i, "\n"); - pthread_mutex_unlock(&dht->mtx); + i += snprintf(buf + i, len - i, " Non-local entries:\n"); + + list_for_each(h, &e->lvals.list) { + struct val_entry * v; + + v= list_entry(h, struct val_entry, next); + + tm = gmtime(&v->t_repl); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + + tm = gmtime(&v->t_exp); + strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm); + + i += snprintf(buf + i, len - i, + " " VAL_FMT + ", t_replicated=%.*s, t_expire=%.*s\n", + VAL_VAL(v->val), + RIB_TM_STRLEN, tmstr, + RIB_TM_STRLEN, exstr); + + } + } + + pthread_rwlock_unlock(&dht.db.lock); + + printf("DHT RIB DB file generated (%zu bytes).\n", i); + + return i; +} + +static int dht_rib_read(const char * path, + char * buf, + size_t len) +{ + char * entry; + + entry = strstr(path, RIB_SEPARATOR) + 1; + + if (strcmp(entry, "database") == 0) { + return dht_rib_dbfile(buf, len); + } else if (strcmp(entry, "stats") == 0) { + return dht_rib_statfile(buf, len); + } return 0; } -int dht_wait_running(void * dir) +static int dht_rib_readdir(char *** buf) { - struct dht * dht; - int ret = 0; + int i = 0; + + while (dht_dir[i++] != NULL); + + *buf = malloc(sizeof(**buf) * i); + if (*buf == NULL) + goto fail_buf; + + i = 0; + + while (dht_dir[i] != NULL) { + (*buf)[i] = strdup(dht_dir[i]); + if ((*buf)[i] == NULL) + goto fail_dup; + i++; + } - dht = (struct dht *) dir; + return i; + fail_dup: + freepp(char, *buf, i); + fail_buf: + return -ENOMEM; +} - pthread_mutex_lock(&dht->mtx); +static int dht_rib_getattr(const char * path, + struct rib_attr * attr) +{ + struct timespec now; + char * entry; - pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - while (dht->state == DHT_JOINING) - pthread_cond_wait(&dht->cond, &dht->mtx); + attr->mtime = now.tv_sec; - if (dht->state != DHT_RUNNING) - ret = -1; + entry = strstr(path, RIB_SEPARATOR) + 1; - pthread_cleanup_pop(true); + if (strcmp(entry, "database") == 0) { + attr->size = dht_db_file_len(); + } else if (strcmp(entry, "stats") == 0) { + attr->size = 545; + } - return ret; + return 0; } -static uint8_t * create_id(size_t len) +static struct rib_ops r_ops = { + .read = dht_rib_read, + .readdir = dht_rib_readdir, + .getattr = dht_rib_getattr +}; + +/* Helper functions */ + +static uint8_t * generate_id(void) { uint8_t * id; - id = malloc(len); - if (id == NULL) + if(dht.id.len < sizeof(uint64_t)) { + log_err("DHT ID length is too short (%zu < %zu).", + dht.id.len, sizeof(uint64_t)); return NULL; + }; - if (random_buffer(id, len) < 0) { - free(id); - return NULL; + id = malloc(dht.id.len); + if (id == NULL) { + log_err("Failed to malloc ID."); + goto fail_id; + } + + if (random_buffer(id, dht.id.len) < 0) { + log_err("Failed to generate random ID."); + goto fail_rnd; } return id; + fail_rnd: + free(id); + fail_id: + return NULL; } -static void kad_req_create(struct dht * dht, - dht_msg_t * msg, - uint64_t addr) +static uint64_t generate_cookie(void) { - struct kad_req * req; - pthread_condattr_t cattr; - struct timespec t; - size_t b; + uint64_t cookie = DHT_INVALID; - clock_gettime(CLOCK_REALTIME_COARSE, &t); + while (cookie == DHT_INVALID) + random_buffer((uint8_t *) &cookie, sizeof(cookie)); - req = malloc(sizeof(*req)); - if (req == NULL) - goto fail_malloc; + return cookie; +} - list_head_init(&req->next); +/* + * If someone builds a network where the n (n > k) closest nodes all + * have IDs starting with the same 64 bits: by all means, change this. + */ +static uint64_t dist(const uint8_t * src, + const uint8_t * dst) +{ + assert(dht.id.len >= sizeof(uint64_t)); - req->t_exp = t.tv_sec + KAD_T_RESP; - req->addr = addr; - req->state = REQ_INIT; - req->cookie = msg->cookie; - req->code = msg->code; - req->key = NULL; + return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); +} - pthread_rwlock_rdlock(&dht->lock); - b = dht->b; - pthread_rwlock_unlock(&dht->lock); +#define IS_CLOSER(x, y) (dist((x), dht.id.data) < dist((y), dht.id.data)) - if (msg->has_key) { - req->key = dht_dup_key(msg->key.data, b); - if (req->key == NULL) - goto fail_dup_key; - } +static int addr_to_buf(const uint64_t addr, + buffer_t * buf) +{ + size_t len; + uint64_t _addr; - if (pthread_mutex_init(&req->lock, NULL)) - goto fail_mutex; + len = sizeof(addr); + _addr = hton64(addr); + assert(buf != NULL); - if (pthread_condattr_init(&cattr)) - goto fail_condattr; -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif + buf->data = malloc(len); + if (buf->data == NULL) + goto fail_malloc; - if (pthread_cond_init(&req->cond, &cattr)) - goto fail_cond_init; + buf->len = sizeof(_addr); + memcpy(buf->data, &_addr, sizeof(_addr)); - pthread_condattr_destroy(&cattr); + return 0; + fail_malloc: + return -ENOMEM; +} - pthread_rwlock_wrlock(&dht->lock); +static int buf_to_addr(const buffer_t buf, + uint64_t * addr) +{ + assert(addr != NULL); + assert(buf.data != NULL); - list_add(&req->next, &dht->requests); + if (buf.len != sizeof(*addr)) + return - EINVAL; - pthread_rwlock_unlock(&dht->lock); + *addr = ntoh64(*((uint64_t *) buf.data)); - return; + if (*addr == dht.addr) + *addr = INVALID_ADDR; - fail_cond_init: - pthread_condattr_destroy(&cattr); - fail_condattr: - pthread_mutex_destroy(&req->lock); - fail_mutex: - free(req->key); - fail_dup_key: - free(req); - fail_malloc: - return; + return 0; } -static void cancel_req_destroy(void * o) +static uint8_t * dht_dup_key(const uint8_t * key) { - struct kad_req * req = (struct kad_req *) o; + uint8_t * dup; - pthread_mutex_unlock(&req->lock); + assert(key != NULL); + assert(dht.id.len != 0); - pthread_cond_destroy(&req->cond); - pthread_mutex_destroy(&req->lock); + dup = malloc(dht.id.len); + if (dup == NULL) + return NULL; - if (req->key != NULL) - free(req->key); + memcpy(dup, key, dht.id.len); - free(req); + return dup; } -static void kad_req_destroy(struct kad_req * req) +/* DHT */ + +static struct val_entry * val_entry_create(const buffer_t val, + time_t exp) { - struct timespec t; - struct timespec intv = TIMESPEC_INIT_S(20); + struct val_entry * e; + struct timespec now; - clock_gettime(PTHREAD_COND_CLOCK, &t); - ts_add(&t, &intv, &t); + assert(val.data != NULL); + assert(val.len > 0); - assert(req); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_mutex_lock(&req->lock); +#ifndef __DHT_TEST_ALLOW_EXPIRED__ + if (exp < now.tv_sec) + return NULL; /* Refuse to add expired values */ +#endif + e = malloc(sizeof(*e)); + if (e == NULL) + goto fail_entry; - switch (req->state) { - case REQ_DESTROY: - pthread_mutex_unlock(&req->lock); - return; - case REQ_PENDING: - req->state = REQ_DESTROY; - pthread_cond_broadcast(&req->cond); - break; - case REQ_INIT: - case REQ_DONE: - req->state = REQ_NULL; - break; - case REQ_RESPONSE: - case REQ_NULL: - default: - break; - } + list_head_init(&e->next); - pthread_cleanup_push(cancel_req_destroy, req); + e->val.len = val.len; + e->val.data = malloc(val.len); + if (e->val.data == NULL) + goto fail_val; - while (req->state != REQ_NULL && req->state != REQ_DONE) - pthread_cond_timedwait(&req->cond, &req->lock, &t); + memcpy(e->val.data, val.data, val.len); - pthread_cleanup_pop(true); + e->t_repl = 0; + e->t_exp = exp; + + return e; + + fail_val: + free(e); + fail_entry: + return NULL; } -static int kad_req_wait(struct kad_req * req, - time_t t) +static void val_entry_destroy(struct val_entry * v) { - struct timespec timeo = TIMESPEC_INIT_S(0); - struct timespec abs; - int ret = 0; + assert(v->val.data != NULL); - assert(req); + freebuf(v->val); + free(v); +} - timeo.tv_sec = t; +static struct dht_entry * dht_entry_create(const uint8_t * key) +{ + struct dht_entry * e; - clock_gettime(PTHREAD_COND_CLOCK, &abs); + assert(key != NULL); - ts_add(&abs, &timeo, &abs); + e = malloc(sizeof(*e)); + if (e == NULL) + goto fail_entry; - pthread_mutex_lock(&req->lock); + list_head_init(&e->next); + list_head_init(&e->vals.list); + list_head_init(&e->lvals.list); - req->state = REQ_PENDING; + e->vals.len = 0; + e->lvals.len = 0; - pthread_cleanup_push(__cleanup_mutex_unlock, &req->lock); + e->key = dht_dup_key(key); + if (e->key == NULL) + goto fail_key; - while (req->state == REQ_PENDING && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); + return e; + fail_key: + free(e); + fail_entry: + return NULL; +} - switch(req->state) { - case REQ_DESTROY: - ret = -1; - req->state = REQ_NULL; - pthread_cond_broadcast(&req->cond); - break; - case REQ_PENDING: /* ETIMEDOUT */ - case REQ_RESPONSE: - req->state = REQ_DONE; - pthread_cond_broadcast(&req->cond); - break; - default: - break; +static void dht_entry_destroy(struct dht_entry * e) +{ + struct list_head * p; + struct list_head * h; + + assert(e != NULL); + + list_for_each_safe(p, h, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + --e->vals.len; + --dht.db.kv.vals; } - pthread_cleanup_pop(true); + list_for_each_safe(p, h, &e->lvals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + --e->lvals.len; + --dht.db.kv.lvals; + } + + free(e->key); - return ret; + assert(e->vals.len == 0 && e->lvals.len == 0); + + free(e); } -static void kad_req_respond(struct kad_req * req) +static struct val_entry * dht_entry_get_lval(const struct dht_entry * e, + const buffer_t val) { - pthread_mutex_lock(&req->lock); + struct list_head * p; - req->state = REQ_RESPONSE; - pthread_cond_broadcast(&req->cond); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + list_for_each(p, &e->lvals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (bufcmp(&v->val, &val) == 0) + return v; + } - pthread_mutex_unlock(&req->lock); + return NULL; } -static struct contact * contact_create(const uint8_t * id, - size_t len, - uint64_t addr) +static struct val_entry * dht_entry_get_val(const struct dht_entry * e, + const buffer_t val) { - struct contact * c; - struct timespec t; - - c = malloc(sizeof(*c)); - if (c == NULL) - return NULL; + struct list_head * p; - list_head_init(&c->next); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - clock_gettime(CLOCK_REALTIME_COARSE, &t); + list_for_each(p, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (bufcmp(&v->val, &val) == 0) + return v; - c->addr = addr; - c->fails = 0; - c->t_seen = t.tv_sec; - c->id = dht_dup_key(id, len); - if (c->id == NULL) { - free(c); - return NULL; } - return c; + return NULL; } -static void contact_destroy(struct contact * c) +static int dht_entry_update_val(struct dht_entry * e, + buffer_t val, + time_t exp) { - if (c != NULL) - free(c->id); + struct val_entry * v; + struct timespec now; - free(c); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (exp < now.tv_sec) + return -EINVAL; /* Refuse to add expired values */ + + if (dht_entry_get_lval(e, val) != NULL) { + log_dbg(KV_FMT " Val already in lvals.", KV_VAL(e->key, val)); + return 0; /* Refuse to add local values */ + } + + v = dht_entry_get_val(e, val); + if (v == NULL) { + v = val_entry_create(val, exp); + if (v == NULL) + return -ENOMEM; + + list_add_tail(&v->next, &e->vals.list); + ++e->vals.len; + ++dht.db.kv.vals; + + return 0; + } + + if (v->t_exp < exp) + v->t_exp = exp; + + return 0; } -static struct bucket * iter_bucket(struct bucket * b, - const uint8_t * id) +static int dht_entry_update_lval(struct dht_entry * e, + buffer_t val) { - uint8_t byte; - uint8_t mask; + struct val_entry * v; + struct timespec now; - assert(b); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - if (b->children[0] == NULL) - return b; + clock_gettime(CLOCK_REALTIME_COARSE, &now); - byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + v = dht_entry_get_lval(e, val); + if (v == NULL) { + log_dbg(KV_FMT " Adding lval.", KV_VAL(e->key, val)); + v = val_entry_create(val, now.tv_sec + dht.t_expire); + if (v == NULL) + return -ENOMEM; - mask = ((1L << KAD_BETA) - 1) & 0xFF; + list_add_tail(&v->next, &e->lvals.list); + ++e->lvals.len; + ++dht.db.kv.lvals; - byte >>= (CHAR_BIT - KAD_BETA) - - (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + return 0; + } - return iter_bucket(b->children[(byte & mask)], id); + return 0; } -static struct bucket * dht_get_bucket(struct dht * dht, - const uint8_t * id) +static int dht_entry_remove_lval(struct dht_entry * e, + buffer_t val) { - assert(dht->buckets); + struct val_entry * v; + + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + v = dht_entry_get_lval(e, val); + if (v == NULL) + return -ENOENT; + + log_dbg(KV_FMT " Removing lval.", KV_VAL(e->key, val)); - return iter_bucket(dht->buckets, id); + list_del(&v->next); + val_entry_destroy(v); + --e->lvals.len; + --dht.db.kv.lvals; + + return 0; } -/* - * If someone builds a network where the n (n > k) closest nodes all - * have IDs starting with the same 64 bits: by all means, change this. - */ -static uint64_t dist(const uint8_t * src, - const uint8_t * dst) +#define IS_EXPIRED(v, now) ((now)->tv_sec > (v)->t_exp) +static void dht_entry_remove_expired_vals(struct dht_entry * e) { - return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); + struct list_head * p; + struct list_head * h; + struct timespec now; + + assert(e != NULL); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + list_for_each_safe(p, h, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (!IS_EXPIRED(v, &now)) + continue; + + log_dbg(KV_FMT " Value expired." , KV_VAL(e->key, v->val)); + list_del(&v->next); + val_entry_destroy(v); + --e->vals.len; + --dht.db.kv.vals; + } } -static size_t list_add_sorted(struct list_head * l, - struct contact * c, - const uint8_t * key) +static struct dht_entry * __dht_kv_find_entry(const uint8_t * key) { struct list_head * p; - assert(l); - assert(c); - assert(key); - assert(c->id); + assert(key != NULL); - list_for_each(p, l) { - struct contact * e = list_entry(p, struct contact, next); - if (dist(c->id, key) > dist(e->id, key)) - break; + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + if (!memcmp(key, e->key, dht.id.len)) + return e; } - list_add_tail(&c->next, p); - - return 1; + return NULL; } -static size_t dht_contact_list(struct dht * dht, - struct list_head * l, - const uint8_t * key) +static void dht_kv_remove_expired_entries(void) { struct list_head * p; - struct bucket * b; - size_t len = 0; - size_t i; - struct timespec t; - - assert(l); - assert(dht); - assert(key); - assert(list_is_empty(l)); + struct list_head * h; + struct timespec now; - clock_gettime(CLOCK_REALTIME_COARSE, &t); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - b = dht_get_bucket(dht, key); - if (b == NULL) - return 0; + pthread_rwlock_wrlock(&dht.db.lock); - b->t_refr = t.tv_sec + KAD_T_REFR; + list_for_each_safe(p, h, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + dht_entry_remove_expired_vals(e); + if (e->lvals.len > 0 || e->vals.len > 0) + continue; - if (b->n_contacts == dht->k || b->parent == NULL) { - list_for_each(p, &b->contacts) { - struct contact * c; - c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht->b, c->addr); - if (list_add_sorted(l, c, key) == 1) - if (++len == dht->k) - break; - } - } else { - struct bucket * d = b->parent; - for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) { - list_for_each(p, &d->children[i]->contacts) { - struct contact * c; - c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht->b, c->addr); - if (c == NULL) - continue; - if (list_add_sorted(l, c, key) == 1) - if (++len == dht->k) - break; - } - } + log_dbg(KEY_FMT " Entry removed. ", KEY_VAL(e->key)); + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; } - assert(len == dht->k || b->parent == NULL); - - return len; + pthread_rwlock_unlock(&dht.db.lock); } -static struct lookup * lookup_create(struct dht * dht, - const uint8_t * id) + +static struct contact * contact_create(const uint8_t * id, + uint64_t addr) { - struct lookup * lu; - pthread_condattr_t cattr; + struct contact * c; + struct timespec t; - assert(dht); - assert(id); + c = malloc(sizeof(*c)); + if (c == NULL) + return NULL; - lu = malloc(sizeof(*lu)); - if (lu == NULL) - goto fail_malloc; + list_head_init(&c->next); - list_head_init(&lu->contacts); - list_head_init(&lu->cookies); + clock_gettime(CLOCK_REALTIME_COARSE, &t); - lu->state = LU_INIT; - lu->addrs = NULL; - lu->n_addrs = 0; - lu->key = dht_dup_key(id, dht->b); - if (lu->key == NULL) - goto fail_id; + c->addr = addr; + c->fails = 0; + c->t_seen = t.tv_sec; + c->id = dht_dup_key(id); + if (c->id == NULL) { + free(c); + return NULL; + } - if (pthread_mutex_init(&lu->lock, NULL)) - goto fail_mutex; + return c; +} - pthread_condattr_init(&cattr); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif +static void contact_destroy(struct contact * c) +{ + assert(c != NULL); + assert(list_is_empty(&c->next)); + + free(c->id); + free(c); +} - if (pthread_cond_init(&lu->cond, &cattr)) - goto fail_cond; +static struct dht_req * dht_req_create(const uint8_t * key) +{ + struct dht_req * req; + struct timespec now; - pthread_condattr_destroy(&cattr); + assert(key != NULL); - pthread_rwlock_wrlock(&dht->lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); - list_add(&lu->next, &dht->lookups); + req = malloc(sizeof(*req)); + if (req == NULL) + goto fail_malloc; - lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + list_head_init(&req->next); - pthread_rwlock_unlock(&dht->lock); + req->t_exp = now.tv_sec + DHT_T_RESP; - return lu; + list_head_init(&req->peers.list); + req->peers.len = 0; - fail_cond: - pthread_condattr_destroy(&cattr); - pthread_mutex_destroy(&lu->lock); - fail_mutex: - free(lu->key); - fail_id: - free(lu); + req->key = dht_dup_key(key); + if (req->key == NULL) + goto fail_dup_key; + + list_head_init(&req->cache.list); + req->cache.len = 0; + + return req; + + fail_dup_key: + free(req); fail_malloc: return NULL; } -static void cancel_lookup_destroy(void * o) +static void dht_req_destroy(struct dht_req * req) { - struct lookup * lu; struct list_head * p; struct list_head * h; - lu = (struct lookup *) o; - - if (lu->key != NULL) - free(lu->key); - if (lu->addrs != NULL) - free(lu->addrs); + assert(req); + assert(req->key); - list_for_each_safe(p, h, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - list_del(&c->next); - contact_destroy(c); + list_for_each_safe(p, h, &req->peers.list) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + list_del(&e->next); + free(e->id); + free(e); + --req->peers.len; } - list_for_each_safe(p, h, &lu->cookies) { - struct cookie_el * c = list_entry(p, struct cookie_el, next); - list_del(&c->next); - free(c); + list_for_each_safe(p, h, &req->cache.list) { + struct val_entry * e = list_entry(p, struct val_entry, next); + list_del(&e->next); + val_entry_destroy(e); + --req->cache.len; } - pthread_mutex_unlock(&lu->lock); + free(req->key); - pthread_mutex_destroy(&lu->lock); + assert(req->peers.len == 0); - free(lu); + free(req); } -static void lookup_destroy(struct lookup * lu) +static struct peer_entry * dht_req_get_peer(struct dht_req * req, + struct peer_entry * e) { - assert(lu); - - pthread_mutex_lock(&lu->lock); + struct list_head * p; - switch (lu->state) { - case LU_DESTROY: - pthread_mutex_unlock(&lu->lock); - return; - case LU_PENDING: - lu->state = LU_DESTROY; - pthread_cond_broadcast(&lu->cond); - break; - case LU_INIT: - case LU_UPDATE: - case LU_COMPLETE: - lu->state = LU_NULL; - break; - case LU_NULL: - default: - break; + list_for_each(p, &req->peers.list) { + struct peer_entry * x = list_entry(p, struct peer_entry, next); + if (x->addr == e->addr) + return x; } - pthread_cleanup_push(cancel_lookup_destroy, lu); - - while (lu->state != LU_NULL) - pthread_cond_wait(&lu->cond, &lu->lock); - - pthread_cleanup_pop(true); + return NULL; } -static void lookup_update(struct dht * dht, - struct lookup * lu, - dht_msg_t * msg) +#define IS_MAGIC(peer) ((peer)->cookie == dht.magic) +void dht_req_add_peer(struct dht_req * req, + struct peer_entry * e) { - struct list_head * p = NULL; - struct list_head * h; - struct contact * c = NULL; - size_t n; - size_t pos = 0; - bool mod = false; + struct peer_entry * x; /* existing */ + struct list_head * p; /* iterator */ + size_t pos = 0; - assert(lu); - assert(msg); + assert(req != NULL); + assert(e != NULL); + assert(e->id != NULL); - if (dht_get_state(dht) != DHT_RUNNING) - return; + /* + * Dedupe messages to the same peer, unless + * 1) The previous request was FIND_NODE and now it's FIND_VALUE + * 2) We urgently need contacts from emergency peer (magic cookie) + */ + x = dht_req_get_peer(req, e); + if (x != NULL && x->code >= e->code && !IS_MAGIC(e)) + goto skip; - pthread_mutex_lock(&lu->lock); - - list_for_each_safe(p, h, &lu->cookies) { - struct cookie_el * e = list_entry(p, struct cookie_el, next); - if (e->cookie == msg->cookie) { - list_del(&e->next); - free(e); - break; + /* Find how this contact ranks in distance to the key */ + list_for_each(p, &req->peers.list) { + struct peer_entry * y = list_entry(p, struct peer_entry, next); + if (IS_CLOSER(y->id, e->id)) { + pos++; + continue; } + break; } - if (lu->state == LU_COMPLETE) { - pthread_mutex_unlock(&lu->lock); - return; - } + /* Add a new peer to this request if we need to */ + if (pos < dht.alpha || !IS_MAGIC(e)) { + x = malloc(sizeof(*x)); + if (x == NULL) { + log_err("Failed to malloc peer entry."); + goto skip; + } - if (msg->n_addrs > 0) { - if (lu->addrs == NULL) { - lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); - for (n = 0; n < msg->n_addrs; ++n) - lu->addrs[n] = msg->addrs[n]; - lu->n_addrs = msg->n_addrs; + x->cookie = e->cookie; + x->addr = e->addr; + x->code = e->code; + x->t_sent = e->t_sent; + x->id = dht_dup_key(e->id); + if (x->id == NULL) { + log_err("Failed to dup peer ID."); + free(x); + goto skip; } - lu->state = LU_COMPLETE; - pthread_cond_broadcast(&lu->cond); - pthread_mutex_unlock(&lu->lock); + if (IS_MAGIC(e)) + list_add(&x->next, p); + else + list_add_tail(&x->next, p); + ++req->peers.len; return; } + skip: + list_del(&e->next); + free(e->id); + free(e); +} + +static size_t dht_req_add_peers(struct dht_req * req, + struct list_head * pl) +{ + struct list_head * p; + struct list_head * h; + size_t n = 0; - pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock); + assert(req != NULL); + assert(pl != NULL); - while (lu->state == LU_INIT) { - pthread_rwlock_unlock(&dht->lock); - pthread_cond_wait(&lu->cond, &lu->lock); - pthread_rwlock_rdlock(&dht->lock); + list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + dht_req_add_peer(req, e); } - pthread_cleanup_pop(false); + return n; +} - for (n = 0; n < msg->n_contacts; ++n) { - c = contact_create(msg->contacts[n]->id.data, - dht->b, msg->contacts[n]->addr); - if (c == NULL) - continue; +static bool dht_req_has_peer(struct dht_req * req, + uint64_t cookie) +{ + struct list_head * p; - pos = 0; + assert(req != NULL); - list_for_each(p, &lu->contacts) { - struct contact * e; - e = list_entry(p, struct contact, next); - if (!memcmp(e->id, c->id, dht->b)) { - contact_destroy(c); - c = NULL; - break; - } + list_for_each(p, &req->peers.list) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + if (e->cookie == cookie) + return true; + } - if (dist(c->id, lu->key) > dist(e->id, lu->key)) - break; + return false; +} - pos++; - } +static void peer_list_destroy(struct list_head * pl) +{ + struct list_head * p; + struct list_head * h; - if (c == NULL) - continue; + assert(pl != NULL); - if (lu->n_contacts < dht->k) { - list_add_tail(&c->next, p); - ++lu->n_contacts; - mod = true; - } else if (pos == dht->k) { - contact_destroy(c); - } else { - struct contact * d; - d = list_last_entry(&lu->contacts, - struct contact, next); - list_add_tail(&c->next, p); - list_del(&d->next); - contact_destroy(d); - mod = true; - } + list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + list_del(&e->next); + free(e->id); + free(e); } - - if (list_is_empty(&lu->cookies) && !mod) - lu->state = LU_COMPLETE; - else - lu->state = LU_UPDATE; - - pthread_cond_broadcast(&lu->cond); - pthread_mutex_unlock(&lu->lock); - return; } -static ssize_t lookup_get_addrs(struct lookup * lu, - uint64_t * addrs) +static int dht_kv_create_peer_list(struct list_head * cl, + struct list_head * pl, + enum dht_code code) { - ssize_t n; - - assert(lu); + struct list_head * p; + struct list_head * h; + struct timespec now; + size_t len; - pthread_mutex_lock(&lu->lock); + assert(cl != NULL); + assert(pl != NULL); + assert(list_is_empty(pl)); - for (n = 0; (size_t) n < lu->n_addrs; ++n) - addrs[n] = lu->addrs[n]; + clock_gettime(CLOCK_REALTIME_COARSE, &now); - assert((size_t) n == lu->n_addrs); + len = 0; - pthread_mutex_unlock(&lu->lock); + list_for_each_safe(p, h, cl) { + struct contact * c = list_entry(p, struct contact, next); + struct peer_entry * e; + if (len++ == dht.alpha) + break; - return n; -} + e = malloc(sizeof(*e)); + if (e == NULL) + return -ENOMEM; -static ssize_t lookup_contact_addrs(struct lookup * lu, - uint64_t * addrs) -{ - struct list_head * p; - ssize_t n = 0; + e->cookie = generate_cookie(); + e->code = code; + e->addr = c->addr; + e->t_sent = now.tv_sec; - assert(lu); - assert(addrs); + e->id = c->id; - pthread_mutex_lock(&lu->lock); + list_add_tail(&e->next, pl); - list_for_each(p, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - addrs[n] = c->addr; - n++; + list_del(&c->next); + c->id = NULL; /* we stole the id */ + contact_destroy(c); } - pthread_mutex_unlock(&lu->lock); - - return n; + return 0; } -static void lookup_new_addrs(struct lookup * lu, - uint64_t * addrs) +static struct dht_req * __dht_kv_req_get_req(const uint8_t * key) { struct list_head * p; - size_t n = 0; - assert(lu); - assert(addrs); + list_for_each(p, &dht.reqs.list) { + struct dht_req * r = list_entry(p, struct dht_req, next); + if (memcmp(r->key, key, dht.id.len) == 0) + return r; + } - pthread_mutex_lock(&lu->lock); + return NULL; +} - /* Uses fails to check if the contact has been contacted. */ - list_for_each(p, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - if (c->fails == 0) { - c->fails = 1; - addrs[n] = c->addr; - n++; - } +static struct dht_req * __dht_kv_get_req_cache(const uint8_t * key) +{ + struct dht_req * req; - if (n == KAD_ALPHA) - break; - } + assert(key != NULL); - assert(n <= KAD_ALPHA); + req = __dht_kv_req_get_req(key); + if (req == NULL) + return NULL; - addrs[n] = 0; + if (req->cache.len == 0) + return NULL; - pthread_mutex_unlock(&lu->lock); + return req; } -static void lookup_set_state(struct lookup * lu, - enum lookup_state state) +static void __dht_kv_req_remove(const uint8_t * key) { - pthread_mutex_lock(&lu->lock); + struct dht_req * req; - lu->state = state; - pthread_cond_broadcast(&lu->cond); + assert(key != NULL); - pthread_mutex_unlock(&lu->lock); -} + req = __dht_kv_req_get_req(key); + if (req == NULL) + return; -static void cancel_lookup_wait(void * o) -{ - struct lookup * lu = (struct lookup *) o; - lu->state = LU_NULL; - pthread_mutex_unlock(&lu->lock); - lookup_destroy(lu); + list_del(&req->next); + --dht.reqs.len; + + dht_req_destroy(req); } -static enum lookup_state lookup_wait(struct lookup * lu) +static struct dht_req * __dht_kv_get_req_peer(const uint8_t * key, + uint64_t cookie) { - struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP); - struct timespec abs; - enum lookup_state state; - int ret = 0; - - clock_gettime(PTHREAD_COND_CLOCK, &abs); + struct dht_req * req; - ts_add(&abs, &timeo, &abs); + assert(key != NULL); - pthread_mutex_lock(&lu->lock); - - if (lu->state == LU_INIT || lu->state == LU_UPDATE) - lu->state = LU_PENDING; + req = __dht_kv_req_get_req(key); + if (req == NULL) + return NULL; - pthread_cleanup_push(cancel_lookup_wait, lu); + if (!dht_req_has_peer(req, cookie)) + return NULL; - while (lu->state == LU_PENDING && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs); + return req; +} - pthread_cleanup_pop(false); +static bool dht_kv_has_req(const uint8_t * key, + uint64_t cookie) +{ + bool found; - if (ret == -ETIMEDOUT) - lu->state = LU_COMPLETE; + pthread_mutex_lock(&dht.reqs.mtx); - state = lu->state; + found = __dht_kv_get_req_peer(key, cookie) != NULL; - pthread_mutex_unlock(&lu->lock); + pthread_mutex_unlock(&dht.reqs.mtx); - return state; + return found; } -static struct kad_req * dht_find_request(struct dht * dht, - dht_msg_t * msg) +/* + * This will filter the peer list for addresses that still need to be + * contacted. + */ +static int dht_kv_update_req(const uint8_t * key, + struct list_head * pl) { - struct list_head * p; + struct dht_req * req; + struct timespec now; - assert(dht); - assert(msg); + assert(key != NULL); + assert(pl != NULL); + assert(!list_is_empty(pl)); - list_for_each(p, &dht->requests) { - struct kad_req * r = list_entry(p, struct kad_req, next); - if (r->cookie == msg->cookie) - return r; - } + clock_gettime(PTHREAD_COND_CLOCK, &now); - return NULL; -} + pthread_mutex_lock(&dht.reqs.mtx); -static struct lookup * dht_find_lookup(struct dht * dht, - uint32_t cookie) -{ - struct list_head * p; - struct list_head * p2; - struct list_head * h2; - - assert(dht); - assert(cookie > 0); - - list_for_each(p, &dht->lookups) { - struct lookup * l = list_entry(p, struct lookup, next); - pthread_mutex_lock(&l->lock); - list_for_each_safe(p2, h2, &l->cookies) { - struct cookie_el * e; - e = list_entry(p2, struct cookie_el, next); - if (e->cookie == cookie) { - list_del(&e->next); - free(e); - pthread_mutex_unlock(&l->lock); - return l; - } + req = __dht_kv_req_get_req(key); + if (req == NULL) { + if (dht.reqs.len == DHT_MAX_REQS) { + log_err(KEY_FMT " Max reqs reached (%zu).", + KEY_VAL(key), dht.reqs.len); + peer_list_destroy(pl); + goto fail_req; } - pthread_mutex_unlock(&l->lock); + req = dht_req_create(key); + if (req == NULL) { + log_err(KEY_FMT "Failed to create req.", KEY_VAL(key)); + goto fail_req; + } + list_add_tail(&req->next, &dht.reqs.list); + ++dht.reqs.len; } - return NULL; + if (req->cache.len > 0) /* Already have values */ + peer_list_destroy(pl); + + dht_req_add_peers(req, pl); + req->t_exp = now.tv_sec + DHT_T_RESP; + + if (dht.reqs.len > DHT_WARN_REQS) { + log_warn("Number of outstanding requests (%zu) exceeds %u.", + dht.reqs.len, DHT_WARN_REQS); + } + + pthread_mutex_unlock(&dht.reqs.mtx); + + return 0; + fail_req: + pthread_mutex_unlock(&dht.reqs.mtx); + return -1; } -static struct val * val_create(uint64_t addr, - time_t exp) +static int dht_kv_respond_req(uint8_t * key, + binary_data_t * vals, + size_t len) { - struct val * v; - struct timespec t; + struct dht_req * req; + struct timespec now; + size_t i; - v = malloc(sizeof(*v)); - if (v == NULL) - return NULL; + assert(key != NULL); + assert(vals != NULL); + assert(len > 0); - list_head_init(&v->next); - v->addr = addr; + clock_gettime(CLOCK_REALTIME_COARSE, &now); - clock_gettime(CLOCK_REALTIME_COARSE, &t); + pthread_mutex_lock(&dht.reqs.mtx); - v->t_exp = t.tv_sec + exp; - v->t_rep = t.tv_sec + KAD_T_REPL; + req = __dht_kv_req_get_req(key); + if (req == NULL) { + log_warn(KEY_FMT " Failed to find req.", KEY_VAL(key)); + goto fail_req; + } + + for (i = 0; i < len; ++i) { + struct val_entry * e; + buffer_t val; + val.data = vals[i].data; + val.len = vals[i].len; + e = val_entry_create(val, now.tv_sec + DHT_T_CACHE); + if (e == NULL) { + log_err(" Failed to create val_entry."); + continue; + } - return v; -} + list_add_tail(&e->next, &req->cache.list); + ++req->cache.len; + } -static void val_destroy(struct val * v) -{ - assert(v); + pthread_cond_broadcast(&dht.reqs.cond); - free(v); + pthread_mutex_unlock(&dht.reqs.mtx); + fail_req: + pthread_mutex_unlock(&dht.reqs.mtx); + return -1; } -static struct ref_entry * ref_entry_create(struct dht * dht, - const uint8_t * key) +static ssize_t dht_kv_wait_req(const uint8_t * key, + buffer_t ** vals) { - struct ref_entry * e; + struct list_head * p; + struct dht_req * req; struct timespec t; +#ifdef __DHT_TEST__ + struct timespec intv = TIMESPEC_INIT_MS(10); +#else + struct timespec intv = TIMESPEC_INIT_S(DHT_T_RESP); +#endif + size_t max; + size_t i = 0; + int ret = 0; - assert(dht); - assert(key); + assert(key != NULL); + assert(vals != NULL); - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; + clock_gettime(PTHREAD_COND_CLOCK, &t); - e->key = dht_dup_key(key, dht->b); - if (e->key == NULL) { - free(e); - return NULL; + ts_add(&t, &intv, &t); + + pthread_mutex_lock(&dht.reqs.mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, &dht.reqs.mtx); + + while ((req = __dht_kv_get_req_cache(key)) == NULL) { + ret = pthread_cond_timedwait(&dht.reqs.cond, &dht.reqs.mtx, &t); + if (ret == ETIMEDOUT) + break; } - clock_gettime(CLOCK_REALTIME_COARSE, &t); + pthread_cleanup_pop(false); - e->t_rep = t.tv_sec + dht->t_repub; + if (ret == ETIMEDOUT) { + log_warn(KEY_FMT " Req timed out.", KEY_VAL(key)); + __dht_kv_req_remove(key); + goto timedout; + } - return e; -} + max = MIN(req->cache.len, DHT_MAX_VALS); + if (max == 0) + goto no_vals; -static void ref_entry_destroy(struct ref_entry * e) -{ - free(e->key); - free(e); + *vals = malloc(max * sizeof(**vals)); + if (*vals == NULL) { + log_err(KEY_FMT "Failed to malloc val buffer.", KEY_VAL(key)); + goto fail_vals; + } + + memset(*vals, 0, max * sizeof(**vals)); + + list_for_each(p, &req->cache.list) { + struct val_entry * v; + if (i == max) + break; /* We have enough values */ + v = list_entry(p, struct val_entry, next); + (*vals)[i].data = malloc(v->val.len); + if ((*vals)[i].data == NULL) + goto fail_val_data; + + (*vals)[i].len = v->val.len; + memcpy((*vals)[i++].data, v->val.data, v->val.len); + } + + pthread_mutex_unlock(&dht.reqs.mtx); + + return i; + no_vals: + pthread_mutex_unlock(&dht.reqs.mtx); + return 0; + fail_val_data: + freebufs(*vals, i); + fail_vals: + pthread_mutex_unlock(&dht.reqs.mtx); + return -ENOMEM; + timedout: + pthread_mutex_unlock(&dht.reqs.mtx); + return -ETIMEDOUT; } -static struct dht_entry * dht_entry_create(struct dht * dht, - const uint8_t * key) +static struct bucket * iter_bucket(struct bucket * b, + const uint8_t * id) { - struct dht_entry * e; + uint8_t byte; + uint8_t mask; - assert(dht); - assert(key); + assert(b != NULL); - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; + if (b->children[0] == NULL) + return b; - list_head_init(&e->next); - list_head_init(&e->vals); + byte = id[(b->depth * DHT_BETA) / CHAR_BIT]; - e->n_vals = 0; + mask = ((1L << DHT_BETA) - 1) & 0xFF; - e->key = dht_dup_key(key, dht->b); - if (e->key == NULL) { - free(e); - return NULL; - } + byte >>= (CHAR_BIT - DHT_BETA) - + (((b->depth) * DHT_BETA) & (CHAR_BIT - 1)); - return e; + return iter_bucket(b->children[(byte & mask)], id); } -static void dht_entry_destroy(struct dht_entry * e) +static struct bucket * __dht_kv_get_bucket(const uint8_t * id) +{ + assert(dht.db.contacts.root != NULL); + + return iter_bucket(dht.db.contacts.root, id); +} + +static void contact_list_add(struct list_head * l, + struct contact * c) { struct list_head * p; - struct list_head * h; - assert(e); + assert(l != NULL); + assert(c != NULL); - list_for_each_safe(p, h, &e->vals) { - struct val * v = list_entry(p, struct val, next); - list_del(&v->next); - val_destroy(v); + list_for_each(p, l) { + struct contact * e = list_entry(p, struct contact, next); + if (IS_CLOSER(e->id, c->id)) + continue; } - free(e->key); - - free(e); + list_add_tail(&c->next, p); } -static int dht_entry_add_addr(struct dht_entry * e, - uint64_t addr, - time_t exp) +static ssize_t dht_kv_contact_list(const uint8_t * key, + struct list_head * l, + size_t max) { struct list_head * p; - struct val * val; - struct timespec t; + struct bucket * b; + struct timespec t; + size_t i; + size_t len = 0; + + assert(l != NULL); + assert(key != NULL); + assert(list_is_empty(l)); clock_gettime(CLOCK_REALTIME_COARSE, &t); - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr == addr) { - if (v->t_exp < t.tv_sec + exp) { - v->t_exp = t.tv_sec + exp; - v->t_rep = t.tv_sec + KAD_T_REPL; - } + max = MIN(max, dht.k); - return 0; - } + pthread_rwlock_rdlock(&dht.db.lock); + + b = __dht_kv_get_bucket(key); + if (b == NULL) { + log_err(KEY_FMT " Failed to get bucket.", KEY_VAL(key)); + goto fail_bucket; } - val = val_create(addr, exp); - if (val == NULL) - return -ENOMEM; + b->t_refr = t.tv_sec + dht.t_refresh; - list_add(&val->next, &e->vals); - ++e->n_vals; + if (b->contacts.len == dht.k || b->parent == NULL) { + list_for_each(p, &b->contacts.list) { + struct contact * c; + struct contact * d; + c = list_entry(p, struct contact, next); + if (c->addr == dht.addr) + continue; + d = contact_create(c->id, c->addr); + if (d == NULL) + continue; + contact_list_add(l, d); + if (++len == max) + break; + } + } else { + struct bucket * d = b->parent; + for (i = 0; i < (1L << DHT_BETA) && len < dht.k; ++i) { + list_for_each(p, &d->children[i]->contacts.list) { + struct contact * c; + struct contact * d; + c = list_entry(p, struct contact, next); + if (c->addr == dht.addr) + continue; + d = contact_create(c->id, c->addr); + if (d == NULL) + continue; + contact_list_add(l, d); + if (++len == max) + break; + } + } + } - return 0; -} + pthread_rwlock_unlock(&dht.db.lock); + return len; + fail_bucket: + pthread_rwlock_unlock(&dht.db.lock); + return -1; +} -static void dht_entry_del_addr(struct dht_entry * e, - uint64_t addr) +static void contact_list_destroy(struct list_head * l) { struct list_head * p; struct list_head * h; - assert(e); - - list_for_each_safe(p, h, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr == addr) { - list_del(&v->next); - val_destroy(v); - --e->n_vals; - } - } + assert(l != NULL); - if (e->n_vals == 0) { - list_del(&e->next); - dht_entry_destroy(e); + list_for_each_safe(p, h, l) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); } } -static uint64_t dht_entry_get_addr(struct dht * dht, - struct dht_entry * e) +static ssize_t dht_kv_get_contacts(const uint8_t * key, + dht_contact_msg_t *** msgs) { + struct list_head cl; struct list_head * p; + struct list_head * h; + size_t len; + size_t i = 0; + + assert(key != NULL); + assert(msgs != NULL); - assert(e); - assert(!list_is_empty(&e->vals)); + list_head_init(&cl); - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr != dht->addr) - return v->addr; + len = dht_kv_contact_list(key, &cl, dht.k); + if (len == 0) { + *msgs = NULL; + return 0; } - return 0; -} + *msgs = malloc(len * sizeof(**msgs)); + if (*msgs == NULL) + goto fail_msgs; -/* Forward declaration. */ -static struct lookup * kad_lookup(struct dht * dht, - const uint8_t * key, - enum kad_code code); + list_for_each_safe(p, h, &cl) { + struct contact * c; + (*msgs)[i] = malloc(sizeof(***msgs)); + if ((*msgs)[i] == NULL) + goto fail_contact; + dht_contact_msg__init((*msgs)[i]); + c = list_entry(p, struct contact, next); + list_del(&c->next); + (*msgs)[i]->id.data = c->id; + (*msgs)[i]->id.len = dht.id.len; + (*msgs)[i++]->addr = c->addr; + free(c); + } + + return i; + fail_contact: + while (i-- > 0) + dht_contact_msg__free_unpacked((*msgs)[i], NULL); + free(*msgs); + *msgs = NULL; + fail_msgs: + contact_list_destroy(&cl); + return -ENOMEM; +} /* Build a refresh list. */ -static void bucket_refresh(struct dht * dht, - struct bucket * b, - time_t t, - struct list_head * r) +static void __dht_kv_bucket_refresh_list(struct bucket * b, + time_t t, + struct list_head * r) { - size_t i; + struct contact * c; + struct contact * d; - if (*b->children != NULL) - for (i = 0; i < (1L << KAD_BETA); ++i) - bucket_refresh(dht, b->children[i], t, r); + assert(b != NULL); - if (b->n_contacts == 0) + if (t < b->t_refr) return; - if (t > b->t_refr) { - struct contact * c; - struct contact * d; - c = list_first_entry(&b->contacts, struct contact, next); - d = contact_create(c->id, dht->b, c->addr); + if (*b->children != NULL) { + size_t i; + for (i = 0; i < (1L << DHT_BETA); ++i) + __dht_kv_bucket_refresh_list(b->children[i], t, r); + } + + if (b->contacts.len == 0) + return; + + c = list_first_entry(&b->contacts.list, struct contact, next); + if (t > c->t_seen + dht.t_refresh) { + d = contact_create(c->id, c->addr); if (d != NULL) list_add(&d->next, r); - return; } } - static struct bucket * bucket_create(void) { struct bucket * b; @@ -1300,20 +1680,21 @@ static struct bucket * bucket_create(void) if (b == NULL) return NULL; - list_head_init(&b->contacts); - b->n_contacts = 0; + list_head_init(&b->contacts.list); + b->contacts.len = 0; - list_head_init(&b->alts); - b->n_alts = 0; + list_head_init(&b->alts.list); + b->alts.len = 0; clock_gettime(CLOCK_REALTIME_COARSE, &t); - b->t_refr = t.tv_sec + KAD_T_REFR; + b->t_refr = t.tv_sec + dht.t_refresh; - for (i = 0; i < (1L << KAD_BETA); ++i) + for (i = 0; i < (1L << DHT_BETA); ++i) b->children[i] = NULL; b->parent = NULL; b->depth = 0; + b->mask = 0; return b; } @@ -1324,24 +1705,24 @@ static void bucket_destroy(struct bucket * b) struct list_head * h; size_t i; - assert(b); + assert(b != NULL); - for (i = 0; i < (1L << KAD_BETA); ++i) + for (i = 0; i < (1L << DHT_BETA); ++i) if (b->children[i] != NULL) bucket_destroy(b->children[i]); - list_for_each_safe(p, h, &b->contacts) { + list_for_each_safe(p, h, &b->contacts.list) { struct contact * c = list_entry(p, struct contact, next); list_del(&c->next); contact_destroy(c); - --b->n_contacts; + --b->contacts.len; } - list_for_each_safe(p, h, &b->alts) { + list_for_each_safe(p, h, &b->alts.list) { struct contact * c = list_entry(p, struct contact, next); list_del(&c->next); contact_destroy(c); - --b->n_contacts; + --b->alts.len; } free(b); @@ -1356,1537 +1737,2297 @@ static bool bucket_has_id(struct bucket * b, if (b->depth == 0) return true; - byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + byte = id[(b->depth * DHT_BETA) / CHAR_BIT]; - mask = ((1L << KAD_BETA) - 1) & 0xFF; + mask = ((1L << DHT_BETA) - 1) & 0xFF; - byte >>= (CHAR_BIT - KAD_BETA) - - (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); + byte >>= (CHAR_BIT - DHT_BETA) - + (((b->depth - 1) * DHT_BETA) & (CHAR_BIT - 1)); return ((byte & mask) == b->mask); } -static int split_bucket(struct bucket * b) +static int move_contacts(struct bucket * b, + struct bucket * c) { struct list_head * p; struct list_head * h; + struct contact * d; + + assert(b != NULL); + assert(c != NULL); + + list_for_each_safe(p, h, &b->contacts.list) { + d = list_entry(p, struct contact, next); + if (bucket_has_id(c, d->id)) { + list_del(&d->next); + --b->contacts.len; + list_add_tail(&d->next, &c->contacts.list); + ++c->contacts.len; + } + } + + return 0; +} + +static int split_bucket(struct bucket * b) +{ uint8_t mask = 0; size_t i; - size_t c; + size_t b_len; assert(b); - assert(b->n_alts == 0); - assert(b->n_contacts); + assert(b->alts.len == 0); + assert(b->contacts.len != 0); assert(b->children[0] == NULL); - c = b->n_contacts; + b_len = b->contacts.len; - for (i = 0; i < (1L << KAD_BETA); ++i) { + for (i = 0; i < (1L << DHT_BETA); ++i) { b->children[i] = bucket_create(); - if (b->children[i] == NULL) { - size_t j; - for (j = 0; j < i; ++j) - bucket_destroy(b->children[j]); - return -1; - } + if (b->children[i] == NULL) + goto fail_child; b->children[i]->depth = b->depth + 1; b->children[i]->mask = mask; b->children[i]->parent = b; - list_for_each_safe(p, h, &b->contacts) { - struct contact * c; - c = list_entry(p, struct contact, next); - if (bucket_has_id(b->children[i], c->id)) { - list_del(&c->next); - --b->n_contacts; - list_add(&c->next, &b->children[i]->contacts); - ++b->children[i]->n_contacts; - } - } + move_contacts(b, b->children[i]); mask++; } - for (i = 0; i < (1L << KAD_BETA); ++i) - if (b->children[i]->n_contacts == c) + for (i = 0; i < (1L << DHT_BETA); ++i) + if (b->children[i]->contacts.len == b_len) split_bucket(b->children[i]); return 0; + fail_child: + while (i-- > 0) + bucket_destroy(b->children[i]); + return -1; } -/* Locked externally to mandate update as (final) part of join transaction. */ -static int dht_update_bucket(struct dht * dht, - const uint8_t * id, - uint64_t addr) +static int dht_kv_update_contacts(const uint8_t * id, + uint64_t addr) { struct list_head * p; struct list_head * h; struct bucket * b; struct contact * c; - assert(dht); + assert(id != NULL); + assert(addr != INVALID_ADDR); - b = dht_get_bucket(dht, id); - if (b == NULL) - return -1; + pthread_rwlock_wrlock(&dht.db.lock); - c = contact_create(id, dht->b, addr); - if (c == NULL) - return -1; + b = __dht_kv_get_bucket(id); + if (b == NULL) { + log_err(PEER_FMT " Failed to get bucket.", PEER_VAL(id, addr)); + goto fail_update; + } + + c = contact_create(id, addr); + if (c == NULL) { + log_err(PEER_FMT " Failed to create contact.", + PEER_VAL(id, addr)); + goto fail_update; + } - list_for_each_safe(p, h, &b->contacts) { + list_for_each_safe(p, h, &b->contacts.list) { struct contact * d = list_entry(p, struct contact, next); if (d->addr == addr) { list_del(&d->next); contact_destroy(d); - --b->n_contacts; + --b->contacts.len; } } - if (b->n_contacts == dht->k) { - if (bucket_has_id(b, dht->id)) { - list_add_tail(&c->next, &b->contacts); - ++b->n_contacts; + if (b->contacts.len == dht.k) { + if (bucket_has_id(b, dht.id.data)) { + list_add_tail(&c->next, &b->contacts.list); + ++b->contacts.len; if (split_bucket(b)) { list_del(&c->next); contact_destroy(c); - --b->n_contacts; + --b->contacts.len; } - } else if (b->n_alts == dht->k) { + } else if (b->alts.len == dht.k) { struct contact * d; - d = list_first_entry(&b->alts, struct contact, next); + d = list_first_entry(&b->alts.list, + struct contact, next); list_del(&d->next); contact_destroy(d); - list_add_tail(&c->next, &b->alts); + list_add_tail(&c->next, &b->alts.list); + ++b->alts.len; } else { - list_add_tail(&c->next, &b->alts); - ++b->n_alts; + list_add_tail(&c->next, &b->alts.list); + ++b->alts.len; } } else { - list_add_tail(&c->next, &b->contacts); - ++b->n_contacts; + list_add_tail(&c->next, &b->contacts.list); + ++b->contacts.len; } + pthread_rwlock_unlock(&dht.db.lock); + return 0; + fail_update: + pthread_rwlock_unlock(&dht.db.lock); + return -1; } -static int send_msg(struct dht * dht, - dht_msg_t * msg, - uint64_t addr) +static time_t gcd(time_t a, + time_t b) { -#ifndef __DHT_TEST__ - struct shm_du_buff * sdb; - size_t len; -#endif - int retr = 0; + if (a == 0) + return b; - if (msg->code == KAD_RESPONSE) - retr = KAD_RESP_RETR; + return gcd(b % a, a); +} - pthread_rwlock_wrlock(&dht->lock); +static dht_contact_msg_t * dht_kv_src_contact_msg(void) +{ + dht_contact_msg_t * src; - if (dht->id != NULL) { - msg->has_s_id = true; - msg->s_id.data = dht->id; - msg->s_id.len = dht->b; - } + src = malloc(sizeof(*src)); + if (src == NULL) + goto fail_malloc; - msg->s_addr = dht->addr; + dht_contact_msg__init(src); - if (msg->code < KAD_STORE) { - msg->cookie = bmp_allocate(dht->cookies); - if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { - pthread_rwlock_unlock(&dht->lock); - goto fail_bmp_alloc; - } - } + src->id.data = dht_dup_key(dht.id.data); + if (src->id.data == NULL) + goto fail_id; - pthread_rwlock_unlock(&dht->lock); + src->id.len = dht.id.len; + src->addr = dht.addr; -#ifndef __DHT_TEST__ - len = dht_msg__get_packed_size(msg); - if (len == 0) - goto fail_msg; + return src; + fail_id: + dht_contact_msg__free_unpacked(src, NULL); + fail_malloc: + return NULL; +} - while (true) { - if (ipcp_sdb_reserve(&sdb, len)) - goto fail_msg; +static dht_msg_t * dht_kv_find_req_msg(const uint8_t * key, + enum dht_code code) +{ + dht_msg_t * msg; - dht_msg__pack(msg, shm_du_buff_head(sdb)); + assert(key != NULL); - if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) - break; + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; - ipcp_sdb_release(sdb); + dht_msg__init(msg); + msg->code = code; - sleep(1); + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; - if (--retr < 0) - goto fail_msg; - } + msg->find = malloc(sizeof(*msg->find)); + if (msg->find == NULL) + goto fail_msg; -#else - (void) addr; - (void) retr; -#endif /* __DHT_TEST__ */ + dht_find_req_msg__init(msg->find); - if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) - kad_req_create(dht, msg, addr); + msg->find->key.data = dht_dup_key(key); + if (msg->find->key.data == NULL) + goto fail_msg; + + msg->find->key.len = dht.id.len; + msg->find->cookie = DHT_INVALID; + + return msg; - return msg->cookie; -#ifndef __DHT_TEST__ fail_msg: - pthread_rwlock_wrlock(&dht->lock); - bmp_release(dht->cookies, msg->cookie); - pthread_rwlock_unlock(&dht->lock); -#endif /* !__DHT_TEST__ */ - fail_bmp_alloc: - return -1; + dht_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; } -static struct dht_entry * dht_find_entry(struct dht * dht, - const uint8_t * key) +static dht_msg_t * dht_kv_find_node_req_msg(const uint8_t * key) { - struct list_head * p; + return dht_kv_find_req_msg(key, DHT_FIND_NODE_REQ); +} - list_for_each(p, &dht->entries) { - struct dht_entry * e = list_entry(p, struct dht_entry, next); - if (!memcmp(key, e->key, dht->b)) - return e; +static dht_msg_t * dht_kv_find_value_req_msg(const uint8_t * key) +{ + return dht_kv_find_req_msg(key, DHT_FIND_VALUE_REQ); +} + +static dht_msg_t * dht_kv_find_node_rsp_msg(uint8_t * key, + uint64_t cookie, + dht_contact_msg_t *** contacts, + size_t len) +{ + dht_msg_t * msg; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + dht_msg__init(msg); + msg->code = DHT_FIND_NODE_RSP; + + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; + + msg->node = malloc(sizeof(*msg->node)); + if (msg->node == NULL) + goto fail_msg; + + dht_find_node_rsp_msg__init(msg->node); + + msg->node->key.data = dht_dup_key(key); + if (msg->node->key.data == NULL) + goto fail_msg; + + msg->node->cookie = cookie; + msg->node->key.len = dht.id.len; + msg->node->n_contacts = len; + if (len != 0) { /* Steal the ptr */ + msg->node->contacts = *contacts; + *contacts = NULL; } + return msg; + + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_malloc: return NULL; } -static int kad_add(struct dht * dht, - const dht_contact_msg_t * contacts, - ssize_t n, - time_t exp) +static dht_msg_t * dht_kv_find_value_rsp_msg(uint8_t * key, + uint64_t cookie, + dht_contact_msg_t *** contacts, + size_t n_contacts, + buffer_t ** vals, + size_t n_vals) { - struct dht_entry * e; + dht_msg_t * msg; - pthread_rwlock_wrlock(&dht->lock); + msg = dht_kv_find_node_rsp_msg(key, cookie, contacts, n_contacts); + if (msg == NULL) + goto fail_node_rsp; - while (n-- > 0) { - if (contacts[n].id.len != dht->b) - log_warn("Bad key length in contact data."); + msg->code = DHT_FIND_VALUE_RSP; - e = dht_find_entry(dht, contacts[n].id.data); - if (e != NULL) { - if (dht_entry_add_addr(e, contacts[n].addr, exp)) - goto fail; - } else { - e = dht_entry_create(dht, contacts[n].id.data); - if (e == NULL) - goto fail; + msg->val = malloc(sizeof(*msg->val)); + if (msg->val == NULL) + goto fail_msg; - if (dht_entry_add_addr(e, contacts[n].addr, exp)) { - dht_entry_destroy(e); - goto fail; - } + dht_find_value_rsp_msg__init(msg->val); - list_add(&e->next, &dht->entries); - } - } + msg->val->n_values = n_vals; + if (n_vals != 0) /* Steal the ptr */ + msg->val->values = (binary_data_t *) *vals; - pthread_rwlock_unlock(&dht->lock); - return 0; + return msg; - fail: - pthread_rwlock_unlock(&dht->lock); - return -ENOMEM; + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_node_rsp: + return NULL; } -static int wait_resp(struct dht * dht, - dht_msg_t * msg, - time_t timeo) +static dht_msg_t * dht_kv_store_msg(const uint8_t * key, + const buffer_t val, + time_t exp) { - struct kad_req * req; + dht_msg_t * msg; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); - assert(dht); - assert(msg); + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; - pthread_rwlock_rdlock(&dht->lock); + dht_msg__init(msg); - req = dht_find_request(dht, msg); - if (req == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -EPERM; - } + msg->code = DHT_STORE; + + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; + + msg->store = malloc(sizeof(*msg->store)); + if (msg->store == NULL) + goto fail_msg; + + dht_store_msg__init(msg->store); + + msg->store->key.data = dht_dup_key(key); + if (msg->store->key.data == NULL) + goto fail_msg; + + msg->store->key.len = dht.id.len; + msg->store->val.data = malloc(val.len); + if (msg->store->val.data == NULL) + goto fail_msg; + + memcpy(msg->store->val.data, val.data, val.len); + + msg->store->val.len = val.len; + msg->store->exp = exp; - pthread_rwlock_unlock(&dht->lock); + return msg; - return kad_req_wait(req, timeo); + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; } -static int kad_store(struct dht * dht, - const uint8_t * key, - uint64_t addr, - uint64_t r_addr, - time_t ttl) +static ssize_t dht_kv_retrieve(const uint8_t * key, + buffer_t ** vals) { - dht_msg_t msg = DHT_MSG__INIT; - dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT; - dht_contact_msg_t * cmsgp[1]; + struct dht_entry * e; + struct list_head * p; + size_t n; + size_t i; - cmsg.id.data = (uint8_t *) key; - cmsg.addr = addr; + assert(key != NULL); - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.db.lock); - cmsg.id.len = dht->b; + e = __dht_kv_find_entry(key); + if (e == NULL) + goto no_vals; - pthread_rwlock_unlock(&dht->lock); + n = MIN(DHT_MAX_VALS, e->vals.len + e->lvals.len); + if (n == 0) + goto no_vals; - cmsgp[0] = &cmsg; + *vals = malloc(n * sizeof(**vals)); + if (*vals == NULL) + goto fail_vals; - msg.code = KAD_STORE; - msg.has_t_expire = true; - msg.t_expire = ttl; - msg.n_contacts = 1; - msg.contacts = cmsgp; + memset(*vals, 0, n * sizeof(**vals)); - if (send_msg(dht, &msg, r_addr) < 0) - return -1; + i = 0; + + list_for_each(p, &e->vals.list) { + struct val_entry * v; + if (i == n) + break; /* We have enough values */ + v = list_entry(p, struct val_entry, next); + (*vals)[i].data = malloc(v->val.len); + if ((*vals)[i].data == NULL) + goto fail_val_data; + + (*vals)[i].len = v->val.len; + memcpy((*vals)[i++].data, v->val.data, v->val.len); + } + + list_for_each(p, &e->lvals.list) { + struct val_entry * v; + if (i == n) + break; /* We have enough values */ + v = list_entry(p, struct val_entry, next); + (*vals)[i].data = malloc(v->val.len); + if ((*vals)[i].data == NULL) + goto fail_val_data; + + (*vals)[i].len = v->val.len; + memcpy((*vals)[i++].data, v->val.data, v->val.len); + } + + pthread_rwlock_unlock(&dht.db.lock); + + return (ssize_t) i; + fail_val_data: + pthread_rwlock_unlock(&dht.db.lock); + freebufs(*vals, i); + *vals = NULL; + return -ENOMEM; + fail_vals: + pthread_rwlock_unlock(&dht.db.lock); + return -ENOMEM; + no_vals: + pthread_rwlock_unlock(&dht.db.lock); + *vals = NULL; return 0; } -static ssize_t kad_find(struct dht * dht, - struct lookup * lu, - const uint64_t * addrs, - enum kad_code code) +static void __cleanup_dht_msg(void * msg) +{ + dht_msg__free_unpacked((dht_msg_t *) msg, NULL); +} + +#ifdef DEBUG_PROTO_DHT +static void dht_kv_debug_msg(dht_msg_t * msg) { - dht_msg_t msg = DHT_MSG__INIT; - ssize_t sent = 0; + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + time_t stamp; + size_t i; - assert(dht); - assert(lu->key); + if (msg == NULL) + return; - msg.code = code; + pthread_cleanup_push(__cleanup_dht_msg, msg); + + switch (msg->code) { + case DHT_STORE: + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->store->key.data), + msg->store->key.len); + log_proto(" val: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->store->val.data), + msg->store->val.len); + stamp = msg->store->exp; + tm = gmtime(&stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + log_proto(" exp: %s.", tmstr); + break; + case DHT_FIND_NODE_REQ: + /* FALLTHRU */ + case DHT_FIND_VALUE_REQ: + log_proto(" cookie: " HASH_FMT64, + HASH_VAL64(&msg->find->cookie)); + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->find->key.data), + msg->find->key.len); + break; + case DHT_FIND_VALUE_RSP: + log_proto(" cookie: " HASH_FMT64, + HASH_VAL64(&msg->node->cookie)); + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->node->key.data), + msg->node->key.len); + log_proto(" values: [%zd]", msg->val->n_values); + for (i = 0; i < msg->val->n_values; i++) + log_proto(" " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->val->values[i].data), + msg->val->values[i].len); + log_proto(" contacts: [%zd]", msg->node->n_contacts); + for (i = 0; i < msg->node->n_contacts; i++) { + dht_contact_msg_t * c = msg->node->contacts[i]; + log_proto(" " PEER_FMT, + PEER_VAL(c->id.data, c->addr)); + } + break; + case DHT_FIND_NODE_RSP: + log_proto(" cookie: " HASH_FMT64, + HASH_VAL64(&msg->node->cookie)); + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->node->key.data), msg->node->key.len); + log_proto(" contacts: [%zd]", msg->node->n_contacts); + for (i = 0; i < msg->node->n_contacts; i++) { + dht_contact_msg_t * c = msg->node->contacts[i]; + log_proto(" " PEER_FMT, + PEER_VAL(c->id.data, c->addr)); + } - msg.has_key = true; - msg.key.data = (uint8_t *) lu->key; - msg.key.len = dht->b; + break; + default: + break; + } - while (*addrs != 0) { - struct cookie_el * c; - int ret; + pthread_cleanup_pop(false); +} - if (*addrs == dht->addr) { - ++addrs; - continue; - } +static void dht_kv_debug_msg_snd(dht_msg_t * msg, + uint8_t * id, + uint64_t addr) +{ + if (msg == NULL) + return; - ret = send_msg(dht, &msg, *addrs); - if (ret < 0) - break; + log_proto(TX_HDR_FMT ".", TX_HDR_VAL(msg, id, addr)); - c = malloc(sizeof(*c)); - if (c == NULL) - break; + dht_kv_debug_msg(msg); +} + +static void dht_kv_debug_msg_rcv(dht_msg_t * msg) +{ + if (msg == NULL) + return; + + log_proto(RX_HDR_FMT ".", RX_HDR_VAL(msg)); + + dht_kv_debug_msg(msg); +} +#endif + +#ifndef __DHT_TEST__ +static int dht_send_msg(dht_msg_t * msg, + uint64_t addr) +{ + size_t len; + struct shm_du_buff * sdb; + + if (msg == NULL) + return 0; + + assert(addr != INVALID_ADDR && addr != dht.addr); + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + log_warn("%s failed to pack.", DHT_CODE(msg)); + goto fail_msg; + } + + if (ipcp_sdb_reserve(&sdb, len)) { + log_warn("%s failed to get sdb.", DHT_CODE(msg)); + goto fail_msg; + } + + dht_msg__pack(msg, shm_du_buff_head(sdb)); + + if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) < 0) { + log_warn("%s write failed", DHT_CODE(msg)); + goto fail_send; + } - c->cookie = (uint32_t) ret; + return 0; + fail_send: + ipcp_sdb_release(sdb); + fail_msg: + return -1; +} +#else /* funtion for testing */ +static int dht_send_msg(dht_msg_t * msg, + uint64_t addr) +{ + buffer_t buf; + + assert(msg != NULL); + assert(addr != INVALID_ADDR && addr != dht.addr); - pthread_mutex_lock(&lu->lock); + buf.len = dht_msg__get_packed_size(msg); + if (buf.len == 0) { + log_warn("%s failed to pack.", DHT_CODE(msg)); + goto fail_msg; + } - list_add_tail(&c->next, &lu->cookies); + buf.data = malloc(buf.len); + if (buf.data == NULL) { + log_warn("%s failed to malloc buf.", DHT_CODE(msg)); + goto fail_msg; + } - pthread_mutex_unlock(&lu->lock); + dht_msg__pack(msg, buf.data); - ++sent; - ++addrs; + if (sink_send_msg(&buf, addr) < 0) { + log_warn("%s write failed", DHT_CODE(msg)); + goto fail_send; } - return sent; + return 0; + fail_send: + freebuf(buf); + fail_msg: + return -1; } +#endif /* __DHT_TEST__ */ -static void lookup_detach(struct dht * dht, - struct lookup * lu) +static void __cleanup_peer_list(void * pl) { - pthread_rwlock_wrlock(&dht->lock); + struct list_head * p; + struct list_head * h; - list_del(&lu->next); + assert(pl != NULL); - pthread_rwlock_unlock(&dht->lock); + list_for_each_safe(p, h, (struct list_head *) pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + list_del(&e->next); + free(e->id); + free(e); + } } -static struct lookup * kad_lookup(struct dht * dht, - const uint8_t * id, - enum kad_code code) + +static int dht_kv_send_msgs(dht_msg_t * msg, + struct list_head * pl) { - uint64_t addrs[KAD_ALPHA + 1]; - enum lookup_state state; - struct lookup * lu; + struct list_head * p; + struct list_head * h; - lu = lookup_create(dht, id); - if (lu == NULL) - return NULL; + pthread_cleanup_push(__cleanup_dht_msg, msg); + pthread_cleanup_push(__cleanup_peer_list, pl); - lookup_new_addrs(lu, addrs); + list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + if (IS_REQUEST(msg->code)) { + msg->find->cookie = e->cookie; + assert(msg->find->cookie != DHT_INVALID); + } + if (dht_send_msg(msg, e->addr) < 0) + continue; - if (addrs[0] == 0) { - lookup_detach(dht, lu); - lookup_destroy(lu); - return NULL; +#ifdef DEBUG_PROTO_DHT + dht_kv_debug_msg_snd(msg, e->id, e->addr); +#endif + list_del(&e->next); + free(e->id); + free(e); } - if (kad_find(dht, lu, addrs, code) == 0) { - lookup_detach(dht, lu); - return lu; + pthread_cleanup_pop(false); + pthread_cleanup_pop(false); + + return list_is_empty(pl) ? 0 : -1; +} + +static int dht_kv_get_peer_list_for_msg(dht_msg_t * msg, + struct list_head * pl) +{ + struct list_head cl; /* contact list */ + uint8_t * key; /* key in the request */ + size_t max; + + assert(msg != NULL); + + assert(list_is_empty(pl)); + + max = msg->code == DHT_STORE ? dht.k : dht.alpha; + + switch (msg->code) { + case DHT_FIND_NODE_REQ: + /* FALLTHRU */ + case DHT_FIND_VALUE_REQ: + key = msg->find->key.data; + break; + case DHT_STORE: + key = msg->store->key.data; + break; + default: + log_err("Invalid DHT msg code (%d).", msg->code); + return -1; } - while ((state = lookup_wait(lu)) != LU_COMPLETE) { - switch (state) { - case LU_UPDATE: - lookup_new_addrs(lu, addrs); - if (addrs[0] == 0) - break; + list_head_init(&cl); - kad_find(dht, lu, addrs, code); - break; - case LU_DESTROY: - lookup_detach(dht, lu); - lookup_set_state(lu, LU_NULL); - return NULL; - default: - break; - } + if (dht_kv_contact_list(key, &cl, max) < 0) { + log_err(KEY_FMT " Failed to get contact list.", KEY_VAL(key)); + goto fail_contacts; } - assert(state == LU_COMPLETE); + if (list_is_empty(&cl)) { + log_warn(KEY_FMT " No available contacts.", KEY_VAL(key)); + goto fail_contacts; + } - lookup_detach(dht, lu); + if (dht_kv_create_peer_list(&cl, pl, msg->code) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peers; + } - return lu; + contact_list_destroy(&cl); + return 0; + fail_peers: + contact_list_destroy(&cl); + fail_contacts: + return -1; } -static void kad_publish(struct dht * dht, - const uint8_t * key, - uint64_t addr, - time_t exp) +static int dht_kv_store_remote(const uint8_t * key, + const buffer_t val, + time_t exp) { - struct lookup * lu; - uint64_t * addrs; - ssize_t n; - size_t k; - time_t t_expire; + dht_msg_t * msg; + struct timespec now; + struct list_head pl; + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); - assert(dht); - assert(key); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_rwlock_rdlock(&dht->lock); + msg = dht_kv_store_msg(key, val, exp); + if (msg == NULL) { + log_err(KV_FMT " Failed to create %s.", + KV_VAL(key, val), dht_code_str[DHT_STORE]); + goto fail_msg; + } - k = dht->k; - t_expire = dht->t_expire; + list_head_init(&pl); - pthread_rwlock_unlock(&dht->lock); + if (dht_kv_get_peer_list_for_msg(msg, &pl) < 0) { + log_dbg(KV_FMT " Failed to get peer list.", KV_VAL(key, val)); + goto fail_peer_list; + } - addrs = malloc(k * sizeof(*addrs)); - if (addrs == NULL) - return; + if (dht_kv_send_msgs(msg, &pl) < 0) { + log_warn(KV_FMT " Failed to send any %s msg.", + KV_VAL(key, val), DHT_CODE(msg)); + goto fail_msgs; + } - lu = kad_lookup(dht, key, KAD_FIND_NODE); - if (lu == NULL) { - free(addrs); - return; + dht_msg__free_unpacked(msg, NULL); + + return 0; + fail_msgs: + peer_list_destroy(&pl); + fail_peer_list: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + return -1; +} + +/* recursive lookup, start with pl NULL */ +static int dht_kv_query_contacts(const uint8_t * key, + struct list_head * pl) +{ + struct list_head p; + + dht_msg_t * msg; + + assert(key != NULL); + + msg = dht_kv_find_node_req_msg(key); + if (msg == NULL) { + log_err(KEY_FMT " Failed to create %s msg.", + KEY_VAL(key), dht_code_str[DHT_FIND_NODE_REQ]); + goto fail_msg; } - n = lookup_contact_addrs(lu, addrs); + if (pl == NULL) { + list_head_init(&p); + pl = &p; + } - while (n-- > 0) { - if (addrs[n] == dht->addr) { - dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT; - msg.id.data = (uint8_t *) key; - msg.id.len = dht->b; - msg.addr = addr; - kad_add(dht, &msg, 1, exp); - } else { - if (kad_store(dht, key, addr, addrs[n], t_expire)) - log_warn("Failed to send store message."); - } + if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peer_list; } - lookup_destroy(lu); + if (dht_kv_update_req(key, pl) < 0) { + log_warn(KEY_FMT " Failed to update req.", KEY_VAL(key)); + goto fail_update; + } - free(addrs); + if (dht_kv_send_msgs(msg, pl)) { + log_warn(KEY_FMT " Failed to send any %s msg.", + KEY_VAL(key), DHT_CODE(msg)); + goto fail_update; + } + + dht_msg__free_unpacked(msg, NULL); + + return 0; + fail_update: + peer_list_destroy(pl); + fail_peer_list: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + return -1; } -static int kad_join(struct dht * dht, - uint64_t addr) +/* recursive lookup, start with pl NULL */ +static ssize_t dht_kv_query_remote(const uint8_t * key, + buffer_t ** vals, + struct list_head * pl) { - dht_msg_t msg = DHT_MSG__INIT; + struct list_head p; + dht_msg_t * msg; - msg.code = KAD_JOIN; + assert(key != NULL); - msg.has_alpha = true; - msg.has_b = true; - msg.has_k = true; - msg.has_t_refresh = true; - msg.has_t_replicate = true; - msg.alpha = KAD_ALPHA; - msg.k = KAD_K; - msg.t_refresh = KAD_T_REFR; - msg.t_replicate = KAD_T_REPL; + msg = dht_kv_find_value_req_msg(key); + if (msg == NULL) { + log_err(KEY_FMT " Failed to create value req.", KEY_VAL(key)); + goto fail_msg; + } - pthread_rwlock_rdlock(&dht->lock); + if (pl == NULL) { + list_head_init(&p); + pl = &p; + } - msg.b = dht->b; + if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peer_list; + } - pthread_rwlock_unlock(&dht->lock); + if (dht_kv_update_req(key, pl) < 0) { + log_err(KEY_FMT " Failed to update request.", KEY_VAL(key)); + goto fail_update; + } - if (send_msg(dht, &msg, addr) < 0) - return -1; + if (dht_kv_send_msgs(msg, pl)) { + log_warn(KEY_FMT " Failed to send %s msg.", + KEY_VAL(key), DHT_CODE(msg)); + goto fail_update; + } - if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) - return -1; + dht_msg__free_unpacked(msg, NULL); - dht->id = create_id(dht->b); - if (dht->id == NULL) - return -1; + if (vals == NULL) /* recursive lookup, already waiting */ + return 0; - pthread_rwlock_wrlock(&dht->lock); + return dht_kv_wait_req(key, vals); + fail_update: + peer_list_destroy(pl); + fail_peer_list: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + return -1; +} - dht_update_bucket(dht, dht->id, dht->addr); +static void __add_dht_kv_entry(struct dht_entry * e) +{ + struct list_head * p; - pthread_rwlock_unlock(&dht->lock); + assert(e != NULL); - return 0; + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * d = list_entry(p, struct dht_entry, next); + if (IS_CLOSER(d->key, e->key)) + continue; + break; + } + + list_add_tail(&e->next, p); + ++dht.db.kv.len; } -static void dht_dead_peer(struct dht * dht, - uint8_t * key, - uint64_t addr) +/* incoming store message */ +static int dht_kv_store(const uint8_t * key, + const buffer_t val, + time_t exp) { - struct list_head * p; - struct list_head * h; - struct bucket * b; + struct dht_entry * e; + bool new = false; - b = dht_get_bucket(dht, key); + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); - list_for_each_safe(p, h, &b->contacts) { - struct contact * c = list_entry(p, struct contact, next); - if (b->n_contacts + b->n_alts <= dht->k) { - ++c->fails; - return; - } + pthread_rwlock_wrlock(&dht.db.lock); - if (c->addr == addr) { - list_del(&c->next); - contact_destroy(c); - --b->n_contacts; - break; - } + e = __dht_kv_find_entry(key); + if (e == NULL) { + log_dbg(KV_FMT " Adding entry (store).", KV_VAL(key, val)); + e = dht_entry_create(key); + if (e == NULL) + goto fail; + + new = true; + + __add_dht_kv_entry(e); } - while (b->n_contacts < dht->k && b->n_alts > 0) { - struct contact * c; - c = list_first_entry(&b->alts, struct contact, next); - list_del(&c->next); - --b->n_alts; - list_add(&c->next, &b->contacts); - ++b->n_contacts; + if (dht_entry_update_val(e, val, exp) < 0) + goto fail_add; + + pthread_rwlock_unlock(&dht.db.lock); + + return 0; + fail_add: + if (new) { + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; } + fail: + pthread_rwlock_unlock(&dht.db.lock); + return -1; } -static int dht_del(struct dht * dht, - const uint8_t * key, - uint64_t addr) +static int dht_kv_publish(const uint8_t * key, + const buffer_t val) { struct dht_entry * e; + struct timespec now; + bool new = false; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&dht.db.lock); - e = dht_find_entry(dht, key); + e = __dht_kv_find_entry(key); if (e == NULL) { - return -EPERM; + log_dbg(KV_FMT " Adding entry (publish).", KV_VAL(key, val)); + e = dht_entry_create(key); + if (e == NULL) + goto fail; + + __add_dht_kv_entry(e); + new = true; } - dht_entry_del_addr(e, addr); + if (dht_entry_update_lval(e, val) < 0) + goto fail_add; + + pthread_rwlock_unlock(&dht.db.lock); + + dht_kv_store_remote(key, val, now.tv_sec + dht.t_expire); return 0; + fail_add: + if (new) { + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; + } + fail: + pthread_rwlock_unlock(&dht.db.lock); + return -1; } -static buffer_t dht_retrieve(struct dht * dht, - const uint8_t * key) +static int dht_kv_unpublish(const uint8_t * key, + const buffer_t val) { struct dht_entry * e; - struct list_head * p; - buffer_t buf; - uint64_t * pos; - size_t addrs = 0; + int rc; - pthread_rwlock_rdlock(&dht->lock); + assert(key != NULL); - e = dht_find_entry(dht, key); + pthread_rwlock_wrlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); if (e == NULL) - goto fail; + goto no_entry; - buf.len = MIN(DHT_RETR_ADDR, e->n_vals); - if (buf.len == 0) - goto fail; + rc = dht_entry_remove_lval(e, val); - pos = malloc(sizeof(dht->addr) * buf.len); - if (pos == NULL) - goto fail; + pthread_rwlock_unlock(&dht.db.lock); - buf.data = (uint8_t *) pos; + return rc; + no_entry: + pthread_rwlock_unlock(&dht.db.lock); + return -ENOENT; - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - *pos++ = v->addr; - if (++addrs >= buf.len) - break; +} + +/* message validation */ +static int dht_kv_validate_store_msg(const dht_store_msg_t * store) +{ + if (store == NULL) { + log_warn("Store in msg is NULL."); + return -EINVAL; } - pthread_rwlock_unlock(&dht->lock); + if (store->key.data == NULL || store->key.len == 0) { + log_warn("Invalid key in DHT store msg."); + return -EINVAL; + } - return buf; + if (store->key.len != dht.id.len) { + log_warn("Invalid key length in DHT store msg."); + return -EINVAL; + } - fail: - pthread_rwlock_unlock(&dht->lock); - buf.len = 0; - buf.data = NULL; - return buf; + if (store->val.data == NULL || store->val.len == 0) { + log_warn("Invalid value in DHT store msg."); + return -EINVAL; + } + + return 0; } -static ssize_t dht_get_contacts(struct dht * dht, - const uint8_t * key, - dht_contact_msg_t *** msgs) +static int validate_find_req_msg(const dht_find_req_msg_t * req) { - struct list_head l; - struct list_head * p; - struct list_head * h; - size_t len; - size_t i = 0; + if (req == NULL) { + log_warn("Request in msg is NULL."); + return -EINVAL; + } - list_head_init(&l); + if (req->key.data == NULL || req->key.len == 0) { + log_warn("Find request without key."); + return -EINVAL; + } - pthread_rwlock_wrlock(&dht->lock); + if (req->key.len != dht.id.len) { + log_warn("Invalid key length in request msg."); + return -EINVAL; + } - len = dht_contact_list(dht, &l, key); - if (len == 0) { - pthread_rwlock_unlock(&dht->lock); - *msgs = NULL; + return 0; +} + +static int validate_node_rsp_msg(const dht_find_node_rsp_msg_t * rsp) +{ + if (rsp == NULL) { + log_warn("Node rsp in msg is NULL."); + return -EINVAL; + } + + if (rsp->key.data == NULL) { + log_warn("Invalid key in DHT response msg."); + return -EINVAL; + } + + if (rsp->key.len != dht.id.len) { + log_warn("Invalid key length in DHT response msg."); + return -EINVAL; + } + + if (!dht_kv_has_req(rsp->key.data, rsp->cookie)) { + log_warn(KEY_FMT " No request " CK_FMT ".", + KEY_VAL(rsp->key.data), CK_VAL(rsp->cookie)); + + return -EINVAL; + } + + return 0; +} + +static int validate_value_rsp_msg(const dht_find_value_rsp_msg_t * rsp) +{ + if (rsp == NULL) { + log_warn("Invalid DHT find value response msg."); + return -EINVAL; + } + + if (rsp->values == NULL && rsp->n_values > 0) { + log_dbg("No values in DHT response msg."); return 0; } - *msgs = malloc(len * sizeof(**msgs)); - if (*msgs == NULL) { - pthread_rwlock_unlock(&dht->lock); + if (rsp->n_values == 0 && rsp->values != NULL) { + log_dbg("DHT response did not set values NULL."); return 0; } - list_for_each_safe(p, h, &l) { - struct contact * c = list_entry(p, struct contact, next); - (*msgs)[i] = malloc(sizeof(***msgs)); - if ((*msgs)[i] == NULL) { - pthread_rwlock_unlock(&dht->lock); - while (i > 0) - free(*msgs[--i]); - free(*msgs); - *msgs = NULL; - return 0; - } + return 0; +} - dht_contact_msg__init((*msgs)[i]); +static int dht_kv_validate_msg(dht_msg_t * msg) +{ - (*msgs)[i]->id.data = c->id; - (*msgs)[i]->id.len = dht->b; - (*msgs)[i++]->addr = c->addr; - list_del(&c->next); - free(c); + assert(msg != NULL); + + if (msg->src->id.len != dht.id.len) { + log_warn("%s Invalid source contact ID.", DHT_CODE(msg)); + return -EINVAL; } - pthread_rwlock_unlock(&dht->lock); + if (msg->src->addr == INVALID_ADDR) { + log_warn("%s Invalid source address.", DHT_CODE(msg)); + return -EINVAL; + } - return i; + switch (msg->code) { + case DHT_FIND_VALUE_REQ: + /* FALLTHRU */ + case DHT_FIND_NODE_REQ: + if (validate_find_req_msg(msg->find) < 0) + return -EINVAL; + break; + case DHT_FIND_VALUE_RSP: + if (validate_value_rsp_msg(msg->val) < 0) + return -EINVAL; + /* FALLTHRU */ + case DHT_FIND_NODE_RSP: + if (validate_node_rsp_msg(msg->node) < 0) + return -EINVAL; + break; + case DHT_STORE: + if (dht_kv_validate_store_msg(msg->store) < 0) + return -EINVAL; + break; + default: + log_warn("Invalid DHT msg code (%d).", msg->code); + return -ENOENT; + } + + return 0; } -static time_t gcd(time_t a, - time_t b) +static void do_dht_kv_store(const dht_store_msg_t * store) { - if (a == 0) - return b; + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + buffer_t val; + uint8_t * key; + time_t exp; - return gcd(b % a, a); + assert(store != NULL); + + val.data = store->val.data; + val.len = store->val.len; + key = store->key.data; + exp = store->exp; + + if (dht_kv_store(store->key.data, val, store->exp) < 0) { + log_err(KV_FMT " Failed to store.", KV_VAL(key, val)); + return; + } + + tm = gmtime(&exp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + log_info(KV_FMT " Stored value until %s.", KV_VAL(key, val), tmstr); } -static void * work(void * o) +static dht_msg_t * do_dht_kv_find_node_req(const dht_find_req_msg_t * req) { - struct dht * dht; - struct timespec now; - struct list_head * p; - struct list_head * h; - struct list_head reflist; - time_t intv; - struct lookup * lu; + dht_contact_msg_t ** contacts; + dht_msg_t * rsp; + uint8_t * key; + uint64_t cookie; + ssize_t len; - dht = (struct dht *) o; + assert(req != NULL); - pthread_rwlock_rdlock(&dht->lock); + key = req->key.data; + cookie = req->cookie; - intv = gcd(dht->t_expire, dht->t_repub); - intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + len = dht_kv_get_contacts(key, &contacts); + if (len < 0) { + log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key)); + goto fail_contacts; + } - pthread_rwlock_unlock(&dht->lock); + rsp = dht_kv_find_node_rsp_msg(key, cookie, &contacts, len); + if (rsp == NULL) { + log_err(KEY_FMT " Failed to create %s.", KEY_VAL(key), + dht_code_str[DHT_FIND_NODE_RSP]); + goto fail_msg; + } - list_head_init(&reflist); + assert(rsp->code == DHT_FIND_NODE_RSP); - while (true) { - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_wrlock(&dht->lock); - - /* Republish registered hashes. */ - list_for_each(p, &dht->refs) { - struct ref_entry * e; - uint8_t * key; - uint64_t addr; - time_t t_expire; - e = list_entry(p, struct ref_entry, next); - if (now.tv_sec > e->t_rep) { - key = dht_dup_key(e->key, dht->b); - if (key == NULL) - continue; - addr = dht->addr; - t_expire = dht->t_expire; - e->t_rep = now.tv_sec + dht->t_repub; - - pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, addr, t_expire); - pthread_rwlock_wrlock(&dht->lock); - free(key); - } - } + log_info(KEY_FMT " Responding with %zd contacts", KEY_VAL(key), len); - /* Remove stale entries and republish if necessary. */ - list_for_each_safe(p, h, &dht->entries) { - struct list_head * p1; - struct list_head * h1; - struct dht_entry * e; - uint8_t * key; - time_t t_expire; - e = list_entry (p, struct dht_entry, next); - list_for_each_safe(p1, h1, &e->vals) { - struct val * v; - uint64_t addr; - v = list_entry(p1, struct val, next); - if (now.tv_sec > v->t_exp) { - list_del(&v->next); - val_destroy(v); - continue; - } - - if (now.tv_sec > v->t_rep) { - key = dht_dup_key(e->key, dht->b); - addr = v->addr; - t_expire = dht->t_expire = now.tv_sec; - v->t_rep = now.tv_sec + dht->t_replic; - pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, addr, t_expire); - pthread_rwlock_wrlock(&dht->lock); - free(key); - } - } - } + return rsp; + fail_msg: + while (len-- > 0) + dht_contact_msg__free_unpacked(contacts[len], NULL); + free(contacts); + fail_contacts: + return NULL; +} - /* Check the requests list for unresponsive nodes. */ - list_for_each_safe(p, h, &dht->requests) { - struct kad_req * r; - r = list_entry(p, struct kad_req, next); - if (now.tv_sec > r->t_exp) { - list_del(&r->next); - bmp_release(dht->cookies, r->cookie); - dht_dead_peer(dht, r->key, r->addr); - kad_req_destroy(r); - } - } +static void dht_kv_process_node_rsp(dht_contact_msg_t ** contacts, + size_t len, + struct list_head * pl, + enum dht_code code) +{ + struct timespec now; + size_t i; - /* Refresh unaccessed buckets. */ - bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + assert(contacts != NULL); + assert(len > 0); + assert(pl != NULL); + assert(list_is_empty(pl)); - pthread_rwlock_unlock(&dht->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - list_for_each_safe(p, h, &reflist) { - struct contact * c; - c = list_entry(p, struct contact, next); - lu = kad_lookup(dht, c->id, KAD_FIND_NODE); - if (lu != NULL) - lookup_destroy(lu); - list_del(&c->next); - contact_destroy(c); + for (i = 0; i < len; i++) { + dht_contact_msg_t * c = contacts[i]; + struct peer_entry * e; + if (c->addr == dht.addr) + continue; + + if (dht_kv_update_contacts(c->id.data, c->addr) < 0) + log_warn(PEER_FMT " Failed to update contacts.", + PEER_VAL(c->id.data, c->addr)); + + e = malloc(sizeof(*e)); + if (e == NULL) { + log_err(PEER_FMT " Failed to malloc entry.", + PEER_VAL(c->id.data, c->addr)); + continue; } - sleep(intv); - } + e->id = dht_dup_key(c->id.data); + if (e->id == NULL) { + log_warn(PEER_FMT " Failed to duplicate id.", + PEER_VAL(c->id.data, c->addr)); + free(e); + continue; + } - return (void *) 0; + e->cookie = generate_cookie(); + e->code = code; + e->addr = c->addr; + e->t_sent = now.tv_sec; + + list_add_tail(&e->next, pl); + } } -static int kad_handle_join_resp(struct dht * dht, - struct kad_req * req, - dht_msg_t * msg) +static dht_msg_t * do_dht_kv_find_value_req(const dht_find_req_msg_t * req) { - assert(dht); - assert(req); - assert(msg); + dht_contact_msg_t ** contacts; + ssize_t n_contacts; + buffer_t * vals; + ssize_t n_vals; + dht_msg_t * rsp; + uint8_t * key; + uint64_t cookie; - /* We might send version numbers later to warn of updates if needed. */ - if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && - msg->has_t_refresh && msg->has_t_replicate)) { - log_warn("Join refused by remote."); - return -1; + assert(req != NULL); + + key = req->key.data; + cookie = req->cookie; + + n_contacts = dht_kv_get_contacts(key, &contacts); + if (n_contacts < 0) { + log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key)); + goto fail_contacts; } - if (msg->b < sizeof(uint64_t)) { - log_err("Hash sizes less than 8 bytes unsupported."); - return -1; + assert(n_contacts > 0 || contacts == NULL); + + n_vals = dht_kv_retrieve(key, &vals); + if (n_vals < 0) { + log_dbg(KEY_FMT " Failed to get values.", KEY_VAL(key)); + goto fail_vals; } - pthread_rwlock_wrlock(&dht->lock); + if (n_vals == 0) + log_dbg(KEY_FMT " No values found.", KEY_VAL(key)); - dht->buckets = bucket_create(); - if (dht->buckets == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -1; + rsp = dht_kv_find_value_rsp_msg(key, cookie, &contacts, n_contacts, + &vals, n_vals); + if (rsp == NULL) { + log_err(KEY_FMT " Failed to create %s msg.", + KEY_VAL(key), dht_code_str[DHT_FIND_VALUE_RSP]); + goto fail_msg; } - /* Likely corrupt packet. The member will refuse, we might here too. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) - log_warn("Different kademlia parameters detected."); + log_info(KEY_FMT " Responding with %zd contacts, %zd values.", + KEY_VAL(req->key.data), n_contacts, n_vals); - if (msg->t_replicate != KAD_T_REPL) - log_warn("Different kademlia replication time detected."); + return rsp; - if (msg->t_refresh != KAD_T_REFR) - log_warn("Different kademlia refresh time detected."); + fail_msg: + freebufs(vals, n_vals); + fail_vals: + while (n_contacts-- > 0) + dht_contact_msg__free_unpacked(contacts[n_contacts], NULL); + free(contacts); + fail_contacts: + return NULL; +} - dht->k = msg->k; - dht->b = msg->b; - dht->t_expire = msg->t_expire; - dht->t_repub = MAX(1, dht->t_expire - 10); +static void do_dht_kv_find_node_rsp(const dht_find_node_rsp_msg_t * rsp) +{ + struct list_head pl; - if (pthread_create(&dht->worker, NULL, work, dht)) { - bucket_destroy(dht->buckets); - pthread_rwlock_unlock(&dht->lock); - return -1; - } + assert(rsp != NULL); - kad_req_respond(req); + list_head_init(&pl); - dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + dht_kv_process_node_rsp(rsp->contacts, rsp->n_contacts, &pl, + DHT_FIND_NODE_REQ); - pthread_rwlock_unlock(&dht->lock); + if (list_is_empty(&pl)) + goto no_contacts; - log_dbg("Enrollment of DHT completed."); + if (dht_kv_update_req(rsp->key.data, &pl) < 0) { + log_err(KEY_FMT " Failed to update request.", + KEY_VAL(rsp->key.data)); + goto fail_update; + } - return 0; + dht_kv_query_contacts(rsp->key.data, &pl); + + return; + + fail_update: + peer_list_destroy(&pl); + no_contacts: + return; } -static int kad_handle_find_resp(struct dht * dht, - struct kad_req * req, - dht_msg_t * msg) +static void do_dht_kv_find_value_rsp(const dht_find_node_rsp_msg_t * node, + const dht_find_value_rsp_msg_t * val) { - struct lookup * lu; + struct list_head pl; + uint8_t * key; - assert(dht); - assert(req); - assert(msg); + assert(node != NULL); + assert(val != NULL); - pthread_rwlock_rdlock(&dht->lock); + list_head_init(&pl); - lu = dht_find_lookup(dht, req->cookie); - if (lu == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -1; + key = node->key.data; + + dht_kv_process_node_rsp(node->contacts, node->n_contacts, &pl, + DHT_FIND_VALUE_REQ); + + if (val->n_values > 0) { + log_dbg(KEY_FMT " %zd new values received.", + KEY_VAL(key), val->n_values); + dht_kv_respond_req(key, val->values, val->n_values); + peer_list_destroy(&pl); + return; /* done! */ } - lookup_update(dht, lu, msg); + if (list_is_empty(&pl)) + goto no_contacts; - pthread_rwlock_unlock(&dht->lock); + if (dht_kv_update_req(key, &pl) < 0) { + log_err(KEY_FMT " Failed to update request.", KEY_VAL(key)); + goto fail_update; + } - return 0; + dht_kv_query_remote(key, NULL, &pl); + + return; + fail_update: + peer_list_destroy(&pl); + no_contacts: + return; } -static void kad_handle_response(struct dht * dht, - dht_msg_t * msg) +static dht_msg_t * dht_wait_for_dht_msg(void) { - struct kad_req * req; + dht_msg_t * msg; + struct cmd * cmd; - assert(dht); - assert(msg); + pthread_mutex_lock(&dht.cmds.mtx); - pthread_rwlock_wrlock(&dht->lock); + pthread_cleanup_push(__cleanup_mutex_unlock, &dht.cmds.mtx); - req = dht_find_request(dht, msg); - if (req == NULL) { - pthread_rwlock_unlock(&dht->lock); + while (list_is_empty(&dht.cmds.list)) + pthread_cond_wait(&dht.cmds.cond, &dht.cmds.mtx); + + cmd = list_last_entry(&dht.cmds.list, struct cmd, next); + list_del(&cmd->next); + + pthread_cleanup_pop(true); + + msg = dht_msg__unpack(NULL, cmd->cbuf.len, cmd->cbuf.data); + if (msg == NULL) + log_warn("Failed to unpack DHT msg."); + + freebuf(cmd->cbuf); + free(cmd); + + return msg; +} + +static void do_dht_msg(dht_msg_t * msg) +{ + dht_msg_t * rsp = NULL; + uint8_t * id; + uint64_t addr; + +#ifdef DEBUG_PROTO_DHT + dht_kv_debug_msg_rcv(msg); +#endif + if (dht_kv_validate_msg(msg) == -EINVAL) { + log_warn("%s Validation failed.", DHT_CODE(msg)); + dht_msg__free_unpacked(msg, NULL); return; } - bmp_release(dht->cookies, req->cookie); - list_del(&req->next); + id = msg->src->id.data; + addr = msg->src->addr; - pthread_rwlock_unlock(&dht->lock); + if (dht_kv_update_contacts(id, addr) < 0) + log_warn(PEER_FMT " Failed to update contact from msg src.", + PEER_VAL(id, addr)); - switch(req->code) { - case KAD_JOIN: - if (kad_handle_join_resp(dht, req, msg)) - log_err("Enrollment of DHT failed."); + pthread_cleanup_push(__cleanup_dht_msg, msg); + + switch(msg->code) { + case DHT_FIND_VALUE_REQ: + rsp = do_dht_kv_find_value_req(msg->find); break; - case KAD_FIND_VALUE: - case KAD_FIND_NODE: - if (dht_get_state(dht) != DHT_RUNNING) - break; - kad_handle_find_resp(dht, req, msg); + case DHT_FIND_NODE_REQ: + rsp = do_dht_kv_find_node_req(msg->find); break; - default: + case DHT_STORE: + do_dht_kv_store(msg->store); break; + case DHT_FIND_NODE_RSP: + do_dht_kv_find_node_rsp(msg->node); + break; + case DHT_FIND_VALUE_RSP: + do_dht_kv_find_value_rsp(msg->node, msg->val); + break; + default: + assert(false); /* already validated */ } - kad_req_destroy(req); + pthread_cleanup_pop(true); + + if (rsp == NULL) + return; + + pthread_cleanup_push(__cleanup_dht_msg, rsp); + + dht_send_msg(rsp, addr); + + pthread_cleanup_pop(true); /* free rsp */ } -int dht_bootstrap(void * dir) +static void * dht_handle_packet(void * o) { - struct dht * dht; + (void) o; - dht = (struct dht *) dir; + while (true) { + dht_msg_t * msg; - assert(dht); + msg = dht_wait_for_dht_msg(); + if (msg == NULL) + continue; + + tpm_begin_work(dht.tpm); - pthread_rwlock_wrlock(&dht->lock); + do_dht_msg(msg); + tpm_end_work(dht.tpm); + } + + return (void *) 0; +} #ifndef __DHT_TEST__ - dht->b = ipcp_dir_hash_len(); -#else - dht->b = DHT_TEST_KEY_LEN; -#endif +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) +{ + struct cmd * cmd; - dht->id = create_id(dht->b); - if (dht->id == NULL) - goto fail_id; + (void) comp; - dht->buckets = bucket_create(); - if (dht->buckets == NULL) - goto fail_buckets; + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command malloc failed."); + goto fail_cmd; + } - dht->buckets->depth = 0; - dht->buckets->mask = 0; + cmd->cbuf.data = malloc(shm_du_buff_len(sdb)); + if (cmd->cbuf.data == NULL) { + log_err("Command buffer malloc failed."); + goto fail_buf; + } - dht->t_expire = 86400; /* 1 day */ - dht->t_repub = dht->t_expire - 10; - dht->k = KAD_K; + cmd->cbuf.len = shm_du_buff_len(sdb); - if (pthread_create(&dht->worker, NULL, work, dht)) - goto fail_pthread_create; + memcpy(cmd->cbuf.data, shm_du_buff_head(sdb), cmd->cbuf.len); - dht->state = DHT_RUNNING; + ipcp_sdb_release(sdb); - dht_update_bucket(dht, dht->id, dht->addr); + pthread_mutex_lock(&dht.cmds.mtx); - pthread_rwlock_unlock(&dht->lock); + list_add(&cmd->next, &dht.cmds.list); - return 0; + pthread_cond_signal(&dht.cmds.cond); - fail_pthread_create: - bucket_destroy(dht->buckets); - dht->buckets = NULL; - fail_buckets: - free(dht->id); - dht->id = NULL; - fail_id: - pthread_rwlock_unlock(&dht->lock); - return -1; + pthread_mutex_unlock(&dht.cmds.mtx); + + return; + + fail_buf: + free(cmd); + fail_cmd: + ipcp_sdb_release(sdb); + return; } +#endif -static struct ref_entry * ref_entry_get(struct dht * dht, - const uint8_t * key) +int dht_reg(const uint8_t * key) { - struct list_head * p; + buffer_t val; - list_for_each(p, &dht->refs) { - struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht-> b) ) - return r; + if (addr_to_buf(dht.addr, &val) < 0) { + log_err("Failed to convert address to buffer."); + goto fail_a2b; } - return NULL; + if (dht_kv_publish(key, val)) { + log_err(KV_FMT " Failed to publish.", KV_VAL(key, val)); + goto fail_publish; + } + + freebuf(val); + + return 0; + fail_publish: + freebuf(val); + fail_a2b: + return -1; } -int dht_reg(void * dir, - const uint8_t * key) +int dht_unreg(const uint8_t * key) { - struct dht * dht; - struct ref_entry * e; - uint64_t addr; - time_t t_expire; + buffer_t val; - dht = (struct dht *) dir; + if (addr_to_buf(dht.addr, &val) < 0) { + log_err("Failed to convert address to buffer."); + goto fail_a2b; + } - assert(dht); - assert(key); - assert(dht->addr != 0); + if (dht_kv_unpublish(key, val)) { + log_err(KV_FMT " Failed to unpublish.", KV_VAL(key, val)); + goto fail_unpublish; + } - if (dht_wait_running(dht)) - return -1; + freebuf(val); - pthread_rwlock_wrlock(&dht->lock); + return 0; + fail_unpublish: + freebuf(val); + fail_a2b: + return -ENOMEM; +} - if (ref_entry_get(dht, key) != NULL) { - log_dbg("Name already registered."); - pthread_rwlock_unlock(&dht->lock); - return 0; - } +uint64_t dht_query(const uint8_t * key) +{ + buffer_t * vals; + ssize_t n; + uint64_t addr; - e = ref_entry_create(dht, key); - if (e == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -ENOMEM; + n = dht_kv_retrieve(key, &vals); + if (n < 0) { + log_err(KEY_FMT " Failed to query db.", KEY_VAL(key)); + goto fail_vals; } - list_add(&e->next, &dht->refs); + if (n == 0) { + log_dbg(KEY_FMT " No local values.", KEY_VAL(key)); + n = dht_kv_query_remote(key, &vals, NULL); + if (n < 0) { + log_warn(KEY_FMT " Failed to query DHT.", KEY_VAL(key)); + goto fail_vals; + } + if (n == 0) { + log_dbg(KEY_FMT " No values.", KEY_VAL(key)); + goto no_vals; + } + } - t_expire = dht->t_expire; - addr = dht->addr; + if (buf_to_addr(vals[0], &addr) < 0) { + log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[0])); + goto fail_b2a; + } - pthread_rwlock_unlock(&dht->lock); + if (n > 1 && addr == INVALID_ADDR && buf_to_addr(vals[1], &addr) < 0) { + log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[1])); + goto fail_b2a; + } - kad_publish(dht, key, addr, t_expire); + freebufs(vals, n); - return 0; + return addr; + fail_b2a: + freebufs(vals, n); + return INVALID_ADDR; + no_vals: + free(vals); + fail_vals: + return INVALID_ADDR; } -int dht_unreg(void * dir, - const uint8_t * key) +static int emergency_peer(struct list_head * pl) { - struct dht * dht; - struct list_head * p; - struct list_head * h; - - dht = (struct dht *) dir; + struct peer_entry * e; + struct timespec now; - assert(dht); - assert(key); + assert(pl != NULL); + assert(list_is_empty(pl)); - if (dht_get_state(dht) != DHT_RUNNING) + if (dht.peer == INVALID_ADDR) return -1; - pthread_rwlock_wrlock(&dht->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - list_for_each_safe(p, h, &dht->refs) { - struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht-> b) ) { - list_del(&r->next); - ref_entry_destroy(r); - } + e = malloc(sizeof(*e)); + if (e == NULL) { + log_err("Failed to malloc emergency peer entry."); + goto fail_malloc; } - dht_del(dht, key, dht->addr); + e->id = dht_dup_key(dht.id.data); + if (e->id == NULL) { + log_err("Failed to duplicate DHT ID for emergency peer."); + goto fail_id; + } + + e->addr = dht.peer; + e->cookie = dht.magic; + e->code = DHT_FIND_NODE_REQ; + e->t_sent = now.tv_sec; - pthread_rwlock_unlock(&dht->lock); + list_add_tail(&e->next, pl); return 0; + fail_id: + free(e); + fail_malloc: + return -ENOMEM; } -uint64_t dht_query(void * dir, - const uint8_t * key) +static int dht_kv_seed_bootstrap_peer(void) { - struct dht * dht; - struct dht_entry * e; - struct lookup * lu; - uint64_t addrs[KAD_K]; - size_t n; + struct list_head pl; - dht = (struct dht *) dir; + list_head_init(&pl); - assert(dht); + if (dht.peer == INVALID_ADDR) { + log_dbg("No-one to contact."); + return 0; + } - addrs[0] = 0; + if (emergency_peer(&pl) < 0) { + log_err("Could not create emergency peer."); + goto fail_peer; + } - if (dht_wait_running(dht)) - return 0; + log_dbg("Pinging emergency peer " ADDR_FMT32 ".", + ADDR_VAL32(&dht.peer)); - pthread_rwlock_rdlock(&dht->lock); + if (dht_kv_query_contacts(dht.id.data, &pl) < 0) { + log_warn("Failed to bootstrap peer."); + goto fail_query; + } - e = dht_find_entry(dht, key); - if (e != NULL) - addrs[0] = dht_entry_get_addr(dht, e); + peer_list_destroy(&pl); - pthread_rwlock_unlock(&dht->lock); + return 0; + fail_query: + peer_list_destroy(&pl); + fail_peer: + return -EAGAIN; +} - if (addrs[0] != 0) - return addrs[0]; +static void dht_kv_check_contacts(void) +{ + struct list_head cl; + struct list_head pl; - lu = kad_lookup(dht, key, KAD_FIND_VALUE); - if (lu == NULL) - return 0; + list_head_init(&cl); - n = lookup_get_addrs(lu, addrs); - if (n == 0) { - lookup_destroy(lu); - return 0; + dht_kv_contact_list(dht.id.data, &cl, dht.k); + + if (!list_is_empty(&cl)) + goto success; + + contact_list_destroy(&cl); + + list_head_init(&pl); + + if (dht.peer == INVALID_ADDR) { + log_dbg("No-one to contact."); + return; } - lookup_destroy(lu); + if (emergency_peer(&pl) < 0) { + log_err("Could not create emergency peer."); + goto fail_peer; + } - /* Current behaviour is anycast and return the first peer address. */ - if (addrs[0] != dht->addr) - return addrs[0]; + log_dbg("No contacts found, using emergency peer " ADDR_FMT32 ".", + ADDR_VAL32(&dht.peer)); - if (n > 1) - return addrs[1]; + dht_kv_query_contacts(dht.id.data, &pl); - return 0; + peer_list_destroy(&pl); + + return; + success: + contact_list_destroy(&cl); + return; + fail_peer: + return; } -static void * dht_handle_packet(void * o) +static void dht_kv_remove_expired_reqs(void) { - struct dht * dht = (struct dht *) o; + struct list_head * p; + struct list_head * h; + struct timespec now; - assert(dht); + clock_gettime(PTHREAD_COND_CLOCK, &now); - while (true) { - dht_msg_t * msg; - dht_contact_msg_t ** cmsgs; - dht_msg_t resp_msg = DHT_MSG__INIT; - uint64_t addr; - buffer_t buf; - size_t i; - size_t b; - size_t t_expire; - struct cmd * cmd; + pthread_mutex_lock(&dht.reqs.mtx); - pthread_mutex_lock(&dht->mtx); + list_for_each_safe(p, h, &dht.reqs.list) { + struct dht_req * e; + e = list_entry(p, struct dht_req, next); + if (IS_EXPIRED(e, &now)) { + log_dbg(KEY_FMT " Removing expired request.", + KEY_VAL(e->key)); + list_del(&e->next); + dht_req_destroy(e); + --dht.reqs.len; + } + } - pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + pthread_mutex_unlock(&dht.reqs.mtx); +} - while (list_is_empty(&dht->cmds)) - pthread_cond_wait(&dht->cond, &dht->mtx); +static void value_list_destroy(struct list_head * vl) +{ + struct list_head * p; + struct list_head * h; - cmd = list_last_entry(&dht->cmds, struct cmd, next); - list_del(&cmd->next); + assert(vl != NULL); - pthread_cleanup_pop(true); + list_for_each_safe(p, h, vl) { + struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + } +} - i = shm_du_buff_len(cmd->sdb); +#define MUST_REPLICATE(v, now) ((now)->tv_sec > (v)->t_repl + dht.t_repl) +#define MUST_REPUBLISH(v, now) /* Close to expiry deadline */ \ + (((v)->t_exp - (now)->tv_sec) < (DHT_N_REPUB * dht.t_repl)) +static void dht_entry_get_repl_lists(const struct dht_entry * e, + struct list_head * repl, + struct list_head * rebl, + struct timespec * now) +{ + struct list_head * p; + struct val_entry * n; - msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); -#ifndef __DHT_TEST__ - ipcp_sdb_release(cmd->sdb); -#endif - free(cmd); + list_for_each(p, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (MUST_REPLICATE(v, now) && !IS_EXPIRED(v, now)) { + n = val_entry_create(v->val, v->t_exp); + if (n == NULL) + continue; - if (msg == NULL) { - log_err("Failed to unpack message."); - continue; + list_add_tail(&n->next, repl); } + } - if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { - dht_msg__free_unpacked(msg, NULL); - log_dbg("Got a request message when not running."); - continue; + list_for_each(p, &e->lvals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (MUST_REPLICATE(v, now) && MUST_REPUBLISH(v, now)) { + /* Add expire time here, to allow creating val_entry */ + n = val_entry_create(v->val, now->tv_sec + dht.t_expire); + if (n == NULL) + continue; + + list_add_tail(&n->next, rebl); } + } +} - pthread_rwlock_rdlock(&dht->lock); +static int dht_kv_next_values(uint8_t * key, + struct list_head * repl, + struct list_head * rebl) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; + struct dht_entry * e = NULL; - b = dht->b; - t_expire = dht->t_expire; + assert(key != NULL); + assert(repl != NULL); + assert(rebl != NULL); - pthread_rwlock_unlock(&dht->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - if (msg->has_key && msg->key.len != b) { - dht_msg__free_unpacked(msg, NULL); - log_warn("Bad key in message."); - continue; - } + assert(list_is_empty(repl)); + assert(list_is_empty(rebl)); - if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { - dht_msg__free_unpacked(msg, NULL); - log_warn("Bad source ID in message of type %d.", - msg->code); - continue; - } + pthread_rwlock_rdlock(&dht.db.lock); - tpm_begin_work(dht->tpm); + if (dht.db.kv.len == 0) + goto no_entries; - addr = msg->s_addr; + list_for_each_safe(p, h, &dht.db.kv.list) { + e = list_entry(p, struct dht_entry, next); + if (IS_CLOSER(e->key, key)) + continue; /* Already processed */ + } - resp_msg.code = KAD_RESPONSE; - resp_msg.cookie = msg->cookie; + if (e != NULL) { + memcpy(key, e->key, dht.id.len); + dht_entry_get_repl_lists(e, repl, rebl, &now); + } + no_entries: + pthread_rwlock_unlock(&dht.db.lock); - switch(msg->code) { - case KAD_JOIN: - /* Refuse enrollee on check fails. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { - log_warn("Parameter mismatch. " - "DHT enrolment refused."); - break; - } + return list_is_empty(repl) && list_is_empty(rebl) ? -ENOENT : 0; +} - if (msg->t_replicate != KAD_T_REPL) { - log_warn("Replication time mismatch. " - "DHT enrolment refused."); +static void dht_kv_replicate_value(const uint8_t * key, + struct val_entry * v, + const struct timespec * now) +{ + assert(MUST_REPLICATE(v, now)); - break; - } + (void) now; - if (msg->t_refresh != KAD_T_REFR) { - log_warn("Refresh time mismatch. " - "DHT enrolment refused."); - break; - } + if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) { + log_dbg(KV_FMT " Replicated.", KV_VAL(key, v->val)); + return; + } - resp_msg.has_alpha = true; - resp_msg.has_b = true; - resp_msg.has_k = true; - resp_msg.has_t_expire = true; - resp_msg.has_t_refresh = true; - resp_msg.has_t_replicate = true; - resp_msg.alpha = KAD_ALPHA; - resp_msg.b = b; - resp_msg.k = KAD_K; - resp_msg.t_expire = t_expire; - resp_msg.t_refresh = KAD_T_REFR; - resp_msg.t_replicate = KAD_T_REPL; - break; - case KAD_FIND_VALUE: - buf = dht_retrieve(dht, msg->key.data); - if (buf.len != 0) { - resp_msg.n_addrs = buf.len; - resp_msg.addrs = (uint64_t *) buf.data; - break; - } - /* FALLTHRU */ - case KAD_FIND_NODE: - /* Return k closest contacts. */ - resp_msg.n_contacts = - dht_get_contacts(dht, msg->key.data, &cmsgs); - resp_msg.contacts = cmsgs; - break; - case KAD_STORE: - if (msg->n_contacts < 1) { - log_warn("No contacts in store message."); - break; - } + log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val)); - if (!msg->has_t_expire) { - log_warn("No expiry time in store message."); - break; - } + list_del(&v->next); + val_entry_destroy(v); +} - kad_add(dht, *msg->contacts, msg->n_contacts, - msg->t_expire); - break; - case KAD_RESPONSE: - kad_handle_response(dht, msg); - break; - default: - assert(false); - break; - } +static void dht_kv_republish_value(const uint8_t * key, + struct val_entry * v, + const struct timespec * now) +{ + assert(MUST_REPLICATE(v, now)); - if (msg->code != KAD_JOIN) { - pthread_rwlock_wrlock(&dht->lock); - if (dht_get_state(dht) == DHT_JOINING && - dht->buckets == NULL) { - pthread_rwlock_unlock(&dht->lock); - goto finish; - } + if (MUST_REPUBLISH(v, now)) + assert(v->t_exp >= now->tv_sec + dht.t_expire); - if (dht_update_bucket(dht, msg->s_id.data, addr)) - log_warn("Failed to update bucket."); - pthread_rwlock_unlock(&dht->lock); - } + if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) { + log_dbg(KV_FMT " Republished.", KV_VAL(key, v->val)); + return; + } - if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) - log_warn("Failed to send response."); + if (MUST_REPUBLISH(v, now)) + log_warn(KV_FMT " Republish failed.", KV_VAL(key, v->val)); + else + log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val)); - finish: - dht_msg__free_unpacked(msg, NULL); + list_del(&v->next); + val_entry_destroy(v); +} + +static void dht_kv_update_replication_times(const uint8_t * key, + struct list_head * repl, + struct list_head * rebl, + const struct timespec * now) +{ + struct dht_entry * e; + struct list_head * p; + struct list_head * h; + struct val_entry * v; - if (resp_msg.n_addrs > 0) - free(resp_msg.addrs); + assert(key != NULL); + assert(repl != NULL); + assert(rebl != NULL); + assert(now != NULL); - if (resp_msg.n_contacts == 0) { - tpm_end_work(dht->tpm); + pthread_rwlock_wrlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) { + pthread_rwlock_unlock(&dht.db.lock); + return; + } + + list_for_each_safe(p, h, repl) { + struct val_entry * x; + v = list_entry(p, struct val_entry, next); + x = dht_entry_get_val(e, v->val); + if (x == NULL) { + log_err(KV_FMT " Not in vals.", KV_VAL(key, v->val)); continue; } - for (i = 0; i < resp_msg.n_contacts; ++i) - dht_contact_msg__free_unpacked(resp_msg.contacts[i], - NULL); - free(resp_msg.contacts); + x->t_repl = now->tv_sec; - tpm_end_work(dht->tpm); + list_del(&v->next); + val_entry_destroy(v); } - return (void *) 0; + list_for_each_safe(p, h, rebl) { + struct val_entry * x; + v = list_entry(p, struct val_entry, next); + x = dht_entry_get_lval(e, v->val); + if (x == NULL) { + log_err(KV_FMT " Not in lvals.", KV_VAL(key, v->val)); + continue; + } + + x->t_repl = now->tv_sec; + if (v->t_exp > x->t_exp) { + x->t_exp = v->t_exp; /* update expiration time */ + } + + list_del(&v->next); + val_entry_destroy(v); + } + + pthread_rwlock_unlock(&dht.db.lock); } -static void dht_post_packet(void * comp, - struct shm_du_buff * sdb) +static void dht_kv_replicate_values(const uint8_t * key, + struct list_head * repl, + struct list_head * rebl) { - struct cmd * cmd; - struct dht * dht = (struct dht *) comp; + struct timespec now; + struct list_head * p; + struct list_head * h; - if (dht_get_state(dht) == DHT_SHUTDOWN) { -#ifndef __DHT_TEST__ - ipcp_sdb_release(sdb); -#endif - return; + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + list_for_each_safe(p, h, repl) { + struct val_entry * v; + v = list_entry(p, struct val_entry, next); + dht_kv_replicate_value(key, v, &now); } - cmd = malloc(sizeof(*cmd)); - if (cmd == NULL) { - log_err("Command failed. Out of memory."); - return; + list_for_each_safe(p, h, rebl) { + struct val_entry * v; + v = list_entry(p, struct val_entry, next); + dht_kv_republish_value(key, v, &now); } - cmd->sdb = sdb; + /* removes non-replicated items from the list */ + dht_kv_update_replication_times(key, repl, rebl, &now); + + if (list_is_empty(repl) && list_is_empty(rebl)) + return; + + log_warn(KEY_FMT " Failed to update replication times.", KEY_VAL(key)); +} + +static void dht_kv_replicate(void) +{ + struct list_head repl; /* list of values to replicate */ + struct list_head rebl; /* list of local values to republish */ + uint8_t * key; + + key = dht_dup_key(dht.id.data); /* dist == 0 */ + if (key == NULL) { + log_err("Replicate: Failed to duplicate DHT ID."); + return; + } - pthread_mutex_lock(&dht->mtx); + list_head_init(&repl); + list_head_init(&rebl); - list_add(&cmd->next, &dht->cmds); + while (dht_kv_next_values(key, &repl, &rebl) == 0) { + dht_kv_replicate_values(key, &repl, &rebl); + if (!list_is_empty(&repl)) { + log_warn(KEY_FMT " Replication items left.", + KEY_VAL(key)); + value_list_destroy(&repl); + } - pthread_cond_signal(&dht->cond); + if (!list_is_empty(&rebl)) { + log_warn(KEY_FMT " Republish items left.", + KEY_VAL(key)); + value_list_destroy(&rebl); + } + } - pthread_mutex_unlock(&dht->mtx); + free(key); } -void dht_destroy(void * dir) +static void dht_kv_refresh_contacts(void) { - struct dht * dht; struct list_head * p; struct list_head * h; + struct list_head rl; /* refresh list */ + struct timespec now; - dht = (struct dht *) dir; - if (dht == NULL) - return; + list_head_init(&rl); -#ifndef __DHT_TEST__ - tpm_stop(dht->tpm); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - tpm_destroy(dht->tpm); -#endif - if (dht_get_state(dht) == DHT_RUNNING) { - dht_set_state(dht, DHT_SHUTDOWN); - pthread_cancel(dht->worker); - pthread_join(dht->worker, NULL); - } + pthread_rwlock_rdlock(&dht.db.lock); - pthread_rwlock_wrlock(&dht->lock); + __dht_kv_bucket_refresh_list(dht.db.contacts.root, now.tv_sec, &rl); - list_for_each_safe(p, h, &dht->cmds) { - struct cmd * c = list_entry(p, struct cmd, next); + pthread_rwlock_unlock(&dht.db.lock); + + list_for_each_safe(p, h, &rl) { + struct contact * c; + c = list_entry(p, struct contact, next); + log_dbg(PEER_FMT " Refreshing contact.", + PEER_VAL(c->id, c->addr)); + dht_kv_query_contacts(c->id, NULL); list_del(&c->next); -#ifndef __DHT_TEST__ - ipcp_sdb_release(c->sdb); -#endif - free(c); + contact_destroy(c); } - list_for_each_safe(p, h, &dht->entries) { - struct dht_entry * e = list_entry(p, struct dht_entry, next); - list_del(&e->next); - dht_entry_destroy(e); - } + assert(list_is_empty(&rl)); +} - list_for_each_safe(p, h, &dht->requests) { - struct kad_req * r = list_entry(p, struct kad_req, next); - list_del(&r->next); - kad_req_destroy(r); - } +static void (*tasks[])(void) = { + dht_kv_check_contacts, + dht_kv_remove_expired_entries, + dht_kv_remove_expired_reqs, + dht_kv_replicate, + dht_kv_refresh_contacts, + NULL +}; - list_for_each_safe(p, h, &dht->refs) { - struct ref_entry * e = list_entry(p, struct ref_entry, next); - list_del(&e->next); - ref_entry_destroy(e); +static void * work(void * o) +{ + struct timespec now = TIMESPEC_INIT_MS(1); + time_t intv; + size_t n; /* number of tasks */ + + n = sizeof(tasks) / sizeof(tasks[0]) - 1; /* last is NULL */ + + (void) o; + + while (dht_kv_seed_bootstrap_peer() == -EAGAIN) { + ts_add(&now, &now, &now); /* exponential backoff */ + if (now.tv_sec > 1) /* cap at 1 second */ + now.tv_sec = 1; + nanosleep(&now, NULL); } - list_for_each_safe(p, h, &dht->lookups) { - struct lookup * l = list_entry(p, struct lookup, next); - list_del(&l->next); - lookup_destroy(l); + intv = gcd(dht.t_expire, (dht.t_expire - DHT_N_REPUB * dht.t_repl)); + intv = gcd(intv, gcd(dht.t_repl, dht.t_refresh)) / 2; + intv = MAX(1, intv / n); + + log_dbg("DHT worker starting %ld seconds interval.", intv * n); + + while (true) { + int i = 0; + while (tasks[i] != NULL) { + tasks[i++](); + sleep(intv); + } } - pthread_rwlock_unlock(&dht->lock); + return (void *) 0; +} - if (dht->buckets != NULL) - bucket_destroy(dht->buckets); +int dht_start(void) +{ + dht.state = DHT_RUNNING; - bmp_destroy(dht->cookies); + if (tpm_start(dht.tpm)) + goto fail_tpm_start; - pthread_mutex_destroy(&dht->mtx); +#ifndef __DHT_TEST__ + if (pthread_create(&dht.worker, NULL, work, NULL)) { + log_err("Failed to create DHT worker thread."); + goto fail_worker; + } - pthread_rwlock_destroy(&dht->lock); + dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT); + if ((int) dht.eid < 0) { + log_err("Failed to register DHT component."); + goto fail_reg; + } +#else + (void) work; +#endif + return 0; +#ifndef __DHT_TEST__ + fail_reg: + pthread_cancel(dht.worker); + pthread_join(dht.worker, NULL); + fail_worker: + tpm_stop(dht.tpm); +#endif + fail_tpm_start: + dht.state = DHT_INIT; + return -1; +} + +void dht_stop(void) +{ + assert(dht.state == DHT_RUNNING); - free(dht->id); +#ifndef __DHT_TEST__ + dt_unreg_comp(dht.eid); - free(dht); + pthread_cancel(dht.worker); + pthread_join(dht.worker, NULL); +#endif + tpm_stop(dht.tpm); + + dht.state = DHT_INIT; } -static void * join_thr(void * o) +int dht_init(struct dir_dht_config * conf) { - struct join_info * info = (struct join_info *) o; - struct lookup * lu; - size_t retr = 0; + struct timespec now; + pthread_condattr_t cattr; - assert(info); + assert(conf != NULL); - while (kad_join(info->dht, info->addr)) { - if (dht_get_state(info->dht) == DHT_SHUTDOWN) { - log_dbg("DHT enrollment aborted."); - goto finish; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - if (retr++ == KAD_JOIN_RETR) { - dht_set_state(info->dht, DHT_INIT); - log_warn("DHT enrollment attempt failed."); - goto finish; - } +#ifndef __DHT_TEST__ + dht.id.len = ipcp_dir_hash_len(); + dht.addr = addr_auth_address(); +#else + dht.id.len = DHT_TEST_KEY_LEN; + dht.addr = DHT_TEST_ADDR; +#endif + dht.t0 = now.tv_sec; + dht.alpha = conf->params.alpha; + dht.k = conf->params.k; + dht.t_expire = conf->params.t_expire; + dht.t_refresh = conf->params.t_refresh; + dht.t_repl = conf->params.t_replicate; + dht.peer = conf->peer; + + dht.magic = generate_cookie(); + + /* Send my address on enrollment */ + conf->peer = dht.addr; + + dht.id.data = generate_id(); + if (dht.id.data == NULL) { + log_err("Failed to create DHT ID."); + goto fail_id; + } - sleep(KAD_JOIN_INTV); + list_head_init(&dht.cmds.list); + + if (pthread_mutex_init(&dht.cmds.mtx, NULL)) { + log_err("Failed to initialize command mutex."); + goto fail_cmds_mutex; } - dht_set_state(info->dht, DHT_RUNNING); + if (pthread_cond_init(&dht.cmds.cond, NULL)) { + log_err("Failed to initialize command condvar."); + goto fail_cmds_cond; + } - lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); - if (lu != NULL) - lookup_destroy(lu); + list_head_init(&dht.reqs.list); + dht.reqs.len = 0; - finish: - free(info); + if (pthread_mutex_init(&dht.reqs.mtx, NULL)) { + log_err("Failed to initialize request mutex."); + goto fail_reqs_mutex; + } - return (void *) 0; -} + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize request condvar attributes."); + goto fail_cattr; + } +#ifndef __APPLE__ + if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) { + log_err("Failed to set request condvar clock."); + goto fail_cattr; + } +#endif + if (pthread_cond_init(&dht.reqs.cond, &cattr)) { + log_err("Failed to initialize request condvar."); + goto fail_reqs_cond; + } -static void handle_event(void * self, - int event, - const void * o) -{ - struct dht * dht = (struct dht *) self; + list_head_init(&dht.db.kv.list); + dht.db.kv.len = 0; + dht.db.kv.vals = 0; + dht.db.kv.lvals = 0; - if (event == NOTIFY_DT_CONN_ADD) { - pthread_t thr; - struct join_info * inf; - struct conn * c = (struct conn *) o; - struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK); + if (pthread_rwlock_init(&dht.db.lock, NULL)) { + log_err("Failed to initialize store rwlock."); + goto fail_rwlock; + } - /* Give the pff some time to update for the new link. */ - nanosleep(&slack, NULL); + dht.db.contacts.root = bucket_create(); + if (dht.db.contacts.root == NULL) { + log_err("Failed to create DHT buckets."); + goto fail_buckets; + } - switch(dht_get_state(dht)) { - case DHT_INIT: - inf = malloc(sizeof(*inf)); - if (inf == NULL) - break; + if (rib_reg(DHT, &r_ops) < 0) { + log_err("Failed to register DHT RIB operations."); + goto fail_rib_reg; + } - inf->dht = dht; - inf->addr = c->conn_info.addr; - - if (dht_set_state(dht, DHT_JOINING) == 0 || - dht_wait_running(dht)) { - if (pthread_create(&thr, NULL, join_thr, inf)) { - dht_set_state(dht, DHT_INIT); - free(inf); - return; - } - pthread_detach(thr); - } else { - free(inf); - } - break; - case DHT_RUNNING: - /* - * FIXME: this lookup for effiency reasons - * causes a SEGV when stressed with rapid - * enrollments. - * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); - * if (lu != NULL) - * lookup_destroy(lu); - */ - break; - default: - break; - } + dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL); + if (dht.tpm == NULL) { + log_err("Failed to create TPM for DHT."); + goto fail_tpm_create; } + + if (dht_kv_update_contacts(dht.id.data, dht.addr) < 0) + log_warn("Failed to update contacts with DHT ID."); + + pthread_condattr_destroy(&cattr); +#ifndef __DHT_TEST__ + log_info("DHT initialized."); + log_dbg(" ID: " HASH_FMT64 " [%zu bytes].", + HASH_VAL64(dht.id.data), dht.id.len); + log_dbg(" address: " ADDR_FMT32 ".", ADDR_VAL32(&dht.addr)); + log_dbg(" peer: " ADDR_FMT32 ".", ADDR_VAL32(&dht.peer)); + log_dbg(" magic cookie: " HASH_FMT64 ".", HASH_VAL64(&dht.magic)); + log_info(" parameters: alpha=%u, k=%zu, t_expire=%ld, " + "t_refresh=%ld, t_replicate=%ld.", + dht.alpha, dht.k, dht.t_expire, dht.t_refresh, dht.t_repl); +#endif + dht.state = DHT_INIT; + + return 0; + + fail_tpm_create: + rib_unreg(DHT); + fail_rib_reg: + bucket_destroy(dht.db.contacts.root); + fail_buckets: + pthread_rwlock_destroy(&dht.db.lock); + fail_rwlock: + pthread_cond_destroy(&dht.reqs.cond); + fail_reqs_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&dht.reqs.mtx); + fail_reqs_mutex: + pthread_cond_destroy(&dht.cmds.cond); + fail_cmds_cond: + pthread_mutex_destroy(&dht.cmds.mtx); + fail_cmds_mutex: + freebuf(dht.id); + fail_id: + return -1; } -void * dht_create(void) +void dht_fini(void) { - struct dht * dht; + struct list_head * p; + struct list_head * h; - dht = malloc(sizeof(*dht)); - if (dht == NULL) - goto fail_malloc; + rib_unreg(DHT); - dht->buckets = NULL; + tpm_destroy(dht.tpm); - list_head_init(&dht->entries); - list_head_init(&dht->requests); - list_head_init(&dht->refs); - list_head_init(&dht->lookups); - list_head_init(&dht->cmds); + pthread_mutex_lock(&dht.cmds.mtx); - if (pthread_rwlock_init(&dht->lock, NULL)) - goto fail_rwlock; + list_for_each_safe(p, h, &dht.cmds.list) { + struct cmd * c = list_entry(p, struct cmd, next); + list_del(&c->next); + freebuf(c->cbuf); + free(c); + } - if (pthread_mutex_init(&dht->mtx, NULL)) - goto fail_mutex; + pthread_mutex_unlock(&dht.cmds.mtx); - if (pthread_cond_init(&dht->cond, NULL)) - goto fail_cond; + pthread_cond_destroy(&dht.cmds.cond); + pthread_mutex_destroy(&dht.cmds.mtx); - dht->cookies = bmp_create(DHT_MAX_REQS, 1); - if (dht->cookies == NULL) - goto fail_bmp; + pthread_mutex_lock(&dht.reqs.mtx); - dht->b = 0; - dht->id = NULL; -#ifndef __DHT_TEST__ - dht->addr = addr_auth_address(); - if (dht->addr == INVALID_ADDR) - goto fail_bmp; + list_for_each_safe(p, h, &dht.reqs.list) { + struct dht_req * r = list_entry(p, struct dht_req, next); + list_del(&r->next); + dht_req_destroy(r); + dht.reqs.len--; + } - dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); - if (dht->tpm == NULL) - goto fail_tpm_create; + pthread_mutex_unlock(&dht.reqs.mtx); - if (tpm_start(dht->tpm)) - goto fail_tpm_start; + pthread_cond_destroy(&dht.reqs.cond); + pthread_mutex_destroy(&dht.reqs.mtx); - dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT); - if ((int) dht->eid < 0) - goto fail_tpm_start; + pthread_rwlock_wrlock(&dht.db.lock); - if (notifier_reg(handle_event, dht)) - goto fail_notifier_reg; -#else - (void) handle_event; - (void) dht_handle_packet; - (void) dht_post_packet; -#endif - dht->state = DHT_INIT; + list_for_each_safe(p, h, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + list_del(&e->next); + dht_entry_destroy(e); + dht.db.kv.len--; + } - return (void *) dht; -#ifndef __DHT_TEST__ - fail_notifier_reg: - tpm_stop(dht->tpm); - fail_tpm_start: - tpm_destroy(dht->tpm); - fail_tpm_create: - bmp_destroy(dht->cookies); -#endif - fail_bmp: - pthread_cond_destroy(&dht->cond); - fail_cond: - pthread_mutex_destroy(&dht->mtx); - fail_mutex: - pthread_rwlock_destroy(&dht->lock); - fail_rwlock: - free(dht); - fail_malloc: - return NULL; + if (dht.db.contacts.root != NULL) + bucket_destroy(dht.db.contacts.root); + + pthread_rwlock_unlock(&dht.db.lock); + + pthread_rwlock_destroy(&dht.db.lock); + + assert(dht.db.kv.len == 0); + assert(dht.db.kv.vals == 0); + assert(dht.db.kv.lvals == 0); + assert(dht.reqs.len == 0); + + freebuf(dht.id); } diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h index 311c6b23..852a5130 100644 --- a/src/ipcpd/unicast/dir/dht.h +++ b/src/ipcpd/unicast/dir/dht.h @@ -30,22 +30,19 @@ #include <stdint.h> #include <sys/types.h> -void * dht_create(void); +int dht_init(struct dir_dht_config * conf); -void dht_destroy(void * dir); +void dht_fini(void); -int dht_bootstrap(void * dir); +int dht_start(void); -int dht_reg(void * dir, - const uint8_t * key); +void dht_stop(void); -int dht_unreg(void * dir, - const uint8_t * key); +int dht_reg(const uint8_t * key); -uint64_t dht_query(void * dir, - const uint8_t * key); +int dht_unreg(const uint8_t * key); -int dht_wait_running(void * dir); +uint64_t dht_query(const uint8_t * key); extern struct dir_ops dht_dir_ops; diff --git a/src/ipcpd/unicast/dir/dht.proto b/src/ipcpd/unicast/dir/dht.proto index 4c5b06db..ea74805f 100644 --- a/src/ipcpd/unicast/dir/dht.proto +++ b/src/ipcpd/unicast/dir/dht.proto @@ -27,19 +27,32 @@ message dht_contact_msg { required uint64 addr = 2; } +message dht_find_req_msg { + required uint64 cookie = 1; + required bytes key = 2; +} + +message dht_find_node_rsp_msg { + required uint64 cookie = 1; + required bytes key = 2; + repeated dht_contact_msg contacts = 3; +} + +message dht_find_value_rsp_msg { + repeated bytes values = 1; +} + +message dht_store_msg { + required bytes key = 1; + required bytes val = 2; + required uint32 exp = 3; +} + message dht_msg { - required uint32 code = 1; - required uint32 cookie = 2; - required uint64 s_addr = 3; - optional bytes s_id = 4; - optional bytes key = 5; - repeated uint64 addrs = 6; - repeated dht_contact_msg contacts = 7; - // enrolment parameters - optional uint32 alpha = 8; - optional uint32 b = 9; - optional uint32 k = 10; - optional uint32 t_expire = 11; - optional uint32 t_refresh = 12; - optional uint32 t_replicate = 13; + required uint32 code = 1; + required dht_contact_msg src = 2; + optional dht_store_msg store = 3; + optional dht_find_req_msg find = 4; + optional dht_find_node_rsp_msg node = 5; + optional dht_find_value_rsp_msg val = 6; } diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h index 6ff61ce6..8c6e5eb5 100644 --- a/src/ipcpd/unicast/dir/ops.h +++ b/src/ipcpd/unicast/dir/ops.h @@ -23,24 +23,20 @@ #ifndef OUROBOROS_IPCPD_UNICAST_DIR_OPS_H #define OUROBOROS_IPCPD_UNICAST_DIR_OPS_H - struct dir_ops { - void * (* create)(void); + int (* init)(void * config); - void (* destroy)(void * dir); + void (* fini)(void); - int (* bootstrap)(void * dir); + int (* start)(void); - int (* reg)(void * dir, - const uint8_t * hash); + void (* stop)(void); - int (* unreg)(void * dir, - const uint8_t * hash); + int (* reg)(const uint8_t * hash); - uint64_t (* query)(void * dir, - const uint8_t * hash); + int (* unreg)(const uint8_t * hash); - int (* wait_running)(void * dir); + uint64_t (* query)(const uint8_t * hash); }; #endif /* OUROBOROS_IPCPD_UNICAST_DIR_OPS_H */ diff --git a/src/ipcpd/unicast/dir/tests/CMakeLists.txt b/src/ipcpd/unicast/dir/tests/CMakeLists.txt index f62ed993..41a18c27 100644 --- a/src/ipcpd/unicast/dir/tests/CMakeLists.txt +++ b/src/ipcpd/unicast/dir/tests/CMakeLists.txt @@ -15,6 +15,9 @@ include_directories(${CMAKE_BINARY_DIR}/include) get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) +set(DEBUG_PROTO_DHT FALSE CACHE BOOL + "Add DHT protocol message output to debug logging") + create_test_sourcelist(${PARENT_DIR}_tests test_suite.c # Add new tests here dht_test.c diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c index bea2c3e7..72e8f0df 100644 --- a/src/ipcpd/unicast/dir/tests/dht_test.c +++ b/src/ipcpd/unicast/dir/tests/dht_test.c @@ -4,7 +4,6 @@ * Unit tests of the DHT * * Dimitri Staessens <dimitri@ouroboros.rocks> - * Sander Vrijders <sander@ouroboros.rocks> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as @@ -21,76 +20,1899 @@ */ #define __DHT_TEST__ -#define DHT_TEST_KEY_LEN 32 -#include "dht.c" +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include <ouroboros/test.h> +#include <ouroboros/list.h> +#include <ouroboros/utils.h> -#include <pthread.h> +#include "dht.pb-c.h" + +#include <assert.h> +#include <inttypes.h> #include <time.h> #include <stdlib.h> #include <stdio.h> -#define CONTACTS 1000 +#define DHT_MAX_RAND_SIZE 64 +#define DHT_TEST_KEY_LEN 32 +#define DHT_TEST_ADDR 0x1234567890abcdefULL -int dht_test(int argc, - char ** argv) +/* forward declare for use in the dht code */ +/* Packet sink for DHT tests */ +struct { + bool enabled; + + struct list_head list; + size_t len; +} sink; + +struct message { + struct list_head next; + void * msg; + uint64_t dst; +}; + +static int sink_send_msg(buffer_t * pkt, + uint64_t addr) { - struct dht * dht; - uint8_t key[DHT_TEST_KEY_LEN]; - size_t i; + struct message * m; - (void) argc; - (void) argv; + assert(pkt != NULL); + assert(addr != 0); + + assert(!list_is_empty(&sink.list) || sink.len == 0); + + if (!sink.enabled) + goto finish; + + m = malloc(sizeof(*m)); + if (m == NULL) { + printf("Failed to malloc message."); + goto fail_malloc; + } + + m->msg = dht_msg__unpack(NULL, pkt->len, pkt->data); + if (m->msg == NULL) + goto fail_unpack; + + m->dst = addr; + + list_add_tail(&m->next, &sink.list); + + ++sink.len; + finish: + freebuf(*pkt); + + return 0; + fail_unpack: + free(m); + fail_malloc: + freebuf(*pkt); + return -1; +} + +#include "dht.c" + +/* Test helpers */ + +static void sink_init(void) +{ + list_head_init(&sink.list); + sink.len = 0; + sink.enabled = true; +} + +static void sink_clear(void) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &sink.list) { + struct message * m = list_entry(p, struct message, next); + list_del(&m->next); + dht_msg__free_unpacked((dht_msg_t *) m->msg, NULL); + free(m); + --sink.len; + } + + assert(list_is_empty(&sink.list)); +} + +static void sink_fini(void) +{ + sink_clear(); + + assert(list_is_empty(&sink.list) || sink.len != 0); +} + +static dht_msg_t * sink_read(void) +{ + struct message * m; + dht_msg_t * msg; + + assert(!list_is_empty(&sink.list) || sink.len == 0); + + if (list_is_empty(&sink.list)) + return NULL; + + m = list_first_entry(&sink.list, struct message, next); + + --sink.len; + + list_del(&m->next); + + msg = m->msg; + + free(m); + + return (dht_msg_t *) msg; +} + +static const buffer_t test_val = { + .data = (uint8_t *) "test_value", + .len = 10 +}; + +static const buffer_t test_val2 = { + .data = (uint8_t *) "test_value_2", + .len = 12 +}; + +static struct dir_dht_config test_dht_config = default_dht_config; + +static int random_value_len(buffer_t * b) +{ + assert(b != NULL); + assert(b->len > 0 && b->len <= DHT_MAX_RAND_SIZE); + + b->data = malloc(b->len); + if (b->data == NULL) + goto fail_malloc; + + random_buffer(b->data, b->len); + + return 0; + + fail_malloc: + return -ENOMEM; +} + +static int random_value(buffer_t * b) +{ + assert(b != NULL); + + b->len = rand() % DHT_MAX_RAND_SIZE + 1; + + return random_value_len(b); +} + +static int fill_dht_with_contacts(size_t n) +{ + size_t i; + uint8_t * id; + + for (i = 0; i < n; i++) { + uint64_t addr = generate_cookie(); + id = generate_id(); + if (id == NULL) + goto fail_id; + + if (dht_kv_update_contacts(id, addr) < 0) + goto fail_update; + free(id); + } + + return 0; + + fail_update: + free(id); + fail_id: + return -1; +} + +static int fill_store_with_random_values(const uint8_t * key, + size_t len, + size_t n_values) +{ + buffer_t val; + struct timespec now; + size_t i; + uint8_t * _key; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + for (i = 0; i < n_values; ++i) { + if (key != NULL) + _key = (uint8_t *) key; + else { + _key = generate_id(); + if (_key == NULL) + goto fail_key; + } + + if (len == 0) + val.len = rand() % DHT_MAX_RAND_SIZE + 1; + else + val.len = len; + + if (random_value_len(&val) < 0) + goto fail_value; + + if (dht_kv_store(_key, val, now.tv_sec + 10) < 0) + goto fail_store; + + freebuf(val); + if (key == NULL) + free(_key); + } + + return 0; + + fail_store: + freebuf(val); + fail_value: + free(_key); + fail_key: + return -1; +} + +static int random_contact_list(dht_contact_msg_t *** contacts, + size_t max) +{ + size_t i; + + assert(contacts != NULL); + assert(max > 0); + + *contacts = malloc(max * sizeof(**contacts)); + if (*contacts == NULL) + goto fail_malloc; + + for (i = 0; i < max; i++) { + (*contacts)[i] = malloc(sizeof(*(*contacts)[i])); + if ((*contacts)[i] == NULL) + goto fail_contacts; + + dht_contact_msg__init((*contacts)[i]); + + (*contacts)[i]->id.data = generate_id(); + if ((*contacts)[i]->id.data == NULL) + goto fail_contact; + + (*contacts)[i]->id.len = dht.id.len; + (*contacts)[i]->addr = generate_cookie(); + } + + return 0; + + fail_contact: + dht_contact_msg__free_unpacked((*contacts)[i], NULL); + fail_contacts: + while (i-- > 0) + free((*contacts)[i]); + free(*contacts); + fail_malloc: + return -ENOMEM; +} + +static void clear_contacts(dht_contact_msg_t ** contacts, + size_t len) +{ + size_t i; + + assert(contacts != NULL); + if (*contacts == NULL) + return; + + for (i = 0; i < len; ++i) + dht_contact_msg__free_unpacked((contacts)[i], NULL); + + free(*contacts); + *contacts = NULL; +} + +/* Start of actual tests */ + +static int test_dht_init_fini(void) +{ + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_start_stop(void) +{ + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (dht_start() < 0) { + printf("Failed to start dht.\n"); + goto fail_start; + } + + dht_stop(); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; - dht = dht_create(); - if (dht == NULL) { + fail_start: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_val_entry_create_destroy(void) +{ + struct val_entry * e; + struct timespec now; + + TEST_START(); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (dht_init(&test_dht_config) < 0) { printf("Failed to create dht.\n"); - return -1; + goto fail_init; } - dht_destroy(dht); + e = val_entry_create(test_val, now.tv_sec + 10); + if (e == NULL) { + printf("Failed to create val entry.\n"); + goto fail_entry; + } + + val_entry_destroy(e); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; - dht = dht_create(); - if (dht == NULL) { - printf("Failed to re-create dht.\n"); - return -1; + fail_entry: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_entry_create_destroy(void) +{ + struct dht_entry * e; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; } - if (dht_bootstrap(dht)) { - printf("Failed to bootstrap dht.\n"); - dht_destroy(dht); - return -1; + e = dht_entry_create(dht.id.data); + if (e == NULL) { + printf("Failed to create dht entry.\n"); + goto fail_entry; } - dht_destroy(dht); + dht_entry_destroy(e); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_entry: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_entry_update_get_val(void) +{ + struct dht_entry * e; + struct val_entry * v; + struct timespec now; + + TEST_START(); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + e = dht_entry_create(dht.id.data); + if (e == NULL) { + printf("Failed to create dht entry.\n"); + goto fail_entry; + } + + if (dht_entry_get_val(e, test_val) != NULL) { + printf("Found value in empty dht entry.\n"); + goto fail_get; + } + + if (dht_entry_update_val(e, test_val, now.tv_sec + 10) < 0) { + printf("Failed to update dht entry value.\n"); + goto fail_get; + } + + if (dht_entry_get_val(e, test_val2) != NULL) { + printf("Found value in dht entry with different key.\n"); + goto fail_get; + } + + v = dht_entry_get_val(e, test_val); + if (v == NULL) { + printf("Failed to get value from dht entry.\n"); + goto fail_get; + } + + if (v->val.len != test_val.len) { + printf("Length in dht entry does not match expected.\n"); + goto fail_get; + } + + if(memcmp(v->val.data, test_val.data, test_val.len) != 0) { + printf("Data in dht entry does not match expected.\n"); + goto fail_get; + } - dht = dht_create(); - if (dht == NULL) { - printf("Failed to re-create dht.\n"); - return -1; + if (dht_entry_update_val(e, test_val, now.tv_sec + 15) < 0) { + printf("Failed to update exsting dht entry value.\n"); + goto fail_get; } - if (dht_bootstrap(dht)) { - printf("Failed to bootstrap dht.\n"); - dht_destroy(dht); - return -1; + if (v->t_exp != now.tv_sec + 15) { + printf("Expiration time in dht entry value not updated.\n"); + goto fail_get; } - for (i = 0; i < CONTACTS; ++i) { - uint64_t addr; - random_buffer(&addr, sizeof(addr)); - random_buffer(key, DHT_TEST_KEY_LEN); - pthread_rwlock_wrlock(&dht->lock); - if (dht_update_bucket(dht, key, addr)) { - pthread_rwlock_unlock(&dht->lock); - printf("Failed to update bucket.\n"); - dht_destroy(dht); - return -1; + if (dht_entry_update_val(e, test_val, now.tv_sec + 5) < 0) { + printf("Failed to update existing dht entry value (5).\n"); + goto fail_get; + } + + if (v->t_exp != now.tv_sec + 15) { + printf("Expiration time in dht entry shortened.\n"); + goto fail_get; + } + + if (dht_entry_get_val(e, test_val) != v) { + printf("Wrong value in dht entry found after update.\n"); + goto fail_get; + } + + dht_entry_destroy(e); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_get: + dht_entry_destroy(e); + fail_entry: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_entry_update_get_lval(void) +{ + struct dht_entry * e; + struct val_entry * v; + struct timespec now; + + TEST_START(); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + e = dht_entry_create(dht.id.data); + if (e == NULL) { + printf("Failed to create dht entry.\n"); + goto fail_entry; + } + + if (dht_entry_get_lval(e, test_val) != NULL) { + printf("Found value in empty dht entry.\n"); + goto fail_get; + } + + if (dht_entry_update_lval(e, test_val) < 0) { + printf("Failed to update dht entry value.\n"); + goto fail_get; + } + + v = dht_entry_get_lval(e, test_val); + if (v== NULL) { + printf("Failed to get value from dht entry.\n"); + goto fail_get; + } + + if (dht_entry_get_lval(e, test_val2) != NULL) { + printf("Found value in dht entry in vals.\n"); + goto fail_get; + } + + if (v->val.len != test_val.len) { + printf("Length in dht entry does not match expected.\n"); + goto fail_get; + } + + if(memcmp(v->val.data, test_val.data, test_val.len) != 0) { + printf("Data in dht entry does not match expected.\n"); + goto fail_get; + } + + if (dht_entry_update_lval(e, test_val) < 0) { + printf("Failed to update existing dht entry value.\n"); + goto fail_get; + } + + if (dht_entry_get_lval(e, test_val) != v) { + printf("Wrong value in dht entry found after update.\n"); + goto fail_get; + } + + dht_entry_destroy(e); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_get: + dht_entry_destroy(e); + fail_entry: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_contact_create_destroy(void) +{ + struct contact * c; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + c = contact_create(dht.id.data, dht.addr); + if (c == NULL) { + printf("Failed to create contact.\n"); + goto fail_contact; + } + + contact_destroy(c); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_contact: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_update_bucket(void) +{ + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_dht_with_contacts(1000) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_update; + } + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_update: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_contact_list(void) +{ + struct list_head cl; + ssize_t len; + ssize_t items; + + TEST_START(); + + list_head_init(&cl); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + items = 5; + + if (fill_dht_with_contacts(items) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_fill; + } + + len = dht_kv_contact_list(dht.id.data, &cl, dht.k); + if (len < 0) { + printf("Failed to get contact list.\n"); + goto fail_fill; + } + + if (len != items) { + printf("Failed to get contacts (%zu != %zu).\n", len, items); + goto fail_contact_list; + } + + contact_list_destroy(&cl); + + items = 100; + + if (fill_dht_with_contacts(items) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_fill; + } + + len = dht_kv_contact_list(dht.id.data, &cl, items); + if (len < 0) { + printf("Failed to get contact list.\n"); + goto fail_fill; + } + + if ((size_t) len < dht.k) { + printf("Failed to get contacts (%zu < %zu).\n", len, dht.k); + goto fail_contact_list; + } + + contact_list_destroy(&cl); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_contact_list: + contact_list_destroy(&cl); + fail_fill: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_get_values(void) +{ + buffer_t * vals; + ssize_t len; + size_t n = sizeof(uint64_t); + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_store_with_random_values(dht.id.data, n, 3) < 0) { + printf("Failed to fill store with random values.\n"); + goto fail_fill; + } + + len = dht_kv_retrieve(dht.id.data, &vals); + if (len < 0) { + printf("Failed to get values from store.\n"); + goto fail_fill; + } + + if (len != 3) { + printf("Failed to get %ld values (%zu).\n", 3L, len); + goto fail_get_values; + } + + freebufs(vals, len); + + if (fill_store_with_random_values(dht.id.data, n, 20) < 0) { + printf("Failed to fill store with random values.\n"); + goto fail_fill; + } + + len = dht_kv_retrieve(dht.id.data, &vals); + if (len < 0) { + printf("Failed to get values from store.\n"); + goto fail_fill; + } + + if (len != DHT_MAX_VALS) { + printf("Failed to get %d values.\n", DHT_MAX_VALS); + goto fail_get_values; + } + + freebufs(vals, len); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_get_values: + freebufs(vals, len); + fail_fill: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_node_req_msg(void) +{ + dht_msg_t * msg; + dht_msg_t * upk; + size_t len; + uint8_t * buf; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + msg = dht_kv_find_node_req_msg(dht.id.data); + if (msg == NULL) { + printf("Failed to get find node request message.\n"); + goto fail_msg; + } + + if (msg->code != DHT_FIND_NODE_REQ) { + printf("Wrong code in find_node_req message (%s != %s).\n", + dht_code_str[msg->code], + dht_code_str[DHT_FIND_NODE_REQ]); + goto fail_msg; + } + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed length of find_node_req.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc find_node_req buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack find_node_req message.\n"); + goto fail_pack; + } + + upk = dht_msg__unpack(NULL, len, buf); + if (upk == NULL) { + printf("Failed to unpack find_value_req message.\n"); + goto fail_unpack; + } + + free(buf); + dht_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(upk, NULL); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_unpack: + dht_msg__free_unpacked(msg, NULL); + fail_pack: + free(buf); + fail_msg: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_node_rsp_msg(void) +{ + dht_contact_msg_t ** contacts; + dht_msg_t * msg; + dht_msg_t * upk; + size_t len; + uint8_t * buf; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, 0); + if (msg == NULL) { + printf("Failed to get find node response message.\n"); + goto fail_msg; + } + + if (msg->code != DHT_FIND_NODE_RSP) { + printf("Wrong code in find_node_rsp message (%s != %s).\n", + dht_code_str[msg->code], + dht_code_str[DHT_FIND_NODE_RSP]); + goto fail_msg; + } + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed length of find_node_rsp.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc find_node_rsp buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack find_node_rsp message.\n"); + goto fail_pack; + } + + upk = dht_msg__unpack(NULL, len, buf); + if (upk == NULL) { + printf("Failed to unpack find_node_rsp message.\n"); + goto fail_unpack; + } + + free(buf); + dht_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(upk, NULL); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_unpack: + dht_msg__free_unpacked(msg, NULL); + fail_pack: + free(buf); + fail_msg: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_node_rsp_msg_contacts(void) +{ + dht_contact_msg_t ** contacts; + dht_msg_t * msg; + dht_msg_t * upk; + uint8_t * buf; + size_t len; + ssize_t n; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_dht_with_contacts(100) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_fill; + } + + n = dht_kv_get_contacts(dht.id.data, &contacts); + if (n < 0) { + printf("Failed to get contacts.\n"); + goto fail_fill; + } + + if ((size_t) n < dht.k) { + printf("Failed to get enough contacts (%zu < %zu).\n", n, dht.k); + goto fail_fill; + } + + msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, n); + if (msg == NULL) { + printf("Failed to build find node response message.\n"); + goto fail_msg; + } + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed length of find_node_rsp.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc find_node_rsp buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack find_node_rsp message.\n"); + goto fail_pack; + } + + upk = dht_msg__unpack(NULL, len, buf); + if (upk == NULL) { + printf("Failed to unpack find_node_rsp message.\n"); + goto fail_unpack; + } + + free(buf); + dht_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(upk, NULL); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_unpack: + dht_msg__free_unpacked(msg, NULL); + fail_pack: + free(buf); + fail_msg: + clear_contacts(contacts, n); + fail_fill: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_value_req_msg(void) +{ + dht_msg_t * msg; + dht_msg_t * upk; + size_t len; + uint8_t * buf; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + msg = dht_kv_find_value_req_msg(dht.id.data); + if (msg == NULL) { + printf("Failed to build find value request message.\n"); + goto fail_msg; + } + + if (msg->code != DHT_FIND_VALUE_REQ) { + printf("Wrong code in find_value_req message (%s != %s).\n", + dht_code_str[msg->code], + dht_code_str[DHT_FIND_VALUE_REQ]); + goto fail_msg; + } + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed length of find_value_req.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc find_node_req buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack find_value_req message.\n"); + goto fail_pack; + } + + upk = dht_msg__unpack(NULL, len, buf); + if (upk == NULL) { + printf("Failed to unpack find_value_req message.\n"); + goto fail_unpack; + } + + free(buf); + dht_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(upk, NULL); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_unpack: + dht_msg__free_unpacked(msg, NULL); + fail_pack: + free(buf); + fail_msg: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_value_rsp_msg(void) +{ + dht_msg_t * msg; + dht_msg_t * upk; + size_t len; + uint8_t * buf; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, NULL, 0, NULL, 0); + if (msg == NULL) { + printf("Failed to build find value response message.\n"); + goto fail_msg; + } + + if (msg->code != DHT_FIND_VALUE_RSP) { + printf("Wrong code in find_value_rsp message (%s != %s).\n", + dht_code_str[msg->code], + dht_code_str[DHT_FIND_VALUE_RSP]); + goto fail_msg; + } + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed length of find_value_rsp.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc find_value_rsp buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack find_value_rsp message.\n"); + goto fail_pack; + } + + upk = dht_msg__unpack(NULL, len, buf); + if (upk == NULL) { + printf("Failed to unpack find_value_rsp message.\n"); + goto fail_unpack; + } + + free(buf); + dht_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(upk, NULL); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_unpack: + dht_msg__free_unpacked(msg, NULL); + fail_pack: + free(buf); + fail_msg: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_value_rsp_msg_contacts(void) +{ + dht_msg_t * msg; + dht_msg_t * upk; + size_t len; + uint8_t * buf; + dht_contact_msg_t ** contacts; + ssize_t n; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_dht_with_contacts(100) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_fill; + } + + n = dht_kv_get_contacts(dht.id.data, &contacts); + if (n < 0) { + printf("Failed to get contacts.\n"); + goto fail_fill; + } + + if ((size_t) n < dht.k) { + printf("Failed to get enough contacts (%zu < %zu).\n", n, dht.k); + goto fail_fill; + } + + msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, &contacts, n, NULL, 0); + if (msg == NULL) { + printf("Failed to build find value response message.\n"); + goto fail_msg; + } + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed length of find_value_rsp.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc find_value_rsp buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack find_value_rsp message.\n"); + goto fail_pack; + } + + upk = dht_msg__unpack(NULL, len, buf); + if (upk == NULL) { + printf("Failed to unpack find_value_rsp message.\n"); + goto fail_unpack; + } + + free(buf); + dht_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(upk, NULL); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_unpack: + dht_msg__free_unpacked(msg, NULL); + fail_pack: + free(buf); + fail_msg: + clear_contacts(contacts, n); + fail_fill: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_value_rsp_msg_values(void) +{ + dht_msg_t * msg; + dht_msg_t * upk; + size_t len; + uint8_t * buf; + buffer_t * values; + size_t i; + uint64_t ck; + + TEST_START(); + + ck = generate_cookie(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + values = malloc(sizeof(*values) * 8); + if (values == NULL) { + printf("Failed to malloc values.\n"); + goto fail_values; + } + + for (i = 0; i < 8; i++) { + if (random_value(&values[i]) < 0) { + printf("Failed to create random value.\n"); + goto fail_fill; } - pthread_rwlock_unlock(&dht->lock); } - dht_destroy(dht); + msg = dht_kv_find_value_rsp_msg(dht.id.data, ck, NULL, 0, &values, 8); + if (msg == NULL) { + printf("Failed to build find value response message.\n"); + goto fail_msg; + } - return 0; + values = NULL; /* msg owns the values now */ + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed length of find_value_rsp.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc find_value_rsp buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack find_value_rsp message.\n"); + goto fail_pack; + } + + upk = dht_msg__unpack(NULL, len, buf); + if (upk == NULL) { + printf("Failed to unpack find_value_rsp message.\n"); + goto fail_unpack; + } + + if (upk->code != DHT_FIND_VALUE_RSP) { + printf("Wrong code in find_value_rsp message (%s != %s).\n", + dht_code_str[upk->code], + dht_code_str[DHT_FIND_VALUE_RSP]); + goto fail_unpack; + } + + if (upk->val == NULL) { + printf("No values in find_value_rsp message.\n"); + goto fail_unpack; + } + + if (upk->val->n_values != 8) { + printf("Not enough values in find_value_rsp (%zu != %zu).\n", + upk->val->n_values, 8UL); + goto fail_unpack; + } + + free(buf); + dht_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(upk, NULL); + + free(values); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_unpack: + dht_msg__free_unpacked(msg, NULL); + fail_pack: + free(buf); + fail_msg: + fail_fill: + while((i--) > 0) + freebuf(values[i]); + free(values); + fail_values: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_store_msg(void) +{ + dht_msg_t * msg; + size_t len; + uint8_t * buf; + struct timespec now; + + TEST_START(); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + msg = dht_kv_store_msg(dht.id.data, test_val, now.tv_sec + 10); + if (msg == NULL) { + printf("Failed to get store message.\n"); + goto fail_msg; + } + + if (msg->code != DHT_STORE) { + printf("Wrong code in store message (%s != %s).\n", + dht_code_str[msg->code], + dht_code_str[DHT_STORE]); + goto fail_store_msg; + } + + if (dht_kv_validate_msg(msg) < 0) { + printf("Failed to validate store message.\n"); + goto fail_store_msg; + } + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed msg length.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc store msg buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack store message.\n"); + goto fail_pack; + } + + free(buf); + + dht_msg__free_unpacked(msg, NULL); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_pack: + free(buf); + fail_store_msg: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_query_contacts_req_rsp(void) +{ + dht_msg_t * req; + dht_msg_t * rsp; + dht_contact_msg_t ** contacts; + size_t len = 2; + + uint8_t * key; + + TEST_START(); + + sink_init(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_dht_with_contacts(1) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_prep; + } + + key = generate_id(); + if (key == NULL) { + printf("Failed to generate key.\n"); + goto fail_prep; + } + + if (dht_kv_query_contacts(key, NULL) < 0) { + printf("Failed to query contacts.\n"); + goto fail_query; + } + + req = sink_read(); + if (req == NULL) { + printf("Failed to read request from sink.\n"); + goto fail_query; + } + + if (dht_kv_validate_msg(req) < 0) { + printf("Failed to validate find node req.\n"); + goto fail_val_req; + } + + if (random_contact_list(&contacts, len) < 0) { + printf("Failed to create random contact.\n"); + goto fail_val_req; + } + + rsp = dht_kv_find_node_rsp_msg(key, req->find->cookie, &contacts, len); + if (rsp == NULL) { + printf("Failed to create find node response message.\n"); + goto fail_rsp; + } + + memcpy(rsp->src->id.data, dht.id.data, dht.id.len); + rsp->src->addr = generate_cookie(); + + if (dht_kv_validate_msg(rsp) < 0) { + printf("Failed to validate find node response message.\n"); + goto fail_val_rsp; + } + + do_dht_kv_find_node_rsp(rsp->node); + + /* dht_contact_msg__free_unpacked(contacts[0], NULL); set to NULL */ + + free(contacts); + + dht_msg__free_unpacked(rsp, NULL); + + free(key); + + dht_msg__free_unpacked(req, NULL); + + sink_fini(); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_val_rsp: + dht_msg__free_unpacked(rsp, NULL); + fail_rsp: + while (len-- > 0) + dht_contact_msg__free_unpacked(contacts[len], NULL); + free(contacts); + fail_val_req: + dht_msg__free_unpacked(req, NULL); + fail_query: + free(key); + fail_prep: + dht_fini(); + fail_init: + sink_fini(); + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_req_create_destroy(void) +{ + struct dht_req * req; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + req = dht_req_create(dht.id.data); + if (req == NULL) { + printf("Failed to create kad request.\n"); + goto fail_req; + } + + dht_req_destroy(req); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_req: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_reg_unreg(void) +{ + TEST_START(); + + sink_init(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (dht_reg(dht.id.data) < 0) { + printf("Failed to register own id.\n"); + goto fail_reg; + } + + if (sink.len != 0) { + printf("Packet sent without contacts!"); + goto fail_msg; + } + + if (dht_unreg(dht.id.data) < 0) { + printf("Failed to unregister own id.\n"); + goto fail_msg; + } + + dht_fini(); + + sink_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_msg: + dht_unreg(dht.id.data); + fail_reg: + dht_fini(); + fail_init: + sink_fini(); + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_reg_unreg_contacts(void) +{ + dht_msg_t * msg; + + TEST_START(); + + sink_init(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_dht_with_contacts(4) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_reg; + } + + if (dht_reg(dht.id.data) < 0) { + printf("Failed to register own id.\n"); + goto fail_reg; + } + + if (sink.len != dht.alpha) { + printf("Packet sent to too few contacts!\n"); + goto fail_msg; + } + + msg = sink_read(); + if (msg == NULL) { + printf("Failed to read message from sink.\n"); + goto fail_msg; + } + + if (msg->code != DHT_STORE) { + printf("Wrong code in dht reg message (%s != %s).\n", + dht_code_str[msg->code], + dht_code_str[DHT_STORE]); + goto fail_validation; + } + + if (dht_kv_validate_msg(msg) < 0) { + printf("Failed to validate dht message.\n"); + goto fail_validation; + } + + if (dht_unreg(dht.id.data) < 0) { + printf("Failed to unregister own id.\n"); + goto fail_validation; + } + + dht_msg__free_unpacked(msg, NULL); + + dht_fini(); + + sink_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_validation: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + sink_clear(); + dht_unreg(dht.id.data); + fail_reg: + dht_fini(); + fail_init: + sink_fini(); + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_reg_query_local(void) +{ + struct timespec now; + buffer_t test_addr; + + TEST_START(); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (addr_to_buf(1234321, &test_addr) < 0) { + printf("Failed to convert test address to buffer.\n"); + goto fail_buf; + } + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (dht_reg(dht.id.data) < 0) { + printf("Failed to register own id.\n"); + goto fail_reg; + } + + if (dht_query(dht.id.data) == dht.addr) { + printf("Succeeded to query own id.\n"); + goto fail_get; + } + + if (dht_kv_store(dht.id.data, test_addr, now.tv_sec + 5) < 0) { + printf("Failed to publish value.\n"); + goto fail_get; + } + + if (dht_query(dht.id.data) != 1234321) { + printf("Failed to return remote addr.\n"); + goto fail_get; + } + + if (dht_unreg(dht.id.data) < 0) { + printf("Failed to unregister own id.\n"); + goto fail_get; + } + + freebuf(test_addr); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_get: + dht_unreg(dht.id.data); + fail_reg: + dht_fini(); + fail_init: + freebuf(test_addr); + fail_buf: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_query(void) +{ + uint8_t * key; + struct dir_dht_config cfg; + + TEST_START(); + + sink_init(); + + cfg = default_dht_config; + cfg.peer = generate_cookie(); + + if (dht_init(&cfg)) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + key = generate_id(); + if (key == NULL) { + printf("Failed to generate key.\n"); + goto fail_key; + } + + if (dht_query(key) != INVALID_ADDR) { + printf("Succeeded to get address without contacts.\n"); + goto fail_get; + } + + if (sink.len != 0) { + printf("Packet sent without contacts!"); + goto fail_test; + } + + free(key); + + dht_fini(); + + sink_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_test: + sink_clear(); + fail_get: + free(key); + fail_key: + dht_fini(); + fail_init: + sink_fini(); + return TEST_RC_FAIL; +} + +static int test_dht_query_contacts(void) +{ + dht_msg_t * msg; + uint8_t * key; + struct dir_dht_config cfg; + + + TEST_START(); + + sink_init(); + + cfg = default_dht_config; + cfg.peer = generate_cookie(); + + if (dht_init(&cfg)) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_dht_with_contacts(10) < 0) { + printf("Failed to fill with contacts!"); + goto fail_contacts; + } + + key = generate_id(); + if (key == NULL) { + printf("Failed to generate key."); + goto fail_contacts; + } + + if (dht_query(key) != INVALID_ADDR) { + printf("Succeeded to get address for random id.\n"); + goto fail_query; + } + + msg = sink_read(); + if (msg == NULL) { + printf("Failed to read message.!\n"); + goto fail_read; + } + + if (dht_kv_validate_msg(msg) < 0) { + printf("Failed to validate dht message.\n"); + goto fail_msg; + } + + if (msg->code != DHT_FIND_VALUE_REQ) { + printf("Failed to validate dht message.\n"); + goto fail_msg; + } + + dht_msg__free_unpacked(msg, NULL); + + free(key); + + sink_clear(); + + dht_fini(); + + sink_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_read: + sink_clear(); + fail_query: + free(key); + fail_contacts: + dht_fini(); + fail_init: + sink_fini(); + return TEST_RC_FAIL; +} + +int dht_test(int argc, + char ** argv) +{ + int rc = 0; + + (void) argc; + (void) argv; + + rc |= test_dht_init_fini(); + rc |= test_dht_start_stop(); + rc |= test_val_entry_create_destroy(); + rc |= test_dht_entry_create_destroy(); + rc |= test_dht_entry_update_get_val(); + rc |= test_dht_entry_update_get_lval(); + rc |= test_dht_kv_contact_create_destroy(); + rc |= test_dht_kv_contact_list(); + rc |= test_dht_kv_update_bucket(); + rc |= test_dht_kv_get_values(); + rc |= test_dht_kv_find_node_req_msg(); + rc |= test_dht_kv_find_node_rsp_msg(); + rc |= test_dht_kv_find_node_rsp_msg_contacts(); + rc |= test_dht_kv_query_contacts_req_rsp(); + rc |= test_dht_kv_find_value_req_msg(); + rc |= test_dht_kv_find_value_rsp_msg(); + rc |= test_dht_kv_find_value_rsp_msg_contacts(); + rc |= test_dht_kv_find_value_rsp_msg_values(); + rc |= test_dht_kv_store_msg(); + rc |= test_dht_req_create_destroy(); + rc |= test_dht_reg_unreg(); + rc |= test_dht_reg_unreg_contacts(); + rc |= test_dht_reg_query_local(); + rc |= test_dht_query(); + rc |= test_dht_query_contacts(); + + return rc; } diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index 14eaac09..a4e3be36 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -60,7 +60,7 @@ #include <assert.h> #define QOS_BLOCK_LEN 672 -#define RIB_FILE_STRLEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX) +#define RIB_FILE_STRLEN (169 + RIB_TM_STRLEN + QOS_BLOCK_LEN * QOS_CUBE_MAX) #define RIB_NAME_STRLEN 256 #ifndef CLOCK_REALTIME_COARSE @@ -189,7 +189,7 @@ static int dt_rib_read(const char * path, char str[QOS_BLOCK_LEN + 1]; char addrstr[20]; char * entry; - char tmstr[20]; + char tmstr[RIB_TM_STRLEN]; size_t rxqlen = 0; size_t txqlen = 0; struct tm * tm; @@ -217,8 +217,8 @@ static int dt_rib_read(const char * path, else sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr); - tm = localtime(&dt.stat[fd].stamp); - strftime(tmstr, sizeof(tmstr), "%F %T", tm); + tm = gmtime(&dt.stat[fd].stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); if (fd >= PROG_RES_FDS) { fccntl(fd, FLOWGRXQLEN, &rxqlen); @@ -226,11 +226,11 @@ static int dt_rib_read(const char * path, } sprintf(buf, - "Flow established at: %20s\n" + "Flow established at: %.*s\n" "Endpoint address: %20s\n" "Queued packets (rx): %20zu\n" "Queued packets (tx): %20zu\n\n", - tmstr, addrstr, rxqlen, txqlen); + RIB_TM_STRLEN - 1, tmstr, addrstr, rxqlen, txqlen); for (i = 0; i < QOS_CUBE_MAX; ++i) { sprintf(str, "Qos cube %3d:\n" @@ -287,49 +287,46 @@ static int dt_rib_readdir(char *** buf) pthread_rwlock_rdlock(&dt.lock); - if (dt.n_flows < 1) { - pthread_rwlock_unlock(&dt.lock); - return 0; - } + if (dt.n_flows < 1) + goto no_flows; *buf = malloc(sizeof(**buf) * dt.n_flows); - if (*buf == NULL) { - pthread_rwlock_unlock(&dt.lock); - return -ENOMEM; - } + if (*buf == NULL) + goto fail_entries; for (i = 0; i < PROG_MAX_FLOWS; ++i) { pthread_mutex_lock(&dt.stat[i].lock); if (dt.stat[i].stamp == 0) { pthread_mutex_unlock(&dt.stat[i].lock); - /* Optimization: skip unused res_fds. */ - if (i < PROG_RES_FDS) - i = PROG_RES_FDS; - continue; + break; } + pthread_mutex_unlock(&dt.stat[i].lock); + sprintf(entry, "%zu", i); (*buf)[idx] = malloc(strlen(entry) + 1); - if ((*buf)[idx] == NULL) { - while (idx-- > 0) - free((*buf)[idx]); - free(*buf); - pthread_mutex_unlock(&dt.stat[i].lock); - pthread_rwlock_unlock(&dt.lock); - return -ENOMEM; - } + if ((*buf)[idx] == NULL) + goto fail_entry; strcpy((*buf)[idx++], entry); - pthread_mutex_unlock(&dt.stat[i].lock); } assert((size_t) idx == dt.n_flows); - + no_flows: pthread_rwlock_unlock(&dt.lock); return idx; + + fail_entry: + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); +fail_entries: + pthread_rwlock_unlock(&dt.lock); + return -ENOMEM; + #else (void) buf; return 0; @@ -728,17 +725,17 @@ int dt_start(void) if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { log_err("Failed to create listener thread."); - psched_destroy(dt.psched); - return -1; + goto fail_listener; } return 0; + fail_listener: + notifier_unreg(&handle_event); fail_notifier_reg: psched_destroy(dt.psched); fail_psched: return -1; - } void dt_stop(void) @@ -757,7 +754,7 @@ int dt_reg_comp(void * comp, { int eid; - assert(func); + assert(func != NULL); pthread_rwlock_wrlock(&dt.lock); @@ -783,6 +780,23 @@ int dt_reg_comp(void * comp, return eid; } +void dt_unreg_comp(int eid) +{ + assert(eid >= 0 && eid < PROG_RES_FDS); + + pthread_rwlock_wrlock(&dt.lock); + + assert(dt.comps[eid].post_packet != NULL); + + dt.comps[eid].post_packet = NULL; + dt.comps[eid].comp = NULL; + dt.comps[eid].name = NULL; + + pthread_rwlock_unlock(&dt.lock); + + return; +} + int dt_write_packet(uint64_t dst_addr, qoscube_t qc, uint64_t eid, @@ -811,7 +825,8 @@ int dt_write_packet(uint64_t dst_addr, #endif fd = pff_nhop(dt.pff[qc], dst_addr); if (fd < 0) { - log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); + log_dbg("Could not get nhop for " ADDR_FMT32 ".", + ADDR_VAL32(&dst_addr)); #ifdef IPCP_FLOW_STATS if (eid < PROG_RES_FDS) { pthread_mutex_lock(&dt.stat[eid].lock); diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h index 7198a013..2c5b7978 100644 --- a/src/ipcpd/unicast/dt.h +++ b/src/ipcpd/unicast/dt.h @@ -39,9 +39,11 @@ int dt_start(void); void dt_stop(void); -int dt_reg_comp(void * comp, +int dt_reg_comp(void * comp, void (* func)(void * comp, struct shm_du_buff * sdb), - char * name); + char * name); + +void dt_unreg_comp(int eid); int dt_write_packet(uint64_t dst_addr, qoscube_t qc, diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index ecc3894e..61abff52 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -136,7 +136,7 @@ static int fa_rib_read(const char * path, char r_addrstr[21]; char s_eidstr[21]; char r_eidstr[21]; - char tmstr[20]; + char tmstr[RIB_TM_STRLEN]; char castr[1024]; char * entry; struct tm * tm; @@ -167,8 +167,8 @@ static int fa_rib_read(const char * path, sprintf(s_eidstr, "%" PRIu64, flow->s_eid); sprintf(r_eidstr, "%" PRIu64, flow->r_eid); - tm = localtime(&flow->stamp); - strftime(tmstr, sizeof(tmstr), "%F %T", tm); + tm = gmtime(&flow->stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); ca_print_stats(flow->ctx, castr, 1024); @@ -217,16 +217,12 @@ static int fa_rib_readdir(char *** buf) pthread_rwlock_rdlock(&fa.flows_lock); - if (fa.n_flows < 1) { - pthread_rwlock_unlock(&fa.flows_lock); - return 0; - } + if (fa.n_flows < 1) + goto no_flows; *buf = malloc(sizeof(**buf) * fa.n_flows); - if (*buf == NULL) { - pthread_rwlock_unlock(&fa.flows_lock); - return -ENOMEM; - } + if (*buf == NULL) + goto fail_entries; for (i = 0; i < PROG_MAX_FLOWS; ++i) { struct fa_flow * flow; @@ -238,22 +234,25 @@ static int fa_rib_readdir(char *** buf) sprintf(entry, "%zu", i); (*buf)[idx] = malloc(strlen(entry) + 1); - if ((*buf)[idx] == NULL) { - while (idx-- > 0) - free((*buf)[idx]); - free(*buf); - pthread_rwlock_unlock(&fa.flows_lock); - return -ENOMEM; - } + if ((*buf)[idx] == NULL) + goto fail_entry; strcpy((*buf)[idx++], entry); } assert((size_t) idx == fa.n_flows); - + no_flows: pthread_rwlock_unlock(&fa.flows_lock); return idx; + + fail_entry: + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + fail_entries: + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOMEM; #else (void) buf; return 0; @@ -648,19 +647,21 @@ int fa_init(void) if (pthread_cond_init(&fa.cond, &cattr)) goto fail_cond; - pthread_condattr_destroy(&cattr); - - list_head_init(&fa.cmds); - if (rib_reg(FA, &r_ops)) goto fail_rib_reg; fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); if ((int) fa.eid < 0) - goto fail_rib_reg; + goto fail_dt_reg; + + list_head_init(&fa.cmds); + + pthread_condattr_destroy(&cattr); return 0; + fail_dt_reg: + rib_unreg(FA); fail_rib_reg: pthread_cond_destroy(&fa.cond); fail_cond: diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c index bd1fee51..c2348242 100644 --- a/src/ipcpd/unicast/main.c +++ b/src/ipcpd/unicast/main.c @@ -55,7 +55,7 @@ #include <assert.h> #include <inttypes.h> -static int initialize_components(const struct ipcp_config * conf) +static int initialize_components(struct ipcp_config * conf) { assert(ipcp_dir_hash_len() != 0); @@ -77,23 +77,25 @@ static int initialize_components(const struct ipcp_config * conf) goto fail_dt; } - if (fa_init()) { - log_err("Failed to initialize flow allocator component."); - goto fail_fa; - } + ipcp_set_dir_hash_algo((enum hash_algo) conf->layer_info.dir_hash_algo); - if (dir_init()) { + if (dir_init(&conf->unicast.dir)) { log_err("Failed to initialize directory."); goto fail_dir; } + if (fa_init()) { + log_err("Failed to initialize flow allocator component."); + goto fail_fa; + } + ipcp_set_state(IPCP_INIT); return 0; - fail_dir: - fa_fini(); fail_fa: + dir_fini(); + fail_dir: dt_fini(); fail_dt: ca_fini(); @@ -105,10 +107,10 @@ static int initialize_components(const struct ipcp_config * conf) static void finalize_components(void) { - dir_fini(); - fa_fini(); + dir_fini(); + dt_fini(); ca_fini(); @@ -138,8 +140,15 @@ static int start_components(void) goto fail_connmgr_start; } + if (dir_start() < 0) { + log_err("Failed to start directory."); + goto fail_dir_start; + } + return 0; + fail_dir_start: + connmgr_stop(); fail_connmgr_start: enroll_stop(); fail_enroll_start: @@ -153,6 +162,8 @@ static int start_components(void) static void stop_components(void) { + dir_stop(); + connmgr_stop(); enroll_stop(); @@ -164,16 +175,6 @@ static void stop_components(void) ipcp_set_state(IPCP_INIT); } -static int bootstrap_components(void) -{ - if (dir_bootstrap()) { - log_err("Failed to bootstrap directory."); - return -1; - } - - return 0; -} - static int unicast_ipcp_enroll(const char * dst, struct layer_info * info) { @@ -231,32 +232,25 @@ static int unicast_ipcp_enroll(const char * dst, return -1; } -static int unicast_ipcp_bootstrap(const struct ipcp_config * conf) +static int unicast_ipcp_bootstrap(struct ipcp_config * conf) { assert(conf); assert(conf->type == THIS_TYPE); - enroll_bootstrap(conf); - if (initialize_components(conf) < 0) { log_err("Failed to init IPCP components."); goto fail_init; } + enroll_bootstrap(conf); + if (start_components() < 0) { log_err("Failed to init IPCP components."); goto fail_start; } - if (bootstrap_components() < 0) { - log_err("Failed to bootstrap IPCP components."); - goto fail_bootstrap; - } - return 0; - fail_bootstrap: - stop_components(); fail_start: finalize_components(); fail_init: diff --git a/src/ipcpd/unicast/routing/link-state.c b/src/ipcpd/unicast/routing/link-state.c index 0bc6a852..fa6b9f4a 100644 --- a/src/ipcpd/unicast/routing/link-state.c +++ b/src/ipcpd/unicast/routing/link-state.c @@ -141,7 +141,7 @@ static int str_adj(struct adjacency * adj, char * buf, size_t len) { - char tmbuf[64]; + char tmstr[RIB_TM_STRLEN]; char srcbuf[64]; char dstbuf[64]; char seqnobuf[64]; @@ -152,15 +152,16 @@ static int str_adj(struct adjacency * adj, if (len < LS_ENTRY_SIZE) return -1; - tm = localtime(&adj->stamp); - strftime(tmbuf, sizeof(tmbuf), "%F %T", tm); /* 19 chars */ + tm = gmtime(&adj->stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); sprintf(srcbuf, "%" PRIu64, adj->src); sprintf(dstbuf, "%" PRIu64, adj->dst); sprintf(seqnobuf, "%" PRIu64, adj->seqno); - sprintf(buf, "src: %20s\ndst: %20s\nseqno: %18s\nupd: %20s\n", - srcbuf, dstbuf, seqnobuf, tmbuf); + sprintf(buf, "src: %20s\ndst: %20s\nseqno: %18s\n" + "upd: %s\n", + srcbuf, dstbuf, seqnobuf, tmstr); return LS_ENTRY_SIZE; } @@ -257,56 +258,45 @@ static int lsdb_rib_readdir(char *** buf) pthread_rwlock_rdlock(&ls.db_lock); - if (ls.db_len + ls.nbs_len == 0) { - pthread_rwlock_unlock(&ls.db_lock); - return 0; - } + if (ls.db_len + ls.nbs_len == 0) + goto no_entries; *buf = malloc(sizeof(**buf) * (ls.db_len + ls.nbs_len)); - if (*buf == NULL) { - pthread_rwlock_unlock(&ls.db_lock); - return -ENOMEM; - } + if (*buf == NULL) + goto fail_entries; list_for_each(p, &ls.nbs) { struct nb * nb = list_entry(p, struct nb, next); char * str = (nb->type == NB_DT ? "dt." : "mgmt."); sprintf(entry, "%s%" PRIu64, str, nb->addr); (*buf)[idx] = malloc(strlen(entry) + 1); - if ((*buf)[idx] == NULL) { - while (idx-- > 0) - free((*buf)[idx]); - free(*buf); - pthread_rwlock_unlock(&ls.db_lock); - return -ENOMEM; - } + if ((*buf)[idx] == NULL) + goto fail_entry; - strcpy((*buf)[idx], entry); - - idx++; + strcpy((*buf)[idx++], entry); } list_for_each(p, &ls.db) { struct adjacency * a = list_entry(p, struct adjacency, next); sprintf(entry, "%" PRIu64 ".%" PRIu64, a->src, a->dst); (*buf)[idx] = malloc(strlen(entry) + 1); - if ((*buf)[idx] == NULL) { - ssize_t j; - for (j = 0; j < idx; ++j) - free(*buf[j]); - free(buf); - pthread_rwlock_unlock(&ls.db_lock); - return -ENOMEM; - } - - strcpy((*buf)[idx], entry); + if ((*buf)[idx] == NULL) + goto fail_entry; - idx++; + strcpy((*buf)[idx++], entry); } - + no_entries: pthread_rwlock_unlock(&ls.db_lock); return idx; + + fail_entry: + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + fail_entries: + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; } static struct rib_ops r_ops = { |