From a409fd81dfc6d22f9a287f15394b86490dea5273 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 23 Feb 2017 14:31:31 +0100 Subject: ipcpd: normal: Refactor application entities and add neighbors struct This refactors the different Application Entities of the normal IPCP. They all listen to and use the connection manager to establish new application connections. This commit also adds a neighbors struct to the normal IPCP. It contains neighbor structs that contain application connection. Notifiers can be registered in case a neighbor changes (added, removed, QoS changed). The flow manager has an instance of this neighbors struct and listens to these events to update its flow set. The routing component also listens to these events so that it can update the FSDB if needed. The flow manager now also creates the PFF instances and the routing instances per QoS cube. The RIB manager also uses this an instance of the neighbors struct and listens to neighbor events as well. --- src/ipcpd/normal/CMakeLists.txt | 2 + src/ipcpd/normal/connmgr.c | 8 +- src/ipcpd/normal/connmgr.h | 2 +- src/ipcpd/normal/enroll.c | 292 +++++++++++++++++++++++---------------- src/ipcpd/normal/enroll.h | 10 +- src/ipcpd/normal/fmgr.c | 283 +++++++++++++++++++++++--------------- src/ipcpd/normal/fmgr.h | 4 - src/ipcpd/normal/frct.c | 16 +-- src/ipcpd/normal/frct.h | 2 + src/ipcpd/normal/gam.c | 296 +++------------------------------------- src/ipcpd/normal/gam.h | 21 +-- src/ipcpd/normal/main.c | 54 ++++++-- src/ipcpd/normal/neighbors.c | 213 +++++++++++++++++++++++++++++ src/ipcpd/normal/neighbors.h | 81 +++++++++++ src/ipcpd/normal/pff.c | 7 +- src/ipcpd/normal/pff.h | 4 +- src/ipcpd/normal/pol-gam-ops.h | 14 +- src/ipcpd/normal/pol/complete.c | 189 ++++++++++--------------- src/ipcpd/normal/pol/complete.h | 29 ++-- src/ipcpd/normal/ribconfig.h | 2 + src/ipcpd/normal/ribmgr.c | 84 +++++++++--- src/ipcpd/normal/ribmgr.h | 3 - src/ipcpd/normal/routing.c | 132 ++++++++++++++++++ src/ipcpd/normal/routing.h | 42 ++++++ 24 files changed, 1065 insertions(+), 725 deletions(-) create mode 100644 src/ipcpd/normal/neighbors.c create mode 100644 src/ipcpd/normal/neighbors.h create mode 100644 src/ipcpd/normal/routing.c create mode 100644 src/ipcpd/normal/routing.h diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 70742336..772d5212 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -27,8 +27,10 @@ set(SOURCE_FILES frct.c gam.c main.c + neighbors.c pff.c ribmgr.c + routing.c shm_pci.c # Add policies last pol/complete.c diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index 387c38fd..0c908cd1 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -42,8 +42,6 @@ #include #include -#define FRCT_PROTO "frct" - struct ae_conn { struct list_head next; struct conn conn; @@ -272,7 +270,7 @@ void connmgr_ae_destroy(struct ae * ae) int connmgr_alloc(struct ae * ae, char * dst_name, - qosspec_t qs, + qosspec_t * qs, struct conn * conn) { assert(ae); @@ -281,13 +279,13 @@ int connmgr_alloc(struct ae * ae, memset(&conn->conn_info, 0, sizeof(conn->conn_info)); - conn->flow_info.fd = flow_alloc(dst_name, &qs); + conn->flow_info.fd = flow_alloc(dst_name, qs); if (conn->flow_info.fd < 0) { log_err("Failed to allocate flow to %s.", dst_name); return -1; } - conn->flow_info.qs = qs; + conn->flow_info.qs = *qs; if (flow_alloc_res(conn->flow_info.fd)) { log_err("Flow allocation to %s failed.", dst_name); diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h index bfb3d762..5dbf2bcc 100644 --- a/src/ipcpd/normal/connmgr.h +++ b/src/ipcpd/normal/connmgr.h @@ -48,7 +48,7 @@ void connmgr_ae_destroy(struct ae * ae); int connmgr_alloc(struct ae * ae, char * dst_name, - qosspec_t qs, + qosspec_t * qs, struct conn * conn); int connmgr_wait(struct ae * ae, diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c index 9c3b9973..25460161 100644 --- a/src/ipcpd/normal/enroll.c +++ b/src/ipcpd/normal/enroll.c @@ -22,6 +22,8 @@ #include #include +#include +#include #include #include #include @@ -29,29 +31,35 @@ #include #include "ae.h" -#include "cdap_flow.h" +#include "connmgr.h" #include "ribconfig.h" #include #include #include +#include /* Symbolic, will return current time */ #define TIME_NAME "localtime" #define TIME_PATH DLR TIME_NAME #define ENROLL_WARN_TIME_OFFSET 20 -int enroll_handle(int fd) +struct { + struct ae * ae; + pthread_t listener; +} enroll; + +static void * enroll_handle(void * o) { - struct cdap_flow * flow; - struct conn_info info; - cdap_key_t key; - enum cdap_opcode oc; - char * name; - uint8_t * buf; - uint8_t * data; - ssize_t len; - uint32_t flags; + struct cdap * cdap; + struct conn conn; + cdap_key_t key; + enum cdap_opcode oc; + char * name; + uint8_t * buf; + uint8_t * data; + ssize_t len; + uint32_t flags; bool boot_r = false; bool members_r = false; @@ -61,98 +69,107 @@ int enroll_handle(int fd) char * members_ro = MEMBERS_PATH; char * dif_ro = DIF_PATH; - memset(&info, 0, sizeof(info)); - - strcpy(info.ae_name, ENROLL_AE); - strcpy(info.protocol, CDAP_PROTO); - info.pref_version = 1; - info.pref_syntax = PROTO_GPB; + (void) o; - flow = cdap_flow_arr(fd, 0, &info); - if (flow == NULL) { - log_err("Failed to auth enrollment request."); - flow_dealloc(fd); - return -1; - } - - while (!(boot_r && members_r && dif_name_r)) { - key = cdap_request_wait(flow->ci, &oc, &name, &data, - (size_t *) &len , &flags); - assert(key >= 0); - assert(name); - - if (data != NULL) { - free(data); - log_warn("Received data with enrollment request."); - } - - if (oc != CDAP_READ) { - log_warn("Invalid request."); - cdap_reply_send(flow->ci, key, -1, NULL, 0); - cdap_flow_dealloc(flow); - free(name); - return -1; + while (true) { + if (connmgr_wait(enroll.ae, &conn)) { + log_err("Failed to get next connection."); + continue; } - if (strcmp(name, boot_ro) == 0) { - boot_r = true; - } else if (strcmp(name, members_ro) == 0) { - members_r = true; - } else if (strcmp(name, dif_ro) == 0) { - dif_name_r = true; - } else if (strcmp(name, TIME_PATH) == 0) { - struct timespec t; - uint64_t buf[2]; - clock_gettime(CLOCK_REALTIME, &t); - buf[0] = hton64(t.tv_sec); - buf[1] = hton64(t.tv_nsec); - cdap_reply_send(flow->ci, key, 0, buf, sizeof(buf)); - free(name); + cdap = cdap_create(conn.flow_info.fd); + if (cdap == NULL) { + log_err("Failed to instantiate CDAP."); + flow_dealloc(conn.flow_info.fd); continue; - } else { - log_warn("Illegal read: %s.", name); - cdap_reply_send(flow->ci, key, -1, NULL, 0); - cdap_flow_dealloc(flow); - free(name); - return -1; } - len = rib_pack(name, &buf, PACK_HASH_ROOT); - if (len < 0) { - log_err("Failed to pack %s.", name); - cdap_reply_send(flow->ci, key, -1, NULL, 0); - cdap_flow_dealloc(flow); - free(name); - return -1; - } + while (!(boot_r && members_r && dif_name_r)) { + key = cdap_request_wait(cdap, &oc, &name, &data, + (size_t *) &len , &flags); + assert(key >= 0); + assert(name); + + if (data != NULL) { + free(data); + log_warn("Received data with enroll request."); + } + + if (oc != CDAP_READ) { + log_warn("Invalid request."); + cdap_reply_send(cdap, key, -1, NULL, 0); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + free(name); + continue; + } + + if (strcmp(name, boot_ro) == 0) { + boot_r = true; + } else if (strcmp(name, members_ro) == 0) { + members_r = true; + } else if (strcmp(name, dif_ro) == 0) { + dif_name_r = true; + } else if (strcmp(name, TIME_PATH) == 0) { + struct timespec t; + uint64_t buf[2]; + clock_gettime(CLOCK_REALTIME, &t); + buf[0] = hton64(t.tv_sec); + buf[1] = hton64(t.tv_nsec); + cdap_reply_send(cdap, key, 0, buf, sizeof(buf)); + free(name); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + continue; + } else { + log_warn("Illegal read: %s.", name); + cdap_reply_send(cdap, key, -1, NULL, 0); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + free(name); + continue; + } + + len = rib_pack(name, &buf, PACK_HASH_ROOT); + if (len < 0) { + log_err("Failed to pack %s.", name); + cdap_reply_send(cdap, key, -1, NULL, 0); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + free(name); + continue; + } + + log_dbg("Packed %s (%zu bytes).", name, len); - log_dbg("Packed %s (%zu bytes).", name, len); + free(name); - free(name); + if (cdap_reply_send(cdap, key, 0, buf, len)) { + log_err("Failed to send CDAP reply."); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + continue; + } - if (cdap_reply_send(flow->ci, key, 0, buf, len)) { - log_err("Failed to send CDAP reply."); - cdap_flow_dealloc(flow); - return -1; + free(buf); } - free(buf); - } - - log_dbg("Sent boot info to new member."); + log_dbg("Sent boot info to new member."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + } return 0; } int enroll_boot(char * dst_name) { - struct cdap_flow * flow; - struct conn_info info; - cdap_key_t key; - uint8_t * data; - size_t len; + struct cdap * cdap; + cdap_key_t key; + uint8_t * data; + size_t len; + struct conn conn; struct timespec t0; struct timespec rtt; @@ -163,16 +180,14 @@ int enroll_boot(char * dst_name) char * members_ro = MEMBERS_PATH; char * dif_ro = DIF_PATH; - memset(&info, 0, sizeof(info)); - - strcpy(info.ae_name, ENROLL_AE); - strcpy(info.protocol, CDAP_PROTO); - info.pref_version = 1; - info.pref_syntax = PROTO_GPB; + if (connmgr_alloc(enroll.ae, dst_name, NULL, &conn)) { + log_err("Failed to get connection."); + return -1; + } - flow = cdap_flow_alloc(dst_name, NULL, &info); - if (flow == NULL) { - log_err("Failed to allocate flow for enrollment request."); + cdap = cdap_create(conn.flow_info.fd); + if (cdap == NULL) { + log_err("Failed to instantiate CDAP."); return -1; } @@ -180,16 +195,18 @@ int enroll_boot(char * dst_name) clock_gettime(CLOCK_REALTIME, &t0); - key = cdap_request_send(flow->ci, CDAP_READ, TIME_PATH, NULL, 0, 0); + key = cdap_request_send(cdap, CDAP_READ, TIME_PATH, NULL, 0, 0); if (key < 0) { log_err("Failed to send CDAP request."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } - if (cdap_reply_wait(flow->ci, key, &data, &len)) { + if (cdap_reply_wait(cdap, key, &data, &len)) { log_err("Failed to get CDAP reply."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } @@ -207,16 +224,18 @@ int enroll_boot(char * dst_name) free(data); - key = cdap_request_send(flow->ci, CDAP_READ, boot_ro, NULL, 0, 0); + key = cdap_request_send(cdap, CDAP_READ, boot_ro, NULL, 0, 0); if (key < 0) { log_err("Failed to send CDAP request."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } - if (cdap_reply_wait(flow->ci, key, &data, &len)) { + if (cdap_reply_wait(cdap, key, &data, &len)) { log_err("Failed to get CDAP reply."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } @@ -226,22 +245,25 @@ int enroll_boot(char * dst_name) log_warn("Error unpacking RIB data."); rib_del(boot_ro); free(data); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } log_dbg("Packed information inserted into RIB."); - key = cdap_request_send(flow->ci, CDAP_READ, members_ro, NULL, 0, 0); + key = cdap_request_send(cdap, CDAP_READ, members_ro, NULL, 0, 0); if (key < 0) { log_err("Failed to send CDAP request."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } - if (cdap_reply_wait(flow->ci, key, &data, &len)) { + if (cdap_reply_wait(cdap, key, &data, &len)) { log_err("Failed to get CDAP reply."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } @@ -251,22 +273,25 @@ int enroll_boot(char * dst_name) log_warn("Error unpacking RIB data."); rib_del(boot_ro); free(data); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } log_dbg("Packed information inserted into RIB."); - key = cdap_request_send(flow->ci, CDAP_READ, dif_ro, NULL, 0, 0); + key = cdap_request_send(cdap, CDAP_READ, dif_ro, NULL, 0, 0); if (key < 0) { log_err("Failed to send CDAP request."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } - if (cdap_reply_wait(flow->ci, key, &data, &len)) { + if (cdap_reply_wait(cdap, key, &data, &len)) { log_err("Failed to get CDAP reply."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } @@ -276,13 +301,52 @@ int enroll_boot(char * dst_name) log_warn("Error unpacking RIB data."); rib_del(boot_ro); free(data); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); return -1; } log_dbg("Packed information inserted into RIB."); - cdap_flow_dealloc(flow); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + + return 0; +} + +int enroll_init(void) +{ + struct conn_info info; + + memset(&info, 0, sizeof(info)); + + strcpy(info.ae_name, ENROLL_AE); + strcpy(info.protocol, CDAP_PROTO); + info.pref_version = 1; + info.pref_syntax = PROTO_GPB; + + enroll.ae = connmgr_ae_create(info); + if (enroll.ae == NULL) + return -1; + + return 0; +} + +void enroll_fini(void) +{ + connmgr_ae_destroy(enroll.ae); +} + +int enroll_start(void) +{ + if (pthread_create(&enroll.listener, NULL, enroll_handle, NULL)) + return -1; return 0; } + +void enroll_stop(void) +{ + pthread_cancel(enroll.listener); + pthread_join(enroll.listener, NULL); +} diff --git a/src/ipcpd/normal/enroll.h b/src/ipcpd/normal/enroll.h index 2980c380..3c81ae25 100644 --- a/src/ipcpd/normal/enroll.h +++ b/src/ipcpd/normal/enroll.h @@ -22,8 +22,14 @@ #ifndef OUROBOROS_IPCPD_NORMAL_ENROLL_H #define OUROBOROS_IPCPD_NORMAL_ENROLL_H -int enroll_handle(int fd); +int enroll_init(void); -int enroll_boot(char * dst_name); +void enroll_fini(void); + +int enroll_start(void); + +void enroll_stop(void); + +int enroll_boot(char * dst_name); #endif /* OUROBOROS_IPCPD_NORMAL_ENROLL_H */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 34724ddd..b7a99f6c 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -31,12 +31,16 @@ #include #include +#include "connmgr.h" #include "fmgr.h" #include "frct.h" #include "ipcp.h" #include "shm_pci.h" -#include "gam.h" #include "ribconfig.h" +#include "pff.h" +#include "neighbors.h" +#include "gam.h" +#include "routing.h" #include #include @@ -48,19 +52,7 @@ typedef FlowAllocMsg flow_alloc_msg_t; #define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */ -struct nm1_flow { - struct list_head next; - int fd; - qosspec_t qs; - struct conn_info * info; -}; - struct { - flow_set_t * nm1_set[QOS_CUBE_MAX]; - fqueue_t * nm1_fqs[QOS_CUBE_MAX]; - struct list_head nm1_flows; - pthread_rwlock_t nm1_flows_lock; - flow_set_t * np1_set[QOS_CUBE_MAX]; fqueue_t * np1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t np1_flows_lock; @@ -69,15 +61,43 @@ struct { int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; pthread_t np1_sdu_reader; + + flow_set_t * nm1_set[QOS_CUBE_MAX]; + fqueue_t * nm1_fqs[QOS_CUBE_MAX]; pthread_t nm1_sdu_reader; - pthread_t nm1_flow_wait; - /* FIXME: Replace with PFF */ - int fd; + struct pff * pff[QOS_CUBE_MAX]; + struct routing * routing[QOS_CUBE_MAX]; struct gam * gam; + struct nbs * nbs; + struct ae * ae; + + struct nb_notifier nb_notifier; } fmgr; +static int fmgr_neighbor_event(enum nb_event event, + struct conn conn) +{ + qoscube_t cube; + + /* We are only interested in neighbors being added and removed. */ + switch (event) { + case NEIGHBOR_ADDED: + ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); + flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd); + break; + case NEIGHBOR_REMOVED: + ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); + flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd); + break; + default: + break; + } + + return 0; +} + static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; @@ -171,12 +191,20 @@ void * fmgr_nm1_sdu_reader(void * o) continue; } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - continue; + fd = pff_nhop(fmgr.pff[i], pci.dst_addr); + if (fd < 0) { + log_err("No next hop for %lu", + pci.dst_addr); + ipcp_flow_del(sdb); + continue; + } + + if (ipcp_flow_write(fd, sdb)) { + log_err("Failed to write SDU to fd %d.", + fd); + ipcp_flow_del(sdb); + continue; + } } shm_pci_shrink(sdb); @@ -192,49 +220,6 @@ void * fmgr_nm1_sdu_reader(void * o) return (void *) 0; } -static void * fmgr_nm1_flow_wait(void * o) -{ - qoscube_t cube; - struct conn_info * info; - int fd; - qosspec_t qs; - struct nm1_flow * flow; - - (void) o; - - while (true) { - if (gam_flow_wait(fmgr.gam, &fd, &info, &qs)) { - log_err("Failed to get next flow descriptor."); - continue; - } - - ipcp_flow_get_qoscube(fd, &cube); - flow_set_add(fmgr.nm1_set[cube], fd); - - /* FIXME: Temporary, until we have a PFF */ - fmgr.fd = fd; - - pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); - flow = malloc(sizeof(*flow)); - if (flow == NULL) { - free(info); - pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - continue; - } - - flow->info = info; - flow->fd = fd; - flow->qs = qs; - - list_head_init(&flow->next); - list_add(&flow->next, &fmgr.nm1_flows); - - pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - } - - return (void *) 0; -} - static void fmgr_destroy_flows(void) { int i; @@ -247,12 +232,29 @@ static void fmgr_destroy_flows(void) } } -int fmgr_init(void) +static void fmgr_destroy_routing(void) { - enum pol_gam pg; + int i; + + for (i = 0; i < QOS_CUBE_MAX; ++i) + routing_destroy(fmgr.routing[i]); +} +static void fmgr_destroy_pff(void) +{ int i; + for (i = 0; i < QOS_CUBE_MAX; ++i) + pff_destroy(fmgr.pff[i]); +} + +int fmgr_init(void) +{ + enum pol_gam pg; + int i; + int j; + struct conn_info info; + for (i = 0; i < AP_MAX_FLOWS; ++i) fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; @@ -288,63 +290,116 @@ int fmgr_init(void) if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg)) != sizeof(pg)) { log_err("Failed to read policy for ribmgr gam."); + fmgr_destroy_flows(); return -1; } - fmgr.gam = gam_create(pg); - if (fmgr.gam == NULL) { - log_err("Failed to create graph adjacency manager."); + strcpy(info.ae_name, DT_AE); + strcpy(info.protocol, FRCT_PROTO); + info.pref_version = 1; + info.pref_syntax = PROTO_FIXED; + info.addr = ipcpi.dt_addr; + + fmgr.ae = connmgr_ae_create(info); + if (fmgr.ae == NULL) { + log_err("Failed to create AE struct."); fmgr_destroy_flows(); return -1; } - list_head_init(&fmgr.nm1_flows); + fmgr.nbs = nbs_create(); + if (fmgr.nbs == NULL) { + log_err("Failed to create neighbors struct."); + fmgr_destroy_flows(); + connmgr_ae_destroy(fmgr.ae); + return -1; + } - pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); - pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); + fmgr.nb_notifier.notify_call = fmgr_neighbor_event; + if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) { + log_err("Failed to register notifier."); + nbs_destroy(fmgr.nbs); + fmgr_destroy_flows(); + connmgr_ae_destroy(fmgr.ae); + return -1; + } + + if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) { + gam_destroy(fmgr.gam); + nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); + nbs_destroy(fmgr.nbs); + fmgr_destroy_flows(); + connmgr_ae_destroy(fmgr.ae); + return -1; + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + fmgr.pff[i] = pff_create(); + if (fmgr.pff[i] == NULL) { + for (j = 0; j < i; ++j) + pff_destroy(fmgr.pff[j]); + pthread_rwlock_destroy(&fmgr.np1_flows_lock); + nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); + nbs_destroy(fmgr.nbs); + fmgr_destroy_flows(); + connmgr_ae_destroy(fmgr.ae); + return -1; + } + + fmgr.routing[i] = routing_create(fmgr.pff[i], fmgr.nbs); + if (fmgr.routing[i] == NULL) { + for (j = 0; j < i; ++j) + routing_destroy(fmgr.routing[j]); + fmgr_destroy_pff(); + pthread_rwlock_destroy(&fmgr.np1_flows_lock); + nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); + nbs_destroy(fmgr.nbs); + fmgr_destroy_flows(); + connmgr_ae_destroy(fmgr.ae); + return -1; + } + } + + fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae); + if (fmgr.gam == NULL) { + log_err("Failed to init dt graph adjacency manager."); + fmgr_destroy_routing(); + fmgr_destroy_pff(); + pthread_rwlock_destroy(&fmgr.np1_flows_lock); + nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); + nbs_destroy(fmgr.nbs); + fmgr_destroy_flows(); + connmgr_ae_destroy(fmgr.ae); + return -1; + } pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL); - pthread_create(&fmgr.nm1_flow_wait, NULL, fmgr_nm1_flow_wait, NULL); return 0; } void fmgr_fini() { - struct list_head * pos = NULL; - struct list_head * n = NULL; - qoscube_t cube; - pthread_cancel(fmgr.np1_sdu_reader); pthread_cancel(fmgr.nm1_sdu_reader); - pthread_cancel(fmgr.nm1_flow_wait); pthread_join(fmgr.np1_sdu_reader, NULL); pthread_join(fmgr.nm1_sdu_reader, NULL); - pthread_join(fmgr.nm1_flow_wait, NULL); - gam_destroy(fmgr.gam); + nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); - - list_for_each_safe(pos, n, &fmgr.nm1_flows) { - struct nm1_flow * flow = - list_entry(pos, struct nm1_flow, next); - list_del(&flow->next); - flow_dealloc(flow->fd); - ipcp_flow_get_qoscube(flow->fd, &cube); - flow_set_del(fmgr.nm1_set[cube], flow->fd); - free(flow->info); - free(flow); - } + gam_destroy(fmgr.gam); - pthread_rwlock_unlock(&fmgr.nm1_flows_lock); + fmgr_destroy_routing(); - pthread_rwlock_destroy(&fmgr.nm1_flows_lock); - pthread_rwlock_destroy(&fmgr.np1_flows_lock); + fmgr_destroy_pff(); fmgr_destroy_flows(); + + connmgr_ae_destroy(fmgr.ae); + + nbs_destroy(fmgr.nbs); } int fmgr_np1_alloc(int fd, @@ -601,24 +656,20 @@ int fmgr_np1_post_sdu(cep_id_t cep_id, return 0; } -int fmgr_nm1_flow_arr(int fd, - qosspec_t qs) -{ - assert(fmgr.gam); - - if (gam_flow_arr(fmgr.gam, fd, qs)) { - log_err("Failed to hand to graph adjacency manager."); - return -1; - } - - return 0; -} - int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb) { + int fd; + if (pci == NULL || sdb == NULL) + return -EINVAL; + + fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); + if (fd < 0) { + log_err("Could not get nhop for address %lu", pci->dst_addr); + ipcp_flow_del(sdb); return -1; + } if (shm_pci_ser(sdb, pci)) { log_err("Failed to serialize PDU."); @@ -626,8 +677,8 @@ int fmgr_nm1_write_sdu(struct pci * pci, return -1; } - if (ipcp_flow_write(fmgr.fd, sdb)) { - log_err("Failed to write SDU to fd %d.", fmgr.fd); + if (ipcp_flow_write(fd, sdb)) { + log_err("Failed to write SDU to fd %d.", fd); ipcp_flow_del(sdb); return -1; } @@ -639,9 +690,17 @@ int fmgr_nm1_write_buf(struct pci * pci, buffer_t * buf) { buffer_t * buffer; + int fd; if (pci == NULL || buf == NULL || buf->data == NULL) + return -EINVAL; + + fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); + if (fd < 0) { + log_err("Could not get nhop for address %lu", pci->dst_addr); + free(buf->data); return -1; + } buffer = shm_pci_ser_buf(buf, pci); if (buffer == NULL) { @@ -650,7 +709,7 @@ int fmgr_nm1_write_buf(struct pci * pci, return -1; } - if (flow_write(fmgr.fd, buffer->data, buffer->len) == -1) { + if (flow_write(fd, buffer->data, buffer->len) == -1) { log_err("Failed to write buffer to fd."); free(buffer); return -1; diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index e75417f3..06eab0a1 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -53,8 +53,4 @@ int fmgr_nm1_write_sdu(struct pci * pci, int fmgr_nm1_write_buf(struct pci * pci, buffer_t * buf); -int fmgr_nm1_flow_arr(int fd, - qosspec_t qs); - - #endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */ diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index c9b23060..b5a42db4 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -198,12 +198,12 @@ int frct_fini() return 0; } -int frct_nm1_post_sdu(struct pci * pci, +int frct_nm1_post_sdu(struct pci * pci, struct shm_du_buff * sdb) { struct frct_i * instance; - buffer_t buf; - cep_id_t id; + buffer_t buf; + cep_id_t id; if (pci == NULL || sdb == NULL) return -1; @@ -267,8 +267,8 @@ cep_id_t frct_i_create(uint64_t address, qoscube_t cube) { struct frct_i * instance; - struct pci pci; - cep_id_t id; + struct pci pci; + cep_id_t id; if (buf == NULL || buf->data == NULL) return INVALID_CEP_ID; @@ -304,7 +304,7 @@ int frct_i_accept(cep_id_t id, buffer_t * buf, qoscube_t cube) { - struct pci pci; + struct pci pci; struct frct_i * instance; if (buf == NULL || buf->data == NULL) @@ -347,7 +347,7 @@ int frct_i_accept(cep_id_t id, int frct_i_destroy(cep_id_t id, buffer_t * buf) { - struct pci pci; + struct pci pci; struct frct_i * instance; pthread_mutex_lock(&frct.instances_lock); @@ -390,7 +390,7 @@ int frct_i_destroy(cep_id_t id, int frct_i_write_sdu(cep_id_t id, struct shm_du_buff * sdb) { - struct pci pci; + struct pci pci; struct frct_i * instance; if (sdb == NULL) diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 462b8cc3..d85d11f5 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -27,6 +27,8 @@ #include "shm_pci.h" +#define FRCT_PROTO "FRCT" + struct frct_i; int frct_init(void); diff --git a/src/ipcpd/normal/gam.c b/src/ipcpd/normal/gam.c index 212cfd83..cb4e662f 100644 --- a/src/ipcpd/normal/gam.c +++ b/src/ipcpd/normal/gam.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2017 * - * Graph adjacency manager for IPC Process components + * Data transfer graph adjacency manager * * Dimitri Staessens * Sander Vrijders @@ -20,7 +20,7 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define OUROBOROS_PREFIX "graph-adjacency-manager" +#define OUROBOROS_PREFIX "dt-gam" #include #include @@ -40,305 +40,43 @@ #include #include -struct ga { - struct list_head next; - - qosspec_t qs; - int fd; - struct conn_info * info; -}; - struct gam { - struct list_head gas; - pthread_mutex_t gas_lock; - pthread_cond_t gas_cond; - struct pol_gam_ops * ops; void * ops_o; }; -struct gam * gam_create(enum pol_gam gam_type) +struct gam * gam_create(enum pol_gam gam_type, + struct nbs * nbs, + struct ae * ae) { - struct gam * tmp; + struct gam * gam; - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) + gam = malloc(sizeof(*gam)); + if (gam == NULL) return NULL; switch (gam_type) { case COMPLETE: - tmp->ops = &complete_ops; + gam->ops = &complete_ops; break; default: log_err("Unknown gam policy: %d.", gam_type); - free(tmp); - return NULL; - } - - list_head_init(&tmp->gas); - - if (pthread_mutex_init(&tmp->gas_lock, NULL)) { - free(tmp); - return NULL; - } - - if (pthread_cond_init(&tmp->gas_cond, NULL)) { - pthread_mutex_destroy(&tmp->gas_lock); - free(tmp); return NULL; } - tmp->ops_o = tmp->ops->create(tmp); - if (tmp->ops_o == NULL) { - pthread_cond_destroy(&tmp->gas_cond); - pthread_mutex_destroy(&tmp->gas_lock); - free(tmp); + gam->ops_o = gam->ops->create(nbs, ae); + if (gam->ops_o == NULL) { + free(gam); return NULL; } - if (tmp->ops->start(tmp->ops_o)) { - pthread_cond_destroy(&tmp->gas_cond); - pthread_mutex_destroy(&tmp->gas_lock); - free(tmp); - return NULL; - } - - return tmp; -} - -void gam_destroy(struct gam * instance) -{ - struct list_head * p = NULL; - struct list_head * n = NULL; - - assert(instance); - - instance->ops->stop(instance->ops_o); - - pthread_mutex_lock(&instance->gas_lock); - - list_for_each_safe(p, n, &instance->gas) { - struct ga * e = list_entry(p, struct ga, next); - list_del(&e->next); - free(e->info); - free(e); - } - - pthread_mutex_unlock(&instance->gas_lock); - - pthread_mutex_destroy(&instance->gas_lock); - pthread_cond_destroy(&instance->gas_cond); - - instance->ops->destroy(instance->ops_o); - free(instance); -} - -static int add_ga(struct gam * instance, - int fd, - qosspec_t qs, - struct conn_info * info) -{ - struct ga * ga; - - ga = malloc(sizeof(*ga)); - if (ga == NULL) - return -ENOMEM; - - ga->fd = fd; - ga->info = info; - ga->qs = qs; - - list_head_init(&ga->next); - - pthread_mutex_lock(&instance->gas_lock); - list_add(&ga->next, &instance->gas); - pthread_cond_signal(&instance->gas_cond); - pthread_mutex_unlock(&instance->gas_lock); - - log_info("Added flow."); - - return 0; -} - -int gam_flow_arr(struct gam * instance, - int fd, - qosspec_t qs) -{ - struct conn_info * rcv_info; - struct conn_info snd_info; - - if (flow_alloc_resp(fd, instance->ops->accept_new_flow(instance->ops_o)) - < 0) { - log_err("Could not respond to new flow."); - return -1; - } - - rcv_info = malloc(sizeof(*rcv_info)); - if (rcv_info == NULL) - return -ENOMEM; - - memset(&snd_info, 0, sizeof(snd_info)); - memset(rcv_info, 0, sizeof(*rcv_info)); - - /* FIXME: send correct AE */ - strcpy(snd_info.ae_name, "FIXME:CORRECT_AE"); - strcpy(snd_info.protocol, CDAP_PROTO); - snd_info.pref_version = 1; - snd_info.pref_syntax = PROTO_GPB; - snd_info.addr = ipcpi.dt_addr; - - if (cacep_rcv(fd, rcv_info)) { - log_err("Error establishing application connection."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (cacep_snd(fd, &snd_info)) { - log_err("Failed to respond to application connection request."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (strcmp(snd_info.ae_name, rcv_info->ae_name)) { - log_err("Received connection for wrong AE."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (strcmp(snd_info.protocol, rcv_info->protocol) || - snd_info.pref_version != rcv_info->pref_version || - snd_info.pref_syntax != rcv_info->pref_syntax) { - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) { - flow_dealloc(fd); - free(rcv_info); - return 0; - } - - if (add_ga(instance, fd, qs, rcv_info)) { - log_err("Failed to add ga to graph adjacency manager list."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - return 0; + return gam; } -int gam_flow_alloc(struct gam * instance, - char * dst_name, - qosspec_t qs) +void gam_destroy(struct gam * gam) { - struct conn_info * rcv_info; - struct conn_info snd_info; - int fd; - - log_dbg("Allocating flow to %s.", dst_name); - - rcv_info = malloc(sizeof(*rcv_info)); - if (rcv_info == NULL) - return -ENOMEM; - - fd = flow_alloc(dst_name, NULL); - if (fd < 0) { - log_err("Failed to allocate flow to %s.", dst_name); - return -1; - } - - if (flow_alloc_res(fd)) { - log_err("Flow allocation to %s failed.", dst_name); - flow_dealloc(fd); - return -1; - } - - memset(&snd_info, 0, sizeof(snd_info)); - memset(rcv_info, 0, sizeof(*rcv_info)); - - /* FIXME: send correct AE */ - strcpy(snd_info.ae_name, "FIXME:CORRECT_AE"); - strcpy(snd_info.protocol, CDAP_PROTO); - snd_info.pref_version = 1; - snd_info.pref_syntax = PROTO_GPB; - snd_info.addr = ipcpi.dt_addr; - - if (cacep_snd(fd, &snd_info)) { - log_err("Failed to create application connection."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (cacep_rcv(fd, rcv_info)) { - log_err("Failed to connect to application."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (strcmp(snd_info.protocol, rcv_info->protocol) || - snd_info.pref_version != rcv_info->pref_version || - snd_info.pref_syntax != rcv_info->pref_syntax) { - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) { - flow_dealloc(fd); - free(rcv_info); - return 0; - } - - if (add_ga(instance, fd, qs, rcv_info)) { - log_err("Failed to add GA to graph adjacency manager list."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - return 0; -} - -int gam_flow_wait(struct gam * instance, - int * fd, - struct conn_info ** info, - qosspec_t * qs) -{ - struct ga * ga; - - assert(fd); - assert(info); - assert(qs); - - pthread_mutex_lock(&instance->gas_lock); - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) &instance->gas_lock); - - while (list_is_empty(&instance->gas)) - pthread_cond_wait(&instance->gas_cond, &instance->gas_lock); - - ga = list_first_entry((&instance->gas), struct ga, next); - if (ga == NULL) { - pthread_mutex_unlock(&instance->gas_lock); - return -1; - } - - *fd = ga->fd; - *info = ga->info; - *qs = ga->qs; - - list_del(&ga->next); - free(ga); - - pthread_cleanup_pop(true); + assert(gam); - return 0; + gam->ops->destroy(gam->ops_o); + free(gam); } diff --git a/src/ipcpd/normal/gam.h b/src/ipcpd/normal/gam.h index 58b028b9..01a6e40e 100644 --- a/src/ipcpd/normal/gam.h +++ b/src/ipcpd/normal/gam.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2017 * - * Graph adjacency manager for IPC Process components + * Data transfer graph adjacency manager * * Dimitri Staessens * Sander Vrijders @@ -26,21 +26,12 @@ #include #include -struct gam * gam_create(enum pol_gam gam_type); +#include "neighbors.h" -void gam_destroy(struct gam * instance); +struct gam * gam_create(enum pol_gam gam_type, + struct nbs * nbs, + struct ae * ae); -int gam_flow_arr(struct gam * instance, - int fd, - qosspec_t qs); - -int gam_flow_alloc(struct gam * instance, - char * dst_name, - qosspec_t qs); - -int gam_flow_wait(struct gam * instance, - int * fd, - struct conn_info ** info, - qosspec_t * qs); +void gam_destroy(struct gam * gam); #endif /* OUROBOROS_IPCPD_NORMAL_GAM_H */ diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 3e5907a8..8b9a7c09 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -105,11 +105,6 @@ static int boot_components(void) log_dbg("Starting components."); - if (connmgr_init()) { - log_err("Failed to init ap connection manager"); - return -1; - } - if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa)) != sizeof(pa)) { log_err("Failed to read policy for address authority."); @@ -126,7 +121,6 @@ static int boot_components(void) if (ipcpi.dt_addr == 0) { log_err("Failed to get a valid address."); addr_auth_fini(); - connmgr_fini(); return -1; } @@ -137,7 +131,6 @@ static int boot_components(void) if (ribmgr_init()) { log_err("Failed to initialize RIB manager."); addr_auth_fini(); - connmgr_fini(); return -1; } @@ -145,7 +138,6 @@ static int boot_components(void) log_err("Failed to initialize directory."); ribmgr_fini(); addr_auth_fini(); - connmgr_fini(); return -1; } @@ -155,7 +147,6 @@ static int boot_components(void) dir_fini(); ribmgr_fini(); addr_auth_fini(); - connmgr_fini(); log_err("Failed to start flow manager."); return -1; } @@ -165,20 +156,29 @@ static int boot_components(void) dir_fini(); ribmgr_fini(); addr_auth_fini(); - connmgr_fini(); log_err("Failed to initialize FRCT."); return -1; } + if (enroll_start()) { + fmgr_fini(); + dir_fini(); + ribmgr_fini(); + addr_auth_fini(); + log_err("Failed to start enroll."); + return -1; + } + ipcp_set_state(IPCP_OPERATIONAL); if (connmgr_start()) { ipcp_set_state(IPCP_INIT); + enroll_stop(); + frct_fini(); fmgr_fini(); dir_fini(); ribmgr_fini(); addr_auth_fini(); - connmgr_fini(); log_err("Failed to start AP connection manager."); return -1; } @@ -190,6 +190,8 @@ void shutdown_components(void) { connmgr_stop(); + enroll_stop(); + frct_fini(); fmgr_fini(); @@ -199,8 +201,6 @@ void shutdown_components(void) ribmgr_fini(); addr_auth_fini(); - - connmgr_fini(); } static int normal_ipcp_enroll(char * dst_name) @@ -418,11 +418,33 @@ int main(int argc, exit(EXIT_FAILURE); } + + if (connmgr_init()) { + log_err("Failed to initialize connection manager."); + ipcp_create_r(getpid(), -1); + rib_fini(); + irm_unbind_api(getpid(), ipcpi.name); + ipcp_fini(); + exit(EXIT_FAILURE); + } + + if (enroll_init()) { + log_err("Failed to initialize enroll component."); + ipcp_create_r(getpid(), -1); + connmgr_fini(); + rib_fini(); + irm_unbind_api(getpid(), ipcpi.name); + ipcp_fini(); + exit(EXIT_FAILURE); + } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); if (ipcp_boot() < 0) { log_err("Failed to boot IPCP."); ipcp_create_r(getpid(), -1); + enroll_fini(); + connmgr_fini(); rib_fini(); irm_unbind_api(getpid(), ipcpi.name); ipcp_fini(); @@ -435,6 +457,8 @@ int main(int argc, log_err("Failed to notify IRMd we are initialized."); ipcp_set_state(IPCP_NULL); ipcp_shutdown(); + enroll_fini(); + connmgr_fini(); rib_fini(); irm_unbind_api(getpid(), ipcpi.name); ipcp_fini(); @@ -448,6 +472,10 @@ int main(int argc, rib_fini(); + connmgr_fini(); + + enroll_fini(); + irm_unbind_api(getpid(), ipcpi.name); ipcp_fini(); diff --git a/src/ipcpd/normal/neighbors.c b/src/ipcpd/normal/neighbors.c new file mode 100644 index 00000000..40ef0d73 --- /dev/null +++ b/src/ipcpd/normal/neighbors.c @@ -0,0 +1,213 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Data transfer neighbors + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "neighbors" + +#include +#include +#include +#include +#include + +#include "neighbors.h" + +#include +#include +#include + +static void notify_listeners(enum nb_event event, + struct nb * nb, + struct nbs * nbs) +{ + struct list_head * p = NULL; + + list_for_each(p, &nbs->notifiers) { + struct nb_notifier * e = + list_entry(p, struct nb_notifier, next); + if (e->notify_call(event, nb->conn)) + log_err("Listener reported an error."); + } +} + +struct nbs * nbs_create(void) +{ + struct nbs * nbs; + + nbs = malloc(sizeof(*nbs)); + if (nbs == NULL) + return NULL; + + list_head_init(&nbs->list); + list_head_init(&nbs->notifiers); + + if (pthread_mutex_init(&nbs->list_lock, NULL)) + return NULL; + + if (pthread_mutex_init(&nbs->notifiers_lock, NULL)) { + pthread_mutex_destroy(&nbs->list_lock); + return NULL; + } + + return nbs; +} + +void nbs_destroy(struct nbs * nbs) +{ + struct list_head * p = NULL; + struct list_head * n = NULL; + + assert(nbs); + + pthread_mutex_lock(&nbs->list_lock); + + list_for_each_safe(p, n, &nbs->list) { + struct nb * e = list_entry(p, struct nb, next); + list_del(&e->next); + free(e); + } + + pthread_mutex_unlock(&nbs->list_lock); + + pthread_mutex_destroy(&nbs->list_lock); + pthread_mutex_destroy(&nbs->notifiers_lock); +} + +int nbs_add(struct nbs * nbs, + struct conn conn) +{ + struct nb * nb; + + assert(nbs); + + nb = malloc(sizeof(*nb)); + if (nb == NULL) + return -ENOMEM; + + nb->conn = conn; + + list_head_init(&nb->next); + + pthread_mutex_lock(&nbs->list_lock); + + list_add(&nb->next, &nbs->list); + + notify_listeners(NEIGHBOR_ADDED, nb, nbs); + + pthread_mutex_unlock(&nbs->list_lock); + + log_info("Added neighbor with address %" PRIu64 " to list.", + conn.conn_info.addr); + + return 0; +} + +int nbs_update_qos(struct nbs * nbs, + int fd, + qosspec_t qs) +{ + struct list_head * p = NULL; + + assert(nbs); + + pthread_mutex_lock(&nbs->list_lock); + + list_for_each(p, &nbs->list) { + struct nb * e = list_entry(p, struct nb, next); + if (e->conn.flow_info.fd == fd) { + e->conn.flow_info.qs = qs; + + notify_listeners(NEIGHBOR_QOS_CHANGE, e, nbs); + + pthread_mutex_unlock(&nbs->list_lock); + return 0; + } + } + + pthread_mutex_unlock(&nbs->list_lock); + + return -1; +} + +int nbs_del(struct nbs * nbs, + int fd) +{ + struct list_head * p = NULL; + struct list_head * n = NULL; + + assert(nbs); + + pthread_mutex_lock(&nbs->list_lock); + + list_for_each_safe(p, n, &nbs->list) { + struct nb * e = list_entry(p, struct nb, next); + if (e->conn.flow_info.fd == fd) { + notify_listeners(NEIGHBOR_REMOVED, e, nbs); + list_del(&e->next); + free(e); + pthread_mutex_unlock(&nbs->list_lock); + return 0; + } + } + + pthread_mutex_unlock(&nbs->list_lock); + + return -1; +} + +int nbs_reg_notifier(struct nbs * nbs, + struct nb_notifier * notify) +{ + assert(nbs); + assert(notify); + + pthread_mutex_lock(&nbs->notifiers_lock); + + list_head_init(¬ify->next); + list_add(¬ify->next, &nbs->notifiers); + + pthread_mutex_unlock(&nbs->notifiers_lock); + + return 0; +} + +int nbs_unreg_notifier(struct nbs * nbs, + struct nb_notifier * notify) +{ + struct list_head * p = NULL; + struct list_head * n = NULL; + + pthread_mutex_lock(&nbs->notifiers_lock); + + list_for_each_safe(p, n, &nbs->notifiers) { + struct nb_notifier * e = + list_entry(p, struct nb_notifier, next); + if (e == notify) { + list_del(&e->next); + pthread_mutex_unlock(&nbs->notifiers_lock); + return 0; + } + } + + pthread_mutex_unlock(&nbs->notifiers_lock); + + return -1; +} diff --git a/src/ipcpd/normal/neighbors.h b/src/ipcpd/normal/neighbors.h new file mode 100644 index 00000000..743bc7b8 --- /dev/null +++ b/src/ipcpd/normal/neighbors.h @@ -0,0 +1,81 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Data transfer neighbors + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H +#define OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H + +#include +#include +#include +#include +#include + +#include "connmgr.h" + +enum nb_event { + NEIGHBOR_ADDED, + NEIGHBOR_REMOVED, + NEIGHBOR_QOS_CHANGE +}; + +typedef int (* nb_notify_t)(enum nb_event event, + struct conn conn); + +struct nb_notifier { + struct list_head next; + nb_notify_t notify_call; +}; + +struct nb { + struct list_head next; + struct conn conn; +}; + +struct nbs { + struct list_head notifiers; + pthread_mutex_t notifiers_lock; + + struct list_head list; + pthread_mutex_t list_lock; +}; + +struct nbs * nbs_create(void); + +void nbs_destroy(struct nbs * nbs); + +int nbs_add(struct nbs * nbs, + struct conn conn); + +int nbs_update_qos(struct nbs * nbs, + int fd, + qosspec_t qs); + +int nbs_del(struct nbs * nbs, + int fd); + +int nbs_reg_notifier(struct nbs * nbs, + struct nb_notifier * notify); + +int nbs_unreg_notifier(struct nbs * nbs, + struct nb_notifier * notify); + +#endif diff --git a/src/ipcpd/normal/pff.c b/src/ipcpd/normal/pff.c index 2f7d554b..b44f79bf 100644 --- a/src/ipcpd/normal/pff.c +++ b/src/ipcpd/normal/pff.c @@ -55,15 +55,16 @@ struct pff * pff_create(void) return tmp; } -int pff_destroy(struct pff * instance) +void pff_destroy(struct pff * instance) { assert(instance); + pthread_mutex_lock(&instance->lock); htable_destroy(instance->table); + pthread_mutex_unlock(&instance->lock); + pthread_mutex_destroy(&instance->lock); free(instance); - - return 0; } int pff_add(struct pff * instance, uint64_t addr, int fd) diff --git a/src/ipcpd/normal/pff.h b/src/ipcpd/normal/pff.h index b4a1400b..d4edb90c 100644 --- a/src/ipcpd/normal/pff.h +++ b/src/ipcpd/normal/pff.h @@ -25,8 +25,6 @@ #include -struct pff; - /* * PFF will take a type in the future, * to allow different policies. @@ -34,7 +32,7 @@ struct pff; */ struct pff * pff_create(void); -int pff_destroy(struct pff * instance); +void pff_destroy(struct pff * instance); int pff_add(struct pff * instance, uint64_t addr, diff --git a/src/ipcpd/normal/pol-gam-ops.h b/src/ipcpd/normal/pol-gam-ops.h index 264f252b..a7753b8b 100644 --- a/src/ipcpd/normal/pol-gam-ops.h +++ b/src/ipcpd/normal/pol-gam-ops.h @@ -24,21 +24,13 @@ #define OUROBOROS_IPCPD_NORMAL_POL_GAM_OPS_H #include +#include struct pol_gam_ops { - void * (* create)(struct gam * instance); + void * (* create)(struct nbs * nbs, + struct ae * ae); void (* destroy)(void * o); - - int (* start)(void * o); - - int (* stop)(void * o); - - int (* accept_new_flow)(void * o); - - int (* accept_flow)(void * o, - qosspec_t qs, - const struct conn_info * info); }; #endif /* OUROBOROS_IPCPD_NORMAL_POL_GAM_OPS_H */ diff --git a/src/ipcpd/normal/pol/complete.c b/src/ipcpd/normal/pol/complete.c index daf8c9bf..f84c3a23 100644 --- a/src/ipcpd/normal/pol/complete.c +++ b/src/ipcpd/normal/pol/complete.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2017 * - * Graph adjacency manager for IPC Process components + * Sets up a complete graph * * Dimitri Staessens * Sander Vrijders @@ -20,35 +20,54 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define OUROBOROS_PREFIX "complete-graph-adjacency-manager" +#define OUROBOROS_PREFIX "complete" #include -#include -#include -#include +#include #include +#include +#include +#include +#include -#include "ipcp.h" -#include "gam.h" +#include "neighbors.h" +#include "frct.h" #include "ribconfig.h" +#include "ipcp.h" +#include "ae.h" #include #include #include -struct neighbor { - struct list_head next; - uint64_t neighbor; +struct complete { + struct nbs * nbs; + struct ae * ae; + pthread_t allocator; + pthread_t listener; }; -struct complete { - struct list_head neighbors; - pthread_mutex_t neighbors_lock; +static void * listener(void * o) +{ + struct complete * complete; + struct conn conn; - pthread_t allocator; + complete = (struct complete *) o; - struct gam * gam; -}; + while (true) { + if (connmgr_wait(complete->ae, &conn)) { + log_err("Error while getting next connection."); + continue; + } + + if (nbs_add(complete->nbs, conn)) { + log_err("Failed to add neighbor."); + continue; + } + } + + return (void *) 0; +} static void * allocator(void * o) { @@ -56,10 +75,10 @@ static void * allocator(void * o) ssize_t len; char ** children; ssize_t i; - struct complete * complete = (struct complete *) o; + struct complete * complete; + struct conn conn; - assert(complete); - assert(complete->gam); + complete = (struct complete *) o; qs.delay = 0; qs.jitter = 0; @@ -67,8 +86,23 @@ static void * allocator(void * o) /* FIXME: subscribe to members to keep the graph complete. */ len = rib_children("/" MEMBERS_NAME, &children); for (i = 0; i < len; ++i) { - if (strcmp(children[i], ipcpi.name) < 0) - gam_flow_alloc(complete->gam, children[i], qs); + if (strcmp(children[i], ipcpi.name) < 0) { + if (connmgr_alloc(complete->ae, + children[i], + &qs, + &conn)) { + log_warn("Failed to get a conn to neighbor."); + free(children[i]); + continue; + } + + if (nbs_add(complete->nbs, conn)) { + log_err("Failed to add neighbor."); + free(children[i]); + continue; + } + + } free(children[i]); } @@ -78,118 +112,41 @@ static void * allocator(void * o) return (void *) 0; } -void * complete_create(struct gam * gam) +void * complete_create(struct nbs * nbs, + struct ae * ae) { struct complete * complete; - assert(gam); - complete = malloc(sizeof(*complete)); if (complete == NULL) return NULL; - list_head_init(&complete->neighbors); - complete->gam = gam; - - if (pthread_mutex_init(&complete->neighbors_lock, NULL)) { - free(complete); - return NULL; - } - - return (void *) complete; -} - -int complete_start(void * o) -{ - struct complete * complete = (struct complete *) o; - - assert(complete); - assert(complete->gam); + complete->nbs = nbs; + complete->ae = ae; if (pthread_create(&complete->allocator, NULL, - allocator, (void *) complete)) { - pthread_mutex_destroy(&complete->neighbors_lock); - free(complete); - return -1; - } + allocator, (void *) complete)) + return NULL; - /* FIXME: Handle flooding of the flow allocator before detaching.*/ - pthread_join(complete->allocator, NULL); + if (pthread_create(&complete->listener, NULL, + listener, (void *) complete)) + return NULL; - return 0; + return complete; } -int complete_stop(void * o) +void complete_destroy(void * ops_o) { - (void) o; + struct complete * complete; - return 0; -} + assert(ops_o); -void complete_destroy(void * o) -{ - struct list_head * p = NULL; - struct list_head * n = NULL; - struct complete * complete = (struct complete *) o; - - list_for_each_safe(p, n, &complete->neighbors) { - struct neighbor * e = list_entry(p, struct neighbor, next); - list_del(&e->next); - free(e); - } + complete = (struct complete *) ops_o; - pthread_mutex_destroy(&complete->neighbors_lock); + pthread_cancel(complete->allocator); + pthread_cancel(complete->listener); + pthread_join(complete->allocator, NULL); + pthread_join(complete->listener, NULL); free(complete); } - -int complete_accept_new_flow(void * o) -{ - (void) o; - - return 0; -} - -int complete_accept_flow(void * o, - qosspec_t qs, - const struct conn_info * info) -{ - struct list_head * pos = NULL; - struct neighbor * n; - struct complete * complete = (struct complete *) o; - - (void) qs; - - assert(complete); - - pthread_mutex_lock(&complete->neighbors_lock); - - list_for_each(pos, &complete->neighbors) { - struct neighbor * e = list_entry(pos, struct neighbor, next); - /* FIXME: figure out union type and check name or address */ - if (e->neighbor == info->addr) { - pthread_mutex_unlock(&complete->neighbors_lock); - return -1; - } - - assert(complete); - assert(&complete->neighbors_lock); - assert(pos->nxt); - } - - n = malloc(sizeof(*n)); - if (n == NULL) { - pthread_mutex_unlock(&complete->neighbors_lock); - return -1; - } - - list_head_init(&n->next); - - n->neighbor = info->addr; - - list_add(&n->next, &complete->neighbors); - - pthread_mutex_unlock(&complete->neighbors_lock); - - return 0; -} diff --git a/src/ipcpd/normal/pol/complete.h b/src/ipcpd/normal/pol/complete.h index 8fe1437f..40aca69d 100644 --- a/src/ipcpd/normal/pol/complete.h +++ b/src/ipcpd/normal/pol/complete.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2017 * - * Graph adjacency manager for IPC Process components + * Sets up a complete graph * * Dimitri Staessens * Sander Vrijders @@ -23,30 +23,19 @@ #ifndef OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H #define OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H -#include "gam.h" -#include "pol-gam-ops.h" - -void * complete_create(struct gam * instance); - -void complete_destroy(void * o); +#include +#include -int complete_start(void * o); - -int complete_stop(void * o); +#include "pol-gam-ops.h" -int complete_accept_new_flow(void * o); +void * complete_create(struct nbs * nbs, + struct ae * ae); -int complete_accept_flow(void * o, - qosspec_t qs, - const struct conn_info * info); +void complete_destroy(void * ops_o); struct pol_gam_ops complete_ops = { - .create = complete_create, - .destroy = complete_destroy, - .start = complete_start, - .stop = complete_stop, - .accept_new_flow = complete_accept_new_flow, - .accept_flow = complete_accept_flow + .create = complete_create, + .destroy = complete_destroy }; #endif /* OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H */ diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h index 15b65ce2..5ecdaab3 100644 --- a/src/ipcpd/normal/ribconfig.h +++ b/src/ipcpd/normal/ribconfig.h @@ -31,9 +31,11 @@ #define MEMBERS_NAME "members" #define DIF_NAME "dif_name" #define DIR_NAME "directory" +#define ROUTING_NAME "fsdb" #define DIF_PATH DLR DIF_NAME #define DIR_PATH DLR DIR_NAME #define BOOT_PATH DLR BOOT_NAME #define MEMBERS_PATH DLR MEMBERS_NAME +#define ROUTING_PATH DLR ROUTING_NAME #endif /* OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H */ diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 25f1687e..e8fa77a4 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -45,25 +45,70 @@ #include struct { - flow_set_t * fs; - fqueue_t * fq; - struct gam * gam; + flow_set_t * fs; + fqueue_t * fq; + + struct gam * gam; + struct nbs * nbs; + struct ae * ae; + + struct nb_notifier nb_notifier; } ribmgr; +static int ribmgr_neighbor_event(enum nb_event event, + struct conn conn) +{ + /* We are only interested in neighbors being added and removed. */ + switch (event) { + case NEIGHBOR_ADDED: + flow_set_add(ribmgr.fs, conn.flow_info.fd); + break; + case NEIGHBOR_REMOVED: + flow_set_del(ribmgr.fs, conn.flow_info.fd); + break; + default: + break; + } + + return 0; +} int ribmgr_init(void) { - enum pol_gam pg; + enum pol_gam pg; + struct conn_info info; + + strcpy(info.ae_name, MGMT_AE); + strcpy(info.protocol, CDAP_PROTO); + info.pref_version = 1; + info.pref_syntax = PROTO_GPB; + + ribmgr.nbs = nbs_create(); + if (ribmgr.nbs == NULL) { + log_err("Failed to create neighbors."); + return -1; + } + + ribmgr.ae = connmgr_ae_create(info); + if (ribmgr.ae == NULL) { + log_err("Failed to create AE struct."); + nbs_destroy(ribmgr.nbs); + return -1; + } if (rib_read(BOOT_PATH "/rm/gam/type", &pg, sizeof(pg)) != sizeof(pg)) { log_err("Failed to read policy for ribmgr gam."); + connmgr_ae_destroy(ribmgr.ae); + nbs_destroy(ribmgr.nbs); return -1; } - ribmgr.gam = gam_create(pg); + ribmgr.gam = gam_create(pg, ribmgr.nbs, ribmgr.ae); if (ribmgr.gam == NULL) { log_err("Failed to create gam."); + connmgr_ae_destroy(ribmgr.ae); + nbs_destroy(ribmgr.nbs); return -1; } @@ -71,6 +116,8 @@ int ribmgr_init(void) if (ribmgr.fs == NULL) { log_err("Failed to create flow set."); gam_destroy(ribmgr.gam); + connmgr_ae_destroy(ribmgr.ae); + nbs_destroy(ribmgr.nbs); return -1; } @@ -79,6 +126,19 @@ int ribmgr_init(void) log_err("Failed to create fq."); flow_set_destroy(ribmgr.fs); gam_destroy(ribmgr.gam); + connmgr_ae_destroy(ribmgr.ae); + nbs_destroy(ribmgr.nbs); + return -1; + } + + ribmgr.nb_notifier.notify_call = ribmgr_neighbor_event; + if (nbs_reg_notifier(ribmgr.nbs, &ribmgr.nb_notifier)) { + log_err("Failed to register notifier."); + fqueue_destroy(ribmgr.fq); + flow_set_destroy(ribmgr.fs); + gam_destroy(ribmgr.gam); + connmgr_ae_destroy(ribmgr.ae); + nbs_destroy(ribmgr.nbs); return -1; } @@ -87,20 +147,12 @@ int ribmgr_init(void) void ribmgr_fini(void) { + nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier); flow_set_destroy(ribmgr.fs); fqueue_destroy(ribmgr.fq); gam_destroy(ribmgr.gam); -} - -int ribmgr_flow_arr(int fd, - qosspec_t qs) -{ - assert(ribmgr.gam); - - if (gam_flow_arr(ribmgr.gam, fd, qs)) - return -1; - - return 0; + connmgr_ae_destroy(ribmgr.ae); + nbs_destroy(ribmgr.nbs); } int ribmgr_disseminate(char * path, diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index 12f407ab..f3f4cc24 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -41,9 +41,6 @@ int ribmgr_init(void); void ribmgr_fini(void); -int ribmgr_flow_arr(int fd, - qosspec_t qs); - int ribmgr_disseminate(char * path, enum diss_target target, enum diss_freq freq, diff --git a/src/ipcpd/normal/routing.c b/src/ipcpd/normal/routing.c new file mode 100644 index 00000000..48c2c16d --- /dev/null +++ b/src/ipcpd/normal/routing.c @@ -0,0 +1,132 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Routing component of the IPCP + * + * Sander Vrijders + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "routing" + +#include +#include +#include +#include + +#include "routing.h" +#include "ribmgr.h" + +#include +#include +#include + +struct edge { + struct vertex * ep; + qosspec_t qs; +}; + +struct vertex { + struct list_head next; + + uint64_t addr; + + struct list_head edges; +}; + +struct routing { + struct pff * pff; + struct nbs * nbs; + + struct nb_notifier nb_notifier; + + struct list_head vertices; +}; + +static int routing_neighbor_event(enum nb_event event, + struct conn conn) +{ + (void) conn; + + /* FIXME: React to events here */ + switch (event) { + case NEIGHBOR_ADDED: + break; + case NEIGHBOR_REMOVED: + break; + case NEIGHBOR_QOS_CHANGE: + break; + default: + break; + } + + return 0; +} + +#if 0 +/* FIXME: If zeroed since it is not used currently */ +static int add_vertex(struct routing * instance, + uint64_t addr) +{ + struct vertex * vertex; + + vertex = malloc(sizeof(*vertex)); + if (vertex == NULL) + return -1; + + list_head_init(&vertex->next); + list_head_init(&vertex->edges); + vertex->addr = addr; + + list_add(&vertex->next, &instance->vertices); + + return 0; +} +#endif + +struct routing * routing_create(struct pff * pff, + struct nbs * nbs) +{ + struct routing * tmp; + + assert(pff); + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + return NULL; + + tmp->pff = pff; + tmp->nbs = nbs; + + list_head_init(&tmp->vertices); + + tmp->nb_notifier.notify_call = routing_neighbor_event; + if (nbs_reg_notifier(tmp->nbs, &tmp->nb_notifier)) { + free(tmp); + return NULL; + } + + return tmp; +} + +void routing_destroy(struct routing * instance) +{ + assert(instance); + + nbs_unreg_notifier(instance->nbs, &instance->nb_notifier); + + free(instance); +} diff --git a/src/ipcpd/normal/routing.h b/src/ipcpd/normal/routing.h new file mode 100644 index 00000000..624763ec --- /dev/null +++ b/src/ipcpd/normal/routing.h @@ -0,0 +1,42 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Routing component of the IPCP + * + * Sander Vrijders + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_ROUTING_H +#define OUROBOROS_IPCPD_NORMAL_ROUTING_H + +#include + +#include "pff.h" +#include "neighbors.h" + +#include + +/* + * Routing will take a type in the future, + * to allow different policies. + */ +struct routing * routing_create(struct pff * pff, + struct nbs * nbs); + +void routing_destroy(struct routing * instance); + +#endif /* OUROBOROS_IPCPD_NORMAL_ROUTING_H */ -- cgit v1.2.3