summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c197
1 files changed, 124 insertions, 73 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index a419e9f5..d839cf1b 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -28,6 +28,7 @@
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/fqueue.h>
#include <ouroboros/errno.h>
+#include <ouroboros/cacep.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -42,15 +43,24 @@
#include "dir.h"
#include "pathname.h"
#include "ro.h"
+#include "gam.h"
#include "flow_alloc.pb-c.h"
typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */
+struct nm1_flow {
+ struct list_head next;
+ int fd;
+ qosspec_t qs;
+ struct cacep_info * info;
+};
+
struct {
flow_set_t * nm1_set[QOS_CUBE_MAX];
fqueue_t * nm1_fqs[QOS_CUBE_MAX];
+ struct list_head nm1_flows;
pthread_rwlock_t nm1_flows_lock;
flow_set_t * np1_set[QOS_CUBE_MAX];
@@ -60,21 +70,23 @@ struct {
cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
- pthread_t nm1_sdu_reader;
pthread_t np1_sdu_reader;
+ pthread_t nm1_sdu_reader;
+ pthread_t nm1_flow_wait;
/* FIXME: Replace with PFF */
int fd;
+
+ struct gam * gam;
} fmgr;
static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
- struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- int fd;
-
- int i = 0;
- int ret;
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
+ int fd;
+ int i = 0;
+ int ret;
(void) o;
@@ -118,12 +130,12 @@ static void * fmgr_np1_sdu_reader(void * o)
void * fmgr_nm1_sdu_reader(void * o)
{
- struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct shm_du_buff * sdb;
- struct pci * pci;
- int fd;
- int i = 0;
- int ret;
+ struct pci * pci;
+ int fd;
+ int i = 0;
+ int ret;
(void) o;
@@ -202,6 +214,49 @@ void * fmgr_nm1_sdu_reader(void * o)
return (void *) 0;
}
+static void * fmgr_nm1_flow_wait(void * o)
+{
+ qoscube_t cube;
+ struct cacep_info * info;
+ int fd;
+ qosspec_t qs;
+ struct nm1_flow * flow;
+
+ (void) o;
+
+ while (true) {
+ if (gam_flow_wait(fmgr.gam, &fd, &info, &qs)) {
+ LOG_ERR("Failed to get next flow descriptor.");
+ continue;;
+ }
+
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_add(fmgr.nm1_set[cube], fd);
+
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
+
+ pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL) {
+ free(info);
+ pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
+ continue;
+ }
+
+ flow->info = info;
+ flow->fd = fd;
+ flow->qs = qs;
+
+ INIT_LIST_HEAD(&flow->next);
+ list_add(&flow->next, &fmgr.nm1_flows);
+
+ pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
+ }
+
+ return (void *) 0;
+}
+
static void fmgr_destroy_flows(void)
{
int i;
@@ -224,9 +279,6 @@ int fmgr_init()
for (i = 0; i < IPCPD_MAX_CONNS; ++i)
fmgr.np1_cep_id_to_fd[i] = -1;
- pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
- pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
-
for (i = 0; i < QOS_CUBE_MAX; ++i) {
fmgr.np1_set[i] = flow_set_create();
if (fmgr.np1_set[i] == NULL) {
@@ -253,29 +305,55 @@ int fmgr_init()
}
}
+ fmgr.gam = gam_create(DT_AE);
+ if (fmgr.gam == NULL) {
+ LOG_ERR("Failed to create graph adjacency manager.");
+ fmgr_destroy_flows();
+ return -1;
+ }
+
+ INIT_LIST_HEAD(&fmgr.nm1_flows);
+
+ pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
+ pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
+
pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);
pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL);
+ pthread_create(&fmgr.nm1_flow_wait, NULL, fmgr_nm1_flow_wait, NULL);
return 0;
}
int fmgr_fini()
{
- int i;
- int j;
+ struct list_head * pos = NULL;
+ struct list_head * n = NULL;
+ qoscube_t cube;
pthread_cancel(fmgr.np1_sdu_reader);
pthread_cancel(fmgr.nm1_sdu_reader);
+ pthread_cancel(fmgr.nm1_flow_wait);
pthread_join(fmgr.np1_sdu_reader, NULL);
pthread_join(fmgr.nm1_sdu_reader, NULL);
+ pthread_join(fmgr.nm1_flow_wait, NULL);
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- for (j = 0; j < QOS_CUBE_MAX; ++j)
- if (flow_set_has(fmgr.nm1_set[j], i)) {
- flow_dealloc(i);
- flow_set_del(fmgr.nm1_set[j], i);
- }
+ gam_destroy(fmgr.gam);
+
+ pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
+
+ list_for_each_safe(pos, n, &fmgr.nm1_flows) {
+ struct nm1_flow * flow =
+ list_entry(pos, struct nm1_flow, next);
+ list_del(&flow->next);
+ flow_dealloc(flow->fd);
+ ipcp_flow_get_qoscube(flow->fd, &cube);
+ flow_set_del(fmgr.nm1_set[cube], flow->fd);
+ free(flow->info);
+ free(flow);
+ }
+
+ pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
pthread_rwlock_destroy(&fmgr.nm1_flows_lock);
pthread_rwlock_destroy(&fmgr.np1_flows_lock);
@@ -290,12 +368,12 @@ int fmgr_np1_alloc(int fd,
char * src_ae_name,
qoscube_t cube)
{
- cep_id_t cep_id;
- buffer_t buf;
+ cep_id_t cep_id;
+ buffer_t buf;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- char * path;
- uint8_t * ro_data;
- uint64_t addr;
+ char * path;
+ uint8_t * ro_data;
+ uint64_t addr;
path = pathname_create(RO_DIR);
if (path == NULL)
@@ -359,9 +437,9 @@ int fmgr_np1_alloc(int fd,
static int np1_flow_dealloc(int fd)
{
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- buffer_t buf;
- int ret;
- qoscube_t cube;
+ buffer_t buf;
+ int ret;
+ qoscube_t cube;
ipcp_flow_get_qoscube(fd, &cube);
flow_set_del(fmgr.np1_set[cube], fd);
@@ -388,10 +466,11 @@ static int np1_flow_dealloc(int fd)
return ret;
}
-int fmgr_np1_alloc_resp(int fd, int response)
+int fmgr_np1_alloc_resp(int fd,
+ int response)
{
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- buffer_t buf;
+ buffer_t buf;
msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
msg.response = response;
@@ -443,7 +522,8 @@ int fmgr_np1_dealloc(int fd)
return ret;
}
-int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf)
+int fmgr_np1_post_buf(cep_id_t cep_id,
+ buffer_t * buf)
{
int ret = 0;
int fd;
@@ -512,7 +592,8 @@ int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf)
return ret;
}
-int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb)
+int fmgr_np1_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb)
{
int fd;
@@ -530,52 +611,21 @@ int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb)
return 0;
}
-/* FIXME: do this in a topologymanager instance */
-int fmgr_nm1_add_flow(int fd)
+int fmgr_nm1_flow_arr(int fd,
+ qosspec_t qs)
{
- qoscube_t qos;
+ assert(fmgr.gam);
- if (flow_alloc_resp(fd, 0) < 0) {
- LOG_ERR("Could not respond to new flow.");
+ if (gam_flow_arr(fmgr.gam, fd, qs)) {
+ LOG_ERR("Failed to hand to connectivy manager.");
return -1;
}
- ipcp_flow_get_qoscube(fd, &qos);
- flow_set_add(fmgr.nm1_set[qos], fd);
-
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
-
- return 0;
-}
-
-int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos)
-{
- int fd;
- int result;
-
- /* FIXME: Map qos cube on correct QoS. */
- fd = flow_alloc(dst_name, DT_AE, NULL);
- if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s.", dst_name);
- return -1;
- }
-
- result = flow_alloc_res(fd);
- if (result < 0) {
- LOG_ERR("Allocate flow to %s result %d.", dst_name, result);
- return -1;
- }
-
- flow_set_add(fmgr.nm1_set[qos], fd);
-
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
-
return 0;
}
-int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb)
+int fmgr_nm1_write_sdu(struct pci * pci,
+ struct shm_du_buff * sdb)
{
if (pci == NULL || sdb == NULL)
return -1;
@@ -595,7 +645,8 @@ int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb)
return 0;
}
-int fmgr_nm1_write_buf(struct pci * pci, buffer_t * buf)
+int fmgr_nm1_write_buf(struct pci * pci,
+ buffer_t * buf)
{
buffer_t * buffer;