summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-07-27 15:29:28 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-07-27 15:55:54 +0200
commit0dda754f6eb91af15f7c69523e2ebb627086b457 (patch)
tree62905dc63ab974aee7d8cddf49ca5234fac7338f /src/irmd/main.c
parentbee74baa8fa8ffa71dbb659496bc88df3e8ce6a5 (diff)
downloadouroboros-0dda754f6eb91af15f7c69523e2ebb627086b457.tar.gz
ouroboros-0dda754f6eb91af15f7c69523e2ebb627086b457.zip
irmd: Revised flow allocation
Flow allocation requests and registered api states revised so all states are tracked with a condition variable. This is a more reliable approach and improves stability of flow allocation. Some other refactoring was also done, such as renaming port_map_entry to irm_flow and hiding some internal structures of the registry.
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c812
1 files changed, 393 insertions, 419 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 625c28c8..6cf16505 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -76,7 +76,7 @@ struct spawned_api {
};
/* keeps track of port_id's between N and N - 1 */
-struct port_map_entry {
+struct irm_flow {
struct list_head next;
int port_id;
@@ -84,12 +84,11 @@ struct port_map_entry {
pid_t n_api;
pid_t n_1_api;
- pthread_cond_t res_signal;
- pthread_mutex_t res_lock;
+ struct timespec t0;
enum flow_state state;
-
- struct timespec t0;
+ pthread_cond_t state_cond;
+ pthread_mutex_t state_lock;
};
struct irm {
@@ -104,23 +103,24 @@ struct irm {
/* keep track of all flows in this processing system */
struct bmp * port_ids;
/* maps port_ids to api pair */
- struct list_head port_map;
+ struct list_head irm_flows;
pthread_rwlock_t flows_lock;
- enum irm_state state;
struct lockfile * lf;
struct shm_du_map * dum;
pthread_t * threadpool;
int sockfd;
+
+ enum irm_state state;
pthread_rwlock_t state_lock;
pthread_t cleanup_flows;
pthread_t shm_sanitize;
-} * instance = NULL;
+} * irmd = NULL;
-static struct port_map_entry * port_map_entry_create()
+static struct irm_flow * irm_flow_create()
{
- struct port_map_entry * e = malloc(sizeof(*e));
+ struct irm_flow * e = malloc(sizeof(*e));
if (e == NULL)
return NULL;
@@ -129,12 +129,12 @@ static struct port_map_entry * port_map_entry_create()
e->port_id = -1;
e->state = FLOW_NULL;
- if (pthread_cond_init(&e->res_signal, NULL)) {
+ if (pthread_cond_init(&e->state_cond, NULL)) {
free(e);
return NULL;
}
- if (pthread_mutex_init(&e->res_lock, NULL)) {
+ if (pthread_mutex_init(&e->state_lock, NULL)) {
free(e);
return NULL;
}
@@ -145,36 +145,36 @@ static struct port_map_entry * port_map_entry_create()
return e;
}
-static void port_map_entry_destroy(struct port_map_entry * e)
+static void irm_flow_destroy(struct irm_flow * e)
{
- bool wait = true;
- pthread_mutex_lock(&e->res_lock);
- e->state = FLOW_NULL;
+ pthread_mutex_lock(&e->state_lock);
- pthread_cond_broadcast(&e->res_signal);
- pthread_mutex_unlock(&e->res_lock);
+ if (e->state == FLOW_PENDING)
+ e->state = FLOW_DESTROY;
- while (wait) {
- pthread_mutex_lock(&e->res_lock);
- if (pthread_cond_destroy(&e->res_signal))
- pthread_cond_broadcast(&e->res_signal);
- else
- wait = false;
- pthread_mutex_unlock(&e->res_lock);
- }
+ pthread_cond_signal(&e->state_cond);
+ pthread_mutex_unlock(&e->state_lock);
+
+ pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
+ (void *) &e->state_lock);
+
+ while (e->state != FLOW_NULL)
+ pthread_cond_wait(&e->state_cond, &e->state_lock);
- pthread_mutex_destroy(&e->res_lock);
+ pthread_cleanup_pop(true);
+
+ pthread_cond_destroy(&e->state_cond);
+ pthread_mutex_destroy(&e->state_lock);
free(e);
}
-static struct port_map_entry * get_port_map_entry(int port_id)
+static struct irm_flow * get_irm_flow(int port_id)
{
struct list_head * pos = NULL;
- list_for_each(pos, &instance->port_map) {
- struct port_map_entry * e =
- list_entry(pos, struct port_map_entry, next);
+ list_for_each(pos, &irmd->irm_flows) {
+ struct irm_flow * e = list_entry(pos, struct irm_flow, next);
if (e->port_id == port_id)
return e;
@@ -183,13 +183,12 @@ static struct port_map_entry * get_port_map_entry(int port_id)
return NULL;
}
-static struct port_map_entry * get_port_map_entry_n(pid_t n_api)
+static struct irm_flow * get_irm_flow_n(pid_t n_api)
{
struct list_head * pos = NULL;
- list_for_each(pos, &instance->port_map) {
- struct port_map_entry * e =
- list_entry(pos, struct port_map_entry, next);
+ list_for_each(pos, &irmd->irm_flows) {
+ struct irm_flow * e = list_entry(pos, struct irm_flow, next);
if (e->n_api == n_api)
return e;
@@ -230,7 +229,7 @@ static struct ipcp_entry * get_ipcp_entry_by_api(pid_t api)
{
struct list_head * pos = NULL;
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * tmp =
list_entry(pos, struct ipcp_entry, next);
if (api == tmp->api)
@@ -246,9 +245,9 @@ static pid_t get_ipcp_by_dst_name(char * dst_name)
{
struct list_head * pos = NULL;
char * dif_name =
- registry_get_dif_for_dst(&instance->registry, dst_name);
+ registry_get_dif_for_dst(&irmd->registry, dst_name);
if (dif_name == NULL) {
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * e =
list_entry(pos, struct ipcp_entry, next);
if (e->type == IPCP_NORMAL) {
@@ -257,7 +256,7 @@ static pid_t get_ipcp_by_dst_name(char * dst_name)
}
}
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * e =
list_entry(pos, struct ipcp_entry, next);
if (e->type == IPCP_SHIM_ETH_LLC) {
@@ -267,7 +266,7 @@ static pid_t get_ipcp_by_dst_name(char * dst_name)
}
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * e =
list_entry(pos, struct ipcp_entry, next);
if (e->type == IPCP_SHIM_UDP) {
@@ -280,7 +279,7 @@ static pid_t get_ipcp_by_dst_name(char * dst_name)
if (dif_name == NULL)
return -1;
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * e =
list_entry(pos, struct ipcp_entry, next);
if (strcmp(e->dif_name, dif_name) == 0)
@@ -302,23 +301,23 @@ static pid_t create_ipcp(char * name,
if (api == NULL)
return -ENOMEM;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
api->api = ipcp_create(ipcp_type);
if (api->api == -1) {
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Failed to create IPCP.");
return -1;
}
tmp = ipcp_entry_create();
if (tmp == NULL) {
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
@@ -328,28 +327,28 @@ static pid_t create_ipcp(char * name,
tmp->name = strdup(name);
if (tmp->name == NULL) {
ipcp_entry_destroy(tmp);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
tmp->dif_name = NULL;
tmp->type = ipcp_type;
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * e =
list_entry(pos, struct ipcp_entry, next);
if (e->type < ipcp_type)
break;
}
- list_add(&tmp->next, &instance->ipcps);
+ list_add(&tmp->next, &irmd->ipcps);
- list_add(&api->next, &instance->spawned_apis);
+ list_add(&api->next, &irmd->spawned_apis);
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_INFO("Created IPCP %d.", api->api);
@@ -361,7 +360,7 @@ static void clear_spawned_api(pid_t api)
struct list_head * pos = NULL;
struct list_head * n = NULL;
- list_for_each_safe(pos, n, &(instance->spawned_apis)) {
+ list_for_each_safe(pos, n, &(irmd->spawned_apis)) {
struct spawned_api * a =
list_entry(pos, struct spawned_api, next);
@@ -377,10 +376,10 @@ static int destroy_ipcp(pid_t api)
struct list_head * pos = NULL;
struct list_head * n = NULL;
- pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
- list_for_each_safe(pos, n, &(instance->ipcps)) {
+ list_for_each_safe(pos, n, &(irmd->ipcps)) {
struct ipcp_entry * tmp =
list_entry(pos, struct ipcp_entry, next);
@@ -395,8 +394,8 @@ static int destroy_ipcp(pid_t api)
}
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
@@ -406,42 +405,42 @@ static int bootstrap_ipcp(pid_t api,
{
struct ipcp_entry * entry = NULL;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
entry = get_ipcp_entry_by_api(api);
if (entry == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(conf->dif_name);
if (entry->dif_name == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
if (ipcp_bootstrap(entry->api, conf)) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Could not bootstrap IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_INFO("Bootstrapped IPCP %d in DIF %s.",
entry->api, conf->dif_name);
@@ -454,27 +453,27 @@ static int enroll_ipcp(pid_t api,
{
struct ipcp_entry * entry = NULL;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_rdlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&irmd->reg_lock);
entry = get_ipcp_entry_by_api(api);
if (entry == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(dif_name);
if (entry->dif_name == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
@@ -482,14 +481,14 @@ static int enroll_ipcp(pid_t api,
if (ipcp_enroll(api, dif_name)) {
free(entry->dif_name);
entry->dif_name = NULL;
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Could not enroll IPCP.");
return -1;
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_INFO("Enrolled IPCP %d in DIF %s.",
entry->api, dif_name);
@@ -510,14 +509,14 @@ static int bind_name(char * name,
if (name == NULL || ap_name == NULL)
return -EINVAL;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
if (opts & BIND_AP_AUTO) {
/* we need to duplicate argv */
@@ -530,17 +529,17 @@ static int bind_name(char * name,
}
}
- if (registry_add_binding(&instance->registry,
+ if (registry_add_binding(&irmd->registry,
strdup(name), strdup(apn),
opts, argv_dup) < 0) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Failed to register %s.", name);
return -1;
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_INFO("Bound %s to registered name %s.", ap_name, name);
@@ -558,24 +557,24 @@ static int unbind_name(char * name,
if (!(opts & UNBIND_AP_HARD) && apn == NULL)
return -EINVAL;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
if ((opts & UNBIND_AP_HARD) && apn == NULL) {
- registry_deassign(&instance->registry, name);
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ registry_deassign(&irmd->registry, name);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_INFO("Removed all bindings of %s.", name);
} else {
- registry_del_binding(&instance->registry, name, apn);
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ registry_del_binding(&irmd->registry, name, apn);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_INFO("Removed binding from %s to %s.", apn, name);
}
@@ -589,7 +588,7 @@ static ssize_t list_ipcps(char * name,
ssize_t count = 0;
int i = 0;
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * tmp =
list_entry(pos, struct ipcp_entry, next);
@@ -603,7 +602,7 @@ static ssize_t list_ipcps(char * name,
return -1;
}
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * tmp =
list_entry(pos, struct ipcp_entry, next);
@@ -626,22 +625,22 @@ static int ap_reg(char * name,
if (name == NULL || difs == NULL || len == 0 || difs[0] == NULL)
return -EINVAL;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
- if (list_empty(&instance->ipcps)) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ if (list_empty(&irmd->ipcps)) {
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * e =
list_entry(pos, struct ipcp_entry, next);
@@ -656,7 +655,7 @@ static int ap_reg(char * name,
LOG_ERR("Could not register %s in DIF %s.",
name, e->dif_name);
} else {
- if(registry_add_name_to_dif(&instance->registry,
+ if(registry_add_name_to_dif(&irmd->registry,
name,
e->dif_name,
e->type) < 0)
@@ -671,13 +670,13 @@ static int ap_reg(char * name,
}
if (ret == 0) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return ret;
}
@@ -693,16 +692,16 @@ static int ap_unreg(char * name,
if (name == NULL || len == 0 || difs == NULL || difs[0] == NULL)
return -1;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
- list_for_each(pos, &instance->ipcps) {
+ list_for_each(pos, &irmd->ipcps) {
struct ipcp_entry * e =
list_entry(pos, struct ipcp_entry, next);
@@ -718,7 +717,7 @@ static int ap_unreg(char * name,
name, e->dif_name);
--ret;
} else {
- registry_del_name_from_dif(&instance->registry,
+ registry_del_name_from_dif(&irmd->registry,
name,
e->dif_name);
LOG_INFO("Unregistered %s from %s.",
@@ -727,44 +726,44 @@ static int ap_unreg(char * name,
}
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return ret;
}
-static struct port_map_entry * flow_accept(pid_t api,
- char * srv_ap_name,
- char ** dst_ae_name)
+static struct irm_flow * flow_accept(pid_t api,
+ char * srv_ap_name,
+ char ** dst_ae_name)
{
- struct port_map_entry * pme = NULL;
+ struct irm_flow * pme = NULL;
struct reg_entry * rne = NULL;
struct reg_api * rgi = NULL;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return NULL;
}
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
- rne = registry_get_entry_by_apn(&instance->registry, srv_ap_name);
+ rne = registry_get_entry_by_apn(&irmd->registry, srv_ap_name);
if (rne == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("AP %s is unknown.", srv_ap_name);
return NULL;
}
if (!reg_entry_get_reg_api(rne, api)) {
- rgi = registry_add_api_name(&instance->registry,
+ rgi = registry_add_api_name(&irmd->registry,
api,
rne->name);
if (rgi == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Failed to register instance %d with %s.",
api,srv_ap_name);
return NULL;
@@ -772,31 +771,31 @@ static struct port_map_entry * flow_accept(pid_t api,
LOG_INFO("New instance (%d) of %s added.", api, srv_ap_name);
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
reg_api_sleep(rgi);
- pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_rdlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_rdlock(&irmd->reg_lock);
pthread_mutex_lock(&rne->state_lock);
if (rne->state != REG_NAME_FLOW_ARRIVED) {
pthread_mutex_unlock(&rne->state_lock);
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return NULL;
}
pthread_mutex_unlock(&rne->state_lock);
- pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
- pthread_rwlock_rdlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&irmd->flows_lock);
- pme = get_port_map_entry_n(api);
+ pme = get_irm_flow_n(api);
if (pme == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Port_id was not created yet.");
return NULL;
}
@@ -804,8 +803,8 @@ static struct port_map_entry * flow_accept(pid_t api,
if (dst_ae_name != NULL)
*dst_ae_name = rne->req_ae_name;
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return pme;
}
@@ -814,53 +813,54 @@ static int flow_alloc_resp(pid_t n_api,
int port_id,
int response)
{
- struct port_map_entry * pme = NULL;
+ struct irm_flow * pme = NULL;
struct reg_entry * rne = NULL;
int ret = -1;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
- rne = registry_get_entry_by_api(&instance->registry, n_api);
+ rne = registry_get_entry_by_api(&irmd->registry, n_api);
if (rne == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
if (rne->state != REG_NAME_FLOW_ARRIVED) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Process not listening for this name.");
return -1;
}
pthread_mutex_lock(&rne->state_lock);
- registry_del_api(&instance->registry, n_api);
+ registry_del_api(&irmd->registry, n_api);
pthread_mutex_unlock(&rne->state_lock);
- pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
if (!response) {
- pthread_rwlock_wrlock(&instance->flows_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
- pme = get_port_map_entry(port_id);
+ pme = get_irm_flow(port_id);
if (pme == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
pme->state = FLOW_ALLOCATED;
- pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_cond_signal(&pme->state_cond);
+ pthread_rwlock_unlock(&irmd->flows_lock);
ret = ipcp_flow_alloc_resp(pme->n_1_api,
port_id,
@@ -868,32 +868,32 @@ static int flow_alloc_resp(pid_t n_api,
response);
}
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return ret;
}
-static struct port_map_entry * flow_alloc(pid_t api,
- char * dst_name,
- char * src_ae_name,
- struct qos_spec * qos)
+static struct irm_flow * flow_alloc(pid_t api,
+ char * dst_name,
+ char * src_ae_name,
+ struct qos_spec * qos)
{
- struct port_map_entry * pme;
+ struct irm_flow * pme;
pid_t ipcp;
/* FIXME: Map qos_spec to qos_cube */
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return NULL;
}
- pme = port_map_entry_create();
+ pme = irm_flow_create();
if (pme == NULL) {
- pthread_rwlock_unlock(&instance->state_lock);
- LOG_ERR("Failed to create port_map_entry.");
+ pthread_rwlock_unlock(&irmd->state_lock);
+ LOG_ERR("Failed to create irm_flow.");
return NULL;
}
@@ -902,26 +902,26 @@ static struct port_map_entry * flow_alloc(pid_t api,
if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)
LOG_WARN("Failed to set timestamp.");
- pthread_rwlock_rdlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&irmd->reg_lock);
ipcp = get_ipcp_by_dst_name(dst_name);
if (ipcp == -1) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_INFO("Destination unreachable.");
return NULL;
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_wrlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
- pme->port_id = bmp_allocate(instance->port_ids);
+ pme->port_id = bmp_allocate(irmd->port_ids);
pme->n_1_api = ipcp;
- list_add(&pme->next, &instance->port_map);
+ list_add(&pme->next, &irmd->irm_flows);
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
if (ipcp_flow_alloc(ipcp,
pme->port_id,
@@ -929,12 +929,12 @@ static struct port_map_entry * flow_alloc(pid_t api,
dst_name,
src_ae_name,
QOS_CUBE_BE) < 0) {
- pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_wrlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
list_del(&pme->next);
- bmp_release(instance->port_ids, pme->port_id);
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ bmp_release(irmd->port_ids, pme->port_id);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
free(pme);
return NULL;
}
@@ -944,75 +944,68 @@ static struct port_map_entry * flow_alloc(pid_t api,
static int flow_alloc_res(int port_id)
{
- struct port_map_entry * e;
+ struct irm_flow * e;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_RUNNING) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_rwlock_rdlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&irmd->flows_lock);
- e = get_port_map_entry(port_id);
+ e = get_irm_flow(port_id);
if (e == NULL) {
LOG_ERR("Could not find port %d.", port_id);
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
if (e->state == FLOW_NULL) {
LOG_INFO("Port %d is deprecated.", port_id);
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
if (e->state == FLOW_ALLOCATED) {
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
- while (true) {
- pthread_mutex_lock(&e->res_lock);
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void*) &e->res_lock);
+ pthread_mutex_lock(&e->state_lock);
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void*) &e->state_lock);
- pthread_cond_wait(&e->res_signal, &e->res_lock);
+ while (e->state == FLOW_PENDING)
+ pthread_cond_wait(&e->state_cond, &e->state_lock);
- pthread_cleanup_pop(true);
+ pthread_cleanup_pop(true);
- pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_wrlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
+ pthread_mutex_lock(&e->state_lock);
- e = get_port_map_entry(port_id);
- if (e == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
- return -1;
- }
- if (e->state == FLOW_ALLOCATED) {
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
- return 0;
- }
- if (e->state == FLOW_NULL) {
- /* don't release the port_id, AP has to call dealloc */
- list_del(&e->next);
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
- free(e);
- return -1;
+ if (e->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&e->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
+ return 0;
+ }
- }
- /* still pending, spurious wake */
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ if (e->state == FLOW_DESTROY) {
+ /* don't release the port_id, AP has to call dealloc */
+ e->state = FLOW_NULL;
+ pthread_cond_signal(&e->state_cond);
+ pthread_mutex_unlock(&e->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
+ return -1;
}
return 0;
@@ -1023,16 +1016,16 @@ static int flow_dealloc(int port_id)
pid_t n_1_api;
int ret = 0;
- struct port_map_entry * e = NULL;
+ struct irm_flow * e = NULL;
- pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_wrlock(&instance->flows_lock);
- bmp_release(instance->port_ids, port_id);
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
+ bmp_release(irmd->port_ids, port_id);
- e = get_port_map_entry(port_id);
+ e = get_irm_flow(port_id);
if (e == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
@@ -1040,11 +1033,11 @@ static int flow_dealloc(int port_id)
list_del(&e->next);
- pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
ret = ipcp_flow_dealloc(n_1_api, port_id);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
free(e);
@@ -1083,21 +1076,21 @@ static pid_t auto_execute(char ** argv)
exit(EXIT_FAILURE);
}
-static struct port_map_entry * flow_req_arr(pid_t api,
- char * dst_name,
- char * ae_name)
+static struct irm_flow * flow_req_arr(pid_t api,
+ char * dst_name,
+ char * ae_name)
{
struct reg_entry * rne = NULL;
- struct port_map_entry * pme = NULL;
+ struct irm_flow * pme = NULL;
bool acc_wait = true;
enum reg_name_state state;
struct spawned_api * c_api;
- pme = port_map_entry_create();
+ pme = irm_flow_create();
if (pme == NULL) {
- LOG_ERR("Failed to create port_map_entry.");
+ LOG_ERR("Failed to create irm_flow.");
return NULL;
}
@@ -1106,13 +1099,13 @@ static struct port_map_entry * flow_req_arr(pid_t api,
if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)
LOG_WARN("Failed to set timestamp.");
- pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_rdlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_rdlock(&irmd->reg_lock);
- rne = registry_get_entry_by_name(&instance->registry, dst_name);
+ rne = registry_get_entry_by_name(&irmd->registry, dst_name);
if (rne == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Unknown name: %s.", dst_name);
free(pme);
return NULL;
@@ -1124,16 +1117,16 @@ static struct port_map_entry * flow_req_arr(pid_t api,
switch (state) {
case REG_NAME_IDLE:
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("No AP's for %s.", dst_name);
free(pme);
return NULL;
case REG_NAME_AUTO_ACCEPT:
c_api = malloc(sizeof(*c_api));
if (c_api == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
free(pme);
return NULL;
}
@@ -1147,55 +1140,60 @@ static struct port_map_entry * flow_req_arr(pid_t api,
pthread_mutex_lock(&rne->state_lock);
rne->state = REG_NAME_AUTO_ACCEPT;
pthread_mutex_unlock(&rne->state_lock);
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
free(pme);
free(c_api);
return NULL;
}
- list_add(&c_api->next, &instance->spawned_apis);
+ list_add(&c_api->next, &irmd->spawned_apis);
- pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
pthread_mutex_lock(&rne->state_lock);
pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
(void *) &rne->state_lock);
while (rne->state == REG_NAME_AUTO_EXEC)
- pthread_cond_wait(&rne->acc_signal,
- &rne->state_lock);
+ pthread_cond_wait(&rne->state_cond, &rne->state_lock);
pthread_cleanup_pop(true);
- pthread_rwlock_rdlock(&instance->reg_lock);
-
+ pthread_rwlock_rdlock(&irmd->reg_lock);
+ pthread_mutex_lock(&rne->state_lock);
+ if (rne->state == REG_NAME_DESTROY) {
+ pthread_mutex_unlock(&rne->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ return NULL;
+ }
+ pthread_mutex_unlock(&rne->state_lock);
case REG_NAME_FLOW_ACCEPT:
pme->n_api = reg_entry_resolve_api(rne);
if (pme->n_api == -1) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Invalid api returned.");
return NULL;
}
break;
default:
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("IRMd in wrong state.");
free(pme);
return NULL;
}
- pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
- pthread_rwlock_wrlock(&instance->flows_lock);
- pme->port_id = bmp_allocate(instance->port_ids);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
+ pme->port_id = bmp_allocate(irmd->port_ids);
- list_add(&pme->next, &instance->port_map);
+ list_add(&pme->next, &irmd->irm_flows);
- pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
pthread_mutex_lock(&rne->state_lock);
@@ -1207,15 +1205,15 @@ static struct port_map_entry * flow_req_arr(pid_t api,
pthread_mutex_unlock(&rne->state_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
while (acc_wait) {
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
pthread_mutex_lock(&rne->state_lock);
acc_wait = (rne->state == REG_NAME_FLOW_ARRIVED &&
- instance->state == IRMD_RUNNING);
+ irmd->state == IRMD_RUNNING);
pthread_mutex_unlock(&rne->state_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
}
return pme;
@@ -1224,19 +1222,19 @@ static struct port_map_entry * flow_req_arr(pid_t api,
static int flow_alloc_reply(int port_id,
int response)
{
- struct port_map_entry * e;
+ struct irm_flow * e;
- pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_rdlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_rdlock(&irmd->flows_lock);
- e = get_port_map_entry(port_id);
+ e = get_irm_flow(port_id);
if (e == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_mutex_lock(&e->res_lock);
+ pthread_mutex_lock(&e->state_lock);
if (!response)
e->state = FLOW_ALLOCATED;
@@ -1244,35 +1242,35 @@ static int flow_alloc_reply(int port_id,
else
e->state = FLOW_NULL;
- if (pthread_cond_signal(&e->res_signal))
+ if (pthread_cond_signal(&e->state_cond))
LOG_ERR("Failed to send signal.");
- pthread_mutex_unlock(&e->res_lock);
+ pthread_mutex_unlock(&e->state_lock);
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
static int flow_dealloc_ipcp(int port_id)
{
- struct port_map_entry * e = NULL;
+ struct irm_flow * e = NULL;
- pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_wrlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
- e = get_port_map_entry(port_id);
+ e = get_irm_flow(port_id);
if (e == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
list_del(&e->next);
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
free(e);
@@ -1284,17 +1282,17 @@ static void irm_destroy()
struct list_head * h;
struct list_head * t;
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state != IRMD_NULL)
+ if (irmd->state != IRMD_NULL)
LOG_WARN("Unsafe destroy.");
- if (instance->threadpool != NULL)
- free(instance->threadpool);
+ if (irmd->threadpool != NULL)
+ free(irmd->threadpool);
- pthread_rwlock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
/* clear the lists */
- list_for_each_safe(h, t, &instance->ipcps) {
+ list_for_each_safe(h, t, &irmd->ipcps) {
struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next);
list_del(&e->next);
ipcp_destroy(e->api);
@@ -1302,52 +1300,49 @@ static void irm_destroy()
ipcp_entry_destroy(e);
}
- list_for_each_safe(h, t, &instance->registry) {
- struct reg_entry * e = list_entry(h, struct reg_entry, next);
- list_del(&e->next);
- reg_entry_destroy(e);
- }
+ registry_destroy(&irmd->registry);
- list_for_each_safe(h, t, &instance->spawned_apis) {
+ list_for_each_safe(h, t, &irmd->spawned_apis) {
struct spawned_api * api =
list_entry(h, struct spawned_api, next);
int status;
if (kill(api->api, SIGTERM))
- LOG_DBGF("Could not send kill signal to %d.", api->api);
+ LOG_DBG("Could not send kill signal to %d.", api->api);
else if (waitpid(api->api, &status, 0) < 0)
- LOG_DBGF("Error waiting for %d to exit.", api->api);
+ LOG_DBG("Error waiting for %d to exit.", api->api);
list_del(&api->next);
free(api);
}
- pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
- pthread_rwlock_wrlock(&instance->flows_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
- list_for_each_safe(h, t, &instance->port_map) {
- struct port_map_entry * e = list_entry(h,
- struct port_map_entry,
- next);
+ list_for_each_safe(h, t, &irmd->irm_flows) {
+ struct irm_flow * e = list_entry(h, struct irm_flow, next);
list_del(&e->next);
- port_map_entry_destroy(e);
+ irm_flow_destroy(e);
}
- if (instance->port_ids != NULL)
- bmp_destroy(instance->port_ids);
+ if (irmd->port_ids != NULL)
+ bmp_destroy(irmd->port_ids);
+
+ pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&instance->flows_lock);
+ if (irmd->dum != NULL)
+ shm_du_map_destroy(irmd->dum);
- if (instance->dum != NULL)
- shm_du_map_destroy(instance->dum);
+ if (irmd->lf != NULL)
+ lockfile_destroy(irmd->lf);
- if (instance->lf != NULL)
- lockfile_destroy(instance->lf);
+ close(irmd->sockfd);
- close(instance->sockfd);
+ pthread_rwlock_unlock(&irmd->state_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_destroy(&irmd->reg_lock);
+ pthread_rwlock_destroy(&irmd->state_lock);
- free(instance);
+ free(irmd);
}
void irmd_sig_handler(int sig, siginfo_t * info, void * c)
@@ -1358,20 +1353,20 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)
case SIGINT:
case SIGTERM:
case SIGHUP:
- pthread_rwlock_wrlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&irmd->state_lock);
- instance->state = IRMD_NULL;
+ irmd->state = IRMD_NULL;
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
- if (instance->threadpool != NULL) {
+ if (irmd->threadpool != NULL) {
for (i = 0; i < IRMD_THREADPOOL_SIZE; i++)
- pthread_cancel(instance->threadpool[i]);
+ pthread_cancel(irmd->threadpool[i]);
}
- pthread_cancel(instance->shm_sanitize);
- pthread_cancel(instance->cleanup_flows);
+ pthread_cancel(irmd->shm_sanitize);
+ pthread_cancel(irmd->cleanup_flows);
break;
case SIGPIPE:
LOG_DBG("Ignored SIGPIPE.");
@@ -1386,9 +1381,6 @@ void * irm_flow_cleaner()
struct list_head * pos = NULL;
struct list_head * n = NULL;
- struct list_head * pos2 = NULL;
- struct list_head * n2 = NULL;
-
struct timespec timeout = {IRMD_CLEANUP_TIMER / BILLION,
IRMD_CLEANUP_TIMER % BILLION};
int status;
@@ -1398,37 +1390,37 @@ void * irm_flow_cleaner()
LOG_WARN("Failed to get time.");
/* cleanup stale PENDING flows */
- pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&irmd->state_lock);
- if (instance->state == IRMD_NULL) {
- pthread_rwlock_unlock(&instance->state_lock);
+ if (irmd->state == IRMD_NULL) {
+ pthread_rwlock_unlock(&irmd->state_lock);
return (void *) 0;
}
- pthread_rwlock_wrlock(&instance->flows_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
- list_for_each_safe(pos, n, &(instance->port_map)) {
- struct port_map_entry * e =
- list_entry(pos, struct port_map_entry, next);
+ list_for_each_safe(pos, n, &(irmd->irm_flows)) {
+ struct irm_flow * e =
+ list_entry(pos, struct irm_flow, next);
- pthread_mutex_lock(&e->res_lock);
+ pthread_mutex_lock(&e->state_lock);
if (e->state == FLOW_PENDING &&
ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) {
LOG_INFO("Pending port_id %d timed out.",
e->port_id);
e->state = FLOW_NULL;
- pthread_cond_signal(&e->res_signal);
- pthread_mutex_unlock(&e->res_lock);
+ pthread_cond_signal(&e->state_cond);
+ pthread_mutex_unlock(&e->state_lock);
continue;
}
- pthread_mutex_unlock(&e->res_lock);
+ pthread_mutex_unlock(&e->state_lock);
if (kill(e->n_api, 0) < 0) {
struct shm_ap_rbuff * n_rb =
shm_ap_rbuff_open(e->n_api);
- bmp_release(instance->port_ids, e->port_id);
+ bmp_release(irmd->port_ids, e->port_id);
list_del(&e->next);
LOG_INFO("Process %d gone, %d deallocated.",
@@ -1436,7 +1428,7 @@ void * irm_flow_cleaner()
ipcp_flow_dealloc(e->n_1_api, e->port_id);
if (n_rb != NULL)
shm_ap_rbuff_destroy(n_rb);
- port_map_entry_destroy(e);
+ irm_flow_destroy(e);
}
if (kill(e->n_1_api, 0) < 0) {
struct shm_ap_rbuff * n_1_rb =
@@ -1446,34 +1438,16 @@ void * irm_flow_cleaner()
e->n_1_api, e->port_id);
if (n_1_rb != NULL)
shm_ap_rbuff_destroy(n_1_rb);
- port_map_entry_destroy(e);
+ irm_flow_destroy(e);
}
}
- pthread_rwlock_unlock(&instance->flows_lock);
- pthread_rwlock_wrlock(&instance->reg_lock);
-
- list_for_each_safe(pos, n, &instance->registry) {
- struct reg_entry * e =
- list_entry(pos, struct reg_entry, next);
-
- list_for_each_safe(pos2, n2, &e->reg_apis) {
- struct reg_api * r =
- list_entry(pos2,
- struct reg_api,
- next);
- if (kill(r->api, 0) < 0) {
- LOG_INFO("Process %d gone, "
- "registry binding removed.",
- r->api);
- registry_del_api(
- &instance->registry,
- r->api);
- }
- }
- }
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
+
+ registry_sanitize_apis(&irmd->registry);
- list_for_each_safe(pos, n, &instance->spawned_apis) {
+ list_for_each_safe(pos, n, &irmd->spawned_apis) {
struct spawned_api * api =
list_entry(pos, struct spawned_api, next);
waitpid(api->api, &status, WNOHANG);
@@ -1487,8 +1461,8 @@ void * irm_flow_cleaner()
}
}
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
nanosleep(&timeout, NULL);
}
@@ -1509,12 +1483,12 @@ void * mainloop()
ssize_t count;
buffer_t buffer;
irm_msg_t ret_msg = IRM_MSG__INIT;
- struct port_map_entry * e = NULL;
+ struct irm_flow * e = NULL;
pid_t * apis = NULL;
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
- cli_sockfd = accept(instance->sockfd, 0, 0);
+ cli_sockfd = accept(irmd->sockfd, 0, 0);
if (cli_sockfd < 0) {
LOG_ERR("Cannot accept new connection.");
continue;
@@ -1703,17 +1677,17 @@ static struct irm * irm_create()
{
struct stat st = {0};
- instance = malloc(sizeof(*instance));
- if (instance == NULL)
+ irmd = malloc(sizeof(*irmd));
+ if (irmd == NULL)
return NULL;
- instance->state = IRMD_NULL;
+ irmd->state = IRMD_NULL;
if (access("/dev/shm/" LOCKFILE_NAME, F_OK) != -1) {
struct lockfile * lf = lockfile_open();
if (lf == NULL) {
LOG_ERR("Failed to open existing lockfile.");
- free(instance);
+ free(irmd);
return NULL;
}
@@ -1726,42 +1700,42 @@ static struct irm * irm_create()
LOG_INFO("IRMd already running (%d), exiting.",
lockfile_owner(lf));
lockfile_close(lf);
- free(instance);
+ free(irmd);
return NULL;
}
}
- if (pthread_rwlock_init(&instance->state_lock, NULL)) {
+ if (pthread_rwlock_init(&irmd->state_lock, NULL)) {
LOG_ERR("Failed to initialize rwlock.");
- free(instance);
+ free(irmd);
return NULL;
}
- if (pthread_rwlock_init(&instance->reg_lock, NULL)) {
+ if (pthread_rwlock_init(&irmd->reg_lock, NULL)) {
LOG_ERR("Failed to initialize rwlock.");
- free(instance);
+ free(irmd);
return NULL;
}
- if (pthread_rwlock_init(&instance->flows_lock, NULL)) {
+ if (pthread_rwlock_init(&irmd->flows_lock, NULL)) {
LOG_ERR("Failed to initialize rwlock.");
- free(instance);
+ free(irmd);
return NULL;
}
- INIT_LIST_HEAD(&instance->ipcps);
- INIT_LIST_HEAD(&instance->spawned_apis);
- INIT_LIST_HEAD(&instance->registry);
- INIT_LIST_HEAD(&instance->port_map);
+ INIT_LIST_HEAD(&irmd->ipcps);
+ INIT_LIST_HEAD(&irmd->spawned_apis);
+ INIT_LIST_HEAD(&irmd->registry);
+ INIT_LIST_HEAD(&irmd->irm_flows);
- instance->port_ids = bmp_create(IRMD_MAX_FLOWS, 0);
- if (instance->port_ids == NULL) {
+ irmd->port_ids = bmp_create(IRMD_MAX_FLOWS, 0);
+ if (irmd->port_ids == NULL) {
irm_destroy();
return NULL;
}
- instance->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
- if (instance->threadpool == NULL) {
+ irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
+ if (irmd->threadpool == NULL) {
irm_destroy();
return NULL;
}
@@ -1774,8 +1748,8 @@ static struct irm * irm_create()
}
}
- instance->sockfd = server_socket_open(IRM_SOCK_PATH);
- if (instance->sockfd < 0) {
+ irmd->sockfd = server_socket_open(IRM_SOCK_PATH);
+ if (irmd->sockfd < 0) {
irm_destroy();
return NULL;
}
@@ -1786,19 +1760,19 @@ static struct irm * irm_create()
return NULL;
}
- if ((instance->lf = lockfile_create()) == NULL) {
+ if ((irmd->lf = lockfile_create()) == NULL) {
irm_destroy();
return NULL;
}
- if ((instance->dum = shm_du_map_create()) == NULL) {
+ if ((irmd->dum = shm_du_map_create()) == NULL) {
irm_destroy();
return NULL;
}
- instance->state = IRMD_RUNNING;
+ irmd->state = IRMD_RUNNING;
- return instance;
+ return irmd;
}
static void usage()
@@ -1892,25 +1866,25 @@ int main(int argc, char ** argv)
if (sigaction(SIGPIPE, &sig_act, NULL) < 0)
exit(EXIT_FAILURE);
- instance = irm_create();
- if (instance == NULL) {
+ irmd = irm_create();
+ if (irmd == NULL) {
close_logfile();
exit(EXIT_FAILURE);
}
for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
- pthread_create(&instance->threadpool[t], NULL, mainloop, NULL);
+ pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL);
- pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL);
- pthread_create(&instance->shm_sanitize, NULL,
- shm_du_map_sanitize, instance->dum);
+ pthread_create(&irmd->cleanup_flows, NULL, irm_flow_cleaner, NULL);
+ pthread_create(&irmd->shm_sanitize, NULL,
+ shm_du_map_sanitize, irmd->dum);
/* wait for (all of them) to return */
for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
- pthread_join(instance->threadpool[t], NULL);
+ pthread_join(irmd->threadpool[t], NULL);
- pthread_join(instance->shm_sanitize, NULL);
- pthread_join(instance->cleanup_flows, NULL);
+ pthread_join(irmd->shm_sanitize, NULL);
+ pthread_join(irmd->cleanup_flows, NULL);
irm_destroy();