From fa1af6aaed6a46acd0af1600f4c63e79fcf9ff84 Mon Sep 17 00:00:00 2001
From: Dimitri Staessens
Date: Wed, 6 Aug 2025 12:29:02 +0200
Subject: ipcpd: Update DHT for unicast layer

This is a rewrite of the DHT for name-to-address resolution in the
unicast layer. It is now integrated as a proper directory policy. The
dir_wait_running function is removed; instead, a DHT peer is passed on
during IPCP enrolment.

Each DHT request/response gets a random 64-bit ID ('cookie'). DHT
messages to the same peer are deduped, except when the DHT is low on
contacts. In that case, it will contact the peer it received at
enrolment for more contacts. To combat packet loss, these messages are
exempt from deduplication by means of a 'magic cookie', chosen at
random when the DHT starts (a sketch of this rule is included at the
end of this message).

The DHT (Kademlia) parameters can be set using the config file or the
IRM command line tools:

if DIRECTORY_POLICY == DHT
  [dht_alpha (default: 3)]
  [dht_k (default: 8)]
  [dht_t_expire (default: 86400)]
  [dht_t_refresh (default: 900)]
  [dht_t_replicate (default: 900)]

This commit also adds support for a protocol debug level (PP).
Protocol debugging for the DHT can be enabled using the DEBUG_PROTO_DHT
build flag.

The DHT has the following message types:

DHT_STORE, sent to k peers. Not acknowledged.

  DHT_STORE --> [2861814146dbf9b5|ed:d9:e2:c4].
    key: bcc236ab6ec69e65 [32 bytes]
    val: 00000000c4e2d9ed [8 bytes]
    exp: 2025-08-03 17:29:44 (UTC).

DHT_FIND_NODE_REQ, sent to 'alpha' peers, with a corresponding
response. This is used to update the peer routing table to iteratively
look for the nodes with IDs closest to the requested key.

  DHT_FIND_NODE_REQ --> [a62f92abffb451c4|ed:d9:e2:c4].
    cookie: 2d4b7acef8308210
    key: a62f92abffb451c4 [32 bytes]

  DHT_FIND_NODE_RSP <-- [2861814146dbf9b5|ed:d9:e2:c4].
    cookie: 2d4b7acef8308210
    key: a62f92abffb451c4 [32 bytes]
    contacts: [1]
      [a62f92abffb451c4|9f:0d:c1:fb]

DHT_FIND_VALUE_REQ, sent to 'k' peers, with a corresponding response.
Used to find a value for a key. The responder will also send its
closest known peers in the response.

  DHT_FIND_VALUE_REQ --> [2861814146dbf9b5|ed:d9:e2:c4].
    cookie: 80a1adcb09a2ff0a
    key: 42dee3b0415b4f69 [32 bytes]

  DHT_FIND_VALUE_RSP <-- [2861814146dbf9b5|ed:d9:e2:c4].
    cookie: 80a1adcb09a2ff0a
    key: 42dee3b0415b4f69 [32 bytes]
    values: [1]
      00000000c4e2d9ed [8 bytes]
    contacts: [1]
      [a62f92abffb451c4|9f:0d:c1:fb]

Also removes Ubuntu 20 from the AppVeyor config as it is not supported
anymore.
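Illustrative sketch of the dedup rule described above. This is not
code from this patch; the names 'pending', 'magic' and 'must_send' are
made up for illustration only:

  /* A request to a peer is suppressed when another request with the
   * same message code is still pending for that peer, unless it
   * carries the magic cookie, which is always sent. */
  #include <stdbool.h>
  #include <stddef.h>
  #include <stdint.h>

  struct pending {         /* hypothetical bookkeeping entry */
          uint64_t peer;   /* destination address            */
          int      code;   /* DHT message type               */
          uint64_t cookie; /* random 64-bit request ID       */
  };

  static uint64_t magic;   /* chosen at random at DHT start  */

  static bool must_send(const struct pending * tbl,
                        size_t                 len,
                        uint64_t               peer,
                        int                    code,
                        uint64_t               cookie)
  {
          size_t i;

          if (cookie == magic)  /* low on contacts: bypass dedup */
                  return true;

          for (i = 0; i < len; ++i)
                  if (tbl[i].peer == peer && tbl[i].code == code)
                          return false; /* duplicate, suppress */

          return true;
  }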
Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/broadcast/main.c | 4 +- src/ipcpd/common/connmgr.c | 3 - src/ipcpd/common/enroll.c | 15 +- src/ipcpd/config.h.in | 8 +- src/ipcpd/eth/eth.c | 2 +- src/ipcpd/ipcp.c | 239 +- src/ipcpd/ipcp.h | 5 +- src/ipcpd/local/main.c | 2 +- src/ipcpd/udp/main.c | 10 +- src/ipcpd/unicast/CMakeLists.txt | 4 +- src/ipcpd/unicast/addr-auth.h | 8 + src/ipcpd/unicast/addr-auth/flat.c | 6 +- src/ipcpd/unicast/ca.c | 1 - src/ipcpd/unicast/dir.c | 51 +- src/ipcpd/unicast/dir.h | 9 +- src/ipcpd/unicast/dir/dht.c | 5127 +++++++++++++++++----------- src/ipcpd/unicast/dir/dht.h | 17 +- src/ipcpd/unicast/dir/dht.proto | 41 +- src/ipcpd/unicast/dir/ops.h | 18 +- src/ipcpd/unicast/dir/tests/CMakeLists.txt | 3 + src/ipcpd/unicast/dir/tests/dht_test.c | 1916 ++++++++++- src/ipcpd/unicast/dt.c | 81 +- src/ipcpd/unicast/dt.h | 6 +- src/ipcpd/unicast/fa.c | 49 +- src/ipcpd/unicast/main.c | 54 +- src/ipcpd/unicast/routing/link-state.c | 60 +- 26 files changed, 5404 insertions(+), 2335 deletions(-) (limited to 'src/ipcpd') diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c index 0f2bf26c..ebdb182c 100644 --- a/src/ipcpd/broadcast/main.c +++ b/src/ipcpd/broadcast/main.c @@ -155,7 +155,7 @@ static int broadcast_ipcp_enroll(const char * dst, return -1; } -static int broadcast_ipcp_bootstrap(const struct ipcp_config * conf) +static int broadcast_ipcp_bootstrap(struct ipcp_config * conf) { assert(conf); assert(conf->type == THIS_TYPE); @@ -207,7 +207,7 @@ static int broadcast_ipcp_join(int fd, { struct conn conn; time_t mpl = IPCP_BROADCAST_MPL; - buffer_t data = {0, NULL}; + buffer_t data = BUF_INIT; (void) qs; diff --git a/src/ipcpd/common/connmgr.c b/src/ipcpd/common/connmgr.c index aa9f043e..eed6238e 100644 --- a/src/ipcpd/common/connmgr.c +++ b/src/ipcpd/common/connmgr.c @@ -489,9 +489,6 @@ int connmgr_alloc(enum comp_id id, switch (id) { case COMPID_DT: notifier_event(NOTIFY_DT_CONN_ADD, conn); -#if defined(BUILD_IPCP_UNICAST) && defined(IPCP_CONN_WAIT_DIR) - dir_wait_running(); -#endif break; case COMPID_MGMT: notifier_event(NOTIFY_MGMT_CONN_ADD, conn); diff --git a/src/ipcpd/common/enroll.c b/src/ipcpd/common/enroll.c index 3fc999af..4b437b27 100644 --- a/src/ipcpd/common/enroll.c +++ b/src/ipcpd/common/enroll.c @@ -43,6 +43,10 @@ #include #include +#ifdef __APPLE__ +#define llabs labs +#endif + #define ENROLL_COMP "Enrollment" #define ENROLL_PROTO "OEP" /* Ouroboros enrollment protocol */ #define ENROLL_WARN_TIME_OFFSET 20 @@ -60,6 +64,13 @@ struct { pthread_t listener; } enroll; +#ifdef DEBUG_PROTO_OEP + + +#endif + + + static void * enroll_handle(void * o) { struct enroll_req req; @@ -107,8 +118,6 @@ static void * enroll_handle(void * o) log_info_id(req.id, "Handling incoming enrollment."); - /* TODO: authentication, timezone handling (UTC). 
*/ - ack.result = -100; clock_gettime(CLOCK_REALTIME, &resp.t); @@ -242,7 +251,7 @@ int enroll_boot(struct conn * conn, rtt.tv_sec = resp.t.tv_sec; rtt.tv_nsec = resp.t.tv_nsec; - if (labs(ts_diff_ms(&t0, &rtt)) - delta_t > ENROLL_WARN_TIME_OFFSET) + if (llabs(ts_diff_ms(&t0, &rtt)) - delta_t > ENROLL_WARN_TIME_OFFSET) log_warn_id(id, "Clock offset above threshold."); return 0; diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index fe4f5fd2..b3a118ac 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -41,8 +41,6 @@ #define IPCP_LINUX_SLACK_NS @IPCP_LINUX_TIMERSLACK_NS@ -#cmakedefine IPCP_DEBUG_LOCAL /* unicast IPCP */ #define QOS_PRIO_BE @IPCP_QOS_CUBE_BE_PRIO@ #define QOS_PRIO_VIDEO @IPCP_QOS_CUBE_VIDEO_PRIO@ @@ -56,6 +54,12 @@ #cmakedefine IPCP_CONN_WAIT_DIR #cmakedefine DISABLE_CORE_LOCK #cmakedefine IPCP_FLOW_STATS +#cmakedefine IPCP_DEBUG_LOCAL +#ifdef CONFIG_OUROBOROS_DEBUG +#cmakedefine DEBUG_PROTO_DHT +#cmakedefine DEBUG_PROTO_OEP +#cmakedefine DEBUG_PROTO_LS +#endif /* udp */ #cmakedefine HAVE_DDNS diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 850231cd..1028ee03 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1487,7 +1487,7 @@ static int eth_init_raw_socket(struct ifreq * ifr) } #endif -static int eth_ipcp_bootstrap(const struct ipcp_config * conf) +static int eth_ipcp_bootstrap(struct ipcp_config * conf) { struct ifreq ifr; int i; diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index bc16e2f0..ab38f5d0 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -68,6 +68,34 @@ #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif +static char * ipcp_type_str[] = { + "local", + "unicast", + "broadcast", + "eth-llc", + "eth-dix", + "udp" +}; + +static char * dir_hash_str[] = { + "SHA3-224", + "SHA3-256", + "SHA3-384", + "SHA3-512", + "CRC32", + "MD5" +}; + +static char * ipcp_state_str[] = { + "null", + "init", + "boot", + "bootstrapped", + "enrolled", + "operational", + "shutdown" +}; + struct { pid_t irmd_pid; char * name; @@ -102,13 +130,6 @@ struct { pthread_t acceptor; } ipcpd; -char * info[LAYER_NAME_SIZE + 1] = { - "_state", - "_type", - "_layer", - NULL -}; - struct cmd { struct list_head next; @@ -127,6 +148,11 @@ const char * ipcp_get_name(void) return ipcpd.name; } +void ipcp_set_dir_hash_algo(enum hash_algo algo) +{ + ipcpd.dir_hash_algo = algo; +} + size_t ipcp_dir_hash_len(void) { return hash_len(ipcpd.dir_hash_algo); @@ -158,6 +184,13 @@ void ipcp_hash_str(char * buf, buf[2 * i] = '\0'; } +static const char * info[] = { + "_state", + "_type", + "_layer", + NULL +}; + static int ipcp_rib_read(const char * path, char * buf, size_t len) @@ -223,7 +256,7 @@ static int ipcp_rib_readdir(char *** buf) *buf = malloc(sizeof(**buf) * i); if (*buf == NULL) - goto fail; + goto fail_entries; i = 0; @@ -236,12 +269,12 @@ static int ipcp_rib_readdir(char *** buf) return i; fail_dup: - while (i > 0) - free((*buf)[--i]); - fail: + while (i-- > 0) + free((*buf)[i]); + fail_entries: free(*buf); - return -1; + return -ENOMEM; } static int ipcp_rib_getattr(const char * path, @@ -402,20 +435,24 @@ static void free_msg(void * o) static void do_bootstrap(ipcp_config_msg_t * conf_msg, ipcp_msg_t * ret_msg) { - struct ipcp_config conf; + struct ipcp_config conf; + struct layer_info * info; log_info("Bootstrapping..."); if (ipcpd.ops->ipcp_bootstrap == NULL) { - log_err("Bootstrap unsupported."); + log_err("Failed to bootstrap: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_INIT)
{ - log_err("IPCP in wrong state."); + + log_err("Failed to bootstrap: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_INIT]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } conf = ipcp_config_msg_to_s(conf_msg); @@ -431,15 +468,24 @@ static void do_bootstrap(ipcp_config_msg_t * conf_msg, } ret_msg->result = ipcpd.ops->ipcp_bootstrap(&conf); - if (ret_msg->result == 0) { - enum pol_dir_hash algo = conf.layer_info.dir_hash_algo; - strcpy(ipcpd.layer_name, conf.layer_info.name); - ipcpd.dir_hash_algo = (enum hash_algo) algo; - ret_msg->layer_info = layer_info_s_to_msg(&conf.layer_info); - ipcp_set_state(IPCP_OPERATIONAL); + if (ret_msg->result < 0) { + log_err("Failed to bootstrap IPCP."); + return; } - finish: - log_info("Finished bootstrapping: %d.", ret_msg->result); + + info = &conf.layer_info; + + strcpy(ipcpd.layer_name, info->name); + ipcpd.dir_hash_algo = (enum hash_algo) info->dir_hash_algo; + ret_msg->layer_info = layer_info_s_to_msg(info); + + log_info("Finished bootstrapping in layer %s.", info->name); + log_info(" type: %s", ipcp_type_str[ipcpd.type]); + log_info(" hash: %s [%zd bytes]", + dir_hash_str[ipcpd.dir_hash_algo], + ipcp_dir_hash_len()); + + ipcp_set_state(IPCP_OPERATIONAL); } static void do_enroll(const char * dst, @@ -450,26 +496,35 @@ static void do_enroll(const char * dst, log_info("Enrolling with %s...", dst); if (ipcpd.ops->ipcp_enroll == NULL) { - log_err("Enroll unsupported."); + log_err("Failed to enroll: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_INIT) { - log_err("IPCP in wrong state."); + log_err("Failed to enroll: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_INIT]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_enroll(dst, &info); - if (ret_msg->result == 0) { - strcpy(ipcpd.layer_name, info.name); - ipcpd.dir_hash_algo = (enum hash_algo) info.dir_hash_algo; - ret_msg->layer_info = layer_info_s_to_msg(&info); - ipcp_set_state(IPCP_OPERATIONAL); + if (ret_msg->result < 0) { + log_err("Failed to enroll IPCP."); + return; } - finish: - log_info("Finished enrolling with %s: %d.", dst, ret_msg->result); + + strcpy(ipcpd.layer_name, info.name); + ipcpd.dir_hash_algo = (enum hash_algo) info.dir_hash_algo; + ret_msg->layer_info = layer_info_s_to_msg(&info); + ipcp_set_state(IPCP_OPERATIONAL); + + log_info("Finished enrolling with %s in layer %s.", dst, info.name); + log_info(" type: %s", ipcp_type_str[ipcpd.type]); + log_info(" hash: %s [%zd bytes]", + dir_hash_str[ipcpd.dir_hash_algo], + ipcp_dir_hash_len()); } static void do_connect(const char * dst, @@ -480,14 +535,14 @@ static void do_connect(const char * dst, log_info("Connecting %s to %s...", comp, dst); if (ipcpd.ops->ipcp_connect == NULL) { - log_err("Connect unsupported."); + log_err("Failed to connect: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_connect(dst, comp, qs); - finish: - log_info("Finished connecting: %d.", ret_msg->result); + + log_info("Finished connecting."); } static void do_disconnect(const char * dst, @@ -497,16 +552,14 @@ static void do_disconnect(const char * dst, log_info("Disconnecting %s from %s...", comp, dst); if (ipcpd.ops->ipcp_disconnect == NULL) { - log_err("Disconnect unsupported."); + log_err("Failed to disconnect: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; }
ret_msg->result = ipcpd.ops->ipcp_disconnect(dst, comp); - finish: - log_info("Finished disconnecting %s from %s: %d.", - comp, dst, ret_msg->result); + log_info("Finished disconnecting %s from %s.", comp, dst); } static void do_reg(const uint8_t * hash, @@ -516,15 +569,14 @@ static void do_reg(const uint8_t * hash, log_info("Registering " HASH_FMT32 "...", HASH_VAL32(hash)); if (ipcpd.ops->ipcp_reg == NULL) { - log_err("Registration unsupported."); + log_err("Failed to register: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_reg(hash); - finish: - log_info("Finished registering " HASH_FMT32 " : %d.", - HASH_VAL32(hash), ret_msg->result); + + log_info("Finished registering " HASH_FMT32 ".", HASH_VAL32(hash)); } static void do_unreg(const uint8_t * hash, @@ -533,15 +585,14 @@ static void do_unreg(const uint8_t * hash, log_info("Unregistering " HASH_FMT32 "...", HASH_VAL32(hash)); if (ipcpd.ops->ipcp_unreg == NULL) { - log_err("Unregistration unsupported."); + log_err("Failed to unregister: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_unreg(hash); - finish: - log_info("Finished unregistering " HASH_FMT32 ": %d.", - HASH_VAL32(hash), ret_msg->result); + + log_info("Finished unregistering " HASH_FMT32 ".", HASH_VAL32(hash)); } static void do_query(const uint8_t * hash, @@ -550,13 +601,15 @@ static void do_query(const uint8_t * hash, /* TODO: Log this operation when IRMd has internal caches. */ if (ipcpd.ops->ipcp_query == NULL) { - log_err("Directory query unsupported."); + log_err("Failed to query: operation unsupported."); ret_msg->result = -ENOTSUP; return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_dbg("Failed to query: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; return; } @@ -577,15 +630,17 @@ static void do_flow_alloc(pid_t pid, flow_id, pid, HASH_VAL32(dst)); if (ipcpd.ops->ipcp_flow_alloc == NULL) { - log_err("Flow allocation unsupported."); + log_err("Flow allocation failed: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_err("Failed to allocate flow: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } fd = np1_flow_alloc(pid, flow_id); @@ -593,13 +648,13 @@ static void do_flow_alloc(pid_t pid, log_err("Failed allocating n + 1 fd on flow_id %d: %d", flow_id, fd); ret_msg->result = -EFLOWDOWN; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_flow_alloc(fd, dst, qs, data); - finish: - log_info("Finished allocating flow %d to " HASH_FMT32 ": %d.", - flow_id, HASH_VAL32(dst), ret_msg->result); + + log_info("Finished allocating flow %d to " HASH_FMT32 ".", + flow_id, HASH_VAL32(dst)); } @@ -614,26 +669,28 @@ static void do_flow_join(pid_t pid, log_info("Joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); if (ipcpd.ops->ipcp_flow_join == NULL) { - log_err("Broadcast unsupported."); + log_err("Failed to join: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_err("Failed to join: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + 
ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } fd = np1_flow_alloc(pid, flow_id); if (fd < 0) { log_err("Failed allocating n + 1 fd on flow_id %d.", flow_id); ret_msg->result = -1; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_flow_join(fd, dst, qs); - finish: + log_info("Finished joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); } @@ -647,15 +704,20 @@ static void do_flow_alloc_resp(int resp, log_info("Responding %d to alloc on flow_id %d.", resp, flow_id); if (ipcpd.ops->ipcp_flow_alloc_resp == NULL) { - log_err("Flow_alloc_resp unsupported."); + log_err("Failed to respond on flow %d: operation unsupported.", + flow_id); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_err("Failed to respond to flow %d: " + "IPCP in state <%s>, need <%s>.", + flow_id, + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } if (resp == 0) { @@ -663,13 +725,13 @@ static void do_flow_alloc_resp(int resp, if (fd < 0) { log_warn("Flow_id %d is not known.", flow_id); ret_msg->result = -1; - goto finish; + return; } } ret_msg->result = ipcpd.ops->ipcp_flow_alloc_resp(fd, resp, data); - finish: - log_info("Finished responding to allocation request: %d", + + log_info("Finished responding %d to allocation request.", ret_msg->result); } @@ -682,28 +744,29 @@ static void do_flow_dealloc(int flow_id, log_info("Deallocating flow %d.", flow_id); if (ipcpd.ops->ipcp_flow_dealloc == NULL) { - log_err("Flow deallocation unsupported."); + log_err("Failed to dealloc: operation unsupported."); ret_msg->result = -ENOTSUP; - goto finish; + return; } if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); + log_err("Failed to dealloc: IPCP in state <%s>, need <%s>.", + ipcp_state_str[ipcp_get_state()], + ipcp_state_str[IPCP_OPERATIONAL]); ret_msg->result = -EIPCPSTATE; - goto finish; + return; } fd = np1_flow_dealloc(flow_id, timeo_sec); if (fd < 0) { log_warn("Could not deallocate flow_id %d.", flow_id); ret_msg->result = -1; - goto finish; + return; } ret_msg->result = ipcpd.ops->ipcp_flow_dealloc(fd); - finish: - log_info("Finished deallocating flow %d: %d.", - flow_id, ret_msg->result); + + log_info("Finished deallocating flow %d.", flow_id); } static void * mainloop(void * o) @@ -888,8 +951,8 @@ int ipcp_init(int argc, log_init(log); - ipcpd.state = IPCP_NULL; - ipcpd.type = type; + ipcpd.state = IPCP_NULL; + ipcpd.type = type; #if defined (__linux__) prctl(PR_SET_TIMERSLACK, IPCP_LINUX_SLACK_NS, 0, 0, 0); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 3ec6401b..2c41f5b9 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -37,7 +37,7 @@ #define ipcp_dir_hash_strlen() (ipcp_dir_hash_len() * 2) struct ipcp_ops { - int (* ipcp_bootstrap)(const struct ipcp_config * conf); + int (* ipcp_bootstrap)(struct ipcp_config * conf); int (* ipcp_enroll)(const char * dst, struct layer_info * info); @@ -88,6 +88,9 @@ enum ipcp_type ipcp_get_type(void); const char * ipcp_get_name(void); +/* TODO: Only specify hash algorithm in directory policy */ +void ipcp_set_dir_hash_algo(enum hash_algo algo); + void ipcp_set_state(enum ipcp_state state); enum ipcp_state ipcp_get_state(void); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 054602e5..ffa6dc5a 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -137,7 +137,7 @@ static void * 
local_ipcp_packet_loop(void * o) return (void *) 0; } -static int local_ipcp_bootstrap(const struct ipcp_config * conf) +static int local_ipcp_bootstrap(struct ipcp_config * conf) { assert(conf); diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 3cfcf2cc..5f770a61 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -584,7 +584,7 @@ static const char * inet4_ntop(const void * addr, return inet_ntop(AF_INET, addr, buf, INET_ADDRSTRLEN); } -static int udp_ipcp_bootstrap(const struct ipcp_config * conf) +static int udp_ipcp_bootstrap(struct ipcp_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; @@ -664,14 +664,14 @@ static int udp_ipcp_bootstrap(const struct ipcp_config * conf) return 0; fail_packet_writer: - while (i > 0) { - pthread_cancel(udp_data.packet_writer[--i]); + while (i-- > 0) { + pthread_cancel(udp_data.packet_writer[i]); pthread_join(udp_data.packet_writer[i], NULL); } i = IPCP_UDP_RD_THR; fail_packet_reader: - while (i > 0) { - pthread_cancel(udp_data.packet_reader[--i]); + while (i-- > 0) { + pthread_cancel(udp_data.packet_reader[i]); pthread_join(udp_data.packet_reader[i], NULL); } pthread_cancel(udp_data.mgmt_handler); diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt index b0dd3acc..7d2a765b 100644 --- a/src/ipcpd/unicast/CMakeLists.txt +++ b/src/ipcpd/unicast/CMakeLists.txt @@ -31,7 +31,7 @@ if (HAVE_FUSE) endif () endif () -set(SOURCE_FILES +set(IPCP_UNICAST_SOURCE_FILES # Add source files here addr-auth.c ca.c @@ -56,7 +56,7 @@ set(SOURCE_FILES routing/graph.c ) -add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES} ${COMMON_SOURCES} +add_executable(ipcpd-unicast ${IPCP_UNICAST_SOURCE_FILES} ${IPCP_SOURCES} ${COMMON_SOURCES} ${DHT_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS}) target_link_libraries(ipcpd-unicast LINK_PUBLIC ouroboros-dev) diff --git a/src/ipcpd/unicast/addr-auth.h b/src/ipcpd/unicast/addr-auth.h index e119dff3..b453cba5 100644 --- a/src/ipcpd/unicast/addr-auth.h +++ b/src/ipcpd/unicast/addr-auth.h @@ -27,6 +27,14 @@ #include +#define ADDR_FMT32 "%02x:%02x:%02x:%02x" +#define ADDR_VAL32(a) \ + ((uint8_t *) a)[0], ((uint8_t *) a)[1], \ + ((uint8_t *) a)[2], ((uint8_t *) a)[3] + +#define ADDR_FMT64 ADDR_FMT32 ":" ADDR_FMT32 +#define ADDR_VAL64(a) ADDR_VAL32(a), ADDR_VAL32(a + 4) + int addr_auth_init(enum pol_addr_auth type, const void * info); diff --git a/src/ipcpd/unicast/addr-auth/flat.c b/src/ipcpd/unicast/addr-auth/flat.c index dfdeccd6..34ca1cef 100644 --- a/src/ipcpd/unicast/addr-auth/flat.c +++ b/src/ipcpd/unicast/addr-auth/flat.c @@ -31,10 +31,11 @@ #include #include +#include "addr-auth.h" #include "ipcp.h" #include "flat.h" -#define NAME_LEN 8 +#define NAME_LEN 8 #define INVALID_ADDRESS 0 struct { @@ -63,6 +64,9 @@ int flat_init(const void * info) while (flat.addr == INVALID_ADDRESS) random_buffer(&flat.addr,sizeof(flat.addr)); #endif + log_dbg("Flat address initialized to " ADDR_FMT32 ".", + ADDR_VAL32((uint8_t *) &flat.addr)); + return 0; } diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c index 287eaf41..1fcc9bb2 100644 --- a/src/ipcpd/unicast/ca.c +++ b/src/ipcpd/unicast/ca.c @@ -49,7 +49,6 @@ int ca_init(enum pol_cong_avoid pol) return 0; } - void ca_fini(void) { ca.ops = NULL; diff --git a/src/ipcpd/unicast/dir.c b/src/ipcpd/unicast/dir.c index e0cb09fc..2b305626 100644 --- a/src/ipcpd/unicast/dir.c +++ b/src/ipcpd/unicast/dir.c @@ -44,50 +44,57 @@ struct { struct dir_ops * ops; - void * dir; -} dirmgr; +} dir; -int dir_init(void) +int 
dir_init(struct dir_config * conf) { - dirmgr.ops = &dht_dir_ops; + void * cfg; - dirmgr.dir = dirmgr.ops->create(); - if (dirmgr.dir == NULL) { - dirmgr.ops = NULL; - return -ENOMEM; + assert(conf != NULL); + + switch (conf->pol) { + case DIR_DHT: + log_info("Using DHT policy."); + dir.ops = &dht_dir_ops; + cfg = &conf->dht; + break; + default: /* DIR_INVALID */ + log_err("Invalid directory policy %d.", conf->pol); + return -EINVAL; } - return 0; + assert(dir.ops->init != NULL); + + return dir.ops->init(cfg); } void dir_fini(void) { - dirmgr.ops->destroy(dirmgr.dir); - dirmgr.ops = NULL; - dirmgr.dir = NULL; + dir.ops->fini(); + dir.ops = NULL; } -int dir_bootstrap(void) +int dir_start(void) { - return dirmgr.ops->bootstrap(dirmgr.dir); + return dir.ops->start(); } -int dir_reg(const uint8_t * hash) +void dir_stop(void) { - return dirmgr.ops->reg(dirmgr.dir, hash); + dir.ops->stop(); } -int dir_unreg(const uint8_t * hash) +int dir_reg(const uint8_t * hash) { - return dirmgr.ops->unreg(dirmgr.dir, hash); + return dir.ops->reg(hash); } -uint64_t dir_query(const uint8_t * hash) +int dir_unreg(const uint8_t * hash) { - return dirmgr.ops->query(dirmgr.dir, hash); + return dir.ops->unreg(hash); } -int dir_wait_running(void) +uint64_t dir_query(const uint8_t * hash) { - return dirmgr.ops->wait_running(dirmgr.dir); + return dir.ops->query(hash); } diff --git a/src/ipcpd/unicast/dir.h b/src/ipcpd/unicast/dir.h index b261ea2c..dbfde19f 100644 --- a/src/ipcpd/unicast/dir.h +++ b/src/ipcpd/unicast/dir.h @@ -25,11 +25,14 @@ #include -int dir_init(void); +/* may update the config! */ +int dir_init(struct dir_config * conf); void dir_fini(void); -int dir_bootstrap(void); +int dir_start(void); + +void dir_stop(void); int dir_reg(const uint8_t * hash); @@ -37,6 +40,4 @@ int dir_unreg(const uint8_t * hash); uint64_t dir_query(const uint8_t * hash); -int dir_wait_running(void); - #endif /* OUROBOROS_IPCPD_UNICAST_DIR_H */ diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index da39e567..c7205505 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -4,7 +4,6 @@ * Distributed Hash Table based on Kademlia * * Dimitri Staessens - * Sander Vrijders * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -20,10 +19,12 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L +#if !defined (__DHT_TEST__) + #if defined(__linux__) || defined(__CYGWIN__) + #define _DEFAULT_SOURCE + #else + #define _POSIX_C_SOURCE 200112L + #endif #endif #include "config.h" @@ -40,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -59,143 +61,153 @@ #include #include "dht.pb-c.h" -typedef DhtMsg dht_msg_t; -typedef DhtContactMsg dht_contact_msg_t; +typedef DhtMsg dht_msg_t; +typedef DhtContactMsg dht_contact_msg_t; +typedef DhtStoreMsg dht_store_msg_t; +typedef DhtFindReqMsg dht_find_req_msg_t; +typedef DhtFindNodeRspMsg dht_find_node_rsp_msg_t; +typedef DhtFindValueRspMsg dht_find_value_rsp_msg_t; +typedef ProtobufCBinaryData binary_data_t; #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ -#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ -#define KAD_K 8 /* Replication factor, MDHT value. */ -#define KAD_T_REPL 900 /* Replication time, tied to k. 
MDHT value. */ -#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 8 /* Response time to wait for a join. */ -#define KAD_T_RESP 5 /* Response time to wait for a response. */ -#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ -#define KAD_QUEER 15 /* Time to declare peer questionable. */ -#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ -#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ -#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define DHT_MAX_REQS 128 /* KAD recommends rnd(), bmp can be changed. */ +#define DHT_WARN_REQS 100 /* Warn if number of requests exceeds this. */ +#define DHT_MAX_VALS 8 /* Max number of values to return for a key. */ +#define DHT_T_CACHE 60 /* Max cache time for values (s) */ +#define DHT_T_RESP 2 /* Response time to wait for a response (s). */ +#define DHT_N_REPUB 5 /* Republish if expiry within n replications. */ +#define DHT_R_PING 2 /* Ping retries before declaring peer dead. */ +#define DHT_QUEER 15 /* Time to declare peer questionable. */ +#define DHT_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define DHT_RESP_RETR 6 /* Number of retries on sending a response. */ #define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ -#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ +#define DHT_INVALID 0 /* Invalid cookie value. */ -enum dht_state { - DHT_INIT = 0, - DHT_SHUTDOWN, - DHT_JOINING, - DHT_RUNNING, -}; +#define KEY_FMT "K<" HASH_FMT64 ">" +#define KEY_VAL(key) HASH_VAL64(key) -enum kad_code { - KAD_JOIN = 0, - KAD_FIND_NODE, - KAD_FIND_VALUE, - /* Messages without a response below. 
*/ - KAD_STORE, - KAD_RESPONSE -}; +#define VAL_FMT "V<" HASH_FMT64 ">" +#define VAL_VAL(val) HASH_VAL64((val).data) -enum kad_req_state { - REQ_NULL = 0, - REQ_INIT, - REQ_PENDING, - REQ_RESPONSE, - REQ_DONE, - REQ_DESTROY -}; +#define KV_FMT "<" HASH_FMT64 ", " HASH_FMT64 ">" +#define KV_VAL(key, val) HASH_VAL64(key), HASH_VAL64((val).data) -enum lookup_state { - LU_NULL = 0, - LU_INIT, - LU_PENDING, - LU_UPDATE, - LU_COMPLETE, - LU_DESTROY -}; +#define PEER_FMT "[" HASH_FMT64 "|" ADDR_FMT32 "]" +#define PEER_VAL(id, addr) HASH_VAL64(id), ADDR_VAL32(&(addr)) + +#define DHT_CODE(msg) dht_code_str[(msg)->code] + +#define TX_HDR_FMT "%s --> " PEER_FMT +#define TX_HDR_VAL(msg, id, addr) DHT_CODE(msg), PEER_VAL(id, addr) -struct kad_req { - struct list_head next; +#define RX_HDR_FMT "%s <-- " PEER_FMT +#define RX_HDR_VAL(msg) DHT_CODE(msg), \ + PEER_VAL(msg->src->id.data, msg->src->addr) - uint32_t cookie; - enum kad_code code; - uint8_t * key; - uint64_t addr; +#define CK_FMT "|" HASH_FMT64 "|" +#define CK_VAL(cookie) HASH_VAL64(&(cookie)) - enum kad_req_state state; - pthread_cond_t cond; - pthread_mutex_t lock; +#define IS_REQUEST(code) \ + (code == DHT_FIND_NODE_REQ || code == DHT_FIND_VALUE_REQ) - time_t t_exp; +enum dht_code { + DHT_STORE, + DHT_FIND_NODE_REQ, + DHT_FIND_NODE_RSP, + DHT_FIND_VALUE_REQ, + DHT_FIND_VALUE_RSP }; -struct cookie_el { - struct list_head next; +const char * dht_code_str[] = { + "DHT_STORE", + "DHT_FIND_NODE_REQ", + "DHT_FIND_NODE_RSP", + "DHT_FIND_VALUE_REQ", + "DHT_FIND_VALUE_RSP" +}; - uint32_t cookie; +enum dht_state { + DHT_NULL = 0, + DHT_INIT, + DHT_RUNNING }; -struct lookup { - struct list_head next; +struct val_entry { + struct list_head next; + + buffer_t val; - struct list_head cookies; + time_t t_exp; /* Expiry time */ + time_t t_repl; /* Last replication time */ +}; - uint8_t * key; +struct dht_entry { + struct list_head next; - struct list_head contacts; - size_t n_contacts; + uint8_t * key; - uint64_t * addrs; - size_t n_addrs; + struct { + struct list_head list; + size_t len; + } vals; /* We don't own these, only replicate */ - enum lookup_state state; - pthread_cond_t cond; - pthread_mutex_t lock; + struct { + struct list_head list; + size_t len; + } lvals; /* We own these, must be republished */ }; -struct val { +struct contact { struct list_head next; + uint8_t * id; uint64_t addr; - time_t t_exp; - time_t t_rep; + size_t fails; + time_t t_seen; }; -struct ref_entry { +struct peer_entry { struct list_head next; - uint8_t * key; + uint64_t cookie; + uint8_t * id; + uint64_t addr; + enum dht_code code; - time_t t_rep; + time_t t_sent; }; -struct dht_entry { +struct dht_req { struct list_head next; uint8_t * key; - size_t n_vals; - struct list_head vals; -}; - -struct contact { - struct list_head next; + time_t t_exp; - uint8_t * id; - uint64_t addr; + struct { + struct list_head list; + size_t len; + } peers; - size_t fails; - time_t t_seen; + struct { + struct list_head list; + size_t len; + } cache; }; struct bucket { - struct list_head contacts; - size_t n_contacts; + struct { + struct list_head list; + size_t len; + } contacts; - struct list_head alts; - size_t n_alts; + struct { + struct list_head list; + size_t len; + } alts; time_t t_refr; @@ -203,395 +215,354 @@ struct bucket { uint8_t mask; struct bucket * parent; - struct bucket * children[1L << KAD_BETA]; + struct bucket * children[1L << DHT_BETA]; }; struct cmd { - struct list_head next; - - struct shm_du_buff * sdb; + struct list_head next; + buffer_t cbuf; }; struct dir_ops 
dht_dir_ops = { - .create = dht_create, - .destroy = dht_destroy, - .bootstrap = dht_bootstrap, - .reg = dht_reg, - .unreg = dht_unreg, - .query = dht_query, - .wait_running = dht_wait_running + .init = (int (*)(void *)) dht_init, + .fini = dht_fini, + .start = dht_start, + .stop = dht_stop, + .reg = dht_reg, + .unreg = dht_unreg, + .query = dht_query }; -struct dht { - size_t alpha; - size_t b; - size_t k; - - time_t t_expire; - time_t t_refresh; - time_t t_replic; - time_t t_repub; - - uint8_t * id; - uint64_t addr; - - struct bucket * buckets; - - struct list_head entries; +struct { + struct { /* Kademlia parameters */ + uint32_t alpha; /* Number of concurrent requests */ + size_t k; /* Number of replicas to store */ + time_t t_expire; /* Expiry time for values (s) */ + time_t t_refresh; /* Refresh time for contacts (s) */ + time_t t_repl; /* Replication time for values (s) */ + }; - struct list_head refs; + buffer_t id; - struct list_head lookups; + time_t t0; /* Creation time */ + uint64_t addr; /* Our own address */ + uint64_t peer; /* Enrollment peer address */ + uint64_t magic; /* Magic cookie for retransmit */ - struct list_head requests; - struct bmp * cookies; + uint64_t eid; /* Entity ID */ - enum dht_state state; - struct list_head cmds; - pthread_cond_t cond; - pthread_mutex_t mtx; + struct tpm * tpm; + pthread_t worker; - pthread_rwlock_t lock; - - uint64_t eid; - - struct tpm * tpm; - - pthread_t worker; -}; + enum dht_state state; -struct join_info { - struct dht * dht; - uint64_t addr; + struct { + struct { + struct bucket * root; + } contacts; + + struct { + struct list_head list; + size_t len; + size_t vals; + size_t lvals; + } kv; + + pthread_rwlock_t lock; + } db; + + struct { + struct list_head list; + size_t len; + pthread_cond_t cond; + pthread_mutex_t mtx; + } reqs; + + struct { + struct list_head list; + pthread_cond_t cond; + pthread_mutex_t mtx; + } cmds; +} dht; + + +/* DHT RIB */ + +static const char * dht_dir[] = { + "database", + "stats", + NULL }; -struct packet_info { - struct dht * dht; - struct shm_du_buff * sdb; -}; +const char * dht_stats = \ + "DHT: " HASH_FMT64 "\n" + " Created: %s\n" + " Address: " ADDR_FMT32 "\n" + " Kademlia parameters:\n" + " Number of concurrent requests (alpha): %10zu\n" + " Number of replicas (k): %10zu\n" + " Expiry time for values (s): %10ld\n" + " Refresh time for contacts (s): %10ld\n" + " Replication time for values (s): %10ld\n" + " Number of keys: %10zu\n" + " Number of local values: %10zu\n" + " Number of non-local values: %10zu\n"; -static uint8_t * dht_dup_key(const uint8_t * key, - size_t len) +static int dht_rib_statfile(char * buf, + size_t len) { - uint8_t * dup; + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + size_t keys; + size_t vals; + size_t lvals; - dup = malloc(sizeof(*dup) * len); - if (dup == NULL) - return NULL; + assert(buf != NULL); + assert(len > 0); - memcpy(dup, key, len); - - return dup; -} + pthread_rwlock_rdlock(&dht.db.lock); -static enum dht_state dht_get_state(struct dht * dht) -{ - enum dht_state state; + keys = dht.db.kv.len; + lvals = dht.db.kv.lvals; + vals = dht.db.kv.vals; - pthread_mutex_lock(&dht->mtx); + pthread_rwlock_unlock(&dht.db.lock); - state = dht->state; + tm = gmtime(&dht.t0); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); - pthread_mutex_unlock(&dht->mtx); + snprintf(buf, len, dht_stats, + HASH_VAL64(dht.id.data), + tmstr, + ADDR_VAL32(&dht.addr), + (size_t) dht.alpha, dht.k, + dht.t_expire, dht.t_refresh, dht.t_repl, + keys, lvals, vals); - return state; + return 
strlen(buf); } -static int dht_set_state(struct dht * dht, - enum dht_state state) +static size_t dht_db_file_len(void) { - pthread_mutex_lock(&dht->mtx); - - if (state == DHT_JOINING && dht->state != DHT_INIT) { - pthread_mutex_unlock(&dht->mtx); - return -1; - } - - dht->state = state; - - pthread_cond_broadcast(&dht->cond); - - pthread_mutex_unlock(&dht->mtx); + size_t sz; + size_t vals; - return 0; -} - -int dht_wait_running(void * dir) -{ - struct dht * dht; - int ret = 0; + sz = 18; /* DHT database + 2 * \n */ - dht = (struct dht *) dir; + pthread_rwlock_rdlock(&dht.db.lock); - pthread_mutex_lock(&dht->mtx); + if (dht.db.kv.len == 0) { + pthread_rwlock_unlock(&dht.db.lock); + sz += 14; /* No entries */ + return sz; + } - pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + sz += 39 * 3 + 1; /* tally + extra newline */ + sz += dht.db.kv.len * (25 + 19 + 23 + 1); - while (dht->state == DHT_JOINING) - pthread_cond_wait(&dht->cond, &dht->mtx); + vals = dht.db.kv.vals + dht.db.kv.lvals; - if (dht->state != DHT_RUNNING) - ret = -1; + sz += vals * (48 + 2 * RIB_TM_STRLEN); - pthread_cleanup_pop(true); + pthread_rwlock_unlock(&dht.db.lock); - return ret; + return sz; } -static uint8_t * create_id(size_t len) +static int dht_rib_dbfile(char * buf, + size_t len) { - uint8_t * id; + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + char exstr[RIB_TM_STRLEN]; + size_t i = 0; + struct list_head * p; - id = malloc(len); - if (id == NULL) - return NULL; + assert(buf != NULL); + assert(len > 0); - if (random_buffer(id, len) < 0) { - free(id); - return NULL; + pthread_rwlock_rdlock(&dht.db.lock); + + if (dht.db.kv.len == 0) { + i += snprintf(buf, len, " No entries.\n"); + pthread_rwlock_unlock(&dht.db.lock); + return i; } - return id; -} + i += snprintf(buf + i, len - i, "DHT database:\n\n"); + i += snprintf(buf + i, len - i, + "Number of keys: %10zu\n" + "Number of local values: %10zu\n" + "Number of non-local values: %10zu\n\n", + dht.db.kv.len, dht.db.kv.vals, dht.db.kv.lvals); -static void kad_req_create(struct dht * dht, - dht_msg_t * msg, - uint64_t addr) -{ - struct kad_req * req; - pthread_condattr_t cattr; - struct timespec t; - size_t b; + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + struct list_head * h; - clock_gettime(CLOCK_REALTIME_COARSE, &t); + i += snprintf(buf + i, len - i, "Key: " KEY_FMT "\n", + KEY_VAL(e->key)); + i += snprintf(buf + i, len - i, " Local entries:\n"); - req = malloc(sizeof(*req)); - if (req == NULL) - goto fail_malloc; + list_for_each(h, &e->vals.list) { + struct val_entry * v; - list_head_init(&req->next); + v = list_entry(h, struct val_entry, next); - req->t_exp = t.tv_sec + KAD_T_RESP; - req->addr = addr; - req->state = REQ_INIT; - req->cookie = msg->cookie; - req->code = msg->code; - req->key = NULL; + tm = gmtime(&v->t_repl); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); - pthread_rwlock_rdlock(&dht->lock); - b = dht->b; - pthread_rwlock_unlock(&dht->lock); + tm = gmtime(&v->t_exp); + strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm); - if (msg->has_key) { - req->key = dht_dup_key(msg->key.data, b); - if (req->key == NULL) - goto fail_dup_key; - } + i += snprintf(buf + i, len - i, + " " VAL_FMT + ", t_replicated=%.*s, t_expire=%.*s\n", + VAL_VAL(v->val), + RIB_TM_STRLEN, tmstr, + RIB_TM_STRLEN, exstr); + } - if (pthread_mutex_init(&req->lock, NULL)) - goto fail_mutex; + i += snprintf(buf + i, len - i, "\n"); + i += snprintf(buf + i, len - i, " Non-local entries:\n"); - if 
(pthread_condattr_init(&cattr)) - goto fail_condattr; -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif + list_for_each(h, &e->lvals.list) { + struct val_entry * v; - if (pthread_cond_init(&req->cond, &cattr)) - goto fail_cond_init; + v= list_entry(h, struct val_entry, next); - pthread_condattr_destroy(&cattr); + tm = gmtime(&v->t_repl); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); - pthread_rwlock_wrlock(&dht->lock); + tm = gmtime(&v->t_exp); + strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm); - list_add(&req->next, &dht->requests); + i += snprintf(buf + i, len - i, + " " VAL_FMT + ", t_replicated=%.*s, t_expire=%.*s\n", + VAL_VAL(v->val), + RIB_TM_STRLEN, tmstr, + RIB_TM_STRLEN, exstr); - pthread_rwlock_unlock(&dht->lock); + } + } - return; + pthread_rwlock_unlock(&dht.db.lock); - fail_cond_init: - pthread_condattr_destroy(&cattr); - fail_condattr: - pthread_mutex_destroy(&req->lock); - fail_mutex: - free(req->key); - fail_dup_key: - free(req); - fail_malloc: - return; + printf("DHT RIB DB file generated (%zu bytes).\n", i); + + return i; } -static void cancel_req_destroy(void * o) +static int dht_rib_read(const char * path, + char * buf, + size_t len) { - struct kad_req * req = (struct kad_req *) o; - - pthread_mutex_unlock(&req->lock); + char * entry; - pthread_cond_destroy(&req->cond); - pthread_mutex_destroy(&req->lock); + entry = strstr(path, RIB_SEPARATOR) + 1; - if (req->key != NULL) - free(req->key); + if (strcmp(entry, "database") == 0) { + return dht_rib_dbfile(buf, len); + } else if (strcmp(entry, "stats") == 0) { + return dht_rib_statfile(buf, len); + } - free(req); + return 0; } -static void kad_req_destroy(struct kad_req * req) +static int dht_rib_readdir(char *** buf) { - struct timespec t; - struct timespec intv = TIMESPEC_INIT_S(20); + int i = 0; - clock_gettime(PTHREAD_COND_CLOCK, &t); - ts_add(&t, &intv, &t); + while (dht_dir[i++] != NULL); - assert(req); + *buf = malloc(sizeof(**buf) * i); + if (*buf == NULL) + goto fail_buf; - pthread_mutex_lock(&req->lock); + i = 0; - switch (req->state) { - case REQ_DESTROY: - pthread_mutex_unlock(&req->lock); - return; - case REQ_PENDING: - req->state = REQ_DESTROY; - pthread_cond_broadcast(&req->cond); - break; - case REQ_INIT: - case REQ_DONE: - req->state = REQ_NULL; - break; - case REQ_RESPONSE: - case REQ_NULL: - default: - break; + while (dht_dir[i] != NULL) { + (*buf)[i] = strdup(dht_dir[i]); + if ((*buf)[i] == NULL) + goto fail_dup; + i++; } - pthread_cleanup_push(cancel_req_destroy, req); - - while (req->state != REQ_NULL && req->state != REQ_DONE) - pthread_cond_timedwait(&req->cond, &req->lock, &t); - - pthread_cleanup_pop(true); + return i; + fail_dup: + freepp(char, *buf, i); + fail_buf: + return -ENOMEM; } -static int kad_req_wait(struct kad_req * req, - time_t t) +static int dht_rib_getattr(const char * path, + struct rib_attr * attr) { - struct timespec timeo = TIMESPEC_INIT_S(0); - struct timespec abs; - int ret = 0; - - assert(req); - - timeo.tv_sec = t; - - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - ts_add(&abs, &timeo, &abs); + struct timespec now; + char * entry; - pthread_mutex_lock(&req->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - req->state = REQ_PENDING; + attr->mtime = now.tv_sec; - pthread_cleanup_push(__cleanup_mutex_unlock, &req->lock); + entry = strstr(path, RIB_SEPARATOR) + 1; - while (req->state == REQ_PENDING && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); - - switch(req->state) { - case REQ_DESTROY: - ret 
= -1; - req->state = REQ_NULL; - pthread_cond_broadcast(&req->cond); - break; - case REQ_PENDING: /* ETIMEDOUT */ - case REQ_RESPONSE: - req->state = REQ_DONE; - pthread_cond_broadcast(&req->cond); - break; - default: - break; + if (strcmp(entry, "database") == 0) { + attr->size = dht_db_file_len(); + } else if (strcmp(entry, "stats") == 0) { + attr->size = 545; } - pthread_cleanup_pop(true); - - return ret; + return 0; } -static void kad_req_respond(struct kad_req * req) -{ - pthread_mutex_lock(&req->lock); - - req->state = REQ_RESPONSE; - pthread_cond_broadcast(&req->cond); +static struct rib_ops r_ops = { + .read = dht_rib_read, + .readdir = dht_rib_readdir, + .getattr = dht_rib_getattr +}; - pthread_mutex_unlock(&req->lock); -} +/* Helper functions */ -static struct contact * contact_create(const uint8_t * id, - size_t len, - uint64_t addr) +static uint8_t * generate_id(void) { - struct contact * c; - struct timespec t; + uint8_t * id; - c = malloc(sizeof(*c)); - if (c == NULL) + if(dht.id.len < sizeof(uint64_t)) { + log_err("DHT ID length is too short (%zu < %zu).", + dht.id.len, sizeof(uint64_t)); return NULL; + }; - list_head_init(&c->next); - - clock_gettime(CLOCK_REALTIME_COARSE, &t); - - c->addr = addr; - c->fails = 0; - c->t_seen = t.tv_sec; - c->id = dht_dup_key(id, len); - if (c->id == NULL) { - free(c); - return NULL; + id = malloc(dht.id.len); + if (id == NULL) { + log_err("Failed to malloc ID."); + goto fail_id; } - return c; -} - -static void contact_destroy(struct contact * c) -{ - if (c != NULL) - free(c->id); + if (random_buffer(id, dht.id.len) < 0) { + log_err("Failed to generate random ID."); + goto fail_rnd; + } - free(c); + return id; + fail_rnd: + free(id); + fail_id: + return NULL; } -static struct bucket * iter_bucket(struct bucket * b, - const uint8_t * id) +static uint64_t generate_cookie(void) { - uint8_t byte; - uint8_t mask; - - assert(b); - - if (b->children[0] == NULL) - return b; - - byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; - - mask = ((1L << KAD_BETA) - 1) & 0xFF; - - byte >>= (CHAR_BIT - KAD_BETA) - - (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + uint64_t cookie = DHT_INVALID; - return iter_bucket(b->children[(byte & mask)], id); -} - -static struct bucket * dht_get_bucket(struct dht * dht, - const uint8_t * id) -{ - assert(dht->buckets); + while (cookie == DHT_INVALID) + random_buffer((uint8_t *) &cookie, sizeof(cookie)); - return iter_bucket(dht->buckets, id); + return cookie; } /* @@ -601,719 +572,1129 @@ static struct bucket * dht_get_bucket(struct dht * dht, static uint64_t dist(const uint8_t * src, const uint8_t * dst) { + assert(dht.id.len >= sizeof(uint64_t)); + return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); } -static size_t list_add_sorted(struct list_head * l, - struct contact * c, - const uint8_t * key) +#define IS_CLOSER(x, y) (dist((x), dht.id.data) < dist((y), dht.id.data)) + +static int addr_to_buf(const uint64_t addr, + buffer_t * buf) { - struct list_head * p; + size_t len; + uint64_t _addr; - assert(l); - assert(c); - assert(key); - assert(c->id); + len = sizeof(addr); + _addr = hton64(addr); - list_for_each(p, l) { - struct contact * e = list_entry(p, struct contact, next); - if (dist(c->id, key) > dist(e->id, key)) - break; - } + assert(buf != NULL); - list_add_tail(&c->next, p); + buf->data = malloc(len); + if (buf->data == NULL) + goto fail_malloc; + + buf->len = sizeof(_addr); + memcpy(buf->data, &_addr, sizeof(_addr)); - return 1; + return 0; + fail_malloc: + return -ENOMEM; } -static size_t 
dht_contact_list(struct dht * dht, - struct list_head * l, - const uint8_t * key) +static int buf_to_addr(const buffer_t buf, + uint64_t * addr) { - struct list_head * p; - struct bucket * b; - size_t len = 0; - size_t i; - struct timespec t; + assert(addr != NULL); + assert(buf.data != NULL); - assert(l); - assert(dht); - assert(key); - assert(list_is_empty(l)); + if (buf.len != sizeof(*addr)) + return - EINVAL; - clock_gettime(CLOCK_REALTIME_COARSE, &t); + *addr = ntoh64(*((uint64_t *) buf.data)); - b = dht_get_bucket(dht, key); - if (b == NULL) - return 0; + if (*addr == dht.addr) + *addr = INVALID_ADDR; - b->t_refr = t.tv_sec + KAD_T_REFR; + return 0; +} - if (b->n_contacts == dht->k || b->parent == NULL) { - list_for_each(p, &b->contacts) { - struct contact * c; - c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht->b, c->addr); - if (list_add_sorted(l, c, key) == 1) - if (++len == dht->k) - break; - } - } else { - struct bucket * d = b->parent; - for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) { - list_for_each(p, &d->children[i]->contacts) { - struct contact * c; - c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht->b, c->addr); - if (c == NULL) - continue; - if (list_add_sorted(l, c, key) == 1) - if (++len == dht->k) - break; - } - } - } +static uint8_t * dht_dup_key(const uint8_t * key) +{ + uint8_t * dup; - assert(len == dht->k || b->parent == NULL); + assert(key != NULL); + assert(dht.id.len != 0); - return len; -} + dup = malloc(dht.id.len); + if (dup == NULL) + return NULL; -static struct lookup * lookup_create(struct dht * dht, - const uint8_t * id) -{ - struct lookup * lu; - pthread_condattr_t cattr; + memcpy(dup, key, dht.id.len); - assert(dht); - assert(id); + return dup; +} - lu = malloc(sizeof(*lu)); - if (lu == NULL) - goto fail_malloc; +/* DHT */ - list_head_init(&lu->contacts); - list_head_init(&lu->cookies); +static struct val_entry * val_entry_create(const buffer_t val, + time_t exp) +{ + struct val_entry * e; + struct timespec now; - lu->state = LU_INIT; - lu->addrs = NULL; - lu->n_addrs = 0; - lu->key = dht_dup_key(id, dht->b); - if (lu->key == NULL) - goto fail_id; + assert(val.data != NULL); + assert(val.len > 0); - if (pthread_mutex_init(&lu->lock, NULL)) - goto fail_mutex; + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_condattr_init(&cattr); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#ifndef __DHT_TEST_ALLOW_EXPIRED__ + if (exp < now.tv_sec) + return NULL; /* Refuse to add expired values */ #endif + e = malloc(sizeof(*e)); + if (e == NULL) + goto fail_entry; - if (pthread_cond_init(&lu->cond, &cattr)) - goto fail_cond; + list_head_init(&e->next); - pthread_condattr_destroy(&cattr); + e->val.len = val.len; + e->val.data = malloc(val.len); + if (e->val.data == NULL) + goto fail_val; - pthread_rwlock_wrlock(&dht->lock); + memcpy(e->val.data, val.data, val.len); - list_add(&lu->next, &dht->lookups); + e->t_repl = 0; + e->t_exp = exp; - lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + return e; - pthread_rwlock_unlock(&dht->lock); + fail_val: + free(e); + fail_entry: + return NULL; +} - return lu; +static void val_entry_destroy(struct val_entry * v) +{ + assert(v->val.data != NULL); - fail_cond: - pthread_condattr_destroy(&cattr); - pthread_mutex_destroy(&lu->lock); - fail_mutex: - free(lu->key); - fail_id: - free(lu); - fail_malloc: - return NULL; + freebuf(v->val); + free(v); } -static void cancel_lookup_destroy(void * o) +static struct dht_entry * 
dht_entry_create(const uint8_t * key) { - struct lookup * lu; - struct list_head * p; - struct list_head * h; + struct dht_entry * e; - lu = (struct lookup *) o; + assert(key != NULL); - if (lu->key != NULL) - free(lu->key); - if (lu->addrs != NULL) - free(lu->addrs); + e = malloc(sizeof(*e)); + if (e == NULL) + goto fail_entry; - list_for_each_safe(p, h, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - list_del(&c->next); - contact_destroy(c); - } + list_head_init(&e->next); + list_head_init(&e->vals.list); + list_head_init(&e->lvals.list); - list_for_each_safe(p, h, &lu->cookies) { - struct cookie_el * c = list_entry(p, struct cookie_el, next); - list_del(&c->next); - free(c); - } + e->vals.len = 0; + e->lvals.len = 0; - pthread_mutex_unlock(&lu->lock); + e->key = dht_dup_key(key); + if (e->key == NULL) + goto fail_key; - pthread_mutex_destroy(&lu->lock); - - free(lu); + return e; + fail_key: + free(e); + fail_entry: + return NULL; } -static void lookup_destroy(struct lookup * lu) +static void dht_entry_destroy(struct dht_entry * e) { - assert(lu); + struct list_head * p; + struct list_head * h; - pthread_mutex_lock(&lu->lock); + assert(e != NULL); - switch (lu->state) { - case LU_DESTROY: - pthread_mutex_unlock(&lu->lock); - return; - case LU_PENDING: - lu->state = LU_DESTROY; - pthread_cond_broadcast(&lu->cond); - break; - case LU_INIT: - case LU_UPDATE: - case LU_COMPLETE: - lu->state = LU_NULL; - break; - case LU_NULL: - default: - break; + list_for_each_safe(p, h, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + --e->vals.len; + --dht.db.kv.vals; + } + + list_for_each_safe(p, h, &e->lvals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + --e->lvals.len; + --dht.db.kv.lvals; } - pthread_cleanup_push(cancel_lookup_destroy, lu); + free(e->key); - while (lu->state != LU_NULL) - pthread_cond_wait(&lu->cond, &lu->lock); + assert(e->vals.len == 0 && e->lvals.len == 0); - pthread_cleanup_pop(true); + free(e); } -static void lookup_update(struct dht * dht, - struct lookup * lu, - dht_msg_t * msg) +static struct val_entry * dht_entry_get_lval(const struct dht_entry * e, + const buffer_t val) { - struct list_head * p = NULL; - struct list_head * h; - struct contact * c = NULL; - size_t n; - size_t pos = 0; - bool mod = false; - - assert(lu); - assert(msg); - - if (dht_get_state(dht) != DHT_RUNNING) - return; + struct list_head * p; - pthread_mutex_lock(&lu->lock); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - list_for_each_safe(p, h, &lu->cookies) { - struct cookie_el * e = list_entry(p, struct cookie_el, next); - if (e->cookie == msg->cookie) { - list_del(&e->next); - free(e); - break; - } + list_for_each(p, &e->lvals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (bufcmp(&v->val, &val) == 0) + return v; } - if (lu->state == LU_COMPLETE) { - pthread_mutex_unlock(&lu->lock); - return; - } + return NULL; +} - if (msg->n_addrs > 0) { - if (lu->addrs == NULL) { - lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); - for (n = 0; n < msg->n_addrs; ++n) - lu->addrs[n] = msg->addrs[n]; - lu->n_addrs = msg->n_addrs; - } +static struct val_entry * dht_entry_get_val(const struct dht_entry * e, + const buffer_t val) +{ + struct list_head * p; - lu->state = LU_COMPLETE; - pthread_cond_broadcast(&lu->cond); - pthread_mutex_unlock(&lu->lock); - return; - } + assert(e != 
NULL); + assert(val.data != NULL); + assert(val.len > 0); - pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock); + list_for_each(p, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (bufcmp(&v->val, &val) == 0) + return v; - while (lu->state == LU_INIT) { - pthread_rwlock_unlock(&dht->lock); - pthread_cond_wait(&lu->cond, &lu->lock); - pthread_rwlock_rdlock(&dht->lock); } - pthread_cleanup_pop(false); + return NULL; +} - for (n = 0; n < msg->n_contacts; ++n) { - c = contact_create(msg->contacts[n]->id.data, - dht->b, msg->contacts[n]->addr); - if (c == NULL) - continue; +static int dht_entry_update_val(struct dht_entry * e, + buffer_t val, + time_t exp) +{ + struct val_entry * v; + struct timespec now; - pos = 0; + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - list_for_each(p, &lu->contacts) { - struct contact * e; - e = list_entry(p, struct contact, next); - if (!memcmp(e->id, c->id, dht->b)) { - contact_destroy(c); - c = NULL; - break; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - if (dist(c->id, lu->key) > dist(e->id, lu->key)) - break; + if (exp < now.tv_sec) + return -EINVAL; /* Refuse to add expired values */ - pos++; - } + if (dht_entry_get_lval(e, val) != NULL) { + log_dbg(KV_FMT " Val already in lvals.", KV_VAL(e->key, val)); + return 0; /* Refuse to add local values */ + } - if (c == NULL) - continue; + v = dht_entry_get_val(e, val); + if (v == NULL) { + v = val_entry_create(val, exp); + if (v == NULL) + return -ENOMEM; - if (lu->n_contacts < dht->k) { - list_add_tail(&c->next, p); - ++lu->n_contacts; - mod = true; - } else if (pos == dht->k) { - contact_destroy(c); - } else { - struct contact * d; - d = list_last_entry(&lu->contacts, - struct contact, next); - list_add_tail(&c->next, p); - list_del(&d->next); - contact_destroy(d); - mod = true; - } + list_add_tail(&v->next, &e->vals.list); + ++e->vals.len; + ++dht.db.kv.vals; + + return 0; } - if (list_is_empty(&lu->cookies) && !mod) - lu->state = LU_COMPLETE; - else - lu->state = LU_UPDATE; + if (v->t_exp < exp) + v->t_exp = exp; - pthread_cond_broadcast(&lu->cond); - pthread_mutex_unlock(&lu->lock); - return; + return 0; } -static ssize_t lookup_get_addrs(struct lookup * lu, - uint64_t * addrs) +static int dht_entry_update_lval(struct dht_entry * e, + buffer_t val) { - ssize_t n; + struct val_entry * v; + struct timespec now; - assert(lu); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - pthread_mutex_lock(&lu->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - for (n = 0; (size_t) n < lu->n_addrs; ++n) - addrs[n] = lu->addrs[n]; + v = dht_entry_get_lval(e, val); + if (v == NULL) { + log_dbg(KV_FMT " Adding lval.", KV_VAL(e->key, val)); + v = val_entry_create(val, now.tv_sec + dht.t_expire); + if (v == NULL) + return -ENOMEM; - assert((size_t) n == lu->n_addrs); + list_add_tail(&v->next, &e->lvals.list); + ++e->lvals.len; + ++dht.db.kv.lvals; - pthread_mutex_unlock(&lu->lock); + return 0; + } - return n; + return 0; } -static ssize_t lookup_contact_addrs(struct lookup * lu, - uint64_t * addrs) +static int dht_entry_remove_lval(struct dht_entry * e, + buffer_t val) { - struct list_head * p; - ssize_t n = 0; + struct val_entry * v; - assert(lu); - assert(addrs); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - pthread_mutex_lock(&lu->lock); + v = dht_entry_get_lval(e, val); + if (v == NULL) + return -ENOENT; - list_for_each(p, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - 
addrs[n] = c->addr; - n++; - } + log_dbg(KV_FMT " Removing lval.", KV_VAL(e->key, val)); - pthread_mutex_unlock(&lu->lock); + list_del(&v->next); + val_entry_destroy(v); + --e->lvals.len; + --dht.db.kv.lvals; - return n; + return 0; } -static void lookup_new_addrs(struct lookup * lu, - uint64_t * addrs) +#define IS_EXPIRED(v, now) ((now)->tv_sec > (v)->t_exp) +static void dht_entry_remove_expired_vals(struct dht_entry * e) { struct list_head * p; - size_t n = 0; + struct list_head * h; + struct timespec now; - assert(lu); - assert(addrs); + assert(e != NULL); - pthread_mutex_lock(&lu->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - /* Uses fails to check if the contact has been contacted. */ - list_for_each(p, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - if (c->fails == 0) { - c->fails = 1; - addrs[n] = c->addr; - n++; - } + list_for_each_safe(p, h, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (!IS_EXPIRED(v, &now)) + continue; - if (n == KAD_ALPHA) - break; + log_dbg(KV_FMT " Value expired." , KV_VAL(e->key, v->val)); + list_del(&v->next); + val_entry_destroy(v); + --e->vals.len; + --dht.db.kv.vals; } - - assert(n <= KAD_ALPHA); - - addrs[n] = 0; - - pthread_mutex_unlock(&lu->lock); } -static void lookup_set_state(struct lookup * lu, - enum lookup_state state) +static struct dht_entry * __dht_kv_find_entry(const uint8_t * key) { - pthread_mutex_lock(&lu->lock); + struct list_head * p; - lu->state = state; - pthread_cond_broadcast(&lu->cond); + assert(key != NULL); - pthread_mutex_unlock(&lu->lock); -} + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + if (!memcmp(key, e->key, dht.id.len)) + return e; + } -static void cancel_lookup_wait(void * o) -{ - struct lookup * lu = (struct lookup *) o; - lu->state = LU_NULL; - pthread_mutex_unlock(&lu->lock); - lookup_destroy(lu); + return NULL; } -static enum lookup_state lookup_wait(struct lookup * lu) +static void dht_kv_remove_expired_entries(void) { - struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP); - struct timespec abs; - enum lookup_state state; - int ret = 0; - - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - ts_add(&abs, &timeo, &abs); - - pthread_mutex_lock(&lu->lock); + struct list_head * p; + struct list_head * h; + struct timespec now; - if (lu->state == LU_INIT || lu->state == LU_UPDATE) - lu->state = LU_PENDING; + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_cleanup_push(cancel_lookup_wait, lu); + pthread_rwlock_wrlock(&dht.db.lock); - while (lu->state == LU_PENDING && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs); + list_for_each_safe(p, h, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + dht_entry_remove_expired_vals(e); + if (e->lvals.len > 0 || e->vals.len > 0) + continue; - pthread_cleanup_pop(false); + log_dbg(KEY_FMT " Entry removed. 
", KEY_VAL(e->key)); + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; + } - if (ret == -ETIMEDOUT) - lu->state = LU_COMPLETE; + pthread_rwlock_unlock(&dht.db.lock); +} - state = lu->state; - pthread_mutex_unlock(&lu->lock); +static struct contact * contact_create(const uint8_t * id, + uint64_t addr) +{ + struct contact * c; + struct timespec t; - return state; -} + c = malloc(sizeof(*c)); + if (c == NULL) + return NULL; -static struct kad_req * dht_find_request(struct dht * dht, - dht_msg_t * msg) -{ - struct list_head * p; + list_head_init(&c->next); - assert(dht); - assert(msg); + clock_gettime(CLOCK_REALTIME_COARSE, &t); - list_for_each(p, &dht->requests) { - struct kad_req * r = list_entry(p, struct kad_req, next); - if (r->cookie == msg->cookie) - return r; + c->addr = addr; + c->fails = 0; + c->t_seen = t.tv_sec; + c->id = dht_dup_key(id); + if (c->id == NULL) { + free(c); + return NULL; } - return NULL; + return c; } -static struct lookup * dht_find_lookup(struct dht * dht, - uint32_t cookie) +static void contact_destroy(struct contact * c) { - struct list_head * p; - struct list_head * p2; - struct list_head * h2; - - assert(dht); - assert(cookie > 0); - - list_for_each(p, &dht->lookups) { - struct lookup * l = list_entry(p, struct lookup, next); - pthread_mutex_lock(&l->lock); - list_for_each_safe(p2, h2, &l->cookies) { - struct cookie_el * e; - e = list_entry(p2, struct cookie_el, next); - if (e->cookie == cookie) { - list_del(&e->next); - free(e); - pthread_mutex_unlock(&l->lock); - return l; - } - } - pthread_mutex_unlock(&l->lock); - } + assert(c != NULL); + assert(list_is_empty(&c->next)); - return NULL; + free(c->id); + free(c); } -static struct val * val_create(uint64_t addr, - time_t exp) +static struct dht_req * dht_req_create(const uint8_t * key) { - struct val * v; - struct timespec t; + struct dht_req * req; + struct timespec now; - v = malloc(sizeof(*v)); - if (v == NULL) - return NULL; + assert(key != NULL); - list_head_init(&v->next); - v->addr = addr; + clock_gettime(PTHREAD_COND_CLOCK, &now); - clock_gettime(CLOCK_REALTIME_COARSE, &t); + req = malloc(sizeof(*req)); + if (req == NULL) + goto fail_malloc; - v->t_exp = t.tv_sec + exp; - v->t_rep = t.tv_sec + KAD_T_REPL; + list_head_init(&req->next); - return v; -} + req->t_exp = now.tv_sec + DHT_T_RESP; -static void val_destroy(struct val * v) -{ - assert(v); + list_head_init(&req->peers.list); + req->peers.len = 0; - free(v); + req->key = dht_dup_key(key); + if (req->key == NULL) + goto fail_dup_key; + + list_head_init(&req->cache.list); + req->cache.len = 0; + + return req; + + fail_dup_key: + free(req); + fail_malloc: + return NULL; } -static struct ref_entry * ref_entry_create(struct dht * dht, - const uint8_t * key) +static void dht_req_destroy(struct dht_req * req) { - struct ref_entry * e; - struct timespec t; - - assert(dht); - assert(key); + struct list_head * p; + struct list_head * h; - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; + assert(req); + assert(req->key); - e->key = dht_dup_key(key, dht->b); - if (e->key == NULL) { + list_for_each_safe(p, h, &req->peers.list) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + list_del(&e->next); + free(e->id); free(e); - return NULL; + --req->peers.len; } - clock_gettime(CLOCK_REALTIME_COARSE, &t); + list_for_each_safe(p, h, &req->cache.list) { + struct val_entry * e = list_entry(p, struct val_entry, next); + list_del(&e->next); + val_entry_destroy(e); + --req->cache.len; + } + + free(req->key); - e->t_rep = t.tv_sec 
+ dht->t_repub; + assert(req->peers.len == 0); - return e; + free(req); } -static void ref_entry_destroy(struct ref_entry * e) +static struct peer_entry * dht_req_get_peer(struct dht_req * req, + struct peer_entry * e) { - free(e->key); - free(e); + struct list_head * p; + + list_for_each(p, &req->peers.list) { + struct peer_entry * x = list_entry(p, struct peer_entry, next); + if (x->addr == e->addr) + return x; + } + + return NULL; } -static struct dht_entry * dht_entry_create(struct dht * dht, - const uint8_t * key) +#define IS_MAGIC(peer) ((peer)->cookie == dht.magic) +void dht_req_add_peer(struct dht_req * req, + struct peer_entry * e) { - struct dht_entry * e; + struct peer_entry * x; /* existing */ + struct list_head * p; /* iterator */ + size_t pos = 0; - assert(dht); - assert(key); + assert(req != NULL); + assert(e != NULL); + assert(e->id != NULL); - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; + /* + * Dedupe messages to the same peer, unless + * 1) The previous request was FIND_NODE and now it's FIND_VALUE + * 2) We urgently need contacts from emergency peer (magic cookie) + */ + x = dht_req_get_peer(req, e); + if (x != NULL && x->code >= e->code && !IS_MAGIC(e)) + goto skip; - list_head_init(&e->next); - list_head_init(&e->vals); + /* Find how this contact ranks in distance to the key */ + list_for_each(p, &req->peers.list) { + struct peer_entry * y = list_entry(p, struct peer_entry, next); + if (IS_CLOSER(y->id, e->id)) { + pos++; + continue; + } + break; + } - e->n_vals = 0; + /* Add a new peer to this request if we need to */ + if (pos < dht.alpha || !IS_MAGIC(e)) { + x = malloc(sizeof(*x)); + if (x == NULL) { + log_err("Failed to malloc peer entry."); + goto skip; + } - e->key = dht_dup_key(key, dht->b); - if (e->key == NULL) { - free(e); - return NULL; - } + x->cookie = e->cookie; + x->addr = e->addr; + x->code = e->code; + x->t_sent = e->t_sent; + x->id = dht_dup_key(e->id); + if (x->id == NULL) { + log_err("Failed to dup peer ID."); + free(x); + goto skip; + } - return e; + if (IS_MAGIC(e)) + list_add(&x->next, p); + else + list_add_tail(&x->next, p); + ++req->peers.len; + return; + } + skip: + list_del(&e->next); + free(e->id); + free(e); } -static void dht_entry_destroy(struct dht_entry * e) +static size_t dht_req_add_peers(struct dht_req * req, + struct list_head * pl) { - struct list_head * p; - struct list_head * h; + struct list_head * p; + struct list_head * h; + size_t n = 0; - assert(e); + assert(req != NULL); + assert(pl != NULL); - list_for_each_safe(p, h, &e->vals) { - struct val * v = list_entry(p, struct val, next); - list_del(&v->next); - val_destroy(v); + list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + dht_req_add_peer(req, e); } - free(e->key); - - free(e); + return n; } -static int dht_entry_add_addr(struct dht_entry * e, - uint64_t addr, - time_t exp) +static bool dht_req_has_peer(struct dht_req * req, + uint64_t cookie) { struct list_head * p; - struct val * val; - struct timespec t; - clock_gettime(CLOCK_REALTIME_COARSE, &t); - - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr == addr) { - if (v->t_exp < t.tv_sec + exp) { - v->t_exp = t.tv_sec + exp; - v->t_rep = t.tv_sec + KAD_T_REPL; - } + assert(req != NULL); - return 0; - } + list_for_each(p, &req->peers.list) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + if (e->cookie == cookie) + return true; } - val = val_create(addr, exp); - if (val == NULL) - return -ENOMEM; - 
- list_add(&val->next, &e->vals); - ++e->n_vals; - - return 0; + return false; } - -static void dht_entry_del_addr(struct dht_entry * e, - uint64_t addr) +static void peer_list_destroy(struct list_head * pl) { struct list_head * p; struct list_head * h; - assert(e); + assert(pl != NULL); - list_for_each_safe(p, h, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr == addr) { - list_del(&v->next); - val_destroy(v); - --e->n_vals; - } - } - - if (e->n_vals == 0) { + list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); list_del(&e->next); - dht_entry_destroy(e); + free(e->id); + free(e); } } -static uint64_t dht_entry_get_addr(struct dht * dht, - struct dht_entry * e) +static int dht_kv_create_peer_list(struct list_head * cl, + struct list_head * pl, + enum dht_code code) { - struct list_head * p; + struct list_head * p; + struct list_head * h; + struct timespec now; + size_t len; - assert(e); - assert(!list_is_empty(&e->vals)); + assert(cl != NULL); + assert(pl != NULL); + assert(list_is_empty(pl)); - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr != dht->addr) - return v->addr; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - return 0; -} + len = 0; -/* Forward declaration. */ -static struct lookup * kad_lookup(struct dht * dht, - const uint8_t * key, - enum kad_code code); + list_for_each_safe(p, h, cl) { + struct contact * c = list_entry(p, struct contact, next); + struct peer_entry * e; + if (len++ == dht.alpha) + break; + e = malloc(sizeof(*e)); + if (e == NULL) + return -ENOMEM; -/* Build a refresh list. */ -static void bucket_refresh(struct dht * dht, - struct bucket * b, - time_t t, - struct list_head * r) -{ - size_t i; + e->cookie = generate_cookie(); + e->code = code; + e->addr = c->addr; + e->t_sent = now.tv_sec; - if (*b->children != NULL) - for (i = 0; i < (1L << KAD_BETA); ++i) - bucket_refresh(dht, b->children[i], t, r); + e->id = c->id; - if (b->n_contacts == 0) - return; + list_add_tail(&e->next, pl); - if (t > b->t_refr) { - struct contact * c; - struct contact * d; - c = list_first_entry(&b->contacts, struct contact, next); - d = contact_create(c->id, dht->b, c->addr); - if (d != NULL) - list_add(&d->next, r); - return; + list_del(&c->next); + c->id = NULL; /* we stole the id */ + contact_destroy(c); } + + return 0; } +static struct dht_req * __dht_kv_req_get_req(const uint8_t * key) +{ + struct list_head * p; -static struct bucket * bucket_create(void) + list_for_each(p, &dht.reqs.list) { + struct dht_req * r = list_entry(p, struct dht_req, next); + if (memcmp(r->key, key, dht.id.len) == 0) + return r; + } + + return NULL; +} + +static struct dht_req * __dht_kv_get_req_cache(const uint8_t * key) { - struct bucket * b; - struct timespec t; - size_t i; + struct dht_req * req; - b = malloc(sizeof(*b)); - if (b == NULL) + assert(key != NULL); + + req = __dht_kv_req_get_req(key); + if (req == NULL) return NULL; - list_head_init(&b->contacts); - b->n_contacts = 0; + if (req->cache.len == 0) + return NULL; - list_head_init(&b->alts); - b->n_alts = 0; + return req; +} - clock_gettime(CLOCK_REALTIME_COARSE, &t); - b->t_refr = t.tv_sec + KAD_T_REFR; +static void __dht_kv_req_remove(const uint8_t * key) +{ + struct dht_req * req; - for (i = 0; i < (1L << KAD_BETA); ++i) - b->children[i] = NULL; + assert(key != NULL); + + req = __dht_kv_req_get_req(key); + if (req == NULL) + return; + + list_del(&req->next); + --dht.reqs.len; + + dht_req_destroy(req); +} + 
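+/*
+ * Sketch of how the request table below is used end-to-end (the local
+ * variable names here are illustrative only):
+ *
+ *     buffer_t * vals;
+ *     ssize_t    n;
+ *
+ *     n = dht_kv_query_remote(key, &vals, NULL);
+ *
+ * dht_kv_query_remote() sends a DHT_FIND_VALUE_REQ to the closest
+ * known contacts and registers a <key, cookie> pair per peer via
+ * dht_kv_update_req(). The caller then blocks in dht_kv_wait_req()
+ * until dht_kv_respond_req() caches values from a DHT_FIND_VALUE_RSP,
+ * or DHT_T_RESP seconds elapse. A response is only accepted if
+ * dht_kv_has_req() matches its <key, cookie> pair during validation.
+ */
+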
+static struct dht_req * __dht_kv_get_req_peer(const uint8_t * key,
+                                              uint64_t        cookie)
+{
+        struct dht_req * req;
+
+        assert(key != NULL);
+
+        req = __dht_kv_req_get_req(key);
+        if (req == NULL)
+                return NULL;
+
+        if (!dht_req_has_peer(req, cookie))
+                return NULL;
+
+        return req;
+}
+
+static bool dht_kv_has_req(const uint8_t * key,
+                           uint64_t        cookie)
+{
+        bool found;
+
+        pthread_mutex_lock(&dht.reqs.mtx);
+
+        found = __dht_kv_get_req_peer(key, cookie) != NULL;
+
+        pthread_mutex_unlock(&dht.reqs.mtx);
+
+        return found;
+}
+
+/*
+ * This filters the peer list down to the addresses that still need to
+ * be contacted.
+ */
+static int dht_kv_update_req(const uint8_t *    key,
+                             struct list_head * pl)
+{
+        struct dht_req * req;
+        struct timespec  now;
+
+        assert(key != NULL);
+        assert(pl != NULL);
+        assert(!list_is_empty(pl));
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        pthread_mutex_lock(&dht.reqs.mtx);
+
+        req = __dht_kv_req_get_req(key);
+        if (req == NULL) {
+                if (dht.reqs.len == DHT_MAX_REQS) {
+                        log_err(KEY_FMT " Max reqs reached (%zu).",
+                                KEY_VAL(key), dht.reqs.len);
+                        peer_list_destroy(pl);
+                        goto fail_req;
+                }
+                req = dht_req_create(key);
+                if (req == NULL) {
+                        log_err(KEY_FMT " Failed to create req.", KEY_VAL(key));
+                        goto fail_req;
+                }
+                list_add_tail(&req->next, &dht.reqs.list);
+                ++dht.reqs.len;
+        }
+
+        if (req->cache.len > 0) /* Already have values */
+                peer_list_destroy(pl);
+
+        dht_req_add_peers(req, pl);
+        req->t_exp = now.tv_sec + DHT_T_RESP;
+
+        if (dht.reqs.len > DHT_WARN_REQS) {
+                log_warn("Number of outstanding requests (%zu) exceeds %u.",
+                         dht.reqs.len, DHT_WARN_REQS);
+        }
+
+        pthread_mutex_unlock(&dht.reqs.mtx);
+
+        return 0;
+ fail_req:
+        pthread_mutex_unlock(&dht.reqs.mtx);
+        return -1;
+}
+
+static int dht_kv_respond_req(uint8_t *       key,
+                              binary_data_t * vals,
+                              size_t          len)
+{
+        struct dht_req * req;
+        struct timespec  now;
+        size_t           i;
+
+        assert(key != NULL);
+        assert(vals != NULL);
+        assert(len > 0);
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        pthread_mutex_lock(&dht.reqs.mtx);
+
+        req = __dht_kv_req_get_req(key);
+        if (req == NULL) {
+                log_warn(KEY_FMT " Failed to find req.", KEY_VAL(key));
+                goto fail_req;
+        }
+
+        for (i = 0; i < len; ++i) {
+                struct val_entry * e;
+                buffer_t           val;
+                val.data = vals[i].data;
+                val.len  = vals[i].len;
+                e = val_entry_create(val, now.tv_sec + DHT_T_CACHE);
+                if (e == NULL) {
+                        log_err("Failed to create val_entry.");
+                        continue;
+                }
+
+                list_add_tail(&e->next, &req->cache.list);
+                ++req->cache.len;
+        }
+
+        pthread_cond_broadcast(&dht.reqs.cond);
+
+        pthread_mutex_unlock(&dht.reqs.mtx);
+
+        return 0;
+ fail_req:
+        pthread_mutex_unlock(&dht.reqs.mtx);
+        return -1;
+}
+
+static ssize_t dht_kv_wait_req(const uint8_t * key,
+                               buffer_t **     vals)
+{
+        struct list_head * p;
+        struct dht_req *   req;
+        struct timespec    t;
+#ifdef __DHT_TEST__
+        struct timespec    intv = TIMESPEC_INIT_MS(10);
+#else
+        struct timespec    intv = TIMESPEC_INIT_S(DHT_T_RESP);
+#endif
+        size_t             max;
+        size_t             i = 0;
+        int                ret = 0;
+
+        assert(key != NULL);
+        assert(vals != NULL);
+
+        clock_gettime(PTHREAD_COND_CLOCK, &t);
+
+        ts_add(&t, &intv, &t);
+
+        pthread_mutex_lock(&dht.reqs.mtx);
+
+        pthread_cleanup_push(__cleanup_mutex_unlock, &dht.reqs.mtx);
+
+        while ((req = __dht_kv_get_req_cache(key)) == NULL) {
+                ret = pthread_cond_timedwait(&dht.reqs.cond, &dht.reqs.mtx, &t);
+                if (ret == ETIMEDOUT)
+                        break;
+        }
+
+        pthread_cleanup_pop(false);
+
+        if (ret == ETIMEDOUT) {
+                log_warn(KEY_FMT " Req timed out.", KEY_VAL(key));
+                __dht_kv_req_remove(key);
+                goto timedout;
+        }
+
+        max = MIN(req->cache.len, DHT_MAX_VALS);
+        if (max == 0)
+                goto no_vals;
+
+        *vals = malloc(max * sizeof(**vals));
+        if (*vals == NULL) {
+                log_err(KEY_FMT " Failed to malloc val buffer.", KEY_VAL(key));
+                goto fail_vals;
+        }
+
+        memset(*vals, 0, max * sizeof(**vals));
+
+        list_for_each(p, &req->cache.list) {
+                struct val_entry * v;
+                if (i == max)
+                        break; /* We have enough values */
+                v = list_entry(p, struct val_entry, next);
+                (*vals)[i].data = malloc(v->val.len);
+                if ((*vals)[i].data == NULL)
+                        goto fail_val_data;
+
+                (*vals)[i].len = v->val.len;
+                memcpy((*vals)[i++].data, v->val.data, v->val.len);
+        }
+
+        pthread_mutex_unlock(&dht.reqs.mtx);
+
+        return i;
+ no_vals:
+        pthread_mutex_unlock(&dht.reqs.mtx);
+        return 0;
+ fail_val_data:
+        freebufs(*vals, i);
+ fail_vals:
+        pthread_mutex_unlock(&dht.reqs.mtx);
+        return -ENOMEM;
+ timedout:
+        pthread_mutex_unlock(&dht.reqs.mtx);
+        return -ETIMEDOUT;
+}
+
+static struct bucket * iter_bucket(struct bucket * b,
+                                   const uint8_t * id)
+{
+        uint8_t byte;
+        uint8_t mask;
+
+        assert(b != NULL);
+
+        if (b->children[0] == NULL)
+                return b;
+
+        byte = id[(b->depth * DHT_BETA) / CHAR_BIT];
+
+        mask = ((1L << DHT_BETA) - 1) & 0xFF;
+
+        byte >>= (CHAR_BIT - DHT_BETA) -
+                 (((b->depth) * DHT_BETA) & (CHAR_BIT - 1));
+
+        return iter_bucket(b->children[(byte & mask)], id);
+}
+
+static struct bucket * __dht_kv_get_bucket(const uint8_t * id)
+{
+        assert(dht.db.contacts.root != NULL);
+
+        return iter_bucket(dht.db.contacts.root, id);
+}
+
+static void contact_list_add(struct list_head * l,
+                             struct contact *   c)
+{
+        struct list_head * p;
+
+        assert(l != NULL);
+        assert(c != NULL);
+
+        list_for_each(p, l) {
+                struct contact * e = list_entry(p, struct contact, next);
+                if (IS_CLOSER(e->id, c->id))
+                        continue;
+                break;
+        }
+
+        list_add_tail(&c->next, p);
+}
+
+static ssize_t dht_kv_contact_list(const uint8_t *    key,
+                                   struct list_head * l,
+                                   size_t             max)
+{
+        struct list_head * p;
+        struct bucket *    b;
+        struct timespec    t;
+        size_t             i;
+        size_t             len = 0;
+
+        assert(l != NULL);
+        assert(key != NULL);
+        assert(list_is_empty(l));
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        max = MIN(max, dht.k);
+
+        pthread_rwlock_rdlock(&dht.db.lock);
+
+        b = __dht_kv_get_bucket(key);
+        if (b == NULL) {
+                log_err(KEY_FMT " Failed to get bucket.", KEY_VAL(key));
+                goto fail_bucket;
+        }
+
+        b->t_refr = t.tv_sec + dht.t_refresh;
+
+        if (b->contacts.len == dht.k || b->parent == NULL) {
+                list_for_each(p, &b->contacts.list) {
+                        struct contact * c;
+                        struct contact * d;
+                        c = list_entry(p, struct contact, next);
+                        if (c->addr == dht.addr)
+                                continue;
+                        d = contact_create(c->id, c->addr);
+                        if (d == NULL)
+                                continue;
+                        contact_list_add(l, d);
+                        if (++len == max)
+                                break;
+                }
+        } else {
+                struct bucket * par = b->parent;
+                for (i = 0; i < (1L << DHT_BETA) && len < dht.k; ++i) {
+                        list_for_each(p, &par->children[i]->contacts.list) {
+                                struct contact * c;
+                                struct contact * d;
+                                c = list_entry(p, struct contact, next);
+                                if (c->addr == dht.addr)
+                                        continue;
+                                d = contact_create(c->id, c->addr);
+                                if (d == NULL)
+                                        continue;
+                                contact_list_add(l, d);
+                                if (++len == max)
+                                        break;
+                        }
+                }
+        }
+
+        pthread_rwlock_unlock(&dht.db.lock);
+
+        return len;
+ fail_bucket:
+        pthread_rwlock_unlock(&dht.db.lock);
+        return -1;
+}
+
+static void contact_list_destroy(struct list_head * l)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        assert(l != NULL);
+
+        list_for_each_safe(p, h, l) {
+                struct contact * c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                contact_destroy(c);
+        }
+}
+
+static ssize_t dht_kv_get_contacts(const uint8_t *       key,
+                                   dht_contact_msg_t *** msgs)
+{
+        struct list_head   cl;
+        struct list_head * p;
+        struct list_head * h;
+        ssize_t            len;
+        size_t             i = 0;
+
+        assert(key != NULL);
+        assert(msgs != NULL);
+
+        list_head_init(&cl);
+
+        len = dht_kv_contact_list(key, &cl, dht.k);
+        if (len <= 0) {
+                *msgs = NULL;
+                return len;
+        }
+
+        *msgs = malloc(len * sizeof(**msgs));
+        if (*msgs == NULL)
+                goto fail_msgs;
+
+        list_for_each_safe(p, h, &cl) {
+                struct contact * c;
+                (*msgs)[i] = malloc(sizeof(***msgs));
+                if ((*msgs)[i] == NULL)
+                        goto fail_contact;
+
+                dht_contact_msg__init((*msgs)[i]);
+                c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                (*msgs)[i]->id.data = c->id;
+                (*msgs)[i]->id.len  = dht.id.len;
+                (*msgs)[i++]->addr  = c->addr;
+                free(c);
+        }
+
+        return i;
+ fail_contact:
+        while (i-- > 0)
+                dht_contact_msg__free_unpacked((*msgs)[i], NULL);
+        free(*msgs);
+        *msgs = NULL;
+ fail_msgs:
+        contact_list_destroy(&cl);
+        return -ENOMEM;
+}
+
+/* Build a refresh list. */
+static void __dht_kv_bucket_refresh_list(struct bucket *    b,
+                                         time_t             t,
+                                         struct list_head * r)
+{
+        struct contact * c;
+        struct contact * d;
+
+        assert(b != NULL);
+
+        if (t < b->t_refr)
+                return;
+
+        if (*b->children != NULL) {
+                size_t i;
+                for (i = 0; i < (1L << DHT_BETA); ++i)
+                        __dht_kv_bucket_refresh_list(b->children[i], t, r);
+        }
+
+        if (b->contacts.len == 0)
+                return;
+
+        c = list_first_entry(&b->contacts.list, struct contact, next);
+        if (t > c->t_seen + dht.t_refresh) {
+                d = contact_create(c->id, c->addr);
+                if (d != NULL)
+                        list_add(&d->next, r);
+        }
+}
+
+static struct bucket * bucket_create(void)
+{
+        struct bucket * b;
+        struct timespec t;
+        size_t          i;
+
+        b = malloc(sizeof(*b));
+        if (b == NULL)
+                return NULL;
+
+        list_head_init(&b->contacts.list);
+        b->contacts.len = 0;
+
+        list_head_init(&b->alts.list);
+        b->alts.len = 0;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+        b->t_refr = t.tv_sec + dht.t_refresh;
+
+        for (i = 0; i < (1L << DHT_BETA); ++i)
+                b->children[i] = NULL;
 
         b->parent = NULL;
         b->depth = 0;
+        b->mask = 0;
 
         return b;
 }
@@ -1324,24 +1705,24 @@ static void bucket_destroy(struct bucket * b)
         struct list_head * h;
         size_t             i;
 
-        assert(b);
+        assert(b != NULL);
 
-        for (i = 0; i < (1L << KAD_BETA); ++i)
+        for (i = 0; i < (1L << DHT_BETA); ++i)
                 if (b->children[i] != NULL)
                         bucket_destroy(b->children[i]);
 
-        list_for_each_safe(p, h, &b->contacts) {
+        list_for_each_safe(p, h, &b->contacts.list) {
                 struct contact * c = list_entry(p, struct contact, next);
                 list_del(&c->next);
                 contact_destroy(c);
-                --b->n_contacts;
+                --b->contacts.len;
         }
 
-        list_for_each_safe(p, h, &b->alts) {
+        list_for_each_safe(p, h, &b->alts.list) {
                 struct contact * c = list_entry(p, struct contact, next);
                 list_del(&c->next);
                 contact_destroy(c);
-                --b->n_contacts;
+                --b->alts.len;
         }
 
         free(b);
@@ -1356,1537 +1737,2297 @@ static bool bucket_has_id(struct bucket * b,
         if (b->depth == 0)
                 return true;
 
-        byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+        byte = id[(b->depth * DHT_BETA) / CHAR_BIT];
 
-        mask = ((1L << KAD_BETA) - 1) & 0xFF;
+        mask = ((1L << DHT_BETA) - 1) & 0xFF;
 
-        byte >>= (CHAR_BIT - KAD_BETA) -
-                 (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1));
+        byte >>= (CHAR_BIT - DHT_BETA) -
+                 (((b->depth - 1) * DHT_BETA) & (CHAR_BIT - 1));
 
         return ((byte & mask) == b->mask);
 }
 
-static int split_bucket(struct bucket * b)
+static int move_contacts(struct bucket * b,
+                         struct bucket * c)
 {
         struct list_head * p;
         struct list_head * h;
+        struct contact *   d;
+
+        assert(b != NULL);
+        assert(c != NULL);
+
+        list_for_each_safe(p, h, &b->contacts.list) {
+                d = list_entry(p, struct contact, next);
+                if (bucket_has_id(c, d->id)) {
+                        list_del(&d->next);
+                        --b->contacts.len;
+                        list_add_tail(&d->next, &c->contacts.list);
+                        ++c->contacts.len;
+                }
+        }
+
+        return 0;
+}
+
+static int split_bucket(struct bucket * b)
+{
         uint8_t            mask = 0;
         size_t             i;
-        size_t             c;
+        size_t             b_len;
 
         assert(b);
-        assert(b->n_alts == 0);
-        assert(b->n_contacts);
+        assert(b->alts.len == 0);
+        assert(b->contacts.len != 0);
         assert(b->children[0] == NULL);
 
-        c = b->n_contacts;
+        b_len = b->contacts.len;
 
-        for (i = 0; i < (1L << KAD_BETA); ++i) {
+        for (i = 0; i < (1L << DHT_BETA); ++i) {
                 b->children[i] = bucket_create();
-                if (b->children[i] == NULL) {
-                        size_t j;
-                        for (j = 0; j < i; ++j)
-                                bucket_destroy(b->children[j]);
-                        return -1;
-                }
+                if (b->children[i] == NULL)
+                        goto fail_child;
 
                 b->children[i]->depth  = b->depth + 1;
                 b->children[i]->mask   = mask;
                 b->children[i]->parent = b;
 
-                list_for_each_safe(p, h, &b->contacts) {
-                        struct contact * c;
-                        c = list_entry(p, struct contact, next);
-                        if (bucket_has_id(b->children[i], c->id)) {
-                                list_del(&c->next);
-                                --b->n_contacts;
-                                list_add(&c->next, &b->children[i]->contacts);
-                                ++b->children[i]->n_contacts;
-                        }
-                }
+                move_contacts(b, b->children[i]);
 
                 mask++;
         }
 
-        for (i = 0; i < (1L << KAD_BETA); ++i)
-                if (b->children[i]->n_contacts == c)
+        for (i = 0; i < (1L << DHT_BETA); ++i)
+                if (b->children[i]->contacts.len == b_len)
                         split_bucket(b->children[i]);
 
         return 0;
+ fail_child:
+        while (i-- > 0)
+                bucket_destroy(b->children[i]);
+        return -1;
 }
 
-/* Locked externally to mandate update as (final) part of join transaction. */
-static int dht_update_bucket(struct dht *    dht,
-                             const uint8_t * id,
-                             uint64_t        addr)
+static int dht_kv_update_contacts(const uint8_t * id,
+                                  uint64_t        addr)
 {
         struct list_head * p;
         struct list_head * h;
         struct bucket *    b;
         struct contact *   c;
 
-        assert(dht);
+        assert(id != NULL);
+        assert(addr != INVALID_ADDR);
 
-        b = dht_get_bucket(dht, id);
-        if (b == NULL)
-                return -1;
+        pthread_rwlock_wrlock(&dht.db.lock);
 
-        c = contact_create(id, dht->b, addr);
-        if (c == NULL)
-                return -1;
+        b = __dht_kv_get_bucket(id);
+        if (b == NULL) {
+                log_err(PEER_FMT " Failed to get bucket.", PEER_VAL(id, addr));
+                goto fail_update;
+        }
+
+        c = contact_create(id, addr);
+        if (c == NULL) {
+                log_err(PEER_FMT " Failed to create contact.",
+                        PEER_VAL(id, addr));
+                goto fail_update;
+        }
 
-        list_for_each_safe(p, h, &b->contacts) {
+        list_for_each_safe(p, h, &b->contacts.list) {
                 struct contact * d = list_entry(p, struct contact, next);
                 if (d->addr == addr) {
                         list_del(&d->next);
                         contact_destroy(d);
-                        --b->n_contacts;
+                        --b->contacts.len;
                 }
         }
 
-        if (b->n_contacts == dht->k) {
-                if (bucket_has_id(b, dht->id)) {
-                        list_add_tail(&c->next, &b->contacts);
-                        ++b->n_contacts;
+        if (b->contacts.len == dht.k) {
+                if (bucket_has_id(b, dht.id.data)) {
+                        list_add_tail(&c->next, &b->contacts.list);
+                        ++b->contacts.len;
                         if (split_bucket(b)) {
                                 list_del(&c->next);
                                 contact_destroy(c);
-                                --b->n_contacts;
+                                --b->contacts.len;
                         }
-                } else if (b->n_alts == dht->k) {
+                } else if (b->alts.len == dht.k) {
                         struct contact * d;
-                        d = list_first_entry(&b->alts, struct contact, next);
+                        d = list_first_entry(&b->alts.list,
+                                             struct contact, next);
                         list_del(&d->next);
                         contact_destroy(d);
-                        list_add_tail(&c->next, &b->alts);
+                        list_add_tail(&c->next, &b->alts.list);
                 } else {
-                        list_add_tail(&c->next, &b->alts);
-                        ++b->n_alts;
+                        list_add_tail(&c->next, &b->alts.list);
+                        ++b->alts.len;
                 }
         } else {
-                list_add_tail(&c->next, &b->contacts);
-
++b->n_contacts; + list_add_tail(&c->next, &b->contacts.list); + ++b->contacts.len; } + pthread_rwlock_unlock(&dht.db.lock); + return 0; + fail_update: + pthread_rwlock_unlock(&dht.db.lock); + return -1; } -static int send_msg(struct dht * dht, - dht_msg_t * msg, - uint64_t addr) +static time_t gcd(time_t a, + time_t b) { -#ifndef __DHT_TEST__ - struct shm_du_buff * sdb; - size_t len; -#endif - int retr = 0; + if (a == 0) + return b; + + return gcd(b % a, a); +} + +static dht_contact_msg_t * dht_kv_src_contact_msg(void) +{ + dht_contact_msg_t * src; + + src = malloc(sizeof(*src)); + if (src == NULL) + goto fail_malloc; + + dht_contact_msg__init(src); + + src->id.data = dht_dup_key(dht.id.data); + if (src->id.data == NULL) + goto fail_id; + + src->id.len = dht.id.len; + src->addr = dht.addr; + + return src; + fail_id: + dht_contact_msg__free_unpacked(src, NULL); + fail_malloc: + return NULL; +} + +static dht_msg_t * dht_kv_find_req_msg(const uint8_t * key, + enum dht_code code) +{ + dht_msg_t * msg; + + assert(key != NULL); + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + dht_msg__init(msg); + msg->code = code; + + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; + + msg->find = malloc(sizeof(*msg->find)); + if (msg->find == NULL) + goto fail_msg; + + dht_find_req_msg__init(msg->find); + + msg->find->key.data = dht_dup_key(key); + if (msg->find->key.data == NULL) + goto fail_msg; + + msg->find->key.len = dht.id.len; + msg->find->cookie = DHT_INVALID; + + return msg; - if (msg->code == KAD_RESPONSE) - retr = KAD_RESP_RETR; + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; +} + +static dht_msg_t * dht_kv_find_node_req_msg(const uint8_t * key) +{ + return dht_kv_find_req_msg(key, DHT_FIND_NODE_REQ); +} + +static dht_msg_t * dht_kv_find_value_req_msg(const uint8_t * key) +{ + return dht_kv_find_req_msg(key, DHT_FIND_VALUE_REQ); +} + +static dht_msg_t * dht_kv_find_node_rsp_msg(uint8_t * key, + uint64_t cookie, + dht_contact_msg_t *** contacts, + size_t len) +{ + dht_msg_t * msg; - pthread_rwlock_wrlock(&dht->lock); + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + dht_msg__init(msg); + msg->code = DHT_FIND_NODE_RSP; + + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; + + msg->node = malloc(sizeof(*msg->node)); + if (msg->node == NULL) + goto fail_msg; - if (dht->id != NULL) { - msg->has_s_id = true; - msg->s_id.data = dht->id; - msg->s_id.len = dht->b; + dht_find_node_rsp_msg__init(msg->node); + + msg->node->key.data = dht_dup_key(key); + if (msg->node->key.data == NULL) + goto fail_msg; + + msg->node->cookie = cookie; + msg->node->key.len = dht.id.len; + msg->node->n_contacts = len; + if (len != 0) { /* Steal the ptr */ + msg->node->contacts = *contacts; + *contacts = NULL; } - msg->s_addr = dht->addr; + return msg; + + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; +} + +static dht_msg_t * dht_kv_find_value_rsp_msg(uint8_t * key, + uint64_t cookie, + dht_contact_msg_t *** contacts, + size_t n_contacts, + buffer_t ** vals, + size_t n_vals) +{ + dht_msg_t * msg; + + msg = dht_kv_find_node_rsp_msg(key, cookie, contacts, n_contacts); + if (msg == NULL) + goto fail_node_rsp; + + msg->code = DHT_FIND_VALUE_RSP; + + msg->val = malloc(sizeof(*msg->val)); + if (msg->val == NULL) + goto fail_msg; + + dht_find_value_rsp_msg__init(msg->val); + + msg->val->n_values = n_vals; + if (n_vals != 0) /* Steal the ptr */ + msg->val->values = 
(binary_data_t *) *vals; + + return msg; + + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_node_rsp: + return NULL; +} + +static dht_msg_t * dht_kv_store_msg(const uint8_t * key, + const buffer_t val, + time_t exp) +{ + dht_msg_t * msg; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + dht_msg__init(msg); + + msg->code = DHT_STORE; + + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; - if (msg->code < KAD_STORE) { - msg->cookie = bmp_allocate(dht->cookies); - if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { - pthread_rwlock_unlock(&dht->lock); - goto fail_bmp_alloc; + msg->store = malloc(sizeof(*msg->store)); + if (msg->store == NULL) + goto fail_msg; + + dht_store_msg__init(msg->store); + + msg->store->key.data = dht_dup_key(key); + if (msg->store->key.data == NULL) + goto fail_msg; + + msg->store->key.len = dht.id.len; + msg->store->val.data = malloc(val.len); + if (msg->store->val.data == NULL) + goto fail_msg; + + memcpy(msg->store->val.data, val.data, val.len); + + msg->store->val.len = val.len; + msg->store->exp = exp; + + return msg; + + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; +} + +static ssize_t dht_kv_retrieve(const uint8_t * key, + buffer_t ** vals) +{ + struct dht_entry * e; + struct list_head * p; + size_t n; + size_t i; + + assert(key != NULL); + + pthread_rwlock_rdlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) + goto no_vals; + + n = MIN(DHT_MAX_VALS, e->vals.len + e->lvals.len); + if (n == 0) + goto no_vals; + + *vals = malloc(n * sizeof(**vals)); + if (*vals == NULL) + goto fail_vals; + + memset(*vals, 0, n * sizeof(**vals)); + + i = 0; + + list_for_each(p, &e->vals.list) { + struct val_entry * v; + if (i == n) + break; /* We have enough values */ + v = list_entry(p, struct val_entry, next); + (*vals)[i].data = malloc(v->val.len); + if ((*vals)[i].data == NULL) + goto fail_val_data; + + (*vals)[i].len = v->val.len; + memcpy((*vals)[i++].data, v->val.data, v->val.len); + } + + list_for_each(p, &e->lvals.list) { + struct val_entry * v; + if (i == n) + break; /* We have enough values */ + v = list_entry(p, struct val_entry, next); + (*vals)[i].data = malloc(v->val.len); + if ((*vals)[i].data == NULL) + goto fail_val_data; + + (*vals)[i].len = v->val.len; + memcpy((*vals)[i++].data, v->val.data, v->val.len); + } + + pthread_rwlock_unlock(&dht.db.lock); + + return (ssize_t) i; + + fail_val_data: + pthread_rwlock_unlock(&dht.db.lock); + freebufs(*vals, i); + *vals = NULL; + return -ENOMEM; + fail_vals: + pthread_rwlock_unlock(&dht.db.lock); + return -ENOMEM; + no_vals: + pthread_rwlock_unlock(&dht.db.lock); + *vals = NULL; + return 0; +} + +static void __cleanup_dht_msg(void * msg) +{ + dht_msg__free_unpacked((dht_msg_t *) msg, NULL); +} + +#ifdef DEBUG_PROTO_DHT +static void dht_kv_debug_msg(dht_msg_t * msg) +{ + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + time_t stamp; + size_t i; + + if (msg == NULL) + return; + + pthread_cleanup_push(__cleanup_dht_msg, msg); + + switch (msg->code) { + case DHT_STORE: + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->store->key.data), + msg->store->key.len); + log_proto(" val: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->store->val.data), + msg->store->val.len); + stamp = msg->store->exp; + tm = gmtime(&stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + log_proto(" exp: %s.", tmstr); + break; + case 
DHT_FIND_NODE_REQ:
+                /* FALLTHRU */
+        case DHT_FIND_VALUE_REQ:
+                log_proto(" cookie: " HASH_FMT64,
+                          HASH_VAL64(&msg->find->cookie));
+                log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+                          HASH_VAL64(msg->find->key.data),
+                          msg->find->key.len);
+                break;
+        case DHT_FIND_VALUE_RSP:
+                log_proto(" cookie: " HASH_FMT64,
+                          HASH_VAL64(&msg->node->cookie));
+                log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+                          HASH_VAL64(msg->node->key.data),
+                          msg->node->key.len);
+                log_proto(" values: [%zd]", msg->val->n_values);
+                for (i = 0; i < msg->val->n_values; i++)
+                        log_proto(" " HASH_FMT64 " [%zu bytes]",
+                                  HASH_VAL64(msg->val->values[i].data),
+                                  msg->val->values[i].len);
+                log_proto(" contacts: [%zd]", msg->node->n_contacts);
+                for (i = 0; i < msg->node->n_contacts; i++) {
+                        dht_contact_msg_t * c = msg->node->contacts[i];
+                        log_proto(" " PEER_FMT,
+                                  PEER_VAL(c->id.data, c->addr));
                 }
+                break;
+        case DHT_FIND_NODE_RSP:
+                log_proto(" cookie: " HASH_FMT64,
+                          HASH_VAL64(&msg->node->cookie));
+                log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+                          HASH_VAL64(msg->node->key.data), msg->node->key.len);
+                log_proto(" contacts: [%zd]", msg->node->n_contacts);
+                for (i = 0; i < msg->node->n_contacts; i++) {
+                        dht_contact_msg_t * c = msg->node->contacts[i];
+                        log_proto(" " PEER_FMT,
+                                  PEER_VAL(c->id.data, c->addr));
+                }
+
+                break;
+        default:
+                break;
         }
 
-        pthread_rwlock_unlock(&dht->lock);
+        pthread_cleanup_pop(false);
+}
+
+static void dht_kv_debug_msg_snd(dht_msg_t * msg,
+                                 uint8_t *   id,
+                                 uint64_t    addr)
+{
+        if (msg == NULL)
+                return;
+
+        log_proto(TX_HDR_FMT ".", TX_HDR_VAL(msg, id, addr));
+
+        dht_kv_debug_msg(msg);
+}
+
+static void dht_kv_debug_msg_rcv(dht_msg_t * msg)
+{
+        if (msg == NULL)
+                return;
+
+        log_proto(RX_HDR_FMT ".", RX_HDR_VAL(msg));
+
+        dht_kv_debug_msg(msg);
+}
+#endif
 
 #ifndef __DHT_TEST__
+static int dht_send_msg(dht_msg_t * msg,
+                        uint64_t    addr)
+{
+        size_t               len;
+        struct shm_du_buff * sdb;
+
+        if (msg == NULL)
+                return 0;
+
+        assert(addr != INVALID_ADDR && addr != dht.addr);
+
         len = dht_msg__get_packed_size(msg);
-        if (len == 0)
+        if (len == 0) {
+                log_warn("%s failed to pack.", DHT_CODE(msg));
                 goto fail_msg;
+        }
 
-        while (true) {
-                if (ipcp_sdb_reserve(&sdb, len))
-                        goto fail_msg;
+        if (ipcp_sdb_reserve(&sdb, len)) {
+                log_warn("%s failed to get sdb.", DHT_CODE(msg));
+                goto fail_msg;
+        }
 
-                dht_msg__pack(msg, shm_du_buff_head(sdb));
+        dht_msg__pack(msg, shm_du_buff_head(sdb));
 
-                if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)
-                        break;
+        if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) < 0) {
+                log_warn("%s write failed.", DHT_CODE(msg));
+                goto fail_send;
+        }
 
-                ipcp_sdb_release(sdb);
+        return 0;
+ fail_send:
+        ipcp_sdb_release(sdb);
+ fail_msg:
+        return -1;
+}
+#else /* function for testing */
+static int dht_send_msg(dht_msg_t * msg,
+                        uint64_t    addr)
+{
+        buffer_t buf;
 
-                sleep(1);
+        assert(msg != NULL);
+        assert(addr != INVALID_ADDR && addr != dht.addr);
 
-                if (--retr < 0)
-                        goto fail_msg;
+        buf.len = dht_msg__get_packed_size(msg);
+        if (buf.len == 0) {
+                log_warn("%s failed to pack.", DHT_CODE(msg));
+                goto fail_msg;
         }
-#else
-        (void) addr;
-        (void) retr;
 
+        buf.data = malloc(buf.len);
+        if (buf.data == NULL) {
+                log_warn("%s failed to malloc buf.", DHT_CODE(msg));
+                goto fail_msg;
+        }
+
+        dht_msg__pack(msg, buf.data);
+
+        if (sink_send_msg(&buf, addr) < 0) {
+                log_warn("%s write failed.", DHT_CODE(msg));
+                goto fail_send;
+        }
+
+        return 0;
+ fail_send:
+        freebuf(buf);
+ fail_msg:
+        return -1;
+}
 #endif /* __DHT_TEST__ */
 
-        if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN)
-
kad_req_create(dht, msg, addr); +static void __cleanup_peer_list(void * pl) +{ + struct list_head * p; + struct list_head * h; - return msg->cookie; -#ifndef __DHT_TEST__ + assert(pl != NULL); + + list_for_each_safe(p, h, (struct list_head *) pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + list_del(&e->next); + free(e->id); + free(e); + } +} + + +static int dht_kv_send_msgs(dht_msg_t * msg, + struct list_head * pl) +{ + struct list_head * p; + struct list_head * h; + + pthread_cleanup_push(__cleanup_dht_msg, msg); + pthread_cleanup_push(__cleanup_peer_list, pl); + + list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + if (IS_REQUEST(msg->code)) { + msg->find->cookie = e->cookie; + assert(msg->find->cookie != DHT_INVALID); + } + if (dht_send_msg(msg, e->addr) < 0) + continue; + +#ifdef DEBUG_PROTO_DHT + dht_kv_debug_msg_snd(msg, e->id, e->addr); +#endif + list_del(&e->next); + free(e->id); + free(e); + } + + pthread_cleanup_pop(false); + pthread_cleanup_pop(false); + + return list_is_empty(pl) ? 0 : -1; +} + +static int dht_kv_get_peer_list_for_msg(dht_msg_t * msg, + struct list_head * pl) +{ + struct list_head cl; /* contact list */ + uint8_t * key; /* key in the request */ + size_t max; + + assert(msg != NULL); + + assert(list_is_empty(pl)); + + max = msg->code == DHT_STORE ? dht.k : dht.alpha; + + switch (msg->code) { + case DHT_FIND_NODE_REQ: + /* FALLTHRU */ + case DHT_FIND_VALUE_REQ: + key = msg->find->key.data; + break; + case DHT_STORE: + key = msg->store->key.data; + break; + default: + log_err("Invalid DHT msg code (%d).", msg->code); + return -1; + } + + list_head_init(&cl); + + if (dht_kv_contact_list(key, &cl, max) < 0) { + log_err(KEY_FMT " Failed to get contact list.", KEY_VAL(key)); + goto fail_contacts; + } + + if (list_is_empty(&cl)) { + log_warn(KEY_FMT " No available contacts.", KEY_VAL(key)); + goto fail_contacts; + } + + if (dht_kv_create_peer_list(&cl, pl, msg->code) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peers; + } + + contact_list_destroy(&cl); + return 0; + fail_peers: + contact_list_destroy(&cl); + fail_contacts: + return -1; +} + +static int dht_kv_store_remote(const uint8_t * key, + const buffer_t val, + time_t exp) +{ + dht_msg_t * msg; + struct timespec now; + struct list_head pl; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + msg = dht_kv_store_msg(key, val, exp); + if (msg == NULL) { + log_err(KV_FMT " Failed to create %s.", + KV_VAL(key, val), dht_code_str[DHT_STORE]); + goto fail_msg; + } + + list_head_init(&pl); + + if (dht_kv_get_peer_list_for_msg(msg, &pl) < 0) { + log_dbg(KV_FMT " Failed to get peer list.", KV_VAL(key, val)); + goto fail_peer_list; + } + + if (dht_kv_send_msgs(msg, &pl) < 0) { + log_warn(KV_FMT " Failed to send any %s msg.", + KV_VAL(key, val), DHT_CODE(msg)); + goto fail_msgs; + } + + dht_msg__free_unpacked(msg, NULL); + + return 0; + fail_msgs: + peer_list_destroy(&pl); + fail_peer_list: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + return -1; +} + +/* recursive lookup, start with pl NULL */ +static int dht_kv_query_contacts(const uint8_t * key, + struct list_head * pl) +{ + struct list_head p; + + dht_msg_t * msg; + + assert(key != NULL); + + msg = dht_kv_find_node_req_msg(key); + if (msg == NULL) { + log_err(KEY_FMT " Failed to create %s msg.", + KEY_VAL(key), dht_code_str[DHT_FIND_NODE_REQ]); + goto fail_msg; + } + + if (pl == NULL) 
{ + list_head_init(&p); + pl = &p; + } + + if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peer_list; + } + + if (dht_kv_update_req(key, pl) < 0) { + log_warn(KEY_FMT " Failed to update req.", KEY_VAL(key)); + goto fail_update; + } + + if (dht_kv_send_msgs(msg, pl)) { + log_warn(KEY_FMT " Failed to send any %s msg.", + KEY_VAL(key), DHT_CODE(msg)); + goto fail_update; + } + + dht_msg__free_unpacked(msg, NULL); + + return 0; + fail_update: + peer_list_destroy(pl); + fail_peer_list: + dht_msg__free_unpacked(msg, NULL); fail_msg: - pthread_rwlock_wrlock(&dht->lock); - bmp_release(dht->cookies, msg->cookie); - pthread_rwlock_unlock(&dht->lock); -#endif /* !__DHT_TEST__ */ - fail_bmp_alloc: return -1; } -static struct dht_entry * dht_find_entry(struct dht * dht, - const uint8_t * key) +/* recursive lookup, start with pl NULL */ +static ssize_t dht_kv_query_remote(const uint8_t * key, + buffer_t ** vals, + struct list_head * pl) +{ + struct list_head p; + dht_msg_t * msg; + + assert(key != NULL); + + msg = dht_kv_find_value_req_msg(key); + if (msg == NULL) { + log_err(KEY_FMT " Failed to create value req.", KEY_VAL(key)); + goto fail_msg; + } + + if (pl == NULL) { + list_head_init(&p); + pl = &p; + } + + if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peer_list; + } + + if (dht_kv_update_req(key, pl) < 0) { + log_err(KEY_FMT " Failed to update request.", KEY_VAL(key)); + goto fail_update; + } + + if (dht_kv_send_msgs(msg, pl)) { + log_warn(KEY_FMT " Failed to send %s msg.", + KEY_VAL(key), DHT_CODE(msg)); + goto fail_update; + } + + dht_msg__free_unpacked(msg, NULL); + + if (vals == NULL) /* recursive lookup, already waiting */ + return 0; + + return dht_kv_wait_req(key, vals); + fail_update: + peer_list_destroy(pl); + fail_peer_list: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + return -1; +} + +static void __add_dht_kv_entry(struct dht_entry * e) +{ + struct list_head * p; + + assert(e != NULL); + + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * d = list_entry(p, struct dht_entry, next); + if (IS_CLOSER(d->key, e->key)) + continue; + break; + } + + list_add_tail(&e->next, p); + ++dht.db.kv.len; +} + +/* incoming store message */ +static int dht_kv_store(const uint8_t * key, + const buffer_t val, + time_t exp) +{ + struct dht_entry * e; + bool new = false; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + pthread_rwlock_wrlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) { + log_dbg(KV_FMT " Adding entry (store).", KV_VAL(key, val)); + e = dht_entry_create(key); + if (e == NULL) + goto fail; + + new = true; + + __add_dht_kv_entry(e); + } + + if (dht_entry_update_val(e, val, exp) < 0) + goto fail_add; + + pthread_rwlock_unlock(&dht.db.lock); + + return 0; + fail_add: + if (new) { + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; + } + fail: + pthread_rwlock_unlock(&dht.db.lock); + return -1; +} + +static int dht_kv_publish(const uint8_t * key, + const buffer_t val) +{ + struct dht_entry * e; + struct timespec now; + bool new = false; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) { + log_dbg(KV_FMT " Adding entry (publish).", KV_VAL(key, val)); + e = 
dht_entry_create(key); + if (e == NULL) + goto fail; + + __add_dht_kv_entry(e); + new = true; + } + + if (dht_entry_update_lval(e, val) < 0) + goto fail_add; + + pthread_rwlock_unlock(&dht.db.lock); + + dht_kv_store_remote(key, val, now.tv_sec + dht.t_expire); + + return 0; + fail_add: + if (new) { + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; + } + fail: + pthread_rwlock_unlock(&dht.db.lock); + return -1; +} + +static int dht_kv_unpublish(const uint8_t * key, + const buffer_t val) +{ + struct dht_entry * e; + int rc; + + assert(key != NULL); + + pthread_rwlock_wrlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) + goto no_entry; + + rc = dht_entry_remove_lval(e, val); + + pthread_rwlock_unlock(&dht.db.lock); + + return rc; + no_entry: + pthread_rwlock_unlock(&dht.db.lock); + return -ENOENT; + +} + +/* message validation */ +static int dht_kv_validate_store_msg(const dht_store_msg_t * store) +{ + if (store == NULL) { + log_warn("Store in msg is NULL."); + return -EINVAL; + } + + if (store->key.data == NULL || store->key.len == 0) { + log_warn("Invalid key in DHT store msg."); + return -EINVAL; + } + + if (store->key.len != dht.id.len) { + log_warn("Invalid key length in DHT store msg."); + return -EINVAL; + } + + if (store->val.data == NULL || store->val.len == 0) { + log_warn("Invalid value in DHT store msg."); + return -EINVAL; + } + + return 0; +} + +static int validate_find_req_msg(const dht_find_req_msg_t * req) { - struct list_head * p; + if (req == NULL) { + log_warn("Request in msg is NULL."); + return -EINVAL; + } - list_for_each(p, &dht->entries) { - struct dht_entry * e = list_entry(p, struct dht_entry, next); - if (!memcmp(key, e->key, dht->b)) - return e; + if (req->key.data == NULL || req->key.len == 0) { + log_warn("Find request without key."); + return -EINVAL; } - return NULL; + if (req->key.len != dht.id.len) { + log_warn("Invalid key length in request msg."); + return -EINVAL; + } + + return 0; } -static int kad_add(struct dht * dht, - const dht_contact_msg_t * contacts, - ssize_t n, - time_t exp) +static int validate_node_rsp_msg(const dht_find_node_rsp_msg_t * rsp) { - struct dht_entry * e; - - pthread_rwlock_wrlock(&dht->lock); + if (rsp == NULL) { + log_warn("Node rsp in msg is NULL."); + return -EINVAL; + } - while (n-- > 0) { - if (contacts[n].id.len != dht->b) - log_warn("Bad key length in contact data."); + if (rsp->key.data == NULL) { + log_warn("Invalid key in DHT response msg."); + return -EINVAL; + } - e = dht_find_entry(dht, contacts[n].id.data); - if (e != NULL) { - if (dht_entry_add_addr(e, contacts[n].addr, exp)) - goto fail; - } else { - e = dht_entry_create(dht, contacts[n].id.data); - if (e == NULL) - goto fail; + if (rsp->key.len != dht.id.len) { + log_warn("Invalid key length in DHT response msg."); + return -EINVAL; + } - if (dht_entry_add_addr(e, contacts[n].addr, exp)) { - dht_entry_destroy(e); - goto fail; - } + if (!dht_kv_has_req(rsp->key.data, rsp->cookie)) { + log_warn(KEY_FMT " No request " CK_FMT ".", + KEY_VAL(rsp->key.data), CK_VAL(rsp->cookie)); - list_add(&e->next, &dht->entries); - } + return -EINVAL; } - pthread_rwlock_unlock(&dht->lock); return 0; - - fail: - pthread_rwlock_unlock(&dht->lock); - return -ENOMEM; } -static int wait_resp(struct dht * dht, - dht_msg_t * msg, - time_t timeo) +static int validate_value_rsp_msg(const dht_find_value_rsp_msg_t * rsp) { - struct kad_req * req; - - assert(dht); - assert(msg); - - pthread_rwlock_rdlock(&dht->lock); + if (rsp == NULL) { + log_warn("Invalid 
DHT find value response msg."); + return -EINVAL; + } - req = dht_find_request(dht, msg); - if (req == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -EPERM; + if (rsp->values == NULL && rsp->n_values > 0) { + log_dbg("No values in DHT response msg."); + return 0; } - pthread_rwlock_unlock(&dht->lock); + if (rsp->n_values == 0 && rsp->values != NULL) { + log_dbg("DHT response did not set values NULL."); + return 0; + } - return kad_req_wait(req, timeo); + return 0; } -static int kad_store(struct dht * dht, - const uint8_t * key, - uint64_t addr, - uint64_t r_addr, - time_t ttl) +static int dht_kv_validate_msg(dht_msg_t * msg) { - dht_msg_t msg = DHT_MSG__INIT; - dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT; - dht_contact_msg_t * cmsgp[1]; - - cmsg.id.data = (uint8_t *) key; - cmsg.addr = addr; - - pthread_rwlock_rdlock(&dht->lock); - - cmsg.id.len = dht->b; - pthread_rwlock_unlock(&dht->lock); + assert(msg != NULL); - cmsgp[0] = &cmsg; + if (msg->src->id.len != dht.id.len) { + log_warn("%s Invalid source contact ID.", DHT_CODE(msg)); + return -EINVAL; + } - msg.code = KAD_STORE; - msg.has_t_expire = true; - msg.t_expire = ttl; - msg.n_contacts = 1; - msg.contacts = cmsgp; + if (msg->src->addr == INVALID_ADDR) { + log_warn("%s Invalid source address.", DHT_CODE(msg)); + return -EINVAL; + } - if (send_msg(dht, &msg, r_addr) < 0) - return -1; + switch (msg->code) { + case DHT_FIND_VALUE_REQ: + /* FALLTHRU */ + case DHT_FIND_NODE_REQ: + if (validate_find_req_msg(msg->find) < 0) + return -EINVAL; + break; + case DHT_FIND_VALUE_RSP: + if (validate_value_rsp_msg(msg->val) < 0) + return -EINVAL; + /* FALLTHRU */ + case DHT_FIND_NODE_RSP: + if (validate_node_rsp_msg(msg->node) < 0) + return -EINVAL; + break; + case DHT_STORE: + if (dht_kv_validate_store_msg(msg->store) < 0) + return -EINVAL; + break; + default: + log_warn("Invalid DHT msg code (%d).", msg->code); + return -ENOENT; + } return 0; } -static ssize_t kad_find(struct dht * dht, - struct lookup * lu, - const uint64_t * addrs, - enum kad_code code) +static void do_dht_kv_store(const dht_store_msg_t * store) { - dht_msg_t msg = DHT_MSG__INIT; - ssize_t sent = 0; + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + buffer_t val; + uint8_t * key; + time_t exp; - assert(dht); - assert(lu->key); + assert(store != NULL); - msg.code = code; + val.data = store->val.data; + val.len = store->val.len; + key = store->key.data; + exp = store->exp; - msg.has_key = true; - msg.key.data = (uint8_t *) lu->key; - msg.key.len = dht->b; - - while (*addrs != 0) { - struct cookie_el * c; - int ret; - - if (*addrs == dht->addr) { - ++addrs; - continue; - } - - ret = send_msg(dht, &msg, *addrs); - if (ret < 0) - break; - - c = malloc(sizeof(*c)); - if (c == NULL) - break; + if (dht_kv_store(store->key.data, val, store->exp) < 0) { + log_err(KV_FMT " Failed to store.", KV_VAL(key, val)); + return; + } - c->cookie = (uint32_t) ret; + tm = gmtime(&exp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + log_info(KV_FMT " Stored value until %s.", KV_VAL(key, val), tmstr); +} - pthread_mutex_lock(&lu->lock); +static dht_msg_t * do_dht_kv_find_node_req(const dht_find_req_msg_t * req) +{ + dht_contact_msg_t ** contacts; + dht_msg_t * rsp; + uint8_t * key; + uint64_t cookie; + ssize_t len; - list_add_tail(&c->next, &lu->cookies); + assert(req != NULL); - pthread_mutex_unlock(&lu->lock); + key = req->key.data; + cookie = req->cookie; - ++sent; - ++addrs; + len = dht_kv_get_contacts(key, &contacts); + if (len < 0) { + log_warn(KEY_FMT " Failed to get contacts.", 
KEY_VAL(key)); + goto fail_contacts; } - return sent; -} + rsp = dht_kv_find_node_rsp_msg(key, cookie, &contacts, len); + if (rsp == NULL) { + log_err(KEY_FMT " Failed to create %s.", KEY_VAL(key), + dht_code_str[DHT_FIND_NODE_RSP]); + goto fail_msg; + } -static void lookup_detach(struct dht * dht, - struct lookup * lu) -{ - pthread_rwlock_wrlock(&dht->lock); + assert(rsp->code == DHT_FIND_NODE_RSP); - list_del(&lu->next); + log_info(KEY_FMT " Responding with %zd contacts", KEY_VAL(key), len); - pthread_rwlock_unlock(&dht->lock); + return rsp; + fail_msg: + while (len-- > 0) + dht_contact_msg__free_unpacked(contacts[len], NULL); + free(contacts); + fail_contacts: + return NULL; } -static struct lookup * kad_lookup(struct dht * dht, - const uint8_t * id, - enum kad_code code) +static void dht_kv_process_node_rsp(dht_contact_msg_t ** contacts, + size_t len, + struct list_head * pl, + enum dht_code code) { - uint64_t addrs[KAD_ALPHA + 1]; - enum lookup_state state; - struct lookup * lu; - - lu = lookup_create(dht, id); - if (lu == NULL) - return NULL; + struct timespec now; + size_t i; - lookup_new_addrs(lu, addrs); + assert(contacts != NULL); + assert(len > 0); + assert(pl != NULL); + assert(list_is_empty(pl)); - if (addrs[0] == 0) { - lookup_detach(dht, lu); - lookup_destroy(lu); - return NULL; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - if (kad_find(dht, lu, addrs, code) == 0) { - lookup_detach(dht, lu); - return lu; - } + for (i = 0; i < len; i++) { + dht_contact_msg_t * c = contacts[i]; + struct peer_entry * e; + if (c->addr == dht.addr) + continue; - while ((state = lookup_wait(lu)) != LU_COMPLETE) { - switch (state) { - case LU_UPDATE: - lookup_new_addrs(lu, addrs); - if (addrs[0] == 0) - break; + if (dht_kv_update_contacts(c->id.data, c->addr) < 0) + log_warn(PEER_FMT " Failed to update contacts.", + PEER_VAL(c->id.data, c->addr)); - kad_find(dht, lu, addrs, code); - break; - case LU_DESTROY: - lookup_detach(dht, lu); - lookup_set_state(lu, LU_NULL); - return NULL; - default: - break; + e = malloc(sizeof(*e)); + if (e == NULL) { + log_err(PEER_FMT " Failed to malloc entry.", + PEER_VAL(c->id.data, c->addr)); + continue; } - } - assert(state == LU_COMPLETE); + e->id = dht_dup_key(c->id.data); + if (e->id == NULL) { + log_warn(PEER_FMT " Failed to duplicate id.", + PEER_VAL(c->id.data, c->addr)); + free(e); + continue; + } - lookup_detach(dht, lu); + e->cookie = generate_cookie(); + e->code = code; + e->addr = c->addr; + e->t_sent = now.tv_sec; - return lu; + list_add_tail(&e->next, pl); + } } -static void kad_publish(struct dht * dht, - const uint8_t * key, - uint64_t addr, - time_t exp) +static dht_msg_t * do_dht_kv_find_value_req(const dht_find_req_msg_t * req) { - struct lookup * lu; - uint64_t * addrs; - ssize_t n; - size_t k; - time_t t_expire; - + dht_contact_msg_t ** contacts; + ssize_t n_contacts; + buffer_t * vals; + ssize_t n_vals; + dht_msg_t * rsp; + uint8_t * key; + uint64_t cookie; - assert(dht); - assert(key); + assert(req != NULL); - pthread_rwlock_rdlock(&dht->lock); + key = req->key.data; + cookie = req->cookie; - k = dht->k; - t_expire = dht->t_expire; - - pthread_rwlock_unlock(&dht->lock); + n_contacts = dht_kv_get_contacts(key, &contacts); + if (n_contacts < 0) { + log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key)); + goto fail_contacts; + } - addrs = malloc(k * sizeof(*addrs)); - if (addrs == NULL) - return; + assert(n_contacts > 0 || contacts == NULL); - lu = kad_lookup(dht, key, KAD_FIND_NODE); - if (lu == NULL) { - free(addrs); - return; + 
n_vals = dht_kv_retrieve(key, &vals); + if (n_vals < 0) { + log_dbg(KEY_FMT " Failed to get values.", KEY_VAL(key)); + goto fail_vals; } - n = lookup_contact_addrs(lu, addrs); + if (n_vals == 0) + log_dbg(KEY_FMT " No values found.", KEY_VAL(key)); - while (n-- > 0) { - if (addrs[n] == dht->addr) { - dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT; - msg.id.data = (uint8_t *) key; - msg.id.len = dht->b; - msg.addr = addr; - kad_add(dht, &msg, 1, exp); - } else { - if (kad_store(dht, key, addr, addrs[n], t_expire)) - log_warn("Failed to send store message."); - } + rsp = dht_kv_find_value_rsp_msg(key, cookie, &contacts, n_contacts, + &vals, n_vals); + if (rsp == NULL) { + log_err(KEY_FMT " Failed to create %s msg.", + KEY_VAL(key), dht_code_str[DHT_FIND_VALUE_RSP]); + goto fail_msg; } - lookup_destroy(lu); + log_info(KEY_FMT " Responding with %zd contacts, %zd values.", + KEY_VAL(req->key.data), n_contacts, n_vals); + + return rsp; - free(addrs); + fail_msg: + freebufs(vals, n_vals); + fail_vals: + while (n_contacts-- > 0) + dht_contact_msg__free_unpacked(contacts[n_contacts], NULL); + free(contacts); + fail_contacts: + return NULL; } -static int kad_join(struct dht * dht, - uint64_t addr) +static void do_dht_kv_find_node_rsp(const dht_find_node_rsp_msg_t * rsp) { - dht_msg_t msg = DHT_MSG__INIT; - - msg.code = KAD_JOIN; - - msg.has_alpha = true; - msg.has_b = true; - msg.has_k = true; - msg.has_t_refresh = true; - msg.has_t_replicate = true; - msg.alpha = KAD_ALPHA; - msg.k = KAD_K; - msg.t_refresh = KAD_T_REFR; - msg.t_replicate = KAD_T_REPL; + struct list_head pl; - pthread_rwlock_rdlock(&dht->lock); + assert(rsp != NULL); - msg.b = dht->b; + list_head_init(&pl); - pthread_rwlock_unlock(&dht->lock); + dht_kv_process_node_rsp(rsp->contacts, rsp->n_contacts, &pl, + DHT_FIND_NODE_REQ); - if (send_msg(dht, &msg, addr) < 0) - return -1; - - if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) - return -1; - - dht->id = create_id(dht->b); - if (dht->id == NULL) - return -1; + if (list_is_empty(&pl)) + goto no_contacts; - pthread_rwlock_wrlock(&dht->lock); + if (dht_kv_update_req(rsp->key.data, &pl) < 0) { + log_err(KEY_FMT " Failed to update request.", + KEY_VAL(rsp->key.data)); + goto fail_update; + } - dht_update_bucket(dht, dht->id, dht->addr); + dht_kv_query_contacts(rsp->key.data, &pl); - pthread_rwlock_unlock(&dht->lock); + return; - return 0; + fail_update: + peer_list_destroy(&pl); + no_contacts: + return; } -static void dht_dead_peer(struct dht * dht, - uint8_t * key, - uint64_t addr) +static void do_dht_kv_find_value_rsp(const dht_find_node_rsp_msg_t * node, + const dht_find_value_rsp_msg_t * val) { - struct list_head * p; - struct list_head * h; - struct bucket * b; + struct list_head pl; + uint8_t * key; - b = dht_get_bucket(dht, key); + assert(node != NULL); + assert(val != NULL); - list_for_each_safe(p, h, &b->contacts) { - struct contact * c = list_entry(p, struct contact, next); - if (b->n_contacts + b->n_alts <= dht->k) { - ++c->fails; - return; - } + list_head_init(&pl); - if (c->addr == addr) { - list_del(&c->next); - contact_destroy(c); - --b->n_contacts; - break; - } - } + key = node->key.data; - while (b->n_contacts < dht->k && b->n_alts > 0) { - struct contact * c; - c = list_first_entry(&b->alts, struct contact, next); - list_del(&c->next); - --b->n_alts; - list_add(&c->next, &b->contacts); - ++b->n_contacts; + dht_kv_process_node_rsp(node->contacts, node->n_contacts, &pl, + DHT_FIND_VALUE_REQ); + + if (val->n_values > 0) { + log_dbg(KEY_FMT " %zd new values received.", + 
KEY_VAL(key), val->n_values); + dht_kv_respond_req(key, val->values, val->n_values); + peer_list_destroy(&pl); + return; /* done! */ } -} -static int dht_del(struct dht * dht, - const uint8_t * key, - uint64_t addr) -{ - struct dht_entry * e; + if (list_is_empty(&pl)) + goto no_contacts; - e = dht_find_entry(dht, key); - if (e == NULL) { - return -EPERM; + if (dht_kv_update_req(key, &pl) < 0) { + log_err(KEY_FMT " Failed to update request.", KEY_VAL(key)); + goto fail_update; } - dht_entry_del_addr(e, addr); + dht_kv_query_remote(key, NULL, &pl); - return 0; + return; + fail_update: + peer_list_destroy(&pl); + no_contacts: + return; } -static buffer_t dht_retrieve(struct dht * dht, - const uint8_t * key) +static dht_msg_t * dht_wait_for_dht_msg(void) { - struct dht_entry * e; - struct list_head * p; - buffer_t buf; - uint64_t * pos; - size_t addrs = 0; - - pthread_rwlock_rdlock(&dht->lock); + dht_msg_t * msg; + struct cmd * cmd; - e = dht_find_entry(dht, key); - if (e == NULL) - goto fail; + pthread_mutex_lock(&dht.cmds.mtx); - buf.len = MIN(DHT_RETR_ADDR, e->n_vals); - if (buf.len == 0) - goto fail; + pthread_cleanup_push(__cleanup_mutex_unlock, &dht.cmds.mtx); - pos = malloc(sizeof(dht->addr) * buf.len); - if (pos == NULL) - goto fail; + while (list_is_empty(&dht.cmds.list)) + pthread_cond_wait(&dht.cmds.cond, &dht.cmds.mtx); - buf.data = (uint8_t *) pos; + cmd = list_last_entry(&dht.cmds.list, struct cmd, next); + list_del(&cmd->next); - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - *pos++ = v->addr; - if (++addrs >= buf.len) - break; - } + pthread_cleanup_pop(true); - pthread_rwlock_unlock(&dht->lock); + msg = dht_msg__unpack(NULL, cmd->cbuf.len, cmd->cbuf.data); + if (msg == NULL) + log_warn("Failed to unpack DHT msg."); - return buf; + freebuf(cmd->cbuf); + free(cmd); - fail: - pthread_rwlock_unlock(&dht->lock); - buf.len = 0; - buf.data = NULL; - return buf; + return msg; } -static ssize_t dht_get_contacts(struct dht * dht, - const uint8_t * key, - dht_contact_msg_t *** msgs) +static void do_dht_msg(dht_msg_t * msg) { - struct list_head l; - struct list_head * p; - struct list_head * h; - size_t len; - size_t i = 0; - - list_head_init(&l); + dht_msg_t * rsp = NULL; + uint8_t * id; + uint64_t addr; - pthread_rwlock_wrlock(&dht->lock); - - len = dht_contact_list(dht, &l, key); - if (len == 0) { - pthread_rwlock_unlock(&dht->lock); - *msgs = NULL; - return 0; +#ifdef DEBUG_PROTO_DHT + dht_kv_debug_msg_rcv(msg); +#endif + if (dht_kv_validate_msg(msg) == -EINVAL) { + log_warn("%s Validation failed.", DHT_CODE(msg)); + dht_msg__free_unpacked(msg, NULL); + return; } - *msgs = malloc(len * sizeof(**msgs)); - if (*msgs == NULL) { - pthread_rwlock_unlock(&dht->lock); - return 0; - } + id = msg->src->id.data; + addr = msg->src->addr; - list_for_each_safe(p, h, &l) { - struct contact * c = list_entry(p, struct contact, next); - (*msgs)[i] = malloc(sizeof(***msgs)); - if ((*msgs)[i] == NULL) { - pthread_rwlock_unlock(&dht->lock); - while (i > 0) - free(*msgs[--i]); - free(*msgs); - *msgs = NULL; - return 0; - } + if (dht_kv_update_contacts(id, addr) < 0) + log_warn(PEER_FMT " Failed to update contact from msg src.", + PEER_VAL(id, addr)); - dht_contact_msg__init((*msgs)[i]); + pthread_cleanup_push(__cleanup_dht_msg, msg); - (*msgs)[i]->id.data = c->id; - (*msgs)[i]->id.len = dht->b; - (*msgs)[i++]->addr = c->addr; - list_del(&c->next); - free(c); + switch(msg->code) { + case DHT_FIND_VALUE_REQ: + rsp = do_dht_kv_find_value_req(msg->find); + break; + case 
DHT_FIND_NODE_REQ: + rsp = do_dht_kv_find_node_req(msg->find); + break; + case DHT_STORE: + do_dht_kv_store(msg->store); + break; + case DHT_FIND_NODE_RSP: + do_dht_kv_find_node_rsp(msg->node); + break; + case DHT_FIND_VALUE_RSP: + do_dht_kv_find_value_rsp(msg->node, msg->val); + break; + default: + assert(false); /* already validated */ } - pthread_rwlock_unlock(&dht->lock); + pthread_cleanup_pop(true); - return i; -} + if (rsp == NULL) + return; -static time_t gcd(time_t a, - time_t b) -{ - if (a == 0) - return b; + pthread_cleanup_push(__cleanup_dht_msg, rsp); - return gcd(b % a, a); + dht_send_msg(rsp, addr); + + pthread_cleanup_pop(true); /* free rsp */ } -static void * work(void * o) +static void * dht_handle_packet(void * o) { - struct dht * dht; - struct timespec now; - struct list_head * p; - struct list_head * h; - struct list_head reflist; - time_t intv; - struct lookup * lu; - - dht = (struct dht *) o; - - pthread_rwlock_rdlock(&dht->lock); - - intv = gcd(dht->t_expire, dht->t_repub); - intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; - - pthread_rwlock_unlock(&dht->lock); - - list_head_init(&reflist); + (void) o; while (true) { - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_wrlock(&dht->lock); - - /* Republish registered hashes. */ - list_for_each(p, &dht->refs) { - struct ref_entry * e; - uint8_t * key; - uint64_t addr; - time_t t_expire; - e = list_entry(p, struct ref_entry, next); - if (now.tv_sec > e->t_rep) { - key = dht_dup_key(e->key, dht->b); - if (key == NULL) - continue; - addr = dht->addr; - t_expire = dht->t_expire; - e->t_rep = now.tv_sec + dht->t_repub; - - pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, addr, t_expire); - pthread_rwlock_wrlock(&dht->lock); - free(key); - } - } - - /* Remove stale entries and republish if necessary. */ - list_for_each_safe(p, h, &dht->entries) { - struct list_head * p1; - struct list_head * h1; - struct dht_entry * e; - uint8_t * key; - time_t t_expire; - e = list_entry (p, struct dht_entry, next); - list_for_each_safe(p1, h1, &e->vals) { - struct val * v; - uint64_t addr; - v = list_entry(p1, struct val, next); - if (now.tv_sec > v->t_exp) { - list_del(&v->next); - val_destroy(v); - continue; - } - - if (now.tv_sec > v->t_rep) { - key = dht_dup_key(e->key, dht->b); - addr = v->addr; - t_expire = dht->t_expire = now.tv_sec; - v->t_rep = now.tv_sec + dht->t_replic; - pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, addr, t_expire); - pthread_rwlock_wrlock(&dht->lock); - free(key); - } - } - } - - /* Check the requests list for unresponsive nodes. */ - list_for_each_safe(p, h, &dht->requests) { - struct kad_req * r; - r = list_entry(p, struct kad_req, next); - if (now.tv_sec > r->t_exp) { - list_del(&r->next); - bmp_release(dht->cookies, r->cookie); - dht_dead_peer(dht, r->key, r->addr); - kad_req_destroy(r); - } - } + dht_msg_t * msg; - /* Refresh unaccessed buckets. 
*/ - bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + msg = dht_wait_for_dht_msg(); + if (msg == NULL) + continue; - pthread_rwlock_unlock(&dht->lock); + tpm_begin_work(dht.tpm); - list_for_each_safe(p, h, &reflist) { - struct contact * c; - c = list_entry(p, struct contact, next); - lu = kad_lookup(dht, c->id, KAD_FIND_NODE); - if (lu != NULL) - lookup_destroy(lu); - list_del(&c->next); - contact_destroy(c); - } + do_dht_msg(msg); - sleep(intv); + tpm_end_work(dht.tpm); } return (void *) 0; } - -static int kad_handle_join_resp(struct dht * dht, - struct kad_req * req, - dht_msg_t * msg) +#ifndef __DHT_TEST__ +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) { - assert(dht); - assert(req); - assert(msg); + struct cmd * cmd; - /* We might send version numbers later to warn of updates if needed. */ - if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && - msg->has_t_refresh && msg->has_t_replicate)) { - log_warn("Join refused by remote."); - return -1; + (void) comp; + + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command malloc failed."); + goto fail_cmd; } - if (msg->b < sizeof(uint64_t)) { - log_err("Hash sizes less than 8 bytes unsupported."); - return -1; + cmd->cbuf.data = malloc(shm_du_buff_len(sdb)); + if (cmd->cbuf.data == NULL) { + log_err("Command buffer malloc failed."); + goto fail_buf; } - pthread_rwlock_wrlock(&dht->lock); + cmd->cbuf.len = shm_du_buff_len(sdb); - dht->buckets = bucket_create(); - if (dht->buckets == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -1; - } + memcpy(cmd->cbuf.data, shm_du_buff_head(sdb), cmd->cbuf.len); - /* Likely corrupt packet. The member will refuse, we might here too. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) - log_warn("Different kademlia parameters detected."); + ipcp_sdb_release(sdb); - if (msg->t_replicate != KAD_T_REPL) - log_warn("Different kademlia replication time detected."); + pthread_mutex_lock(&dht.cmds.mtx); - if (msg->t_refresh != KAD_T_REFR) - log_warn("Different kademlia refresh time detected."); + list_add(&cmd->next, &dht.cmds.list); - dht->k = msg->k; - dht->b = msg->b; - dht->t_expire = msg->t_expire; - dht->t_repub = MAX(1, dht->t_expire - 10); + pthread_cond_signal(&dht.cmds.cond); - if (pthread_create(&dht->worker, NULL, work, dht)) { - bucket_destroy(dht->buckets); - pthread_rwlock_unlock(&dht->lock); - return -1; - } + pthread_mutex_unlock(&dht.cmds.mtx); + + return; + + fail_buf: + free(cmd); + fail_cmd: + ipcp_sdb_release(sdb); + return; +} +#endif - kad_req_respond(req); +int dht_reg(const uint8_t * key) +{ + buffer_t val; - dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + if (addr_to_buf(dht.addr, &val) < 0) { + log_err("Failed to convert address to buffer."); + goto fail_a2b; + } - pthread_rwlock_unlock(&dht->lock); + if (dht_kv_publish(key, val)) { + log_err(KV_FMT " Failed to publish.", KV_VAL(key, val)); + goto fail_publish; + } - log_dbg("Enrollment of DHT completed."); + freebuf(val); return 0; + fail_publish: + freebuf(val); + fail_a2b: + return -1; } -static int kad_handle_find_resp(struct dht * dht, - struct kad_req * req, - dht_msg_t * msg) +int dht_unreg(const uint8_t * key) { - struct lookup * lu; - - assert(dht); - assert(req); - assert(msg); + buffer_t val; - pthread_rwlock_rdlock(&dht->lock); - - lu = dht_find_lookup(dht, req->cookie); - if (lu == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -1; + if (addr_to_buf(dht.addr, &val) < 0) { + log_err("Failed to convert address to buffer."); + goto fail_a2b; 
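/*
 * A minimal sketch, not part of this patch, of the addr <-> buffer_t
 * round trip that dht_reg(), dht_unreg() and dht_query() rely on. The
 * real helpers live elsewhere in the tree; hton64()/ntoh64() and the
 * exact error codes are assumptions made for illustration only:
 *
 *     static int addr_to_buf(uint64_t addr, buffer_t * buf)
 *     {
 *             uint64_t net = hton64(addr); // assumed byte-order helper
 *
 *             buf->len  = sizeof(net);
 *             buf->data = malloc(buf->len);
 *             if (buf->data == NULL)
 *                     return -ENOMEM;
 *
 *             memcpy(buf->data, &net, buf->len);
 *
 *             return 0;
 *     }
 *
 *     static int buf_to_addr(const buffer_t buf, uint64_t * addr)
 *     {
 *             if (buf.len != sizeof(*addr))
 *                     return -EINVAL;
 *
 *             memcpy(addr, buf.data, buf.len);
 *             *addr = ntoh64(*addr); // assumed byte-order helper
 *
 *             return 0;
 *     }
 *
 * This is consistent with the 8-byte values visible in the DHT message
 * dumps and with dht_query() converting vals[i] back to an address.
 */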
} - lookup_update(dht, lu, msg); + if (dht_kv_unpublish(key, val)) { + log_err(KV_FMT " Failed to unpublish.", KV_VAL(key, val)); + goto fail_unpublish; + } - pthread_rwlock_unlock(&dht->lock); + freebuf(val); return 0; + fail_unpublish: + freebuf(val); + fail_a2b: + return -ENOMEM; } -static void kad_handle_response(struct dht * dht, - dht_msg_t * msg) +uint64_t dht_query(const uint8_t * key) { - struct kad_req * req; - - assert(dht); - assert(msg); - - pthread_rwlock_wrlock(&dht->lock); + buffer_t * vals; + ssize_t n; + uint64_t addr; - req = dht_find_request(dht, msg); - if (req == NULL) { - pthread_rwlock_unlock(&dht->lock); - return; + n = dht_kv_retrieve(key, &vals); + if (n < 0) { + log_err(KEY_FMT " Failed to query db.", KEY_VAL(key)); + goto fail_vals; } - bmp_release(dht->cookies, req->cookie); - list_del(&req->next); + if (n == 0) { + log_dbg(KEY_FMT " No local values.", KEY_VAL(key)); + n = dht_kv_query_remote(key, &vals, NULL); + if (n < 0) { + log_warn(KEY_FMT " Failed to query DHT.", KEY_VAL(key)); + goto fail_vals; + } + if (n == 0) { + log_dbg(KEY_FMT " No values.", KEY_VAL(key)); + goto no_vals; + } + } - pthread_rwlock_unlock(&dht->lock); + if (buf_to_addr(vals[0], &addr) < 0) { + log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[0])); + goto fail_b2a; + } - switch(req->code) { - case KAD_JOIN: - if (kad_handle_join_resp(dht, req, msg)) - log_err("Enrollment of DHT failed."); - break; - case KAD_FIND_VALUE: - case KAD_FIND_NODE: - if (dht_get_state(dht) != DHT_RUNNING) - break; - kad_handle_find_resp(dht, req, msg); - break; - default: - break; + if (n > 1 && addr == INVALID_ADDR && buf_to_addr(vals[1], &addr) < 0) { + log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[1])); + goto fail_b2a; } - kad_req_destroy(req); + freebufs(vals, n); + + return addr; + fail_b2a: + freebufs(vals, n); + return INVALID_ADDR; + no_vals: + free(vals); + fail_vals: + return INVALID_ADDR; } -int dht_bootstrap(void * dir) +static int emergency_peer(struct list_head * pl) { - struct dht * dht; + struct peer_entry * e; + struct timespec now; - dht = (struct dht *) dir; + assert(pl != NULL); + assert(list_is_empty(pl)); - assert(dht); + if (dht.peer == INVALID_ADDR) + return -1; - pthread_rwlock_wrlock(&dht->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); -#ifndef __DHT_TEST__ - dht->b = ipcp_dir_hash_len(); -#else - dht->b = DHT_TEST_KEY_LEN; -#endif + e = malloc(sizeof(*e)); + if (e == NULL) { + log_err("Failed to malloc emergency peer entry."); + goto fail_malloc; + } - dht->id = create_id(dht->b); - if (dht->id == NULL) + e->id = dht_dup_key(dht.id.data); + if (e->id == NULL) { + log_err("Failed to duplicate DHT ID for emergency peer."); goto fail_id; + } - dht->buckets = bucket_create(); - if (dht->buckets == NULL) - goto fail_buckets; - - dht->buckets->depth = 0; - dht->buckets->mask = 0; - - dht->t_expire = 86400; /* 1 day */ - dht->t_repub = dht->t_expire - 10; - dht->k = KAD_K; + e->addr = dht.peer; + e->cookie = dht.magic; + e->code = DHT_FIND_NODE_REQ; + e->t_sent = now.tv_sec; - if (pthread_create(&dht->worker, NULL, work, dht)) - goto fail_pthread_create; + list_add_tail(&e->next, pl); - dht->state = DHT_RUNNING; + return 0; + fail_id: + free(e); + fail_malloc: + return -ENOMEM; +} - dht_update_bucket(dht, dht->id, dht->addr); +static int dht_kv_seed_bootstrap_peer(void) +{ + struct list_head pl; - pthread_rwlock_unlock(&dht->lock); + list_head_init(&pl); - return 0; + if (dht.peer == INVALID_ADDR) { + log_dbg("No-one to contact."); + return 0; + } - 
fail_pthread_create: - bucket_destroy(dht->buckets); - dht->buckets = NULL; - fail_buckets: - free(dht->id); - dht->id = NULL; - fail_id: - pthread_rwlock_unlock(&dht->lock); - return -1; -} + if (emergency_peer(&pl) < 0) { + log_err("Could not create emergency peer."); + goto fail_peer; + } -static struct ref_entry * ref_entry_get(struct dht * dht, - const uint8_t * key) -{ - struct list_head * p; + log_dbg("Pinging emergency peer " ADDR_FMT32 ".", + ADDR_VAL32(&dht.peer)); - list_for_each(p, &dht->refs) { - struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht-> b) ) - return r; + if (dht_kv_query_contacts(dht.id.data, &pl) < 0) { + log_warn("Failed to bootstrap peer."); + goto fail_query; } - return NULL; + peer_list_destroy(&pl); + + return 0; + fail_query: + peer_list_destroy(&pl); + fail_peer: + return -EAGAIN; } -int dht_reg(void * dir, - const uint8_t * key) +static void dht_kv_check_contacts(void) { - struct dht * dht; - struct ref_entry * e; - uint64_t addr; - time_t t_expire; + struct list_head cl; + struct list_head pl; - dht = (struct dht *) dir; + list_head_init(&cl); - assert(dht); - assert(key); - assert(dht->addr != 0); + dht_kv_contact_list(dht.id.data, &cl, dht.k); - if (dht_wait_running(dht)) - return -1; + if (!list_is_empty(&cl)) + goto success; - pthread_rwlock_wrlock(&dht->lock); + contact_list_destroy(&cl); - if (ref_entry_get(dht, key) != NULL) { - log_dbg("Name already registered."); - pthread_rwlock_unlock(&dht->lock); - return 0; - } + list_head_init(&pl); - e = ref_entry_create(dht, key); - if (e == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -ENOMEM; + if (dht.peer == INVALID_ADDR) { + log_dbg("No-one to contact."); + return; } - list_add(&e->next, &dht->refs); + if (emergency_peer(&pl) < 0) { + log_err("Could not create emergency peer."); + goto fail_peer; + } - t_expire = dht->t_expire; - addr = dht->addr; + log_dbg("No contacts found, using emergency peer " ADDR_FMT32 ".", + ADDR_VAL32(&dht.peer)); - pthread_rwlock_unlock(&dht->lock); + dht_kv_query_contacts(dht.id.data, &pl); - kad_publish(dht, key, addr, t_expire); + peer_list_destroy(&pl); - return 0; + return; + success: + contact_list_destroy(&cl); + return; + fail_peer: + return; } -int dht_unreg(void * dir, - const uint8_t * key) +static void dht_kv_remove_expired_reqs(void) { - struct dht * dht; struct list_head * p; struct list_head * h; + struct timespec now; - dht = (struct dht *) dir; - - assert(dht); - assert(key); - - if (dht_get_state(dht) != DHT_RUNNING) - return -1; + clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_wrlock(&dht->lock); + pthread_mutex_lock(&dht.reqs.mtx); - list_for_each_safe(p, h, &dht->refs) { - struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht-> b) ) { - list_del(&r->next); - ref_entry_destroy(r); + list_for_each_safe(p, h, &dht.reqs.list) { + struct dht_req * e; + e = list_entry(p, struct dht_req, next); + if (IS_EXPIRED(e, &now)) { + log_dbg(KEY_FMT " Removing expired request.", + KEY_VAL(e->key)); + list_del(&e->next); + dht_req_destroy(e); + --dht.reqs.len; } } - dht_del(dht, key, dht->addr); + pthread_mutex_unlock(&dht.reqs.mtx); +} + +static void value_list_destroy(struct list_head * vl) +{ + struct list_head * p; + struct list_head * h; - pthread_rwlock_unlock(&dht->lock); + assert(vl != NULL); - return 0; + list_for_each_safe(p, h, vl) { + struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + } } -uint64_t 
dht_query(void * dir, - const uint8_t * key) +#define MUST_REPLICATE(v, now) ((now)->tv_sec > (v)->t_repl + dht.t_repl) +#define MUST_REPUBLISH(v, now) /* Close to expiry deadline */ \ + (((v)->t_exp - (now)->tv_sec) < (DHT_N_REPUB * dht.t_repl)) +static void dht_entry_get_repl_lists(const struct dht_entry * e, + struct list_head * repl, + struct list_head * rebl, + struct timespec * now) { - struct dht * dht; - struct dht_entry * e; - struct lookup * lu; - uint64_t addrs[KAD_K]; - size_t n; + struct list_head * p; + struct val_entry * n; - dht = (struct dht *) dir; + list_for_each(p, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (MUST_REPLICATE(v, now) && !IS_EXPIRED(v, now)) { + n = val_entry_create(v->val, v->t_exp); + if (n == NULL) + continue; - assert(dht); + list_add_tail(&n->next, repl); + } + } - addrs[0] = 0; + list_for_each(p, &e->lvals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (MUST_REPLICATE(v, now) && MUST_REPUBLISH(v, now)) { + /* Add expire time here, to allow creating val_entry */ + n = val_entry_create(v->val, now->tv_sec + dht.t_expire); + if (n == NULL) + continue; - if (dht_wait_running(dht)) - return 0; + list_add_tail(&n->next, rebl); + } + } +} + +static int dht_kv_next_values(uint8_t * key, + struct list_head * repl, + struct list_head * rebl) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; + struct dht_entry * e = NULL; - pthread_rwlock_rdlock(&dht->lock); + assert(key != NULL); + assert(repl != NULL); + assert(rebl != NULL); - e = dht_find_entry(dht, key); - if (e != NULL) - addrs[0] = dht_entry_get_addr(dht, e); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_rwlock_unlock(&dht->lock); + assert(list_is_empty(repl)); + assert(list_is_empty(rebl)); - if (addrs[0] != 0) - return addrs[0]; + pthread_rwlock_rdlock(&dht.db.lock); - lu = kad_lookup(dht, key, KAD_FIND_VALUE); - if (lu == NULL) - return 0; + if (dht.db.kv.len == 0) + goto no_entries; - n = lookup_get_addrs(lu, addrs); - if (n == 0) { - lookup_destroy(lu); - return 0; + list_for_each_safe(p, h, &dht.db.kv.list) { + e = list_entry(p, struct dht_entry, next); + if (IS_CLOSER(e->key, key)) + continue; /* Already processed */ + } + + if (e != NULL) { + memcpy(key, e->key, dht.id.len); + dht_entry_get_repl_lists(e, repl, rebl, &now); } + no_entries: + pthread_rwlock_unlock(&dht.db.lock); - lookup_destroy(lu); + return list_is_empty(repl) && list_is_empty(rebl) ? -ENOENT : 0; +} - /* Current behaviour is anycast and return the first peer address. 
*/ - if (addrs[0] != dht->addr) - return addrs[0]; +static void dht_kv_replicate_value(const uint8_t * key, + struct val_entry * v, + const struct timespec * now) +{ + assert(MUST_REPLICATE(v, now)); - if (n > 1) - return addrs[1]; + (void) now; - return 0; + if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) { + log_dbg(KV_FMT " Replicated.", KV_VAL(key, v->val)); + return; + } + + log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val)); + + list_del(&v->next); + val_entry_destroy(v); } -static void * dht_handle_packet(void * o) +static void dht_kv_republish_value(const uint8_t * key, + struct val_entry * v, + const struct timespec * now) { - struct dht * dht = (struct dht *) o; + assert(MUST_REPLICATE(v, now)); - assert(dht); + if (MUST_REPUBLISH(v, now)) + assert(v->t_exp >= now->tv_sec + dht.t_expire); - while (true) { - dht_msg_t * msg; - dht_contact_msg_t ** cmsgs; - dht_msg_t resp_msg = DHT_MSG__INIT; - uint64_t addr; - buffer_t buf; - size_t i; - size_t b; - size_t t_expire; - struct cmd * cmd; - - pthread_mutex_lock(&dht->mtx); + if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) { + log_dbg(KV_FMT " Republished.", KV_VAL(key, v->val)); + return; + } - pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + if (MUST_REPUBLISH(v, now)) + log_warn(KV_FMT " Republish failed.", KV_VAL(key, v->val)); + else + log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val)); - while (list_is_empty(&dht->cmds)) - pthread_cond_wait(&dht->cond, &dht->mtx); + list_del(&v->next); + val_entry_destroy(v); +} - cmd = list_last_entry(&dht->cmds, struct cmd, next); - list_del(&cmd->next); +static void dht_kv_update_replication_times(const uint8_t * key, + struct list_head * repl, + struct list_head * rebl, + const struct timespec * now) +{ + struct dht_entry * e; + struct list_head * p; + struct list_head * h; + struct val_entry * v; - pthread_cleanup_pop(true); + assert(key != NULL); + assert(repl != NULL); + assert(rebl != NULL); + assert(now != NULL); - i = shm_du_buff_len(cmd->sdb); + pthread_rwlock_wrlock(&dht.db.lock); - msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); -#ifndef __DHT_TEST__ - ipcp_sdb_release(cmd->sdb); -#endif - free(cmd); + e = __dht_kv_find_entry(key); + if (e == NULL) { + pthread_rwlock_unlock(&dht.db.lock); + return; + } - if (msg == NULL) { - log_err("Failed to unpack message."); + list_for_each_safe(p, h, repl) { + struct val_entry * x; + v = list_entry(p, struct val_entry, next); + x = dht_entry_get_val(e, v->val); + if (x == NULL) { + log_err(KV_FMT " Not in vals.", KV_VAL(key, v->val)); continue; } - if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { - dht_msg__free_unpacked(msg, NULL); - log_dbg("Got a request message when not running."); + x->t_repl = now->tv_sec; + + list_del(&v->next); + val_entry_destroy(v); + } + + list_for_each_safe(p, h, rebl) { + struct val_entry * x; + v = list_entry(p, struct val_entry, next); + x = dht_entry_get_lval(e, v->val); + if (x == NULL) { + log_err(KV_FMT " Not in lvals.", KV_VAL(key, v->val)); continue; } - pthread_rwlock_rdlock(&dht->lock); + x->t_repl = now->tv_sec; + if (v->t_exp > x->t_exp) { + x->t_exp = v->t_exp; /* update expiration time */ + } - b = dht->b; - t_expire = dht->t_expire; + list_del(&v->next); + val_entry_destroy(v); + } - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.db.lock); +} - if (msg->has_key && msg->key.len != b) { - dht_msg__free_unpacked(msg, NULL); - log_warn("Bad key in message."); - continue; - } +static void dht_kv_replicate_values(const uint8_t * 
key, + struct list_head * repl, + struct list_head * rebl) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; - if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { - dht_msg__free_unpacked(msg, NULL); - log_warn("Bad source ID in message of type %d.", - msg->code); - continue; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - tpm_begin_work(dht->tpm); + list_for_each_safe(p, h, repl) { + struct val_entry * v; + v = list_entry(p, struct val_entry, next); + dht_kv_replicate_value(key, v, &now); + } - addr = msg->s_addr; + list_for_each_safe(p, h, rebl) { + struct val_entry * v; + v = list_entry(p, struct val_entry, next); + dht_kv_republish_value(key, v, &now); + } - resp_msg.code = KAD_RESPONSE; - resp_msg.cookie = msg->cookie; + /* removes non-replicated items from the list */ + dht_kv_update_replication_times(key, repl, rebl, &now); - switch(msg->code) { - case KAD_JOIN: - /* Refuse enrollee on check fails. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { - log_warn("Parameter mismatch. " - "DHT enrolment refused."); - break; - } + if (list_is_empty(repl) && list_is_empty(rebl)) + return; - if (msg->t_replicate != KAD_T_REPL) { - log_warn("Replication time mismatch. " - "DHT enrolment refused."); + log_warn(KEY_FMT " Failed to update replication times.", KEY_VAL(key)); +} - break; - } +static void dht_kv_replicate(void) +{ + struct list_head repl; /* list of values to replicate */ + struct list_head rebl; /* list of local values to republish */ + uint8_t * key; - if (msg->t_refresh != KAD_T_REFR) { - log_warn("Refresh time mismatch. " - "DHT enrolment refused."); - break; - } + key = dht_dup_key(dht.id.data); /* dist == 0 */ + if (key == NULL) { + log_err("Replicate: Failed to duplicate DHT ID."); + return; + } - resp_msg.has_alpha = true; - resp_msg.has_b = true; - resp_msg.has_k = true; - resp_msg.has_t_expire = true; - resp_msg.has_t_refresh = true; - resp_msg.has_t_replicate = true; - resp_msg.alpha = KAD_ALPHA; - resp_msg.b = b; - resp_msg.k = KAD_K; - resp_msg.t_expire = t_expire; - resp_msg.t_refresh = KAD_T_REFR; - resp_msg.t_replicate = KAD_T_REPL; - break; - case KAD_FIND_VALUE: - buf = dht_retrieve(dht, msg->key.data); - if (buf.len != 0) { - resp_msg.n_addrs = buf.len; - resp_msg.addrs = (uint64_t *) buf.data; - break; - } - /* FALLTHRU */ - case KAD_FIND_NODE: - /* Return k closest contacts. 
*/ - resp_msg.n_contacts = - dht_get_contacts(dht, msg->key.data, &cmsgs); - resp_msg.contacts = cmsgs; - break; - case KAD_STORE: - if (msg->n_contacts < 1) { - log_warn("No contacts in store message."); - break; - } + list_head_init(&repl); + list_head_init(&rebl); - if (!msg->has_t_expire) { - log_warn("No expiry time in store message."); - break; - } + while (dht_kv_next_values(key, &repl, &rebl) == 0) { + dht_kv_replicate_values(key, &repl, &rebl); + if (!list_is_empty(&repl)) { + log_warn(KEY_FMT " Replication items left.", + KEY_VAL(key)); + value_list_destroy(&repl); + } - kad_add(dht, *msg->contacts, msg->n_contacts, - msg->t_expire); - break; - case KAD_RESPONSE: - kad_handle_response(dht, msg); - break; - default: - assert(false); - break; + if (!list_is_empty(&rebl)) { + log_warn(KEY_FMT " Republish items left.", + KEY_VAL(key)); + value_list_destroy(&rebl); } + } - if (msg->code != KAD_JOIN) { - pthread_rwlock_wrlock(&dht->lock); - if (dht_get_state(dht) == DHT_JOINING && - dht->buckets == NULL) { - pthread_rwlock_unlock(&dht->lock); - goto finish; - } + free(key); +} - if (dht_update_bucket(dht, msg->s_id.data, addr)) - log_warn("Failed to update bucket."); - pthread_rwlock_unlock(&dht->lock); - } +static void dht_kv_refresh_contacts(void) +{ + struct list_head * p; + struct list_head * h; + struct list_head rl; /* refresh list */ + struct timespec now; - if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) - log_warn("Failed to send response."); + list_head_init(&rl); - finish: - dht_msg__free_unpacked(msg, NULL); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - if (resp_msg.n_addrs > 0) - free(resp_msg.addrs); + pthread_rwlock_rdlock(&dht.db.lock); - if (resp_msg.n_contacts == 0) { - tpm_end_work(dht->tpm); - continue; - } + __dht_kv_bucket_refresh_list(dht.db.contacts.root, now.tv_sec, &rl); - for (i = 0; i < resp_msg.n_contacts; ++i) - dht_contact_msg__free_unpacked(resp_msg.contacts[i], - NULL); - free(resp_msg.contacts); + pthread_rwlock_unlock(&dht.db.lock); - tpm_end_work(dht->tpm); + list_for_each_safe(p, h, &rl) { + struct contact * c; + c = list_entry(p, struct contact, next); + log_dbg(PEER_FMT " Refreshing contact.", + PEER_VAL(c->id, c->addr)); + dht_kv_query_contacts(c->id, NULL); + list_del(&c->next); + contact_destroy(c); } - return (void *) 0; + assert(list_is_empty(&rl)); } -static void dht_post_packet(void * comp, - struct shm_du_buff * sdb) +static void (*tasks[])(void) = { + dht_kv_check_contacts, + dht_kv_remove_expired_entries, + dht_kv_remove_expired_reqs, + dht_kv_replicate, + dht_kv_refresh_contacts, + NULL +}; + +static void * work(void * o) { - struct cmd * cmd; - struct dht * dht = (struct dht *) comp; + struct timespec now = TIMESPEC_INIT_MS(1); + time_t intv; + size_t n; /* number of tasks */ - if (dht_get_state(dht) == DHT_SHUTDOWN) { -#ifndef __DHT_TEST__ - ipcp_sdb_release(sdb); -#endif - return; - } + n = sizeof(tasks) / sizeof(tasks[0]) - 1; /* last is NULL */ - cmd = malloc(sizeof(*cmd)); - if (cmd == NULL) { - log_err("Command failed. 
Out of memory."); - return; - } + (void) o; - cmd->sdb = sdb; + while (dht_kv_seed_bootstrap_peer() == -EAGAIN) { + ts_add(&now, &now, &now); /* exponential backoff */ + if (now.tv_sec > 1) /* cap at 1 second */ + now.tv_sec = 1; + nanosleep(&now, NULL); + } - pthread_mutex_lock(&dht->mtx); + intv = gcd(dht.t_expire, (dht.t_expire - DHT_N_REPUB * dht.t_repl)); + intv = gcd(intv, gcd(dht.t_repl, dht.t_refresh)) / 2; + intv = MAX(1, intv / n); - list_add(&cmd->next, &dht->cmds); + log_dbg("DHT worker starting %ld seconds interval.", intv * n); - pthread_cond_signal(&dht->cond); + while (true) { + int i = 0; + while (tasks[i] != NULL) { + tasks[i++](); + sleep(intv); + } + } - pthread_mutex_unlock(&dht->mtx); + return (void *) 0; } -void dht_destroy(void * dir) +int dht_start(void) { - struct dht * dht; - struct list_head * p; - struct list_head * h; + dht.state = DHT_RUNNING; - dht = (struct dht *) dir; - if (dht == NULL) - return; + if (tpm_start(dht.tpm)) + goto fail_tpm_start; #ifndef __DHT_TEST__ - tpm_stop(dht->tpm); + if (pthread_create(&dht.worker, NULL, work, NULL)) { + log_err("Failed to create DHT worker thread."); + goto fail_worker; + } - tpm_destroy(dht->tpm); -#endif - if (dht_get_state(dht) == DHT_RUNNING) { - dht_set_state(dht, DHT_SHUTDOWN); - pthread_cancel(dht->worker); - pthread_join(dht->worker, NULL); + dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT); + if ((int) dht.eid < 0) { + log_err("Failed to register DHT component."); + goto fail_reg; } +#else + (void) work; +#endif + return 0; +#ifndef __DHT_TEST__ + fail_reg: + pthread_cancel(dht.worker); + pthread_join(dht.worker, NULL); + fail_worker: + tpm_stop(dht.tpm); +#endif + fail_tpm_start: + dht.state = DHT_INIT; + return -1; +} - pthread_rwlock_wrlock(&dht->lock); +void dht_stop(void) +{ + assert(dht.state == DHT_RUNNING); - list_for_each_safe(p, h, &dht->cmds) { - struct cmd * c = list_entry(p, struct cmd, next); - list_del(&c->next); #ifndef __DHT_TEST__ - ipcp_sdb_release(c->sdb); + dt_unreg_comp(dht.eid); + + pthread_cancel(dht.worker); + pthread_join(dht.worker, NULL); #endif - free(c); - } + tpm_stop(dht.tpm); - list_for_each_safe(p, h, &dht->entries) { - struct dht_entry * e = list_entry(p, struct dht_entry, next); - list_del(&e->next); - dht_entry_destroy(e); - } + dht.state = DHT_INIT; +} - list_for_each_safe(p, h, &dht->requests) { - struct kad_req * r = list_entry(p, struct kad_req, next); - list_del(&r->next); - kad_req_destroy(r); - } +int dht_init(struct dir_dht_config * conf) +{ + struct timespec now; + pthread_condattr_t cattr; - list_for_each_safe(p, h, &dht->refs) { - struct ref_entry * e = list_entry(p, struct ref_entry, next); - list_del(&e->next); - ref_entry_destroy(e); - } + assert(conf != NULL); - list_for_each_safe(p, h, &dht->lookups) { - struct lookup * l = list_entry(p, struct lookup, next); - list_del(&l->next); - lookup_destroy(l); - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_rwlock_unlock(&dht->lock); +#ifndef __DHT_TEST__ + dht.id.len = ipcp_dir_hash_len(); + dht.addr = addr_auth_address(); +#else + dht.id.len = DHT_TEST_KEY_LEN; + dht.addr = DHT_TEST_ADDR; +#endif + dht.t0 = now.tv_sec; + dht.alpha = conf->params.alpha; + dht.k = conf->params.k; + dht.t_expire = conf->params.t_expire; + dht.t_refresh = conf->params.t_refresh; + dht.t_repl = conf->params.t_replicate; + dht.peer = conf->peer; + + dht.magic = generate_cookie(); + + /* Send my address on enrollment */ + conf->peer = dht.addr; + + dht.id.data = generate_id(); + if (dht.id.data == NULL) { + log_err("Failed 
to create DHT ID."); + goto fail_id; + } - if (dht->buckets != NULL) - bucket_destroy(dht->buckets); + list_head_init(&dht.cmds.list); - bmp_destroy(dht->cookies); + if (pthread_mutex_init(&dht.cmds.mtx, NULL)) { + log_err("Failed to initialize command mutex."); + goto fail_cmds_mutex; + } - pthread_mutex_destroy(&dht->mtx); + if (pthread_cond_init(&dht.cmds.cond, NULL)) { + log_err("Failed to initialize command condvar."); + goto fail_cmds_cond; + } - pthread_rwlock_destroy(&dht->lock); + list_head_init(&dht.reqs.list); + dht.reqs.len = 0; - free(dht->id); + if (pthread_mutex_init(&dht.reqs.mtx, NULL)) { + log_err("Failed to initialize request mutex."); + goto fail_reqs_mutex; + } - free(dht); -} + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize request condvar attributes."); + goto fail_cattr; + } +#ifndef __APPLE__ + if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) { + log_err("Failed to set request condvar clock."); + goto fail_cattr; + } +#endif + if (pthread_cond_init(&dht.reqs.cond, &cattr)) { + log_err("Failed to initialize request condvar."); + goto fail_reqs_cond; + } -static void * join_thr(void * o) -{ - struct join_info * info = (struct join_info *) o; - struct lookup * lu; - size_t retr = 0; + list_head_init(&dht.db.kv.list); + dht.db.kv.len = 0; + dht.db.kv.vals = 0; + dht.db.kv.lvals = 0; - assert(info); + if (pthread_rwlock_init(&dht.db.lock, NULL)) { + log_err("Failed to initialize store rwlock."); + goto fail_rwlock; + } - while (kad_join(info->dht, info->addr)) { - if (dht_get_state(info->dht) == DHT_SHUTDOWN) { - log_dbg("DHT enrollment aborted."); - goto finish; - } + dht.db.contacts.root = bucket_create(); + if (dht.db.contacts.root == NULL) { + log_err("Failed to create DHT buckets."); + goto fail_buckets; + } - if (retr++ == KAD_JOIN_RETR) { - dht_set_state(info->dht, DHT_INIT); - log_warn("DHT enrollment attempt failed."); - goto finish; - } + if (rib_reg(DHT, &r_ops) < 0) { + log_err("Failed to register DHT RIB operations."); + goto fail_rib_reg; + } - sleep(KAD_JOIN_INTV); + dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL); + if (dht.tpm == NULL) { + log_err("Failed to create TPM for DHT."); + goto fail_tpm_create; } - dht_set_state(info->dht, DHT_RUNNING); + if (dht_kv_update_contacts(dht.id.data, dht.addr) < 0) + log_warn("Failed to update contacts with DHT ID."); - lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); - if (lu != NULL) - lookup_destroy(lu); + pthread_condattr_destroy(&cattr); +#ifndef __DHT_TEST__ + log_info("DHT initialized."); + log_dbg(" ID: " HASH_FMT64 " [%zu bytes].", + HASH_VAL64(dht.id.data), dht.id.len); + log_dbg(" address: " ADDR_FMT32 ".", ADDR_VAL32(&dht.addr)); + log_dbg(" peer: " ADDR_FMT32 ".", ADDR_VAL32(&dht.peer)); + log_dbg(" magic cookie: " HASH_FMT64 ".", HASH_VAL64(&dht.magic)); + log_info(" parameters: alpha=%u, k=%zu, t_expire=%ld, " + "t_refresh=%ld, t_replicate=%ld.", + dht.alpha, dht.k, dht.t_expire, dht.t_refresh, dht.t_repl); +#endif + dht.state = DHT_INIT; - finish: - free(info); + return 0; - return (void *) 0; + fail_tpm_create: + rib_unreg(DHT); + fail_rib_reg: + bucket_destroy(dht.db.contacts.root); + fail_buckets: + pthread_rwlock_destroy(&dht.db.lock); + fail_rwlock: + pthread_cond_destroy(&dht.reqs.cond); + fail_reqs_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&dht.reqs.mtx); + fail_reqs_mutex: + pthread_cond_destroy(&dht.cmds.cond); + fail_cmds_cond: + pthread_mutex_destroy(&dht.cmds.mtx); + fail_cmds_mutex: + freebuf(dht.id); + 
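/*
 * For reference, a sketch of how a caller might fill in the
 * struct dir_dht_config consumed by dht_init() above. The field names
 * follow the conf->params.* reads in this function; the literal values
 * are placeholders for illustration, not recommended settings:
 *
 *     struct dir_dht_config conf;
 *
 *     conf.params.alpha       = 3;         // parallel lookups
 *     conf.params.k           = 8;         // Kademlia bucket size
 *     conf.params.t_expire    = 86400;     // value lifetime (s)
 *     conf.params.t_refresh   = 900;       // bucket refresh period (s)
 *     conf.params.t_replicate = 900;       // replication period (s)
 *     conf.peer               = peer_addr; // enrolment peer address
 *
 *     if (dht_init(&conf) < 0)
 *             return -1;
 *
 * On success dht_init() writes this node's own address back into
 * conf.peer, so enrolment can hand it to the next member that joins.
 */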
fail_id: + return -1; } -static void handle_event(void * self, - int event, - const void * o) +void dht_fini(void) { - struct dht * dht = (struct dht *) self; + struct list_head * p; + struct list_head * h; - if (event == NOTIFY_DT_CONN_ADD) { - pthread_t thr; - struct join_info * inf; - struct conn * c = (struct conn *) o; - struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK); + rib_unreg(DHT); - /* Give the pff some time to update for the new link. */ - nanosleep(&slack, NULL); + tpm_destroy(dht.tpm); - switch(dht_get_state(dht)) { - case DHT_INIT: - inf = malloc(sizeof(*inf)); - if (inf == NULL) - break; + pthread_mutex_lock(&dht.cmds.mtx); - inf->dht = dht; - inf->addr = c->conn_info.addr; - - if (dht_set_state(dht, DHT_JOINING) == 0 || - dht_wait_running(dht)) { - if (pthread_create(&thr, NULL, join_thr, inf)) { - dht_set_state(dht, DHT_INIT); - free(inf); - return; - } - pthread_detach(thr); - } else { - free(inf); - } - break; - case DHT_RUNNING: - /* - * FIXME: this lookup for effiency reasons - * causes a SEGV when stressed with rapid - * enrollments. - * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); - * if (lu != NULL) - * lookup_destroy(lu); - */ - break; - default: - break; - } + list_for_each_safe(p, h, &dht.cmds.list) { + struct cmd * c = list_entry(p, struct cmd, next); + list_del(&c->next); + freebuf(c->cbuf); + free(c); } -} - -void * dht_create(void) -{ - struct dht * dht; - dht = malloc(sizeof(*dht)); - if (dht == NULL) - goto fail_malloc; + pthread_mutex_unlock(&dht.cmds.mtx); - dht->buckets = NULL; + pthread_cond_destroy(&dht.cmds.cond); + pthread_mutex_destroy(&dht.cmds.mtx); - list_head_init(&dht->entries); - list_head_init(&dht->requests); - list_head_init(&dht->refs); - list_head_init(&dht->lookups); - list_head_init(&dht->cmds); + pthread_mutex_lock(&dht.reqs.mtx); - if (pthread_rwlock_init(&dht->lock, NULL)) - goto fail_rwlock; + list_for_each_safe(p, h, &dht.reqs.list) { + struct dht_req * r = list_entry(p, struct dht_req, next); + list_del(&r->next); + dht_req_destroy(r); + dht.reqs.len--; + } - if (pthread_mutex_init(&dht->mtx, NULL)) - goto fail_mutex; + pthread_mutex_unlock(&dht.reqs.mtx); - if (pthread_cond_init(&dht->cond, NULL)) - goto fail_cond; + pthread_cond_destroy(&dht.reqs.cond); + pthread_mutex_destroy(&dht.reqs.mtx); - dht->cookies = bmp_create(DHT_MAX_REQS, 1); - if (dht->cookies == NULL) - goto fail_bmp; + pthread_rwlock_wrlock(&dht.db.lock); - dht->b = 0; - dht->id = NULL; -#ifndef __DHT_TEST__ - dht->addr = addr_auth_address(); - if (dht->addr == INVALID_ADDR) - goto fail_bmp; + list_for_each_safe(p, h, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + list_del(&e->next); + dht_entry_destroy(e); + dht.db.kv.len--; + } - dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); - if (dht->tpm == NULL) - goto fail_tpm_create; + if (dht.db.contacts.root != NULL) + bucket_destroy(dht.db.contacts.root); - if (tpm_start(dht->tpm)) - goto fail_tpm_start; + pthread_rwlock_unlock(&dht.db.lock); - dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT); - if ((int) dht->eid < 0) - goto fail_tpm_start; + pthread_rwlock_destroy(&dht.db.lock); - if (notifier_reg(handle_event, dht)) - goto fail_notifier_reg; -#else - (void) handle_event; - (void) dht_handle_packet; - (void) dht_post_packet; -#endif - dht->state = DHT_INIT; + assert(dht.db.kv.len == 0); + assert(dht.db.kv.vals == 0); + assert(dht.db.kv.lvals == 0); + assert(dht.reqs.len == 0); - return (void *) dht; -#ifndef __DHT_TEST__ - fail_notifier_reg: - 
tpm_stop(dht->tpm); - fail_tpm_start: - tpm_destroy(dht->tpm); - fail_tpm_create: - bmp_destroy(dht->cookies); -#endif - fail_bmp: - pthread_cond_destroy(&dht->cond); - fail_cond: - pthread_mutex_destroy(&dht->mtx); - fail_mutex: - pthread_rwlock_destroy(&dht->lock); - fail_rwlock: - free(dht); - fail_malloc: - return NULL; + freebuf(dht.id); } diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h index 311c6b23..852a5130 100644 --- a/src/ipcpd/unicast/dir/dht.h +++ b/src/ipcpd/unicast/dir/dht.h @@ -30,22 +30,19 @@ #include #include -void * dht_create(void); +int dht_init(struct dir_dht_config * conf); -void dht_destroy(void * dir); +void dht_fini(void); -int dht_bootstrap(void * dir); +int dht_start(void); -int dht_reg(void * dir, - const uint8_t * key); +void dht_stop(void); -int dht_unreg(void * dir, - const uint8_t * key); +int dht_reg(const uint8_t * key); -uint64_t dht_query(void * dir, - const uint8_t * key); +int dht_unreg(const uint8_t * key); -int dht_wait_running(void * dir); +uint64_t dht_query(const uint8_t * key); extern struct dir_ops dht_dir_ops; diff --git a/src/ipcpd/unicast/dir/dht.proto b/src/ipcpd/unicast/dir/dht.proto index 4c5b06db..ea74805f 100644 --- a/src/ipcpd/unicast/dir/dht.proto +++ b/src/ipcpd/unicast/dir/dht.proto @@ -27,19 +27,32 @@ message dht_contact_msg { required uint64 addr = 2; } +message dht_find_req_msg { + required uint64 cookie = 1; + required bytes key = 2; +} + +message dht_find_node_rsp_msg { + required uint64 cookie = 1; + required bytes key = 2; + repeated dht_contact_msg contacts = 3; +} + +message dht_find_value_rsp_msg { + repeated bytes values = 1; +} + +message dht_store_msg { + required bytes key = 1; + required bytes val = 2; + required uint32 exp = 3; +} + message dht_msg { - required uint32 code = 1; - required uint32 cookie = 2; - required uint64 s_addr = 3; - optional bytes s_id = 4; - optional bytes key = 5; - repeated uint64 addrs = 6; - repeated dht_contact_msg contacts = 7; - // enrolment parameters - optional uint32 alpha = 8; - optional uint32 b = 9; - optional uint32 k = 10; - optional uint32 t_expire = 11; - optional uint32 t_refresh = 12; - optional uint32 t_replicate = 13; + required uint32 code = 1; + required dht_contact_msg src = 2; + optional dht_store_msg store = 3; + optional dht_find_req_msg find = 4; + optional dht_find_node_rsp_msg node = 5; + optional dht_find_value_rsp_msg val = 6; } diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h index 6ff61ce6..8c6e5eb5 100644 --- a/src/ipcpd/unicast/dir/ops.h +++ b/src/ipcpd/unicast/dir/ops.h @@ -23,24 +23,20 @@ #ifndef OUROBOROS_IPCPD_UNICAST_DIR_OPS_H #define OUROBOROS_IPCPD_UNICAST_DIR_OPS_H - struct dir_ops { - void * (* create)(void); + int (* init)(void * config); - void (* destroy)(void * dir); + void (* fini)(void); - int (* bootstrap)(void * dir); + int (* start)(void); - int (* reg)(void * dir, - const uint8_t * hash); + void (* stop)(void); - int (* unreg)(void * dir, - const uint8_t * hash); + int (* reg)(const uint8_t * hash); - uint64_t (* query)(void * dir, - const uint8_t * hash); + int (* unreg)(const uint8_t * hash); - int (* wait_running)(void * dir); + uint64_t (* query)(const uint8_t * hash); }; #endif /* OUROBOROS_IPCPD_UNICAST_DIR_OPS_H */ diff --git a/src/ipcpd/unicast/dir/tests/CMakeLists.txt b/src/ipcpd/unicast/dir/tests/CMakeLists.txt index f62ed993..41a18c27 100644 --- a/src/ipcpd/unicast/dir/tests/CMakeLists.txt +++ b/src/ipcpd/unicast/dir/tests/CMakeLists.txt @@ -15,6 +15,9 @@ 
include_directories(${CMAKE_BINARY_DIR}/include) get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) +set(DEBUG_PROTO_DHT FALSE CACHE BOOL + "Add DHT protocol message output to debug logging") + create_test_sourcelist(${PARENT_DIR}_tests test_suite.c # Add new tests here dht_test.c diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c index bea2c3e7..72e8f0df 100644 --- a/src/ipcpd/unicast/dir/tests/dht_test.c +++ b/src/ipcpd/unicast/dir/tests/dht_test.c @@ -4,7 +4,6 @@ * Unit tests of the DHT * * Dimitri Staessens - * Sander Vrijders * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as @@ -21,76 +20,1899 @@ */ #define __DHT_TEST__ -#define DHT_TEST_KEY_LEN 32 -#include "dht.c" +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include +#include +#include + +#include "dht.pb-c.h" -#include +#include +#include #include #include #include -#define CONTACTS 1000 +#define DHT_MAX_RAND_SIZE 64 +#define DHT_TEST_KEY_LEN 32 +#define DHT_TEST_ADDR 0x1234567890abcdefULL -int dht_test(int argc, - char ** argv) +/* forward declare for use in the dht code */ +/* Packet sink for DHT tests */ +struct { + bool enabled; + + struct list_head list; + size_t len; +} sink; + +struct message { + struct list_head next; + void * msg; + uint64_t dst; +}; + +static int sink_send_msg(buffer_t * pkt, + uint64_t addr) { - struct dht * dht; - uint8_t key[DHT_TEST_KEY_LEN]; - size_t i; + struct message * m; - (void) argc; - (void) argv; + assert(pkt != NULL); + assert(addr != 0); + + assert(!list_is_empty(&sink.list) || sink.len == 0); + + if (!sink.enabled) + goto finish; + + m = malloc(sizeof(*m)); + if (m == NULL) { + printf("Failed to malloc message."); + goto fail_malloc; + } + + m->msg = dht_msg__unpack(NULL, pkt->len, pkt->data); + if (m->msg == NULL) + goto fail_unpack; + + m->dst = addr; + + list_add_tail(&m->next, &sink.list); + + ++sink.len; + finish: + freebuf(*pkt); + + return 0; + fail_unpack: + free(m); + fail_malloc: + freebuf(*pkt); + return -1; +} + +#include "dht.c" + +/* Test helpers */ + +static void sink_init(void) +{ + list_head_init(&sink.list); + sink.len = 0; + sink.enabled = true; +} + +static void sink_clear(void) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &sink.list) { + struct message * m = list_entry(p, struct message, next); + list_del(&m->next); + dht_msg__free_unpacked((dht_msg_t *) m->msg, NULL); + free(m); + --sink.len; + } + + assert(list_is_empty(&sink.list)); +} + +static void sink_fini(void) +{ + sink_clear(); + + assert(list_is_empty(&sink.list) || sink.len != 0); +} + +static dht_msg_t * sink_read(void) +{ + struct message * m; + dht_msg_t * msg; + + assert(!list_is_empty(&sink.list) || sink.len == 0); + + if (list_is_empty(&sink.list)) + return NULL; + + m = list_first_entry(&sink.list, struct message, next); + + --sink.len; + + list_del(&m->next); + + msg = m->msg; + + free(m); + + return (dht_msg_t *) msg; +} + +static const buffer_t test_val = { + .data = (uint8_t *) "test_value", + .len = 10 +}; + +static const buffer_t test_val2 = { + .data = (uint8_t *) "test_value_2", + .len = 12 +}; + +static struct dir_dht_config test_dht_config = default_dht_config; + +static int random_value_len(buffer_t * b) +{ + assert(b != NULL); + assert(b->len > 0 && b->len <= 
DHT_MAX_RAND_SIZE); + + b->data = malloc(b->len); + if (b->data == NULL) + goto fail_malloc; + + random_buffer(b->data, b->len); + + return 0; + + fail_malloc: + return -ENOMEM; +} + +static int random_value(buffer_t * b) +{ + assert(b != NULL); + + b->len = rand() % DHT_MAX_RAND_SIZE + 1; - dht = dht_create(); - if (dht == NULL) { + return random_value_len(b); +} + +static int fill_dht_with_contacts(size_t n) +{ + size_t i; + uint8_t * id; + + for (i = 0; i < n; i++) { + uint64_t addr = generate_cookie(); + id = generate_id(); + if (id == NULL) + goto fail_id; + + if (dht_kv_update_contacts(id, addr) < 0) + goto fail_update; + free(id); + } + + return 0; + + fail_update: + free(id); + fail_id: + return -1; +} + +static int fill_store_with_random_values(const uint8_t * key, + size_t len, + size_t n_values) +{ + buffer_t val; + struct timespec now; + size_t i; + uint8_t * _key; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + for (i = 0; i < n_values; ++i) { + if (key != NULL) + _key = (uint8_t *) key; + else { + _key = generate_id(); + if (_key == NULL) + goto fail_key; + } + + if (len == 0) + val.len = rand() % DHT_MAX_RAND_SIZE + 1; + else + val.len = len; + + if (random_value_len(&val) < 0) + goto fail_value; + + if (dht_kv_store(_key, val, now.tv_sec + 10) < 0) + goto fail_store; + + freebuf(val); + if (key == NULL) + free(_key); + } + + return 0; + + fail_store: + freebuf(val); + fail_value: + if (key == NULL) /* only free keys generated here, not the caller's */ + free(_key); + fail_key: + return -1; +} + +static int random_contact_list(dht_contact_msg_t *** contacts, + size_t max) +{ + size_t i; + + assert(contacts != NULL); + assert(max > 0); + + *contacts = malloc(max * sizeof(**contacts)); + if (*contacts == NULL) + goto fail_malloc; + + for (i = 0; i < max; i++) { + (*contacts)[i] = malloc(sizeof(*(*contacts)[i])); + if ((*contacts)[i] == NULL) + goto fail_contacts; + + dht_contact_msg__init((*contacts)[i]); + + (*contacts)[i]->id.data = generate_id(); + if ((*contacts)[i]->id.data == NULL) + goto fail_contact; + + (*contacts)[i]->id.len = dht.id.len; + (*contacts)[i]->addr = generate_cookie(); + } + + return 0; + + fail_contact: + dht_contact_msg__free_unpacked((*contacts)[i], NULL); + fail_contacts: + while (i-- > 0) + dht_contact_msg__free_unpacked((*contacts)[i], NULL); + free(*contacts); + fail_malloc: + return -ENOMEM; +} + +static void clear_contacts(dht_contact_msg_t ** contacts, + size_t len) +{ + size_t i; + + assert(contacts != NULL); + + for (i = 0; i < len; ++i) + dht_contact_msg__free_unpacked(contacts[i], NULL); + + free(contacts); +} + +/* Start of actual tests */ + +static int test_dht_init_fini(void) +{ + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { printf("Failed to create dht.\n"); - return -1; + goto fail_init; } - dht_destroy(dht); + dht_fini(); + + TEST_SUCCESS(); - dht = dht_create(); - if (dht == NULL) { - printf("Failed to re-create dht.\n"); - return -1; + return TEST_RC_SUCCESS; + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_start_stop(void) +{ + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; } - if (dht_bootstrap(dht)) { - printf("Failed to bootstrap dht.\n"); - dht_destroy(dht); - return -1; + if (dht_start() < 0) { + printf("Failed to start dht.\n"); + goto fail_start; } - dht_destroy(dht); + dht_stop(); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; - dht = dht_create(); - if (dht == NULL) { - printf("Failed to re-create dht.\n"); - return -1; + fail_start: + dht_fini(); 
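/*
 * Note on the test harness: under __DHT_TEST__ the DHT code hands its
 * outgoing packets to the sink_send_msg() defined at the top of this
 * file, so every message the DHT would transmit is unpacked and queued
 * instead of sent. A test can therefore drive the DHT and then drain
 * the sink, roughly as in this sketch:
 *
 *     dht_msg_t * msg;
 *
 *     sink_init();
 *     // ... exercise the DHT under test ...
 *     while ((msg = sink_read()) != NULL) {
 *             // inspect msg->code, keys, contacts, ...
 *             dht_msg__free_unpacked(msg, NULL);
 *     }
 *     sink_fini();
 */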
+ fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_val_entry_create_destroy(void) +{ + struct val_entry * e; + struct timespec now; + + TEST_START(); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + e = val_entry_create(test_val, now.tv_sec + 10); + if (e == NULL) { + printf("Failed to create val entry.\n"); + goto fail_entry; + } + + val_entry_destroy(e); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_entry: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_entry_create_destroy(void) +{ + struct dht_entry * e; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + e = dht_entry_create(dht.id.data); + if (e == NULL) { + printf("Failed to create dht entry.\n"); + goto fail_entry; + } + + dht_entry_destroy(e); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_entry: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_entry_update_get_val(void) +{ + struct dht_entry * e; + struct val_entry * v; + struct timespec now; + + TEST_START(); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + e = dht_entry_create(dht.id.data); + if (e == NULL) { + printf("Failed to create dht entry.\n"); + goto fail_entry; + } + + if (dht_entry_get_val(e, test_val) != NULL) { + printf("Found value in empty dht entry.\n"); + goto fail_get; + } + + if (dht_entry_update_val(e, test_val, now.tv_sec + 10) < 0) { + printf("Failed to update dht entry value.\n"); + goto fail_get; + } + + if (dht_entry_get_val(e, test_val2) != NULL) { + printf("Found value in dht entry with different key.\n"); + goto fail_get; + } + + v = dht_entry_get_val(e, test_val); + if (v == NULL) { + printf("Failed to get value from dht entry.\n"); + goto fail_get; + } + + if (v->val.len != test_val.len) { + printf("Length in dht entry does not match expected.\n"); + goto fail_get; + } + + if (memcmp(v->val.data, test_val.data, test_val.len) != 0) { + printf("Data in dht entry does not match expected.\n"); + goto fail_get; + } + + if (dht_entry_update_val(e, test_val, now.tv_sec + 15) < 0) { + printf("Failed to update existing dht entry value.\n"); + goto fail_get; + } + + if (v->t_exp != now.tv_sec + 15) { + printf("Expiration time in dht entry value not updated.\n"); + goto fail_get; + } + + if (dht_entry_update_val(e, test_val, now.tv_sec + 5) < 0) { + printf("Failed to update existing dht entry value (5).\n"); + goto fail_get; + } + + if (v->t_exp != now.tv_sec + 15) { + printf("Expiration time in dht entry shortened.\n"); + goto fail_get; + } + + if (dht_entry_get_val(e, test_val) != v) { + printf("Wrong value in dht entry found after update.\n"); + goto fail_get; + } + + dht_entry_destroy(e); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + 
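/*
 * The two dht_entry_update_val() calls above pin down the intended
 * expiry semantics: an update may push a value's t_exp further out,
 * but a later update with a shorter lifetime must not pull it back in.
 * The guard reduces to something like this sketch, matching the
 * "update expiration time" logic in dht.c:
 *
 *     if (t_exp > v->t_exp)
 *             v->t_exp = t_exp; // deadlines only ever move outward
 */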
fail_get: + dht_entry_destroy(e); + fail_entry: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_entry_update_get_lval(void) +{ + struct dht_entry * e; + struct val_entry * v; + struct timespec now; + + TEST_START(); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + e = dht_entry_create(dht.id.data); + if (e == NULL) { + printf("Failed to create dht entry.\n"); + goto fail_entry; + } + + if (dht_entry_get_lval(e, test_val) != NULL) { + printf("Found value in empty dht entry.\n"); + goto fail_get; + } + + if (dht_entry_update_lval(e, test_val) < 0) { + printf("Failed to update dht entry value.\n"); + goto fail_get; + } + + v = dht_entry_get_lval(e, test_val); + if (v == NULL) { + printf("Failed to get value from dht entry.\n"); + goto fail_get; + } + + if (dht_entry_get_lval(e, test_val2) != NULL) { + printf("Found value in dht entry in vals.\n"); + goto fail_get; + } + + if (v->val.len != test_val.len) { + printf("Length in dht entry does not match expected.\n"); + goto fail_get; + } + + if (memcmp(v->val.data, test_val.data, test_val.len) != 0) { + printf("Data in dht entry does not match expected.\n"); + goto fail_get; + } + + if (dht_entry_update_lval(e, test_val) < 0) { + printf("Failed to update existing dht entry value.\n"); + goto fail_get; + } + + if (dht_entry_get_lval(e, test_val) != v) { + printf("Wrong value in dht entry found after update.\n"); + goto fail_get; + } + + dht_entry_destroy(e); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_get: + dht_entry_destroy(e); + fail_entry: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_contact_create_destroy(void) +{ + struct contact * c; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + c = contact_create(dht.id.data, dht.addr); + if (c == NULL) { + printf("Failed to create contact.\n"); + goto fail_contact; + } + + contact_destroy(c); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_contact: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_update_bucket(void) +{ + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_dht_with_contacts(1000) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_update; + } + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_update: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_contact_list(void) +{ + struct list_head cl; + ssize_t len; + ssize_t items; + + TEST_START(); + + list_head_init(&cl); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + items = 5; + + if (fill_dht_with_contacts(items) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_fill; + } + + len = dht_kv_contact_list(dht.id.data, &cl, dht.k); + if (len < 0) { + printf("Failed to get contact list.\n"); + goto fail_fill; + } + + if (len != items) { + printf("Failed to get contacts (%zd != %zd).\n", len, items); + goto fail_contact_list; + } + + contact_list_destroy(&cl); + + items = 100; + + if (fill_dht_with_contacts(items) < 0) { + printf("Failed to fill bucket with contacts.\n"); + goto fail_fill; + } + + len = dht_kv_contact_list(dht.id.data, &cl, items); 
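/*
 * Note on the checks below: after adding 100 random contacts the table
 * is not expected to return them all. Assuming each k-bucket caps its
 * live contacts at dht.k, as the pre-rewrite code did, a lookup for
 * any one target key is only guaranteed to yield at least dht.k
 * contacts once the table is well populated, which is exactly what the
 * test asserts:
 *
 *     if ((size_t) len < dht.k)
 *             // fail: fewer than k contacts for the target key
 */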
+ if (len < 0) { + printf("Failed to get contact list.\n"); + goto fail_fill; + } + + if ((size_t) len < dht.k) { + printf("Failed to get contacts (%zu < %zu).\n", len, dht.k); + goto fail_contact_list; + } + + contact_list_destroy(&cl); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_contact_list: + contact_list_destroy(&cl); + fail_fill: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_get_values(void) +{ + buffer_t * vals; + ssize_t len; + size_t n = sizeof(uint64_t); + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + if (fill_store_with_random_values(dht.id.data, n, 3) < 0) { + printf("Failed to fill store with random values.\n"); + goto fail_fill; + } + + len = dht_kv_retrieve(dht.id.data, &vals); + if (len < 0) { + printf("Failed to get values from store.\n"); + goto fail_fill; + } + + if (len != 3) { + printf("Failed to get %ld values (%zu).\n", 3L, len); + goto fail_get_values; + } + + freebufs(vals, len); + + if (fill_store_with_random_values(dht.id.data, n, 20) < 0) { + printf("Failed to fill store with random values.\n"); + goto fail_fill; + } + + len = dht_kv_retrieve(dht.id.data, &vals); + if (len < 0) { + printf("Failed to get values from store.\n"); + goto fail_fill; + } + + if (len != DHT_MAX_VALS) { + printf("Failed to get %d values.\n", DHT_MAX_VALS); + goto fail_get_values; + } + + freebufs(vals, len); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_get_values: + freebufs(vals, len); + fail_fill: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_node_req_msg(void) +{ + dht_msg_t * msg; + dht_msg_t * upk; + size_t len; + uint8_t * buf; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + msg = dht_kv_find_node_req_msg(dht.id.data); + if (msg == NULL) { + printf("Failed to get find node request message.\n"); + goto fail_msg; + } + + if (msg->code != DHT_FIND_NODE_REQ) { + printf("Wrong code in find_node_req message (%s != %s).\n", + dht_code_str[msg->code], + dht_code_str[DHT_FIND_NODE_REQ]); + goto fail_msg; + } + + len = dht_msg__get_packed_size(msg); + if (len == 0) { + printf("Failed to get packed length of find_node_req.\n"); + goto fail_msg; + } + + buf = malloc(len); + if (buf == NULL) { + printf("Failed to malloc find_node_req buf.\n"); + goto fail_msg; + } + + if (dht_msg__pack(msg, buf) != len) { + printf("Failed to pack find_node_req message.\n"); + goto fail_pack; + } + + upk = dht_msg__unpack(NULL, len, buf); + if (upk == NULL) { + printf("Failed to unpack find_value_req message.\n"); + goto fail_unpack; + } + + free(buf); + dht_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(upk, NULL); + + dht_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + + fail_unpack: + dht_msg__free_unpacked(msg, NULL); + fail_pack: + free(buf); + fail_msg: + dht_fini(); + fail_init: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_dht_kv_find_node_rsp_msg(void) +{ + dht_contact_msg_t ** contacts; + dht_msg_t * msg; + dht_msg_t * upk; + size_t len; + uint8_t * buf; + + TEST_START(); + + if (dht_init(&test_dht_config) < 0) { + printf("Failed to create dht.\n"); + goto fail_init; + } + + msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, 0); + if (msg == NULL) { + printf("Failed to get find node response message.\n"); + goto fail_msg; + } + + if (msg->code != 
+static int test_dht_kv_find_node_rsp_msg(void)
+{
+        dht_contact_msg_t ** contacts;
+        dht_msg_t *          msg;
+        dht_msg_t *          upk;
+        size_t               len;
+        uint8_t *            buf;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, 0);
+        if (msg == NULL) {
+                printf("Failed to get find node response message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_FIND_NODE_RSP) {
+                printf("Wrong code in find_node_rsp message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_FIND_NODE_RSP]);
+                goto fail_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_node_rsp.\n");
+                goto fail_msg;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_node_rsp buf.\n");
+                goto fail_msg;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_node_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_node_rsp message.\n");
+                goto fail_unpack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_unpack:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+        free(buf);
+ fail_msg:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_node_rsp_msg_contacts(void)
+{
+        dht_contact_msg_t ** contacts;
+        dht_msg_t *          msg;
+        dht_msg_t *          upk;
+        uint8_t *            buf;
+        size_t               len;
+        ssize_t              n;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (fill_dht_with_contacts(100) < 0) {
+                printf("Failed to fill bucket with contacts.\n");
+                goto fail_fill;
+        }
+
+        n = dht_kv_get_contacts(dht.id.data, &contacts);
+        if (n < 0) {
+                printf("Failed to get contacts.\n");
+                goto fail_fill;
+        }
+
+        if ((size_t) n < dht.k) {
+                printf("Failed to get enough contacts (%zd < %zu).\n",
+                       n, dht.k);
+                goto fail_msg;
+        }
+
+        msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, n);
+        if (msg == NULL) {
+                printf("Failed to build find node response message.\n");
+                goto fail_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_node_rsp.\n");
+                goto fail_msg;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_node_rsp buf.\n");
+                goto fail_msg;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_node_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_node_rsp message.\n");
+                goto fail_unpack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_unpack:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+        free(buf);
+ fail_msg:
+        clear_contacts(contacts, n);
+ fail_fill:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
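+/* A FIND_VALUE request must carry the right code and survive pack/unpack. */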
+static int test_dht_kv_find_value_req_msg(void)
+{
+        dht_msg_t * msg;
+        dht_msg_t * upk;
+        size_t      len;
+        uint8_t *   buf;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        msg = dht_kv_find_value_req_msg(dht.id.data);
+        if (msg == NULL) {
+                printf("Failed to build find value request message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_FIND_VALUE_REQ) {
+                printf("Wrong code in find_value_req message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_FIND_VALUE_REQ]);
+                goto fail_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_value_req.\n");
+                goto fail_msg;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_value_req buf.\n");
+                goto fail_msg;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_value_req message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_value_req message.\n");
+                goto fail_unpack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_unpack:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+        free(buf);
+ fail_msg:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_value_rsp_msg(void)
+{
+        dht_msg_t * msg;
+        dht_msg_t * upk;
+        size_t      len;
+        uint8_t *   buf;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, NULL, 0, NULL, 0);
+        if (msg == NULL) {
+                printf("Failed to build find value response message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_FIND_VALUE_RSP) {
+                printf("Wrong code in find_value_rsp message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_FIND_VALUE_RSP]);
+                goto fail_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_value_rsp.\n");
+                goto fail_msg;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_value_rsp buf.\n");
+                goto fail_msg;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_value_rsp message.\n");
+                goto fail_unpack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_unpack:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+        free(buf);
+ fail_msg:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_value_rsp_msg_contacts(void)
+{
+        dht_msg_t *          msg;
+        dht_msg_t *          upk;
+        size_t               len;
+        uint8_t *            buf;
+        dht_contact_msg_t ** contacts;
+        ssize_t              n;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (fill_dht_with_contacts(100) < 0) {
+                printf("Failed to fill bucket with contacts.\n");
+                goto fail_fill;
+        }
+
+        n = dht_kv_get_contacts(dht.id.data, &contacts);
+        if (n < 0) {
+                printf("Failed to get contacts.\n");
+                goto fail_fill;
+        }
+
+        if ((size_t) n < dht.k) {
+                printf("Failed to get enough contacts (%zd < %zu).\n",
+                       n, dht.k);
+                goto fail_msg;
+        }
+
+        msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, &contacts, n, NULL, 0);
+        if (msg == NULL) {
+                printf("Failed to build find value response message.\n");
+                goto fail_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_value_rsp.\n");
+                goto fail_msg;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_value_rsp buf.\n");
+                goto fail_msg;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_value_rsp message.\n");
+                goto fail_unpack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_unpack:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+        free(buf);
+ fail_msg:
+        clear_contacts(contacts, n);
+ fail_fill:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
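+/* A FIND_VALUE response must still hold all 8 values after pack/unpack. */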
+static int test_dht_kv_find_value_rsp_msg_values(void)
+{
+        dht_msg_t * msg;
+        dht_msg_t * upk;
+        size_t      len;
+        uint8_t *   buf;
+        buffer_t *  values;
+        size_t      i;
+        uint64_t    ck;
+
+        TEST_START();
+
+        ck = generate_cookie();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        values = malloc(sizeof(*values) * 8);
+        if (values == NULL) {
+                printf("Failed to malloc values.\n");
+                goto fail_values;
+        }
+
+        for (i = 0; i < 8; i++) {
+                if (random_value(&values[i]) < 0) {
+                        printf("Failed to create random value.\n");
+                        goto fail_fill;
+                }
+        }
+
+        msg = dht_kv_find_value_rsp_msg(dht.id.data, ck, NULL, 0, &values, 8);
+        if (msg == NULL) {
+                printf("Failed to build find value response message.\n");
+                goto fail_msg;
+        }
+
+        values = NULL; /* msg owns the values now */
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_value_rsp.\n");
+                goto fail_msg;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_value_rsp buf.\n");
+                goto fail_msg;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_value_rsp message.\n");
+                goto fail_unpack;
+        }
+
+        if (upk->code != DHT_FIND_VALUE_RSP) {
+                printf("Wrong code in find_value_rsp message (%s != %s).\n",
+                       dht_code_str[upk->code],
+                       dht_code_str[DHT_FIND_VALUE_RSP]);
+                goto fail_unpack;
+        }
+
+        if (upk->val == NULL) {
+                printf("No values in find_value_rsp message.\n");
+                goto fail_unpack;
+        }
+
+        if (upk->val->n_values != 8) {
+                printf("Wrong number of values in find_value_rsp (%zu != 8).\n",
+                       upk->val->n_values);
+                goto fail_unpack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        free(values);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_unpack:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+        free(buf);
+ fail_msg:
+ fail_fill:
+        if (values != NULL) {
+                while (i-- > 0)
+                        freebuf(values[i]);
+                free(values);
+        }
+ fail_values:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_store_msg(void)
+{
+        dht_msg_t *     msg;
+        size_t          len;
+        uint8_t *       buf;
+        struct timespec now;
+
+        TEST_START();
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        msg = dht_kv_store_msg(dht.id.data, test_val, now.tv_sec + 10);
+        if (msg == NULL) {
+                printf("Failed to get store message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_STORE) {
+                printf("Wrong code in store message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_STORE]);
+                goto fail_store_msg;
+        }
+
+        if (dht_kv_validate_msg(msg) < 0) {
+                printf("Failed to validate store message.\n");
+                goto fail_store_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed msg length.\n");
+                goto fail_store_msg;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc store msg buf.\n");
+                goto fail_store_msg;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack store message.\n");
+                goto fail_pack;
+        }
+
+        free(buf);
+
+        dht_msg__free_unpacked(msg, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_pack:
+        free(buf);
+ fail_store_msg:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
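+/*
+ * Query for contacts, check the emitted FIND_NODE request, and feed a
+ * crafted FIND_NODE response carrying new contacts back into the DHT.
+ */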
+static int test_dht_kv_query_contacts_req_rsp(void)
+{
+        dht_msg_t *          req;
+        dht_msg_t *          rsp;
+        dht_contact_msg_t ** contacts;
+        size_t               len = 2;
+        uint8_t *            key;
+
+        TEST_START();
+
+        sink_init();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (fill_dht_with_contacts(1) < 0) {
+                printf("Failed to fill bucket with contacts.\n");
+                goto fail_prep;
+        }
+
+        key = generate_id();
+        if (key == NULL) {
+                printf("Failed to generate key.\n");
+                goto fail_prep;
+        }
+
+        if (dht_kv_query_contacts(key, NULL) < 0) {
+                printf("Failed to query contacts.\n");
+                goto fail_query;
+        }
+
+        req = sink_read();
+        if (req == NULL) {
+                printf("Failed to read request from sink.\n");
+                goto fail_query;
+        }
+
+        if (dht_kv_validate_msg(req) < 0) {
+                printf("Failed to validate find node req.\n");
+                goto fail_val_req;
+        }
+
+        if (random_contact_list(&contacts, len) < 0) {
+                printf("Failed to create random contact.\n");
+                goto fail_val_req;
+        }
+
+        rsp = dht_kv_find_node_rsp_msg(key, req->find->cookie, &contacts, len);
+        if (rsp == NULL) {
+                printf("Failed to create find node response message.\n");
+                goto fail_rsp;
+        }
+
+        memcpy(rsp->src->id.data, dht.id.data, dht.id.len);
+        rsp->src->addr = generate_cookie();
+
+        if (dht_kv_validate_msg(rsp) < 0) {
+                printf("Failed to validate find node response message.\n");
+                goto fail_val_rsp;
+        }
+
+        do_dht_kv_find_node_rsp(rsp->node);
+
+        /* dht_contact_msg__free_unpacked(contacts[0], NULL); set to NULL */
+
+        free(contacts);
+
+        dht_msg__free_unpacked(rsp, NULL);
+
+        free(key);
+
+        dht_msg__free_unpacked(req, NULL);
+
+        sink_fini();
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_val_rsp:
+        dht_msg__free_unpacked(rsp, NULL);
+ fail_rsp:
+        while (len-- > 0)
+                dht_contact_msg__free_unpacked(contacts[len], NULL);
+        free(contacts);
+ fail_val_req:
+        dht_msg__free_unpacked(req, NULL);
+ fail_query:
+        free(key);
+ fail_prep:
+        dht_fini();
+ fail_init:
+        sink_fini();
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+static int test_dht_req_create_destroy(void)
+{
+        struct dht_req * req;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        req = dht_req_create(dht.id.data);
+        if (req == NULL) {
+                printf("Failed to create dht request.\n");
+                goto fail_req;
+        }
+
+        dht_req_destroy(req);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_req:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+static int test_dht_reg_unreg(void)
+{
+        TEST_START();
+
+        sink_init();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (dht_reg(dht.id.data) < 0) {
+                printf("Failed to register own id.\n");
+                goto fail_reg;
+        }
+
+        if (sink.len != 0) {
+                printf("Packet sent without contacts.\n");
+                goto fail_msg;
+        }
+
+        if (dht_unreg(dht.id.data) < 0) {
+                printf("Failed to unregister own id.\n");
+                goto fail_msg;
+        }
+
+        dht_fini();
+
+        sink_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_msg:
+        dht_unreg(dht.id.data);
+ fail_reg:
+        dht_fini();
+ fail_init:
+        sink_fini();
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
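+/* With contacts in the table, dht_reg() must send DHT_STORE to alpha peers. */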
+static int test_dht_reg_unreg_contacts(void)
+{
+        dht_msg_t * msg;
+
+        TEST_START();
+
+        sink_init();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (fill_dht_with_contacts(4) < 0) {
+                printf("Failed to fill bucket with contacts.\n");
+                goto fail_reg;
+        }
+
+        if (dht_reg(dht.id.data) < 0) {
+                printf("Failed to register own id.\n");
+                goto fail_reg;
+        }
+
+        if (sink.len != dht.alpha) {
+                printf("Store sent to wrong number of contacts.\n");
+                goto fail_msg;
+        }
+
+        msg = sink_read();
+        if (msg == NULL) {
+                printf("Failed to read message from sink.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_STORE) {
+                printf("Wrong code in dht reg message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_STORE]);
+                goto fail_validation;
+        }
+
+        if (dht_kv_validate_msg(msg) < 0) {
+                printf("Failed to validate dht message.\n");
+                goto fail_validation;
+        }
+
+        if (dht_unreg(dht.id.data) < 0) {
+                printf("Failed to unregister own id.\n");
+                goto fail_validation;
+        }
+
+        dht_msg__free_unpacked(msg, NULL);
+
+        dht_fini();
+
+        sink_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_validation:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+        sink_clear();
+        dht_unreg(dht.id.data);
+ fail_reg:
+        dht_fini();
+ fail_init:
+        sink_fini();
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+static int test_dht_reg_query_local(void)
+{
+        struct timespec now;
+        buffer_t        test_addr;
+
+        TEST_START();
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        if (addr_to_buf(1234321, &test_addr) < 0) {
+                printf("Failed to convert test address to buffer.\n");
+                goto fail_buf;
+        }
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (dht_reg(dht.id.data) < 0) {
+                printf("Failed to register own id.\n");
+                goto fail_reg;
+        }
+
+        if (dht_query(dht.id.data) == dht.addr) {
+                printf("Query for own id returned own address.\n");
+                goto fail_get;
+        }
+
+        if (dht_kv_store(dht.id.data, test_addr, now.tv_sec + 5) < 0) {
+                printf("Failed to publish value.\n");
+                goto fail_get;
+        }
+
+        if (dht_query(dht.id.data) != 1234321) {
+                printf("Failed to return the stored address.\n");
+                goto fail_get;
+        }
+
+        if (dht_unreg(dht.id.data) < 0) {
+                printf("Failed to unregister own id.\n");
+                goto fail_get;
+        }
+
+        freebuf(test_addr);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_get:
+        dht_unreg(dht.id.data);
+ fail_reg:
+        dht_fini();
+ fail_init:
+        freebuf(test_addr);
+ fail_buf:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+static int test_dht_query(void)
+{
+        uint8_t *             key;
+        struct dir_dht_config cfg;
+
+        TEST_START();
+
+        sink_init();
+
+        cfg = default_dht_config;
+        cfg.peer = generate_cookie();
+
+        if (dht_init(&cfg)) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        key = generate_id();
+        if (key == NULL) {
+                printf("Failed to generate key.\n");
+                goto fail_key;
+        }
+
+        if (dht_query(key) != INVALID_ADDR) {
+                printf("Got an address without any contacts.\n");
+                goto fail_get;
+        }
+
+        if (sink.len != 0) {
+                printf("Packet sent without contacts.\n");
+                goto fail_test;
+        }
+
+        free(key);
+
+        dht_fini();
+
+        sink_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_test:
+        sink_clear();
+ fail_get:
+        free(key);
+ fail_key:
+        dht_fini();
+ fail_init:
+        sink_fini();
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
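+/* A query with contacts but no value must emit a FIND_VALUE request. */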
printf("Failed to fill with contacts!"); + goto fail_contacts; + } + + key = generate_id(); + if (key == NULL) { + printf("Failed to generate key."); + goto fail_contacts; + } + + if (dht_query(key) != INVALID_ADDR) { + printf("Succeeded to get address for random id.\n"); + goto fail_query; + } + + msg = sink_read(); + if (msg == NULL) { + printf("Failed to read message.!\n"); + goto fail_read; + } + + if (dht_kv_validate_msg(msg) < 0) { + printf("Failed to validate dht message.\n"); + goto fail_msg; + } + + if (msg->code != DHT_FIND_VALUE_REQ) { + printf("Failed to validate dht message.\n"); + goto fail_msg; + } + + dht_msg__free_unpacked(msg, NULL); + + free(key); + + sink_clear(); + + dht_fini(); + + sink_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_read: + sink_clear(); + fail_query: + free(key); + fail_contacts: + dht_fini(); + fail_init: + sink_fini(); + return TEST_RC_FAIL; +} + +int dht_test(int argc, + char ** argv) +{ + int rc = 0; + + (void) argc; + (void) argv; + + rc |= test_dht_init_fini(); + rc |= test_dht_start_stop(); + rc |= test_val_entry_create_destroy(); + rc |= test_dht_entry_create_destroy(); + rc |= test_dht_entry_update_get_val(); + rc |= test_dht_entry_update_get_lval(); + rc |= test_dht_kv_contact_create_destroy(); + rc |= test_dht_kv_contact_list(); + rc |= test_dht_kv_update_bucket(); + rc |= test_dht_kv_get_values(); + rc |= test_dht_kv_find_node_req_msg(); + rc |= test_dht_kv_find_node_rsp_msg(); + rc |= test_dht_kv_find_node_rsp_msg_contacts(); + rc |= test_dht_kv_query_contacts_req_rsp(); + rc |= test_dht_kv_find_value_req_msg(); + rc |= test_dht_kv_find_value_rsp_msg(); + rc |= test_dht_kv_find_value_rsp_msg_contacts(); + rc |= test_dht_kv_find_value_rsp_msg_values(); + rc |= test_dht_kv_store_msg(); + rc |= test_dht_req_create_destroy(); + rc |= test_dht_reg_unreg(); + rc |= test_dht_reg_unreg_contacts(); + rc |= test_dht_reg_query_local(); + rc |= test_dht_query(); + rc |= test_dht_query_contacts(); + + return rc; } diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index 14eaac09..a4e3be36 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -60,7 +60,7 @@ #include #define QOS_BLOCK_LEN 672 -#define RIB_FILE_STRLEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX) +#define RIB_FILE_STRLEN (169 + RIB_TM_STRLEN + QOS_BLOCK_LEN * QOS_CUBE_MAX) #define RIB_NAME_STRLEN 256 #ifndef CLOCK_REALTIME_COARSE @@ -189,7 +189,7 @@ static int dt_rib_read(const char * path, char str[QOS_BLOCK_LEN + 1]; char addrstr[20]; char * entry; - char tmstr[20]; + char tmstr[RIB_TM_STRLEN]; size_t rxqlen = 0; size_t txqlen = 0; struct tm * tm; @@ -217,8 +217,8 @@ static int dt_rib_read(const char * path, else sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr); - tm = localtime(&dt.stat[fd].stamp); - strftime(tmstr, sizeof(tmstr), "%F %T", tm); + tm = gmtime(&dt.stat[fd].stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); if (fd >= PROG_RES_FDS) { fccntl(fd, FLOWGRXQLEN, &rxqlen); @@ -226,11 +226,11 @@ static int dt_rib_read(const char * path, } sprintf(buf, - "Flow established at: %20s\n" + "Flow established at: %.*s\n" "Endpoint address: %20s\n" "Queued packets (rx): %20zu\n" "Queued packets (tx): %20zu\n\n", - tmstr, addrstr, rxqlen, txqlen); + RIB_TM_STRLEN - 1, tmstr, addrstr, rxqlen, txqlen); for (i = 0; i < QOS_CUBE_MAX; ++i) { sprintf(str, "Qos cube %3d:\n" @@ -287,49 +287,46 @@ static int dt_rib_readdir(char *** buf) pthread_rwlock_rdlock(&dt.lock); - if (dt.n_flows < 1) { - 
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 14eaac09..a4e3be36 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -60,7 +60,7 @@
 #include

 #define QOS_BLOCK_LEN 672
-#define RIB_FILE_STRLEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX)
+#define RIB_FILE_STRLEN (169 + RIB_TM_STRLEN + QOS_BLOCK_LEN * QOS_CUBE_MAX)
 #define RIB_NAME_STRLEN 256

 #ifndef CLOCK_REALTIME_COARSE
@@ -189,7 +189,7 @@ static int dt_rib_read(const char * path,
         char   str[QOS_BLOCK_LEN + 1];
         char   addrstr[20];
         char * entry;
-        char   tmstr[20];
+        char   tmstr[RIB_TM_STRLEN];
         size_t rxqlen = 0;
         size_t txqlen = 0;
         struct tm * tm;
@@ -217,8 +217,8 @@ static int dt_rib_read(const char * path,
         else
                 sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr);

-        tm = localtime(&dt.stat[fd].stamp);
-        strftime(tmstr, sizeof(tmstr), "%F %T", tm);
+        tm = gmtime(&dt.stat[fd].stamp);
+        strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);

         if (fd >= PROG_RES_FDS) {
                 fccntl(fd, FLOWGRXQLEN, &rxqlen);
@@ -226,11 +226,11 @@ static int dt_rib_read(const char * path,
         }

         sprintf(buf,
-                "Flow established at: %20s\n"
+                "Flow established at: %.*s\n"
                 "Endpoint address:    %20s\n"
                 "Queued packets (rx): %20zu\n"
                 "Queued packets (tx): %20zu\n\n",
-                tmstr, addrstr, rxqlen, txqlen);
+                RIB_TM_STRLEN - 1, tmstr, addrstr, rxqlen, txqlen);

         for (i = 0; i < QOS_CUBE_MAX; ++i) {
                 sprintf(str,
                         "Qos cube %3d:\n"
@@ -287,49 +287,46 @@ static int dt_rib_readdir(char *** buf)

         pthread_rwlock_rdlock(&dt.lock);

-        if (dt.n_flows < 1) {
-                pthread_rwlock_unlock(&dt.lock);
-                return 0;
-        }
+        if (dt.n_flows < 1)
+                goto no_flows;

         *buf = malloc(sizeof(**buf) * dt.n_flows);
-        if (*buf == NULL) {
-                pthread_rwlock_unlock(&dt.lock);
-                return -ENOMEM;
-        }
+        if (*buf == NULL)
+                goto fail_entries;

         for (i = 0; i < PROG_MAX_FLOWS; ++i) {
                 pthread_mutex_lock(&dt.stat[i].lock);

                 if (dt.stat[i].stamp == 0) {
                         pthread_mutex_unlock(&dt.stat[i].lock);
-                        /* Optimization: skip unused res_fds. */
-                        if (i < PROG_RES_FDS)
-                                i = PROG_RES_FDS;
-                        continue;
+                        break;
                 }

+                pthread_mutex_unlock(&dt.stat[i].lock);
+
                 sprintf(entry, "%zu", i);

                 (*buf)[idx] = malloc(strlen(entry) + 1);
-                if ((*buf)[idx] == NULL) {
-                        while (idx-- > 0)
-                                free((*buf)[idx]);
-                        free(*buf);
-                        pthread_mutex_unlock(&dt.stat[i].lock);
-                        pthread_rwlock_unlock(&dt.lock);
-                        return -ENOMEM;
-                }
+                if ((*buf)[idx] == NULL)
+                        goto fail_entry;

                 strcpy((*buf)[idx++], entry);
-                pthread_mutex_unlock(&dt.stat[i].lock);
         }

         assert((size_t) idx == dt.n_flows);
-
+ no_flows:
         pthread_rwlock_unlock(&dt.lock);

         return idx;
+
+ fail_entry:
+        while (idx-- > 0)
+                free((*buf)[idx]);
+        free(*buf);
+ fail_entries:
+        pthread_rwlock_unlock(&dt.lock);
+        return -ENOMEM;
+
 #else
         (void) buf;
         return 0;
@@ -728,17 +725,17 @@ int dt_start(void)

         if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) {
                 log_err("Failed to create listener thread.");
-                psched_destroy(dt.psched);
-                return -1;
+                goto fail_listener;
         }

         return 0;

+ fail_listener:
+        notifier_unreg(&handle_event);
 fail_notifier_reg:
         psched_destroy(dt.psched);
 fail_psched:
         return -1;
-
 }

 void dt_stop(void)
@@ -757,7 +754,7 @@ int dt_reg_comp(void * comp,
 {
         int eid;

-        assert(func);
+        assert(func != NULL);

         pthread_rwlock_wrlock(&dt.lock);
@@ -783,6 +780,23 @@ int dt_reg_comp(void * comp,
         return eid;
 }

+void dt_unreg_comp(int eid)
+{
+        assert(eid >= 0 && eid < PROG_RES_FDS);
+
+        pthread_rwlock_wrlock(&dt.lock);
+
+        assert(dt.comps[eid].post_packet != NULL);
+
+        dt.comps[eid].post_packet = NULL;
+        dt.comps[eid].comp        = NULL;
+        dt.comps[eid].name        = NULL;
+
+        pthread_rwlock_unlock(&dt.lock);
+
+        return;
+}
+
 int dt_write_packet(uint64_t  dst_addr,
                     qoscube_t qc,
                     uint64_t  eid,
@@ -811,7 +825,8 @@ int dt_write_packet(uint64_t  dst_addr,
 #endif
         fd = pff_nhop(dt.pff[qc], dst_addr);
         if (fd < 0) {
-                log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
+                log_dbg("Could not get nhop for " ADDR_FMT32 ".",
+                        ADDR_VAL32(&dst_addr));
 #ifdef IPCP_FLOW_STATS
                 if (eid < PROG_RES_FDS) {
                         pthread_mutex_lock(&dt.stat[eid].lock);
diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h
index 7198a013..2c5b7978 100644
--- a/src/ipcpd/unicast/dt.h
+++ b/src/ipcpd/unicast/dt.h
@@ -39,9 +39,11 @@ int  dt_start(void);

 void dt_stop(void);

-int dt_reg_comp(void * comp,
+int  dt_reg_comp(void * comp,
                  void (* func)(void * comp, struct shm_du_buff * sdb),
-                char * name);
+                 char * name);
+
+void dt_unreg_comp(int eid);

 int  dt_write_packet(uint64_t  dst_addr,
                      qoscube_t qc,
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index ecc3894e..61abff52 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -136,7 +136,7 @@ static int fa_rib_read(const char * path,
         char   r_addrstr[21];
         char   s_eidstr[21];
         char   r_eidstr[21];
-        char   tmstr[20];
+        char   tmstr[RIB_TM_STRLEN];
         char   castr[1024];
         char * entry;
         struct tm * tm;
@@ -167,8 +167,8 @@ static int fa_rib_read(const char * path,
         sprintf(s_eidstr, "%" PRIu64, flow->s_eid);
         sprintf(r_eidstr, "%" PRIu64, flow->r_eid);
-        tm = localtime(&flow->stamp);
-        strftime(tmstr, sizeof(tmstr), "%F %T", tm);
+        tm = gmtime(&flow->stamp);
+        strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);

         ca_print_stats(flow->ctx, castr, 1024);

@@ -217,16 +217,12 @@ static int fa_rib_readdir(char *** buf)

         pthread_rwlock_rdlock(&fa.flows_lock);

-        if (fa.n_flows < 1) {
-                pthread_rwlock_unlock(&fa.flows_lock);
-                return 0;
-        }
+        if (fa.n_flows < 1)
+                goto no_flows;

         *buf = malloc(sizeof(**buf) * fa.n_flows);
-        if (*buf == NULL) {
-                pthread_rwlock_unlock(&fa.flows_lock);
-                return -ENOMEM;
-        }
+        if (*buf == NULL)
+                goto fail_entries;

         for (i = 0; i < PROG_MAX_FLOWS; ++i) {
                 struct fa_flow * flow;
@@ -238,22 +234,25 @@ static int fa_rib_readdir(char *** buf)
                 sprintf(entry, "%zu", i);

                 (*buf)[idx] = malloc(strlen(entry) + 1);
-                if ((*buf)[idx] == NULL) {
-                        while (idx-- > 0)
-                                free((*buf)[idx]);
-                        free(*buf);
-                        pthread_rwlock_unlock(&fa.flows_lock);
-                        return -ENOMEM;
-                }
+                if ((*buf)[idx] == NULL)
+                        goto fail_entry;

                 strcpy((*buf)[idx++], entry);
         }

         assert((size_t) idx == fa.n_flows);
-
+ no_flows:
         pthread_rwlock_unlock(&fa.flows_lock);

         return idx;
+
+ fail_entry:
+        while (idx-- > 0)
+                free((*buf)[idx]);
+        free(*buf);
+ fail_entries:
+        pthread_rwlock_unlock(&fa.flows_lock);
+        return -ENOMEM;
 #else
         (void) buf;
         return 0;
@@ -648,19 +647,21 @@ int fa_init(void)
         if (pthread_cond_init(&fa.cond, &cattr))
                 goto fail_cond;

-        pthread_condattr_destroy(&cattr);
-
-        list_head_init(&fa.cmds);
-
         if (rib_reg(FA, &r_ops))
                 goto fail_rib_reg;

         fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA);
         if ((int) fa.eid < 0)
-                goto fail_rib_reg;
+                goto fail_dt_reg;
+
+        list_head_init(&fa.cmds);
+
+        pthread_condattr_destroy(&cattr);

         return 0;

+ fail_dt_reg:
+        rib_unreg(FA);
 fail_rib_reg:
         pthread_cond_destroy(&fa.cond);
 fail_cond:
diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c
index bd1fee51..c2348242 100644
--- a/src/ipcpd/unicast/main.c
+++ b/src/ipcpd/unicast/main.c
@@ -55,7 +55,7 @@
 #include
 #include

-static int initialize_components(const struct ipcp_config * conf)
+static int initialize_components(struct ipcp_config * conf)
 {
         assert(ipcp_dir_hash_len() != 0);
@@ -77,23 +77,25 @@ static int initialize_components(const struct ipcp_config * conf)
                 goto fail_dt;
         }

-        if (fa_init()) {
-                log_err("Failed to initialize flow allocator component.");
-                goto fail_fa;
-        }
+        ipcp_set_dir_hash_algo((enum hash_algo) conf->layer_info.dir_hash_algo);

-        if (dir_init()) {
+        if (dir_init(&conf->unicast.dir)) {
                 log_err("Failed to initialize directory.");
                 goto fail_dir;
         }

+        if (fa_init()) {
+                log_err("Failed to initialize flow allocator component.");
+                goto fail_fa;
+        }
+
         ipcp_set_state(IPCP_INIT);

         return 0;

- fail_dir:
-        fa_fini();
 fail_fa:
+        dir_fini();
+ fail_dir:
         dt_fini();
 fail_dt:
         ca_fini();
@@ -105,10 +107,10 @@ static int initialize_components(const struct ipcp_config * conf)

 static void finalize_components(void)
 {
-        dir_fini();
-
         fa_fini();

+        dir_fini();
+
         dt_fini();

         ca_fini();
@@ -138,8 +140,15 @@ static int start_components(void)
                 goto fail_connmgr_start;
         }

+        if (dir_start() < 0) {
+                log_err("Failed to start directory.");
+                goto fail_dir_start;
+        }
+
         return 0;

+ fail_dir_start:
+        connmgr_stop();
 fail_connmgr_start:
         enroll_stop();
 fail_enroll_start:
@@ -153,6 +162,8 @@ static int start_components(void)

 static void stop_components(void)
 {
+        dir_stop();
+
         connmgr_stop();

         enroll_stop();
@@ -164,16 +175,6 @@ static void stop_components(void)
         ipcp_set_state(IPCP_INIT);
 }

-static int bootstrap_components(void)
-{
-        if (dir_bootstrap()) {
-                log_err("Failed to bootstrap directory.");
-                return -1;
-        }
-
-        return 0;
-}
-
 static int unicast_ipcp_enroll(const char * dst,
                                struct layer_info * info)
 {
@@ -231,32 +232,25 @@ static int unicast_ipcp_enroll(const char * dst,
         return -1;
 }

-static int unicast_ipcp_bootstrap(const struct ipcp_config * conf)
+static int unicast_ipcp_bootstrap(struct ipcp_config * conf)
 {
         assert(conf);
         assert(conf->type == THIS_TYPE);

-        enroll_bootstrap(conf);
-
         if (initialize_components(conf) < 0) {
                 log_err("Failed to init IPCP components.");
                 goto fail_init;
         }

+        enroll_bootstrap(conf);
+
         if (start_components() < 0) {
                 log_err("Failed to init IPCP components.");
                 goto fail_start;
         }

-        if (bootstrap_components() < 0) {
-                log_err("Failed to bootstrap IPCP components.");
-                goto fail_bootstrap;
-        }
-
         return 0;

- fail_bootstrap:
-        stop_components();
 fail_start:
         finalize_components();
 fail_init:
diff --git a/src/ipcpd/unicast/routing/link-state.c b/src/ipcpd/unicast/routing/link-state.c
index 0bc6a852..fa6b9f4a 100644
--- a/src/ipcpd/unicast/routing/link-state.c
+++ b/src/ipcpd/unicast/routing/link-state.c
@@ -141,7 +141,7 @@ static int str_adj(struct adjacency * adj,
                    char *             buf,
                    size_t             len)
 {
-        char tmbuf[64];
+        char tmstr[RIB_TM_STRLEN];
         char srcbuf[64];
         char dstbuf[64];
         char seqnobuf[64];
@@ -152,15 +152,16 @@ static int str_adj(struct adjacency * adj,
         if (len < LS_ENTRY_SIZE)
                 return -1;

-        tm = localtime(&adj->stamp);
-        strftime(tmbuf, sizeof(tmbuf), "%F %T", tm); /* 19 chars */
+        tm = gmtime(&adj->stamp);
+        strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);

         sprintf(srcbuf, "%" PRIu64, adj->src);
         sprintf(dstbuf, "%" PRIu64, adj->dst);
         sprintf(seqnobuf, "%" PRIu64, adj->seqno);

-        sprintf(buf, "src: %20s\ndst: %20s\nseqno: %18s\nupd: %20s\n",
-                srcbuf, dstbuf, seqnobuf, tmbuf);
+        sprintf(buf, "src: %20s\ndst: %20s\nseqno: %18s\n"
+                "upd: %s\n",
+                srcbuf, dstbuf, seqnobuf, tmstr);

         return LS_ENTRY_SIZE;
 }
@@ -257,56 +258,45 @@ static int lsdb_rib_readdir(char *** buf)

         pthread_rwlock_rdlock(&ls.db_lock);

-        if (ls.db_len + ls.nbs_len == 0) {
-                pthread_rwlock_unlock(&ls.db_lock);
-                return 0;
-        }
+        if (ls.db_len + ls.nbs_len == 0)
+                goto no_entries;

         *buf = malloc(sizeof(**buf) * (ls.db_len + ls.nbs_len));
-        if (*buf == NULL) {
-                pthread_rwlock_unlock(&ls.db_lock);
-                return -ENOMEM;
-        }
+        if (*buf == NULL)
+                goto fail_entries;

         list_for_each(p, &ls.nbs) {
                 struct nb * nb = list_entry(p, struct nb, next);
                 char * str = (nb->type == NB_DT ? "dt." : "mgmt.");
                 sprintf(entry, "%s%" PRIu64, str, nb->addr);
                 (*buf)[idx] = malloc(strlen(entry) + 1);
-                if ((*buf)[idx] == NULL) {
-                        while (idx-- > 0)
-                                free((*buf)[idx]);
-                        free(*buf);
-                        pthread_rwlock_unlock(&ls.db_lock);
-                        return -ENOMEM;
-                }
+                if ((*buf)[idx] == NULL)
+                        goto fail_entry;

-                strcpy((*buf)[idx], entry);
-
-                idx++;
+                strcpy((*buf)[idx++], entry);
         }

         list_for_each(p, &ls.db) {
                 struct adjacency * a = list_entry(p, struct adjacency, next);
                 sprintf(entry, "%" PRIu64 ".%" PRIu64, a->src, a->dst);
                 (*buf)[idx] = malloc(strlen(entry) + 1);
-                if ((*buf)[idx] == NULL) {
-                        ssize_t j;
-                        for (j = 0; j < idx; ++j)
-                                free(*buf[j]);
-                        free(buf);
-                        pthread_rwlock_unlock(&ls.db_lock);
-                        return -ENOMEM;
-                }
-
-                strcpy((*buf)[idx], entry);
+                if ((*buf)[idx] == NULL)
+                        goto fail_entry;

-                idx++;
+                strcpy((*buf)[idx++], entry);
         }
-
+ no_entries:
         pthread_rwlock_unlock(&ls.db_lock);

         return idx;
+
+ fail_entry:
+        while (idx-- > 0)
+                free((*buf)[idx]);
+        free(*buf);
+ fail_entries:
+        pthread_rwlock_unlock(&ls.db_lock);
+        return -ENOMEM;
 }

 static struct rib_ops r_ops = {
-- 
cgit v1.2.3