summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/connmgr.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-08-28 20:10:22 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-08-28 20:10:22 +0000
commite8875c08ac04a1d9aca342d94d4f788239334f72 (patch)
tree4a96be10ea1b9f3d03bb2fd10f3f2e403c1dd934 /src/ipcpd/normal/connmgr.c
parentc3185b9a6e471b534a370b2d913425962af88654 (diff)
parent999b5dec615ce4cfb30ee909bdd16e79a5e2a1ce (diff)
downloadouroboros-e8875c08ac04a1d9aca342d94d4f788239334f72.tar.gz
ouroboros-e8875c08ac04a1d9aca342d94d4f788239334f72.zip
Merged in dstaesse/ouroboros/be-deprecate-gam (pull request #572)
Be deprecate gam
Diffstat (limited to 'src/ipcpd/normal/connmgr.c')
-rw-r--r--src/ipcpd/normal/connmgr.c373
1 files changed, 222 insertions, 151 deletions
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c
index a83d71c3..fa43b97a 100644
--- a/src/ipcpd/normal/connmgr.c
+++ b/src/ipcpd/normal/connmgr.c
@@ -22,7 +22,7 @@
#define _POSIX_C_SOURCE 200112L
-#define OUROBOROS_PREFIX "normal-ipcp"
+#define OUROBOROS_PREFIX "Connection manager"
#include <ouroboros/dev.h>
#include <ouroboros/cacep.h>
@@ -42,92 +42,69 @@
#include <stdlib.h>
#include <assert.h>
-struct ae_conn {
+enum connmgr_state {
+ CONNMGR_NULL = 0,
+ CONNMGR_INIT,
+ CONNMGR_RUNNING
+};
+
+struct conn_el {
struct list_head next;
struct conn conn;
};
struct ae {
- struct list_head next;
+ struct nbs * nbs;
struct conn_info info;
- struct list_head conn_list;
- pthread_cond_t conn_cond;
- pthread_mutex_t conn_lock;
+ struct list_head conns;
+ struct list_head pending;
+
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
};
struct {
- pthread_t acceptor;
+ struct ae aes[AEID_MAX];
+ enum connmgr_state state;
- struct list_head aes;
- pthread_rwlock_t aes_lock;
+ pthread_t acceptor;
} connmgr;
-
-static int get_info_by_name(const char * name,
- struct conn_info * info)
+static int get_id_by_name(const char * name)
{
- struct list_head * p;
+ enum ae_id i;
- pthread_rwlock_rdlock(&connmgr.aes_lock);
-
- list_for_each(p, &connmgr.aes) {
- struct ae * ae = list_entry(p, struct ae, next);
- if (strcmp(ae->info.ae_name, name) == 0) {
- memcpy(info, &ae->info, sizeof(*info));
- pthread_rwlock_unlock(&connmgr.aes_lock);
- return 0;
- }
- }
-
- pthread_rwlock_unlock(&connmgr.aes_lock);
+ for (i = 0; i < AEID_MAX; ++i)
+ if (strcmp(name, connmgr.aes[i].info.ae_name) == 0)
+ return i;
return -1;
}
-static int add_ae_conn(const char * name,
+static int add_ae_conn(enum ae_id id,
int fd,
qosspec_t qs,
struct conn_info * rcv_info)
{
- struct ae_conn * ae_conn;
- struct list_head * p;
- struct ae * ae = NULL;
+ struct conn_el * el;
- ae_conn = malloc(sizeof(*ae_conn));
- if (ae_conn == NULL) {
+ el = malloc(sizeof(*el));
+ if (el == NULL) {
log_err("Not enough memory.");
return -1;
}
- ae_conn->conn.conn_info = *rcv_info;
- ae_conn->conn.flow_info.fd = fd;
- ae_conn->conn.flow_info.qs = qs;
+ el->conn.conn_info = *rcv_info;
+ el->conn.flow_info.fd = fd;
+ el->conn.flow_info.qs = qs;
- list_head_init(&ae_conn->next);
+ pthread_mutex_lock(&connmgr.aes[id].lock);
- pthread_rwlock_wrlock(&connmgr.aes_lock);
+ list_add(&el->next, &connmgr.aes[id].pending);
+ pthread_cond_signal(&connmgr.aes[id].cond);
- list_for_each(p, &connmgr.aes) {
- ae = list_entry(p, struct ae, next);
- if (strcmp(ae->info.ae_name, name) == 0)
- break;
- }
-
- /* Check if entry was removed during allocation. */
- if (ae == NULL || strcmp(ae->info.ae_name, name) != 0) {
- pthread_rwlock_unlock(&connmgr.aes_lock);
- return -1;
- }
-
- pthread_mutex_lock(&ae->conn_lock);
-
- list_add(&ae_conn->next, &ae->conn_list);
- pthread_cond_signal(&ae->conn_cond);
-
- pthread_mutex_unlock(&ae->conn_lock);
-
- pthread_rwlock_unlock(&connmgr.aes_lock);
+ pthread_mutex_unlock(&connmgr.aes[id].lock);
return 0;
}
@@ -136,7 +113,6 @@ static void * flow_acceptor(void * o)
{
int fd;
qosspec_t qs;
- struct conn_info snd_info;
struct conn_info rcv_info;
struct conn_info fail_info;
@@ -145,10 +121,7 @@ static void * flow_acceptor(void * o)
memset(&fail_info, 0, sizeof(fail_info));
while (true) {
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_info("Shutting down flow acceptor.");
- return 0;
- }
+ int id;
fd = flow_accept(&qs, NULL);
if (fd < 0) {
@@ -158,26 +131,30 @@ static void * flow_acceptor(void * o)
}
if (cacep_rcv(fd, &rcv_info)) {
- log_err("Error establishing application connection.");
+ log_dbg("Error establishing application connection.");
flow_dealloc(fd);
continue;
}
- if (get_info_by_name(rcv_info.ae_name, &snd_info)) {
- log_err("Failed to get info for %s.", rcv_info.ae_name);
+ id = get_id_by_name(rcv_info.ae_name);
+ if (id < 0) {
+ log_dbg("Connection request for unknown AE %s.",
+ rcv_info.ae_name);
cacep_snd(fd, &fail_info);
flow_dealloc(fd);
continue;
}
- if (cacep_snd(fd, &snd_info)) {
- log_err("Failed to respond to request.");
+ assert(id < AEID_MAX);
+
+ if (cacep_snd(fd, &connmgr.aes[id].info)) {
+ log_dbg("Failed to respond to request.");
flow_dealloc(fd);
continue;
}
- if (add_ae_conn(rcv_info.ae_name, fd, qs, &rcv_info)) {
- log_err("Failed to add new connection.");
+ if (add_ae_conn(id, fd, qs, &rcv_info)) {
+ log_dbg("Failed to add new connection.");
flow_dealloc(fd);
continue;
}
@@ -188,130 +165,192 @@ static void * flow_acceptor(void * o)
int connmgr_init(void)
{
- if (pthread_rwlock_init(&connmgr.aes_lock, NULL))
- return -1;
-
- list_head_init(&connmgr.aes);
+ connmgr.state = CONNMGR_INIT;
return 0;
}
+void connmgr_fini(void)
+{
+ int i;
+
+ if (connmgr.state == CONNMGR_RUNNING)
+ pthread_join(connmgr.acceptor, NULL);
+
+ for (i = 0; i < AEID_MAX; ++i)
+ connmgr_ae_fini(i);
+}
+
int connmgr_start(void)
{
if (pthread_create(&connmgr.acceptor, NULL, flow_acceptor, NULL))
return -1;
+ connmgr.state = CONNMGR_RUNNING;
+
return 0;
}
void connmgr_stop(void)
{
- pthread_cancel(connmgr.acceptor);
+ if (connmgr.state == CONNMGR_RUNNING)
+ pthread_cancel(connmgr.acceptor);
}
-static void destroy_ae(struct ae * ae)
+int connmgr_ae_init(enum ae_id id,
+ const struct conn_info * info,
+ struct nbs * nbs)
{
- struct list_head * p;
- struct list_head * h;
+ struct ae * ae;
- pthread_mutex_lock(&ae->conn_lock);
+ assert(id >= 0 && id < AEID_MAX);
- list_for_each_safe(p, h, &ae->conn_list) {
- struct ae_conn * e = list_entry(p, struct ae_conn, next);
- list_del(&e->next);
- free(e);
+ ae = connmgr.aes + id;
+
+ if (pthread_mutex_init(&ae->lock, NULL))
+ return -1;
+
+ if (pthread_cond_init(&ae->cond, NULL)) {
+ pthread_mutex_destroy(&ae->lock);
+ return -1;
}
- pthread_mutex_unlock(&ae->conn_lock);
+ list_head_init(&ae->conns);
+ list_head_init(&ae->pending);
+
+ memcpy(&connmgr.aes[id].info, info, sizeof(connmgr.aes[id].info));
- pthread_cond_destroy(&ae->conn_cond);
- pthread_mutex_destroy(&ae->conn_lock);
+ connmgr.aes[id].nbs = nbs;
- free(ae);
+ return 0;
}
-void connmgr_fini(void)
+void connmgr_ae_fini(enum ae_id id)
{
struct list_head * p;
struct list_head * h;
+ struct ae * ae;
+
+ assert(id >= 0 && id < AEID_MAX);
- pthread_join(connmgr.acceptor, NULL);
+ if (strlen(connmgr.aes[id].info.ae_name) == 0)
+ return;
- pthread_rwlock_wrlock(&connmgr.aes_lock);
+ ae = connmgr.aes + id;
- list_for_each_safe(p, h, &connmgr.aes) {
- struct ae * e = list_entry(p, struct ae, next);
+ pthread_mutex_lock(&ae->lock);
+
+ list_for_each_safe(p, h, &ae->conns) {
+ struct conn_el * e = list_entry(p, struct conn_el, next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ list_for_each_safe(p, h, &ae->pending) {
+ struct conn_el * e = list_entry(p, struct conn_el, next);
list_del(&e->next);
- destroy_ae(e);
+ free(e);
}
- pthread_rwlock_unlock(&connmgr.aes_lock);
+ pthread_mutex_unlock(&ae->lock);
- pthread_rwlock_destroy(&connmgr.aes_lock);
+ pthread_cond_destroy(&ae->cond);
+ pthread_mutex_destroy(&ae->lock);
+
+ memset(&connmgr.aes[id].info, 0, sizeof(connmgr.aes[id].info));
+
+ connmgr.aes[id].nbs = NULL;
}
-struct ae * connmgr_ae_create(struct conn_info info)
+int connmgr_ipcp_connect(const char * dst,
+ const char * component)
{
- struct ae * ae;
+ struct conn_el * ce;
+ int id;
- ae = malloc(sizeof(*ae));
- if (ae == NULL)
- goto fail_malloc;
+ assert(dst);
+ assert(component);
- list_head_init(&ae->next);
- list_head_init(&ae->conn_list);
-
- ae->info = info;
+ ce = malloc(sizeof(*ce));
+ if (ce == NULL) {
+ log_dbg("Out of memory.");
+ return -1;
+ }
- if (pthread_mutex_init(&ae->conn_lock, NULL))
- goto fail_mutex_init;
+ id = get_id_by_name(component);
+ if (id < 0) {
+ log_dbg("No such component: %s", component);
+ free(ce);
+ return -1;
+ }
- if (pthread_cond_init(&ae->conn_cond, NULL))
- goto fail_cond_init;
+ /* FIXME: get the correct qos for the component. */
+ if (connmgr_alloc(id, dst, NULL, &ce->conn)) {
+ free(ce);
+ return -1;
+ }
- pthread_rwlock_wrlock(&connmgr.aes_lock);
+ if (strlen(dst) > DST_MAX_STRLEN) {
+ log_warn("Truncating dst length for connection.");
+ memcpy(ce->conn.flow_info.dst, dst, DST_MAX_STRLEN);
+ ce->conn.flow_info.dst[DST_MAX_STRLEN] = '\0';
+ } else {
+ strcpy(ce->conn.flow_info.dst, dst);
+ }
- list_add(&ae->next, &connmgr.aes);
+ pthread_mutex_lock(&connmgr.aes[id].lock);
- pthread_rwlock_unlock(&connmgr.aes_lock);
+ list_add(&ce->next, &connmgr.aes[id].conns);
- return ae;
+ pthread_mutex_unlock(&connmgr.aes[id].lock);
- fail_cond_init:
- pthread_mutex_destroy(&ae->conn_lock);
- fail_mutex_init:
- free(ae);
- fail_malloc:
- return NULL;
+ return 0;
}
-void connmgr_ae_destroy(struct ae * ae)
+int connmgr_ipcp_disconnect(const char * dst,
+ const char * component)
{
- assert(ae);
+ struct list_head * p;
+ struct list_head * h;
+ int id;
- pthread_rwlock_wrlock(&connmgr.aes_lock);
+ assert(dst);
+ assert(component);
- list_del(&ae->next);
+ id = get_id_by_name(component);
+ if (id < 0)
+ return -1;
- pthread_rwlock_unlock(&connmgr.aes_lock);
+ pthread_mutex_lock(&connmgr.aes[id].lock);
+
+ list_for_each_safe(p,h, &connmgr.aes[id].conns) {
+ struct conn_el * el = list_entry(p, struct conn_el, next);
+ if (strcmp(el->conn.flow_info.dst, dst) == 0) {
+ int ret;
+ pthread_mutex_unlock(&connmgr.aes[id].lock);
+ list_del(&el->next);
+ ret = connmgr_dealloc(id, &el->conn);
+ free(el);
+ return ret;
+ }
+ }
+
+ pthread_mutex_unlock(&connmgr.aes[id].lock);
- destroy_ae(ae);
+ return 0;
}
-int connmgr_alloc(struct ae * ae,
- const char * dst_name,
+int connmgr_alloc(enum ae_id id,
+ const char * dst,
qosspec_t * qs,
struct conn * conn)
{
- assert(ae);
- assert(dst_name);
- assert(conn);
-
- memset(&conn->conn_info, 0, sizeof(conn->conn_info));
+ assert(id >= 0 && id < AEID_MAX);
+ assert(dst);
- conn->flow_info.fd = flow_alloc(dst_name, qs, NULL);
+ conn->flow_info.fd = flow_alloc(dst, qs, NULL);
if (conn->flow_info.fd < 0) {
- log_err("Failed to allocate flow to %s.", dst_name);
+ log_dbg("Failed to allocate flow to %s.", dst);
return -1;
}
@@ -320,58 +359,90 @@ int connmgr_alloc(struct ae * ae,
else
memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs));
- if (cacep_snd(conn->flow_info.fd, &ae->info)) {
- log_err("Failed to create application connection.");
+ log_dbg("Sending cacep info for protocol %s to fd %d.",
+ connmgr.aes[id].info.protocol, conn->flow_info.fd);
+
+ if (cacep_snd(conn->flow_info.fd, &connmgr.aes[id].info)) {
+ log_dbg("Failed to create application connection.");
flow_dealloc(conn->flow_info.fd);
return -1;
}
if (cacep_rcv(conn->flow_info.fd, &conn->conn_info)) {
- log_err("Failed to connect to application.");
+ log_dbg("Failed to connect to application.");
flow_dealloc(conn->flow_info.fd);
return -1;
}
- if (strcmp(ae->info.protocol, conn->conn_info.protocol) ||
- ae->info.pref_version != conn->conn_info.pref_version ||
- ae->info.pref_syntax != conn->conn_info.pref_syntax) {
+ if (strcmp(connmgr.aes[id].info.protocol, conn->conn_info.protocol)) {
+ log_dbg("Unknown protocol (requested %s, got %s).",
+ connmgr.aes[id].info.protocol,
+ conn->conn_info.protocol);
flow_dealloc(conn->flow_info.fd);
return -1;
}
+ if (connmgr.aes[id].info.pref_version != conn->conn_info.pref_version) {
+ log_dbg("Unknown protocol version.");
+ flow_dealloc(conn->flow_info.fd);
+ return -1;
+ }
+
+ if (connmgr.aes[id].info.pref_syntax != conn->conn_info.pref_syntax) {
+ log_dbg("Unknown protocol syntax.");
+ flow_dealloc(conn->flow_info.fd);
+ return -1;
+ }
+
+ if (connmgr.aes[id].nbs != NULL)
+ nbs_add(connmgr.aes[id].nbs, *conn);
+
return 0;
}
-int connmgr_wait(struct ae * ae,
+int connmgr_dealloc(enum ae_id id,
+ struct conn * conn)
+{
+ if (connmgr.aes[id].nbs != NULL)
+ nbs_del(connmgr.aes[id].nbs, conn->flow_info.fd);
+
+ return flow_dealloc(conn->flow_info.fd);
+}
+
+
+int connmgr_wait(enum ae_id id,
struct conn * conn)
{
- struct ae_conn * ae_conn;
+ struct conn_el * el;
+ struct ae * ae;
- assert(ae);
+ assert(id >= 0 && id < AEID_MAX);
assert(conn);
- pthread_mutex_lock(&ae->conn_lock);
+ ae = connmgr.aes + id;
+
+ pthread_mutex_lock(&ae->lock);
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) &ae->conn_lock);
+ (void *) &ae->lock);
- while (list_is_empty(&ae->conn_list))
- pthread_cond_wait(&ae->conn_cond, &ae->conn_lock);
+ while (list_is_empty(&ae->pending))
+ pthread_cond_wait(&ae->cond, &ae->lock);
pthread_cleanup_pop(false);
- ae_conn = list_first_entry((&ae->conn_list), struct ae_conn, next);
- if (ae_conn == NULL) {
- pthread_mutex_unlock(&ae->conn_lock);
+ el = list_first_entry((&ae->pending), struct conn_el, next);
+ if (el == NULL) {
+ pthread_mutex_unlock(&ae->lock);
return -1;
}
- *conn = ae_conn->conn;
+ *conn = el->conn;
- list_del(&ae_conn->next);
- free(ae_conn);
+ list_del(&el->next);
+ free(el);
- pthread_mutex_unlock(&ae->conn_lock);
+ pthread_mutex_unlock(&ae->lock);
return 0;
}