diff options
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 193 |
1 files changed, 66 insertions, 127 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index ba36812f..d055b311 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -42,6 +42,7 @@ #include "neighbors.h" #include "gam.h" #include "routing.h" +#include "sdu_sched.h" #include <stdlib.h> #include <stdbool.h> @@ -55,18 +56,15 @@ typedef FlowAllocMsg flow_alloc_msg_t; #define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ struct { - flow_set_t * np1_set[QOS_CUBE_MAX]; - fqueue_t * np1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t np1_flows_lock; - cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - pthread_t np1_sdu_reader; + flow_set_t * np1_set[QOS_CUBE_MAX]; + struct sdu_sched * np1_sdu_sched; flow_set_t * nm1_set[QOS_CUBE_MAX]; - fqueue_t * nm1_fqs[QOS_CUBE_MAX]; - pthread_t nm1_sdu_reader; + struct sdu_sched * nm1_sdu_sched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; @@ -102,125 +100,70 @@ static int fmgr_neighbor_event(enum nb_event event, return 0; } -static void * fmgr_np1_sdu_reader(void * o) +static int np1_sdu_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { - struct shm_du_buff * sdb; - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - int fd; - int i = 0; - int ret; - - (void) o; - - while (true) { - /* FIXME: replace with scheduling policy call */ - i = (i + 1) % QOS_CUBE_MAX; - - ret = flow_event_wait(fmgr.np1_set[i], - fmgr.np1_fqs[i], - &timeout); - if (ret == -ETIMEDOUT) - continue; - - if (ret < 0) { - log_warn("Event error: %d.", ret); - continue; - } - - while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) { - if (ipcp_flow_read(fd, &sdb)) { - log_warn("Failed to read SDU from fd %d.", fd); - continue; - } + (void) qc; - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - - if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - log_warn("Failed to hand SDU to FRCT."); - continue; - } + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - } + if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + log_warn("Failed to hand SDU to FRCT."); + return -1; } - return (void *) 0; + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + + return 0; } -void * fmgr_nm1_sdu_reader(void * o) +static int nm1_sdu_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct shm_du_buff * sdb; - struct pci pci; - int fd; - int i = 0; - int ret; - - (void) o; + struct pci pci; memset(&pci, 0, sizeof(pci)); - while (true) { - /* FIXME: replace with scheduling policy call */ - i = (i + 1) % QOS_CUBE_MAX; + shm_pci_des(sdb, &pci); + + if (pci.dst_addr != ipcpi.dt_addr) { + if (pci.ttl == 0) { + log_dbg("TTL was zero."); + ipcp_flow_del(sdb); + return 0; + } + + pff_lock(fmgr.pff[qc]); + + fd = pff_nhop(fmgr.pff[qc], pci.dst_addr); + if (fd < 0) { + pff_unlock(fmgr.pff[qc]); + log_err("No next hop for %" PRIu64, pci.dst_addr); + ipcp_flow_del(sdb); + return -1; + } - ret = flow_event_wait(fmgr.nm1_set[i], - fmgr.nm1_fqs[i], - &timeout); - if (ret == -ETIMEDOUT) - continue; + pff_unlock(fmgr.pff[qc]); - if (ret < 0) { - log_err("Event error: %d.", ret); - continue; + if (ipcp_flow_write(fd, sdb)) { + log_err("Failed to write SDU to fd %d.", fd); + ipcp_flow_del(sdb); + return -1; } + } else { + shm_pci_shrink(sdb); - while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) { - if (ipcp_flow_read(fd, &sdb)) { - log_err("Failed to read SDU from fd %d.", fd); - continue; - } - - shm_pci_des(sdb, &pci); - - if (pci.dst_addr != ipcpi.dt_addr) { - if (pci.ttl == 0) { - log_dbg("TTL was zero."); - ipcp_flow_del(sdb); - continue; - } - - pff_lock(fmgr.pff[i]); - fd = pff_nhop(fmgr.pff[i], pci.dst_addr); - if (fd < 0) { - pff_unlock(fmgr.pff[i]); - log_err("No next hop for %" PRIu64, - pci.dst_addr); - ipcp_flow_del(sdb); - continue; - } - pff_unlock(fmgr.pff[i]); - - if (ipcp_flow_write(fd, sdb)) { - log_err("Failed to write SDU to fd %d.", - fd); - ipcp_flow_del(sdb); - continue; - } - } else { - shm_pci_shrink(sdb); - - if (frct_nm1_post_sdu(&pci, sdb)) { - log_err("Failed to hand PDU to FRCT."); - continue; - } - } + if (frct_nm1_post_sdu(&pci, sdb)) { + log_err("Failed to hand PDU to FRCT."); + return -1; } } - return (void *) 0; + return 0; } static void fmgr_destroy_flows(void) @@ -230,8 +173,6 @@ static void fmgr_destroy_flows(void) for (i = 0; i < QOS_CUBE_MAX; ++i) { flow_set_destroy(fmgr.nm1_set[i]); flow_set_destroy(fmgr.np1_set[i]); - fqueue_destroy(fmgr.nm1_fqs[i]); - fqueue_destroy(fmgr.np1_fqs[i]); } } @@ -270,23 +211,11 @@ int fmgr_init(void) return -1; } - fmgr.np1_fqs[i] = fqueue_create(); - if (fmgr.np1_fqs[i] == NULL) { - fmgr_destroy_flows(); - return -1; - } - fmgr.nm1_set[i] = flow_set_create(); if (fmgr.nm1_set[i] == NULL) { fmgr_destroy_flows(); return -1; } - - fmgr.nm1_fqs[i] = fqueue_create(); - if (fmgr.nm1_fqs[i] == NULL) { - fmgr_destroy_flows(); - return -1; - } } if (shm_pci_init()) { @@ -410,19 +339,29 @@ int fmgr_start(void) return -1; } - pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); - pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL); + fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler); + if (fmgr.nm1_sdu_sched == NULL) { + log_err("Failed to create N-1 SDU scheduler."); + gam_destroy(fmgr.gam); + return -1; + } + + fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler); + if (fmgr.np1_sdu_sched == NULL) { + log_err("Failed to create N+1 SDU scheduler."); + sdu_sched_destroy(fmgr.nm1_sdu_sched); + gam_destroy(fmgr.gam); + return -1; + } return 0; } void fmgr_stop(void) { - pthread_cancel(fmgr.np1_sdu_reader); - pthread_cancel(fmgr.nm1_sdu_reader); + sdu_sched_destroy(fmgr.np1_sdu_sched); - pthread_join(fmgr.np1_sdu_reader, NULL); - pthread_join(fmgr.nm1_sdu_reader, NULL); + sdu_sched_destroy(fmgr.nm1_sdu_sched); gam_destroy(fmgr.gam); } |