summaryrefslogtreecommitdiff
path: root/src/tools/oping
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-19 22:25:46 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-21 14:17:51 +0200
commitf516b51169020ea1957010fbd1005d746f01b1d9 (patch)
tree03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/tools/oping
parentc79ab46894053312f80390bf13a52c238a7d4704 (diff)
downloadouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz
ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API.
Diffstat (limited to 'src/tools/oping')
-rw-r--r--src/tools/oping/oping.c4
-rw-r--r--src/tools/oping/oping_client.c25
-rw-r--r--src/tools/oping/oping_server.c21
3 files changed, 33 insertions, 17 deletions
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c
index 7d41b497..0ca40326 100644
--- a/src/tools/oping/oping.c
+++ b/src/tools/oping/oping.c
@@ -23,7 +23,7 @@
#define _POSIX_C_SOURCE 199506L
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/dev.h>
#include <stdio.h>
@@ -53,6 +53,8 @@ struct c {
float rtt_avg;
float rtt_m2;
+ flow_set_t * flows;
+
/* needs locking */
struct timespec * times;
pthread_mutex_t lock;
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index 4742d0de..40f75785 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -55,20 +55,21 @@ void * reader(void * o)
struct timespec timeout = {2, 0};
struct timespec now = {0, 0};
- struct oping_msg * msg;
char buf[OPING_BUF_SIZE];
+ struct oping_msg * msg = (struct oping_msg *) buf;
int fd = 0;
int msg_len = 0;
float ms = 0;
float d = 0;
-
- msg = (struct oping_msg *) buf;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
/* FIXME: use flow timeout option once we have it */
- while(client.rcvd != client.count &&
- (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) {
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK);
- while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {
+ while (client.rcvd != client.count
+ && flow_event_wait(client.flows, fq, &timeout) != -ETIMEDOUT) {
+ while ((fd = fqueue_next(fq)) >= 0) {
+ msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
if (msg_len < 0)
continue;
@@ -165,12 +166,20 @@ int client_main()
struct timespec tic;
struct timespec toc;
- int fd = flow_alloc(client.s_apn, NULL, NULL);
+ int fd;
+
+ client.flows = flow_set_create();
+ if (client.flows == NULL)
+ return 0;
+
+ fd = flow_alloc(client.s_apn, NULL, NULL);
if (fd < 0) {
printf("Failed to allocate flow.\n");
return -1;
}
+ flow_set_add(client.flows, fd);
+
if (flow_alloc_res(fd)) {
printf("Flow allocation refused.\n");
flow_dealloc(fd);
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index 845f0cbd..8a5a3512 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -69,16 +69,23 @@ void * server_thread(void *o)
struct oping_msg * msg = (struct oping_msg *) buf;
struct timespec now = {0, 0};
struct timespec timeout = {0, 100 * MILLION};
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(server.flows, &timeout);
- if (fd == -ETIMEDOUT)
- continue;
- if (fd < 0) {
- printf("Failed to get active fd.\n");
+ int ret = flow_event_wait(server.flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
+
+ if (ret < 0) {
+ printf("Event error.\n");
+ break;
}
- while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {
+
+ while ((fd = fqueue_next(fq)) >= 0) {
+ msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
if (msg_len < 0)
continue;
@@ -160,8 +167,6 @@ int server_main()
if (server.flows == NULL)
return 0;
- flow_set_zero(server.flows);
-
pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);
pthread_create(&server.accept_pt, NULL, accept_thread, NULL);
pthread_create(&server.server_pt, NULL, server_thread, NULL);