diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-08-28 20:10:22 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-08-28 20:10:22 +0000 |
commit | e8875c08ac04a1d9aca342d94d4f788239334f72 (patch) | |
tree | 4a96be10ea1b9f3d03bb2fd10f3f2e403c1dd934 /src/ipcpd/normal/connmgr.c | |
parent | c3185b9a6e471b534a370b2d913425962af88654 (diff) | |
parent | 999b5dec615ce4cfb30ee909bdd16e79a5e2a1ce (diff) | |
download | ouroboros-e8875c08ac04a1d9aca342d94d4f788239334f72.tar.gz ouroboros-e8875c08ac04a1d9aca342d94d4f788239334f72.zip |
Merged in dstaesse/ouroboros/be-deprecate-gam (pull request #572)
Be deprecate gam
Diffstat (limited to 'src/ipcpd/normal/connmgr.c')
-rw-r--r-- | src/ipcpd/normal/connmgr.c | 373 |
1 files changed, 222 insertions, 151 deletions
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index a83d71c3..fa43b97a 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -22,7 +22,7 @@ #define _POSIX_C_SOURCE 200112L -#define OUROBOROS_PREFIX "normal-ipcp" +#define OUROBOROS_PREFIX "Connection manager" #include <ouroboros/dev.h> #include <ouroboros/cacep.h> @@ -42,92 +42,69 @@ #include <stdlib.h> #include <assert.h> -struct ae_conn { +enum connmgr_state { + CONNMGR_NULL = 0, + CONNMGR_INIT, + CONNMGR_RUNNING +}; + +struct conn_el { struct list_head next; struct conn conn; }; struct ae { - struct list_head next; + struct nbs * nbs; struct conn_info info; - struct list_head conn_list; - pthread_cond_t conn_cond; - pthread_mutex_t conn_lock; + struct list_head conns; + struct list_head pending; + + pthread_cond_t cond; + pthread_mutex_t lock; }; struct { - pthread_t acceptor; + struct ae aes[AEID_MAX]; + enum connmgr_state state; - struct list_head aes; - pthread_rwlock_t aes_lock; + pthread_t acceptor; } connmgr; - -static int get_info_by_name(const char * name, - struct conn_info * info) +static int get_id_by_name(const char * name) { - struct list_head * p; + enum ae_id i; - pthread_rwlock_rdlock(&connmgr.aes_lock); - - list_for_each(p, &connmgr.aes) { - struct ae * ae = list_entry(p, struct ae, next); - if (strcmp(ae->info.ae_name, name) == 0) { - memcpy(info, &ae->info, sizeof(*info)); - pthread_rwlock_unlock(&connmgr.aes_lock); - return 0; - } - } - - pthread_rwlock_unlock(&connmgr.aes_lock); + for (i = 0; i < AEID_MAX; ++i) + if (strcmp(name, connmgr.aes[i].info.ae_name) == 0) + return i; return -1; } -static int add_ae_conn(const char * name, +static int add_ae_conn(enum ae_id id, int fd, qosspec_t qs, struct conn_info * rcv_info) { - struct ae_conn * ae_conn; - struct list_head * p; - struct ae * ae = NULL; + struct conn_el * el; - ae_conn = malloc(sizeof(*ae_conn)); - if (ae_conn == NULL) { + el = malloc(sizeof(*el)); + if (el == NULL) { log_err("Not enough memory."); return -1; } - ae_conn->conn.conn_info = *rcv_info; - ae_conn->conn.flow_info.fd = fd; - ae_conn->conn.flow_info.qs = qs; + el->conn.conn_info = *rcv_info; + el->conn.flow_info.fd = fd; + el->conn.flow_info.qs = qs; - list_head_init(&ae_conn->next); + pthread_mutex_lock(&connmgr.aes[id].lock); - pthread_rwlock_wrlock(&connmgr.aes_lock); + list_add(&el->next, &connmgr.aes[id].pending); + pthread_cond_signal(&connmgr.aes[id].cond); - list_for_each(p, &connmgr.aes) { - ae = list_entry(p, struct ae, next); - if (strcmp(ae->info.ae_name, name) == 0) - break; - } - - /* Check if entry was removed during allocation. */ - if (ae == NULL || strcmp(ae->info.ae_name, name) != 0) { - pthread_rwlock_unlock(&connmgr.aes_lock); - return -1; - } - - pthread_mutex_lock(&ae->conn_lock); - - list_add(&ae_conn->next, &ae->conn_list); - pthread_cond_signal(&ae->conn_cond); - - pthread_mutex_unlock(&ae->conn_lock); - - pthread_rwlock_unlock(&connmgr.aes_lock); + pthread_mutex_unlock(&connmgr.aes[id].lock); return 0; } @@ -136,7 +113,6 @@ static void * flow_acceptor(void * o) { int fd; qosspec_t qs; - struct conn_info snd_info; struct conn_info rcv_info; struct conn_info fail_info; @@ -145,10 +121,7 @@ static void * flow_acceptor(void * o) memset(&fail_info, 0, sizeof(fail_info)); while (true) { - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_info("Shutting down flow acceptor."); - return 0; - } + int id; fd = flow_accept(&qs, NULL); if (fd < 0) { @@ -158,26 +131,30 @@ static void * flow_acceptor(void * o) } if (cacep_rcv(fd, &rcv_info)) { - log_err("Error establishing application connection."); + log_dbg("Error establishing application connection."); flow_dealloc(fd); continue; } - if (get_info_by_name(rcv_info.ae_name, &snd_info)) { - log_err("Failed to get info for %s.", rcv_info.ae_name); + id = get_id_by_name(rcv_info.ae_name); + if (id < 0) { + log_dbg("Connection request for unknown AE %s.", + rcv_info.ae_name); cacep_snd(fd, &fail_info); flow_dealloc(fd); continue; } - if (cacep_snd(fd, &snd_info)) { - log_err("Failed to respond to request."); + assert(id < AEID_MAX); + + if (cacep_snd(fd, &connmgr.aes[id].info)) { + log_dbg("Failed to respond to request."); flow_dealloc(fd); continue; } - if (add_ae_conn(rcv_info.ae_name, fd, qs, &rcv_info)) { - log_err("Failed to add new connection."); + if (add_ae_conn(id, fd, qs, &rcv_info)) { + log_dbg("Failed to add new connection."); flow_dealloc(fd); continue; } @@ -188,130 +165,192 @@ static void * flow_acceptor(void * o) int connmgr_init(void) { - if (pthread_rwlock_init(&connmgr.aes_lock, NULL)) - return -1; - - list_head_init(&connmgr.aes); + connmgr.state = CONNMGR_INIT; return 0; } +void connmgr_fini(void) +{ + int i; + + if (connmgr.state == CONNMGR_RUNNING) + pthread_join(connmgr.acceptor, NULL); + + for (i = 0; i < AEID_MAX; ++i) + connmgr_ae_fini(i); +} + int connmgr_start(void) { if (pthread_create(&connmgr.acceptor, NULL, flow_acceptor, NULL)) return -1; + connmgr.state = CONNMGR_RUNNING; + return 0; } void connmgr_stop(void) { - pthread_cancel(connmgr.acceptor); + if (connmgr.state == CONNMGR_RUNNING) + pthread_cancel(connmgr.acceptor); } -static void destroy_ae(struct ae * ae) +int connmgr_ae_init(enum ae_id id, + const struct conn_info * info, + struct nbs * nbs) { - struct list_head * p; - struct list_head * h; + struct ae * ae; - pthread_mutex_lock(&ae->conn_lock); + assert(id >= 0 && id < AEID_MAX); - list_for_each_safe(p, h, &ae->conn_list) { - struct ae_conn * e = list_entry(p, struct ae_conn, next); - list_del(&e->next); - free(e); + ae = connmgr.aes + id; + + if (pthread_mutex_init(&ae->lock, NULL)) + return -1; + + if (pthread_cond_init(&ae->cond, NULL)) { + pthread_mutex_destroy(&ae->lock); + return -1; } - pthread_mutex_unlock(&ae->conn_lock); + list_head_init(&ae->conns); + list_head_init(&ae->pending); + + memcpy(&connmgr.aes[id].info, info, sizeof(connmgr.aes[id].info)); - pthread_cond_destroy(&ae->conn_cond); - pthread_mutex_destroy(&ae->conn_lock); + connmgr.aes[id].nbs = nbs; - free(ae); + return 0; } -void connmgr_fini(void) +void connmgr_ae_fini(enum ae_id id) { struct list_head * p; struct list_head * h; + struct ae * ae; + + assert(id >= 0 && id < AEID_MAX); - pthread_join(connmgr.acceptor, NULL); + if (strlen(connmgr.aes[id].info.ae_name) == 0) + return; - pthread_rwlock_wrlock(&connmgr.aes_lock); + ae = connmgr.aes + id; - list_for_each_safe(p, h, &connmgr.aes) { - struct ae * e = list_entry(p, struct ae, next); + pthread_mutex_lock(&ae->lock); + + list_for_each_safe(p, h, &ae->conns) { + struct conn_el * e = list_entry(p, struct conn_el, next); + list_del(&e->next); + free(e); + } + + list_for_each_safe(p, h, &ae->pending) { + struct conn_el * e = list_entry(p, struct conn_el, next); list_del(&e->next); - destroy_ae(e); + free(e); } - pthread_rwlock_unlock(&connmgr.aes_lock); + pthread_mutex_unlock(&ae->lock); - pthread_rwlock_destroy(&connmgr.aes_lock); + pthread_cond_destroy(&ae->cond); + pthread_mutex_destroy(&ae->lock); + + memset(&connmgr.aes[id].info, 0, sizeof(connmgr.aes[id].info)); + + connmgr.aes[id].nbs = NULL; } -struct ae * connmgr_ae_create(struct conn_info info) +int connmgr_ipcp_connect(const char * dst, + const char * component) { - struct ae * ae; + struct conn_el * ce; + int id; - ae = malloc(sizeof(*ae)); - if (ae == NULL) - goto fail_malloc; + assert(dst); + assert(component); - list_head_init(&ae->next); - list_head_init(&ae->conn_list); - - ae->info = info; + ce = malloc(sizeof(*ce)); + if (ce == NULL) { + log_dbg("Out of memory."); + return -1; + } - if (pthread_mutex_init(&ae->conn_lock, NULL)) - goto fail_mutex_init; + id = get_id_by_name(component); + if (id < 0) { + log_dbg("No such component: %s", component); + free(ce); + return -1; + } - if (pthread_cond_init(&ae->conn_cond, NULL)) - goto fail_cond_init; + /* FIXME: get the correct qos for the component. */ + if (connmgr_alloc(id, dst, NULL, &ce->conn)) { + free(ce); + return -1; + } - pthread_rwlock_wrlock(&connmgr.aes_lock); + if (strlen(dst) > DST_MAX_STRLEN) { + log_warn("Truncating dst length for connection."); + memcpy(ce->conn.flow_info.dst, dst, DST_MAX_STRLEN); + ce->conn.flow_info.dst[DST_MAX_STRLEN] = '\0'; + } else { + strcpy(ce->conn.flow_info.dst, dst); + } - list_add(&ae->next, &connmgr.aes); + pthread_mutex_lock(&connmgr.aes[id].lock); - pthread_rwlock_unlock(&connmgr.aes_lock); + list_add(&ce->next, &connmgr.aes[id].conns); - return ae; + pthread_mutex_unlock(&connmgr.aes[id].lock); - fail_cond_init: - pthread_mutex_destroy(&ae->conn_lock); - fail_mutex_init: - free(ae); - fail_malloc: - return NULL; + return 0; } -void connmgr_ae_destroy(struct ae * ae) +int connmgr_ipcp_disconnect(const char * dst, + const char * component) { - assert(ae); + struct list_head * p; + struct list_head * h; + int id; - pthread_rwlock_wrlock(&connmgr.aes_lock); + assert(dst); + assert(component); - list_del(&ae->next); + id = get_id_by_name(component); + if (id < 0) + return -1; - pthread_rwlock_unlock(&connmgr.aes_lock); + pthread_mutex_lock(&connmgr.aes[id].lock); + + list_for_each_safe(p,h, &connmgr.aes[id].conns) { + struct conn_el * el = list_entry(p, struct conn_el, next); + if (strcmp(el->conn.flow_info.dst, dst) == 0) { + int ret; + pthread_mutex_unlock(&connmgr.aes[id].lock); + list_del(&el->next); + ret = connmgr_dealloc(id, &el->conn); + free(el); + return ret; + } + } + + pthread_mutex_unlock(&connmgr.aes[id].lock); - destroy_ae(ae); + return 0; } -int connmgr_alloc(struct ae * ae, - const char * dst_name, +int connmgr_alloc(enum ae_id id, + const char * dst, qosspec_t * qs, struct conn * conn) { - assert(ae); - assert(dst_name); - assert(conn); - - memset(&conn->conn_info, 0, sizeof(conn->conn_info)); + assert(id >= 0 && id < AEID_MAX); + assert(dst); - conn->flow_info.fd = flow_alloc(dst_name, qs, NULL); + conn->flow_info.fd = flow_alloc(dst, qs, NULL); if (conn->flow_info.fd < 0) { - log_err("Failed to allocate flow to %s.", dst_name); + log_dbg("Failed to allocate flow to %s.", dst); return -1; } @@ -320,58 +359,90 @@ int connmgr_alloc(struct ae * ae, else memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs)); - if (cacep_snd(conn->flow_info.fd, &ae->info)) { - log_err("Failed to create application connection."); + log_dbg("Sending cacep info for protocol %s to fd %d.", + connmgr.aes[id].info.protocol, conn->flow_info.fd); + + if (cacep_snd(conn->flow_info.fd, &connmgr.aes[id].info)) { + log_dbg("Failed to create application connection."); flow_dealloc(conn->flow_info.fd); return -1; } if (cacep_rcv(conn->flow_info.fd, &conn->conn_info)) { - log_err("Failed to connect to application."); + log_dbg("Failed to connect to application."); flow_dealloc(conn->flow_info.fd); return -1; } - if (strcmp(ae->info.protocol, conn->conn_info.protocol) || - ae->info.pref_version != conn->conn_info.pref_version || - ae->info.pref_syntax != conn->conn_info.pref_syntax) { + if (strcmp(connmgr.aes[id].info.protocol, conn->conn_info.protocol)) { + log_dbg("Unknown protocol (requested %s, got %s).", + connmgr.aes[id].info.protocol, + conn->conn_info.protocol); flow_dealloc(conn->flow_info.fd); return -1; } + if (connmgr.aes[id].info.pref_version != conn->conn_info.pref_version) { + log_dbg("Unknown protocol version."); + flow_dealloc(conn->flow_info.fd); + return -1; + } + + if (connmgr.aes[id].info.pref_syntax != conn->conn_info.pref_syntax) { + log_dbg("Unknown protocol syntax."); + flow_dealloc(conn->flow_info.fd); + return -1; + } + + if (connmgr.aes[id].nbs != NULL) + nbs_add(connmgr.aes[id].nbs, *conn); + return 0; } -int connmgr_wait(struct ae * ae, +int connmgr_dealloc(enum ae_id id, + struct conn * conn) +{ + if (connmgr.aes[id].nbs != NULL) + nbs_del(connmgr.aes[id].nbs, conn->flow_info.fd); + + return flow_dealloc(conn->flow_info.fd); +} + + +int connmgr_wait(enum ae_id id, struct conn * conn) { - struct ae_conn * ae_conn; + struct conn_el * el; + struct ae * ae; - assert(ae); + assert(id >= 0 && id < AEID_MAX); assert(conn); - pthread_mutex_lock(&ae->conn_lock); + ae = connmgr.aes + id; + + pthread_mutex_lock(&ae->lock); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) &ae->conn_lock); + (void *) &ae->lock); - while (list_is_empty(&ae->conn_list)) - pthread_cond_wait(&ae->conn_cond, &ae->conn_lock); + while (list_is_empty(&ae->pending)) + pthread_cond_wait(&ae->cond, &ae->lock); pthread_cleanup_pop(false); - ae_conn = list_first_entry((&ae->conn_list), struct ae_conn, next); - if (ae_conn == NULL) { - pthread_mutex_unlock(&ae->conn_lock); + el = list_first_entry((&ae->pending), struct conn_el, next); + if (el == NULL) { + pthread_mutex_unlock(&ae->lock); return -1; } - *conn = ae_conn->conn; + *conn = el->conn; - list_del(&ae_conn->next); - free(ae_conn); + list_del(&el->next); + free(el); - pthread_mutex_unlock(&ae->conn_lock); + pthread_mutex_unlock(&ae->lock); return 0; } |