summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/dht.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal/dht.c')
-rw-r--r--src/ipcpd/normal/dht.c126
1 files changed, 110 insertions, 16 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index b1ba44a8..e4c37884 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -31,10 +31,12 @@
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
#include <ouroboros/list.h>
+#include <ouroboros/notifier.h>
#include <ouroboros/random.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/utils.h>
+#include "connmgr.h"
#include "dht.h"
#include "dt.h"
@@ -63,9 +65,12 @@ typedef KadContactMsg kad_contact_msg_t;
#define KAD_QUEER 15 /* Time to declare peer questionable. */
#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */
#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */
+#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */
+#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */
enum dht_state {
DHT_INIT = 0,
+ DHT_JOINING,
DHT_RUNNING,
DHT_SHUTDOWN,
};
@@ -207,6 +212,7 @@ struct dht {
struct bmp * cookies;
enum dht_state state;
+ pthread_cond_t cond;
pthread_mutex_t mtx;
pthread_rwlock_t lock;
@@ -216,6 +222,11 @@ struct dht {
pthread_t worker;
};
+struct join_info {
+ struct dht * dht;
+ uint64_t addr;
+};
+
static uint8_t * dht_dup_key(const uint8_t * key,
size_t len)
{
@@ -250,9 +261,28 @@ static void dht_set_state(struct dht * dht,
dht->state = state;
+ pthread_cond_signal(&dht->cond);
+
pthread_mutex_unlock(&dht->mtx);
}
+static int dht_wait_running(struct dht * dht)
+{
+ int ret = 0;
+
+ pthread_mutex_lock(&dht->mtx);
+
+ while (dht->state == DHT_JOINING)
+ pthread_cond_wait(&dht->cond, &dht->mtx);
+
+ if (dht->state != DHT_RUNNING)
+ ret = -1;
+
+ pthread_mutex_unlock(&dht->mtx);
+
+ return ret;
+}
+
static uint8_t * create_id(size_t len)
{
uint8_t * id;
@@ -1626,7 +1656,6 @@ static int kad_join(struct dht * dht,
uint64_t addr)
{
kad_msg_t msg = KAD_MSG__INIT;
- struct lookup * lu;
msg.code = KAD_JOIN;
@@ -1657,10 +1686,6 @@ static int kad_join(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
- lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
- if (lu != NULL)
- lookup_destroy(lu);
-
return 0;
}
@@ -2074,14 +2099,6 @@ int dht_bootstrap(struct dht * dht,
return -1;
}
-int dht_enroll(struct dht * dht,
- uint64_t addr)
-{
- assert(dht);
-
- return kad_join(dht, addr);
-}
-
int dht_reg(struct dht * dht,
const uint8_t * key)
{
@@ -2222,8 +2239,9 @@ void dht_post_sdu(void * ae,
return;
}
- if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) {
+ if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
kad_msg__free_unpacked(msg, NULL);
+ log_dbg("Got a request message when not running.");
return;
}
@@ -2383,6 +2401,75 @@ void dht_destroy(struct dht * dht)
free(dht);
}
+static void * join_thr(void * o)
+{
+ struct join_info * info = (struct join_info *) o;
+ struct lookup * lu;
+ size_t retr = 0;
+
+ assert(info);
+
+ while (kad_join(info->dht, info->addr)) {
+ if (retr++ == KAD_JOIN_RETR) {
+ dht_set_state(info->dht, DHT_INIT);
+ goto finish;
+ }
+
+ sleep(KAD_JOIN_INTV);
+ }
+
+ lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);
+ if (lu != NULL)
+ lookup_destroy(lu);
+
+ finish:
+ free(info);
+
+ return (void *) 0;
+}
+
+static void handle_event(void * self,
+ int event,
+ const void * o)
+{
+ struct dht * dht = (struct dht *) self;
+
+ if (event == NOTIFY_DT_CONN_ADD) {
+ struct lookup * lu;
+ pthread_t thr;
+ struct join_info * info;
+ struct conn * c = (struct conn *) o;
+ enum dht_state state = dht_get_state(dht);
+
+ switch(state) {
+ case DHT_INIT:
+ info = malloc(sizeof(*info));
+ if (info == NULL)
+ break;
+
+ info->dht = dht;
+ info->addr = c->conn_info.addr;
+
+ dht_set_state(dht, DHT_JOINING);
+
+ if (pthread_create(&thr, NULL, join_thr, info)) {
+ dht_set_state(dht, DHT_INIT);
+ free(info);
+ return;
+ }
+ pthread_detach(thr);
+ break;
+ case DHT_RUNNING:
+ lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
+ if (lu != NULL)
+ lookup_destroy(lu);
+ break;
+ default:
+ break;
+ }
+ }
+}
+
struct dht * dht_create(uint64_t addr)
{
struct dht * dht;
@@ -2404,6 +2491,9 @@ struct dht * dht_create(uint64_t addr)
if (pthread_mutex_init(&dht->mtx, NULL))
goto fail_mutex;
+ if (pthread_cond_init(&dht->cond, NULL))
+ goto fail_cond;
+
dht->cookies = bmp_create(DHT_MAX_REQS, 1);
if (dht->cookies == NULL)
goto fail_bmp;
@@ -2413,13 +2503,17 @@ struct dht * dht_create(uint64_t addr)
dht->id = NULL;
#ifndef __DHT_TEST__
dht->fd = dt_reg_ae(dht, &dht_post_sdu);
-#endif /* __DHT_TEST__ */
-
+ notifier_reg(handle_event, dht);
+#else
+ (void) handle_event;
+#endif
dht->state = DHT_INIT;
return dht;
fail_bmp:
+ pthread_cond_destroy(&dht->cond);
+ fail_cond:
pthread_mutex_destroy(&dht->mtx);
fail_mutex:
pthread_rwlock_destroy(&dht->lock);