From f3fbf2c6093b293f995c4d784509577695e052b1 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 30 Dec 2016 09:15:39 +0100 Subject: ipcpd: Refactor of normal IPCP Reorganizes the normal IPCP a bit to make sure internal components do not need to access the state of the IPCP. The IPCP has now a thread calling accept and delegating it to the correct component based on the AE name (this used to be in the fmgr). Internal components are initialized upon enrollment or bootstrap of the IPCP. If a step fails, the IPCP goes back to the INIT state, if all components boot correctly, it goes to the operational state. RIB synchronization is still done by sending a CDAP start/stop and syncing with a ribmgr state, but needs revision later on. --- src/ipcpd/ipcp.c | 7 +- src/ipcpd/ipcp.h | 10 +- src/ipcpd/normal/ae.h | 29 +++++ src/ipcpd/normal/dir.c | 49 +------- src/ipcpd/normal/fmgr.c | 102 ++--------------- src/ipcpd/normal/fmgr.h | 12 +- src/ipcpd/normal/frct.c | 123 +++++++++++++------- src/ipcpd/normal/main.c | 181 +++++++++++++++++++++++------- src/ipcpd/normal/ribmgr.c | 277 +++++++++++++++++++++++++++++----------------- src/ipcpd/normal/ribmgr.h | 6 +- src/ipcpd/normal/ro.h | 6 +- src/ipcpd/shim-udp/main.c | 3 +- 12 files changed, 463 insertions(+), 342 deletions(-) create mode 100644 src/ipcpd/normal/ae.h diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 8c0bd0bf..e4488eb7 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * IPC process main loop * @@ -65,6 +65,7 @@ static void * ipcp_main_loop(void * o) int fd = -1; pthread_rwlock_rdlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_SHUTDOWN || ipcp_get_state() == IPCP_NULL) { pthread_rwlock_unlock(&ipcpi.state_lock); @@ -389,7 +390,9 @@ int ipcp_wait_state(enum ipcp_state state, pthread_mutex_lock(&ipcpi.state_mtx); - while (ipcpi.state != state && ipcpi.state != IPCP_SHUTDOWN) { + while (ipcpi.state != state + && ipcpi.state != IPCP_SHUTDOWN + && ipcpi.state != IPCP_NULL) { if (timeout == NULL) ret = -pthread_cond_wait(&ipcpi.state_cond, &ipcpi.state_mtx); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index f090f415..6910115e 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * IPC process structure * @@ -24,17 +24,15 @@ #include -#include -#include - #include "ipcp-ops.h" #include "ipcp-data.h" +#include +#include + enum ipcp_state { IPCP_NULL = 0, IPCP_INIT, - IPCP_CONFIG, - IPCP_BOOTING, IPCP_OPERATIONAL, IPCP_SHUTDOWN }; diff --git a/src/ipcpd/normal/ae.h b/src/ipcpd/normal/ae.h new file mode 100644 index 00000000..229ff4aa --- /dev/null +++ b/src/ipcpd/normal/ae.h @@ -0,0 +1,29 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Application Entities for the normal IPC process + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_AE_H +#define OUROBOROS_IPCPD_NORMAL_AE_H + +#define MGMT_AE "Management" +#define DT_AE "Data transfer" + +#endif /* OUROBOROS_IPCPD_NORMAL_AE_H */ diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 47fb1f6e..c5bb03dd 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * DIF directory * @@ -26,7 +26,6 @@ #include #include "dir.h" -#include "ipcp.h" #include "ro.h" #include "pathname.h" #include "ribmgr.h" @@ -95,41 +94,27 @@ int dir_name_reg(char * name) char * path; uint64_t * addr; - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP is not in RUNNING state."); - return -1; - } - ro_attr_init(&attr); attr.enrol_sync = true; attr.recv_set = ALL_MEMBERS; path = create_path(name); - if (path == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); + if (path == NULL) return -ENOMEM; - } addr = malloc(sizeof(*addr)); if (addr == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); pathname_destroy(path); return -ENOMEM; } *addr = ribmgr_address(); if (ro_create(path, &attr, (uint8_t *) addr, sizeof(*addr))) { - pthread_rwlock_unlock(&ipcpi.state_lock); pathname_destroy(path); LOG_ERR("Failed to create RIB object."); return -1; } - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("Registered %s.", name); pathname_destroy(path); @@ -140,29 +125,16 @@ int dir_name_unreg(char * name) { char * path; - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP is not in RUNNING state."); - return -1; - } - path = create_path(name); - if (path == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); + if (path == NULL) return -ENOMEM; - } if (ro_delete(path)) { - pthread_rwlock_unlock(&ipcpi.state_lock); pathname_destroy(path); LOG_ERR("No such RIB object exists."); return -1; } - pthread_rwlock_unlock(&ipcpi.state_lock); - pathname_destroy(path); return 0; @@ -176,18 +148,9 @@ int dir_name_query(char * name) uint64_t addr; struct dt_const * dtc; - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - return -1; - } - path = create_path(name); - if (path == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - return -1; - } + if (path == NULL) + return -ENOMEM; if (ro_exists(path)) { if (ro_read(path, &ro_data) < 0) { @@ -206,8 +169,6 @@ int dir_name_query(char * name) ret = (addr == ribmgr_address()) ? -1 : 0; } - pthread_rwlock_unlock(&ipcpi.state_lock); - pathname_destroy(path); return ret; diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index d8190572..fc1342a9 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * Flow manager of the IPC Process * @@ -60,7 +60,6 @@ struct { cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - pthread_t nm1_flow_acceptor; pthread_t nm1_sdu_reader; pthread_t np1_sdu_reader; @@ -68,65 +67,6 @@ struct { int fd; } fmgr; -static void * fmgr_nm1_acceptor(void * o) -{ - int fd; - char * ae_name; - qoscube_t cube; - qosspec_t qs; - - (void) o; - - while (true) { - if (ipcp_get_state() == IPCP_SHUTDOWN) - return 0; - - fd = flow_accept(&ae_name, &qs); - if (fd < 0) { - LOG_WARN("Flow accept failed."); - continue; - } - - if (!(strcmp(ae_name, MGMT_AE) == 0 || - strcmp(ae_name, DT_AE) == 0)) { - if (flow_alloc_resp(fd, -1)) - LOG_WARN("Failed to reply to flow allocation."); - flow_dealloc(fd); - continue; - } - - if (flow_alloc_resp(fd, 0)) { - LOG_WARN("Failed to reply to flow allocation."); - flow_dealloc(fd); - continue; - } - - LOG_DBG("Accepted new flow allocation request for AE %s.", - ae_name); - - if (strcmp(ae_name, MGMT_AE) == 0) { - if (ribmgr_add_flow(fd)) { - LOG_WARN("Failed to hand fd to RIB."); - flow_dealloc(fd); - continue; - } - } else { - ipcp_flow_get_qoscube(fd, &cube); - if (flow_set_add(fmgr.nm1_set[cube], fd)) { - LOG_WARN("Failed to add fd."); - flow_dealloc(fd); - continue; - } - /* FIXME: Temporary, until we have a PFF */ - fmgr.fd = fd; - } - - free(ae_name); - } - - return (void *) 0; -} - static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; @@ -313,7 +253,6 @@ int fmgr_init() } } - pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL); @@ -325,11 +264,9 @@ int fmgr_fini() int i; int j; - pthread_cancel(fmgr.nm1_flow_acceptor); pthread_cancel(fmgr.np1_sdu_reader); pthread_cancel(fmgr.nm1_sdu_reader); - pthread_join(fmgr.nm1_flow_acceptor, NULL); pthread_join(fmgr.np1_sdu_reader, NULL); pthread_join(fmgr.nm1_sdu_reader, NULL); @@ -360,16 +297,6 @@ int fmgr_np1_alloc(int fd, uint8_t * ro_data; uint64_t addr; - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP is not enrolled yet."); - return -1; /* -ENOTINIT */ - } - - pthread_rwlock_unlock(&ipcpi.state_lock); - path = pathname_create(RO_DIR); if (path == NULL) return -1; @@ -604,30 +531,21 @@ int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb) return 0; } -int fmgr_nm1_mgmt_flow(char * dst_name) +/* FIXME: do this in a topologymanager instance */ +int fmgr_nm1_add_flow(int fd) { - int fd; - int result; + qoscube_t qos; - /* FIXME: Request retransmission. */ - fd = flow_alloc(dst_name, MGMT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s.", dst_name); + if (flow_alloc_resp(fd, 0) < 0) { + LOG_ERR("Could not respond to new flow."); return -1; } - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d.", - dst_name, result); - return -1; - } + ipcp_flow_get_qoscube(fd, &qos); + flow_set_add(fmgr.nm1_set[qos], fd); - if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand file descriptor to RIB manager."); - flow_dealloc(fd); - return -1; - } + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; return 0; } diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index e17f3f55..85731081 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * Flow manager of the IPC Process * @@ -24,15 +24,9 @@ #include -#include -#include -#include - +#include "ae.h" #include "frct.h" -#define MGMT_AE "Management" -#define DT_AE "Data transfer" - int fmgr_init(void); int fmgr_fini(void); @@ -53,7 +47,7 @@ int fmgr_np1_post_buf(cep_id_t id, int fmgr_np1_post_sdu(cep_id_t id, struct shm_du_buff * sdb); -int fmgr_nm1_mgmt_flow(char * dst_name); +int fmgr_nm1_add_flow(int fd); int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos); diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 33bd044b..6cd68f18 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * The Flow and Retransmission control component * @@ -40,21 +40,21 @@ enum conn_state { }; struct frct_i { - uint32_t cep_id; - uint64_t r_address; - uint32_t r_cep_id; + uint32_t cep_id; + uint64_t r_address; + uint32_t r_cep_id; qoscube_t cube; - uint64_t seqno; + uint64_t seqno; enum conn_state state; }; struct { - pthread_mutex_t instances_lock; struct frct_i ** instances; + pthread_mutex_t instances_lock; - struct bmp * cep_ids; - pthread_mutex_t cep_ids_lock; + struct bmp * cep_ids; + pthread_mutex_t cep_ids_lock; } frct; static cep_id_t next_cep_id(void) @@ -62,9 +62,11 @@ static cep_id_t next_cep_id(void) cep_id_t ret; pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_allocate(frct.cep_ids); if (!bmp_is_id_valid(frct.cep_ids, ret)) ret = INVALID_CEP_ID; + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; @@ -75,51 +77,48 @@ static int release_cep_id(cep_id_t id) int ret; pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_release(frct.cep_ids, id); + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; } -int frct_init() +static int init_cep_ids(void) { - int i; - if (pthread_mutex_init(&frct.cep_ids_lock, NULL)) return -1; - if (pthread_mutex_init(&frct.instances_lock, NULL)) - return -1; - - frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); - if (frct.instances == NULL) - return -1; - - for (i = 0; i < IRMD_MAX_FLOWS; i++) - frct.instances[i] = NULL; - frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1)); if (frct.cep_ids == NULL) { - free(frct.instances); + pthread_mutex_destroy(&frct.cep_ids_lock); return -1; } return 0; } -int frct_fini() +static int init_instances(void) { - pthread_mutex_lock(&frct.cep_ids_lock); - bmp_destroy(frct.cep_ids); - pthread_mutex_unlock(&frct.cep_ids_lock); + int i; - free(frct.instances); + if (pthread_mutex_init(&frct.instances_lock, NULL)) + return -1; + + frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); + if (frct.instances == NULL) { + pthread_mutex_destroy(&frct.instances_lock); + return -1; + } + + for (i = 0; i < IRMD_MAX_FLOWS; i++) + frct.instances[i] = NULL; return 0; } -static struct frct_i * create_frct_i(uint64_t address, - cep_id_t r_cep_id) +static struct frct_i * create_frct_i(uint64_t address, cep_id_t r_cep_id) { struct frct_i * instance; cep_id_t id; @@ -145,6 +144,60 @@ static struct frct_i * create_frct_i(uint64_t address, return instance; } +static void destroy_frct_i(struct frct_i * instance) +{ + free(instance); +} + +static void fini_cep_ids(void) +{ + pthread_mutex_lock(&frct.cep_ids_lock); + + bmp_destroy(frct.cep_ids); + + pthread_mutex_unlock(&frct.cep_ids_lock); + + pthread_mutex_destroy(&frct.cep_ids_lock); +} + +static void fini_instances(void) +{ + int i; + + pthread_mutex_lock(&frct.instances_lock); + + for (i = 0; i < IRMD_MAX_FLOWS; i++) + if (frct.instances[i] != NULL) + destroy_frct_i(frct.instances[i]); + + pthread_mutex_unlock(&frct.instances_lock); + + pthread_mutex_destroy(&frct.instances_lock); + + free(frct.instances); +} + +int frct_init() +{ + if (init_cep_ids()) + return -1; + + if (init_instances()) { + fini_cep_ids(); + return -1; + } + + return 0; +} + +int frct_fini() +{ + fini_cep_ids(); + fini_instances(); + + return 0; +} + int frct_nm1_post_sdu(struct pci * pci, struct shm_du_buff * sdb) { @@ -209,14 +262,6 @@ int frct_nm1_post_sdu(struct pci * pci, return 0; } -/* Call under instances lock */ -static void destroy_frct_i(struct frct_i * instance) -{ - release_cep_id(instance->cep_id); - frct.instances[instance->cep_id] = NULL; - free(instance); -} - cep_id_t frct_i_create(uint64_t address, buffer_t * buf, qoscube_t cube) @@ -328,7 +373,11 @@ int frct_i_destroy(cep_id_t id, pci.seqno = 0; pci.qos_id = instance->cube; + frct.instances[id] = NULL; destroy_frct_i(instance); + + release_cep_id(instance->cep_id); + pthread_mutex_unlock(&frct.instances_lock); if (buf != NULL && buf->data != NULL) diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 34ba52da..f6a88f29 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * Normal IPC Process * @@ -59,7 +59,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(IPCP_SHUTDOWN); + if (ipcp_get_state() == IPCP_INIT) + ipcp_set_state(IPCP_NULL); + + if (ipcp_get_state() == IPCP_OPERATIONAL) + ipcp_set_state(IPCP_SHUTDOWN); pthread_rwlock_unlock(&ipcpi.state_lock); } @@ -68,12 +72,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) } } -static int normal_ipcp_enroll(char * dif_name) +static int normal_ipcp_enroll(char * dst_name) { - struct timespec timeout = {(ENROLL_TIMEOUT / 1000), - (ENROLL_TIMEOUT % 1000) * MILLION}; + int ret; - pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_INIT) { pthread_rwlock_unlock(&ipcpi.state_lock); @@ -81,37 +84,62 @@ static int normal_ipcp_enroll(char * dif_name) return -1; /* -ENOTINIT */ } - pthread_rwlock_unlock(&ipcpi.state_lock); - - if (fmgr_nm1_mgmt_flow(dif_name)) { - LOG_ERR("Failed to establish management flow."); + if (ribmgr_init()) { + LOG_ERR("Failed to initialise RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - if (ribmgr_enrol()) { - LOG_ERR("Failed to enrol IPCP."); + if (ribmgr_nm1_mgt_flow(dst_name)) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + LOG_ERR("Failed to establish management flow."); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - if (ipcp_wait_state(IPCP_BOOTING, &timeout) == -ETIMEDOUT) { - LOG_ERR("Enrollment timed out."); + ret = ribmgr_enrol(); + if (ret < 0) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + if (ret == -ETIMEDOUT) + LOG_ERR("Enrollment timed out."); + else + LOG_ERR("Failed to enrol IPCP: %d.", ret); return -1; } if (ribmgr_start_policies()) { - pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start policies."); return -1; } - pthread_rwlock_wrlock(&ipcpi.state_lock); + if (fmgr_init()) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to start flow manager."); + return -1; + } + + if (frct_init()) { + if (fmgr_fini()) + LOG_WARN("Failed to finalize flow manager."); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to initialize FRCT."); + return -1; + } + ipcp_set_state(IPCP_OPERATIONAL); + pthread_rwlock_unlock(&ipcpi.state_lock); /* FIXME: Remove once we obtain neighbors during enrollment */ - if (fmgr_nm1_dt_flow(dif_name, QOS_CUBE_BE)) { + if (fmgr_nm1_dt_flow(dst_name, QOS_CUBE_BE)) { LOG_ERR("Failed to establish data transfer flow."); return -1; } @@ -121,6 +149,11 @@ static int normal_ipcp_enroll(char * dif_name) static int normal_ipcp_bootstrap(struct dif_config * conf) { + if (conf == NULL || conf->type != THIS_TYPE) { + LOG_ERR("Bad DIF configuration."); + return -EINVAL; + } + pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_INIT) { @@ -129,26 +162,48 @@ static int normal_ipcp_bootstrap(struct dif_config * conf) return -1; /* -ENOTINIT */ } + if (ribmgr_init()) { + LOG_ERR("Failed to initialise RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + return -1; + } + if (ribmgr_bootstrap(conf)) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to bootstrap RIB manager."); return -1; } - ipcp_set_state(IPCP_BOOTING); - pthread_rwlock_unlock(&ipcpi.state_lock); - if (ribmgr_start_policies()) { - pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(IPCP_INIT); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start policies."); return -1; } - pthread_rwlock_wrlock(&ipcpi.state_lock); + if (fmgr_init()) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to start flow manager."); + return -1; + } + + if (frct_init()) { + if (fmgr_fini()) + LOG_WARN("Failed to finalize flow manager."); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to initialize FRCT."); + return -1; + } ipcp_set_state(IPCP_OPERATIONAL); + ipcpi.data->dif_name = conf->dif_name; pthread_rwlock_unlock(&ipcpi.state_lock); @@ -169,10 +224,56 @@ static struct ipcp_ops normal_ops = { .ipcp_flow_dealloc = fmgr_np1_dealloc }; +static void * flow_acceptor(void * o) +{ + int fd; + char * ae_name; + qosspec_t qs; + + (void) o; + + while (true) { + pthread_rwlock_rdlock(&ipcpi.state_lock); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_INFO("Shutting down flow acceptor."); + return 0; + } + + pthread_rwlock_unlock(&ipcpi.state_lock); + + fd = flow_accept(&ae_name, &qs); + if (fd < 0) { + LOG_WARN("Flow accept failed."); + continue; + } + + LOG_DBG("New flow allocation request for AE %s.", ae_name); + + if (strcmp(ae_name, MGMT_AE) == 0) { + ribmgr_add_nm1_flow(fd); + } else if (strcmp(ae_name, DT_AE) == 0) { + fmgr_nm1_add_flow(fd); + } else { + LOG_DBG("Flow allocation request for unknown AE %s.", + ae_name); + if (flow_alloc_resp(fd, -1)) + LOG_WARN("Failed to reply to flow allocation."); + flow_dealloc(fd); + } + + free(ae_name); + } + + return (void *) 0; +} + int main(int argc, char * argv[]) { struct sigaction sig_act; sigset_t sigset; + pthread_t acceptor; if (ap_init(argv[0])) { LOG_ERR("Failed to init AP"); @@ -215,16 +316,8 @@ int main(int argc, char * argv[]) pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); - if (ribmgr_init()) { - fmgr_fini(); - ipcp_fini(); - close_logfile(); - exit(EXIT_FAILURE); - } - if (ipcp_create_r(getpid())) { LOG_ERR("Failed to notify IRMd we are initialized."); - fmgr_fini(); ipcp_fini(); close_logfile(); exit(EXIT_FAILURE); @@ -232,22 +325,24 @@ int main(int argc, char * argv[]) ipcp_wait_state(IPCP_OPERATIONAL, NULL); - if (fmgr_init()) { - ipcp_fini(); - close_logfile(); - exit(EXIT_FAILURE); + if (pthread_create(&acceptor, NULL, flow_acceptor, NULL)) { + LOG_ERR("Failed to create acceptor thread."); + ipcp_set_state(IPCP_SHUTDOWN); } ipcp_fini(); - if (fmgr_fini()) - LOG_ERR("Failed to finalize flow manager."); + if (ipcp_get_state() == IPCP_SHUTDOWN) { + pthread_cancel(acceptor); + pthread_join(acceptor, NULL); - if (ribmgr_fini()) - LOG_ERR("Failed to finalize RIB manager."); - - if (frct_fini()) - LOG_ERR("Failed to finalize FRCT."); + if (frct_fini()) + LOG_WARN("Failed to finalize FRCT."); + if (fmgr_fini()) + LOG_WARN("Failed to finalize flow manager."); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + } close_logfile(); diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 3b4a5784..ab492e7f 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * RIB manager of the IPC Process * @@ -29,21 +29,21 @@ #include #include #include - -#include -#include -#include -#include +#include #include "timerwheel.h" #include "addr_auth.h" #include "ribmgr.h" #include "dt_const.h" -#include "frct.h" -#include "ipcp.h" #include "ro.h" #include "pathname.h" #include "dir.h" +#include "ae.h" + +#include +#include +#include +#include #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; @@ -73,7 +73,7 @@ struct rnode { * as an index in a B-tree */ - /* If there are no children, this is a leaf */ + /* If there are no children, this is a leaf. */ struct rnode * child; struct rnode * sibling; @@ -107,6 +107,14 @@ struct ro_id { char * full_name; }; +enum ribmgr_state { + RIBMGR_NULL, + RIBMGR_INIT, + RIBMGR_OPERATIONAL, + RIBMGR_SHUTDOWN +}; + +/* FIXME: Extract rib from ribmgr. */ struct { struct rnode * root; pthread_mutex_t ro_lock; @@ -130,6 +138,10 @@ struct { struct addr_auth * addr_auth; enum pol_addr_auth addr_auth_type; + + enum ribmgr_state state; + pthread_cond_t state_cond; + pthread_mutex_t state_lock; } rib; void ribmgr_ro_created(const char * name, @@ -138,15 +150,11 @@ void ribmgr_ro_created(const char * name, { static_info_msg_t * stat_msg; - pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_CONFIG && - strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { + if (strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { LOG_DBG("Received static DIF information."); stat_msg = static_info_msg__unpack(NULL, len, data); if (stat_msg == NULL) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to unpack static info message."); return; } @@ -161,17 +169,8 @@ void ribmgr_ro_created(const char * name, rib.dtc.max_pdu_size = stat_msg->max_pdu_size; rib.addr_auth_type = stat_msg->addr_auth_type; - if (frct_init()) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); - static_info_msg__free_unpacked(stat_msg, NULL); - LOG_ERR("Failed to init FRCT"); - return; - } - static_info_msg__free_unpacked(stat_msg, NULL); } - pthread_rwlock_unlock(&ipcpi.state_lock); } /* We only have a create operation for now. */ @@ -185,7 +184,6 @@ static struct rnode * find_rnode_by_name(const char * name) { char * str; char * str1; - char * saveptr; char * token; struct rnode * node; @@ -195,8 +193,8 @@ static struct rnode * find_rnode_by_name(const char * name) node = rib.root; - for (str1 = str; ; str1 = NULL) { - token = strtok_r(str1, PATH_DELIMITER, &saveptr); + for (str1 = str; node != NULL; str1 = NULL) { + token = strtok(str1, PATH_DELIMITER); if (token == NULL) break; @@ -207,18 +205,13 @@ static struct rnode * find_rnode_by_name(const char * name) break; else node = node->sibling; - - if (node == NULL) { - free(str); - return NULL; - } } free(str); return node; } -/* Call under RIB object lock */ +/* Call under RIB object lock. */ static int ro_msg_create(struct rnode * node, ro_msg_t * msg) { @@ -334,7 +327,7 @@ static struct rnode * ribmgr_ro_create(const char * name, node = rib.root; - for (str1 = str; ; str1 = NULL) { + for (str1 = str; node != NULL; str1 = NULL) { token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token == NULL) { LOG_ERR("RO already exists."); @@ -356,9 +349,6 @@ static struct rnode * ribmgr_ro_create(const char * name, sibling = true; } } - - if (node == NULL) - break; } token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr); @@ -554,9 +544,88 @@ int ribmgr_init() return -1; } + if (pthread_cond_init(&rib.state_cond, NULL)) { + LOG_ERR("Failed to init condvar."); + timerwheel_destroy(rib.wheel); + bmp_destroy(rib.sids); + pthread_rwlock_destroy(&rib.flows_lock); + pthread_mutex_destroy(&rib.ro_lock); + pthread_mutex_destroy(&rib.subs_lock); + pthread_mutex_destroy(&rib.ro_ids_lock); + free(rib.root); + return -1; + } + + if (pthread_mutex_init(&rib.state_lock, NULL)) { + LOG_ERR("Failed to init mutex."); + pthread_cond_destroy(&rib.state_cond); + timerwheel_destroy(rib.wheel); + bmp_destroy(rib.sids); + pthread_rwlock_destroy(&rib.flows_lock); + pthread_mutex_destroy(&rib.ro_lock); + pthread_mutex_destroy(&rib.subs_lock); + pthread_mutex_destroy(&rib.ro_ids_lock); + free(rib.root); + return -1; + } + + rib.state = RIBMGR_INIT; + return 0; } +static enum ribmgr_state ribmgr_get_state(void) +{ + enum ribmgr_state state; + + pthread_mutex_lock(&rib.state_lock); + + state = rib.state; + + pthread_mutex_unlock(&rib.state_lock); + + return state; +} + +static void ribmgr_set_state(enum ribmgr_state state) +{ + pthread_mutex_lock(&rib.state_lock); + + rib.state = state; + + pthread_cond_broadcast(&rib.state_cond); + + pthread_mutex_unlock(&rib.state_lock); +} + +static int ribmgr_wait_state(enum ribmgr_state state, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + + pthread_mutex_lock(&rib.state_lock); + + while (rib.state != state + && rib.state != RIBMGR_SHUTDOWN + && rib.state != RIBMGR_NULL) { + if (timeout == NULL) + ret = -pthread_cond_wait(&rib.state_cond, + &rib.state_lock); + else + ret = -pthread_cond_timedwait(&rib.state_cond, + &rib.state_lock, + &abstime); + } + + pthread_mutex_unlock(&rib.state_lock); + + return ret; +} + static void rtree_destroy(struct rnode * node) { if (node != NULL) { @@ -574,7 +643,13 @@ int ribmgr_fini() struct list_head * pos = NULL; struct list_head * n = NULL; + pthread_mutex_lock(&rib.state_lock); + rib.state = RIBMGR_SHUTDOWN; + pthread_cond_broadcast(&rib.state_cond); + pthread_mutex_unlock(&rib.state_lock); + pthread_rwlock_wrlock(&rib.flows_lock); + list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); @@ -583,6 +658,7 @@ int ribmgr_fini() list_del(&flow->next); free(flow); } + pthread_rwlock_unlock(&rib.flows_lock); ro_unsubscribe(rib.ribmgr_sid); @@ -591,8 +667,10 @@ int ribmgr_fini() addr_auth_destroy(rib.addr_auth); pthread_mutex_lock(&rib.ro_lock); + rtree_destroy(rib.root->child); free(rib.root); + pthread_mutex_unlock(&rib.ro_lock); bmp_destroy(rib.sids); @@ -603,6 +681,9 @@ int ribmgr_fini() pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.ro_ids_lock); + pthread_cond_destroy(&rib.state_cond); + pthread_mutex_destroy(&rib.state_lock); + return 0; } @@ -827,16 +908,7 @@ static int ribmgr_cdap_start(struct cdap * instance, if (strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); - pthread_rwlock_wrlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP in wrong state."); - return -1; - } - if (cdap_reply_send(instance, key, 0, NULL, 0)) { - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } @@ -847,7 +919,6 @@ static int ribmgr_cdap_start(struct cdap * instance, pthread_mutex_lock(&rib.ro_lock); if (ribmgr_enrol_sync(instance, rib.root->child)) { pthread_mutex_unlock(&rib.ro_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to sync part of the RIB."); return -1; } @@ -859,18 +930,14 @@ static int ribmgr_cdap_start(struct cdap * instance, key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT, NULL, 0, 0); if (key < 0) { - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send stop of enrollment."); return -1; } if (cdap_reply_wait(instance, key, NULL, NULL)) { - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Remote failed to complete enrollment."); return -1; } - - pthread_rwlock_unlock(&ipcpi.state_lock); } else { LOG_WARN("Request to start unknown operation."); if (cdap_reply_send(instance, key, -1, NULL, 0)) @@ -884,20 +951,18 @@ static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name) { int ret = 0; - pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) { + if (strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); - - ipcp_set_state(IPCP_BOOTING); - } else + /* FIXME: don't use states to match start to stop. */ + ribmgr_set_state(RIBMGR_OPERATIONAL); + } else { ret = -1; + } if (cdap_reply_send(instance, key, ret, NULL, 0)) { - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; } - pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -1068,7 +1133,7 @@ static void * cdap_req_handler(void * o) return (void *) 0; } -int ribmgr_add_flow(int fd) +static int ribmgr_add_flow(int fd) { struct cdap * instance = NULL; struct mgmt_flow * flow; @@ -1127,6 +1192,46 @@ int ribmgr_remove_flow(int fd) return -1; } +/* FIXME: do this in a topologymanager instance */ +int ribmgr_add_nm1_flow(int fd) +{ + if (flow_alloc_resp(fd, 0) < 0) { + LOG_ERR("Could not respond to new flow."); + return -1; + } + + return ribmgr_add_flow(fd); +} + +int ribmgr_nm1_mgt_flow(char * dst_name) +{ + int fd; + int result; + + /* FIXME: Request retransmission. */ + fd = flow_alloc(dst_name, MGMT_AE, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s.", dst_name); + return -1; + } + + result = flow_alloc_res(fd); + if (result < 0) { + LOG_ERR("Result of flow allocation to %s is %d.", + dst_name, result); + flow_dealloc(fd); + return -1; + } + + if (ribmgr_add_flow(fd)) { + LOG_ERR("Failed to add file descriptor."); + flow_dealloc(fd); + return -1; + } + + return fd; +} + int ribmgr_bootstrap(struct dif_config * conf) { static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; @@ -1134,11 +1239,6 @@ int ribmgr_bootstrap(struct dif_config * conf) size_t len = 0; struct ro_attr attr; - if (conf == NULL || conf->type != IPCP_NORMAL) { - LOG_ERR("Bad DIF configuration."); - return -EINVAL; - } - ro_attr_init(&attr); attr.enrol_sync = true; @@ -1189,83 +1289,54 @@ int ribmgr_bootstrap(struct dif_config * conf) return -1; } - if (frct_init()) { - LOG_ERR("Failed to initialize FRCT."); - dir_fini(); - ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - ribmgr_ro_delete(RIBMGR_PREFIX); - return -1; - } - LOG_DBG("Bootstrapped RIB Manager."); return 0; } -int ribmgr_enrol(void) +int ribmgr_enrol() { struct cdap * instance = NULL; struct mgmt_flow * flow; cdap_key_t key; int ret; - - pthread_rwlock_wrlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_INIT) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP in wrong state."); - return -1; - } - - ipcp_set_state(IPCP_CONFIG); + struct timespec timeout = {(ENROLL_TIMEOUT / 1000), + (ENROLL_TIMEOUT % 1000) * MILLION}; pthread_rwlock_wrlock(&rib.flows_lock); - if (list_empty(&rib.flows)) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&rib.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("No flows in RIB."); - return -1; - } + + assert(!list_empty(&rib.flows)); flow = list_first_entry((&rib.flows), struct mgmt_flow, next); instance = flow->instance; key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); if (key < 0) { - ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&rib.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); return -1; } ret = cdap_reply_wait(instance, key, NULL, NULL); if (ret) { - ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&rib.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to enroll: %d.", ret); return -1; } pthread_rwlock_unlock(&rib.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + + if (ribmgr_wait_state(RIBMGR_OPERATIONAL, &timeout) == -ETIMEDOUT) + LOG_ERR("Enrollment of RIB timed out."); + + if (ribmgr_get_state() != RIBMGR_OPERATIONAL) + return -1; return 0; } int ribmgr_start_policies(void) { - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_BOOTING) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Cannot start policies in wrong state."); - return -1; - } - pthread_rwlock_unlock(&ipcpi.state_lock); - rib.addr_auth = addr_auth_create(rib.addr_auth_type); if (rib.addr_auth == NULL) { LOG_ERR("Failed to create address authority."); diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index 954b7a3c..ddc98e6e 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * RIB manager of the IPC Process * @@ -31,9 +31,9 @@ int ribmgr_init(void); int ribmgr_fini(void); -int ribmgr_add_flow(int fd); +int ribmgr_add_nm1_flow(int fd); -int ribmgr_remove_flow(int fd); +int ribmgr_nm1_mgt_flow(char * dst_name); int ribmgr_bootstrap(struct dif_config * conf); diff --git a/src/ipcpd/normal/ro.h b/src/ipcpd/normal/ro.h index 43e432c3..6fda2adf 100644 --- a/src/ipcpd/normal/ro.h +++ b/src/ipcpd/normal/ro.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * RIB objects * @@ -22,6 +22,10 @@ #ifndef OUROBOROS_IPCPD_NORMAL_RO_H #define OUROBOROS_IPCPD_NORMAL_RO_H +#include +#include +#include + enum ro_recv_set { NO_SYNC = 0, NEIGHBORS, diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index cdd02c49..32d9a46e 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * Shim IPC process over UDP * @@ -1213,7 +1213,6 @@ int main(int argc, char * argv[]) ipcp_fini(); - if (ipcp_get_state() == IPCP_SHUTDOWN) { pthread_cancel(udp_data.handler); pthread_cancel(udp_data.sdu_reader); -- cgit v1.2.3