summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2017-03-21 16:21:49 +0100
committerSander Vrijders <sander.vrijders@ugent.be>2017-03-21 16:21:49 +0100
commitfef50c3db0e02f0052f1759d508045c44fc4146e (patch)
treefc73859827a5dfebf5022fad37e826d98ba4046f /src/ipcpd/normal/fmgr.c
parent4b257b249ea91d1ee7e2341c563bac561911e8a6 (diff)
parentd4e80d41197b75d2c351659c7e8d4546270e677d (diff)
downloadouroboros-fef50c3db0e02f0052f1759d508045c44fc4146e.tar.gz
ouroboros-fef50c3db0e02f0052f1759d508045c44fc4146e.zip
Merge branch 'be' of bitbucket.org:ouroboros-rina/ouroboros into be
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c307
1 files changed, 184 insertions, 123 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index c2b53abf..184baf82 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -3,7 +3,8 @@
*
* Flow manager of the IPC Process
*
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -31,12 +32,16 @@
#include <ouroboros/cacep.h>
#include <ouroboros/rib.h>
+#include "connmgr.h"
#include "fmgr.h"
#include "frct.h"
#include "ipcp.h"
#include "shm_pci.h"
-#include "gam.h"
#include "ribconfig.h"
+#include "pff.h"
+#include "neighbors.h"
+#include "gam.h"
+#include "routing.h"
#include <stdlib.h>
#include <stdbool.h>
@@ -48,19 +53,7 @@ typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */
-struct nm1_flow {
- struct list_head next;
- int fd;
- qosspec_t qs;
- struct cacep_info * info;
-};
-
struct {
- flow_set_t * nm1_set[QOS_CUBE_MAX];
- fqueue_t * nm1_fqs[QOS_CUBE_MAX];
- struct list_head nm1_flows;
- pthread_rwlock_t nm1_flows_lock;
-
flow_set_t * np1_set[QOS_CUBE_MAX];
fqueue_t * np1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t np1_flows_lock;
@@ -69,15 +62,43 @@ struct {
int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
pthread_t np1_sdu_reader;
+
+ flow_set_t * nm1_set[QOS_CUBE_MAX];
+ fqueue_t * nm1_fqs[QOS_CUBE_MAX];
pthread_t nm1_sdu_reader;
- pthread_t nm1_flow_wait;
- /* FIXME: Replace with PFF */
- int fd;
+ struct pff * pff[QOS_CUBE_MAX];
+ struct routing_i * routing[QOS_CUBE_MAX];
struct gam * gam;
+ struct nbs * nbs;
+ struct ae * ae;
+
+ struct nb_notifier nb_notifier;
} fmgr;
+static int fmgr_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ qoscube_t cube;
+
+ /* We are only interested in neighbors being added and removed. */
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd);
+ break;
+ case NEIGHBOR_REMOVED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
@@ -162,7 +183,7 @@ void * fmgr_nm1_sdu_reader(void * o)
shm_pci_des(sdb, &pci);
- if (pci.dst_addr != ipcpi.address) {
+ if (pci.dst_addr != ipcpi.dt_addr) {
log_dbg("PDU needs to be forwarded.");
if (pci.ttl == 0) {
@@ -171,12 +192,20 @@ void * fmgr_nm1_sdu_reader(void * o)
continue;
}
- /*
- * FIXME: Dropping for now, since
- * we don't have a PFF yet
- */
- ipcp_flow_del(sdb);
- continue;
+ fd = pff_nhop(fmgr.pff[i], pci.dst_addr);
+ if (fd < 0) {
+ log_err("No next hop for %lu",
+ pci.dst_addr);
+ ipcp_flow_del(sdb);
+ continue;
+ }
+
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.",
+ fd);
+ ipcp_flow_del(sdb);
+ continue;
+ }
}
shm_pci_shrink(sdb);
@@ -192,49 +221,6 @@ void * fmgr_nm1_sdu_reader(void * o)
return (void *) 0;
}
-static void * fmgr_nm1_flow_wait(void * o)
-{
- qoscube_t cube;
- struct cacep_info * info;
- int fd;
- qosspec_t qs;
- struct nm1_flow * flow;
-
- (void) o;
-
- while (true) {
- if (gam_flow_wait(fmgr.gam, &fd, &info, &qs)) {
- log_err("Failed to get next flow descriptor.");
- continue;
- }
-
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_add(fmgr.nm1_set[cube], fd);
-
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
-
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
- flow = malloc(sizeof(*flow));
- if (flow == NULL) {
- free(info);
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
- continue;
- }
-
- flow->info = info;
- flow->fd = fd;
- flow->qs = qs;
-
- list_head_init(&flow->next);
- list_add(&flow->next, &fmgr.nm1_flows);
-
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
- }
-
- return (void *) 0;
-}
-
static void fmgr_destroy_flows(void)
{
int i;
@@ -247,13 +233,29 @@ static void fmgr_destroy_flows(void)
}
}
-int fmgr_init(void)
+static void fmgr_destroy_routing(void)
{
- enum pol_cacep pc;
- enum pol_gam pg;
+ int i;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ routing_i_destroy(fmgr.routing[i]);
+}
+static void fmgr_destroy_pff(void)
+{
int i;
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(fmgr.pff[i]);
+}
+
+int fmgr_init(void)
+{
+ enum pol_gam pg;
+ int i;
+ int j;
+ struct conn_info info;
+
for (i = 0; i < AP_MAX_FLOWS; ++i)
fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
@@ -289,78 +291,135 @@ int fmgr_init(void)
if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg))
!= sizeof(pg)) {
log_err("Failed to read policy for ribmgr gam.");
+ fmgr_destroy_flows();
return -1;
}
- if (rib_read(BOOT_PATH "/dt/gam/cacep", &pc, sizeof(pc))
- != sizeof(pc)) {
- log_err("Failed to read CACEP policy for ribmgr gam.");
+ strcpy(info.ae_name, DT_AE);
+ strcpy(info.protocol, FRCT_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_FIXED;
+ info.addr = ipcpi.dt_addr;
+
+ fmgr.ae = connmgr_ae_create(info);
+ if (fmgr.ae == NULL) {
+ log_err("Failed to create AE struct.");
+ fmgr_destroy_flows();
return -1;
}
- /* FIXME: Implement cacep policies */
- (void) pc;
+ fmgr.nbs = nbs_create();
+ if (fmgr.nbs == NULL) {
+ log_err("Failed to create neighbors struct.");
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
- fmgr.gam = gam_create(pg, DT_AE);
- if (fmgr.gam == NULL) {
- log_err("Failed to create graph adjacency manager.");
+ if (routing_init(fmgr.nbs)) {
+ log_err("Failed to init routing.");
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ fmgr.nb_notifier.notify_call = fmgr_neighbor_event;
+ if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) {
+ log_err("Failed to register notifier.");
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) {
+ gam_destroy(fmgr.gam);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
return -1;
}
- list_head_init(&fmgr.nm1_flows);
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fmgr.pff[i] = pff_create();
+ if (fmgr.pff[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ pff_destroy(fmgr.pff[j]);
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ fmgr.routing[i] = routing_i_create(fmgr.pff[i]);
+ if (fmgr.routing[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ routing_i_destroy(fmgr.routing[j]);
+ fmgr_destroy_pff();
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+ }
- pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
- pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
+ fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae);
+ if (fmgr.gam == NULL) {
+ log_err("Failed to init dt graph adjacency manager.");
+ fmgr_destroy_routing();
+ fmgr_destroy_pff();
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);
pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL);
- pthread_create(&fmgr.nm1_flow_wait, NULL, fmgr_nm1_flow_wait, NULL);
return 0;
}
void fmgr_fini()
{
- struct list_head * pos = NULL;
- struct list_head * n = NULL;
- qoscube_t cube;
-
pthread_cancel(fmgr.np1_sdu_reader);
pthread_cancel(fmgr.nm1_sdu_reader);
- pthread_cancel(fmgr.nm1_flow_wait);
pthread_join(fmgr.np1_sdu_reader, NULL);
pthread_join(fmgr.nm1_sdu_reader, NULL);
- pthread_join(fmgr.nm1_flow_wait, NULL);
+
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
gam_destroy(fmgr.gam);
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
-
- list_for_each_safe(pos, n, &fmgr.nm1_flows) {
- struct nm1_flow * flow =
- list_entry(pos, struct nm1_flow, next);
- list_del(&flow->next);
- flow_dealloc(flow->fd);
- ipcp_flow_get_qoscube(flow->fd, &cube);
- flow_set_del(fmgr.nm1_set[cube], flow->fd);
- free(flow->info->name);
- free(flow->info);
- free(flow);
- }
+ fmgr_destroy_routing();
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
+ fmgr_destroy_pff();
- pthread_rwlock_destroy(&fmgr.nm1_flows_lock);
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ routing_fini();
fmgr_destroy_flows();
+
+ connmgr_ae_destroy(fmgr.ae);
+
+ nbs_destroy(fmgr.nbs);
}
int fmgr_np1_alloc(int fd,
char * dst_ap_name,
- char * src_ae_name,
qoscube_t cube)
{
cep_id_t cep_id;
@@ -406,7 +465,6 @@ int fmgr_np1_alloc(int fd,
msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
msg.dst_name = dst_ap_name;
- msg.src_ae_name = src_ae_name;
msg.has_qoscube = true;
msg.qoscube = cube;
@@ -546,7 +604,6 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
case FLOW_ALLOC_CODE__FLOW_REQ:
fd = ipcp_flow_req_arr(getpid(),
msg->dst_name,
- msg->src_ae_name,
msg->qoscube);
if (fd < 0) {
flow_alloc_msg__free_unpacked(msg, NULL);
@@ -615,24 +672,20 @@ int fmgr_np1_post_sdu(cep_id_t cep_id,
return 0;
}
-int fmgr_nm1_flow_arr(int fd,
- qosspec_t qs)
-{
- assert(fmgr.gam);
-
- if (gam_flow_arr(fmgr.gam, fd, qs)) {
- log_err("Failed to hand to graph adjacency manager.");
- return -1;
- }
-
- return 0;
-}
-
int fmgr_nm1_write_sdu(struct pci * pci,
struct shm_du_buff * sdb)
{
+ int fd;
+
if (pci == NULL || sdb == NULL)
+ return -EINVAL;
+
+ fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ log_err("Could not get nhop for address %lu", pci->dst_addr);
+ ipcp_flow_del(sdb);
return -1;
+ }
if (shm_pci_ser(sdb, pci)) {
log_err("Failed to serialize PDU.");
@@ -640,8 +693,8 @@ int fmgr_nm1_write_sdu(struct pci * pci,
return -1;
}
- if (ipcp_flow_write(fmgr.fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fmgr.fd);
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
ipcp_flow_del(sdb);
return -1;
}
@@ -653,9 +706,17 @@ int fmgr_nm1_write_buf(struct pci * pci,
buffer_t * buf)
{
buffer_t * buffer;
+ int fd;
if (pci == NULL || buf == NULL || buf->data == NULL)
+ return -EINVAL;
+
+ fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ log_err("Could not get nhop for address %lu", pci->dst_addr);
+ free(buf->data);
return -1;
+ }
buffer = shm_pci_ser_buf(buf, pci);
if (buffer == NULL) {
@@ -664,7 +725,7 @@ int fmgr_nm1_write_buf(struct pci * pci,
return -1;
}
- if (flow_write(fmgr.fd, buffer->data, buffer->len) == -1) {
+ if (flow_write(fd, buffer->data, buffer->len) == -1) {
log_err("Failed to write buffer to fd.");
free(buffer);
return -1;