summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2017-04-21 10:49:26 +0000
committerdimitri staessens <dimitri.staessens@ugent.be>2017-04-21 10:49:26 +0000
commit61ec9ed4da2938d8dfc06e05cc4212f080db398e (patch)
tree67b8576e9747d7815c7eed7170f49a10e5a4e0e0
parent4bfd6c07281847405e127e9588376fcf20d07a7e (diff)
parenta9d71381a84886007625958b9daea6b2d4a50563 (diff)
downloadouroboros-61ec9ed4da2938d8dfc06e05cc4212f080db398e.tar.gz
ouroboros-61ec9ed4da2938d8dfc06e05cc4212f080db398e.zip
Merged in sandervrijders/ouroboros/be-fmgr-split (pull request #490)
ipcpd: normal: Split flow manager into DT and FA
-rw-r--r--src/ipcpd/normal/CMakeLists.txt3
-rw-r--r--src/ipcpd/normal/connmgr.c1
-rw-r--r--src/ipcpd/normal/dt.c351
-rw-r--r--src/ipcpd/normal/dt.h45
-rw-r--r--src/ipcpd/normal/fa.c438
-rw-r--r--src/ipcpd/normal/fa.h54
-rw-r--r--src/ipcpd/normal/fmgr.c748
-rw-r--r--src/ipcpd/normal/fmgr.h61
-rw-r--r--src/ipcpd/normal/frct.c29
-rw-r--r--src/ipcpd/normal/frct.h4
-rw-r--r--src/ipcpd/normal/main.c122
11 files changed, 972 insertions, 884 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 2045b8df..9a220ba6 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -22,8 +22,9 @@ set(SOURCE_FILES
addr_auth.c
connmgr.c
dir.c
+ dt.c
enroll.c
- fmgr.c
+ fa.c
frct.c
gam.c
graph.c
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c
index 421bc5b0..56fe9164 100644
--- a/src/ipcpd/normal/connmgr.c
+++ b/src/ipcpd/normal/connmgr.c
@@ -32,7 +32,6 @@
#include "ae.h"
#include "connmgr.h"
#include "enroll.h"
-#include "fmgr.h"
#include "frct.h"
#include "ipcp.h"
#include "ribmgr.h"
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
new file mode 100644
index 00000000..72e0195e
--- /dev/null
+++ b/src/ipcpd/normal/dt.c
@@ -0,0 +1,351 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Data Transfer AE
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "dt-ae"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/rib.h>
+#include <ouroboros/dev.h>
+
+#include "dt.h"
+#include "connmgr.h"
+#include "ipcp.h"
+#include "shm_pci.h"
+#include "pff.h"
+#include "neighbors.h"
+#include "gam.h"
+#include "routing.h"
+#include "sdu_sched.h"
+#include "frct.h"
+#include "ae.h"
+#include "ribconfig.h"
+
+#include <stdlib.h>
+#include <stdbool.h>
+#include <pthread.h>
+#include <string.h>
+#include <inttypes.h>
+#include <assert.h>
+
+struct {
+ flow_set_t * set[QOS_CUBE_MAX];
+ struct sdu_sched * sdu_sched;
+
+ struct pff * pff[QOS_CUBE_MAX];
+ struct routing_i * routing[QOS_CUBE_MAX];
+
+ struct gam * gam;
+ struct nbs * nbs;
+ struct ae * ae;
+
+ struct nb_notifier nb_notifier;
+} dt;
+
+static int dt_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ qoscube_t cube;
+
+ /* We are only interested in neighbors being added and removed. */
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_add(dt.set[cube], conn.flow_info.fd);
+ log_dbg("Added fd %d to flow set.", conn.flow_info.fd);
+ break;
+ case NEIGHBOR_REMOVED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_del(dt.set[cube], conn.flow_info.fd);
+ log_dbg("Removed fd %d from flow set.", conn.flow_info.fd);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+static int sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
+{
+ struct pci pci;
+
+ memset(&pci, 0, sizeof(pci));
+
+ shm_pci_des(sdb, &pci);
+
+ if (pci.dst_addr != ipcpi.dt_addr) {
+ if (pci.ttl == 0) {
+ log_dbg("TTL was zero.");
+ ipcp_flow_del(sdb);
+ return 0;
+ }
+
+ pff_lock(dt.pff[qc]);
+
+ fd = pff_nhop(dt.pff[qc], pci.dst_addr);
+ if (fd < 0) {
+ pff_unlock(dt.pff[qc]);
+ log_err("No next hop for %" PRIu64, pci.dst_addr);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ pff_unlock(dt.pff[qc]);
+
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+ } else {
+ shm_pci_shrink(sdb);
+
+ if (frct_post_sdu(&pci, sdb)) {
+ log_err("Failed to hand PDU to FRCT.");
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int dt_init(void)
+{
+ int i;
+ int j;
+ struct conn_info info;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ dt.set[i] = flow_set_create();
+ if (dt.set[i] == NULL) {
+ goto fail_flows;
+ return -1;
+ }
+ }
+
+ if (shm_pci_init()) {
+ log_err("Failed to init shm pci.");
+ goto fail_flows;
+ return -1;
+ }
+
+ memset(&info, 0, sizeof(info));
+
+ strcpy(info.ae_name, DT_AE);
+ strcpy(info.protocol, FRCT_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_FIXED;
+ info.addr = ipcpi.dt_addr;
+
+ dt.ae = connmgr_ae_create(info);
+ if (dt.ae == NULL) {
+ log_err("Failed to create AE struct.");
+ goto fail_flows;
+ }
+
+ dt.nbs = nbs_create();
+ if (dt.nbs == NULL) {
+ log_err("Failed to create neighbors struct.");
+ goto fail_connmgr;
+ }
+
+ dt.nb_notifier.notify_call = dt_neighbor_event;
+ if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) {
+ log_err("Failed to register notifier.");
+ goto fail_nbs;
+ }
+
+ if (routing_init(dt.nbs)) {
+ log_err("Failed to init routing.");
+ goto fail_nbs_notifier;
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ dt.pff[i] = pff_create();
+ if (dt.pff[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ pff_destroy(dt.pff[j]);
+ goto fail_routing;
+ }
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ dt.routing[i] = routing_i_create(dt.pff[i]);
+ if (dt.routing[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ routing_i_destroy(dt.routing[j]);
+ goto fail_pff;
+ }
+ }
+
+ return 0;
+ fail_pff:
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(dt.pff[i]);
+ fail_routing:
+ routing_fini();
+ fail_nbs_notifier:
+ nbs_unreg_notifier(dt.nbs, &dt.nb_notifier);
+ fail_nbs:
+ nbs_destroy(dt.nbs);
+ fail_connmgr:
+ connmgr_ae_destroy(dt.ae);
+ fail_flows:
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(dt.set[i]);
+
+ return -1;
+}
+
+void dt_fini(void)
+{
+ int i;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ routing_i_destroy(dt.routing[i]);
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(dt.pff[i]);
+
+ routing_fini();
+
+ nbs_unreg_notifier(dt.nbs, &dt.nb_notifier);
+
+ nbs_destroy(dt.nbs);
+
+ connmgr_ae_destroy(dt.ae);
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(dt.set[i]);
+}
+
+int dt_start(void)
+{
+ enum pol_gam pg;
+
+ if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg))
+ != sizeof(pg)) {
+ log_err("Failed to read policy for ribmgr gam.");
+ return -1;
+ }
+
+ dt.gam = gam_create(pg, dt.nbs, dt.ae);
+ if (dt.gam == NULL) {
+ log_err("Failed to init dt graph adjacency manager.");
+ return -1;
+ }
+
+ dt.sdu_sched = sdu_sched_create(dt.set, sdu_handler);
+ if (dt.sdu_sched == NULL) {
+ log_err("Failed to create N-1 SDU scheduler.");
+ gam_destroy(dt.gam);
+ return -1;
+ }
+
+ return 0;
+}
+
+void dt_stop(void)
+{
+ sdu_sched_destroy(dt.sdu_sched);
+
+ gam_destroy(dt.gam);
+}
+
+int dt_write_sdu(struct pci * pci,
+ struct shm_du_buff * sdb)
+{
+ int fd;
+
+ assert(pci);
+ assert(sdb);
+
+ pff_lock(dt.pff[pci->qos_id]);
+
+ fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ pff_unlock(dt.pff[pci->qos_id]);
+ log_err("Could not get nhop for address %" PRIu64,
+ pci->dst_addr);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ pff_unlock(dt.pff[pci->qos_id]);
+
+ if (shm_pci_ser(sdb, pci)) {
+ log_err("Failed to serialize PDU.");
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ return 0;
+}
+
+int dt_write_buf(struct pci * pci,
+ buffer_t * buf)
+{
+ buffer_t * buffer;
+ int fd;
+
+ assert(pci);
+ assert(buf);
+ assert(buf->data);
+
+ pff_lock(dt.pff[pci->qos_id]);
+
+ fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ pff_unlock(dt.pff[pci->qos_id]);
+ log_err("Could not get nhop for address %" PRIu64,
+ pci->dst_addr);
+ return -1;
+ }
+
+ pff_unlock(dt.pff[pci->qos_id]);
+
+ buffer = shm_pci_ser_buf(buf, pci);
+ if (buffer == NULL) {
+ log_err("Failed to serialize buffer.");
+ return -1;
+ }
+
+ if (flow_write(fd, buffer->data, buffer->len) == -1) {
+ log_err("Failed to write buffer to fd.");
+ free(buffer);
+ return -1;
+ }
+
+ free(buffer->data);
+ free(buffer);
+
+ return 0;
+}
diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h
new file mode 100644
index 00000000..dea9b91f
--- /dev/null
+++ b/src/ipcpd/normal/dt.h
@@ -0,0 +1,45 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Data Transfer AE
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_DT_H
+#define OUROBOROS_IPCPD_NORMAL_DT_H
+
+#include <ouroboros/shm_rdrbuff.h>
+#include <ouroboros/utils.h>
+
+#include "shm_pci.h"
+
+int dt_init(void);
+
+void dt_fini(void);
+
+int dt_start(void);
+
+void dt_stop(void);
+
+int dt_write_sdu(struct pci * pci,
+ struct shm_du_buff * sdb);
+
+int dt_write_buf(struct pci * pci,
+ buffer_t * buf);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_DT_H */
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
new file mode 100644
index 00000000..be1080b1
--- /dev/null
+++ b/src/ipcpd/normal/fa.c
@@ -0,0 +1,438 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Flow allocator of the IPC Process
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "flow-allocator"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/rib.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/dev.h>
+
+#include "fa.h"
+#include "sdu_sched.h"
+#include "ipcp.h"
+#include "ribconfig.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "flow_alloc.pb-c.h"
+typedef FlowAllocMsg flow_alloc_msg_t;
+
+#define TIMEOUT 10000 /* nanoseconds */
+
+struct {
+ pthread_rwlock_t flows_lock;
+ cep_id_t fd_to_cep_id[AP_MAX_FLOWS];
+ int cep_id_to_fd[IPCPD_MAX_CONNS];
+
+ flow_set_t * set[QOS_CUBE_MAX];
+ struct sdu_sched * sdu_sched;
+} fa;
+
+static int sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
+{
+ (void) qc;
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ if (frct_i_write_sdu(fa.fd_to_cep_id[fd], sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_flow_del(sdb);
+ log_warn("Failed to hand SDU to FRCT.");
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+int fa_init(void)
+{
+ int i;
+
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ fa.fd_to_cep_id[i] = INVALID_CEP_ID;
+
+ for (i = 0; i < IPCPD_MAX_CONNS; ++i)
+ fa.cep_id_to_fd[i] = -1;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fa.set[i] = flow_set_create();
+ if (fa.set[i] == NULL)
+ goto fail_flows;
+ }
+
+ if (pthread_rwlock_init(&fa.flows_lock, NULL))
+ goto fail_flows;
+
+ return 0;
+fail_flows:
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(fa.set[i]);
+
+ return -1;
+}
+
+void fa_fini(void)
+{
+ int i;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(fa.set[i]);
+
+ pthread_rwlock_destroy(&fa.flows_lock);
+}
+
+int fa_start(void)
+{
+ fa.sdu_sched = sdu_sched_create(fa.set, sdu_handler);
+ if (fa.sdu_sched == NULL) {
+ log_err("Failed to create SDU scheduler.");
+ return -1;
+ }
+
+ return 0;
+}
+
+void fa_stop(void)
+{
+ sdu_sched_destroy(fa.sdu_sched);
+}
+
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qoscube_t qc)
+{
+ cep_id_t cep_id;
+ buffer_t buf;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ char path[RIB_MAX_PATH_LEN + 1];
+ uint64_t addr;
+ ssize_t ch;
+ ssize_t i;
+ char ** children;
+ char hashstr[ipcp_dir_hash_strlen() + 1];
+ char * dst_ipcp = NULL;
+
+ ipcp_hash_str(hashstr, dst);
+
+ assert(strlen(hashstr) + strlen(DIR_PATH) + 1
+ < RIB_MAX_PATH_LEN);
+
+ strcpy(path, DIR_PATH);
+
+ rib_path_append(path, hashstr);
+
+ ch = rib_children(path, &children);
+ if (ch <= 0)
+ return -1;
+
+ for (i = 0; i < ch; ++i)
+ if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0)
+ dst_ipcp = children[i];
+ else
+ free(children[i]);
+
+ free(children);
+
+ if (dst_ipcp == NULL)
+ return -1;
+
+ strcpy(path, MEMBERS_PATH);
+
+ rib_path_append(path, dst_ipcp);
+
+ free(dst_ipcp);
+
+ if (rib_read(path, &addr, sizeof(addr)) < 0)
+ return -1;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
+ msg.has_hash = true;
+ msg.hash.len = ipcp_dir_hash_len();
+ msg.hash.data = (uint8_t *) dst;
+ msg.has_qoscube = true;
+ msg.qoscube = qc;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -1;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ cep_id = frct_i_create(addr, &buf, qc);
+ if (cep_id == INVALID_CEP_ID) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ free(buf.data);
+ return -1;
+ }
+
+ free(buf.data);
+
+ fa.fd_to_cep_id[fd] = cep_id;
+ fa.cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+/* Call under flows lock */
+static int fa_flow_dealloc(int fd)
+{
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+ int ret;
+ qoscube_t qc;
+
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_del(fa.set[qc], fd);
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -ENOMEM;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ ret = frct_i_destroy(fa.fd_to_cep_id[fd], &buf);
+
+ fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] = -1;
+ fa.fd_to_cep_id[fd] = INVALID_CEP_ID;
+
+ free(buf.data);
+
+ return ret;
+}
+
+int fa_alloc_resp(int fd,
+ int response)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
+ msg.response = response;
+ msg.has_response = true;
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -ENOMEM;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ if (response < 0) {
+ frct_i_destroy(fa.fd_to_cep_id[fd], &buf);
+ free(buf.data);
+ fa.cep_id_to_fd[fa.fd_to_cep_id[fd]]
+ = INVALID_CEP_ID;
+ fa.fd_to_cep_id[fd] = -1;
+ } else {
+ qoscube_t qc;
+ ipcp_flow_get_qoscube(fd, &qc);
+ if (frct_i_accept(fa.fd_to_cep_id[fd], &buf, qc)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ free(buf.data);
+ return -1;
+ }
+ flow_set_add(fa.set[qc], fd);
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ free(buf.data);
+
+ return 0;
+}
+
+int fa_dealloc(int fd)
+{
+ int ret;
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ ret = fa_flow_dealloc(fd);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return ret;
+}
+
+int fa_post_buf(cep_id_t cep_id,
+ buffer_t * buf)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ int ret = 0;
+ int fd;
+ flow_alloc_msg_t * msg;
+ qoscube_t qc;
+
+ /* Depending on the message call the function in ipcp-dev.h */
+
+ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ log_err("Failed to unpack flow alloc message");
+ return -1;
+ }
+
+ switch (msg->code) {
+ case FLOW_ALLOC_CODE__FLOW_REQ:
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ if (!msg->has_hash) {
+ log_err("Bad flow request.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ while (ipcpi.alloc_id != -1 &&
+ ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ log_dbg("Won't allocate over non-operational IPCP.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ assert(ipcpi.alloc_id == -1);
+
+ fd = ipcp_flow_req_arr(getpid(),
+ msg->hash.data,
+ ipcp_dir_hash_len(),
+ msg->qoscube);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ log_err("Failed to get fd for flow.");
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.fd_to_cep_id[fd] = cep_id;
+ fa.cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_REPLY:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fd = fa.cep_id_to_fd[cep_id];
+ ret = ipcp_flow_alloc_reply(fd, msg->response);
+ if (msg->response < 0) {
+ fa.fd_to_cep_id[fd] = INVALID_CEP_ID;
+ fa.cep_id_to_fd[cep_id] = -1;
+ } else {
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_add(fa.set[qc],
+ fa.cep_id_to_fd[cep_id]);
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_DEALLOC:
+ fd = fa.cep_id_to_fd[cep_id];
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_del(fa.set[qc], fd);
+ ret = flow_dealloc(fd);
+ break;
+ default:
+ log_err("Got an unknown flow allocation message.");
+ ret = -1;
+ break;
+ }
+
+ flow_alloc_msg__free_unpacked(msg, NULL);
+
+ return ret;
+}
+
+int fa_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb)
+{
+ int fd;
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ fd = fa.cep_id_to_fd[cep_id];
+ if (ipcp_flow_write(fd, sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ log_err("Failed to hand SDU to N flow.");
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h
new file mode 100644
index 00000000..d370a381
--- /dev/null
+++ b/src/ipcpd/normal/fa.h
@@ -0,0 +1,54 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Flow allocator of the IPC Process
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_FA_H
+#define OUROBOROS_IPCPD_NORMAL_FA_H
+
+#include <ouroboros/shared.h>
+#include <ouroboros/utils.h>
+
+#include "frct.h"
+
+int fa_init(void);
+
+void fa_fini(void);
+
+int fa_start(void);
+
+void fa_stop(void);
+
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qoscube_t qos);
+
+int fa_alloc_resp(int fd,
+ int response);
+
+int fa_dealloc(int fd);
+
+int fa_post_buf(cep_id_t cep_id,
+ buffer_t * buf);
+
+int fa_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_FA_H */
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
deleted file mode 100644
index d055b311..00000000
--- a/src/ipcpd/normal/fmgr.c
+++ /dev/null
@@ -1,748 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Flow manager of the IPC Process
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define OUROBOROS_PREFIX "flow-manager"
-
-#include <ouroboros/config.h>
-#include <ouroboros/logs.h>
-#include <ouroboros/dev.h>
-#include <ouroboros/list.h>
-#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/cacep.h>
-#include <ouroboros/rib.h>
-
-#include "connmgr.h"
-#include "fmgr.h"
-#include "frct.h"
-#include "ipcp.h"
-#include "shm_pci.h"
-#include "ribconfig.h"
-#include "pff.h"
-#include "neighbors.h"
-#include "gam.h"
-#include "routing.h"
-#include "sdu_sched.h"
-
-#include <stdlib.h>
-#include <stdbool.h>
-#include <pthread.h>
-#include <string.h>
-#include <inttypes.h>
-
-#include "flow_alloc.pb-c.h"
-typedef FlowAllocMsg flow_alloc_msg_t;
-
-#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */
-
-struct {
- pthread_rwlock_t np1_flows_lock;
- cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
- int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
-
- flow_set_t * np1_set[QOS_CUBE_MAX];
- struct sdu_sched * np1_sdu_sched;
-
- flow_set_t * nm1_set[QOS_CUBE_MAX];
- struct sdu_sched * nm1_sdu_sched;
-
- struct pff * pff[QOS_CUBE_MAX];
- struct routing_i * routing[QOS_CUBE_MAX];
-
- struct gam * gam;
- struct nbs * nbs;
- struct ae * ae;
-
- struct nb_notifier nb_notifier;
-} fmgr;
-
-static int fmgr_neighbor_event(enum nb_event event,
- struct conn conn)
-{
- qoscube_t cube;
-
- /* We are only interested in neighbors being added and removed. */
- switch (event) {
- case NEIGHBOR_ADDED:
- ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
- flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd);
- log_dbg("Added fd %d to flow set.", conn.flow_info.fd);
- break;
- case NEIGHBOR_REMOVED:
- ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
- flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd);
- log_dbg("Removed fd %d from flow set.", conn.flow_info.fd);
- break;
- default:
- break;
- }
-
- return 0;
-}
-
-static int np1_sdu_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
-{
- (void) qc;
-
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
-
- if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- log_warn("Failed to hand SDU to FRCT.");
- return -1;
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-static int nm1_sdu_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
-{
- struct pci pci;
-
- memset(&pci, 0, sizeof(pci));
-
- shm_pci_des(sdb, &pci);
-
- if (pci.dst_addr != ipcpi.dt_addr) {
- if (pci.ttl == 0) {
- log_dbg("TTL was zero.");
- ipcp_flow_del(sdb);
- return 0;
- }
-
- pff_lock(fmgr.pff[qc]);
-
- fd = pff_nhop(fmgr.pff[qc], pci.dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[qc]);
- log_err("No next hop for %" PRIu64, pci.dst_addr);
- ipcp_flow_del(sdb);
- return -1;
- }
-
- pff_unlock(fmgr.pff[qc]);
-
- if (ipcp_flow_write(fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fd);
- ipcp_flow_del(sdb);
- return -1;
- }
- } else {
- shm_pci_shrink(sdb);
-
- if (frct_nm1_post_sdu(&pci, sdb)) {
- log_err("Failed to hand PDU to FRCT.");
- return -1;
- }
- }
-
- return 0;
-}
-
-static void fmgr_destroy_flows(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- flow_set_destroy(fmgr.nm1_set[i]);
- flow_set_destroy(fmgr.np1_set[i]);
- }
-}
-
-static void fmgr_destroy_routing(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- routing_i_destroy(fmgr.routing[i]);
-}
-
-static void fmgr_destroy_pff(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- pff_destroy(fmgr.pff[i]);
-}
-
-int fmgr_init(void)
-{
- int i;
- int j;
- struct conn_info info;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
-
- for (i = 0; i < IPCPD_MAX_CONNS; ++i)
- fmgr.np1_cep_id_to_fd[i] = -1;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fmgr.np1_set[i] = flow_set_create();
- if (fmgr.np1_set[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
-
- fmgr.nm1_set[i] = flow_set_create();
- if (fmgr.nm1_set[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
- }
-
- if (shm_pci_init()) {
- log_err("Failed to init shm pci.");
- fmgr_destroy_flows();
- return -1;
- }
-
- memset(&info, 0, sizeof(info));
-
- strcpy(info.ae_name, DT_AE);
- strcpy(info.protocol, FRCT_PROTO);
- info.pref_version = 1;
- info.pref_syntax = PROTO_FIXED;
- info.addr = ipcpi.dt_addr;
-
- fmgr.ae = connmgr_ae_create(info);
- if (fmgr.ae == NULL) {
- log_err("Failed to create AE struct.");
- fmgr_destroy_flows();
- return -1;
- }
-
- fmgr.nbs = nbs_create();
- if (fmgr.nbs == NULL) {
- log_err("Failed to create neighbors struct.");
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- fmgr.nb_notifier.notify_call = fmgr_neighbor_event;
- if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) {
- log_err("Failed to register notifier.");
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- if (routing_init(fmgr.nbs)) {
- log_err("Failed to init routing.");
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) {
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fmgr.pff[i] = pff_create();
- if (fmgr.pff[i] == NULL) {
- for (j = 0; j < i; ++j)
- pff_destroy(fmgr.pff[j]);
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- fmgr.routing[i] = routing_i_create(fmgr.pff[i]);
- if (fmgr.routing[i] == NULL) {
- for (j = 0; j < i; ++j)
- routing_i_destroy(fmgr.routing[j]);
- fmgr_destroy_pff();
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
- }
-
- return 0;
-}
-
-void fmgr_fini()
-{
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
-
- fmgr_destroy_routing();
-
- fmgr_destroy_pff();
-
- routing_fini();
-
- fmgr_destroy_flows();
-
- connmgr_ae_destroy(fmgr.ae);
-
- nbs_destroy(fmgr.nbs);
-}
-
-int fmgr_start(void)
-{
- enum pol_gam pg;
-
- if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg))
- != sizeof(pg)) {
- log_err("Failed to read policy for ribmgr gam.");
- return -1;
- }
-
- fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae);
- if (fmgr.gam == NULL) {
- log_err("Failed to init dt graph adjacency manager.");
- return -1;
- }
-
- fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler);
- if (fmgr.nm1_sdu_sched == NULL) {
- log_err("Failed to create N-1 SDU scheduler.");
- gam_destroy(fmgr.gam);
- return -1;
- }
-
- fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler);
- if (fmgr.np1_sdu_sched == NULL) {
- log_err("Failed to create N+1 SDU scheduler.");
- sdu_sched_destroy(fmgr.nm1_sdu_sched);
- gam_destroy(fmgr.gam);
- return -1;
- }
-
- return 0;
-}
-
-void fmgr_stop(void)
-{
- sdu_sched_destroy(fmgr.np1_sdu_sched);
-
- sdu_sched_destroy(fmgr.nm1_sdu_sched);
-
- gam_destroy(fmgr.gam);
-}
-
-int fmgr_np1_alloc(int fd,
- const uint8_t * dst,
- qoscube_t cube)
-{
- cep_id_t cep_id;
- buffer_t buf;
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- char path[RIB_MAX_PATH_LEN + 1];
- uint64_t addr;
- ssize_t ch;
- ssize_t i;
- char ** children;
- char hashstr[ipcp_dir_hash_strlen() + 1];
- char * dst_ipcp = NULL;
-
- ipcp_hash_str(hashstr, dst);
-
- assert(strlen(hashstr) + strlen(DIR_PATH) + 1
- < RIB_MAX_PATH_LEN);
-
- strcpy(path, DIR_PATH);
-
- rib_path_append(path, hashstr);
-
- ch = rib_children(path, &children);
- if (ch <= 0)
- return -1;
-
- for (i = 0; i < ch; ++i)
- if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0)
- dst_ipcp = children[i];
- else
- free(children[i]);
-
- free(children);
-
- if (dst_ipcp == NULL)
- return -1;
-
- strcpy(path, MEMBERS_PATH);
-
- rib_path_append(path, dst_ipcp);
-
- free(dst_ipcp);
-
- if (rib_read(path, &addr, sizeof(addr)) < 0)
- return -1;
-
- msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
- msg.has_hash = true;
- msg.hash.len = ipcp_dir_hash_len();
- msg.hash.data = (uint8_t *) dst;
- msg.has_qoscube = true;
- msg.qoscube = cube;
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -1;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- cep_id = frct_i_create(addr, &buf, cube);
- if (cep_id == INVALID_CEP_ID) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- free(buf.data);
- return -1;
- }
-
- free(buf.data);
-
- fmgr.np1_fd_to_cep_id[fd] = cep_id;
- fmgr.np1_cep_id_to_fd[cep_id] = fd;
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-/* Call under np1_flows lock */
-static int np1_flow_dealloc(int fd)
-{
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- buffer_t buf;
- int ret;
- qoscube_t cube;
-
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_del(fmgr.np1_set[cube], fd);
-
- msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -ENOMEM;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
-
- fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID;
- fmgr.np1_fd_to_cep_id[fd] = -1;
-
- free(buf.data);
-
- return ret;
-}
-
-int fmgr_np1_alloc_resp(int fd,
- int response)
-{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- buffer_t buf;
-
- msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
- msg.response = response;
- msg.has_response = true;
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &ts);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -ENOMEM;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- if (response < 0) {
- frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
- free(buf.data);
- fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]]
- = INVALID_CEP_ID;
- fmgr.np1_fd_to_cep_id[fd] = -1;
- } else {
- qoscube_t cube;
- ipcp_flow_get_qoscube(fd, &cube);
- if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- free(buf.data);
- return -1;
- }
- flow_set_add(fmgr.np1_set[cube], fd);
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- free(buf.data);
-
- return 0;
-}
-
-int fmgr_np1_dealloc(int fd)
-{
- int ret;
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- ret = np1_flow_dealloc(fd);
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return ret;
-}
-
-int fmgr_np1_post_buf(cep_id_t cep_id,
- buffer_t * buf)
-{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- int ret = 0;
- int fd;
- flow_alloc_msg_t * msg;
- qoscube_t cube;
-
- /* Depending on the message call the function in ipcp-dev.h */
-
- msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
- if (msg == NULL) {
- log_err("Failed to unpack flow alloc message");
- return -1;
- }
-
- switch (msg->code) {
- case FLOW_ALLOC_CODE__FLOW_REQ:
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- if (!msg->has_hash) {
- log_err("Bad flow request.");
- return -1;
- }
-
- while (ipcpi.alloc_id != -1 &&
- ipcp_get_state() == IPCP_OPERATIONAL)
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &ts);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- assert(ipcpi.alloc_id == -1);
-
- fd = ipcp_flow_req_arr(getpid(),
- msg->hash.data,
- ipcp_dir_hash_len(),
- msg->qoscube);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- log_err("Failed to get fd for flow.");
- return -1;
- }
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- fmgr.np1_fd_to_cep_id[fd] = cep_id;
- fmgr.np1_cep_id_to_fd[cep_id] = fd;
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- break;
- case FLOW_ALLOC_CODE__FLOW_REPLY:
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- ret = ipcp_flow_alloc_reply(fd, msg->response);
- if (msg->response < 0) {
- fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID;
- fmgr.np1_cep_id_to_fd[cep_id] = -1;
- } else {
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_add(fmgr.np1_set[cube],
- fmgr.np1_cep_id_to_fd[cep_id]);
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- break;
- case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_del(fmgr.np1_set[cube], fd);
- ret = flow_dealloc(fd);
- break;
- default:
- log_err("Got an unknown flow allocation message.");
- ret = -1;
- break;
- }
-
- flow_alloc_msg__free_unpacked(msg, NULL);
-
- return ret;
-}
-
-int fmgr_np1_post_sdu(cep_id_t cep_id,
- struct shm_du_buff * sdb)
-{
- int fd;
-
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
-
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- if (ipcp_flow_write(fd, sdb)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- log_err("Failed to hand SDU to N flow.");
- return -1;
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-int fmgr_nm1_write_sdu(struct pci * pci,
- struct shm_du_buff * sdb)
-{
- int fd;
-
- if (pci == NULL || sdb == NULL)
- return -EINVAL;
-
- pff_lock(fmgr.pff[pci->qos_id]);
- fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[pci->qos_id]);
- log_err("Could not get nhop for address %" PRIu64,
- pci->dst_addr);
- ipcp_flow_del(sdb);
- return -1;
- }
- pff_unlock(fmgr.pff[pci->qos_id]);
-
- if (shm_pci_ser(sdb, pci)) {
- log_err("Failed to serialize PDU.");
- ipcp_flow_del(sdb);
- return -1;
- }
-
- if (ipcp_flow_write(fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fd);
- ipcp_flow_del(sdb);
- return -1;
- }
-
- return 0;
-}
-
-int fmgr_nm1_write_buf(struct pci * pci,
- buffer_t * buf)
-{
- buffer_t * buffer;
- int fd;
-
- if (pci == NULL || buf == NULL || buf->data == NULL)
- return -EINVAL;
-
- pff_lock(fmgr.pff[pci->qos_id]);
- fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[pci->qos_id]);
- log_err("Could not get nhop for address %" PRIu64,
- pci->dst_addr);
- return -1;
- }
- pff_unlock(fmgr.pff[pci->qos_id]);
-
- buffer = shm_pci_ser_buf(buf, pci);
- if (buffer == NULL) {
- log_err("Failed to serialize buffer.");
- return -1;
- }
-
- if (flow_write(fd, buffer->data, buffer->len) == -1) {
- log_err("Failed to write buffer to fd.");
- free(buffer);
- return -1;
- }
-
- free(buffer->data);
- free(buffer);
- return 0;
-}
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
deleted file mode 100644
index c59c0875..00000000
--- a/src/ipcpd/normal/fmgr.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Flow manager of the IPC Process
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#ifndef OUROBOROS_IPCPD_NORMAL_FMGR_H
-#define OUROBOROS_IPCPD_NORMAL_FMGR_H
-
-#include <ouroboros/shared.h>
-#include <ouroboros/qos.h>
-
-#include "ae.h"
-#include "frct.h"
-
-int fmgr_init(void);
-
-void fmgr_fini(void);
-
-int fmgr_start(void);
-
-void fmgr_stop(void);
-
-int fmgr_np1_alloc(int fd,
- const uint8_t * dst,
- qoscube_t qos);
-
-int fmgr_np1_alloc_resp(int fd,
- int response);
-
-int fmgr_np1_dealloc(int fd);
-
-int fmgr_np1_post_buf(cep_id_t id,
- buffer_t * buf);
-
-int fmgr_np1_post_sdu(cep_id_t id,
- struct shm_du_buff * sdb);
-
-int fmgr_nm1_write_sdu(struct pci * pci,
- struct shm_du_buff * sdb);
-
-int fmgr_nm1_write_buf(struct pci * pci,
- buffer_t * buf);
-
-#endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index d873beae..bfcde1b3 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -30,8 +30,9 @@
#include <ouroboros/errno.h>
#include "frct.h"
-#include "fmgr.h"
#include "ipcp.h"
+#include "dt.h"
+#include "fa.h"
#include <stdlib.h>
#include <stdbool.h>
@@ -210,8 +211,8 @@ int frct_fini()
return 0;
}
-int frct_nm1_post_sdu(struct pci * pci,
- struct shm_du_buff * sdb)
+int frct_post_sdu(struct pci * pci,
+ struct shm_du_buff * sdb)
{
struct frct_i * instance;
buffer_t buf;
@@ -250,17 +251,17 @@ int frct_nm1_post_sdu(struct pci * pci,
buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
buf.data = shm_du_buff_head(sdb);
- if (fmgr_np1_post_buf(id, &buf)) {
- log_err("Failed to hand buffer to Flow Manager.");
+ if (fa_post_buf(id, &buf)) {
+ log_err("Failed to hand buffer to FA.");
ipcp_flow_del(sdb);
return -1;
}
ipcp_flow_del(sdb);
} else {
- /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */
- if (fmgr_np1_post_sdu(pci->dst_cep_id, sdb)) {
- log_err("Failed to hand SDU to FMGR.");
+ /* FIXME: Known cep-ids are delivered to FA (minimal DTP) */
+ if (fa_post_sdu(pci->dst_cep_id, sdb)) {
+ log_err("Failed to hand SDU to FA.");
ipcp_flow_del(sdb);
return -1;
}
@@ -301,11 +302,11 @@ cep_id_t frct_i_create(uint64_t address,
pci.seqno = 0;
pci.qos_id = cube;
- if (fmgr_nm1_write_buf(&pci, buf)) {
+ if (dt_write_buf(&pci, buf)) {
pthread_mutex_lock(&frct.instances_lock);
destroy_frct_i(id);
pthread_mutex_unlock(&frct.instances_lock);
- log_err("Failed to hand PDU to FMGR.");
+ log_err("Failed to hand PDU to DT.");
return INVALID_CEP_ID;
}
@@ -350,7 +351,7 @@ int frct_i_accept(cep_id_t id,
pthread_mutex_unlock(&frct.instances_lock);
- if (fmgr_nm1_write_buf(&pci, buf))
+ if (dt_write_buf(&pci, buf))
return -1;
return 0;
@@ -390,7 +391,7 @@ int frct_i_destroy(cep_id_t id,
pthread_mutex_unlock(&frct.instances_lock);
if (buf != NULL && buf->data != NULL)
- if (fmgr_nm1_write_buf(&pci, buf))
+ if (dt_write_buf(&pci, buf))
return -1;
return 0;
@@ -427,9 +428,9 @@ int frct_i_write_sdu(cep_id_t id,
pci.seqno = (instance->seqno)++;
pci.qos_id = instance->cube;
- if (fmgr_nm1_write_sdu(&pci, sdb)) {
+ if (dt_write_sdu(&pci, sdb)) {
pthread_mutex_unlock(&frct.instances_lock);
- log_err("Failed to hand SDU to FMGR.");
+ log_err("Failed to hand SDU to DT.");
return -1;
}
diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h
index a1dcb151..b179e36b 100644
--- a/src/ipcpd/normal/frct.h
+++ b/src/ipcpd/normal/frct.h
@@ -50,7 +50,7 @@ int frct_i_destroy(cep_id_t id,
int frct_i_write_sdu(cep_id_t id,
struct shm_du_buff * sdb);
-int frct_nm1_post_sdu(struct pci * pci,
- struct shm_du_buff * sdb);
+int frct_post_sdu(struct pci * pci,
+ struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCPD_NORMAL_FRCT_H */
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 67424914..ab8cf387 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -36,7 +36,8 @@
#include "connmgr.h"
#include "dir.h"
#include "enroll.h"
-#include "fmgr.h"
+#include "fa.h"
+#include "dt.h"
#include "ipcp.h"
#include "ribconfig.h"
#include "ribmgr.h"
@@ -73,7 +74,7 @@ static int boot_components(void)
&ipcpi.dir_hash_algo, sizeof(ipcpi.dir_hash_algo));
if (len < 0) {
log_err("Failed to read hash length: %zd.", len);
- return -1;
+ goto fail_name;
}
ipcpi.dir_hash_algo = ntoh32(ipcpi.dir_hash_algo);
@@ -82,7 +83,7 @@ static int boot_components(void)
if (rib_add(MEMBERS_PATH, ipcpi.name)) {
log_err("Failed to add name to " MEMBERS_PATH);
- return -1;
+ goto fail_name;
}
log_dbg("Starting components.");
@@ -90,27 +91,25 @@ static int boot_components(void)
if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa))
!= sizeof(pa)) {
log_err("Failed to read policy for address authority.");
- return -1;
+ goto fail_name;
}
if (addr_auth_init(pa)) {
log_err("Failed to init address authority.");
- return -1;
+ goto fail_name;
}
ipcpi.dt_addr = addr_auth_address();
if (ipcpi.dt_addr == 0) {
log_err("Failed to get a valid address.");
- addr_auth_fini();
- return -1;
+ goto fail_addr_auth;
}
path[0] = '\0';
rib_path_append(rib_path_append(path, MEMBERS_NAME), ipcpi.name);
if (rib_write(path, &ipcpi.dt_addr, sizeof(&ipcpi.dt_addr))) {
log_err("Failed to write address to member object.");
- addr_auth_fini();
- return -1;
+ goto fail_addr_auth;
}
log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr);
@@ -119,91 +118,100 @@ static int boot_components(void)
if (ribmgr_init()) {
log_err("Failed to initialize RIB manager.");
- addr_auth_fini();
- return -1;
+ goto fail_addr_auth;
}
if (dir_init()) {
log_err("Failed to initialize directory.");
- ribmgr_fini();
- addr_auth_fini();
- return -1;
+ goto fail_ribmgr;
}
log_dbg("Ribmgr started.");
if (frct_init()) {
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
log_err("Failed to initialize FRCT.");
- return -1;
+ goto fail_dir;
}
- if (fmgr_init()) {
- frct_fini();
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
- log_err("Failed to initialize flow manager component.");
- return -1;
+ if (fa_init()) {
+ log_err("Failed to initialize flow allocator ae.");
+ goto fail_frct;
}
- if (fmgr_start()) {
- fmgr_fini();
- frct_fini();
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
- log_err("Failed to start flow manager.");
- return -1;
+ if (dt_init()) {
+ log_err("Failed to initialize data transfer ae.");
+ goto fail_fa;
+ }
+
+ if (fa_start()) {
+ log_err("Failed to start flow allocator.");
+ goto fail_dt;
+ }
+
+ if (dt_start()) {
+ log_err("Failed to start data transfer ae.");
+ goto fail_fa_start;
}
if (enroll_start()) {
- fmgr_stop();
- fmgr_fini();
- frct_fini();
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
log_err("Failed to start enroll.");
- return -1;
+ goto fail_dt_start;
}
ipcp_set_state(IPCP_OPERATIONAL);
if (connmgr_start()) {
- ipcp_set_state(IPCP_INIT);
- enroll_stop();
- fmgr_stop();
- fmgr_fini();
- frct_fini();
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
log_err("Failed to start AP connection manager.");
- return -1;
+ goto fail_enroll;
}
return 0;
+
+ fail_enroll:
+ ipcp_set_state(IPCP_INIT);
+ enroll_stop();
+ fail_dt_start:
+ dt_stop();
+ fail_fa_start:
+ fa_stop();
+ fail_dt:
+ dt_fini();
+ fail_fa:
+ fa_fini();
+ fail_frct:
+ frct_fini();
+ fail_dir:
+ dir_fini();
+ fail_ribmgr:
+ ribmgr_fini();
+ fail_addr_auth:
+ addr_auth_fini();
+ fail_name:
+ free(ipcpi.dif_name);
+
+ return -1;
}
void shutdown_components(void)
{
- ribmgr_fini();
-
connmgr_stop();
enroll_stop();
- frct_fini();
+ dt_stop();
+
+ fa_stop();
- fmgr_stop();
+ dt_fini();
- fmgr_fini();
+ fa_fini();
+
+ frct_fini();
dir_fini();
+ ribmgr_fini();
+
addr_auth_fini();
free(ipcpi.dif_name);
@@ -366,9 +374,9 @@ static struct ipcp_ops normal_ops = {
.ipcp_reg = dir_reg,
.ipcp_unreg = dir_unreg,
.ipcp_query = dir_query,
- .ipcp_flow_alloc = fmgr_np1_alloc,
- .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp,
- .ipcp_flow_dealloc = fmgr_np1_dealloc
+ .ipcp_flow_alloc = fa_alloc,
+ .ipcp_flow_alloc_resp = fa_alloc_resp,
+ .ipcp_flow_dealloc = fa_dealloc
};
int main(int argc,