summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-21 12:44:00 +0000
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-21 12:44:00 +0000
commit482c44232d4deda3f89a7d85fbad99c1c64e80ec (patch)
treef3fb790d93da3cbe198b0f0c58d9c7513b0eff23 /src/ipcpd/normal/fmgr.c
parent680017a72c7a15b90f223bafcea80fd3e264e984 (diff)
parent02976060919566d1a217b818ca8f33297700d56d (diff)
downloadouroboros-482c44232d4deda3f89a7d85fbad99c1c64e80ec.tar.gz
ouroboros-482c44232d4deda3f89a7d85fbad99c1c64e80ec.zip
Merged in dstaesse/ouroboros/be-demux (pull request #267)
lib: Demultiplex the fast path
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c143
1 files changed, 78 insertions, 65 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index cb25072e..3da392c5 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -27,7 +27,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/errno.h>
#include <stdlib.h>
@@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o)
struct shm_du_buff * sdb;
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct np1_flow * flow;
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(fmgr.np1_set, &timeout);
- if (fd == -ETIMEDOUT)
+ int ret = flow_event_wait(fmgr.np1_set, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- if (fd < 0) {
- LOG_ERR("Failed to get active fd.");
+ if (ret < 0) {
+ LOG_ERR("Event error: %d.", ret);
continue;
}
- if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
- continue;
- }
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to retrieve flow.");
- continue;
- }
+ pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
+
+ flow = fmgr.np1_flows[fd];
+ if (flow == NULL) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to retrieve flow.");
+ continue;
+ }
+
+ if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to hand SDU to FRCT.");
+ continue;
+ }
- if (frct_i_write_sdu(flow->cep_id, sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to hand SDU to FRCT.");
- continue;
- }
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ }
}
return (void *) 0;
@@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o)
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct shm_du_buff * sdb;
struct pci * pci;
-
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(fmgr.nm1_set, &timeout);
- if (fd == -ETIMEDOUT)
- continue;
-
- if (fd < 0) {
- LOG_ERR("Failed to get active fd.");
+ int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- }
- if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
+ if (ret < 0) {
+ LOG_ERR("Event error: %d.", ret);
continue;
}
- pci = shm_pci_des(sdb);
- if (pci == NULL) {
- LOG_ERR("Failed to get PCI.");
- ipcp_flow_del(sdb);
- continue;
- }
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
- if (pci->dst_addr != ribmgr_address()) {
- LOG_DBG("PDU needs to be forwarded.");
+ pci = shm_pci_des(sdb);
+ if (pci == NULL) {
+ LOG_ERR("Failed to get PCI.");
+ ipcp_flow_del(sdb);
+ continue;
+ }
- if (pci->ttl == 0) {
- LOG_DBG("TTL was zero.");
+ if (pci->dst_addr != ribmgr_address()) {
+ LOG_DBG("PDU needs to be forwarded.");
+
+ if (pci->ttl == 0) {
+ LOG_DBG("TTL was zero.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+
+ if (shm_pci_dec_ttl(sdb)) {
+ LOG_ERR("Failed to decrease TTL.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+ /*
+ * FIXME: Dropping for now, since
+ * we don't have a PFF yet
+ */
ipcp_flow_del(sdb);
free(pci);
continue;
}
- if (shm_pci_dec_ttl(sdb)) {
- LOG_ERR("Failed to decrease TTL.");
+ if (shm_pci_shrink(sdb)) {
+ LOG_ERR("Failed to shrink PDU.");
ipcp_flow_del(sdb);
free(pci);
continue;
}
- /*
- * FIXME: Dropping for now, since
- * we don't have a PFF yet
- */
- ipcp_flow_del(sdb);
- free(pci);
- continue;
- }
-
- if (shm_pci_shrink(sdb)) {
- LOG_ERR("Failed to shrink PDU.");
- ipcp_flow_del(sdb);
- free(pci);
- continue;
- }
- if (frct_nm1_post_sdu(pci, sdb)) {
- LOG_ERR("Failed to hand PDU to FRCT.");
- ipcp_flow_del(sdb);
- free(pci);
- continue;
+ if (frct_nm1_post_sdu(pci, sdb)) {
+ LOG_ERR("Failed to hand PDU to FRCT.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
}
}