diff options
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 390 |
1 files changed, 152 insertions, 238 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8e416aa4..6684db7c 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -49,82 +49,31 @@ typedef FlowAllocMsg flow_alloc_msg_t; #define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */ -struct np1_flow { - int fd; - cep_id_t cep_id; - enum qos_cube qos; -}; - -struct nm1_flow { - int fd; - enum qos_cube qos; -}; - struct { - pthread_t nm1_flow_acceptor; - struct nm1_flow ** nm1_flows; + flow_set_t * nm1_set[QOS_CUBE_MAX]; + fqueue_t * nm1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t nm1_flows_lock; - flow_set_t * nm1_set; - pthread_t nm1_sdu_reader; - struct np1_flow ** np1_flows; - struct np1_flow ** np1_flows_cep; + flow_set_t * np1_set[QOS_CUBE_MAX]; + fqueue_t * np1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t np1_flows_lock; - flow_set_t * np1_set; + + cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; + int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; + + pthread_t nm1_flow_acceptor; + pthread_t nm1_sdu_reader; pthread_t np1_sdu_reader; /* FIXME: Replace with PFF */ int fd; } fmgr; -static int add_nm1_fd(int fd, - enum qos_cube qos) -{ - struct nm1_flow * tmp; - - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) - return -1; - - tmp->fd = fd; - tmp->qos = qos; - - pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); - fmgr.nm1_flows[fd] = tmp; - pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - - flow_set_add(fmgr.nm1_set, fd); - - /* FIXME: Temporary, until we have a PFF */ - fmgr.fd = fd; - - return 0; -} - -static int add_np1_fd(int fd, - cep_id_t cep_id, - enum qos_cube qos) -{ - struct np1_flow * flow; - - flow = malloc(sizeof(*flow)); - if (flow == NULL) - return -1; - - flow->cep_id = cep_id; - flow->qos = qos; - flow->fd = fd; - - fmgr.np1_flows[fd] = flow; - fmgr.np1_flows_cep[cep_id] = flow; - - return 0; -} - static void * fmgr_nm1_acceptor(void * o) { int fd; char * ae_name; + qoscube_t cube; qosspec_t qs; (void) o; @@ -150,13 +99,13 @@ static void * fmgr_nm1_acceptor(void * o) if (!(strcmp(ae_name, MGMT_AE) == 0 || strcmp(ae_name, DT_AE) == 0)) { if (flow_alloc_resp(fd, -1)) - LOG_ERR("Failed to reply to flow allocation."); + LOG_WARN("Failed to reply to flow allocation."); flow_dealloc(fd); continue; } if (flow_alloc_resp(fd, 0)) { - LOG_ERR("Failed to reply to flow allocation."); + LOG_WARN("Failed to reply to flow allocation."); flow_dealloc(fd); continue; } @@ -166,17 +115,19 @@ static void * fmgr_nm1_acceptor(void * o) if (strcmp(ae_name, MGMT_AE) == 0) { if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand fd to RIB."); + LOG_WARN("Failed to hand fd to RIB."); flow_dealloc(fd); continue; } } else { - /* FIXME: Pass correct QoS cube */ - if (add_nm1_fd(fd, QOS_CUBE_BE)) { - LOG_ERR("Failed to add fd to list."); + ipcp_flow_get_qoscube(fd, &cube); + if (flow_set_add(fmgr.nm1_set[cube], fd)) { + LOG_WARN("Failed to add fd."); flow_dealloc(fd); continue; } + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; } free(ae_name); @@ -189,44 +140,40 @@ static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct np1_flow * flow; int fd; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; + + int i = 0; + int ret; (void) o; while (true) { - int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); + /* FIXME: replace with scheduling policy call */ + i = (i + 1) % QOS_CUBE_MAX; + + ret = flow_event_wait(fmgr.np1_set[i], + fmgr.np1_fqs[i], + &timeout); if (ret == -ETIMEDOUT) continue; if (ret < 0) { - LOG_ERR("Event error: %d.", ret); + LOG_WARN("Event error: %d.", ret); continue; } - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) { if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); + LOG_WARN("Failed to read SDU from fd %d.", fd); continue; } pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to retrieve flow."); - continue; - } - - if (frct_i_write_sdu(flow->cep_id, sdb)) { + if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); - LOG_ERR("Failed to hand SDU to FRCT."); + LOG_WARN("Failed to hand SDU to FRCT."); continue; } @@ -244,14 +191,18 @@ void * fmgr_nm1_sdu_reader(void * o) struct shm_du_buff * sdb; struct pci * pci; int fd; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; + int i = 0; + int ret; (void) o; while (true) { - int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); + /* FIXME: replace with scheduling policy call */ + i = (i + 1) % QOS_CUBE_MAX; + + ret = flow_event_wait(fmgr.nm1_set[i], + fmgr.nm1_fqs[i], + &timeout); if (ret == -ETIMEDOUT) continue; @@ -260,7 +211,7 @@ void * fmgr_nm1_sdu_reader(void * o) continue; } - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) { if (ipcp_flow_read(fd, &sdb)) { LOG_ERR("Failed to read SDU from fd %d.", fd); continue; @@ -320,52 +271,55 @@ void * fmgr_nm1_sdu_reader(void * o) return (void *) 0; } -int fmgr_init() +static void fmgr_destroy_flows(void) { int i; - fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS); - if (fmgr.nm1_flows == NULL) - return -1; - - fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS); - if (fmgr.np1_flows == NULL) { - free(fmgr.nm1_flows); - return -1; + for (i = 0; i < QOS_CUBE_MAX; ++i) { + flow_set_destroy(fmgr.nm1_set[i]); + flow_set_destroy(fmgr.np1_set[i]); + fqueue_destroy(fmgr.nm1_fqs[i]); + fqueue_destroy(fmgr.np1_fqs[i]); } +} - fmgr.np1_flows_cep = - malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS); - if (fmgr.np1_flows_cep == NULL) { - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; - } +int fmgr_init() +{ + int i; - for (i = 0; i < IRMD_MAX_FLOWS; i++) { - fmgr.nm1_flows[i] = NULL; - fmgr.np1_flows[i] = NULL; - fmgr.np1_flows_cep[i] = NULL; - } + for (i = 0; i < AP_MAX_FLOWS; ++i) + fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; + + for (i = 0; i < IPCPD_MAX_CONNS; ++i) + fmgr.np1_cep_id_to_fd[i] = -1; pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); - fmgr.np1_set = flow_set_create(); - if (fmgr.np1_set == NULL) { - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; - } + for (i = 0; i < QOS_CUBE_MAX; ++i) { + fmgr.np1_set[i] = flow_set_create(); + if (fmgr.np1_set == NULL) { + fmgr_destroy_flows(); + return -1; + } - fmgr.nm1_set = flow_set_create(); - if (fmgr.nm1_set == NULL) { - flow_set_destroy(fmgr.np1_set); - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); - return -1; + fmgr.np1_fqs[i] = fqueue_create(); + if (fmgr.np1_fqs[i] == NULL) { + fmgr_destroy_flows(); + return -1; + } + + fmgr.nm1_set[i] = flow_set_create(); + if (fmgr.nm1_set == NULL) { + fmgr_destroy_flows(); + return -1; + } + + fmgr.nm1_fqs[i] = fqueue_create(); + if (fmgr.nm1_fqs[i] == NULL) { + fmgr_destroy_flows(); + return -1; + } } pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); @@ -378,6 +332,7 @@ int fmgr_init() int fmgr_fini() { int i; + int j; pthread_cancel(fmgr.nm1_flow_acceptor); pthread_cancel(fmgr.np1_sdu_reader); @@ -387,29 +342,25 @@ int fmgr_fini() pthread_join(fmgr.np1_sdu_reader, NULL); pthread_join(fmgr.nm1_sdu_reader, NULL); - for (i = 0; i < IRMD_MAX_FLOWS; i++) { - if (fmgr.nm1_flows[i] == NULL) - continue; - flow_dealloc(fmgr.nm1_flows[i]->fd); - free(fmgr.nm1_flows[i]); - } + for (i = 0; i < AP_MAX_FLOWS; ++i) + for (j = 0; j < QOS_CUBE_MAX; ++j) + if (flow_set_has(fmgr.nm1_set[j], i)) { + flow_dealloc(i); + flow_set_del(fmgr.nm1_set[j], i); + } pthread_rwlock_destroy(&fmgr.nm1_flows_lock); pthread_rwlock_destroy(&fmgr.np1_flows_lock); - flow_set_destroy(fmgr.nm1_set); - flow_set_destroy(fmgr.np1_set); - free(fmgr.np1_flows_cep); - free(fmgr.np1_flows); - free(fmgr.nm1_flows); + fmgr_destroy_flows(); return 0; } -int fmgr_np1_alloc(int fd, - char * dst_ap_name, - char * src_ae_name, - enum qos_cube qos) +int fmgr_np1_alloc(int fd, + char * dst_ap_name, + char * src_ae_name, + qoscube_t qos) { cep_id_t cep_id; buffer_t buf; @@ -478,10 +429,8 @@ int fmgr_np1_alloc(int fd, free(ro_data); - if (add_np1_fd(fd, cep_id, qos)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } + fmgr.np1_fd_to_cep_id[fd] = cep_id; + fmgr.np1_cep_id_to_fd[cep_id] = fd; pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -491,16 +440,13 @@ int fmgr_np1_alloc(int fd, /* Call under np1_flows lock */ static int np1_flow_dealloc(int fd) { - struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; + qoscube_t cube; - flow_set_del(fmgr.np1_set, fd); - - flow = fmgr.np1_flows[fd]; - if (flow == NULL) - return -1; + ipcp_flow_get_qoscube(fd, &cube); + flow_set_del(fmgr.np1_set[cube], fd); msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; @@ -510,16 +456,15 @@ static int np1_flow_dealloc(int fd) buf.data = malloc(buf.len); if (buf.data == NULL) - return -1; + return -ENOMEM; flow_alloc_msg__pack(&msg, buf.data); - ret = frct_i_destroy(flow->cep_id, &buf); + ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - fmgr.np1_flows[fd] = NULL; - fmgr.np1_flows_cep[flow->cep_id] = NULL; + fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; + fmgr.np1_fd_to_cep_id[fd] = -1; - free(flow); free(buf.data); return ret; @@ -527,48 +472,39 @@ static int np1_flow_dealloc(int fd) int fmgr_np1_alloc_resp(int fd, int response) { - struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } - msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; msg.response = response; msg.has_response = true; buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); + if (buf.len == 0) return -1; - } buf.data = malloc(buf.len); - if (buf.data == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - return -1; - } + if (buf.data == NULL) + return -ENOMEM; flow_alloc_msg__pack(&msg, buf.data); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + if (response < 0) { - frct_i_destroy(flow->cep_id, &buf); + frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); free(buf.data); - fmgr.np1_flows[fd] = NULL; - fmgr.np1_flows_cep[flow->cep_id] = NULL; - free(flow); + fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] + = INVALID_CEP_ID; + fmgr.np1_fd_to_cep_id[fd] = -1; } else { - if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { + qoscube_t cube; + ipcp_flow_get_qoscube(fd, &cube); + if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } - flow_set_add(fmgr.np1_set, fd); + flow_set_add(fmgr.np1_set[cube], fd); } pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -581,27 +517,25 @@ int fmgr_np1_dealloc(int fd) int ret; pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + ret = np1_flow_dealloc(fd); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return ret; } -int fmgr_np1_post_buf(cep_id_t cep_id, - buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf) { - struct np1_flow * flow; int ret = 0; int fd; flow_alloc_msg_t * msg; - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + qoscube_t cube; /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } @@ -612,51 +546,41 @@ int fmgr_np1_post_buf(cep_id_t cep_id, msg->dst_name, msg->src_ae_name); if (fd < 0) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to get fd for flow."); return -1; } - if (add_np1_fd(fd, cep_id, msg->qos_cube)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to add np1 flow."); - return -1; - } + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + + fmgr.np1_fd_to_cep_id[fd] = cep_id; + fmgr.np1_cep_id_to_fd[cep_id] = fd; + + pthread_rwlock_unlock(&fmgr.np1_flows_lock); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("No such flow in flow manager."); - return -1; - } + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - ret = ipcp_flow_alloc_reply(flow->fd, msg->response); + fd = fmgr.np1_cep_id_to_fd[cep_id]; + ret = ipcp_flow_alloc_reply(fd, msg->response); if (msg->response < 0) { - fmgr.np1_flows[flow->fd] = NULL; - fmgr.np1_flows_cep[cep_id] = NULL; - free(flow); + fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; + fmgr.np1_cep_id_to_fd[cep_id] = -1; } else { - flow_set_add(fmgr.np1_set, flow->fd); + ipcp_flow_get_qoscube(fd, &cube); + flow_set_add(fmgr.np1_set[cube], + fmgr.np1_cep_id_to_fd[cep_id]); } + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("No such flow in flow manager."); - return -1; - } - - flow_set_del(fmgr.np1_set, flow->fd); - - ret = flow_dealloc(flow->fd); + fd = fmgr.np1_cep_id_to_fd[cep_id]; + ipcp_flow_get_qoscube(fd, &cube); + flow_set_del(fmgr.np1_set[cube], fd); + ret = flow_dealloc(fd); break; default: LOG_ERR("Got an unknown flow allocation message."); @@ -674,18 +598,12 @@ int fmgr_np1_post_buf(cep_id_t cep_id, int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb) { - struct np1_flow * flow; + int fd; pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows_cep[cep_id]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - LOG_ERR("Failed to find N flow."); - return -1; - } - - if (ipcp_flow_write(flow->fd, sdb)) { + fd = fmgr.np1_cep_id_to_fd[cep_id]; + if (ipcp_flow_write(fd, sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to hand SDU to N flow."); return -1; @@ -704,19 +622,19 @@ int fmgr_nm1_mgmt_flow(char * dst_name) /* FIXME: Request retransmission. */ fd = flow_alloc(dst_name, MGMT_AE, NULL); if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); + LOG_ERR("Failed to allocate flow to %s.", dst_name); return -1; } result = flow_alloc_res(fd); if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", + LOG_ERR("Result of flow allocation to %s is %d.", dst_name, result); return -1; } if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand file descriptor to RIB manager"); + LOG_ERR("Failed to hand file descriptor to RIB manager."); flow_dealloc(fd); return -1; } @@ -724,8 +642,7 @@ int fmgr_nm1_mgmt_flow(char * dst_name) return 0; } -int fmgr_nm1_dt_flow(char * dst_name, - enum qos_cube qos) +int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos) { int fd; int result; @@ -733,28 +650,25 @@ int fmgr_nm1_dt_flow(char * dst_name, /* FIXME: Map qos cube on correct QoS. */ fd = flow_alloc(dst_name, DT_AE, NULL); if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); + LOG_ERR("Failed to allocate flow to %s.", dst_name); return -1; } result = flow_alloc_res(fd); if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); + LOG_ERR("Allocate flow to %s result %d.", dst_name, result); return -1; } - if (add_nm1_fd(fd, qos)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); - return -1; - } + flow_set_add(fmgr.nm1_set[qos], fd); + + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; return 0; } -int fmgr_nm1_write_sdu(struct pci * pci, - struct shm_du_buff * sdb) +int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb) { if (pci == NULL || sdb == NULL) return -1; |