summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c193
1 files changed, 66 insertions, 127 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index ba36812f..d055b311 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -42,6 +42,7 @@
#include "neighbors.h"
#include "gam.h"
#include "routing.h"
+#include "sdu_sched.h"
#include <stdlib.h>
#include <stdbool.h>
@@ -55,18 +56,15 @@ typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */
struct {
- flow_set_t * np1_set[QOS_CUBE_MAX];
- fqueue_t * np1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t np1_flows_lock;
-
cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
- pthread_t np1_sdu_reader;
+ flow_set_t * np1_set[QOS_CUBE_MAX];
+ struct sdu_sched * np1_sdu_sched;
flow_set_t * nm1_set[QOS_CUBE_MAX];
- fqueue_t * nm1_fqs[QOS_CUBE_MAX];
- pthread_t nm1_sdu_reader;
+ struct sdu_sched * nm1_sdu_sched;
struct pff * pff[QOS_CUBE_MAX];
struct routing_i * routing[QOS_CUBE_MAX];
@@ -102,125 +100,70 @@ static int fmgr_neighbor_event(enum nb_event event,
return 0;
}
-static void * fmgr_np1_sdu_reader(void * o)
+static int np1_sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
{
- struct shm_du_buff * sdb;
- struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- int fd;
- int i = 0;
- int ret;
-
- (void) o;
-
- while (true) {
- /* FIXME: replace with scheduling policy call */
- i = (i + 1) % QOS_CUBE_MAX;
-
- ret = flow_event_wait(fmgr.np1_set[i],
- fmgr.np1_fqs[i],
- &timeout);
- if (ret == -ETIMEDOUT)
- continue;
-
- if (ret < 0) {
- log_warn("Event error: %d.", ret);
- continue;
- }
-
- while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) {
- if (ipcp_flow_read(fd, &sdb)) {
- log_warn("Failed to read SDU from fd %d.", fd);
- continue;
- }
+ (void) qc;
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
-
- if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- log_warn("Failed to hand SDU to FRCT.");
- continue;
- }
+ pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- }
+ if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ log_warn("Failed to hand SDU to FRCT.");
+ return -1;
}
- return (void *) 0;
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+
+ return 0;
}
-void * fmgr_nm1_sdu_reader(void * o)
+static int nm1_sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
{
- struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- struct shm_du_buff * sdb;
- struct pci pci;
- int fd;
- int i = 0;
- int ret;
-
- (void) o;
+ struct pci pci;
memset(&pci, 0, sizeof(pci));
- while (true) {
- /* FIXME: replace with scheduling policy call */
- i = (i + 1) % QOS_CUBE_MAX;
+ shm_pci_des(sdb, &pci);
+
+ if (pci.dst_addr != ipcpi.dt_addr) {
+ if (pci.ttl == 0) {
+ log_dbg("TTL was zero.");
+ ipcp_flow_del(sdb);
+ return 0;
+ }
+
+ pff_lock(fmgr.pff[qc]);
+
+ fd = pff_nhop(fmgr.pff[qc], pci.dst_addr);
+ if (fd < 0) {
+ pff_unlock(fmgr.pff[qc]);
+ log_err("No next hop for %" PRIu64, pci.dst_addr);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
- ret = flow_event_wait(fmgr.nm1_set[i],
- fmgr.nm1_fqs[i],
- &timeout);
- if (ret == -ETIMEDOUT)
- continue;
+ pff_unlock(fmgr.pff[qc]);
- if (ret < 0) {
- log_err("Event error: %d.", ret);
- continue;
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
+ ipcp_flow_del(sdb);
+ return -1;
}
+ } else {
+ shm_pci_shrink(sdb);
- while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) {
- if (ipcp_flow_read(fd, &sdb)) {
- log_err("Failed to read SDU from fd %d.", fd);
- continue;
- }
-
- shm_pci_des(sdb, &pci);
-
- if (pci.dst_addr != ipcpi.dt_addr) {
- if (pci.ttl == 0) {
- log_dbg("TTL was zero.");
- ipcp_flow_del(sdb);
- continue;
- }
-
- pff_lock(fmgr.pff[i]);
- fd = pff_nhop(fmgr.pff[i], pci.dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[i]);
- log_err("No next hop for %" PRIu64,
- pci.dst_addr);
- ipcp_flow_del(sdb);
- continue;
- }
- pff_unlock(fmgr.pff[i]);
-
- if (ipcp_flow_write(fd, sdb)) {
- log_err("Failed to write SDU to fd %d.",
- fd);
- ipcp_flow_del(sdb);
- continue;
- }
- } else {
- shm_pci_shrink(sdb);
-
- if (frct_nm1_post_sdu(&pci, sdb)) {
- log_err("Failed to hand PDU to FRCT.");
- continue;
- }
- }
+ if (frct_nm1_post_sdu(&pci, sdb)) {
+ log_err("Failed to hand PDU to FRCT.");
+ return -1;
}
}
- return (void *) 0;
+ return 0;
}
static void fmgr_destroy_flows(void)
@@ -230,8 +173,6 @@ static void fmgr_destroy_flows(void)
for (i = 0; i < QOS_CUBE_MAX; ++i) {
flow_set_destroy(fmgr.nm1_set[i]);
flow_set_destroy(fmgr.np1_set[i]);
- fqueue_destroy(fmgr.nm1_fqs[i]);
- fqueue_destroy(fmgr.np1_fqs[i]);
}
}
@@ -270,23 +211,11 @@ int fmgr_init(void)
return -1;
}
- fmgr.np1_fqs[i] = fqueue_create();
- if (fmgr.np1_fqs[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
-
fmgr.nm1_set[i] = flow_set_create();
if (fmgr.nm1_set[i] == NULL) {
fmgr_destroy_flows();
return -1;
}
-
- fmgr.nm1_fqs[i] = fqueue_create();
- if (fmgr.nm1_fqs[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
}
if (shm_pci_init()) {
@@ -410,19 +339,29 @@ int fmgr_start(void)
return -1;
}
- pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);
- pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL);
+ fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler);
+ if (fmgr.nm1_sdu_sched == NULL) {
+ log_err("Failed to create N-1 SDU scheduler.");
+ gam_destroy(fmgr.gam);
+ return -1;
+ }
+
+ fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler);
+ if (fmgr.np1_sdu_sched == NULL) {
+ log_err("Failed to create N+1 SDU scheduler.");
+ sdu_sched_destroy(fmgr.nm1_sdu_sched);
+ gam_destroy(fmgr.gam);
+ return -1;
+ }
return 0;
}
void fmgr_stop(void)
{
- pthread_cancel(fmgr.np1_sdu_reader);
- pthread_cancel(fmgr.nm1_sdu_reader);
+ sdu_sched_destroy(fmgr.np1_sdu_sched);
- pthread_join(fmgr.np1_sdu_reader, NULL);
- pthread_join(fmgr.nm1_sdu_reader, NULL);
+ sdu_sched_destroy(fmgr.nm1_sdu_sched);
gam_destroy(fmgr.gam);
}