summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-08-22 11:13:31 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-08-23 22:40:01 +0200
commitbb0a01dbb52cb0a02ce684b6fef3ec85e6c1918a (patch)
treee378c45c9f834ae8155096387c90b905714abc81 /src/ipcpd/normal/fmgr.c
parente2fd96f11b6a90d92f2d33627cb57ebf266e62ef (diff)
downloadouroboros-bb0a01dbb52cb0a02ce684b6fef3ec85e6c1918a.tar.gz
ouroboros-bb0a01dbb52cb0a02ce684b6fef3ec85e6c1918a.zip
ipcpd: normal: Add initial steps for N+1 flow allocation
This adds the initial framework for flow allocation between two N+1 endpoints. The FMGR will receive flow allocation requests and will create a connection as a result, addressed to the right address, it will also pass a flow allocation message to this address. Upon receipt on the other side, the FMGR will be receive a flow allocation message and a pointer to a new connection. The FMGR can then accept or destroy the connection. This commit also introduces the RMT function, which is needed by the FRCT to forward its SDUs on the right file descriptor.
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c373
1 files changed, 358 insertions, 15 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 70afff37..58ae1dc8 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -26,6 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
+#include <ouroboros/ipcp.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -37,8 +38,19 @@
#include "frct.h"
#include "ipcp.h"
+#include "flow_alloc.pb-c.h"
+typedef FlowAllocMsg flow_alloc_msg_t;
+
extern struct ipcp * _ipcp;
+struct n_flow {
+ struct flow flow;
+ struct frct_i * frct_i;
+ enum qos_cube qos;
+
+ struct list_head next;
+};
+
struct n_1_flow {
int fd;
char * ae_name;
@@ -51,6 +63,9 @@ struct fmgr {
struct list_head n_1_flows;
pthread_mutex_t n_1_flows_lock;
+ struct list_head n_flows;
+ /* FIXME: Make this a read/write lock */
+ pthread_mutex_t n_flows_lock;
} * fmgr = NULL;
static int add_n_1_fd(int fd,
@@ -68,6 +83,8 @@ static int add_n_1_fd(int fd,
tmp->fd = fd;
tmp->ae_name = ae_name;
+ INIT_LIST_HEAD(&tmp->next);
+
pthread_mutex_lock(&fmgr->n_1_flows_lock);
list_add(&tmp->next, &fmgr->n_1_flows);
pthread_mutex_unlock(&fmgr->n_1_flows_lock);
@@ -125,7 +142,8 @@ static void * fmgr_listen(void * o)
}
if (strcmp(ae_name, DT_AE) == 0) {
- if (frct_dt_flow(fd)) {
+ /* FIXME: Pass correct QoS cube */
+ if (frct_dt_flow(fd, 0)) {
LOG_ERR("Failed to hand fd to FRCT.");
flow_dealloc(fd);
continue;
@@ -149,13 +167,12 @@ int fmgr_init()
return -1;
INIT_LIST_HEAD(&fmgr->n_1_flows);
+ INIT_LIST_HEAD(&fmgr->n_flows);
pthread_mutex_init(&fmgr->n_1_flows_lock, NULL);
+ pthread_mutex_init(&fmgr->n_flows_lock, NULL);
- pthread_create(&fmgr->listen_thread,
- NULL,
- fmgr_listen,
- NULL);
+ pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL);
return 0;
}
@@ -187,11 +204,17 @@ int fmgr_mgmt_flow(char * dst_name)
{
int fd;
int result;
+ char * ae_name;
+
+ ae_name = strdup(MGMT_AE);
+ if (ae_name == NULL)
+ return -1;
/* FIXME: Request retransmission. */
fd = flow_alloc(dst_name, MGMT_AE, NULL);
if (fd < 0) {
LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
return -1;
}
@@ -199,29 +222,98 @@ int fmgr_mgmt_flow(char * dst_name)
if (result < 0) {
LOG_ERR("Result of flow allocation to %s is %d",
dst_name, result);
+ free(ae_name);
return -1;
}
if (ribmgr_add_flow(fd)) {
LOG_ERR("Failed to hand file descriptor to RIB manager");
flow_dealloc(fd);
+ free(ae_name);
+ return -1;
+ }
+
+ if (add_n_1_fd(fd, ae_name)) {
+ LOG_ERR("Failed to add file descriptor to list.");
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+int fmgr_dt_flow(char * dst_name,
+ enum qos_cube qos)
+{
+ int fd;
+ int result;
+ char * ae_name;
+
+ ae_name = strdup(DT_AE);
+ if (ae_name == NULL)
+ return -1;
+
+ /* FIXME: Map qos cube on correct QoS. */
+ fd = flow_alloc(dst_name, DT_AE, NULL);
+ if (fd < 0) {
+ LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
+ return -1;
+ }
+
+ result = flow_alloc_res(fd);
+ if (result < 0) {
+ LOG_ERR("Result of flow allocation to %s is %d",
+ dst_name, result);
+ free(ae_name);
+ return -1;
+ }
+
+ if (frct_dt_flow(fd, qos)) {
+ LOG_ERR("Failed to hand file descriptor to FRCT");
+ flow_dealloc(fd);
+ free(ae_name);
return -1;
}
- if (add_n_1_fd(fd, strdup(MGMT_AE))) {
+ if (add_n_1_fd(fd, ae_name)) {
LOG_ERR("Failed to add file descriptor to list.");
flow_dealloc(fd);
+ free(ae_name);
return -1;
}
return 0;
}
-int fmgr_dt_flow(char * dst_name)
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_port_id(int port_id)
{
- LOG_MISSING;
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr->n_flows) {
+ struct n_flow * e =
+ list_entry(pos, struct n_flow, next);
+ if (e->flow.port_id == port_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr->n_flows) {
+ struct n_flow * e =
+ list_entry(pos, struct n_flow, next);
+ if (e->frct_i == frct_i)
+ return e;
+ }
- return -1;
+ return NULL;
}
int fmgr_flow_alloc(pid_t n_api,
@@ -230,23 +322,274 @@ int fmgr_flow_alloc(pid_t n_api,
char * src_ae_name,
enum qos_cube qos)
{
- LOG_MISSING;
+ struct n_flow * flow;
+ struct frct_i * frct_i;
+ uint32_t address = 0;
+ buffer_t buf;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL)
+ return -1;
+
+ /* FIXME: Obtain correct address here from DIF NSM */
- return -1;
+ msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
+ msg.dst_name = dst_ap_name;
+ msg.src_ae_name = src_ae_name;
+ msg.qos_cube = qos;
+ msg.has_qos_cube = true;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0) {
+ free(flow);
+ return -1;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ free(flow);
+ return -1;
+ }
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+
+ frct_i = frct_i_create(address, &buf, qos);
+ if (frct_i == NULL) {
+ free(buf.data);
+ free(flow);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ free(buf.data);
+
+ flow->flow.rb = shm_ap_rbuff_open(n_api);
+ if (flow->flow.rb == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ free(flow);
+ return -1;
+ }
+
+ flow->flow.api = n_api;
+ flow->flow.port_id = port_id;
+ flow->flow.state = FLOW_PENDING;
+ flow->frct_i = frct_i;
+ flow->qos = qos;
+
+ INIT_LIST_HEAD(&flow->next);
+
+ list_add(&flow->next, &fmgr->n_flows);
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return 0;
+}
+
+/* Call under n_flows lock */
+static int n_flow_dealloc(int port_id)
+{
+ struct n_flow * flow;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+ int ret;
+
+ flow = get_n_flow_by_port_id(port_id);
+ if (flow == NULL)
+ return -1;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -1;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ ret = frct_i_destroy(flow->frct_i, &buf);
+ if (flow->flow.rb != NULL)
+ shm_ap_rbuff_close(flow->flow.rb);
+ list_del(&flow->next);
+
+ free(flow);
+ free(buf.data);
+
+ return ret;
}
int fmgr_flow_alloc_resp(pid_t n_api,
int port_id,
int response)
{
- LOG_MISSING;
+ struct n_flow * flow;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ int ret;
+ buffer_t buf;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
- return -1;
+ flow = get_n_flow_by_port_id(port_id);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ if (flow->flow.state != FLOW_PENDING) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ LOG_ERR("Flow is not pending.");
+ return -1;
+ }
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
+ msg.response = response;
+ msg.has_response = true;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ if (response < 0) {
+ ret = frct_i_destroy(flow->frct_i, &buf);
+ free(buf.data);
+ list_del(&flow->next);
+ free(flow);
+ } else {
+ frct_i_accept(flow->frct_i, &buf);
+ flow->flow.state = FLOW_ALLOCATED;
+ flow->flow.api = n_api;
+
+ flow->flow.rb = shm_ap_rbuff_open(n_api);
+ if (flow->flow.rb == NULL) {
+ ret = n_flow_dealloc(port_id);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return ret;
+ }
+ }
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return ret;
}
int fmgr_flow_dealloc(int port_id)
{
- LOG_MISSING;
+ int ret;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+ ret = n_flow_dealloc(port_id);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return ret;
+}
+
+int fmgr_flow_alloc_msg(struct frct_i * frct_i,
+ buffer_t * buf)
+{
+ struct n_flow * flow;
+ int ret = 0;
+ int port_id;
+ flow_alloc_msg_t * msg;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+
+ /* Depending on what is in the message call the function in ipcp.h */
+
+ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ LOG_ERR("Failed to unpack flow alloc message");
+ return -1;
+ }
+
+ switch (msg->code) {
+ case FLOW_ALLOC_CODE__FLOW_REQ:
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+
+ flow->flow.state = FLOW_PENDING;
+ flow->frct_i = frct_i;
+ flow->qos = msg->qos_cube;
+ flow->flow.rb = NULL;
+ flow->flow.api = 0;
+
+ port_id = ipcp_flow_req_arr(getpid(),
+ msg->dst_name,
+ msg->src_ae_name);
+ if (port_id < 0) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ free(flow);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("Failed to get port-id from IRMd.");
+ return -1;
+ }
+
+ flow->flow.port_id = port_id;
+
+ INIT_LIST_HEAD(&flow->next);
+
+ list_add(&flow->next, &fmgr->n_flows);
+ break;
+ case FLOW_ALLOC_CODE__FLOW_REPLY:
+ flow = get_n_flow_by_frct_i(frct_i);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("No such flow in flow manager.");
+ return -1;
+ }
+
+ ret = ipcp_flow_alloc_reply(getpid(),
+ flow->flow.port_id,
+ msg->response);
+
+ if (msg->response < 0) {
+ shm_ap_rbuff_close(flow->flow.rb);
+ list_del(&flow->next);
+ free(flow);
+ }
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_DEALLOC:
+ flow = get_n_flow_by_frct_i(frct_i);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("No such flow in flow manager.");
+ return -1;
+ }
+
+ ret = ipcp_flow_dealloc(0, flow->flow.port_id);
+ break;
+ default:
+ LOG_ERR("Got an unknown flow allocation message.");
+ ret = -1;
+ break;
+ }
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ return ret;
}