diff options
Diffstat (limited to 'src/ipcpd/unicast/dir/dht.c')
-rw-r--r-- | src/ipcpd/unicast/dir/dht.c | 152 |
1 files changed, 80 insertions, 72 deletions
diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index 1742267b..08a5a5a9 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024 * * Distributed Hash Table based on Kademlia * @@ -31,6 +31,7 @@ #define DHT "dht" #define OUROBOROS_PREFIX DHT +#include <ouroboros/endian.h> #include <ouroboros/hash.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/bitmap.h> @@ -39,7 +40,7 @@ #include <ouroboros/list.h> #include <ouroboros/notifier.h> #include <ouroboros/random.h> -#include <ouroboros/time_utils.h> +#include <ouroboros/time.h> #include <ouroboros/tpm.h> #include <ouroboros/utils.h> #include <ouroboros/pthread.h> @@ -56,9 +57,9 @@ #include <inttypes.h> #include <limits.h> -#include "kademlia.pb-c.h" -typedef KadMsg kad_msg_t; -typedef KadContactMsg kad_contact_msg_t; +#include "dht.pb-c.h" +typedef DhtMsg dht_msg_t; +typedef DhtContactMsg dht_contact_msg_t; #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME @@ -353,7 +354,7 @@ static uint8_t * create_id(size_t len) } static void kad_req_create(struct dht * dht, - kad_msg_t * msg, + dht_msg_t * msg, uint64_t addr) { struct kad_req * req; @@ -361,14 +362,14 @@ static void kad_req_create(struct dht * dht, struct timespec t; size_t b; + clock_gettime(CLOCK_REALTIME_COARSE, &t); + req = malloc(sizeof(*req)); if (req == NULL) - return; + goto fail_malloc; list_head_init(&req->next); - clock_gettime(CLOCK_REALTIME_COARSE, &t); - req->t_exp = t.tv_sec + KAD_T_RESP; req->addr = addr; req->state = REQ_INIT; @@ -382,30 +383,22 @@ static void kad_req_create(struct dht * dht, if (msg->has_key) { req->key = dht_dup_key(msg->key.data, b); - if (req->key == NULL) { - free(req); - return; - } + if (req->key == NULL) + goto fail_dup_key; } - if (pthread_mutex_init(&req->lock, NULL)) { - free(req->key); - free(req); - return; - } + if (pthread_mutex_init(&req->lock, NULL)) + goto fail_mutex; - pthread_condattr_init(&cattr); + + if (pthread_condattr_init(&cattr)) + goto fail_condattr; #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif - if (pthread_cond_init(&req->cond, &cattr)) { - pthread_condattr_destroy(&cattr); - pthread_mutex_destroy(&req->lock); - free(req->key); - free(req); - return; - } + if (pthread_cond_init(&req->cond, &cattr)) + goto fail_cond_init; pthread_condattr_destroy(&cattr); @@ -414,6 +407,19 @@ static void kad_req_create(struct dht * dht, list_add(&req->next, &dht->requests); pthread_rwlock_unlock(&dht->lock); + + return; + + fail_cond_init: + pthread_condattr_destroy(&cattr); + fail_condattr: + pthread_mutex_destroy(&req->lock); + fail_mutex: + free(req->key); + fail_dup_key: + free(req); + fail_malloc: + return; } static void cancel_req_destroy(void * o) @@ -443,7 +449,7 @@ static void kad_req_destroy(struct kad_req * req) return; case REQ_PENDING: req->state = REQ_DESTROY; - pthread_cond_signal(&req->cond); + pthread_cond_broadcast(&req->cond); break; case REQ_INIT: case REQ_DONE: @@ -466,12 +472,14 @@ static void kad_req_destroy(struct kad_req * req) static int kad_req_wait(struct kad_req * req, time_t t) { - struct timespec timeo = {t, 0}; + struct timespec timeo = TIMESPEC_INIT_S(0); struct timespec abs; int ret = 0; assert(req); + timeo.tv_sec = t; + clock_gettime(PTHREAD_COND_CLOCK, &abs); ts_add(&abs, &timeo, &abs); @@ -787,7 +795,7 @@ static void lookup_destroy(struct lookup * lu) static void lookup_update(struct dht * dht, struct lookup * lu, - kad_msg_t * msg) + dht_msg_t * msg) { struct list_head * p = NULL; struct list_head * h; @@ -989,7 +997,7 @@ static void cancel_lookup_wait(void * o) static enum lookup_state lookup_wait(struct lookup * lu) { - struct timespec timeo = {KAD_T_RESP, 0}; + struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP); struct timespec abs; enum lookup_state state; int ret = 0; @@ -1021,7 +1029,7 @@ static enum lookup_state lookup_wait(struct lookup * lu) } static struct kad_req * dht_find_request(struct dht * dht, - kad_msg_t * msg) + dht_msg_t * msg) { struct list_head * p; @@ -1269,7 +1277,7 @@ static void bucket_refresh(struct dht * dht, struct contact * d; c = list_first_entry(&b->contacts, struct contact, next); d = contact_create(c->id, dht->b, c->addr); - if (c != NULL) + if (d != NULL) list_add(&d->next, r); return; } @@ -1458,7 +1466,7 @@ static int dht_update_bucket(struct dht * dht, } static int send_msg(struct dht * dht, - kad_msg_t * msg, + dht_msg_t * msg, uint64_t addr) { #ifndef __DHT_TEST__ @@ -1491,7 +1499,7 @@ static int send_msg(struct dht * dht, pthread_rwlock_unlock(&dht->lock); #ifndef __DHT_TEST__ - len = kad_msg__get_packed_size(msg); + len = dht_msg__get_packed_size(msg); if (len == 0) goto fail_msg; @@ -1499,7 +1507,7 @@ static int send_msg(struct dht * dht, if (ipcp_sdb_reserve(&sdb, len)) goto fail_msg; - kad_msg__pack(msg, shm_du_buff_head(sdb)); + dht_msg__pack(msg, shm_du_buff_head(sdb)); if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) break; @@ -1546,7 +1554,7 @@ static struct dht_entry * dht_find_entry(struct dht * dht, } static int kad_add(struct dht * dht, - const kad_contact_msg_t * contacts, + const dht_contact_msg_t * contacts, ssize_t n, time_t exp) { @@ -1585,7 +1593,7 @@ static int kad_add(struct dht * dht, } static int wait_resp(struct dht * dht, - kad_msg_t * msg, + dht_msg_t * msg, time_t timeo) { struct kad_req * req; @@ -1612,9 +1620,9 @@ static int kad_store(struct dht * dht, uint64_t r_addr, time_t ttl) { - kad_msg_t msg = KAD_MSG__INIT; - kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; - kad_contact_msg_t * cmsgp[1]; + dht_msg_t msg = DHT_MSG__INIT; + dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT; + dht_contact_msg_t * cmsgp[1]; cmsg.id.data = (uint8_t *) key; cmsg.addr = addr; @@ -1644,7 +1652,7 @@ static ssize_t kad_find(struct dht * dht, const uint64_t * addrs, enum kad_code code) { - kad_msg_t msg = KAD_MSG__INIT; + dht_msg_t msg = DHT_MSG__INIT; ssize_t sent = 0; assert(dht); @@ -1784,7 +1792,7 @@ static void kad_publish(struct dht * dht, while (n-- > 0) { if (addrs[n] == dht->addr) { - kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; + dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT; msg.id.data = (uint8_t *) key; msg.id.len = dht->b; msg.addr = addr; @@ -1803,7 +1811,7 @@ static void kad_publish(struct dht * dht, static int kad_join(struct dht * dht, uint64_t addr) { - kad_msg_t msg = KAD_MSG__INIT; + dht_msg_t msg = DHT_MSG__INIT; msg.code = KAD_JOIN; @@ -1883,18 +1891,13 @@ static int dht_del(struct dht * dht, { struct dht_entry * e; - pthread_rwlock_wrlock(&dht->lock); - e = dht_find_entry(dht, key); if (e == NULL) { - pthread_rwlock_unlock(&dht->lock); return -EPERM; } dht_entry_del_addr(e, addr); - pthread_rwlock_unlock(&dht->lock); - return 0; } @@ -1936,14 +1939,14 @@ static buffer_t dht_retrieve(struct dht * dht, fail: pthread_rwlock_unlock(&dht->lock); - buf.len = 0; - + buf.len = 0; + buf.data = NULL; return buf; } static ssize_t dht_get_contacts(struct dht * dht, const uint8_t * key, - kad_contact_msg_t *** msgs) + dht_contact_msg_t *** msgs) { struct list_head l; struct list_head * p; @@ -1980,7 +1983,7 @@ static ssize_t dht_get_contacts(struct dht * dht, return 0; } - kad_contact_msg__init((*msgs)[i]); + dht_contact_msg__init((*msgs)[i]); (*msgs)[i]->id.data = c->id; (*msgs)[i]->id.len = dht->b; @@ -2117,7 +2120,7 @@ static void * work(void * o) static int kad_handle_join_resp(struct dht * dht, struct kad_req * req, - kad_msg_t * msg) + dht_msg_t * msg) { assert(dht); assert(req); @@ -2177,7 +2180,7 @@ static int kad_handle_join_resp(struct dht * dht, static int kad_handle_find_resp(struct dht * dht, struct kad_req * req, - kad_msg_t * msg) + dht_msg_t * msg) { struct lookup * lu; @@ -2201,7 +2204,7 @@ static int kad_handle_find_resp(struct dht * dht, } static void kad_handle_response(struct dht * dht, - kad_msg_t * msg) + dht_msg_t * msg) { struct kad_req * req; @@ -2249,6 +2252,12 @@ int dht_bootstrap(void * dir) pthread_rwlock_wrlock(&dht->lock); +#ifndef __DHT_TEST__ + dht->b = hash_len(ipcpi.dir_hash_algo); +#else + dht->b = DHT_TEST_KEY_LEN; +#endif + dht->id = create_id(dht->b); if (dht->id == NULL) goto fail_id; @@ -2259,11 +2268,7 @@ int dht_bootstrap(void * dir) dht->buckets->depth = 0; dht->buckets->mask = 0; -#ifndef __DHT_TEST__ - dht->b = hash_len(ipcpi.dir_hash_algo); -#else - dht->b = DHT_TEST_KEY_LEN; -#endif + dht->t_expire = 86400; /* 1 day */ dht->t_repub = dht->t_expire - 10; dht->k = KAD_K; @@ -2437,9 +2442,9 @@ static void * dht_handle_packet(void * o) assert(dht); while (true) { - kad_msg_t * msg; - kad_contact_msg_t ** cmsgs; - kad_msg_t resp_msg = KAD_MSG__INIT; + dht_msg_t * msg; + dht_contact_msg_t ** cmsgs; + dht_msg_t resp_msg = DHT_MSG__INIT; uint64_t addr; buffer_t buf; size_t i; @@ -2459,9 +2464,9 @@ static void * dht_handle_packet(void * o) pthread_cleanup_pop(true); - i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); + i = shm_du_buff_len(cmd->sdb); - msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); + msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); #ifndef __DHT_TEST__ ipcp_sdb_release(cmd->sdb); #endif @@ -2473,7 +2478,7 @@ static void * dht_handle_packet(void * o) } if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { - kad_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(msg, NULL); log_dbg("Got a request message when not running."); continue; } @@ -2486,13 +2491,13 @@ static void * dht_handle_packet(void * o) pthread_rwlock_unlock(&dht->lock); if (msg->has_key && msg->key.len != b) { - kad_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(msg, NULL); log_warn("Bad key in message."); continue; } if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { - kad_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(msg, NULL); log_warn("Bad source ID in message of type %d.", msg->code); continue; @@ -2593,7 +2598,7 @@ static void * dht_handle_packet(void * o) log_warn("Failed to send response."); finish: - kad_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(msg, NULL); if (resp_msg.n_addrs > 0) free(resp_msg.addrs); @@ -2604,7 +2609,7 @@ static void * dht_handle_packet(void * o) } for (i = 0; i < resp_msg.n_contacts; ++i) - kad_contact_msg__free_unpacked(resp_msg.contacts[i], + dht_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); free(resp_msg.contacts); @@ -2761,7 +2766,7 @@ static void handle_event(void * self, pthread_t thr; struct join_info * inf; struct conn * c = (struct conn *) o; - struct timespec slack = {0, DHT_ENROLL_SLACK * MILLION}; + struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK); /* Give the pff some time to update for the new link. */ nanosleep(&slack, NULL); @@ -2847,7 +2852,8 @@ void * dht_create(void) if ((int) dht->eid < 0) goto fail_tpm_start; - notifier_reg(handle_event, dht); + if (notifier_reg(handle_event, dht)) + goto fail_notifier_reg; #else (void) handle_event; (void) dht_handle_packet; @@ -2857,6 +2863,8 @@ void * dht_create(void) return (void *) dht; #ifndef __DHT_TEST__ + fail_notifier_reg: + tpm_stop(dht->tpm); fail_tpm_start: tpm_destroy(dht->tpm); fail_tpm_create: |