From f8632a11faed3402e8255f368b8eff58f3b2eadb Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 16 Nov 2016 17:46:41 +0100 Subject: ipcpd: normal: Add syncing of RIB objects This adds the remote syncing of RIB objects. Subscribers are notified upon receipt of new/deleted/updated RIB objects. --- src/ipcpd/normal/CMakeLists.txt | 4 +- src/ipcpd/normal/ribmgr.c | 1146 ++++++++++++++++++++++++++------------- src/ipcpd/normal/ro.h | 33 +- src/ipcpd/normal/ro.proto | 9 + 4 files changed, 799 insertions(+), 393 deletions(-) create mode 100644 src/ipcpd/normal/ro.proto diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 08c5c691..06e41e9c 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -20,6 +20,8 @@ protobuf_generate_c(STATIC_INFO_SRCS STATIC_INFO_HDRS protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS flow_alloc.proto) +protobuf_generate_c(RO_SRCS RO_HDRS ro.proto) + set(SOURCE_FILES # Add source files here addr_auth.c @@ -34,7 +36,7 @@ set(SOURCE_FILES ) add_executable (ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES} - ${STATIC_INFO_SRCS} ${FLOW_ALLOC_SRCS}) + ${STATIC_INFO_SRCS} ${FLOW_ALLOC_SRCS} ${RO_SRCS}) target_link_libraries (ipcpd-normal LINK_PUBLIC ouroboros) include(MacroAddCompileFlags) diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 96c7e7e0..49971eda 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -47,15 +47,22 @@ #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; +#include "ro.pb-c.h" +typedef RoMsg ro_msg_t; + #define SUBS_SIZE 25 #define ENROLLMENT "enrollment" -#define STATIC_INFO "static DIF information" + +#define RIBMGR_PREFIX "/ribmgr" +#define STAT_INFO "/statinfo" #define PATH_DELIMITER "/" /* RIB objects */ struct rnode { char * name; + char * full_name; + uint64_t seqno; /* * NOTE: Naive implementation for now, could be replaced by @@ -92,6 +99,7 @@ struct { struct list_head subs; struct bmp * sids; pthread_mutex_t subs_lock; + int ribmgr_sid; struct dt_const dtc; @@ -106,6 +114,296 @@ struct { struct addr_auth * addr_auth; } rib; +int ribmgr_ro_created(const char * name, + uint8_t * data, + size_t len) +{ + static_info_msg_t * stat_msg; + + pthread_rwlock_wrlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_PENDING_ENROLL && + strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { + LOG_DBG("Received static DIF information."); + + stat_msg = static_info_msg__unpack(NULL, len, data); + if (stat_msg == NULL) { + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to unpack static info message."); + return -1; + } + + rib.dtc.addr_size = stat_msg->addr_size; + rib.dtc.cep_id_size = stat_msg->cep_id_size; + rib.dtc.pdu_length_size = stat_msg->pdu_length_size; + rib.dtc.seqno_size = stat_msg->seqno_size; + rib.dtc.has_ttl = stat_msg->has_ttl; + rib.dtc.has_chk = stat_msg->has_chk; + rib.dtc.min_pdu_size = stat_msg->min_pdu_size; + rib.dtc.max_pdu_size = stat_msg->max_pdu_size; + + rib.addr_auth = addr_auth_create(stat_msg->addr_auth_type); + if (rib.addr_auth == NULL) { + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); + static_info_msg__free_unpacked(stat_msg, NULL); + LOG_ERR("Failed to create address authority"); + return -1; + } + + rib.address = rib.addr_auth->address(); + LOG_DBG("IPCP has address %lu", rib.address); + + if (frct_init()) { + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); + static_info_msg__free_unpacked(stat_msg, NULL); + LOG_ERR("Failed to init FRCT"); + return -1; + } + + static_info_msg__free_unpacked(stat_msg, NULL); + } + pthread_rwlock_unlock(&ipcpi.state_lock); + + return 0; +} + +/* We only have a create operation for now */ +static struct ro_sub_ops ribmgr_sub_ops = { + .ro_created = ribmgr_ro_created, + .ro_updated = NULL, + .ro_deleted = NULL +}; + +static struct rnode * find_rnode_by_name(const char * name) +{ + char * str; + char * str1; + char * saveptr; + char * token; + struct rnode * node; + + str = strdup(name); + if (str == NULL) + return NULL; + + node = rib.root; + + for (str1 = str; ; str1 = NULL) { + token = strtok_r(str1, PATH_DELIMITER, &saveptr); + if (token == NULL) + break; + + node = node->child; + + while (node != NULL) + if (strcmp(node->name, token) == 0) + break; + else + node = node->sibling; + + if (node == NULL) { + free(str); + return NULL; + } + } + + free(str); + return node; +} + +/* Call under RIB object lock */ +static int ro_msg_create(struct rnode * node, + ro_msg_t * msg) +{ + msg->address = rib.address; + msg->seqno = node->seqno; + msg->recv_set = node->props->recv_set; + msg->enrol_sync = node->props->enrol_sync; + msg->sec = node->props->expiry.tv_sec; + msg->nsec = node->props->expiry.tv_nsec; + msg->value.data = node->data; + msg->value.len = node->len; + + return 0; +} + +static struct rnode * ribmgr_ro_create(const char * name, + struct ro_props * props, + uint8_t * data, + size_t len) +{ + char * str; + char * str1; + char * saveptr; + char * token; + char * token2; + struct rnode * node; + struct rnode * new; + struct rnode * prev; + bool sibling; + + str = strdup(name); + if (str == NULL) + return NULL; + + node = rib.root; + + for (str1 = str; ; str1 = NULL) { + token = strtok_r(str1, PATH_DELIMITER, &saveptr); + if (token == NULL) { + LOG_ERR("RO already exists."); + free(str); + return NULL; + } + + prev = node; + node = node->child; + sibling = false; + + /* Search horizontally */ + while (node != NULL) { + if (strcmp(node->name, token) == 0) { + break; + } else { + prev = node; + node = node->sibling; + sibling = true; + } + } + + if (node == NULL) + break; + } + + token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr); + if (token2 != NULL) { + LOG_ERR("Part of the pathname does not exist."); + free(str); + return NULL; + } + + new = malloc(sizeof(*new)); + if (new == NULL) { + free(str); + return NULL; + } + + new->name = strdup(token); + if (new->name == NULL) { + free(str); + free(new); + return NULL; + } + + free(str); + + new->full_name = strdup(name); + if (new->full_name == NULL) { + free(new); + return NULL; + } + + new->seqno = 0; + new->props = props; + + if (sibling) + prev->sibling = new; + else + prev->child = new; + + new->data = data; + new->len = len; + new->child = NULL; + new->sibling = NULL; + + return new; +} + +static int ribmgr_ro_delete(const char * name) +{ + char * str; + char * str1; + char * saveptr; + char * token; + struct rnode * node; + struct rnode * prev; + bool sibling = false; + + str = strdup(name); + if (str == NULL) + return -1; + + node = rib.root; + prev = NULL; + + for (str1 = str; ; str1 = NULL) { + token = strtok_r(str1, PATH_DELIMITER, &saveptr); + if (token == NULL) + break; + + prev = node; + node = node->child; + sibling = false; + + while (node != NULL) { + if (strcmp(node->name, token) == 0) { + break; + } else { + prev = node; + node = node->sibling; + sibling = true; + } + } + + if (node == NULL) { + free(str); + return -1; + } + } + + if (node == rib.root) { + LOG_ERR("Won't remove root."); + free(str); + return -1; + } + + free(node->name); + free(node->full_name); + if (node->data != NULL) + free(node->data); + + if (sibling) + prev->sibling = node->sibling; + else + prev->child = node->sibling; + + free(node); + free(str); + + return 0; +} + +static struct rnode * ribmgr_ro_write(const char * name, + uint8_t * data, + size_t len) +{ + struct rnode * node; + + node = find_rnode_by_name(name); + if (node == NULL) + return NULL; + + free(node->data); + + node->data = data; + node->len = len; + node->seqno++; + + return node; +} + /* Call while holding cdap_reqs_lock */ /* FIXME: better not to call blocking functions under any lock */ int cdap_result_wait(struct cdap * instance, @@ -151,6 +449,46 @@ int cdap_result_wait(struct cdap * instance, return ret; } +static int write_ro_msg(struct cdap * neighbor, + ro_msg_t * msg, + char * name, + enum cdap_opcode code) +{ + uint8_t * data; + size_t len; + int iid = 0; + + len = ro_msg__get_packed_size(msg); + if (len == 0) + return -1; + + data = malloc(len); + if (data == NULL) + return -ENOMEM; + + ro_msg__pack(msg, data); + + pthread_mutex_lock(&rib.cdap_reqs_lock); + iid = cdap_send_request(neighbor, code, + name, data, len, 0); + if (iid < 0) { + pthread_mutex_unlock(&rib.cdap_reqs_lock); + free(data); + return -1; + } + + if (cdap_result_wait(neighbor, code, name, iid)) { + pthread_mutex_unlock(&rib.cdap_reqs_lock); + free(data); + LOG_ERR("Remote did not receive RIB object."); + return -1; + } + pthread_mutex_unlock(&rib.cdap_reqs_lock); + free(data); + + return 0; +} + int ribmgr_init() { INIT_LIST_HEAD(&rib.flows); @@ -206,6 +544,18 @@ int ribmgr_init() return -1; } + rib.ribmgr_sid = ro_subscribe(RIBMGR_PREFIX, &ribmgr_sub_ops); + if (rib.ribmgr_sid < 0) { + LOG_ERR("Failed to subscribe."); + bmp_destroy(rib.sids); + pthread_rwlock_destroy(&rib.flows_lock); + pthread_mutex_destroy(&rib.cdap_reqs_lock); + pthread_mutex_destroy(&rib.ro_lock); + pthread_mutex_destroy(&rib.subs_lock); + free(rib.root); + return -1; + } + return 0; } @@ -247,6 +597,8 @@ int ribmgr_fini() } pthread_rwlock_unlock(&rib.flows_lock); + ro_unsubscribe(rib.ribmgr_sid); + if (rib.addr_auth != NULL) addr_auth_destroy(rib.addr_auth); @@ -273,6 +625,7 @@ static int ribmgr_cdap_reply(struct cdap * instance, { struct list_head * pos, * n = NULL; + /* We never perform reads on other RIBs */ (void) data; (void) len; @@ -295,7 +648,6 @@ static int ribmgr_cdap_reply(struct cdap * instance, pthread_mutex_unlock(&rib.cdap_reqs_lock); - /* FIXME: In case of a read, update values here */ cdap_request_respond(req, result); pthread_mutex_lock(&rib.cdap_reqs_lock); @@ -306,69 +658,184 @@ static int ribmgr_cdap_reply(struct cdap * instance, return 0; } -static int ribmgr_cdap_write(struct cdap * instance, +static int ribmgr_cdap_create(struct cdap * instance, + int invoke_id, + char * name, + ro_msg_t * msg) +{ + int ret = 0; + struct list_head * p = NULL; + size_t len_s, len_n; + uint8_t * ro_data; + struct ro_props * props; + struct rnode * node; + + props = malloc(sizeof(*props)); + if (props == NULL) { + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + return -ENOMEM; + } + + props->expiry.tv_sec = msg->sec; + props->expiry.tv_nsec = msg->nsec; + props->enrol_sync = msg->enrol_sync; + + pthread_mutex_lock(&rib.ro_lock); + + ro_data = malloc(msg->value.len); + if (ro_data == NULL) { + pthread_mutex_unlock(&rib.ro_lock); + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + free(props); + return -1; + } + memcpy(ro_data, msg->value.data, msg->value.len); + + node = ribmgr_ro_create(name, props, ro_data, msg->value.len); + if (node == NULL) { + pthread_mutex_unlock(&rib.ro_lock); + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + free(props); + free(ro_data); + return -1; + } + + pthread_mutex_lock(&rib.subs_lock); + list_for_each(p, &rib.subs) { + struct ro_sub * e = list_entry(p, struct ro_sub, next); + len_s = strlen(e->name); + len_n = strlen(name); + + if (len_n < len_s) + continue; + + if (memcmp(name, e->name, len_s) == 0) { + if (e->ops->ro_created == NULL) + continue; + + ro_data = malloc(node->len); + if (ro_data == NULL) + continue; + + memcpy(ro_data, node->data, node->len); + e->ops->ro_created(name, ro_data, node->len); + } + } + + pthread_mutex_unlock(&rib.subs_lock); + pthread_mutex_unlock(&rib.ro_lock); + + if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + LOG_ERR("Failed to send reply to create request."); + return -1; + } + + return 0; +} + +static int ribmgr_cdap_delete(struct cdap * instance, + int invoke_id, + char * name) +{ + struct list_head * p = NULL; + size_t len_s; + size_t len_n; + + pthread_mutex_lock(&rib.ro_lock); + + if (ribmgr_ro_delete(name)) { + pthread_mutex_unlock(&rib.ro_lock); + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + return -1; + } + + pthread_mutex_lock(&rib.subs_lock); + + list_for_each(p, &rib.subs) { + struct ro_sub * e = list_entry(p, struct ro_sub, next); + len_s = strlen(e->name); + len_n = strlen(name); + + if (len_n < len_s) + continue; + + if (memcmp(name, e->name, len_s) == 0) { + if (e->ops->ro_deleted == NULL) + continue; + + e->ops->ro_deleted(name); + } + } + + pthread_mutex_unlock(&rib.subs_lock); + pthread_mutex_unlock(&rib.ro_lock); + + if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { + LOG_ERR("Failed to send reply to create request."); + return -1; + } + + return 0; +} + +static int ribmgr_cdap_write(struct cdap * instance, int invoke_id, char * name, - uint8_t * data, - size_t len, + ro_msg_t * msg, uint32_t flags) { - static_info_msg_t * msg; int ret = 0; + struct list_head * p = NULL; + size_t len_s; + size_t len_n; + uint8_t * ro_data; + struct rnode * node; (void) flags; - pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_PENDING_ENROLL && - strcmp(name, STATIC_INFO) == 0) { - LOG_DBG("Received static DIF information."); + pthread_mutex_lock(&rib.ro_lock); - msg = static_info_msg__unpack(NULL, len, data); - if (msg == NULL) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - LOG_ERR("Failed to unpack static info message."); - return -1; - } + ro_data = malloc(msg->value.len); + if (ro_data == NULL) { + pthread_mutex_unlock(&rib.ro_lock); + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + return -1; + } + memcpy(ro_data, msg->value.data, msg->value.len); - rib.dtc.addr_size = msg->addr_size; - rib.dtc.cep_id_size = msg->cep_id_size; - rib.dtc.pdu_length_size = msg->pdu_length_size; - rib.dtc.seqno_size = msg->seqno_size; - rib.dtc.has_ttl = msg->has_ttl; - rib.dtc.has_chk = msg->has_chk; - rib.dtc.min_pdu_size = msg->min_pdu_size; - rib.dtc.max_pdu_size = msg->max_pdu_size; + node = ribmgr_ro_write(name, msg->value.data, msg->value.len); + if (node == NULL) { + pthread_mutex_unlock(&rib.ro_lock); + free(ro_data); + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + return -1; + } - rib.addr_auth = addr_auth_create(msg->addr_auth_type); - if (rib.addr_auth == NULL) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - static_info_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to create address authority"); - return -1; - } + pthread_mutex_lock(&rib.subs_lock); - rib.address = rib.addr_auth->address(); - LOG_DBG("IPCP has address %lu", rib.address); + list_for_each(p, &rib.subs) { + struct ro_sub * e = list_entry(p, struct ro_sub, next); + len_s = strlen(e->name); + len_n = strlen(name); - if (frct_init()) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - static_info_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to init FRCT"); - return -1; - } + if (len_n < len_s) + continue; - static_info_msg__free_unpacked(msg, NULL); - } else { - ret = -1; + if (memcmp(name, e->name, len_s) == 0) { + if (e->ops->ro_updated == NULL) + continue; + + ro_data = malloc(node->len); + if (ro_data == NULL) + continue; + + memcpy(ro_data, node->data, node->len); + e->ops->ro_updated(name, ro_data, node->len); + } } - pthread_rwlock_unlock(&ipcpi.state_lock); + pthread_mutex_unlock(&rib.subs_lock); + pthread_mutex_unlock(&rib.ro_lock); if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { LOG_ERR("Failed to send reply to write request."); @@ -378,13 +845,39 @@ static int ribmgr_cdap_write(struct cdap * instance, return 0; } +static int ribmgr_enrol_sync(struct cdap * instance, + struct rnode * node) +{ + int ret = 0; + + if (node != NULL) { + if (node->props->enrol_sync == true) { + ro_msg_t msg = RO_MSG__INIT; + + if (ro_msg_create(node, &msg)) { + LOG_ERR("Failed to create RO msg."); + return -1; + } + + if (write_ro_msg(instance, &msg, + node->full_name, CDAP_CREATE)) { + LOG_ERR("Failed to send RO msg."); + return -1; + } + } + + ret = ribmgr_enrol_sync(instance, node->child); + if (ret == 0) + ret = ribmgr_enrol_sync(instance, node->sibling); + } + + return ret; +} + static int ribmgr_cdap_start(struct cdap * instance, int invoke_id, char * name) { - static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; - uint8_t * data = NULL; - size_t len = 0; int iid = 0; pthread_rwlock_wrlock(&ipcpi.state_lock); @@ -398,57 +891,17 @@ static int ribmgr_cdap_start(struct cdap * instance, return -1; } - stat_info.addr_size = rib.dtc.addr_size; - stat_info.cep_id_size = rib.dtc.cep_id_size; - stat_info.pdu_length_size = rib.dtc.pdu_length_size; - stat_info.seqno_size = rib.dtc.seqno_size; - stat_info.has_ttl = rib.dtc.has_ttl; - stat_info.has_chk = rib.dtc.has_chk; - stat_info.min_pdu_size = rib.dtc.min_pdu_size; - stat_info.max_pdu_size = rib.dtc.max_pdu_size; - stat_info.addr_auth_type = rib.addr_auth->type; - - len = static_info_msg__get_packed_size(&stat_info); - if (len == 0) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Failed to get size of static information."); - return -1; - } - - data = malloc(len); - if (data == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Failed to allocate memory."); - return -1; - } - - static_info_msg__pack(&stat_info, data); - - LOG_DBGF("Sending static info..."); - - pthread_mutex_lock(&rib.cdap_reqs_lock); + /* Loop through rtree and send correct objects */ + LOG_DBGF("Sending ROs that need to be sent on enrolment..."); - iid = cdap_send_request(instance, CDAP_WRITE, - STATIC_INFO, data, len, 0); - if (iid < 0) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); - free(data); - LOG_ERR("Failed to send static information."); - return -1; - } - - if (cdap_result_wait(instance, CDAP_WRITE, - STATIC_INFO, iid)) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_mutex_lock(&rib.ro_lock); + if (ribmgr_enrol_sync(instance, rib.root->child)) { + pthread_mutex_unlock(&rib.ro_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - free(data); - LOG_ERR("Remote did not receive static information."); + LOG_ERR("Failed to sync part of the RIB."); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - /* FIXME: Send neighbors here. */ + pthread_mutex_unlock(&rib.ro_lock); LOG_DBGF("Sending stop enrollment..."); @@ -459,7 +912,6 @@ static int ribmgr_cdap_start(struct cdap * instance, if (iid < 0) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - free(data); LOG_ERR("Failed to send stop of enrollment."); return -1; } @@ -468,13 +920,10 @@ static int ribmgr_cdap_start(struct cdap * instance, ENROLLMENT, iid)) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - free(data); LOG_ERR("Remote failed to complete enrollment."); return -1; } pthread_mutex_unlock(&rib.cdap_reqs_lock); - - free(data); } else { if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); @@ -520,27 +969,75 @@ static int ribmgr_cdap_request(struct cdap * instance, size_t len, uint32_t flags) { - switch (opcode) { - case CDAP_WRITE: - return ribmgr_cdap_write(instance, - invoke_id, - name, data, - len, flags); - case CDAP_START: + ro_msg_t * msg; + int ret = -1; + struct list_head * p = NULL; + + if (opcode == CDAP_START) return ribmgr_cdap_start(instance, invoke_id, name); - case CDAP_STOP: + else if (opcode == CDAP_STOP) return ribmgr_cdap_stop(instance, invoke_id, name); - default: - LOG_INFO("Unsupported CDAP opcode received."); + + msg = ro_msg__unpack(NULL, len, data); + if (msg == NULL) { + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + LOG_ERR("Failed to unpack RO message"); + return -1; + } + + /* FIXME: Check if we already received this */ + + if (opcode == CDAP_CREATE) { + ret = ribmgr_cdap_create(instance, + invoke_id, + name, + msg); + } else if (opcode == CDAP_WRITE) { + ret = ribmgr_cdap_write(instance, + invoke_id, + name, msg, + flags); + + } else if (opcode == CDAP_DELETE) { + ret = ribmgr_cdap_delete(instance, + invoke_id, + name); + } else { + LOG_INFO("Unsupported opcode received."); + cdap_send_reply(instance, invoke_id, -1, NULL, 0); return -1; } + + if (msg->recv_set == ALL_MEMBERS) { + pthread_rwlock_rdlock(&rib.flows_lock); + list_for_each(p, &rib.flows) { + struct mgmt_flow * e = + list_entry(p, struct mgmt_flow, next); + + /* Don't send it back */ + if (e->instance == instance) + continue; + + if (write_ro_msg(e->instance, msg, name, opcode)) { + LOG_ERR("Failed to send to a neighbor."); + pthread_rwlock_unlock(&rib.flows_lock); + ro_msg__free_unpacked(msg, NULL); + return -1; + } + } + pthread_rwlock_unlock(&rib.flows_lock); + } + + ro_msg__free_unpacked(msg, NULL); + + return ret; } -static struct cdap_ops ribmgr_ops = { +static struct cdap_ops ribmgr_cdap_ops = { .cdap_reply = ribmgr_cdap_reply, .cdap_request = ribmgr_cdap_request }; @@ -555,7 +1052,7 @@ int ribmgr_add_flow(int fd) if (flow == NULL) return -1; - instance = cdap_create(&ribmgr_ops, fd); + instance = cdap_create(&ribmgr_cdap_ops, fd); if (instance == NULL) { LOG_ERR("Failed to create CDAP instance"); free(flow); @@ -630,24 +1127,92 @@ int ribmgr_remove_flow(int fd) int ribmgr_bootstrap(struct dif_config * conf) { + static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; + uint8_t * data = NULL; + size_t len = 0; + struct ro_props * props; + if (conf == NULL || conf->type != IPCP_NORMAL) { LOG_ERR("Bad DIF configuration."); return -1; } - rib.dtc.addr_size = conf->addr_size; - rib.dtc.cep_id_size = conf->cep_id_size; - rib.dtc.pdu_length_size = conf->pdu_length_size; - rib.dtc.seqno_size = conf->seqno_size; - rib.dtc.has_ttl = conf->has_ttl; - rib.dtc.has_chk = conf->has_chk; - rib.dtc.min_pdu_size = conf->min_pdu_size; - rib.dtc.max_pdu_size = conf->max_pdu_size; + props = malloc(sizeof(*props)); + if (props == NULL) { + LOG_ERR("Failed to allocate memory."); + return -1; + } + + props->enrol_sync = true; + props->recv_set = NEIGHBORS; + props->expiry.tv_sec = 0; + props->expiry.tv_nsec = 0; + + if (ribmgr_ro_create(RIBMGR_PREFIX, props, NULL, 0) == NULL) { + LOG_ERR("Failed to create RIBMGR RO."); + free(props); + return -1; + } + + stat_info.addr_size = rib.dtc.addr_size = conf->addr_size; + stat_info.cep_id_size = rib.dtc.cep_id_size = conf->cep_id_size; + stat_info.pdu_length_size = rib.dtc.pdu_length_size + = conf->pdu_length_size; + stat_info.seqno_size = rib.dtc.seqno_size = conf->seqno_size; + stat_info.has_ttl = rib.dtc.has_ttl = conf->has_ttl; + stat_info.has_chk = rib.dtc.has_chk = conf->has_chk; + stat_info.min_pdu_size = rib.dtc.min_pdu_size = conf->min_pdu_size; + stat_info.max_pdu_size = rib.dtc.max_pdu_size = conf->max_pdu_size; rib.addr_auth = addr_auth_create(conf->addr_auth_type); if (rib.addr_auth == NULL) { LOG_ERR("Failed to create address authority."); + ro_delete(RIBMGR_PREFIX); + return -1; + } + + stat_info.addr_auth_type = rib.addr_auth->type; + + len = static_info_msg__get_packed_size(&stat_info); + if (len == 0) { + LOG_ERR("Failed to get size of static information."); + addr_auth_destroy(rib.addr_auth); + ribmgr_ro_delete(RIBMGR_PREFIX); + return -1; + } + + data = malloc(len); + if (data == NULL) { + LOG_ERR("Failed to allocate memory."); + addr_auth_destroy(rib.addr_auth); + ribmgr_ro_delete(RIBMGR_PREFIX); + return -1; + } + + static_info_msg__pack(&stat_info, data); + + props = malloc(sizeof(*props)); + if (props == NULL) { + LOG_ERR("Failed to allocate memory."); + free(data); + addr_auth_destroy(rib.addr_auth); + ribmgr_ro_delete(RIBMGR_PREFIX); + return -1; + } + + props->enrol_sync = true; + props->recv_set = NEIGHBORS; + props->expiry.tv_sec = 0; + props->expiry.tv_nsec = 0; + + if (ribmgr_ro_create(RIBMGR_PREFIX STAT_INFO, + props, data, len) == NULL) { + LOG_ERR("Failed to create static info RO."); + free(data); + free(props); + addr_auth_destroy(rib.addr_auth); + ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -656,7 +1221,9 @@ int ribmgr_bootstrap(struct dif_config * conf) if (frct_init()) { LOG_ERR("Failed to initialize FRCT."); + ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); addr_auth_destroy(rib.addr_auth); + ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -675,109 +1242,64 @@ uint64_t ribmgr_address() return rib.address; } -int ro_create(const char * name, - uint8_t * data, - size_t len) +static int send_neighbors_ro(char * name, + ro_msg_t * msg, + enum cdap_opcode code) { - char * str, * str1, * saveptr, * token; - struct rnode * node, * new, * prev; - bool sibling; struct list_head * p = NULL; - size_t len_s, len_n; - uint8_t * ro_data; - str = strdup(name); - if (str == NULL) - return -1; + pthread_rwlock_rdlock(&rib.flows_lock); + list_for_each(p, &rib.flows) { + struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); - pthread_mutex_lock(&rib.ro_lock); - node = rib.root; - - for (str1 = str; ; str1 = NULL) { - token = strtok_r(str1, PATH_DELIMITER, &saveptr); - if (token == NULL) { - pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("RO already exists."); - free(str); + if (write_ro_msg(e->instance, msg, name, code)) { + LOG_ERR("Failed to send to a neighbor."); + pthread_rwlock_unlock(&rib.flows_lock); return -1; } + } + pthread_rwlock_unlock(&rib.flows_lock); - prev = node; - node = node->child; - sibling = false; + return 0; +} - /* Search horizontally */ - while (node != NULL) { - if (strcmp(node->name, token) == 0) { - break; - } else { - prev = node; - node = node->sibling; - sibling = true; - } - } +int ro_create(const char * name, + struct ro_props * props, + uint8_t * data, + size_t len) +{ + struct rnode * node; + ro_msg_t msg = RO_MSG__INIT; - if (node == NULL) - break; - } + if (name == NULL || props == NULL) + return -EINVAL; + + pthread_mutex_lock(&rib.ro_lock); - token = strtok_r(str1, PATH_DELIMITER, &saveptr); - if (token != NULL) { + node = ribmgr_ro_create(name, props, data, len); + if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); - LOG_ERR("Part of the pathname does not exist."); - free(str); + LOG_ERR("Failed to create RO."); return -1; } - free(str); - - new = malloc(sizeof(*new)); - if (new == NULL) { + if (node->props->recv_set == NO_SYNC) { pthread_mutex_unlock(&rib.ro_lock); - return -1; + return 0; } - new->name = strdup(token); - if (new->name == NULL) { + if (ro_msg_create(node, &msg)) { pthread_mutex_unlock(&rib.ro_lock); - free(new); + LOG_ERR("Failed to create RO msg."); return -1; } - if (sibling) - prev->sibling = new; - else - prev->child = new; - - new->data = data; - new->len = len; - new->child = NULL; - new->sibling = NULL; - - pthread_mutex_lock(&rib.subs_lock); - - list_for_each(p, &rib.subs) { - struct ro_sub * e = list_entry(p, struct ro_sub, next); - len_s = strlen(e->name); - len_n = strlen(name); - - if (len_n < len_s) - continue; - - if (strncmp(name, e->name, len_s) == 0) { - if (e->ops->ro_created == NULL) - continue; - - ro_data = malloc(len); - if (ro_data == NULL) - continue; - - memcpy(ro_data, data, len); - e->ops->ro_created(name, ro_data, len); - } + if (send_neighbors_ro(node->full_name, &msg, CDAP_CREATE)) { + pthread_mutex_unlock(&rib.ro_lock); + LOG_ERR("Failed to send to neighbors."); + return -1; } - pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); return 0; @@ -785,207 +1307,94 @@ int ro_create(const char * name, int ro_delete(const char * name) { - char * str, * str1, * saveptr, * token; - struct rnode * node, * prev; - bool sibling = false; - struct list_head * p = NULL; - size_t len_s, len_n; + struct rnode * node; + ro_msg_t msg = RO_MSG__INIT; - str = strdup(name); - if (str == NULL) - return -1; + if (name == NULL) + return -EINVAL; pthread_mutex_lock(&rib.ro_lock); - node = rib.root; - prev = NULL; - - for (str1 = str; ; str1 = NULL) { - token = strtok_r(str1, PATH_DELIMITER, &saveptr); - if (token == NULL) - break; - - prev = node; - node = node->child; - sibling = false; + node = find_rnode_by_name(name); + if (node == NULL) { + pthread_mutex_unlock(&rib.ro_lock); + LOG_ERR("Failed to sync RO."); + return -1; + } - while (node != NULL) { - if (strcmp(node->name, token) == 0) { - break; - } else { - prev = node; - node = node->sibling; - sibling = true; - } + if (node->props->recv_set != NO_SYNC) { + if (ro_msg_create(node, &msg)) { + pthread_mutex_unlock(&rib.ro_lock); + LOG_ERR("Failed to create RO msg."); + return -1; } - if (node == NULL) { + if (send_neighbors_ro(node->full_name, &msg, CDAP_DELETE)) { pthread_mutex_unlock(&rib.ro_lock); - free(str); + LOG_ERR("Failed to send to neighbors."); return -1; } } - if (node == rib.root) { - LOG_ERR("Won't remove root."); - free(str); + if (ribmgr_ro_delete(name)) { + pthread_mutex_unlock(&rib.ro_lock); return -1; } - free(node->name); - if (node->data != NULL) - free(node->data); - - if (sibling) - prev->sibling = node->sibling; - else - prev->child = node->sibling; - - free(node); - - pthread_mutex_lock(&rib.subs_lock); - - list_for_each(p, &rib.subs) { - struct ro_sub * e = list_entry(p, struct ro_sub, next); - len_s = strlen(e->name); - len_n = strlen(name); - - if (len_n < len_s) - continue; - - if (strncmp(name, e->name, len_s) == 0) { - if (e->ops->ro_deleted == NULL) - continue; - - e->ops->ro_deleted(name); - } - } - - pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - free(str); return 0; } -static struct rnode * find_rnode_by_name(const char * name) +int ro_write(const char * name, + uint8_t * data, + size_t len) { - char * str, * str1, * saveptr, * token; struct rnode * node; + ro_msg_t msg = RO_MSG__INIT; - str = strdup(name); - if (str == NULL) - return NULL; - - node = rib.root; - - for (str1 = str; ; str1 = NULL) { - token = strtok_r(str1, PATH_DELIMITER, &saveptr); - if (token == NULL) - break; - - node = node->child; - - while (node != NULL) - if (strcmp(node->name, token) == 0) - break; - else - node = node->sibling; - - if (node == NULL) { - free(str); - return NULL; - } - } - - free(str); - return node; -} - -ssize_t ro_read(const char * name, - uint8_t ** data) -{ - struct rnode * node; - ssize_t len; + if (name == NULL || data == NULL) + return -EINVAL; pthread_mutex_lock(&rib.ro_lock); - node = find_rnode_by_name(name); + node = ribmgr_ro_write(name, data, len); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); + LOG_ERR("Failed to create RO."); return -1; } - *data = malloc(node->len); - if (*data == NULL) { + if (node->props->recv_set == NO_SYNC) { pthread_mutex_unlock(&rib.ro_lock); - return -1; + return 0; } - memcpy(*data, node->data, node->len); - len = node->len; - - pthread_mutex_unlock(&rib.ro_lock); - - return len; -} - -int ro_write(const char * name, - uint8_t * data, - size_t len) -{ - struct rnode * node; - struct list_head * p = NULL; - size_t len_s, len_n; - uint8_t * ro_data; - - pthread_mutex_lock(&rib.ro_lock); - - node = find_rnode_by_name(name); - if (node == NULL) { + if (ro_msg_create(node, &msg)) { pthread_mutex_unlock(&rib.ro_lock); + LOG_ERR("Failed to create RO msg."); return -1; } - free(node->data); - - node->data = data; - node->len = len; - - pthread_mutex_lock(&rib.subs_lock); - - list_for_each(p, &rib.subs) { - struct ro_sub * e = - list_entry(p, struct ro_sub, next); - len_s = strlen(e->name); - len_n = strlen(name); - - if (len_n < len_s) - continue; - - if (strncmp(name, e->name, len_s) == 0) { - if (e->ops->ro_updated == NULL) - continue; - - ro_data = malloc(len); - if (ro_data == NULL) - continue; - - memcpy(ro_data, data, len); - e->ops->ro_updated(name, ro_data, len); - } + if (send_neighbors_ro(node->full_name, &msg, CDAP_WRITE)) { + pthread_mutex_unlock(&rib.ro_lock); + LOG_ERR("Failed to send to neighbors."); + return -1; } - pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); return 0; } -int ro_props(const char * name, - struct ro_props * props) +ssize_t ro_read(const char * name, + uint8_t ** data) { struct rnode * node; + ssize_t len; + + if (name == NULL || data == NULL) + return -EINVAL; pthread_mutex_lock(&rib.ro_lock); @@ -995,27 +1404,18 @@ int ro_props(const char * name, return -1; } - if (node->props != NULL) { - if (node->props->expiry != NULL) - free(node->props->expiry); - free(node->props); + *data = malloc(node->len); + if (*data == NULL) { + pthread_mutex_unlock(&rib.ro_lock); + return -1; } - node->props = props; + memcpy(*data, node->data, node->len); + len = node->len; pthread_mutex_unlock(&rib.ro_lock); - return 0; -} - -int ro_sync(const char * name) -{ - (void) name; - - LOG_MISSING; - /* FIXME: We need whatevercast sets first */ - - return -1; + return len; } int ro_subscribe(const char * name, diff --git a/src/ipcpd/normal/ro.h b/src/ipcpd/normal/ro.h index 0dfa7e8a..6fa1db2a 100644 --- a/src/ipcpd/normal/ro.h +++ b/src/ipcpd/normal/ro.h @@ -24,38 +24,33 @@ #define OUROBOROS_IPCP_RO_H enum ro_recv_set { - ALL_MEMBERS = 0, - NEIGHBORS + NO_SYNC = 0, + NEIGHBORS, + ALL_MEMBERS }; struct ro_props { - bool enrol_sync; - enum ro_recv_set recv_set; - struct timespec * expiry; + bool enrol_sync; + enum ro_recv_set recv_set; + struct timespec expiry; }; /* All RIB-objects have a pathname, separated by a slash. */ -/* Takes ownership of the data */ -int ro_create(const char * name, - uint8_t * data, - size_t len); +/* Takes ownership of the data and props */ +int ro_create(const char * name, + struct ro_props * props, + uint8_t * data, + size_t len); int ro_delete(const char * name); -/* Reader takes ownership of data */ -ssize_t ro_read(const char * name, - uint8_t ** data); - int ro_write(const char * name, uint8_t * data, size_t len); -/* Takes ownership of the props */ -int ro_props(const char * name, - struct ro_props * props); - -/* Sync changes with other members in the DIF */ -int ro_sync(const char * name); +/* Reader takes ownership of data */ +ssize_t ro_read(const char * name, + uint8_t ** data); /* Callback passes ownership of the data */ struct ro_sub_ops { diff --git a/src/ipcpd/normal/ro.proto b/src/ipcpd/normal/ro.proto new file mode 100644 index 00000000..c5062468 --- /dev/null +++ b/src/ipcpd/normal/ro.proto @@ -0,0 +1,9 @@ +message ro_msg { + required uint64 address = 1; + required uint64 seqno = 2; + required int32 recv_set = 3; + required bool enrol_sync = 4; + required uint32 sec = 5; + required uint64 nsec = 6; + required bytes value = 7; +} \ No newline at end of file -- cgit v1.2.3