From cbaa4a95cc6c74c7a2cfe8a5acaf7b4867fc343a Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 23 Dec 2016 18:06:33 +0100 Subject: logs: Print process id's to stdout --- include/ouroboros/logs.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/include/ouroboros/logs.h b/include/ouroboros/logs.h index 56eac068..6a16aca6 100644 --- a/include/ouroboros/logs.h +++ b/include/ouroboros/logs.h @@ -24,6 +24,7 @@ #ifndef OUROBOROS_LOGS_H #define OUROBOROS_LOGS_H +#include #include #ifndef OUROBOROS_PREFIX @@ -55,8 +56,10 @@ extern FILE * logfile; FMT ANSI_COLOR_RESET "\n", ##ARGS); \ fflush(logfile); \ } else { \ - printf(CLR OUROBOROS_PREFIX "(" LVL "): " \ - FMT ANSI_COLOR_RESET "\n", ##ARGS); \ + printf(CLR "==%05d== " \ + OUROBOROS_PREFIX "(" LVL "): " \ + FMT ANSI_COLOR_RESET "\n", getpid(), \ + ##ARGS); \ } \ } while (0) -- cgit v1.2.3 From f97dee45d3c1b0088aa8010a1c9d59593c3d0df0 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 23 Dec 2016 18:13:40 +0100 Subject: ipcpd, lib: Refactor normal ipcp and cdap Refactors the normal IPCP fmgr and ribmgr, and modifies the API for cdap so that no callbacks are needed. --- include/ouroboros/cdap.h | 46 ++-- include/ouroboros/config.h.in | 1 + include/ouroboros/ipcp-dev.h | 4 +- include/ouroboros/shared.h | 6 +- src/ipcpd/normal/CMakeLists.txt | 1 - src/ipcpd/normal/cdap_request.c | 157 ------------ src/ipcpd/normal/cdap_request.h | 68 ----- src/ipcpd/normal/fmgr.c | 390 ++++++++++++----------------- src/ipcpd/normal/ribmgr.c | 501 +++++++++++++++---------------------- src/lib/CMakeLists.txt | 1 + src/lib/cdap.c | 531 +++++++++++++++++++++++++++++----------- src/lib/cdap_req.c | 151 ++++++++++++ src/lib/cdap_req.h | 69 ++++++ 13 files changed, 990 insertions(+), 936 deletions(-) delete mode 100644 src/ipcpd/normal/cdap_request.c delete mode 100644 src/ipcpd/normal/cdap_request.h create mode 100644 src/lib/cdap_req.c create mode 100644 src/lib/cdap_req.h diff --git a/include/ouroboros/cdap.h b/include/ouroboros/cdap.h index 7312fb6f..89d598f9 100644 --- a/include/ouroboros/cdap.h +++ b/include/ouroboros/cdap.h @@ -30,6 +30,8 @@ #define F_SYNC 0x0001 +#define INVALID_CDAP_KEY -1 + enum cdap_opcode { CDAP_READ = 0, CDAP_WRITE, @@ -41,40 +43,36 @@ enum cdap_opcode { struct cdap; -/* Callback functions that work on the application's RIB */ -struct cdap_ops { - int (* cdap_request)(struct cdap * instance, - int invoke_id, - enum cdap_opcode opcode, - char * name, - uint8_t * data, - size_t len, - uint32_t flags); - - int (* cdap_reply)(struct cdap * instance, - int invoke_id, - int result, - uint8_t * data, - size_t len); -}; +typedef int32_t cdap_key_t; /* Assumes flow is blocking */ -struct cdap * cdap_create(struct cdap_ops * ops, - int fd); +struct cdap * cdap_create(int fd); + int cdap_destroy(struct cdap * instance); -/* Returns a positive invoke-id on success to be used in the callback */ -int cdap_send_request(struct cdap * instance, +cdap_key_t cdap_request_send(struct cdap * instance, enum cdap_opcode code, char * name, uint8_t * data, size_t len, uint32_t flags); -/* Can only be called following a callback function */ -int cdap_send_reply(struct cdap * instance, - int invoke_id, +int cdap_reply_wait(struct cdap * instance, + cdap_key_t key, + uint8_t ** data, + size_t * len); + +cdap_key_t cdap_request_wait(struct cdap * instance, + enum cdap_opcode * opcode, + char ** name, + uint8_t ** data, + size_t * len, + uint32_t * flags); + +int cdap_reply_send(struct cdap * instance, + cdap_key_t key, int result, uint8_t * data, size_t len); -#endif + +#endif /* OUROBOROS_CDAP_H */ diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 2417a5c0..4208c6d2 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -49,6 +49,7 @@ #define IRMD_MAX_FLOWS 4096 #define IRMD_THREADPOOL_SIZE 5 #define IPCPD_THREADPOOL_SIZE 3 +#define IPCPD_MAX_CONNS IRMD_MAX_FLOWS #define LOG_DIR "/@LOG_DIR@/" #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC #define PFT_SIZE 1 << 12 diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 19a66762..fd1e7478 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -45,7 +45,7 @@ void ipcp_flow_fini(int fd); void ipcp_flow_del(struct shm_du_buff * sdb); -int ipcp_flow_get_qoscube(int fd, - enum qos_cube * cube); +int ipcp_flow_get_qoscube(int fd, + qoscube_t * cube); #endif /* OUROBOROS_IPCP_DEV_H */ diff --git a/include/ouroboros/shared.h b/include/ouroboros/shared.h index c38b1bde..7b281cc2 100644 --- a/include/ouroboros/shared.h +++ b/include/ouroboros/shared.h @@ -24,10 +24,10 @@ #define OUROBOROS_SHARED_H /* FIXME: To be decided which QoS cubes we support */ -enum qos_cube { +typedef enum qos_cube { QOS_CUBE_BE = 0, QOS_CUBE_VIDEO, - QOS_MAX -}; + QOS_CUBE_MAX +} qoscube_t; #endif /* OUROBOROS_SHARED_H */ diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 67a7953b..5f85dd89 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -25,7 +25,6 @@ protobuf_generate_c(RO_SRCS RO_HDRS ro.proto) set(SOURCE_FILES # Add source files here addr_auth.c - cdap_request.c crc32.c dir.c fmgr.c diff --git a/src/ipcpd/normal/cdap_request.c b/src/ipcpd/normal/cdap_request.c deleted file mode 100644 index 8409b508..00000000 --- a/src/ipcpd/normal/cdap_request.c +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP - RIB Manager - CDAP request - * - * Sander Vrijders - * Dimitri Staessens - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include -#include -#include - -#include "cdap_request.h" - -#include - -struct cdap_request * cdap_request_create(enum cdap_opcode code, - char * name, - int invoke_id, - struct cdap * instance) -{ - struct cdap_request * creq = malloc(sizeof(*creq)); - pthread_condattr_t cattr; - - if (creq == NULL) - return NULL; - - creq->code = code; - creq->name = name; - creq->invoke_id = invoke_id; - creq->instance = instance; - creq->state = REQ_INIT; - creq->result = -1; - - pthread_condattr_init(&cattr); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - pthread_cond_init(&creq->cond, &cattr); - pthread_mutex_init(&creq->lock, NULL); - - INIT_LIST_HEAD(&creq->next); - - return creq; -} - -void cdap_request_destroy(struct cdap_request * creq) -{ - if (creq == NULL) - return; - - pthread_mutex_lock(&creq->lock); - - if (creq->state == REQ_DESTROY) { - pthread_mutex_unlock(&creq->lock); - return; - } - - if (creq->state == REQ_INIT) - creq->state = REQ_DONE; - - if (creq->state == REQ_PENDING) { - creq->state = REQ_DESTROY; - pthread_cond_broadcast(&creq->cond); - } - - while (creq->state != REQ_DONE) - pthread_cond_wait(&creq->cond, &creq->lock); - - pthread_mutex_unlock(&creq->lock); - - pthread_cond_destroy(&creq->cond); - pthread_mutex_destroy(&creq->lock); - - if (creq->name != NULL) - free(creq->name); - - free(creq); -} - -int cdap_request_wait(struct cdap_request * creq) -{ - struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000), - (CDAP_REPLY_TIMEOUT % 1000) * MILLION}; - struct timespec abstime; - int ret = -1; - - if (creq == NULL) - return -EINVAL; - - clock_gettime(CLOCK_REALTIME, &abstime); - ts_add(&abstime, &timeout, &abstime); - - pthread_mutex_lock(&creq->lock); - - if (creq->state != REQ_INIT) { - pthread_mutex_unlock(&creq->lock); - return -EINVAL; - } - - creq->state = REQ_PENDING; - - while (creq->state == REQ_PENDING) { - if ((ret = -pthread_cond_timedwait(&creq->cond, - &creq->lock, - &abstime)) == -ETIMEDOUT) { - break; - } - } - - if (creq->state == REQ_DESTROY) - ret = -1; - - creq->state = REQ_DONE; - pthread_cond_broadcast(&creq->cond); - - pthread_mutex_unlock(&creq->lock); - - return ret; -} - -void cdap_request_respond(struct cdap_request * creq, int response) -{ - if (creq == NULL) - return; - - pthread_mutex_lock(&creq->lock); - - if (creq->state != REQ_PENDING) { - pthread_mutex_unlock(&creq->lock); - return; - } - - creq->state = REQ_RESPONSE; - creq->result = response; - pthread_cond_broadcast(&creq->cond); - - while (creq->state == REQ_RESPONSE) - pthread_cond_wait(&creq->cond, &creq->lock); - - pthread_mutex_unlock(&creq->lock); -} diff --git a/src/ipcpd/normal/cdap_request.h b/src/ipcpd/normal/cdap_request.h deleted file mode 100644 index 9cccfda5..00000000 --- a/src/ipcpd/normal/cdap_request.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP - RIB Manager - CDAP request - * - * Sander Vrijders - * Dimitri Staessens - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H -#define OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H - -#include -#include -#include - -#include - -enum creq_state { - REQ_INIT = 0, - REQ_PENDING, - REQ_RESPONSE, - REQ_DONE, - REQ_DESTROY -}; - -struct cdap_request { - struct list_head next; - - enum cdap_opcode code; - char * name; - int invoke_id; - struct cdap * instance; - - int result; - - enum creq_state state; - pthread_cond_t cond; - pthread_mutex_t lock; -}; - -struct cdap_request * cdap_request_create(enum cdap_opcode code, - char * name, - int invoke_id, - struct cdap * instance); - -void cdap_request_destroy(struct cdap_request * creq); - -int cdap_request_wait(struct cdap_request * creq); - -void cdap_request_respond(struct cdap_request * creq, - int response); - -#endif /* OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8e416aa4..6684db7c 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -49,82 +49,31 @@ typedef FlowAllocMsg flow_alloc_msg_t; #define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */ -struct np1_flow { - int fd; - cep_id_t cep_id; - enum qos_cube qos; -}; - -struct nm1_flow { - int fd; - enum qos_cube qos; -}; - struct { - pthread_t nm1_flow_acceptor; - struct nm1_flow ** nm1_flows; + flow_set_t * nm1_set[QOS_CUBE_MAX]; + fqueue_t * nm1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t nm1_flows_lock; - flow_set_t * nm1_set; - pthread_t nm1_sdu_reader; - struct np1_flow ** np1_flows; - struct np1_flow ** np1_flows_cep; + flow_set_t * np1_set[QOS_CUBE_MAX]; + fqueue_t * np1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t np1_flows_lock; - flow_set_t * np1_set; + + cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; + int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; + + pthread_t nm1_flow_acceptor; + pthread_t nm1_sdu_reader; pthread_t np1_sdu_reader; /* FIXME: Replace with PFF */ int fd; } fmgr; -static int add_nm1_fd(int fd, - enum qos_cube qos) -{ - struct nm1_flow * tmp; - - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) - return -1; - - tmp->fd = fd; - tmp->qos = qos; - - pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); - fmgr.nm1_flows[fd] = tmp; - pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - - flow_set_add(fmgr.nm1_set, fd); - - /* FIXME: Temporary, until we have a PFF */ - fmgr.fd = fd; - - return 0; -} - -static int add_np1_fd(int fd, - cep_id_t cep_id, - enum qos_cube qos) -{ - struct np1_flow * flow; - - flow = malloc(sizeof(*flow)); - if (flow == NULL) - return -1; - - flow->cep_id = cep_id; - flow->qos = qos; - flow->fd = fd; - - fmgr.np1_flows[fd] = flow; - fmgr.np1_flows_cep[cep_id] = flow; - - return 0; -} - static void * fmgr_nm1_acceptor(void * o) { int fd; char * ae_name; + qoscube_t cube; qosspec_t qs; (void) o; @@ -150,13 +99,13 @@ static void * fmgr_nm1_acceptor(void * o) if (!(strcmp(ae_name, MGMT_AE) == 0 || strcmp(ae_name, DT_AE) == 0)) { if (flow_alloc_resp(fd, -1)) - LOG_ERR("Failed to reply to flow allocation."); + LOG_WARN("Failed to reply to flow allocation."); flow_dealloc(fd); continue; } if (flow_alloc_resp(fd, 0)) { - LOG_ERR("Failed to reply to flow allocation."); + LOG_WARN("Failed to reply to flow allocation."); flow_dealloc(fd); continue; } @@ -166,17 +115,19 @@ static void * fmgr_nm1_acceptor(void * o) if (strcmp(ae_name, MGMT_AE) == 0) { if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand fd to RIB."); + LOG_WARN("Failed to hand fd to RIB."); flow_dealloc(fd); continue; } } else { - /* FIXME: Pass correct QoS cube */ - if (add_nm1_fd(fd, QOS_CUBE_BE)) { - LOG_ERR("Failed to add fd to list."); + ipcp_flow_get_qoscube(fd, &cube); + if (flow_set_add(fmgr.nm1_set[cube], fd)) { + LOG_WARN("Failed to add fd."); flow_dealloc(fd); continue; } + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; } free(ae_name); @@ -189,44 +140,40 @@ static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct np1_flow * flow; int fd; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; + + int i = 0; + int ret; (void) o; while (true) { - int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); + /* FIXME: replace with scheduling policy call */ + i = (i + 1) % QOS_CUBE_MAX; + + ret = flow_event_wait(fmgr.np1_set[i], + fmgr.np1_fqs[i], + &timeout); if (ret == -ETIMEDOUT) continue; if (ret < 0) { - LOG_ERR("Event error: %d.", ret); + LOG_WARN("Event error: %d.", ret); continue; } - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) { if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); + LOG_WARN("Failed to read SDU from fd %d.", fd); continue; } pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to retrieve flow."); - continue; - } - - if (frct_i_write_sdu(flow->cep_id, sdb)) { + if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); - LOG_ERR("Failed to hand SDU to FRCT."); + LOG_WARN("Failed to hand SDU to FRCT."); continue; } @@ -244,14 +191,18 @@ void * fmgr_nm1_sdu_reader(void * o) struct shm_du_buff * sdb; struct pci * pci; int fd; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; + int i = 0; + int ret; (void) o; while (true) { - int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); + /* FIXME: replace with scheduling policy call */ + i = (i + 1) % QOS_CUBE_MAX; + + ret = flow_event_wait(fmgr.nm1_set[i], + fmgr.nm1_fqs[i], + &timeout); if (ret == -ETIMEDOUT) continue; @@ -260,7 +211,7 @@ void * fmgr_nm1_sdu_reader(void * o) continue; } - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) { if (ipcp_flow_read(fd, &sdb)) { LOG_ERR("Failed to read SDU from fd %d.", fd); continue; @@ -320,52 +271,55 @@ void * fmgr_nm1_sdu_reader(void * o) return (void *) 0; } -int fmgr_init() +static void fmgr_destroy_flows(void) { int i; - fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS); - if (fmgr.nm1_flows == NULL) - return -1; - - fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS); - if (fmgr.np1_flows == NULL) { - free(fmgr.nm1_flows); - return -1; + for (i = 0; i < QOS_CUBE_MAX; ++i) { + flow_set_destroy(fmgr.nm1_set[i]); + flow_set_destroy(fmgr.np1_set[i]); + fqueue_destroy(fmgr.nm1_fqs[i]); + fqueue_destroy(fmgr.np1_fqs[i]); } +} - fmgr.np1_flows_cep = - malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS); - if (fmgr.np1_flows_cep == NULL) { - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; - } +int fmgr_init() +{ + int i; - for (i = 0; i < IRMD_MAX_FLOWS; i++) { - fmgr.nm1_flows[i] = NULL; - fmgr.np1_flows[i] = NULL; - fmgr.np1_flows_cep[i] = NULL; - } + for (i = 0; i < AP_MAX_FLOWS; ++i) + fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; + + for (i = 0; i < IPCPD_MAX_CONNS; ++i) + fmgr.np1_cep_id_to_fd[i] = -1; pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); - fmgr.np1_set = flow_set_create(); - if (fmgr.np1_set == NULL) { - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; - } + for (i = 0; i < QOS_CUBE_MAX; ++i) { + fmgr.np1_set[i] = flow_set_create(); + if (fmgr.np1_set == NULL) { + fmgr_destroy_flows(); + return -1; + } - fmgr.nm1_set = flow_set_create(); - if (fmgr.nm1_set == NULL) { - flow_set_destroy(fmgr.np1_set); - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; + fmgr.np1_fqs[i] = fqueue_create(); + if (fmgr.np1_fqs[i] == NULL) { + fmgr_destroy_flows(); + return -1; + } + + fmgr.nm1_set[i] = flow_set_create(); + if (fmgr.nm1_set == NULL) { + fmgr_destroy_flows(); + return -1; + } + + fmgr.nm1_fqs[i] = fqueue_create(); + if (fmgr.nm1_fqs[i] == NULL) { + fmgr_destroy_flows(); + return -1; + } } pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); @@ -378,6 +332,7 @@ int fmgr_init() int fmgr_fini() { int i; + int j; pthread_cancel(fmgr.nm1_flow_acceptor); pthread_cancel(fmgr.np1_sdu_reader); @@ -387,29 +342,25 @@ int fmgr_fini() pthread_join(fmgr.np1_sdu_reader, NULL); pthread_join(fmgr.nm1_sdu_reader, NULL); - for (i = 0; i < IRMD_MAX_FLOWS; i++) { - if (fmgr.nm1_flows[i] == NULL) - continue; - flow_dealloc(fmgr.nm1_flows[i]->fd); - free(fmgr.nm1_flows[i]); - } + for (i = 0; i < AP_MAX_FLOWS; ++i) + for (j = 0; j < QOS_CUBE_MAX; ++j) + if (flow_set_has(fmgr.nm1_set[j], i)) { + flow_dealloc(i); + flow_set_del(fmgr.nm1_set[j], i); + } pthread_rwlock_destroy(&fmgr.nm1_flows_lock); pthread_rwlock_destroy(&fmgr.np1_flows_lock); - flow_set_destroy(fmgr.nm1_set); - flow_set_destroy(fmgr.np1_set); - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); + fmgr_destroy_flows(); return 0; } -int fmgr_np1_alloc(int fd, - char * dst_ap_name, - char * src_ae_name, - enum qos_cube qos) +int fmgr_np1_alloc(int fd, + char * dst_ap_name, + char * src_ae_name, + qoscube_t qos) { cep_id_t cep_id; buffer_t buf; @@ -478,10 +429,8 @@ int fmgr_np1_alloc(int fd, free(ro_data); - if (add_np1_fd(fd, cep_id, qos)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } + fmgr.np1_fd_to_cep_id[fd] = cep_id; + fmgr.np1_cep_id_to_fd[cep_id] = fd; pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -491,16 +440,13 @@ int fmgr_np1_alloc(int fd, /* Call under np1_flows lock */ static int np1_flow_dealloc(int fd) { - struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; + qoscube_t cube; - flow_set_del(fmgr.np1_set, fd); - - flow = fmgr.np1_flows[fd]; - if (flow == NULL) - return -1; + ipcp_flow_get_qoscube(fd, &cube); + flow_set_del(fmgr.np1_set[cube], fd); msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; @@ -510,16 +456,15 @@ static int np1_flow_dealloc(int fd) buf.data = malloc(buf.len); if (buf.data == NULL) - return -1; + return -ENOMEM; flow_alloc_msg__pack(&msg, buf.data); - ret = frct_i_destroy(flow->cep_id, &buf); + ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - fmgr.np1_flows[fd] = NULL; - fmgr.np1_flows_cep[flow->cep_id] = NULL; + fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; + fmgr.np1_fd_to_cep_id[fd] = -1; - free(flow); free(buf.data); return ret; @@ -527,48 +472,39 @@ static int np1_flow_dealloc(int fd) int fmgr_np1_alloc_resp(int fd, int response) { - struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } - msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; msg.response = response; msg.has_response = true; buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); + if (buf.len == 0) return -1; - } buf.data = malloc(buf.len); - if (buf.data == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } + if (buf.data == NULL) + return -ENOMEM; flow_alloc_msg__pack(&msg, buf.data); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + if (response < 0) { - frct_i_destroy(flow->cep_id, &buf); + frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); free(buf.data); - fmgr.np1_flows[fd] = NULL; - fmgr.np1_flows_cep[flow->cep_id] = NULL; - free(flow); + fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] + = INVALID_CEP_ID; + fmgr.np1_fd_to_cep_id[fd] = -1; } else { - if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { + qoscube_t cube; + ipcp_flow_get_qoscube(fd, &cube); + if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } - flow_set_add(fmgr.np1_set, fd); + flow_set_add(fmgr.np1_set[cube], fd); } pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -581,27 +517,25 @@ int fmgr_np1_dealloc(int fd) int ret; pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + ret = np1_flow_dealloc(fd); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return ret; } -int fmgr_np1_post_buf(cep_id_t cep_id, - buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf) { - struct np1_flow * flow; int ret = 0; int fd; flow_alloc_msg_t * msg; - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + qoscube_t cube; /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } @@ -612,51 +546,41 @@ int fmgr_np1_post_buf(cep_id_t cep_id, msg->dst_name, msg->src_ae_name); if (fd < 0) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to get fd for flow."); return -1; } - if (add_np1_fd(fd, cep_id, msg->qos_cube)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to add np1 flow."); - return -1; - } + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + + fmgr.np1_fd_to_cep_id[fd] = cep_id; + fmgr.np1_cep_id_to_fd[cep_id] = fd; + + pthread_rwlock_unlock(&fmgr.np1_flows_lock); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("No such flow in flow manager."); - return -1; - } + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - ret = ipcp_flow_alloc_reply(flow->fd, msg->response); + fd = fmgr.np1_cep_id_to_fd[cep_id]; + ret = ipcp_flow_alloc_reply(fd, msg->response); if (msg->response < 0) { - fmgr.np1_flows[flow->fd] = NULL; - fmgr.np1_flows_cep[cep_id] = NULL; - free(flow); + fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; + fmgr.np1_cep_id_to_fd[cep_id] = -1; } else { - flow_set_add(fmgr.np1_set, flow->fd); + ipcp_flow_get_qoscube(fd, &cube); + flow_set_add(fmgr.np1_set[cube], + fmgr.np1_cep_id_to_fd[cep_id]); } + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("No such flow in flow manager."); - return -1; - } - - flow_set_del(fmgr.np1_set, flow->fd); - - ret = flow_dealloc(flow->fd); + fd = fmgr.np1_cep_id_to_fd[cep_id]; + ipcp_flow_get_qoscube(fd, &cube); + flow_set_del(fmgr.np1_set[cube], fd); + ret = flow_dealloc(fd); break; default: LOG_ERR("Got an unknown flow allocation message."); @@ -674,18 +598,12 @@ int fmgr_np1_post_buf(cep_id_t cep_id, int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb) { - struct np1_flow * flow; + int fd; pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - LOG_ERR("Failed to find N flow."); - return -1; - } - - if (ipcp_flow_write(flow->fd, sdb)) { + fd = fmgr.np1_cep_id_to_fd[cep_id]; + if (ipcp_flow_write(fd, sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to hand SDU to N flow."); return -1; @@ -704,19 +622,19 @@ int fmgr_nm1_mgmt_flow(char * dst_name) /* FIXME: Request retransmission. */ fd = flow_alloc(dst_name, MGMT_AE, NULL); if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); + LOG_ERR("Failed to allocate flow to %s.", dst_name); return -1; } result = flow_alloc_res(fd); if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", + LOG_ERR("Result of flow allocation to %s is %d.", dst_name, result); return -1; } if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand file descriptor to RIB manager"); + LOG_ERR("Failed to hand file descriptor to RIB manager."); flow_dealloc(fd); return -1; } @@ -724,8 +642,7 @@ int fmgr_nm1_mgmt_flow(char * dst_name) return 0; } -int fmgr_nm1_dt_flow(char * dst_name, - enum qos_cube qos) +int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos) { int fd; int result; @@ -733,28 +650,25 @@ int fmgr_nm1_dt_flow(char * dst_name, /* FIXME: Map qos cube on correct QoS. */ fd = flow_alloc(dst_name, DT_AE, NULL); if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); + LOG_ERR("Failed to allocate flow to %s.", dst_name); return -1; } result = flow_alloc_res(fd); if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); + LOG_ERR("Allocate flow to %s result %d.", dst_name, result); return -1; } - if (add_nm1_fd(fd, qos)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); - return -1; - } + flow_set_add(fmgr.nm1_set[qos], fd); + + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; return 0; } -int fmgr_nm1_write_sdu(struct pci * pci, - struct shm_du_buff * sdb) +int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb) { if (pci == NULL || sdb == NULL) return -1; diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index b0738a0c..1e9bcc18 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -42,7 +42,6 @@ #include "dt_const.h" #include "frct.h" #include "ipcp.h" -#include "cdap_request.h" #include "ro.h" #include "path.h" #include "dir.h" @@ -85,22 +84,28 @@ struct rnode { }; struct mgmt_flow { + struct list_head next; + struct cdap * instance; int fd; - struct list_head next; + + pthread_t handler; }; struct ro_sub { + struct list_head next; + int sid; + char * name; struct ro_sub_ops * ops; - struct list_head next; }; struct ro_id { + struct list_head next; + uint64_t seqno; char * full_name; - struct list_head next; }; struct { @@ -124,9 +129,6 @@ struct { struct list_head flows; pthread_rwlock_t flows_lock; - struct list_head cdap_reqs; - pthread_mutex_t cdap_reqs_lock; - struct addr_auth * addr_auth; enum pol_addr_auth addr_auth_type; } rib; @@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name, pthread_rwlock_unlock(&ipcpi.state_lock); } -/* We only have a create operation for now */ +/* We only have a create operation for now. */ static struct ro_sub_ops ribmgr_sub_ops = { .ro_created = ribmgr_ro_created, .ro_updated = NULL, @@ -303,9 +305,12 @@ static void ro_delete_timer(void * o) { char * name = (char *) o; - if (ribmgr_ro_delete(name)) { + pthread_mutex_lock(&rib.ro_lock); + + if (ribmgr_ro_delete(name)) LOG_ERR("Failed to delete %s.", name); - } + + pthread_mutex_unlock(&rib.ro_lock); } static struct rnode * ribmgr_ro_create(const char * name, @@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char * name, node = node->child; sibling = false; - /* Search horizontally */ + /* Search horizontally. */ while (node != NULL) { if (strcmp(node->name, token) == 0) { break; @@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char * name, LOG_DBG("Created RO with name %s.", name); - if (!(attr.expiry.tv_sec == 0 && - attr.expiry.tv_nsec == 0)) { + if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) { timeout = attr.expiry.tv_sec * 1000 + attr.expiry.tv_nsec / MILLION; - if (timerwheel_add(rib.wheel, ro_delete_timer, - new->full_name, strlen(new->full_name) + 1, - timeout)) { + if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name, + strlen(new->full_name) + 1, timeout)) LOG_ERR("Failed to add deletion timer of RO."); - } } return new; @@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name, return node; } -/* Call while holding cdap_reqs_lock */ -/* FIXME: better not to call blocking functions under any lock */ -int cdap_result_wait(struct cdap * instance, - enum cdap_opcode code, - char * name, - int invoke_id) -{ - struct cdap_request * req; - int ret; - char * name_dup = strdup(name); - if (name_dup == NULL) - return -1; - - req = cdap_request_create(code, name_dup, invoke_id, instance); - if (req == NULL) { - free(name_dup); - return -1; - } - - list_add(&req->next, &rib.cdap_reqs); - - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - ret = cdap_request_wait(req); - - pthread_mutex_lock(&rib.cdap_reqs_lock); - - if (ret == -1) /* should only be on ipcp shutdown */ - LOG_DBG("Waiting CDAP request destroyed."); - - if (ret == -ETIMEDOUT) - LOG_ERR("CDAP Request timed out."); - - if (ret) - LOG_DBG("Unknown error code: %d.", ret); - - if (!ret) - ret = req->result; - - list_del(&req->next); - cdap_request_destroy(req); - - return ret; -} - static int write_ro_msg(struct cdap * neighbor, ro_msg_t * msg, char * name, @@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap * neighbor, { uint8_t * data; size_t len; - int iid = 0; + cdap_key_t key; + int ret; len = ro_msg__get_packed_size(msg); if (len == 0) @@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap * neighbor, ro_msg__pack(msg, data); - pthread_mutex_lock(&rib.cdap_reqs_lock); - iid = cdap_send_request(neighbor, code, - name, data, len, 0); - if (iid < 0) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + key = cdap_request_send(neighbor, code, name, data, len, 0); + if (key < 0) { + LOG_ERR("Failed to send CDAP request."); free(data); return -1; } - if (cdap_result_wait(neighbor, code, name, iid)) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); - free(data); - LOG_ERR("Remote did not receive RIB object."); + free(data); + + ret = cdap_reply_wait(neighbor, key, NULL, NULL); + if (ret < 0) { + LOG_ERR("CDAP command with code %d and name %s failed: %d.", + code, name, ret); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - free(data); return 0; } @@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap * neighbor, int ribmgr_init() { INIT_LIST_HEAD(&rib.flows); - INIT_LIST_HEAD(&rib.cdap_reqs); INIT_LIST_HEAD(&rib.subs); INIT_LIST_HEAD(&rib.ro_ids); @@ -540,17 +495,9 @@ int ribmgr_init() return -1; } - if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { - LOG_ERR("Failed to initialize mutex."); - pthread_rwlock_destroy(&rib.flows_lock); - free(rib.root); - return -1; - } - if (pthread_mutex_init(&rib.ro_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); free(rib.root); return -1; } @@ -558,7 +505,6 @@ int ribmgr_init() if (pthread_mutex_init(&rib.subs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); free(rib.root); return -1; @@ -567,7 +513,6 @@ int ribmgr_init() if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); free(rib.root); @@ -578,7 +523,6 @@ int ribmgr_init() if (rib.sids == NULL) { LOG_ERR("Failed to create bitmap."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -591,7 +535,6 @@ int ribmgr_init() LOG_ERR("Failed to create timerwheel."); bmp_destroy(rib.sids); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -605,7 +548,6 @@ int ribmgr_init() timerwheel_destroy(rib.wheel); bmp_destroy(rib.sids); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -633,16 +575,6 @@ int ribmgr_fini() struct list_head * pos = NULL; struct list_head * n = NULL; - pthread_mutex_lock(&rib.cdap_reqs_lock); - list_for_each_safe(pos, n, &rib.cdap_reqs) { - struct cdap_request * req = - list_entry(pos, struct cdap_request, next); - free(req->name); - list_del(&req->next); - free(req); - } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - pthread_rwlock_wrlock(&rib.flows_lock); list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = @@ -668,7 +600,6 @@ int ribmgr_fini() timerwheel_destroy(rib.wheel); pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -676,49 +607,8 @@ int ribmgr_fini() return 0; } -static int ribmgr_cdap_reply(struct cdap * instance, - int invoke_id, - int result, - uint8_t * data, - size_t len) -{ - struct list_head * pos, * n = NULL; - - /* We never perform reads on other RIBs */ - (void) data; - (void) len; - - pthread_mutex_lock(&rib.cdap_reqs_lock); - - list_for_each_safe(pos, n, &rib.cdap_reqs) { - struct cdap_request * req = - list_entry(pos, struct cdap_request, next); - if (req->instance == instance && - req->invoke_id == invoke_id && - req->state == REQ_PENDING) { - if (result != 0) - LOG_ERR("CDAP command with code %d and name %s " - "failed with error %d", - req->code, req->name, result); - else - LOG_DBG("CDAP command with code %d and name %s " - "executed succesfully", - req->code, req->name); - - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - cdap_request_respond(req, result); - - pthread_mutex_lock(&rib.cdap_reqs_lock); - } - } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - return 0; -} - static int ribmgr_cdap_create(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name, ro_msg_t * msg) { @@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance, struct ro_attr attr; struct rnode * node; + assert(instance); + ro_attr_init(&attr); attr.expiry.tv_sec = msg->sec; attr.expiry.tv_nsec = msg->nsec; @@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance, ro_data = malloc(msg->value.len); if (ro_data == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } memcpy(ro_data, msg->value.data, msg->value.len); @@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance, node = ribmgr_ro_create(name, attr, ro_data, msg->value.len); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); free(ro_data); return -1; } @@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { LOG_ERR("Failed to send reply to create request."); return -1; } @@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance, } static int ribmgr_cdap_delete(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name) { struct list_head * p = NULL; @@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, if (ribmgr_ro_delete(name)) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } @@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { + if (cdap_reply_send(instance, key, 0, NULL, 0)) { LOG_ERR("Failed to send reply to create request."); return -1; } @@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, } static int ribmgr_cdap_write(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name, ro_msg_t * msg, uint32_t flags) @@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance, ro_data = malloc(msg->value.len); if (ro_data == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } memcpy(ro_data, msg->value.data, msg->value.len); @@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance, if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); free(ro_data); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } node->seqno = msg->seqno; @@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { LOG_ERR("Failed to send reply to write request."); return -1; } @@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance, return 0; } -static int ribmgr_enrol_sync(struct cdap * instance, - struct rnode * node) +static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node) { int ret = 0; @@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance, } static int ribmgr_cdap_start(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name) { - int iid = 0; - - pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_OPERATIONAL && - strcmp(name, ENROLLMENT) == 0) { + if (strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); - if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { + pthread_rwlock_wrlock(&ipcpi.state_lock); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("IPCP in wrong state."); + return -1; + } + + if (cdap_reply_send(instance, key, 0, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } - /* Loop through rtree and send correct objects */ - LOG_DBGF("Sending ROs that need to be sent on enrolment..."); + /* Loop through rtree and send correct objects. */ + LOG_DBG("Sending ROs that need to be sent on enrolment..."); pthread_mutex_lock(&rib.ro_lock); if (ribmgr_enrol_sync(instance, rib.root->child)) { @@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance, LOG_ERR("Failed to sync part of the RIB."); return -1; } + pthread_mutex_unlock(&rib.ro_lock); LOG_DBGF("Sending stop enrollment..."); - pthread_mutex_lock(&rib.cdap_reqs_lock); - - iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT, + key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT, NULL, 0, 0); - if (iid < 0) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + if (key < 0) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send stop of enrollment."); return -1; } - if (cdap_result_wait(instance, CDAP_STOP, - ENROLLMENT, iid)) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + if (cdap_reply_wait(instance, key, NULL, NULL)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Remote failed to complete enrollment."); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); + + pthread_rwlock_unlock(&ipcpi.state_lock); } else { - if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Failed to send reply to start request."); - return -1; - } + LOG_WARN("Request to start unknown operation."); + if (cdap_reply_send(instance, key, -1, NULL, 0)) + LOG_ERR("Failed to send negative reply."); } - pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } -static int ribmgr_cdap_stop(struct cdap * instance, - int invoke_id, - char * name) +static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name) { int ret = 0; pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_CONFIG && - strcmp(name, ENROLLMENT) == 0) { + if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); ipcp_set_state(IPCP_BOOTING); } else ret = -1; - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; @@ -1028,8 +914,7 @@ static void ro_id_delete(void * o) pthread_mutex_unlock(&rib.ro_ids_lock); } -static int ro_id_create(char * name, - ro_msg_t * msg) +static int ro_id_create(char * name, ro_msg_t * msg) { struct ro_id * tmp; @@ -1062,105 +947,113 @@ static int ro_id_create(char * name, return 0; } -static int ribmgr_cdap_request(struct cdap * instance, - int invoke_id, - enum cdap_opcode opcode, - char * name, - uint8_t * data, - size_t len, - uint32_t flags) +static void * cdap_req_handler(void * o) { + struct cdap * instance = (struct cdap *) o; + enum cdap_opcode opcode; + char * name; + uint8_t * data; + size_t len; + uint32_t flags; ro_msg_t * msg; - int ret = -1; struct list_head * p = NULL; - if (opcode == CDAP_START) - return ribmgr_cdap_start(instance, - invoke_id, - name); - else if (opcode == CDAP_STOP) - return ribmgr_cdap_stop(instance, - invoke_id, - name); - - msg = ro_msg__unpack(NULL, len, data); - if (msg == NULL) { - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - LOG_ERR("Failed to unpack RO message"); - return -1; - } + assert(instance); - pthread_mutex_lock(&rib.ro_ids_lock); - list_for_each(p, &rib.ro_ids) { - struct ro_id * e = list_entry(p, struct ro_id, next); + while (true) { + cdap_key_t key = cdap_request_wait(instance, + &opcode, + &name, + &data, + &len, + &flags); + assert(key >= 0); - if (strcmp(e->full_name, name) == 0 && - e->seqno == msg->seqno) { - pthread_mutex_unlock(&rib.ro_ids_lock); - ro_msg__free_unpacked(msg, NULL); - cdap_send_reply(instance, invoke_id, 0, NULL, 0); - LOG_DBG("Already received this RO."); - return 0; + if (opcode == CDAP_START) { + if (ribmgr_cdap_start(instance, key, name)) + LOG_WARN("CDAP start failed."); + continue; + } + else if (opcode == CDAP_STOP) { + if (ribmgr_cdap_stop(instance, key, name)) + LOG_WARN("CDAP stop failed."); + continue; } - } - pthread_mutex_unlock(&rib.ro_ids_lock); - - if (opcode == CDAP_CREATE) { - ret = ribmgr_cdap_create(instance, - invoke_id, - name, - msg); - } else if (opcode == CDAP_WRITE) { - ret = ribmgr_cdap_write(instance, - invoke_id, - name, msg, - flags); - - } else if (opcode == CDAP_DELETE) { - ret = ribmgr_cdap_delete(instance, - invoke_id, - name); - } else { - LOG_INFO("Unsupported opcode received."); - ro_msg__free_unpacked(msg, NULL); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - return -1; - } - if (ro_id_create(name, msg)) { - LOG_ERR("Failed to create RO id."); - return -1; - } + msg = ro_msg__unpack(NULL, len, data); + if (msg == NULL) { + cdap_reply_send(instance, key, -1, NULL, 0); + LOG_WARN("Failed to unpack RO message"); + continue; + } - if (msg->recv_set == ALL_MEMBERS) { - pthread_rwlock_rdlock(&rib.flows_lock); - list_for_each(p, &rib.flows) { - struct mgmt_flow * e = - list_entry(p, struct mgmt_flow, next); + pthread_mutex_lock(&rib.ro_ids_lock); + list_for_each(p, &rib.ro_ids) { + struct ro_id * e = list_entry(p, struct ro_id, next); - /* Don't send it back */ - if (e->instance == instance) + if (strcmp(e->full_name, name) == 0 && + e->seqno == msg->seqno) { + pthread_mutex_unlock(&rib.ro_ids_lock); + ro_msg__free_unpacked(msg, NULL); + cdap_reply_send(instance, key, 0, NULL, 0); + LOG_DBG("Already received this RO."); continue; + } + } + pthread_mutex_unlock(&rib.ro_ids_lock); - if (write_ro_msg(e->instance, msg, name, opcode)) { - LOG_ERR("Failed to send to a neighbor."); - pthread_rwlock_unlock(&rib.flows_lock); + if (opcode == CDAP_CREATE) { + if (ribmgr_cdap_create(instance, key, name, msg)) { + LOG_WARN("CDAP create failed."); ro_msg__free_unpacked(msg, NULL); - return -1; + continue; } + } else if (opcode == CDAP_WRITE) { + if (ribmgr_cdap_write(instance, key, name, + msg, flags)) { + LOG_WARN("CDAP write failed."); + ro_msg__free_unpacked(msg, NULL); + continue; + } + } else if (opcode == CDAP_DELETE) { + if (ribmgr_cdap_delete(instance, key, name)) { + LOG_WARN("CDAP delete failed."); + ro_msg__free_unpacked(msg, NULL); + continue; + } + } else { + LOG_INFO("Unsupported opcode received."); + ro_msg__free_unpacked(msg, NULL); + cdap_reply_send(instance, key, -1, NULL, 0); + continue; } - pthread_rwlock_unlock(&rib.flows_lock); - } - ro_msg__free_unpacked(msg, NULL); + if (ro_id_create(name, msg)) { + LOG_WARN("Failed to create RO id."); + ro_msg__free_unpacked(msg, NULL); + continue; + } - return ret; -} + if (msg->recv_set == ALL_MEMBERS) { + pthread_rwlock_rdlock(&rib.flows_lock); + list_for_each(p, &rib.flows) { + struct mgmt_flow * e = + list_entry(p, struct mgmt_flow, next); -static struct cdap_ops ribmgr_cdap_ops = { - .cdap_reply = ribmgr_cdap_reply, - .cdap_request = ribmgr_cdap_request -}; + /* Don't send it back. */ + if (e->instance == instance) + continue; + + if (write_ro_msg(e->instance, msg, + name, opcode)) + LOG_WARN("Failed to send to neighbor."); + } + pthread_rwlock_unlock(&rib.flows_lock); + } + + ro_msg__free_unpacked(msg, NULL); + } +} int ribmgr_add_flow(int fd) { @@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd) flow = malloc(sizeof(*flow)); if (flow == NULL) - return -1; + return -ENOMEM; - instance = cdap_create(&ribmgr_cdap_ops, fd); + instance = cdap_create(fd); if (instance == NULL) { LOG_ERR("Failed to create CDAP instance"); free(flow); @@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd) flow->instance = instance; flow->fd = fd; + if (pthread_create(&flow->handler, NULL, + cdap_req_handler, instance)) { + LOG_ERR("Failed to start handler thread for mgt flow."); + free(flow); + return -1; + } + pthread_rwlock_wrlock(&rib.flows_lock); + list_add(&flow->next, &rib.flows); + pthread_rwlock_unlock(&rib.flows_lock); return 0; @@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd) struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (flow->fd == fd) { + pthread_cancel(flow->handler); if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); @@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf) size_t len = 0; struct ro_attr attr; - if (conf == NULL || - conf->type != IPCP_NORMAL) { + if (conf == NULL || conf->type != IPCP_NORMAL) { LOG_ERR("Bad DIF configuration."); - return -1; + return -EINVAL; } ro_attr_init(&attr); @@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf) len = static_info_msg__get_packed_size(&stat_info); if (len == 0) { LOG_ERR("Failed to get size of static information."); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf) data = malloc(len); if (data == NULL) { LOG_ERR("Failed to allocate memory."); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf) attr, data, len) == NULL) { LOG_ERR("Failed to create static info RO."); free(data); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf) if (dir_init()) { LOG_ERR("Failed to init directory"); ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf) LOG_ERR("Failed to initialize FRCT."); dir_fini(); ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1296,12 +1193,14 @@ int ribmgr_enrol(void) { struct cdap * instance = NULL; struct mgmt_flow * flow; - int iid = 0; + cdap_key_t key; + int ret; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_INIT) { pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("IPCP in wrong state."); return -1; } @@ -1312,36 +1211,31 @@ int ribmgr_enrol(void) ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("No flows in RIB."); return -1; } - flow = list_entry((&rib.flows)->next, struct mgmt_flow, next); + flow = list_first_entry((&rib.flows), struct mgmt_flow, next); instance = flow->instance; - pthread_mutex_lock(&rib.cdap_reqs_lock); - iid = cdap_send_request(instance, - CDAP_START, - ENROLLMENT, - NULL, 0, 0); - if (iid < 0) { + key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); + if (key < 0) { ipcp_set_state(IPCP_INIT); - pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); return -1; } - if (cdap_result_wait(instance, CDAP_START, - ENROLLMENT, iid)) { + ret = cdap_reply_wait(instance, key, NULL, NULL); + if (ret) { ipcp_set_state(IPCP_INIT); - pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Failed to start enrollment."); + LOG_ERR("Failed to enroll: %d.", ret); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1351,9 +1245,10 @@ int ribmgr_enrol(void) int ribmgr_start_policies(void) { pthread_rwlock_rdlock(&ipcpi.state_lock); + if (ipcp_get_state() != IPCP_BOOTING) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Cannot start policies in wrong state"); + LOG_ERR("Cannot start policies in wrong state."); return -1; } pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void) } rib.address = rib.addr_auth->address(); - LOG_DBG("IPCP has address %lu", (unsigned long) rib.address); + LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address); return 0; } @@ -1380,22 +1275,21 @@ uint64_t ribmgr_address() return rib.address; } -static int send_neighbors_ro(char * name, - ro_msg_t * msg, - enum cdap_opcode code) +static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code) { struct list_head * p = NULL; pthread_rwlock_rdlock(&rib.flows_lock); + list_for_each(p, &rib.flows) { struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); - if (write_ro_msg(e->instance, msg, name, code)) { - LOG_ERR("Failed to send to a neighbor."); pthread_rwlock_unlock(&rib.flows_lock); + LOG_ERR("Failed to send to a neighbor."); return -1; } } + pthread_rwlock_unlock(&rib.flows_lock); return 0; @@ -1499,9 +1393,7 @@ int ro_delete(const char * name) return 0; } -int ro_write(const char * name, - uint8_t * data, - size_t len) +int ro_write(const char * name, uint8_t * data, size_t len) { struct rnode * node; ro_msg_t msg = RO_MSG__INIT; @@ -1541,8 +1433,7 @@ int ro_write(const char * name, return 0; } -ssize_t ro_read(const char * name, - uint8_t ** data) +ssize_t ro_read(const char * name, uint8_t ** data) { struct rnode * node; ssize_t len; @@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name, return len; } -ssize_t ro_children(const char * name, - char *** children) +ssize_t ro_children(const char * name, char *** children) { struct rnode * node; struct rnode * child; @@ -1640,8 +1530,7 @@ bool ro_exists(const char * name) return found; } -int ro_subscribe(const char * name, - struct ro_sub_ops * ops) +int ro_subscribe(const char * name, struct ro_sub_ops * ops) { struct ro_sub * sub; int sid; diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 9eaca9fd..22971806 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -29,6 +29,7 @@ set(SOURCE_FILES # Add source files here bitmap.c cdap.c + cdap_req.c dev.c hashtable.c irm.c diff --git a/src/lib/cdap.c b/src/lib/cdap.c index df79be54..dee8f88c 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -25,39 +25,65 @@ #include #include #include +#include + +#include "cdap_req.h" #include #include +#include +#include #include "cdap.pb-c.h" typedef Cdap cdap_t; typedef Opcode opcode_t; +typedef int32_t invoke_id_t; + +#define INVALID_INVOKE_ID -1 #define IDS_SIZE 256 #define BUF_SIZE 2048 struct cdap { - int fd; - struct bmp * ids; - pthread_mutex_t ids_lock; - pthread_t reader; - struct cdap_ops * ops; + int fd; + + struct bmp * ids; + pthread_mutex_t ids_lock; + + pthread_t reader; + + struct list_head sent; + pthread_rwlock_t sent_lock; + + struct list_head rcvd; + pthread_cond_t rcvd_cond; + pthread_mutex_t rcvd_lock; }; -struct cdap_info { - pthread_t thread; - struct cdap * instance; - cdap_t * msg; +struct cdap_rcvd { + struct list_head next; + + invoke_id_t iid; + + enum cdap_opcode opcode; + char * name; + uint8_t * data; + size_t len; + uint32_t flags; }; static int next_invoke_id(struct cdap * instance) { int ret; + assert(instance); + pthread_mutex_lock(&instance->ids_lock); + ret = bmp_allocate(instance->ids); if (!bmp_is_id_valid(instance->ids, ret)) - ret = -1; /* INVALID_INVOKE_ID */ + ret = INVALID_INVOKE_ID; + pthread_mutex_unlock(&instance->ids_lock); return ret; @@ -67,140 +93,225 @@ static int release_invoke_id(struct cdap * instance, int id) { int ret; + assert(instance); + pthread_mutex_lock(&instance->ids_lock); + ret = bmp_release(instance->ids, id); + pthread_mutex_unlock(&instance->ids_lock); return ret; } -static void * handle_cdap_msg(void * o) +#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL) + +struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, cdap_key_t key) { - struct cdap_info * info = (struct cdap_info *) o; - struct cdap * instance = info->instance; - cdap_t * msg = info->msg; - - switch (msg->opcode) { - case OPCODE__READ: - if (msg->name != NULL) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_READ, - msg->name, - NULL, 0, 0); - break; - case OPCODE__WRITE: - if (msg->name != NULL && - msg->has_value) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_WRITE, - msg->name, - msg->value.data, - msg->value.len, - msg->flags); - break; - case OPCODE__CREATE: - if (msg->name != NULL && - msg->has_value) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_CREATE, - msg->name, - msg->value.data, - msg->value.len, 0); - break; - case OPCODE__DELETE: - if (msg->name != NULL && - msg->has_value) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_DELETE, - msg->name, - msg->value.data, - msg->value.len, 0); - break; - case OPCODE__START: - if (msg->name != NULL) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_START, - msg->name, - NULL, 0, 0); - break; - case OPCODE__STOP: - if (msg->name != NULL) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_STOP, - msg->name, - NULL, 0, 0); - break; - case OPCODE__REPLY: - instance->ops->cdap_reply(instance, - msg->invoke_id, - msg->result, - msg->value.data, - msg->value.len); - release_invoke_id(instance, msg->invoke_id); - break; - default: - break; + struct list_head * p = NULL; + struct cdap_req * req = NULL; + + assert(instance); + assert(key >= 0); + + pthread_rwlock_rdlock(&instance->sent_lock); + + list_for_each(p, &instance->sent) { + req = list_entry(p, struct cdap_req, next); + if (req->key == key) { + pthread_rwlock_unlock(&instance->sent_lock); + return req; + } } - free(info); - cdap__free_unpacked(msg, NULL); + pthread_rwlock_unlock(&instance->sent_lock); - return (void *) 0; + return NULL; +} + +static int cdap_sent_add(struct cdap * instance, struct cdap_req * req) +{ + assert (instance); + assert (req); + + if (cdap_sent_has_key(instance, req->key)) + return -EPERM; + + pthread_rwlock_wrlock(&instance->sent_lock); + + list_add(&req->next, &instance->sent); + + pthread_rwlock_unlock(&instance->sent_lock); + + return 0; +} + +static void cdap_sent_del(struct cdap * instance, struct cdap_req * req) +{ + assert(instance); + assert(req); + + assert(cdap_sent_has_key(instance, req->key)); + + pthread_rwlock_wrlock(&instance->sent_lock); + + list_del(&req->next); + + pthread_rwlock_unlock(&instance->sent_lock); +} + +static void cdap_sent_destroy(struct cdap * instance) +{ + struct list_head * p = NULL; + struct list_head * h = NULL; + + assert(instance); + + pthread_rwlock_wrlock(&instance->sent_lock); + + list_for_each_safe(p, h, &instance->sent) { + struct cdap_req * req = list_entry(p, struct cdap_req, next); + list_del(&req->next); + cdap_req_destroy(req); + } + + pthread_rwlock_unlock(&instance->sent_lock); +} + +static void cdap_rcvd_destroy(struct cdap * instance) +{ + struct list_head * p = NULL; + struct list_head * h = NULL; + + assert(instance); + + pthread_mutex_lock(&instance->rcvd_lock); + + list_for_each_safe(p, h, &instance->sent) { + struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next); + list_del(&r->next); + if (r->data != NULL) + free(r->data); + if (r->name != NULL) + free(r->name); + free(r); + } + + pthread_mutex_unlock(&instance->rcvd_lock); } static void * sdu_reader(void * o) { struct cdap * instance = (struct cdap *) o; + struct cdap_req * req; + struct cdap_rcvd * rcvd; cdap_t * msg; uint8_t buf[BUF_SIZE]; ssize_t len; - struct cdap_info * cdap_info; + buffer_t data; while (true) { len = flow_read(instance->fd, buf, BUF_SIZE); if (len < 0) - return (void *) -1; + continue; msg = cdap__unpack(NULL, len, buf); if (msg == NULL) continue; - cdap_info = malloc(sizeof(*cdap_info)); - if (cdap_info == NULL) { - cdap__free_unpacked(msg, NULL); - continue; + if (msg->opcode != OPCODE__REPLY) { + rcvd = malloc(sizeof(*rcvd)); + if (rcvd == NULL) { + cdap__free_unpacked(msg, NULL); + continue; + } + + switch (msg->opcode) { + case OPCODE__START: + rcvd->opcode = CDAP_START; + break; + case OPCODE__STOP: + rcvd->opcode = CDAP_STOP; + break; + case OPCODE__READ: + rcvd->opcode = CDAP_READ; + break; + case OPCODE__WRITE: + rcvd->opcode = CDAP_WRITE; + break; + case OPCODE__CREATE: + rcvd->opcode = CDAP_CREATE; + break; + case OPCODE__DELETE: + rcvd->opcode = CDAP_DELETE; + break; + default: + cdap__free_unpacked(msg, NULL); + free(rcvd); + continue; + } + rcvd->iid = msg->invoke_id; + rcvd->flags = msg->flags; + rcvd->name = strdup(msg->name); + if (rcvd->name == NULL) { + cdap__free_unpacked(msg, NULL); + free(rcvd); + continue; + } + + if (msg->has_value) { + rcvd->len = msg->value.len; + rcvd->data = malloc(rcvd->len); + if (rcvd->data == NULL) { + cdap__free_unpacked(msg, NULL); + free(rcvd); + continue; + } + memcpy(rcvd->data, msg->value.data, rcvd->len); + } else { + rcvd->len = 0; + rcvd->data = NULL; + } + + pthread_mutex_lock(&instance->rcvd_lock); + + list_add(&rcvd->next, &instance->rcvd); + + pthread_cond_signal(&instance->rcvd_cond); + pthread_mutex_unlock(&instance->rcvd_lock); + } else { + req = cdap_sent_get_by_key(instance, msg->invoke_id); + if (req == NULL) + continue; + + if (msg->has_value) { + data.len = msg->value.len; + data.data = malloc(data.len); + if (data.data == NULL) { + cdap__free_unpacked(msg, NULL); + continue; + } + memcpy(data.data, msg->value.data, data.len); + } else { + data.len = 0; + data.data = NULL; + } + + cdap_req_respond(req, msg->result, data); } - cdap_info->instance = instance; - cdap_info->msg = msg; - - pthread_create(&cdap_info->thread, - NULL, - handle_cdap_msg, - (void *) cdap_info); - - pthread_detach(cdap_info->thread); - + cdap__free_unpacked(msg, NULL); } return (void *) 0; } -struct cdap * cdap_create(struct cdap_ops * ops, - int fd) +struct cdap * cdap_create(int fd) { struct cdap * instance = NULL; int flags; - if (ops == NULL || fd < 0 || - ops->cdap_reply == NULL || - ops->cdap_request == NULL) + if (fd < 0) return NULL; flags = flow_get_flags(fd); @@ -216,19 +327,43 @@ struct cdap * cdap_create(struct cdap_ops * ops, return NULL; } - instance->ops = ops; - instance->fd = fd; + if (pthread_mutex_init(&instance->rcvd_lock, NULL)) { + pthread_mutex_destroy(&instance->ids_lock); + free(instance); + return NULL; + } + + if (pthread_rwlock_init(&instance->sent_lock, NULL)) { + pthread_mutex_destroy(&instance->rcvd_lock); + pthread_mutex_destroy(&instance->ids_lock); + free(instance); + return NULL; + } + + if (pthread_cond_init(&instance->rcvd_cond, NULL)) { + pthread_rwlock_destroy(&instance->sent_lock); + pthread_mutex_destroy(&instance->rcvd_lock); + pthread_mutex_destroy(&instance->ids_lock); + free(instance); + return NULL; + } instance->ids = bmp_create(IDS_SIZE, 0); if (instance->ids == NULL) { + pthread_cond_destroy(&instance->rcvd_cond); + pthread_rwlock_destroy(&instance->sent_lock); + pthread_mutex_destroy(&instance->rcvd_lock); + pthread_mutex_destroy(&instance->ids_lock); free(instance); return NULL; } - pthread_create(&instance->reader, - NULL, - sdu_reader, - (void *) instance); + INIT_LIST_HEAD(&instance->sent); + INIT_LIST_HEAD(&instance->rcvd); + + instance->fd = fd; + + pthread_create(&instance->reader, NULL, sdu_reader, instance); return instance; } @@ -247,27 +382,37 @@ int cdap_destroy(struct cdap * instance) pthread_mutex_unlock(&instance->ids_lock); - flow_dealloc(instance->fd); + pthread_mutex_destroy(&instance->ids_lock); + + cdap_sent_destroy(instance); + + pthread_rwlock_destroy(&instance->sent_lock); + + cdap_rcvd_destroy(instance); + + pthread_mutex_destroy(&instance->rcvd_lock); free(instance); return 0; } -static int write_msg(struct cdap * instance, - cdap_t * msg) +static int write_msg(struct cdap * instance, cdap_t * msg) { int ret; uint8_t * data; size_t len; + assert(instance); + assert(msg); + len = cdap__get_packed_size(msg); if (len == 0) return -1; - data = malloc(BUF_SIZE); + data = malloc(len); if (data == NULL) - return -1; + return -ENOMEM; cdap__pack(msg, data); @@ -278,22 +423,41 @@ static int write_msg(struct cdap * instance, return ret; } -int cdap_send_request(struct cdap * instance, - enum cdap_opcode code, - char * name, - uint8_t * data, - size_t len, - uint32_t flags) +static cdap_key_t invoke_id_to_key(invoke_id_t iid) +{ + if (iid == INVALID_INVOKE_ID) + return INVALID_CDAP_KEY; + + return (cdap_key_t) iid; +} + +static invoke_id_t key_to_invoke_id(cdap_key_t key) +{ + if (key == INVALID_CDAP_KEY) + return INVALID_INVOKE_ID; + + return (invoke_id_t) key; +} + +cdap_key_t cdap_request_send(struct cdap * instance, + enum cdap_opcode code, + char * name, + uint8_t * data, + size_t len, + uint32_t flags) { - int id; cdap_t msg = CDAP__INIT; + struct cdap_req * req; + invoke_id_t iid; + cdap_key_t key; if (instance == NULL || name == NULL) - return -1; + return -EINVAL; - id = next_invoke_id(instance); - if (!bmp_is_id_valid(instance->ids, id)) - return -1; + + iid = next_invoke_id(instance); + if (iid == INVALID_INVOKE_ID) + return INVALID_CDAP_KEY; switch (code) { case CDAP_READ: @@ -315,39 +479,132 @@ int cdap_send_request(struct cdap * instance, msg.opcode = OPCODE__STOP; break; default: - release_invoke_id(instance, id); - return -1; + release_invoke_id(instance, iid); + return -EINVAL; } msg.name = name; msg.has_flags = true; msg.flags = flags; - msg.invoke_id = id; + msg.invoke_id = iid; if (data != NULL) { msg.has_value = true; msg.value.data = data; msg.value.len = len; } - if (write_msg(instance, &msg)) - return -1; + key = invoke_id_to_key(iid); + + req = cdap_req_create(key); + if (req == NULL) + return INVALID_CDAP_KEY; + + if (cdap_sent_add(instance, req)) { + cdap_req_destroy(req); + return INVALID_CDAP_KEY; + } + + if (write_msg(instance, &msg)) { + cdap_sent_del(instance, req); + cdap_req_destroy(req); + return INVALID_CDAP_KEY; + } + + return key; +} + +int cdap_reply_wait(struct cdap * instance, + cdap_key_t key, + uint8_t ** data, + size_t * len) +{ + int ret; + struct cdap_req * r; + invoke_id_t iid = key_to_invoke_id(key); + + if (instance == NULL || iid == INVALID_INVOKE_ID) + return -EINVAL; + + r = cdap_sent_get_by_key(instance, key); + if (r == NULL) + return -EINVAL; + + ret = cdap_req_wait(r); + if (ret < 0) + return ret; + + if (r->response) + return r->response; + + assert(ret == 0); + + if (data != NULL) { + *data = r->data.data; + *len = r->data.len; + } + + cdap_sent_del(instance, r); - return id; + release_invoke_id(instance, iid); + + return 0; } -int cdap_send_reply(struct cdap * instance, - int invoke_id, +cdap_key_t cdap_request_wait(struct cdap * instance, + enum cdap_opcode * opcode, + char ** name, + uint8_t ** data, + size_t * len, + uint32_t * flags) +{ + struct cdap_rcvd * rcvd; + invoke_id_t iid; + + if (instance == NULL || opcode == NULL || name == NULL || data == NULL + || len == NULL || flags == NULL) + return -EINVAL; + + pthread_mutex_lock(&instance->rcvd_lock); + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) &instance->rcvd_lock); + + while (list_empty(&instance->rcvd)) + pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock); + + rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); + + list_del(&rcvd->next); + + pthread_cleanup_pop(true); + + *opcode = rcvd->opcode; + *name = rcvd->name; + *data = rcvd->data; + *len = rcvd->len; + *flags = rcvd->flags; + + iid = rcvd->iid; + + free(rcvd); + + return invoke_id_to_key(iid); +} + +int cdap_reply_send(struct cdap * instance, + cdap_key_t key, int result, uint8_t * data, size_t len) { cdap_t msg = CDAP__INIT; + invoke_id_t iid = key_to_invoke_id(key); if (instance == NULL) - return -1; + return -EINVAL; msg.opcode = OPCODE__REPLY; - msg.invoke_id = invoke_id; + msg.invoke_id = iid; msg.has_result = true; msg.result = result; diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c new file mode 100644 index 00000000..02fa0846 --- /dev/null +++ b/src/lib/cdap_req.c @@ -0,0 +1,151 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * CDAP - CDAP request management + * + * Sander Vrijders + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include + +#include "cdap_req.h" + +#include +#include + +struct cdap_req * cdap_req_create(cdap_key_t key) +{ + struct cdap_req * creq = malloc(sizeof(*creq)); + pthread_condattr_t cattr; + + if (creq == NULL) + return NULL; + + creq->key = key; + creq->state = REQ_INIT; + + creq->response = -1; + creq->data.data = NULL; + creq->data.len = 0; + + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + pthread_cond_init(&creq->cond, &cattr); + pthread_mutex_init(&creq->lock, NULL); + + INIT_LIST_HEAD(&creq->next); + + clock_gettime(PTHREAD_COND_CLOCK, &creq->birth); + + return creq; +} + +void cdap_req_destroy(struct cdap_req * creq) +{ + assert(creq); + + pthread_mutex_lock(&creq->lock); + + if (creq->state == REQ_DESTROY) { + pthread_mutex_unlock(&creq->lock); + return; + } + + if (creq->state == REQ_INIT) + creq->state = REQ_DONE; + + if (creq->state == REQ_PENDING) { + creq->state = REQ_DESTROY; + pthread_cond_broadcast(&creq->cond); + } + + while (creq->state != REQ_DONE) + pthread_cond_wait(&creq->cond, &creq->lock); + + pthread_mutex_unlock(&creq->lock); + + pthread_cond_destroy(&creq->cond); + pthread_mutex_destroy(&creq->lock); + + free(creq); +} + +int cdap_req_wait(struct cdap_req * creq) +{ + struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000), + (CDAP_REPLY_TIMEOUT % 1000) * MILLION}; + struct timespec abstime; + int ret = -1; + + assert(creq); + + ts_add(&creq->birth, &timeout, &abstime); + + pthread_mutex_lock(&creq->lock); + + if (creq->state != REQ_INIT) { + pthread_mutex_unlock(&creq->lock); + return -EINVAL; + } + + creq->state = REQ_PENDING; + + while (creq->state == REQ_PENDING) { + if ((ret = -pthread_cond_timedwait(&creq->cond, + &creq->lock, + &abstime)) == -ETIMEDOUT) + break; + } + + if (creq->state == REQ_DESTROY) + ret = -1; + + creq->state = REQ_DONE; + pthread_cond_broadcast(&creq->cond); + + pthread_mutex_unlock(&creq->lock); + + return ret; +} + +void cdap_req_respond(struct cdap_req * creq, int response, buffer_t data) +{ + assert(creq); + + pthread_mutex_lock(&creq->lock); + + if (creq->state != REQ_PENDING) { + pthread_mutex_unlock(&creq->lock); + return; + } + + creq->state = REQ_RESPONSE; + creq->response = response; + creq->data = data; + + pthread_cond_broadcast(&creq->cond); + + while (creq->state == REQ_RESPONSE) + pthread_cond_wait(&creq->cond, &creq->lock); + + pthread_mutex_unlock(&creq->lock); +} diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h new file mode 100644 index 00000000..714744ab --- /dev/null +++ b/src/lib/cdap_req.h @@ -0,0 +1,69 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * CDAP - CDAP request management + * + * Sander Vrijders + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_CDAP_REQ_H +#define OUROBOROS_CDAP_REQ_H + +#include +#include +#include +#include + +#include + +enum creq_state { + REQ_INIT = 0, + REQ_PENDING, + REQ_RESPONSE, + REQ_DONE, + REQ_DESTROY +}; + +struct cdap_req { + struct list_head next; + + struct timespec birth; + + cdap_key_t key; + + int response; + buffer_t data; + + enum creq_state state; + pthread_cond_t cond; + pthread_mutex_t lock; +}; + +struct cdap_req * cdap_req_create(cdap_key_t key); + +void cdap_req_destroy(struct cdap_req * creq); + +int cdap_req_wait(struct cdap_req * creq); + +void cdap_req_respond(struct cdap_req * creq, + int response, + buffer_t data); + +enum creq_state cdap_req_get_state(struct cdap_req * creq); + +#endif /* OUROBOROS_CDAP_REQ_H */ -- cgit v1.2.3 From 8910bd28e2b6269f0900c8215352ab5d177a3b54 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 23 Dec 2016 18:22:20 +0100 Subject: ipcpd, lib, irmd: Update to use qoscube_t --- src/ipcpd/local/main.c | 8 ++++---- src/ipcpd/shim-eth-llc/main.c | 8 ++++---- src/ipcpd/shim-udp/main.c | 8 ++++---- src/irmd/ipcp.c | 12 ++++++------ src/irmd/ipcp.h | 15 +++++++++------ src/lib/dev.c | 8 +++----- 6 files changed, 30 insertions(+), 29 deletions(-) diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 30d2d2bd..122beafb 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -217,10 +217,10 @@ static int ipcp_local_name_query(char * name) return ret; } -static int ipcp_local_flow_alloc(int fd, - char * dst_name, - char * src_ae_name, - enum qos_cube qos) +static int ipcp_local_flow_alloc(int fd, + char * dst_name, + char * src_ae_name, + qoscube_t qos) { int out_fd = -1; diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index c7e17ff6..fd25dcd9 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -932,10 +932,10 @@ static int eth_llc_ipcp_name_query(char * name) return ret; } -static int eth_llc_ipcp_flow_alloc(int fd, - char * dst_name, - char * src_ae_name, - enum qos_cube qos) +static int eth_llc_ipcp_flow_alloc(int fd, + char * dst_name, + char * src_ae_name, + qoscube_t qos) { uint8_t ssap = 0; uint8_t r_addr[MAC_SIZE]; diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index fd321780..ea408914 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -945,10 +945,10 @@ static int ipcp_udp_name_query(char * name) return 0; } -static int ipcp_udp_flow_alloc(int fd, - char * dst_name, - char * src_ae_name, - enum qos_cube qos) +static int ipcp_udp_flow_alloc(int fd, + char * dst_name, + char * src_ae_name, + qoscube_t qos) { struct sockaddr_in r_saddr; /* server address */ struct sockaddr_in f_saddr; /* flow */ diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 29327df4..aa5e4f8a 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -326,12 +326,12 @@ int ipcp_name_query(pid_t api, return ret; } -int ipcp_flow_alloc(pid_t api, - int port_id, - pid_t n_api, - char * dst_name, - char * src_ae_name, - enum qos_cube qos) +int ipcp_flow_alloc(pid_t api, + int port_id, + pid_t n_api, + char * dst_name, + char * src_ae_name, + qoscube_t qos) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 67b14ece..eb278e5b 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -42,17 +42,20 @@ int ipcp_bootstrap(pid_t api, int ipcp_name_reg(pid_t api, char * name); + int ipcp_name_unreg(pid_t api, char * name); + int ipcp_name_query(pid_t api, char * name); -int ipcp_flow_alloc(pid_t api, - int port_id, - pid_t n_api, - char * dst_name, - char * src_ae_name, - enum qos_cube qos); +int ipcp_flow_alloc(pid_t api, + int port_id, + pid_t n_api, + char * dst_name, + char * src_ae_name, + qoscube_t qos); + int ipcp_flow_alloc_resp(pid_t api, int port_id, pid_t n_api, diff --git a/src/lib/dev.c b/src/lib/dev.c index bad56129..20976375 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -134,7 +134,7 @@ struct flow { struct shm_flow_set * set; int port_id; int oflags; - enum qos_cube qos; + qoscube_t qos; pid_t api; @@ -654,9 +654,8 @@ int flow_dealloc(int fd) pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg_b(&msg); - if (recv_msg == NULL) { + if (recv_msg == NULL) return -1; - } if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); @@ -1435,11 +1434,10 @@ int ipcp_flow_fini(int fd) pthread_rwlock_unlock(&ai.data_lock); shm_rbuff_fini(rb); - return 0; } -int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube) +int ipcp_flow_get_qoscube(int fd, qoscube_t * cube) { if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL) return -EINVAL; -- cgit v1.2.3