From 7ba0fd0ce19244745c8d2512ce8a003783d914a7 Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@ugent.be>
Date: Thu, 30 Mar 2017 20:33:22 +0200
Subject: lib: Revise flow allocation API

The flow_alloc_res and flow_alloc_resp calls have been removed. The
flow_alloc and flow_accept calls are now both blocking and take an
additional timeout argument.
---
 include/ouroboros/dev.h        |  23 ++---
 src/ipcpd/normal/connmgr.c     |  15 +--
 src/irmd/ipcp.c                |   7 --
 src/irmd/irm_flow.c            |  12 ++-
 src/irmd/main.c                | 203 ++++++++++++-----------------------------
 src/lib/dev.c                  | 193 ++++++++++++---------------------------
 src/lib/irmd_messages.proto    |  18 ++--
 src/lib/shm_flow_set.c         |   3 +-
 src/tools/cbr/cbr_client.c     |  10 +-
 src/tools/cbr/cbr_server.c     |  38 ++++----
 src/tools/echo/echo_client.c   |  12 +--
 src/tools/echo/echo_server.c   |  22 ++---
 src/tools/operf/operf_client.c |   8 +-
 src/tools/operf/operf_server.c |   8 +-
 src/tools/oping/oping_client.c |  11 +--
 src/tools/oping/oping_server.c |  13 +--
 16 files changed, 184 insertions(+), 412 deletions(-)

diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h
index e92cdd1c..4984736c 100644
--- a/include/ouroboros/dev.h
+++ b/include/ouroboros/dev.h
@@ -24,6 +24,7 @@
 #include <ouroboros/qos.h>
 
 #include <unistd.h>
+#include <time.h>
 
 #ifndef OUROBOROS_DEV_H
 #define OUROBOROS_DEV_H
@@ -33,20 +34,14 @@ int     ap_init(const char * ap_name);
 
 void    ap_fini(void);
 
-/* Returns flow descriptor (> 0) and qos spec. */
-int     flow_accept(qosspec_t * spec);
+/* Returns flow descriptor, qs updates to supplied QoS. */
+int     flow_alloc(const char *      dst_name,
+                   qosspec_t *       qs,
+                   struct timespec * timeo);
 
-int     flow_alloc_resp(int fd,
-                        int response);
-
-/*
- * Returns flow descriptor (> 0).
- * On returning, spec will contain the actual supplied QoS.
- */
-int     flow_alloc(const char * dst_name,
-                   qosspec_t *  spec);
-
-int     flow_alloc_res(int fd);
+/* Returns flow descriptor, qs updates to supplied QoS. */
+int     flow_accept(qosspec_t *       qs,
+                    struct timespec * timeo);
 
 int     flow_dealloc(int fd);
 
@@ -58,4 +53,4 @@ ssize_t flow_read(int    fd,
                   void * buf,
                   size_t count);
 
-#endif
+#endif /* OUROBOROS_DEV_H */
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c
index b8314917..8068d173 100644
--- a/src/ipcpd/normal/connmgr.c
+++ b/src/ipcpd/normal/connmgr.c
@@ -126,18 +126,13 @@ static void * flow_acceptor(void * o)
 
                 pthread_rwlock_unlock(&ipcpi.state_lock);
 
-                fd = flow_accept(&qs);
+                fd = flow_accept(&qs, NULL);
                 if (fd < 0) {
                         if (fd != -EIRMD)
                                 log_warn("Flow accept failed: %d", fd);
                         continue;
                 }
 
