diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-03-24 14:38:09 +0100 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-03-24 15:08:28 +0100 |
commit | 3d99be39d1c803ff0faa049fe31b9c7b3302890a (patch) | |
tree | 06e88f3109177671bac4fb52dc1eab3d8ec392da /src | |
parent | 25d262cb383039b1d97a5b41ec477d6a40957398 (diff) | |
download | ouroboros-3d99be39d1c803ff0faa049fe31b9c7b3302890a.tar.gz ouroboros-3d99be39d1c803ff0faa049fe31b9c7b3302890a.zip |
ipcpd: Initial synchronisation of the RIB
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/normal/main.c | 4 | ||||
-rw-r--r-- | src/ipcpd/normal/ribmgr.c | 284 |
2 files changed, 264 insertions, 24 deletions
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 00ecaae9..41e0544d 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -202,6 +202,8 @@ static int boot_components(void) void shutdown_components(void) { + ribmgr_fini(); + connmgr_stop(); enroll_stop(); @@ -214,8 +216,6 @@ void shutdown_components(void) dir_fini(); - ribmgr_fini(); - addr_auth_fini(); free(ipcpi.dif_name); diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index ef79f627..2696026a 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -44,35 +44,272 @@ #include <errno.h> #include <assert.h> +#define RIB_SYNC_TIMEOUT 1 + +enum ribmgr_state { + RIBMGR_NULL = 0, + RIBMGR_OPERATIONAL, + RIBMGR_SHUTDOWN +}; + struct { - flow_set_t * fs; - fqueue_t * fq; + struct cdap * cdap; + + pthread_t reader; + pthread_t sync; struct gam * gam; struct nbs * nbs; struct ae * ae; struct nb_notifier nb_notifier; + + pthread_rwlock_t state_lock; + enum ribmgr_state state; } ribmgr; static int ribmgr_neighbor_event(enum nb_event event, struct conn conn) { - /* We are only interested in neighbors being added and removed. */ switch (event) { case NEIGHBOR_ADDED: - flow_set_add(ribmgr.fs, conn.flow_info.fd); + cdap_add_flow(ribmgr.cdap, conn.flow_info.fd); break; case NEIGHBOR_REMOVED: - flow_set_del(ribmgr.fs, conn.flow_info.fd); + cdap_del_flow(ribmgr.cdap, conn.flow_info.fd); break; default: + /* Don't care about other events */ break; } return 0; } +static enum ribmgr_state ribmgr_get_state(void) +{ + enum ribmgr_state state; + + pthread_rwlock_rdlock(&ribmgr.state_lock); + + state = ribmgr.state; + + pthread_rwlock_unlock(&ribmgr.state_lock); + + return state; +} + +static void ribmgr_set_state(enum ribmgr_state state) +{ + pthread_rwlock_wrlock(&ribmgr.state_lock); + + ribmgr.state = state; + + pthread_rwlock_unlock(&ribmgr.state_lock); +} + +static void * reader(void * o) +{ + cdap_key_t key; + enum cdap_opcode oc; + char * name; + uint8_t * data; + size_t len; + ssize_t slen; + uint32_t flags; + uint8_t * buf; + int rval; + + (void) o; + + while (ribmgr_get_state() == RIBMGR_OPERATIONAL) { + key = cdap_request_wait(ribmgr.cdap, &oc, &name, &data, + (size_t *) &len , &flags); + assert(key != -EINVAL); + + if (key == INVALID_CDAP_KEY) { + log_warn("Bad CDAP request."); + continue; + } + + assert(name); + assert(strlen(name)); + + switch (oc) { + case CDAP_READ: + assert(len == 0); + slen = rib_pack(name, &buf, PACK_HASH_ROOT); + if (slen < 0) { + log_err("Failed to pack %s.", name); + cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0); + free(name); + continue; + } + + log_dbg("Packed %s (%zu bytes).", name, slen); + + free(name); + + if (cdap_reply_send(ribmgr.cdap, key, 0, buf, slen)) { + log_err("Failed to send CDAP reply."); + free(buf); + continue; + } + + free(buf); + break; + case CDAP_WRITE: + assert(len); + assert(data); + + rval = rib_unpack(data, len, 0); + switch(rval) { + case 0: + break; + case -EFAULT: + log_warn("Hash mismatch, not in sync."); + break; + default: + log_warn("Error unpacking %s.", name); + cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0); + free(name); + free(data); + continue; + } + + free(name); + + if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) { + log_err("Failed to send CDAP reply."); + continue; + } + break; + case CDAP_CREATE: + assert(len); + assert(data); + + rval = rib_unpack(data, len, UNPACK_CREATE); + switch(rval) { + case 0: + break; + case -EFAULT: + log_warn("Hash mismatch, not yet in sync."); + break; + default: + log_warn("Error unpacking %s.", name); + cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0); + free(name); + free(data); + continue; + } + + free(name); + + if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) { + log_err("Failed to send CDAP reply."); + continue; + } + break; + case CDAP_DELETE: + assert(len == 0); + if (rib_del(name)) { + log_warn("Failed deleting %s.", name); + cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0); + } + + free(name); + + if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) { + log_err("Failed to send CDAP reply."); + continue; + } + + break; + case CDAP_START: + case CDAP_STOP: + log_warn("Unsupported CDAP command."); + if (len) + free(data); + break; + default: + log_err("Bad CDAP command."); + if (len) + free(data); + break; + } + } + + return (void *) 0; +} + +char path[RIB_MAX_PATH_LEN + 1]; + +static void path_reset(void) { + path[strlen(RIB_ROOT)] = '\0'; + assert(strcmp(path, RIB_ROOT) == 0); +} + +static int ribmgr_sync(const char * path) +{ + uint8_t * buf; + ssize_t len; + cdap_key_t * keys; + + len = rib_pack(path, &buf, PACK_HASH_ALL); + if (len < 0) { + log_warn("Failed to pack %s.", path); + return -1; + } + + keys = cdap_request_send(ribmgr.cdap, CDAP_CREATE, path, buf, len, 0); + if (keys != NULL) { + cdap_key_t * key = keys; + while (*key != INVALID_CDAP_KEY) + cdap_reply_wait(ribmgr.cdap, *(key++), NULL, NULL); + free(keys); + } + + free(buf); + + return 0; +} + +/* FIXME: Temporary thread, syncs rib with neighbors every second */ +static void * sync_rib(void *o) +{ + char ** children; + ssize_t ch; + + (void) o; + + strcpy(path, RIB_ROOT); + + while (ribmgr_get_state() == RIBMGR_OPERATIONAL) { + sleep(RIB_SYNC_TIMEOUT); + + ch = rib_children(RIB_ROOT, &children); + if (ch <= 0) + continue; + + while (ch > 0) { + path_reset(); + + rib_path_append(path, children[--ch]); + free(children[ch]); + + /* Only sync fsdb, members and directory */ + if (strcmp(path, MEMBERS_PATH) == 0 + || strcmp(path, DIR_PATH) == 0 + || strcmp(path, ROUTING_PATH) == 0) + ribmgr_sync(path); + } + + free(children); + } + + return (void *) 0; +} + int ribmgr_init(void) { enum pol_gam pg; @@ -114,19 +351,9 @@ int ribmgr_init(void) return -1; } - ribmgr.fs = flow_set_create(); - if (ribmgr.fs == NULL) { - log_err("Failed to create flow set."); - gam_destroy(ribmgr.gam); - connmgr_ae_destroy(ribmgr.ae); - nbs_destroy(ribmgr.nbs); - return -1; - } - - ribmgr.fq = fqueue_create(); - if (ribmgr.fq == NULL) { - log_err("Failed to create fq."); - flow_set_destroy(ribmgr.fs); + ribmgr.cdap = cdap_create(); + if (ribmgr.cdap == NULL) { + log_err("Failed to create CDAP instance."); gam_destroy(ribmgr.gam); connmgr_ae_destroy(ribmgr.ae); nbs_destroy(ribmgr.nbs); @@ -136,22 +363,35 @@ int ribmgr_init(void) ribmgr.nb_notifier.notify_call = ribmgr_neighbor_event; if (nbs_reg_notifier(ribmgr.nbs, &ribmgr.nb_notifier)) { log_err("Failed to register notifier."); - fqueue_destroy(ribmgr.fq); - flow_set_destroy(ribmgr.fs); + cdap_destroy(ribmgr.cdap); gam_destroy(ribmgr.gam); connmgr_ae_destroy(ribmgr.ae); nbs_destroy(ribmgr.nbs); return -1; } + pthread_rwlock_init(&ribmgr.state_lock, NULL); + + ribmgr.state = RIBMGR_OPERATIONAL; + + pthread_create(&ribmgr.sync, NULL, sync_rib, NULL); + + pthread_create(&ribmgr.reader, NULL, reader, NULL); + return 0; } void ribmgr_fini(void) { + ribmgr_set_state(RIBMGR_SHUTDOWN); + + pthread_cancel(ribmgr.reader); + + pthread_join(ribmgr.reader, NULL); + pthread_join(ribmgr.sync, NULL); + nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier); - flow_set_destroy(ribmgr.fs); - fqueue_destroy(ribmgr.fq); + cdap_destroy(ribmgr.cdap); gam_destroy(ribmgr.gam); connmgr_ae_destroy(ribmgr.ae); nbs_destroy(ribmgr.nbs); |