diff options
-rw-r--r-- | src/ipcpd/ipcp.c | 7 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 10 | ||||
-rw-r--r-- | src/ipcpd/normal/ae.h | 29 | ||||
-rw-r--r-- | src/ipcpd/normal/dir.c | 49 | ||||
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 102 | ||||
-rw-r--r-- | src/ipcpd/normal/fmgr.h | 12 | ||||
-rw-r--r-- | src/ipcpd/normal/frct.c | 123 | ||||
-rw-r--r-- | src/ipcpd/normal/main.c | 181 | ||||
-rw-r--r-- | src/ipcpd/normal/ribmgr.c | 277 | ||||
-rw-r--r-- | src/ipcpd/normal/ribmgr.h | 6 | ||||
-rw-r--r-- | src/ipcpd/normal/ro.h | 6 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 3 |
12 files changed, 463 insertions, 342 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a2dc9e8f..c47f1181 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * IPC process main loop * @@ -65,6 +65,7 @@ static void * ipcp_main_loop(void * o) int fd = -1; pthread_rwlock_rdlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_SHUTDOWN || ipcp_get_state() == IPCP_NULL) { pthread_rwlock_unlock(&ipcpi.state_lock); @@ -389,7 +390,9 @@ int ipcp_wait_state(enum ipcp_state state, pthread_mutex_lock(&ipcpi.state_mtx); - while (ipcpi.state != state && ipcpi.state != IPCP_SHUTDOWN) { + while (ipcpi.state != state + && ipcpi.state != IPCP_SHUTDOWN + && ipcpi.state != IPCP_NULL) { if (timeout == NULL) ret = -pthread_cond_wait(&ipcpi.state_cond, &ipcpi.state_mtx); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index f090f415..6910115e 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * IPC process structure * @@ -24,17 +24,15 @@ #include <ouroboros/config.h> -#include <pthread.h> -#include <time.h> - #include "ipcp-ops.h" #include "ipcp-data.h" +#include <pthread.h> +#include <time.h> + enum ipcp_state { IPCP_NULL = 0, IPCP_INIT, - IPCP_CONFIG, - IPCP_BOOTING, IPCP_OPERATIONAL, IPCP_SHUTDOWN }; diff --git a/src/ipcpd/normal/ae.h b/src/ipcpd/normal/ae.h new file mode 100644 index 00000000..229ff4aa --- /dev/null +++ b/src/ipcpd/normal/ae.h @@ -0,0 +1,29 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Application Entities for the normal IPC process + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * Sander Vrijders <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_AE_H +#define OUROBOROS_IPCPD_NORMAL_AE_H + +#define MGMT_AE "Management" +#define DT_AE "Data transfer" + +#endif /* OUROBOROS_IPCPD_NORMAL_AE_H */ diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 47fb1f6e..c5bb03dd 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * DIF directory * @@ -26,7 +26,6 @@ #include <ouroboros/errno.h> #include "dir.h" -#include "ipcp.h" #include "ro.h" #include "pathname.h" #include "ribmgr.h" @@ -95,41 +94,27 @@ int dir_name_reg(char * name) char * path; uint64_t * addr; - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP is not in RUNNING state."); - return -1; - } - ro_attr_init(&attr); attr.enrol_sync = true; attr.recv_set = ALL_MEMBERS; path = create_path(name); - if (path == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); + if (path == NULL) return -ENOMEM; - } addr = malloc(sizeof(*addr)); if (addr == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); pathname_destroy(path); return -ENOMEM; } *addr = ribmgr_address(); if (ro_create(path, &attr, (uint8_t *) addr, sizeof(*addr))) { - pthread_rwlock_unlock(&ipcpi.state_lock); pathname_destroy(path); LOG_ERR("Failed to create RIB object."); return -1; } - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("Registered %s.", name); pathname_destroy(path); @@ -140,29 +125,16 @@ int dir_name_unreg(char * name) { char * path; - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP is not in RUNNING state."); - return -1; - } - path = create_path(name); - if (path == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); + if (path == NULL) return -ENOMEM; - } if (ro_delete(path)) { - pthread_rwlock_unlock(&ipcpi.state_lock); pathname_destroy(path); LOG_ERR("No such RIB object exists."); return -1; } - pthread_rwlock_unlock(&ipcpi.state_lock); - pathname_destroy(path); return 0; @@ -176,18 +148,9 @@ int dir_name_query(char * name) uint64_t addr; struct dt_const * dtc; - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - return -1; - } - path = create_path(name); - if (path == NULL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - return -1; - } + if (path == NULL) + return -ENOMEM; if (ro_exists(path)) { if (ro_read(path, &ro_data) < 0) { @@ -206,8 +169,6 @@ int dir_name_query(char * name) ret = (addr == ribmgr_address()) ? -1 : 0; } - pthread_rwlock_unlock(&ipcpi.state_lock); - pathname_destroy(path); return ret; diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 4b24d5a1..64d9a5a9 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * Flow manager of the IPC Process * @@ -60,7 +60,6 @@ struct { cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - pthread_t nm1_flow_acceptor; pthread_t nm1_sdu_reader; pthread_t np1_sdu_reader; @@ -68,65 +67,6 @@ struct { int fd; } fmgr; -static void * fmgr_nm1_acceptor(void * o) -{ - int fd; - char * ae_name; - qoscube_t cube; - qosspec_t qs; - - (void) o; - - while (true) { - if (ipcp_get_state() == IPCP_SHUTDOWN) - return 0; - - fd = flow_accept(&ae_name, &qs); - if (fd < 0) { - LOG_WARN("Flow accept failed."); - continue; - } - - if (!(strcmp(ae_name, MGMT_AE) == 0 || - strcmp(ae_name, DT_AE) == 0)) { - if (flow_alloc_resp(fd, -1)) - LOG_WARN("Failed to reply to flow allocation."); - flow_dealloc(fd); - continue; - } - - if (flow_alloc_resp(fd, 0)) { - LOG_WARN("Failed to reply to flow allocation."); - flow_dealloc(fd); - continue; - } - - LOG_DBG("Accepted new flow allocation request for AE %s.", - ae_name); - - if (strcmp(ae_name, MGMT_AE) == 0) { - if (ribmgr_add_flow(fd)) { - LOG_WARN("Failed to hand fd to RIB."); - flow_dealloc(fd); - continue; - } - } else { - ipcp_flow_get_qoscube(fd, &cube); - if (flow_set_add(fmgr.nm1_set[cube], fd)) { - LOG_WARN("Failed to add fd."); - flow_dealloc(fd); - continue; - } - /* FIXME: Temporary, until we have a PFF */ - fmgr.fd = fd; - } - - free(ae_name); - } - - return (void *) 0; -} - static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; @@ -313,7 +253,6 @@ int fmgr_init() } } - pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL); @@ -325,11 +264,9 @@ int fmgr_fini() int i; int j; - pthread_cancel(fmgr.nm1_flow_acceptor); pthread_cancel(fmgr.np1_sdu_reader); pthread_cancel(fmgr.nm1_sdu_reader); - pthread_join(fmgr.nm1_flow_acceptor, NULL); pthread_join(fmgr.np1_sdu_reader, NULL); pthread_join(fmgr.nm1_sdu_reader, NULL); @@ -360,16 +297,6 @@ int fmgr_np1_alloc(int fd, uint8_t * ro_data; uint64_t addr; - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP is not enrolled yet."); - return -1; /* -ENOTINIT */ - } - - pthread_rwlock_unlock(&ipcpi.state_lock); - path = pathname_create(RO_DIR); if (path == NULL) return -1; @@ -605,30 +532,21 @@ int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb) return 0; } -int fmgr_nm1_mgmt_flow(char * dst_name) +/* FIXME: do this in a topologymanager instance */ +int fmgr_nm1_add_flow(int fd) { - int fd; - int result; + qoscube_t qos; - /* FIXME: Request retransmission. */ - fd = flow_alloc(dst_name, MGMT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s.", dst_name); + if (flow_alloc_resp(fd, 0) < 0) { + LOG_ERR("Could not respond to new flow."); return -1; } - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d.", - dst_name, result); - return -1; - } + ipcp_flow_get_qoscube(fd, &qos); + flow_set_add(fmgr.nm1_set[qos], fd); - if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand file descriptor to RIB manager."); - flow_dealloc(fd); - return -1; - } + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; return 0; } diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index e17f3f55..85731081 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * Flow manager of the IPC Process * @@ -24,15 +24,9 @@ #include <ouroboros/shared.h> -#include <unistd.h> -#include <stdint.h> -#include <sys/types.h> - +#include "ae.h" #include "frct.h" -#define MGMT_AE "Management" -#define DT_AE "Data transfer" - int fmgr_init(void); int fmgr_fini(void); @@ -53,7 +47,7 @@ int fmgr_np1_post_buf(cep_id_t id, int fmgr_np1_post_sdu(cep_id_t id, struct shm_du_buff * sdb); -int fmgr_nm1_mgmt_flow(char * dst_name); +int fmgr_nm1_add_flow(int fd); int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos); diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 33bd044b..6cd68f18 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * The Flow and Retransmission control component * @@ -40,21 +40,21 @@ enum conn_state { }; struct frct_i { - uint32_t cep_id; - uint64_t r_address; - uint32_t r_cep_id; + uint32_t cep_id; + uint64_t r_address; + uint32_t r_cep_id; qoscube_t cube; - uint64_t seqno; + uint64_t seqno; enum conn_state state; }; struct { - pthread_mutex_t instances_lock; struct frct_i ** instances; + pthread_mutex_t instances_lock; - struct bmp * cep_ids; - pthread_mutex_t cep_ids_lock; + struct bmp * cep_ids; + pthread_mutex_t cep_ids_lock; } frct; static cep_id_t next_cep_id(void) @@ -62,9 +62,11 @@ static cep_id_t next_cep_id(void) cep_id_t ret; pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_allocate(frct.cep_ids); if (!bmp_is_id_valid(frct.cep_ids, ret)) ret = INVALID_CEP_ID; + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; @@ -75,51 +77,48 @@ static int release_cep_id(cep_id_t id) int ret; pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_release(frct.cep_ids, id); + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; } -int frct_init() +static int init_cep_ids(void) { - int i; - if (pthread_mutex_init(&frct.cep_ids_lock, NULL)) return -1; - if (pthread_mutex_init(&frct.instances_lock, NULL)) - return -1; - - frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); - if (frct.instances == NULL) - return -1; - - for (i = 0; i < IRMD_MAX_FLOWS; i++) - frct.instances[i] = NULL; - frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1)); if (frct.cep_ids == NULL) { - free(frct.instances); + pthread_mutex_destroy(&frct.cep_ids_lock); return -1; } return 0; } -int frct_fini() +static int init_instances(void) { - pthread_mutex_lock(&frct.cep_ids_lock); - bmp_destroy(frct.cep_ids); - pthread_mutex_unlock(&frct.cep_ids_lock); + int i; - free(frct.instances); + if (pthread_mutex_init(&frct.instances_lock, NULL)) + return -1; + + frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); + if (frct.instances == NULL) { + pthread_mutex_destroy(&frct.instances_lock); + return -1; + } + + for (i = 0; i < IRMD_MAX_FLOWS; i++) + frct.instances[i] = NULL; return 0; } -static struct frct_i * create_frct_i(uint64_t address, - cep_id_t r_cep_id) +static struct frct_i * create_frct_i(uint64_t address, cep_id_t r_cep_id) { struct frct_i * instance; cep_id_t id; @@ -145,6 +144,60 @@ static struct frct_i * create_frct_i(uint64_t address, return instance; } +static void destroy_frct_i(struct frct_i * instance) +{ + free(instance); +} + +static void fini_cep_ids(void) +{ + pthread_mutex_lock(&frct.cep_ids_lock); + + bmp_destroy(frct.cep_ids); + + pthread_mutex_unlock(&frct.cep_ids_lock); + + pthread_mutex_destroy(&frct.cep_ids_lock); +} + +static void fini_instances(void) +{ + int i; + + pthread_mutex_lock(&frct.instances_lock); + + for (i = 0; i < IRMD_MAX_FLOWS; i++) + if (frct.instances[i] != NULL) + destroy_frct_i(frct.instances[i]); + + pthread_mutex_unlock(&frct.instances_lock); + + pthread_mutex_destroy(&frct.instances_lock); + + free(frct.instances); +} + +int frct_init() +{ + if (init_cep_ids()) + return -1; + + if (init_instances()) { + fini_cep_ids(); + return -1; + } + + return 0; +} + +int frct_fini() +{ + fini_cep_ids(); + fini_instances(); + + return 0; +} + int frct_nm1_post_sdu(struct pci * pci, struct shm_du_buff * sdb) { @@ -209,14 +262,6 @@ int frct_nm1_post_sdu(struct pci * pci, return 0; } -/* Call under instances lock */ -static void destroy_frct_i(struct frct_i * instance) -{ - release_cep_id(instance->cep_id); - frct.instances[instance->cep_id] = NULL; - free(instance); -} - cep_id_t frct_i_create(uint64_t address, buffer_t * buf, qoscube_t cube) @@ -328,7 +373,11 @@ int frct_i_destroy(cep_id_t id, pci.seqno = 0; pci.qos_id = instance->cube; + frct.instances[id] = NULL; destroy_frct_i(instance); + + release_cep_id(instance->cep_id); + pthread_mutex_unlock(&frct.instances_lock); if (buf != NULL && buf->data != NULL) diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 34ba52da..f6a88f29 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * Normal IPC Process * @@ -59,7 +59,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(IPCP_SHUTDOWN); + if (ipcp_get_state() == IPCP_INIT) + ipcp_set_state(IPCP_NULL); + + if (ipcp_get_state() == IPCP_OPERATIONAL) + ipcp_set_state(IPCP_SHUTDOWN); pthread_rwlock_unlock(&ipcpi.state_lock); } @@ -68,12 +72,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) } } -static int normal_ipcp_enroll(char * dif_name) +static int normal_ipcp_enroll(char * dst_name) { - struct timespec timeout = {(ENROLL_TIMEOUT / 1000), - (ENROLL_TIMEOUT % 1000) * MILLION}; + int ret; - pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_INIT) { pthread_rwlock_unlock(&ipcpi.state_lock); @@ -81,37 +84,62 @@ static int normal_ipcp_enroll(char * dif_name) return -1; /* -ENOTINIT */ } - pthread_rwlock_unlock(&ipcpi.state_lock); - - if (fmgr_nm1_mgmt_flow(dif_name)) { - LOG_ERR("Failed to establish management flow."); + if (ribmgr_init()) { + LOG_ERR("Failed to initialise RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - if (ribmgr_enrol()) { - LOG_ERR("Failed to enrol IPCP."); + if (ribmgr_nm1_mgt_flow(dst_name)) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + LOG_ERR("Failed to establish management flow."); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - if (ipcp_wait_state(IPCP_BOOTING, &timeout) == -ETIMEDOUT) { - LOG_ERR("Enrollment timed out."); + ret = ribmgr_enrol(); + if (ret < 0) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + if (ret == -ETIMEDOUT) + LOG_ERR("Enrollment timed out."); + else + LOG_ERR("Failed to enrol IPCP: %d.", ret); return -1; } if (ribmgr_start_policies()) { - pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start policies."); return -1; } - pthread_rwlock_wrlock(&ipcpi.state_lock); + if (fmgr_init()) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to start flow manager."); + return -1; + } + + if (frct_init()) { + if (fmgr_fini()) + LOG_WARN("Failed to finalize flow manager."); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to initialize FRCT."); + return -1; + } + ipcp_set_state(IPCP_OPERATIONAL); + pthread_rwlock_unlock(&ipcpi.state_lock); /* FIXME: Remove once we obtain neighbors during enrollment */ - if (fmgr_nm1_dt_flow(dif_name, QOS_CUBE_BE)) { + if (fmgr_nm1_dt_flow(dst_name, QOS_CUBE_BE)) { LOG_ERR("Failed to establish data transfer flow."); return -1; } @@ -121,6 +149,11 @@ static int normal_ipcp_enroll(char * dif_name) static int normal_ipcp_bootstrap(struct dif_config * conf) { + if (conf == NULL || conf->type != THIS_TYPE) { + LOG_ERR("Bad DIF configuration."); + return -EINVAL; + } + pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_INIT) { @@ -129,26 +162,48 @@ static int normal_ipcp_bootstrap(struct dif_config * conf) return -1; /* -ENOTINIT */ } + if (ribmgr_init()) { + LOG_ERR("Failed to initialise RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + return -1; + } + if (ribmgr_bootstrap(conf)) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to bootstrap RIB manager."); return -1; } - ipcp_set_state(IPCP_BOOTING); - pthread_rwlock_unlock(&ipcpi.state_lock); - if (ribmgr_start_policies()) { - pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(IPCP_INIT); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start policies."); return -1; } - pthread_rwlock_wrlock(&ipcpi.state_lock); + if (fmgr_init()) { + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to start flow manager."); + return -1; + } + + if (frct_init()) { + if (fmgr_fini()) + LOG_WARN("Failed to finalize flow manager."); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to initialize FRCT."); + return -1; + } ipcp_set_state(IPCP_OPERATIONAL); + ipcpi.data->dif_name = conf->dif_name; pthread_rwlock_unlock(&ipcpi.state_lock); @@ -169,10 +224,56 @@ static struct ipcp_ops normal_ops = { .ipcp_flow_dealloc = fmgr_np1_dealloc }; +static void * flow_acceptor(void * o) +{ + int fd; + char * ae_name; + qosspec_t qs; + + (void) o; + + while (true) { + pthread_rwlock_rdlock(&ipcpi.state_lock); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_INFO("Shutting down flow acceptor."); + return 0; + } + + pthread_rwlock_unlock(&ipcpi.state_lock); + + fd = flow_accept(&ae_name, &qs); + if (fd < 0) { + LOG_WARN("Flow accept failed."); + continue; + } + + LOG_DBG("New flow allocation request for AE %s.", ae_name); + + if (strcmp(ae_name, MGMT_AE) == 0) { + ribmgr_add_nm1_flow(fd); + } else if (strcmp(ae_name, DT_AE) == 0) { + fmgr_nm1_add_flow(fd); + } else { + LOG_DBG("Flow allocation request for unknown AE %s.", + ae_name); + if (flow_alloc_resp(fd, -1)) + LOG_WARN("Failed to reply to flow allocation."); + flow_dealloc(fd); + } + + free(ae_name); + } + + return (void *) 0; +} + int main(int argc, char * argv[]) { struct sigaction sig_act; sigset_t sigset; + pthread_t acceptor; if (ap_init(argv[0])) { LOG_ERR("Failed to init AP"); @@ -215,16 +316,8 @@ int main(int argc, char * argv[]) pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); - if (ribmgr_init()) { - fmgr_fini(); - ipcp_fini(); - close_logfile(); - exit(EXIT_FAILURE); - } - if (ipcp_create_r(getpid())) { LOG_ERR("Failed to notify IRMd we are initialized."); - fmgr_fini(); ipcp_fini(); close_logfile(); exit(EXIT_FAILURE); @@ -232,22 +325,24 @@ int main(int argc, char * argv[]) ipcp_wait_state(IPCP_OPERATIONAL, NULL); - if (fmgr_init()) { - ipcp_fini(); - close_logfile(); - exit(EXIT_FAILURE); + if (pthread_create(&acceptor, NULL, flow_acceptor, NULL)) { + LOG_ERR("Failed to create acceptor thread."); + ipcp_set_state(IPCP_SHUTDOWN); } ipcp_fini(); - if (fmgr_fini()) - LOG_ERR("Failed to finalize flow manager."); + if (ipcp_get_state() == IPCP_SHUTDOWN) { + pthread_cancel(acceptor); + pthread_join(acceptor, NULL); - if (ribmgr_fini()) - LOG_ERR("Failed to finalize RIB manager."); - - if (frct_fini()) - LOG_ERR("Failed to finalize FRCT."); + if (frct_fini()) + LOG_WARN("Failed to finalize FRCT."); + if (fmgr_fini()) + LOG_WARN("Failed to finalize flow manager."); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + } close_logfile(); diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 3b4a5784..ab492e7f 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * RIB manager of the IPC Process * @@ -29,21 +29,21 @@ #include <ouroboros/ipcp-dev.h> #include <ouroboros/bitmap.h> #include <ouroboros/errno.h> - -#include <stdlib.h> -#include <pthread.h> -#include <string.h> -#include <errno.h> +#include <ouroboros/dev.h> #include "timerwheel.h" #include "addr_auth.h" #include "ribmgr.h" #include "dt_const.h" -#include "frct.h" -#include "ipcp.h" #include "ro.h" #include "pathname.h" #include "dir.h" +#include "ae.h" + +#include <stdlib.h> +#include <pthread.h> +#include <string.h> +#include <errno.h> #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; @@ -73,7 +73,7 @@ struct rnode { * as an index in a B-tree */ - /* If there are no children, this is a leaf */ + /* If there are no children, this is a leaf. */ struct rnode * child; struct rnode * sibling; @@ -107,6 +107,14 @@ struct ro_id { char * full_name; }; +enum ribmgr_state { + RIBMGR_NULL, + RIBMGR_INIT, + RIBMGR_OPERATIONAL, + RIBMGR_SHUTDOWN +}; + +/* FIXME: Extract rib from ribmgr. */ struct { struct rnode * root; pthread_mutex_t ro_lock; @@ -130,6 +138,10 @@ struct { struct addr_auth * addr_auth; enum pol_addr_auth addr_auth_type; + + enum ribmgr_state state; + pthread_cond_t state_cond; + pthread_mutex_t state_lock; } rib; void ribmgr_ro_created(const char * name, @@ -138,15 +150,11 @@ void ribmgr_ro_created(const char * name, { static_info_msg_t * stat_msg; - pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_CONFIG && - strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { + if (strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { LOG_DBG("Received static DIF information."); stat_msg = static_info_msg__unpack(NULL, len, data); if (stat_msg == NULL) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to unpack static info message."); return; } @@ -161,17 +169,8 @@ void ribmgr_ro_created(const char * name, rib.dtc.max_pdu_size = stat_msg->max_pdu_size; rib.addr_auth_type = stat_msg->addr_auth_type; - if (frct_init()) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); - static_info_msg__free_unpacked(stat_msg, NULL); - LOG_ERR("Failed to init FRCT"); - return; - } - static_info_msg__free_unpacked(stat_msg, NULL); } - pthread_rwlock_unlock(&ipcpi.state_lock); } /* We only have a create operation for now. */ @@ -185,7 +184,6 @@ static struct rnode * find_rnode_by_name(const char * name) { char * str; char * str1; - char * saveptr; char * token; struct rnode * node; @@ -195,8 +193,8 @@ static struct rnode * find_rnode_by_name(const char * name) node = rib.root; - for (str1 = str; ; str1 = NULL) { - token = strtok_r(str1, PATH_DELIMITER, &saveptr); + for (str1 = str; node != NULL; str1 = NULL) { + token = strtok(str1, PATH_DELIMITER); if (token == NULL) break; @@ -207,18 +205,13 @@ static struct rnode * find_rnode_by_name(const char * name) break; else node = node->sibling; - - if (node == NULL) { - free(str); - return NULL; - } } free(str); return node; } -/* Call under RIB object lock */ +/* Call under RIB object lock. */ static int ro_msg_create(struct rnode * node, ro_msg_t * msg) { @@ -334,7 +327,7 @@ static struct rnode * ribmgr_ro_create(const char * name, node = rib.root; - for (str1 = str; ; str1 = NULL) { + for (str1 = str; node != NULL; str1 = NULL) { token = strtok_r(str1, PATH_DELIMITER, &saveptr); if (token == NULL) { LOG_ERR("RO already exists."); @@ -356,9 +349,6 @@ static struct rnode * ribmgr_ro_create(const char * name, sibling = true; } } - - if (node == NULL) - break; } token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr); @@ -554,9 +544,88 @@ int ribmgr_init() return -1; } + if (pthread_cond_init(&rib.state_cond, NULL)) { + LOG_ERR("Failed to init condvar."); + timerwheel_destroy(rib.wheel); + bmp_destroy(rib.sids); + pthread_rwlock_destroy(&rib.flows_lock); + pthread_mutex_destroy(&rib.ro_lock); + pthread_mutex_destroy(&rib.subs_lock); + pthread_mutex_destroy(&rib.ro_ids_lock); + free(rib.root); + return -1; + } + + if (pthread_mutex_init(&rib.state_lock, NULL)) { + LOG_ERR("Failed to init mutex."); + pthread_cond_destroy(&rib.state_cond); + timerwheel_destroy(rib.wheel); + bmp_destroy(rib.sids); + pthread_rwlock_destroy(&rib.flows_lock); + pthread_mutex_destroy(&rib.ro_lock); + pthread_mutex_destroy(&rib.subs_lock); + pthread_mutex_destroy(&rib.ro_ids_lock); + free(rib.root); + return -1; + } + + rib.state = RIBMGR_INIT; + return 0; } +static enum ribmgr_state ribmgr_get_state(void) +{ + enum ribmgr_state state; + + pthread_mutex_lock(&rib.state_lock); + + state = rib.state; + + pthread_mutex_unlock(&rib.state_lock); + + return state; +} + +static void ribmgr_set_state(enum ribmgr_state state) +{ + pthread_mutex_lock(&rib.state_lock); + + rib.state = state; + + pthread_cond_broadcast(&rib.state_cond); + + pthread_mutex_unlock(&rib.state_lock); +} + +static int ribmgr_wait_state(enum ribmgr_state state, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + + pthread_mutex_lock(&rib.state_lock); + + while (rib.state != state + && rib.state != RIBMGR_SHUTDOWN + && rib.state != RIBMGR_NULL) { + if (timeout == NULL) + ret = -pthread_cond_wait(&rib.state_cond, + &rib.state_lock); + else + ret = -pthread_cond_timedwait(&rib.state_cond, + &rib.state_lock, + &abstime); + } + + pthread_mutex_unlock(&rib.state_lock); + + return ret; +} + static void rtree_destroy(struct rnode * node) { if (node != NULL) { @@ -574,7 +643,13 @@ int ribmgr_fini() struct list_head * pos = NULL; struct list_head * n = NULL; + pthread_mutex_lock(&rib.state_lock); + rib.state = RIBMGR_SHUTDOWN; + pthread_cond_broadcast(&rib.state_cond); + pthread_mutex_unlock(&rib.state_lock); + pthread_rwlock_wrlock(&rib.flows_lock); + list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); @@ -583,6 +658,7 @@ int ribmgr_fini() list_del(&flow->next); free(flow); } + pthread_rwlock_unlock(&rib.flows_lock); ro_unsubscribe(rib.ribmgr_sid); @@ -591,8 +667,10 @@ int ribmgr_fini() addr_auth_destroy(rib.addr_auth); pthread_mutex_lock(&rib.ro_lock); + rtree_destroy(rib.root->child); free(rib.root); + pthread_mutex_unlock(&rib.ro_lock); bmp_destroy(rib.sids); @@ -603,6 +681,9 @@ int ribmgr_fini() pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.ro_ids_lock); + pthread_cond_destroy(&rib.state_cond); + pthread_mutex_destroy(&rib.state_lock); + return 0; } @@ -827,16 +908,7 @@ static int ribmgr_cdap_start(struct cdap * instance, if (strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); - pthread_rwlock_wrlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP in wrong state."); - return -1; - } - if (cdap_reply_send(instance, key, 0, NULL, 0)) { - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } @@ -847,7 +919,6 @@ static int ribmgr_cdap_start(struct cdap * instance, pthread_mutex_lock(&rib.ro_lock); if (ribmgr_enrol_sync(instance, rib.root->child)) { pthread_mutex_unlock(&rib.ro_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to sync part of the RIB."); return -1; } @@ -859,18 +930,14 @@ static int ribmgr_cdap_start(struct cdap * instance, key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT, NULL, 0, 0); if (key < 0) { - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send stop of enrollment."); return -1; } if (cdap_reply_wait(instance, key, NULL, NULL)) { - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Remote failed to complete enrollment."); return -1; } - - pthread_rwlock_unlock(&ipcpi.state_lock); } else { LOG_WARN("Request to start unknown operation."); if (cdap_reply_send(instance, key, -1, NULL, 0)) @@ -884,20 +951,18 @@ static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name) { int ret = 0; - pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) { + if (strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); - - ipcp_set_state(IPCP_BOOTING); - } else + /* FIXME: don't use states to match start to stop. */ + ribmgr_set_state(RIBMGR_OPERATIONAL); + } else { ret = -1; + } if (cdap_reply_send(instance, key, ret, NULL, 0)) { - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; } - pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -1068,7 +1133,7 @@ static void * cdap_req_handler(void * o) return (void *) 0; } -int ribmgr_add_flow(int fd) +static int ribmgr_add_flow(int fd) { struct cdap * instance = NULL; struct mgmt_flow * flow; @@ -1127,6 +1192,46 @@ int ribmgr_remove_flow(int fd) return -1; } +/* FIXME: do this in a topologymanager instance */ +int ribmgr_add_nm1_flow(int fd) +{ + if (flow_alloc_resp(fd, 0) < 0) { + LOG_ERR("Could not respond to new flow."); + return -1; + } + + return ribmgr_add_flow(fd); +} + +int ribmgr_nm1_mgt_flow(char * dst_name) +{ + int fd; + int result; + + /* FIXME: Request retransmission. */ + fd = flow_alloc(dst_name, MGMT_AE, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s.", dst_name); + return -1; + } + + result = flow_alloc_res(fd); + if (result < 0) { + LOG_ERR("Result of flow allocation to %s is %d.", + dst_name, result); + flow_dealloc(fd); + return -1; + } + + if (ribmgr_add_flow(fd)) { + LOG_ERR("Failed to add file descriptor."); + flow_dealloc(fd); + return -1; + } + + return fd; +} + int ribmgr_bootstrap(struct dif_config * conf) { static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; @@ -1134,11 +1239,6 @@ int ribmgr_bootstrap(struct dif_config * conf) size_t len = 0; struct ro_attr attr; - if (conf == NULL || conf->type != IPCP_NORMAL) { - LOG_ERR("Bad DIF configuration."); - return -EINVAL; - } - ro_attr_init(&attr); attr.enrol_sync = true; @@ -1189,83 +1289,54 @@ int ribmgr_bootstrap(struct dif_config * conf) return -1; } - if (frct_init()) { - LOG_ERR("Failed to initialize FRCT."); - dir_fini(); - ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - ribmgr_ro_delete(RIBMGR_PREFIX); - return -1; - } - LOG_DBG("Bootstrapped RIB Manager."); return 0; } -int ribmgr_enrol(void) +int ribmgr_enrol() { struct cdap * instance = NULL; struct mgmt_flow * flow; cdap_key_t key; int ret; - - pthread_rwlock_wrlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_INIT) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("IPCP in wrong state."); - return -1; - } - - ipcp_set_state(IPCP_CONFIG); + struct timespec timeout = {(ENROLL_TIMEOUT / 1000), + (ENROLL_TIMEOUT % 1000) * MILLION}; pthread_rwlock_wrlock(&rib.flows_lock); - if (list_empty(&rib.flows)) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&rib.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("No flows in RIB."); - return -1; - } + + assert(!list_empty(&rib.flows)); flow = list_first_entry((&rib.flows), struct mgmt_flow, next); instance = flow->instance; key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); if (key < 0) { - ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&rib.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); return -1; } ret = cdap_reply_wait(instance, key, NULL, NULL); if (ret) { - ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&rib.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to enroll: %d.", ret); return -1; } pthread_rwlock_unlock(&rib.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + + if (ribmgr_wait_state(RIBMGR_OPERATIONAL, &timeout) == -ETIMEDOUT) + LOG_ERR("Enrollment of RIB timed out."); + + if (ribmgr_get_state() != RIBMGR_OPERATIONAL) + return -1; return 0; } int ribmgr_start_policies(void) { - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_BOOTING) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Cannot start policies in wrong state."); - return -1; - } - pthread_rwlock_unlock(&ipcpi.state_lock); - rib.addr_auth = addr_auth_create(rib.addr_auth_type); if (rib.addr_auth == NULL) { LOG_ERR("Failed to create address authority."); diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index 954b7a3c..ddc98e6e 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * RIB manager of the IPC Process * @@ -31,9 +31,9 @@ int ribmgr_init(void); int ribmgr_fini(void); -int ribmgr_add_flow(int fd); +int ribmgr_add_nm1_flow(int fd); -int ribmgr_remove_flow(int fd); +int ribmgr_nm1_mgt_flow(char * dst_name); int ribmgr_bootstrap(struct dif_config * conf); diff --git a/src/ipcpd/normal/ro.h b/src/ipcpd/normal/ro.h index 43e432c3..6fda2adf 100644 --- a/src/ipcpd/normal/ro.h +++ b/src/ipcpd/normal/ro.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * RIB objects * @@ -22,6 +22,10 @@ #ifndef OUROBOROS_IPCPD_NORMAL_RO_H #define OUROBOROS_IPCPD_NORMAL_RO_H +#include <stdbool.h> +#include <time.h> +#include <stdint.h> + enum ro_recv_set { NO_SYNC = 0, NEIGHBORS, diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 12f586f0..9bc8f287 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017 * * Shim IPC process over UDP * @@ -1221,7 +1221,6 @@ int main(int argc, char * argv[]) ipcp_fini(); - if (ipcp_get_state() == IPCP_SHUTDOWN) { pthread_cancel(udp_data.handler); pthread_cancel(udp_data.sdu_reader); |