diff options
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r-- | src/irmd/main.c | 182 |
1 files changed, 115 insertions, 67 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 673e39ea..41beb049 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -983,23 +983,33 @@ static int api_announce(pid_t api, return 0; } -static struct irm_flow * flow_accept(pid_t api) +static int flow_accept(pid_t api, + struct timespec * timeo, + struct irm_flow ** fl) { - struct irm_flow * f = NULL; + struct irm_flow * f = NULL; struct api_entry * e = NULL; struct reg_entry * re = NULL; struct list_head * p = NULL; + struct timespec dl; + struct timespec now; + pid_t api_n1; pid_t api_n; int port_id; int ret; + if (timeo != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, timeo, &dl); + } + pthread_rwlock_rdlock(&irmd.state_lock); if (irmd.state != IRMD_RUNNING) { pthread_rwlock_unlock(&irmd.state_lock); - return NULL; + return -EIRMD; } pthread_rwlock_wrlock(&irmd.reg_lock); @@ -1010,7 +1020,7 @@ static struct irm_flow * flow_accept(pid_t api) pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_unlock(&irmd.state_lock); log_err("Unknown instance %d calling accept.", api); - return NULL; + return -EINVAL; } log_dbg("New instance (%d) of %s added.", api, e->apn); @@ -1027,18 +1037,33 @@ static struct irm_flow * flow_accept(pid_t api) pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_unlock(&irmd.state_lock); - while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) { + while (true) { + if (timeo != NULL && ts_diff_ns(&now, &dl) < 0) { + log_dbg("Accept timed out."); + return -ETIMEDOUT; + } + pthread_rwlock_rdlock(&irmd.state_lock); + if (irmd.state != IRMD_RUNNING) { pthread_rwlock_unlock(&irmd.state_lock); - return NULL; + return -EIRMD; } + pthread_rwlock_unlock(&irmd.state_lock); - } - if (ret == -1) { - /* The process died, we can exit here. */ - return NULL; + ret = api_entry_sleep(e); + if (ret == -ETIMEDOUT) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + api_entry_cancel(e); + continue; + } + + if (ret == -1) + return -EPIPE; + + if (ret == 0) + break; } pthread_rwlock_rdlock(&irmd.state_lock); @@ -1046,7 +1071,7 @@ static struct irm_flow * flow_accept(pid_t api) if (irmd.state != IRMD_RUNNING) { reg_entry_set_state(re, REG_NAME_NULL); pthread_rwlock_unlock(&irmd.state_lock); - return NULL; + return -EIRMD; } pthread_rwlock_rdlock(&irmd.flows_lock); @@ -1056,7 +1081,7 @@ static struct irm_flow * flow_accept(pid_t api) pthread_rwlock_unlock(&irmd.flows_lock); pthread_rwlock_unlock(&irmd.state_lock); log_warn("Port_id was not created yet."); - return NULL; + return -EPERM; } api_n = f->n_api; @@ -1079,7 +1104,7 @@ static struct irm_flow * flow_accept(pid_t api) irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); log_dbg("Process gone while accepting flow."); - return NULL; + return -EPERM; } pthread_mutex_lock(&e->state_lock); @@ -1100,7 +1125,7 @@ static struct irm_flow * flow_accept(pid_t api) irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); log_err("Entry in wrong state."); - return NULL; + return -EPERM; } registry_del_api(&irmd.registry, api); @@ -1118,29 +1143,34 @@ static struct irm_flow * flow_accept(pid_t api) clear_irm_flow(f); irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); - return NULL; + return -EPERM; } irm_flow_set_state(f, FLOW_ALLOCATED); log_info("Flow on port_id %d allocated.", f->port_id); - return f; + *fl = f; + + return 0; } -static struct irm_flow * flow_alloc(pid_t api, - char * dst_name, - qoscube_t cube) +static int flow_alloc(pid_t api, + char * dst_name, + qoscube_t cube, + struct timespec * timeo, + struct irm_flow ** e) { struct irm_flow * f; - pid_t ipcp; - int port_id; + pid_t ipcp; + int port_id; + int state; pthread_rwlock_rdlock(&irmd.state_lock); if (irmd.state != IRMD_RUNNING) { pthread_rwlock_unlock(&irmd.state_lock); - return NULL; + return -1; } pthread_rwlock_rdlock(&irmd.reg_lock); @@ -1150,7 +1180,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_unlock(&irmd.state_lock); log_info("Destination unreachable."); - return NULL; + return -1; } pthread_rwlock_unlock(&irmd.reg_lock); @@ -1160,7 +1190,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd.flows_lock); pthread_rwlock_unlock(&irmd.state_lock); log_err("Could not allocate port_id."); - return NULL; + return -EBADF; } f = irm_flow_create(api, ipcp, port_id, cube); @@ -1169,7 +1199,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd.flows_lock); pthread_rwlock_unlock(&irmd.state_lock); log_err("Could not allocate port_id."); - return NULL; + return -ENOMEM; } list_add(&f->next, &irmd.irm_flows); @@ -1179,22 +1209,30 @@ static struct irm_flow * flow_alloc(pid_t api, assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); - if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) { + if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube)) { /* sanitizer cleans this */ - log_info("Failed to respond to alloc."); - return NULL; + log_info("Flow_allocation failed."); + return -EAGAIN; } - if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) { - log_info("Pending flow on port_id %d torn down.", port_id); - return NULL; + state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo); + if (state != FLOW_ALLOCATED) { + if (state == -ETIMEDOUT) { + log_dbg("Flow allocation timed out"); + return -ETIMEDOUT; + } + + log_info("Pending flow to %s torn down.", dst_name); + return -EPIPE; } assert(irm_flow_get_state(f) == FLOW_ALLOCATED); + *e = f; + log_info("Flow on port_id %d allocated.", port_id); - return f; + return 0; } static int flow_dealloc(pid_t api, @@ -1382,7 +1420,6 @@ static struct irm_flow * flow_req_arr(pid_t api, return NULL; } - pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); port_id = bmp_allocate(irmd.port_ids); @@ -1798,15 +1835,17 @@ void * mainloop(void * o) struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; #endif - int cli_sockfd; - irm_msg_t * msg; - ssize_t count; - buffer_t buffer; - irm_msg_t ret_msg = IRM_MSG__INIT; - struct irm_flow * e = NULL; - pid_t * apis = NULL; - struct timeval tv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; + int cli_sockfd; + irm_msg_t * msg; + ssize_t count; + buffer_t buffer; + irm_msg_t ret_msg = IRM_MSG__INIT; + struct irm_flow * e = NULL; + pid_t * apis = NULL; + struct timespec * timeo = NULL; + struct timespec ts = {0, 0}; + struct timeval tv = {(SOCKET_TIMEOUT / 1000), + (SOCKET_TIMEOUT % 1000) * 1000}; pthread_rwlock_rdlock(&irmd.state_lock); @@ -1849,6 +1888,14 @@ void * mainloop(void * o) thread_dec(); + if (msg->has_timeo_sec) { + assert(msg->has_timeo_nsec); + + ts.tv_sec = msg->timeo_sec; + ts.tv_nsec = msg->timeo_nsec; + timeo = &ts; + } + switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; @@ -1897,9 +1944,9 @@ void * mainloop(void * o) ret_msg.result = unbind_api(msg->api, msg->dst_name); break; case IRM_MSG_CODE__IRM_LIST_IPCPS: + ret_msg.has_result = true; ret_msg.n_apis = list_ipcps(msg->dst_name, &apis); ret_msg.apis = apis; - ret_msg.has_result = true; break; case IRM_MSG_CODE__IRM_REG: ret_msg.has_result = true; @@ -1914,32 +1961,27 @@ void * mainloop(void * o) msg->n_dif_name); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: - e = flow_accept(msg->api); - if (e == NULL) { - ret_msg.has_result = true; - ret_msg.result = -EIRMD; - break; + ret_msg.has_result = true; + ret_msg.result = flow_accept(msg->api, timeo, &e); + if (ret_msg.result == 0) { + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_api = true; + ret_msg.api = e->n_1_api; + ret_msg.has_qoscube = true; + ret_msg.qoscube = e->qc; } - ret_msg.has_port_id = true; - ret_msg.port_id = e->port_id; - ret_msg.has_api = true; - ret_msg.api = e->n_1_api; - ret_msg.has_qoscube = true; - ret_msg.qoscube = e->qc; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: - e = flow_alloc(msg->api, - msg->dst_name, - msg->qoscube); - if (e == NULL) { - ret_msg.has_result = true; - ret_msg.result = -1; - break; + ret_msg.has_result = true; + ret_msg.result = flow_alloc(msg->api, msg->dst_name, + msg->qoscube, timeo, &e); + if (ret_msg.result == 0) { + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_api = true; + ret_msg.api = e->n_1_api; } - ret_msg.has_port_id = true; - ret_msg.port_id = e->port_id; - ret_msg.has_api = true; - ret_msg.api = e->n_1_api; break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; @@ -1949,8 +1991,8 @@ void * mainloop(void * o) e = flow_req_arr(msg->api, msg->dst_name, msg->qoscube); + ret_msg.has_result = true; if (e == NULL) { - ret_msg.has_result = true; ret_msg.result = -1; break; } @@ -1971,6 +2013,12 @@ void * mainloop(void * o) irm_msg__free_unpacked(msg, NULL); + if (ret_msg.result == -EPIPE || !ret_msg.has_result) { + close(cli_sockfd); + thread_inc(); + continue; + } + buffer.len = irm_msg__get_packed_size(&ret_msg); if (buffer.len == 0) { log_err("Failed to calculate length of reply message."); @@ -2065,7 +2113,7 @@ void * threadpoolmgr(void * o) if (pthread_cond_timedwait(&irmd.threads_cond, &irmd.threads_lock, &dl) == ETIMEDOUT) - if (irmd.threads > IRMD_MIN_AV_THREADS) + if (irmd.threads > IRMD_MIN_AV_THREADS ) --irmd.max_threads; pthread_mutex_unlock(&irmd.threads_lock); |