summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-08-24 00:47:06 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-08-24 00:47:06 +0200
commitf32db895e62152e1518fc5e184d19743d35e6cad (patch)
treeb32afcf153f4e6ac5880b5a986c3d6526d6d70c2 /src/ipcpd/normal/fmgr.c
parentd8744f9b77a98183ca4ecc6e0be5ce9a6e92ede0 (diff)
parentbb0a01dbb52cb0a02ce684b6fef3ec85e6c1918a (diff)
downloadouroboros-f32db895e62152e1518fc5e184d19743d35e6cad.tar.gz
ouroboros-f32db895e62152e1518fc5e184d19743d35e6cad.zip
Merged in sandervrijders/ouroboros/be-normal-flow-alloc (pull request #219)
ipcpd: normal: Add initial steps for N+1 flow allocation
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c373
1 files changed, 358 insertions, 15 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 70afff37..58ae1dc8 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -26,6 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
+#include <ouroboros/ipcp.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -37,8 +38,19 @@
#include "frct.h"
#include "ipcp.h"
+#include "flow_alloc.pb-c.h"
+typedef FlowAllocMsg flow_alloc_msg_t;
+
extern struct ipcp * _ipcp;
+struct n_flow {
+ struct flow flow;
+ struct frct_i * frct_i;
+ enum qos_cube qos;
+
+ struct list_head next;
+};
+
struct n_1_flow {
int fd;
char * ae_name;
@@ -51,6 +63,9 @@ struct fmgr {
struct list_head n_1_flows;
pthread_mutex_t n_1_flows_lock;
+ struct list_head n_flows;
+ /* FIXME: Make this a read/write lock */
+ pthread_mutex_t n_flows_lock;
} * fmgr = NULL;
static int add_n_1_fd(int fd,
@@ -68,6 +83,8 @@ static int add_n_1_fd(int fd,
tmp->fd = fd;
tmp->ae_name = ae_name;
+ INIT_LIST_HEAD(&tmp->next);
+
pthread_mutex_lock(&fmgr->n_1_flows_lock);
list_add(&tmp->next, &fmgr->n_1_flows);
pthread_mutex_unlock(&fmgr->n_1_flows_lock);
@@ -125,7 +142,8 @@ static void * fmgr_listen(void * o)
}
if (strcmp(ae_name, DT_AE) == 0) {
- if (frct_dt_flow(fd)) {
+ /* FIXME: Pass correct QoS cube */
+ if (frct_dt_flow(fd, 0)) {
LOG_ERR("Failed to hand fd to FRCT.");
flow_dealloc(fd);
continue;
@@ -149,13 +167,12 @@ int fmgr_init()
return -1;
INIT_LIST_HEAD(&fmgr->n_1_flows);
+ INIT_LIST_HEAD(&fmgr->n_flows);
pthread_mutex_init(&fmgr->n_1_flows_lock, NULL);
+ pthread_mutex_init(&fmgr->n_flows_lock, NULL);
- pthread_create(&fmgr->listen_thread,
- NULL,
- fmgr_listen,
- NULL);
+ pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL);
return 0;
}
@@ -187,11 +204,17 @@ int fmgr_mgmt_flow(char * dst_name)
{
int fd;
int result;
+ char * ae_name;
+
+ ae_name = strdup(MGMT_AE);
+ if (ae_name == NULL)
+ return -1;
/* FIXME: Request retransmission. */
fd = flow_alloc(dst_name, MGMT_AE, NULL);
if (fd < 0) {
LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
return -1;
}
@@ -199,29 +222,98 @@ int fmgr_mgmt_flow(char * dst_name)
if (result < 0) {
LOG_ERR("Result of flow allocation to %s is %d",
dst_name, result);
+ free(ae_name);
return -1;
}
if (ribmgr_add_flow(fd)) {
LOG_ERR("Failed to hand file descriptor to RIB manager");
flow_dealloc(fd);
+ free(ae_name);
+ return -1;
+ }
+
+ if (add_n_1_fd(fd, ae_name)) {
+ LOG_ERR("Failed to add file descriptor to list.");
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+int fmgr_dt_flow(char * dst_name,
+ enum qos_cube qos)
+{
+ int fd;
+ int result;
+ char * ae_name;
+
+ ae_name = strdup(DT_AE);
+ if (ae_name == NULL)
+ return -1;
+
+ /* FIXME: Map qos cube on correct QoS. */
+ fd = flow_alloc(dst_name, DT_AE, NULL);
+ if (fd < 0) {
+ LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
+ return -1;
+ }
+
+ result = flow_alloc_res(fd);
+ if (result < 0) {
+ LOG_ERR("Result of flow allocation to %s is %d",
+ dst_name, result);
+ free(ae_name);
+ return -1;
+ }
+
+ if (frct_dt_flow(fd, qos)) {
+ LOG_ERR("Failed to hand file descriptor to FRCT");
+ flow_dealloc(fd);
+ free(ae_name);
return -1;
}
- if (add_n_1_fd(fd, strdup(MGMT_AE))) {
+ if (add_n_1_fd(fd, ae_name)) {
LOG_ERR("Failed to add file descriptor to list.");
flow_dealloc(fd);
+ free(ae_name);
return -1;
}
return 0;
}
-int fmgr_dt_flow(char * dst_name)
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_port_id(int port_id)
{
- LOG_MISSING;
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr->n_flows) {
+ struct n_flow * e =
+ list_entry(pos, struct n_flow, next);
+ if (e->flow.port_id == port_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr->n_flows) {
+ struct n_flow * e =
+ list_entry(pos, struct n_flow, next);
+ if (e->frct_i == frct_i)
+ return e;
+ }
- return -1;
+ return NULL;
}
int fmgr_flow_alloc(pid_t n_api,
@@ -230,23 +322,274 @@ int fmgr_flow_alloc(pid_t n_api,
char * src_ae_name,
enum qos_cube qos)
{
- LOG_MISSING;
+ struct n_flow * flow;
+ struct frct_i * frct_i;
+ uint32_t address = 0;
+ buffer_t buf;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL)
+ return -1;
+
+ /* FIXME: Obtain correct address here from DIF NSM */
- return -1;
+ msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
+ msg.dst_name = dst_ap_name;
+ msg.src_ae_name = src_ae_name;
+ msg.qos_cube = qos;
+ msg.has_qos_cube = true;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0) {
+ free(flow);
+ return -1;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ free(flow);
+ return -1;
+ }
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+
+ frct_i = frct_i_create(address, &buf, qos);
+ if (frct_i == NULL) {
+ free(buf.data);
+ free(flow);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ free(buf.data);
+
+ flow->flow.rb = shm_ap_rbuff_open(n_api);
+ if (flow->flow.rb == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ free(flow);
+ return -1;
+ }
+
+ flow->flow.api = n_api;
+ flow->flow.port_id = port_id;
+ flow->flow.state = FLOW_PENDING;
+ flow->frct_i = frct_i;
+ flow->qos = qos;
+
+ INIT_LIST_HEAD(&flow->next);
+
+ list_add(&flow->next, &fmgr->n_flows);
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return 0;
+}
+
+/* Call under n_flows lock */
+static int n_flow_dealloc(int port_id)
+{
+ struct n_flow * flow;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+ int ret;
+
+ flow = get_n_flow_by_port_id(port_id);
+ if (flow == NULL)
+ return -1;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -1;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ ret = frct_i_destroy(flow->frct_i, &buf);
+ if (flow->flow.rb != NULL)
+ shm_ap_rbuff_close(flow->flow.rb);
+ list_del(&flow->next);
+
+ free(flow);
+ free(buf.data);
+
+ return ret;
}
int fmgr_flow_alloc_resp(pid_t n_api,
int port_id,
int response)
{
- LOG_MISSING;
+ struct n_flow * flow;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ int ret;
+ buffer_t buf;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
- return -1;
+ flow = get_n_flow_by_port_id(port_id);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ if (flow->flow.state != FLOW_PENDING) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ LOG_ERR("Flow is not pending.");
+ return -1;
+ }
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
+ msg.response = response;
+ msg.has_response = true;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ if (response < 0) {
+ ret = frct_i_destroy(flow->frct_i, &buf);
+ free(buf.data);
+ list_del(&flow->next);
+ free(flow);
+ } else {
+ frct_i_accept(flow->frct_i, &buf);
+ flow->flow.state = FLOW_ALLOCATED;
+ flow->flow.api = n_api;
+
+ flow->flow.rb = shm_ap_rbuff_open(n_api);
+ if (flow->flow.rb == NULL) {
+ ret = n_flow_dealloc(port_id);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return ret;
+ }
+ }
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return ret;
}
int fmgr_flow_dealloc(int port_id)
{
- LOG_MISSING;
+ int ret;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+ ret = n_flow_dealloc(port_id);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return ret;
+}
+
+int fmgr_flow_alloc_msg(struct frct_i * frct_i,
+ buffer_t * buf)
+{
+ struct n_flow * flow;
+ int ret = 0;
+ int port_id;
+ flow_alloc_msg_t * msg;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+
+ /* Depending on what is in the message call the function in ipcp.h */
+
+ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ LOG_ERR("Failed to unpack flow alloc message");
+ return -1;
+ }
+
+ switch (msg->code) {
+ case FLOW_ALLOC_CODE__FLOW_REQ:
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+
+ flow->flow.state = FLOW_PENDING;
+ flow->frct_i = frct_i;
+ flow->qos = msg->qos_cube;
+ flow->flow.rb = NULL;
+ flow->flow.api = 0;
+
+ port_id = ipcp_flow_req_arr(getpid(),
+ msg->dst_name,
+ msg->src_ae_name);
+ if (port_id < 0) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ free(flow);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("Failed to get port-id from IRMd.");
+ return -1;
+ }
+
+ flow->flow.port_id = port_id;
+
+ INIT_LIST_HEAD(&flow->next);
+
+ list_add(&flow->next, &fmgr->n_flows);
+ break;
+ case FLOW_ALLOC_CODE__FLOW_REPLY:
+ flow = get_n_flow_by_frct_i(frct_i);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("No such flow in flow manager.");
+ return -1;
+ }
+
+ ret = ipcp_flow_alloc_reply(getpid(),
+ flow->flow.port_id,
+ msg->response);
+
+ if (msg->response < 0) {
+ shm_ap_rbuff_close(flow->flow.rb);
+ list_del(&flow->next);
+ free(flow);
+ }
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_DEALLOC:
+ flow = get_n_flow_by_frct_i(frct_i);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("No such flow in flow manager.");
+ return -1;
+ }
+
+ ret = ipcp_flow_dealloc(0, flow->flow.port_id);
+ break;
+ default:
+ LOG_ERR("Got an unknown flow allocation message.");
+ ret = -1;
+ break;
+ }
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ return ret;
}