summaryrefslogtreecommitdiff
path: root/src/ipcpd
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2017-03-03 10:31:03 +0000
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2017-03-03 10:31:03 +0000
commitb1b59cc4642faa99514f2288ba1bb5324a79850f (patch)
treecec27d3c2064f0c0bcb564060d9d9012f819b22f /src/ipcpd
parent46c2f9d5363cdff2d99cf1b1c4a41c5bf97d2c03 (diff)
parenta409fd81dfc6d22f9a287f15394b86490dea5273 (diff)
downloadouroboros-b1b59cc4642faa99514f2288ba1bb5324a79850f.tar.gz
ouroboros-b1b59cc4642faa99514f2288ba1bb5324a79850f.zip
Merged in sandervrijders/ouroboros/be-ae-conn (pull request #396)
ipcpd: normal: Refactor application entities and add neighbors struct
Diffstat (limited to 'src/ipcpd')
-rw-r--r--src/ipcpd/normal/CMakeLists.txt2
-rw-r--r--src/ipcpd/normal/connmgr.c8
-rw-r--r--src/ipcpd/normal/connmgr.h2
-rw-r--r--src/ipcpd/normal/enroll.c292
-rw-r--r--src/ipcpd/normal/enroll.h10
-rw-r--r--src/ipcpd/normal/fmgr.c283
-rw-r--r--src/ipcpd/normal/fmgr.h4
-rw-r--r--src/ipcpd/normal/frct.c16
-rw-r--r--src/ipcpd/normal/frct.h2
-rw-r--r--src/ipcpd/normal/gam.c296
-rw-r--r--src/ipcpd/normal/gam.h21
-rw-r--r--src/ipcpd/normal/main.c54
-rw-r--r--src/ipcpd/normal/neighbors.c213
-rw-r--r--src/ipcpd/normal/neighbors.h81
-rw-r--r--src/ipcpd/normal/pff.c7
-rw-r--r--src/ipcpd/normal/pff.h4
-rw-r--r--src/ipcpd/normal/pol-gam-ops.h14
-rw-r--r--src/ipcpd/normal/pol/complete.c189
-rw-r--r--src/ipcpd/normal/pol/complete.h29
-rw-r--r--src/ipcpd/normal/ribconfig.h2
-rw-r--r--src/ipcpd/normal/ribmgr.c84
-rw-r--r--src/ipcpd/normal/ribmgr.h3
-rw-r--r--src/ipcpd/normal/routing.c132
-rw-r--r--src/ipcpd/normal/routing.h42
24 files changed, 1065 insertions, 725 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 70742336..772d5212 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -27,8 +27,10 @@ set(SOURCE_FILES
frct.c
gam.c
main.c
+ neighbors.c
pff.c
ribmgr.c
+ routing.c
shm_pci.c
# Add policies last
pol/complete.c
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c
index 387c38fd..0c908cd1 100644
--- a/src/ipcpd/normal/connmgr.c
+++ b/src/ipcpd/normal/connmgr.c
@@ -42,8 +42,6 @@
#include <stdlib.h>
#include <assert.h>
-#define FRCT_PROTO "frct"
-
struct ae_conn {
struct list_head next;
struct conn conn;
@@ -272,7 +270,7 @@ void connmgr_ae_destroy(struct ae * ae)
int connmgr_alloc(struct ae * ae,
char * dst_name,
- qosspec_t qs,
+ qosspec_t * qs,
struct conn * conn)
{
assert(ae);
@@ -281,13 +279,13 @@ int connmgr_alloc(struct ae * ae,
memset(&conn->conn_info, 0, sizeof(conn->conn_info));
- conn->flow_info.fd = flow_alloc(dst_name, &qs);
+ conn->flow_info.fd = flow_alloc(dst_name, qs);
if (conn->flow_info.fd < 0) {
log_err("Failed to allocate flow to %s.", dst_name);
return -1;
}
- conn->flow_info.qs = qs;
+ conn->flow_info.qs = *qs;
if (flow_alloc_res(conn->flow_info.fd)) {
log_err("Flow allocation to %s failed.", dst_name);
diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h
index bfb3d762..5dbf2bcc 100644
--- a/src/ipcpd/normal/connmgr.h
+++ b/src/ipcpd/normal/connmgr.h
@@ -48,7 +48,7 @@ void connmgr_ae_destroy(struct ae * ae);
int connmgr_alloc(struct ae * ae,
char * dst_name,
- qosspec_t qs,
+ qosspec_t * qs,
struct conn * conn);
int connmgr_wait(struct ae * ae,
diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c
index 9c3b9973..25460161 100644
--- a/src/ipcpd/normal/enroll.c
+++ b/src/ipcpd/normal/enroll.c
@@ -22,6 +22,8 @@
#include <ouroboros/config.h>
#include <ouroboros/endian.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/cdap.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/dev.h>
#include <ouroboros/logs.h>
@@ -29,29 +31,35 @@
#include <ouroboros/errno.h>
#include "ae.h"
-#include "cdap_flow.h"
+#include "connmgr.h"
#include "ribconfig.h"
#include <assert.h>
#include <stdlib.h>
#include <string.h>
+#include <pthread.h>
/* Symbolic, will return current time */
#define TIME_NAME "localtime"
#define TIME_PATH DLR TIME_NAME
#define ENROLL_WARN_TIME_OFFSET 20
-int enroll_handle(int fd)
+struct {
+ struct ae * ae;
+ pthread_t listener;
+} enroll;
+
+static void * enroll_handle(void * o)
{
- struct cdap_flow * flow;
- struct conn_info info;
- cdap_key_t key;
- enum cdap_opcode oc;
- char * name;
- uint8_t * buf;
- uint8_t * data;
- ssize_t len;
- uint32_t flags;
+ struct cdap * cdap;
+ struct conn conn;
+ cdap_key_t key;
+ enum cdap_opcode oc;
+ char * name;
+ uint8_t * buf;
+ uint8_t * data;
+ ssize_t len;
+ uint32_t flags;
bool boot_r = false;
bool members_r = false;
@@ -61,98 +69,107 @@ int enroll_handle(int fd)
char * members_ro = MEMBERS_PATH;
char * dif_ro = DIF_PATH;
- memset(&info, 0, sizeof(info));
-
- strcpy(info.ae_name, ENROLL_AE);
- strcpy(info.protocol, CDAP_PROTO);
- info.pref_version = 1;
- info.pref_syntax = PROTO_GPB;
+ (void) o;
- flow = cdap_flow_arr(fd, 0, &info);
- if (flow == NULL) {
- log_err("Failed to auth enrollment request.");
- flow_dealloc(fd);
- return -1;
- }
-
- while (!(boot_r && members_r && dif_name_r)) {
- key = cdap_request_wait(flow->ci, &oc, &name, &data,
- (size_t *) &len , &flags);
- assert(key >= 0);
- assert(name);
-
- if (data != NULL) {
- free(data);
- log_warn("Received data with enrollment request.");
- }
-
- if (oc != CDAP_READ) {
- log_warn("Invalid request.");
- cdap_reply_send(flow->ci, key, -1, NULL, 0);
- cdap_flow_dealloc(flow);
- free(name);
- return -1;
+ while (true) {
+ if (connmgr_wait(enroll.ae, &conn)) {
+ log_err("Failed to get next connection.");
+ continue;
}
- if (strcmp(name, boot_ro) == 0) {
- boot_r = true;
- } else if (strcmp(name, members_ro) == 0) {
- members_r = true;
- } else if (strcmp(name, dif_ro) == 0) {
- dif_name_r = true;
- } else if (strcmp(name, TIME_PATH) == 0) {
- struct timespec t;
- uint64_t buf[2];
- clock_gettime(CLOCK_REALTIME, &t);
- buf[0] = hton64(t.tv_sec);
- buf[1] = hton64(t.tv_nsec);
- cdap_reply_send(flow->ci, key, 0, buf, sizeof(buf));
- free(name);
+ cdap = cdap_create(conn.flow_info.fd);
+ if (cdap == NULL) {
+ log_err("Failed to instantiate CDAP.");
+ flow_dealloc(conn.flow_info.fd);
continue;
- } else {
- log_warn("Illegal read: %s.", name);
- cdap_reply_send(flow->ci, key, -1, NULL, 0);
- cdap_flow_dealloc(flow);
- free(name);
- return -1;
}
- len = rib_pack(name, &buf, PACK_HASH_ROOT);
- if (len < 0) {
- log_err("Failed to pack %s.", name);
- cdap_reply_send(flow->ci, key, -1, NULL, 0);
- cdap_flow_dealloc(flow);
- free(name);
- return -1;
- }
+ while (!(boot_r && members_r && dif_name_r)) {
+ key = cdap_request_wait(cdap, &oc, &name, &data,
+ (size_t *) &len , &flags);
+ assert(key >= 0);
+ assert(name);
+
+ if (data != NULL) {
+ free(data);
+ log_warn("Received data with enroll request.");
+ }
+
+ if (oc != CDAP_READ) {
+ log_warn("Invalid request.");
+ cdap_reply_send(cdap, key, -1, NULL, 0);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ free(name);
+ continue;
+ }
+
+ if (strcmp(name, boot_ro) == 0) {
+ boot_r = true;
+ } else if (strcmp(name, members_ro) == 0) {
+ members_r = true;
+ } else if (strcmp(name, dif_ro) == 0) {
+ dif_name_r = true;
+ } else if (strcmp(name, TIME_PATH) == 0) {
+ struct timespec t;
+ uint64_t buf[2];
+ clock_gettime(CLOCK_REALTIME, &t);
+ buf[0] = hton64(t.tv_sec);
+ buf[1] = hton64(t.tv_nsec);
+ cdap_reply_send(cdap, key, 0, buf, sizeof(buf));
+ free(name);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ continue;
+ } else {
+ log_warn("Illegal read: %s.", name);
+ cdap_reply_send(cdap, key, -1, NULL, 0);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ free(name);
+ continue;
+ }
+
+ len = rib_pack(name, &buf, PACK_HASH_ROOT);
+ if (len < 0) {
+ log_err("Failed to pack %s.", name);
+ cdap_reply_send(cdap, key, -1, NULL, 0);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ free(name);
+ continue;
+ }
+
+ log_dbg("Packed %s (%zu bytes).", name, len);
- log_dbg("Packed %s (%zu bytes).", name, len);
+ free(name);
- free(name);
+ if (cdap_reply_send(cdap, key, 0, buf, len)) {
+ log_err("Failed to send CDAP reply.");
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ continue;
+ }
- if (cdap_reply_send(flow->ci, key, 0, buf, len)) {
- log_err("Failed to send CDAP reply.");
- cdap_flow_dealloc(flow);
- return -1;
+ free(buf);
}
- free(buf);
- }
-
- log_dbg("Sent boot info to new member.");
+ log_dbg("Sent boot info to new member.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ }
return 0;
}
int enroll_boot(char * dst_name)
{
- struct cdap_flow * flow;
- struct conn_info info;
- cdap_key_t key;
- uint8_t * data;
- size_t len;
+ struct cdap * cdap;
+ cdap_key_t key;
+ uint8_t * data;
+ size_t len;
+ struct conn conn;
struct timespec t0;
struct timespec rtt;
@@ -163,16 +180,14 @@ int enroll_boot(char * dst_name)
char * members_ro = MEMBERS_PATH;
char * dif_ro = DIF_PATH;
- memset(&info, 0, sizeof(info));
-
- strcpy(info.ae_name, ENROLL_AE);
- strcpy(info.protocol, CDAP_PROTO);
- info.pref_version = 1;
- info.pref_syntax = PROTO_GPB;
+ if (connmgr_alloc(enroll.ae, dst_name, NULL, &conn)) {
+ log_err("Failed to get connection.");
+ return -1;
+ }
- flow = cdap_flow_alloc(dst_name, NULL, &info);
- if (flow == NULL) {
- log_err("Failed to allocate flow for enrollment request.");
+ cdap = cdap_create(conn.flow_info.fd);
+ if (cdap == NULL) {
+ log_err("Failed to instantiate CDAP.");
return -1;
}
@@ -180,16 +195,18 @@ int enroll_boot(char * dst_name)
clock_gettime(CLOCK_REALTIME, &t0);
- key = cdap_request_send(flow->ci, CDAP_READ, TIME_PATH, NULL, 0, 0);
+ key = cdap_request_send(cdap, CDAP_READ, TIME_PATH, NULL, 0, 0);
if (key < 0) {
log_err("Failed to send CDAP request.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(flow->ci, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key, &data, &len)) {
log_err("Failed to get CDAP reply.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
@@ -207,16 +224,18 @@ int enroll_boot(char * dst_name)
free(data);
- key = cdap_request_send(flow->ci, CDAP_READ, boot_ro, NULL, 0, 0);
+ key = cdap_request_send(cdap, CDAP_READ, boot_ro, NULL, 0, 0);
if (key < 0) {
log_err("Failed to send CDAP request.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(flow->ci, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key, &data, &len)) {
log_err("Failed to get CDAP reply.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
@@ -226,22 +245,25 @@ int enroll_boot(char * dst_name)
log_warn("Error unpacking RIB data.");
rib_del(boot_ro);
free(data);
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
log_dbg("Packed information inserted into RIB.");
- key = cdap_request_send(flow->ci, CDAP_READ, members_ro, NULL, 0, 0);
+ key = cdap_request_send(cdap, CDAP_READ, members_ro, NULL, 0, 0);
if (key < 0) {
log_err("Failed to send CDAP request.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(flow->ci, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key, &data, &len)) {
log_err("Failed to get CDAP reply.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
@@ -251,22 +273,25 @@ int enroll_boot(char * dst_name)
log_warn("Error unpacking RIB data.");
rib_del(boot_ro);
free(data);
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
log_dbg("Packed information inserted into RIB.");
- key = cdap_request_send(flow->ci, CDAP_READ, dif_ro, NULL, 0, 0);
+ key = cdap_request_send(cdap, CDAP_READ, dif_ro, NULL, 0, 0);
if (key < 0) {
log_err("Failed to send CDAP request.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(flow->ci, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key, &data, &len)) {
log_err("Failed to get CDAP reply.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
@@ -276,13 +301,52 @@ int enroll_boot(char * dst_name)
log_warn("Error unpacking RIB data.");
rib_del(boot_ro);
free(data);
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
log_dbg("Packed information inserted into RIB.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+
+ return 0;
+}
+
+int enroll_init(void)
+{
+ struct conn_info info;
+
+ memset(&info, 0, sizeof(info));
+
+ strcpy(info.ae_name, ENROLL_AE);
+ strcpy(info.protocol, CDAP_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_GPB;
+
+ enroll.ae = connmgr_ae_create(info);
+ if (enroll.ae == NULL)
+ return -1;
+
+ return 0;
+}
+
+void enroll_fini(void)
+{
+ connmgr_ae_destroy(enroll.ae);
+}
+
+int enroll_start(void)
+{
+ if (pthread_create(&enroll.listener, NULL, enroll_handle, NULL))
+ return -1;
return 0;
}
+
+void enroll_stop(void)
+{
+ pthread_cancel(enroll.listener);
+ pthread_join(enroll.listener, NULL);
+}
diff --git a/src/ipcpd/normal/enroll.h b/src/ipcpd/normal/enroll.h
index 2980c380..3c81ae25 100644
--- a/src/ipcpd/normal/enroll.h
+++ b/src/ipcpd/normal/enroll.h
@@ -22,8 +22,14 @@
#ifndef OUROBOROS_IPCPD_NORMAL_ENROLL_H
#define OUROBOROS_IPCPD_NORMAL_ENROLL_H
-int enroll_handle(int fd);
+int enroll_init(void);
-int enroll_boot(char * dst_name);
+void enroll_fini(void);
+
+int enroll_start(void);
+
+void enroll_stop(void);
+
+int enroll_boot(char * dst_name);
#endif /* OUROBOROS_IPCPD_NORMAL_ENROLL_H */
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 34724ddd..b7a99f6c 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -31,12 +31,16 @@
#include <ouroboros/cacep.h>
#include <ouroboros/rib.h>
+#include "connmgr.h"
#include "fmgr.h"
#include "frct.h"
#include "ipcp.h"
#include "shm_pci.h"
-#include "gam.h"
#include "ribconfig.h"
+#include "pff.h"
+#include "neighbors.h"
+#include "gam.h"
+#include "routing.h"
#include <stdlib.h>
#include <stdbool.h>
@@ -48,19 +52,7 @@ typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */
-struct nm1_flow {
- struct list_head next;
- int fd;
- qosspec_t qs;
- struct conn_info * info;
-};
-
struct {
- flow_set_t * nm1_set[QOS_CUBE_MAX];
- fqueue_t * nm1_fqs[QOS_CUBE_MAX];
- struct list_head nm1_flows;
- pthread_rwlock_t nm1_flows_lock;
-
flow_set_t * np1_set[QOS_CUBE_MAX];
fqueue_t * np1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t np1_flows_lock;
@@ -69,15 +61,43 @@ struct {
int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
pthread_t np1_sdu_reader;
+
+ flow_set_t * nm1_set[QOS_CUBE_MAX];
+ fqueue_t * nm1_fqs[QOS_CUBE_MAX];
pthread_t nm1_sdu_reader;
- pthread_t nm1_flow_wait;
- /* FIXME: Replace with PFF */
- int fd;
+ struct pff * pff[QOS_CUBE_MAX];
+ struct routing * routing[QOS_CUBE_MAX];
struct gam * gam;
+ struct nbs * nbs;
+ struct ae * ae;
+
+ struct nb_notifier nb_notifier;
} fmgr;
+static int fmgr_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ qoscube_t cube;
+
+ /* We are only interested in neighbors being added and removed. */
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd);
+ break;
+ case NEIGHBOR_REMOVED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
@@ -171,12 +191,20 @@ void * fmgr_nm1_sdu_reader(void * o)
continue;
}
- /*
- * FIXME: Dropping for now, since
- * we don't have a PFF yet
- */
- ipcp_flow_del(sdb);
- continue;
+ fd = pff_nhop(fmgr.pff[i], pci.dst_addr);
+ if (fd < 0) {
+ log_err("No next hop for %lu",
+ pci.dst_addr);
+ ipcp_flow_del(sdb);
+ continue;
+ }
+
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.",
+ fd);
+ ipcp_flow_del(sdb);
+ continue;
+ }
}
shm_pci_shrink(sdb);
@@ -192,49 +220,6 @@ void * fmgr_nm1_sdu_reader(void * o)
return (void *) 0;
}
-static void * fmgr_nm1_flow_wait(void * o)
-{
- qoscube_t cube;
- struct conn_info * info;
- int fd;
- qosspec_t qs;
- struct nm1_flow * flow;
-
- (void) o;
-
- while (true) {
- if (gam_flow_wait(fmgr.gam, &fd, &info, &qs)) {
- log_err("Failed to get next flow descriptor.");
- continue;
- }
-
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_add(fmgr.nm1_set[cube], fd);
-
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
-
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
- flow = malloc(sizeof(*flow));
- if (flow == NULL) {
- free(info);
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
- continue;
- }
-
- flow->info = info;
- flow->fd = fd;
- flow->qs = qs;
-
- list_head_init(&flow->next);
- list_add(&flow->next, &fmgr.nm1_flows);
-
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
- }
-
- return (void *) 0;
-}
-
static void fmgr_destroy_flows(void)
{
int i;
@@ -247,12 +232,29 @@ static void fmgr_destroy_flows(void)
}
}
-int fmgr_init(void)
+static void fmgr_destroy_routing(void)
{
- enum pol_gam pg;
+ int i;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ routing_destroy(fmgr.routing[i]);
+}
+static void fmgr_destroy_pff(void)
+{
int i;
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(fmgr.pff[i]);
+}
+
+int fmgr_init(void)
+{
+ enum pol_gam pg;
+ int i;
+ int j;
+ struct conn_info info;
+
for (i = 0; i < AP_MAX_FLOWS; ++i)
fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
@@ -288,63 +290,116 @@ int fmgr_init(void)
if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg))
!= sizeof(pg)) {
log_err("Failed to read policy for ribmgr gam.");
+ fmgr_destroy_flows();
return -1;
}
- fmgr.gam = gam_create(pg);
- if (fmgr.gam == NULL) {
- log_err("Failed to create graph adjacency manager.");
+ strcpy(info.ae_name, DT_AE);
+ strcpy(info.protocol, FRCT_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_FIXED;
+ info.addr = ipcpi.dt_addr;
+
+ fmgr.ae = connmgr_ae_create(info);
+ if (fmgr.ae == NULL) {
+ log_err("Failed to create AE struct.");
fmgr_destroy_flows();
return -1;
}
- list_head_init(&fmgr.nm1_flows);
+ fmgr.nbs = nbs_create();
+ if (fmgr.nbs == NULL) {
+ log_err("Failed to create neighbors struct.");
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
- pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
- pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
+ fmgr.nb_notifier.notify_call = fmgr_neighbor_event;
+ if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) {
+ log_err("Failed to register notifier.");
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) {
+ gam_destroy(fmgr.gam);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fmgr.pff[i] = pff_create();
+ if (fmgr.pff[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ pff_destroy(fmgr.pff[j]);
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ fmgr.routing[i] = routing_create(fmgr.pff[i], fmgr.nbs);
+ if (fmgr.routing[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ routing_destroy(fmgr.routing[j]);
+ fmgr_destroy_pff();
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+ }
+
+ fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae);
+ if (fmgr.gam == NULL) {
+ log_err("Failed to init dt graph adjacency manager.");
+ fmgr_destroy_routing();
+ fmgr_destroy_pff();
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);
pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL);
- pthread_create(&fmgr.nm1_flow_wait, NULL, fmgr_nm1_flow_wait, NULL);
return 0;
}
void fmgr_fini()
{
- struct list_head * pos = NULL;
- struct list_head * n = NULL;
- qoscube_t cube;
-
pthread_cancel(fmgr.np1_sdu_reader);
pthread_cancel(fmgr.nm1_sdu_reader);
- pthread_cancel(fmgr.nm1_flow_wait);
pthread_join(fmgr.np1_sdu_reader, NULL);
pthread_join(fmgr.nm1_sdu_reader, NULL);
- pthread_join(fmgr.nm1_flow_wait, NULL);
- gam_destroy(fmgr.gam);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
-
- list_for_each_safe(pos, n, &fmgr.nm1_flows) {
- struct nm1_flow * flow =
- list_entry(pos, struct nm1_flow, next);
- list_del(&flow->next);
- flow_dealloc(flow->fd);
- ipcp_flow_get_qoscube(flow->fd, &cube);
- flow_set_del(fmgr.nm1_set[cube], flow->fd);
- free(flow->info);
- free(flow);
- }
+ gam_destroy(fmgr.gam);
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
+ fmgr_destroy_routing();
- pthread_rwlock_destroy(&fmgr.nm1_flows_lock);
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ fmgr_destroy_pff();
fmgr_destroy_flows();
+
+ connmgr_ae_destroy(fmgr.ae);
+
+ nbs_destroy(fmgr.nbs);
}
int fmgr_np1_alloc(int fd,
@@ -601,24 +656,20 @@ int fmgr_np1_post_sdu(cep_id_t cep_id,
return 0;
}
-int fmgr_nm1_flow_arr(int fd,
- qosspec_t qs)
-{
- assert(fmgr.gam);
-
- if (gam_flow_arr(fmgr.gam, fd, qs)) {
- log_err("Failed to hand to graph adjacency manager.");
- return -1;
- }
-
- return 0;
-}
-
int fmgr_nm1_write_sdu(struct pci * pci,
struct shm_du_buff * sdb)
{
+ int fd;
+
if (pci == NULL || sdb == NULL)
+ return -EINVAL;
+
+ fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ log_err("Could not get nhop for address %lu", pci->dst_addr);
+ ipcp_flow_del(sdb);
return -1;
+ }
if (shm_pci_ser(sdb, pci)) {
log_err("Failed to serialize PDU.");
@@ -626,8 +677,8 @@ int fmgr_nm1_write_sdu(struct pci * pci,
return -1;
}
- if (ipcp_flow_write(fmgr.fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fmgr.fd);
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
ipcp_flow_del(sdb);
return -1;
}
@@ -639,9 +690,17 @@ int fmgr_nm1_write_buf(struct pci * pci,
buffer_t * buf)
{
buffer_t * buffer;
+ int fd;
if (pci == NULL || buf == NULL || buf->data == NULL)
+ return -EINVAL;
+
+ fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ log_err("Could not get nhop for address %lu", pci->dst_addr);
+ free(buf->data);
return -1;
+ }
buffer = shm_pci_ser_buf(buf, pci);
if (buffer == NULL) {
@@ -650,7 +709,7 @@ int fmgr_nm1_write_buf(struct pci * pci,
return -1;
}
- if (flow_write(fmgr.fd, buffer->data, buffer->len) == -1) {
+ if (flow_write(fd, buffer->data, buffer->len) == -1) {
log_err("Failed to write buffer to fd.");
free(buffer);
return -1;
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
index e75417f3..06eab0a1 100644
--- a/src/ipcpd/normal/fmgr.h
+++ b/src/ipcpd/normal/fmgr.h
@@ -53,8 +53,4 @@ int fmgr_nm1_write_sdu(struct pci * pci,
int fmgr_nm1_write_buf(struct pci * pci,
buffer_t * buf);
-int fmgr_nm1_flow_arr(int fd,
- qosspec_t qs);
-
-
#endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index c9b23060..b5a42db4 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -198,12 +198,12 @@ int frct_fini()
return 0;
}
-int frct_nm1_post_sdu(struct pci * pci,
+int frct_nm1_post_sdu(struct pci * pci,
struct shm_du_buff * sdb)
{
struct frct_i * instance;
- buffer_t buf;
- cep_id_t id;
+ buffer_t buf;
+ cep_id_t id;
if (pci == NULL || sdb == NULL)
return -1;
@@ -267,8 +267,8 @@ cep_id_t frct_i_create(uint64_t address,
qoscube_t cube)
{
struct frct_i * instance;
- struct pci pci;
- cep_id_t id;
+ struct pci pci;
+ cep_id_t id;
if (buf == NULL || buf->data == NULL)
return INVALID_CEP_ID;
@@ -304,7 +304,7 @@ int frct_i_accept(cep_id_t id,
buffer_t * buf,
qoscube_t cube)
{
- struct pci pci;
+ struct pci pci;
struct frct_i * instance;
if (buf == NULL || buf->data == NULL)
@@ -347,7 +347,7 @@ int frct_i_accept(cep_id_t id,
int frct_i_destroy(cep_id_t id,
buffer_t * buf)
{
- struct pci pci;
+ struct pci pci;
struct frct_i * instance;
pthread_mutex_lock(&frct.instances_lock);
@@ -390,7 +390,7 @@ int frct_i_destroy(cep_id_t id,
int frct_i_write_sdu(cep_id_t id,
struct shm_du_buff * sdb)
{
- struct pci pci;
+ struct pci pci;
struct frct_i * instance;
if (sdb == NULL)
diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h
index 462b8cc3..d85d11f5 100644
--- a/src/ipcpd/normal/frct.h
+++ b/src/ipcpd/normal/frct.h
@@ -27,6 +27,8 @@
#include "shm_pci.h"
+#define FRCT_PROTO "FRCT"
+
struct frct_i;
int frct_init(void);
diff --git a/src/ipcpd/normal/gam.c b/src/ipcpd/normal/gam.c
index 212cfd83..cb4e662f 100644
--- a/src/ipcpd/normal/gam.c
+++ b/src/ipcpd/normal/gam.c
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Data transfer graph adjacency manager
*
* Dimitri Staessens <dimitri.staessens@intec.ugent.be>
* Sander Vrijders <sander.vrijders@intec.ugent.be>
@@ -20,7 +20,7 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "graph-adjacency-manager"
+#define OUROBOROS_PREFIX "dt-gam"
#include <ouroboros/config.h>
#include <ouroboros/cdap.h>
@@ -40,305 +40,43 @@
#include <pthread.h>
#include <string.h>
-struct ga {
- struct list_head next;
-
- qosspec_t qs;
- int fd;
- struct conn_info * info;
-};
-
struct gam {
- struct list_head gas;
- pthread_mutex_t gas_lock;
- pthread_cond_t gas_cond;
-
struct pol_gam_ops * ops;
void * ops_o;
};
-struct gam * gam_create(enum pol_gam gam_type)
+struct gam * gam_create(enum pol_gam gam_type,
+ struct nbs * nbs,
+ struct ae * ae)
{
- struct gam * tmp;
+ struct gam * gam;
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL)
+ gam = malloc(sizeof(*gam));
+ if (gam == NULL)
return NULL;
switch (gam_type) {
case COMPLETE:
- tmp->ops = &complete_ops;
+ gam->ops = &complete_ops;
break;
default:
log_err("Unknown gam policy: %d.", gam_type);
- free(tmp);
- return NULL;
- }
-
- list_head_init(&tmp->gas);
-
- if (pthread_mutex_init(&tmp->gas_lock, NULL)) {
- free(tmp);
- return NULL;
- }
-
- if (pthread_cond_init(&tmp->gas_cond, NULL)) {
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp);
return NULL;
}
- tmp->ops_o = tmp->ops->create(tmp);
- if (tmp->ops_o == NULL) {
- pthread_cond_destroy(&tmp->gas_cond);
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp);
+ gam->ops_o = gam->ops->create(nbs, ae);
+ if (gam->ops_o == NULL) {
+ free(gam);
return NULL;
}
- if (tmp->ops->start(tmp->ops_o)) {
- pthread_cond_destroy(&tmp->gas_cond);
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp);
- return NULL;
- }
-
- return tmp;
-}
-
-void gam_destroy(struct gam * instance)
-{
- struct list_head * p = NULL;
- struct list_head * n = NULL;
-
- assert(instance);
-
- instance->ops->stop(instance->ops_o);
-
- pthread_mutex_lock(&instance->gas_lock);
-
- list_for_each_safe(p, n, &instance->gas) {
- struct ga * e = list_entry(p, struct ga, next);
- list_del(&e->next);
- free(e->info);
- free(e);
- }
-
- pthread_mutex_unlock(&instance->gas_lock);
-
- pthread_mutex_destroy(&instance->gas_lock);
- pthread_cond_destroy(&instance->gas_cond);
-
- instance->ops->destroy(instance->ops_o);
- free(instance);
-}
-
-static int add_ga(struct gam * instance,
- int fd,
- qosspec_t qs,
- struct conn_info * info)
-{
- struct ga * ga;
-
- ga = malloc(sizeof(*ga));
- if (ga == NULL)
- return -ENOMEM;
-
- ga->fd = fd;
- ga->info = info;
- ga->qs = qs;
-
- list_head_init(&ga->next);
-
- pthread_mutex_lock(&instance->gas_lock);
- list_add(&ga->next, &instance->gas);
- pthread_cond_signal(&instance->gas_cond);
- pthread_mutex_unlock(&instance->gas_lock);
-
- log_info("Added flow.");
-
- return 0;
-}
-
-int gam_flow_arr(struct gam * instance,
- int fd,
- qosspec_t qs)
-{
- struct conn_info * rcv_info;
- struct conn_info snd_info;
-
- if (flow_alloc_resp(fd, instance->ops->accept_new_flow(instance->ops_o))
- < 0) {
- log_err("Could not respond to new flow.");
- return -1;
- }
-
- rcv_info = malloc(sizeof(*rcv_info));
- if (rcv_info == NULL)
- return -ENOMEM;
-
- memset(&snd_info, 0, sizeof(snd_info));
- memset(rcv_info, 0, sizeof(*rcv_info));
-
- /* FIXME: send correct AE */
- strcpy(snd_info.ae_name, "FIXME:CORRECT_AE");
- strcpy(snd_info.protocol, CDAP_PROTO);
- snd_info.pref_version = 1;
- snd_info.pref_syntax = PROTO_GPB;
- snd_info.addr = ipcpi.dt_addr;
-
- if (cacep_rcv(fd, rcv_info)) {
- log_err("Error establishing application connection.");
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- if (cacep_snd(fd, &snd_info)) {
- log_err("Failed to respond to application connection request.");
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- if (strcmp(snd_info.ae_name, rcv_info->ae_name)) {
- log_err("Received connection for wrong AE.");
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- if (strcmp(snd_info.protocol, rcv_info->protocol) ||
- snd_info.pref_version != rcv_info->pref_version ||
- snd_info.pref_syntax != rcv_info->pref_syntax) {
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) {
- flow_dealloc(fd);
- free(rcv_info);
- return 0;
- }
-
- if (add_ga(instance, fd, qs, rcv_info)) {
- log_err("Failed to add ga to graph adjacency manager list.");
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- return 0;
+ return gam;
}
-int gam_flow_alloc(struct gam * instance,
- char * dst_name,
- qosspec_t qs)
+void gam_destroy(struct gam * gam)
{
- struct conn_info * rcv_info;
- struct conn_info snd_info;
- int fd;
-
- log_dbg("Allocating flow to %s.", dst_name);
-
- rcv_info = malloc(sizeof(*rcv_info));
- if (rcv_info == NULL)
- return -ENOMEM;
-
- fd = flow_alloc(dst_name, NULL);
- if (fd < 0) {
- log_err("Failed to allocate flow to %s.", dst_name);
- return -1;
- }
-
- if (flow_alloc_res(fd)) {
- log_err("Flow allocation to %s failed.", dst_name);
- flow_dealloc(fd);
- return -1;
- }
-
- memset(&snd_info, 0, sizeof(snd_info));
- memset(rcv_info, 0, sizeof(*rcv_info));
-
- /* FIXME: send correct AE */
- strcpy(snd_info.ae_name, "FIXME:CORRECT_AE");
- strcpy(snd_info.protocol, CDAP_PROTO);
- snd_info.pref_version = 1;
- snd_info.pref_syntax = PROTO_GPB;
- snd_info.addr = ipcpi.dt_addr;
-
- if (cacep_snd(fd, &snd_info)) {
- log_err("Failed to create application connection.");
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- if (cacep_rcv(fd, rcv_info)) {
- log_err("Failed to connect to application.");
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- if (strcmp(snd_info.protocol, rcv_info->protocol) ||
- snd_info.pref_version != rcv_info->pref_version ||
- snd_info.pref_syntax != rcv_info->pref_syntax) {
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) {
- flow_dealloc(fd);
- free(rcv_info);
- return 0;
- }
-
- if (add_ga(instance, fd, qs, rcv_info)) {
- log_err("Failed to add GA to graph adjacency manager list.");
- flow_dealloc(fd);
- free(rcv_info);
- return -1;
- }
-
- return 0;
-}
-
-int gam_flow_wait(struct gam * instance,
- int * fd,
- struct conn_info ** info,
- qosspec_t * qs)
-{
- struct ga * ga;
-
- assert(fd);
- assert(info);
- assert(qs);
-
- pthread_mutex_lock(&instance->gas_lock);
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) &instance->gas_lock);
-
- while (list_is_empty(&instance->gas))
- pthread_cond_wait(&instance->gas_cond, &instance->gas_lock);
-
- ga = list_first_entry((&instance->gas), struct ga, next);
- if (ga == NULL) {
- pthread_mutex_unlock(&instance->gas_lock);
- return -1;
- }
-
- *fd = ga->fd;
- *info = ga->info;
- *qs = ga->qs;
-
- list_del(&ga->next);
- free(ga);
-
- pthread_cleanup_pop(true);
+ assert(gam);
- return 0;
+ gam->ops->destroy(gam->ops_o);
+ free(gam);
}
diff --git a/src/ipcpd/normal/gam.h b/src/ipcpd/normal/gam.h
index 58b028b9..01a6e40e 100644
--- a/src/ipcpd/normal/gam.h
+++ b/src/ipcpd/normal/gam.h
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Data transfer graph adjacency manager
*
* Dimitri Staessens <dimitri.staessens@intec.ugent.be>
* Sander Vrijders <sander.vrijders@intec.ugent.be>
@@ -26,21 +26,12 @@
#include <ouroboros/cacep.h>
#include <ouroboros/irm_config.h>
-struct gam * gam_create(enum pol_gam gam_type);
+#include "neighbors.h"
-void gam_destroy(struct gam * instance);
+struct gam * gam_create(enum pol_gam gam_type,
+ struct nbs * nbs,
+ struct ae * ae);
-int gam_flow_arr(struct gam * instance,
- int fd,
- qosspec_t qs);
-
-int gam_flow_alloc(struct gam * instance,
- char * dst_name,
- qosspec_t qs);
-
-int gam_flow_wait(struct gam * instance,
- int * fd,
- struct conn_info ** info,
- qosspec_t * qs);
+void gam_destroy(struct gam * gam);
#endif /* OUROBOROS_IPCPD_NORMAL_GAM_H */
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 3e5907a8..8b9a7c09 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -105,11 +105,6 @@ static int boot_components(void)
log_dbg("Starting components.");
- if (connmgr_init()) {
- log_err("Failed to init ap connection manager");
- return -1;
- }
-
if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa))
!= sizeof(pa)) {
log_err("Failed to read policy for address authority.");
@@ -126,7 +121,6 @@ static int boot_components(void)
if (ipcpi.dt_addr == 0) {
log_err("Failed to get a valid address.");
addr_auth_fini();
- connmgr_fini();
return -1;
}
@@ -137,7 +131,6 @@ static int boot_components(void)
if (ribmgr_init()) {
log_err("Failed to initialize RIB manager.");
addr_auth_fini();
- connmgr_fini();
return -1;
}
@@ -145,7 +138,6 @@ static int boot_components(void)
log_err("Failed to initialize directory.");
ribmgr_fini();
addr_auth_fini();
- connmgr_fini();
return -1;
}
@@ -155,7 +147,6 @@ static int boot_components(void)
dir_fini();
ribmgr_fini();
addr_auth_fini();
- connmgr_fini();
log_err("Failed to start flow manager.");
return -1;
}
@@ -165,20 +156,29 @@ static int boot_components(void)
dir_fini();
ribmgr_fini();
addr_auth_fini();
- connmgr_fini();
log_err("Failed to initialize FRCT.");
return -1;
}
+ if (enroll_start()) {
+ fmgr_fini();
+ dir_fini();
+ ribmgr_fini();
+ addr_auth_fini();
+ log_err("Failed to start enroll.");
+ return -1;
+ }
+
ipcp_set_state(IPCP_OPERATIONAL);
if (connmgr_start()) {
ipcp_set_state(IPCP_INIT);
+ enroll_stop();
+ frct_fini();
fmgr_fini();
dir_fini();
ribmgr_fini();
addr_auth_fini();
- connmgr_fini();
log_err("Failed to start AP connection manager.");
return -1;
}
@@ -190,6 +190,8 @@ void shutdown_components(void)
{
connmgr_stop();
+ enroll_stop();
+
frct_fini();
fmgr_fini();
@@ -199,8 +201,6 @@ void shutdown_components(void)
ribmgr_fini();
addr_auth_fini();
-
- connmgr_fini();
}
static int normal_ipcp_enroll(char * dst_name)
@@ -418,11 +418,33 @@ int main(int argc,
exit(EXIT_FAILURE);
}
+
+ if (connmgr_init()) {
+ log_err("Failed to initialize connection manager.");
+ ipcp_create_r(getpid(), -1);
+ rib_fini();
+ irm_unbind_api(getpid(), ipcpi.name);
+ ipcp_fini();
+ exit(EXIT_FAILURE);
+ }
+
+ if (enroll_init()) {
+ log_err("Failed to initialize enroll component.");
+ ipcp_create_r(getpid(), -1);
+ connmgr_fini();
+ rib_fini();
+ irm_unbind_api(getpid(), ipcpi.name);
+ ipcp_fini();
+ exit(EXIT_FAILURE);
+ }
+
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
if (ipcp_boot() < 0) {
log_err("Failed to boot IPCP.");
ipcp_create_r(getpid(), -1);
+ enroll_fini();
+ connmgr_fini();
rib_fini();
irm_unbind_api(getpid(), ipcpi.name);
ipcp_fini();
@@ -435,6 +457,8 @@ int main(int argc,
log_err("Failed to notify IRMd we are initialized.");
ipcp_set_state(IPCP_NULL);
ipcp_shutdown();
+ enroll_fini();
+ connmgr_fini();
rib_fini();
irm_unbind_api(getpid(), ipcpi.name);
ipcp_fini();
@@ -448,6 +472,10 @@ int main(int argc,
rib_fini();
+ connmgr_fini();
+
+ enroll_fini();
+
irm_unbind_api(getpid(), ipcpi.name);
ipcp_fini();
diff --git a/src/ipcpd/normal/neighbors.c b/src/ipcpd/normal/neighbors.c
new file mode 100644
index 00000000..40ef0d73
--- /dev/null
+++ b/src/ipcpd/normal/neighbors.c
@@ -0,0 +1,213 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Data transfer neighbors
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "neighbors"
+
+#include <ouroboros/config.h>
+#include <ouroboros/shared.h>
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
+
+#include "neighbors.h"
+
+#include <stdlib.h>
+#include <assert.h>
+#include <inttypes.h>
+
+static void notify_listeners(enum nb_event event,
+ struct nb * nb,
+ struct nbs * nbs)
+{
+ struct list_head * p = NULL;
+
+ list_for_each(p, &nbs->notifiers) {
+ struct nb_notifier * e =
+ list_entry(p, struct nb_notifier, next);
+ if (e->notify_call(event, nb->conn))
+ log_err("Listener reported an error.");
+ }
+}
+
+struct nbs * nbs_create(void)
+{
+ struct nbs * nbs;
+
+ nbs = malloc(sizeof(*nbs));
+ if (nbs == NULL)
+ return NULL;
+
+ list_head_init(&nbs->list);
+ list_head_init(&nbs->notifiers);
+
+ if (pthread_mutex_init(&nbs->list_lock, NULL))
+ return NULL;
+
+ if (pthread_mutex_init(&nbs->notifiers_lock, NULL)) {
+ pthread_mutex_destroy(&nbs->list_lock);
+ return NULL;
+ }
+
+ return nbs;
+}
+
+void nbs_destroy(struct nbs * nbs)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ assert(nbs);
+
+ pthread_mutex_lock(&nbs->list_lock);
+
+ list_for_each_safe(p, n, &nbs->list) {
+ struct nb * e = list_entry(p, struct nb, next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ pthread_mutex_unlock(&nbs->list_lock);
+
+ pthread_mutex_destroy(&nbs->list_lock);
+ pthread_mutex_destroy(&nbs->notifiers_lock);
+}
+
+int nbs_add(struct nbs * nbs,
+ struct conn conn)
+{
+ struct nb * nb;
+
+ assert(nbs);
+
+ nb = malloc(sizeof(*nb));
+ if (nb == NULL)
+ return -ENOMEM;
+
+ nb->conn = conn;
+
+ list_head_init(&nb->next);
+
+ pthread_mutex_lock(&nbs->list_lock);
+
+ list_add(&nb->next, &nbs->list);
+
+ notify_listeners(NEIGHBOR_ADDED, nb, nbs);
+
+ pthread_mutex_unlock(&nbs->list_lock);
+
+ log_info("Added neighbor with address %" PRIu64 " to list.",
+ conn.conn_info.addr);
+
+ return 0;
+}
+
+int nbs_update_qos(struct nbs * nbs,
+ int fd,
+ qosspec_t qs)
+{
+ struct list_head * p = NULL;
+
+ assert(nbs);
+
+ pthread_mutex_lock(&nbs->list_lock);
+
+ list_for_each(p, &nbs->list) {
+ struct nb * e = list_entry(p, struct nb, next);
+ if (e->conn.flow_info.fd == fd) {
+ e->conn.flow_info.qs = qs;
+
+ notify_listeners(NEIGHBOR_QOS_CHANGE, e, nbs);
+
+ pthread_mutex_unlock(&nbs->list_lock);
+ return 0;
+ }
+ }
+
+ pthread_mutex_unlock(&nbs->list_lock);
+
+ return -1;
+}
+
+int nbs_del(struct nbs * nbs,
+ int fd)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ assert(nbs);
+
+ pthread_mutex_lock(&nbs->list_lock);
+
+ list_for_each_safe(p, n, &nbs->list) {
+ struct nb * e = list_entry(p, struct nb, next);
+ if (e->conn.flow_info.fd == fd) {
+ notify_listeners(NEIGHBOR_REMOVED, e, nbs);
+ list_del(&e->next);
+ free(e);
+ pthread_mutex_unlock(&nbs->list_lock);
+ return 0;
+ }
+ }
+
+ pthread_mutex_unlock(&nbs->list_lock);
+
+ return -1;
+}
+
+int nbs_reg_notifier(struct nbs * nbs,
+ struct nb_notifier * notify)
+{
+ assert(nbs);
+ assert(notify);
+
+ pthread_mutex_lock(&nbs->notifiers_lock);
+
+ list_head_init(&notify->next);
+ list_add(&notify->next, &nbs->notifiers);
+
+ pthread_mutex_unlock(&nbs->notifiers_lock);
+
+ return 0;
+}
+
+int nbs_unreg_notifier(struct nbs * nbs,
+ struct nb_notifier * notify)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ pthread_mutex_lock(&nbs->notifiers_lock);
+
+ list_for_each_safe(p, n, &nbs->notifiers) {
+ struct nb_notifier * e =
+ list_entry(p, struct nb_notifier, next);
+ if (e == notify) {
+ list_del(&e->next);
+ pthread_mutex_unlock(&nbs->notifiers_lock);
+ return 0;
+ }
+ }
+
+ pthread_mutex_unlock(&nbs->notifiers_lock);
+
+ return -1;
+}
diff --git a/src/ipcpd/normal/neighbors.h b/src/ipcpd/normal/neighbors.h
new file mode 100644
index 00000000..743bc7b8
--- /dev/null
+++ b/src/ipcpd/normal/neighbors.h
@@ -0,0 +1,81 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Data transfer neighbors
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H
+#define OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H
+
+#include <ouroboros/irm_config.h>
+#include <ouroboros/list.h>
+#include <ouroboros/qos.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/cacep.h>
+
+#include "connmgr.h"
+
+enum nb_event {
+ NEIGHBOR_ADDED,
+ NEIGHBOR_REMOVED,
+ NEIGHBOR_QOS_CHANGE
+};
+
+typedef int (* nb_notify_t)(enum nb_event event,
+ struct conn conn);
+
+struct nb_notifier {
+ struct list_head next;
+ nb_notify_t notify_call;
+};
+
+struct nb {
+ struct list_head next;
+ struct conn conn;
+};
+
+struct nbs {
+ struct list_head notifiers;
+ pthread_mutex_t notifiers_lock;
+
+ struct list_head list;
+ pthread_mutex_t list_lock;
+};
+
+struct nbs * nbs_create(void);
+
+void nbs_destroy(struct nbs * nbs);
+
+int nbs_add(struct nbs * nbs,
+ struct conn conn);
+
+int nbs_update_qos(struct nbs * nbs,
+ int fd,
+ qosspec_t qs);
+
+int nbs_del(struct nbs * nbs,
+ int fd);
+
+int nbs_reg_notifier(struct nbs * nbs,
+ struct nb_notifier * notify);
+
+int nbs_unreg_notifier(struct nbs * nbs,
+ struct nb_notifier * notify);
+
+#endif
diff --git a/src/ipcpd/normal/pff.c b/src/ipcpd/normal/pff.c
index 2f7d554b..b44f79bf 100644
--- a/src/ipcpd/normal/pff.c
+++ b/src/ipcpd/normal/pff.c
@@ -55,15 +55,16 @@ struct pff * pff_create(void)
return tmp;
}
-int pff_destroy(struct pff * instance)
+void pff_destroy(struct pff * instance)
{
assert(instance);
+ pthread_mutex_lock(&instance->lock);
htable_destroy(instance->table);
+ pthread_mutex_unlock(&instance->lock);
+
pthread_mutex_destroy(&instance->lock);
free(instance);
-
- return 0;
}
int pff_add(struct pff * instance, uint64_t addr, int fd)
diff --git a/src/ipcpd/normal/pff.h b/src/ipcpd/normal/pff.h
index b4a1400b..d4edb90c 100644
--- a/src/ipcpd/normal/pff.h
+++ b/src/ipcpd/normal/pff.h
@@ -25,8 +25,6 @@
#include <stdint.h>
-struct pff;
-
/*
* PFF will take a type in the future,
* to allow different policies.
@@ -34,7 +32,7 @@ struct pff;
*/
struct pff * pff_create(void);
-int pff_destroy(struct pff * instance);
+void pff_destroy(struct pff * instance);
int pff_add(struct pff * instance,
uint64_t addr,
diff --git a/src/ipcpd/normal/pol-gam-ops.h b/src/ipcpd/normal/pol-gam-ops.h
index 264f252b..a7753b8b 100644
--- a/src/ipcpd/normal/pol-gam-ops.h
+++ b/src/ipcpd/normal/pol-gam-ops.h
@@ -24,21 +24,13 @@
#define OUROBOROS_IPCPD_NORMAL_POL_GAM_OPS_H
#include <ouroboros/cacep.h>
+#include <ouroboros/qos.h>
struct pol_gam_ops {
- void * (* create)(struct gam * instance);
+ void * (* create)(struct nbs * nbs,
+ struct ae * ae);
void (* destroy)(void * o);
-
- int (* start)(void * o);
-
- int (* stop)(void * o);
-
- int (* accept_new_flow)(void * o);
-
- int (* accept_flow)(void * o,
- qosspec_t qs,
- const struct conn_info * info);
};
#endif /* OUROBOROS_IPCPD_NORMAL_POL_GAM_OPS_H */
diff --git a/src/ipcpd/normal/pol/complete.c b/src/ipcpd/normal/pol/complete.c
index daf8c9bf..f84c3a23 100644
--- a/src/ipcpd/normal/pol/complete.c
+++ b/src/ipcpd/normal/pol/complete.c
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Sets up a complete graph
*
* Dimitri Staessens <dimitri.staessens@intec.ugent.be>
* Sander Vrijders <sander.vrijders@intec.ugent.be>
@@ -20,35 +20,54 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "complete-graph-adjacency-manager"
+#define OUROBOROS_PREFIX "complete"
#include <ouroboros/config.h>
-#include <ouroboros/logs.h>
-#include <ouroboros/list.h>
-#include <ouroboros/qos.h>
+#include <ouroboros/shared.h>
#include <ouroboros/rib.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/cacep.h>
-#include "ipcp.h"
-#include "gam.h"
+#include "neighbors.h"
+#include "frct.h"
#include "ribconfig.h"
+#include "ipcp.h"
+#include "ae.h"
#include <string.h>
#include <stdlib.h>
#include <assert.h>
-struct neighbor {
- struct list_head next;
- uint64_t neighbor;
+struct complete {
+ struct nbs * nbs;
+ struct ae * ae;
+ pthread_t allocator;
+ pthread_t listener;
};
-struct complete {
- struct list_head neighbors;
- pthread_mutex_t neighbors_lock;
+static void * listener(void * o)
+{
+ struct complete * complete;
+ struct conn conn;
- pthread_t allocator;
+ complete = (struct complete *) o;
- struct gam * gam;
-};
+ while (true) {
+ if (connmgr_wait(complete->ae, &conn)) {
+ log_err("Error while getting next connection.");
+ continue;
+ }
+
+ if (nbs_add(complete->nbs, conn)) {
+ log_err("Failed to add neighbor.");
+ continue;
+ }
+ }
+
+ return (void *) 0;
+}
static void * allocator(void * o)
{
@@ -56,10 +75,10 @@ static void * allocator(void * o)
ssize_t len;
char ** children;
ssize_t i;
- struct complete * complete = (struct complete *) o;
+ struct complete * complete;
+ struct conn conn;
- assert(complete);
- assert(complete->gam);
+ complete = (struct complete *) o;
qs.delay = 0;
qs.jitter = 0;
@@ -67,8 +86,23 @@ static void * allocator(void * o)
/* FIXME: subscribe to members to keep the graph complete. */
len = rib_children("/" MEMBERS_NAME, &children);
for (i = 0; i < len; ++i) {
- if (strcmp(children[i], ipcpi.name) < 0)
- gam_flow_alloc(complete->gam, children[i], qs);
+ if (strcmp(children[i], ipcpi.name) < 0) {
+ if (connmgr_alloc(complete->ae,
+ children[i],
+ &qs,
+ &conn)) {
+ log_warn("Failed to get a conn to neighbor.");
+ free(children[i]);
+ continue;
+ }
+
+ if (nbs_add(complete->nbs, conn)) {
+ log_err("Failed to add neighbor.");
+ free(children[i]);
+ continue;
+ }
+
+ }
free(children[i]);
}
@@ -78,118 +112,41 @@ static void * allocator(void * o)
return (void *) 0;
}
-void * complete_create(struct gam * gam)
+void * complete_create(struct nbs * nbs,
+ struct ae * ae)
{
struct complete * complete;
- assert(gam);
-
complete = malloc(sizeof(*complete));
if (complete == NULL)
return NULL;
- list_head_init(&complete->neighbors);
- complete->gam = gam;
-
- if (pthread_mutex_init(&complete->neighbors_lock, NULL)) {
- free(complete);
- return NULL;
- }
-
- return (void *) complete;
-}
-
-int complete_start(void * o)
-{
- struct complete * complete = (struct complete *) o;
-
- assert(complete);
- assert(complete->gam);
+ complete->nbs = nbs;
+ complete->ae = ae;
if (pthread_create(&complete->allocator, NULL,
- allocator, (void *) complete)) {
- pthread_mutex_destroy(&complete->neighbors_lock);
- free(complete);
- return -1;
- }
+ allocator, (void *) complete))
+ return NULL;
- /* FIXME: Handle flooding of the flow allocator before detaching.*/
- pthread_join(complete->allocator, NULL);
+ if (pthread_create(&complete->listener, NULL,
+ listener, (void *) complete))
+ return NULL;
- return 0;
+ return complete;
}
-int complete_stop(void * o)
+void complete_destroy(void * ops_o)
{
- (void) o;
+ struct complete * complete;
- return 0;
-}
+ assert(ops_o);
-void complete_destroy(void * o)
-{
- struct list_head * p = NULL;
- struct list_head * n = NULL;
- struct complete * complete = (struct complete *) o;
-
- list_for_each_safe(p, n, &complete->neighbors) {
- struct neighbor * e = list_entry(p, struct neighbor, next);
- list_del(&e->next);
- free(e);
- }
+ complete = (struct complete *) ops_o;
- pthread_mutex_destroy(&complete->neighbors_lock);
+ pthread_cancel(complete->allocator);
+ pthread_cancel(complete->listener);
+ pthread_join(complete->allocator, NULL);
+ pthread_join(complete->listener, NULL);
free(complete);
}
-
-int complete_accept_new_flow(void * o)
-{
- (void) o;
-
- return 0;
-}
-
-int complete_accept_flow(void * o,
- qosspec_t qs,
- const struct conn_info * info)
-{
- struct list_head * pos = NULL;
- struct neighbor * n;
- struct complete * complete = (struct complete *) o;
-
- (void) qs;
-
- assert(complete);
-
- pthread_mutex_lock(&complete->neighbors_lock);
-
- list_for_each(pos, &complete->neighbors) {
- struct neighbor * e = list_entry(pos, struct neighbor, next);
- /* FIXME: figure out union type and check name or address */
- if (e->neighbor == info->addr) {
- pthread_mutex_unlock(&complete->neighbors_lock);
- return -1;
- }
-
- assert(complete);
- assert(&complete->neighbors_lock);
- assert(pos->nxt);
- }
-
- n = malloc(sizeof(*n));
- if (n == NULL) {
- pthread_mutex_unlock(&complete->neighbors_lock);
- return -1;
- }
-
- list_head_init(&n->next);
-
- n->neighbor = info->addr;
-
- list_add(&n->next, &complete->neighbors);
-
- pthread_mutex_unlock(&complete->neighbors_lock);
-
- return 0;
-}
diff --git a/src/ipcpd/normal/pol/complete.h b/src/ipcpd/normal/pol/complete.h
index 8fe1437f..40aca69d 100644
--- a/src/ipcpd/normal/pol/complete.h
+++ b/src/ipcpd/normal/pol/complete.h
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Sets up a complete graph
*
* Dimitri Staessens <dimitri.staessens@intec.ugent.be>
* Sander Vrijders <sander.vrijders@intec.ugent.be>
@@ -23,30 +23,19 @@
#ifndef OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H
#define OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H
-#include "gam.h"
-#include "pol-gam-ops.h"
-
-void * complete_create(struct gam * instance);
-
-void complete_destroy(void * o);
+#include <ouroboros/irm_config.h>
+#include <ouroboros/qos.h>
-int complete_start(void * o);
-
-int complete_stop(void * o);
+#include "pol-gam-ops.h"
-int complete_accept_new_flow(void * o);
+void * complete_create(struct nbs * nbs,
+ struct ae * ae);
-int complete_accept_flow(void * o,
- qosspec_t qs,
- const struct conn_info * info);
+void complete_destroy(void * ops_o);
struct pol_gam_ops complete_ops = {
- .create = complete_create,
- .destroy = complete_destroy,
- .start = complete_start,
- .stop = complete_stop,
- .accept_new_flow = complete_accept_new_flow,
- .accept_flow = complete_accept_flow
+ .create = complete_create,
+ .destroy = complete_destroy
};
#endif /* OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H */
diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h
index 15b65ce2..5ecdaab3 100644
--- a/src/ipcpd/normal/ribconfig.h
+++ b/src/ipcpd/normal/ribconfig.h
@@ -31,9 +31,11 @@
#define MEMBERS_NAME "members"
#define DIF_NAME "dif_name"
#define DIR_NAME "directory"
+#define ROUTING_NAME "fsdb"
#define DIF_PATH DLR DIF_NAME
#define DIR_PATH DLR DIR_NAME
#define BOOT_PATH DLR BOOT_NAME
#define MEMBERS_PATH DLR MEMBERS_NAME
+#define ROUTING_PATH DLR ROUTING_NAME
#endif /* OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H */
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 25f1687e..e8fa77a4 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -45,25 +45,70 @@
#include <assert.h>
struct {
- flow_set_t * fs;
- fqueue_t * fq;
- struct gam * gam;
+ flow_set_t * fs;
+ fqueue_t * fq;
+
+ struct gam * gam;
+ struct nbs * nbs;
+ struct ae * ae;
+
+ struct nb_notifier nb_notifier;
} ribmgr;
+static int ribmgr_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ /* We are only interested in neighbors being added and removed. */
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ flow_set_add(ribmgr.fs, conn.flow_info.fd);
+ break;
+ case NEIGHBOR_REMOVED:
+ flow_set_del(ribmgr.fs, conn.flow_info.fd);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
int ribmgr_init(void)
{
- enum pol_gam pg;
+ enum pol_gam pg;
+ struct conn_info info;
+
+ strcpy(info.ae_name, MGMT_AE);
+ strcpy(info.protocol, CDAP_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_GPB;
+
+ ribmgr.nbs = nbs_create();
+ if (ribmgr.nbs == NULL) {
+ log_err("Failed to create neighbors.");
+ return -1;
+ }
+
+ ribmgr.ae = connmgr_ae_create(info);
+ if (ribmgr.ae == NULL) {
+ log_err("Failed to create AE struct.");
+ nbs_destroy(ribmgr.nbs);
+ return -1;
+ }
if (rib_read(BOOT_PATH "/rm/gam/type", &pg, sizeof(pg))
!= sizeof(pg)) {
log_err("Failed to read policy for ribmgr gam.");
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
return -1;
}
- ribmgr.gam = gam_create(pg);
+ ribmgr.gam = gam_create(pg, ribmgr.nbs, ribmgr.ae);
if (ribmgr.gam == NULL) {
log_err("Failed to create gam.");
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
return -1;
}
@@ -71,6 +116,8 @@ int ribmgr_init(void)
if (ribmgr.fs == NULL) {
log_err("Failed to create flow set.");
gam_destroy(ribmgr.gam);
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
return -1;
}
@@ -79,6 +126,19 @@ int ribmgr_init(void)
log_err("Failed to create fq.");
flow_set_destroy(ribmgr.fs);
gam_destroy(ribmgr.gam);
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
+ return -1;
+ }
+
+ ribmgr.nb_notifier.notify_call = ribmgr_neighbor_event;
+ if (nbs_reg_notifier(ribmgr.nbs, &ribmgr.nb_notifier)) {
+ log_err("Failed to register notifier.");
+ fqueue_destroy(ribmgr.fq);
+ flow_set_destroy(ribmgr.fs);
+ gam_destroy(ribmgr.gam);
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
return -1;
}
@@ -87,20 +147,12 @@ int ribmgr_init(void)
void ribmgr_fini(void)
{
+ nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier);
flow_set_destroy(ribmgr.fs);
fqueue_destroy(ribmgr.fq);
gam_destroy(ribmgr.gam);
-}
-
-int ribmgr_flow_arr(int fd,
- qosspec_t qs)
-{
- assert(ribmgr.gam);
-
- if (gam_flow_arr(ribmgr.gam, fd, qs))
- return -1;
-
- return 0;
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
}
int ribmgr_disseminate(char * path,
diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h
index 12f407ab..f3f4cc24 100644
--- a/src/ipcpd/normal/ribmgr.h
+++ b/src/ipcpd/normal/ribmgr.h
@@ -41,9 +41,6 @@ int ribmgr_init(void);
void ribmgr_fini(void);
-int ribmgr_flow_arr(int fd,
- qosspec_t qs);
-
int ribmgr_disseminate(char * path,
enum diss_target target,
enum diss_freq freq,
diff --git a/src/ipcpd/normal/routing.c b/src/ipcpd/normal/routing.c
new file mode 100644
index 00000000..48c2c16d
--- /dev/null
+++ b/src/ipcpd/normal/routing.c
@@ -0,0 +1,132 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Routing component of the IPCP
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "routing"
+
+#include <ouroboros/config.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/list.h>
+#include <ouroboros/logs.h>
+
+#include "routing.h"
+#include "ribmgr.h"
+
+#include <assert.h>
+#include <stdlib.h>
+#include <inttypes.h>
+
+struct edge {
+ struct vertex * ep;
+ qosspec_t qs;
+};
+
+struct vertex {
+ struct list_head next;
+
+ uint64_t addr;
+
+ struct list_head edges;
+};
+
+struct routing {
+ struct pff * pff;
+ struct nbs * nbs;
+
+ struct nb_notifier nb_notifier;
+
+ struct list_head vertices;
+};
+
+static int routing_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ (void) conn;
+
+ /* FIXME: React to events here */
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ break;
+ case NEIGHBOR_REMOVED:
+ break;
+ case NEIGHBOR_QOS_CHANGE:
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+#if 0
+/* FIXME: If zeroed since it is not used currently */
+static int add_vertex(struct routing * instance,
+ uint64_t addr)
+{
+ struct vertex * vertex;
+
+ vertex = malloc(sizeof(*vertex));
+ if (vertex == NULL)
+ return -1;
+
+ list_head_init(&vertex->next);
+ list_head_init(&vertex->edges);
+ vertex->addr = addr;
+
+ list_add(&vertex->next, &instance->vertices);
+
+ return 0;
+}
+#endif
+
+struct routing * routing_create(struct pff * pff,
+ struct nbs * nbs)
+{
+ struct routing * tmp;
+
+ assert(pff);
+
+ tmp = malloc(sizeof(*tmp));
+ if (tmp == NULL)
+ return NULL;
+
+ tmp->pff = pff;
+ tmp->nbs = nbs;
+
+ list_head_init(&tmp->vertices);
+
+ tmp->nb_notifier.notify_call = routing_neighbor_event;
+ if (nbs_reg_notifier(tmp->nbs, &tmp->nb_notifier)) {
+ free(tmp);
+ return NULL;
+ }
+
+ return tmp;
+}
+
+void routing_destroy(struct routing * instance)
+{
+ assert(instance);
+
+ nbs_unreg_notifier(instance->nbs, &instance->nb_notifier);
+
+ free(instance);
+}
diff --git a/src/ipcpd/normal/routing.h b/src/ipcpd/normal/routing.h
new file mode 100644
index 00000000..624763ec
--- /dev/null
+++ b/src/ipcpd/normal/routing.h
@@ -0,0 +1,42 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Routing component of the IPCP
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_ROUTING_H
+#define OUROBOROS_IPCPD_NORMAL_ROUTING_H
+
+#include <ouroboros/qos.h>
+
+#include "pff.h"
+#include "neighbors.h"
+
+#include <stdint.h>
+
+/*
+ * Routing will take a type in the future,
+ * to allow different policies.
+ */
+struct routing * routing_create(struct pff * pff,
+ struct nbs * nbs);
+
+void routing_destroy(struct routing * instance);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_ROUTING_H */