summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2017-04-21 11:21:44 +0200
committerSander Vrijders <sander.vrijders@ugent.be>2017-04-21 12:46:37 +0200
commita9d71381a84886007625958b9daea6b2d4a50563 (patch)
tree67b8576e9747d7815c7eed7170f49a10e5a4e0e0
parent4bfd6c07281847405e127e9588376fcf20d07a7e (diff)
downloadouroboros-a9d71381a84886007625958b9daea6b2d4a50563.tar.gz
ouroboros-a9d71381a84886007625958b9daea6b2d4a50563.zip
ipcpd: normal: Split flow manager into DT and FA
This splits the flow manager into the Data Transfer AE, which is in charge of routing SDUs, and the Flow Allocator AE, which handles flow allocations.
-rw-r--r--src/ipcpd/normal/CMakeLists.txt3
-rw-r--r--src/ipcpd/normal/connmgr.c1
-rw-r--r--src/ipcpd/normal/dt.c351
-rw-r--r--src/ipcpd/normal/dt.h45
-rw-r--r--src/ipcpd/normal/fa.c438
-rw-r--r--src/ipcpd/normal/fa.h54
-rw-r--r--src/ipcpd/normal/fmgr.c748
-rw-r--r--src/ipcpd/normal/fmgr.h61
-rw-r--r--src/ipcpd/normal/frct.c29
-rw-r--r--src/ipcpd/normal/frct.h4
-rw-r--r--src/ipcpd/normal/main.c122
11 files changed, 972 insertions, 884 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 2045b8df..9a220ba6 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -22,8 +22,9 @@ set(SOURCE_FILES
addr_auth.c
connmgr.c
dir.c
+ dt.c
enroll.c
- fmgr.c
+ fa.c
frct.c
gam.c
graph.c
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c
index 421bc5b0..56fe9164 100644
--- a/src/ipcpd/normal/connmgr.c
+++ b/src/ipcpd/normal/connmgr.c
@@ -32,7 +32,6 @@
#include "ae.h"
#include "connmgr.h"
#include "enroll.h"
-#include "fmgr.h"
#include "frct.h"
#include "ipcp.h"
#include "ribmgr.h"
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
new file mode 100644
index 00000000..72e0195e
--- /dev/null
+++ b/src/ipcpd/normal/dt.c
@@ -0,0 +1,351 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Data Transfer AE
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "dt-ae"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/rib.h>
+#include <ouroboros/dev.h>
+
+#include "dt.h"
+#include "connmgr.h"
+#include "ipcp.h"
+#include "shm_pci.h"
+#include "pff.h"
+#include "neighbors.h"
+#include "gam.h"
+#include "routing.h"
+#include "sdu_sched.h"
+#include "frct.h"
+#include "ae.h"
+#include "ribconfig.h"
+
+#include <stdlib.h>
+#include <stdbool.h>
+#include <pthread.h>
+#include <string.h>
+#include <inttypes.h>
+#include <assert.h>
+
+struct {
+ flow_set_t * set[QOS_CUBE_MAX];
+ struct sdu_sched * sdu_sched;
+
+ struct pff * pff[QOS_CUBE_MAX];
+ struct routing_i * routing[QOS_CUBE_MAX];
+
+ struct gam * gam;
+ struct nbs * nbs;
+ struct ae * ae;
+
+ struct nb_notifier nb_notifier;
+} dt;
+
+static int dt_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ qoscube_t cube;
+
+ /* We are only interested in neighbors being added and removed. */
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_add(dt.set[cube], conn.flow_info.fd);
+ log_dbg("Added fd %d to flow set.", conn.flow_info.fd);
+ break;
+ case NEIGHBOR_REMOVED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_del(dt.set[cube], conn.flow_info.fd);
+ log_dbg("Removed fd %d from flow set.", conn.flow_info.fd);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+static int sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
+{
+ struct pci pci;
+
+ memset(&pci, 0, sizeof(pci));
+
+ shm_pci_des(sdb, &pci);
+
+ if (pci.dst_addr != ipcpi.dt_addr) {
+ if (pci.ttl == 0) {
+ log_dbg("TTL was zero.");
+ ipcp_flow_del(sdb);
+ return 0;
+ }
+
+ pff_lock(dt.pff[qc]);
+
+ fd = pff_nhop(dt.pff[qc], pci.dst_addr);
+ if (fd < 0) {
+ pff_unlock(dt.pff[qc]);
+ log_err("No next hop for %" PRIu64, pci.dst_addr);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ pff_unlock(dt.pff[qc]);
+
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+ } else {
+ shm_pci_shrink(sdb);
+
+ if (frct_post_sdu(&pci, sdb)) {
+ log_err("Failed to hand PDU to FRCT.");
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int dt_init(void)
+{
+ int i;
+ int j;
+ struct conn_info info;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ dt.set[i] = flow_set_create();
+ if (dt.set[i] == NULL) {
+ goto fail_flows;
+ return -1;
+ }
+ }
+
+ if (shm_pci_init()) {
+ log_err("Failed to init shm pci.");
+ goto fail_flows;
+ return -1;
+ }
+
+ memset(&info, 0, sizeof(info));
+
+ strcpy(info.ae_name, DT_AE);
+ strcpy(info.protocol, FRCT_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_FIXED;
+ info.addr = ipcpi.dt_addr;
+
+ dt.ae = connmgr_ae_create(info);
+ if (dt.ae == NULL) {
+ log_err("Failed to create AE struct.");
+ goto fail_flows;
+ }
+
+ dt.nbs = nbs_create();
+ if (dt.nbs == NULL) {
+ log_err("Failed to create neighbors struct.");
+ goto fail_connmgr;
+ }
+
+ dt.nb_notifier.notify_call = dt_neighbor_event;
+ if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) {
+ log_err("Failed to register notifier.");
+ goto fail_nbs;
+ }
+
+ if (routing_init(dt.nbs)) {
+ log_err("Failed to init routing.");
+ goto fail_nbs_notifier;
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ dt.pff[i] = pff_create();
+ if (dt.pff[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ pff_destroy(dt.pff[j]);
+ goto fail_routing;
+ }
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ dt.routing[i] = routing_i_create(dt.pff[i]);
+ if (dt.routing[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ routing_i_destroy(dt.routing[j]);
+ goto fail_pff;
+ }
+ }
+
+ return 0;
+ fail_pff:
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(dt.pff[i]);
+ fail_routing:
+ routing_fini();
+ fail_nbs_notifier:
+ nbs_unreg_notifier(dt.nbs, &dt.nb_notifier);
+ fail_nbs:
+ nbs_destroy(dt.nbs);
+ fail_connmgr:
+ connmgr_ae_destroy(dt.ae);
+ fail_flows:
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(dt.set[i]);
+
+ return -1;
+}
+
+void dt_fini(void)
+{
+ int i;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ routing_i_destroy(dt.routing[i]);
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(dt.pff[i]);
+
+ routing_fini();
+
+ nbs_unreg_notifier(dt.nbs, &dt.nb_notifier);
+
+ nbs_destroy(dt.nbs);
+
+ connmgr_ae_destroy(dt.ae);
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(dt.set[i]);
+}
+
+int dt_start(void)
+{
+ enum pol_gam pg;
+
+ if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg))
+ != sizeof(pg)) {
+ log_err("Failed to read policy for ribmgr gam.");
+ return -1;
+ }
+
+ dt.gam = gam_create(pg, dt.nbs, dt.ae);
+ if (dt.gam == NULL) {
+ log_err("Failed to init dt graph adjacency manager.");
+ return -1;
+ }
+
+ dt.sdu_sched = sdu_sched_create(dt.set, sdu_handler);
+ if (dt.sdu_sched == NULL) {
+ log_err("Failed to create N-1 SDU scheduler.");
+ gam_destroy(dt.gam);
+ return -1;
+ }
+
+ return 0;
+}
+
+void dt_stop(void)
+{
+ sdu_sched_destroy(dt.sdu_sched);
+
+ gam_destroy(dt.gam);
+}
+
+int dt_write_sdu(struct pci * pci,
+ struct shm_du_buff * sdb)
+{
+ int fd;
+
+ assert(pci);
+ assert(sdb);
+
+ pff_lock(dt.pff[pci->qos_id]);
+
+ fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ pff_unlock(dt.pff[pci->qos_id]);
+ log_err("Could not get nhop for address %" PRIu64,
+ pci->dst_addr);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ pff_unlock(dt.pff[pci->qos_id]);
+
+ if (shm_pci_ser(sdb, pci)) {
+ log_err("Failed to serialize PDU.");
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ return 0;
+}
+
+int dt_write_buf(struct pci * pci,
+ buffer_t * buf)
+{
+ buffer_t * buffer;
+ int fd;
+
+ assert(pci);
+ assert(buf);
+ assert(buf->data);
+
+ pff_lock(dt.pff[pci->qos_id]);
+
+ fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ pff_unlock(dt.pff[pci->qos_id]);
+ log_err("Could not get nhop for address %" PRIu64,
+ pci->dst_addr);
+ return -1;
+ }
+
+ pff_unlock(dt.pff[pci->qos_id]);
+
+ buffer = shm_pci_ser_buf(buf, pci);
+ if (buffer == NULL) {
+ log_err("Failed to serialize buffer.");
+ return -1;
+ }
+
+ if (flow_write(fd, buffer->data, buffer->len) == -1) {
+ log_err("Failed to write buffer to fd.");
+ free(buffer);
+ return -1;
+ }
+
+ free(buffer->data);
+ free(buffer);
+
+ return 0;
+}
diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h
new file mode 100644
index 00000000..dea9b91f
--- /dev/null
+++ b/src/ipcpd/normal/dt.h
@@ -0,0 +1,45 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Data Transfer AE
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_DT_H
+#define OUROBOROS_IPCPD_NORMAL_DT_H
+
+#include <ouroboros/shm_rdrbuff.h>
+#include <ouroboros/utils.h>
+
+#include "shm_pci.h"
+
+int dt_init(void);
+
+void dt_fini(void);
+
+int dt_start(void);
+
+void dt_stop(void);
+
+int dt_write_sdu(struct pci * pci,
+ struct shm_du_buff * sdb);
+
+int dt_write_buf(struct pci * pci,
+ buffer_t * buf);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_DT_H */
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
new file mode 100644
index 00000000..be1080b1
--- /dev/null
+++ b/src/ipcpd/normal/fa.c
@@ -0,0 +1,438 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Flow allocator of the IPC Process
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "flow-allocator"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/rib.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/dev.h>
+
+#include "fa.h"
+#include "sdu_sched.h"
+#include "ipcp.h"
+#include "ribconfig.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "flow_alloc.pb-c.h"
+typedef FlowAllocMsg flow_alloc_msg_t;
+
+#define TIMEOUT 10000 /* nanoseconds */
+
+struct {
+ pthread_rwlock_t flows_lock;
+ cep_id_t fd_to_cep_id[AP_MAX_FLOWS];
+ int cep_id_to_fd[IPCPD_MAX_CONNS];
+
+ flow_set_t * set[QOS_CUBE_MAX];
+ struct sdu_sched * sdu_sched;
+} fa;
+
+static int sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
+{
+ (void) qc;
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ if (frct_i_write_sdu(fa.fd_to_cep_id[fd], sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_flow_del(sdb);
+ log_warn("Failed to hand SDU to FRCT.");
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+int fa_init(void)
+{
+ int i;
+
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ fa.fd_to_cep_id[i] = INVALID_CEP_ID;
+
+ for (i = 0; i < IPCPD_MAX_CONNS; ++i)
+ fa.cep_id_to_fd[i] = -1;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fa.set[i] = flow_set_create();
+ if (fa.set[i] == NULL)
+ goto fail_flows;
+ }
+
+ if (pthread_rwlock_init(&fa.flows_lock, NULL))
+ goto fail_flows;
+
+ return 0;
+fail_flows:
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(fa.set[i]);
+
+ return -1;
+}
+
+void fa_fini(void)
+{
+ int i;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(fa.set[i]);
+
+ pthread_rwlock_destroy(&fa.flows_lock);
+}
+
+int fa_start(void)
+{
+ fa.sdu_sched = sdu_sched_create(fa.set, sdu_handler);
+ if (fa.sdu_sched == NULL) {
+ log_err("Failed to create SDU scheduler.");
+ return -1;
+ }
+
+ return 0;
+}
+
+void fa_stop(void)
+{
+ sdu_sched_destroy(fa.sdu_sched);
+}
+
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qoscube_t qc)
+{
+ cep_id_t cep_id;
+ buffer_t buf;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ char path[RIB_MAX_PATH_LEN + 1];
+ uint64_t addr;
+ ssize_t ch;
+ ssize_t i;
+ char ** children;
+ char hashstr[ipcp_dir_hash_strlen() + 1];
+ char * dst_ipcp = NULL;
+
+ ipcp_hash_str(hashstr, dst);
+
+ assert(strlen(hashstr) + strlen(DIR_PATH) + 1
+ < RIB_MAX_PATH_LEN);
+
+ strcpy(path, DIR_PATH);
+
+ rib_path_append(path, hashstr);
+
+ ch = rib_children(path, &children);
+ if (ch <= 0)
+ return -1;
+
+ for (i = 0; i < ch; ++i)
+ if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0)
+ dst_ipcp = children[i];
+ else
+ free(children[i]);
+
+ free(children);
+
+ if (dst_ipcp == NULL)
+ return -1;
+
+ strcpy(path, MEMBERS_PATH);
+
+ rib_path_append(path, dst_ipcp);
+
+ free(dst_ipcp);
+
+ if (rib_read(path, &addr, sizeof(addr)) < 0)
+ return -1;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
+ msg.has_hash = true;
+ msg.hash.len = ipcp_dir_hash_len();
+ msg.hash.data = (uint8_t *) dst;
+ msg.has_qoscube = true;
+ msg.qoscube = qc;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -1;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ cep_id = frct_i_create(addr, &buf, qc);
+ if (cep_id == INVALID_CEP_ID) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ free(buf.data);
+ return -1;
+ }
+
+ free(buf.data);
+
+ fa.fd_to_cep_id[fd] = cep_id;
+ fa.cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+/* Call under flows lock */
+static int fa_flow_dealloc(int fd)
+{
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+ int ret;
+ qoscube_t qc;
+
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_del(fa.set[qc], fd);
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -ENOMEM;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ ret = frct_i_destroy(fa.fd_to_cep_id[fd], &buf);
+
+ fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] = -1;
+ fa.fd_to_cep_id[fd] = INVALID_CEP_ID;
+
+ free(buf.data);
+
+ return ret;
+}
+
+int fa_alloc_resp(int fd,
+ int response)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
+ msg.response = response;
+ msg.has_response = true;
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -ENOMEM;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ if (response < 0) {
+ frct_i_destroy(fa.fd_to_cep_id[fd], &buf);
+ free(buf.data);
+ fa.cep_id_to_fd[fa.fd_to_cep_id[fd]]
+ = INVALID_CEP_ID;
+ fa.fd_to_cep_id[fd] = -1;
+ } else {
+ qoscube_t qc;
+ ipcp_flow_get_qoscube(fd, &qc);
+ if (frct_i_accept(fa.fd_to_cep_id[fd], &buf, qc)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ free(buf.data);
+ return -1;
+ }
+ flow_set_add(fa.set[qc], fd);
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ free(buf.data);
+
+ return 0;
+}
+
+int fa_dealloc(int fd)
+{
+ int ret;
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ ret = fa_flow_dealloc(fd);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return ret;
+}
+
+int fa_post_buf(cep_id_t cep_id,
+ buffer_t * buf)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ int ret = 0;
+ int fd;
+ flow_alloc_msg_t * msg;
+ qoscube_t qc;
+
+ /* Depending on the message call the function in ipcp-dev.h */
+
+ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ log_err("Failed to unpack flow alloc message");
+ return -1;
+ }
+
+ switch (msg->code) {
+ case FLOW_ALLOC_CODE__FLOW_REQ:
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ if (!msg->has_hash) {
+ log_err("Bad flow request.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ while (ipcpi.alloc_id != -1 &&
+ ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ log_dbg("Won't allocate over non-operational IPCP.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ assert(ipcpi.alloc_id == -1);
+
+ fd = ipcp_flow_req_arr(getpid(),
+ msg->hash.data,
+ ipcp_dir_hash_len(),
+ msg->qoscube);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ log_err("Failed to get fd for flow.");
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.fd_to_cep_id[fd] = cep_id;
+ fa.cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_REPLY:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fd = fa.cep_id_to_fd[cep_id];
+ ret = ipcp_flow_alloc_reply(fd, msg->response);
+ if (msg->response < 0) {
+ fa.fd_to_cep_id[fd] = INVALID_CEP_ID;
+ fa.cep_id_to_fd[cep_id] = -1;
+ } else {
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_add(fa.set[qc],
+ fa.cep_id_to_fd[cep_id]);
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_DEALLOC:
+ fd = fa.cep_id_to_fd[cep_id];
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_del(fa.set[qc], fd);
+ ret = flow_dealloc(fd);
+ break;
+ default:
+ log_err("Got an unknown flow allocation message.");
+ ret = -1;
+ break;
+ }
+
+ flow_alloc_msg__free_unpacked(msg, NULL);
+
+ return ret;
+}
+
+int fa_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb)
+{
+ int fd;
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ fd = fa.cep_id_to_fd[cep_id];
+ if (ipcp_flow_write(fd, sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ log_err("Failed to hand SDU to N flow.");
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h
new file mode 100644
index 00000000..d370a381
--- /dev/null
+++ b/src/ipcpd/normal/fa.h
@@ -0,0 +1,54 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Flow allocator of the IPC Process
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_FA_H
+#define OUROBOROS_IPCPD_NORMAL_FA_H
+
+#include <ouroboros/shared.h>
+#include <ouroboros/utils.h>
+
+#include "frct.h"
+
+int fa_init(void);
+
+void fa_fini(void);
+
+int fa_start(void);
+
+void fa_stop(void);
+
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qoscube_t qos);
+
+int fa_alloc_resp(int fd,
+ int response);
+
+int fa_dealloc(int fd);
+
+int fa_post_buf(cep_id_t cep_id,
+ buffer_t * buf);
+
+int fa_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_FA_H */
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
deleted file mode 100644
index d055b311..00000000
--- a/src/ipcpd/normal/fmgr.c
+++ /dev/null
@@ -1,748 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Flow manager of the IPC Process
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define OUROBOROS_PREFIX "flow-manager"
-
-#include <ouroboros/config.h>
-#include <ouroboros/logs.h>
-#include <ouroboros/dev.h>
-#include <ouroboros/list.h>
-#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/cacep.h>
-#include <ouroboros/rib.h>
-
-#include "connmgr.h"
-#include "fmgr.h"
-#include "frct.h"
-#include "ipcp.h"
-#include "shm_pci.h"
-#include "ribconfig.h"
-#include "pff.h"
-#include "neighbors.h"
-#include "gam.h"
-#include "routing.h"
-#include "sdu_sched.h"
-
-#include <stdlib.h>
-#include <stdbool.h>
-#include <pthread.h>
-#include <string.h>
-#include <inttypes.h>
-
-#include "flow_alloc.pb-c.h"
-typedef FlowAllocMsg flow_alloc_msg_t;
-
-#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */
-
-struct {
- pthread_rwlock_t np1_flows_lock;
- cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
- int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
-
- flow_set_t * np1_set[QOS_CUBE_MAX];
- struct sdu_sched * np1_sdu_sched;
-
- flow_set_t * nm1_set[QOS_CUBE_MAX];
- struct sdu_sched * nm1_sdu_sched;
-
- struct pff * pff[QOS_CUBE_MAX];
- struct routing_i * routing[QOS_CUBE_MAX];
-
- struct gam * gam;
- struct nbs * nbs;
- struct ae * ae;
-
- struct nb_notifier nb_notifier;
-} fmgr;
-
-static int fmgr_neighbor_event(enum nb_event event,
- struct conn conn)
-{
- qoscube_t cube;
-
- /* We are only interested in neighbors being added and removed. */
- switch (event) {
- case NEIGHBOR_ADDED:
- ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
- flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd);
- log_dbg("Added fd %d to flow set.", conn.flow_info.fd);
- break;
- case NEIGHBOR_REMOVED:
- ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
- flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd);
- log_dbg("Removed fd %d from flow set.", conn.flow_info.fd);
- break;
- default:
- break;
- }
-
- return 0;
-}
-
-static int np1_sdu_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
-{
- (void) qc;
-
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
-
- if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- log_warn("Failed to hand SDU to FRCT.");
- return -1;
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-static int nm1_sdu_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
-{
- struct pci pci;
-
- memset(&pci, 0, sizeof(pci));
-
- shm_pci_des(sdb, &pci);
-
- if (pci.dst_addr != ipcpi.dt_addr) {
- if (pci.ttl == 0) {
- log_dbg("TTL was zero.");
- ipcp_flow_del(sdb);
- return 0;
- }
-
- pff_lock(fmgr.pff[qc]);
-
- fd = pff_nhop(fmgr.pff[qc], pci.dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[qc]);
- log_err("No next hop for %" PRIu64, pci.dst_addr);
- ipcp_flow_del(sdb);
- return -1;
- }
-
- pff_unlock(fmgr.pff[qc]);
-
- if (ipcp_flow_write(fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fd);
- ipcp_flow_del(sdb);
- return -1;
- }
- } else {
- shm_pci_shrink(sdb);
-
- if (frct_nm1_post_sdu(&pci, sdb)) {
- log_err("Failed to hand PDU to FRCT.");
- return -1;
- }
- }
-
- return 0;
-}
-
-static void fmgr_destroy_flows(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- flow_set_destroy(fmgr.nm1_set[i]);
- flow_set_destroy(fmgr.np1_set[i]);
- }
-}
-
-static void fmgr_destroy_routing(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- routing_i_destroy(fmgr.routing[i]);
-}
-
-static void fmgr_destroy_pff(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- pff_destroy(fmgr.pff[i]);
-}
-
-int fmgr_init(void)
-{
- int i;
- int j;
- struct conn_info info;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
-
- for (i = 0; i < IPCPD_MAX_CONNS; ++i)
- fmgr.np1_cep_id_to_fd[i] = -1;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fmgr.np1_set[i] = flow_set_create();
- if (fmgr.np1_set[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
-
- fmgr.nm1_set[i] = flow_set_create();
- if (fmgr.nm1_set[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
- }
-
- if (shm_pci_init()) {
- log_err("Failed to init shm pci.");
- fmgr_destroy_flows();
- return -1;
- }
-
- memset(&info, 0, sizeof(info));
-
- strcpy(info.ae_name, DT_AE);
- strcpy(info.protocol, FRCT_PROTO);
- info.pref_version = 1;
- info.pref_syntax = PROTO_FIXED;
- info.addr = ipcpi.dt_addr;
-
- fmgr.ae = connmgr_ae_create(info);
- if (fmgr.ae == NULL) {
- log_err("Failed to create AE struct.");
- fmgr_destroy_flows();
- return -1;
- }
-
- fmgr.nbs = nbs_create();
- if (fmgr.nbs == NULL) {
- log_err("Failed to create neighbors struct.");
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- fmgr.nb_notifier.notify_call = fmgr_neighbor_event;
- if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) {
- log_err("Failed to register notifier.");
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- if (routing_init(fmgr.nbs)) {
- log_err("Failed to init routing.");
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) {
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fmgr.pff[i] = pff_create();
- if (fmgr.pff[i] == NULL) {
- for (j = 0; j < i; ++j)
- pff_destroy(fmgr.pff[j]);
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- fmgr.routing[i] = routing_i_create(fmgr.pff[i]);
- if (fmgr.routing[i] == NULL) {
- for (j = 0; j < i; ++j)
- routing_i_destroy(fmgr.routing[j]);
- fmgr_destroy_pff();
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
- }
-
- return 0;
-}
-
-void fmgr_fini()
-{
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
-
- fmgr_destroy_routing();
-
- fmgr_destroy_pff();
-
- routing_fini();
-
- fmgr_destroy_flows();
-
- connmgr_ae_destroy(fmgr.ae);
-
- nbs_destroy(fmgr.nbs);
-}
-
-int fmgr_start(void)
-{
- enum pol_gam pg;
-
- if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg))
- != sizeof(pg)) {
- log_err("Failed to read policy for ribmgr gam.");
- return -1;
- }
-
- fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae);
- if (fmgr.gam == NULL) {
- log_err("Failed to init dt graph adjacency manager.");
- return -1;
- }
-
- fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler);
- if (fmgr.nm1_sdu_sched == NULL) {
- log_err("Failed to create N-1 SDU scheduler.");
- gam_destroy(fmgr.gam);
- return -1;
- }
-
- fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler);
- if (fmgr.np1_sdu_sched == NULL) {
- log_err("Failed to create N+1 SDU scheduler.");
- sdu_sched_destroy(fmgr.nm1_sdu_sched);
- gam_destroy(fmgr.gam);
- return -1;
- }
-
- return 0;
-}
-
-void fmgr_stop(void)
-{
- sdu_sched_destroy(fmgr.np1_sdu_sched);
-
- sdu_sched_destroy(fmgr.nm1_sdu_sched);
-
- gam_destroy(fmgr.gam);
-}
-
-int fmgr_np1_alloc(int fd,
- const uint8_t * dst,
- qoscube_t cube)
-{
- cep_id_t cep_id;
- buffer_t buf;
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- char path[RIB_MAX_PATH_LEN + 1];
- uint64_t addr;
- ssize_t ch;
- ssize_t i;
- char ** children;
- char hashstr[ipcp_dir_hash_strlen() + 1];
- char * dst_ipcp = NULL;
-
- ipcp_hash_str(hashstr, dst);
-
- assert(strlen(hashstr) + strlen(DIR_PATH) + 1
- < RIB_MAX_PATH_LEN);
-
- strcpy(path, DIR_PATH);
-
- rib_path_append(path, hashstr);
-
- ch = rib_children(path, &children);
- if (ch <= 0)
- return -1;
-
- for (i = 0; i < ch; ++i)
- if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0)
- dst_ipcp = children[i];
- else
- free(children[i]);
-
- free(children);
-
- if (dst_ipcp == NULL)
- return -1;
-
- strcpy(path, MEMBERS_PATH);
-
- rib_path_append(path, dst_ipcp);
-
- free(dst_ipcp);
-
- if (rib_read(path, &addr, sizeof(addr)) < 0)
- return -1;
-
- msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
- msg.has_hash = true;
- msg.hash.len = ipcp_dir_hash_len();
- msg.hash.data = (uint8_t *) dst;
- msg.has_qoscube = true;
- msg.qoscube = cube;
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -1;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- cep_id = frct_i_create(addr, &buf, cube);
- if (cep_id == INVALID_CEP_ID) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- free(buf.data);
- return -1;
- }
-
- free(buf.data);
-
- fmgr.np1_fd_to_cep_id[fd] = cep_id;
- fmgr.np1_cep_id_to_fd[cep_id] = fd;
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-/* Call under np1_flows lock */
-static int np1_flow_dealloc(int fd)
-{
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- buffer_t buf;
- int ret;
- qoscube_t cube;
-
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_del(fmgr.np1_set[cube], fd);
-
- msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -ENOMEM;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
-
- fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID;
- fmgr.np1_fd_to_cep_id[fd] = -1;
-
- free(buf.data);
-
- return ret;
-}
-
-int fmgr_np1_alloc_resp(int fd,
- int response)
-{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- buffer_t buf;
-
- msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
- msg.response = response;
- msg.has_response = true;
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &ts);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -ENOMEM;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- if (response < 0) {
- frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
- free(buf.data);
- fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]]
- = INVALID_CEP_ID;
- fmgr.np1_fd_to_cep_id[fd] = -1;
- } else {
- qoscube_t cube;
- ipcp_flow_get_qoscube(fd, &cube);
- if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- free(buf.data);
- return -1;
- }
- flow_set_add(fmgr.np1_set[cube], fd);
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- free(buf.data);
-
- return 0;
-}
-
-int fmgr_np1_dealloc(int fd)
-{
- int ret;
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- ret = np1_flow_dealloc(fd);
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return ret;
-}
-
-int fmgr_np1_post_buf(cep_id_t cep_id,
- buffer_t * buf)
-{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- int ret = 0;
- int fd;
- flow_alloc_msg_t * msg;
- qoscube_t cube;
-
- /* Depending on the message call the function in ipcp-dev.h */
-
- msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
- if (msg == NULL) {
- log_err("Failed to unpack flow alloc message");
- return -1;
- }
-
- switch (msg->code) {
- case FLOW_ALLOC_CODE__FLOW_REQ:
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- if (!msg->has_hash) {
- log_err("Bad flow request.");
- return -1;
- }
-
- while (ipcpi.alloc_id != -1 &&
- ipcp_get_state() == IPCP_OPERATIONAL)
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &ts);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- assert(ipcpi.alloc_id == -1);
-
- fd = ipcp_flow_req_arr(getpid(),
- msg->hash.data,
- ipcp_dir_hash_len(),
- msg->qoscube);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- log_err("Failed to get fd for flow.");
- return -1;
- }
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- fmgr.np1_fd_to_cep_id[fd] = cep_id;
- fmgr.np1_cep_id_to_fd[cep_id] = fd;
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- break;
- case FLOW_ALLOC_CODE__FLOW_REPLY:
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- ret = ipcp_flow_alloc_reply(fd, msg->response);
- if (msg->response < 0) {
- fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID;
- fmgr.np1_cep_id_to_fd[cep_id] = -1;
- } else {
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_add(fmgr.np1_set[cube],
- fmgr.np1_cep_id_to_fd[cep_id]);
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- break;
- case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_del(fmgr.np1_set[cube], fd);
- ret = flow_dealloc(fd);
- break;
- default:
- log_err("Got an unknown flow allocation message.");
- ret = -1;
- break;
- }
-
- flow_alloc_msg__free_unpacked(msg, NULL);
-
- return ret;
-}
-
-int fmgr_np1_post_sdu(cep_id_t cep_id,
- struct shm_du_buff * sdb)
-{
- int fd;
-
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
-
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- if (ipcp_flow_write(fd, sdb)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- log_err("Failed to hand SDU to N flow.");
- return -1;
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-int fmgr_nm1_write_sdu(struct pci * pci,
- struct shm_du_buff * sdb)
-{
- int fd;
-
- if (pci == NULL || sdb == NULL)
- return -EINVAL;
-
- pff_lock(fmgr.pff[pci->qos_id]);
- fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[pci->qos_id]);
- log_err("Could not get nhop for address %" PRIu64,
- pci->dst_addr);
- ipcp_flow_del(sdb);
- return -1;
- }
- pff_unlock(fmgr.pff[pci->qos_id]);
-
- if (shm_pci_ser(sdb, pci)) {
- log_err("Failed to serialize PDU.");
- ipcp_flow_del(sdb);
- return -1;
- }
-
- if (ipcp_flow_write(fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fd);
- ipcp_flow_del(sdb);
- return -1;
- }
-
- return 0;
-}
-
-int fmgr_nm1_write_buf(struct pci * pci,
- buffer_t * buf)
-{
- buffer_t * buffer;
- int fd;
-
- if (pci == NULL || buf == NULL || buf->data == NULL)
- return -EINVAL;
-
- pff_lock(fmgr.pff[pci->qos_id]);
- fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[pci->qos_id]);
- log_err("Could not get nhop for address %" PRIu64,
- pci->dst_addr);
- return -1;
- }
- pff_unlock(fmgr.pff[pci->qos_id]);
-
- buffer = shm_pci_ser_buf(buf, pci);
- if (buffer == NULL) {
- log_err("Failed to serialize buffer.");
- return -1;
- }
-
- if (flow_write(fd, buffer->data, buffer->len) == -1) {
- log_err("Failed to write buffer to fd.");
- free(buffer);
- return -1;
- }
-
- free(buffer->data);
- free(buffer);
- return 0;
-}
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
deleted file mode 100644
index c59c0875..00000000
--- a/src/ipcpd/normal/fmgr.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Flow manager of the IPC Process
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#ifndef OUROBOROS_IPCPD_NORMAL_FMGR_H
-#define OUROBOROS_IPCPD_NORMAL_FMGR_H
-
-#include <ouroboros/shared.h>
-#include <ouroboros/qos.h>
-
-#include "ae.h"
-#include "frct.h"
-
-int fmgr_init(void);
-
-void fmgr_fini(void);
-
-int fmgr_start(void);
-
-void fmgr_stop(void);
-
-int fmgr_np1_alloc(int fd,
- const uint8_t * dst,
- qoscube_t qos);
-
-int fmgr_np1_alloc_resp(int fd,
- int response);
-
-int fmgr_np1_dealloc(int fd);
-
-int fmgr_np1_post_buf(cep_id_t id,
- buffer_t * buf);
-
-int fmgr_np1_post_sdu(cep_id_t id,
- struct shm_du_buff * sdb);
-
-int fmgr_nm1_write_sdu(struct pci * pci,
- struct shm_du_buff * sdb);
-
-int fmgr_nm1_write_buf(struct pci * pci,
- buffer_t * buf);
-
-#endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index d873beae..bfcde1b3 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -30,8 +30,9 @@
#include <ouroboros/errno.h>
#include "frct.h"
-#include "fmgr.h"
#include "ipcp.h"
+#include "dt.h"
+#include "fa.h"
#include <stdlib.h>
#include <stdbool.h>
@@ -210,8 +211,8 @@ int frct_fini()
return 0;
}
-int frct_nm1_post_sdu(struct pci * pci,
- struct shm_du_buff * sdb)
+int frct_post_sdu(struct pci * pci,
+ struct shm_du_buff * sdb)
{
struct frct_i * instance;
buffer_t buf;
@@ -250,17 +251,17 @@ int frct_nm1_post_sdu(struct pci * pci,
buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
buf.data = shm_du_buff_head(sdb);
- if (fmgr_np1_post_buf(id, &buf)) {
- log_err("Failed to hand buffer to Flow Manager.");
+ if (fa_post_buf(id, &buf)) {
+ log_err("Failed to hand buffer to FA.");
ipcp_flow_del(sdb);
return -1;
}
ipcp_flow_del(sdb);
} else {
- /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */
- if (fmgr_np1_post_sdu(pci->dst_cep_id, sdb)) {
- log_err("Failed to hand SDU to FMGR.");
+ /* FIXME: Known cep-ids are delivered to FA (minimal DTP) */
+ if (fa_post_sdu(pci->dst_cep_id, sdb)) {
+ log_err("Failed to hand SDU to FA.");
ipcp_flow_del(sdb);
return -1;
}
@@ -301,11 +302,11 @@ cep_id_t frct_i_create(uint64_t address,
pci.seqno = 0;
pci.qos_id = cube;
- if (fmgr_nm1_write_buf(&pci, buf)) {
+ if (dt_write_buf(&pci, buf)) {
pthread_mutex_lock(&frct.instances_lock);
destroy_frct_i(id);
pthread_mutex_unlock(&frct.instances_lock);
- log_err("Failed to hand PDU to FMGR.");
+ log_err("Failed to hand PDU to DT.");
return INVALID_CEP_ID;
}
@@ -350,7 +351,7 @@ int frct_i_accept(cep_id_t id,
pthread_mutex_unlock(&frct.instances_lock);
- if (fmgr_nm1_write_buf(&pci, buf))
+ if (dt_write_buf(&pci, buf))
return -1;
return 0;
@@ -390,7 +391,7 @@ int frct_i_destroy(cep_id_t id,
pthread_mutex_unlock(&frct.instances_lock);
if (buf != NULL && buf->data != NULL)
- if (fmgr_nm1_write_buf(&pci, buf))
+ if (dt_write_buf(&pci, buf))
return -1;
return 0;
@@ -427,9 +428,9 @@ int frct_i_write_sdu(cep_id_t id,
pci.seqno = (instance->seqno)++;
pci.qos_id = instance->cube;
- if (fmgr_nm1_write_sdu(&pci, sdb)) {
+ if (dt_write_sdu(&pci, sdb)) {
pthread_mutex_unlock(&frct.instances_lock);
- log_err("Failed to hand SDU to FMGR.");
+ log_err("Failed to hand SDU to DT.");
return -1;
}
diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h
index a1dcb151..b179e36b 100644
--- a/src/ipcpd/normal/frct.h
+++ b/src/ipcpd/normal/frct.h
@@ -50,7 +50,7 @@ int frct_i_destroy(cep_id_t id,
int frct_i_write_sdu(cep_id_t id,
struct shm_du_buff * sdb);
-int frct_nm1_post_sdu(struct pci * pci,
- struct shm_du_buff * sdb);
+int frct_post_sdu(struct pci * pci,
+ struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCPD_NORMAL_FRCT_H */
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 67424914..ab8cf387 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -36,7 +36,8 @@
#include "connmgr.h"
#include "dir.h"
#include "enroll.h"
-#include "fmgr.h"
+#include "fa.h"
+#include "dt.h"
#include "ipcp.h"
#include "ribconfig.h"
#include "ribmgr.h"
@@ -73,7 +74,7 @@ static int boot_components(void)
&ipcpi.dir_hash_algo, sizeof(ipcpi.dir_hash_algo));
if (len < 0) {
log_err("Failed to read hash length: %zd.", len);
- return -1;
+ goto fail_name;
}
ipcpi.dir_hash_algo = ntoh32(ipcpi.dir_hash_algo);
@@ -82,7 +83,7 @@ static int boot_components(void)
if (rib_add(MEMBERS_PATH, ipcpi.name)) {
log_err("Failed to add name to " MEMBERS_PATH);
- return -1;
+ goto fail_name;
}
log_dbg("Starting components.");
@@ -90,27 +91,25 @@ static int boot_components(void)
if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa))
!= sizeof(pa)) {
log_err("Failed to read policy for address authority.");
- return -1;
+ goto fail_name;
}
if (addr_auth_init(pa)) {
log_err("Failed to init address authority.");
- return -1;
+ goto fail_name;
}
ipcpi.dt_addr = addr_auth_address();
if (ipcpi.dt_addr == 0) {
log_err("Failed to get a valid address.");
- addr_auth_fini();
- return -1;
+ goto fail_addr_auth;
}
path[0] = '\0';
rib_path_append(rib_path_append(path, MEMBERS_NAME), ipcpi.name);
if (rib_write(path, &ipcpi.dt_addr, sizeof(&ipcpi.dt_addr))) {
log_err("Failed to write address to member object.");
- addr_auth_fini();
- return -1;
+ goto fail_addr_auth;
}
log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr);
@@ -119,91 +118,100 @@ static int boot_components(void)
if (ribmgr_init()) {
log_err("Failed to initialize RIB manager.");
- addr_auth_fini();
- return -1;
+ goto fail_addr_auth;
}
if (dir_init()) {
log_err("Failed to initialize directory.");
- ribmgr_fini();
- addr_auth_fini();
- return -1;
+ goto fail_ribmgr;
}
log_dbg("Ribmgr started.");
if (frct_init()) {
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
log_err("Failed to initialize FRCT.");
- return -1;
+ goto fail_dir;
}
- if (fmgr_init()) {
- frct_fini();
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
- log_err("Failed to initialize flow manager component.");
- return -1;
+ if (fa_init()) {
+ log_err("Failed to initialize flow allocator ae.");
+ goto fail_frct;
}
- if (fmgr_start()) {
- fmgr_fini();
- frct_fini();
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
- log_err("Failed to start flow manager.");
- return -1;
+ if (dt_init()) {
+ log_err("Failed to initialize data transfer ae.");
+ goto fail_fa;
+ }
+
+ if (fa_start()) {
+ log_err("Failed to start flow allocator.");
+ goto fail_dt;
+ }
+
+ if (dt_start()) {
+ log_err("Failed to start data transfer ae.");
+ goto fail_fa_start;
}
if (enroll_start()) {
- fmgr_stop();
- fmgr_fini();
- frct_fini();
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
log_err("Failed to start enroll.");
- return -1;
+ goto fail_dt_start;
}
ipcp_set_state(IPCP_OPERATIONAL);
if (connmgr_start()) {
- ipcp_set_state(IPCP_INIT);
- enroll_stop();
- fmgr_stop();
- fmgr_fini();
- frct_fini();
- dir_fini();
- ribmgr_fini();
- addr_auth_fini();
log_err("Failed to start AP connection manager.");
- return -1;
+ goto fail_enroll;
}
return 0;
+
+ fail_enroll:
+ ipcp_set_state(IPCP_INIT);
+ enroll_stop();
+ fail_dt_start:
+ dt_stop();
+ fail_fa_start:
+ fa_stop();
+ fail_dt:
+ dt_fini();
+ fail_fa:
+ fa_fini();
+ fail_frct:
+ frct_fini();
+ fail_dir:
+ dir_fini();
+ fail_ribmgr:
+ ribmgr_fini();
+ fail_addr_auth:
+ addr_auth_fini();
+ fail_name:
+ free(ipcpi.dif_name);
+
+ return -1;
}
void shutdown_components(void)
{
- ribmgr_fini();
-
connmgr_stop();
enroll_stop();
- frct_fini();
+ dt_stop();
+
+ fa_stop();
- fmgr_stop();
+ dt_fini();
- fmgr_fini();
+ fa_fini();
+
+ frct_fini();
dir_fini();
+ ribmgr_fini();
+
addr_auth_fini();
free(ipcpi.dif_name);
@@ -366,9 +374,9 @@ static struct ipcp_ops normal_ops = {
.ipcp_reg = dir_reg,
.ipcp_unreg = dir_unreg,
.ipcp_query = dir_query,
- .ipcp_flow_alloc = fmgr_np1_alloc,
- .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp,
- .ipcp_flow_dealloc = fmgr_np1_dealloc
+ .ipcp_flow_alloc = fa_alloc,
+ .ipcp_flow_alloc_resp = fa_alloc_resp,
+ .ipcp_flow_dealloc = fa_dealloc
};
int main(int argc,