summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c390
1 files changed, 152 insertions, 238 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 8e416aa4..6684db7c 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -49,82 +49,31 @@ typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */
-struct np1_flow {
- int fd;
- cep_id_t cep_id;
- enum qos_cube qos;
-};
-
-struct nm1_flow {
- int fd;
- enum qos_cube qos;
-};
-
struct {
- pthread_t nm1_flow_acceptor;
- struct nm1_flow ** nm1_flows;
+ flow_set_t * nm1_set[QOS_CUBE_MAX];
+ fqueue_t * nm1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t nm1_flows_lock;
- flow_set_t * nm1_set;
- pthread_t nm1_sdu_reader;
- struct np1_flow ** np1_flows;
- struct np1_flow ** np1_flows_cep;
+ flow_set_t * np1_set[QOS_CUBE_MAX];
+ fqueue_t * np1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t np1_flows_lock;
- flow_set_t * np1_set;
+
+ cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
+ int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
+
+ pthread_t nm1_flow_acceptor;
+ pthread_t nm1_sdu_reader;
pthread_t np1_sdu_reader;
/* FIXME: Replace with PFF */
int fd;
} fmgr;
-static int add_nm1_fd(int fd,
- enum qos_cube qos)
-{
- struct nm1_flow * tmp;
-
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL)
- return -1;
-
- tmp->fd = fd;
- tmp->qos = qos;
-
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
- fmgr.nm1_flows[fd] = tmp;
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
-
- flow_set_add(fmgr.nm1_set, fd);
-
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
-
- return 0;
-}
-
-static int add_np1_fd(int fd,
- cep_id_t cep_id,
- enum qos_cube qos)
-{
- struct np1_flow * flow;
-
- flow = malloc(sizeof(*flow));
- if (flow == NULL)
- return -1;
-
- flow->cep_id = cep_id;
- flow->qos = qos;
- flow->fd = fd;
-
- fmgr.np1_flows[fd] = flow;
- fmgr.np1_flows_cep[cep_id] = flow;
-
- return 0;
-}
-
static void * fmgr_nm1_acceptor(void * o)
{
int fd;
char * ae_name;
+ qoscube_t cube;
qosspec_t qs;
(void) o;
@@ -150,13 +99,13 @@ static void * fmgr_nm1_acceptor(void * o)
if (!(strcmp(ae_name, MGMT_AE) == 0 ||
strcmp(ae_name, DT_AE) == 0)) {
if (flow_alloc_resp(fd, -1))
- LOG_ERR("Failed to reply to flow allocation.");
+ LOG_WARN("Failed to reply to flow allocation.");
flow_dealloc(fd);
continue;
}
if (flow_alloc_resp(fd, 0)) {
- LOG_ERR("Failed to reply to flow allocation.");
+ LOG_WARN("Failed to reply to flow allocation.");
flow_dealloc(fd);
continue;
}
@@ -166,17 +115,19 @@ static void * fmgr_nm1_acceptor(void * o)
if (strcmp(ae_name, MGMT_AE) == 0) {
if (ribmgr_add_flow(fd)) {
- LOG_ERR("Failed to hand fd to RIB.");
+ LOG_WARN("Failed to hand fd to RIB.");
flow_dealloc(fd);
continue;
}
} else {
- /* FIXME: Pass correct QoS cube */
- if (add_nm1_fd(fd, QOS_CUBE_BE)) {
- LOG_ERR("Failed to add fd to list.");
+ ipcp_flow_get_qoscube(fd, &cube);
+ if (flow_set_add(fmgr.nm1_set[cube], fd)) {
+ LOG_WARN("Failed to add fd.");
flow_dealloc(fd);
continue;
}
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
}
free(ae_name);
@@ -189,44 +140,40 @@ static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- struct np1_flow * flow;
int fd;
- fqueue_t * fq = fqueue_create();
- if (fq == NULL)
- return (void *) 1;
+
+ int i = 0;
+ int ret;
(void) o;
while (true) {
- int ret = flow_event_wait(fmgr.np1_set, fq, &timeout);
+ /* FIXME: replace with scheduling policy call */
+ i = (i + 1) % QOS_CUBE_MAX;
+
+ ret = flow_event_wait(fmgr.np1_set[i],
+ fmgr.np1_fqs[i],
+ &timeout);
if (ret == -ETIMEDOUT)
continue;
if (ret < 0) {
- LOG_ERR("Event error: %d.", ret);
+ LOG_WARN("Event error: %d.", ret);
continue;
}
- while ((fd = fqueue_next(fq)) >= 0) {
+ while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
+ LOG_WARN("Failed to read SDU from fd %d.", fd);
continue;
}
pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to retrieve flow.");
- continue;
- }
-
- if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
ipcp_flow_del(sdb);
- LOG_ERR("Failed to hand SDU to FRCT.");
+ LOG_WARN("Failed to hand SDU to FRCT.");
continue;
}
@@ -244,14 +191,18 @@ void * fmgr_nm1_sdu_reader(void * o)
struct shm_du_buff * sdb;
struct pci * pci;
int fd;
- fqueue_t * fq = fqueue_create();
- if (fq == NULL)
- return (void *) 1;
+ int i = 0;
+ int ret;
(void) o;
while (true) {
- int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout);
+ /* FIXME: replace with scheduling policy call */
+ i = (i + 1) % QOS_CUBE_MAX;
+
+ ret = flow_event_wait(fmgr.nm1_set[i],
+ fmgr.nm1_fqs[i],
+ &timeout);
if (ret == -ETIMEDOUT)
continue;
@@ -260,7 +211,7 @@ void * fmgr_nm1_sdu_reader(void * o)
continue;
}
- while ((fd = fqueue_next(fq)) >= 0) {
+ while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
LOG_ERR("Failed to read SDU from fd %d.", fd);
continue;
@@ -320,52 +271,55 @@ void * fmgr_nm1_sdu_reader(void * o)
return (void *) 0;
}
-int fmgr_init()
+static void fmgr_destroy_flows(void)
{
int i;
- fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS);
- if (fmgr.nm1_flows == NULL)
- return -1;
-
- fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS);
- if (fmgr.np1_flows == NULL) {
- free(fmgr.nm1_flows);
- return -1;
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ flow_set_destroy(fmgr.nm1_set[i]);
+ flow_set_destroy(fmgr.np1_set[i]);
+ fqueue_destroy(fmgr.nm1_fqs[i]);
+ fqueue_destroy(fmgr.np1_fqs[i]);
}
+}
- fmgr.np1_flows_cep =
- malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS);
- if (fmgr.np1_flows_cep == NULL) {
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
- }
+int fmgr_init()
+{
+ int i;
- for (i = 0; i < IRMD_MAX_FLOWS; i++) {
- fmgr.nm1_flows[i] = NULL;
- fmgr.np1_flows[i] = NULL;
- fmgr.np1_flows_cep[i] = NULL;
- }
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
+
+ for (i = 0; i < IPCPD_MAX_CONNS; ++i)
+ fmgr.np1_cep_id_to_fd[i] = -1;
pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
- fmgr.np1_set = flow_set_create();
- if (fmgr.np1_set == NULL) {
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
- }
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fmgr.np1_set[i] = flow_set_create();
+ if (fmgr.np1_set == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
- fmgr.nm1_set = flow_set_create();
- if (fmgr.nm1_set == NULL) {
- flow_set_destroy(fmgr.np1_set);
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
+ fmgr.np1_fqs[i] = fqueue_create();
+ if (fmgr.np1_fqs[i] == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
+
+ fmgr.nm1_set[i] = flow_set_create();
+ if (fmgr.nm1_set == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
+
+ fmgr.nm1_fqs[i] = fqueue_create();
+ if (fmgr.nm1_fqs[i] == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
}
pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL);
@@ -378,6 +332,7 @@ int fmgr_init()
int fmgr_fini()
{
int i;
+ int j;
pthread_cancel(fmgr.nm1_flow_acceptor);
pthread_cancel(fmgr.np1_sdu_reader);
@@ -387,29 +342,25 @@ int fmgr_fini()
pthread_join(fmgr.np1_sdu_reader, NULL);
pthread_join(fmgr.nm1_sdu_reader, NULL);
- for (i = 0; i < IRMD_MAX_FLOWS; i++) {
- if (fmgr.nm1_flows[i] == NULL)
- continue;
- flow_dealloc(fmgr.nm1_flows[i]->fd);
- free(fmgr.nm1_flows[i]);
- }
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ for (j = 0; j < QOS_CUBE_MAX; ++j)
+ if (flow_set_has(fmgr.nm1_set[j], i)) {
+ flow_dealloc(i);
+ flow_set_del(fmgr.nm1_set[j], i);
+ }
pthread_rwlock_destroy(&fmgr.nm1_flows_lock);
pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- flow_set_destroy(fmgr.nm1_set);
- flow_set_destroy(fmgr.np1_set);
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
+ fmgr_destroy_flows();
return 0;
}
-int fmgr_np1_alloc(int fd,
- char * dst_ap_name,
- char * src_ae_name,
- enum qos_cube qos)
+int fmgr_np1_alloc(int fd,
+ char * dst_ap_name,
+ char * src_ae_name,
+ qoscube_t qos)
{
cep_id_t cep_id;
buffer_t buf;
@@ -478,10 +429,8 @@ int fmgr_np1_alloc(int fd,
free(ro_data);
- if (add_np1_fd(fd, cep_id, qos)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
+ fmgr.np1_fd_to_cep_id[fd] = cep_id;
+ fmgr.np1_cep_id_to_fd[cep_id] = fd;
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
@@ -491,16 +440,13 @@ int fmgr_np1_alloc(int fd,
/* Call under np1_flows lock */
static int np1_flow_dealloc(int fd)
{
- struct np1_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
int ret;
+ qoscube_t cube;
- flow_set_del(fmgr.np1_set, fd);
-
- flow = fmgr.np1_flows[fd];
- if (flow == NULL)
- return -1;
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_del(fmgr.np1_set[cube], fd);
msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
@@ -510,16 +456,15 @@ static int np1_flow_dealloc(int fd)
buf.data = malloc(buf.len);
if (buf.data == NULL)
- return -1;
+ return -ENOMEM;
flow_alloc_msg__pack(&msg, buf.data);
- ret = frct_i_destroy(flow->cep_id, &buf);
+ ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
- fmgr.np1_flows[fd] = NULL;
- fmgr.np1_flows_cep[flow->cep_id] = NULL;
+ fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID;
+ fmgr.np1_fd_to_cep_id[fd] = -1;
- free(flow);
free(buf.data);
return ret;
@@ -527,48 +472,39 @@ static int np1_flow_dealloc(int fd)
int fmgr_np1_alloc_resp(int fd, int response)
{
- struct np1_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
-
msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
msg.response = response;
msg.has_response = true;
buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ if (buf.len == 0)
return -1;
- }
buf.data = malloc(buf.len);
- if (buf.data == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
+ if (buf.data == NULL)
+ return -ENOMEM;
flow_alloc_msg__pack(&msg, buf.data);
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
if (response < 0) {
- frct_i_destroy(flow->cep_id, &buf);
+ frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
free(buf.data);
- fmgr.np1_flows[fd] = NULL;
- fmgr.np1_flows_cep[flow->cep_id] = NULL;
- free(flow);
+ fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]]
+ = INVALID_CEP_ID;
+ fmgr.np1_fd_to_cep_id[fd] = -1;
} else {
- if (frct_i_accept(flow->cep_id, &buf, flow->qos)) {
+ qoscube_t cube;
+ ipcp_flow_get_qoscube(fd, &cube);
+ if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return -1;
}
- flow_set_add(fmgr.np1_set, fd);
+ flow_set_add(fmgr.np1_set[cube], fd);
}
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
@@ -581,27 +517,25 @@ int fmgr_np1_dealloc(int fd)
int ret;
pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
ret = np1_flow_dealloc(fd);
+
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return ret;
}
-int fmgr_np1_post_buf(cep_id_t cep_id,
- buffer_t * buf)
+int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf)
{
- struct np1_flow * flow;
int ret = 0;
int fd;
flow_alloc_msg_t * msg;
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+ qoscube_t cube;
/* Depending on the message call the function in ipcp-dev.h */
msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
if (msg == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to unpack flow alloc message");
return -1;
}
@@ -612,51 +546,41 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
msg->dst_name,
msg->src_ae_name);
if (fd < 0) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("Failed to get fd for flow.");
return -1;
}
- if (add_np1_fd(fd, cep_id, msg->qos_cube)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("Failed to add np1 flow.");
- return -1;
- }
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
+ fmgr.np1_fd_to_cep_id[fd] = cep_id;
+ fmgr.np1_cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
break;
case FLOW_ALLOC_CODE__FLOW_REPLY:
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("No such flow in flow manager.");
- return -1;
- }
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
- ret = ipcp_flow_alloc_reply(flow->fd, msg->response);
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ ret = ipcp_flow_alloc_reply(fd, msg->response);
if (msg->response < 0) {
- fmgr.np1_flows[flow->fd] = NULL;
- fmgr.np1_flows_cep[cep_id] = NULL;
- free(flow);
+ fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID;
+ fmgr.np1_cep_id_to_fd[cep_id] = -1;
} else {
- flow_set_add(fmgr.np1_set, flow->fd);
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_add(fmgr.np1_set[cube],
+ fmgr.np1_cep_id_to_fd[cep_id]);
}
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+
break;
case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("No such flow in flow manager.");
- return -1;
- }
-
- flow_set_del(fmgr.np1_set, flow->fd);
-
- ret = flow_dealloc(flow->fd);
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_del(fmgr.np1_set[cube], fd);
+ ret = flow_dealloc(fd);
break;
default:
LOG_ERR("Got an unknown flow allocation message.");
@@ -674,18 +598,12 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
int fmgr_np1_post_sdu(cep_id_t cep_id,
struct shm_du_buff * sdb)
{
- struct np1_flow * flow;
+ int fd;
pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- LOG_ERR("Failed to find N flow.");
- return -1;
- }
-
- if (ipcp_flow_write(flow->fd, sdb)) {
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ if (ipcp_flow_write(fd, sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to hand SDU to N flow.");
return -1;
@@ -704,19 +622,19 @@ int fmgr_nm1_mgmt_flow(char * dst_name)
/* FIXME: Request retransmission. */
fd = flow_alloc(dst_name, MGMT_AE, NULL);
if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s", dst_name);
+ LOG_ERR("Failed to allocate flow to %s.", dst_name);
return -1;
}
result = flow_alloc_res(fd);
if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d",
+ LOG_ERR("Result of flow allocation to %s is %d.",
dst_name, result);
return -1;
}
if (ribmgr_add_flow(fd)) {
- LOG_ERR("Failed to hand file descriptor to RIB manager");
+ LOG_ERR("Failed to hand file descriptor to RIB manager.");
flow_dealloc(fd);
return -1;
}
@@ -724,8 +642,7 @@ int fmgr_nm1_mgmt_flow(char * dst_name)
return 0;
}
-int fmgr_nm1_dt_flow(char * dst_name,
- enum qos_cube qos)
+int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos)
{
int fd;
int result;
@@ -733,28 +650,25 @@ int fmgr_nm1_dt_flow(char * dst_name,
/* FIXME: Map qos cube on correct QoS. */
fd = flow_alloc(dst_name, DT_AE, NULL);
if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s", dst_name);
+ LOG_ERR("Failed to allocate flow to %s.", dst_name);
return -1;
}
result = flow_alloc_res(fd);
if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d",
- dst_name, result);
+ LOG_ERR("Allocate flow to %s result %d.", dst_name, result);
return -1;
}
- if (add_nm1_fd(fd, qos)) {
- LOG_ERR("Failed to add file descriptor to list.");
- flow_dealloc(fd);
- return -1;
- }
+ flow_set_add(fmgr.nm1_set[qos], fd);
+
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
return 0;
}
-int fmgr_nm1_write_sdu(struct pci * pci,
- struct shm_du_buff * sdb)
+int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb)
{
if (pci == NULL || sdb == NULL)
return -1;