summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/ribmgr.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-03-24 14:38:09 +0100
committerdimitri staessens <dimitri.staessens@ugent.be>2017-03-24 15:08:28 +0100
commit3d99be39d1c803ff0faa049fe31b9c7b3302890a (patch)
tree06e88f3109177671bac4fb52dc1eab3d8ec392da /src/ipcpd/normal/ribmgr.c
parent25d262cb383039b1d97a5b41ec477d6a40957398 (diff)
downloadouroboros-3d99be39d1c803ff0faa049fe31b9c7b3302890a.tar.gz
ouroboros-3d99be39d1c803ff0faa049fe31b9c7b3302890a.zip
ipcpd: Initial synchronisation of the RIB
Diffstat (limited to 'src/ipcpd/normal/ribmgr.c')
-rw-r--r--src/ipcpd/normal/ribmgr.c284
1 files changed, 262 insertions, 22 deletions
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index ef79f627..2696026a 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -44,35 +44,272 @@
#include <errno.h>
#include <assert.h>
+#define RIB_SYNC_TIMEOUT 1
+
+enum ribmgr_state {
+ RIBMGR_NULL = 0,
+ RIBMGR_OPERATIONAL,
+ RIBMGR_SHUTDOWN
+};
+
struct {
- flow_set_t * fs;
- fqueue_t * fq;
+ struct cdap * cdap;
+
+ pthread_t reader;
+ pthread_t sync;
struct gam * gam;
struct nbs * nbs;
struct ae * ae;
struct nb_notifier nb_notifier;
+
+ pthread_rwlock_t state_lock;
+ enum ribmgr_state state;
} ribmgr;
static int ribmgr_neighbor_event(enum nb_event event,
struct conn conn)
{
- /* We are only interested in neighbors being added and removed. */
switch (event) {
case NEIGHBOR_ADDED:
- flow_set_add(ribmgr.fs, conn.flow_info.fd);
+ cdap_add_flow(ribmgr.cdap, conn.flow_info.fd);
break;
case NEIGHBOR_REMOVED:
- flow_set_del(ribmgr.fs, conn.flow_info.fd);
+ cdap_del_flow(ribmgr.cdap, conn.flow_info.fd);
break;
default:
+ /* Don't care about other events */
break;
}
return 0;
}
+static enum ribmgr_state ribmgr_get_state(void)
+{
+ enum ribmgr_state state;
+
+ pthread_rwlock_rdlock(&ribmgr.state_lock);
+
+ state = ribmgr.state;
+
+ pthread_rwlock_unlock(&ribmgr.state_lock);
+
+ return state;
+}
+
+static void ribmgr_set_state(enum ribmgr_state state)
+{
+ pthread_rwlock_wrlock(&ribmgr.state_lock);
+
+ ribmgr.state = state;
+
+ pthread_rwlock_unlock(&ribmgr.state_lock);
+}
+
+static void * reader(void * o)
+{
+ cdap_key_t key;
+ enum cdap_opcode oc;
+ char * name;
+ uint8_t * data;
+ size_t len;
+ ssize_t slen;
+ uint32_t flags;
+ uint8_t * buf;
+ int rval;
+
+ (void) o;
+
+ while (ribmgr_get_state() == RIBMGR_OPERATIONAL) {
+ key = cdap_request_wait(ribmgr.cdap, &oc, &name, &data,
+ (size_t *) &len , &flags);
+ assert(key != -EINVAL);
+
+ if (key == INVALID_CDAP_KEY) {
+ log_warn("Bad CDAP request.");
+ continue;
+ }
+
+ assert(name);
+ assert(strlen(name));
+
+ switch (oc) {
+ case CDAP_READ:
+ assert(len == 0);
+ slen = rib_pack(name, &buf, PACK_HASH_ROOT);
+ if (slen < 0) {
+ log_err("Failed to pack %s.", name);
+ cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
+ free(name);
+ continue;
+ }
+
+ log_dbg("Packed %s (%zu bytes).", name, slen);
+
+ free(name);
+
+ if (cdap_reply_send(ribmgr.cdap, key, 0, buf, slen)) {
+ log_err("Failed to send CDAP reply.");
+ free(buf);
+ continue;
+ }
+
+ free(buf);
+ break;
+ case CDAP_WRITE:
+ assert(len);
+ assert(data);
+
+ rval = rib_unpack(data, len, 0);
+ switch(rval) {
+ case 0:
+ break;
+ case -EFAULT:
+ log_warn("Hash mismatch, not in sync.");
+ break;
+ default:
+ log_warn("Error unpacking %s.", name);
+ cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
+ free(name);
+ free(data);
+ continue;
+ }
+
+ free(name);
+
+ if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
+ log_err("Failed to send CDAP reply.");
+ continue;
+ }
+ break;
+ case CDAP_CREATE:
+ assert(len);
+ assert(data);
+
+ rval = rib_unpack(data, len, UNPACK_CREATE);
+ switch(rval) {
+ case 0:
+ break;
+ case -EFAULT:
+ log_warn("Hash mismatch, not yet in sync.");
+ break;
+ default:
+ log_warn("Error unpacking %s.", name);
+ cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
+ free(name);
+ free(data);
+ continue;
+ }
+
+ free(name);
+
+ if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
+ log_err("Failed to send CDAP reply.");
+ continue;
+ }
+ break;
+ case CDAP_DELETE:
+ assert(len == 0);
+ if (rib_del(name)) {
+ log_warn("Failed deleting %s.", name);
+ cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
+ }
+
+ free(name);
+
+ if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
+ log_err("Failed to send CDAP reply.");
+ continue;
+ }
+
+ break;
+ case CDAP_START:
+ case CDAP_STOP:
+ log_warn("Unsupported CDAP command.");
+ if (len)
+ free(data);
+ break;
+ default:
+ log_err("Bad CDAP command.");
+ if (len)
+ free(data);
+ break;
+ }
+ }
+
+ return (void *) 0;
+}
+
+char path[RIB_MAX_PATH_LEN + 1];
+
+static void path_reset(void) {
+ path[strlen(RIB_ROOT)] = '\0';
+ assert(strcmp(path, RIB_ROOT) == 0);
+}
+
+static int ribmgr_sync(const char * path)
+{
+ uint8_t * buf;
+ ssize_t len;
+ cdap_key_t * keys;
+
+ len = rib_pack(path, &buf, PACK_HASH_ALL);
+ if (len < 0) {
+ log_warn("Failed to pack %s.", path);
+ return -1;
+ }
+
+ keys = cdap_request_send(ribmgr.cdap, CDAP_CREATE, path, buf, len, 0);
+ if (keys != NULL) {
+ cdap_key_t * key = keys;
+ while (*key != INVALID_CDAP_KEY)
+ cdap_reply_wait(ribmgr.cdap, *(key++), NULL, NULL);
+ free(keys);
+ }
+
+ free(buf);
+
+ return 0;
+}
+
+/* FIXME: Temporary thread, syncs rib with neighbors every second */
+static void * sync_rib(void *o)
+{
+ char ** children;
+ ssize_t ch;
+
+ (void) o;
+
+ strcpy(path, RIB_ROOT);
+
+ while (ribmgr_get_state() == RIBMGR_OPERATIONAL) {
+ sleep(RIB_SYNC_TIMEOUT);
+
+ ch = rib_children(RIB_ROOT, &children);
+ if (ch <= 0)
+ continue;
+
+ while (ch > 0) {
+ path_reset();
+
+ rib_path_append(path, children[--ch]);
+ free(children[ch]);
+
+ /* Only sync fsdb, members and directory */
+ if (strcmp(path, MEMBERS_PATH) == 0
+ || strcmp(path, DIR_PATH) == 0
+ || strcmp(path, ROUTING_PATH) == 0)
+ ribmgr_sync(path);
+ }
+
+ free(children);
+ }
+
+ return (void *) 0;
+}
+
int ribmgr_init(void)
{
enum pol_gam pg;
@@ -114,19 +351,9 @@ int ribmgr_init(void)
return -1;
}
- ribmgr.fs = flow_set_create();
- if (ribmgr.fs == NULL) {
- log_err("Failed to create flow set.");
- gam_destroy(ribmgr.gam);
- connmgr_ae_destroy(ribmgr.ae);
- nbs_destroy(ribmgr.nbs);
- return -1;
- }
-
- ribmgr.fq = fqueue_create();
- if (ribmgr.fq == NULL) {
- log_err("Failed to create fq.");
- flow_set_destroy(ribmgr.fs);
+ ribmgr.cdap = cdap_create();
+ if (ribmgr.cdap == NULL) {
+ log_err("Failed to create CDAP instance.");
gam_destroy(ribmgr.gam);
connmgr_ae_destroy(ribmgr.ae);
nbs_destroy(ribmgr.nbs);
@@ -136,22 +363,35 @@ int ribmgr_init(void)
ribmgr.nb_notifier.notify_call = ribmgr_neighbor_event;
if (nbs_reg_notifier(ribmgr.nbs, &ribmgr.nb_notifier)) {
log_err("Failed to register notifier.");
- fqueue_destroy(ribmgr.fq);
- flow_set_destroy(ribmgr.fs);
+ cdap_destroy(ribmgr.cdap);
gam_destroy(ribmgr.gam);
connmgr_ae_destroy(ribmgr.ae);
nbs_destroy(ribmgr.nbs);
return -1;
}
+ pthread_rwlock_init(&ribmgr.state_lock, NULL);
+
+ ribmgr.state = RIBMGR_OPERATIONAL;
+
+ pthread_create(&ribmgr.sync, NULL, sync_rib, NULL);
+
+ pthread_create(&ribmgr.reader, NULL, reader, NULL);
+
return 0;
}
void ribmgr_fini(void)
{
+ ribmgr_set_state(RIBMGR_SHUTDOWN);
+
+ pthread_cancel(ribmgr.reader);
+
+ pthread_join(ribmgr.reader, NULL);
+ pthread_join(ribmgr.sync, NULL);
+
nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier);
- flow_set_destroy(ribmgr.fs);
- fqueue_destroy(ribmgr.fq);
+ cdap_destroy(ribmgr.cdap);
gam_destroy(ribmgr.gam);
connmgr_ae_destroy(ribmgr.ae);
nbs_destroy(ribmgr.nbs);