summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dir/dht.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dir/dht.c')
-rw-r--r--src/ipcpd/unicast/dir/dht.c77
1 files changed, 40 insertions, 37 deletions
diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c
index a8c9ff94..08a5a5a9 100644
--- a/src/ipcpd/unicast/dir/dht.c
+++ b/src/ipcpd/unicast/dir/dht.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2022
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Distributed Hash Table based on Kademlia
*
@@ -31,6 +31,7 @@
#define DHT "dht"
#define OUROBOROS_PREFIX DHT
+#include <ouroboros/endian.h>
#include <ouroboros/hash.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/bitmap.h>
@@ -39,7 +40,7 @@
#include <ouroboros/list.h>
#include <ouroboros/notifier.h>
#include <ouroboros/random.h>
-#include <ouroboros/time_utils.h>
+#include <ouroboros/time.h>
#include <ouroboros/tpm.h>
#include <ouroboros/utils.h>
#include <ouroboros/pthread.h>
@@ -56,9 +57,9 @@
#include <inttypes.h>
#include <limits.h>
-#include "kademlia.pb-c.h"
-typedef KadMsg kad_msg_t;
-typedef KadContactMsg kad_contact_msg_t;
+#include "dht.pb-c.h"
+typedef DhtMsg dht_msg_t;
+typedef DhtContactMsg dht_contact_msg_t;
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
@@ -353,7 +354,7 @@ static uint8_t * create_id(size_t len)
}
static void kad_req_create(struct dht * dht,
- kad_msg_t * msg,
+ dht_msg_t * msg,
uint64_t addr)
{
struct kad_req * req;
@@ -448,7 +449,7 @@ static void kad_req_destroy(struct kad_req * req)
return;
case REQ_PENDING:
req->state = REQ_DESTROY;
- pthread_cond_signal(&req->cond);
+ pthread_cond_broadcast(&req->cond);
break;
case REQ_INIT:
case REQ_DONE:
@@ -471,12 +472,14 @@ static void kad_req_destroy(struct kad_req * req)
static int kad_req_wait(struct kad_req * req,
time_t t)
{
- struct timespec timeo = {t, 0};
+ struct timespec timeo = TIMESPEC_INIT_S(0);
struct timespec abs;
int ret = 0;
assert(req);
+ timeo.tv_sec = t;
+
clock_gettime(PTHREAD_COND_CLOCK, &abs);
ts_add(&abs, &timeo, &abs);
@@ -792,7 +795,7 @@ static void lookup_destroy(struct lookup * lu)
static void lookup_update(struct dht * dht,
struct lookup * lu,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
struct list_head * p = NULL;
struct list_head * h;
@@ -994,7 +997,7 @@ static void cancel_lookup_wait(void * o)
static enum lookup_state lookup_wait(struct lookup * lu)
{
- struct timespec timeo = {KAD_T_RESP, 0};
+ struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP);
struct timespec abs;
enum lookup_state state;
int ret = 0;
@@ -1026,7 +1029,7 @@ static enum lookup_state lookup_wait(struct lookup * lu)
}
static struct kad_req * dht_find_request(struct dht * dht,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
struct list_head * p;
@@ -1463,7 +1466,7 @@ static int dht_update_bucket(struct dht * dht,
}
static int send_msg(struct dht * dht,
- kad_msg_t * msg,
+ dht_msg_t * msg,
uint64_t addr)
{
#ifndef __DHT_TEST__
@@ -1496,7 +1499,7 @@ static int send_msg(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
#ifndef __DHT_TEST__
- len = kad_msg__get_packed_size(msg);
+ len = dht_msg__get_packed_size(msg);
if (len == 0)
goto fail_msg;
@@ -1504,7 +1507,7 @@ static int send_msg(struct dht * dht,
if (ipcp_sdb_reserve(&sdb, len))
goto fail_msg;
- kad_msg__pack(msg, shm_du_buff_head(sdb));
+ dht_msg__pack(msg, shm_du_buff_head(sdb));
if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)
break;
@@ -1551,7 +1554,7 @@ static struct dht_entry * dht_find_entry(struct dht * dht,
}
static int kad_add(struct dht * dht,
- const kad_contact_msg_t * contacts,
+ const dht_contact_msg_t * contacts,
ssize_t n,
time_t exp)
{
@@ -1590,7 +1593,7 @@ static int kad_add(struct dht * dht,
}
static int wait_resp(struct dht * dht,
- kad_msg_t * msg,
+ dht_msg_t * msg,
time_t timeo)
{
struct kad_req * req;
@@ -1617,9 +1620,9 @@ static int kad_store(struct dht * dht,
uint64_t r_addr,
time_t ttl)
{
- kad_msg_t msg = KAD_MSG__INIT;
- kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT;
- kad_contact_msg_t * cmsgp[1];
+ dht_msg_t msg = DHT_MSG__INIT;
+ dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT;
+ dht_contact_msg_t * cmsgp[1];
cmsg.id.data = (uint8_t *) key;
cmsg.addr = addr;
@@ -1649,7 +1652,7 @@ static ssize_t kad_find(struct dht * dht,
const uint64_t * addrs,
enum kad_code code)
{
- kad_msg_t msg = KAD_MSG__INIT;
+ dht_msg_t msg = DHT_MSG__INIT;
ssize_t sent = 0;
assert(dht);
@@ -1789,7 +1792,7 @@ static void kad_publish(struct dht * dht,
while (n-- > 0) {
if (addrs[n] == dht->addr) {
- kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;
+ dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT;
msg.id.data = (uint8_t *) key;
msg.id.len = dht->b;
msg.addr = addr;
@@ -1808,7 +1811,7 @@ static void kad_publish(struct dht * dht,
static int kad_join(struct dht * dht,
uint64_t addr)
{
- kad_msg_t msg = KAD_MSG__INIT;
+ dht_msg_t msg = DHT_MSG__INIT;
msg.code = KAD_JOIN;
@@ -1943,7 +1946,7 @@ static buffer_t dht_retrieve(struct dht * dht,
static ssize_t dht_get_contacts(struct dht * dht,
const uint8_t * key,
- kad_contact_msg_t *** msgs)
+ dht_contact_msg_t *** msgs)
{
struct list_head l;
struct list_head * p;
@@ -1980,7 +1983,7 @@ static ssize_t dht_get_contacts(struct dht * dht,
return 0;
}
- kad_contact_msg__init((*msgs)[i]);
+ dht_contact_msg__init((*msgs)[i]);
(*msgs)[i]->id.data = c->id;
(*msgs)[i]->id.len = dht->b;
@@ -2117,7 +2120,7 @@ static void * work(void * o)
static int kad_handle_join_resp(struct dht * dht,
struct kad_req * req,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
assert(dht);
assert(req);
@@ -2177,7 +2180,7 @@ static int kad_handle_join_resp(struct dht * dht,
static int kad_handle_find_resp(struct dht * dht,
struct kad_req * req,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
struct lookup * lu;
@@ -2201,7 +2204,7 @@ static int kad_handle_find_resp(struct dht * dht,
}
static void kad_handle_response(struct dht * dht,
- kad_msg_t * msg)
+ dht_msg_t * msg)
{
struct kad_req * req;
@@ -2439,9 +2442,9 @@ static void * dht_handle_packet(void * o)
assert(dht);
while (true) {
- kad_msg_t * msg;
- kad_contact_msg_t ** cmsgs;
- kad_msg_t resp_msg = KAD_MSG__INIT;
+ dht_msg_t * msg;
+ dht_contact_msg_t ** cmsgs;
+ dht_msg_t resp_msg = DHT_MSG__INIT;
uint64_t addr;
buffer_t buf;
size_t i;
@@ -2463,7 +2466,7 @@ static void * dht_handle_packet(void * o)
i = shm_du_buff_len(cmd->sdb);
- msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
+ msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
#ifndef __DHT_TEST__
ipcp_sdb_release(cmd->sdb);
#endif
@@ -2475,7 +2478,7 @@ static void * dht_handle_packet(void * o)
}
if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
- kad_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(msg, NULL);
log_dbg("Got a request message when not running.");
continue;
}
@@ -2488,13 +2491,13 @@ static void * dht_handle_packet(void * o)
pthread_rwlock_unlock(&dht->lock);
if (msg->has_key && msg->key.len != b) {
- kad_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(msg, NULL);
log_warn("Bad key in message.");
continue;
}
if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
- kad_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(msg, NULL);
log_warn("Bad source ID in message of type %d.",
msg->code);
continue;
@@ -2595,7 +2598,7 @@ static void * dht_handle_packet(void * o)
log_warn("Failed to send response.");
finish:
- kad_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(msg, NULL);
if (resp_msg.n_addrs > 0)
free(resp_msg.addrs);
@@ -2606,7 +2609,7 @@ static void * dht_handle_packet(void * o)
}
for (i = 0; i < resp_msg.n_contacts; ++i)
- kad_contact_msg__free_unpacked(resp_msg.contacts[i],
+ dht_contact_msg__free_unpacked(resp_msg.contacts[i],
NULL);
free(resp_msg.contacts);
@@ -2763,7 +2766,7 @@ static void handle_event(void * self,
pthread_t thr;
struct join_info * inf;
struct conn * c = (struct conn *) o;
- struct timespec slack = {0, DHT_ENROLL_SLACK * MILLION};
+ struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK);
/* Give the pff some time to update for the new link. */
nanosleep(&slack, NULL);