summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-04-06 09:30:01 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-04-06 09:30:01 +0000
commit34ef0da6c1b3a3419dbdf2041ed1e3ba107dc915 (patch)
treed8e793cffbe829d64855eaa5a429b90ebe3dc3a4
parentc6ad4f96f8bb2f1ee749e92308e7173523ddd0b8 (diff)
parente1c0714d5827cd927961f3a687d9720e6e9aa802 (diff)
downloadouroboros-34ef0da6c1b3a3419dbdf2041ed1e3ba107dc915.tar.gz
ouroboros-34ef0da6c1b3a3419dbdf2041ed1e3ba107dc915.zip
Merged in dstaesse/ouroboros/be-tim (pull request #464)
lib, irmd: Implement flow allocation timeout
-rw-r--r--include/ouroboros/sockets.h2
-rw-r--r--src/ipcpd/ipcp.c4
-rw-r--r--src/irmd/api_table.c85
-rw-r--r--src/irmd/api_table.h2
-rw-r--r--src/irmd/ipcp.c14
-rw-r--r--src/irmd/irm_flow.c37
-rw-r--r--src/irmd/irm_flow.h5
-rw-r--r--src/irmd/main.c182
-rw-r--r--src/lib/dev.c31
-rw-r--r--src/lib/irm.c4
-rw-r--r--src/lib/irmd_messages.proto2
-rw-r--r--src/lib/sockets.c14
12 files changed, 243 insertions, 139 deletions
diff --git a/include/ouroboros/sockets.h b/include/ouroboros/sockets.h
index bcc60e17..8237efb6 100644
--- a/include/ouroboros/sockets.h
+++ b/include/ouroboros/sockets.h
@@ -53,6 +53,4 @@ int client_socket_open(char * file_name);
irm_msg_t * send_recv_irm_msg(irm_msg_t * msg);
-irm_msg_t * send_recv_irm_msg_b(irm_msg_t * msg);
-
#endif
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 4b7da030..f08e4ce7 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -296,7 +296,7 @@ static void * ipcp_main_loop(void * o)
buffer.len = ipcp_msg__get_packed_size(&ret_msg);
if (buffer.len == 0) {
- log_err("Failed to send reply message");
+ log_err("Failed to pack reply message");
close(lsockfd);
thread_inc();
continue;
@@ -304,6 +304,7 @@ static void * ipcp_main_loop(void * o)
buffer.data = malloc(buffer.len);
if (buffer.data == NULL) {
+ log_err("Failed to create reply buffer.");
close(lsockfd);
thread_inc();
continue;
@@ -312,6 +313,7 @@ static void * ipcp_main_loop(void * o)
ipcp_msg__pack(&ret_msg, buffer.data);
if (write(lsockfd, buffer.data, buffer.len) == -1) {
+ log_err("Failed to send reply message");
free(buffer.data);
close(lsockfd);
thread_inc();
diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c
index 5ff0fcf6..268f8231 100644
--- a/src/irmd/api_table.c
+++ b/src/irmd/api_table.c
@@ -31,14 +31,17 @@
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
+#include <assert.h>
-struct api_entry * api_entry_create(pid_t api, char * apn)
+#define ENTRY_SLEEP_TIMEOUT 10 /* ms */
+
+struct api_entry * api_entry_create(pid_t api,
+ char * apn)
{
struct api_entry * e;
pthread_condattr_t cattr;
- if (apn == NULL)
- return NULL;
+ assert(apn);
e = malloc(sizeof(*e));
if (e == NULL)
@@ -84,8 +87,7 @@ void api_entry_destroy(struct api_entry * e)
struct list_head * p;
struct list_head * h;
- if (e == NULL)
- return;
+ assert(e);
pthread_mutex_lock(&e->state_lock);
@@ -121,11 +123,13 @@ void api_entry_destroy(struct api_entry * e)
free(e);
}
-int api_entry_add_name(struct api_entry * e, char * name)
+int api_entry_add_name(struct api_entry * e,
+ char * name)
{
struct str_el * s;
- if (e == NULL || name == NULL)
- return -EINVAL;
+
+ assert(e);
+ assert(name);
s = malloc(sizeof(*s));
if (s == NULL)
@@ -137,11 +141,15 @@ int api_entry_add_name(struct api_entry * e, char * name)
return 0;
}
-void api_entry_del_name(struct api_entry * e, char * name)
+void api_entry_del_name(struct api_entry * e,
+ char * name)
{
struct list_head * p = NULL;
struct list_head * h = NULL;
+ assert(e);
+ assert(name);
+
list_for_each_safe(p, h, &e->names) {
struct str_el * s = list_entry(p, struct str_el, next);
if (!wildcard_match(name, s->str)) {
@@ -153,31 +161,34 @@ void api_entry_del_name(struct api_entry * e, char * name)
}
}
+void api_entry_cancel(struct api_entry * e)
+{
+ pthread_mutex_lock(&e->state_lock);
+
+ e->state = API_INIT;
+ pthread_cond_broadcast(&e->state_cond);
+
+ pthread_mutex_unlock(&e->state_lock);
+}
+
int api_entry_sleep(struct api_entry * e)
{
- struct timespec timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
- (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION};
+ struct timespec timeout = {(ENTRY_SLEEP_TIMEOUT / 1000),
+ (ENTRY_SLEEP_TIMEOUT % 1000) * MILLION};
struct timespec now;
struct timespec dl;
int ret = 0;
- if (e == NULL)
- return -EINVAL;
-
- e->re = NULL;
+ assert(e);
clock_gettime(PTHREAD_COND_CLOCK, &now);
-
ts_add(&now, &timeout, &dl);
pthread_mutex_lock(&e->state_lock);
- if (e->state != API_INIT) {
- pthread_mutex_unlock(&e->state_lock);
- return -EINVAL;
- }
- e->state = API_SLEEP;
+ if (e->state != API_WAKE && e->state != API_DESTROY)
+ e->state = API_SLEEP;
while (e->state == API_SLEEP && ret != -ETIMEDOUT)
ret = -pthread_cond_timedwait(&e->state_cond,
@@ -190,17 +201,20 @@ int api_entry_sleep(struct api_entry * e)
ret = -1;
}
- e->state = API_INIT;
+ if (ret != -ETIMEDOUT)
+ e->state = API_INIT;
+
pthread_cond_broadcast(&e->state_cond);
pthread_mutex_unlock(&e->state_lock);
return ret;
}
-void api_entry_wake(struct api_entry * e, struct reg_entry * re)
+void api_entry_wake(struct api_entry * e,
+ struct reg_entry * re)
{
- if (e == NULL)
- return;
+ assert(e);
+ assert(re);
pthread_mutex_lock(&e->state_lock);
@@ -217,24 +231,32 @@ void api_entry_wake(struct api_entry * e, struct reg_entry * re)
while (e->state == API_WAKE)
pthread_cond_wait(&e->state_cond, &e->state_lock);
+ if (e->state == API_DESTROY)
+ e->state = API_INIT;
+
pthread_mutex_unlock(&e->state_lock);
}
-int api_table_add(struct list_head * api_table, struct api_entry * e)
+int api_table_add(struct list_head * api_table,
+ struct api_entry * e)
{
- if (api_table == NULL || e == NULL)
- return -EINVAL;
+
+ assert(api_table);
+ assert(e);
list_add(&e->next, api_table);
return 0;
}
-void api_table_del(struct list_head * api_table, pid_t api)
+void api_table_del(struct list_head * api_table,
+ pid_t api)
{
struct list_head * p;
struct list_head * h;
+ assert(api_table);
+
list_for_each_safe(p, h, api_table) {
struct api_entry * e = list_entry(p, struct api_entry, next);
if (api == e->api) {
@@ -244,10 +266,13 @@ void api_table_del(struct list_head * api_table, pid_t api)
}
}
-struct api_entry * api_table_get(struct list_head * api_table, pid_t api)
+struct api_entry * api_table_get(struct list_head * api_table,
+ pid_t api)
{
struct list_head * h;
+ assert(api_table);
+
list_for_each(h, api_table) {
struct api_entry * e = list_entry(h, struct api_entry, next);
if (api == e->api)
diff --git a/src/irmd/api_table.h b/src/irmd/api_table.h
index c7998c7f..f9c4d0aa 100644
--- a/src/irmd/api_table.h
+++ b/src/irmd/api_table.h
@@ -61,6 +61,8 @@ int api_entry_sleep(struct api_entry * e);
void api_entry_wake(struct api_entry * e,
struct reg_entry * re);
+void api_entry_cancel(struct api_entry * e);
+
int api_entry_add_name(struct api_entry * e,
char * name);
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index a8263580..eb0c2de0 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -53,10 +53,12 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
char * sock_path = NULL;
ssize_t count = 0;
ipcp_msg_t * recv_msg = NULL;
-
struct timeval tv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
+ if (kill(api, 0) < 0)
+ return NULL;
+
sock_path = ipcp_sock_path(api);
if (sock_path == NULL)
return NULL;
@@ -67,10 +69,6 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
return NULL;
}
- if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
- (void *) &tv, sizeof(tv)))
- log_warn("Failed to set timeout on socket.");
-
free(sock_path);
buf.len = ipcp_msg__get_packed_size(msg);
@@ -85,6 +83,10 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
return NULL;
}
+ if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+ (void *) &tv, sizeof(tv)))
+ log_warn("Failed to set timeout on socket.");
+
pthread_cleanup_push(close_ptr, (void *) &sockfd);
pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data);
@@ -184,7 +186,7 @@ int ipcp_destroy(pid_t api)
return 0;
}
-int ipcp_bootstrap(pid_t api,
+int ipcp_bootstrap(pid_t api,
dif_config_msg_t * conf)
{
ipcp_msg_t msg = IPCP_MSG__INIT;
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index 7a02b01a..8b85f36f 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -23,7 +23,9 @@
#define OUROBOROS_PREFIX "irm_flow"
#include <ouroboros/config.h>
+#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
+#include <ouroboros/time_utils.h>
#include "irm_flow.h"
@@ -142,31 +144,52 @@ void irm_flow_set_state(struct irm_flow * f,
pthread_mutex_unlock(&f->state_lock);
}
-enum flow_state irm_flow_wait_state(struct irm_flow * f,
- enum flow_state state)
+int irm_flow_wait_state(struct irm_flow * f,
+ enum flow_state state,
+ struct timespec * timeo)
{
+ int ret = 0;
+ int s;
+
+ struct timespec dl;
+
assert(f);
assert(state != FLOW_NULL);
assert(state != FLOW_DESTROY);
assert(state != FLOW_DEALLOC_PENDING);
+ if (timeo != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, timeo, &dl);
+ }
+
pthread_mutex_lock(&f->state_lock);
assert(f->state != FLOW_NULL);
while (!(f->state == state ||
f->state == FLOW_DESTROY ||
- f->state == FLOW_DEALLOC_PENDING))
- pthread_cond_wait(&f->state_cond, &f->state_lock);
+ f->state == FLOW_DEALLOC_PENDING) &&
+ ret != -ETIMEDOUT) {
+ if (timeo == NULL)
+ ret = -pthread_cond_wait(&f->state_cond,
+ &f->state_lock);
+ else
+ ret = -pthread_cond_timedwait(&f->state_cond,
+ &f->state_lock,
+ &dl);
+ }
- if (f->state == FLOW_DESTROY || f->state == FLOW_DEALLOC_PENDING) {
+ if (f->state == FLOW_DESTROY ||
+ f->state == FLOW_DEALLOC_PENDING ||
+ ret == -ETIMEDOUT) {
f->state = FLOW_NULL;
pthread_cond_broadcast(&f->state_cond);
}
- state = f->state;
+ s = f->state;
pthread_mutex_unlock(&f->state_lock);
- return state;
+ return ret ? ret : s;
}
diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h
index 97770117..8902a6ab 100644
--- a/src/irmd/irm_flow.h
+++ b/src/irmd/irm_flow.h
@@ -71,7 +71,8 @@ enum flow_state irm_flow_get_state(struct irm_flow * f);
void irm_flow_set_state(struct irm_flow * f,
enum flow_state state);
-enum flow_state irm_flow_wait_state(struct irm_flow * f,
- enum flow_state state);
+int irm_flow_wait_state(struct irm_flow * f,
+ enum flow_state state,
+ struct timespec * timeo);
#endif /* OUROBOROS_IRMD_IRM_FLOW_H */
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 673e39ea..41beb049 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -983,23 +983,33 @@ static int api_announce(pid_t api,
return 0;
}
-static struct irm_flow * flow_accept(pid_t api)
+static int flow_accept(pid_t api,
+ struct timespec * timeo,
+ struct irm_flow ** fl)
{
- struct irm_flow * f = NULL;
+ struct irm_flow * f = NULL;
struct api_entry * e = NULL;
struct reg_entry * re = NULL;
struct list_head * p = NULL;
+ struct timespec dl;
+ struct timespec now;
+
pid_t api_n1;
pid_t api_n;
int port_id;
int ret;
+ if (timeo != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, timeo, &dl);
+ }
+
pthread_rwlock_rdlock(&irmd.state_lock);
if (irmd.state != IRMD_RUNNING) {
pthread_rwlock_unlock(&irmd.state_lock);
- return NULL;
+ return -EIRMD;
}
pthread_rwlock_wrlock(&irmd.reg_lock);
@@ -1010,7 +1020,7 @@ static struct irm_flow * flow_accept(pid_t api)
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_err("Unknown instance %d calling accept.", api);
- return NULL;
+ return -EINVAL;
}
log_dbg("New instance (%d) of %s added.", api, e->apn);
@@ -1027,18 +1037,33 @@ static struct irm_flow * flow_accept(pid_t api)
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_unlock(&irmd.state_lock);
- while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) {
+ while (true) {
+ if (timeo != NULL && ts_diff_ns(&now, &dl) < 0) {
+ log_dbg("Accept timed out.");
+ return -ETIMEDOUT;
+ }
+
pthread_rwlock_rdlock(&irmd.state_lock);
+
if (irmd.state != IRMD_RUNNING) {
pthread_rwlock_unlock(&irmd.state_lock);
- return NULL;
+ return -EIRMD;
}
+
pthread_rwlock_unlock(&irmd.state_lock);
- }
- if (ret == -1) {
- /* The process died, we can exit here. */
- return NULL;
+ ret = api_entry_sleep(e);
+ if (ret == -ETIMEDOUT) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ api_entry_cancel(e);
+ continue;
+ }
+
+ if (ret == -1)
+ return -EPIPE;
+
+ if (ret == 0)
+ break;
}
pthread_rwlock_rdlock(&irmd.state_lock);
@@ -1046,7 +1071,7 @@ static struct irm_flow * flow_accept(pid_t api)
if (irmd.state != IRMD_RUNNING) {
reg_entry_set_state(re, REG_NAME_NULL);
pthread_rwlock_unlock(&irmd.state_lock);
- return NULL;
+ return -EIRMD;
}
pthread_rwlock_rdlock(&irmd.flows_lock);
@@ -1056,7 +1081,7 @@ static struct irm_flow * flow_accept(pid_t api)
pthread_rwlock_unlock(&irmd.flows_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_warn("Port_id was not created yet.");
- return NULL;
+ return -EPERM;
}
api_n = f->n_api;
@@ -1079,7 +1104,7 @@ static struct irm_flow * flow_accept(pid_t api)
irm_flow_set_state(f, FLOW_NULL);
irm_flow_destroy(f);
log_dbg("Process gone while accepting flow.");
- return NULL;
+ return -EPERM;
}
pthread_mutex_lock(&e->state_lock);
@@ -1100,7 +1125,7 @@ static struct irm_flow * flow_accept(pid_t api)
irm_flow_set_state(f, FLOW_NULL);
irm_flow_destroy(f);
log_err("Entry in wrong state.");
- return NULL;
+ return -EPERM;
}
registry_del_api(&irmd.registry, api);
@@ -1118,29 +1143,34 @@ static struct irm_flow * flow_accept(pid_t api)
clear_irm_flow(f);
irm_flow_set_state(f, FLOW_NULL);
irm_flow_destroy(f);
- return NULL;
+ return -EPERM;
}
irm_flow_set_state(f, FLOW_ALLOCATED);
log_info("Flow on port_id %d allocated.", f->port_id);
- return f;
+ *fl = f;
+
+ return 0;
}
-static struct irm_flow * flow_alloc(pid_t api,
- char * dst_name,
- qoscube_t cube)
+static int flow_alloc(pid_t api,
+ char * dst_name,
+ qoscube_t cube,
+ struct timespec * timeo,
+ struct irm_flow ** e)
{
struct irm_flow * f;
- pid_t ipcp;
- int port_id;
+ pid_t ipcp;
+ int port_id;
+ int state;
pthread_rwlock_rdlock(&irmd.state_lock);
if (irmd.state != IRMD_RUNNING) {
pthread_rwlock_unlock(&irmd.state_lock);
- return NULL;
+ return -1;
}
pthread_rwlock_rdlock(&irmd.reg_lock);
@@ -1150,7 +1180,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_info("Destination unreachable.");
- return NULL;
+ return -1;
}
pthread_rwlock_unlock(&irmd.reg_lock);
@@ -1160,7 +1190,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd.flows_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_err("Could not allocate port_id.");
- return NULL;
+ return -EBADF;
}
f = irm_flow_create(api, ipcp, port_id, cube);
@@ -1169,7 +1199,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd.flows_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_err("Could not allocate port_id.");
- return NULL;
+ return -ENOMEM;
}
list_add(&f->next, &irmd.irm_flows);
@@ -1179,22 +1209,30 @@ static struct irm_flow * flow_alloc(pid_t api,
assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING);
- if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) {
+ if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube)) {
/* sanitizer cleans this */
- log_info("Failed to respond to alloc.");
- return NULL;
+ log_info("Flow_allocation failed.");
+ return -EAGAIN;
}
- if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) {
- log_info("Pending flow on port_id %d torn down.", port_id);
- return NULL;
+ state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo);
+ if (state != FLOW_ALLOCATED) {
+ if (state == -ETIMEDOUT) {
+ log_dbg("Flow allocation timed out");
+ return -ETIMEDOUT;
+ }
+
+ log_info("Pending flow to %s torn down.", dst_name);
+ return -EPIPE;
}
assert(irm_flow_get_state(f) == FLOW_ALLOCATED);
+ *e = f;
+
log_info("Flow on port_id %d allocated.", port_id);
- return f;
+ return 0;
}
static int flow_dealloc(pid_t api,
@@ -1382,7 +1420,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
return NULL;
}
-
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_wrlock(&irmd.flows_lock);
port_id = bmp_allocate(irmd.port_ids);
@@ -1798,15 +1835,17 @@ void * mainloop(void * o)
struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
(IRMD_ACCEPT_TIMEOUT % 1000) * 1000};
#endif
- int cli_sockfd;
- irm_msg_t * msg;
- ssize_t count;
- buffer_t buffer;
- irm_msg_t ret_msg = IRM_MSG__INIT;
- struct irm_flow * e = NULL;
- pid_t * apis = NULL;
- struct timeval tv = {(SOCKET_TIMEOUT / 1000),
- (SOCKET_TIMEOUT % 1000) * 1000};
+ int cli_sockfd;
+ irm_msg_t * msg;
+ ssize_t count;
+ buffer_t buffer;
+ irm_msg_t ret_msg = IRM_MSG__INIT;
+ struct irm_flow * e = NULL;
+ pid_t * apis = NULL;
+ struct timespec * timeo = NULL;
+ struct timespec ts = {0, 0};
+ struct timeval tv = {(SOCKET_TIMEOUT / 1000),
+ (SOCKET_TIMEOUT % 1000) * 1000};
pthread_rwlock_rdlock(&irmd.state_lock);
@@ -1849,6 +1888,14 @@ void * mainloop(void * o)
thread_dec();
+ if (msg->has_timeo_sec) {
+ assert(msg->has_timeo_nsec);
+
+ ts.tv_sec = msg->timeo_sec;
+ ts.tv_nsec = msg->timeo_nsec;
+ timeo = &ts;
+ }
+
switch (msg->code) {
case IRM_MSG_CODE__IRM_CREATE_IPCP:
ret_msg.has_result = true;
@@ -1897,9 +1944,9 @@ void * mainloop(void * o)
ret_msg.result = unbind_api(msg->api, msg->dst_name);
break;
case IRM_MSG_CODE__IRM_LIST_IPCPS:
+ ret_msg.has_result = true;
ret_msg.n_apis = list_ipcps(msg->dst_name, &apis);
ret_msg.apis = apis;
- ret_msg.has_result = true;
break;
case IRM_MSG_CODE__IRM_REG:
ret_msg.has_result = true;
@@ -1914,32 +1961,27 @@ void * mainloop(void * o)
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
- e = flow_accept(msg->api);
- if (e == NULL) {
- ret_msg.has_result = true;
- ret_msg.result = -EIRMD;
- break;
+ ret_msg.has_result = true;
+ ret_msg.result = flow_accept(msg->api, timeo, &e);
+ if (ret_msg.result == 0) {
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_api = true;
+ ret_msg.api = e->n_1_api;
+ ret_msg.has_qoscube = true;
+ ret_msg.qoscube = e->qc;
}
- ret_msg.has_port_id = true;
- ret_msg.port_id = e->port_id;
- ret_msg.has_api = true;
- ret_msg.api = e->n_1_api;
- ret_msg.has_qoscube = true;
- ret_msg.qoscube = e->qc;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
- e = flow_alloc(msg->api,
- msg->dst_name,
- msg->qoscube);
- if (e == NULL) {
- ret_msg.has_result = true;
- ret_msg.result = -1;
- break;
+ ret_msg.has_result = true;
+ ret_msg.result = flow_alloc(msg->api, msg->dst_name,
+ msg->qoscube, timeo, &e);
+ if (ret_msg.result == 0) {
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_api = true;
+ ret_msg.api = e->n_1_api;
}
- ret_msg.has_port_id = true;
- ret_msg.port_id = e->port_id;
- ret_msg.has_api = true;
- ret_msg.api = e->n_1_api;
break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
@@ -1949,8 +1991,8 @@ void * mainloop(void * o)
e = flow_req_arr(msg->api,
msg->dst_name,
msg->qoscube);
+ ret_msg.has_result = true;
if (e == NULL) {
- ret_msg.has_result = true;
ret_msg.result = -1;
break;
}
@@ -1971,6 +2013,12 @@ void * mainloop(void * o)
irm_msg__free_unpacked(msg, NULL);
+ if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
+ close(cli_sockfd);
+ thread_inc();
+ continue;
+ }
+
buffer.len = irm_msg__get_packed_size(&ret_msg);
if (buffer.len == 0) {
log_err("Failed to calculate length of reply message.");
@@ -2065,7 +2113,7 @@ void * threadpoolmgr(void * o)
if (pthread_cond_timedwait(&irmd.threads_cond,
&irmd.threads_lock,
&dl) == ETIMEDOUT)
- if (irmd.threads > IRMD_MIN_AV_THREADS)
+ if (irmd.threads > IRMD_MIN_AV_THREADS )
--irmd.max_threads;
pthread_mutex_unlock(&irmd.threads_lock);
diff --git a/src/lib/dev.c b/src/lib/dev.c
index c063fd47..389ff278 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -276,7 +276,7 @@ int ap_init(const char * ap_name)
shm_flow_set_destroy(ai.fqset);
bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
- return -1;
+ return -EIRMD;
}
ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);
@@ -393,9 +393,9 @@ int flow_accept(qosspec_t * qs,
if (timeo != NULL) {
msg.has_timeo_sec = true;
- msg.has_timeo_usec = true;
+ msg.has_timeo_nsec = true;
msg.timeo_sec = timeo->tv_sec;
- msg.timeo_usec = timeo->tv_nsec / 1000;
+ msg.timeo_nsec = timeo->tv_nsec;
}
pthread_rwlock_rdlock(&ai.data_lock);
@@ -404,15 +404,21 @@ int flow_accept(qosspec_t * qs,
pthread_rwlock_unlock(&ai.data_lock);
- recv_msg = send_recv_irm_msg_b(&msg);
+ recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -EIRMD;
- if (recv_msg->has_result) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -EIRMD;
}
+ if (recv_msg->result != 0) {
+ int res = recv_msg->result;
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return res;
+ }
+
if (!recv_msg->has_api || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
@@ -496,9 +502,9 @@ int flow_alloc(const char * dst_name,
if (timeo != NULL) {
msg.has_timeo_sec = true;
- msg.has_timeo_usec = true;
+ msg.has_timeo_nsec = true;
msg.timeo_sec = timeo->tv_sec;
- msg.timeo_usec = timeo->tv_nsec / 1000;
+ msg.timeo_nsec = timeo->tv_nsec;
}
pthread_rwlock_rdlock(&ai.data_lock);
@@ -511,6 +517,17 @@ int flow_alloc(const char * dst_name,
if (recv_msg == NULL)
return -EIRMD;
+ if (!recv_msg->has_result) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -EIRMD;
+ }
+
+ if (recv_msg->result != 0) {
+ int res = recv_msg->result;
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return res;
+ }
+
if (!recv_msg->has_api || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
diff --git a/src/lib/irm.c b/src/lib/irm.c
index 0e4bfc40..57e09369 100644
--- a/src/lib/irm.c
+++ b/src/lib/irm.c
@@ -177,10 +177,8 @@ ssize_t irm_list_ipcps(const char * name,
msg.dst_name = (char *) name;
recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL) {
- free(msg.dif_name);
+ if (recv_msg == NULL)
return -EIRMD;
- }
if (recv_msg->apis == NULL) {
irm_msg__free_unpacked(recv_msg, NULL);
diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto
index 4fbd676e..e218f6f6 100644
--- a/src/lib/irmd_messages.proto
+++ b/src/lib/irmd_messages.proto
@@ -62,6 +62,6 @@ message irm_msg {
optional uint32 opts = 12;
repeated sint32 apis = 13;
optional uint32 timeo_sec = 14;
- optional uint32 timeo_usec = 15;
+ optional uint32 timeo_nsec = 15;
optional sint32 result = 16;
};
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 3a26a2cf..63f928cf 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -95,23 +95,17 @@ static void close_ptr(void * o)
close(*(int *) o);
}
-static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed)
+irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
{
int sockfd;
buffer_t buf;
ssize_t count = 0;
irm_msg_t * recv_msg = NULL;
- struct timeval tv = {(SOCKET_TIMEOUT / 1000),
- (SOCKET_TIMEOUT % 1000) * 1000};
sockfd = client_socket_open(IRM_SOCK_PATH);
if (sockfd < 0)
return NULL;
- if (timed)
- setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
- (void *) &tv, sizeof(tv));
-
buf.len = irm_msg__get_packed_size(msg);
if (buf.len == 0) {
close(sockfd);
@@ -141,12 +135,6 @@ static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed)
return recv_msg;
}
-irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
-{ return send_recv_irm_msg_timed(msg, true); }
-
-irm_msg_t * send_recv_irm_msg_b(irm_msg_t * msg)
-{ return send_recv_irm_msg_timed(msg, false); }
-
char * ipcp_sock_path(pid_t api)
{
char * full_name = NULL;