From 1dcef3957393c0500b81d93ffacf573e78be9a51 Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@ugent.be>
Date: Tue, 19 Sep 2017 17:47:26 +0200
Subject: ipcpd: Enroll DHT when creating dt connection

The DHT will now enroll or sync when a data transfer connection is
added. This avoids the need to create a temporary data transfer
connection during enrollment (and speeds it up considerably).

The notifier system was modified to take an opaque pointer to the
object that registers as a parameter.
---
 src/ipcpd/normal/dht.c            | 126 +++++++++++++++++++++++++++++++++-----
 src/ipcpd/normal/dht.h            |   3 -
 src/ipcpd/normal/dir.c            |  18 ------
 src/ipcpd/normal/dir.h            |   2 -
 src/ipcpd/normal/dt.c             |   7 ++-
 src/ipcpd/normal/main.c           |  41 ++-----------
 src/ipcpd/normal/pol/link_state.c |  32 +++++-----
 7 files changed, 139 insertions(+), 90 deletions(-)

(limited to 'src/ipcpd/normal')

diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index b1ba44a8..e4c37884 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -31,10 +31,12 @@
 #include <ouroboros/errno.h>
 #include <ouroboros/logs.h>
 #include <ouroboros/list.h>
+#include <ouroboros/notifier.h>
 #include <ouroboros/random.h>
 #include <ouroboros/time_utils.h>
 #include <ouroboros/utils.h>
 
+#include "connmgr.h"
 #include "dht.h"
 #include "dt.h"
 
@@ -63,9 +65,12 @@ typedef KadContactMsg kad_contact_msg_t;
 #define KAD_QUEER     15   /* Time to declare peer questionable.         */
 #define KAD_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8. */
 #define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */
+#define KAD_JOIN_RETR 5    /* Number of retries sending a join.          */
+#define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.       */
 
 enum dht_state {
         DHT_INIT = 0,
+        DHT_JOINING,
         DHT_RUNNING,
         DHT_SHUTDOWN,
 };
