From c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 18 Sep 2016 06:27:43 +0200 Subject: lib, ipcp: Revise fast path and flow interfaces IPCPs can now use ap_init() to initialize the memory. All flows are accessed using flow descriptors, this greatly simplifies IPCP development. Reverts the fast path to a single ap_rbuff per process. Splits lib/ipcp into irmd/ipcp and lib/ipcp-dev. Adds a lib/shim-dev holding tailored functions for shims. Moves the buffer_t to utils.h. Fixes the shim-eth-llc length field. Removes the flow from shared.h. Fixes #4 Fixes #5 --- src/ipcpd/normal/fmgr.c | 202 ++++++++++++++++++------------------------------ 1 file changed, 74 insertions(+), 128 deletions(-) (limited to 'src/ipcpd/normal/fmgr.c') diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 79b1bb4b..b6ec1984 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -26,7 +26,7 @@ #include #include #include -#include +#include #include #include @@ -41,10 +41,8 @@ #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; -extern struct ipcp * _ipcp; - struct n_flow { - struct flow flow; + int fd; struct frct_i * frct_i; enum qos_cube qos; @@ -57,7 +55,7 @@ struct n_1_flow { struct list_head next; }; -struct fmgr { +struct { pthread_t listen_thread; struct list_head n_1_flows; @@ -66,10 +64,9 @@ struct fmgr { struct list_head n_flows; /* FIXME: Make this a read/write lock */ pthread_mutex_t n_flows_lock; -} * fmgr = NULL; +} fmgr; -static int add_n_1_fd(int fd, - char * ae_name) +static int add_n_1_fd(int fd, char * ae_name) { struct n_1_flow * tmp; @@ -85,9 +82,9 @@ static int add_n_1_fd(int fd, INIT_LIST_HEAD(&tmp->next); - pthread_mutex_lock(&fmgr->n_1_flows_lock); - list_add(&tmp->next, &fmgr->n_1_flows); - pthread_mutex_unlock(&fmgr->n_1_flows_lock); + pthread_mutex_lock(&fmgr.n_1_flows_lock); + list_add(&tmp->next, &fmgr.n_1_flows); + pthread_mutex_unlock(&fmgr.n_1_flows_lock); return 0; } @@ -98,16 +95,16 @@ static void * fmgr_listen(void * o) char * ae_name; while (true) { - ipcp_wait_state(_ipcp, IPCP_ENROLLED, NULL); + ipcp_wait_state(IPCP_ENROLLED, NULL); - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() == IPCP_SHUTDOWN) { + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); fd = flow_accept(&ae_name); if (fd < 0) { @@ -161,17 +158,13 @@ static void * fmgr_listen(void * o) int fmgr_init() { - fmgr = malloc(sizeof(*fmgr)); - if (fmgr == NULL) - return -1; + INIT_LIST_HEAD(&fmgr.n_1_flows); + INIT_LIST_HEAD(&fmgr.n_flows); - INIT_LIST_HEAD(&fmgr->n_1_flows); - INIT_LIST_HEAD(&fmgr->n_flows); + pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); + pthread_mutex_init(&fmgr.n_flows_lock, NULL); - pthread_mutex_init(&fmgr->n_1_flows_lock, NULL); - pthread_mutex_init(&fmgr->n_flows_lock, NULL); - - pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL); + pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL); return 0; } @@ -180,23 +173,20 @@ int fmgr_fini() { struct list_head * pos = NULL; - pthread_cancel(fmgr->listen_thread); + pthread_cancel(fmgr.listen_thread); - pthread_join(fmgr->listen_thread, NULL); + pthread_join(fmgr.listen_thread, NULL); - list_for_each(pos, &fmgr->n_1_flows) { - struct n_1_flow * e = - list_entry(pos, struct n_1_flow, next); + list_for_each(pos, &fmgr.n_1_flows) { + struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); if (e->ae_name != NULL) free(e->ae_name); if (ribmgr_remove_flow(e->fd)) LOG_ERR("Failed to remove management flow."); } - pthread_mutex_destroy(&fmgr->n_1_flows_lock); - pthread_mutex_destroy(&fmgr->n_flows_lock); - - free(fmgr); + pthread_mutex_destroy(&fmgr.n_1_flows_lock); + pthread_mutex_destroy(&fmgr.n_flows_lock); return 0; } @@ -243,8 +233,7 @@ int fmgr_mgmt_flow(char * dst_name) return 0; } -int fmgr_dt_flow(char * dst_name, - enum qos_cube qos) +int fmgr_dt_flow(char * dst_name, enum qos_cube qos) { int fd; int result; @@ -288,14 +277,13 @@ int fmgr_dt_flow(char * dst_name, } /* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_port_id(int port_id) +static struct n_flow * get_n_flow_by_fd(int fd) { struct list_head * pos = NULL; - list_for_each(pos, &fmgr->n_flows) { - struct n_flow * e = - list_entry(pos, struct n_flow, next); - if (e->flow.port_id == port_id) + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->fd == fd) return e; } @@ -307,9 +295,8 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) { struct list_head * pos = NULL; - list_for_each(pos, &fmgr->n_flows) { - struct n_flow * e = - list_entry(pos, struct n_flow, next); + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); if (e->frct_i == frct_i) return e; } @@ -317,8 +304,7 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) return NULL; } -int fmgr_flow_alloc(pid_t n_api, - int port_id, +int fmgr_flow_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) @@ -355,49 +341,40 @@ int fmgr_flow_alloc(pid_t n_api, flow_alloc_msg__pack(&msg, buf.data); - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); frct_i = frct_i_create(address, &buf, qos); if (frct_i == NULL) { free(buf.data); free(flow); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } free(buf.data); - flow->flow.rb = shm_ap_rbuff_open_s(n_api); - if (flow->flow.rb == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - free(flow); - return -1; - } - - flow->flow.api = n_api; - flow->flow.port_id = port_id; - flow->flow.state = FLOW_PENDING; + flow->fd = fd; flow->frct_i = frct_i; - flow->qos = qos; + flow->qos = qos; INIT_LIST_HEAD(&flow->next); - list_add(&flow->next, &fmgr->n_flows); + list_add(&flow->next, &fmgr.n_flows); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return 0; } /* Call under n_flows lock */ -static int n_flow_dealloc(int port_id) +static int n_flow_dealloc(int fd) { struct n_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; - flow = get_n_flow_by_port_id(port_id); + flow = get_n_flow_by_fd(fd); if (flow == NULL) return -1; @@ -414,8 +391,6 @@ static int n_flow_dealloc(int port_id) flow_alloc_msg__pack(&msg, buf.data); ret = frct_i_destroy(flow->frct_i, &buf); - if (flow->flow.rb != NULL) - shm_ap_rbuff_close(flow->flow.rb); list_del(&flow->next); free(flow); @@ -424,25 +399,17 @@ static int n_flow_dealloc(int port_id) return ret; } -int fmgr_flow_alloc_resp(pid_t n_api, - int port_id, - int response) +int fmgr_flow_alloc_resp(int fd, int response) { struct n_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); - flow = get_n_flow_by_port_id(port_id); + flow = get_n_flow_by_fd(fd); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } - - if (flow->flow.state != FLOW_PENDING) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - LOG_ERR("Flow is not pending."); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } @@ -452,13 +419,13 @@ int fmgr_flow_alloc_resp(pid_t n_api, buf.len = flow_alloc_msg__get_packed_size(&msg); if (buf.len == 0) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } buf.data = malloc(buf.len); if (buf.data == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } @@ -469,106 +436,85 @@ int fmgr_flow_alloc_resp(pid_t n_api, free(buf.data); list_del(&flow->next); free(flow); - } else { - if (frct_i_accept(flow->frct_i, &buf)) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } - - flow->flow.state = FLOW_ALLOCATED; - flow->flow.api = n_api; - - flow->flow.rb = shm_ap_rbuff_open_s(n_api); - if (flow->flow.rb == NULL) { - n_flow_dealloc(port_id); - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } + } else if (frct_i_accept(flow->frct_i, &buf)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + return -1; } - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return 0; } -int fmgr_flow_dealloc(int port_id) +int fmgr_flow_dealloc(int fd) { int ret; - pthread_mutex_lock(&fmgr->n_flows_lock); - ret = n_flow_dealloc(port_id); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); + ret = n_flow_dealloc(fd); + pthread_mutex_unlock(&fmgr.n_flows_lock); return ret; } -int fmgr_flow_alloc_msg(struct frct_i * frct_i, - buffer_t * buf) +int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) { struct n_flow * flow; int ret = 0; - int port_id; + int fd; flow_alloc_msg_t * msg; - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); - /* Depending on what is in the message call the function in ipcp.h */ + /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } switch (msg->code) { case FLOW_ALLOC_CODE__FLOW_REQ: - flow = malloc(sizeof(*flow)); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); return -1; } - flow->flow.state = FLOW_PENDING; flow->frct_i = frct_i; flow->qos = msg->qos_cube; - flow->flow.rb = NULL; - flow->flow.api = 0; - - port_id = ipcp_flow_req_arr(getpid(), - msg->dst_name, - msg->src_ae_name); - if (port_id < 0) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + + fd = ipcp_flow_req_arr(getpid(), + msg->dst_name, + msg->src_ae_name); + if (fd < 0) { + pthread_mutex_unlock(&fmgr.n_flows_lock); free(flow); flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to get port-id from IRMd."); + LOG_ERR("Failed to get fd for flow."); return -1; } - flow->flow.port_id = port_id; + flow->fd = fd; INIT_LIST_HEAD(&flow->next); - list_add(&flow->next, &fmgr->n_flows); + list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: flow = get_n_flow_by_frct_i(frct_i); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - ret = ipcp_flow_alloc_reply(getpid(), - flow->flow.port_id, - msg->response); - + ret = ipcp_flow_alloc_reply(flow->fd, msg->response); if (msg->response < 0) { - shm_ap_rbuff_close(flow->flow.rb); list_del(&flow->next); free(flow); } @@ -577,13 +523,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, case FLOW_ALLOC_CODE__FLOW_DEALLOC: flow = get_n_flow_by_frct_i(frct_i); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - ret = irm_flow_dealloc(flow->flow.port_id); + ret = flow_dealloc(flow->fd); break; default: LOG_ERR("Got an unknown flow allocation message."); @@ -591,7 +537,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, break; } - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); -- cgit v1.2.3