diff options
Diffstat (limited to 'src/ipcpd/normal/dht.c')
-rw-r--r-- | src/ipcpd/normal/dht.c | 126 |
1 files changed, 110 insertions, 16 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index b1ba44a8..e4c37884 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -31,10 +31,12 @@ #include <ouroboros/errno.h> #include <ouroboros/logs.h> #include <ouroboros/list.h> +#include <ouroboros/notifier.h> #include <ouroboros/random.h> #include <ouroboros/time_utils.h> #include <ouroboros/utils.h> +#include "connmgr.h" #include "dht.h" #include "dt.h" @@ -63,9 +65,12 @@ typedef KadContactMsg kad_contact_msg_t; #define KAD_QUEER 15 /* Time to declare peer questionable. */ #define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ #define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ +#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */ +#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ enum dht_state { DHT_INIT = 0, + DHT_JOINING, DHT_RUNNING, DHT_SHUTDOWN, }; @@ -207,6 +212,7 @@ struct dht { struct bmp * cookies; enum dht_state state; + pthread_cond_t cond; pthread_mutex_t mtx; pthread_rwlock_t lock; @@ -216,6 +222,11 @@ struct dht { pthread_t worker; }; +struct join_info { + struct dht * dht; + uint64_t addr; +}; + static uint8_t * dht_dup_key(const uint8_t * key, size_t len) { @@ -250,9 +261,28 @@ static void dht_set_state(struct dht * dht, dht->state = state; + pthread_cond_signal(&dht->cond); + pthread_mutex_unlock(&dht->mtx); } +static int dht_wait_running(struct dht * dht) +{ + int ret = 0; + + pthread_mutex_lock(&dht->mtx); + + while (dht->state == DHT_JOINING) + pthread_cond_wait(&dht->cond, &dht->mtx); + + if (dht->state != DHT_RUNNING) + ret = -1; + + pthread_mutex_unlock(&dht->mtx); + + return ret; +} + static uint8_t * create_id(size_t len) { uint8_t * id; @@ -1626,7 +1656,6 @@ static int kad_join(struct dht * dht, uint64_t addr) { kad_msg_t msg = KAD_MSG__INIT; - struct lookup * lu; msg.code = KAD_JOIN; @@ -1657,10 +1686,6 @@ static int kad_join(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); - if (lu != NULL) - lookup_destroy(lu); - return 0; } @@ -2074,14 +2099,6 @@ int dht_bootstrap(struct dht * dht, return -1; } -int dht_enroll(struct dht * dht, - uint64_t addr) -{ - assert(dht); - - return kad_join(dht, addr); -} - int dht_reg(struct dht * dht, const uint8_t * key) { @@ -2222,8 +2239,9 @@ void dht_post_sdu(void * ae, return; } - if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) { + if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { kad_msg__free_unpacked(msg, NULL); + log_dbg("Got a request message when not running."); return; } @@ -2383,6 +2401,75 @@ void dht_destroy(struct dht * dht) free(dht); } +static void * join_thr(void * o) +{ + struct join_info * info = (struct join_info *) o; + struct lookup * lu; + size_t retr = 0; + + assert(info); + + while (kad_join(info->dht, info->addr)) { + if (retr++ == KAD_JOIN_RETR) { + dht_set_state(info->dht, DHT_INIT); + goto finish; + } + + sleep(KAD_JOIN_INTV); + } + + lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + + finish: + free(info); + + return (void *) 0; +} + +static void handle_event(void * self, + int event, + const void * o) +{ + struct dht * dht = (struct dht *) self; + + if (event == NOTIFY_DT_CONN_ADD) { + struct lookup * lu; + pthread_t thr; + struct join_info * info; + struct conn * c = (struct conn *) o; + enum dht_state state = dht_get_state(dht); + + switch(state) { + case DHT_INIT: + info = malloc(sizeof(*info)); + if (info == NULL) + break; + + info->dht = dht; + info->addr = c->conn_info.addr; + + dht_set_state(dht, DHT_JOINING); + + if (pthread_create(&thr, NULL, join_thr, info)) { + dht_set_state(dht, DHT_INIT); + free(info); + return; + } + pthread_detach(thr); + break; + case DHT_RUNNING: + lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + break; + default: + break; + } + } +} + struct dht * dht_create(uint64_t addr) { struct dht * dht; @@ -2404,6 +2491,9 @@ struct dht * dht_create(uint64_t addr) if (pthread_mutex_init(&dht->mtx, NULL)) goto fail_mutex; + if (pthread_cond_init(&dht->cond, NULL)) + goto fail_cond; + dht->cookies = bmp_create(DHT_MAX_REQS, 1); if (dht->cookies == NULL) goto fail_bmp; @@ -2413,13 +2503,17 @@ struct dht * dht_create(uint64_t addr) dht->id = NULL; #ifndef __DHT_TEST__ dht->fd = dt_reg_ae(dht, &dht_post_sdu); -#endif /* __DHT_TEST__ */ - + notifier_reg(handle_event, dht); +#else + (void) handle_event; +#endif dht->state = DHT_INIT; return dht; fail_bmp: + pthread_cond_destroy(&dht->cond); + fail_cond: pthread_mutex_destroy(&dht->mtx); fail_mutex: pthread_rwlock_destroy(&dht->lock); |