/* * Ouroboros - Copyright (C) 2016 * * RIB manager of the IPC Process * * Sander Vrijders <sander.vrijders@intec.ugent.be> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #define OUROBOROS_PREFIX "rib-manager" #include <ouroboros/config.h> #include <ouroboros/logs.h> #include <ouroboros/cdap.h> #include <ouroboros/list.h> #include <ouroboros/time_utils.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/bitmap.h> #include <ouroboros/errno.h> #include <stdlib.h> #include <pthread.h> #include <string.h> #include <errno.h> #include "addr_auth.h" #include "ribmgr.h" #include "dt_const.h" #include "frct.h" #include "ipcp.h" #include "cdap_request.h" #include "ro.h" #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; #define SUBS_SIZE 25 #define ENROLLMENT "enrollment" #define STATIC_INFO "static DIF information" #define PATH_DELIMITER "/" /* RIB objects */ struct rnode { char * name; /* * NOTE: Naive implementation for now, could be replaced by * for instance taking a hash of the pathname and using that * as an index in a B-tree */ /* If there are no children, this is a leaf */ struct rnode * child; struct rnode * sibling; struct ro_props * props; uint8_t * data; size_t len; }; struct mgmt_flow { struct cdap * instance; int fd; struct list_head next; }; struct ro_sub { int sid; char * name; struct ro_sub_ops * ops; struct list_head next; }; struct { struct rnode * root; pthread_mutex_t ro_lock; struct list_head subs; struct bmp * sids; pthread_mutex_t subs_lock; struct dt_const dtc; uint64_t address; struct list_head flows; pthread_rwlock_t flows_lock; struct list_head cdap_reqs; pthread_mutex_t cdap_reqs_lock; struct addr_auth * addr_auth; } rib; /* Call while holding cdap_reqs_lock */ /* FIXME: better not to call blocking functions under any lock */ int cdap_result_wait(struct cdap * instance, enum cdap_opcode code, char * name, int invoke_id) { struct cdap_request * req; int ret; char * name_dup = strdup(name); if (name_dup == NULL) return -1; req = cdap_request_create(code, name_dup, invoke_id, instance); if (req == NULL) { free(name_dup); return -1; } list_add(&req->next, &rib.cdap_reqs); pthread_mutex_unlock(&rib.cdap_reqs_lock); ret = cdap_request_wait(req); pthread_mutex_lock(&rib.cdap_reqs_lock); if (ret == -1) /* should only be on ipcp shutdown */ LOG_DBG("Waiting CDAP request destroyed."); if (ret == -ETIMEDOUT) LOG_ERR("CDAP Request timed out."); if (ret) LOG_DBG("Unknown error code: %d.", ret); if (!ret) ret = req->result; list_del(&req->next); cdap_request_destroy(req); return ret; } int ribmgr_init() { INIT_LIST_HEAD(&rib.flows); INIT_LIST_HEAD(&rib.cdap_reqs); INIT_LIST_HEAD(&rib.subs); rib.root = malloc(sizeof(*(rib.root))); if (rib.root == NULL) return -1; rib.root->name = "root"; rib.root->child = NULL; rib.root->sibling = NULL; if (pthread_rwlock_init(&rib.flows_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); free(rib.root); return -1; } if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); free(rib.root); return -1; } if (pthread_mutex_init(&rib.ro_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); free(rib.root); return -1; } if (pthread_mutex_init(&rib.subs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); free(rib.root); return -1; } rib.sids = bmp_create(SUBS_SIZE, 0); if (rib.sids == NULL) { LOG_ERR("Failed to create bitmap."); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); free(rib.root); return -1; } return 0; } static void rtree_destroy(struct rnode * node) { if (node != NULL) { rtree_destroy(node->child); rtree_destroy(node->sibling); free(node->name); if (node->data != NULL) free(node->data); free(node); } } int ribmgr_fini() { struct list_head * pos = NULL; struct list_head * n = NULL; pthread_mutex_lock(&rib.cdap_reqs_lock); list_for_each_safe(pos, n, &rib.cdap_reqs) { struct cdap_request * req = list_entry(pos, struct cdap_request, next); free(req->name); list_del(&req->next); free(req); } pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_wrlock(&rib.flows_lock); list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); free(flow); } pthread_rwlock_unlock(&rib.flows_lock); if (rib.addr_auth != NULL) addr_auth_destroy(rib.addr_auth); pthread_mutex_lock(&rib.ro_lock); rtree_destroy(rib.root->child); free(rib.root); pthread_mutex_unlock(&rib.ro_lock); bmp_destroy(rib.sids); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_rwlock_destroy(&rib.flows_lock); return 0; } static int ribmgr_cdap_reply(struct cdap * instance, int invoke_id, int result, uint8_t * data, size_t len) { struct list_head * pos, * n = NULL; (void) data; (void) len; pthread_mutex_lock(&rib.cdap_reqs_lock); list_for_each_safe(pos, n, &rib.cdap_reqs) { struct cdap_request * req = list_entry(pos, struct cdap_request, next); if (req->instance == instance && req->invoke_id == invoke_id && req->state == REQ_PENDING) { if (result != 0) LOG_ERR("CDAP command with code %d and name %s " "failed with error %d", req->code, req->name, result); else LOG_DBG("CDAP command with code %d and name %s " "executed succesfully", req->code, req->name); pthread_mutex_unlock(&rib.cdap_reqs_lock); /* FIXME: In case of a read, update values here */ cdap_request_respond(req, result); pthread_mutex_lock(&rib.cdap_reqs_lock); } } pthread_mutex_unlock(&rib.cdap_reqs_lock); return 0; } static int ribmgr_cdap_write(struct cdap * instance, int invoke_id, char * name, uint8_t * data, size_t len, uint32_t flags) { static_info_msg_t * msg; int ret = 0; (void) flags; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_PENDING_ENROLL && strcmp(name, STATIC_INFO) == 0) { LOG_DBG("Received static DIF information."); msg = static_info_msg__unpack(NULL, len, data); if (msg == NULL) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); LOG_ERR("Failed to unpack static info message."); return -1; } rib.dtc.addr_size = msg->addr_size; rib.dtc.cep_id_size = msg->cep_id_size; rib.dtc.pdu_length_size = msg->pdu_length_size; rib.dtc.seqno_size = msg->seqno_size; rib.dtc.has_ttl = msg->has_ttl; rib.dtc.has_chk = msg->has_chk; rib.dtc.min_pdu_size = msg->min_pdu_size; rib.dtc.max_pdu_size = msg->max_pdu_size; rib.addr_auth = addr_auth_create(msg->addr_auth_type); if (rib.addr_auth == NULL) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); static_info_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to create address authority"); return -1; } rib.address = rib.addr_auth->address(); LOG_DBG("IPCP has address %lu", rib.address); if (frct_init()) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); static_info_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to init FRCT"); return -1; } static_info_msg__free_unpacked(msg, NULL); } else { ret = -1; } pthread_rwlock_unlock(&ipcpi.state_lock); if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { LOG_ERR("Failed to send reply to write request."); return -1; } return 0; } static int ribmgr_cdap_start(struct cdap * instance, int invoke_id, char * name) { static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; uint8_t * data = NULL; size_t len = 0; int iid = 0; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_ENROLLED && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } stat_info.addr_size = rib.dtc.addr_size; stat_info.cep_id_size = rib.dtc.cep_id_size; stat_info.pdu_length_size = rib.dtc.pdu_length_size; stat_info.seqno_size = rib.dtc.seqno_size; stat_info.has_ttl = rib.dtc.has_ttl; stat_info.has_chk = rib.dtc.has_chk; stat_info.min_pdu_size = rib.dtc.min_pdu_size; stat_info.max_pdu_size = rib.dtc.max_pdu_size; stat_info.addr_auth_type = rib.addr_auth->type; len = static_info_msg__get_packed_size(&stat_info); if (len == 0) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to get size of static information."); return -1; } data = malloc(len); if (data == NULL) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to allocate memory."); return -1; } static_info_msg__pack(&stat_info, data); LOG_DBGF("Sending static info..."); pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_request(instance, CDAP_WRITE, STATIC_INFO, data, len, 0); if (iid < 0) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Failed to send static information."); return -1; } if (cdap_result_wait(instance, CDAP_WRITE, STATIC_INFO, iid)) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Remote did not receive static information."); return -1; } pthread_mutex_unlock(&rib.cdap_reqs_lock); /* FIXME: Send neighbors here. */ LOG_DBGF("Sending stop enrollment..."); pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT, NULL, 0, 0); if (iid < 0) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Failed to send stop of enrollment."); return -1; } if (cdap_result_wait(instance, CDAP_STOP, ENROLLMENT, iid)) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Remote failed to complete enrollment."); return -1; } pthread_mutex_unlock(&rib.cdap_reqs_lock); free(data); } else { if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to start request."); return -1; } } pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } static int ribmgr_cdap_stop(struct cdap * instance, int invoke_id, char * name) { int ret = 0; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_PENDING_ENROLL && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); ipcp_set_state(IPCP_ENROLLED); } else ret = -1; if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; } pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } static int ribmgr_cdap_request(struct cdap * instance, int invoke_id, enum cdap_opcode opcode, char * name, uint8_t * data, size_t len, uint32_t flags) { switch (opcode) { case CDAP_WRITE: return ribmgr_cdap_write(instance, invoke_id, name, data, len, flags); case CDAP_START: return ribmgr_cdap_start(instance, invoke_id, name); case CDAP_STOP: return ribmgr_cdap_stop(instance, invoke_id, name); default: LOG_INFO("Unsupported CDAP opcode received."); return -1; } } static struct cdap_ops ribmgr_ops = { .cdap_reply = ribmgr_cdap_reply, .cdap_request = ribmgr_cdap_request }; int ribmgr_add_flow(int fd) { struct cdap * instance = NULL; struct mgmt_flow * flow; int iid = 0; flow = malloc(sizeof(*flow)); if (flow == NULL) return -1; instance = cdap_create(&ribmgr_ops, fd); if (instance == NULL) { LOG_ERR("Failed to create CDAP instance"); free(flow); return -1; } INIT_LIST_HEAD(&flow->next); flow->instance = instance; flow->fd = fd; pthread_rwlock_wrlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&rib.flows_lock); if (list_empty(&rib.flows) && ipcp_get_state() == IPCP_INIT) { ipcp_set_state(IPCP_PENDING_ENROLL); pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_request(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); if (iid < 0) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); return -1; } if (cdap_result_wait(instance, CDAP_START, ENROLLMENT, iid)) { pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); return -1; } pthread_mutex_unlock(&rib.cdap_reqs_lock); } list_add(&flow->next, &rib.flows); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } int ribmgr_remove_flow(int fd) { struct list_head * pos, * n = NULL; pthread_rwlock_wrlock(&rib.flows_lock); list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (flow->fd == fd) { if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); pthread_rwlock_unlock(&rib.flows_lock); free(flow); return 0; } } pthread_rwlock_unlock(&rib.flows_lock); return -1; } int ribmgr_bootstrap(struct dif_config * conf) { if (conf == NULL || conf->type != IPCP_NORMAL) { LOG_ERR("Bad DIF configuration."); return -1; } rib.dtc.addr_size = conf->addr_size; rib.dtc.cep_id_size = conf->cep_id_size; rib.dtc.pdu_length_size = conf->pdu_length_size; rib.dtc.seqno_size = conf->seqno_size; rib.dtc.has_ttl = conf->has_ttl; rib.dtc.has_chk = conf->has_chk; rib.dtc.min_pdu_size = conf->min_pdu_size; rib.dtc.max_pdu_size = conf->max_pdu_size; rib.addr_auth = addr_auth_create(conf->addr_auth_type); if (rib.addr_auth == NULL) { LOG_ERR("Failed to create address authority."); return -1; } rib.address = rib.addr_auth->address(); LOG_DBG("IPCP has address %lu", rib.address); if (frct_init()) { LOG_ERR("Failed to initialize FRCT."); addr_auth_destroy(rib.addr_auth); return -1; } LOG_DBG("Bootstrapped RIB Manager."); return 0; } struct dt_const * ribmgr_dt_const() { return &(rib.dtc); } uint64_t ribmgr_address() { return rib.address; } int ro_create(const char * name, uint8_t * data, size_t len) { char * str, * str1, * saveptr, * token; struct rnode * node, * new, * prev; bool sibling; struct list_head * p = NULL; size_t len_s, len_n; uint8_t * ro_data; str = strdup(name); if (str == NULL) return -1; pthread_mutex_lock(&rib.ro_lock); node = rib.root; for (str1 = str; ; str1 = NULL) { token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token == NULL) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("RO already exists."); free(str); return -1; } prev = node; node = node->child; sibling = false; /* Search horizontally */ while (node != NULL) { if (strcmp(node->name, token) == 0) { break; } else { prev = node; node = node->sibling; sibling = true; } } if (node == NULL) break; } token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token != NULL) { pthread_mutex_unlock(&rib.ro_lock); LOG_ERR("Part of the pathname does not exist."); free(str); return -1; } free(str); new = malloc(sizeof(*new)); if (new == NULL) { pthread_mutex_unlock(&rib.ro_lock); return -1; } new->name = strdup(token); if (new->name == NULL) { pthread_mutex_unlock(&rib.ro_lock); free(new); return -1; } if (sibling) prev->sibling = new; else prev->child = new; new->data = data; new->len = len; new->child = NULL; new->sibling = NULL; pthread_mutex_lock(&rib.subs_lock); list_for_each(p, &rib.subs) { struct ro_sub * e = list_entry(p, struct ro_sub, next); len_s = strlen(e->name); len_n = strlen(name); if (len_n < len_s) continue; if (strncmp(name, e->name, len_s) == 0) { if (e->ops->ro_created == NULL) continue; ro_data = malloc(len); if (ro_data == NULL) continue; memcpy(ro_data, data, len); e->ops->ro_created(name, ro_data, len); } } pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); return 0; } int ro_delete(const char * name) { char * str, * str1, * saveptr, * token; struct rnode * node, * prev; bool sibling = false; struct list_head * p = NULL; size_t len_s, len_n; str = strdup(name); if (str == NULL) return -1; pthread_mutex_lock(&rib.ro_lock); node = rib.root; prev = NULL; for (str1 = str; ; str1 = NULL) { token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token == NULL) break; prev = node; node = node->child; sibling = false; while (node != NULL) { if (strcmp(node->name, token) == 0) { break; } else { prev = node; node = node->sibling; sibling = true; } } if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); free(str); return -1; } } if (node == rib.root) { LOG_ERR("Won't remove root."); free(str); return -1; } free(node->name); if (node->data != NULL) free(node->data); if (sibling) prev->sibling = node->sibling; else prev->child = node->sibling; free(node); pthread_mutex_lock(&rib.subs_lock); list_for_each(p, &rib.subs) { struct ro_sub * e = list_entry(p, struct ro_sub, next); len_s = strlen(e->name); len_n = strlen(name); if (len_n < len_s) continue; if (strncmp(name, e->name, len_s) == 0) { if (e->ops->ro_deleted == NULL) continue; e->ops->ro_deleted(name); } } pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); free(str); return 0; } static struct rnode * find_rnode_by_name(const char * name) { char * str, * str1, * saveptr, * token; struct rnode * node; str = strdup(name); if (str == NULL) return NULL; node = rib.root; for (str1 = str; ; str1 = NULL) { token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token == NULL) break; node = node->child; while (node != NULL) if (strcmp(node->name, token) == 0) break; else node = node->sibling; if (node == NULL) { free(str); return NULL; } } free(str); return node; } ssize_t ro_read(const char * name, uint8_t ** data) { struct rnode * node; ssize_t len; pthread_mutex_lock(&rib.ro_lock); node = find_rnode_by_name(name); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); return -1; } *data = malloc(node->len); if (*data == NULL) { pthread_mutex_unlock(&rib.ro_lock); return -1; } memcpy(*data, node->data, node->len); len = node->len; pthread_mutex_unlock(&rib.ro_lock); return len; } int ro_write(const char * name, uint8_t * data, size_t len) { struct rnode * node; struct list_head * p = NULL; size_t len_s, len_n; uint8_t * ro_data; pthread_mutex_lock(&rib.ro_lock); node = find_rnode_by_name(name); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); return -1; } free(node->data); node->data = data; node->len = len; pthread_mutex_lock(&rib.subs_lock); list_for_each(p, &rib.subs) { struct ro_sub * e = list_entry(p, struct ro_sub, next); len_s = strlen(e->name); len_n = strlen(name); if (len_n < len_s) continue; if (strncmp(name, e->name, len_s) == 0) { if (e->ops->ro_updated == NULL) continue; ro_data = malloc(len); if (ro_data == NULL) continue; memcpy(ro_data, data, len); e->ops->ro_updated(name, ro_data, len); } } pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); return 0; } int ro_props(const char * name, struct ro_props * props) { struct rnode * node; pthread_mutex_lock(&rib.ro_lock); node = find_rnode_by_name(name); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); return -1; } if (node->props != NULL) { if (node->props->expiry != NULL) free(node->props->expiry); free(node->props); } node->props = props; pthread_mutex_unlock(&rib.ro_lock); return 0; } int ro_sync(const char * name) { (void) name; LOG_MISSING; /* FIXME: We need whatevercast sets first */ return -1; } int ro_subscribe(const char * name, struct ro_sub_ops * ops) { struct ro_sub * sub; if (name == NULL || ops == NULL) return -EINVAL; sub = malloc(sizeof(*sub)); if (sub == NULL) return -ENOMEM; INIT_LIST_HEAD(&sub->next); sub->name = strdup(name); if (sub->name == NULL) { free(sub); return -1; } sub->ops = ops; pthread_mutex_lock(&rib.subs_lock); sub->sid = bmp_allocate(rib.sids); if (sub->sid < 0) { pthread_mutex_unlock(&rib.subs_lock); free(sub->name); free(sub); LOG_ERR("Failed to get sub id."); } list_add(&sub->next, &rib.subs); pthread_mutex_unlock(&rib.subs_lock); return 0; } int ro_unsubscribe(int sid) { struct list_head * pos = NULL; struct list_head * n = NULL; pthread_mutex_lock(&rib.subs_lock); list_for_each_safe(pos, n, &(rib.subs)) { struct ro_sub * e = list_entry(pos, struct ro_sub, next); if (sid == e->sid) { bmp_release(rib.sids, sid); list_del(&e->next); free(e->name); free(e); pthread_mutex_unlock(&rib.subs_lock); return 0; } } pthread_mutex_unlock(&rib.subs_lock); LOG_ERR("No such subscription found."); return -1; }