From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@intec.ugent.be>
Date: Wed, 19 Oct 2016 22:25:46 +0200
Subject: lib: Demultiplex the fast path

The fast path will now use an incoming ring buffer per flow per
process. This necessitated the development of a new method for the
asynchronous io call, which is now based on an event queue system for
scalability (fqueue). The ipcpd's and tools have been updated to this
API.
---
 src/tools/oping/oping.c        |  4 +++-
 src/tools/oping/oping_client.c | 25 +++++++++++++++++--------
 src/tools/oping/oping_server.c | 21 +++++++++++++--------
 3 files changed, 33 insertions(+), 17 deletions(-)

(limited to 'src/tools')

diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c
index 7d41b497..0ca40326 100644
--- a/src/tools/oping/oping.c
+++ b/src/tools/oping/oping.c
@@ -23,7 +23,7 @@
 
 #define _POSIX_C_SOURCE 199506L
 
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
 #include <ouroboros/dev.h>
 
 #include <stdio.h>
@@ -53,6 +53,8 @@ struct c {
         float  rtt_avg;
         float  rtt_m2;
 
+        flow_set_t * flows;
+
         /* needs locking */
         struct timespec * times;
         pthread_mutex_t lock;
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index 4742d0de..40f75785 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -55,20 +55,21 @@ void * reader(void * o)
         struct timespec timeout = {2, 0};
         struct timespec now = {0, 0};
 
-        struct oping_msg * msg;
         char buf[OPING_BUF_SIZE];
+        struct oping_msg * msg = (struct oping_msg *) buf;
         int fd = 0;
         int msg_len = 0;
         float ms = 0;
         float d = 0;
-
-        msg = (struct oping_msg *) buf;
+        fqueue_t * fq = fqueue_create();
+        if (fq == NULL)
+                return (void *) 1;
 
         /* FIXME: use flow timeout option once we have it */
-        while(client.rcvd != client.count &&
-              (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) {
-                flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK);
-                while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {
+        while (client.rcvd != client.count
+               && flow_event_wait(client.flows, fq, &timeout) != -ETIMEDOUT) {
+                while ((fd = fqueue_next(fq)) >= 0) {
+                        msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
                         if (msg_len < 0)
                                 continue;
 
@@ -165,12 +166,20 @@ int client_main()
         struct timespec tic;
         struct timespec toc;
 
-        int fd = flow_alloc(client.s_apn, NULL, NULL);
+        int fd;
+
+        client.flows = flow_set_create();
+        if (client.flows == NULL)
+                return 0;
+
+        fd = flow_alloc(client.s_apn, NULL, NULL);
         if (fd < 0) {
                 printf("Failed to allocate flow.\n");
                 return -1;
         }
 
+        flow_set_add(client.flows, fd);
+
         if (flow_alloc_res(fd)) {
                 printf("Flow allocation refused.\n");
                 flow_dealloc(fd);
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index 845f0cbd..8a5a3512 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -69,16 +69,23 @@ void * server_thread(void *o)
         struct oping_msg * msg = (struct oping_msg *) buf;
         struct timespec now = {0, 0};
         struct timespec timeout = {0, 100 * MILLION};
+        int fd;
+        fqueue_t * fq = fqueue_create();
+        if (fq == NULL)
+                return (void *) 1;
 
         while (true) {
-                int fd = flow_select(server.flows, &timeout);
-                if (fd == -ETIMEDOUT)
-                        continue;
-                if (fd < 0) {
-                        printf("Failed to get active fd.\n");
+                int ret = flow_event_wait(server.flows, fq, &timeout);
+                if (ret == -ETIMEDOUT)
                         continue;
+
+                if (ret < 0) {
+                        printf("Event error.\n");
+                        break;
                 }
-                while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {
+
+                while ((fd = fqueue_next(fq)) >= 0) {
+                        msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
                         if (msg_len < 0)
                                 continue;
 
@@ -160,8 +167,6 @@ int server_main()
         if (server.flows == NULL)
                 return 0;
 
-        flow_set_zero(server.flows);
-
         pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);
         pthread_create(&server.accept_pt, NULL, accept_thread, NULL);
         pthread_create(&server.server_pt, NULL, server_thread, NULL);
-- 
cgit v1.2.3