diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-03-31 09:58:23 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-03-31 09:58:23 +0000 |
commit | ad01a7fd0b6cd798b2d5a2901ae8499b25360707 (patch) | |
tree | 16b6fd66c3fe93d178e10a137179923b513851f9 /src/irmd | |
parent | 5f79a21b80e68ba59616f0fa431287c3e94c43cf (diff) | |
parent | 7ba0fd0ce19244745c8d2512ce8a003783d914a7 (diff) | |
download | ouroboros-ad01a7fd0b6cd798b2d5a2901ae8499b25360707.tar.gz ouroboros-ad01a7fd0b6cd798b2d5a2901ae8499b25360707.zip |
Merged in dstaesse/ouroboros/be-new-api (pull request #439)
lib: Revise flow allocation API
Diffstat (limited to 'src/irmd')
-rw-r--r-- | src/irmd/ipcp.c | 7 | ||||
-rw-r--r-- | src/irmd/irm_flow.c | 12 | ||||
-rw-r--r-- | src/irmd/main.c | 203 |
3 files changed, 68 insertions, 154 deletions
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 06b66d3b..a8263580 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -176,18 +176,11 @@ pid_t ipcp_create(char * name, int ipcp_destroy(pid_t api) { - int status; - if (kill(api, SIGTERM)) { log_err("Failed to destroy IPCP"); return -1; } - if (waitpid(api, &status, 0) < 0) { - log_err("Failed to destroy IPCP"); - return -1; - } - return 0; } diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index 99966561..4e7c22ef 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -45,6 +45,7 @@ struct irm_flow * irm_flow_create(pid_t n_api, } if (pthread_mutex_init(&f->state_lock, NULL)) { + pthread_cond_destroy(&f->state_cond); free(f); return NULL; } @@ -63,6 +64,9 @@ struct irm_flow * irm_flow_create(pid_t n_api, f->n_1_rb = shm_rbuff_create(n_1_api, port_id); if (f->n_1_rb == NULL) { log_err("Could not create ringbuffer for AP-I %d.", n_1_api); + shm_rbuff_destroy(f->n_rb); + pthread_mutex_destroy(&f->state_lock); + pthread_cond_destroy(&f->state_cond); free(f); return NULL; } @@ -122,7 +126,8 @@ enum flow_state irm_flow_get_state(struct irm_flow * f) return state; } -void irm_flow_set_state(struct irm_flow * f, enum flow_state state) +void irm_flow_set_state(struct irm_flow * f, + enum flow_state state) { assert(f); assert(state != FLOW_DESTROY); @@ -135,7 +140,8 @@ void irm_flow_set_state(struct irm_flow * f, enum flow_state state) pthread_mutex_unlock(&f->state_lock); } -enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state) +enum flow_state irm_flow_wait_state(struct irm_flow * f, + enum flow_state state) { assert(f); assert(state != FLOW_NULL); @@ -143,6 +149,8 @@ enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state) pthread_mutex_lock(&f->state_lock); + assert(f->state != FLOW_NULL); + while (!(f->state == state || f->state == FLOW_DESTROY)) pthread_cond_wait(&f->state_cond, &f->state_lock); diff --git a/src/irmd/main.c b/src/irmd/main.c index 9901a608..c7adf386 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -133,7 +133,8 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api) list_for_each(pos, &irmd->irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->n_api == n_api) + if (e->n_api == n_api && + irm_flow_get_state(e) == FLOW_ALLOC_PENDING) return e; } @@ -982,7 +983,12 @@ static struct irm_flow * flow_accept(pid_t api, struct irm_flow * f = NULL; struct api_entry * e = NULL; struct reg_entry * re = NULL; - struct list_head * p; + struct list_head * p = NULL; + + pid_t api_n1; + pid_t api_n; + int port_id; + int ret; pthread_rwlock_rdlock(&irmd->state_lock); @@ -1016,7 +1022,7 @@ static struct irm_flow * flow_accept(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - while (api_entry_sleep(e) == -ETIMEDOUT) { + while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) { pthread_rwlock_rdlock(&irmd->state_lock); if (irmd->state != IRMD_RUNNING) { pthread_rwlock_unlock(&irmd->state_lock); @@ -1025,126 +1031,76 @@ static struct irm_flow * flow_accept(pid_t api, pthread_rwlock_unlock(&irmd->state_lock); } - pthread_rwlock_rdlock(&irmd->state_lock); - - if (irmd->state != IRMD_RUNNING) { - reg_entry_set_state(re, REG_NAME_NULL); - pthread_rwlock_unlock(&irmd->state_lock); + if (ret == -1) { + /* The process died, we can exit here. */ return NULL; } - pthread_rwlock_rdlock(&irmd->reg_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - e = api_table_get(&irmd->api_table, api); - if (e == NULL) { - pthread_rwlock_unlock(&irmd->reg_lock); + if (irmd->state != IRMD_RUNNING) { + reg_entry_set_state(re, REG_NAME_NULL); pthread_rwlock_unlock(&irmd->state_lock); - log_dbg("Process gone while accepting flow."); return NULL; } - pthread_mutex_lock(&e->state_lock); - - re = e->re; - - pthread_mutex_unlock(&e->state_lock); - - if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) { - pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_unlock(&irmd->state_lock); - log_err("Entry in wrong state."); - return NULL; - } - pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_rdlock(&irmd->flows_lock); f = get_irm_flow_n(api); if (f == NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - log_err("Port_id was not created yet."); + log_warn("Port_id was not created yet."); return NULL; } *cube = re->qos; + api_n = f->n_api; + api_n1 = f->n_1_api; + port_id = f->port_id; + log_info("Flow on port_id %d allocated.", f->port_id); pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - - return f; -} - -static int flow_alloc_resp(pid_t n_api, - int port_id, - int response) -{ - struct irm_flow * f = NULL; - struct reg_entry * re = NULL; - struct api_entry * e = NULL; - int ret = -1; - - pid_t api_n1; - pid_t api_n; - - pthread_rwlock_rdlock(&irmd->state_lock); - - if (irmd->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&irmd->state_lock); - return -1; - } - - pthread_rwlock_wrlock(&irmd->reg_lock); + pthread_rwlock_rdlock(&irmd->reg_lock); - e = api_table_get(&irmd->api_table, n_api); + e = api_table_get(&irmd->api_table, api); if (e == NULL) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - log_err("Unknown AP-I %d responding for port_id %d.", - n_api, port_id); - return -1; + ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); + log_dbg("Process gone while accepting flow."); + return NULL; } + pthread_mutex_lock(&e->state_lock); + re = e->re; - if (re == NULL) { - pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_unlock(&irmd->state_lock); - log_err("AP-I %d is not handling a flow request.", n_api); - return -1; - } + + pthread_mutex_unlock(&e->state_lock); if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - log_err("Name %s has no pending flow request.", re->name); - return -1; + ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); + log_err("Entry in wrong state."); + return NULL; } - registry_del_api(&irmd->registry, n_api); + registry_del_api(&irmd->registry, api); pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - - f = get_irm_flow(port_id); - if (f == NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - return -1; - } - - api_n = f->n_api; - api_n1 = f->n_1_api; - - pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response); + if (ipcp_flow_alloc_resp(api_n1, port_id, api_n, 0)) { + log_dbg("Failed to respond to alloc."); + return NULL; + } - if (!(response || ret)) - irm_flow_set_state(f, FLOW_ALLOCATED); + irm_flow_set_state(f, FLOW_ALLOCATED); - return ret; + return f; } static struct irm_flow * flow_alloc(pid_t api, @@ -1196,6 +1152,8 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); + assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); + if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) { pthread_rwlock_rdlock(&irmd->state_lock); @@ -1210,54 +1168,16 @@ static struct irm_flow * flow_alloc(pid_t api, return NULL; } - return f; -} - -static int flow_alloc_res(int port_id) -{ - struct irm_flow * f; - - pthread_rwlock_rdlock(&irmd->state_lock); - - if (irmd->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&irmd->state_lock); - return -1; - } - pthread_rwlock_rdlock(&irmd->flows_lock); - - f = get_irm_flow(port_id); - if (f == NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - log_err("Could not find port %d.", port_id); - return -1; - } - - if (irm_flow_get_state(f) == FLOW_NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - log_info("Port %d is deprecated.", port_id); - return -1; - } - - if (irm_flow_get_state(f) == FLOW_ALLOCATED) { - log_info("Flow on port_id %d allocated.", port_id); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - return 0; + if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) { + log_info("Pending flow on port_id %d torn down.", port_id); + return NULL; } - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - - if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) { - log_info("Flow on port_id %d allocated.", port_id); - return 0; - } + assert(irm_flow_get_state(f) == FLOW_ALLOCATED); - log_info("Pending flow on port_id %d torn down.", port_id); + log_info("Flow on port_id %d allocated.", port_id); - return -1; + return f; } static int flow_dealloc(pid_t api, @@ -1293,6 +1213,9 @@ static int flow_dealloc(pid_t api, if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) { list_del(&f->next); + if ((kill(f->n_api, 0) < 0 && f->n_1_api == -1) || + (kill (f->n_1_api, 0) < 0 && f->n_api == -1)) + irm_flow_set_state(f, FLOW_NULL); clear_irm_flow(f); irm_flow_destroy(f); bmp_release(irmd->port_ids, port_id); @@ -1305,12 +1228,11 @@ static int flow_dealloc(pid_t api, } pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); if (n_1_api != -1) ret = ipcp_flow_dealloc(n_1_api, port_id); - pthread_rwlock_unlock(&irmd->state_lock); - return ret; } @@ -1501,7 +1423,7 @@ static int flow_alloc_reply(int port_id, struct irm_flow * f; pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); + pthread_rwlock_rdlock(&irmd->flows_lock); f = get_irm_flow(port_id); if (f == NULL) { @@ -1551,18 +1473,19 @@ static void irm_destroy(void) list_for_each_safe(p, h, &irmd->ipcps) { struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next); list_del(&e->next); - ipcp_destroy(e->api); - clear_spawned_api(e->api); - registry_del_api(&irmd->registry, e->api); ipcp_entry_destroy(e); } - list_for_each_safe(p, h, &irmd->spawned_apis) { + list_for_each(p, &irmd->spawned_apis) { struct pid_el * e = list_entry(p, struct pid_el, next); - int status; if (kill(e->pid, SIGTERM)) log_dbg("Could not send kill signal to %d.", e->pid); - else if (waitpid(e->pid, &status, 0) < 0) + } + + list_for_each_safe(p, h, &irmd->spawned_apis) { + struct pid_el * e = list_entry(p, struct pid_el, next); + int status; + if (waitpid(e->pid, &status, 0) < 0) log_dbg("Error waiting for %d to exit.", e->pid); list_del(&e->next); registry_del_api(&irmd->registry, e->pid); @@ -1940,12 +1863,6 @@ void * mainloop(void * o) ret_msg.has_api = true; ret_msg.api = e->n_1_api; break; - case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP: - ret_msg.has_result = true; - ret_msg.result = flow_alloc_resp(msg->api, - msg->port_id, - msg->response); - break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: e = flow_alloc(msg->api, msg->dst_name, @@ -1960,10 +1877,6 @@ void * mainloop(void * o) ret_msg.has_api = true; ret_msg.api = e->n_1_api; break; - case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: - ret_msg.has_result = true; - ret_msg.result = flow_alloc_res(msg->port_id); - break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; ret_msg.result = flow_dealloc(msg->api, msg->port_id); |