From 3d99be39d1c803ff0faa049fe31b9c7b3302890a Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@ugent.be>
Date: Fri, 24 Mar 2017 14:38:09 +0100
Subject: ipcpd: Initial synchronisation of the RIB

---
 src/ipcpd/normal/main.c   |   4 +-
 src/ipcpd/normal/ribmgr.c | 284 ++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 264 insertions(+), 24 deletions(-)

(limited to 'src/ipcpd')

diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 00ecaae9..41e0544d 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -202,6 +202,8 @@ static int boot_components(void)
 
 void shutdown_components(void)
 {
+        ribmgr_fini();
+
         connmgr_stop();
 
         enroll_stop();
@@ -214,8 +216,6 @@ void shutdown_components(void)
 
         dir_fini();
 
-        ribmgr_fini();
-
         addr_auth_fini();
 
         free(ipcpi.dif_name);
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index ef79f627..2696026a 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -44,35 +44,272 @@
 #include <errno.h>
 #include <assert.h>
 
+#define RIB_SYNC_TIMEOUT 1
+
+enum ribmgr_state {
+        RIBMGR_NULL = 0,
+        RIBMGR_OPERATIONAL,
+        RIBMGR_SHUTDOWN
+};
+
 struct {
-        flow_set_t *       fs;
-        fqueue_t *         fq;
+        struct cdap *      cdap;
+
+        pthread_t          reader;
+        pthread_t          sync;
 
         struct gam *       gam;
         struct nbs *       nbs;
         struct ae *        ae;
 
         struct nb_notifier nb_notifier;
+
+        pthread_rwlock_t   state_lock;
+        enum ribmgr_state  state;
 } ribmgr;
 
 static int ribmgr_neighbor_event(enum nb_event event,
                                  struct conn   conn)
 {
-        /* We are only interested in neighbors being added and removed. */
         switch (event) {
         case NEIGHBOR_ADDED:
-                flow_set_add(ribmgr.fs, conn.flow_info.fd);
+                cdap_add_flow(ribmgr.cdap, conn.flow_info.fd);
                 break;
         case NEIGHBOR_REMOVED:
-                flow_set_del(ribmgr.fs, conn.flow_info.fd);
+                cdap_del_flow(ribmgr.cdap, conn.flow_info.fd);
                 break;
         default:
+                /* Don't care about other events */
                 break;
         }
 
         return 0;
 }
 
+static enum ribmgr_state ribmgr_get_state(void)
+{
+        enum ribmgr_state state;
+
+        pthread_rwlock_rdlock(&ribmgr.state_lock);
+
+        state = ribmgr.state;
+
+        pthread_rwlock_unlock(&ribmgr.state_lock);
+
+        return state;
+}
+
+static void ribmgr_set_state(enum ribmgr_state state)
+{
+        pthread_rwlock_wrlock(&ribmgr.state_lock);
+
+        ribmgr.state = state;
+
+        pthread_rwlock_unlock(&ribmgr.state_lock);
+}
+
+static void * reader(void * o)
+{
+        cdap_key_t       key;
+        enum cdap_opcode oc;
+        char *           name;
+        uint8_t *        data;
+        size_t           len;
+        ssize_t          slen;
+        uint32_t         flags;
+        uint8_t *        buf;
+        int              rval;
+
+        (void) o;
+
+        while (ribmgr_get_state() == RIBMGR_OPERATIONAL) {
+                key = cdap_request_wait(ribmgr.cdap, &oc, &name, &data,
+                                        (size_t *) &len , &flags);
+                assert(key != -EINVAL);
+
+                if (key == INVALID_CDAP_KEY) {
+                        log_warn("Bad CDAP request.");
+                        continue;
+                }
+
+                assert(name);
+                assert(strlen(name));
+
+                switch (oc) {
+                case CDAP_READ:
+                        assert(len == 0);
+                        slen = rib_pack(name, &buf, PACK_HASH_ROOT);
+                        if (slen < 0) {
+                                log_err("Failed to pack %s.", name);
+                                cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
+                                free(name);
+                                continue;
+                        }
+
+                        log_dbg("Packed %s (%zu bytes).", name, slen);
+
+                        free(name);
+
+                        if (cdap_reply_send(ribmgr.cdap, key, 0, buf, slen)) {
+                                log_err("Failed to send CDAP reply.");
+                                free(buf);
+                                continue;
+                        }
+
+                        free(buf);
+                        break;
+                case CDAP_WRITE:
+                        assert(len);
+                        assert(data);
+
+                        rval = rib_unpack(data, len, 0);
+                        switch(rval) {
+                        case 0:
+                                break;
+                        case -EFAULT:
+                                log_warn("Hash mismatch, not in sync.");
+                                break;
+                        default:
+                                log_warn("Error unpacking %s.", name);
+                                cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
+                                free(name);
+                                free(data);
+                                continue;
+                        }
+
+                        free(name);
+
+                        if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
+                                log_err("Failed to send CDAP reply.");
+                                continue;
+                        }
+                        break;
+                case CDAP_CREATE:
+                        assert(len);
+                        assert(data);
+
+                        rval = rib_unpack(data, len, UNPACK_CREATE);
+                        switch(rval) {
+                        case 0:
+                                break;
+                        case -EFAULT:
+                                log_warn("Hash mismatch, not yet in sync.");
+                                break;
+                        default:
+                                log_warn("Error unpacking %s.", name);
+                                cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
+                                free(name);
+                                free(data);
+                                continue;
+                        }
+
+                        free(name);
+
+                        if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
+                                log_err("Failed to send CDAP reply.");
+                                continue;
+                        }
+                        break;
+                case CDAP_DELETE:
+                        assert(len == 0);
+                        if (rib_del(name)) {
+                                log_warn("Failed deleting %s.", name);
+                                cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
+                        }
+
+                        free(name);
+
+                        if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
+                                log_err("Failed to send CDAP reply.");
+                                continue;
+                        }
+
+                        break;
+                case CDAP_START:
+                case CDAP_STOP:
+                        log_warn("Unsupported CDAP command.");
+                        if (len)
+                                free(data);
+                        break;
+                default:
+                        log_err("Bad CDAP command.");
+                        if (len)
+                                free(data);
+                        break;
+                }
+        }
+
+        return (void *) 0;
+}
+
+char path[RIB_MAX_PATH_LEN + 1];
+
+static void path_reset(void) {
+        path[strlen(RIB_ROOT)] = '\0';
+        assert(strcmp(path, RIB_ROOT) == 0);
+}
+
+static int ribmgr_sync(const char * path)
+{
+        uint8_t *    buf;
+        ssize_t      len;
+        cdap_key_t * keys;
+
+        len = rib_pack(path, &buf, PACK_HASH_ALL);
+        if (len < 0) {
+                log_warn("Failed to pack %s.", path);
+                return -1;
+        }
+
+        keys = cdap_request_send(ribmgr.cdap, CDAP_CREATE, path, buf, len, 0);
+        if (keys != NULL) {
+                cdap_key_t * key = keys;
+                while (*key != INVALID_CDAP_KEY)
+                        cdap_reply_wait(ribmgr.cdap, *(key++), NULL, NULL);
+                free(keys);
+        }
+
+        free(buf);
+
+        return 0;
+}
+
+/* FIXME: Temporary thread, syncs rib with neighbors every second */
+static void * sync_rib(void *o)
+{
+        char ** children;
+        ssize_t ch;
+
+        (void) o;
+
+        strcpy(path, RIB_ROOT);
+
+        while (ribmgr_get_state() == RIBMGR_OPERATIONAL) {
+                sleep(RIB_SYNC_TIMEOUT);
+
+                ch = rib_children(RIB_ROOT, &children);
+                if (ch <= 0)
+                        continue;
+
+                while (ch > 0) {
+                        path_reset();
+
+                        rib_path_append(path, children[--ch]);
+                        free(children[ch]);
+
+                        /* Only sync fsdb, members and directory */
+                        if (strcmp(path, MEMBERS_PATH) == 0
+                            || strcmp(path, DIR_PATH) == 0
+                            || strcmp(path, ROUTING_PATH) == 0)
+                                ribmgr_sync(path);
+                }
+
+                free(children);
+        }
+
+        return (void *) 0;
+}
+
 int ribmgr_init(void)
 {
         enum pol_gam     pg;
@@ -114,19 +351,9 @@ int ribmgr_init(void)
                 return -1;
         }
 
