summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-04-05 20:02:28 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-04-06 10:36:24 +0200
commite1c0714d5827cd927961f3a687d9720e6e9aa802 (patch)
treed8e793cffbe829d64855eaa5a429b90ebe3dc3a4 /src/irmd/main.c
parentc6ad4f96f8bb2f1ee749e92308e7173523ddd0b8 (diff)
downloadouroboros-e1c0714d5827cd927961f3a687d9720e6e9aa802.tar.gz
ouroboros-e1c0714d5827cd927961f3a687d9720e6e9aa802.zip
lib, irmd: Implement flow allocation timeout
Setting the timeouts on flow_alloc and flow_accept will now work. This makes some changes to the UNIX sockets used for management communication between the APs, IRMd and IPCPs.
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c182
1 files changed, 115 insertions, 67 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 673e39ea..41beb049 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -983,23 +983,33 @@ static int api_announce(pid_t api,
return 0;
}
-static struct irm_flow * flow_accept(pid_t api)
+static int flow_accept(pid_t api,
+ struct timespec * timeo,
+ struct irm_flow ** fl)
{
- struct irm_flow * f = NULL;
+ struct irm_flow * f = NULL;
struct api_entry * e = NULL;
struct reg_entry * re = NULL;
struct list_head * p = NULL;
+ struct timespec dl;
+ struct timespec now;
+
pid_t api_n1;
pid_t api_n;
int port_id;
int ret;
+ if (timeo != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, timeo, &dl);
+ }
+
pthread_rwlock_rdlock(&irmd.state_lock);
if (irmd.state != IRMD_RUNNING) {
pthread_rwlock_unlock(&irmd.state_lock);
- return NULL;
+ return -EIRMD;
}
pthread_rwlock_wrlock(&irmd.reg_lock);
@@ -1010,7 +1020,7 @@ static struct irm_flow * flow_accept(pid_t api)
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_err("Unknown instance %d calling accept.", api);
- return NULL;
+ return -EINVAL;
}
log_dbg("New instance (%d) of %s added.", api, e->apn);
@@ -1027,18 +1037,33 @@ static struct irm_flow * flow_accept(pid_t api)
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_unlock(&irmd.state_lock);
- while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) {
+ while (true) {
+ if (timeo != NULL && ts_diff_ns(&now, &dl) < 0) {
+ log_dbg("Accept timed out.");
+ return -ETIMEDOUT;
+ }
+
pthread_rwlock_rdlock(&irmd.state_lock);
+
if (irmd.state != IRMD_RUNNING) {
pthread_rwlock_unlock(&irmd.state_lock);
- return NULL;
+ return -EIRMD;
}
+
pthread_rwlock_unlock(&irmd.state_lock);
- }
- if (ret == -1) {
- /* The process died, we can exit here. */
- return NULL;
+ ret = api_entry_sleep(e);
+ if (ret == -ETIMEDOUT) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ api_entry_cancel(e);
+ continue;
+ }
+
+ if (ret == -1)
+ return -EPIPE;
+
+ if (ret == 0)
+ break;
}
pthread_rwlock_rdlock(&irmd.state_lock);
@@ -1046,7 +1071,7 @@ static struct irm_flow * flow_accept(pid_t api)
if (irmd.state != IRMD_RUNNING) {
reg_entry_set_state(re, REG_NAME_NULL);
pthread_rwlock_unlock(&irmd.state_lock);
- return NULL;
+ return -EIRMD;
}
pthread_rwlock_rdlock(&irmd.flows_lock);
@@ -1056,7 +1081,7 @@ static struct irm_flow * flow_accept(pid_t api)
pthread_rwlock_unlock(&irmd.flows_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_warn("Port_id was not created yet.");
- return NULL;
+ return -EPERM;
}
api_n = f->n_api;
@@ -1079,7 +1104,7 @@ static struct irm_flow * flow_accept(pid_t api)
irm_flow_set_state(f, FLOW_NULL);
irm_flow_destroy(f);
log_dbg("Process gone while accepting flow.");
- return NULL;
+ return -EPERM;
}
pthread_mutex_lock(&e->state_lock);
@@ -1100,7 +1125,7 @@ static struct irm_flow * flow_accept(pid_t api)
irm_flow_set_state(f, FLOW_NULL);
irm_flow_destroy(f);
log_err("Entry in wrong state.");
- return NULL;
+ return -EPERM;
}
registry_del_api(&irmd.registry, api);
@@ -1118,29 +1143,34 @@ static struct irm_flow * flow_accept(pid_t api)
clear_irm_flow(f);
irm_flow_set_state(f, FLOW_NULL);
irm_flow_destroy(f);
- return NULL;
+ return -EPERM;
}
irm_flow_set_state(f, FLOW_ALLOCATED);
log_info("Flow on port_id %d allocated.", f->port_id);
- return f;
+ *fl = f;
+
+ return 0;
}
-static struct irm_flow * flow_alloc(pid_t api,
- char * dst_name,
- qoscube_t cube)
+static int flow_alloc(pid_t api,
+ char * dst_name,
+ qoscube_t cube,
+ struct timespec * timeo,
+ struct irm_flow ** e)
{
struct irm_flow * f;
- pid_t ipcp;
- int port_id;
+ pid_t ipcp;
+ int port_id;
+ int state;
pthread_rwlock_rdlock(&irmd.state_lock);
if (irmd.state != IRMD_RUNNING) {
pthread_rwlock_unlock(&irmd.state_lock);
- return NULL;
+ return -1;
}
pthread_rwlock_rdlock(&irmd.reg_lock);
@@ -1150,7 +1180,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_info("Destination unreachable.");
- return NULL;
+ return -1;
}
pthread_rwlock_unlock(&irmd.reg_lock);
@@ -1160,7 +1190,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd.flows_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_err("Could not allocate port_id.");
- return NULL;
+ return -EBADF;
}
f = irm_flow_create(api, ipcp, port_id, cube);
@@ -1169,7 +1199,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd.flows_lock);
pthread_rwlock_unlock(&irmd.state_lock);
log_err("Could not allocate port_id.");
- return NULL;
+ return -ENOMEM;
}
list_add(&f->next, &irmd.irm_flows);
@@ -1179,22 +1209,30 @@ static struct irm_flow * flow_alloc(pid_t api,
assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING);
- if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) {
+ if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube)) {
/* sanitizer cleans this */
- log_info("Failed to respond to alloc.");
- return NULL;
+ log_info("Flow_allocation failed.");
+ return -EAGAIN;
}
- if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) {
- log_info("Pending flow on port_id %d torn down.", port_id);
- return NULL;
+ state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo);
+ if (state != FLOW_ALLOCATED) {
+ if (state == -ETIMEDOUT) {
+ log_dbg("Flow allocation timed out");
+ return -ETIMEDOUT;
+ }
+
+ log_info("Pending flow to %s torn down.", dst_name);
+ return -EPIPE;
}
assert(irm_flow_get_state(f) == FLOW_ALLOCATED);
+ *e = f;
+
log_info("Flow on port_id %d allocated.", port_id);
- return f;
+ return 0;
}
static int flow_dealloc(pid_t api,
@@ -1382,7 +1420,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
return NULL;
}
-
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_wrlock(&irmd.flows_lock);
port_id = bmp_allocate(irmd.port_ids);
@@ -1798,15 +1835,17 @@ void * mainloop(void * o)
struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
(IRMD_ACCEPT_TIMEOUT % 1000) * 1000};
#endif
- int cli_sockfd;
- irm_msg_t * msg;
- ssize_t count;
- buffer_t buffer;
- irm_msg_t ret_msg = IRM_MSG__INIT;
- struct irm_flow * e = NULL;
- pid_t * apis = NULL;
- struct timeval tv = {(SOCKET_TIMEOUT / 1000),
- (SOCKET_TIMEOUT % 1000) * 1000};
+ int cli_sockfd;
+ irm_msg_t * msg;
+ ssize_t count;
+ buffer_t buffer;
+ irm_msg_t ret_msg = IRM_MSG__INIT;
+ struct irm_flow * e = NULL;
+ pid_t * apis = NULL;
+ struct timespec * timeo = NULL;
+ struct timespec ts = {0, 0};
+ struct timeval tv = {(SOCKET_TIMEOUT / 1000),
+ (SOCKET_TIMEOUT % 1000) * 1000};
pthread_rwlock_rdlock(&irmd.state_lock);
@@ -1849,6 +1888,14 @@ void * mainloop(void * o)
thread_dec();
+ if (msg->has_timeo_sec) {
+ assert(msg->has_timeo_nsec);
+
+ ts.tv_sec = msg->timeo_sec;
+ ts.tv_nsec = msg->timeo_nsec;
+ timeo = &ts;
+ }
+
switch (msg->code) {
case IRM_MSG_CODE__IRM_CREATE_IPCP:
ret_msg.has_result = true;
@@ -1897,9 +1944,9 @@ void * mainloop(void * o)
ret_msg.result = unbind_api(msg->api, msg->dst_name);
break;
case IRM_MSG_CODE__IRM_LIST_IPCPS:
+ ret_msg.has_result = true;
ret_msg.n_apis = list_ipcps(msg->dst_name, &apis);
ret_msg.apis = apis;
- ret_msg.has_result = true;
break;
case IRM_MSG_CODE__IRM_REG:
ret_msg.has_result = true;
@@ -1914,32 +1961,27 @@ void * mainloop(void * o)
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
- e = flow_accept(msg->api);
- if (e == NULL) {
- ret_msg.has_result = true;
- ret_msg.result = -EIRMD;
- break;
+ ret_msg.has_result = true;
+ ret_msg.result = flow_accept(msg->api, timeo, &e);
+ if (ret_msg.result == 0) {
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_api = true;
+ ret_msg.api = e->n_1_api;
+ ret_msg.has_qoscube = true;
+ ret_msg.qoscube = e->qc;
}
- ret_msg.has_port_id = true;
- ret_msg.port_id = e->port_id;
- ret_msg.has_api = true;
- ret_msg.api = e->n_1_api;
- ret_msg.has_qoscube = true;
- ret_msg.qoscube = e->qc;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
- e = flow_alloc(msg->api,
- msg->dst_name,
- msg->qoscube);
- if (e == NULL) {
- ret_msg.has_result = true;
- ret_msg.result = -1;
- break;
+ ret_msg.has_result = true;
+ ret_msg.result = flow_alloc(msg->api, msg->dst_name,
+ msg->qoscube, timeo, &e);
+ if (ret_msg.result == 0) {
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_api = true;
+ ret_msg.api = e->n_1_api;
}
- ret_msg.has_port_id = true;
- ret_msg.port_id = e->port_id;
- ret_msg.has_api = true;
- ret_msg.api = e->n_1_api;
break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
@@ -1949,8 +1991,8 @@ void * mainloop(void * o)
e = flow_req_arr(msg->api,
msg->dst_name,
msg->qoscube);
+ ret_msg.has_result = true;
if (e == NULL) {
- ret_msg.has_result = true;
ret_msg.result = -1;
break;
}
@@ -1971,6 +2013,12 @@ void * mainloop(void * o)
irm_msg__free_unpacked(msg, NULL);
+ if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
+ close(cli_sockfd);
+ thread_inc();
+ continue;
+ }
+
buffer.len = irm_msg__get_packed_size(&ret_msg);
if (buffer.len == 0) {
log_err("Failed to calculate length of reply message.");
@@ -2065,7 +2113,7 @@ void * threadpoolmgr(void * o)
if (pthread_cond_timedwait(&irmd.threads_cond,
&irmd.threads_lock,
&dl) == ETIMEDOUT)
- if (irmd.threads > IRMD_MIN_AV_THREADS)
+ if (irmd.threads > IRMD_MIN_AV_THREADS )
--irmd.max_threads;
pthread_mutex_unlock(&irmd.threads_lock);