From f97dee45d3c1b0088aa8010a1c9d59593c3d0df0 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 23 Dec 2016 18:13:40 +0100 Subject: ipcpd, lib: Refactor normal ipcp and cdap Refactors the normal IPCP fmgr and ribmgr, and modifies the API for cdap so that no callbacks are needed. --- src/ipcpd/normal/CMakeLists.txt | 1 - src/ipcpd/normal/cdap_request.c | 157 ------------- src/ipcpd/normal/cdap_request.h | 68 ------ src/ipcpd/normal/fmgr.c | 390 ++++++++++++------------------- src/ipcpd/normal/ribmgr.c | 501 ++++++++++++++++------------------------ 5 files changed, 347 insertions(+), 770 deletions(-) delete mode 100644 src/ipcpd/normal/cdap_request.c delete mode 100644 src/ipcpd/normal/cdap_request.h (limited to 'src/ipcpd/normal') diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 67a7953b..5f85dd89 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -25,7 +25,6 @@ protobuf_generate_c(RO_SRCS RO_HDRS ro.proto) set(SOURCE_FILES # Add source files here addr_auth.c - cdap_request.c crc32.c dir.c fmgr.c diff --git a/src/ipcpd/normal/cdap_request.c b/src/ipcpd/normal/cdap_request.c deleted file mode 100644 index 8409b508..00000000 --- a/src/ipcpd/normal/cdap_request.c +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP - RIB Manager - CDAP request - * - * Sander Vrijders - * Dimitri Staessens - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include -#include -#include - -#include "cdap_request.h" - -#include - -struct cdap_request * cdap_request_create(enum cdap_opcode code, - char * name, - int invoke_id, - struct cdap * instance) -{ - struct cdap_request * creq = malloc(sizeof(*creq)); - pthread_condattr_t cattr; - - if (creq == NULL) - return NULL; - - creq->code = code; - creq->name = name; - creq->invoke_id = invoke_id; - creq->instance = instance; - creq->state = REQ_INIT; - creq->result = -1; - - pthread_condattr_init(&cattr); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - pthread_cond_init(&creq->cond, &cattr); - pthread_mutex_init(&creq->lock, NULL); - - INIT_LIST_HEAD(&creq->next); - - return creq; -} - -void cdap_request_destroy(struct cdap_request * creq) -{ - if (creq == NULL) - return; - - pthread_mutex_lock(&creq->lock); - - if (creq->state == REQ_DESTROY) { - pthread_mutex_unlock(&creq->lock); - return; - } - - if (creq->state == REQ_INIT) - creq->state = REQ_DONE; - - if (creq->state == REQ_PENDING) { - creq->state = REQ_DESTROY; - pthread_cond_broadcast(&creq->cond); - } - - while (creq->state != REQ_DONE) - pthread_cond_wait(&creq->cond, &creq->lock); - - pthread_mutex_unlock(&creq->lock); - - pthread_cond_destroy(&creq->cond); - pthread_mutex_destroy(&creq->lock); - - if (creq->name != NULL) - free(creq->name); - - free(creq); -} - -int cdap_request_wait(struct cdap_request * creq) -{ - struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000), - (CDAP_REPLY_TIMEOUT % 1000) * MILLION}; - struct timespec abstime; - int ret = -1; - - if (creq == NULL) - return -EINVAL; - - clock_gettime(CLOCK_REALTIME, &abstime); - ts_add(&abstime, &timeout, &abstime); - - pthread_mutex_lock(&creq->lock); - - if (creq->state != REQ_INIT) { - pthread_mutex_unlock(&creq->lock); - return -EINVAL; - } - - creq->state = REQ_PENDING; - - while (creq->state == REQ_PENDING) { - if ((ret = -pthread_cond_timedwait(&creq->cond, - &creq->lock, - &abstime)) == -ETIMEDOUT) { - break; - } - } - - if (creq->state == REQ_DESTROY) - ret = -1; - - creq->state = REQ_DONE; - pthread_cond_broadcast(&creq->cond); - - pthread_mutex_unlock(&creq->lock); - - return ret; -} - -void cdap_request_respond(struct cdap_request * creq, int response) -{ - if (creq == NULL) - return; - - pthread_mutex_lock(&creq->lock); - - if (creq->state != REQ_PENDING) { - pthread_mutex_unlock(&creq->lock); - return; - } - - creq->state = REQ_RESPONSE; - creq->result = response; - pthread_cond_broadcast(&creq->cond); - - while (creq->state == REQ_RESPONSE) - pthread_cond_wait(&creq->cond, &creq->lock); - - pthread_mutex_unlock(&creq->lock); -} diff --git a/src/ipcpd/normal/cdap_request.h b/src/ipcpd/normal/cdap_request.h deleted file mode 100644 index 9cccfda5..00000000 --- a/src/ipcpd/normal/cdap_request.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP - RIB Manager - CDAP request - * - * Sander Vrijders - * Dimitri Staessens - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H -#define OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H - -#include -#include -#include - -#include - -enum creq_state { - REQ_INIT = 0, - REQ_PENDING, - REQ_RESPONSE, - REQ_DONE, - REQ_DESTROY -}; - -struct cdap_request { - struct list_head next; - - enum cdap_opcode code; - char * name; - int invoke_id; - struct cdap * instance; - - int result; - - enum creq_state state; - pthread_cond_t cond; - pthread_mutex_t lock; -}; - -struct cdap_request * cdap_request_create(enum cdap_opcode code, - char * name, - int invoke_id, - struct cdap * instance); - -void cdap_request_destroy(struct cdap_request * creq); - -int cdap_request_wait(struct cdap_request * creq); - -void cdap_request_respond(struct cdap_request * creq, - int response); - -#endif /* OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8e416aa4..6684db7c 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -49,82 +49,31 @@ typedef FlowAllocMsg flow_alloc_msg_t; #define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */ -struct np1_flow { - int fd; - cep_id_t cep_id; - enum qos_cube qos; -}; - -struct nm1_flow { - int fd; - enum qos_cube qos; -}; - struct { - pthread_t nm1_flow_acceptor; - struct nm1_flow ** nm1_flows; + flow_set_t * nm1_set[QOS_CUBE_MAX]; + fqueue_t * nm1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t nm1_flows_lock; - flow_set_t * nm1_set; - pthread_t nm1_sdu_reader; - struct np1_flow ** np1_flows; - struct np1_flow ** np1_flows_cep; + flow_set_t * np1_set[QOS_CUBE_MAX]; + fqueue_t * np1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t np1_flows_lock; - flow_set_t * np1_set; + + cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; + int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; + + pthread_t nm1_flow_acceptor; + pthread_t nm1_sdu_reader; pthread_t np1_sdu_reader; /* FIXME: Replace with PFF */ int fd; } fmgr; -static int add_nm1_fd(int fd, - enum qos_cube qos) -{ - struct nm1_flow * tmp; - - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) - return -1; - - tmp->fd = fd; - tmp->qos = qos; - - pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); - fmgr.nm1_flows[fd] = tmp; - pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - - flow_set_add(fmgr.nm1_set, fd); - - /* FIXME: Temporary, until we have a PFF */ - fmgr.fd = fd; - - return 0; -} - -static int add_np1_fd(int fd, - cep_id_t cep_id, - enum qos_cube qos) -{ - struct np1_flow * flow; - - flow = malloc(sizeof(*flow)); - if (flow == NULL) - return -1; - - flow->cep_id = cep_id; - flow->qos = qos; - flow->fd = fd; - - fmgr.np1_flows[fd] = flow; - fmgr.np1_flows_cep[cep_id] = flow; - - return 0; -} - static void * fmgr_nm1_acceptor(void * o) { int fd; char * ae_name; + qoscube_t cube; qosspec_t qs; (void) o; @@ -150,13 +99,13 @@ static void * fmgr_nm1_acceptor(void * o) if (!(strcmp(ae_name, MGMT_AE) == 0 || strcmp(ae_name, DT_AE) == 0)) { if (flow_alloc_resp(fd, -1)) - LOG_ERR("Failed to reply to flow allocation."); + LOG_WARN("Failed to reply to flow allocation."); flow_dealloc(fd); continue; } if (flow_alloc_resp(fd, 0)) { - LOG_ERR("Failed to reply to flow allocation."); + LOG_WARN("Failed to reply to flow allocation."); flow_dealloc(fd); continue; } @@ -166,17 +115,19 @@ static void * fmgr_nm1_acceptor(void * o) if (strcmp(ae_name, MGMT_AE) == 0) { if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand fd to RIB."); + LOG_WARN("Failed to hand fd to RIB."); flow_dealloc(fd); continue; } } else { - /* FIXME: Pass correct QoS cube */ - if (add_nm1_fd(fd, QOS_CUBE_BE)) { - LOG_ERR("Failed to add fd to list."); + ipcp_flow_get_qoscube(fd, &cube); + if (flow_set_add(fmgr.nm1_set[cube], fd)) { + LOG_WARN("Failed to add fd."); flow_dealloc(fd); continue; } + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; } free(ae_name); @@ -189,44 +140,40 @@ static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct np1_flow * flow; int fd; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; + + int i = 0; + int ret; (void) o; while (true) { - int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); + /* FIXME: replace with scheduling policy call */ + i = (i + 1) % QOS_CUBE_MAX; + + ret = flow_event_wait(fmgr.np1_set[i], + fmgr.np1_fqs[i], + &timeout); if (ret == -ETIMEDOUT) continue; if (ret < 0) { - LOG_ERR("Event error: %d.", ret); + LOG_WARN("Event error: %d.", ret); continue; } - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) { if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); + LOG_WARN("Failed to read SDU from fd %d.", fd); continue; } pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to retrieve flow."); - continue; - } - - if (frct_i_write_sdu(flow->cep_id, sdb)) { + if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); - LOG_ERR("Failed to hand SDU to FRCT."); + LOG_WARN("Failed to hand SDU to FRCT."); continue; } @@ -244,14 +191,18 @@ void * fmgr_nm1_sdu_reader(void * o) struct shm_du_buff * sdb; struct pci * pci; int fd; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; + int i = 0; + int ret; (void) o; while (true) { - int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); + /* FIXME: replace with scheduling policy call */ + i = (i + 1) % QOS_CUBE_MAX; + + ret = flow_event_wait(fmgr.nm1_set[i], + fmgr.nm1_fqs[i], + &timeout); if (ret == -ETIMEDOUT) continue; @@ -260,7 +211,7 @@ void * fmgr_nm1_sdu_reader(void * o) continue; } - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) { if (ipcp_flow_read(fd, &sdb)) { LOG_ERR("Failed to read SDU from fd %d.", fd); continue; @@ -320,52 +271,55 @@ void * fmgr_nm1_sdu_reader(void * o) return (void *) 0; } -int fmgr_init() +static void fmgr_destroy_flows(void) { int i; - fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS); - if (fmgr.nm1_flows == NULL) - return -1; - - fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS); - if (fmgr.np1_flows == NULL) { - free(fmgr.nm1_flows); - return -1; + for (i = 0; i < QOS_CUBE_MAX; ++i) { + flow_set_destroy(fmgr.nm1_set[i]); + flow_set_destroy(fmgr.np1_set[i]); + fqueue_destroy(fmgr.nm1_fqs[i]); + fqueue_destroy(fmgr.np1_fqs[i]); } +} - fmgr.np1_flows_cep = - malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS); - if (fmgr.np1_flows_cep == NULL) { - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; - } +int fmgr_init() +{ + int i; - for (i = 0; i < IRMD_MAX_FLOWS; i++) { - fmgr.nm1_flows[i] = NULL; - fmgr.np1_flows[i] = NULL; - fmgr.np1_flows_cep[i] = NULL; - } + for (i = 0; i < AP_MAX_FLOWS; ++i) + fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; + + for (i = 0; i < IPCPD_MAX_CONNS; ++i) + fmgr.np1_cep_id_to_fd[i] = -1; pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); - fmgr.np1_set = flow_set_create(); - if (fmgr.np1_set == NULL) { - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; - } + for (i = 0; i < QOS_CUBE_MAX; ++i) { + fmgr.np1_set[i] = flow_set_create(); + if (fmgr.np1_set == NULL) { + fmgr_destroy_flows(); + return -1; + } - fmgr.nm1_set = flow_set_create(); - if (fmgr.nm1_set == NULL) { - flow_set_destroy(fmgr.np1_set); - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; + fmgr.np1_fqs[i] = fqueue_create(); + if (fmgr.np1_fqs[i] == NULL) { + fmgr_destroy_flows(); + return -1; + } + + fmgr.nm1_set[i] = flow_set_create(); + if (fmgr.nm1_set == NULL) { + fmgr_destroy_flows(); + return -1; + } + + fmgr.nm1_fqs[i] = fqueue_create(); + if (fmgr.nm1_fqs[i] == NULL) { + fmgr_destroy_flows(); + return -1; + } } pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); @@ -378,6 +332,7 @@ int fmgr_init() int fmgr_fini() { int i; + int j; pthread_cancel(fmgr.nm1_flow_acceptor); pthread_cancel(fmgr.np1_sdu_reader); @@ -387,29 +342,25 @@ int fmgr_fini() pthread_join(fmgr.np1_sdu_reader, NULL); pthread_join(fmgr.nm1_sdu_reader, NULL); - for (i = 0; i < IRMD_MAX_FLOWS; i++) { - if (fmgr.nm1_flows[i] == NULL) - continue; - flow_dealloc(fmgr.nm1_flows[i]->fd); - free(fmgr.nm1_flows[i]); - } + for (i = 0; i < AP_MAX_FLOWS; ++i) + for (j = 0; j < QOS_CUBE_MAX; ++j) + if (flow_set_has(fmgr.nm1_set[j], i)) { + flow_dealloc(i); + flow_set_del(fmgr.nm1_set[j], i); + } pthread_rwlock_destroy(&fmgr.nm1_flows_lock); pthread_rwlock_destroy(&fmgr.np1_flows_lock); - flow_set_destroy(fmgr.nm1_set); - flow_set_destroy(fmgr.np1_set); - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); + fmgr_destroy_flows(); return 0; } -int fmgr_np1_alloc(int fd, - char * dst_ap_name, - char * src_ae_name, - enum qos_cube qos) +int fmgr_np1_alloc(int fd, + char * dst_ap_name, + char * src_ae_name, + qoscube_t qos) { cep_id_t cep_id; buffer_t buf; @@ -478,10 +429,8 @@ int fmgr_np1_alloc(int fd, free(ro_data); - if (add_np1_fd(fd, cep_id, qos)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } + fmgr.np1_fd_to_cep_id[fd] = cep_id; + fmgr.np1_cep_id_to_fd[cep_id] = fd; pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -491,16 +440,13 @@ int fmgr_np1_alloc(int fd, /* Call under np1_flows lock */ static int np1_flow_dealloc(int fd) { - struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; + qoscube_t cube; - flow_set_del(fmgr.np1_set, fd); - - flow = fmgr.np1_flows[fd]; - if (flow == NULL) - return -1; + ipcp_flow_get_qoscube(fd, &cube); + flow_set_del(fmgr.np1_set[cube], fd); msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; @@ -510,16 +456,15 @@ static int np1_flow_dealloc(int fd) buf.data = malloc(buf.len); if (buf.data == NULL) - return -1; + return -ENOMEM; flow_alloc_msg__pack(&msg, buf.data); - ret = frct_i_destroy(flow->cep_id, &buf); + ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - fmgr.np1_flows[fd] = NULL; - fmgr.np1_flows_cep[flow->cep_id] = NULL; + fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; + fmgr.np1_fd_to_cep_id[fd] = -1; - free(flow); free(buf.data); return ret; @@ -527,48 +472,39 @@ static int np1_flow_dealloc(int fd) int fmgr_np1_alloc_resp(int fd, int response) { - struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } - msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; msg.response = response; msg.has_response = true; buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); + if (buf.len == 0) return -1; - } buf.data = malloc(buf.len); - if (buf.data == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } + if (buf.data == NULL) + return -ENOMEM; flow_alloc_msg__pack(&msg, buf.data); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + if (response < 0) { - frct_i_destroy(flow->cep_id, &buf); + frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); free(buf.data); - fmgr.np1_flows[fd] = NULL; - fmgr.np1_flows_cep[flow->cep_id] = NULL; - free(flow); + fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] + = INVALID_CEP_ID; + fmgr.np1_fd_to_cep_id[fd] = -1; } else { - if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { + qoscube_t cube; + ipcp_flow_get_qoscube(fd, &cube); + if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } - flow_set_add(fmgr.np1_set, fd); + flow_set_add(fmgr.np1_set[cube], fd); } pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -581,27 +517,25 @@ int fmgr_np1_dealloc(int fd) int ret; pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + ret = np1_flow_dealloc(fd); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return ret; } -int fmgr_np1_post_buf(cep_id_t cep_id, - buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf) { - struct np1_flow * flow; int ret = 0; int fd; flow_alloc_msg_t * msg; - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + qoscube_t cube; /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } @@ -612,51 +546,41 @@ int fmgr_np1_post_buf(cep_id_t cep_id, msg->dst_name, msg->src_ae_name); if (fd < 0) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to get fd for flow."); return -1; } - if (add_np1_fd(fd, cep_id, msg->qos_cube)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to add np1 flow."); - return -1; - } + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + + fmgr.np1_fd_to_cep_id[fd] = cep_id; + fmgr.np1_cep_id_to_fd[cep_id] = fd; + + pthread_rwlock_unlock(&fmgr.np1_flows_lock); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("No such flow in flow manager."); - return -1; - } + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - ret = ipcp_flow_alloc_reply(flow->fd, msg->response); + fd = fmgr.np1_cep_id_to_fd[cep_id]; + ret = ipcp_flow_alloc_reply(fd, msg->response); if (msg->response < 0) { - fmgr.np1_flows[flow->fd] = NULL; - fmgr.np1_flows_cep[cep_id] = NULL; - free(flow); + fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; + fmgr.np1_cep_id_to_fd[cep_id] = -1; } else { - flow_set_add(fmgr.np1_set, flow->fd); + ipcp_flow_get_qoscube(fd, &cube); + flow_set_add(fmgr.np1_set[cube], + fmgr.np1_cep_id_to_fd[cep_id]); } + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("No such flow in flow manager."); - return -1; - } - - flow_set_del(fmgr.np1_set, flow->fd); - - ret = flow_dealloc(flow->fd); + fd = fmgr.np1_cep_id_to_fd[cep_id]; + ipcp_flow_get_qoscube(fd, &cube); + flow_set_del(fmgr.np1_set[cube], fd); + ret = flow_dealloc(fd); break; default: LOG_ERR("Got an unknown flow allocation message."); @@ -674,18 +598,12 @@ int fmgr_np1_post_buf(cep_id_t cep_id, int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb) { - struct np1_flow * flow; + int fd; pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - LOG_ERR("Failed to find N flow."); - return -1; - } - - if (ipcp_flow_write(flow->fd, sdb)) { + fd = fmgr.np1_cep_id_to_fd[cep_id]; + if (ipcp_flow_write(fd, sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to hand SDU to N flow."); return -1; @@ -704,19 +622,19 @@ int fmgr_nm1_mgmt_flow(char * dst_name) /* FIXME: Request retransmission. */ fd = flow_alloc(dst_name, MGMT_AE, NULL); if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); + LOG_ERR("Failed to allocate flow to %s.", dst_name); return -1; } result = flow_alloc_res(fd); if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", + LOG_ERR("Result of flow allocation to %s is %d.", dst_name, result); return -1; } if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand file descriptor to RIB manager"); + LOG_ERR("Failed to hand file descriptor to RIB manager."); flow_dealloc(fd); return -1; } @@ -724,8 +642,7 @@ int fmgr_nm1_mgmt_flow(char * dst_name) return 0; } -int fmgr_nm1_dt_flow(char * dst_name, - enum qos_cube qos) +int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos) { int fd; int result; @@ -733,28 +650,25 @@ int fmgr_nm1_dt_flow(char * dst_name, /* FIXME: Map qos cube on correct QoS. */ fd = flow_alloc(dst_name, DT_AE, NULL); if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); + LOG_ERR("Failed to allocate flow to %s.", dst_name); return -1; } result = flow_alloc_res(fd); if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); + LOG_ERR("Allocate flow to %s result %d.", dst_name, result); return -1; } - if (add_nm1_fd(fd, qos)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); - return -1; - } + flow_set_add(fmgr.nm1_set[qos], fd); + + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; return 0; } -int fmgr_nm1_write_sdu(struct pci * pci, - struct shm_du_buff * sdb) +int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb) { if (pci == NULL || sdb == NULL) return -1; diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index b0738a0c..1e9bcc18 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -42,7 +42,6 @@ #include "dt_const.h" #include "frct.h" #include "ipcp.h" -#include "cdap_request.h" #include "ro.h" #include "path.h" #include "dir.h" @@ -85,22 +84,28 @@ struct rnode { }; struct mgmt_flow { + struct list_head next; + struct cdap * instance; int fd; - struct list_head next; + + pthread_t handler; }; struct ro_sub { + struct list_head next; + int sid; + char * name; struct ro_sub_ops * ops; - struct list_head next; }; struct ro_id { + struct list_head next; + uint64_t seqno; char * full_name; - struct list_head next; }; struct { @@ -124,9 +129,6 @@ struct { struct list_head flows; pthread_rwlock_t flows_lock; - struct list_head cdap_reqs; - pthread_mutex_t cdap_reqs_lock; - struct addr_auth * addr_auth; enum pol_addr_auth addr_auth_type; } rib; @@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name, pthread_rwlock_unlock(&ipcpi.state_lock); } -/* We only have a create operation for now */ +/* We only have a create operation for now. */ static struct ro_sub_ops ribmgr_sub_ops = { .ro_created = ribmgr_ro_created, .ro_updated = NULL, @@ -303,9 +305,12 @@ static void ro_delete_timer(void * o) { char * name = (char *) o; - if (ribmgr_ro_delete(name)) { + pthread_mutex_lock(&rib.ro_lock); + + if (ribmgr_ro_delete(name)) LOG_ERR("Failed to delete %s.", name); - } + + pthread_mutex_unlock(&rib.ro_lock); } static struct rnode * ribmgr_ro_create(const char * name, @@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char * name, node = node->child; sibling = false; - /* Search horizontally */ + /* Search horizontally. */ while (node != NULL) { if (strcmp(node->name, token) == 0) { break; @@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char * name, LOG_DBG("Created RO with name %s.", name); - if (!(attr.expiry.tv_sec == 0 && - attr.expiry.tv_nsec == 0)) { + if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) { timeout = attr.expiry.tv_sec * 1000 + attr.expiry.tv_nsec / MILLION; - if (timerwheel_add(rib.wheel, ro_delete_timer, - new->full_name, strlen(new->full_name) + 1, - timeout)) { + if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name, + strlen(new->full_name) + 1, timeout)) LOG_ERR("Failed to add deletion timer of RO."); - } } return new; @@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name, return node; } -/* Call while holding cdap_reqs_lock */ -/* FIXME: better not to call blocking functions under any lock */ -int cdap_result_wait(struct cdap * instance, - enum cdap_opcode code, - char * name, - int invoke_id) -{ - struct cdap_request * req; - int ret; - char * name_dup = strdup(name); - if (name_dup == NULL) - return -1; - - req = cdap_request_create(code, name_dup, invoke_id, instance); - if (req == NULL) { - free(name_dup); - return -1; - } - - list_add(&req->next, &rib.cdap_reqs); - - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - ret = cdap_request_wait(req); - - pthread_mutex_lock(&rib.cdap_reqs_lock); - - if (ret == -1) /* should only be on ipcp shutdown */ - LOG_DBG("Waiting CDAP request destroyed."); - - if (ret == -ETIMEDOUT) - LOG_ERR("CDAP Request timed out."); - - if (ret) - LOG_DBG("Unknown error code: %d.", ret); - - if (!ret) - ret = req->result; - - list_del(&req->next); - cdap_request_destroy(req); - - return ret; -} - static int write_ro_msg(struct cdap * neighbor, ro_msg_t * msg, char * name, @@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap * neighbor, { uint8_t * data; size_t len; - int iid = 0; + cdap_key_t key; + int ret; len = ro_msg__get_packed_size(msg); if (len == 0) @@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap * neighbor, ro_msg__pack(msg, data); - pthread_mutex_lock(&rib.cdap_reqs_lock); - iid = cdap_send_request(neighbor, code, - name, data, len, 0); - if (iid < 0) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + key = cdap_request_send(neighbor, code, name, data, len, 0); + if (key < 0) { + LOG_ERR("Failed to send CDAP request."); free(data); return -1; } - if (cdap_result_wait(neighbor, code, name, iid)) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); - free(data); - LOG_ERR("Remote did not receive RIB object."); + free(data); + + ret = cdap_reply_wait(neighbor, key, NULL, NULL); + if (ret < 0) { + LOG_ERR("CDAP command with code %d and name %s failed: %d.", + code, name, ret); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - free(data); return 0; } @@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap * neighbor, int ribmgr_init() { INIT_LIST_HEAD(&rib.flows); - INIT_LIST_HEAD(&rib.cdap_reqs); INIT_LIST_HEAD(&rib.subs); INIT_LIST_HEAD(&rib.ro_ids); @@ -540,17 +495,9 @@ int ribmgr_init() return -1; } - if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { - LOG_ERR("Failed to initialize mutex."); - pthread_rwlock_destroy(&rib.flows_lock); - free(rib.root); - return -1; - } - if (pthread_mutex_init(&rib.ro_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); free(rib.root); return -1; } @@ -558,7 +505,6 @@ int ribmgr_init() if (pthread_mutex_init(&rib.subs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); free(rib.root); return -1; @@ -567,7 +513,6 @@ int ribmgr_init() if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); free(rib.root); @@ -578,7 +523,6 @@ int ribmgr_init() if (rib.sids == NULL) { LOG_ERR("Failed to create bitmap."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -591,7 +535,6 @@ int ribmgr_init() LOG_ERR("Failed to create timerwheel."); bmp_destroy(rib.sids); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -605,7 +548,6 @@ int ribmgr_init() timerwheel_destroy(rib.wheel); bmp_destroy(rib.sids); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -633,16 +575,6 @@ int ribmgr_fini() struct list_head * pos = NULL; struct list_head * n = NULL; - pthread_mutex_lock(&rib.cdap_reqs_lock); - list_for_each_safe(pos, n, &rib.cdap_reqs) { - struct cdap_request * req = - list_entry(pos, struct cdap_request, next); - free(req->name); - list_del(&req->next); - free(req); - } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - pthread_rwlock_wrlock(&rib.flows_lock); list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = @@ -668,7 +600,6 @@ int ribmgr_fini() timerwheel_destroy(rib.wheel); pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -676,49 +607,8 @@ int ribmgr_fini() return 0; } -static int ribmgr_cdap_reply(struct cdap * instance, - int invoke_id, - int result, - uint8_t * data, - size_t len) -{ - struct list_head * pos, * n = NULL; - - /* We never perform reads on other RIBs */ - (void) data; - (void) len; - - pthread_mutex_lock(&rib.cdap_reqs_lock); - - list_for_each_safe(pos, n, &rib.cdap_reqs) { - struct cdap_request * req = - list_entry(pos, struct cdap_request, next); - if (req->instance == instance && - req->invoke_id == invoke_id && - req->state == REQ_PENDING) { - if (result != 0) - LOG_ERR("CDAP command with code %d and name %s " - "failed with error %d", - req->code, req->name, result); - else - LOG_DBG("CDAP command with code %d and name %s " - "executed succesfully", - req->code, req->name); - - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - cdap_request_respond(req, result); - - pthread_mutex_lock(&rib.cdap_reqs_lock); - } - } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - return 0; -} - static int ribmgr_cdap_create(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name, ro_msg_t * msg) { @@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance, struct ro_attr attr; struct rnode * node; + assert(instance); + ro_attr_init(&attr); attr.expiry.tv_sec = msg->sec; attr.expiry.tv_nsec = msg->nsec; @@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance, ro_data = malloc(msg->value.len); if (ro_data == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } memcpy(ro_data, msg->value.data, msg->value.len); @@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance, node = ribmgr_ro_create(name, attr, ro_data, msg->value.len); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); free(ro_data); return -1; } @@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { LOG_ERR("Failed to send reply to create request."); return -1; } @@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance, } static int ribmgr_cdap_delete(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name) { struct list_head * p = NULL; @@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, if (ribmgr_ro_delete(name)) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } @@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { + if (cdap_reply_send(instance, key, 0, NULL, 0)) { LOG_ERR("Failed to send reply to create request."); return -1; } @@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, } static int ribmgr_cdap_write(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name, ro_msg_t * msg, uint32_t flags) @@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance, ro_data = malloc(msg->value.len); if (ro_data == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } memcpy(ro_data, msg->value.data, msg->value.len); @@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance, if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); free(ro_data); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } node->seqno = msg->seqno; @@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { LOG_ERR("Failed to send reply to write request."); return -1; } @@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance, return 0; } -static int ribmgr_enrol_sync(struct cdap * instance, - struct rnode * node) +static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node) { int ret = 0; @@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance, } static int ribmgr_cdap_start(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name) { - int iid = 0; - - pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_OPERATIONAL && - strcmp(name, ENROLLMENT) == 0) { + if (strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); - if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { + pthread_rwlock_wrlock(&ipcpi.state_lock); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("IPCP in wrong state."); + return -1; + } + + if (cdap_reply_send(instance, key, 0, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } - /* Loop through rtree and send correct objects */ - LOG_DBGF("Sending ROs that need to be sent on enrolment..."); + /* Loop through rtree and send correct objects. */ + LOG_DBG("Sending ROs that need to be sent on enrolment..."); pthread_mutex_lock(&rib.ro_lock); if (ribmgr_enrol_sync(instance, rib.root->child)) { @@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance, LOG_ERR("Failed to sync part of the RIB."); return -1; } + pthread_mutex_unlock(&rib.ro_lock); LOG_DBGF("Sending stop enrollment..."); - pthread_mutex_lock(&rib.cdap_reqs_lock); - - iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT, + key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT, NULL, 0, 0); - if (iid < 0) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + if (key < 0) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send stop of enrollment."); return -1; } - if (cdap_result_wait(instance, CDAP_STOP, - ENROLLMENT, iid)) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + if (cdap_reply_wait(instance, key, NULL, NULL)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Remote failed to complete enrollment."); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); + + pthread_rwlock_unlock(&ipcpi.state_lock); } else { - if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Failed to send reply to start request."); - return -1; - } + LOG_WARN("Request to start unknown operation."); + if (cdap_reply_send(instance, key, -1, NULL, 0)) + LOG_ERR("Failed to send negative reply."); } - pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } -static int ribmgr_cdap_stop(struct cdap * instance, - int invoke_id, - char * name) +static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name) { int ret = 0; pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_CONFIG && - strcmp(name, ENROLLMENT) == 0) { + if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); ipcp_set_state(IPCP_BOOTING); } else ret = -1; - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; @@ -1028,8 +914,7 @@ static void ro_id_delete(void * o) pthread_mutex_unlock(&rib.ro_ids_lock); } -static int ro_id_create(char * name, - ro_msg_t * msg) +static int ro_id_create(char * name, ro_msg_t * msg) { struct ro_id * tmp; @@ -1062,105 +947,113 @@ static int ro_id_create(char * name, return 0; } -static int ribmgr_cdap_request(struct cdap * instance, - int invoke_id, - enum cdap_opcode opcode, - char * name, - uint8_t * data, - size_t len, - uint32_t flags) +static void * cdap_req_handler(void * o) { + struct cdap * instance = (struct cdap *) o; + enum cdap_opcode opcode; + char * name; + uint8_t * data; + size_t len; + uint32_t flags; ro_msg_t * msg; - int ret = -1; struct list_head * p = NULL; - if (opcode == CDAP_START) - return ribmgr_cdap_start(instance, - invoke_id, - name); - else if (opcode == CDAP_STOP) - return ribmgr_cdap_stop(instance, - invoke_id, - name); - - msg = ro_msg__unpack(NULL, len, data); - if (msg == NULL) { - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - LOG_ERR("Failed to unpack RO message"); - return -1; - } + assert(instance); - pthread_mutex_lock(&rib.ro_ids_lock); - list_for_each(p, &rib.ro_ids) { - struct ro_id * e = list_entry(p, struct ro_id, next); + while (true) { + cdap_key_t key = cdap_request_wait(instance, + &opcode, + &name, + &data, + &len, + &flags); + assert(key >= 0); - if (strcmp(e->full_name, name) == 0 && - e->seqno == msg->seqno) { - pthread_mutex_unlock(&rib.ro_ids_lock); - ro_msg__free_unpacked(msg, NULL); - cdap_send_reply(instance, invoke_id, 0, NULL, 0); - LOG_DBG("Already received this RO."); - return 0; + if (opcode == CDAP_START) { + if (ribmgr_cdap_start(instance, key, name)) + LOG_WARN("CDAP start failed."); + continue; + } + else if (opcode == CDAP_STOP) { + if (ribmgr_cdap_stop(instance, key, name)) + LOG_WARN("CDAP stop failed."); + continue; } - } - pthread_mutex_unlock(&rib.ro_ids_lock); - - if (opcode == CDAP_CREATE) { - ret = ribmgr_cdap_create(instance, - invoke_id, - name, - msg); - } else if (opcode == CDAP_WRITE) { - ret = ribmgr_cdap_write(instance, - invoke_id, - name, msg, - flags); - - } else if (opcode == CDAP_DELETE) { - ret = ribmgr_cdap_delete(instance, - invoke_id, - name); - } else { - LOG_INFO("Unsupported opcode received."); - ro_msg__free_unpacked(msg, NULL); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - return -1; - } - if (ro_id_create(name, msg)) { - LOG_ERR("Failed to create RO id."); - return -1; - } + msg = ro_msg__unpack(NULL, len, data); + if (msg == NULL) { + cdap_reply_send(instance, key, -1, NULL, 0); + LOG_WARN("Failed to unpack RO message"); + continue; + } - if (msg->recv_set == ALL_MEMBERS) { - pthread_rwlock_rdlock(&rib.flows_lock); - list_for_each(p, &rib.flows) { - struct mgmt_flow * e = - list_entry(p, struct mgmt_flow, next); + pthread_mutex_lock(&rib.ro_ids_lock); + list_for_each(p, &rib.ro_ids) { + struct ro_id * e = list_entry(p, struct ro_id, next); - /* Don't send it back */ - if (e->instance == instance) + if (strcmp(e->full_name, name) == 0 && + e->seqno == msg->seqno) { + pthread_mutex_unlock(&rib.ro_ids_lock); + ro_msg__free_unpacked(msg, NULL); + cdap_reply_send(instance, key, 0, NULL, 0); + LOG_DBG("Already received this RO."); continue; + } + } + pthread_mutex_unlock(&rib.ro_ids_lock); - if (write_ro_msg(e->instance, msg, name, opcode)) { - LOG_ERR("Failed to send to a neighbor."); - pthread_rwlock_unlock(&rib.flows_lock); + if (opcode == CDAP_CREATE) { + if (ribmgr_cdap_create(instance, key, name, msg)) { + LOG_WARN("CDAP create failed."); ro_msg__free_unpacked(msg, NULL); - return -1; + continue; } + } else if (opcode == CDAP_WRITE) { + if (ribmgr_cdap_write(instance, key, name, + msg, flags)) { + LOG_WARN("CDAP write failed."); + ro_msg__free_unpacked(msg, NULL); + continue; + } + } else if (opcode == CDAP_DELETE) { + if (ribmgr_cdap_delete(instance, key, name)) { + LOG_WARN("CDAP delete failed."); + ro_msg__free_unpacked(msg, NULL); + continue; + } + } else { + LOG_INFO("Unsupported opcode received."); + ro_msg__free_unpacked(msg, NULL); + cdap_reply_send(instance, key, -1, NULL, 0); + continue; } - pthread_rwlock_unlock(&rib.flows_lock); - } - ro_msg__free_unpacked(msg, NULL); + if (ro_id_create(name, msg)) { + LOG_WARN("Failed to create RO id."); + ro_msg__free_unpacked(msg, NULL); + continue; + } - return ret; -} + if (msg->recv_set == ALL_MEMBERS) { + pthread_rwlock_rdlock(&rib.flows_lock); + list_for_each(p, &rib.flows) { + struct mgmt_flow * e = + list_entry(p, struct mgmt_flow, next); -static struct cdap_ops ribmgr_cdap_ops = { - .cdap_reply = ribmgr_cdap_reply, - .cdap_request = ribmgr_cdap_request -}; + /* Don't send it back. */ + if (e->instance == instance) + continue; + + if (write_ro_msg(e->instance, msg, + name, opcode)) + LOG_WARN("Failed to send to neighbor."); + } + pthread_rwlock_unlock(&rib.flows_lock); + } + + ro_msg__free_unpacked(msg, NULL); + } +} int ribmgr_add_flow(int fd) { @@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd) flow = malloc(sizeof(*flow)); if (flow == NULL) - return -1; + return -ENOMEM; - instance = cdap_create(&ribmgr_cdap_ops, fd); + instance = cdap_create(fd); if (instance == NULL) { LOG_ERR("Failed to create CDAP instance"); free(flow); @@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd) flow->instance = instance; flow->fd = fd; + if (pthread_create(&flow->handler, NULL, + cdap_req_handler, instance)) { + LOG_ERR("Failed to start handler thread for mgt flow."); + free(flow); + return -1; + } + pthread_rwlock_wrlock(&rib.flows_lock); + list_add(&flow->next, &rib.flows); + pthread_rwlock_unlock(&rib.flows_lock); return 0; @@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd) struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (flow->fd == fd) { + pthread_cancel(flow->handler); if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); @@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf) size_t len = 0; struct ro_attr attr; - if (conf == NULL || - conf->type != IPCP_NORMAL) { + if (conf == NULL || conf->type != IPCP_NORMAL) { LOG_ERR("Bad DIF configuration."); - return -1; + return -EINVAL; } ro_attr_init(&attr); @@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf) len = static_info_msg__get_packed_size(&stat_info); if (len == 0) { LOG_ERR("Failed to get size of static information."); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf) data = malloc(len); if (data == NULL) { LOG_ERR("Failed to allocate memory."); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf) attr, data, len) == NULL) { LOG_ERR("Failed to create static info RO."); free(data); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf) if (dir_init()) { LOG_ERR("Failed to init directory"); ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf) LOG_ERR("Failed to initialize FRCT."); dir_fini(); ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1296,12 +1193,14 @@ int ribmgr_enrol(void) { struct cdap * instance = NULL; struct mgmt_flow * flow; - int iid = 0; + cdap_key_t key; + int ret; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_INIT) { pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("IPCP in wrong state."); return -1; } @@ -1312,36 +1211,31 @@ int ribmgr_enrol(void) ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("No flows in RIB."); return -1; } - flow = list_entry((&rib.flows)->next, struct mgmt_flow, next); + flow = list_first_entry((&rib.flows), struct mgmt_flow, next); instance = flow->instance; - pthread_mutex_lock(&rib.cdap_reqs_lock); - iid = cdap_send_request(instance, - CDAP_START, - ENROLLMENT, - NULL, 0, 0); - if (iid < 0) { + key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); + if (key < 0) { ipcp_set_state(IPCP_INIT); - pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); return -1; } - if (cdap_result_wait(instance, CDAP_START, - ENROLLMENT, iid)) { + ret = cdap_reply_wait(instance, key, NULL, NULL); + if (ret) { ipcp_set_state(IPCP_INIT); - pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Failed to start enrollment."); + LOG_ERR("Failed to enroll: %d.", ret); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1351,9 +1245,10 @@ int ribmgr_enrol(void) int ribmgr_start_policies(void) { pthread_rwlock_rdlock(&ipcpi.state_lock); + if (ipcp_get_state() != IPCP_BOOTING) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Cannot start policies in wrong state"); + LOG_ERR("Cannot start policies in wrong state."); return -1; } pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void) } rib.address = rib.addr_auth->address(); - LOG_DBG("IPCP has address %lu", (unsigned long) rib.address); + LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address); return 0; } @@ -1380,22 +1275,21 @@ uint64_t ribmgr_address() return rib.address; } -static int send_neighbors_ro(char * name, - ro_msg_t * msg, - enum cdap_opcode code) +static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code) { struct list_head * p = NULL; pthread_rwlock_rdlock(&rib.flows_lock); + list_for_each(p, &rib.flows) { struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); - if (write_ro_msg(e->instance, msg, name, code)) { - LOG_ERR("Failed to send to a neighbor."); pthread_rwlock_unlock(&rib.flows_lock); + LOG_ERR("Failed to send to a neighbor."); return -1; } } + pthread_rwlock_unlock(&rib.flows_lock); return 0; @@ -1499,9 +1393,7 @@ int ro_delete(const char * name) return 0; } -int ro_write(const char * name, - uint8_t * data, - size_t len) +int ro_write(const char * name, uint8_t * data, size_t len) { struct rnode * node; ro_msg_t msg = RO_MSG__INIT; @@ -1541,8 +1433,7 @@ int ro_write(const char * name, return 0; } -ssize_t ro_read(const char * name, - uint8_t ** data) +ssize_t ro_read(const char * name, uint8_t ** data) { struct rnode * node; ssize_t len; @@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name, return len; } -ssize_t ro_children(const char * name, - char *** children) +ssize_t ro_children(const char * name, char *** children) { struct rnode * node; struct rnode * child; @@ -1640,8 +1530,7 @@ bool ro_exists(const char * name) return found; } -int ro_subscribe(const char * name, - struct ro_sub_ops * ops) +int ro_subscribe(const char * name, struct ro_sub_ops * ops) { struct ro_sub * sub; int sid; -- cgit v1.2.3