-        ribmgr.fs = flow_set_create();
-        if (ribmgr.fs == NULL) {
-                log_err("Failed to create flow set.");
-                gam_destroy(ribmgr.gam);
-                connmgr_ae_destroy(ribmgr.ae);
-                nbs_destroy(ribmgr.nbs);
-                return -1;
-        }
-
-        ribmgr.fq = fqueue_create();
-        if (ribmgr.fq == NULL) {
-                log_err("Failed to create fq.");
-                flow_set_destroy(ribmgr.fs);
+        ribmgr.cdap = cdap_create();
+        if (ribmgr.cdap == NULL) {
+                log_err("Failed to create CDAP instance.");
                 gam_destroy(ribmgr.gam);
                 connmgr_ae_destroy(ribmgr.ae);
                 nbs_destroy(ribmgr.nbs);
@@ -136,22 +363,35 @@ int ribmgr_init(void)
         ribmgr.nb_notifier.notify_call = ribmgr_neighbor_event;
         if (nbs_reg_notifier(ribmgr.nbs, &ribmgr.nb_notifier)) {
                 log_err("Failed to register notifier.");
-                fqueue_destroy(ribmgr.fq);
-                flow_set_destroy(ribmgr.fs);
+                cdap_destroy(ribmgr.cdap);
                 gam_destroy(ribmgr.gam);
                 connmgr_ae_destroy(ribmgr.ae);
                 nbs_destroy(ribmgr.nbs);
                 return -1;
         }
 
+        pthread_rwlock_init(&ribmgr.state_lock, NULL);
+
+        ribmgr.state = RIBMGR_OPERATIONAL;
+
+        pthread_create(&ribmgr.sync, NULL, sync_rib, NULL);
+
+        pthread_create(&ribmgr.reader, NULL, reader, NULL);
+
         return 0;
 }
 
 void ribmgr_fini(void)
 {
+        ribmgr_set_state(RIBMGR_SHUTDOWN);
+
+        pthread_cancel(ribmgr.reader);
+
+        pthread_join(ribmgr.reader, NULL);
+        pthread_join(ribmgr.sync, NULL);
+
         nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier);
-        flow_set_destroy(ribmgr.fs);
-        fqueue_destroy(ribmgr.fq);
+        cdap_destroy(ribmgr.cdap);
         gam_destroy(ribmgr.gam);
         connmgr_ae_destroy(ribmgr.ae);
         nbs_destroy(ribmgr.nbs);
-- 
cgit v1.2.3