summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-12 14:54:18 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-12 14:54:18 +0200
commit43e2f332770007a3fcea011ffb35e8fbb24a6205 (patch)
treec792e7674d1247fa95e096f746a3559e2f4f8b7e /src/ipcpd/normal/fmgr.c
parent2ba45f5efe0486b88f91ecced451f74cc782e8a4 (diff)
downloadouroboros-43e2f332770007a3fcea011ffb35e8fbb24a6205.tar.gz
ouroboros-43e2f332770007a3fcea011ffb35e8fbb24a6205.zip
ipcpd: normal: Improve upon the internal design
This commit will remove the RMT component from the normal IPCP, as some of its functionality would else be duplicated in the FMGR. Now all reading from flows, either N-1 or N+1 is done in the FMGR, then either passed to the FRCT or a lookup is performed in the PFF (not there yet) and the PDU is forwarded.
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c591
1 files changed, 356 insertions, 235 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 25898661..8c627641 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -39,46 +39,46 @@
#include "ribmgr.h"
#include "frct.h"
#include "ipcp.h"
-#include "rmt.h"
#include "shm_pci.h"
-#include "config.h"
#include "flow_alloc.pb-c.h"
typedef FlowAllocMsg flow_alloc_msg_t;
-struct n_flow {
+#define FD_UPDATE_TIMEOUT 100 /* microseconds */
+
+struct np1_flow {
int fd;
cep_id_t cep_id;
enum qos_cube qos;
-
- struct list_head next;
};
-struct n_1_flow {
- int fd;
- char * ae_name;
- struct list_head next;
+struct nm1_flow {
+ int fd;
+ char * ae_name;
+ enum qos_cube qos;
};
struct {
- pthread_t n_1_flow_acceptor;
-
- /* FIXME: Make this a table */
- struct list_head n_1_flows;
- pthread_mutex_t n_1_flows_lock;
-
- /* FIXME: Make this a table */
- struct list_head n_flows;
- /* FIXME: Make this a read/write lock */
- pthread_mutex_t n_flows_lock;
-
- struct flow_set * set;
- pthread_t n_reader;
+ pthread_t nm1_flow_acceptor;
+ struct nm1_flow ** nm1_flows;
+ pthread_rwlock_t nm1_flows_lock;
+ flow_set_t * nm1_set;
+
+ struct np1_flow ** np1_flows;
+ struct np1_flow ** np1_flows_cep;
+ pthread_rwlock_t np1_flows_lock;
+ flow_set_t * np1_set;
+ pthread_t np1_sdu_reader;
+
+ /* FIXME: Replace with PFF */
+ int fd;
} fmgr;
-static int add_n_1_fd(int fd, char * ae_name)
+static int add_nm1_fd(int fd,
+ char * ae_name,
+ enum qos_cube qos)
{
- struct n_1_flow * tmp;
+ struct nm1_flow * tmp;
if (ae_name == NULL)
return -1;
@@ -89,45 +89,39 @@ static int add_n_1_fd(int fd, char * ae_name)
tmp->fd = fd;
tmp->ae_name = ae_name;
+ tmp->qos = qos;
- INIT_LIST_HEAD(&tmp->next);
+ pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
+ fmgr.nm1_flows[fd] = tmp;
+ pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
- pthread_mutex_lock(&fmgr.n_1_flows_lock);
- list_add(&tmp->next, &fmgr.n_1_flows);
- pthread_mutex_unlock(&fmgr.n_1_flows_lock);
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
return 0;
}
-/* Call under n_flows lock */
-static struct n_flow * get_n_flow_by_fd(int fd)
+static int add_np1_fd(int fd,
+ cep_id_t cep_id,
+ enum qos_cube qos)
{
- struct list_head * pos = NULL;
+ struct np1_flow * flow;
- list_for_each(pos, &fmgr.n_flows) {
- struct n_flow * e = list_entry(pos, struct n_flow, next);
- if (e->fd == fd)
- return e;
- }
-
- return NULL;
-}
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL)
+ return -1;
-/* Call under n_flows lock */
-static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id)
-{
- struct list_head * pos = NULL;
+ flow->cep_id = cep_id;
+ flow->qos = qos;
+ flow->fd = fd;
- list_for_each(pos, &fmgr.n_flows) {
- struct n_flow * e = list_entry(pos, struct n_flow, next);
- if (e->cep_id == cep_id)
- return e;
- }
+ fmgr.np1_flows[fd] = flow;
+ fmgr.np1_flows_cep[fd] = flow;
- return NULL;
+ return 0;
}
-static void * fmgr_n_1_acceptor(void * o)
+static void * fmgr_nm1_acceptor(void * o)
{
int fd;
char * ae_name;
@@ -175,16 +169,8 @@ static void * fmgr_n_1_acceptor(void * o)
}
}
- if (strcmp(ae_name, DT_AE) == 0) {
- /* FIXME: Pass correct QoS cube */
- if (rmt_dt_flow(fd, 0)) {
- LOG_ERR("Failed to hand fd to FRCT.");
- flow_dealloc(fd);
- continue;
- }
- }
-
- if (add_n_1_fd(fd, ae_name)) {
+ /* FIXME: Pass correct QoS cube */
+ if (add_nm1_fd(fd, ae_name, QOS_CUBE_BE)) {
LOG_ERR("Failed to add file descriptor to list.");
flow_dealloc(fd);
continue;
@@ -194,14 +180,14 @@ static void * fmgr_n_1_acceptor(void * o)
return (void *) 0;
}
-static void * fmgr_n_reader(void * o)
+static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- struct n_flow * flow;
+ struct np1_flow * flow;
while (true) {
- int fd = flow_select(fmgr.set, &timeout);
+ int fd = flow_select(fmgr.np1_set, &timeout);
if (fd == -ETIMEDOUT)
continue;
@@ -215,172 +201,194 @@ static void * fmgr_n_reader(void * o)
continue;
}
- pthread_mutex_lock(&fmgr.n_flows_lock);
- flow = get_n_flow_by_fd(fd);
+ pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
+ flow = fmgr.np1_flows[fd];
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
ipcp_flow_del(sdb);
LOG_ERR("Failed to retrieve flow.");
continue;
}
if (frct_i_write_sdu(flow->cep_id, sdb)) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
ipcp_flow_del(sdb);
LOG_ERR("Failed to hand SDU to FRCT.");
continue;
}
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
}
return (void *) 0;
}
-int fmgr_init()
+void * fmgr_nm1_sdu_reader(void * o)
{
- INIT_LIST_HEAD(&fmgr.n_1_flows);
- INIT_LIST_HEAD(&fmgr.n_flows);
-
- pthread_mutex_init(&fmgr.n_1_flows_lock, NULL);
- pthread_mutex_init(&fmgr.n_flows_lock, NULL);
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
+ struct shm_du_buff * sdb;
+ struct pci * pci;
- fmgr.set = flow_set_create();
- if (fmgr.set == NULL)
- return -1;
+ while (true) {
+ int fd = flow_select(fmgr.nm1_set, &timeout);
+ if (fd == -ETIMEDOUT)
+ continue;
- pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL);
- pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL);
+ if (fd < 0) {
+ LOG_ERR("Failed to get active fd.");
+ continue;
+ }
- return 0;
-}
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
-int fmgr_fini()
-{
- struct list_head * pos = NULL;
+ pci = shm_pci_des(sdb);
+ if (pci == NULL) {
+ LOG_ERR("Failed to get PCI.");
+ ipcp_flow_del(sdb);
+ continue;
+ }
- pthread_cancel(fmgr.n_1_flow_acceptor);
- pthread_cancel(fmgr.n_reader);
+ if (pci->dst_addr != ribmgr_address()) {
+ LOG_DBG("PDU needs to be forwarded.");
- pthread_join(fmgr.n_1_flow_acceptor, NULL);
- pthread_join(fmgr.n_reader, NULL);
+ if (pci->ttl == 0) {
+ LOG_DBG("TTL was zero.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
- list_for_each(pos, &fmgr.n_1_flows) {
- struct n_1_flow * e = list_entry(pos, struct n_1_flow, next);
- if (e->ae_name != NULL)
- free(e->ae_name);
- if (ribmgr_remove_flow(e->fd))
- LOG_ERR("Failed to remove management flow.");
- }
+ if (shm_pci_dec_ttl(sdb)) {
+ LOG_ERR("Failed to decrease TTL.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+ /*
+ * FIXME: Dropping for now, since
+ * we don't have a PFF yet
+ */
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
- pthread_mutex_destroy(&fmgr.n_1_flows_lock);
- pthread_mutex_destroy(&fmgr.n_flows_lock);
+ if (shm_pci_shrink(sdb)) {
+ LOG_ERR("Failed to shrink PDU.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
- flow_set_destroy(fmgr.set);
+ if (frct_nm1_post_sdu(pci, sdb)) {
+ LOG_ERR("Failed to hand PDU to FRCT.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+ }
- return 0;
+ return (void *) 0;
}
-int fmgr_mgmt_flow(char * dst_name)
+int fmgr_init()
{
- int fd;
- int result;
- char * ae_name;
+ int i;
- ae_name = strdup(MGMT_AE);
- if (ae_name == NULL)
+ fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS);
+ if (fmgr.nm1_flows == NULL)
return -1;
- /* FIXME: Request retransmission. */
- fd = flow_alloc(dst_name, MGMT_AE, NULL);
- if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s", dst_name);
- free(ae_name);
+ fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS);
+ if (fmgr.np1_flows == NULL) {
+ free(fmgr.nm1_flows);
return -1;
}
- result = flow_alloc_res(fd);
- if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d",
- dst_name, result);
- free(ae_name);
+ fmgr.np1_flows_cep =
+ malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS);
+ if (fmgr.np1_flows_cep == NULL) {
+ free(fmgr.np1_flows);
+ free(fmgr.nm1_flows);
return -1;
}
- if (ribmgr_add_flow(fd)) {
- LOG_ERR("Failed to hand file descriptor to RIB manager");
- flow_dealloc(fd);
- free(ae_name);
+ for (i = 0; i < IRMD_MAX_FLOWS; i++) {
+ fmgr.nm1_flows[i] = NULL;
+ fmgr.np1_flows[i] = NULL;
+ fmgr.np1_flows_cep[i] = NULL;
+ }
+
+ pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
+ pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
+
+ fmgr.np1_set = flow_set_create();
+ if (fmgr.np1_set == NULL) {
+ free(fmgr.np1_flows_cep);
+ free(fmgr.np1_flows);
+ free(fmgr.nm1_flows);
return -1;
}
- if (add_n_1_fd(fd, ae_name)) {
- LOG_ERR("Failed to add file descriptor to list.");
- flow_dealloc(fd);
+ fmgr.nm1_set = flow_set_create();
+ if (fmgr.nm1_set == NULL) {
+ flow_set_destroy(fmgr.np1_set);
+ free(fmgr.np1_flows_cep);
+ free(fmgr.np1_flows);
+ free(fmgr.nm1_flows);
return -1;
}
+ pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL);
+ pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);
+
return 0;
}
-int fmgr_dt_flow(char * dst_name, enum qos_cube qos)
+int fmgr_fini()
{
- int fd;
- int result;
- char * ae_name;
+ int i;
- ae_name = strdup(DT_AE);
- if (ae_name == NULL)
- return -1;
+ pthread_cancel(fmgr.nm1_flow_acceptor);
+ pthread_cancel(fmgr.np1_sdu_reader);
- /* FIXME: Map qos cube on correct QoS. */
- fd = flow_alloc(dst_name, DT_AE, NULL);
- if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s", dst_name);
- free(ae_name);
- return -1;
- }
+ pthread_join(fmgr.nm1_flow_acceptor, NULL);
+ pthread_join(fmgr.np1_sdu_reader, NULL);
- result = flow_alloc_res(fd);
- if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d",
- dst_name, result);
- free(ae_name);
- return -1;
+ for (i = 0; i < IRMD_MAX_FLOWS; i++) {
+ if (fmgr.nm1_flows[i] == NULL)
+ continue;
+ if (fmgr.nm1_flows[i]->ae_name != NULL)
+ free(fmgr.nm1_flows[i]->ae_name);
+ if (ribmgr_remove_flow(fmgr.nm1_flows[i]->fd))
+ LOG_ERR("Failed to remove management flow.");
}
- if (rmt_dt_flow(fd, qos)) {
- LOG_ERR("Failed to hand file descriptor to FRCT");
- flow_dealloc(fd);
- free(ae_name);
- return -1;
- }
+ pthread_rwlock_destroy(&fmgr.nm1_flows_lock);
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- if (add_n_1_fd(fd, ae_name)) {
- LOG_ERR("Failed to add file descriptor to list.");
- flow_dealloc(fd);
- free(ae_name);
- return -1;
- }
+ flow_set_destroy(fmgr.nm1_set);
+ flow_set_destroy(fmgr.np1_set);
+ free(fmgr.np1_flows_cep);
+ free(fmgr.np1_flows);
+ free(fmgr.nm1_flows);
return 0;
}
-int fmgr_flow_alloc(int fd,
+int fmgr_np1_alloc(int fd,
char * dst_ap_name,
char * src_ae_name,
enum qos_cube qos)
{
- struct n_flow * flow;
cep_id_t cep_id;
uint32_t address = 0;
buffer_t buf;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- flow = malloc(sizeof(*flow));
- if (flow == NULL)
- return -1;
-
/* FIXME: Obtain correct address here from DIF NSM */
msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
@@ -390,55 +398,47 @@ int fmgr_flow_alloc(int fd,
msg.has_qos_cube = true;
buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0) {
- free(flow);
+ if (buf.len == 0)
return -1;
- }
buf.data = malloc(buf.len);
- if (buf.data == NULL) {
- free(flow);
+ if (buf.data == NULL)
return -1;
- }
flow_alloc_msg__pack(&msg, buf.data);
- pthread_mutex_lock(&fmgr.n_flows_lock);
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
cep_id = frct_i_create(address, &buf, qos);
if (cep_id == INVALID_CEP_ID) {
free(buf.data);
- free(flow);
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return -1;
}
free(buf.data);
- flow->fd = fd;
- flow->cep_id = cep_id;
- flow->qos = qos;
-
- INIT_LIST_HEAD(&flow->next);
-
- list_add(&flow->next, &fmgr.n_flows);
+ if (add_np1_fd(fd, cep_id, qos)) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ return -1;
+ }
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return 0;
}
-/* Call under n_flows lock */
-static int n_flow_dealloc(int fd)
+/* Call under np1_flows lock */
+static int np1_flow_dealloc(int fd)
{
- struct n_flow * flow;
+ struct np1_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
int ret;
- flow_set_del(fmgr.set, fd);
+ flow_set_del(fmgr.np1_set, fd);
- flow = get_n_flow_by_fd(fd);
+ flow = fmgr.np1_flows[fd];
if (flow == NULL)
return -1;
@@ -455,7 +455,9 @@ static int n_flow_dealloc(int fd)
flow_alloc_msg__pack(&msg, buf.data);
ret = frct_i_destroy(flow->cep_id, &buf);
- list_del(&flow->next);
+
+ fmgr.np1_flows[fd] = NULL;
+ fmgr.np1_flows_cep[flow->cep_id] = NULL;
free(flow);
free(buf.data);
@@ -463,17 +465,17 @@ static int n_flow_dealloc(int fd)
return ret;
}
-int fmgr_flow_alloc_resp(int fd, int response)
+int fmgr_np1_alloc_resp(int fd, int response)
{
- struct n_flow * flow;
+ struct np1_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
- pthread_mutex_lock(&fmgr.n_flows_lock);
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
- flow = get_n_flow_by_fd(fd);
+ flow = fmgr.np1_flows[fd];
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return -1;
}
@@ -483,13 +485,13 @@ int fmgr_flow_alloc_resp(int fd, int response)
buf.len = flow_alloc_msg__get_packed_size(&msg);
if (buf.len == 0) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return -1;
}
buf.data = malloc(buf.len);
if (buf.data == NULL) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return -1;
}
@@ -498,84 +500,76 @@ int fmgr_flow_alloc_resp(int fd, int response)
if (response < 0) {
frct_i_destroy(flow->cep_id, &buf);
free(buf.data);
- list_del(&flow->next);
+ fmgr.np1_flows[fd] = NULL;
+ fmgr.np1_flows_cep[flow->cep_id] = NULL;
free(flow);
} else {
if (frct_i_accept(flow->cep_id, &buf, flow->qos)) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return -1;
}
- flow_set_add(fmgr.set, fd);
+ flow_set_add(fmgr.np1_set, fd);
}
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return 0;
}
-int fmgr_flow_dealloc(int fd)
+int fmgr_np1_dealloc(int fd)
{
int ret;
- pthread_mutex_lock(&fmgr.n_flows_lock);
- ret = n_flow_dealloc(fd);
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+ ret = np1_flow_dealloc(fd);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return ret;
}
-int fmgr_frct_post_buf(cep_id_t cep_id,
- buffer_t * buf)
+int fmgr_np1_post_buf(cep_id_t cep_id,
+ buffer_t * buf)
{
- struct n_flow * flow;
+ struct np1_flow * flow;
int ret = 0;
int fd;
flow_alloc_msg_t * msg;
- pthread_mutex_lock(&fmgr.n_flows_lock);
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
/* Depending on the message call the function in ipcp-dev.h */
msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
if (msg == NULL) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to unpack flow alloc message");
return -1;
}
switch (msg->code) {
case FLOW_ALLOC_CODE__FLOW_REQ:
- flow = malloc(sizeof(*flow));
- if (flow == NULL) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
- }
-
- flow->cep_id = cep_id;
- flow->qos = msg->qos_cube;
-
fd = ipcp_flow_req_arr(getpid(),
msg->dst_name,
msg->src_ae_name);
if (fd < 0) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
- free(flow);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("Failed to get fd for flow.");
return -1;
}
- flow->fd = fd;
-
- INIT_LIST_HEAD(&flow->next);
+ if (add_np1_fd(fd, cep_id, msg->qos_cube)) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("Failed to add np1 flow.");
+ return -1;
+ }
- list_add(&flow->next, &fmgr.n_flows);
break;
case FLOW_ALLOC_CODE__FLOW_REPLY:
- flow = get_n_flow_by_cep_id(cep_id);
+ flow = fmgr.np1_flows_cep[cep_id];
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("No such flow in flow manager.");
return -1;
@@ -583,23 +577,24 @@ int fmgr_frct_post_buf(cep_id_t cep_id,
ret = ipcp_flow_alloc_reply(flow->fd, msg->response);
if (msg->response < 0) {
- list_del(&flow->next);
+ fmgr.np1_flows[flow->fd] = NULL;
+ fmgr.np1_flows_cep[cep_id] = NULL;
free(flow);
} else {
- flow_set_add(fmgr.set, flow->fd);
+ flow_set_add(fmgr.np1_set, flow->fd);
}
break;
case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- flow = get_n_flow_by_cep_id(cep_id);
+ flow = fmgr.np1_flows_cep[cep_id];
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("No such flow in flow manager.");
return -1;
}
- flow_set_del(fmgr.set, flow->fd);
+ flow_set_del(fmgr.np1_set, flow->fd);
ret = flow_dealloc(flow->fd);
break;
@@ -609,34 +604,160 @@ int fmgr_frct_post_buf(cep_id_t cep_id,
break;
}
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
return ret;
}
-int fmgr_frct_post_sdu(cep_id_t cep_id,
- struct shm_du_buff * sdb)
+int fmgr_np1_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb)
{
- struct n_flow * flow;
+ struct np1_flow * flow;
- pthread_mutex_lock(&fmgr.n_flows_lock);
+ pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = get_n_flow_by_cep_id(cep_id);
+ flow = fmgr.np1_flows_cep[cep_id];
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to find N flow.");
return -1;
}
if (ipcp_flow_write(flow->fd, sdb)) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to hand SDU to N flow.");
return -1;
}
- pthread_mutex_unlock(&fmgr.n_flows_lock);
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+
+ return 0;
+}
+
+int fmgr_nm1_mgmt_flow(char * dst_name)
+{
+ int fd;
+ int result;
+ char * ae_name;
+
+ ae_name = strdup(MGMT_AE);
+ if (ae_name == NULL)
+ return -1;
+
+ /* FIXME: Request retransmission. */
+ fd = flow_alloc(dst_name, MGMT_AE, NULL);
+ if (fd < 0) {
+ LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
+ return -1;
+ }
+
+ result = flow_alloc_res(fd);
+ if (result < 0) {
+ LOG_ERR("Result of flow allocation to %s is %d",
+ dst_name, result);
+ free(ae_name);
+ return -1;
+ }
+
+ if (ribmgr_add_flow(fd)) {
+ LOG_ERR("Failed to hand file descriptor to RIB manager");
+ flow_dealloc(fd);
+ free(ae_name);
+ return -1;
+ }
+
+ /* FIXME: Pass correct QoS cube */
+ if (add_nm1_fd(fd, ae_name, QOS_CUBE_BE)) {
+ LOG_ERR("Failed to add file descriptor to list.");
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+int fmgr_nm1_dt_flow(char * dst_name,
+ enum qos_cube qos)
+{
+ int fd;
+ int result;
+ char * ae_name;
+
+ ae_name = strdup(DT_AE);
+ if (ae_name == NULL)
+ return -1;
+
+ /* FIXME: Map qos cube on correct QoS. */
+ fd = flow_alloc(dst_name, DT_AE, NULL);
+ if (fd < 0) {
+ LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
+ return -1;
+ }
+
+ result = flow_alloc_res(fd);
+ if (result < 0) {
+ LOG_ERR("Result of flow allocation to %s is %d",
+ dst_name, result);
+ free(ae_name);
+ return -1;
+ }
+
+ if (add_nm1_fd(fd, ae_name, qos)) {
+ LOG_ERR("Failed to add file descriptor to list.");
+ flow_dealloc(fd);
+ free(ae_name);
+ return -1;
+ }
+
+ return 0;
+}
+
+int fmgr_nm1_write_sdu(struct pci * pci,
+ struct shm_du_buff * sdb)
+{
+ if (pci == NULL || sdb == NULL)
+ return -1;
+
+ if (shm_pci_ser(sdb, pci)) {
+ LOG_ERR("Failed to serialize PDU.");
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ if (ipcp_flow_write(fmgr.fd, sdb)) {
+ LOG_ERR("Failed to write SDU to fd %d.", fmgr.fd);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ return 0;
+}
+
+int fmgr_nm1_write_buf(struct pci * pci,
+ buffer_t * buf)
+{
+ buffer_t * buffer;
+
+ if (pci == NULL || buf == NULL || buf->data == NULL)
+ return -1;
+
+ buffer = shm_pci_ser_buf(buf, pci);
+ if (buffer == NULL) {
+ LOG_ERR("Failed to serialize buffer.");
+ free(buf->data);
+ return -1;
+ }
+
+ if (flow_write(fmgr.fd, buffer->data, buffer->len) == -1) {
+ LOG_ERR("Failed to write buffer to fd.");
+ free(buffer);
+ return -1;
+ }
+ free(buffer);
return 0;
}