From 9d2fbef7b8569aac930c95ca1afb92a5dec79dac Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 2 Mar 2017 15:29:11 +0100 Subject: ipcpd: normal: Add connection manager This adds the connection manager which allows the different AEs of the normal IPCP to register with it. An AE can then use the connection manager to allocate a flow to a neighbor, or to wait for a new connection from a neighbor. --- src/ipcpd/normal/CMakeLists.txt | 1 + src/ipcpd/normal/connmgr.c | 350 ++++++++++++++++++++++++++++++++++++++++ src/ipcpd/normal/connmgr.h | 57 +++++++ src/ipcpd/normal/enroll.c | 2 +- src/ipcpd/normal/fmgr.c | 2 +- src/ipcpd/normal/frct.c | 8 +- src/ipcpd/normal/gam.c | 4 +- src/ipcpd/normal/main.c | 93 +++-------- src/ipcpd/normal/pol/complete.c | 13 +- 9 files changed, 445 insertions(+), 85 deletions(-) create mode 100644 src/ipcpd/normal/connmgr.c create mode 100644 src/ipcpd/normal/connmgr.h (limited to 'src/ipcpd/normal') diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 7e10cc0d..70742336 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -20,6 +20,7 @@ set(SOURCE_FILES # Add source files here addr_auth.c cdap_flow.c + connmgr.c dir.c enroll.c fmgr.c diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c new file mode 100644 index 00000000..387c38fd --- /dev/null +++ b/src/ipcpd/normal/connmgr.c @@ -0,0 +1,350 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Handles the different AP connections + * + * Sander Vrijders + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "normal-ipcp" + +#include +#include +#include +#include +#include +#include + +#include "ae.h" +#include "connmgr.h" +#include "enroll.h" +#include "fmgr.h" +#include "frct.h" +#include "ipcp.h" +#include "ribmgr.h" + +#include +#include +#include +#include + +#define FRCT_PROTO "frct" + +struct ae_conn { + struct list_head next; + struct conn conn; +}; + +struct ae { + struct list_head next; + struct conn_info info; + + struct list_head conn_list; + pthread_cond_t conn_cond; + pthread_mutex_t conn_lock; +}; + +struct { + pthread_t acceptor; + + struct list_head aes; + pthread_mutex_t aes_lock; +} connmgr; + +static int add_ae_conn(struct ae * ae, + int fd, + qosspec_t qs, + struct conn_info * rcv_info) +{ + struct ae_conn * ae_conn = NULL; + + ae_conn = malloc(sizeof(*ae_conn)); + if (ae_conn == NULL) { + log_err("Not enough memory."); + return -1; + } + + ae_conn->conn.conn_info = *rcv_info; + ae_conn->conn.flow_info.fd = fd; + ae_conn->conn.flow_info.qs = qs; + + list_head_init(&ae_conn->next); + + pthread_mutex_lock(&ae->conn_lock); + list_add(&ae_conn->next, &ae->conn_list); + pthread_cond_signal(&ae->conn_cond); + pthread_mutex_unlock(&ae->conn_lock); + + return 0; +} + +static struct ae * find_ae_by_name(char * name) +{ + struct list_head * p = NULL; + + list_for_each(p, &connmgr.aes) { + struct ae * ae = list_entry(p, struct ae, next); + if (strcmp(ae->info.ae_name, name) == 0) + return ae; + } + + return NULL; +} + +static void * flow_acceptor(void * o) +{ + int fd; + qosspec_t qs; + struct conn_info rcv_info; + struct conn_info fail_info; + struct ae * ae = NULL; + + (void) o; + + memset(&fail_info, 0, sizeof(fail_info)); + + while (true) { + pthread_rwlock_rdlock(&ipcpi.state_lock); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_rwlock_unlock(&ipcpi.state_lock); + log_info("Shutting down flow acceptor."); + return 0; + } + + pthread_rwlock_unlock(&ipcpi.state_lock); + + fd = flow_accept(&qs); + if (fd < 0) { + if (fd != -EIRMD) + log_warn("Flow accept failed: %d", fd); + continue; + } + + if (flow_alloc_resp(fd, 0)) { + log_err("Failed to respond to flow alloc request."); + continue; + } + + if (cacep_rcv(fd, &rcv_info)) { + log_err("Error establishing application connection."); + flow_dealloc(fd); + continue; + } + + pthread_mutex_lock(&connmgr.aes_lock); + ae = find_ae_by_name(rcv_info.ae_name); + pthread_mutex_unlock(&connmgr.aes_lock); + + if (ae != NULL) { + if (cacep_snd(fd, &ae->info)) { + log_err("Failed to respond to req."); + flow_dealloc(fd); + continue; + } + + if (add_ae_conn(ae, fd, qs, &rcv_info)) { + log_err("Failed to add ae conn."); + flow_dealloc(fd); + continue; + } + } else { + cacep_snd(fd, &fail_info); + flow_dealloc(fd); + } + } + + return (void *) 0; +} + +int connmgr_init(void) +{ + list_head_init(&connmgr.aes); + + if (pthread_mutex_init(&connmgr.aes_lock, NULL)) + return -1; + + return 0; +} + +int connmgr_start(void) +{ + pthread_create(&connmgr.acceptor, NULL, flow_acceptor, NULL); + + return 0; +} + +void connmgr_stop(void) +{ + pthread_cancel(connmgr.acceptor); + pthread_join(connmgr.acceptor, NULL); +} + +void connmgr_fini(void) +{ + struct list_head * p = NULL; + struct list_head * n = NULL; + + pthread_mutex_lock(&connmgr.aes_lock); + + list_for_each_safe(p, n, &connmgr.aes) { + struct ae * e = list_entry(p, struct ae, next); + connmgr_ae_destroy(e); + } + + pthread_mutex_unlock(&connmgr.aes_lock); + + pthread_mutex_destroy(&connmgr.aes_lock); +} + +struct ae * connmgr_ae_create(struct conn_info info) +{ + struct ae * ae; + + ae = malloc(sizeof(*ae)); + if (ae == NULL) + return NULL; + + list_head_init(&ae->next); + list_head_init(&ae->conn_list); + + ae->info = info; + + if (pthread_mutex_init(&ae->conn_lock, NULL)) { + free(ae); + return NULL; + } + + if (pthread_cond_init(&ae->conn_cond, NULL)) { + pthread_mutex_destroy(&ae->conn_lock); + free(ae); + return NULL; + } + + pthread_mutex_lock(&connmgr.aes_lock); + list_add(&ae->next, &connmgr.aes); + pthread_mutex_unlock(&connmgr.aes_lock); + + return ae; +} + +void connmgr_ae_destroy(struct ae * ae) +{ + struct list_head * p = NULL; + struct list_head * n = NULL; + + assert(ae); + + pthread_mutex_lock(&connmgr.aes_lock); + pthread_mutex_lock(&ae->conn_lock); + + list_for_each_safe(p, n, &ae->conn_list) { + struct ae_conn * e = list_entry(p, struct ae_conn, next); + list_del(&e->next); + free(e); + } + + pthread_mutex_unlock(&ae->conn_lock); + + pthread_cond_destroy(&ae->conn_cond); + pthread_mutex_destroy(&ae->conn_lock); + + list_del(&ae->next); + + pthread_mutex_unlock(&connmgr.aes_lock); + + free(ae); +} + +int connmgr_alloc(struct ae * ae, + char * dst_name, + qosspec_t qs, + struct conn * conn) +{ + assert(ae); + assert(dst_name); + assert(conn); + + memset(&conn->conn_info, 0, sizeof(conn->conn_info)); + + conn->flow_info.fd = flow_alloc(dst_name, &qs); + if (conn->flow_info.fd < 0) { + log_err("Failed to allocate flow to %s.", dst_name); + return -1; + } + + conn->flow_info.qs = qs; + + if (flow_alloc_res(conn->flow_info.fd)) { + log_err("Flow allocation to %s failed.", dst_name); + flow_dealloc(conn->flow_info.fd); + return -1; + } + + if (cacep_snd(conn->flow_info.fd, &ae->info)) { + log_err("Failed to create application connection."); + flow_dealloc(conn->flow_info.fd); + return -1; + } + + if (cacep_rcv(conn->flow_info.fd, &conn->conn_info)) { + log_err("Failed to connect to application."); + flow_dealloc(conn->flow_info.fd); + return -1; + } + + if (strcmp(ae->info.protocol, conn->conn_info.protocol) || + ae->info.pref_version != conn->conn_info.pref_version || + ae->info.pref_syntax != conn->conn_info.pref_syntax) { + flow_dealloc(conn->flow_info.fd); + return -1; + } + + return 0; +} + +int connmgr_wait(struct ae * ae, + struct conn * conn) +{ + struct ae_conn * ae_conn; + + assert(ae); + assert(conn); + + pthread_mutex_lock(&ae->conn_lock); + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) &ae->conn_lock); + + while (list_is_empty(&ae->conn_list)) + pthread_cond_wait(&ae->conn_cond, &ae->conn_lock); + + ae_conn = list_first_entry((&ae->conn_list), struct ae_conn, next); + if (ae_conn == NULL) { + pthread_mutex_unlock(&ae->conn_lock); + return -1; + } + + *conn = ae_conn->conn; + + list_del(&ae_conn->next); + free(ae_conn); + + pthread_cleanup_pop(true); + + return 0; +} diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h new file mode 100644 index 00000000..bfb3d762 --- /dev/null +++ b/src/ipcpd/normal/connmgr.h @@ -0,0 +1,57 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Handles the different AP connections + * + * Sander Vrijders + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_CONNMGR_H +#define OUROBOROS_IPCPD_NORMAL_CONNMGR_H + +#include +#include + +struct conn { + struct conn_info conn_info; + struct flow_info { + int fd; + qosspec_t qs; + } flow_info; +}; + +int connmgr_init(void); + +void connmgr_fini(void); + +int connmgr_start(void); + +void connmgr_stop(void); + +struct ae * connmgr_ae_create(struct conn_info info); + +void connmgr_ae_destroy(struct ae * ae); + +int connmgr_alloc(struct ae * ae, + char * dst_name, + qosspec_t qs, + struct conn * conn); + +int connmgr_wait(struct ae * ae, + struct conn * conn); + +#endif /* OUROBOROS_IPCPD_NORMAL_CONNMGR_H */ diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c index 5c7ebd7e..9c3b9973 100644 --- a/src/ipcpd/normal/enroll.c +++ b/src/ipcpd/normal/enroll.c @@ -197,7 +197,7 @@ int enroll_boot(char * dst_name) delta_t = ts_diff_ms(&t0, &rtt); - assert (len == 2 * sizeof (uint64_t)); + assert(len == 2 * sizeof (uint64_t)); rtt.tv_sec = ntoh64(((uint64_t *) data)[0]); rtt.tv_nsec = ntoh64(((uint64_t *) data)[1]); diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 0c927fc7..34724ddd 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -162,7 +162,7 @@ void * fmgr_nm1_sdu_reader(void * o) shm_pci_des(sdb, &pci); - if (pci.dst_addr != ipcpi.address) { + if (pci.dst_addr != ipcpi.dt_addr) { log_dbg("PDU needs to be forwarded."); if (pci.ttl == 0) { diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 915feaf8..c9b23060 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -285,7 +285,7 @@ cep_id_t frct_i_create(uint64_t address, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = address; - pci.src_addr = ipcpi.address; + pci.src_addr = ipcpi.dt_addr; pci.dst_cep_id = 0; pci.src_cep_id = id; pci.seqno = 0; @@ -330,7 +330,7 @@ int frct_i_accept(cep_id_t id, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = instance->r_address; - pci.src_addr = ipcpi.address; + pci.src_addr = ipcpi.dt_addr; pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = 0; @@ -367,7 +367,7 @@ int frct_i_destroy(cep_id_t id, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = instance->r_address; - pci.src_addr = ipcpi.address; + pci.src_addr = ipcpi.dt_addr; pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = 0; @@ -413,7 +413,7 @@ int frct_i_write_sdu(cep_id_t id, pci.pdu_type = PDU_TYPE_DTP; pci.dst_addr = instance->r_address; - pci.src_addr = ipcpi.address; + pci.src_addr = ipcpi.dt_addr; pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = (instance->seqno)++; diff --git a/src/ipcpd/normal/gam.c b/src/ipcpd/normal/gam.c index f98c0d4f..212cfd83 100644 --- a/src/ipcpd/normal/gam.c +++ b/src/ipcpd/normal/gam.c @@ -185,7 +185,7 @@ int gam_flow_arr(struct gam * instance, strcpy(snd_info.protocol, CDAP_PROTO); snd_info.pref_version = 1; snd_info.pref_syntax = PROTO_GPB; - snd_info.ae.addr = ipcpi.address; + snd_info.addr = ipcpi.dt_addr; if (cacep_rcv(fd, rcv_info)) { log_err("Error establishing application connection."); @@ -266,7 +266,7 @@ int gam_flow_alloc(struct gam * instance, strcpy(snd_info.protocol, CDAP_PROTO); snd_info.pref_version = 1; snd_info.pref_syntax = PROTO_GPB; - snd_info.ae.addr = ipcpi.address; + snd_info.addr = ipcpi.dt_addr; if (cacep_snd(fd, &snd_info)) { log_err("Failed to create application connection."); diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index c75a74d6..3e5907a8 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -24,7 +24,6 @@ #include #include -#include #include #include #include @@ -33,11 +32,10 @@ #include #include "addr_auth.h" -#include "ae.h" +#include "connmgr.h" #include "dir.h" #include "enroll.h" #include "fmgr.h" -#include "frct.h" #include "ipcp.h" #include "ribconfig.h" #include "ribmgr.h" @@ -45,16 +43,11 @@ #include #include #include -#include #include #include #include -#define THIS_TYPE IPCP_NORMAL - -struct { - pthread_t acceptor; -} normal; +#define THIS_TYPE IPCP_NORMAL void ipcp_sig_handler(int sig, siginfo_t * info, @@ -82,53 +75,6 @@ void ipcp_sig_handler(int sig, } } -static void * flow_acceptor(void * o) -{ - int fd; - qosspec_t qs; - /* FIXME: Remove once correct AE is known. */ - char * ae_name = ENROLL_AE; - - (void) o; - - while (true) { - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - log_info("Shutting down flow acceptor."); - return 0; - } - - pthread_rwlock_unlock(&ipcpi.state_lock); - - fd = flow_accept(&qs); - if (fd < 0) { - if (fd != -EIRMD) - log_warn("Flow accept failed: %d", fd); - continue; - } - - /* FIXME: Perform CACEP at this point */ - - if (strcmp(ae_name, ENROLL_AE) == 0) { - enroll_handle(fd); - } else if (strcmp(ae_name, MGMT_AE) == 0) { - ribmgr_flow_arr(fd, qs); - } else if (strcmp(ae_name, DT_AE) == 0) { - fmgr_nm1_flow_arr(fd, qs); - } else { - log_dbg("Flow allocation request for unknown AE %s.", - ae_name); - if (flow_alloc_resp(fd, -1)) - log_warn("Failed to reply to flow allocation."); - flow_dealloc(fd); - } - } - - return (void *) 0; -} - /* * Boots the IPCP off information in the rib. * Common function after bootstrap or enroll. @@ -153,15 +99,21 @@ static int boot_components(void) } if (rib_add(MEMBERS_PATH, ipcpi.name)) { - log_warn("Failed to add name to " MEMBERS_PATH); + log_err("Failed to add name to " MEMBERS_PATH); return -1; } log_dbg("Starting components."); + if (connmgr_init()) { + log_err("Failed to init ap connection manager"); + return -1; + } + if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa)) != sizeof(pa)) { log_err("Failed to read policy for address authority."); + connmgr_fini(); return -1; } @@ -170,20 +122,22 @@ static int boot_components(void) return -1; } - ipcpi.address = addr_auth_address(); - if (ipcpi.address == 0) { + ipcpi.dt_addr = addr_auth_address(); + if (ipcpi.dt_addr == 0) { log_err("Failed to get a valid address."); addr_auth_fini(); + connmgr_fini(); return -1; } - log_dbg("IPCP got address %" PRIu64 ".", ipcpi.address); + log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); log_dbg("Starting ribmgr."); if (ribmgr_init()) { log_err("Failed to initialize RIB manager."); addr_auth_fini(); + connmgr_fini(); return -1; } @@ -191,6 +145,7 @@ static int boot_components(void) log_err("Failed to initialize directory."); ribmgr_fini(); addr_auth_fini(); + connmgr_fini(); return -1; } @@ -200,6 +155,7 @@ static int boot_components(void) dir_fini(); ribmgr_fini(); addr_auth_fini(); + connmgr_fini(); log_err("Failed to start flow manager."); return -1; } @@ -209,19 +165,21 @@ static int boot_components(void) dir_fini(); ribmgr_fini(); addr_auth_fini(); + connmgr_fini(); log_err("Failed to initialize FRCT."); return -1; } ipcp_set_state(IPCP_OPERATIONAL); - if (pthread_create(&normal.acceptor, NULL, flow_acceptor, NULL)) { + if (connmgr_start()) { ipcp_set_state(IPCP_INIT); fmgr_fini(); dir_fini(); ribmgr_fini(); addr_auth_fini(); - log_err("Failed to create acceptor thread."); + connmgr_fini(); + log_err("Failed to start AP connection manager."); return -1; } @@ -230,8 +188,7 @@ static int boot_components(void) void shutdown_components(void) { - pthread_cancel(normal.acceptor); - pthread_join(normal.acceptor, NULL); + connmgr_stop(); frct_fini(); @@ -242,6 +199,8 @@ void shutdown_components(void) ribmgr_fini(); addr_auth_fini(); + + connmgr_fini(); } static int normal_ipcp_enroll(char * dst_name) @@ -410,9 +369,9 @@ static struct ipcp_ops normal_ops = { .ipcp_name_reg = dir_name_reg, .ipcp_name_unreg = dir_name_unreg, .ipcp_name_query = dir_name_query, - .ipcp_flow_alloc = NULL, /* fmgr_np1_alloc, */ - .ipcp_flow_alloc_resp = NULL, /* fmgr_np1_alloc_resp, */ - .ipcp_flow_dealloc = NULL, /* fmgr_np1_dealloc */ + .ipcp_flow_alloc = fmgr_np1_alloc, + .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp, + .ipcp_flow_dealloc = fmgr_np1_dealloc }; int main(int argc, diff --git a/src/ipcpd/normal/pol/complete.c b/src/ipcpd/normal/pol/complete.c index 68f43e81..daf8c9bf 100644 --- a/src/ipcpd/normal/pol/complete.c +++ b/src/ipcpd/normal/pol/complete.c @@ -38,7 +38,7 @@ struct neighbor { struct list_head next; - char * neighbor; + uint64_t neighbor; }; struct complete { @@ -135,7 +135,6 @@ void complete_destroy(void * o) list_for_each_safe(p, n, &complete->neighbors) { struct neighbor * e = list_entry(p, struct neighbor, next); list_del(&e->next); - free(e->neighbor); free(e); } @@ -168,7 +167,7 @@ int complete_accept_flow(void * o, list_for_each(pos, &complete->neighbors) { struct neighbor * e = list_entry(pos, struct neighbor, next); /* FIXME: figure out union type and check name or address */ - if (strcmp(e->neighbor, info->ae.name) == 0) { + if (e->neighbor == info->addr) { pthread_mutex_unlock(&complete->neighbors_lock); return -1; } @@ -186,13 +185,7 @@ int complete_accept_flow(void * o, list_head_init(&n->next); - /* FIXME: figure out union type and check name or address */ - n->neighbor = strdup(info->ae.name); - if (n->neighbor == NULL) { - pthread_mutex_unlock(&complete->neighbors_lock); - free(n); - return -1; - } + n->neighbor = info->addr; list_add(&n->next, &complete->neighbors); -- cgit v1.2.3