diff options
Diffstat (limited to 'src/ipcpd/normal/ribmgr.c')
-rw-r--r-- | src/ipcpd/normal/ribmgr.c | 1663 |
1 files changed, 48 insertions, 1615 deletions
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 993fe62a..4ff316dc 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -27,1670 +27,103 @@ #include <ouroboros/list.h> #include <ouroboros/time_utils.h> #include <ouroboros/ipcp-dev.h> -#include <ouroboros/bitmap.h> #include <ouroboros/errno.h> #include <ouroboros/dev.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/rib.h> #include "timerwheel.h" #include "addr_auth.h" #include "ribmgr.h" -#include "dt_const.h" -#include "ro.h" -#include "pathname.h" -#include "dir.h" +#include "gam.h" #include "ae.h" #include <stdlib.h> #include <pthread.h> #include <string.h> #include <errno.h> +#include <assert.h> -#include "static_info.pb-c.h" -typedef StaticInfoMsg static_info_msg_t; +#define BOOT_PATH "/" BOOT_NAME -#include "ro.pb-c.h" -typedef RoMsg ro_msg_t; - -#define SUBS_SIZE 25 -#define WHEEL_RESOLUTION 1000 /* ms */ -#define WHEEL_DELAY 3600000 /* ms */ -#define RO_ID_TIMEOUT 1000 /* ms */ - -#define ENROLLMENT "enrollment" - -#define RIBMGR_PREFIX PATH_DELIMITER "ribmgr" -#define STAT_INFO PATH_DELIMITER "statinfo" - -/* RIB objects */ -struct rnode { - char * name; - char * full_name; - uint64_t seqno; - - /* - * NOTE: Naive implementation for now, could be replaced by - * for instance taking a hash of the pathname and using that - * as an index in a B-tree - */ - - /* If there are no children, this is a leaf. */ - struct rnode * child; - struct rnode * sibling; - - struct ro_attr attr; - uint8_t * data; - size_t len; -}; - -struct mgmt_flow { - struct list_head next; - - struct cdap * instance; - int fd; - - pthread_t handler; -}; - -struct ro_sub { - struct list_head next; - - int sid; - - char * name; - struct ro_sub_ops * ops; -}; - -struct ro_id { - struct list_head next; - - uint64_t seqno; - char * full_name; -}; - -enum ribmgr_state { - RIBMGR_NULL, - RIBMGR_INIT, - RIBMGR_OPERATIONAL, - RIBMGR_SHUTDOWN -}; - -/* FIXME: Extract rib from ribmgr. */ struct { - struct rnode * root; - pthread_mutex_t ro_lock; - - struct list_head subs; - struct bmp * sids; - pthread_mutex_t subs_lock; - int ribmgr_sid; - - struct dt_const dtc; - - uint64_t address; - - struct timerwheel * wheel; - - struct list_head ro_ids; - pthread_mutex_t ro_ids_lock; - - struct list_head flows; - pthread_rwlock_t flows_lock; - - struct addr_auth * addr_auth; - enum pol_addr_auth addr_auth_type; - - enum pol_gam dt_gam_type; - - enum ribmgr_state state; - pthread_cond_t state_cond; - pthread_mutex_t state_lock; -} rib; - -void ribmgr_ro_created(const char * name, - uint8_t * data, - size_t len) -{ - static_info_msg_t * stat_msg; - - if (strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { - LOG_DBG("Received static DIF information."); - - stat_msg = static_info_msg__unpack(NULL, len, data); - if (stat_msg == NULL) { - LOG_ERR("Failed to unpack static info message."); - return; - } - - rib.dtc.addr_size = stat_msg->addr_size; - rib.dtc.cep_id_size = stat_msg->cep_id_size; - rib.dtc.pdu_length_size = stat_msg->pdu_length_size; - rib.dtc.seqno_size = stat_msg->seqno_size; - rib.dtc.has_ttl = stat_msg->has_ttl; - rib.dtc.has_chk = stat_msg->has_chk; - rib.dtc.min_pdu_size = stat_msg->min_pdu_size; - rib.dtc.max_pdu_size = stat_msg->max_pdu_size; - rib.addr_auth_type = stat_msg->addr_auth_type; - rib.dt_gam_type = stat_msg->dt_gam_type; - - static_info_msg__free_unpacked(stat_msg, NULL); - } -} - -/* We only have a create operation for now. */ -static struct ro_sub_ops ribmgr_sub_ops = { - .ro_created = ribmgr_ro_created, - .ro_updated = NULL, - .ro_deleted = NULL -}; - -static struct rnode * find_rnode_by_name(const char * name) -{ - char * str; - char * str1; - char * token; - struct rnode * node; - - str = strdup(name); - if (str == NULL) - return NULL; - - node = rib.root; - - for (str1 = str; node != NULL; str1 = NULL) { - token = strtok(str1, PATH_DELIMITER); - if (token == NULL) - break; - - node = node->child; - - while (node != NULL) - if (strcmp(node->name, token) == 0) - break; - else - node = node->sibling; - } - - free(str); - return node; -} - -/* Call under RIB object lock. */ -static int ro_msg_create(struct rnode * node, - ro_msg_t * msg) -{ - msg->address = rib.address; - msg->seqno = node->seqno; - msg->recv_set = node->attr.recv_set; - msg->enrol_sync = node->attr.enrol_sync; - msg->sec = node->attr.expiry.tv_sec; - msg->nsec = node->attr.expiry.tv_nsec; - msg->value.data = node->data; - msg->value.len = node->len; - - return 0; -} - -static int ribmgr_ro_delete(const char * name) -{ - char * str; - char * str1; - char * saveptr; - char * token; - struct rnode * node; - struct rnode * prev; - bool sibling = false; - - str = strdup(name); - if (str == NULL) - return -1; - - node = rib.root; - prev = NULL; - - for (str1 = str; ; str1 = NULL) { - token = strtok_r(str1, PATH_DELIMITER, &saveptr); - if (token == NULL) - break; - - prev = node; - node = node->child; - sibling = false; - - while (node != NULL) { - if (strcmp(node->name, token) == 0) { - break; - } else { - prev = node; - node = node->sibling; - sibling = true; - } - } - - if (node == NULL) { - free(str); - return -1; - } - } - - if (node == rib.root) { - LOG_ERR("Won't remove root."); - free(str); - return -1; - } - - free(node->name); - free(node->full_name); - if (node->data != NULL) - free(node->data); - - if (sibling) - prev->sibling = node->sibling; - else - prev->child = node->sibling; - - free(node); - free(str); - - LOG_DBG("Deleted RO with name %s.", name); - - return 0; -} - -static void ro_delete_timer(void * o) -{ - char * name = (char *) o; - - pthread_mutex_lock(&rib.ro_lock); - - if (ribmgr_ro_delete(name)) - LOG_ERR("Failed to delete %s.", name); - - pthread_mutex_unlock(&rib.ro_lock); -} - -static struct rnode * ribmgr_ro_create(const char * name, - struct ro_attr attr, - uint8_t * data, - size_t len) -{ - char * str; - char * str1; - char * saveptr = NULL; - char * token = NULL; - char * token2; - struct rnode * node = NULL; - struct rnode * new = NULL; - struct rnode * prev = NULL; - bool sibling = false; - int timeout; - - str = strdup(name); - if (str == NULL) - return NULL; - - node = rib.root; - - assert(node); - - for (str1 = str; node != NULL; str1 = NULL) { - token = strtok_r(str1, PATH_DELIMITER, &saveptr); - if (token == NULL) { - LOG_ERR("RO already exists."); - free(str); - return NULL; - } - - prev = node; - node = node->child; - sibling = false; - - /* Search horizontally. */ - while (node != NULL) { - if (strcmp(node->name, token) == 0) { - break; - } else { - prev = node; - node = node->sibling; - sibling = true; - } - } - } - - assert(token); - assert(prev); - - token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr); - if (token2 != NULL) { - LOG_ERR("Part of the pathname does not exist."); - free(str); - return NULL; - } - - new = malloc(sizeof(*new)); - if (new == NULL) { - free(str); - return NULL; - } - - new->name = strdup(token); - if (new->name == NULL) { - free(str); - free(new); - return NULL; - } - - free(str); - - new->full_name = strdup(name); - if (new->full_name == NULL) { - free(new); - return NULL; - } - - new->seqno = 0; - new->attr = attr; - - if (sibling) - prev->sibling = new; - else - prev->child = new; - - new->data = data; - new->len = len; - new->child = NULL; - new->sibling = NULL; - - LOG_DBG("Created RO with name %s.", name); - - if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) { - timeout = attr.expiry.tv_sec * 1000 + - attr.expiry.tv_nsec / MILLION; - if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name, - strlen(new->full_name) + 1, timeout)) - LOG_ERR("Failed to add deletion timer of RO."); - } - - return new; -} - -static struct rnode * ribmgr_ro_write(const char * name, - uint8_t * data, - size_t len) -{ - struct rnode * node; - - node = find_rnode_by_name(name); - if (node == NULL) - return NULL; - - free(node->data); - - node->data = data; - node->len = len; - - LOG_DBG("Updated RO with name %s.", name); - - return node; -} - -static int write_ro_msg(struct cdap * neighbor, - ro_msg_t * msg, - char * name, - enum cdap_opcode code) -{ - uint8_t * data; - size_t len; - cdap_key_t key; - int ret; - - len = ro_msg__get_packed_size(msg); - if (len == 0) - return -1; - - data = malloc(len); - if (data == NULL) - return -ENOMEM; - - ro_msg__pack(msg, data); - - key = cdap_request_send(neighbor, code, name, data, len, 0); - if (key < 0) { - LOG_ERR("Failed to send CDAP request."); - free(data); - return -1; - } - - free(data); - - ret = cdap_reply_wait(neighbor, key, NULL, NULL); - if (ret < 0) { - LOG_ERR("CDAP command with code %d and name %s failed: %d.", - code, name, ret); - return -1; - } - - return 0; -} - -int ribmgr_init() -{ - list_head_init(&rib.flows); - list_head_init(&rib.subs); - list_head_init(&rib.ro_ids); - - rib.root = malloc(sizeof(*(rib.root))); - if (rib.root == NULL) - return -1; - - rib.root->name = "root"; - rib.root->child = NULL; - rib.root->sibling = NULL; - - if (pthread_rwlock_init(&rib.flows_lock, NULL)) { - LOG_ERR("Failed to initialize rwlock."); - free(rib.root); - return -1; - } - - if (pthread_mutex_init(&rib.ro_lock, NULL)) { - LOG_ERR("Failed to initialize mutex."); - pthread_rwlock_destroy(&rib.flows_lock); - free(rib.root); - return -1; - } - - if (pthread_mutex_init(&rib.subs_lock, NULL)) { - LOG_ERR("Failed to initialize mutex."); - pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.ro_lock); - free(rib.root); - return -1; - } - - if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) { - LOG_ERR("Failed to initialize mutex."); - pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.ro_lock); - pthread_mutex_destroy(&rib.subs_lock); - free(rib.root); - return -1; - } - - rib.sids = bmp_create(SUBS_SIZE, 0); - if (rib.sids == NULL) { - LOG_ERR("Failed to create bitmap."); - pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.ro_lock); - pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.ro_ids_lock); - free(rib.root); - return -1; - } - - rib.wheel = timerwheel_create(WHEEL_RESOLUTION, WHEEL_DELAY); - if (rib.wheel == NULL) { - LOG_ERR("Failed to create timerwheel."); - bmp_destroy(rib.sids); - pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.ro_lock); - pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.ro_ids_lock); - free(rib.root); - return -1; - } - - rib.ribmgr_sid = ro_subscribe(RIBMGR_PREFIX, &ribmgr_sub_ops); - if (rib.ribmgr_sid < 0) { - LOG_ERR("Failed to subscribe."); - timerwheel_destroy(rib.wheel); - bmp_destroy(rib.sids); - pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.ro_lock); - pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.ro_ids_lock); - free(rib.root); - return -1; - } - - if (pthread_cond_init(&rib.state_cond, NULL)) { - LOG_ERR("Failed to init condvar."); - timerwheel_destroy(rib.wheel); - bmp_destroy(rib.sids); - pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.ro_lock); - pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.ro_ids_lock); - free(rib.root); - return -1; - } - - if (pthread_mutex_init(&rib.state_lock, NULL)) { - LOG_ERR("Failed to init mutex."); - pthread_cond_destroy(&rib.state_cond); - timerwheel_destroy(rib.wheel); - bmp_destroy(rib.sids); - pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.ro_lock); - pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.ro_ids_lock); - free(rib.root); - return -1; - } - - rib.state = RIBMGR_INIT; - - return 0; -} - -static enum ribmgr_state ribmgr_get_state(void) -{ - enum ribmgr_state state; - - pthread_mutex_lock(&rib.state_lock); - - state = rib.state; - - pthread_mutex_unlock(&rib.state_lock); - - return state; -} - -static void ribmgr_set_state(enum ribmgr_state state) -{ - pthread_mutex_lock(&rib.state_lock); - - rib.state = state; - - pthread_cond_broadcast(&rib.state_cond); - - pthread_mutex_unlock(&rib.state_lock); -} - -static int ribmgr_wait_state(enum ribmgr_state state, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret = 0; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - - pthread_mutex_lock(&rib.state_lock); - - while (rib.state != state - && rib.state != RIBMGR_SHUTDOWN - && rib.state != RIBMGR_NULL) { - if (timeout == NULL) - ret = -pthread_cond_wait(&rib.state_cond, - &rib.state_lock); - else - ret = -pthread_cond_timedwait(&rib.state_cond, - &rib.state_lock, - &abstime); - } - - pthread_mutex_unlock(&rib.state_lock); - - return ret; -} - -static void rtree_destroy(struct rnode * node) -{ - if (node != NULL) { - rtree_destroy(node->child); - rtree_destroy(node->sibling); - free(node->name); - if (node->data != NULL) - free(node->data); - free(node); - } -} - -int ribmgr_fini() -{ - struct list_head * pos = NULL; - struct list_head * n = NULL; - - pthread_mutex_lock(&rib.state_lock); - rib.state = RIBMGR_SHUTDOWN; - pthread_cond_broadcast(&rib.state_cond); - pthread_mutex_unlock(&rib.state_lock); - - pthread_rwlock_wrlock(&rib.flows_lock); - - list_for_each_safe(pos, n, &rib.flows) { - struct mgmt_flow * flow = - list_entry(pos, struct mgmt_flow, next); - if (cdap_destroy(flow->instance)) - LOG_ERR("Failed to destroy CDAP instance."); - list_del(&flow->next); - free(flow); - } - - pthread_rwlock_unlock(&rib.flows_lock); - - ro_unsubscribe(rib.ribmgr_sid); - - if (rib.addr_auth != NULL) - addr_auth_destroy(rib.addr_auth); - - pthread_mutex_lock(&rib.ro_lock); - - rtree_destroy(rib.root->child); - free(rib.root); - - pthread_mutex_unlock(&rib.ro_lock); - - bmp_destroy(rib.sids); - timerwheel_destroy(rib.wheel); - - pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.ro_lock); - pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.ro_ids_lock); - - pthread_cond_destroy(&rib.state_cond); - pthread_mutex_destroy(&rib.state_lock); - - return 0; -} - -static int ribmgr_cdap_create(struct cdap * instance, - cdap_key_t key, - char * name, - ro_msg_t * msg) -{ - int ret = 0; - struct list_head * p = NULL; - size_t len_s, len_n; - uint8_t * ro_data; - struct ro_attr attr; - struct rnode * node; - - assert(instance); - - ro_attr_init(&attr); - attr.expiry.tv_sec = msg->sec; - attr.expiry.tv_nsec = msg->nsec; - attr.enrol_sync = msg->enrol_sync; - attr.recv_set = msg->recv_set; - - pthread_mutex_lock(&rib.ro_lock); - - ro_data = malloc(msg->value.len); - if (ro_data == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - cdap_reply_send(instance, key, -1, NULL, 0); - return -1; - } - memcpy(ro_data, msg->value.data, msg->value.len); - - node = ribmgr_ro_create(name, attr, ro_data, msg->value.len); - if (node == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - cdap_reply_send(instance, key, -1, NULL, 0); - free(ro_data); - return -1; - } - - pthread_mutex_lock(&rib.subs_lock); - list_for_each(p, &rib.subs) { - struct ro_sub * e = list_entry(p, struct ro_sub, next); - len_s = strlen(e->name); - len_n = strlen(name); - - if (len_n < len_s) - continue; - - if (memcmp(name, e->name, len_s) == 0) { - if (e->ops->ro_created == NULL) - continue; - - ro_data = malloc(node->len); - if (ro_data == NULL) - continue; - - memcpy(ro_data, node->data, node->len); - e->ops->ro_created(name, ro_data, node->len); - } - } - - pthread_mutex_unlock(&rib.subs_lock); - pthread_mutex_unlock(&rib.ro_lock); - - if (cdap_reply_send(instance, key, ret, NULL, 0)) { - LOG_ERR("Failed to send reply to create request."); - return -1; - } - - return 0; -} - -static int ribmgr_cdap_delete(struct cdap * instance, - cdap_key_t key, - char * name) -{ - struct list_head * p = NULL; - size_t len_s; - size_t len_n; - - pthread_mutex_lock(&rib.ro_lock); - - if (ribmgr_ro_delete(name)) { - pthread_mutex_unlock(&rib.ro_lock); - cdap_reply_send(instance, key, -1, NULL, 0); - return -1; - } - - pthread_mutex_lock(&rib.subs_lock); - - list_for_each(p, &rib.subs) { - struct ro_sub * e = list_entry(p, struct ro_sub, next); - len_s = strlen(e->name); - len_n = strlen(name); - - if (len_n < len_s) - continue; - - if (memcmp(name, e->name, len_s) == 0) { - if (e->ops->ro_deleted == NULL) - continue; - - e->ops->ro_deleted(name); - } - } - - pthread_mutex_unlock(&rib.subs_lock); - pthread_mutex_unlock(&rib.ro_lock); - - if (cdap_reply_send(instance, key, 0, NULL, 0)) { - LOG_ERR("Failed to send reply to create request."); - return -1; - } - - return 0; -} - -static int ribmgr_cdap_write(struct cdap * instance, - cdap_key_t key, - char * name, - ro_msg_t * msg, - uint32_t flags) -{ - int ret = 0; - struct list_head * p = NULL; - size_t len_s; - size_t len_n; - uint8_t * ro_data; - struct rnode * node; - - (void) flags; - - pthread_mutex_lock(&rib.ro_lock); - - ro_data = malloc(msg->value.len); - if (ro_data == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - cdap_reply_send(instance, key, -1, NULL, 0); - return -1; - } - memcpy(ro_data, msg->value.data, msg->value.len); - - node = ribmgr_ro_write(name, msg->value.data, msg->value.len); - if (node == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - free(ro_data); - cdap_reply_send(instance, key, -1, NULL, 0); - return -1; - } - node->seqno = msg->seqno; - - pthread_mutex_lock(&rib.subs_lock); - - list_for_each(p, &rib.subs) { - struct ro_sub * e = list_entry(p, struct ro_sub, next); - len_s = strlen(e->name); - len_n = strlen(name); - - if (len_n < len_s) - continue; - - if (memcmp(name, e->name, len_s) == 0) { - if (e->ops->ro_updated == NULL) - continue; - - ro_data = malloc(node->len); - if (ro_data == NULL) - continue; - - memcpy(ro_data, node->data, node->len); - e->ops->ro_updated(name, ro_data, node->len); - } - } - - pthread_mutex_unlock(&rib.subs_lock); - pthread_mutex_unlock(&rib.ro_lock); - - if (cdap_reply_send(instance, key, ret, NULL, 0)) { - LOG_ERR("Failed to send reply to write request."); - return -1; - } + flow_set_t * fs; + fqueue_t * fq; + struct gam * gam; +} ribmgr; - return 0; -} - -static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node) -{ - int ret = 0; - - if (node != NULL) { - if (node->attr.enrol_sync == true) { - ro_msg_t msg = RO_MSG__INIT; - - if (ro_msg_create(node, &msg)) { - LOG_ERR("Failed to create RO msg."); - return -1; - } - - LOG_DBG("Syncing RO with name %s.", node->full_name); - - if (write_ro_msg(instance, &msg, - node->full_name, CDAP_CREATE)) { - LOG_ERR("Failed to send RO msg."); - return -1; - } - } - - ret = ribmgr_enrol_sync(instance, node->child); - if (ret == 0) - ret = ribmgr_enrol_sync(instance, node->sibling); - } - - return ret; -} - -static int ribmgr_cdap_start(struct cdap * instance, - cdap_key_t key, - char * name) -{ - if (strcmp(name, ENROLLMENT) == 0) { - LOG_DBG("New enrollment request."); - - if (cdap_reply_send(instance, key, 0, NULL, 0)) { - LOG_ERR("Failed to send reply to enrollment request."); - return -1; - } - - /* Loop through rtree and send correct objects. */ - LOG_DBG("Sending ROs that need to be sent on enrolment..."); - - pthread_mutex_lock(&rib.ro_lock); - if (ribmgr_enrol_sync(instance, rib.root->child)) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to sync part of the RIB."); - return -1; - } - - pthread_mutex_unlock(&rib.ro_lock); - - LOG_DBGF("Sending stop enrollment..."); - - key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT, - NULL, 0, 0); - if (key < 0) { - LOG_ERR("Failed to send stop of enrollment."); - return -1; - } - - if (cdap_reply_wait(instance, key, NULL, NULL)) { - LOG_ERR("Remote failed to complete enrollment."); - return -1; - } - } else { - LOG_WARN("Request to start unknown operation."); - if (cdap_reply_send(instance, key, -1, NULL, 0)) - LOG_ERR("Failed to send negative reply."); - } - - return 0; -} - -static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name) -{ - int ret = 0; - - if (strcmp(name, ENROLLMENT) == 0) { - LOG_DBG("Stop enrollment received."); - /* FIXME: don't use states to match start to stop. */ - ribmgr_set_state(RIBMGR_OPERATIONAL); - } else { - ret = -1; - } - - if (cdap_reply_send(instance, key, ret, NULL, 0)) { - LOG_ERR("Failed to send reply to stop request."); - return -1; - } - - return 0; -} - -static void ro_id_delete(void * o) -{ - struct ro_id * ro_id = *((struct ro_id **) o); - - pthread_mutex_lock(&rib.ro_ids_lock); - list_del(&ro_id->next); - free(ro_id->full_name); - free(ro_id); - pthread_mutex_unlock(&rib.ro_ids_lock); -} - -static int ro_id_create(char * name, ro_msg_t * msg) -{ - struct ro_id * tmp; - - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) - return -ENOMEM; - - tmp->seqno = msg->seqno; - tmp->full_name = strdup(name); - list_head_init(&tmp->next); - - if (tmp->full_name == NULL) { - free(tmp); - return -ENOMEM; - } - - pthread_mutex_lock(&rib.ro_ids_lock); - list_add(&tmp->next, &rib.ro_ids); - - if (timerwheel_add(rib.wheel, ro_id_delete, - &tmp, sizeof(tmp), RO_ID_TIMEOUT)) { - LOG_ERR("Failed to add item to timerwheel."); - pthread_mutex_unlock(&rib.ro_ids_lock); - free(tmp->full_name); - free(tmp); - return -1; - } - pthread_mutex_unlock(&rib.ro_ids_lock); - - return 0; -} - -static void * cdap_req_handler(void * o) -{ - struct cdap * instance = (struct cdap *) o; - enum cdap_opcode opcode; - char * name; - uint8_t * data; - size_t len; - uint32_t flags; - ro_msg_t * msg; - struct list_head * p = NULL; - - assert(instance); - - while (true) { - cdap_key_t key = cdap_request_wait(instance, - &opcode, - &name, - &data, - &len, - &flags); - assert(key >= 0); - - if (opcode == CDAP_START) { - if (ribmgr_cdap_start(instance, key, name)) - LOG_WARN("CDAP start failed."); - free(name); - continue; - } - else if (opcode == CDAP_STOP) { - if (ribmgr_cdap_stop(instance, key, name)) - LOG_WARN("CDAP stop failed."); - free(name); - continue; - } - - assert(len > 0); - - msg = ro_msg__unpack(NULL, len, data); - if (msg == NULL) { - cdap_reply_send(instance, key, -1, NULL, 0); - LOG_WARN("Failed to unpack RO message"); - free(data); - continue; - } - - pthread_mutex_lock(&rib.ro_ids_lock); - list_for_each(p, &rib.ro_ids) { - struct ro_id * e = list_entry(p, struct ro_id, next); - - if (strcmp(e->full_name, name) == 0 && - e->seqno == msg->seqno) { - pthread_mutex_unlock(&rib.ro_ids_lock); - ro_msg__free_unpacked(msg, NULL); - cdap_reply_send(instance, key, 0, NULL, 0); - LOG_DBG("Already received this RO."); - free(name); - continue; - } - } - pthread_mutex_unlock(&rib.ro_ids_lock); - - if (opcode == CDAP_CREATE) { - if (ribmgr_cdap_create(instance, key, name, msg)) { - LOG_WARN("CDAP create failed."); - ro_msg__free_unpacked(msg, NULL); - free(name); - continue; - } - } else if (opcode == CDAP_WRITE) { - if (ribmgr_cdap_write(instance, key, name, - msg, flags)) { - LOG_WARN("CDAP write failed."); - ro_msg__free_unpacked(msg, NULL); - free(name); - continue; - } - } else if (opcode == CDAP_DELETE) { - if (ribmgr_cdap_delete(instance, key, name)) { - LOG_WARN("CDAP delete failed."); - ro_msg__free_unpacked(msg, NULL); - free(name); - continue; - } - } else { - LOG_INFO("Unsupported opcode received."); - ro_msg__free_unpacked(msg, NULL); - cdap_reply_send(instance, key, -1, NULL, 0); - free(name); - continue; - } - - if (ro_id_create(name, msg)) { - LOG_WARN("Failed to create RO id."); - ro_msg__free_unpacked(msg, NULL); - free(name); - continue; - } - - if (msg->recv_set == ALL_MEMBERS) { - pthread_rwlock_rdlock(&rib.flows_lock); - list_for_each(p, &rib.flows) { - struct mgmt_flow * e = - list_entry(p, struct mgmt_flow, next); - - /* Don't send it back. */ - if (e->instance == instance) - continue; - - if (write_ro_msg(e->instance, msg, - name, opcode)) - LOG_WARN("Failed to send to neighbor."); - } - pthread_rwlock_unlock(&rib.flows_lock); - } - - free(name); - ro_msg__free_unpacked(msg, NULL); - } - return (void *) 0; -} - -static int ribmgr_add_flow(int fd) -{ - struct cdap * instance = NULL; - struct mgmt_flow * flow; - - flow = malloc(sizeof(*flow)); - if (flow == NULL) - return -ENOMEM; - - instance = cdap_create(fd); - if (instance == NULL) { - LOG_ERR("Failed to create CDAP instance"); - free(flow); - return -1; - } - - list_head_init(&flow->next); - flow->instance = instance; - flow->fd = fd; - - if (pthread_create(&flow->handler, NULL, - cdap_req_handler, instance)) { - LOG_ERR("Failed to start handler thread for mgt flow."); - free(flow); - return -1; - } - - pthread_rwlock_wrlock(&rib.flows_lock); - - list_add(&flow->next, &rib.flows); - - pthread_rwlock_unlock(&rib.flows_lock); - - return 0; -} - -int ribmgr_remove_flow(int fd) +int ribmgr_init(void) { - struct list_head * pos, * n = NULL; - - pthread_rwlock_wrlock(&rib.flows_lock); - list_for_each_safe(pos, n, &rib.flows) { - struct mgmt_flow * flow = - list_entry(pos, struct mgmt_flow, next); - if (flow->fd == fd) { - pthread_cancel(flow->handler); - if (cdap_destroy(flow->instance)) - LOG_ERR("Failed to destroy CDAP instance."); - list_del(&flow->next); - pthread_rwlock_unlock(&rib.flows_lock); - free(flow); - return 0; - } - } - pthread_rwlock_unlock(&rib.flows_lock); - - return -1; -} - -/* FIXME: do this in a topologymanager instance */ -int ribmgr_add_nm1_flow(int fd) -{ - if (flow_alloc_resp(fd, 0) < 0) { - LOG_ERR("Could not respond to new flow."); - return -1; - } - - return ribmgr_add_flow(fd); -} - -int ribmgr_nm1_mgt_flow(char * dst_name) -{ - int fd; - int result; - - /* FIXME: Request retransmission. */ - fd = flow_alloc(dst_name, MGMT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s.", dst_name); - return -1; - } - - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d.", - dst_name, result); - flow_dealloc(fd); - return -1; - } + enum pol_cacep pc; + enum pol_gam pg; - if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to add file descriptor."); - flow_dealloc(fd); + if (rib_read(BOOT_PATH "/rm/gam/type", &pg, sizeof(pg)) + != sizeof(pg)) { + LOG_ERR("Failed to read policy for ribmgr gam."); return -1; } - return fd; -} - -int ribmgr_bootstrap(struct dif_config * conf) -{ - static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; - uint8_t * data = NULL; - size_t len = 0; - struct ro_attr attr; - - ro_attr_init(&attr); - attr.enrol_sync = true; - - if (ribmgr_ro_create(RIBMGR_PREFIX, attr, NULL, 0) == NULL) { - LOG_ERR("Failed to create RIBMGR RO."); + if (rib_read(BOOT_PATH "/rm/gam/cacep", &pc, sizeof(pc)) + != sizeof(pc)) { + LOG_ERR("Failed to read CACEP policy for ribmgr gam."); return -1; } - stat_info.addr_size = rib.dtc.addr_size = conf->addr_size; - stat_info.cep_id_size = rib.dtc.cep_id_size = conf->cep_id_size; - stat_info.pdu_length_size = rib.dtc.pdu_length_size - = conf->pdu_length_size; - stat_info.seqno_size = rib.dtc.seqno_size = conf->seqno_size; - stat_info.has_ttl = rib.dtc.has_ttl = conf->has_ttl; - stat_info.has_chk = rib.dtc.has_chk = conf->has_chk; - stat_info.min_pdu_size = rib.dtc.min_pdu_size = conf->min_pdu_size; - stat_info.max_pdu_size = rib.dtc.max_pdu_size = conf->max_pdu_size; - stat_info.addr_auth_type = rib.addr_auth_type = conf->addr_auth_type; - stat_info.dt_gam_type = rib.dt_gam_type = conf->dt_gam_type; + /* FIXME: Implement cacep policies */ + (void) pc; - len = static_info_msg__get_packed_size(&stat_info); - if (len == 0) { - LOG_ERR("Failed to get size of static information."); - ribmgr_ro_delete(RIBMGR_PREFIX); + ribmgr.gam = gam_create(pg, MGMT_AE); + if (ribmgr.gam == NULL) { + LOG_ERR("Failed to create gam."); return -1; } - data = malloc(len); - if (data == NULL) { - LOG_ERR("Failed to allocate memory."); - ribmgr_ro_delete(RIBMGR_PREFIX); + ribmgr.fs = flow_set_create(); + if (ribmgr.fs == NULL) { + LOG_ERR("Failed to create flow set."); + gam_destroy(ribmgr.gam); return -1; } - static_info_msg__pack(&stat_info, data); - - if (ribmgr_ro_create(RIBMGR_PREFIX STAT_INFO, - attr, data, len) == NULL) { - LOG_ERR("Failed to create static info RO."); - free(data); - ribmgr_ro_delete(RIBMGR_PREFIX); + ribmgr.fq = fqueue_create(); + if (ribmgr.fq == NULL) { + LOG_ERR("Failed to create fq."); + flow_set_destroy(ribmgr.fs); + gam_destroy(ribmgr.gam); return -1; } - if (dir_init()) { - LOG_ERR("Failed to init directory"); - ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - ribmgr_ro_delete(RIBMGR_PREFIX); - return -1; - } - - LOG_DBG("Bootstrapped RIB Manager."); - return 0; } -int ribmgr_enrol() +void ribmgr_fini(void) { - struct cdap * instance = NULL; - struct mgmt_flow * flow; - cdap_key_t key; - int ret; - struct timespec timeout = {(ENROLL_TIMEOUT / 1000), - (ENROLL_TIMEOUT % 1000) * MILLION}; - - pthread_rwlock_wrlock(&rib.flows_lock); - - assert(!list_is_empty(&rib.flows)); - - flow = list_first_entry((&rib.flows), struct mgmt_flow, next); - instance = flow->instance; - - key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); - if (key < 0) { - pthread_rwlock_unlock(&rib.flows_lock); - LOG_ERR("Failed to start enrollment."); - return -1; - } - - ret = cdap_reply_wait(instance, key, NULL, NULL); - if (ret) { - pthread_rwlock_unlock(&rib.flows_lock); - LOG_ERR("Failed to enroll: %d.", ret); - return -1; - } - - pthread_rwlock_unlock(&rib.flows_lock); - - if (ribmgr_wait_state(RIBMGR_OPERATIONAL, &timeout) == -ETIMEDOUT) - LOG_ERR("Enrollment of RIB timed out."); - - if (ribmgr_get_state() != RIBMGR_OPERATIONAL) - return -1; - - return 0; + flow_set_destroy(ribmgr.fs); + fqueue_destroy(ribmgr.fq); + gam_destroy(ribmgr.gam); } -int ribmgr_start_policies(void) +int ribmgr_flow_arr(int fd, + qosspec_t qs) { - rib.addr_auth = addr_auth_create(rib.addr_auth_type); - if (rib.addr_auth == NULL) { - LOG_ERR("Failed to create address authority."); - return -1; - } - - rib.address = rib.addr_auth->address(); - LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address); + assert(ribmgr.gam); - return 0; -} - -struct dt_const * ribmgr_dt_const(void) -{ - return &(rib.dtc); -} - -uint64_t ribmgr_address(void) -{ - return rib.address; -} - -enum pol_gam ribmgr_dt_gam(void) -{ - return rib.dt_gam_type; -} - -static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code) -{ - struct list_head * p = NULL; - - pthread_rwlock_rdlock(&rib.flows_lock); - - list_for_each(p, &rib.flows) { - struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); - if (write_ro_msg(e->instance, msg, name, code)) { - pthread_rwlock_unlock(&rib.flows_lock); - LOG_ERR("Failed to send to a neighbor."); - return -1; - } - } - - pthread_rwlock_unlock(&rib.flows_lock); - - return 0; -} - -int ro_create(const char * name, - struct ro_attr * attr, - uint8_t * data, - size_t len) -{ - struct rnode * node; - ro_msg_t msg = RO_MSG__INIT; - struct ro_attr rattr; - - assert(name); - - if (attr == NULL) { - ro_attr_init(&rattr); - attr = &rattr; - } - - pthread_mutex_lock(&rib.ro_lock); - - node = ribmgr_ro_create(name, *attr, data, len); - if (node == NULL) { - pthread_mutex_unlock(&rib.ro_lock); + if (gam_flow_arr(ribmgr.gam, fd, qs)) return -1; - } - - if (node->attr.recv_set == NO_SYNC) { - pthread_mutex_unlock(&rib.ro_lock); - return 0; - } - - if (ro_msg_create(node, &msg)) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to create RO msg."); - return -1; - } - - if (send_neighbors_ro(node->full_name, &msg, CDAP_CREATE)) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to send to neighbors."); - return -1; - } - - pthread_mutex_unlock(&rib.ro_lock); - - return 0; -} - -int ro_attr_init(struct ro_attr * attr) -{ - assert(attr); - - attr->enrol_sync = false; - attr->recv_set = NO_SYNC; - attr->expiry.tv_sec = 0; - attr->expiry.tv_nsec = 0; return 0; } -int ro_delete(const char * name) +int ribmgr_disseminate(char * path, + enum diss_target target, + enum diss_freq freq, + size_t delay) { - struct rnode * node; - ro_msg_t msg = RO_MSG__INIT; - - assert(name); - - pthread_mutex_lock(&rib.ro_lock); - - node = find_rnode_by_name(name); - if (node == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to sync RO."); - return -1; - } - - if (node->attr.recv_set != NO_SYNC) { - if (ro_msg_create(node, &msg)) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to create RO msg."); - return -1; - } - - if (send_neighbors_ro(node->full_name, &msg, CDAP_DELETE)) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to send to neighbors."); - return -1; - } - } - - if (ribmgr_ro_delete(name)) { - pthread_mutex_unlock(&rib.ro_lock); - return -1; - } - - pthread_mutex_unlock(&rib.ro_lock); + (void) path; + (void) target; + (void) freq; + (void) delay; return 0; } - -int ro_write(const char * name, uint8_t * data, size_t len) -{ - struct rnode * node; - ro_msg_t msg = RO_MSG__INIT; - - assert(name); - assert(data); - - pthread_mutex_lock(&rib.ro_lock); - - node = ribmgr_ro_write(name, data, len); - if (node == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to create RO."); - return -1; - } - node->seqno++; - - if (node->attr.recv_set == NO_SYNC) { - pthread_mutex_unlock(&rib.ro_lock); - return 0; - } - - if (ro_msg_create(node, &msg)) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to create RO msg."); - return -1; - } - - if (send_neighbors_ro(node->full_name, &msg, CDAP_WRITE)) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Failed to send to neighbors."); - return -1; - } - - pthread_mutex_unlock(&rib.ro_lock); - - return 0; -} - -ssize_t ro_read(const char * name, uint8_t ** data) -{ - struct rnode * node; - ssize_t len; - - assert(name); - assert(data); - - pthread_mutex_lock(&rib.ro_lock); - - node = find_rnode_by_name(name); - if (node == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - return -1; - } - - *data = malloc(node->len); - if (*data == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - return -1; - } - - memcpy(*data, node->data, node->len); - len = node->len; - - pthread_mutex_unlock(&rib.ro_lock); - - return len; -} - -ssize_t ro_children(const char * name, char *** children) -{ - struct rnode * node; - struct rnode * child; - ssize_t len = 0; - int i = 0; - - assert(name); - assert(children); - - pthread_mutex_lock(&rib.ro_lock); - - node = find_rnode_by_name(name); - if (node == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - return -1; - } - - child = node->child; - while (child != NULL) { - len++; - child = child->sibling; - } - child = node->child; - - *children = malloc(len); - if (*children == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - return -1; - } - - for (i = 0; i < len; i++) { - (*children)[i] = strdup(child->name); - if ((*children)[i] == NULL) { - while (i >= 0) { - free((*children)[i]); - i--; - } - free(*children); - pthread_mutex_unlock(&rib.ro_lock); - return -1; - } - child = child->sibling; - } - - pthread_mutex_unlock(&rib.ro_lock); - - return len; -} - -bool ro_exists(const char * name) -{ - struct rnode * node; - bool found; - - assert(name); - - pthread_mutex_lock(&rib.ro_lock); - - node = find_rnode_by_name(name); - found = (node == NULL) ? false : true; - - pthread_mutex_unlock(&rib.ro_lock); - - return found; -} - -int ro_subscribe(const char * name, struct ro_sub_ops * ops) -{ - struct ro_sub * sub; - int sid; - - assert(name); - assert(ops); - - sub = malloc(sizeof(*sub)); - if (sub == NULL) - return -ENOMEM; - - list_head_init(&sub->next); - - sub->name = strdup(name); - if (sub->name == NULL) { - free(sub); - return -1; - } - - sub->ops = ops; - - pthread_mutex_lock(&rib.subs_lock); - - sid = bmp_allocate(rib.sids); - if (sid < 0) { - pthread_mutex_unlock(&rib.subs_lock); - free(sub->name); - free(sub); - LOG_ERR("Failed to get sub id."); - return -1; - } - sub->sid = sid; - - list_add(&sub->next, &rib.subs); - - pthread_mutex_unlock(&rib.subs_lock); - - return sid; -} - -int ro_unsubscribe(int sid) -{ - struct list_head * pos = NULL; - struct list_head * n = NULL; - - pthread_mutex_lock(&rib.subs_lock); - - list_for_each_safe(pos, n, &(rib.subs)) { - struct ro_sub * e = list_entry(pos, struct ro_sub, next); - if (sid == e->sid) { - bmp_release(rib.sids, sid); - list_del(&e->next); - free(e->name); - free(e); - pthread_mutex_unlock(&rib.subs_lock); - return 0; - } - } - - pthread_mutex_unlock(&rib.subs_lock); - - LOG_ERR("No such subscription found."); - - return -1; -} |