-                if (flow_alloc_resp(fd, 0)) {
-                        log_err("Failed to respond to flow alloc request.");
-                        continue;
-                }
-
                 if (cacep_rcv(fd, &rcv_info)) {
                         log_err("Error establishing application connection.");
                         flow_dealloc(fd);
@@ -286,7 +281,7 @@ int connmgr_alloc(struct ae *   ae,
 
         memset(&conn->conn_info, 0, sizeof(conn->conn_info));
 
-        conn->flow_info.fd = flow_alloc(dst_name, qs);
+        conn->flow_info.fd = flow_alloc(dst_name, qs, NULL);
         if (conn->flow_info.fd < 0) {
                 log_err("Failed to allocate flow to %s.", dst_name);
                 return -1;
@@ -297,12 +292,6 @@ int connmgr_alloc(struct ae *   ae,
         else
                 memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs));
 
-        if (flow_alloc_res(conn->flow_info.fd)) {
-                log_err("Flow allocation to %s failed.", dst_name);
-                flow_dealloc(conn->flow_info.fd);
-                return -1;
-        }
-
         if (cacep_snd(conn->flow_info.fd, &ae->info)) {
                 log_err("Failed to create application connection.");
                 flow_dealloc(conn->flow_info.fd);
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 06b66d3b..a8263580 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -176,18 +176,11 @@ pid_t ipcp_create(char *         name,
 
 int ipcp_destroy(pid_t api)
 {
-        int status;
-
         if (kill(api, SIGTERM)) {
                 log_err("Failed to destroy IPCP");
                 return -1;
         }
 
-        if (waitpid(api, &status, 0) < 0) {
-                log_err("Failed to destroy IPCP");
-                return -1;
-        }
-
         return 0;
 }
 
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index 99966561..4e7c22ef 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -45,6 +45,7 @@ struct irm_flow * irm_flow_create(pid_t n_api,
         }
 
         if (pthread_mutex_init(&f->state_lock, NULL)) {
+                pthread_cond_destroy(&f->state_cond);
                 free(f);
                 return NULL;
         }
@@ -63,6 +64,9 @@ struct irm_flow * irm_flow_create(pid_t n_api,
         f->n_1_rb = shm_rbuff_create(n_1_api, port_id);
         if (f->n_1_rb == NULL) {
                 log_err("Could not create ringbuffer for AP-I %d.", n_1_api);
+                shm_rbuff_destroy(f->n_rb);
+                pthread_mutex_destroy(&f->state_lock);
+                pthread_cond_destroy(&f->state_cond);
                 free(f);
                 return NULL;
         }
@@ -122,7 +126,8 @@ enum flow_state irm_flow_get_state(struct irm_flow * f)
         return state;
 }
 
-void irm_flow_set_state(struct irm_flow * f, enum flow_state state)
+void irm_flow_set_state(struct irm_flow * f,
+                        enum flow_state   state)
 {
         assert(f);
         assert(state != FLOW_DESTROY);
@@ -135,7 +140,8 @@ void irm_flow_set_state(struct irm_flow * f, enum flow_state state)
         pthread_mutex_unlock(&f->state_lock);
 }
 
-enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state)
+enum flow_state irm_flow_wait_state(struct irm_flow * f,
+                                    enum flow_state   state)
 {
         assert(f);
         assert(state != FLOW_NULL);
@@ -143,6 +149,8 @@ enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state)
 
         pthread_mutex_lock(&f->state_lock);
 
+        assert(f->state != FLOW_NULL);
+
         while (!(f->state == state || f->state == FLOW_DESTROY))
                 pthread_cond_wait(&f->state_cond, &f->state_lock);
 
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 9901a608..c7adf386 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -133,7 +133,8 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api)
 
         list_for_each(pos, &irmd->irm_flows) {
                 struct irm_flow * e = list_entry(pos, struct irm_flow, next);
-                if (e->n_api == n_api)
+                if (e->n_api == n_api &&
+                    irm_flow_get_state(e) == FLOW_ALLOC_PENDING)
                         return e;
         }
 
@@ -982,7 +983,12 @@ static struct irm_flow * flow_accept(pid_t       api,
         struct irm_flow *  f  = NULL;
         struct api_entry * e  = NULL;
         struct reg_entry * re = NULL;
-        struct list_head * p;
+        struct list_head * p  = NULL;
+
+        pid_t api_n1;
+        pid_t api_n;
+        int   port_id;
+        int   ret;
 
         pthread_rwlock_rdlock(&irmd->state_lock);
 
@@ -1016,7 +1022,7 @@ static struct irm_flow * flow_accept(pid_t       api,
         pthread_rwlock_unlock(&irmd->reg_lock);
         pthread_rwlock_unlock(&irmd->state_lock);
 
-        while (api_entry_sleep(e) == -ETIMEDOUT) {
+        while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) {
                 pthread_rwlock_rdlock(&irmd->state_lock);
                 if (irmd->state != IRMD_RUNNING) {
                         pthread_rwlock_unlock(&irmd->state_lock);
@@ -1025,126 +1031,76 @@ static struct irm_flow * flow_accept(pid_t       api,
                 pthread_rwlock_unlock(&irmd->state_lock);
         }
 
-        pthread_rwlock_rdlock(&irmd->state_lock);
-
-        if (irmd->state != IRMD_RUNNING) {
-                reg_entry_set_state(re, REG_NAME_NULL);
-                pthread_rwlock_unlock(&irmd->state_lock);
+        if (ret == -1) {
+                /* The process died, we can exit here. */
                 return NULL;
         }
 
-        pthread_rwlock_rdlock(&irmd->reg_lock);
+        pthread_rwlock_rdlock(&irmd->state_lock);
 
-        e = api_table_get(&irmd->api_table, api);
-        if (e == NULL) {
-                pthread_rwlock_unlock(&irmd->reg_lock);
+        if (irmd->state != IRMD_RUNNING) {
+                reg_entry_set_state(re, REG_NAME_NULL);
                 pthread_rwlock_unlock(&irmd->state_lock);
-                log_dbg("Process gone while accepting flow.");
                 return NULL;
         }
 
-        pthread_mutex_lock(&e->state_lock);
-
-        re = e->re;
-
-        pthread_mutex_unlock(&e->state_lock);
-
-        if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {
-                pthread_rwlock_unlock(&irmd->reg_lock);
-                pthread_rwlock_unlock(&irmd->state_lock);
-                log_err("Entry in wrong state.");
-                return NULL;
-        }
-        pthread_rwlock_unlock(&irmd->reg_lock);
         pthread_rwlock_rdlock(&irmd->flows_lock);
 
         f = get_irm_flow_n(api);
         if (f == NULL) {
                 pthread_rwlock_unlock(&irmd->flows_lock);
                 pthread_rwlock_unlock(&irmd->state_lock);
-                log_err("Port_id was not created yet.");
+                log_warn("Port_id was not created yet.");
                 return NULL;
         }
 
         *cube = re->qos;
 
+        api_n   = f->n_api;
+        api_n1  = f->n_1_api;
+        port_id = f->port_id;
+
         log_info("Flow on port_id %d allocated.", f->port_id);
 
         pthread_rwlock_unlock(&irmd->flows_lock);
-        pthread_rwlock_unlock(&irmd->state_lock);
-
-        return f;
-}
-
-static int flow_alloc_resp(pid_t n_api,
-                           int   port_id,
-                           int   response)
-{
-        struct irm_flow *  f  = NULL;
-        struct reg_entry * re = NULL;
-        struct api_entry * e  = NULL;
-        int ret = -1;
-
-        pid_t api_n1;
-        pid_t api_n;
-
-        pthread_rwlock_rdlock(&irmd->state_lock);
-
-        if (irmd->state != IRMD_RUNNING) {
-                pthread_rwlock_unlock(&irmd->state_lock);
-                return -1;
-        }
-
-        pthread_rwlock_wrlock(&irmd->reg_lock);
+        pthread_rwlock_rdlock(&irmd->reg_lock);
 
-        e = api_table_get(&irmd->api_table, n_api);
+        e = api_table_get(&irmd->api_table, api);
         if (e == NULL) {
                 pthread_rwlock_unlock(&irmd->reg_lock);
                 pthread_rwlock_unlock(&irmd->state_lock);
-                log_err("Unknown AP-I %d responding for port_id %d.",
-                        n_api, port_id);
-                return -1;
+                ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1);
+                log_dbg("Process gone while accepting flow.");
+                return NULL;
         }
 
+        pthread_mutex_lock(&e->state_lock);
+
         re = e->re;
-        if (re == NULL) {
-                pthread_rwlock_unlock(&irmd->reg_lock);
-                pthread_rwlock_unlock(&irmd->state_lock);
-                log_err("AP-I %d is not handling a flow request.", n_api);
-                return -1;
-        }
+
+        pthread_mutex_unlock(&e->state_lock);
 
         if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {
                 pthread_rwlock_unlock(&irmd->reg_lock);
                 pthread_rwlock_unlock(&irmd->state_lock);
-                log_err("Name %s has no pending flow request.", re->name);
-                return -1;
+                ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1);
+                log_err("Entry in wrong state.");
+                return NULL;
         }
 
-        registry_del_api(&irmd->registry, n_api);
+        registry_del_api(&irmd->registry, api);
 
         pthread_rwlock_unlock(&irmd->reg_lock);
-        pthread_rwlock_wrlock(&irmd->flows_lock);
-
-        f = get_irm_flow(port_id);
-        if (f == NULL) {
-                pthread_rwlock_unlock(&irmd->flows_lock);
-                pthread_rwlock_unlock(&irmd->state_lock);
-                return -1;
-        }
-
-        api_n  = f->n_api;
-        api_n1 = f->n_1_api;
-
-        pthread_rwlock_unlock(&irmd->flows_lock);
         pthread_rwlock_unlock(&irmd->state_lock);
 
-        ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response);
+        if (ipcp_flow_alloc_resp(api_n1, port_id, api_n, 0)) {
+                log_dbg("Failed to respond to alloc.");
+                return NULL;
+        }
 
-        if (!(response || ret))
-                irm_flow_set_state(f, FLOW_ALLOCATED);
+        irm_flow_set_state(f, FLOW_ALLOCATED);
 
-        return ret;
+        return f;
 }
 
 static struct irm_flow * flow_alloc(pid_t     api,
@@ -1196,6 +1152,8 @@ static struct irm_flow * flow_alloc(pid_t     api,
         pthread_rwlock_unlock(&irmd->flows_lock);
         pthread_rwlock_unlock(&irmd->state_lock);
 
+        assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING);
+
         if (ipcp_flow_alloc(ipcp, port_id, api,
                             dst_name, cube) < 0) {
                 pthread_rwlock_rdlock(&irmd->state_lock);
@@ -1210,54 +1168,16 @@ static struct irm_flow * flow_alloc(pid_t     api,
                 return NULL;
         }
 
-        return f;
-}
-
-static int flow_alloc_res(int port_id)
-{
-        struct irm_flow * f;
-
-        pthread_rwlock_rdlock(&irmd->state_lock);
-
-        if (irmd->state != IRMD_RUNNING) {
-                pthread_rwlock_unlock(&irmd->state_lock);
-                return -1;
-        }
-        pthread_rwlock_rdlock(&irmd->flows_lock);
-
-        f = get_irm_flow(port_id);
-        if (f == NULL) {
-                pthread_rwlock_unlock(&irmd->flows_lock);
-                pthread_rwlock_unlock(&irmd->state_lock);
-                log_err("Could not find port %d.", port_id);
-                return -1;
-        }
-
-        if (irm_flow_get_state(f) == FLOW_NULL) {
-                pthread_rwlock_unlock(&irmd->flows_lock);
-                pthread_rwlock_unlock(&irmd->state_lock);
-                log_info("Port %d is deprecated.", port_id);
-                return -1;
-        }
-
-        if (irm_flow_get_state(f) == FLOW_ALLOCATED) {
-                log_info("Flow on port_id %d allocated.", port_id);
-                pthread_rwlock_unlock(&irmd->flows_lock);
-                pthread_rwlock_unlock(&irmd->state_lock);
-                return 0;
+        if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) {
+                log_info("Pending flow on port_id %d torn down.", port_id);
+                return NULL;
         }
 
-        pthread_rwlock_unlock(&irmd->flows_lock);
-        pthread_rwlock_unlock(&irmd->state_lock);
-
-        if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) {
-                log_info("Flow on port_id %d allocated.", port_id);
-                return 0;
-        }
+        assert(irm_flow_get_state(f) == FLOW_ALLOCATED);
 
-        log_info("Pending flow on port_id %d torn down.", port_id);
+        log_info("Flow on port_id %d allocated.", port_id);
 
-        return -1;
+        return f;
 }
 
 static int flow_dealloc(pid_t api,
@@ -1293,6 +1213,9 @@ static int flow_dealloc(pid_t api,
 
         if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) {
                 list_del(&f->next);
+                if ((kill(f->n_api, 0) < 0 && f->n_1_api == -1) ||
+                    (kill (f->n_1_api, 0) < 0 && f->n_api == -1))
+                        irm_flow_set_state(f, FLOW_NULL);
                 clear_irm_flow(f);
                 irm_flow_destroy(f);
                 bmp_release(irmd->port_ids, port_id);
@@ -1305,12 +1228,11 @@ static int flow_dealloc(pid_t api,
         }
 
         pthread_rwlock_unlock(&irmd->flows_lock);
+        pthread_rwlock_unlock(&irmd->state_lock);
 
         if (n_1_api != -1)
                 ret = ipcp_flow_dealloc(n_1_api, port_id);
 
-        pthread_rwlock_unlock(&irmd->state_lock);
-
         return ret;
 }
 
@@ -1501,7 +1423,7 @@ static int flow_alloc_reply(int port_id,
         struct irm_flow * f;
 
         pthread_rwlock_rdlock(&irmd->state_lock);
-        pthread_rwlock_wrlock(&irmd->flows_lock);
+        pthread_rwlock_rdlock(&irmd->flows_lock);
 
         f = get_irm_flow(port_id);
         if (f == NULL) {
@@ -1551,18 +1473,19 @@ static void irm_destroy(void)
         list_for_each_safe(p, h, &irmd->ipcps) {
                 struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next);
                 list_del(&e->next);
-                ipcp_destroy(e->api);
-                clear_spawned_api(e->api);
-                registry_del_api(&irmd->registry, e->api);
                 ipcp_entry_destroy(e);
         }
 
-        list_for_each_safe(p, h, &irmd->spawned_apis) {
+        list_for_each(p, &irmd->spawned_apis) {
                 struct pid_el * e = list_entry(p, struct pid_el, next);
-                int status;
                 if (kill(e->pid, SIGTERM))
                         log_dbg("Could not send kill signal to %d.", e->pid);
-                else if (waitpid(e->pid, &status, 0) < 0)
+        }
+
+        list_for_each_safe(p, h, &irmd->spawned_apis) {
+                struct pid_el * e = list_entry(p, struct pid_el, next);
+                int status;
+                if (waitpid(e->pid, &status, 0) < 0)
                         log_dbg("Error waiting for %d to exit.", e->pid);
                 list_del(&e->next);
                 registry_del_api(&irmd->registry, e->pid);
@@ -1940,12 +1863,6 @@ void * mainloop(void * o)
                         ret_msg.has_api     = true;
                         ret_msg.api         = e->n_1_api;
                         break;
-                case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP:
-                        ret_msg.has_result = true;
-                        ret_msg.result = flow_alloc_resp(msg->api,
-                                                         msg->port_id,
-                                                         msg->response);
-                        break;
                 case IRM_MSG_CODE__IRM_FLOW_ALLOC:
                         e = flow_alloc(msg->api,
                                        msg->dst_name,
@@ -1960,10 +1877,6 @@ void * mainloop(void * o)
                         ret_msg.has_api     = true;
                         ret_msg.api         = e->n_1_api;
                         break;
-                case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES:
-                        ret_msg.has_result = true;
-                        ret_msg.result = flow_alloc_res(msg->port_id);
-                        break;
                 case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
                         ret_msg.has_result = true;
                         ret_msg.result = flow_dealloc(msg->api, msg->port_id);
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 79797b92..e19083c3 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -161,21 +161,21 @@ struct {
 } ai;
 
 /* FIXME: translate real spec to cube */
-static qoscube_t spec_to_cube(qosspec_t * spec)
+static qoscube_t spec_to_cube(qosspec_t * qs)
 {
-        if (spec == NULL)
+        if (qs == NULL)
                 return QOS_CUBE_BE;
 
-        return spec->cube;
+        return qs->cube;
 }
 
 /* FIXME: fill real spec */
-static void fill_qosspec(qosspec_t * spec,
+static void fill_qosspec(qosspec_t * qs,
                          qoscube_t   cube)
 {
-        assert(spec);
+        assert(qs);
 
-        spec->cube = cube;
+        qs->cube = cube;
 }
 
 static int api_announce(char * ap_name)
@@ -209,6 +209,17 @@ static int api_announce(char * ap_name)
         return ret;
 }
 
+static void init_flow(int fd)
+{
+        assert(!(fd < 0));
+
+        memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
+
+        ai.flows[fd].port_id  = -1;
+        ai.flows[fd].api      = -1;
+        ai.flows[fd].cube     = QOS_CUBE_BE;
+}
+
 static void reset_flow(int fd)
 {
         assert (!(fd < 0));
@@ -216,25 +227,17 @@ static void reset_flow(int fd)
         if (ai.flows[fd].port_id != -1)
                 port_destroy(&ai.ports[ai.flows[fd].port_id]);
 
-        ai.flows[fd].port_id = -1;
-        if (ai.flows[fd].rx_rb != NULL) {
+        if (ai.flows[fd].rx_rb != NULL)
                 shm_rbuff_close(ai.flows[fd].rx_rb);
-                ai.flows[fd].rx_rb = NULL;
-        }
-        if (ai.flows[fd].tx_rb != NULL) {
+
+        if (ai.flows[fd].tx_rb != NULL)
                 shm_rbuff_close(ai.flows[fd].tx_rb);
-                ai.flows[fd].tx_rb = NULL;
-        }
 
-        if (ai.flows[fd].set != NULL) {
+        if (ai.flows[fd].set != NULL)
                 shm_flow_set_close(ai.flows[fd].set);
-                ai.flows[fd].set = NULL;
-        }
 
-        ai.flows[fd].oflags = 0;
-        ai.flows[fd].api = -1;
-        ai.flows[fd].timesout = false;
-        ai.flows[fd].cube = QOS_CUBE_BE;
+        init_flow(fd);
+
 }
 
 int ap_init(const char * ap_name)
@@ -280,16 +283,8 @@ int ap_init(const char * ap_name)
                 return -1;
         }
 
-        for (i = 0; i < AP_MAX_FLOWS; ++i) {
-                ai.flows[i].rx_rb    = NULL;
-                ai.flows[i].tx_rb    = NULL;
-                ai.flows[i].set      = NULL;
-                ai.flows[i].port_id  = -1;
-                ai.flows[i].oflags   = 0;
-                ai.flows[i].api      = -1;
-                ai.flows[i].timesout = false;
-                ai.flows[i].cube     = QOS_CUBE_BE;
-        }
+        for (i = 0; i < AP_MAX_FLOWS; ++i)
+                init_flow(i);
 
         ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
         if (ai.ports == NULL) {
@@ -382,7 +377,8 @@ void ap_fini()
         pthread_rwlock_destroy(&ai.data_lock);
 }
 
-int flow_accept(qosspec_t * spec)
+int flow_accept(qosspec_t *       qs,
+                struct timespec * timeo)
 {
         irm_msg_t msg = IRM_MSG__INIT;
         irm_msg_t * recv_msg = NULL;
@@ -391,6 +387,13 @@ int flow_accept(qosspec_t * spec)
         msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
         msg.has_api = true;
 
+        if (timeo != NULL) {
+                msg.has_timeo_sec = true;
+                msg.has_timeo_usec = true;
+                msg.timeo_sec  = timeo->tv_sec;
+                msg.timeo_usec = timeo->tv_nsec / 1000;
+        }
+
         pthread_rwlock_rdlock(&ai.data_lock);
 
         msg.api     = ai.api;
@@ -424,7 +427,6 @@ int flow_accept(qosspec_t * spec)
 
         ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);
         if (ai.flows[fd].rx_rb == NULL) {
-                reset_flow(fd);
                 bmp_release(ai.fds, fd);
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
@@ -435,8 +437,10 @@ int flow_accept(qosspec_t * spec)
         ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);
         if (ai.flows[fd].tx_rb == NULL) {
                 reset_flow(fd);
+                bmp_release(ai.fds, fd);
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
+                irm_msg__free_unpacked(recv_msg, NULL);
                 return -1;
         }
 
@@ -455,8 +459,8 @@ int flow_accept(qosspec_t * spec)
         ai.flows[fd].api     = recv_msg->api;
         ai.flows[fd].cube    = recv_msg->qoscube;
 
-        if (spec != NULL)
-                fill_qosspec(spec, ai.flows[fd].cube);
+        if (qs != NULL)
+                fill_qosspec(qs, ai.flows[fd].cube);
 
         ai.ports[recv_msg->port_id].fd    = fd;
         ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
@@ -469,69 +473,27 @@ int flow_accept(qosspec_t * spec)
         return fd;
 }
 
-int flow_alloc_resp(int fd,
-                    int response)
+int flow_alloc(const char *      dst_name,
+               qosspec_t *       qs,
+               struct timespec * timeo)
 {
         irm_msg_t msg = IRM_MSG__INIT;
         irm_msg_t * recv_msg = NULL;
-        int ret = -1;
-
-        if (fd < 0 || fd >= AP_MAX_FLOWS)
-                return -EBADF;
-
-        msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
-        msg.has_api      = true;
-        msg.api          = ai.api;
-        msg.has_port_id  = true;
-
-        pthread_rwlock_rdlock(&ai.data_lock);
-        pthread_rwlock_rdlock(&ai.flows_lock);
 
-        if (ai.flows[fd].port_id < 0) {
-                pthread_rwlock_unlock(&ai.flows_lock);
-                pthread_rwlock_unlock(&ai.data_lock);
-                return -ENOTALLOC;
-        }
-
-        msg.port_id      = ai.flows[fd].port_id;
-
-        pthread_rwlock_unlock(&ai.flows_lock);
-        pthread_rwlock_unlock(&ai.data_lock);
-
-        msg.has_response = true;
-        msg.response     = response;
-
-        recv_msg = send_recv_irm_msg(&msg);
-        if (recv_msg == NULL)
-                return -EIRMD;
-
-        if (!recv_msg->has_result) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
-
-        ret = recv_msg->result;
-
-        irm_msg__free_unpacked(recv_msg, NULL);
-
-        return ret;
-}
-
-int flow_alloc(const char * dst_name,
-               qosspec_t *  spec)
-{
-        irm_msg_t msg = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
-        int fd = -1;
-
-        if (dst_name == NULL)
-                return -EINVAL;
+        int fd;
 
         msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;
         msg.dst_name    = (char *) dst_name;
         msg.has_api     = true;
         msg.has_qoscube = true;
-        msg.qoscube     = spec_to_cube(spec);
+        msg.qoscube     = spec_to_cube(qs);
+
+        if (timeo != NULL) {
+                msg.has_timeo_sec = true;
+                msg.has_timeo_usec = true;
+                msg.timeo_sec  = timeo->tv_sec;
+                msg.timeo_usec = timeo->tv_nsec / 1000;
+        }
 
         pthread_rwlock_rdlock(&ai.data_lock);
 
@@ -561,7 +523,6 @@ int flow_alloc(const char * dst_name,
 
         ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);
         if (ai.flows[fd].rx_rb == NULL) {
-                reset_flow(fd);
                 bmp_release(ai.fds, fd);
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
@@ -571,16 +532,21 @@ int flow_alloc(const char * dst_name,
 
         ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);
         if (ai.flows[fd].tx_rb == NULL) {
+                reset_flow(fd);
+                bmp_release(ai.fds, fd);
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
+                irm_msg__free_unpacked(recv_msg, NULL);
                 return -1;
         }
 
         ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
         if (ai.flows[fd].set == NULL) {
                 reset_flow(fd);
+                bmp_release(ai.fds, fd);
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
+                irm_msg__free_unpacked(recv_msg, NULL);
                 return -1;
         }
 
@@ -589,7 +555,6 @@ int flow_alloc(const char * dst_name,
         ai.flows[fd].api     = recv_msg->api;
         ai.flows[fd].cube    = recv_msg->qoscube;
 
-        ai.ports[recv_msg->port_id].fd    = fd;
         ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
 
         pthread_rwlock_unlock(&ai.flows_lock);
@@ -600,48 +565,6 @@ int flow_alloc(const char * dst_name,
         return fd;
 }
 
-int flow_alloc_res(int fd)
-{
-        irm_msg_t msg = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
-        int result = 0;
-
-        if (fd < 0 || fd >= AP_MAX_FLOWS)
-                return -EBADF;
-
-        msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
-        msg.has_port_id  = true;
-
-        pthread_rwlock_rdlock(&ai.data_lock);
-        pthread_rwlock_rdlock(&ai.flows_lock);
-
-        if (ai.flows[fd].port_id < 0) {
-                pthread_rwlock_unlock(&ai.flows_lock);
-                pthread_rwlock_unlock(&ai.data_lock);
-                return -ENOTALLOC;
-        }
-
-        msg.port_id = ai.flows[fd].port_id;
-
-        pthread_rwlock_unlock(&ai.flows_lock);
-        pthread_rwlock_unlock(&ai.data_lock);
-
-        recv_msg = send_recv_irm_msg_b(&msg);
-        if (recv_msg == NULL)
-                return -EIRMD;
-
-        if (!recv_msg->has_result) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
-
-        result = recv_msg->result;
-
-        irm_msg__free_unpacked(recv_msg, NULL);
-
-        return result;
-}
-
 int flow_dealloc(int fd)
 {
         irm_msg_t msg = IRM_MSG__INIT;
@@ -804,9 +727,9 @@ int flow_set_timeout(int                     fd,
 }
 
 int flow_get_qosspec(int         fd,
-                     qosspec_t * spec)
+                     qosspec_t * qs)
 {
-        if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL)
+        if (fd < 0 || fd >= AP_MAX_FLOWS || qs == NULL)
                 return -EINVAL;
 
         pthread_rwlock_rdlock(&ai.data_lock);
@@ -818,7 +741,7 @@ int flow_get_qosspec(int         fd,
                 return -ENOTALLOC;
         }
 
-        fill_qosspec(spec, ai.flows[fd].cube);
+        fill_qosspec(qs, ai.flows[fd].cube);
 
         pthread_rwlock_unlock(&ai.flows_lock);
         pthread_rwlock_unlock(&ai.data_lock);
diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto
index c25d2c18..4fbd676e 100644
--- a/src/lib/irmd_messages.proto
+++ b/src/lib/irmd_messages.proto
@@ -39,14 +39,12 @@ enum irm_msg_code {
         IRM_UNBIND_API        = 11;
         IRM_REG               = 12;
         IRM_UNREG             = 13;
-        IRM_FLOW_ACCEPT       = 14;
-        IRM_FLOW_ALLOC_RESP   = 15;
-        IRM_FLOW_ALLOC        = 16;
-        IRM_FLOW_ALLOC_RES    = 17;
-        IRM_FLOW_DEALLOC      = 18;
-        IPCP_FLOW_REQ_ARR     = 19;
-        IPCP_FLOW_ALLOC_REPLY = 20;
-        IRM_REPLY             = 21;
+        IRM_FLOW_ALLOC        = 14;
+        IRM_FLOW_ACCEPT       = 15;
+        IRM_FLOW_DEALLOC      = 16;
+        IPCP_FLOW_REQ_ARR     = 17;
+        IPCP_FLOW_ALLOC_REPLY = 18;
+        IRM_REPLY             = 19;
 };
 
 message irm_msg {
@@ -63,5 +61,7 @@ message irm_msg {
         optional dif_config_msg conf = 11;
         optional uint32 opts         = 12;
         repeated sint32 apis         = 13;
-        optional sint32 result       = 14;
+        optional uint32 timeo_sec    = 14;
+        optional uint32 timeo_usec   = 15;
+        optional sint32 result       = 16;
 };
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index 615fbd2b..67abbb5b 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -302,7 +302,8 @@ int shm_flow_set_has(struct shm_flow_set * set,
         return ret;
 }
 
-void shm_flow_set_notify(struct shm_flow_set * set, int port_id)
+void shm_flow_set_notify(struct shm_flow_set * set,
+                         int                   port_id)
 {
         assert(set);
         assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS);
diff --git a/src/tools/cbr/cbr_client.c b/src/tools/cbr/cbr_client.c
index 16ade13d..5ec1d560 100644
--- a/src/tools/cbr/cbr_client.c
+++ b/src/tools/cbr/cbr_client.c
@@ -63,7 +63,6 @@ int client_main(char * server,
         struct sigaction sig_act;
 
         int fd = 0;
-        int result = 0;
         char buf[size];
         long seqnr = 0;
         long gap = size * 8.0 * (BILLION / (double) rate);
@@ -90,19 +89,12 @@ int client_main(char * server,
         printf("Client started, duration %d, rate %lu b/s, size %d B.\n",
                duration, rate, size);
 
-        fd = flow_alloc(server, NULL);
+        fd = flow_alloc(server, NULL, NULL);
         if (fd < 0) {
                 printf("Failed to allocate flow.\n");
                 return -1;
         }
 
-        result = flow_alloc_res(fd);
-        if (result < 0) {
-                printf("Flow allocation refused.\n");
-                flow_dealloc(fd);
-                return -1;
-        }
-
         clock_gettime(CLOCK_REALTIME, &start);
         if (!flood) {
                 while (!stop) {
diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c
index 9198858c..1a963a64 100644
--- a/src/tools/cbr/cbr_server.c
+++ b/src/tools/cbr/cbr_server.c
@@ -146,6 +146,8 @@ static void * worker(void * o)
 
                 pthread_mutex_lock(&fds_lock);
                 fds_count--;
+
+                pthread_cond_signal(&fds_signal);
                 pthread_mutex_unlock(&fds_lock);
         }
 
@@ -154,8 +156,7 @@ static void * worker(void * o)
 
 static void * listener(void * o)
 {
-        int client_fd = 0;
-        int response = 0;
+        int fd = 0;
         qosspec_t qs;
 
         (void) o;
@@ -164,8 +165,19 @@ static void * listener(void * o)
                server_settings.interval, server_settings.timeout);
 
         while (true) {
-                client_fd = flow_accept(&qs);
-                if (client_fd < 0) {
+                pthread_mutex_lock(&fds_lock);
+                pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
+                                     (void *) &fds_lock);
+
+                while (fds_count == THREADS_SIZE) {
+                        printf("Can't accept any more flows, waiting.\n");
+                        pthread_cond_wait(&fds_signal, &fds_lock);
+                }
+
+                pthread_cleanup_pop(true);
+
+                fd = flow_accept(&qs, NULL);
+                if (fd < 0) {
                         printf("Failed to accept flow.\n");
                         break;
                 }
@@ -174,26 +186,12 @@ static void * listener(void * o)
 
                 pthread_mutex_lock(&fds_lock);
 
-                response = (fds_count < THREADS_SIZE) ? 0 : -1;
-
-                if (flow_alloc_resp(client_fd, response)) {
-                        printf("Failed to give an allocate response.\n");
-                        flow_dealloc(client_fd);
-                        pthread_mutex_unlock(&fds_lock);
-                        continue;
-                }
-
-                if (response) {
-                        printf("Can't accept any more flows, denying.\n");
-                        continue;
-                }
-
                 fds_count++;
                 fds_index = (fds_index + 1) % THREADS_SIZE;
-                fds[fds_index] = client_fd;
+                fds[fds_index] = fd;
 
-                pthread_mutex_unlock(&fds_lock);
                 pthread_cond_signal(&fds_signal);
+                pthread_mutex_unlock(&fds_lock);
         }
 
         return 0;
diff --git a/src/tools/echo/echo_client.c b/src/tools/echo/echo_client.c
index f84de73a..5ec2051f 100644
--- a/src/tools/echo/echo_client.c
+++ b/src/tools/echo/echo_client.c
@@ -26,25 +26,17 @@
 int client_main(void)
 {
         int fd = 0;
-        int result = 0;
         char buf[BUF_SIZE];
         char * message  = "Client says hi!";
         ssize_t count = 0;
 
-        fd = flow_alloc("echo", NULL);
+        fd = flow_alloc("echo", NULL, NULL);
         if (fd < 0) {
                 printf("Failed to allocate flow.\n");
                 return -1;
         }
 
-        result = flow_alloc_res(fd);
-        if (result < 0) {
-                printf("Flow allocation refused.\n");
-                flow_dealloc(fd);
-                return -1;
-        }
-
-        if (flow_write(fd, message, strlen(message) + 1) == -1) {
+        if (flow_write(fd, message, strlen(message) + 1) < 0) {
                 printf("Failed to write SDU.\n");
                 flow_dealloc(fd);
                 return -1;
diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c
index aa136485..771155f4 100644
--- a/src/tools/echo/echo_server.c
+++ b/src/tools/echo/echo_server.c
@@ -37,7 +37,7 @@ void shutdown_server(int signo)
 
 int server_main(void)
 {
-        int    client_fd = 0;
+        int    fd = 0;
         char   buf[BUF_SIZE];
         ssize_t count = 0;
         qosspec_t qs;
@@ -51,36 +51,30 @@ int server_main(void)
         }
 
         while (true) {
-                client_fd = flow_accept(&qs);
-                if (client_fd < 0) {
+                fd = flow_accept(&qs, NULL);
+                if (fd < 0) {
                         printf("Failed to accept flow.\n");
                         break;
                 }
 
                 printf("New flow.\n");
 
-                if (flow_alloc_resp(client_fd, 0)) {
-                        printf("Failed to give an allocate response.\n");
-                        flow_dealloc(client_fd);
-                        continue;
-                }
-
-                count = flow_read(client_fd, &buf, BUF_SIZE);
+                count = flow_read(fd, &buf, BUF_SIZE);
                 if (count < 0) {
                         printf("Failed to read SDU.\n");
-                        flow_dealloc(client_fd);
+                        flow_dealloc(fd);
                         continue;
                 }
 
                 printf("Message from client is %.*s.\n", (int) count, buf);
 
-                if (flow_write(client_fd, buf, count) == -1) {
+                if (flow_write(fd, buf, count) == -1) {
                         printf("Failed to write SDU.\n");
-                        flow_dealloc(client_fd);
+                        flow_dealloc(fd);
                         continue;
                 }
 
-                flow_dealloc(client_fd);
+                flow_dealloc(fd);
         }
 
         return 0;
diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c
index d2f08ef4..7827b62b 100644
--- a/src/tools/operf/operf_client.c
+++ b/src/tools/operf/operf_client.c
@@ -182,18 +182,12 @@ int client_main(void)
         client.sent = 0;
         client.rcvd = 0;
 
-        fd = flow_alloc(client.s_apn, NULL);
+        fd = flow_alloc(client.s_apn, NULL, NULL);
         if (fd < 0) {
                 printf("Failed to allocate flow.\n");
                 return -1;
         }
 
-        if (flow_alloc_res(fd)) {
-                printf("Flow allocation refused.\n");
-                flow_dealloc(fd);
-                return -1;
-        }
-
         clock_gettime(CLOCK_REALTIME, &tic);
 
         pthread_create(&client.reader_pt, NULL, reader, &fd);
diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c
index 3665d4cc..b17a4f7b 100644
--- a/src/tools/operf/operf_server.c
+++ b/src/tools/operf/operf_server.c
@@ -108,7 +108,7 @@ void * accept_thread(void * o)
         printf("Ouroboros perf server started.\n");
 
         while (true) {
-                fd = flow_accept(&qs);
+                fd = flow_accept(&qs, NULL);
                 if (fd < 0) {
                         printf("Failed to accept flow.\n");
                         break;
@@ -116,12 +116,6 @@ void * accept_thread(void * o)
 
                 printf("New flow %d.\n", fd);
 
-                if (flow_alloc_resp(fd, 0)) {
-                        printf("Failed to give an allocate response.\n");
-                        flow_dealloc(fd);
-                        continue;
-                }
-
                 clock_gettime(CLOCK_REALTIME, &now);
 
                 pthread_mutex_lock(&server.lock);
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index a91a126c..77a08db7 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -176,7 +176,6 @@ static int client_init(void)
         client.rtt_m2 = 0;
 
         pthread_mutex_init(&client.lock, NULL);
-        pthread_mutex_lock(&client.lock);
 
         return 0;
 }
@@ -213,21 +212,13 @@ int client_main(void)
                 return -1;
         }
 
-        fd = flow_alloc(client.s_apn, NULL);
+        fd = flow_alloc(client.s_apn, NULL, NULL);
         if (fd < 0) {
                 printf("Failed to allocate flow.\n");
-                return -1;
-        }
-
-        if (flow_alloc_res(fd)) {
-                printf("Flow allocation refused.\n");
-                flow_dealloc(fd);
                 client_fini();
                 return -1;
         }
 
-        pthread_mutex_unlock(&client.lock);
-
         clock_gettime(CLOCK_REALTIME, &tic);
 
         pthread_create(&client.reader_pt, NULL, reader, &fd);
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index e20e236d..44a301ba 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -57,6 +57,7 @@ void * cleaner_thread(void * o)
                 for (i = 0; i < OPING_MAX_FLOWS; ++i)
                         if (flow_set_has(server.flows, i) &&
                             ts_diff_ms(&server.times[i], &now) > deadline_ms) {
+                                printf("Flow %d timed out.\n", i);
                                 flow_set_del(server.flows, i);
                                 flow_dealloc(i);
                         }
@@ -110,8 +111,8 @@ void * server_thread(void *o)
 
 void * accept_thread(void * o)
 {
-        int fd = 0;
-        struct timespec now = {0, 0};
+        int fd;
+        struct timespec now;
         qosspec_t qs;
 
         (void) o;
@@ -119,7 +120,7 @@ void * accept_thread(void * o)
         printf("Ouroboros ping server started.\n");
 
         while (true) {
-                fd = flow_accept(&qs);
+                fd = flow_accept(&qs, NULL);
                 if (fd < 0) {
                         printf("Failed to accept flow.\n");
                         break;
@@ -127,12 +128,6 @@ void * accept_thread(void * o)
 
                 printf("New flow %d.\n", fd);
 
-                if (flow_alloc_resp(fd, 0)) {
-                        printf("Failed to give an allocate response.\n");
-                        flow_dealloc(fd);
-                        continue;
-                }
-
                 clock_gettime(CLOCK_REALTIME, &now);
 
                 pthread_mutex_lock(&server.lock);
-- 
cgit v1.2.3