summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-04 15:23:54 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-04 15:23:54 +0200
commit1a7c0923206cfb98d43122621a585027c67040ea (patch)
treeacd08f09f5a094e897020e97961b2847209df043 /src/ipcpd/normal/fmgr.c
parentecdf47b97abb8c5107846f4ef4a17bd62ba6dc82 (diff)
parentc96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (diff)
downloadouroboros-1a7c0923206cfb98d43122621a585027c67040ea.tar.gz
ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.zip
Merged in dstaesse/ouroboros/be-unify (pull request #251)
lib, ipcp: Revise fast path and flow interfaces
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c202
1 files changed, 74 insertions, 128 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 79b1bb4b..b6ec1984 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -26,7 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
-#include <ouroboros/ipcp.h>
+#include <ouroboros/ipcp-dev.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -41,10 +41,8 @@
#include "flow_alloc.pb-c.h"
typedef FlowAllocMsg flow_alloc_msg_t;
-extern struct ipcp * _ipcp;
-
struct n_flow {
- struct flow flow;
+ int fd;
struct frct_i * frct_i;
enum qos_cube qos;
@@ -57,7 +55,7 @@ struct n_1_flow {
struct list_head next;
};
-struct fmgr {
+struct {
pthread_t listen_thread;
struct list_head n_1_flows;
@@ -66,10 +64,9 @@ struct fmgr {
struct list_head n_flows;
/* FIXME: Make this a read/write lock */
pthread_mutex_t n_flows_lock;
-} * fmgr = NULL;
+} fmgr;
-static int add_n_1_fd(int fd,
- char * ae_name)
+static int add_n_1_fd(int fd, char * ae_name)
{
struct n_1_flow * tmp;
@@ -85,9 +82,9 @@ static int add_n_1_fd(int fd,
INIT_LIST_HEAD(&tmp->next);
- pthread_mutex_lock(&fmgr->n_1_flows_lock);
- list_add(&tmp->next, &fmgr->n_1_flows);
- pthread_mutex_unlock(&fmgr->n_1_flows_lock);
+ pthread_mutex_lock(&fmgr.n_1_flows_lock);
+ list_add(&tmp->next, &fmgr.n_1_flows);
+ pthread_mutex_unlock(&fmgr.n_1_flows_lock);
return 0;
}
@@ -98,16 +95,16 @@ static void * fmgr_listen(void * o)
char * ae_name;
while (true) {
- ipcp_wait_state(_ipcp, IPCP_ENROLLED, NULL);
+ ipcp_wait_state(IPCP_ENROLLED, NULL);
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() == IPCP_SHUTDOWN) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
fd = flow_accept(&ae_name);
if (fd < 0) {
@@ -161,17 +158,13 @@ static void * fmgr_listen(void * o)
int fmgr_init()
{
- fmgr = malloc(sizeof(*fmgr));
- if (fmgr == NULL)
- return -1;
+ INIT_LIST_HEAD(&fmgr.n_1_flows);
+ INIT_LIST_HEAD(&fmgr.n_flows);
- INIT_LIST_HEAD(&fmgr->n_1_flows);
- INIT_LIST_HEAD(&fmgr->n_flows);
+ pthread_mutex_init(&fmgr.n_1_flows_lock, NULL);
+ pthread_mutex_init(&fmgr.n_flows_lock, NULL);
- pthread_mutex_init(&fmgr->n_1_flows_lock, NULL);
- pthread_mutex_init(&fmgr->n_flows_lock, NULL);
-
- pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL);
+ pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL);
return 0;
}
@@ -180,23 +173,20 @@ int fmgr_fini()
{
struct list_head * pos = NULL;
- pthread_cancel(fmgr->listen_thread);
+ pthread_cancel(fmgr.listen_thread);
- pthread_join(fmgr->listen_thread, NULL);
+ pthread_join(fmgr.listen_thread, NULL);
- list_for_each(pos, &fmgr->n_1_flows) {
- struct n_1_flow * e =
- list_entry(pos, struct n_1_flow, next);
+ list_for_each(pos, &fmgr.n_1_flows) {
+ struct n_1_flow * e = list_entry(pos, struct n_1_flow, next);
if (e->ae_name != NULL)
free(e->ae_name);
if (ribmgr_remove_flow(e->fd))
LOG_ERR("Failed to remove management flow.");
}
- pthread_mutex_destroy(&fmgr->n_1_flows_lock);
- pthread_mutex_destroy(&fmgr->n_flows_lock);
-
- free(fmgr);
+ pthread_mutex_destroy(&fmgr.n_1_flows_lock);
+ pthread_mutex_destroy(&fmgr.n_flows_lock);
return 0;
}
@@ -243,8 +233,7 @@ int fmgr_mgmt_flow(char * dst_name)
return 0;
}
-int fmgr_dt_flow(char * dst_name,
- enum qos_cube qos)
+int fmgr_dt_flow(char * dst_name, enum qos_cube qos)
{
int fd;
int result;
@@ -288,14 +277,13 @@ int fmgr_dt_flow(char * dst_name,
}
/* Call under n_flows lock */
-static struct n_flow * get_n_flow_by_port_id(int port_id)
+static struct n_flow * get_n_flow_by_fd(int fd)
{
struct list_head * pos = NULL;
- list_for_each(pos, &fmgr->n_flows) {
- struct n_flow * e =
- list_entry(pos, struct n_flow, next);
- if (e->flow.port_id == port_id)
+ list_for_each(pos, &fmgr.n_flows) {
+ struct n_flow * e = list_entry(pos, struct n_flow, next);
+ if (e->fd == fd)
return e;
}
@@ -307,9 +295,8 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
{
struct list_head * pos = NULL;
- list_for_each(pos, &fmgr->n_flows) {
- struct n_flow * e =
- list_entry(pos, struct n_flow, next);
+ list_for_each(pos, &fmgr.n_flows) {
+ struct n_flow * e = list_entry(pos, struct n_flow, next);
if (e->frct_i == frct_i)
return e;
}
@@ -317,8 +304,7 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
return NULL;
}
-int fmgr_flow_alloc(pid_t n_api,
- int port_id,
+int fmgr_flow_alloc(int fd,
char * dst_ap_name,
char * src_ae_name,
enum qos_cube qos)
@@ -355,49 +341,40 @@ int fmgr_flow_alloc(pid_t n_api,
flow_alloc_msg__pack(&msg, buf.data);
- pthread_mutex_lock(&fmgr->n_flows_lock);
+ pthread_mutex_lock(&fmgr.n_flows_lock);
frct_i = frct_i_create(address, &buf, qos);
if (frct_i == NULL) {
free(buf.data);
free(flow);
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return -1;
}
free(buf.data);
- flow->flow.rb = shm_ap_rbuff_open_s(n_api);
- if (flow->flow.rb == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- free(flow);
- return -1;
- }
-
- flow->flow.api = n_api;
- flow->flow.port_id = port_id;
- flow->flow.state = FLOW_PENDING;
+ flow->fd = fd;
flow->frct_i = frct_i;
- flow->qos = qos;
+ flow->qos = qos;
INIT_LIST_HEAD(&flow->next);
- list_add(&flow->next, &fmgr->n_flows);
+ list_add(&flow->next, &fmgr.n_flows);
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return 0;
}
/* Call under n_flows lock */
-static int n_flow_dealloc(int port_id)
+static int n_flow_dealloc(int fd)
{
struct n_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
int ret;
- flow = get_n_flow_by_port_id(port_id);
+ flow = get_n_flow_by_fd(fd);
if (flow == NULL)
return -1;
@@ -414,8 +391,6 @@ static int n_flow_dealloc(int port_id)
flow_alloc_msg__pack(&msg, buf.data);
ret = frct_i_destroy(flow->frct_i, &buf);
- if (flow->flow.rb != NULL)
- shm_ap_rbuff_close(flow->flow.rb);
list_del(&flow->next);
free(flow);
@@ -424,25 +399,17 @@ static int n_flow_dealloc(int port_id)
return ret;
}
-int fmgr_flow_alloc_resp(pid_t n_api,
- int port_id,
- int response)
+int fmgr_flow_alloc_resp(int fd, int response)
{
struct n_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
- pthread_mutex_lock(&fmgr->n_flows_lock);
+ pthread_mutex_lock(&fmgr.n_flows_lock);
- flow = get_n_flow_by_port_id(port_id);
+ flow = get_n_flow_by_fd(fd);
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- return -1;
- }
-
- if (flow->flow.state != FLOW_PENDING) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- LOG_ERR("Flow is not pending.");
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return -1;
}
@@ -452,13 +419,13 @@ int fmgr_flow_alloc_resp(pid_t n_api,
buf.len = flow_alloc_msg__get_packed_size(&msg);
if (buf.len == 0) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return -1;
}
buf.data = malloc(buf.len);
if (buf.data == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return -1;
}
@@ -469,106 +436,85 @@ int fmgr_flow_alloc_resp(pid_t n_api,
free(buf.data);
list_del(&flow->next);
free(flow);
- } else {
- if (frct_i_accept(flow->frct_i, &buf)) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- return -1;
- }
-
- flow->flow.state = FLOW_ALLOCATED;
- flow->flow.api = n_api;
-
- flow->flow.rb = shm_ap_rbuff_open_s(n_api);
- if (flow->flow.rb == NULL) {
- n_flow_dealloc(port_id);
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- return -1;
- }
+ } else if (frct_i_accept(flow->frct_i, &buf)) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ return -1;
}
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return 0;
}
-int fmgr_flow_dealloc(int port_id)
+int fmgr_flow_dealloc(int fd)
{
int ret;
- pthread_mutex_lock(&fmgr->n_flows_lock);
- ret = n_flow_dealloc(port_id);
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_lock(&fmgr.n_flows_lock);
+ ret = n_flow_dealloc(fd);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return ret;
}
-int fmgr_flow_alloc_msg(struct frct_i * frct_i,
- buffer_t * buf)
+int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
{
struct n_flow * flow;
int ret = 0;
- int port_id;
+ int fd;
flow_alloc_msg_t * msg;
- pthread_mutex_lock(&fmgr->n_flows_lock);
+ pthread_mutex_lock(&fmgr.n_flows_lock);
- /* Depending on what is in the message call the function in ipcp.h */
+ /* Depending on the message call the function in ipcp-dev.h */
msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
if (msg == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
LOG_ERR("Failed to unpack flow alloc message");
return -1;
}
switch (msg->code) {
case FLOW_ALLOC_CODE__FLOW_REQ:
-
flow = malloc(sizeof(*flow));
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
return -1;
}
- flow->flow.state = FLOW_PENDING;
flow->frct_i = frct_i;
flow->qos = msg->qos_cube;
- flow->flow.rb = NULL;
- flow->flow.api = 0;
-
- port_id = ipcp_flow_req_arr(getpid(),
- msg->dst_name,
- msg->src_ae_name);
- if (port_id < 0) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ fd = ipcp_flow_req_arr(getpid(),
+ msg->dst_name,
+ msg->src_ae_name);
+ if (fd < 0) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
free(flow);
flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("Failed to get port-id from IRMd.");
+ LOG_ERR("Failed to get fd for flow.");
return -1;
}
- flow->flow.port_id = port_id;
+ flow->fd = fd;
INIT_LIST_HEAD(&flow->next);
- list_add(&flow->next, &fmgr->n_flows);
+ list_add(&flow->next, &fmgr.n_flows);
break;
case FLOW_ALLOC_CODE__FLOW_REPLY:
flow = get_n_flow_by_frct_i(frct_i);
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("No such flow in flow manager.");
return -1;
}
- ret = ipcp_flow_alloc_reply(getpid(),
- flow->flow.port_id,
- msg->response);
-
+ ret = ipcp_flow_alloc_reply(flow->fd, msg->response);
if (msg->response < 0) {
- shm_ap_rbuff_close(flow->flow.rb);
list_del(&flow->next);
free(flow);
}
@@ -577,13 +523,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i,
case FLOW_ALLOC_CODE__FLOW_DEALLOC:
flow = get_n_flow_by_frct_i(frct_i);
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("No such flow in flow manager.");
return -1;
}
- ret = irm_flow_dealloc(flow->flow.port_id);
+ ret = flow_dealloc(flow->fd);
break;
default:
LOG_ERR("Got an unknown flow allocation message.");
@@ -591,7 +537,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i,
break;
}
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);