diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-19 17:47:26 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-19 18:46:13 +0200 |
commit | 1dcef3957393c0500b81d93ffacf573e78be9a51 (patch) | |
tree | cf29abf695c1d53251560433441c29af57333e4b /src | |
parent | 115431af51795dfd583e24a051a7749c58a900b3 (diff) | |
download | ouroboros-1dcef3957393c0500b81d93ffacf573e78be9a51.tar.gz ouroboros-1dcef3957393c0500b81d93ffacf573e78be9a51.zip |
ipcpd: Enroll DHT when creating dt connection
The DHT will now enroll or sync when a data transfer connection is
added. This avoids the need to create a temporary data transfer
connection during enrollment (and speeds it up considerably).
The notifier system was modified to take an opaque pointer to the
object that registers as a parameter.
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/normal/dht.c | 126 | ||||
-rw-r--r-- | src/ipcpd/normal/dht.h | 3 | ||||
-rw-r--r-- | src/ipcpd/normal/dir.c | 18 | ||||
-rw-r--r-- | src/ipcpd/normal/dir.h | 2 | ||||
-rw-r--r-- | src/ipcpd/normal/dt.c | 7 | ||||
-rw-r--r-- | src/ipcpd/normal/main.c | 41 | ||||
-rw-r--r-- | src/ipcpd/normal/pol/link_state.c | 32 | ||||
-rw-r--r-- | src/lib/notifier.c | 13 |
8 files changed, 148 insertions, 94 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index b1ba44a8..e4c37884 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -31,10 +31,12 @@ #include <ouroboros/errno.h> #include <ouroboros/logs.h> #include <ouroboros/list.h> +#include <ouroboros/notifier.h> #include <ouroboros/random.h> #include <ouroboros/time_utils.h> #include <ouroboros/utils.h> +#include "connmgr.h" #include "dht.h" #include "dt.h" @@ -63,9 +65,12 @@ typedef KadContactMsg kad_contact_msg_t; #define KAD_QUEER 15 /* Time to declare peer questionable. */ #define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ #define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ +#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */ +#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ enum dht_state { DHT_INIT = 0, + DHT_JOINING, DHT_RUNNING, DHT_SHUTDOWN, }; @@ -207,6 +212,7 @@ struct dht { struct bmp * cookies; enum dht_state state; + pthread_cond_t cond; pthread_mutex_t mtx; pthread_rwlock_t lock; @@ -216,6 +222,11 @@ struct dht { pthread_t worker; }; +struct join_info { + struct dht * dht; + uint64_t addr; +}; + static uint8_t * dht_dup_key(const uint8_t * key, size_t len) { @@ -250,9 +261,28 @@ static void dht_set_state(struct dht * dht, dht->state = state; + pthread_cond_signal(&dht->cond); + pthread_mutex_unlock(&dht->mtx); } +static int dht_wait_running(struct dht * dht) +{ + int ret = 0; + + pthread_mutex_lock(&dht->mtx); + + while (dht->state == DHT_JOINING) + pthread_cond_wait(&dht->cond, &dht->mtx); + + if (dht->state != DHT_RUNNING) + ret = -1; + + pthread_mutex_unlock(&dht->mtx); + + return ret; +} + static uint8_t * create_id(size_t len) { uint8_t * id; @@ -1626,7 +1656,6 @@ static int kad_join(struct dht * dht, uint64_t addr) { kad_msg_t msg = KAD_MSG__INIT; - struct lookup * lu; msg.code = KAD_JOIN; @@ -1657,10 +1686,6 @@ static int kad_join(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); - if (lu != NULL) - lookup_destroy(lu); - return 0; } @@ -2074,14 +2099,6 @@ int dht_bootstrap(struct dht * dht, return -1; } -int dht_enroll(struct dht * dht, - uint64_t addr) -{ - assert(dht); - - return kad_join(dht, addr); -} - int dht_reg(struct dht * dht, const uint8_t * key) { @@ -2222,8 +2239,9 @@ void dht_post_sdu(void * ae, return; } - if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) { + if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { kad_msg__free_unpacked(msg, NULL); + log_dbg("Got a request message when not running."); return; } @@ -2383,6 +2401,75 @@ void dht_destroy(struct dht * dht) free(dht); } +static void * join_thr(void * o) +{ + struct join_info * info = (struct join_info *) o; + struct lookup * lu; + size_t retr = 0; + + assert(info); + + while (kad_join(info->dht, info->addr)) { + if (retr++ == KAD_JOIN_RETR) { + dht_set_state(info->dht, DHT_INIT); + goto finish; + } + + sleep(KAD_JOIN_INTV); + } + + lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + + finish: + free(info); + + return (void *) 0; +} + +static void handle_event(void * self, + int event, + const void * o) +{ + struct dht * dht = (struct dht *) self; + + if (event == NOTIFY_DT_CONN_ADD) { + struct lookup * lu; + pthread_t thr; + struct join_info * info; + struct conn * c = (struct conn *) o; + enum dht_state state = dht_get_state(dht); + + switch(state) { + case DHT_INIT: + info = malloc(sizeof(*info)); + if (info == NULL) + break; + + info->dht = dht; + info->addr = c->conn_info.addr; + + dht_set_state(dht, DHT_JOINING); + + if (pthread_create(&thr, NULL, join_thr, info)) { + dht_set_state(dht, DHT_INIT); + free(info); + return; + } + pthread_detach(thr); + break; + case DHT_RUNNING: + lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + break; + default: + break; + } + } +} + struct dht * dht_create(uint64_t addr) { struct dht * dht; @@ -2404,6 +2491,9 @@ struct dht * dht_create(uint64_t addr) if (pthread_mutex_init(&dht->mtx, NULL)) goto fail_mutex; + if (pthread_cond_init(&dht->cond, NULL)) + goto fail_cond; + dht->cookies = bmp_create(DHT_MAX_REQS, 1); if (dht->cookies == NULL) goto fail_bmp; @@ -2413,13 +2503,17 @@ struct dht * dht_create(uint64_t addr) dht->id = NULL; #ifndef __DHT_TEST__ dht->fd = dt_reg_ae(dht, &dht_post_sdu); -#endif /* __DHT_TEST__ */ - + notifier_reg(handle_event, dht); +#else + (void) handle_event; +#endif dht->state = DHT_INIT; return dht; fail_bmp: + pthread_cond_destroy(&dht->cond); + fail_cond: pthread_mutex_destroy(&dht->mtx); fail_mutex: pthread_rwlock_destroy(&dht->lock); diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h index d893f913..e4647a08 100644 --- a/src/ipcpd/normal/dht.h +++ b/src/ipcpd/normal/dht.h @@ -36,9 +36,6 @@ int dht_bootstrap(struct dht * dht, size_t b, time_t t_expire); -int dht_enroll(struct dht * dht, - uint64_t addr); - void dht_destroy(struct dht * dht); int dht_reg(struct dht * dht, diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 6d04c66a..0d046cd6 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -39,8 +39,6 @@ #include <inttypes.h> #define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT) -#define ENROL_RETR 6 -#define ENROL_INTV 1 struct dht * dht; @@ -72,22 +70,6 @@ int dir_bootstrap(void) { return 0; } -int dir_enroll(uint64_t addr) { - size_t retr = 0; - log_dbg("Enrolling directory with peer %" PRIu64 ".", addr); - while (dht_enroll(dht, addr)) { - if (retr++ == ENROL_RETR) - return -EPERM; - - log_dbg("Directory enrollment failed, retrying..."); - sleep(ENROL_INTV); - } - - log_info("Directory enrolled."); - - return 0; -} - int dir_reg(const uint8_t * hash) { return dht_reg(dht, hash); diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h index 13571b68..68959836 100644 --- a/src/ipcpd/normal/dir.h +++ b/src/ipcpd/normal/dir.h @@ -29,8 +29,6 @@ void dir_fini(void); int dir_bootstrap(void); -int dir_enroll(uint64_t peer); - int dir_reg(const uint8_t * hash); int dir_unreg(const uint8_t * hash); diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 56cb5a61..2db0e7e2 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -67,11 +67,14 @@ struct { pthread_t listener; } dt; -static void handle_event(int event, +static void handle_event(void * self, + int event, const void * o) { struct conn * c; + (void) self; + c = (struct conn *) o; switch (event) { @@ -182,7 +185,7 @@ int dt_init(enum pol_routing pr, goto fail_pci_init; } - if (notifier_reg(handle_event)) { + if (notifier_reg(handle_event, NULL)) { log_err("Failed to register with notifier."); goto fail_notifier_reg; } diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index e6dd6717..6cfea4bc 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -182,29 +182,18 @@ static int bootstrap_components(void) return 0; } -static int enroll_components(uint64_t peer) -{ - if (dir_enroll(peer)) { - log_err("Failed to enroll directory."); - return -1; - } - - return 0; -} - static int normal_ipcp_enroll(const char * dst, struct dif_info * info) { - struct conn er_conn; - struct conn dt_conn; + struct conn conn; - if (connmgr_alloc(AEID_ENROLL, dst, NULL, &er_conn)) { + if (connmgr_alloc(AEID_ENROLL, dst, NULL, &conn)) { log_err("Failed to get connection."); goto fail_er_flow; } /* Get boot state from peer. */ - if (enroll_boot(&er_conn)) { + if (enroll_boot(&conn)) { log_err("Failed to get boot information."); goto fail_enroll_boot; } @@ -219,29 +208,15 @@ static int normal_ipcp_enroll(const char * dst, goto fail_dt_start; } - if (connmgr_alloc(AEID_DT, dst, NULL, &dt_conn)) { - log_err("Failed to create a data transfer flow."); - goto fail_dt_flow; - } - if (start_components()) { log_err("Failed to start components."); goto fail_start_comp; } - if (enroll_components(dt_conn.conn_info.addr)) { - enroll_done(&er_conn, -1); - log_err("Failed to enroll components."); - goto fail_enroll_comp; - } - - if (enroll_done(&er_conn, 0)) + if (enroll_done(&conn, 0)) log_warn("Failed to confirm enrollment with peer."); - if (connmgr_dealloc(AEID_DT, &dt_conn)) - log_warn("Failed to deallocate data transfer flow."); - - if (connmgr_dealloc(AEID_ENROLL, &er_conn)) + if (connmgr_dealloc(AEID_ENROLL, &conn)) log_warn("Failed to deallocate enrollment flow."); log_info("Enrolled with %s.", dst); @@ -251,16 +226,12 @@ static int normal_ipcp_enroll(const char * dst, return 0; - fail_enroll_comp: - stop_components(); fail_start_comp: - connmgr_dealloc(AEID_DT, &dt_conn); - fail_dt_flow: dt_stop(); fail_dt_start: finalize_components(); fail_enroll_boot: - connmgr_dealloc(AEID_ENROLL, &er_conn); + connmgr_dealloc(AEID_ENROLL, &conn); fail_er_flow: return -1; } diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c index df0933c6..2823f28e 100644 --- a/src/ipcpd/normal/pol/link_state.c +++ b/src/ipcpd/normal/pol/link_state.c @@ -54,7 +54,7 @@ typedef LinkStateMsg link_state_msg_t; #define RECALC_TIME 4 #define LS_UPDATE_TIME 15 #define LS_TIMEO 60 -#define LSA_MAX_LEN 128 +#define LSM_MAX_LEN 128 #define LSDB "lsdb" #ifndef CLOCK_REALTIME_COARSE @@ -416,22 +416,22 @@ static void * calculate_pff(void * o) return (void *) 0; } -static void send_lsa(uint64_t src, +static void send_lsm(uint64_t src, uint64_t dst) { - uint8_t buf[LSA_MAX_LEN]; - link_state_msg_t lsa = LINK_STATE_MSG__INIT; + uint8_t buf[LSM_MAX_LEN]; + link_state_msg_t lsm = LINK_STATE_MSG__INIT; size_t len; struct list_head * p; - lsa.d_addr = dst; - lsa.s_addr = src; + lsm.d_addr = dst; + lsm.s_addr = src; - len = link_state_msg__get_packed_size(&lsa); + len = link_state_msg__get_packed_size(&lsm); - assert(len <= LSA_MAX_LEN); + assert(len <= LSM_MAX_LEN); - link_state_msg__pack(&lsa, buf); + link_state_msg__pack(&lsm, buf); list_for_each(p, &ls.nbs) { struct nb * nb = list_entry(p, struct nb, next); @@ -471,7 +471,7 @@ static void * lsupdate(void * o) } if (adj->src == ipcpi.dt_addr) { - send_lsa(adj->src, adj->dst); + send_lsm(adj->src, adj->dst); adj->stamp = now.tv_sec; } } @@ -526,7 +526,7 @@ static void * lsreader(void * o) { fqueue_t * fq; int ret; - uint8_t buf[LSA_MAX_LEN]; + uint8_t buf[LSM_MAX_LEN]; size_t len; int fd; qosspec_t qs; @@ -551,7 +551,7 @@ static void * lsreader(void * o) while ((fd = fqueue_next(fq)) >= 0) { link_state_msg_t * msg; - len = flow_read(fd, buf, LSA_MAX_LEN); + len = flow_read(fd, buf, LSM_MAX_LEN); if (len <= 0) continue; @@ -574,13 +574,16 @@ static void * lsreader(void * o) return (void *) 0; } -static void handle_event(int event, +static void handle_event(void * self, + int event, const void * o) { /* FIXME: Apply correct QoS on graph */ struct conn * c; qosspec_t qs; + (void) self; + c = (struct conn *) o; memset(&qs, 0, sizeof(qs)); @@ -592,6 +595,7 @@ static void handle_event(int event, if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, &qs)) log_dbg("Failed to add adjacency to LSDB."); + send_lsm(ipcpi.dt_addr, c->conn_info.addr); break; case NOTIFY_DT_CONN_DEL: if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) @@ -666,7 +670,7 @@ int link_state_init(void) if (ls.graph == NULL) goto fail_graph; - if (notifier_reg(handle_event)) + if (notifier_reg(handle_event, NULL)) goto fail_notifier_reg; if (pthread_rwlock_init(&ls.db_lock, NULL)) diff --git a/src/lib/notifier.c b/src/lib/notifier.c index cfd383d4..5c38cf58 100644 --- a/src/lib/notifier.c +++ b/src/lib/notifier.c @@ -30,6 +30,7 @@ struct listener { struct list_head next; notifier_fn_t callback; + void * obj; }; struct { @@ -72,13 +73,16 @@ void notifier_event(int event, pthread_mutex_lock(¬ifier.lock); - list_for_each(p, ¬ifier.listeners) - list_entry(p, struct listener, next)->callback(event, o); + list_for_each(p, ¬ifier.listeners) { + struct listener * l = list_entry(p, struct listener, next); + l->callback(l->obj, event, o); + } pthread_mutex_unlock(¬ifier.lock); } -int notifier_reg(notifier_fn_t callback) +int notifier_reg(notifier_fn_t callback, + void * obj) { struct listener * l; struct list_head * p; @@ -100,8 +104,9 @@ int notifier_reg(notifier_fn_t callback) } l->callback = callback; + l->obj = obj; - list_add(&l->next, ¬ifier.listeners); + list_add_tail(&l->next, ¬ifier.listeners); pthread_mutex_unlock(¬ifier.lock); |