diff options
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 203 |
1 files changed, 150 insertions, 53 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index b6ec1984..25898661 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,6 +27,8 @@ #include <ouroboros/dev.h> #include <ouroboros/list.h> #include <ouroboros/ipcp-dev.h> +#include <ouroboros/select.h> +#include <ouroboros/errno.h> #include <stdlib.h> #include <stdbool.h> @@ -37,33 +39,41 @@ #include "ribmgr.h" #include "frct.h" #include "ipcp.h" +#include "rmt.h" +#include "shm_pci.h" +#include "config.h" #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; struct n_flow { - int fd; - struct frct_i * frct_i; + int fd; + cep_id_t cep_id; enum qos_cube qos; struct list_head next; }; struct n_1_flow { - int fd; - char * ae_name; + int fd; + char * ae_name; struct list_head next; }; struct { - pthread_t listen_thread; + pthread_t n_1_flow_acceptor; + /* FIXME: Make this a table */ struct list_head n_1_flows; pthread_mutex_t n_1_flows_lock; + /* FIXME: Make this a table */ struct list_head n_flows; /* FIXME: Make this a read/write lock */ pthread_mutex_t n_flows_lock; + + struct flow_set * set; + pthread_t n_reader; } fmgr; static int add_n_1_fd(int fd, char * ae_name) @@ -89,9 +99,37 @@ static int add_n_1_fd(int fd, char * ae_name) return 0; } -static void * fmgr_listen(void * o) +/* Call under n_flows lock */ +static struct n_flow * get_n_flow_by_fd(int fd) { - int fd; + struct list_head * pos = NULL; + + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->fd == fd) + return e; + } + + return NULL; +} + +/* Call under n_flows lock */ +static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->cep_id == cep_id) + return e; + } + + return NULL; +} + +static void * fmgr_n_1_acceptor(void * o) +{ + int fd; char * ae_name; while (true) { @@ -139,7 +177,7 @@ static void * fmgr_listen(void * o) if (strcmp(ae_name, DT_AE) == 0) { /* FIXME: Pass correct QoS cube */ - if (frct_dt_flow(fd, 0)) { + if (rmt_dt_flow(fd, 0)) { LOG_ERR("Failed to hand fd to FRCT."); flow_dealloc(fd); continue; @@ -156,6 +194,49 @@ static void * fmgr_listen(void * o) return (void *) 0; } +static void * fmgr_n_reader(void * o) +{ + struct shm_du_buff * sdb; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct n_flow * flow; + + while (true) { + int fd = flow_select(fmgr.set, &timeout); + if (fd == -ETIMEDOUT) + continue; + + if (fd < 0) { + LOG_ERR("Failed to get active fd."); + continue; + } + + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } + + pthread_mutex_lock(&fmgr.n_flows_lock); + flow = get_n_flow_by_fd(fd); + if (flow == NULL) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to retrieve flow."); + continue; + } + + if (frct_i_write_sdu(flow->cep_id, sdb)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to hand SDU to FRCT."); + continue; + } + + pthread_mutex_unlock(&fmgr.n_flows_lock); + } + + return (void *) 0; +} + int fmgr_init() { INIT_LIST_HEAD(&fmgr.n_1_flows); @@ -164,7 +245,12 @@ int fmgr_init() pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); pthread_mutex_init(&fmgr.n_flows_lock, NULL); - pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL); + fmgr.set = flow_set_create(); + if (fmgr.set == NULL) + return -1; + + pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL); + pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL); return 0; } @@ -173,9 +259,11 @@ int fmgr_fini() { struct list_head * pos = NULL; - pthread_cancel(fmgr.listen_thread); + pthread_cancel(fmgr.n_1_flow_acceptor); + pthread_cancel(fmgr.n_reader); - pthread_join(fmgr.listen_thread, NULL); + pthread_join(fmgr.n_1_flow_acceptor, NULL); + pthread_join(fmgr.n_reader, NULL); list_for_each(pos, &fmgr.n_1_flows) { struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); @@ -188,6 +276,8 @@ int fmgr_fini() pthread_mutex_destroy(&fmgr.n_1_flows_lock); pthread_mutex_destroy(&fmgr.n_flows_lock); + flow_set_destroy(fmgr.set); + return 0; } @@ -259,7 +349,7 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos) return -1; } - if (frct_dt_flow(fd, qos)) { + if (rmt_dt_flow(fd, qos)) { LOG_ERR("Failed to hand file descriptor to FRCT"); flow_dealloc(fd); free(ae_name); @@ -276,41 +366,13 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos) return 0; } -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_fd(int fd) -{ - struct list_head * pos = NULL; - - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->fd == fd) - return e; - } - - return NULL; -} - -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) -{ - struct list_head * pos = NULL; - - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->frct_i == frct_i) - return e; - } - - return NULL; -} - int fmgr_flow_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) { struct n_flow * flow; - struct frct_i * frct_i; + cep_id_t cep_id; uint32_t address = 0; buffer_t buf; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; @@ -343,8 +405,8 @@ int fmgr_flow_alloc(int fd, pthread_mutex_lock(&fmgr.n_flows_lock); - frct_i = frct_i_create(address, &buf, qos); - if (frct_i == NULL) { + cep_id = frct_i_create(address, &buf, qos); + if (cep_id == INVALID_CEP_ID) { free(buf.data); free(flow); pthread_mutex_unlock(&fmgr.n_flows_lock); @@ -354,7 +416,7 @@ int fmgr_flow_alloc(int fd, free(buf.data); flow->fd = fd; - flow->frct_i = frct_i; + flow->cep_id = cep_id; flow->qos = qos; INIT_LIST_HEAD(&flow->next); @@ -374,6 +436,8 @@ static int n_flow_dealloc(int fd) buffer_t buf; int ret; + flow_set_del(fmgr.set, fd); + flow = get_n_flow_by_fd(fd); if (flow == NULL) return -1; @@ -390,7 +454,7 @@ static int n_flow_dealloc(int fd) flow_alloc_msg__pack(&msg, buf.data); - ret = frct_i_destroy(flow->frct_i, &buf); + ret = frct_i_destroy(flow->cep_id, &buf); list_del(&flow->next); free(flow); @@ -432,13 +496,16 @@ int fmgr_flow_alloc_resp(int fd, int response) flow_alloc_msg__pack(&msg, buf.data); if (response < 0) { - frct_i_destroy(flow->frct_i, &buf); + frct_i_destroy(flow->cep_id, &buf); free(buf.data); list_del(&flow->next); free(flow); - } else if (frct_i_accept(flow->frct_i, &buf)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - return -1; + } else { + if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + return -1; + } + flow_set_add(fmgr.set, fd); } pthread_mutex_unlock(&fmgr.n_flows_lock); @@ -457,7 +524,8 @@ int fmgr_flow_dealloc(int fd) return ret; } -int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) +int fmgr_frct_post_buf(cep_id_t cep_id, + buffer_t * buf) { struct n_flow * flow; int ret = 0; @@ -484,7 +552,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return -1; } - flow->frct_i = frct_i; + flow->cep_id = cep_id; flow->qos = msg->qos_cube; fd = ipcp_flow_req_arr(getpid(), @@ -505,7 +573,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = get_n_flow_by_frct_i(frct_i); + flow = get_n_flow_by_cep_id(cep_id); if (flow == NULL) { pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); @@ -517,11 +585,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) if (msg->response < 0) { list_del(&flow->next); free(flow); + } else { + flow_set_add(fmgr.set, flow->fd); } break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = get_n_flow_by_frct_i(frct_i); + flow = get_n_flow_by_cep_id(cep_id); if (flow == NULL) { pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); @@ -529,6 +599,8 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return -1; } + flow_set_del(fmgr.set, flow->fd); + ret = flow_dealloc(flow->fd); break; default: @@ -543,3 +615,28 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return ret; } + +int fmgr_frct_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb) +{ + struct n_flow * flow; + + pthread_mutex_lock(&fmgr.n_flows_lock); + + flow = get_n_flow_by_cep_id(cep_id); + if (flow == NULL) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + LOG_ERR("Failed to find N flow."); + return -1; + } + + if (ipcp_flow_write(flow->fd, sdb)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + LOG_ERR("Failed to hand SDU to N flow."); + return -1; + } + + pthread_mutex_unlock(&fmgr.n_flows_lock); + + return 0; +} |