diff options
author | Sander Vrijders <sander.vrijders@ugent.be> | 2017-04-21 10:49:26 +0000 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-21 10:49:26 +0000 |
commit | 61ec9ed4da2938d8dfc06e05cc4212f080db398e (patch) | |
tree | 67b8576e9747d7815c7eed7170f49a10e5a4e0e0 /src/ipcpd/normal | |
parent | 4bfd6c07281847405e127e9588376fcf20d07a7e (diff) | |
parent | a9d71381a84886007625958b9daea6b2d4a50563 (diff) | |
download | ouroboros-61ec9ed4da2938d8dfc06e05cc4212f080db398e.tar.gz ouroboros-61ec9ed4da2938d8dfc06e05cc4212f080db398e.zip |
Merged in sandervrijders/ouroboros/be-fmgr-split (pull request #490)
ipcpd: normal: Split flow manager into DT and FA
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/ipcpd/normal/connmgr.c | 1 | ||||
-rw-r--r-- | src/ipcpd/normal/dt.c | 351 | ||||
-rw-r--r-- | src/ipcpd/normal/dt.h | 45 | ||||
-rw-r--r-- | src/ipcpd/normal/fa.c | 438 | ||||
-rw-r--r-- | src/ipcpd/normal/fa.h | 54 | ||||
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 748 | ||||
-rw-r--r-- | src/ipcpd/normal/fmgr.h | 61 | ||||
-rw-r--r-- | src/ipcpd/normal/frct.c | 29 | ||||
-rw-r--r-- | src/ipcpd/normal/frct.h | 4 | ||||
-rw-r--r-- | src/ipcpd/normal/main.c | 122 |
11 files changed, 972 insertions, 884 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 2045b8df..9a220ba6 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -22,8 +22,9 @@ set(SOURCE_FILES addr_auth.c connmgr.c dir.c + dt.c enroll.c - fmgr.c + fa.c frct.c gam.c graph.c diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index 421bc5b0..56fe9164 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -32,7 +32,6 @@ #include "ae.h" #include "connmgr.h" #include "enroll.h" -#include "fmgr.h" #include "frct.h" #include "ipcp.h" #include "ribmgr.h" diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c new file mode 100644 index 00000000..72e0195e --- /dev/null +++ b/src/ipcpd/normal/dt.c @@ -0,0 +1,351 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Data Transfer AE + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "dt-ae" + +#include <ouroboros/config.h> +#include <ouroboros/logs.h> +#include <ouroboros/rib.h> +#include <ouroboros/dev.h> + +#include "dt.h" +#include "connmgr.h" +#include "ipcp.h" +#include "shm_pci.h" +#include "pff.h" +#include "neighbors.h" +#include "gam.h" +#include "routing.h" +#include "sdu_sched.h" +#include "frct.h" +#include "ae.h" +#include "ribconfig.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <inttypes.h> +#include <assert.h> + +struct { + flow_set_t * set[QOS_CUBE_MAX]; + struct sdu_sched * sdu_sched; + + struct pff * pff[QOS_CUBE_MAX]; + struct routing_i * routing[QOS_CUBE_MAX]; + + struct gam * gam; + struct nbs * nbs; + struct ae * ae; + + struct nb_notifier nb_notifier; +} dt; + +static int dt_neighbor_event(enum nb_event event, + struct conn conn) +{ + qoscube_t cube; + + /* We are only interested in neighbors being added and removed. */ + switch (event) { + case NEIGHBOR_ADDED: + ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); + flow_set_add(dt.set[cube], conn.flow_info.fd); + log_dbg("Added fd %d to flow set.", conn.flow_info.fd); + break; + case NEIGHBOR_REMOVED: + ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); + flow_set_del(dt.set[cube], conn.flow_info.fd); + log_dbg("Removed fd %d from flow set.", conn.flow_info.fd); + break; + default: + break; + } + + return 0; +} + +static int sdu_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) +{ + struct pci pci; + + memset(&pci, 0, sizeof(pci)); + + shm_pci_des(sdb, &pci); + + if (pci.dst_addr != ipcpi.dt_addr) { + if (pci.ttl == 0) { + log_dbg("TTL was zero."); + ipcp_flow_del(sdb); + return 0; + } + + pff_lock(dt.pff[qc]); + + fd = pff_nhop(dt.pff[qc], pci.dst_addr); + if (fd < 0) { + pff_unlock(dt.pff[qc]); + log_err("No next hop for %" PRIu64, pci.dst_addr); + ipcp_flow_del(sdb); + return -1; + } + + pff_unlock(dt.pff[qc]); + + if (ipcp_flow_write(fd, sdb)) { + log_err("Failed to write SDU to fd %d.", fd); + ipcp_flow_del(sdb); + return -1; + } + } else { + shm_pci_shrink(sdb); + + if (frct_post_sdu(&pci, sdb)) { + log_err("Failed to hand PDU to FRCT."); + return -1; + } + } + + return 0; +} + +int dt_init(void) +{ + int i; + int j; + struct conn_info info; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + dt.set[i] = flow_set_create(); + if (dt.set[i] == NULL) { + goto fail_flows; + return -1; + } + } + + if (shm_pci_init()) { + log_err("Failed to init shm pci."); + goto fail_flows; + return -1; + } + + memset(&info, 0, sizeof(info)); + + strcpy(info.ae_name, DT_AE); + strcpy(info.protocol, FRCT_PROTO); + info.pref_version = 1; + info.pref_syntax = PROTO_FIXED; + info.addr = ipcpi.dt_addr; + + dt.ae = connmgr_ae_create(info); + if (dt.ae == NULL) { + log_err("Failed to create AE struct."); + goto fail_flows; + } + + dt.nbs = nbs_create(); + if (dt.nbs == NULL) { + log_err("Failed to create neighbors struct."); + goto fail_connmgr; + } + + dt.nb_notifier.notify_call = dt_neighbor_event; + if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) { + log_err("Failed to register notifier."); + goto fail_nbs; + } + + if (routing_init(dt.nbs)) { + log_err("Failed to init routing."); + goto fail_nbs_notifier; + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + dt.pff[i] = pff_create(); + if (dt.pff[i] == NULL) { + for (j = 0; j < i; ++j) + pff_destroy(dt.pff[j]); + goto fail_routing; + } + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + dt.routing[i] = routing_i_create(dt.pff[i]); + if (dt.routing[i] == NULL) { + for (j = 0; j < i; ++j) + routing_i_destroy(dt.routing[j]); + goto fail_pff; + } + } + + return 0; + fail_pff: + for (i = 0; i < QOS_CUBE_MAX; ++i) + pff_destroy(dt.pff[i]); + fail_routing: + routing_fini(); + fail_nbs_notifier: + nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); + fail_nbs: + nbs_destroy(dt.nbs); + fail_connmgr: + connmgr_ae_destroy(dt.ae); + fail_flows: + for (i = 0; i < QOS_CUBE_MAX; ++i) + flow_set_destroy(dt.set[i]); + + return -1; +} + +void dt_fini(void) +{ + int i; + + for (i = 0; i < QOS_CUBE_MAX; ++i) + routing_i_destroy(dt.routing[i]); + + for (i = 0; i < QOS_CUBE_MAX; ++i) + pff_destroy(dt.pff[i]); + + routing_fini(); + + nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); + + nbs_destroy(dt.nbs); + + connmgr_ae_destroy(dt.ae); + + for (i = 0; i < QOS_CUBE_MAX; ++i) + flow_set_destroy(dt.set[i]); +} + +int dt_start(void) +{ + enum pol_gam pg; + + if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg)) + != sizeof(pg)) { + log_err("Failed to read policy for ribmgr gam."); + return -1; + } + + dt.gam = gam_create(pg, dt.nbs, dt.ae); + if (dt.gam == NULL) { + log_err("Failed to init dt graph adjacency manager."); + return -1; + } + + dt.sdu_sched = sdu_sched_create(dt.set, sdu_handler); + if (dt.sdu_sched == NULL) { + log_err("Failed to create N-1 SDU scheduler."); + gam_destroy(dt.gam); + return -1; + } + + return 0; +} + +void dt_stop(void) +{ + sdu_sched_destroy(dt.sdu_sched); + + gam_destroy(dt.gam); +} + +int dt_write_sdu(struct pci * pci, + struct shm_du_buff * sdb) +{ + int fd; + + assert(pci); + assert(sdb); + + pff_lock(dt.pff[pci->qos_id]); + + fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr); + if (fd < 0) { + pff_unlock(dt.pff[pci->qos_id]); + log_err("Could not get nhop for address %" PRIu64, + pci->dst_addr); + ipcp_flow_del(sdb); + return -1; + } + + pff_unlock(dt.pff[pci->qos_id]); + + if (shm_pci_ser(sdb, pci)) { + log_err("Failed to serialize PDU."); + ipcp_flow_del(sdb); + return -1; + } + + if (ipcp_flow_write(fd, sdb)) { + log_err("Failed to write SDU to fd %d.", fd); + ipcp_flow_del(sdb); + return -1; + } + + return 0; +} + +int dt_write_buf(struct pci * pci, + buffer_t * buf) +{ + buffer_t * buffer; + int fd; + + assert(pci); + assert(buf); + assert(buf->data); + + pff_lock(dt.pff[pci->qos_id]); + + fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr); + if (fd < 0) { + pff_unlock(dt.pff[pci->qos_id]); + log_err("Could not get nhop for address %" PRIu64, + pci->dst_addr); + return -1; + } + + pff_unlock(dt.pff[pci->qos_id]); + + buffer = shm_pci_ser_buf(buf, pci); + if (buffer == NULL) { + log_err("Failed to serialize buffer."); + return -1; + } + + if (flow_write(fd, buffer->data, buffer->len) == -1) { + log_err("Failed to write buffer to fd."); + free(buffer); + return -1; + } + + free(buffer->data); + free(buffer); + + return 0; +} diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h new file mode 100644 index 00000000..dea9b91f --- /dev/null +++ b/src/ipcpd/normal/dt.h @@ -0,0 +1,45 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Data Transfer AE + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_DT_H +#define OUROBOROS_IPCPD_NORMAL_DT_H + +#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/utils.h> + +#include "shm_pci.h" + +int dt_init(void); + +void dt_fini(void); + +int dt_start(void); + +void dt_stop(void); + +int dt_write_sdu(struct pci * pci, + struct shm_du_buff * sdb); + +int dt_write_buf(struct pci * pci, + buffer_t * buf); + +#endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c new file mode 100644 index 00000000..be1080b1 --- /dev/null +++ b/src/ipcpd/normal/fa.c @@ -0,0 +1,438 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow allocator of the IPC Process + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "flow-allocator" + +#include <ouroboros/config.h> +#include <ouroboros/logs.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/rib.h> +#include <ouroboros/errno.h> +#include <ouroboros/dev.h> + +#include "fa.h" +#include "sdu_sched.h" +#include "ipcp.h" +#include "ribconfig.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> + +#include "flow_alloc.pb-c.h" +typedef FlowAllocMsg flow_alloc_msg_t; + +#define TIMEOUT 10000 /* nanoseconds */ + +struct { + pthread_rwlock_t flows_lock; + cep_id_t fd_to_cep_id[AP_MAX_FLOWS]; + int cep_id_to_fd[IPCPD_MAX_CONNS]; + + flow_set_t * set[QOS_CUBE_MAX]; + struct sdu_sched * sdu_sched; +} fa; + +static int sdu_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) +{ + (void) qc; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (frct_i_write_sdu(fa.fd_to_cep_id[fd], sdb)) { + pthread_rwlock_unlock(&fa.flows_lock); + ipcp_flow_del(sdb); + log_warn("Failed to hand SDU to FRCT."); + return -1; + } + + pthread_rwlock_unlock(&fa.flows_lock); + + return 0; +} + +int fa_init(void) +{ + int i; + + for (i = 0; i < AP_MAX_FLOWS; ++i) + fa.fd_to_cep_id[i] = INVALID_CEP_ID; + + for (i = 0; i < IPCPD_MAX_CONNS; ++i) + fa.cep_id_to_fd[i] = -1; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + fa.set[i] = flow_set_create(); + if (fa.set[i] == NULL) + goto fail_flows; + } + + if (pthread_rwlock_init(&fa.flows_lock, NULL)) + goto fail_flows; + + return 0; +fail_flows: + for (i = 0; i < QOS_CUBE_MAX; ++i) + flow_set_destroy(fa.set[i]); + + return -1; +} + +void fa_fini(void) +{ + int i; + + for (i = 0; i < QOS_CUBE_MAX; ++i) + flow_set_destroy(fa.set[i]); + + pthread_rwlock_destroy(&fa.flows_lock); +} + +int fa_start(void) +{ + fa.sdu_sched = sdu_sched_create(fa.set, sdu_handler); + if (fa.sdu_sched == NULL) { + log_err("Failed to create SDU scheduler."); + return -1; + } + + return 0; +} + +void fa_stop(void) +{ + sdu_sched_destroy(fa.sdu_sched); +} + +int fa_alloc(int fd, + const uint8_t * dst, + qoscube_t qc) +{ + cep_id_t cep_id; + buffer_t buf; + flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; + char path[RIB_MAX_PATH_LEN + 1]; + uint64_t addr; + ssize_t ch; + ssize_t i; + char ** children; + char hashstr[ipcp_dir_hash_strlen() + 1]; + char * dst_ipcp = NULL; + + ipcp_hash_str(hashstr, dst); + + assert(strlen(hashstr) + strlen(DIR_PATH) + 1 + < RIB_MAX_PATH_LEN); + + strcpy(path, DIR_PATH); + + rib_path_append(path, hashstr); + + ch = rib_children(path, &children); + if (ch <= 0) + return -1; + + for (i = 0; i < ch; ++i) + if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) + dst_ipcp = children[i]; + else + free(children[i]); + + free(children); + + if (dst_ipcp == NULL) + return -1; + + strcpy(path, MEMBERS_PATH); + + rib_path_append(path, dst_ipcp); + + free(dst_ipcp); + + if (rib_read(path, &addr, sizeof(addr)) < 0) + return -1; + + msg.code = FLOW_ALLOC_CODE__FLOW_REQ; + msg.has_hash = true; + msg.hash.len = ipcp_dir_hash_len(); + msg.hash.data = (uint8_t *) dst; + msg.has_qoscube = true; + msg.qoscube = qc; + + buf.len = flow_alloc_msg__get_packed_size(&msg); + if (buf.len == 0) + return -1; + + buf.data = malloc(buf.len); + if (buf.data == NULL) + return -1; + + flow_alloc_msg__pack(&msg, buf.data); + + pthread_rwlock_wrlock(&fa.flows_lock); + + cep_id = frct_i_create(addr, &buf, qc); + if (cep_id == INVALID_CEP_ID) { + pthread_rwlock_unlock(&fa.flows_lock); + free(buf.data); + return -1; + } + + free(buf.data); + + fa.fd_to_cep_id[fd] = cep_id; + fa.cep_id_to_fd[cep_id] = fd; + + pthread_rwlock_unlock(&fa.flows_lock); + + return 0; +} + +/* Call under flows lock */ +static int fa_flow_dealloc(int fd) +{ + flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; + buffer_t buf; + int ret; + qoscube_t qc; + + ipcp_flow_get_qoscube(fd, &qc); + flow_set_del(fa.set[qc], fd); + + msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; + + buf.len = flow_alloc_msg__get_packed_size(&msg); + if (buf.len == 0) + return -1; + + buf.data = malloc(buf.len); + if (buf.data == NULL) + return -ENOMEM; + + flow_alloc_msg__pack(&msg, buf.data); + + ret = frct_i_destroy(fa.fd_to_cep_id[fd], &buf); + + fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] = -1; + fa.fd_to_cep_id[fd] = INVALID_CEP_ID; + + free(buf.data); + + return ret; +} + +int fa_alloc_resp(int fd, + int response) +{ + struct timespec ts = {0, TIMEOUT * 1000}; + flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; + buffer_t buf; + + msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; + msg.response = response; + msg.has_response = true; + + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + ipcpi.alloc_id = -1; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + buf.len = flow_alloc_msg__get_packed_size(&msg); + if (buf.len == 0) + return -1; + + buf.data = malloc(buf.len); + if (buf.data == NULL) + return -ENOMEM; + + flow_alloc_msg__pack(&msg, buf.data); + + pthread_rwlock_wrlock(&fa.flows_lock); + + if (response < 0) { + frct_i_destroy(fa.fd_to_cep_id[fd], &buf); + free(buf.data); + fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] + = INVALID_CEP_ID; + fa.fd_to_cep_id[fd] = -1; + } else { + qoscube_t qc; + ipcp_flow_get_qoscube(fd, &qc); + if (frct_i_accept(fa.fd_to_cep_id[fd], &buf, qc)) { + pthread_rwlock_unlock(&fa.flows_lock); + free(buf.data); + return -1; + } + flow_set_add(fa.set[qc], fd); + } + + pthread_rwlock_unlock(&fa.flows_lock); + + free(buf.data); + + return 0; +} + +int fa_dealloc(int fd) +{ + int ret; + + pthread_rwlock_wrlock(&fa.flows_lock); + + ret = fa_flow_dealloc(fd); + + pthread_rwlock_unlock(&fa.flows_lock); + + return ret; +} + +int fa_post_buf(cep_id_t cep_id, + buffer_t * buf) +{ + struct timespec ts = {0, TIMEOUT * 1000}; + int ret = 0; + int fd; + flow_alloc_msg_t * msg; + qoscube_t qc; + + /* Depending on the message call the function in ipcp-dev.h */ + + msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); + if (msg == NULL) { + log_err("Failed to unpack flow alloc message"); + return -1; + } + + switch (msg->code) { + case FLOW_ALLOC_CODE__FLOW_REQ: + pthread_mutex_lock(&ipcpi.alloc_lock); + + if (!msg->has_hash) { + log_err("Bad flow request."); + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + while (ipcpi.alloc_id != -1 && + ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_dbg("Won't allocate over non-operational IPCP."); + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + assert(ipcpi.alloc_id == -1); + + fd = ipcp_flow_req_arr(getpid(), + msg->hash.data, + ipcp_dir_hash_len(), + msg->qoscube); + if (fd < 0) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + flow_alloc_msg__free_unpacked(msg, NULL); + log_err("Failed to get fd for flow."); + return -1; + } + + pthread_rwlock_wrlock(&fa.flows_lock); + + fa.fd_to_cep_id[fd] = cep_id; + fa.cep_id_to_fd[cep_id] = fd; + + pthread_rwlock_unlock(&fa.flows_lock); + + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + break; + case FLOW_ALLOC_CODE__FLOW_REPLY: + pthread_rwlock_wrlock(&fa.flows_lock); + + fd = fa.cep_id_to_fd[cep_id]; + ret = ipcp_flow_alloc_reply(fd, msg->response); + if (msg->response < 0) { + fa.fd_to_cep_id[fd] = INVALID_CEP_ID; + fa.cep_id_to_fd[cep_id] = -1; + } else { + ipcp_flow_get_qoscube(fd, &qc); + flow_set_add(fa.set[qc], + fa.cep_id_to_fd[cep_id]); + } + + pthread_rwlock_unlock(&fa.flows_lock); + + break; + case FLOW_ALLOC_CODE__FLOW_DEALLOC: + fd = fa.cep_id_to_fd[cep_id]; + ipcp_flow_get_qoscube(fd, &qc); + flow_set_del(fa.set[qc], fd); + ret = flow_dealloc(fd); + break; + default: + log_err("Got an unknown flow allocation message."); + ret = -1; + break; + } + + flow_alloc_msg__free_unpacked(msg, NULL); + + return ret; +} + +int fa_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb) +{ + int fd; + + pthread_rwlock_rdlock(&fa.flows_lock); + + fd = fa.cep_id_to_fd[cep_id]; + if (ipcp_flow_write(fd, sdb)) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Failed to hand SDU to N flow."); + return -1; + } + + pthread_rwlock_unlock(&fa.flows_lock); + + return 0; +} diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h new file mode 100644 index 00000000..d370a381 --- /dev/null +++ b/src/ipcpd/normal/fa.h @@ -0,0 +1,54 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow allocator of the IPC Process + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_FA_H +#define OUROBOROS_IPCPD_NORMAL_FA_H + +#include <ouroboros/shared.h> +#include <ouroboros/utils.h> + +#include "frct.h" + +int fa_init(void); + +void fa_fini(void); + +int fa_start(void); + +void fa_stop(void); + +int fa_alloc(int fd, + const uint8_t * dst, + qoscube_t qos); + +int fa_alloc_resp(int fd, + int response); + +int fa_dealloc(int fd); + +int fa_post_buf(cep_id_t cep_id, + buffer_t * buf); + +int fa_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb); + +#endif /* OUROBOROS_IPCPD_NORMAL_FA_H */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c deleted file mode 100644 index d055b311..00000000 --- a/src/ipcpd/normal/fmgr.c +++ /dev/null @@ -1,748 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Flow manager of the IPC Process - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#define OUROBOROS_PREFIX "flow-manager" - -#include <ouroboros/config.h> -#include <ouroboros/logs.h> -#include <ouroboros/dev.h> -#include <ouroboros/list.h> -#include <ouroboros/ipcp-dev.h> -#include <ouroboros/fqueue.h> -#include <ouroboros/errno.h> -#include <ouroboros/cacep.h> -#include <ouroboros/rib.h> - -#include "connmgr.h" -#include "fmgr.h" -#include "frct.h" -#include "ipcp.h" -#include "shm_pci.h" -#include "ribconfig.h" -#include "pff.h" -#include "neighbors.h" -#include "gam.h" -#include "routing.h" -#include "sdu_sched.h" - -#include <stdlib.h> -#include <stdbool.h> -#include <pthread.h> -#include <string.h> -#include <inttypes.h> - -#include "flow_alloc.pb-c.h" -typedef FlowAllocMsg flow_alloc_msg_t; - -#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ - -struct { - pthread_rwlock_t np1_flows_lock; - cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; - int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - - flow_set_t * np1_set[QOS_CUBE_MAX]; - struct sdu_sched * np1_sdu_sched; - - flow_set_t * nm1_set[QOS_CUBE_MAX]; - struct sdu_sched * nm1_sdu_sched; - - struct pff * pff[QOS_CUBE_MAX]; - struct routing_i * routing[QOS_CUBE_MAX]; - - struct gam * gam; - struct nbs * nbs; - struct ae * ae; - - struct nb_notifier nb_notifier; -} fmgr; - -static int fmgr_neighbor_event(enum nb_event event, - struct conn conn) -{ - qoscube_t cube; - - /* We are only interested in neighbors being added and removed. */ - switch (event) { - case NEIGHBOR_ADDED: - ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); - flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd); - log_dbg("Added fd %d to flow set.", conn.flow_info.fd); - break; - case NEIGHBOR_REMOVED: - ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); - flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd); - log_dbg("Removed fd %d from flow set.", conn.flow_info.fd); - break; - default: - break; - } - - return 0; -} - -static int np1_sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) -{ - (void) qc; - - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - - if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - log_warn("Failed to hand SDU to FRCT."); - return -1; - } - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - return 0; -} - -static int nm1_sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) -{ - struct pci pci; - - memset(&pci, 0, sizeof(pci)); - - shm_pci_des(sdb, &pci); - - if (pci.dst_addr != ipcpi.dt_addr) { - if (pci.ttl == 0) { - log_dbg("TTL was zero."); - ipcp_flow_del(sdb); - return 0; - } - - pff_lock(fmgr.pff[qc]); - - fd = pff_nhop(fmgr.pff[qc], pci.dst_addr); - if (fd < 0) { - pff_unlock(fmgr.pff[qc]); - log_err("No next hop for %" PRIu64, pci.dst_addr); - ipcp_flow_del(sdb); - return -1; - } - - pff_unlock(fmgr.pff[qc]); - - if (ipcp_flow_write(fd, sdb)) { - log_err("Failed to write SDU to fd %d.", fd); - ipcp_flow_del(sdb); - return -1; - } - } else { - shm_pci_shrink(sdb); - - if (frct_nm1_post_sdu(&pci, sdb)) { - log_err("Failed to hand PDU to FRCT."); - return -1; - } - } - - return 0; -} - -static void fmgr_destroy_flows(void) -{ - int i; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - flow_set_destroy(fmgr.nm1_set[i]); - flow_set_destroy(fmgr.np1_set[i]); - } -} - -static void fmgr_destroy_routing(void) -{ - int i; - - for (i = 0; i < QOS_CUBE_MAX; ++i) - routing_i_destroy(fmgr.routing[i]); -} - -static void fmgr_destroy_pff(void) -{ - int i; - - for (i = 0; i < QOS_CUBE_MAX; ++i) - pff_destroy(fmgr.pff[i]); -} - -int fmgr_init(void) -{ - int i; - int j; - struct conn_info info; - - for (i = 0; i < AP_MAX_FLOWS; ++i) - fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; - - for (i = 0; i < IPCPD_MAX_CONNS; ++i) - fmgr.np1_cep_id_to_fd[i] = -1; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - fmgr.np1_set[i] = flow_set_create(); - if (fmgr.np1_set[i] == NULL) { - fmgr_destroy_flows(); - return -1; - } - - fmgr.nm1_set[i] = flow_set_create(); - if (fmgr.nm1_set[i] == NULL) { - fmgr_destroy_flows(); - return -1; - } - } - - if (shm_pci_init()) { - log_err("Failed to init shm pci."); - fmgr_destroy_flows(); - return -1; - } - - memset(&info, 0, sizeof(info)); - - strcpy(info.ae_name, DT_AE); - strcpy(info.protocol, FRCT_PROTO); - info.pref_version = 1; - info.pref_syntax = PROTO_FIXED; - info.addr = ipcpi.dt_addr; - - fmgr.ae = connmgr_ae_create(info); - if (fmgr.ae == NULL) { - log_err("Failed to create AE struct."); - fmgr_destroy_flows(); - return -1; - } - - fmgr.nbs = nbs_create(); - if (fmgr.nbs == NULL) { - log_err("Failed to create neighbors struct."); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - fmgr.nb_notifier.notify_call = fmgr_neighbor_event; - if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) { - log_err("Failed to register notifier."); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - if (routing_init(fmgr.nbs)) { - log_err("Failed to init routing."); - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) { - routing_fini(); - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - fmgr.pff[i] = pff_create(); - if (fmgr.pff[i] == NULL) { - for (j = 0; j < i; ++j) - pff_destroy(fmgr.pff[j]); - pthread_rwlock_destroy(&fmgr.np1_flows_lock); - routing_fini(); - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - fmgr.routing[i] = routing_i_create(fmgr.pff[i]); - if (fmgr.routing[i] == NULL) { - for (j = 0; j < i; ++j) - routing_i_destroy(fmgr.routing[j]); - fmgr_destroy_pff(); - pthread_rwlock_destroy(&fmgr.np1_flows_lock); - routing_fini(); - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - } - - return 0; -} - -void fmgr_fini() -{ - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - - fmgr_destroy_routing(); - - fmgr_destroy_pff(); - - routing_fini(); - - fmgr_destroy_flows(); - - connmgr_ae_destroy(fmgr.ae); - - nbs_destroy(fmgr.nbs); -} - -int fmgr_start(void) -{ - enum pol_gam pg; - - if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg)) - != sizeof(pg)) { - log_err("Failed to read policy for ribmgr gam."); - return -1; - } - - fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae); - if (fmgr.gam == NULL) { - log_err("Failed to init dt graph adjacency manager."); - return -1; - } - - fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler); - if (fmgr.nm1_sdu_sched == NULL) { - log_err("Failed to create N-1 SDU scheduler."); - gam_destroy(fmgr.gam); - return -1; - } - - fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler); - if (fmgr.np1_sdu_sched == NULL) { - log_err("Failed to create N+1 SDU scheduler."); - sdu_sched_destroy(fmgr.nm1_sdu_sched); - gam_destroy(fmgr.gam); - return -1; - } - - return 0; -} - -void fmgr_stop(void) -{ - sdu_sched_destroy(fmgr.np1_sdu_sched); - - sdu_sched_destroy(fmgr.nm1_sdu_sched); - - gam_destroy(fmgr.gam); -} - -int fmgr_np1_alloc(int fd, - const uint8_t * dst, - qoscube_t cube) -{ - cep_id_t cep_id; - buffer_t buf; - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - char path[RIB_MAX_PATH_LEN + 1]; - uint64_t addr; - ssize_t ch; - ssize_t i; - char ** children; - char hashstr[ipcp_dir_hash_strlen() + 1]; - char * dst_ipcp = NULL; - - ipcp_hash_str(hashstr, dst); - - assert(strlen(hashstr) + strlen(DIR_PATH) + 1 - < RIB_MAX_PATH_LEN); - - strcpy(path, DIR_PATH); - - rib_path_append(path, hashstr); - - ch = rib_children(path, &children); - if (ch <= 0) - return -1; - - for (i = 0; i < ch; ++i) - if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) - dst_ipcp = children[i]; - else - free(children[i]); - - free(children); - - if (dst_ipcp == NULL) - return -1; - - strcpy(path, MEMBERS_PATH); - - rib_path_append(path, dst_ipcp); - - free(dst_ipcp); - - if (rib_read(path, &addr, sizeof(addr)) < 0) - return -1; - - msg.code = FLOW_ALLOC_CODE__FLOW_REQ; - msg.has_hash = true; - msg.hash.len = ipcp_dir_hash_len(); - msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = cube; - - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) - return -1; - - buf.data = malloc(buf.len); - if (buf.data == NULL) - return -1; - - flow_alloc_msg__pack(&msg, buf.data); - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - cep_id = frct_i_create(addr, &buf, cube); - if (cep_id == INVALID_CEP_ID) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - free(buf.data); - return -1; - } - - free(buf.data); - - fmgr.np1_fd_to_cep_id[fd] = cep_id; - fmgr.np1_cep_id_to_fd[cep_id] = fd; - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - return 0; -} - -/* Call under np1_flows lock */ -static int np1_flow_dealloc(int fd) -{ - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - buffer_t buf; - int ret; - qoscube_t cube; - - ipcp_flow_get_qoscube(fd, &cube); - flow_set_del(fmgr.np1_set[cube], fd); - - msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; - - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) - return -1; - - buf.data = malloc(buf.len); - if (buf.data == NULL) - return -ENOMEM; - - flow_alloc_msg__pack(&msg, buf.data); - - ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - - fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; - fmgr.np1_fd_to_cep_id[fd] = -1; - - free(buf.data); - - return ret; -} - -int fmgr_np1_alloc_resp(int fd, - int response) -{ - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - buffer_t buf; - - msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; - msg.response = response; - msg.has_response = true; - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &ts); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) - return -1; - - buf.data = malloc(buf.len); - if (buf.data == NULL) - return -ENOMEM; - - flow_alloc_msg__pack(&msg, buf.data); - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - if (response < 0) { - frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - free(buf.data); - fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] - = INVALID_CEP_ID; - fmgr.np1_fd_to_cep_id[fd] = -1; - } else { - qoscube_t cube; - ipcp_flow_get_qoscube(fd, &cube); - if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - free(buf.data); - return -1; - } - flow_set_add(fmgr.np1_set[cube], fd); - } - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - free(buf.data); - - return 0; -} - -int fmgr_np1_dealloc(int fd) -{ - int ret; - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - ret = np1_flow_dealloc(fd); - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - return ret; -} - -int fmgr_np1_post_buf(cep_id_t cep_id, - buffer_t * buf) -{ - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - int ret = 0; - int fd; - flow_alloc_msg_t * msg; - qoscube_t cube; - - /* Depending on the message call the function in ipcp-dev.h */ - - msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); - if (msg == NULL) { - log_err("Failed to unpack flow alloc message"); - return -1; - } - - switch (msg->code) { - case FLOW_ALLOC_CODE__FLOW_REQ: - pthread_mutex_lock(&ipcpi.alloc_lock); - - if (!msg->has_hash) { - log_err("Bad flow request."); - return -1; - } - - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &ts); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - assert(ipcpi.alloc_id == -1); - - fd = ipcp_flow_req_arr(getpid(), - msg->hash.data, - ipcp_dir_hash_len(), - msg->qoscube); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - log_err("Failed to get fd for flow."); - return -1; - } - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - fmgr.np1_fd_to_cep_id[fd] = cep_id; - fmgr.np1_cep_id_to_fd[cep_id] = fd; - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - break; - case FLOW_ALLOC_CODE__FLOW_REPLY: - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - fd = fmgr.np1_cep_id_to_fd[cep_id]; - ret = ipcp_flow_alloc_reply(fd, msg->response); - if (msg->response < 0) { - fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; - fmgr.np1_cep_id_to_fd[cep_id] = -1; - } else { - ipcp_flow_get_qoscube(fd, &cube); - flow_set_add(fmgr.np1_set[cube], - fmgr.np1_cep_id_to_fd[cep_id]); - } - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - break; - case FLOW_ALLOC_CODE__FLOW_DEALLOC: - fd = fmgr.np1_cep_id_to_fd[cep_id]; - ipcp_flow_get_qoscube(fd, &cube); - flow_set_del(fmgr.np1_set[cube], fd); - ret = flow_dealloc(fd); - break; - default: - log_err("Got an unknown flow allocation message."); - ret = -1; - break; - } - - flow_alloc_msg__free_unpacked(msg, NULL); - - return ret; -} - -int fmgr_np1_post_sdu(cep_id_t cep_id, - struct shm_du_buff * sdb) -{ - int fd; - - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - - fd = fmgr.np1_cep_id_to_fd[cep_id]; - if (ipcp_flow_write(fd, sdb)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - log_err("Failed to hand SDU to N flow."); - return -1; - } - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - return 0; -} - -int fmgr_nm1_write_sdu(struct pci * pci, - struct shm_du_buff * sdb) -{ - int fd; - - if (pci == NULL || sdb == NULL) - return -EINVAL; - - pff_lock(fmgr.pff[pci->qos_id]); - fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); - if (fd < 0) { - pff_unlock(fmgr.pff[pci->qos_id]); - log_err("Could not get nhop for address %" PRIu64, - pci->dst_addr); - ipcp_flow_del(sdb); - return -1; - } - pff_unlock(fmgr.pff[pci->qos_id]); - - if (shm_pci_ser(sdb, pci)) { - log_err("Failed to serialize PDU."); - ipcp_flow_del(sdb); - return -1; - } - - if (ipcp_flow_write(fd, sdb)) { - log_err("Failed to write SDU to fd %d.", fd); - ipcp_flow_del(sdb); - return -1; - } - - return 0; -} - -int fmgr_nm1_write_buf(struct pci * pci, - buffer_t * buf) -{ - buffer_t * buffer; - int fd; - - if (pci == NULL || buf == NULL || buf->data == NULL) - return -EINVAL; - - pff_lock(fmgr.pff[pci->qos_id]); - fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); - if (fd < 0) { - pff_unlock(fmgr.pff[pci->qos_id]); - log_err("Could not get nhop for address %" PRIu64, - pci->dst_addr); - return -1; - } - pff_unlock(fmgr.pff[pci->qos_id]); - - buffer = shm_pci_ser_buf(buf, pci); - if (buffer == NULL) { - log_err("Failed to serialize buffer."); - return -1; - } - - if (flow_write(fd, buffer->data, buffer->len) == -1) { - log_err("Failed to write buffer to fd."); - free(buffer); - return -1; - } - - free(buffer->data); - free(buffer); - return 0; -} diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h deleted file mode 100644 index c59c0875..00000000 --- a/src/ipcpd/normal/fmgr.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Flow manager of the IPC Process - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_FMGR_H -#define OUROBOROS_IPCPD_NORMAL_FMGR_H - -#include <ouroboros/shared.h> -#include <ouroboros/qos.h> - -#include "ae.h" -#include "frct.h" - -int fmgr_init(void); - -void fmgr_fini(void); - -int fmgr_start(void); - -void fmgr_stop(void); - -int fmgr_np1_alloc(int fd, - const uint8_t * dst, - qoscube_t qos); - -int fmgr_np1_alloc_resp(int fd, - int response); - -int fmgr_np1_dealloc(int fd); - -int fmgr_np1_post_buf(cep_id_t id, - buffer_t * buf); - -int fmgr_np1_post_sdu(cep_id_t id, - struct shm_du_buff * sdb); - -int fmgr_nm1_write_sdu(struct pci * pci, - struct shm_du_buff * sdb); - -int fmgr_nm1_write_buf(struct pci * pci, - buffer_t * buf); - -#endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */ diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index d873beae..bfcde1b3 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -30,8 +30,9 @@ #include <ouroboros/errno.h> #include "frct.h" -#include "fmgr.h" #include "ipcp.h" +#include "dt.h" +#include "fa.h" #include <stdlib.h> #include <stdbool.h> @@ -210,8 +211,8 @@ int frct_fini() return 0; } -int frct_nm1_post_sdu(struct pci * pci, - struct shm_du_buff * sdb) +int frct_post_sdu(struct pci * pci, + struct shm_du_buff * sdb) { struct frct_i * instance; buffer_t buf; @@ -250,17 +251,17 @@ int frct_nm1_post_sdu(struct pci * pci, buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); buf.data = shm_du_buff_head(sdb); - if (fmgr_np1_post_buf(id, &buf)) { - log_err("Failed to hand buffer to Flow Manager."); + if (fa_post_buf(id, &buf)) { + log_err("Failed to hand buffer to FA."); ipcp_flow_del(sdb); return -1; } ipcp_flow_del(sdb); } else { - /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ - if (fmgr_np1_post_sdu(pci->dst_cep_id, sdb)) { - log_err("Failed to hand SDU to FMGR."); + /* FIXME: Known cep-ids are delivered to FA (minimal DTP) */ + if (fa_post_sdu(pci->dst_cep_id, sdb)) { + log_err("Failed to hand SDU to FA."); ipcp_flow_del(sdb); return -1; } @@ -301,11 +302,11 @@ cep_id_t frct_i_create(uint64_t address, pci.seqno = 0; pci.qos_id = cube; - if (fmgr_nm1_write_buf(&pci, buf)) { + if (dt_write_buf(&pci, buf)) { pthread_mutex_lock(&frct.instances_lock); destroy_frct_i(id); pthread_mutex_unlock(&frct.instances_lock); - log_err("Failed to hand PDU to FMGR."); + log_err("Failed to hand PDU to DT."); return INVALID_CEP_ID; } @@ -350,7 +351,7 @@ int frct_i_accept(cep_id_t id, pthread_mutex_unlock(&frct.instances_lock); - if (fmgr_nm1_write_buf(&pci, buf)) + if (dt_write_buf(&pci, buf)) return -1; return 0; @@ -390,7 +391,7 @@ int frct_i_destroy(cep_id_t id, pthread_mutex_unlock(&frct.instances_lock); if (buf != NULL && buf->data != NULL) - if (fmgr_nm1_write_buf(&pci, buf)) + if (dt_write_buf(&pci, buf)) return -1; return 0; @@ -427,9 +428,9 @@ int frct_i_write_sdu(cep_id_t id, pci.seqno = (instance->seqno)++; pci.qos_id = instance->cube; - if (fmgr_nm1_write_sdu(&pci, sdb)) { + if (dt_write_sdu(&pci, sdb)) { pthread_mutex_unlock(&frct.instances_lock); - log_err("Failed to hand SDU to FMGR."); + log_err("Failed to hand SDU to DT."); return -1; } diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index a1dcb151..b179e36b 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -50,7 +50,7 @@ int frct_i_destroy(cep_id_t id, int frct_i_write_sdu(cep_id_t id, struct shm_du_buff * sdb); -int frct_nm1_post_sdu(struct pci * pci, - struct shm_du_buff * sdb); +int frct_post_sdu(struct pci * pci, + struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_NORMAL_FRCT_H */ diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 67424914..ab8cf387 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -36,7 +36,8 @@ #include "connmgr.h" #include "dir.h" #include "enroll.h" -#include "fmgr.h" +#include "fa.h" +#include "dt.h" #include "ipcp.h" #include "ribconfig.h" #include "ribmgr.h" @@ -73,7 +74,7 @@ static int boot_components(void) &ipcpi.dir_hash_algo, sizeof(ipcpi.dir_hash_algo)); if (len < 0) { log_err("Failed to read hash length: %zd.", len); - return -1; + goto fail_name; } ipcpi.dir_hash_algo = ntoh32(ipcpi.dir_hash_algo); @@ -82,7 +83,7 @@ static int boot_components(void) if (rib_add(MEMBERS_PATH, ipcpi.name)) { log_err("Failed to add name to " MEMBERS_PATH); - return -1; + goto fail_name; } log_dbg("Starting components."); @@ -90,27 +91,25 @@ static int boot_components(void) if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa)) != sizeof(pa)) { log_err("Failed to read policy for address authority."); - return -1; + goto fail_name; } if (addr_auth_init(pa)) { log_err("Failed to init address authority."); - return -1; + goto fail_name; } ipcpi.dt_addr = addr_auth_address(); if (ipcpi.dt_addr == 0) { log_err("Failed to get a valid address."); - addr_auth_fini(); - return -1; + goto fail_addr_auth; } path[0] = '\0'; rib_path_append(rib_path_append(path, MEMBERS_NAME), ipcpi.name); if (rib_write(path, &ipcpi.dt_addr, sizeof(&ipcpi.dt_addr))) { log_err("Failed to write address to member object."); - addr_auth_fini(); - return -1; + goto fail_addr_auth; } log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); @@ -119,91 +118,100 @@ static int boot_components(void) if (ribmgr_init()) { log_err("Failed to initialize RIB manager."); - addr_auth_fini(); - return -1; + goto fail_addr_auth; } if (dir_init()) { log_err("Failed to initialize directory."); - ribmgr_fini(); - addr_auth_fini(); - return -1; + goto fail_ribmgr; } log_dbg("Ribmgr started."); if (frct_init()) { - dir_fini(); - ribmgr_fini(); - addr_auth_fini(); log_err("Failed to initialize FRCT."); - return -1; + goto fail_dir; } - if (fmgr_init()) { - frct_fini(); - dir_fini(); - ribmgr_fini(); - addr_auth_fini(); - log_err("Failed to initialize flow manager component."); - return -1; + if (fa_init()) { + log_err("Failed to initialize flow allocator ae."); + goto fail_frct; } - if (fmgr_start()) { - fmgr_fini(); - frct_fini(); - dir_fini(); - ribmgr_fini(); - addr_auth_fini(); - log_err("Failed to start flow manager."); - return -1; + if (dt_init()) { + log_err("Failed to initialize data transfer ae."); + goto fail_fa; + } + + if (fa_start()) { + log_err("Failed to start flow allocator."); + goto fail_dt; + } + + if (dt_start()) { + log_err("Failed to start data transfer ae."); + goto fail_fa_start; } if (enroll_start()) { - fmgr_stop(); - fmgr_fini(); - frct_fini(); - dir_fini(); - ribmgr_fini(); - addr_auth_fini(); log_err("Failed to start enroll."); - return -1; + goto fail_dt_start; } ipcp_set_state(IPCP_OPERATIONAL); if (connmgr_start()) { - ipcp_set_state(IPCP_INIT); - enroll_stop(); - fmgr_stop(); - fmgr_fini(); - frct_fini(); - dir_fini(); - ribmgr_fini(); - addr_auth_fini(); log_err("Failed to start AP connection manager."); - return -1; + goto fail_enroll; } return 0; + + fail_enroll: + ipcp_set_state(IPCP_INIT); + enroll_stop(); + fail_dt_start: + dt_stop(); + fail_fa_start: + fa_stop(); + fail_dt: + dt_fini(); + fail_fa: + fa_fini(); + fail_frct: + frct_fini(); + fail_dir: + dir_fini(); + fail_ribmgr: + ribmgr_fini(); + fail_addr_auth: + addr_auth_fini(); + fail_name: + free(ipcpi.dif_name); + + return -1; } void shutdown_components(void) { - ribmgr_fini(); - connmgr_stop(); enroll_stop(); - frct_fini(); + dt_stop(); + + fa_stop(); - fmgr_stop(); + dt_fini(); - fmgr_fini(); + fa_fini(); + + frct_fini(); dir_fini(); + ribmgr_fini(); + addr_auth_fini(); free(ipcpi.dif_name); @@ -366,9 +374,9 @@ static struct ipcp_ops normal_ops = { .ipcp_reg = dir_reg, .ipcp_unreg = dir_unreg, .ipcp_query = dir_query, - .ipcp_flow_alloc = fmgr_np1_alloc, - .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp, - .ipcp_flow_dealloc = fmgr_np1_dealloc + .ipcp_flow_alloc = fa_alloc, + .ipcp_flow_alloc_resp = fa_alloc_resp, + .ipcp_flow_dealloc = fa_dealloc }; int main(int argc, |