summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dir/dht.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dir/dht.c')
-rw-r--r--src/ipcpd/unicast/dir/dht.c152
1 files changed, 80 insertions, 72 deletions
diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c
index 1742267b..08a5a5a9 100644
--- a/src/ipcpd/unicast/dir/dht.c
+++ b/src/ipcpd/unicast/dir/dht.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2021
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Distributed Hash Table based on Kademlia
*
@@ -31,6 +31,7 @@
#define DHT "dht"
#define OUROBOROS_PREFIX DHT
+#include <ouroboros/endian.h>
#include <ouroboros/hash.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/bitmap.h>
@@ -39,7 +40,7 @@
#include <ouroboros/list.h>
#include <ouroboros/notifier.h>
#include <ouroboros/random.h>
-#include <ouroboros/time_utils.h>
+#include <ouroboros/time.h>
#include <ouroboros/tpm.h>
#include <ouroboros/utils.h>
#include <ouroboros/pthread.h>
@@ -56,9 +57,9 @@
#include <inttypes.h>
#include <limits.h>
-#include "kademlia.pb-c.h"
-typedef KadMsg kad_msg_t;
-typedef KadContactMsg kad_contact_msg_t;
+#include "dht.pb-c.h"
+typedef DhtMsg dht_msg_t;
+typedef DhtContactMsg dht_contact_msg_t;
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
@@ -353,7 +354,7 @@ static uint8_t * create_id(size_t len)
}
static void kad_req_create(struct dht * dht,
- kad_msg_t * msg,
+ dht_msg_t * msg,
uint64_t addr)
{
struct kad_req * req;
@@ -361,14 +362,14 @@ static void kad_req_create(struct dht * dht,
struct timespec t;
size_t b;
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
req = malloc(sizeof(*req));
if (req == NULL)
- return;
+ goto fail_malloc;
list_head_init(&req->next);
- clock_gettime(CLOCK_REALTIME_COARSE, &t);
-
req->t_exp = t.tv_sec + KAD_T_RESP;
req->addr = addr;
req->state = REQ_INIT;
@@ -382,30 +383,22 @@ static void kad_req_create(struct dht * dht,
if (msg->has_key) {
req->key = dht_dup_key(msg->key.data, b);
- if (req->key == NULL) {
- free(req);
- return;
- }
+ if (req->key == NULL)
+ goto fail_dup_key;
}
- if (pthread_mutex_init(&req->lock, NULL)) {
- free(req->key);
- free(req);
- return;
- }
+ if (pthread_mutex_init(&req->lock, NULL))
+ goto fail_mutex;
- pthread_condattr_init(&cattr);
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_condattr;
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- if (pthread_cond_init(&req->cond, &cattr)) {
- pthread_condattr_destroy(&cattr);
- pthread_mutex_destroy(&req->lock);
- free(req->key);
- free(req);
- return;
- }
+ if (pthread_cond_init(&req->cond, &cattr))
+ goto fail_cond_init;
pthread_condattr_destroy(&cattr);
@@ -414,6 +407,19 @@ static void kad_req_create(struct dht * dht,
list_add(&req->next, &dht->requests);
pthread_rwlock_unlock(&dht->lock);
+
+ return;
+
+ fail_cond_init:
+ pthread_condattr_destroy(&cattr);
+ fail_condattr:
+ pthread_mutex_destroy(&req->lock);
+ fail_mutex:
+ free(req->key);
+ fail_dup_key:
+ free(req);
+ fail_malloc:
+ return;
}
static void cancel_req_destroy(void * o)
@@ -443,7 +449,7 @@ static void kad_req_destroy(struct kad_req * req)
return;
case REQ_PENDING:
req->state = REQ_DESTROY;
- pthread_cond_signal(&req->cond);
+ pthread_cond_broadcast(&req->cond);
break;
case REQ_INIT:
case REQ_DONE:
@@ -466,12 +472,14 @@ static void kad_req_destroy(struct kad_req * req)
static int kad_req_wait(struct kad_req * req,
time_t t)
{
- struct timespec timeo = {t, 0};
+ struct timespec timeo = TIMESPEC_INIT_S(0);
struct timespec abs;
int ret = 0;
assert(req);
+ timeo.tv_sec = t;
+
clock_gettime(PTHREAD_COND_CLOCK, &abs);
ts_add(&abs, &timeo, &abs);
@@ -787,7 +795,7 @@ static void lookup_destroy(struct lookup * lu)
static void lookup_update(struct dht * dht,
struct lookup * lu,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
struct list_head * p = NULL;
struct list_head * h;
@@ -989,7 +997,7 @@ static void cancel_lookup_wait(void * o)
static enum lookup_state lookup_wait(struct lookup * lu)
{
- struct timespec timeo = {KAD_T_RESP, 0};
+ struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP);
struct timespec abs;
enum lookup_state state;
int ret = 0;
@@ -1021,7 +1029,7 @@ static enum lookup_state lookup_wait(struct lookup * lu)
}
static struct kad_req * dht_find_request(struct dht * dht,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
struct list_head * p;
@@ -1269,7 +1277,7 @@ static void bucket_refresh(struct dht * dht,
struct contact * d;
c = list_first_entry(&b->contacts, struct contact, next);
d = contact_create(c->id, dht->b, c->addr);
- if (c != NULL)
+ if (d != NULL)
list_add(&d->next, r);
return;
}
@@ -1458,7 +1466,7 @@ static int dht_update_bucket(struct dht * dht,
}
static int send_msg(struct dht * dht,
- kad_msg_t * msg,
+ dht_msg_t * msg,
uint64_t addr)
{
#ifndef __DHT_TEST__
@@ -1491,7 +1499,7 @@ static int send_msg(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
#ifndef __DHT_TEST__
- len = kad_msg__get_packed_size(msg);
+ len = dht_msg__get_packed_size(msg);
if (len == 0)
goto fail_msg;
@@ -1499,7 +1507,7 @@ static int send_msg(struct dht * dht,
if (ipcp_sdb_reserve(&sdb, len))
goto fail_msg;
- kad_msg__pack(msg, shm_du_buff_head(sdb));
+ dht_msg__pack(msg, shm_du_buff_head(sdb));
if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)
break;
@@ -1546,7 +1554,7 @@ static struct dht_entry * dht_find_entry(struct dht * dht,
}
static int kad_add(struct dht * dht,
- const kad_contact_msg_t * contacts,
+ const dht_contact_msg_t * contacts,
ssize_t n,
time_t exp)
{
@@ -1585,7 +1593,7 @@ static int kad_add(struct dht * dht,
}
static int wait_resp(struct dht * dht,
- kad_msg_t * msg,
+ dht_msg_t * msg,
time_t timeo)
{
struct kad_req * req;
@@ -1612,9 +1620,9 @@ static int kad_store(struct dht * dht,
uint64_t r_addr,
time_t ttl)
{
- kad_msg_t msg = KAD_MSG__INIT;
- kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT;
- kad_contact_msg_t * cmsgp[1];
+ dht_msg_t msg = DHT_MSG__INIT;
+ dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT;
+ dht_contact_msg_t * cmsgp[1];
cmsg.id.data = (uint8_t *) key;
cmsg.addr = addr;
@@ -1644,7 +1652,7 @@ static ssize_t kad_find(struct dht * dht,
const uint64_t * addrs,
enum kad_code code)
{
- kad_msg_t msg = KAD_MSG__INIT;
+ dht_msg_t msg = DHT_MSG__INIT;
ssize_t sent = 0;
assert(dht);
@@ -1784,7 +1792,7 @@ static void kad_publish(struct dht * dht,
while (n-- > 0) {
if (addrs[n] == dht->addr) {
- kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;
+ dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT;
msg.id.data = (uint8_t *) key;
msg.id.len = dht->b;
msg.addr = addr;
@@ -1803,7 +1811,7 @@ static void kad_publish(struct dht * dht,
static int kad_join(struct dht * dht,
uint64_t addr)
{
- kad_msg_t msg = KAD_MSG__INIT;
+ dht_msg_t msg = DHT_MSG__INIT;
msg.code = KAD_JOIN;
@@ -1883,18 +1891,13 @@ static int dht_del(struct dht * dht,
{
struct dht_entry * e;
- pthread_rwlock_wrlock(&dht->lock);
-
e = dht_find_entry(dht, key);
if (e == NULL) {
- pthread_rwlock_unlock(&dht->lock);
return -EPERM;
}
dht_entry_del_addr(e, addr);
- pthread_rwlock_unlock(&dht->lock);
-
return 0;
}
@@ -1936,14 +1939,14 @@ static buffer_t dht_retrieve(struct dht * dht,
fail:
pthread_rwlock_unlock(&dht->lock);
- buf.len = 0;
-
+ buf.len = 0;
+ buf.data = NULL;
return buf;
}
static ssize_t dht_get_contacts(struct dht * dht,
const uint8_t * key,
- kad_contact_msg_t *** msgs)
+ dht_contact_msg_t *** msgs)
{
struct list_head l;
struct list_head * p;
@@ -1980,7 +1983,7 @@ static ssize_t dht_get_contacts(struct dht * dht,
return 0;
}
- kad_contact_msg__init((*msgs)[i]);
+ dht_contact_msg__init((*msgs)[i]);
(*msgs)[i]->id.data = c->id;
(*msgs)[i]->id.len = dht->b;
@@ -2117,7 +2120,7 @@ static void * work(void * o)
static int kad_handle_join_resp(struct dht * dht,
struct kad_req * req,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
assert(dht);
assert(req);
@@ -2177,7 +2180,7 @@ static int kad_handle_join_resp(struct dht * dht,
static int kad_handle_find_resp(struct dht * dht,
struct kad_req * req,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
struct lookup * lu;
@@ -2201,7 +2204,7 @@ static int kad_handle_find_resp(struct dht * dht,
}
static void kad_handle_response(struct dht * dht,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
struct kad_req * req;
@@ -2249,6 +2252,12 @@ int dht_bootstrap(void * dir)
pthread_rwlock_wrlock(&dht->lock);
+#ifndef __DHT_TEST__
+ dht->b = hash_len(ipcpi.dir_hash_algo);
+#else
+ dht->b = DHT_TEST_KEY_LEN;
+#endif
+
dht->id = create_id(dht->b);
if (dht->id == NULL)
goto fail_id;
@@ -2259,11 +2268,7 @@ int dht_bootstrap(void * dir)
dht->buckets->depth = 0;
dht->buckets->mask = 0;
-#ifndef __DHT_TEST__
- dht->b = hash_len(ipcpi.dir_hash_algo);
-#else
- dht->b = DHT_TEST_KEY_LEN;
-#endif
+
dht->t_expire = 86400; /* 1 day */
dht->t_repub = dht->t_expire - 10;
dht->k = KAD_K;
@@ -2437,9 +2442,9 @@ static void * dht_handle_packet(void * o)
assert(dht);
while (true) {
- kad_msg_t * msg;
- kad_contact_msg_t ** cmsgs;
- kad_msg_t resp_msg = KAD_MSG__INIT;
+ dht_msg_t * msg;
+ dht_contact_msg_t ** cmsgs;
+ dht_msg_t resp_msg = DHT_MSG__INIT;
uint64_t addr;
buffer_t buf;
size_t i;
@@ -2459,9 +2464,9 @@ static void * dht_handle_packet(void * o)
pthread_cleanup_pop(true);
- i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
+ i = shm_du_buff_len(cmd->sdb);
- msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
+ msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
#ifndef __DHT_TEST__
ipcp_sdb_release(cmd->sdb);
#endif
@@ -2473,7 +2478,7 @@ static void * dht_handle_packet(void * o)
}
if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
- kad_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(msg, NULL);
log_dbg("Got a request message when not running.");
continue;
}
@@ -2486,13 +2491,13 @@ static void * dht_handle_packet(void * o)
pthread_rwlock_unlock(&dht->lock);
if (msg->has_key && msg->key.len != b) {
- kad_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(msg, NULL);
log_warn("Bad key in message.");
continue;
}
if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
- kad_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(msg, NULL);
log_warn("Bad source ID in message of type %d.",
msg->code);
continue;
@@ -2593,7 +2598,7 @@ static void * dht_handle_packet(void * o)
log_warn("Failed to send response.");
finish:
- kad_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(msg, NULL);
if (resp_msg.n_addrs > 0)
free(resp_msg.addrs);
@@ -2604,7 +2609,7 @@ static void * dht_handle_packet(void * o)
}
for (i = 0; i < resp_msg.n_contacts; ++i)
- kad_contact_msg__free_unpacked(resp_msg.contacts[i],
+ dht_contact_msg__free_unpacked(resp_msg.contacts[i],
NULL);
free(resp_msg.contacts);
@@ -2761,7 +2766,7 @@ static void handle_event(void * self,
pthread_t thr;
struct join_info * inf;
struct conn * c = (struct conn *) o;
- struct timespec slack = {0, DHT_ENROLL_SLACK * MILLION};
+ struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK);
/* Give the pff some time to update for the new link. */
nanosleep(&slack, NULL);
@@ -2847,7 +2852,8 @@ void * dht_create(void)
if ((int) dht->eid < 0)
goto fail_tpm_start;
- notifier_reg(handle_event, dht);
+ if (notifier_reg(handle_event, dht))
+ goto fail_notifier_reg;
#else
(void) handle_event;
(void) dht_handle_packet;
@@ -2857,6 +2863,8 @@ void * dht_create(void)
return (void *) dht;
#ifndef __DHT_TEST__
+ fail_notifier_reg:
+ tpm_stop(dht->tpm);
fail_tpm_start:
tpm_destroy(dht->tpm);
fail_tpm_create: