From 047052481dc0b47d740b778be239884f2811778c Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@intec.ugent.be>
Date: Mon, 12 Dec 2016 13:00:43 +0100
Subject: lib: Simplify shm_rbuff_read_b

---
 src/lib/shm_rbuff.c | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

(limited to 'src')

diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
index 29a62f62..cc64fa09 100644
--- a/src/lib/shm_rbuff.c
+++ b/src/lib/shm_rbuff.c
@@ -287,7 +287,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,
                          const struct timespec * timeout)
 {
         struct timespec abstime;
-        int ret = 0;
         ssize_t idx = -1;
 
         assert(rb);
@@ -299,7 +298,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,
                 pthread_mutex_consistent(rb->lock);
 #endif
         if (timeout != NULL) {
-                idx = -ETIMEDOUT;
                 clock_gettime(PTHREAD_COND_CLOCK, &abstime);
                 ts_add(&abstime, timeout, &abstime);
         }
@@ -307,21 +305,17 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,
         pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
                              (void *) rb->lock);
 
-        while (shm_rbuff_empty(rb) && (ret != ETIMEDOUT)) {
+        while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) {
                 if (timeout != NULL)
-                        ret = pthread_cond_timedwait(rb->add,
-                                                     rb->lock,
-                                                     &abstime);
+                        idx = -pthread_cond_timedwait(rb->add,
+                                                      rb->lock,
+                                                      &abstime);
                 else
-                        ret = pthread_cond_wait(rb->add, rb->lock);
+                        idx = -pthread_cond_wait(rb->add, rb->lock);
 #ifndef __APPLE__
-                if (ret == EOWNERDEAD)
+                if (idx == -EOWNERDEAD)
                         pthread_mutex_consistent(rb->lock);
 #endif
-                if (ret == ETIMEDOUT) {
-                        idx = -ETIMEDOUT;
-                        break;
-                }
         }
 
         if (idx != -ETIMEDOUT) {
-- 
cgit v1.2.3


From b731adbf7b6fa16490f7abf94e2662d82d76cce0 Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@intec.ugent.be>
Date: Mon, 12 Dec 2016 13:01:54 +0100
Subject: lib: Fix indentation in shm_rdrbuff.c

---
 src/lib/shm_rdrbuff.c | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

(limited to 'src')

diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 3ad8a470..ae661be4 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -398,10 +398,10 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
 }
 
 ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
-                           size_t                headspace,
-                           size_t                tailspace,
-                           uint8_t *             data,
-                           size_t                len)
+                            size_t               headspace,
+                            size_t               tailspace,
+                            uint8_t *            data,
+                            size_t               len)
 {
         struct shm_du_buff * sdb;
         size_t               size = headspace + len + tailspace;
-- 
cgit v1.2.3


From f8c14e0246a6c9cb5e8ff47869b5968abb63f010 Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@intec.ugent.be>
Date: Mon, 12 Dec 2016 13:24:17 +0100
Subject: src, tools: Set/get timeout and get qos for flows

Receiver timeouts can now be set on a flow using the flow_set_timeout
function. Specifying NULL disables the timeout.  The flow_get_timeout
function gets the value for the timeout.

This commit also deprecates fcntl in favor of flow_get_flags and
flow_set_flags functions.

struct qos_spec is typedef'd as a qosspec_t.

The tools and cdap.c are updated to use the new API.

Fixes a bug in operf client where the client's writer thread wouldn't
cancel on SIGINT.
---
 include/ouroboros/dev.h        |  10 +--
 include/ouroboros/fcntl.h      |  19 +++--
 include/ouroboros/ipcp-dev.h   |   7 +-
 include/ouroboros/qos.h        |   4 +-
 src/ipcpd/normal/fmgr.c        |   6 +-
 src/irmd/main.c                |  10 +--
 src/lib/cdap.c                 |   2 +-
 src/lib/dev.c                  | 190 +++++++++++++++++++++++++++++++++--------
 src/tools/cbr/cbr_server.c     |   4 +-
 src/tools/echo/echo_server.c   |   2 +-
 src/tools/operf/operf.c        |   3 -
 src/tools/operf/operf_client.c |  68 +++------------
 src/tools/operf/operf_server.c |   2 +-
 src/tools/oping/oping.c        |   3 -
 src/tools/oping/oping_client.c | 115 ++++++++++---------------
 src/tools/oping/oping_server.c |   4 +-
 16 files changed, 251 insertions(+), 198 deletions(-)

(limited to 'src')

diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h
index f2e42d03..47665ca4 100644
--- a/include/ouroboros/dev.h
+++ b/include/ouroboros/dev.h
@@ -35,8 +35,8 @@ int     ap_init(char * ap_name);
 void    ap_fini(void);
 
 /* Returns flow descriptor (> 0), client AE name and qos spec. */
-int     flow_accept(char **           ae_name,
-                    struct qos_spec * qos);
+int     flow_accept(char **     ae_name,
+                    qosspec_t * qos);
 
 int     flow_alloc_resp(int fd,
                         int response);
@@ -45,9 +45,9 @@ int     flow_alloc_resp(int fd,
  * Returns flow descriptor (> 0).
  * On returning, qos will contain the actual supplied QoS.
  */
-int     flow_alloc(char *            dst_name,
-                   char *            src_ae_name,
-                   struct qos_spec * qos);
+int     flow_alloc(char *      dst_name,
+                   char *      src_ae_name,
+                   qosspec_t * qos);
 
 int     flow_alloc_res(int fd);
 
diff --git a/include/ouroboros/fcntl.h b/include/ouroboros/fcntl.h
index ccb45996..3ad3ccac 100644
--- a/include/ouroboros/fcntl.h
+++ b/include/ouroboros/fcntl.h
@@ -23,6 +23,8 @@
 #ifndef OUROBOROS_FCNTL_H
 #define OUROBOROS_FCNTL_H
 
+#include <sys/time.h>
+
 /* same values as fcntl.h */
 #define FLOW_O_RDONLY   00000000
 #define FLOW_O_WRONLY   00000001
@@ -34,11 +36,18 @@
 
 #define FLOW_O_INVALID  (FLOW_O_WRONLY | FLOW_O_RDWR)
 
-#define FLOW_F_GETFL    00000001
-#define FLOW_F_SETFL    00000002
+int               flow_set_flags(int fd,
+                                 int flags);
+
+int               flow_get_flags(int fd);
+
+int               flow_set_timeout(int               fd,
+                                   struct timespec * to);
+
+int               flow_get_timeout(int               fd,
+                                   struct timespec * to);
 
-int     flow_cntl(int fd,
-                  int cmd,
-                  int oflags);
+int               flow_get_qosspec(int         fd,
+                                   qosspec_t * spec);
 
 #endif /* OUROBOROS_FCNTL_H */
diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h
index ee7c83ad..19a66762 100644
--- a/include/ouroboros/ipcp-dev.h
+++ b/include/ouroboros/ipcp-dev.h
@@ -21,10 +21,6 @@
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-#include <unistd.h>
-#include <time.h>
-
-#include <ouroboros/qos.h>
 #include <ouroboros/shm_rdrbuff.h>
 
 #ifndef OUROBOROS_IPCP_DEV_H
@@ -49,4 +45,7 @@ void ipcp_flow_fini(int fd);
 
 void ipcp_flow_del(struct shm_du_buff * sdb);
 
+int  ipcp_flow_get_qoscube(int             fd,
+                           enum qos_cube * cube);
+
 #endif /* OUROBOROS_IPCP_DEV_H */
diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h
index 8f573b7d..047e6288 100644
--- a/include/ouroboros/qos.h
+++ b/include/ouroboros/qos.h
@@ -26,9 +26,9 @@
 #include <stdint.h>
 
 /* FIXME: may need revision */
-struct qos_spec {
+typedef struct qos_spec {
         uint32_t delay;
         uint32_t jitter;
-};
+} qosspec_t;
 
 #endif
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 41785ae4..8e416aa4 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -123,9 +123,9 @@ static int add_np1_fd(int           fd,
 
 static void * fmgr_nm1_acceptor(void * o)
 {
-        int    fd;
-        char * ae_name;
-        struct qos_spec qs;
+        int       fd;
+        char *    ae_name;
+        qosspec_t qs;
 
         (void) o;
 
diff --git a/src/irmd/main.c b/src/irmd/main.c
index dc0c26f2..6a6aedf8 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1125,16 +1125,16 @@ static int flow_alloc_resp(pid_t n_api,
         return ret;
 }
 
-static struct irm_flow * flow_alloc(pid_t  api,
-                                    char * dst_name,
-                                    char * src_ae_name,
-                                    struct qos_spec * qos)
+static struct irm_flow * flow_alloc(pid_t       api,
+                                    char *      dst_name,
+                                    char *      src_ae_name,
+                                    qosspec_t * qos)
 {
         struct irm_flow * f;
         pid_t ipcp;
         int port_id;
 
-        /* FIXME: Map qos_spec to qos_cube */
+        /* FIXME: Map qosspec to qos_cube */
         (void) qos;
 
         pthread_rwlock_rdlock(&irmd->state_lock);
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index d06a7d39..df79be54 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -203,7 +203,7 @@ struct cdap * cdap_create(struct cdap_ops * ops,
             ops->cdap_request == NULL)
                 return NULL;
 
-        flags = flow_cntl(fd, FLOW_F_GETFL, 0);
+        flags = flow_get_flags(fd);
         if (flags & FLOW_O_NONBLOCK)
                 return NULL;
 
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1c0d73a1..bad56129 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -138,7 +138,8 @@ struct flow {
 
         pid_t                 api;
 
-        struct timespec *     timeout;
+        bool                  timesout;
+        struct timespec       rcv_timeo;
 };
 
 struct {
@@ -213,11 +214,7 @@ static void reset_flow(int fd)
 
         ai.flows[fd].oflags = 0;
         ai.flows[fd].api = -1;
-
-        if (ai.flows[fd].timeout != NULL) {
-                free(ai.flows[fd].timeout);
-                ai.flows[fd].timeout = NULL;
-        }
+        ai.flows[fd].timesout = false;
 }
 
 int ap_init(char * ap_name)
@@ -265,13 +262,13 @@ int ap_init(char * ap_name)
         }
 
         for (i = 0; i < AP_MAX_FLOWS; ++i) {
-                ai.flows[i].rx_rb   = NULL;
-                ai.flows[i].tx_rb   = NULL;
-                ai.flows[i].set     = NULL;
-                ai.flows[i].port_id = -1;
-                ai.flows[i].oflags  = 0;
-                ai.flows[i].api     = -1;
-                ai.flows[i].timeout = NULL;
+                ai.flows[i].rx_rb    = NULL;
+                ai.flows[i].tx_rb    = NULL;
+                ai.flows[i].set      = NULL;
+                ai.flows[i].port_id  = -1;
+                ai.flows[i].oflags   = 0;
+                ai.flows[i].api      = -1;
+                ai.flows[i].timesout = false;
         }
 
         ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
@@ -341,7 +338,7 @@ void ap_fini()
         pthread_rwlock_destroy(&ai.data_lock);
 }
 
-int flow_accept(char ** ae_name, struct qos_spec * qos)
+int flow_accept(char ** ae_name, qosspec_t * qos)
 {
         irm_msg_t msg = IRM_MSG__INIT;
         irm_msg_t * recv_msg = NULL;
@@ -490,7 +487,7 @@ int flow_alloc_resp(int fd, int response)
         return ret;
 }
 
-int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
+int flow_alloc(char * dst_name, char * src_ae_name, qosspec_t * qos)
 {
         irm_msg_t msg = IRM_MSG__INIT;
         irm_msg_t * recv_msg = NULL;
@@ -680,7 +677,7 @@ int flow_dealloc(int fd)
         return 0;
 }
 
-int flow_cntl(int fd, int cmd, int oflags)
+int flow_set_flags(int fd, int flags)
 {
         int old;
 
@@ -698,25 +695,115 @@ int flow_cntl(int fd, int cmd, int oflags)
 
         old = ai.flows[fd].oflags;
 
-        switch (cmd) {
-        case FLOW_F_GETFL: /* GET FLOW FLAGS */
+        ai.flows[fd].oflags = flags;
+        if (flags & FLOW_O_WRONLY)
+                shm_rbuff_block(ai.flows[fd].rx_rb);
+        if (flags & FLOW_O_RDWR)
+                shm_rbuff_unblock(ai.flows[fd].rx_rb);
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return old;
+}
+
+int flow_get_flags(int fd)
+{
+        int old;
+
+        if (fd < 0 || fd >= AP_MAX_FLOWS)
+                return -EBADF;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
+                pthread_rwlock_unlock(&ai.flows_lock);
+                pthread_rwlock_unlock(&ai.data_lock);
+                return -ENOTALLOC;
+        }
+
+        old = ai.flows[fd].oflags;
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return old;
+}
+
+int flow_get_timeout(int fd, struct timespec * timeo)
+{
+        int ret = 0;
+
+        if (fd < 0 || fd >= AP_MAX_FLOWS || timeo == NULL)
+                return -EINVAL;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
-                return old;
-        case FLOW_F_SETFL: /* SET FLOW FLAGS */
-                ai.flows[fd].oflags = oflags;
-                if (oflags & FLOW_O_WRONLY)
-                        shm_rbuff_block(ai.flows[fd].rx_rb);
-                if (oflags & FLOW_O_RDWR)
-                        shm_rbuff_unblock(ai.flows[fd].rx_rb);
+                return -ENOTALLOC;
+        }
+
+        if (ai.flows[fd].timesout)
+                *timeo = ai.flows[fd].rcv_timeo;
+        else
+                ret = -EPERM;
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return ret;
+}
+
+int flow_set_timeout(int fd, struct timespec * timeo)
+{
+        if (fd < 0 || fd >= AP_MAX_FLOWS)
+                return -EINVAL;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
-                return old;
-        default:
+                return -ENOTALLOC;
+        }
+
+        if (timeo == NULL) {
+                ai.flows[fd].timesout = false;
+        } else {
+                ai.flows[fd].timesout = true;
+                ai.flows[fd].rcv_timeo = *timeo;
+        }
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return 0;
+}
+
+int flow_get_qosspec(int fd, qosspec_t * spec)
+{
+        if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL)
+                return -EINVAL;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
-                return FLOW_O_INVALID; /* unknown command */
+                return -ENOTALLOC;
         }
+
+        /* FIXME: map cube to spec */
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return 0;
 }
 
 ssize_t flow_write(int fd, void * buf, size_t count)
@@ -764,7 +851,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
                 }
         } else { /* blocking */
                 struct shm_rdrbuff * rdrb = ai.rdrb;
-                struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
+                struct shm_rbuff * tx_rb  = ai.flows[fd].tx_rb;
 
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
@@ -816,19 +903,28 @@ ssize_t flow_read(int fd, void * buf, size_t count)
                 idx = shm_rbuff_read(ai.flows[fd].rx_rb);
                 pthread_rwlock_unlock(&ai.flows_lock);
         } else {
-                struct shm_rbuff * rb     = ai.flows[fd].rx_rb;
-                struct timespec * timeout = ai.flows[fd].timeout;
+                struct shm_rbuff * rb   = ai.flows[fd].rx_rb;
+                bool timeo = ai.flows[fd].timesout;
+                struct timespec timeout = ai.flows[fd].rcv_timeo;
+
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
-                idx = shm_rbuff_read_b(rb, timeout);
+
+                if (timeo)
+                        idx = shm_rbuff_read_b(rb, &timeout);
+                else
+                        idx = shm_rbuff_read_b(rb, NULL);
+
                 pthread_rwlock_rdlock(&ai.data_lock);
         }
 
-        if (idx < 0) {
+        if (idx == -ETIMEDOUT) {
                 pthread_rwlock_unlock(&ai.data_lock);
-                return -EAGAIN;
+                return -ETIMEDOUT;
         }
 
+        assert(idx >= 0);
+
         n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
         if (n < 0) {
                 pthread_rwlock_unlock(&ai.data_lock);
@@ -844,7 +940,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
         return n;
 }
 
-/* select functions */
+/* fqueue functions */
 
 struct flow_set * flow_set_create()
 {
@@ -1328,7 +1424,7 @@ int ipcp_flow_fini(int fd)
 {
         struct shm_rbuff * rb;
 
-        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
+        flow_set_flags(fd, FLOW_O_WRONLY);
 
         pthread_rwlock_rdlock(&ai.data_lock);
         pthread_rwlock_rdlock(&ai.flows_lock);
@@ -1343,6 +1439,28 @@ int ipcp_flow_fini(int fd)
         return 0;
 }
 
+int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube)
+{
+        if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)
+                return -EINVAL;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
+                pthread_rwlock_unlock(&ai.flows_lock);
+                pthread_rwlock_unlock(&ai.data_lock);
+                return -ENOTALLOC;
+        }
+
+        *cube = ai.flows[fd].qos;
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return 0;
+}
+
 ssize_t local_flow_read(int fd)
 {
         ssize_t ret;
diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c
index 104c5e9e..64055cfb 100644
--- a/src/tools/cbr/cbr_server.c
+++ b/src/tools/cbr/cbr_server.c
@@ -85,7 +85,7 @@ static void handle_flow(int fd)
         alive = iv_start;
         ts_add(&iv_start, &intv, &iv_end);
 
-        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK);
+        flow_set_flags(fd, FLOW_O_NONBLOCK);
 
         while (!stop) {
                 clock_gettime(CLOCK_REALTIME, &now);
@@ -157,7 +157,7 @@ static void * listener(void * o)
 {
         int client_fd = 0;
         int response = 0;
-        struct qos_spec qs;
+        qosspec_t qs;
 
         (void) o;
 
diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c
index 09575364..c369d3e6 100644
--- a/src/tools/echo/echo_server.c
+++ b/src/tools/echo/echo_server.c
@@ -40,7 +40,7 @@ int server_main(void)
         int    client_fd = 0;
         char   buf[BUF_SIZE];
         ssize_t count = 0;
-        struct qos_spec qs;
+        qosspec_t qs;
 
         printf("Starting the server.\n");
 
diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c
index bc7ade3a..1716a598 100644
--- a/src/tools/operf/operf.c
+++ b/src/tools/operf/operf.c
@@ -47,9 +47,6 @@ struct c {
         unsigned long sent;
         unsigned long rcvd;
 
-        flow_set_t * flows;
-        fqueue_t *   fq;
-
         pthread_t reader_pt;
         pthread_t writer_pt;
 } client;
diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c
index 902a7b41..44f25893 100644
--- a/src/tools/operf/operf_client.c
+++ b/src/tools/operf/operf_client.c
@@ -22,6 +22,7 @@
  */
 
 #include <ouroboros/dev.h>
+#include <ouroboros/fcntl.h>
 #include <ouroboros/time_utils.h>
 
 #ifdef __FreeBSD__
@@ -45,6 +46,7 @@ static void busy_wait_until(const struct timespec * deadline)
         while (now.tv_sec == deadline->tv_sec
                && now.tv_nsec < deadline->tv_nsec)
                 clock_gettime(CLOCK_REALTIME, &now);
+        pthread_testcancel();
 }
 
 void shutdown_client(int signo, siginfo_t * info, void * c)
@@ -68,23 +70,20 @@ void * reader(void * o)
         struct timespec timeout = {2, 0};
 
         char buf[OPERF_BUF_SIZE];
-        int fd = 0;
+        int fd = *((int *) o);
         int msg_len = 0;
 
-        (void) o;
+        flow_set_timeout(fd, &timeout);
 
-        /* FIXME: use flow timeout option once we have it */
-        while (flow_event_wait(client.flows, client.fq, &timeout) != -ETIMEDOUT)
-                while ((fd = fqueue_next(client.fq)) >= 0) {
-                        msg_len = flow_read(fd, buf, OPERF_BUF_SIZE);
-                        if (msg_len != client.size) {
-                                printf("Invalid message on fd %d.\n", fd);
-                                continue;
-                        }
-
-                        ++client.rcvd;
+        while ((msg_len = flow_read(fd, buf, OPERF_BUF_SIZE)) != -ETIMEDOUT) {
+                if (msg_len != client.size) {
+                        printf("Invalid message on fd %d.\n", fd);
+                        continue;
                 }
 
+                ++client.rcvd;
+        }
+
         return (void *) 0;
 }
 
@@ -160,33 +159,6 @@ void * writer(void * o)
         return (void *) 0;
 }
 
-static int client_init(void)
-{
-        client.flows = flow_set_create();
-        if (client.flows == NULL)
-                return -ENOMEM;
-
-        client.fq = fqueue_create();
-        if (client.fq == NULL) {
-                flow_set_destroy(client.flows);
-                return -ENOMEM;
-        }
-
-        client.sent = 0;
-        client.rcvd = 0;
-
-        return 0;
-}
-
-void client_fini(void)
-{
-        if (client.flows != NULL)
-                flow_set_destroy(client.flows);
-
-        if (client.fq != NULL)
-                fqueue_destroy(client.fq);
-}
-
 int client_main(void)
 {
         struct sigaction sig_act;
@@ -208,32 +180,24 @@ int client_main(void)
                 return -1;
         }
 
-        if (client_init()) {
-                printf("Failed to initialize client.\n");
-                return -1;
-        }
+        client.sent = 0;
+        client.rcvd = 0;
 
         fd = flow_alloc(client.s_apn, NULL, NULL);
         if (fd < 0) {
-                flow_set_destroy(client.flows);
-                fqueue_destroy(client.fq);
                 printf("Failed to allocate flow.\n");
                 return -1;
         }
 
-        flow_set_add(client.flows, fd);
-
         if (flow_alloc_res(fd)) {
                 printf("Flow allocation refused.\n");
-                flow_set_del(client.flows, fd);
                 flow_dealloc(fd);
-                client_fini();
                 return -1;
         }
 
         clock_gettime(CLOCK_REALTIME, &tic);
 