@@ -207,6 +212,7 @@ struct dht {
         struct bmp *     cookies;
 
         enum dht_state   state;
+        pthread_cond_t   cond;
         pthread_mutex_t  mtx;
 
         pthread_rwlock_t lock;
@@ -216,6 +222,11 @@ struct dht {
         pthread_t        worker;
 };
 
+struct join_info {
+        struct dht * dht;
+        uint64_t     addr;
+};
+
 static uint8_t * dht_dup_key(const uint8_t * key,
                              size_t          len)
 {
@@ -250,9 +261,28 @@ static void dht_set_state(struct dht *   dht,
 
         dht->state = state;
 
+        pthread_cond_signal(&dht->cond);
+
         pthread_mutex_unlock(&dht->mtx);
 }
 
+static int dht_wait_running(struct dht * dht)
+{
+        int ret = 0;
+
+        pthread_mutex_lock(&dht->mtx);
+
+        while (dht->state == DHT_JOINING)
+                pthread_cond_wait(&dht->cond, &dht->mtx);
+
+        if (dht->state != DHT_RUNNING)
+                ret = -1;
+
+        pthread_mutex_unlock(&dht->mtx);
+
+        return ret;
+}
+
 static uint8_t * create_id(size_t len)
 {
         uint8_t * id;
@@ -1626,7 +1656,6 @@ static int kad_join(struct dht * dht,
                     uint64_t     addr)
 {
         kad_msg_t       msg = KAD_MSG__INIT;
-        struct lookup * lu;
 
         msg.code = KAD_JOIN;
 
@@ -1657,10 +1686,6 @@ static int kad_join(struct dht * dht,
 
         pthread_rwlock_unlock(&dht->lock);
 
-        lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
-        if (lu != NULL)
-                lookup_destroy(lu);
-
         return 0;
 }
 
@@ -2074,14 +2099,6 @@ int dht_bootstrap(struct dht * dht,
         return -1;
 }
 
-int dht_enroll(struct dht * dht,
-               uint64_t     addr)
-{
-        assert(dht);
-
-        return kad_join(dht, addr);
-}
-
 int dht_reg(struct dht *    dht,
             const uint8_t * key)
 {
@@ -2222,8 +2239,9 @@ void dht_post_sdu(void *               ae,
                 return;
         }
 
-        if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) {
+        if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
                 kad_msg__free_unpacked(msg, NULL);
+                log_dbg("Got a request message when not running.");
                 return;
         }
 
@@ -2383,6 +2401,75 @@ void dht_destroy(struct dht * dht)
         free(dht);
 }
 
+static void * join_thr(void * o)
+{
+        struct join_info * info = (struct join_info *) o;
+        struct lookup *    lu;
+        size_t             retr = 0;
+
+        assert(info);
+
+        while (kad_join(info->dht, info->addr)) {
+                if (retr++ == KAD_JOIN_RETR) {
+                        dht_set_state(info->dht, DHT_INIT);
+                        goto finish;
+                }
+
+                sleep(KAD_JOIN_INTV);
+        }
+
+        lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);
+        if (lu != NULL)
+                lookup_destroy(lu);
+
+ finish:
+        free(info);
+
+        return (void *) 0;
+}
+
+static void handle_event(void *       self,
+                         int          event,
+                         const void * o)
+{
+        struct dht * dht = (struct dht *) self;
+
+        if (event == NOTIFY_DT_CONN_ADD) {
+                struct lookup *    lu;
+                pthread_t          thr;
+                struct join_info * info;
+                struct conn *      c     = (struct conn *) o;
+                enum dht_state     state = dht_get_state(dht);
+
+                switch(state) {
+                case DHT_INIT:
+                        info = malloc(sizeof(*info));
+                        if (info == NULL)
+                                break;
+
+                        info->dht  = dht;
+                        info->addr = c->conn_info.addr;
+
+                        dht_set_state(dht, DHT_JOINING);
+
+                        if (pthread_create(&thr, NULL, join_thr, info)) {
+                                dht_set_state(dht, DHT_INIT);
+                                free(info);
+                                return;
+                        }
+                        pthread_detach(thr);
+                        break;
+                case DHT_RUNNING:
+                        lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
+                        if (lu != NULL)
+                                lookup_destroy(lu);
+                        break;
+                default:
+                        break;
+                }
+        }
+}
+
 struct dht * dht_create(uint64_t addr)
 {
         struct dht * dht;
@@ -2404,6 +2491,9 @@ struct dht * dht_create(uint64_t addr)
         if (pthread_mutex_init(&dht->mtx, NULL))
                 goto fail_mutex;
 
+        if (pthread_cond_init(&dht->cond, NULL))
+                goto fail_cond;
+
         dht->cookies = bmp_create(DHT_MAX_REQS, 1);
         if (dht->cookies == NULL)
                 goto fail_bmp;
@@ -2413,13 +2503,17 @@ struct dht * dht_create(uint64_t addr)
         dht->id   = NULL;
 #ifndef __DHT_TEST__
         dht->fd   = dt_reg_ae(dht, &dht_post_sdu);
-#endif /* __DHT_TEST__ */
-
+        notifier_reg(handle_event, dht);
+#else
+        (void) handle_event;
+#endif
         dht->state = DHT_INIT;
 
         return dht;
 
  fail_bmp:
+        pthread_cond_destroy(&dht->cond);
+ fail_cond:
         pthread_mutex_destroy(&dht->mtx);
  fail_mutex:
         pthread_rwlock_destroy(&dht->lock);
diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h
index d893f913..e4647a08 100644
--- a/src/ipcpd/normal/dht.h
+++ b/src/ipcpd/normal/dht.h
@@ -36,9 +36,6 @@ int          dht_bootstrap(struct dht * dht,
                            size_t       b,
                            time_t       t_expire);
 
-int          dht_enroll(struct dht * dht,
-                        uint64_t     addr);
-
 void         dht_destroy(struct dht * dht);
 
 int          dht_reg(struct dht *    dht,
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index 6d04c66a..0d046cd6 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -39,8 +39,6 @@
 #include <inttypes.h>
 
 #define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT)
-#define ENROL_RETR 6
-#define ENROL_INTV 1
 
 struct dht * dht;
 
@@ -72,22 +70,6 @@ int dir_bootstrap(void) {
         return 0;
 }
 
-int dir_enroll(uint64_t addr) {
-        size_t retr = 0;
-        log_dbg("Enrolling directory with peer %" PRIu64 ".", addr);
-        while (dht_enroll(dht, addr)) {
-                if (retr++ == ENROL_RETR)
-                        return -EPERM;
-
-                log_dbg("Directory enrollment failed, retrying...");
-                sleep(ENROL_INTV);
-        }
-
-        log_info("Directory enrolled.");
-
-        return 0;
-}
-
 int dir_reg(const uint8_t * hash)
 {
         return dht_reg(dht, hash);
diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h
index 13571b68..68959836 100644
--- a/src/ipcpd/normal/dir.h
+++ b/src/ipcpd/normal/dir.h
@@ -29,8 +29,6 @@ void     dir_fini(void);
 
 int      dir_bootstrap(void);
 
-int      dir_enroll(uint64_t peer);
-
 int      dir_reg(const uint8_t * hash);
 
 int      dir_unreg(const uint8_t * hash);
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 56cb5a61..2db0e7e2 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -67,11 +67,14 @@ struct {
         pthread_t          listener;
 } dt;
 
-static void handle_event(int          event,
+static void handle_event(void *       self,
+                         int          event,
                          const void * o)
 {
         struct conn * c;
 
+        (void) self;
+
         c = (struct conn *) o;
 
         switch (event) {
@@ -182,7 +185,7 @@ int dt_init(enum pol_routing pr,
                 goto fail_pci_init;
         }
 
-        if (notifier_reg(handle_event)) {
+        if (notifier_reg(handle_event, NULL)) {
                 log_err("Failed to register with notifier.");
                 goto fail_notifier_reg;
         }
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index e6dd6717..6cfea4bc 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -182,29 +182,18 @@ static int bootstrap_components(void)
         return 0;
 }
 
-static int enroll_components(uint64_t peer)
-{
-        if (dir_enroll(peer)) {
-                log_err("Failed to enroll directory.");
-                return -1;
-        }
-
-        return 0;
-}
-
 static int normal_ipcp_enroll(const char *      dst,
                               struct dif_info * info)
 {
-        struct conn er_conn;
-        struct conn dt_conn;
+        struct conn conn;
 
-        if (connmgr_alloc(AEID_ENROLL, dst, NULL, &er_conn)) {
+        if (connmgr_alloc(AEID_ENROLL, dst, NULL, &conn)) {
                 log_err("Failed to get connection.");
                 goto fail_er_flow;
         }
 
         /* Get boot state from peer. */
-        if (enroll_boot(&er_conn)) {
+        if (enroll_boot(&conn)) {
                 log_err("Failed to get boot information.");
                 goto fail_enroll_boot;
         }
@@ -219,29 +208,15 @@ static int normal_ipcp_enroll(const char *      dst,
                 goto fail_dt_start;
         }
 
-        if (connmgr_alloc(AEID_DT, dst, NULL, &dt_conn)) {
-                log_err("Failed to create a data transfer flow.");
-                goto fail_dt_flow;
-        }
-
         if (start_components()) {
                 log_err("Failed to start components.");
                 goto fail_start_comp;
         }
 
-        if (enroll_components(dt_conn.conn_info.addr)) {
-                enroll_done(&er_conn, -1);
-                log_err("Failed to enroll components.");
-                goto fail_enroll_comp;
-        }
-
-        if (enroll_done(&er_conn, 0))
+        if (enroll_done(&conn, 0))
                 log_warn("Failed to confirm enrollment with peer.");
 
-        if (connmgr_dealloc(AEID_DT, &dt_conn))
-                log_warn("Failed to deallocate data transfer flow.");
-
-        if (connmgr_dealloc(AEID_ENROLL, &er_conn))
+        if (connmgr_dealloc(AEID_ENROLL, &conn))
                 log_warn("Failed to deallocate enrollment flow.");
 
         log_info("Enrolled with %s.", dst);
@@ -251,16 +226,12 @@ static int normal_ipcp_enroll(const char *      dst,
 
         return 0;
 
- fail_enroll_comp:
-        stop_components();
  fail_start_comp:
-        connmgr_dealloc(AEID_DT, &dt_conn);
- fail_dt_flow:
         dt_stop();
  fail_dt_start:
         finalize_components();
  fail_enroll_boot:
-        connmgr_dealloc(AEID_ENROLL, &er_conn);
+        connmgr_dealloc(AEID_ENROLL, &conn);
  fail_er_flow:
         return -1;
 }
diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c
index df0933c6..2823f28e 100644
--- a/src/ipcpd/normal/pol/link_state.c
+++ b/src/ipcpd/normal/pol/link_state.c
@@ -54,7 +54,7 @@ typedef LinkStateMsg link_state_msg_t;
 #define RECALC_TIME    4
 #define LS_UPDATE_TIME 15
 #define LS_TIMEO       60
-#define LSA_MAX_LEN    128
+#define LSM_MAX_LEN    128
 #define LSDB           "lsdb"
 
 #ifndef CLOCK_REALTIME_COARSE
@@ -416,22 +416,22 @@ static void * calculate_pff(void * o)
         return (void *) 0;
 }
 
-static void send_lsa(uint64_t src,
+static void send_lsm(uint64_t src,
                      uint64_t dst)
 {
-        uint8_t            buf[LSA_MAX_LEN];
-        link_state_msg_t   lsa = LINK_STATE_MSG__INIT;
+        uint8_t            buf[LSM_MAX_LEN];
+        link_state_msg_t   lsm = LINK_STATE_MSG__INIT;
         size_t             len;
         struct list_head * p;
 
-        lsa.d_addr = dst;
-        lsa.s_addr = src;
+        lsm.d_addr = dst;
+        lsm.s_addr = src;
 
-        len = link_state_msg__get_packed_size(&lsa);
+        len = link_state_msg__get_packed_size(&lsm);
 
-        assert(len <= LSA_MAX_LEN);
+        assert(len <= LSM_MAX_LEN);
 
-        link_state_msg__pack(&lsa, buf);
+        link_state_msg__pack(&lsm, buf);
 
         list_for_each(p, &ls.nbs) {
                 struct nb * nb = list_entry(p, struct nb, next);
@@ -471,7 +471,7 @@ static void * lsupdate(void * o)
                         }
 
                         if (adj->src == ipcpi.dt_addr) {
-                                send_lsa(adj->src, adj->dst);
+                                send_lsm(adj->src, adj->dst);
                                 adj->stamp = now.tv_sec;
                         }
                 }
@@ -526,7 +526,7 @@ static void * lsreader(void * o)
 {
         fqueue_t * fq;
         int        ret;
-        uint8_t    buf[LSA_MAX_LEN];
+        uint8_t    buf[LSM_MAX_LEN];
         size_t     len;
         int        fd;
         qosspec_t  qs;
@@ -551,7 +551,7 @@ static void * lsreader(void * o)
 
                 while ((fd = fqueue_next(fq)) >= 0) {
                         link_state_msg_t * msg;
-                        len = flow_read(fd, buf, LSA_MAX_LEN);
+                        len = flow_read(fd, buf, LSM_MAX_LEN);
                         if (len <= 0)
                                 continue;
 
@@ -574,13 +574,16 @@ static void * lsreader(void * o)
         return (void *) 0;
 }
 
-static void handle_event(int          event,
+static void handle_event(void *       self,
+                         int          event,
                          const void * o)
 {
         /* FIXME: Apply correct QoS on graph */
         struct conn * c;
         qosspec_t     qs;
 
+        (void) self;
+
         c = (struct conn *) o;
 
         memset(&qs, 0, sizeof(qs));
@@ -592,6 +595,7 @@ static void handle_event(int          event,
 
                 if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, &qs))
                         log_dbg("Failed to add adjacency to LSDB.");
+                send_lsm(ipcpi.dt_addr, c->conn_info.addr);
                 break;
         case NOTIFY_DT_CONN_DEL:
                 if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd))
@@ -666,7 +670,7 @@ int link_state_init(void)
         if (ls.graph == NULL)
                 goto fail_graph;
 
-        if (notifier_reg(handle_event))
+        if (notifier_reg(handle_event, NULL))
                 goto fail_notifier_reg;
 
         if (pthread_rwlock_init(&ls.db_lock, NULL))
-- 
cgit v1.2.3