/* * Ouroboros - Copyright (C) 2016 * * RIB manager of the IPC Process * * Sander Vrijders * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #define OUROBOROS_PREFIX "rib-manager" #include #include #include #include #include #include #include #include #include #include #include #include #include "addr_auth.h" #include "ribmgr.h" #include "dt_const.h" #include "frct.h" #include "ipcp.h" #include "cdap_request.h" #include "ro.h" #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; #include "ro.pb-c.h" typedef RoMsg ro_msg_t; #define SUBS_SIZE 25 #define ENROLLMENT "enrollment" #define RIBMGR_PREFIX "/ribmgr" #define STAT_INFO "/statinfo" #define PATH_DELIMITER "/" /* RIB objects */ struct rnode { char * name; char * full_name; uint64_t seqno; /* * NOTE: Naive implementation for now, could be replaced by * for instance taking a hash of the pathname and using that * as an index in a B-tree */ /* If there are no children, this is a leaf */ struct rnode * child; struct rnode * sibling; struct ro_props * props; uint8_t * data; size_t len; }; struct mgmt_flow { struct cdap * instance; int fd; struct list_head next; }; struct ro_sub { int sid; char * name; struct ro_sub_ops * ops; struct list_head next; }; struct { struct rnode * root; pthread_mutex_t ro_lock; struct list_head subs; struct bmp * sids; pthread_mutex_t subs_lock; int ribmgr_sid; struct dt_const dtc; uint64_t address; struct list_head flows; pthread_rwlock_t flows_lock; struct list_head cdap_reqs; pthread_mutex_t cdap_reqs_lock; struct addr_auth * addr_auth; } rib; int ribmgr_ro_created(const char * name, uint8_t * data, size_t len) { static_info_msg_t * stat_msg; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_PENDING_ENROLL && strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { LOG_DBG("Received static DIF information."); stat_msg = static_info_msg__unpack(NULL, len, data); if (stat_msg == NULL) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to unpack static info message."); return -1; } rib.dtc.addr_size = stat_msg->addr_size; rib.dtc.cep_id_size = stat_msg->cep_id_size; rib.dtc.pdu_length_size = stat_msg->pdu_length_size; rib.dtc.seqno_size = stat_msg->seqno_size; rib.dtc.has_ttl = stat_msg->has_ttl; rib.dtc.has_chk = stat_msg->has_chk; rib.dtc.min_pdu_size = stat_msg->min_pdu_size; rib.dtc.max_pdu_size = stat_msg->max_pdu_size; rib.addr_auth = addr_auth_create(stat_msg->addr_auth_type); if (rib.addr_auth == NULL) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); static_info_msg__free_unpacked(stat_msg, NULL); LOG_ERR("Failed to create address authority"); return -1; } rib.address = rib.addr_auth->address(); LOG_DBG("IPCP has address %lu", rib.address); if (frct_init()) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); static_info_msg__free_unpacked(stat_msg, NULL); LOG_ERR("Failed to init FRCT"); return -1; } static_info_msg__free_unpacked(stat_msg, NULL); } pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } /* We only have a create operation for now */ static struct ro_sub_ops ribmgr_sub_ops = { .ro_created = ribmgr_ro_created, .ro_updated = NULL, .ro_deleted = NULL }; static struct rnode * find_rnode_by_name(const char * name) { char * str; char * str1; char * saveptr; char * token; struct rnode * node; str = strdup(name); if (str == NULL) return NULL; node = rib.root; for (str1 = str; ; str1 = NULL) { token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token == NULL) break; node = node->child; while (node != NULL) if (strcmp(node->name, token) == 0) break; else node = node->sibling; if (node == NULL) { free(str); return NULL; } } free(str); return node; } /* Call under RIB object lock */ static int ro_msg_create(struct rnode * node, ro_msg_t * msg) { msg->address = rib.address; msg->seqno = node->seqno; msg->recv_set = node->props->recv_set; msg->enrol_sync = node->props->enrol_sync; msg->sec = node->props->expiry.tv_sec; msg->nsec = node->props->expiry.tv_nsec; msg->value.data = node->data; msg->value.len = node->len; return 0; } static struct rnode * ribmgr_ro_create(const char * name, struct ro_props * props, uint8_t * data, size_t len) { char * str; char * str1; char * saveptr; char * token; char * token2; struct rnode * node; struct rnode * new; struct rnode * prev; bool sibling; str = strdup(name); if (str == NULL) return NULL; node = rib.root; for (str1 = str; ; str1 = NULL) { token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token == NULL) { LOG_ERR("RO already exists."); free(str); return NULL; } prev = node; node = node->child; sibling = false; /* Search horizontally */ while (node != NULL) { if (strcmp(node->name, token) == 0) { break; } else { prev = node; node = node->sibling; sibling = true; } } if (node == NULL) break; } token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr); if (token2 != NULL) { LOG_ERR("Part of the pathname does not exist."); free(str); return NULL; } new = malloc(sizeof(*new)); if (new == NULL) { free(str); return NULL; } new->name = strdup(token); if (new->name == NULL) { free(str); free(new); return NULL; } free(str); new->full_name = strdup(name); if (new->full_name == NULL) { free(new); return NULL; } new->seqno = 0; new->props = props; if (sibling) prev->sibling = new; else prev->child = new; new->data = data; new->len = len; new->child = NULL; new->sibling = NULL; return new; } static int ribmgr_ro_delete(const char * name) { char * str; char * str1; char * saveptr; char * token; struct rnode * node; struct rnode * prev; bool sibling = false; str = strdup(name); if (str == NULL) return -1; node = rib.root; prev = NULL; for (str1 = str; ; str1 = NULL) { token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token == NULL) break; prev = node; node = node->child; sibling = false; while (node != NULL) { if (strcmp(node->name, token) == 0) { break; } else { prev = node; node = node->sibling; sibling = true; } } if (node == NULL) { free(str); return -1; } } if (node == rib.root) { LOG_ERR("Won't remove root."); free(str); return -1; } free(node->name); free(node->full_name); if (node->data != NULL) free(node->data); if (sibling) prev->sibling = node->sibling; else prev->child = node->sibling; free(node); free(str); return 0; } static struct rnode * ribmgr_ro_write(const char * name, uint8_t * data, size_t len) { struct rnode * node; node = find_rnode_by_name(name); if (node == NULL) return NULL; free(node->data); node->data = data; node->len = len; node->seqno++; return node; } /* Call while holding cdap_reqs_lock */ /* FIXME: better not to call blocking functions under any lock */ int cdap_result_wait(struct cdap * instance, enum cdap_opcode code, char * name, int invoke_id) { struct cdap_request * req; int ret; char * name_dup = strdup(name); if (name_dup == NULL) return -1; req = cdap_request_create(code, name_dup, invoke_id, instance); if (req == NULL) { free(name_dup); return -1; } list_add(&req->next, &rib.cdap_reqs); pthread_mutex_unlock(&rib.cdap_reqs_lock); ret = cdap_request_wait(req); pthread_mutex_lock(&rib.cdap_reqs_lock); if (ret == -1) /* should only be on ipcp shutdown */ LOG_DBG("Waiting CDAP request destroyed."); if (ret == -ETIMEDOUT) LOG_ERR("CDAP Request timed out."); if (ret) LOG_DBG("Unknown error code: %d.", ret); if (!ret) ret = req->result; list_del(&req->next); cdap_request_destroy(req); return ret; } static int write_ro_msg(struct cdap * neighbor, ro_msg_t * msg, char * name, enum cdap_opcode code) { uint8_t * data; size_t len; int iid = 0; len = ro_msg__get_packed_size(msg); if (len == 0) return -1; data = malloc(len); if (data == NULL) return -ENOMEM; ro_msg__pack(msg, data); pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_request(neighbor, code, name, data, len, 0); if (iid < 0) { pthread_mutex_unlock(&rib.cdap_reqs_lock); free(data); return -1; } if (cdap_result_wait(neighbor, code, name, iid)) { pthread_mutex_unlock(&rib.cdap_reqs_lock); free(data); LOG_ERR("Remote did not receive RIB object."); return -1; } pthread_mutex_unlock(&rib.cdap_reqs_lock); free(data); return 0; } int ribmgr_init() { INIT_LIST_HEAD(&rib.flows); INIT_LIST_HEAD(&rib.cdap_reqs); INIT_LIST_HEAD(&rib.subs); rib.root = malloc(sizeof(*(rib.root))); if (rib.root == NULL) return -1; rib.root->name = "root"; rib.root->child = NULL; rib.root->sibling = NULL; if (pthread_rwlock_init(&rib.flows_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); free(rib.root); return -1; } if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); free(rib.root); return -1; } if (pthread_mutex_init(&rib.ro_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); free(rib.root); return -1; } if (pthread_mutex_init(&rib.subs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); free(rib.root); return -1; } rib.sids = bmp_create(SUBS_SIZE, 0); if (rib.sids == NULL) { LOG_ERR("Failed to create bitmap."); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); free(rib.root); return -1; } rib.ribmgr_sid = ro_subscribe(RIBMGR_PREFIX, &ribmgr_sub_ops); if (rib.ribmgr_sid < 0) { LOG_ERR("Failed to subscribe."); bmp_destroy(rib.sids); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); free(rib.root); return -1; } return 0; } static void rtree_destroy(struct rnode * node) { if (node != NULL) { rtree_destroy(node->child); rtree_destroy(node->sibling); free(node->name); if (node->data != NULL) free(node->data); free(node); } } int ribmgr_fini() { struct list_head * pos = NULL; struct list_head * n = NULL; pthread_mutex_lock(&rib.cdap_reqs_lock); list_for_each_safe(pos, n, &rib.cdap_reqs) { struct cdap_request * req = list_entry(pos, struct cdap_request, next); free(req->name); list_del(&req->next); free(req); } pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_wrlock(&rib.flows_lock); list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); free(flow); } pthread_rwlock_unlock(&rib.flows_lock); ro_unsubscribe(rib.ribmgr_sid); if (rib.addr_auth != NULL) addr_auth_destroy(rib.addr_auth); pthread_mutex_lock(&rib.ro_lock); rtree_destroy(rib.root->child); free(rib.root); pthread_mutex_unlock(&rib.ro_lock); bmp_destroy(rib.sids); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_rwlock_destroy(&rib.flows_lock); return 0; } static int ribmgr_cdap_reply(struct cdap * instance, int invoke_id, int result, uint8_t * data, size_t len) { struct list_head * pos, * n = NULL; /* We never perform reads on other RIBs */ (void) data; (void) len; pthread_mutex_lock(&rib.cdap_reqs_lock); list_for_each_safe(pos, n, &rib.cdap_reqs) { struct cdap_request * req = list_entry(pos, struct cdap_request, next); if (req->instance == instance && req->invoke_id == invoke_id && req->state == REQ_PENDING) { if (result != 0) LOG_ERR("CDAP command with code %d and name %s " "failed with error %d", req->code, req->name, result); else LOG_DBG("CDAP command with code %d and name %s " "executed succesfully", req->code, req->name); pthread_mutex_unlock(&rib.cdap_reqs_lock); cdap_request_respond(req, result); pthread_mutex_lock(&rib.cdap_reqs_lock); } } pthread_mutex_unlock(&rib.cdap_reqs_lock); return 0; } static int ribmgr_cdap_create(struct cdap * instance, int invoke_id, char * name, ro_msg_t * msg) { int ret = 0; struct list_head * p = NULL; size_t len_s, len_n; uint8_t * ro_data; struct ro_props * props; struct rnode * node; props = malloc(sizeof(*props)); if (props == NULL) { cdap_send_reply(instance, invoke_id, -1, NULL, 0); return -ENOMEM; } props->expiry.tv_sec = msg->sec; props->expiry.tv_nsec = msg->nsec; props->enrol_sync = msg->enrol_sync; pthread_mutex_lock(&rib.ro_lock); ro_data = malloc(msg->value.len); if (ro_data == NULL) { pthread_mutex_unlock(&rib.ro_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); free(props); return -1; } memcpy(ro_data, msg->value.data, msg->value.len); node = ribmgr_ro_create(name, props, ro_data, msg->value.len); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); free(props); free(ro_data); return -1; } pthread_mutex_lock(&rib.subs_lock); list_for_each(p, &rib.subs) { struct ro_sub * e = list_entry(p, struct ro_sub, next); len_s = strlen(e->name); len_n = strlen(name); if (len_n < len_s) continue; if (memcmp(name, e->name, len_s) == 0) { if (e->ops->ro_created == NULL) continue; ro_data = malloc(node->len); if (ro_data == NULL) continue; memcpy(ro_data, node->data, node->len); e->ops->ro_created(name, ro_data, node->len); } } pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { LOG_ERR("Failed to send reply to create request."); return -1; } return 0; } static int ribmgr_cdap_delete(struct cdap * instance, int invoke_id, char * name) { struct list_head * p = NULL; size_t len_s; size_t len_n; pthread_mutex_lock(&rib.ro_lock); if (ribmgr_ro_delete(name)) { pthread_mutex_unlock(&rib.ro_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); return -1; } pthread_mutex_lock(&rib.subs_lock); list_for_each(p, &rib.subs) { struct ro_sub * e = list_entry(p, struct ro_sub, next); len_s = strlen(e->name); len_n = strlen(name); if (len_n < len_s) continue; if (memcmp(name, e->name, len_s) == 0) { if (e->ops->ro_deleted == NULL) continue; e->ops->ro_deleted(name); } } pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { LOG_ERR("Failed to send reply to create request."); return -1; } return 0; } static int ribmgr_cdap_write(struct cdap * instance, int invoke_id, char * name, ro_msg_t * msg, uint32_t flags) { int ret = 0; struct list_head * p = NULL; size_t len_s; size_t len_n; uint8_t * ro_data; struct rnode * node; (void) flags; pthread_mutex_lock(&rib.ro_lock); ro_data = malloc(msg->value.len); if (ro_data == NULL) { pthread_mutex_unlock(&rib.ro_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); return -1; } memcpy(ro_data, msg->value.data, msg->value.len); node = ribmgr_ro_write(name, msg->value.data, msg->value.len); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); free(ro_data); cdap_send_reply(instance, invoke_id, -1, NULL, 0); return -1; } pthread_mutex_lock(&rib.subs_lock); list_for_each(p, &rib.subs) { struct ro_sub * e = list_entry(p, struct ro_sub, next); len_s = strlen(e->name); len_n = strlen(name); if (len_n < len_s) continue; if (memcmp(name, e->name, len_s) == 0) { if (e->ops->ro_updated == NULL) continue; ro_data = malloc(node->len); if (ro_data == NULL) continue; memcpy(ro_data, node->data, node->len); e->ops->ro_updated(name, ro_data, node->len); } } pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { LOG_ERR("Failed to send reply to write request."); return -1; } return 0; } static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node) { int ret = 0; if (node != NULL) { if (node->props->enrol_sync == true) { ro_msg_t msg = RO_MSG__INIT; if (ro_msg_create(node, &msg)) { LOG_ERR("Failed to create RO msg."); return -1; } if (write_ro_msg(instance, &msg, node->full_name, CDAP_CREATE)) { LOG_ERR("Failed to send RO msg."); return -1; } } ret = ribmgr_enrol_sync(instance, node->child); if (ret == 0) ret = ribmgr_enrol_sync(instance, node->sibling); } return ret; } static int ribmgr_cdap_start(struct cdap * instance, int invoke_id, char * name) { int iid = 0; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_ENROLLED && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } /* Loop through rtree and send correct objects */ LOG_DBGF("Sending ROs that need to be sent on enrolment..."); pthread_mutex_lock(&rib.ro_lock); if (ribmgr_enrol_sync(instance, rib.root->child)) { pthread_mutex_unlock(&rib.ro_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to sync part of the RIB."); return -1; } pthread_mutex_unlock(&rib.ro_lock); LOG_DBGF("Sending stop enrollment..."); pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT, NULL, 0, 0); if (iid < 0) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send stop of enrollment."); return -1; } if (cdap_result_wait(instance, CDAP_STOP, ENROLLMENT, iid)) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Remote failed to complete enrollment."); return -1; } pthread_mutex_unlock(&rib.cdap_reqs_lock); } else { if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to start request."); return -1; } } pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } static int ribmgr_cdap_stop(struct cdap * instance, int invoke_id, char * name) { int ret = 0; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_PENDING_ENROLL && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); ipcp_set_state(IPCP_ENROLLED); } else ret = -1; if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; } pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } static int ribmgr_cdap_request(struct cdap * instance, int invoke_id, enum cdap_opcode opcode, char * name, uint8_t * data, size_t len, uint32_t flags) { ro_msg_t * msg; int ret = -1; struct list_head * p = NULL; if (opcode == CDAP_START) return ribmgr_cdap_start(instance, invoke_id, name); else if (opcode == CDAP_STOP) return ribmgr_cdap_stop(instance, invoke_id, name); msg = ro_msg__unpack(NULL, len, data); if (msg == NULL) { cdap_send_reply(instance, invoke_id, -1, NULL, 0); LOG_ERR("Failed to unpack RO message"); return -1; } /* FIXME: Check if we already received this */ if (opcode == CDAP_CREATE) { ret = ribmgr_cdap_create(instance, invoke_id, name, msg); } else if (opcode == CDAP_WRITE) { ret = ribmgr_cdap_write(instance, invoke_id, name, msg, flags); } else if (opcode == CDAP_DELETE) { ret = ribmgr_cdap_delete(instance, invoke_id, name); } else { LOG_INFO("Unsupported opcode received."); cdap_send_reply(instance, invoke_id, -1, NULL, 0); return -1; } if (msg->recv_set == ALL_MEMBERS) { pthread_rwlock_rdlock(&rib.flows_lock); list_for_each(p, &rib.flows) { struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); /* Don't send it back */ if (e->instance == instance) continue; if (write_ro_msg(e->instance, msg, name, opcode)) { LOG_ERR("Failed to send to a neighbor."); pthread_rwlock_unlock(&rib.flows_lock); ro_msg__free_unpacked(msg, NULL); return -1; } } pthread_rwlock_unlock(&rib.flows_lock); } ro_msg__free_unpacked(msg, NULL); return ret; } static struct cdap_ops ribmgr_cdap_ops = { .cdap_reply = ribmgr_cdap_reply, .cdap_request = ribmgr_cdap_request }; int ribmgr_add_flow(int fd) { struct cdap * instance = NULL; struct mgmt_flow * flow; int iid = 0; flow = malloc(sizeof(*flow)); if (flow == NULL) return -1; instance = cdap_create(&ribmgr_cdap_ops, fd); if (instance == NULL) { LOG_ERR("Failed to create CDAP instance"); free(flow); return -1; } INIT_LIST_HEAD(&flow->next); flow->instance = instance; flow->fd = fd; pthread_rwlock_wrlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&rib.flows_lock); if (list_empty(&rib.flows) && ipcp_get_state() == IPCP_INIT) { ipcp_set_state(IPCP_PENDING_ENROLL); pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_request(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); if (iid < 0) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); return -1; } if (cdap_result_wait(instance, CDAP_START, ENROLLMENT, iid)) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); return -1; } pthread_mutex_unlock(&rib.cdap_reqs_lock); } list_add(&flow->next, &rib.flows); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } int ribmgr_remove_flow(int fd) { struct list_head * pos, * n = NULL; pthread_rwlock_wrlock(&rib.flows_lock); list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (flow->fd == fd) { if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); pthread_rwlock_unlock(&rib.flows_lock); free(flow); return 0; } } pthread_rwlock_unlock(&rib.flows_lock); return -1; } int ribmgr_bootstrap(struct dif_config * conf) { static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; uint8_t * data = NULL; size_t len = 0; struct ro_props * props; if (conf == NULL || conf->type != IPCP_NORMAL) { LOG_ERR("Bad DIF configuration."); return -1; } props = malloc(sizeof(*props)); if (props == NULL) { LOG_ERR("Failed to allocate memory."); return -1; } props->enrol_sync = true; props->recv_set = NEIGHBORS; props->expiry.tv_sec = 0; props->expiry.tv_nsec = 0; if (ribmgr_ro_create(RIBMGR_PREFIX, props, NULL, 0) == NULL) { LOG_ERR("Failed to create RIBMGR RO."); free(props); return -1; } stat_info.addr_size = rib.dtc.addr_size = conf->addr_size; stat_info.cep_id_size = rib.dtc.cep_id_size = conf->cep_id_size; stat_info.pdu_length_size = rib.dtc.pdu_length_size = conf->pdu_length_size; stat_info.seqno_size = rib.dtc.seqno_size = conf->seqno_size; stat_info.has_ttl = rib.dtc.has_ttl = conf->has_ttl; stat_info.has_chk = rib.dtc.has_chk = conf->has_chk; stat_info.min_pdu_size = rib.dtc.min_pdu_size = conf->min_pdu_size; stat_info.max_pdu_size = rib.dtc.max_pdu_size = conf->max_pdu_size; rib.addr_auth = addr_auth_create(conf->addr_auth_type); if (rib.addr_auth == NULL) { LOG_ERR("Failed to create address authority."); ro_delete(RIBMGR_PREFIX); return -1; } stat_info.addr_auth_type = rib.addr_auth->type; len = static_info_msg__get_packed_size(&stat_info); if (len == 0) { LOG_ERR("Failed to get size of static information."); addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } data = malloc(len); if (data == NULL) { LOG_ERR("Failed to allocate memory."); addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } static_info_msg__pack(&stat_info, data); props = malloc(sizeof(*props)); if (props == NULL) { LOG_ERR("Failed to allocate memory."); free(data); addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } props->enrol_sync = true; props->recv_set = NEIGHBORS; props->expiry.tv_sec = 0; props->expiry.tv_nsec = 0; if (ribmgr_ro_create(RIBMGR_PREFIX STAT_INFO, props, data, len) == NULL) { LOG_ERR("Failed to create static info RO."); free(data); free(props); addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } rib.address = rib.addr_auth->address(); LOG_DBG("IPCP has address %lu", rib.address); if (frct_init()) { LOG_ERR("Failed to initialize FRCT."); ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } LOG_DBG("Bootstrapped RIB Manager."); return 0; } struct dt_const * ribmgr_dt_const() { return &(rib.dtc); } uint64_t ribmgr_address() { return rib.address; } static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code) { struct list_head * p = NULL; pthread_rwlock_rdlock(&rib.flows_lock); list_for_each(p, &rib.flows) { struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); if (write_ro_msg(e->instance, msg, name, code)) { LOG_ERR("Failed to send to a neighbor."); pthread_rwlock_unlock(&rib.flows_lock); return -1; } } pthread_rwlock_unlock(&rib.flows_lock); return 0; } int ro_create(const char * name, struct ro_props * props, uint8_t * data, size_t len) { struct rnode * node; ro_msg_t msg = RO_MSG__INIT; if (name == NULL || props == NULL) return -EINVAL; pthread_mutex_lock(&rib.ro_lock); node = ribmgr_ro_create(name, props, data, len); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to create RO."); return -1; } if (node->props->recv_set == NO_SYNC) { pthread_mutex_unlock(&rib.ro_lock); return 0; } if (ro_msg_create(node, &msg)) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to create RO msg."); return -1; } if (send_neighbors_ro(node->full_name, &msg, CDAP_CREATE)) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to send to neighbors."); return -1; } pthread_mutex_unlock(&rib.ro_lock); return 0; } int ro_delete(const char * name) { struct rnode * node; ro_msg_t msg = RO_MSG__INIT; if (name == NULL) return -EINVAL; pthread_mutex_lock(&rib.ro_lock); node = find_rnode_by_name(name); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to sync RO."); return -1; } if (node->props->recv_set != NO_SYNC) { if (ro_msg_create(node, &msg)) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to create RO msg."); return -1; } if (send_neighbors_ro(node->full_name, &msg, CDAP_DELETE)) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to send to neighbors."); return -1; } } if (ribmgr_ro_delete(name)) { pthread_mutex_unlock(&rib.ro_lock); return -1; } pthread_mutex_unlock(&rib.ro_lock); return 0; } int ro_write(const char * name, uint8_t * data, size_t len) { struct rnode * node; ro_msg_t msg = RO_MSG__INIT; if (name == NULL || data == NULL) return -EINVAL; pthread_mutex_lock(&rib.ro_lock); node = ribmgr_ro_write(name, data, len); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to create RO."); return -1; } if (node->props->recv_set == NO_SYNC) { pthread_mutex_unlock(&rib.ro_lock); return 0; } if (ro_msg_create(node, &msg)) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to create RO msg."); return -1; } if (send_neighbors_ro(node->full_name, &msg, CDAP_WRITE)) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Failed to send to neighbors."); return -1; } pthread_mutex_unlock(&rib.ro_lock); return 0; } ssize_t ro_read(const char * name, uint8_t ** data) { struct rnode * node; ssize_t len; if (name == NULL || data == NULL) return -EINVAL; pthread_mutex_lock(&rib.ro_lock); node = find_rnode_by_name(name); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); return -1; } *data = malloc(node->len); if (*data == NULL) { pthread_mutex_unlock(&rib.ro_lock); return -1; } memcpy(*data, node->data, node->len); len = node->len; pthread_mutex_unlock(&rib.ro_lock); return len; } int ro_subscribe(const char * name, struct ro_sub_ops * ops) { struct ro_sub * sub; if (name == NULL || ops == NULL) return -EINVAL; sub = malloc(sizeof(*sub)); if (sub == NULL) return -ENOMEM; INIT_LIST_HEAD(&sub->next); sub->name = strdup(name); if (sub->name == NULL) { free(sub); return -1; } sub->ops = ops; pthread_mutex_lock(&rib.subs_lock); sub->sid = bmp_allocate(rib.sids); if (sub->sid < 0) { pthread_mutex_unlock(&rib.subs_lock); free(sub->name); free(sub); LOG_ERR("Failed to get sub id."); } list_add(&sub->next, &rib.subs); pthread_mutex_unlock(&rib.subs_lock); return 0; } int ro_unsubscribe(int sid) { struct list_head * pos = NULL; struct list_head * n = NULL; pthread_mutex_lock(&rib.subs_lock); list_for_each_safe(pos, n, &(rib.subs)) { struct ro_sub * e = list_entry(pos, struct ro_sub, next); if (sid == e->sid) { bmp_release(rib.sids, sid); list_del(&e->next); free(e->name); free(e); pthread_mutex_unlock(&rib.subs_lock); return 0; } } pthread_mutex_unlock(&rib.subs_lock); LOG_ERR("No such subscription found."); return -1; }