-        pthread_create(&client.reader_pt, NULL, reader, NULL);
+        pthread_create(&client.reader_pt, NULL, reader, &fd);
         pthread_create(&client.writer_pt, NULL, writer, &fd);
 
         pthread_join(client.writer_pt, NULL);
@@ -253,11 +217,7 @@ int client_main(void)
                (client.rcvd * client.size * 8)
                / (double) ts_diff_us(&tic, &toc));
 
-        flow_set_del(client.flows, fd);
-
         flow_dealloc(fd);
 
-        client_fini();
-
         return 0;
 }
diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c
index 4eb93879..340103d2 100644
--- a/src/tools/operf/operf_server.c
+++ b/src/tools/operf/operf_server.c
@@ -102,7 +102,7 @@ void * accept_thread(void * o)
 {
         int fd = 0;
         struct timespec now = {0, 0};
-        struct qos_spec qs;
+        qosspec_t qs;
 
         (void) o;
 
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c
index 98d12a7b..224b182b 100644
--- a/src/tools/oping/oping.c
+++ b/src/tools/oping/oping.c
@@ -54,9 +54,6 @@ struct c {
         double rtt_avg;
         double rtt_m2;
 
-        flow_set_t * flows;
-        fqueue_t *   fq;
-
         /* needs locking */
         struct timespec * times;
         pthread_mutex_t lock;
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index b30ba5f4..c439cf46 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -60,56 +60,54 @@ void * reader(void * o)
 
         char buf[OPING_BUF_SIZE];
         struct oping_msg * msg = (struct oping_msg *) buf;
-        int fd = 0;
+        int fd = *((int *) o);
         int msg_len = 0;
         double ms = 0;
         double d = 0;
 
-        (void) o;
-
-        /* FIXME: use flow timeout option once we have it */
-        while (client.rcvd != client.count
-               && (flow_event_wait(client.flows, client.fq, &timeout)
-                   != -ETIMEDOUT)) {
-                while ((fd = fqueue_next(client.fq)) >= 0) {
-                        msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
-                        if (msg_len < 0)
-                                continue;
-
-                        if (ntohl(msg->type) != ECHO_REPLY) {
-                                printf("Invalid message on fd %d.\n", fd);
-                                continue;
-                        }
-
-                        if (ntohl(msg->id) >= client.count) {
-                                printf("Invalid id.\n");
-                                continue;
-                        }
-
-                        ++client.rcvd;
-
-                        clock_gettime(CLOCK_REALTIME, &now);
-
-                        pthread_mutex_lock(&client.lock);
-                        ms = ts_diff_us(&client.times[ntohl(msg->id)], &now)
-                                / 1000.0;
-                        pthread_mutex_unlock(&client.lock);
-
-                        printf("%d bytes from %s: seq=%d time=%.3f ms\n",
-                               msg_len,
-                               client.s_apn,
-                               ntohl(msg->id),
-                               ms);
-
-                        if (ms < client.rtt_min)
-                                client.rtt_min = ms;
-                        if (ms > client.rtt_max)
-                                client.rtt_max = ms;
-
-                        d = (ms - client.rtt_avg);
-                        client.rtt_avg += d  / (float) client.rcvd;
-                        client.rtt_m2 += d * (ms - client.rtt_avg);
+        flow_set_timeout(fd, &timeout);
+
+        while (client.rcvd != client.count) {
+                msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
+                if (msg_len == -ETIMEDOUT)
+                        break;
+
+                if (msg_len < 0)
+                        continue;
+
+                if (ntohl(msg->type) != ECHO_REPLY) {
+                        printf("Invalid message on fd %d.\n", fd);
+                        continue;
                 }
+
+                if (ntohl(msg->id) >= client.count) {
+                        printf("Invalid id.\n");
+                        continue;
+                }
+
+                ++client.rcvd;
+
+                clock_gettime(CLOCK_REALTIME, &now);
+
+                pthread_mutex_lock(&client.lock);
+                ms = ts_diff_us(&client.times[ntohl(msg->id)], &now)
+                        / 1000.0;
+                pthread_mutex_unlock(&client.lock);
+
+                printf("%d bytes from %s: seq=%d time=%.3f ms\n",
+                       msg_len,
+                       client.s_apn,
+                       ntohl(msg->id),
+                       ms);
+
+                if (ms < client.rtt_min)
+                        client.rtt_min = ms;
+                if (ms > client.rtt_max)
+                        client.rtt_max = ms;
+
+                d = (ms - client.rtt_avg);
+                client.rtt_avg += d  / (float) client.rcvd;
+                client.rtt_m2 += d * (ms - client.rtt_avg);
         }
 
         return (void *) 0;
@@ -164,20 +162,8 @@ void * writer(void * o)
 
 static int client_init(void)
 {
-        client.flows = flow_set_create();
-        if (client.flows == NULL)
-                return -ENOMEM;
-
-        client.fq = fqueue_create();
-        if (client.fq == NULL) {
-                flow_set_destroy(client.flows);
-                return -ENOMEM;
-        }
-
         client.times = malloc(sizeof(struct timespec) * client.count);
         if (client.times == NULL) {
-                flow_set_destroy(client.flows);
-                fqueue_destroy(client.fq);
                 pthread_mutex_unlock(&client.lock);
                 return -ENOMEM;
         }
@@ -197,12 +183,6 @@ static int client_init(void)
 
 void client_fini(void)
 {
-        if (client.flows != NULL)
-                flow_set_destroy(client.flows);
-
-        if (client.fq != NULL)
-                fqueue_destroy(client.fq);
-
         if (client.times != NULL)
                 free(client.times);
 }
@@ -235,17 +215,12 @@ int client_main(void)
 
         fd = flow_alloc(client.s_apn, NULL, NULL);
         if (fd < 0) {
-                flow_set_destroy(client.flows);
-                fqueue_destroy(client.fq);
                 printf("Failed to allocate flow.\n");
                 return -1;
         }
 
-        flow_set_add(client.flows, fd);
-
         if (flow_alloc_res(fd)) {
                 printf("Flow allocation refused.\n");
-                flow_set_del(client.flows, fd);
                 flow_dealloc(fd);
                 client_fini();
                 return -1;
@@ -255,7 +230,7 @@ int client_main(void)
 
         clock_gettime(CLOCK_REALTIME, &tic);
 
-        pthread_create(&client.reader_pt, NULL, reader, NULL);
+        pthread_create(&client.reader_pt, NULL, reader, &fd);
         pthread_create(&client.writer_pt, NULL, writer, &fd);
 
         pthread_join(client.writer_pt, NULL);
@@ -283,8 +258,6 @@ int client_main(void)
                         printf("NaN ms\n");
         }
 
-        flow_set_del(client.flows, fd);
-
         flow_dealloc(fd);
 
         client_fini();
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index 8d7ab1db..63fca567 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -115,7 +115,7 @@ void * accept_thread(void * o)
 {
         int fd = 0;
         struct timespec now = {0, 0};
-        struct qos_spec qs;
+        qosspec_t qs;
 
         (void) o;
 
@@ -143,7 +143,7 @@ void * accept_thread(void * o)
                 server.times[fd] = now;
                 pthread_mutex_unlock(&server.lock);
 
-                flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK | FLOW_O_RDWR);
+                flow_set_flags(fd, FLOW_O_NONBLOCK | FLOW_O_RDWR);
         }
 
         return (void *) 0;
-- 
cgit v1.2.3