summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-09-19 16:48:54 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-09-19 16:48:54 +0000
commitacd8235533d63a7e4dd4d1b5bdb17423d4b6fd75 (patch)
tree195bbf3a1aa1923394410db072f72c0b38aede8c
parentc0bbd7e2184f51038ecbaebfa3b21e5c52d35239 (diff)
parent1dcef3957393c0500b81d93ffacf573e78be9a51 (diff)
downloadouroboros-acd8235533d63a7e4dd4d1b5bdb17423d4b6fd75.tar.gz
ouroboros-acd8235533d63a7e4dd4d1b5bdb17423d4b6fd75.zip
Merged in dstaesse/ouroboros/be-dht-async (pull request #598)
ipcpd: Enroll DHT when creating dt connection
-rw-r--r--include/ouroboros/notifier.h6
-rw-r--r--src/ipcpd/normal/dht.c126
-rw-r--r--src/ipcpd/normal/dht.h3
-rw-r--r--src/ipcpd/normal/dir.c18
-rw-r--r--src/ipcpd/normal/dir.h2
-rw-r--r--src/ipcpd/normal/dt.c7
-rw-r--r--src/ipcpd/normal/main.c41
-rw-r--r--src/ipcpd/normal/pol/link_state.c32
-rw-r--r--src/lib/notifier.c13
9 files changed, 152 insertions, 96 deletions
diff --git a/include/ouroboros/notifier.h b/include/ouroboros/notifier.h
index 7a70f95f..037d5daf 100644
--- a/include/ouroboros/notifier.h
+++ b/include/ouroboros/notifier.h
@@ -23,7 +23,8 @@
#ifndef OUROBOROS_LIB_NOTIFIER_H
#define OUROBOROS_LIB_NOTIFIER_H
-typedef void (* notifier_fn_t)(int event,
+typedef void (* notifier_fn_t)(void * self,
+ int event,
const void * o);
int notifier_init(void);
@@ -33,7 +34,8 @@ void notifier_fini(void);
void notifier_event(int event,
const void * o);
-int notifier_reg(notifier_fn_t callback);
+int notifier_reg(notifier_fn_t callback,
+ void * obj);
void notifier_unreg(notifier_fn_t callback);
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index b1ba44a8..e4c37884 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -31,10 +31,12 @@
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
#include <ouroboros/list.h>
+#include <ouroboros/notifier.h>
#include <ouroboros/random.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/utils.h>
+#include "connmgr.h"
#include "dht.h"
#include "dt.h"
@@ -63,9 +65,12 @@ typedef KadContactMsg kad_contact_msg_t;
#define KAD_QUEER 15 /* Time to declare peer questionable. */
#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */
#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */
+#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */
+#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */
enum dht_state {
DHT_INIT = 0,
+ DHT_JOINING,
DHT_RUNNING,
DHT_SHUTDOWN,
};
@@ -207,6 +212,7 @@ struct dht {
struct bmp * cookies;
enum dht_state state;
+ pthread_cond_t cond;
pthread_mutex_t mtx;
pthread_rwlock_t lock;
@@ -216,6 +222,11 @@ struct dht {
pthread_t worker;
};
+struct join_info {
+ struct dht * dht;
+ uint64_t addr;
+};
+
static uint8_t * dht_dup_key(const uint8_t * key,
size_t len)
{
@@ -250,9 +261,28 @@ static void dht_set_state(struct dht * dht,
dht->state = state;
+ pthread_cond_signal(&dht->cond);
+
pthread_mutex_unlock(&dht->mtx);
}
+static int dht_wait_running(struct dht * dht)
+{
+ int ret = 0;
+
+ pthread_mutex_lock(&dht->mtx);
+
+ while (dht->state == DHT_JOINING)
+ pthread_cond_wait(&dht->cond, &dht->mtx);
+
+ if (dht->state != DHT_RUNNING)
+ ret = -1;
+
+ pthread_mutex_unlock(&dht->mtx);
+
+ return ret;
+}
+
static uint8_t * create_id(size_t len)
{
uint8_t * id;
@@ -1626,7 +1656,6 @@ static int kad_join(struct dht * dht,
uint64_t addr)
{
kad_msg_t msg = KAD_MSG__INIT;
- struct lookup * lu;
msg.code = KAD_JOIN;
@@ -1657,10 +1686,6 @@ static int kad_join(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
- lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
- if (lu != NULL)
- lookup_destroy(lu);
-
return 0;
}
@@ -2074,14 +2099,6 @@ int dht_bootstrap(struct dht * dht,
return -1;
}
-int dht_enroll(struct dht * dht,
- uint64_t addr)
-{
- assert(dht);
-
- return kad_join(dht, addr);
-}
-
int dht_reg(struct dht * dht,
const uint8_t * key)
{
@@ -2222,8 +2239,9 @@ void dht_post_sdu(void * ae,
return;
}
- if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) {
+ if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
kad_msg__free_unpacked(msg, NULL);
+ log_dbg("Got a request message when not running.");
return;
}
@@ -2383,6 +2401,75 @@ void dht_destroy(struct dht * dht)
free(dht);
}
+static void * join_thr(void * o)
+{
+ struct join_info * info = (struct join_info *) o;
+ struct lookup * lu;
+ size_t retr = 0;
+
+ assert(info);
+
+ while (kad_join(info->dht, info->addr)) {
+ if (retr++ == KAD_JOIN_RETR) {
+ dht_set_state(info->dht, DHT_INIT);
+ goto finish;
+ }
+
+ sleep(KAD_JOIN_INTV);
+ }
+
+ lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);
+ if (lu != NULL)
+ lookup_destroy(lu);
+
+ finish:
+ free(info);
+
+ return (void *) 0;
+}
+
+static void handle_event(void * self,
+ int event,
+ const void * o)
+{
+ struct dht * dht = (struct dht *) self;
+
+ if (event == NOTIFY_DT_CONN_ADD) {
+ struct lookup * lu;
+ pthread_t thr;
+ struct join_info * info;
+ struct conn * c = (struct conn *) o;
+ enum dht_state state = dht_get_state(dht);
+
+ switch(state) {
+ case DHT_INIT:
+ info = malloc(sizeof(*info));
+ if (info == NULL)
+ break;
+
+ info->dht = dht;
+ info->addr = c->conn_info.addr;
+
+ dht_set_state(dht, DHT_JOINING);
+
+ if (pthread_create(&thr, NULL, join_thr, info)) {
+ dht_set_state(dht, DHT_INIT);
+ free(info);
+ return;
+ }
+ pthread_detach(thr);
+ break;
+ case DHT_RUNNING:
+ lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
+ if (lu != NULL)
+ lookup_destroy(lu);
+ break;
+ default:
+ break;
+ }
+ }
+}
+
struct dht * dht_create(uint64_t addr)
{
struct dht * dht;
@@ -2404,6 +2491,9 @@ struct dht * dht_create(uint64_t addr)
if (pthread_mutex_init(&dht->mtx, NULL))
goto fail_mutex;
+ if (pthread_cond_init(&dht->cond, NULL))
+ goto fail_cond;
+
dht->cookies = bmp_create(DHT_MAX_REQS, 1);
if (dht->cookies == NULL)
goto fail_bmp;
@@ -2413,13 +2503,17 @@ struct dht * dht_create(uint64_t addr)
dht->id = NULL;
#ifndef __DHT_TEST__
dht->fd = dt_reg_ae(dht, &dht_post_sdu);
-#endif /* __DHT_TEST__ */
-
+ notifier_reg(handle_event, dht);
+#else
+ (void) handle_event;
+#endif
dht->state = DHT_INIT;
return dht;
fail_bmp:
+ pthread_cond_destroy(&dht->cond);
+ fail_cond:
pthread_mutex_destroy(&dht->mtx);
fail_mutex:
pthread_rwlock_destroy(&dht->lock);
diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h
index d893f913..e4647a08 100644
--- a/src/ipcpd/normal/dht.h
+++ b/src/ipcpd/normal/dht.h
@@ -36,9 +36,6 @@ int dht_bootstrap(struct dht * dht,
size_t b,
time_t t_expire);
-int dht_enroll(struct dht * dht,
- uint64_t addr);
-
void dht_destroy(struct dht * dht);
int dht_reg(struct dht * dht,
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index 6d04c66a..0d046cd6 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -39,8 +39,6 @@
#include <inttypes.h>
#define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT)
-#define ENROL_RETR 6
-#define ENROL_INTV 1
struct dht * dht;
@@ -72,22 +70,6 @@ int dir_bootstrap(void) {
return 0;
}
-int dir_enroll(uint64_t addr) {
- size_t retr = 0;
- log_dbg("Enrolling directory with peer %" PRIu64 ".", addr);
- while (dht_enroll(dht, addr)) {
- if (retr++ == ENROL_RETR)
- return -EPERM;
-
- log_dbg("Directory enrollment failed, retrying...");
- sleep(ENROL_INTV);
- }
-
- log_info("Directory enrolled.");
-
- return 0;
-}
-
int dir_reg(const uint8_t * hash)
{
return dht_reg(dht, hash);
diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h
index 13571b68..68959836 100644
--- a/src/ipcpd/normal/dir.h
+++ b/src/ipcpd/normal/dir.h
@@ -29,8 +29,6 @@ void dir_fini(void);
int dir_bootstrap(void);
-int dir_enroll(uint64_t peer);
-
int dir_reg(const uint8_t * hash);
int dir_unreg(const uint8_t * hash);
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 56cb5a61..2db0e7e2 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -67,11 +67,14 @@ struct {
pthread_t listener;
} dt;
-static void handle_event(int event,
+static void handle_event(void * self,
+ int event,
const void * o)
{
struct conn * c;
+ (void) self;
+
c = (struct conn *) o;
switch (event) {
@@ -182,7 +185,7 @@ int dt_init(enum pol_routing pr,
goto fail_pci_init;
}
- if (notifier_reg(handle_event)) {
+ if (notifier_reg(handle_event, NULL)) {
log_err("Failed to register with notifier.");
goto fail_notifier_reg;
}
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index e6dd6717..6cfea4bc 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -182,29 +182,18 @@ static int bootstrap_components(void)
return 0;
}
-static int enroll_components(uint64_t peer)
-{
- if (dir_enroll(peer)) {
- log_err("Failed to enroll directory.");
- return -1;
- }
-
- return 0;
-}
-
static int normal_ipcp_enroll(const char * dst,
struct dif_info * info)
{
- struct conn er_conn;
- struct conn dt_conn;
+ struct conn conn;
- if (connmgr_alloc(AEID_ENROLL, dst, NULL, &er_conn)) {
+ if (connmgr_alloc(AEID_ENROLL, dst, NULL, &conn)) {
log_err("Failed to get connection.");
goto fail_er_flow;
}
/* Get boot state from peer. */
- if (enroll_boot(&er_conn)) {
+ if (enroll_boot(&conn)) {
log_err("Failed to get boot information.");
goto fail_enroll_boot;
}
@@ -219,29 +208,15 @@ static int normal_ipcp_enroll(const char * dst,
goto fail_dt_start;
}
- if (connmgr_alloc(AEID_DT, dst, NULL, &dt_conn)) {
- log_err("Failed to create a data transfer flow.");
- goto fail_dt_flow;
- }
-
if (start_components()) {
log_err("Failed to start components.");
goto fail_start_comp;
}
- if (enroll_components(dt_conn.conn_info.addr)) {
- enroll_done(&er_conn, -1);
- log_err("Failed to enroll components.");
- goto fail_enroll_comp;
- }
-
- if (enroll_done(&er_conn, 0))
+ if (enroll_done(&conn, 0))
log_warn("Failed to confirm enrollment with peer.");
- if (connmgr_dealloc(AEID_DT, &dt_conn))
- log_warn("Failed to deallocate data transfer flow.");
-
- if (connmgr_dealloc(AEID_ENROLL, &er_conn))
+ if (connmgr_dealloc(AEID_ENROLL, &conn))
log_warn("Failed to deallocate enrollment flow.");
log_info("Enrolled with %s.", dst);
@@ -251,16 +226,12 @@ static int normal_ipcp_enroll(const char * dst,
return 0;
- fail_enroll_comp:
- stop_components();
fail_start_comp:
- connmgr_dealloc(AEID_DT, &dt_conn);
- fail_dt_flow:
dt_stop();
fail_dt_start:
finalize_components();
fail_enroll_boot:
- connmgr_dealloc(AEID_ENROLL, &er_conn);
+ connmgr_dealloc(AEID_ENROLL, &conn);
fail_er_flow:
return -1;
}
diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c
index df0933c6..2823f28e 100644
--- a/src/ipcpd/normal/pol/link_state.c
+++ b/src/ipcpd/normal/pol/link_state.c
@@ -54,7 +54,7 @@ typedef LinkStateMsg link_state_msg_t;
#define RECALC_TIME 4
#define LS_UPDATE_TIME 15
#define LS_TIMEO 60
-#define LSA_MAX_LEN 128
+#define LSM_MAX_LEN 128
#define LSDB "lsdb"
#ifndef CLOCK_REALTIME_COARSE
@@ -416,22 +416,22 @@ static void * calculate_pff(void * o)
return (void *) 0;
}
-static void send_lsa(uint64_t src,
+static void send_lsm(uint64_t src,
uint64_t dst)
{
- uint8_t buf[LSA_MAX_LEN];
- link_state_msg_t lsa = LINK_STATE_MSG__INIT;
+ uint8_t buf[LSM_MAX_LEN];
+ link_state_msg_t lsm = LINK_STATE_MSG__INIT;
size_t len;
struct list_head * p;
- lsa.d_addr = dst;
- lsa.s_addr = src;
+ lsm.d_addr = dst;
+ lsm.s_addr = src;
- len = link_state_msg__get_packed_size(&lsa);
+ len = link_state_msg__get_packed_size(&lsm);
- assert(len <= LSA_MAX_LEN);
+ assert(len <= LSM_MAX_LEN);
- link_state_msg__pack(&lsa, buf);
+ link_state_msg__pack(&lsm, buf);
list_for_each(p, &ls.nbs) {
struct nb * nb = list_entry(p, struct nb, next);
@@ -471,7 +471,7 @@ static void * lsupdate(void * o)
}
if (adj->src == ipcpi.dt_addr) {
- send_lsa(adj->src, adj->dst);
+ send_lsm(adj->src, adj->dst);
adj->stamp = now.tv_sec;
}
}
@@ -526,7 +526,7 @@ static void * lsreader(void * o)
{
fqueue_t * fq;
int ret;
- uint8_t buf[LSA_MAX_LEN];
+ uint8_t buf[LSM_MAX_LEN];
size_t len;
int fd;
qosspec_t qs;
@@ -551,7 +551,7 @@ static void * lsreader(void * o)
while ((fd = fqueue_next(fq)) >= 0) {
link_state_msg_t * msg;
- len = flow_read(fd, buf, LSA_MAX_LEN);
+ len = flow_read(fd, buf, LSM_MAX_LEN);
if (len <= 0)
continue;
@@ -574,13 +574,16 @@ static void * lsreader(void * o)
return (void *) 0;
}
-static void handle_event(int event,
+static void handle_event(void * self,
+ int event,
const void * o)
{
/* FIXME: Apply correct QoS on graph */
struct conn * c;
qosspec_t qs;
+ (void) self;
+
c = (struct conn *) o;
memset(&qs, 0, sizeof(qs));
@@ -592,6 +595,7 @@ static void handle_event(int event,
if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, &qs))
log_dbg("Failed to add adjacency to LSDB.");
+ send_lsm(ipcpi.dt_addr, c->conn_info.addr);
break;
case NOTIFY_DT_CONN_DEL:
if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd))
@@ -666,7 +670,7 @@ int link_state_init(void)
if (ls.graph == NULL)
goto fail_graph;
- if (notifier_reg(handle_event))
+ if (notifier_reg(handle_event, NULL))
goto fail_notifier_reg;
if (pthread_rwlock_init(&ls.db_lock, NULL))
diff --git a/src/lib/notifier.c b/src/lib/notifier.c
index cfd383d4..5c38cf58 100644
--- a/src/lib/notifier.c
+++ b/src/lib/notifier.c
@@ -30,6 +30,7 @@
struct listener {
struct list_head next;
notifier_fn_t callback;
+ void * obj;
};
struct {
@@ -72,13 +73,16 @@ void notifier_event(int event,
pthread_mutex_lock(&notifier.lock);
- list_for_each(p, &notifier.listeners)
- list_entry(p, struct listener, next)->callback(event, o);
+ list_for_each(p, &notifier.listeners) {
+ struct listener * l = list_entry(p, struct listener, next);
+ l->callback(l->obj, event, o);
+ }
pthread_mutex_unlock(&notifier.lock);
}
-int notifier_reg(notifier_fn_t callback)
+int notifier_reg(notifier_fn_t callback,
+ void * obj)
{
struct listener * l;
struct list_head * p;
@@ -100,8 +104,9 @@ int notifier_reg(notifier_fn_t callback)
}
l->callback = callback;
+ l->obj = obj;
- list_add(&l->next, &notifier.listeners);
+ list_add_tail(&l->next, &notifier.listeners);
pthread_mutex_unlock(&notifier.lock);