summaryrefslogtreecommitdiff
path: root/src/irmd
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-03-31 09:58:23 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-03-31 09:58:23 +0000
commitad01a7fd0b6cd798b2d5a2901ae8499b25360707 (patch)
tree16b6fd66c3fe93d178e10a137179923b513851f9 /src/irmd
parent5f79a21b80e68ba59616f0fa431287c3e94c43cf (diff)
parent7ba0fd0ce19244745c8d2512ce8a003783d914a7 (diff)
downloadouroboros-ad01a7fd0b6cd798b2d5a2901ae8499b25360707.tar.gz
ouroboros-ad01a7fd0b6cd798b2d5a2901ae8499b25360707.zip
Merged in dstaesse/ouroboros/be-new-api (pull request #439)
lib: Revise flow allocation API
Diffstat (limited to 'src/irmd')
-rw-r--r--src/irmd/ipcp.c7
-rw-r--r--src/irmd/irm_flow.c12
-rw-r--r--src/irmd/main.c203
3 files changed, 68 insertions, 154 deletions
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 06b66d3b..a8263580 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -176,18 +176,11 @@ pid_t ipcp_create(char * name,
int ipcp_destroy(pid_t api)
{
- int status;
-
if (kill(api, SIGTERM)) {
log_err("Failed to destroy IPCP");
return -1;
}
- if (waitpid(api, &status, 0) < 0) {
- log_err("Failed to destroy IPCP");
- return -1;
- }
-
return 0;
}
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index 99966561..4e7c22ef 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -45,6 +45,7 @@ struct irm_flow * irm_flow_create(pid_t n_api,
}
if (pthread_mutex_init(&f->state_lock, NULL)) {
+ pthread_cond_destroy(&f->state_cond);
free(f);
return NULL;
}
@@ -63,6 +64,9 @@ struct irm_flow * irm_flow_create(pid_t n_api,
f->n_1_rb = shm_rbuff_create(n_1_api, port_id);
if (f->n_1_rb == NULL) {
log_err("Could not create ringbuffer for AP-I %d.", n_1_api);
+ shm_rbuff_destroy(f->n_rb);
+ pthread_mutex_destroy(&f->state_lock);
+ pthread_cond_destroy(&f->state_cond);
free(f);
return NULL;
}
@@ -122,7 +126,8 @@ enum flow_state irm_flow_get_state(struct irm_flow * f)
return state;
}
-void irm_flow_set_state(struct irm_flow * f, enum flow_state state)
+void irm_flow_set_state(struct irm_flow * f,
+ enum flow_state state)
{
assert(f);
assert(state != FLOW_DESTROY);
@@ -135,7 +140,8 @@ void irm_flow_set_state(struct irm_flow * f, enum flow_state state)
pthread_mutex_unlock(&f->state_lock);
}
-enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state)
+enum flow_state irm_flow_wait_state(struct irm_flow * f,
+ enum flow_state state)
{
assert(f);
assert(state != FLOW_NULL);
@@ -143,6 +149,8 @@ enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state)
pthread_mutex_lock(&f->state_lock);
+ assert(f->state != FLOW_NULL);
+
while (!(f->state == state || f->state == FLOW_DESTROY))
pthread_cond_wait(&f->state_cond, &f->state_lock);
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 9901a608..c7adf386 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -133,7 +133,8 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api)
list_for_each(pos, &irmd->irm_flows) {
struct irm_flow * e = list_entry(pos, struct irm_flow, next);
- if (e->n_api == n_api)
+ if (e->n_api == n_api &&
+ irm_flow_get_state(e) == FLOW_ALLOC_PENDING)
return e;
}
@@ -982,7 +983,12 @@ static struct irm_flow * flow_accept(pid_t api,
struct irm_flow * f = NULL;
struct api_entry * e = NULL;
struct reg_entry * re = NULL;
- struct list_head * p;
+ struct list_head * p = NULL;
+
+ pid_t api_n1;
+ pid_t api_n;
+ int port_id;
+ int ret;
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -1016,7 +1022,7 @@ static struct irm_flow * flow_accept(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- while (api_entry_sleep(e) == -ETIMEDOUT) {
+ while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) {
pthread_rwlock_rdlock(&irmd->state_lock);
if (irmd->state != IRMD_RUNNING) {
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1025,126 +1031,76 @@ static struct irm_flow * flow_accept(pid_t api,
pthread_rwlock_unlock(&irmd->state_lock);
}
- pthread_rwlock_rdlock(&irmd->state_lock);
-
- if (irmd->state != IRMD_RUNNING) {
- reg_entry_set_state(re, REG_NAME_NULL);
- pthread_rwlock_unlock(&irmd->state_lock);
+ if (ret == -1) {
+ /* The process died, we can exit here. */
return NULL;
}
- pthread_rwlock_rdlock(&irmd->reg_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- e = api_table_get(&irmd->api_table, api);
- if (e == NULL) {
- pthread_rwlock_unlock(&irmd->reg_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ reg_entry_set_state(re, REG_NAME_NULL);
pthread_rwlock_unlock(&irmd->state_lock);
- log_dbg("Process gone while accepting flow.");
return NULL;
}
- pthread_mutex_lock(&e->state_lock);
-
- re = e->re;
-
- pthread_mutex_unlock(&e->state_lock);
-
- if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {
- pthread_rwlock_unlock(&irmd->reg_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- log_err("Entry in wrong state.");
- return NULL;
- }
- pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_rdlock(&irmd->flows_lock);
f = get_irm_flow_n(api);
if (f == NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- log_err("Port_id was not created yet.");
+ log_warn("Port_id was not created yet.");
return NULL;
}
*cube = re->qos;
+ api_n = f->n_api;
+ api_n1 = f->n_1_api;
+ port_id = f->port_id;
+
log_info("Flow on port_id %d allocated.", f->port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
-
- return f;
-}
-
-static int flow_alloc_resp(pid_t n_api,
- int port_id,
- int response)
-{
- struct irm_flow * f = NULL;
- struct reg_entry * re = NULL;
- struct api_entry * e = NULL;
- int ret = -1;
-
- pid_t api_n1;
- pid_t api_n;
-
- pthread_rwlock_rdlock(&irmd->state_lock);
-
- if (irmd->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&irmd->state_lock);
- return -1;
- }
-
- pthread_rwlock_wrlock(&irmd->reg_lock);
+ pthread_rwlock_rdlock(&irmd->reg_lock);
- e = api_table_get(&irmd->api_table, n_api);
+ e = api_table_get(&irmd->api_table, api);
if (e == NULL) {
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- log_err("Unknown AP-I %d responding for port_id %d.",
- n_api, port_id);
- return -1;
+ ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1);
+ log_dbg("Process gone while accepting flow.");
+ return NULL;
}
+ pthread_mutex_lock(&e->state_lock);
+
re = e->re;
- if (re == NULL) {
- pthread_rwlock_unlock(&irmd->reg_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- log_err("AP-I %d is not handling a flow request.", n_api);
- return -1;
- }
+
+ pthread_mutex_unlock(&e->state_lock);
if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- log_err("Name %s has no pending flow request.", re->name);
- return -1;
+ ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1);
+ log_err("Entry in wrong state.");
+ return NULL;
}
- registry_del_api(&irmd->registry, n_api);
+ registry_del_api(&irmd->registry, api);
pthread_rwlock_unlock(&irmd->reg_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
-
- f = get_irm_flow(port_id);
- if (f == NULL) {
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- return -1;
- }
-
- api_n = f->n_api;
- api_n1 = f->n_1_api;
-
- pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response);
+ if (ipcp_flow_alloc_resp(api_n1, port_id, api_n, 0)) {
+ log_dbg("Failed to respond to alloc.");
+ return NULL;
+ }
- if (!(response || ret))
- irm_flow_set_state(f, FLOW_ALLOCATED);
+ irm_flow_set_state(f, FLOW_ALLOCATED);
- return ret;
+ return f;
}
static struct irm_flow * flow_alloc(pid_t api,
@@ -1196,6 +1152,8 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
+ assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING);
+
if (ipcp_flow_alloc(ipcp, port_id, api,
dst_name, cube) < 0) {
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -1210,54 +1168,16 @@ static struct irm_flow * flow_alloc(pid_t api,
return NULL;
}
- return f;
-}
-
-static int flow_alloc_res(int port_id)
-{
- struct irm_flow * f;
-
- pthread_rwlock_rdlock(&irmd->state_lock);
-
- if (irmd->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&irmd->state_lock);
- return -1;
- }
- pthread_rwlock_rdlock(&irmd->flows_lock);
-
- f = get_irm_flow(port_id);
- if (f == NULL) {
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- log_err("Could not find port %d.", port_id);
- return -1;
- }
-
- if (irm_flow_get_state(f) == FLOW_NULL) {
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- log_info("Port %d is deprecated.", port_id);
- return -1;
- }
-
- if (irm_flow_get_state(f) == FLOW_ALLOCATED) {
- log_info("Flow on port_id %d allocated.", port_id);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- return 0;
+ if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) {
+ log_info("Pending flow on port_id %d torn down.", port_id);
+ return NULL;
}
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
-
- if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) {
- log_info("Flow on port_id %d allocated.", port_id);
- return 0;
- }
+ assert(irm_flow_get_state(f) == FLOW_ALLOCATED);
- log_info("Pending flow on port_id %d torn down.", port_id);
+ log_info("Flow on port_id %d allocated.", port_id);
- return -1;
+ return f;
}
static int flow_dealloc(pid_t api,
@@ -1293,6 +1213,9 @@ static int flow_dealloc(pid_t api,
if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) {
list_del(&f->next);
+ if ((kill(f->n_api, 0) < 0 && f->n_1_api == -1) ||
+ (kill (f->n_1_api, 0) < 0 && f->n_api == -1))
+ irm_flow_set_state(f, FLOW_NULL);
clear_irm_flow(f);
irm_flow_destroy(f);
bmp_release(irmd->port_ids, port_id);
@@ -1305,12 +1228,11 @@ static int flow_dealloc(pid_t api,
}
pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
if (n_1_api != -1)
ret = ipcp_flow_dealloc(n_1_api, port_id);
- pthread_rwlock_unlock(&irmd->state_lock);
-
return ret;
}
@@ -1501,7 +1423,7 @@ static int flow_alloc_reply(int port_id,
struct irm_flow * f;
pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
+ pthread_rwlock_rdlock(&irmd->flows_lock);
f = get_irm_flow(port_id);
if (f == NULL) {
@@ -1551,18 +1473,19 @@ static void irm_destroy(void)
list_for_each_safe(p, h, &irmd->ipcps) {
struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next);
list_del(&e->next);
- ipcp_destroy(e->api);
- clear_spawned_api(e->api);
- registry_del_api(&irmd->registry, e->api);
ipcp_entry_destroy(e);
}
- list_for_each_safe(p, h, &irmd->spawned_apis) {
+ list_for_each(p, &irmd->spawned_apis) {
struct pid_el * e = list_entry(p, struct pid_el, next);
- int status;
if (kill(e->pid, SIGTERM))
log_dbg("Could not send kill signal to %d.", e->pid);
- else if (waitpid(e->pid, &status, 0) < 0)
+ }
+
+ list_for_each_safe(p, h, &irmd->spawned_apis) {
+ struct pid_el * e = list_entry(p, struct pid_el, next);
+ int status;
+ if (waitpid(e->pid, &status, 0) < 0)
log_dbg("Error waiting for %d to exit.", e->pid);
list_del(&e->next);
registry_del_api(&irmd->registry, e->pid);
@@ -1940,12 +1863,6 @@ void * mainloop(void * o)
ret_msg.has_api = true;
ret_msg.api = e->n_1_api;
break;
- case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP:
- ret_msg.has_result = true;
- ret_msg.result = flow_alloc_resp(msg->api,
- msg->port_id,
- msg->response);
- break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
e = flow_alloc(msg->api,
msg->dst_name,
@@ -1960,10 +1877,6 @@ void * mainloop(void * o)
ret_msg.has_api = true;
ret_msg.api = e->n_1_api;
break;
- case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES:
- ret_msg.has_result = true;
- ret_msg.result = flow_alloc_res(msg->port_id);
- break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
ret_msg.result = flow_dealloc(msg->api, msg->port_id);