summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-11-20 13:07:28 +0000
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-11-20 13:07:28 +0000
commitcad42bc85f351362894810aae1a5018ccb5e3c9f (patch)
tree6a02e236ddc1ccbd266760e021eb39713aa006c9 /src/ipcpd/normal
parentc39e0cbc5e5fd18625dea6522fbd389eb9dc871f (diff)
parentf8632a11faed3402e8255f368b8eff58f3b2eadb (diff)
downloadouroboros-cad42bc85f351362894810aae1a5018ccb5e3c9f.tar.gz
ouroboros-cad42bc85f351362894810aae1a5018ccb5e3c9f.zip
Merged in sandervrijders/ouroboros/be-rib-sync (pull request #301)
ipcpd: normal: Add syncing of RIB objects
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/CMakeLists.txt4
-rw-r--r--src/ipcpd/normal/ribmgr.c1144
-rw-r--r--src/ipcpd/normal/ro.h33
-rw-r--r--src/ipcpd/normal/ro.proto9
4 files changed, 798 insertions, 392 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 08c5c691..06e41e9c 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -20,6 +20,8 @@ protobuf_generate_c(STATIC_INFO_SRCS STATIC_INFO_HDRS
protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS
flow_alloc.proto)
+protobuf_generate_c(RO_SRCS RO_HDRS ro.proto)
+
set(SOURCE_FILES
# Add source files here
addr_auth.c
@@ -34,7 +36,7 @@ set(SOURCE_FILES
)
add_executable (ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES}
- ${STATIC_INFO_SRCS} ${FLOW_ALLOC_SRCS})
+ ${STATIC_INFO_SRCS} ${FLOW_ALLOC_SRCS} ${RO_SRCS})
target_link_libraries (ipcpd-normal LINK_PUBLIC ouroboros)
include(MacroAddCompileFlags)
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 96c7e7e0..49971eda 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -47,15 +47,22 @@
#include "static_info.pb-c.h"
typedef StaticInfoMsg static_info_msg_t;
+#include "ro.pb-c.h"
+typedef RoMsg ro_msg_t;
+
#define SUBS_SIZE 25
#define ENROLLMENT "enrollment"
-#define STATIC_INFO "static DIF information"
+
+#define RIBMGR_PREFIX "/ribmgr"
+#define STAT_INFO "/statinfo"
#define PATH_DELIMITER "/"
/* RIB objects */
struct rnode {
char * name;
+ char * full_name;
+ uint64_t seqno;
/*
* NOTE: Naive implementation for now, could be replaced by
@@ -92,6 +99,7 @@ struct {
struct list_head subs;
struct bmp * sids;
pthread_mutex_t subs_lock;
+ int ribmgr_sid;
struct dt_const dtc;
@@ -106,6 +114,296 @@ struct {
struct addr_auth * addr_auth;
} rib;
+int ribmgr_ro_created(const char * name,
+ uint8_t * data,
+ size_t len)
+{
+ static_info_msg_t * stat_msg;
+
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
+ if (ipcp_get_state() == IPCP_PENDING_ENROLL &&
+ strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) {
+ LOG_DBG("Received static DIF information.");
+
+ stat_msg = static_info_msg__unpack(NULL, len, data);
+ if (stat_msg == NULL) {
+ ipcp_set_state(IPCP_INIT);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("Failed to unpack static info message.");
+ return -1;
+ }
+
+ rib.dtc.addr_size = stat_msg->addr_size;
+ rib.dtc.cep_id_size = stat_msg->cep_id_size;
+ rib.dtc.pdu_length_size = stat_msg->pdu_length_size;
+ rib.dtc.seqno_size = stat_msg->seqno_size;
+ rib.dtc.has_ttl = stat_msg->has_ttl;
+ rib.dtc.has_chk = stat_msg->has_chk;
+ rib.dtc.min_pdu_size = stat_msg->min_pdu_size;
+ rib.dtc.max_pdu_size = stat_msg->max_pdu_size;
+
+ rib.addr_auth = addr_auth_create(stat_msg->addr_auth_type);
+ if (rib.addr_auth == NULL) {
+ ipcp_set_state(IPCP_INIT);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ static_info_msg__free_unpacked(stat_msg, NULL);
+ LOG_ERR("Failed to create address authority");
+ return -1;
+ }
+
+ rib.address = rib.addr_auth->address();
+ LOG_DBG("IPCP has address %lu", rib.address);
+
+ if (frct_init()) {
+ ipcp_set_state(IPCP_INIT);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ static_info_msg__free_unpacked(stat_msg, NULL);
+ LOG_ERR("Failed to init FRCT");
+ return -1;
+ }
+
+ static_info_msg__free_unpacked(stat_msg, NULL);
+ }
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ return 0;
+}
+
+/* We only have a create operation for now */
+static struct ro_sub_ops ribmgr_sub_ops = {
+ .ro_created = ribmgr_ro_created,
+ .ro_updated = NULL,
+ .ro_deleted = NULL
+};
+
+static struct rnode * find_rnode_by_name(const char * name)
+{
+ char * str;
+ char * str1;
+ char * saveptr;
+ char * token;
+ struct rnode * node;
+
+ str = strdup(name);
+ if (str == NULL)
+ return NULL;
+
+ node = rib.root;
+
+ for (str1 = str; ; str1 = NULL) {
+ token = strtok_r(str1, PATH_DELIMITER, &saveptr);
+ if (token == NULL)
+ break;
+
+ node = node->child;
+
+ while (node != NULL)
+ if (strcmp(node->name, token) == 0)
+ break;
+ else
+ node = node->sibling;
+
+ if (node == NULL) {
+ free(str);
+ return NULL;
+ }
+ }
+
+ free(str);
+ return node;
+}
+
+/* Call under RIB object lock */
+static int ro_msg_create(struct rnode * node,
+ ro_msg_t * msg)
+{
+ msg->address = rib.address;
+ msg->seqno = node->seqno;
+ msg->recv_set = node->props->recv_set;
+ msg->enrol_sync = node->props->enrol_sync;
+ msg->sec = node->props->expiry.tv_sec;
+ msg->nsec = node->props->expiry.tv_nsec;
+ msg->value.data = node->data;
+ msg->value.len = node->len;
+
+ return 0;
+}
+
+static struct rnode * ribmgr_ro_create(const char * name,
+ struct ro_props * props,
+ uint8_t * data,
+ size_t len)
+{
+ char * str;
+ char * str1;
+ char * saveptr;
+ char * token;
+ char * token2;
+ struct rnode * node;
+ struct rnode * new;
+ struct rnode * prev;
+ bool sibling;
+
+ str = strdup(name);
+ if (str == NULL)
+ return NULL;
+
+ node = rib.root;
+
+ for (str1 = str; ; str1 = NULL) {
+ token = strtok_r(str1, PATH_DELIMITER, &saveptr);
+ if (token == NULL) {
+ LOG_ERR("RO already exists.");
+ free(str);
+ return NULL;
+ }
+
+ prev = node;
+ node = node->child;
+ sibling = false;
+
+ /* Search horizontally */
+ while (node != NULL) {
+ if (strcmp(node->name, token) == 0) {
+ break;
+ } else {
+ prev = node;
+ node = node->sibling;
+ sibling = true;
+ }
+ }
+
+ if (node == NULL)
+ break;
+ }
+
+ token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr);
+ if (token2 != NULL) {
+ LOG_ERR("Part of the pathname does not exist.");
+ free(str);
+ return NULL;
+ }
+
+ new = malloc(sizeof(*new));
+ if (new == NULL) {
+ free(str);
+ return NULL;
+ }
+
+ new->name = strdup(token);
+ if (new->name == NULL) {
+ free(str);
+ free(new);
+ return NULL;
+ }
+
+ free(str);
+
+ new->full_name = strdup(name);
+ if (new->full_name == NULL) {
+ free(new);
+ return NULL;
+ }
+
+ new->seqno = 0;
+ new->props = props;
+
+ if (sibling)
+ prev->sibling = new;
+ else
+ prev->child = new;
+
+ new->data = data;
+ new->len = len;
+ new->child = NULL;
+ new->sibling = NULL;
+
+ return new;
+}
+
+static int ribmgr_ro_delete(const char * name)
+{
+ char * str;
+ char * str1;
+ char * saveptr;
+ char * token;
+ struct rnode * node;
+ struct rnode * prev;
+ bool sibling = false;
+
+ str = strdup(name);
+ if (str == NULL)
+ return -1;
+
+ node = rib.root;
+ prev = NULL;
+
+ for (str1 = str; ; str1 = NULL) {
+ token = strtok_r(str1, PATH_DELIMITER, &saveptr);
+ if (token == NULL)
+ break;
+
+ prev = node;
+ node = node->child;
+ sibling = false;
+
+ while (node != NULL) {
+ if (strcmp(node->name, token) == 0) {
+ break;
+ } else {
+ prev = node;
+ node = node->sibling;
+ sibling = true;
+ }
+ }
+
+ if (node == NULL) {
+ free(str);
+ return -1;
+ }
+ }
+
+ if (node == rib.root) {
+ LOG_ERR("Won't remove root.");
+ free(str);
+ return -1;
+ }
+
+ free(node->name);
+ free(node->full_name);
+ if (node->data != NULL)
+ free(node->data);
+
+ if (sibling)
+ prev->sibling = node->sibling;
+ else
+ prev->child = node->sibling;
+
+ free(node);
+ free(str);
+
+ return 0;
+}
+
+static struct rnode * ribmgr_ro_write(const char * name,
+ uint8_t * data,
+ size_t len)
+{
+ struct rnode * node;
+
+ node = find_rnode_by_name(name);
+ if (node == NULL)
+ return NULL;
+
+ free(node->data);
+
+ node->data = data;
+ node->len = len;
+ node->seqno++;
+
+ return node;
+}
+
/* Call while holding cdap_reqs_lock */
/* FIXME: better not to call blocking functions under any lock */
int cdap_result_wait(struct cdap * instance,
@@ -151,6 +449,46 @@ int cdap_result_wait(struct cdap * instance,
return ret;
}
+static int write_ro_msg(struct cdap * neighbor,
+ ro_msg_t * msg,
+ char * name,
+ enum cdap_opcode code)
+{
+ uint8_t * data;
+ size_t len;
+ int iid = 0;
+
+ len = ro_msg__get_packed_size(msg);
+ if (len == 0)
+ return -1;
+
+ data = malloc(len);
+ if (data == NULL)
+ return -ENOMEM;
+
+ ro_msg__pack(msg, data);
+
+ pthread_mutex_lock(&rib.cdap_reqs_lock);
+ iid = cdap_send_request(neighbor, code,
+ name, data, len, 0);
+ if (iid < 0) {
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ free(data);
+ return -1;
+ }
+
+ if (cdap_result_wait(neighbor, code, name, iid)) {
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ free(data);
+ LOG_ERR("Remote did not receive RIB object.");
+ return -1;
+ }
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ free(data);
+
+ return 0;
+}
+
int ribmgr_init()
{
INIT_LIST_HEAD(&rib.flows);
@@ -206,6 +544,18 @@ int ribmgr_init()
return -1;
}
+ rib.ribmgr_sid = ro_subscribe(RIBMGR_PREFIX, &ribmgr_sub_ops);
+ if (rib.ribmgr_sid < 0) {
+ LOG_ERR("Failed to subscribe.");
+ bmp_destroy(rib.sids);
+ pthread_rwlock_destroy(&rib.flows_lock);
+ pthread_mutex_destroy(&rib.cdap_reqs_lock);
+ pthread_mutex_destroy(&rib.ro_lock);
+ pthread_mutex_destroy(&rib.subs_lock);
+ free(rib.root);
+ return -1;
+ }
+
return 0;
}
@@ -247,6 +597,8 @@ int ribmgr_fini()
}
pthread_rwlock_unlock(&rib.flows_lock);
+ ro_unsubscribe(rib.ribmgr_sid);
+
if (rib.addr_auth != NULL)
addr_auth_destroy(rib.addr_auth);
@@ -273,6 +625,7 @@ static int ribmgr_cdap_reply(struct cdap * instance,
{
struct list_head * pos, * n = NULL;
+ /* We never perform reads on other RIBs */
(void) data;
(void) len;
@@ -295,7 +648,6 @@ static int ribmgr_cdap_reply(struct cdap * instance,
pthread_mutex_unlock(&rib.cdap_reqs_lock);
- /* FIXME: In case of a read, update values here */
cdap_request_respond(req, result);
pthread_mutex_lock(&rib.cdap_reqs_lock);
@@ -306,69 +658,184 @@ static int ribmgr_cdap_reply(struct cdap * instance,
return 0;
}
+static int ribmgr_cdap_create(struct cdap * instance,
+ int invoke_id,
+ char * name,
+ ro_msg_t * msg)
+{
+ int ret = 0;
+ struct list_head * p = NULL;
+ size_t len_s, len_n;
+ uint8_t * ro_data;
+ struct ro_props * props;
+ struct rnode * node;
+
+ props = malloc(sizeof(*props));
+ if (props == NULL) {
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ return -ENOMEM;
+ }
+
+ props->expiry.tv_sec = msg->sec;
+ props->expiry.tv_nsec = msg->nsec;
+ props->enrol_sync = msg->enrol_sync;
+
+ pthread_mutex_lock(&rib.ro_lock);
+
+ ro_data = malloc(msg->value.len);
+ if (ro_data == NULL) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ free(props);
+ return -1;
+ }
+ memcpy(ro_data, msg->value.data, msg->value.len);
+
+ node = ribmgr_ro_create(name, props, ro_data, msg->value.len);
+ if (node == NULL) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ free(props);
+ free(ro_data);
+ return -1;
+ }
+
+ pthread_mutex_lock(&rib.subs_lock);
+ list_for_each(p, &rib.subs) {
+ struct ro_sub * e = list_entry(p, struct ro_sub, next);
+ len_s = strlen(e->name);
+ len_n = strlen(name);
+
+ if (len_n < len_s)
+ continue;
+
+ if (memcmp(name, e->name, len_s) == 0) {
+ if (e->ops->ro_created == NULL)
+ continue;
+
+ ro_data = malloc(node->len);
+ if (ro_data == NULL)
+ continue;
+
+ memcpy(ro_data, node->data, node->len);
+ e->ops->ro_created(name, ro_data, node->len);
+ }
+ }
+
+ pthread_mutex_unlock(&rib.subs_lock);
+ pthread_mutex_unlock(&rib.ro_lock);
+
+ if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ LOG_ERR("Failed to send reply to create request.");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int ribmgr_cdap_delete(struct cdap * instance,
+ int invoke_id,
+ char * name)
+{
+ struct list_head * p = NULL;
+ size_t len_s;
+ size_t len_n;
+
+ pthread_mutex_lock(&rib.ro_lock);
+
+ if (ribmgr_ro_delete(name)) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ return -1;
+ }
+
+ pthread_mutex_lock(&rib.subs_lock);
+
+ list_for_each(p, &rib.subs) {
+ struct ro_sub * e = list_entry(p, struct ro_sub, next);
+ len_s = strlen(e->name);
+ len_n = strlen(name);
+
+ if (len_n < len_s)
+ continue;
+
+ if (memcmp(name, e->name, len_s) == 0) {
+ if (e->ops->ro_deleted == NULL)
+ continue;
+
+ e->ops->ro_deleted(name);
+ }
+ }
+
+ pthread_mutex_unlock(&rib.subs_lock);
+ pthread_mutex_unlock(&rib.ro_lock);
+
+ if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) {
+ LOG_ERR("Failed to send reply to create request.");
+ return -1;
+ }
+
+ return 0;
+}
+
static int ribmgr_cdap_write(struct cdap * instance,
int invoke_id,
char * name,
- uint8_t * data,
- size_t len,
+ ro_msg_t * msg,
uint32_t flags)
{
- static_info_msg_t * msg;
int ret = 0;
+ struct list_head * p = NULL;
+ size_t len_s;
+ size_t len_n;
+ uint8_t * ro_data;
+ struct rnode * node;
(void) flags;
- pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_PENDING_ENROLL &&
- strcmp(name, STATIC_INFO) == 0) {
- LOG_DBG("Received static DIF information.");
+ pthread_mutex_lock(&rib.ro_lock);
- msg = static_info_msg__unpack(NULL, len, data);
- if (msg == NULL) {
- ipcp_set_state(IPCP_INIT);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- LOG_ERR("Failed to unpack static info message.");
- return -1;
- }
+ ro_data = malloc(msg->value.len);
+ if (ro_data == NULL) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ return -1;
+ }
+ memcpy(ro_data, msg->value.data, msg->value.len);
- rib.dtc.addr_size = msg->addr_size;
- rib.dtc.cep_id_size = msg->cep_id_size;
- rib.dtc.pdu_length_size = msg->pdu_length_size;
- rib.dtc.seqno_size = msg->seqno_size;
- rib.dtc.has_ttl = msg->has_ttl;
- rib.dtc.has_chk = msg->has_chk;
- rib.dtc.min_pdu_size = msg->min_pdu_size;
- rib.dtc.max_pdu_size = msg->max_pdu_size;
+ node = ribmgr_ro_write(name, msg->value.data, msg->value.len);
+ if (node == NULL) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ free(ro_data);
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ return -1;
+ }
- rib.addr_auth = addr_auth_create(msg->addr_auth_type);
- if (rib.addr_auth == NULL) {
- ipcp_set_state(IPCP_INIT);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- static_info_msg__free_unpacked(msg, NULL);
- LOG_ERR("Failed to create address authority");
- return -1;
- }
+ pthread_mutex_lock(&rib.subs_lock);
- rib.address = rib.addr_auth->address();
- LOG_DBG("IPCP has address %lu", rib.address);
+ list_for_each(p, &rib.subs) {
+ struct ro_sub * e = list_entry(p, struct ro_sub, next);
+ len_s = strlen(e->name);
+ len_n = strlen(name);
- if (frct_init()) {
- ipcp_set_state(IPCP_INIT);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- static_info_msg__free_unpacked(msg, NULL);
- LOG_ERR("Failed to init FRCT");
- return -1;
- }
+ if (len_n < len_s)
+ continue;
- static_info_msg__free_unpacked(msg, NULL);
- } else {
- ret = -1;
+ if (memcmp(name, e->name, len_s) == 0) {
+ if (e->ops->ro_updated == NULL)
+ continue;
+
+ ro_data = malloc(node->len);
+ if (ro_data == NULL)
+ continue;
+
+ memcpy(ro_data, node->data, node->len);
+ e->ops->ro_updated(name, ro_data, node->len);
+ }
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ pthread_mutex_unlock(&rib.subs_lock);
+ pthread_mutex_unlock(&rib.ro_lock);
if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
LOG_ERR("Failed to send reply to write request.");
@@ -378,13 +845,39 @@ static int ribmgr_cdap_write(struct cdap * instance,
return 0;
}
+static int ribmgr_enrol_sync(struct cdap * instance,
+ struct rnode * node)
+{
+ int ret = 0;
+
+ if (node != NULL) {
+ if (node->props->enrol_sync == true) {
+ ro_msg_t msg = RO_MSG__INIT;
+
+ if (ro_msg_create(node, &msg)) {
+ LOG_ERR("Failed to create RO msg.");
+ return -1;
+ }
+
+ if (write_ro_msg(instance, &msg,
+ node->full_name, CDAP_CREATE)) {
+ LOG_ERR("Failed to send RO msg.");
+ return -1;
+ }
+ }
+
+ ret = ribmgr_enrol_sync(instance, node->child);
+ if (ret == 0)
+ ret = ribmgr_enrol_sync(instance, node->sibling);
+ }
+
+ return ret;
+}
+
static int ribmgr_cdap_start(struct cdap * instance,
int invoke_id,
char * name)
{
- static_info_msg_t stat_info = STATIC_INFO_MSG__INIT;
- uint8_t * data = NULL;
- size_t len = 0;
int iid = 0;
pthread_rwlock_wrlock(&ipcpi.state_lock);
@@ -398,57 +891,17 @@ static int ribmgr_cdap_start(struct cdap * instance,
return -1;
}
- stat_info.addr_size = rib.dtc.addr_size;
- stat_info.cep_id_size = rib.dtc.cep_id_size;
- stat_info.pdu_length_size = rib.dtc.pdu_length_size;
- stat_info.seqno_size = rib.dtc.seqno_size;
- stat_info.has_ttl = rib.dtc.has_ttl;
- stat_info.has_chk = rib.dtc.has_chk;
- stat_info.min_pdu_size = rib.dtc.min_pdu_size;
- stat_info.max_pdu_size = rib.dtc.max_pdu_size;
- stat_info.addr_auth_type = rib.addr_auth->type;
-
- len = static_info_msg__get_packed_size(&stat_info);
- if (len == 0) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Failed to get size of static information.");
- return -1;
- }
-
- data = malloc(len);
- if (data == NULL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Failed to allocate memory.");
- return -1;
- }
-
- static_info_msg__pack(&stat_info, data);
-
- LOG_DBGF("Sending static info...");
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- iid = cdap_send_request(instance, CDAP_WRITE,
- STATIC_INFO, data, len, 0);
- if (iid < 0) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- free(data);
- LOG_ERR("Failed to send static information.");
- return -1;
- }
+ /* Loop through rtree and send correct objects */
+ LOG_DBGF("Sending ROs that need to be sent on enrolment...");
- if (cdap_result_wait(instance, CDAP_WRITE,
- STATIC_INFO, iid)) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ pthread_mutex_lock(&rib.ro_lock);
+ if (ribmgr_enrol_sync(instance, rib.root->child)) {
+ pthread_mutex_unlock(&rib.ro_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- free(data);
- LOG_ERR("Remote did not receive static information.");
+ LOG_ERR("Failed to sync part of the RIB.");
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- /* FIXME: Send neighbors here. */
+ pthread_mutex_unlock(&rib.ro_lock);
LOG_DBGF("Sending stop enrollment...");
@@ -459,7 +912,6 @@ static int ribmgr_cdap_start(struct cdap * instance,
if (iid < 0) {
pthread_mutex_unlock(&rib.cdap_reqs_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- free(data);
LOG_ERR("Failed to send stop of enrollment.");
return -1;
}
@@ -468,13 +920,10 @@ static int ribmgr_cdap_start(struct cdap * instance,
ENROLLMENT, iid)) {
pthread_mutex_unlock(&rib.cdap_reqs_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- free(data);
LOG_ERR("Remote failed to complete enrollment.");
return -1;
}
pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- free(data);
} else {
if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -520,27 +969,75 @@ static int ribmgr_cdap_request(struct cdap * instance,
size_t len,
uint32_t flags)
{
- switch (opcode) {
- case CDAP_WRITE:
- return ribmgr_cdap_write(instance,
- invoke_id,
- name, data,
- len, flags);
- case CDAP_START:
+ ro_msg_t * msg;
+ int ret = -1;
+ struct list_head * p = NULL;
+
+ if (opcode == CDAP_START)
return ribmgr_cdap_start(instance,
invoke_id,
name);
- case CDAP_STOP:
+ else if (opcode == CDAP_STOP)
return ribmgr_cdap_stop(instance,
invoke_id,
name);
- default:
- LOG_INFO("Unsupported CDAP opcode received.");
+
+ msg = ro_msg__unpack(NULL, len, data);
+ if (msg == NULL) {
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ LOG_ERR("Failed to unpack RO message");
return -1;
}
+
+ /* FIXME: Check if we already received this */
+
+ if (opcode == CDAP_CREATE) {
+ ret = ribmgr_cdap_create(instance,
+ invoke_id,
+ name,
+ msg);
+ } else if (opcode == CDAP_WRITE) {
+ ret = ribmgr_cdap_write(instance,
+ invoke_id,
+ name, msg,
+ flags);
+
+ } else if (opcode == CDAP_DELETE) {
+ ret = ribmgr_cdap_delete(instance,
+ invoke_id,
+ name);
+ } else {
+ LOG_INFO("Unsupported opcode received.");
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ return -1;
+ }
+
+ if (msg->recv_set == ALL_MEMBERS) {
+ pthread_rwlock_rdlock(&rib.flows_lock);
+ list_for_each(p, &rib.flows) {
+ struct mgmt_flow * e =
+ list_entry(p, struct mgmt_flow, next);
+
+ /* Don't send it back */
+ if (e->instance == instance)
+ continue;
+
+ if (write_ro_msg(e->instance, msg, name, opcode)) {
+ LOG_ERR("Failed to send to a neighbor.");
+ pthread_rwlock_unlock(&rib.flows_lock);
+ ro_msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+ }
+ pthread_rwlock_unlock(&rib.flows_lock);
+ }
+
+ ro_msg__free_unpacked(msg, NULL);
+
+ return ret;
}
-static struct cdap_ops ribmgr_ops = {
+static struct cdap_ops ribmgr_cdap_ops = {
.cdap_reply = ribmgr_cdap_reply,
.cdap_request = ribmgr_cdap_request
};
@@ -555,7 +1052,7 @@ int ribmgr_add_flow(int fd)
if (flow == NULL)
return -1;
- instance = cdap_create(&ribmgr_ops, fd);
+ instance = cdap_create(&ribmgr_cdap_ops, fd);
if (instance == NULL) {
LOG_ERR("Failed to create CDAP instance");
free(flow);
@@ -630,24 +1127,92 @@ int ribmgr_remove_flow(int fd)
int ribmgr_bootstrap(struct dif_config * conf)
{
+ static_info_msg_t stat_info = STATIC_INFO_MSG__INIT;
+ uint8_t * data = NULL;
+ size_t len = 0;
+ struct ro_props * props;
+
if (conf == NULL ||
conf->type != IPCP_NORMAL) {
LOG_ERR("Bad DIF configuration.");
return -1;
}
- rib.dtc.addr_size = conf->addr_size;
- rib.dtc.cep_id_size = conf->cep_id_size;
- rib.dtc.pdu_length_size = conf->pdu_length_size;
- rib.dtc.seqno_size = conf->seqno_size;
- rib.dtc.has_ttl = conf->has_ttl;
- rib.dtc.has_chk = conf->has_chk;
- rib.dtc.min_pdu_size = conf->min_pdu_size;
- rib.dtc.max_pdu_size = conf->max_pdu_size;
+ props = malloc(sizeof(*props));
+ if (props == NULL) {
+ LOG_ERR("Failed to allocate memory.");
+ return -1;
+ }
+
+ props->enrol_sync = true;
+ props->recv_set = NEIGHBORS;
+ props->expiry.tv_sec = 0;
+ props->expiry.tv_nsec = 0;
+
+ if (ribmgr_ro_create(RIBMGR_PREFIX, props, NULL, 0) == NULL) {
+ LOG_ERR("Failed to create RIBMGR RO.");
+ free(props);
+ return -1;
+ }
+
+ stat_info.addr_size = rib.dtc.addr_size = conf->addr_size;
+ stat_info.cep_id_size = rib.dtc.cep_id_size = conf->cep_id_size;
+ stat_info.pdu_length_size = rib.dtc.pdu_length_size
+ = conf->pdu_length_size;
+ stat_info.seqno_size = rib.dtc.seqno_size = conf->seqno_size;
+ stat_info.has_ttl = rib.dtc.has_ttl = conf->has_ttl;
+ stat_info.has_chk = rib.dtc.has_chk = conf->has_chk;
+ stat_info.min_pdu_size = rib.dtc.min_pdu_size = conf->min_pdu_size;
+ stat_info.max_pdu_size = rib.dtc.max_pdu_size = conf->max_pdu_size;
rib.addr_auth = addr_auth_create(conf->addr_auth_type);
if (rib.addr_auth == NULL) {
LOG_ERR("Failed to create address authority.");
+ ro_delete(RIBMGR_PREFIX);
+ return -1;
+ }
+
+ stat_info.addr_auth_type = rib.addr_auth->type;
+
+ len = static_info_msg__get_packed_size(&stat_info);
+ if (len == 0) {
+ LOG_ERR("Failed to get size of static information.");
+ addr_auth_destroy(rib.addr_auth);
+ ribmgr_ro_delete(RIBMGR_PREFIX);
+ return -1;
+ }
+
+ data = malloc(len);
+ if (data == NULL) {
+ LOG_ERR("Failed to allocate memory.");
+ addr_auth_destroy(rib.addr_auth);
+ ribmgr_ro_delete(RIBMGR_PREFIX);
+ return -1;
+ }
+
+ static_info_msg__pack(&stat_info, data);
+
+ props = malloc(sizeof(*props));
+ if (props == NULL) {
+ LOG_ERR("Failed to allocate memory.");
+ free(data);
+ addr_auth_destroy(rib.addr_auth);
+ ribmgr_ro_delete(RIBMGR_PREFIX);
+ return -1;
+ }
+
+ props->enrol_sync = true;
+ props->recv_set = NEIGHBORS;
+ props->expiry.tv_sec = 0;
+ props->expiry.tv_nsec = 0;
+
+ if (ribmgr_ro_create(RIBMGR_PREFIX STAT_INFO,
+ props, data, len) == NULL) {
+ LOG_ERR("Failed to create static info RO.");
+ free(data);
+ free(props);
+ addr_auth_destroy(rib.addr_auth);
+ ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -656,7 +1221,9 @@ int ribmgr_bootstrap(struct dif_config * conf)
if (frct_init()) {
LOG_ERR("Failed to initialize FRCT.");
+ ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);
addr_auth_destroy(rib.addr_auth);
+ ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -675,109 +1242,64 @@ uint64_t ribmgr_address()
return rib.address;
}
-int ro_create(const char * name,
- uint8_t * data,
- size_t len)
+static int send_neighbors_ro(char * name,
+ ro_msg_t * msg,
+ enum cdap_opcode code)
{
- char * str, * str1, * saveptr, * token;
- struct rnode * node, * new, * prev;
- bool sibling;
struct list_head * p = NULL;
- size_t len_s, len_n;
- uint8_t * ro_data;
-
- str = strdup(name);
- if (str == NULL)
- return -1;
- pthread_mutex_lock(&rib.ro_lock);
- node = rib.root;
+ pthread_rwlock_rdlock(&rib.flows_lock);
+ list_for_each(p, &rib.flows) {
+ struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next);
- for (str1 = str; ; str1 = NULL) {
- token = strtok_r(str1, PATH_DELIMITER, &saveptr);
- if (token == NULL) {
- pthread_mutex_unlock(&rib.ro_lock);
- LOG_ERR("RO already exists.");
- free(str);
+ if (write_ro_msg(e->instance, msg, name, code)) {
+ LOG_ERR("Failed to send to a neighbor.");
+ pthread_rwlock_unlock(&rib.flows_lock);
return -1;
}
+ }
+ pthread_rwlock_unlock(&rib.flows_lock);
- prev = node;
- node = node->child;
- sibling = false;
+ return 0;
+}
- /* Search horizontally */
- while (node != NULL) {
- if (strcmp(node->name, token) == 0) {
- break;
- } else {
- prev = node;
- node = node->sibling;
- sibling = true;
- }
- }
+int ro_create(const char * name,
+ struct ro_props * props,
+ uint8_t * data,
+ size_t len)
+{
+ struct rnode * node;
+ ro_msg_t msg = RO_MSG__INIT;
- if (node == NULL)
- break;
- }
+ if (name == NULL || props == NULL)
+ return -EINVAL;
- token = strtok_r(str1, PATH_DELIMITER, &saveptr);
- if (token != NULL) {
+ pthread_mutex_lock(&rib.ro_lock);
+
+ node = ribmgr_ro_create(name, props, data, len);
+ if (node == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- LOG_ERR("Part of the pathname does not exist.");
- free(str);
+ LOG_ERR("Failed to create RO.");
return -1;
}
- free(str);
-
- new = malloc(sizeof(*new));
- if (new == NULL) {
+ if (node->props->recv_set == NO_SYNC) {
pthread_mutex_unlock(&rib.ro_lock);
- return -1;
+ return 0;
}
- new->name = strdup(token);
- if (new->name == NULL) {
+ if (ro_msg_create(node, &msg)) {
pthread_mutex_unlock(&rib.ro_lock);
- free(new);
+ LOG_ERR("Failed to create RO msg.");
return -1;
}
- if (sibling)
- prev->sibling = new;
- else
- prev->child = new;
-
- new->data = data;
- new->len = len;
- new->child = NULL;
- new->sibling = NULL;
-
- pthread_mutex_lock(&rib.subs_lock);
-
- list_for_each(p, &rib.subs) {
- struct ro_sub * e = list_entry(p, struct ro_sub, next);
- len_s = strlen(e->name);
- len_n = strlen(name);
-
- if (len_n < len_s)
- continue;
-
- if (strncmp(name, e->name, len_s) == 0) {
- if (e->ops->ro_created == NULL)
- continue;
-
- ro_data = malloc(len);
- if (ro_data == NULL)
- continue;
-
- memcpy(ro_data, data, len);
- e->ops->ro_created(name, ro_data, len);
- }
+ if (send_neighbors_ro(node->full_name, &msg, CDAP_CREATE)) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ LOG_ERR("Failed to send to neighbors.");
+ return -1;
}
- pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
return 0;
@@ -785,207 +1307,94 @@ int ro_create(const char * name,
int ro_delete(const char * name)
{
- char * str, * str1, * saveptr, * token;
- struct rnode * node, * prev;
- bool sibling = false;
- struct list_head * p = NULL;
- size_t len_s, len_n;
+ struct rnode * node;
+ ro_msg_t msg = RO_MSG__INIT;
- str = strdup(name);
- if (str == NULL)
- return -1;
+ if (name == NULL)
+ return -EINVAL;
pthread_mutex_lock(&rib.ro_lock);
- node = rib.root;
- prev = NULL;
-
- for (str1 = str; ; str1 = NULL) {
- token = strtok_r(str1, PATH_DELIMITER, &saveptr);
- if (token == NULL)
- break;
-
- prev = node;
- node = node->child;
- sibling = false;
+ node = find_rnode_by_name(name);
+ if (node == NULL) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ LOG_ERR("Failed to sync RO.");
+ return -1;
+ }
- while (node != NULL) {
- if (strcmp(node->name, token) == 0) {
- break;
- } else {
- prev = node;
- node = node->sibling;
- sibling = true;
- }
+ if (node->props->recv_set != NO_SYNC) {
+ if (ro_msg_create(node, &msg)) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ LOG_ERR("Failed to create RO msg.");
+ return -1;
}
- if (node == NULL) {
+ if (send_neighbors_ro(node->full_name, &msg, CDAP_DELETE)) {
pthread_mutex_unlock(&rib.ro_lock);
- free(str);
+ LOG_ERR("Failed to send to neighbors.");
return -1;
}
}
- if (node == rib.root) {
- LOG_ERR("Won't remove root.");
- free(str);
+ if (ribmgr_ro_delete(name)) {
+ pthread_mutex_unlock(&rib.ro_lock);
return -1;
}
- free(node->name);
- if (node->data != NULL)
- free(node->data);
-
- if (sibling)
- prev->sibling = node->sibling;
- else
- prev->child = node->sibling;
-
- free(node);
-
- pthread_mutex_lock(&rib.subs_lock);
-
- list_for_each(p, &rib.subs) {
- struct ro_sub * e = list_entry(p, struct ro_sub, next);
- len_s = strlen(e->name);
- len_n = strlen(name);
-
- if (len_n < len_s)
- continue;
-
- if (strncmp(name, e->name, len_s) == 0) {
- if (e->ops->ro_deleted == NULL)
- continue;
-
- e->ops->ro_deleted(name);
- }
- }
-
- pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- free(str);
return 0;
}
-static struct rnode * find_rnode_by_name(const char * name)
+int ro_write(const char * name,
+ uint8_t * data,
+ size_t len)
{
- char * str, * str1, * saveptr, * token;
struct rnode * node;
+ ro_msg_t msg = RO_MSG__INIT;
- str = strdup(name);
- if (str == NULL)
- return NULL;
-
- node = rib.root;
-
- for (str1 = str; ; str1 = NULL) {
- token = strtok_r(str1, PATH_DELIMITER, &saveptr);
- if (token == NULL)
- break;
-
- node = node->child;
-
- while (node != NULL)
- if (strcmp(node->name, token) == 0)
- break;
- else
- node = node->sibling;
-
- if (node == NULL) {
- free(str);
- return NULL;
- }
- }
-
- free(str);
- return node;
-}
-
-ssize_t ro_read(const char * name,
- uint8_t ** data)
-{
- struct rnode * node;
- ssize_t len;
+ if (name == NULL || data == NULL)
+ return -EINVAL;
pthread_mutex_lock(&rib.ro_lock);
- node = find_rnode_by_name(name);
+ node = ribmgr_ro_write(name, data, len);
if (node == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
+ LOG_ERR("Failed to create RO.");
return -1;
}
- *data = malloc(node->len);
- if (*data == NULL) {
+ if (node->props->recv_set == NO_SYNC) {
pthread_mutex_unlock(&rib.ro_lock);
- return -1;
+ return 0;
}
- memcpy(*data, node->data, node->len);
- len = node->len;
-
- pthread_mutex_unlock(&rib.ro_lock);
-
- return len;
-}
-
-int ro_write(const char * name,
- uint8_t * data,
- size_t len)
-{
- struct rnode * node;
- struct list_head * p = NULL;
- size_t len_s, len_n;
- uint8_t * ro_data;
-
- pthread_mutex_lock(&rib.ro_lock);
-
- node = find_rnode_by_name(name);
- if (node == NULL) {
+ if (ro_msg_create(node, &msg)) {
pthread_mutex_unlock(&rib.ro_lock);
+ LOG_ERR("Failed to create RO msg.");
return -1;
}
- free(node->data);
-
- node->data = data;
- node->len = len;
-
- pthread_mutex_lock(&rib.subs_lock);
-
- list_for_each(p, &rib.subs) {
- struct ro_sub * e =
- list_entry(p, struct ro_sub, next);
- len_s = strlen(e->name);
- len_n = strlen(name);
-
- if (len_n < len_s)
- continue;
-
- if (strncmp(name, e->name, len_s) == 0) {
- if (e->ops->ro_updated == NULL)
- continue;
-
- ro_data = malloc(len);
- if (ro_data == NULL)
- continue;
-
- memcpy(ro_data, data, len);
- e->ops->ro_updated(name, ro_data, len);
- }
+ if (send_neighbors_ro(node->full_name, &msg, CDAP_WRITE)) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ LOG_ERR("Failed to send to neighbors.");
+ return -1;
}
- pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
return 0;
}
-int ro_props(const char * name,
- struct ro_props * props)
+ssize_t ro_read(const char * name,
+ uint8_t ** data)
{
struct rnode * node;
+ ssize_t len;
+
+ if (name == NULL || data == NULL)
+ return -EINVAL;
pthread_mutex_lock(&rib.ro_lock);
@@ -995,27 +1404,18 @@ int ro_props(const char * name,
return -1;
}
- if (node->props != NULL) {
- if (node->props->expiry != NULL)
- free(node->props->expiry);
- free(node->props);
+ *data = malloc(node->len);
+ if (*data == NULL) {
+ pthread_mutex_unlock(&rib.ro_lock);
+ return -1;
}
- node->props = props;
+ memcpy(*data, node->data, node->len);
+ len = node->len;
pthread_mutex_unlock(&rib.ro_lock);
- return 0;
-}
-
-int ro_sync(const char * name)
-{
- (void) name;
-
- LOG_MISSING;
- /* FIXME: We need whatevercast sets first */
-
- return -1;
+ return len;
}
int ro_subscribe(const char * name,
diff --git a/src/ipcpd/normal/ro.h b/src/ipcpd/normal/ro.h
index 0dfa7e8a..6fa1db2a 100644
--- a/src/ipcpd/normal/ro.h
+++ b/src/ipcpd/normal/ro.h
@@ -24,38 +24,33 @@
#define OUROBOROS_IPCP_RO_H
enum ro_recv_set {
- ALL_MEMBERS = 0,
- NEIGHBORS
+ NO_SYNC = 0,
+ NEIGHBORS,
+ ALL_MEMBERS
};
struct ro_props {
- bool enrol_sync;
- enum ro_recv_set recv_set;
- struct timespec * expiry;
+ bool enrol_sync;
+ enum ro_recv_set recv_set;
+ struct timespec expiry;
};
/* All RIB-objects have a pathname, separated by a slash. */
-/* Takes ownership of the data */
-int ro_create(const char * name,
- uint8_t * data,
- size_t len);
+/* Takes ownership of the data and props */
+int ro_create(const char * name,
+ struct ro_props * props,
+ uint8_t * data,
+ size_t len);
int ro_delete(const char * name);
-/* Reader takes ownership of data */
-ssize_t ro_read(const char * name,
- uint8_t ** data);
-
int ro_write(const char * name,
uint8_t * data,
size_t len);
-/* Takes ownership of the props */
-int ro_props(const char * name,
- struct ro_props * props);
-
-/* Sync changes with other members in the DIF */
-int ro_sync(const char * name);
+/* Reader takes ownership of data */
+ssize_t ro_read(const char * name,
+ uint8_t ** data);
/* Callback passes ownership of the data */
struct ro_sub_ops {
diff --git a/src/ipcpd/normal/ro.proto b/src/ipcpd/normal/ro.proto
new file mode 100644
index 00000000..c5062468
--- /dev/null
+++ b/src/ipcpd/normal/ro.proto
@@ -0,0 +1,9 @@
+message ro_msg {
+ required uint64 address = 1;
+ required uint64 seqno = 2;
+ required int32 recv_set = 3;
+ required bool enrol_sync = 4;
+ required uint32 sec = 5;
+ required uint64 nsec = 6;
+ required bytes value = 7;
+} \ No newline at end of file