From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 19 Oct 2016 22:25:46 +0200 Subject: lib: Demultiplex the fast path The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API. --- src/tools/oping/oping_server.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'src/tools/oping/oping_server.c') diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 845f0cbd..8a5a3512 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -69,16 +69,23 @@ void * server_thread(void *o) struct oping_msg * msg = (struct oping_msg *) buf; struct timespec now = {0, 0}; struct timespec timeout = {0, 100 * MILLION}; + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(server.flows, &timeout); - if (fd == -ETIMEDOUT) - continue; - if (fd < 0) { - printf("Failed to get active fd.\n"); + int ret = flow_event_wait(server.flows, fq, &timeout); + if (ret == -ETIMEDOUT) continue; + + if (ret < 0) { + printf("Event error.\n"); + break; } - while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { + + while ((fd = fqueue_next(fq)) >= 0) { + msg_len = flow_read(fd, buf, OPING_BUF_SIZE); if (msg_len < 0) continue; @@ -160,8 +167,6 @@ int server_main() if (server.flows == NULL) return 0; - flow_set_zero(server.flows); - pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL); pthread_create(&server.accept_pt, NULL, accept_thread, NULL); pthread_create(&server.server_pt, NULL, server_thread, NULL); -- cgit v1.2.3