From 0dda754f6eb91af15f7c69523e2ebb627086b457 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 27 Jul 2016 15:29:28 +0200 Subject: irmd: Revised flow allocation Flow allocation requests and registered api states revised so all states are tracked with a condition variable. This is a more reliable approach and improves stability of flow allocation. Some other refactoring was also done, such as renaming port_map_entry to irm_flow and hiding some internal structures of the registry. --- src/irmd/main.c | 812 +++++++++++++++++++++++++--------------------------- src/irmd/registry.c | 148 +++++++--- src/irmd/registry.h | 40 +-- 3 files changed, 508 insertions(+), 492 deletions(-) (limited to 'src') diff --git a/src/irmd/main.c b/src/irmd/main.c index 625c28c8..6cf16505 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -76,7 +76,7 @@ struct spawned_api { }; /* keeps track of port_id's between N and N - 1 */ -struct port_map_entry { +struct irm_flow { struct list_head next; int port_id; @@ -84,12 +84,11 @@ struct port_map_entry { pid_t n_api; pid_t n_1_api; - pthread_cond_t res_signal; - pthread_mutex_t res_lock; + struct timespec t0; enum flow_state state; - - struct timespec t0; + pthread_cond_t state_cond; + pthread_mutex_t state_lock; }; struct irm { @@ -104,23 +103,24 @@ struct irm { /* keep track of all flows in this processing system */ struct bmp * port_ids; /* maps port_ids to api pair */ - struct list_head port_map; + struct list_head irm_flows; pthread_rwlock_t flows_lock; - enum irm_state state; struct lockfile * lf; struct shm_du_map * dum; pthread_t * threadpool; int sockfd; + + enum irm_state state; pthread_rwlock_t state_lock; pthread_t cleanup_flows; pthread_t shm_sanitize; -} * instance = NULL; +} * irmd = NULL; -static struct port_map_entry * port_map_entry_create() +static struct irm_flow * irm_flow_create() { - struct port_map_entry * e = malloc(sizeof(*e)); + struct irm_flow * e = malloc(sizeof(*e)); if (e == NULL) return NULL; @@ -129,12 +129,12 @@ static struct port_map_entry * port_map_entry_create() e->port_id = -1; e->state = FLOW_NULL; - if (pthread_cond_init(&e->res_signal, NULL)) { + if (pthread_cond_init(&e->state_cond, NULL)) { free(e); return NULL; } - if (pthread_mutex_init(&e->res_lock, NULL)) { + if (pthread_mutex_init(&e->state_lock, NULL)) { free(e); return NULL; } @@ -145,36 +145,36 @@ static struct port_map_entry * port_map_entry_create() return e; } -static void port_map_entry_destroy(struct port_map_entry * e) +static void irm_flow_destroy(struct irm_flow * e) { - bool wait = true; - pthread_mutex_lock(&e->res_lock); - e->state = FLOW_NULL; + pthread_mutex_lock(&e->state_lock); - pthread_cond_broadcast(&e->res_signal); - pthread_mutex_unlock(&e->res_lock); + if (e->state == FLOW_PENDING) + e->state = FLOW_DESTROY; - while (wait) { - pthread_mutex_lock(&e->res_lock); - if (pthread_cond_destroy(&e->res_signal)) - pthread_cond_broadcast(&e->res_signal); - else - wait = false; - pthread_mutex_unlock(&e->res_lock); - } + pthread_cond_signal(&e->state_cond); + pthread_mutex_unlock(&e->state_lock); + + pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, + (void *) &e->state_lock); + + while (e->state != FLOW_NULL) + pthread_cond_wait(&e->state_cond, &e->state_lock); - pthread_mutex_destroy(&e->res_lock); + pthread_cleanup_pop(true); + + pthread_cond_destroy(&e->state_cond); + pthread_mutex_destroy(&e->state_lock); free(e); } -static struct port_map_entry * get_port_map_entry(int port_id) +static struct irm_flow * get_irm_flow(int port_id) { struct list_head * pos = NULL; - list_for_each(pos, &instance->port_map) { - struct port_map_entry * e = - list_entry(pos, struct port_map_entry, next); + list_for_each(pos, &irmd->irm_flows) { + struct irm_flow * e = list_entry(pos, struct irm_flow, next); if (e->port_id == port_id) return e; @@ -183,13 +183,12 @@ static struct port_map_entry * get_port_map_entry(int port_id) return NULL; } -static struct port_map_entry * get_port_map_entry_n(pid_t n_api) +static struct irm_flow * get_irm_flow_n(pid_t n_api) { struct list_head * pos = NULL; - list_for_each(pos, &instance->port_map) { - struct port_map_entry * e = - list_entry(pos, struct port_map_entry, next); + list_for_each(pos, &irmd->irm_flows) { + struct irm_flow * e = list_entry(pos, struct irm_flow, next); if (e->n_api == n_api) return e; @@ -230,7 +229,7 @@ static struct ipcp_entry * get_ipcp_entry_by_api(pid_t api) { struct list_head * pos = NULL; - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * tmp = list_entry(pos, struct ipcp_entry, next); if (api == tmp->api) @@ -246,9 +245,9 @@ static pid_t get_ipcp_by_dst_name(char * dst_name) { struct list_head * pos = NULL; char * dif_name = - registry_get_dif_for_dst(&instance->registry, dst_name); + registry_get_dif_for_dst(&irmd->registry, dst_name); if (dif_name == NULL) { - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (e->type == IPCP_NORMAL) { @@ -257,7 +256,7 @@ static pid_t get_ipcp_by_dst_name(char * dst_name) } } - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (e->type == IPCP_SHIM_ETH_LLC) { @@ -267,7 +266,7 @@ static pid_t get_ipcp_by_dst_name(char * dst_name) } - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (e->type == IPCP_SHIM_UDP) { @@ -280,7 +279,7 @@ static pid_t get_ipcp_by_dst_name(char * dst_name) if (dif_name == NULL) return -1; - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (strcmp(e->dif_name, dif_name) == 0) @@ -302,23 +301,23 @@ static pid_t create_ipcp(char * name, if (api == NULL) return -ENOMEM; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } api->api = ipcp_create(ipcp_type); if (api->api == -1) { - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Failed to create IPCP."); return -1; } tmp = ipcp_entry_create(); if (tmp == NULL) { - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } @@ -328,28 +327,28 @@ static pid_t create_ipcp(char * name, tmp->name = strdup(name); if (tmp->name == NULL) { ipcp_entry_destroy(tmp); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } tmp->dif_name = NULL; tmp->type = ipcp_type; - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (e->type < ipcp_type) break; } - list_add(&tmp->next, &instance->ipcps); + list_add(&tmp->next, &irmd->ipcps); - list_add(&api->next, &instance->spawned_apis); + list_add(&api->next, &irmd->spawned_apis); - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_INFO("Created IPCP %d.", api->api); @@ -361,7 +360,7 @@ static void clear_spawned_api(pid_t api) struct list_head * pos = NULL; struct list_head * n = NULL; - list_for_each_safe(pos, n, &(instance->spawned_apis)) { + list_for_each_safe(pos, n, &(irmd->spawned_apis)) { struct spawned_api * a = list_entry(pos, struct spawned_api, next); @@ -377,10 +376,10 @@ static int destroy_ipcp(pid_t api) struct list_head * pos = NULL; struct list_head * n = NULL; - pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); - list_for_each_safe(pos, n, &(instance->ipcps)) { + list_for_each_safe(pos, n, &(irmd->ipcps)) { struct ipcp_entry * tmp = list_entry(pos, struct ipcp_entry, next); @@ -395,8 +394,8 @@ static int destroy_ipcp(pid_t api) } } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); return 0; } @@ -406,42 +405,42 @@ static int bootstrap_ipcp(pid_t api, { struct ipcp_entry * entry = NULL; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); entry = get_ipcp_entry_by_api(api); if (entry == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api, conf)) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_INFO("Bootstrapped IPCP %d in DIF %s.", entry->api, conf->dif_name); @@ -454,27 +453,27 @@ static int enroll_ipcp(pid_t api, { struct ipcp_entry * entry = NULL; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_rdlock(&instance->reg_lock); + pthread_rwlock_rdlock(&irmd->reg_lock); entry = get_ipcp_entry_by_api(api); if (entry == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Failed to strdup."); return -1; } @@ -482,14 +481,14 @@ static int enroll_ipcp(pid_t api, if (ipcp_enroll(api, dif_name)) { free(entry->dif_name); entry->dif_name = NULL; - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Could not enroll IPCP."); return -1; } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_INFO("Enrolled IPCP %d in DIF %s.", entry->api, dif_name); @@ -510,14 +509,14 @@ static int bind_name(char * name, if (name == NULL || ap_name == NULL) return -EINVAL; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); if (opts & BIND_AP_AUTO) { /* we need to duplicate argv */ @@ -530,17 +529,17 @@ static int bind_name(char * name, } } - if (registry_add_binding(&instance->registry, + if (registry_add_binding(&irmd->registry, strdup(name), strdup(apn), opts, argv_dup) < 0) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Failed to register %s.", name); return -1; } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_INFO("Bound %s to registered name %s.", ap_name, name); @@ -558,24 +557,24 @@ static int unbind_name(char * name, if (!(opts & UNBIND_AP_HARD) && apn == NULL) return -EINVAL; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); if ((opts & UNBIND_AP_HARD) && apn == NULL) { - registry_deassign(&instance->registry, name); - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + registry_deassign(&irmd->registry, name); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_INFO("Removed all bindings of %s.", name); } else { - registry_del_binding(&instance->registry, name, apn); - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + registry_del_binding(&irmd->registry, name, apn); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_INFO("Removed binding from %s to %s.", apn, name); } @@ -589,7 +588,7 @@ static ssize_t list_ipcps(char * name, ssize_t count = 0; int i = 0; - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * tmp = list_entry(pos, struct ipcp_entry, next); @@ -603,7 +602,7 @@ static ssize_t list_ipcps(char * name, return -1; } - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * tmp = list_entry(pos, struct ipcp_entry, next); @@ -626,22 +625,22 @@ static int ap_reg(char * name, if (name == NULL || difs == NULL || len == 0 || difs[0] == NULL) return -EINVAL; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); - if (list_empty(&instance->ipcps)) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + if (list_empty(&irmd->ipcps)) { + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); @@ -656,7 +655,7 @@ static int ap_reg(char * name, LOG_ERR("Could not register %s in DIF %s.", name, e->dif_name); } else { - if(registry_add_name_to_dif(&instance->registry, + if(registry_add_name_to_dif(&irmd->registry, name, e->dif_name, e->type) < 0) @@ -671,13 +670,13 @@ static int ap_reg(char * name, } if (ret == 0) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); return ret; } @@ -693,16 +692,16 @@ static int ap_unreg(char * name, if (name == NULL || len == 0 || difs == NULL || difs[0] == NULL) return -1; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); - list_for_each(pos, &instance->ipcps) { + list_for_each(pos, &irmd->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); @@ -718,7 +717,7 @@ static int ap_unreg(char * name, name, e->dif_name); --ret; } else { - registry_del_name_from_dif(&instance->registry, + registry_del_name_from_dif(&irmd->registry, name, e->dif_name); LOG_INFO("Unregistered %s from %s.", @@ -727,44 +726,44 @@ static int ap_unreg(char * name, } } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); return ret; } -static struct port_map_entry * flow_accept(pid_t api, - char * srv_ap_name, - char ** dst_ae_name) +static struct irm_flow * flow_accept(pid_t api, + char * srv_ap_name, + char ** dst_ae_name) { - struct port_map_entry * pme = NULL; + struct irm_flow * pme = NULL; struct reg_entry * rne = NULL; struct reg_api * rgi = NULL; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return NULL; } - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); - rne = registry_get_entry_by_apn(&instance->registry, srv_ap_name); + rne = registry_get_entry_by_apn(&irmd->registry, srv_ap_name); if (rne == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("AP %s is unknown.", srv_ap_name); return NULL; } if (!reg_entry_get_reg_api(rne, api)) { - rgi = registry_add_api_name(&instance->registry, + rgi = registry_add_api_name(&irmd->registry, api, rne->name); if (rgi == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Failed to register instance %d with %s.", api,srv_ap_name); return NULL; @@ -772,31 +771,31 @@ static struct port_map_entry * flow_accept(pid_t api, LOG_INFO("New instance (%d) of %s added.", api, srv_ap_name); } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); reg_api_sleep(rgi); - pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_rdlock(&instance->reg_lock); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_rdlock(&irmd->reg_lock); pthread_mutex_lock(&rne->state_lock); if (rne->state != REG_NAME_FLOW_ARRIVED) { pthread_mutex_unlock(&rne->state_lock); - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); return NULL; } pthread_mutex_unlock(&rne->state_lock); - pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_rdlock(&instance->flows_lock); + pthread_rwlock_rdlock(&irmd->flows_lock); - pme = get_port_map_entry_n(api); + pme = get_irm_flow_n(api); if (pme == NULL) { - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Port_id was not created yet."); return NULL; } @@ -804,8 +803,8 @@ static struct port_map_entry * flow_accept(pid_t api, if (dst_ae_name != NULL) *dst_ae_name = rne->req_ae_name; - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return pme; } @@ -814,53 +813,54 @@ static int flow_alloc_resp(pid_t n_api, int port_id, int response) { - struct port_map_entry * pme = NULL; + struct irm_flow * pme = NULL; struct reg_entry * rne = NULL; int ret = -1; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); - rne = registry_get_entry_by_api(&instance->registry, n_api); + rne = registry_get_entry_by_api(&irmd->registry, n_api); if (rne == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } if (rne->state != REG_NAME_FLOW_ARRIVED) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Process not listening for this name."); return -1; } pthread_mutex_lock(&rne->state_lock); - registry_del_api(&instance->registry, n_api); + registry_del_api(&irmd->registry, n_api); pthread_mutex_unlock(&rne->state_lock); - pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&irmd->reg_lock); if (!response) { - pthread_rwlock_wrlock(&instance->flows_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); - pme = get_port_map_entry(port_id); + pme = get_irm_flow(port_id); if (pme == NULL) { - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } pme->state = FLOW_ALLOCATED; - pthread_rwlock_unlock(&instance->flows_lock); + pthread_cond_signal(&pme->state_cond); + pthread_rwlock_unlock(&irmd->flows_lock); ret = ipcp_flow_alloc_resp(pme->n_1_api, port_id, @@ -868,32 +868,32 @@ static int flow_alloc_resp(pid_t n_api, response); } - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->state_lock); return ret; } -static struct port_map_entry * flow_alloc(pid_t api, - char * dst_name, - char * src_ae_name, - struct qos_spec * qos) +static struct irm_flow * flow_alloc(pid_t api, + char * dst_name, + char * src_ae_name, + struct qos_spec * qos) { - struct port_map_entry * pme; + struct irm_flow * pme; pid_t ipcp; /* FIXME: Map qos_spec to qos_cube */ - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return NULL; } - pme = port_map_entry_create(); + pme = irm_flow_create(); if (pme == NULL) { - pthread_rwlock_unlock(&instance->state_lock); - LOG_ERR("Failed to create port_map_entry."); + pthread_rwlock_unlock(&irmd->state_lock); + LOG_ERR("Failed to create irm_flow."); return NULL; } @@ -902,26 +902,26 @@ static struct port_map_entry * flow_alloc(pid_t api, if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) LOG_WARN("Failed to set timestamp."); - pthread_rwlock_rdlock(&instance->reg_lock); + pthread_rwlock_rdlock(&irmd->reg_lock); ipcp = get_ipcp_by_dst_name(dst_name); if (ipcp == -1) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_INFO("Destination unreachable."); return NULL; } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_wrlock(&instance->flows_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); - pme->port_id = bmp_allocate(instance->port_ids); + pme->port_id = bmp_allocate(irmd->port_ids); pme->n_1_api = ipcp; - list_add(&pme->next, &instance->port_map); + list_add(&pme->next, &irmd->irm_flows); - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); if (ipcp_flow_alloc(ipcp, pme->port_id, @@ -929,12 +929,12 @@ static struct port_map_entry * flow_alloc(pid_t api, dst_name, src_ae_name, QOS_CUBE_BE) < 0) { - pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_wrlock(&instance->flows_lock); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); list_del(&pme->next); - bmp_release(instance->port_ids, pme->port_id); - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + bmp_release(irmd->port_ids, pme->port_id); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); free(pme); return NULL; } @@ -944,75 +944,68 @@ static struct port_map_entry * flow_alloc(pid_t api, static int flow_alloc_res(int port_id) { - struct port_map_entry * e; + struct irm_flow * e; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_rwlock_rdlock(&instance->flows_lock); + pthread_rwlock_rdlock(&irmd->flows_lock); - e = get_port_map_entry(port_id); + e = get_irm_flow(port_id); if (e == NULL) { LOG_ERR("Could not find port %d.", port_id); - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } if (e->state == FLOW_NULL) { LOG_INFO("Port %d is deprecated.", port_id); - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } if (e->state == FLOW_ALLOCATED) { - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return 0; } - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); - while (true) { - pthread_mutex_lock(&e->res_lock); - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void*) &e->res_lock); + pthread_mutex_lock(&e->state_lock); + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void*) &e->state_lock); - pthread_cond_wait(&e->res_signal, &e->res_lock); + while (e->state == FLOW_PENDING) + pthread_cond_wait(&e->state_cond, &e->state_lock); - pthread_cleanup_pop(true); + pthread_cleanup_pop(true); - pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_wrlock(&instance->flows_lock); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); + pthread_mutex_lock(&e->state_lock); - e = get_port_map_entry(port_id); - if (e == NULL) { - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); - return -1; - } - if (e->state == FLOW_ALLOCATED) { - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); - return 0; - } - if (e->state == FLOW_NULL) { - /* don't release the port_id, AP has to call dealloc */ - list_del(&e->next); - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); - free(e); - return -1; + if (e->state == FLOW_ALLOCATED) { + pthread_mutex_unlock(&e->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); + return 0; + } - } - /* still pending, spurious wake */ - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + if (e->state == FLOW_DESTROY) { + /* don't release the port_id, AP has to call dealloc */ + e->state = FLOW_NULL; + pthread_cond_signal(&e->state_cond); + pthread_mutex_unlock(&e->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); + return -1; } return 0; @@ -1023,16 +1016,16 @@ static int flow_dealloc(int port_id) pid_t n_1_api; int ret = 0; - struct port_map_entry * e = NULL; + struct irm_flow * e = NULL; - pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_wrlock(&instance->flows_lock); - bmp_release(instance->port_ids, port_id); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); + bmp_release(irmd->port_ids, port_id); - e = get_port_map_entry(port_id); + e = get_irm_flow(port_id); if (e == NULL) { - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return 0; } @@ -1040,11 +1033,11 @@ static int flow_dealloc(int port_id) list_del(&e->next); - pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&irmd->flows_lock); ret = ipcp_flow_dealloc(n_1_api, port_id); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->state_lock); free(e); @@ -1083,21 +1076,21 @@ static pid_t auto_execute(char ** argv) exit(EXIT_FAILURE); } -static struct port_map_entry * flow_req_arr(pid_t api, - char * dst_name, - char * ae_name) +static struct irm_flow * flow_req_arr(pid_t api, + char * dst_name, + char * ae_name) { struct reg_entry * rne = NULL; - struct port_map_entry * pme = NULL; + struct irm_flow * pme = NULL; bool acc_wait = true; enum reg_name_state state; struct spawned_api * c_api; - pme = port_map_entry_create(); + pme = irm_flow_create(); if (pme == NULL) { - LOG_ERR("Failed to create port_map_entry."); + LOG_ERR("Failed to create irm_flow."); return NULL; } @@ -1106,13 +1099,13 @@ static struct port_map_entry * flow_req_arr(pid_t api, if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) LOG_WARN("Failed to set timestamp."); - pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_rdlock(&instance->reg_lock); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_rdlock(&irmd->reg_lock); - rne = registry_get_entry_by_name(&instance->registry, dst_name); + rne = registry_get_entry_by_name(&irmd->registry, dst_name); if (rne == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Unknown name: %s.", dst_name); free(pme); return NULL; @@ -1124,16 +1117,16 @@ static struct port_map_entry * flow_req_arr(pid_t api, switch (state) { case REG_NAME_IDLE: - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("No AP's for %s.", dst_name); free(pme); return NULL; case REG_NAME_AUTO_ACCEPT: c_api = malloc(sizeof(*c_api)); if (c_api == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); free(pme); return NULL; } @@ -1147,55 +1140,60 @@ static struct port_map_entry * flow_req_arr(pid_t api, pthread_mutex_lock(&rne->state_lock); rne->state = REG_NAME_AUTO_ACCEPT; pthread_mutex_unlock(&rne->state_lock); - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); free(pme); free(c_api); return NULL; } - list_add(&c_api->next, &instance->spawned_apis); + list_add(&c_api->next, &irmd->spawned_apis); - pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&irmd->reg_lock); pthread_mutex_lock(&rne->state_lock); pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, (void *) &rne->state_lock); while (rne->state == REG_NAME_AUTO_EXEC) - pthread_cond_wait(&rne->acc_signal, - &rne->state_lock); + pthread_cond_wait(&rne->state_cond, &rne->state_lock); pthread_cleanup_pop(true); - pthread_rwlock_rdlock(&instance->reg_lock); - + pthread_rwlock_rdlock(&irmd->reg_lock); + pthread_mutex_lock(&rne->state_lock); + if (rne->state == REG_NAME_DESTROY) { + pthread_mutex_unlock(&rne->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + return NULL; + } + pthread_mutex_unlock(&rne->state_lock); case REG_NAME_FLOW_ACCEPT: pme->n_api = reg_entry_resolve_api(rne); if (pme->n_api == -1) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Invalid api returned."); return NULL; } break; default: - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("IRMd in wrong state."); free(pme); return NULL; } - pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_wrlock(&instance->flows_lock); - pme->port_id = bmp_allocate(instance->port_ids); + pthread_rwlock_wrlock(&irmd->flows_lock); + pme->port_id = bmp_allocate(irmd->port_ids); - list_add(&pme->next, &instance->port_map); + list_add(&pme->next, &irmd->irm_flows); - pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&irmd->flows_lock); pthread_mutex_lock(&rne->state_lock); @@ -1207,15 +1205,15 @@ static struct port_map_entry * flow_req_arr(pid_t api, pthread_mutex_unlock(&rne->state_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->state_lock); while (acc_wait) { - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); pthread_mutex_lock(&rne->state_lock); acc_wait = (rne->state == REG_NAME_FLOW_ARRIVED && - instance->state == IRMD_RUNNING); + irmd->state == IRMD_RUNNING); pthread_mutex_unlock(&rne->state_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->state_lock); } return pme; @@ -1224,19 +1222,19 @@ static struct port_map_entry * flow_req_arr(pid_t api, static int flow_alloc_reply(int port_id, int response) { - struct port_map_entry * e; + struct irm_flow * e; - pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_rdlock(&instance->flows_lock); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_rdlock(&irmd->flows_lock); - e = get_port_map_entry(port_id); + e = get_irm_flow(port_id); if (e == NULL) { - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_mutex_lock(&e->res_lock); + pthread_mutex_lock(&e->state_lock); if (!response) e->state = FLOW_ALLOCATED; @@ -1244,35 +1242,35 @@ static int flow_alloc_reply(int port_id, else e->state = FLOW_NULL; - if (pthread_cond_signal(&e->res_signal)) + if (pthread_cond_signal(&e->state_cond)) LOG_ERR("Failed to send signal."); - pthread_mutex_unlock(&e->res_lock); + pthread_mutex_unlock(&e->state_lock); - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return 0; } static int flow_dealloc_ipcp(int port_id) { - struct port_map_entry * e = NULL; + struct irm_flow * e = NULL; - pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_wrlock(&instance->flows_lock); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); - e = get_port_map_entry(port_id); + e = get_irm_flow(port_id); if (e == NULL) { - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); return 0; } list_del(&e->next); - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); free(e); @@ -1284,17 +1282,17 @@ static void irm_destroy() struct list_head * h; struct list_head * t; - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state != IRMD_NULL) + if (irmd->state != IRMD_NULL) LOG_WARN("Unsafe destroy."); - if (instance->threadpool != NULL) - free(instance->threadpool); + if (irmd->threadpool != NULL) + free(irmd->threadpool); - pthread_rwlock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); /* clear the lists */ - list_for_each_safe(h, t, &instance->ipcps) { + list_for_each_safe(h, t, &irmd->ipcps) { struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); list_del(&e->next); ipcp_destroy(e->api); @@ -1302,52 +1300,49 @@ static void irm_destroy() ipcp_entry_destroy(e); } - list_for_each_safe(h, t, &instance->registry) { - struct reg_entry * e = list_entry(h, struct reg_entry, next); - list_del(&e->next); - reg_entry_destroy(e); - } + registry_destroy(&irmd->registry); - list_for_each_safe(h, t, &instance->spawned_apis) { + list_for_each_safe(h, t, &irmd->spawned_apis) { struct spawned_api * api = list_entry(h, struct spawned_api, next); int status; if (kill(api->api, SIGTERM)) - LOG_DBGF("Could not send kill signal to %d.", api->api); + LOG_DBG("Could not send kill signal to %d.", api->api); else if (waitpid(api->api, &status, 0) < 0) - LOG_DBGF("Error waiting for %d to exit.", api->api); + LOG_DBG("Error waiting for %d to exit.", api->api); list_del(&api->next); free(api); } - pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_wrlock(&instance->flows_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); - list_for_each_safe(h, t, &instance->port_map) { - struct port_map_entry * e = list_entry(h, - struct port_map_entry, - next); + list_for_each_safe(h, t, &irmd->irm_flows) { + struct irm_flow * e = list_entry(h, struct irm_flow, next); list_del(&e->next); - port_map_entry_destroy(e); + irm_flow_destroy(e); } - if (instance->port_ids != NULL) - bmp_destroy(instance->port_ids); + if (irmd->port_ids != NULL) + bmp_destroy(irmd->port_ids); + + pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&instance->flows_lock); + if (irmd->dum != NULL) + shm_du_map_destroy(irmd->dum); - if (instance->dum != NULL) - shm_du_map_destroy(instance->dum); + if (irmd->lf != NULL) + lockfile_destroy(irmd->lf); - if (instance->lf != NULL) - lockfile_destroy(instance->lf); + close(irmd->sockfd); - close(instance->sockfd); + pthread_rwlock_unlock(&irmd->state_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_destroy(&irmd->reg_lock); + pthread_rwlock_destroy(&irmd->state_lock); - free(instance); + free(irmd); } void irmd_sig_handler(int sig, siginfo_t * info, void * c) @@ -1358,20 +1353,20 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) case SIGINT: case SIGTERM: case SIGHUP: - pthread_rwlock_wrlock(&instance->state_lock); + pthread_rwlock_wrlock(&irmd->state_lock); - instance->state = IRMD_NULL; + irmd->state = IRMD_NULL; - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->state_lock); - if (instance->threadpool != NULL) { + if (irmd->threadpool != NULL) { for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) - pthread_cancel(instance->threadpool[i]); + pthread_cancel(irmd->threadpool[i]); } - pthread_cancel(instance->shm_sanitize); - pthread_cancel(instance->cleanup_flows); + pthread_cancel(irmd->shm_sanitize); + pthread_cancel(irmd->cleanup_flows); break; case SIGPIPE: LOG_DBG("Ignored SIGPIPE."); @@ -1386,9 +1381,6 @@ void * irm_flow_cleaner() struct list_head * pos = NULL; struct list_head * n = NULL; - struct list_head * pos2 = NULL; - struct list_head * n2 = NULL; - struct timespec timeout = {IRMD_CLEANUP_TIMER / BILLION, IRMD_CLEANUP_TIMER % BILLION}; int status; @@ -1398,37 +1390,37 @@ void * irm_flow_cleaner() LOG_WARN("Failed to get time."); /* cleanup stale PENDING flows */ - pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - if (instance->state == IRMD_NULL) { - pthread_rwlock_unlock(&instance->state_lock); + if (irmd->state == IRMD_NULL) { + pthread_rwlock_unlock(&irmd->state_lock); return (void *) 0; } - pthread_rwlock_wrlock(&instance->flows_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); - list_for_each_safe(pos, n, &(instance->port_map)) { - struct port_map_entry * e = - list_entry(pos, struct port_map_entry, next); + list_for_each_safe(pos, n, &(irmd->irm_flows)) { + struct irm_flow * e = + list_entry(pos, struct irm_flow, next); - pthread_mutex_lock(&e->res_lock); + pthread_mutex_lock(&e->state_lock); if (e->state == FLOW_PENDING && ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) { LOG_INFO("Pending port_id %d timed out.", e->port_id); e->state = FLOW_NULL; - pthread_cond_signal(&e->res_signal); - pthread_mutex_unlock(&e->res_lock); + pthread_cond_signal(&e->state_cond); + pthread_mutex_unlock(&e->state_lock); continue; } - pthread_mutex_unlock(&e->res_lock); + pthread_mutex_unlock(&e->state_lock); if (kill(e->n_api, 0) < 0) { struct shm_ap_rbuff * n_rb = shm_ap_rbuff_open(e->n_api); - bmp_release(instance->port_ids, e->port_id); + bmp_release(irmd->port_ids, e->port_id); list_del(&e->next); LOG_INFO("Process %d gone, %d deallocated.", @@ -1436,7 +1428,7 @@ void * irm_flow_cleaner() ipcp_flow_dealloc(e->n_1_api, e->port_id); if (n_rb != NULL) shm_ap_rbuff_destroy(n_rb); - port_map_entry_destroy(e); + irm_flow_destroy(e); } if (kill(e->n_1_api, 0) < 0) { struct shm_ap_rbuff * n_1_rb = @@ -1446,34 +1438,16 @@ void * irm_flow_cleaner() e->n_1_api, e->port_id); if (n_1_rb != NULL) shm_ap_rbuff_destroy(n_1_rb); - port_map_entry_destroy(e); + irm_flow_destroy(e); } } - pthread_rwlock_unlock(&instance->flows_lock); - pthread_rwlock_wrlock(&instance->reg_lock); - - list_for_each_safe(pos, n, &instance->registry) { - struct reg_entry * e = - list_entry(pos, struct reg_entry, next); - - list_for_each_safe(pos2, n2, &e->reg_apis) { - struct reg_api * r = - list_entry(pos2, - struct reg_api, - next); - if (kill(r->api, 0) < 0) { - LOG_INFO("Process %d gone, " - "registry binding removed.", - r->api); - registry_del_api( - &instance->registry, - r->api); - } - } - } + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); + + registry_sanitize_apis(&irmd->registry); - list_for_each_safe(pos, n, &instance->spawned_apis) { + list_for_each_safe(pos, n, &irmd->spawned_apis) { struct spawned_api * api = list_entry(pos, struct spawned_api, next); waitpid(api->api, &status, WNOHANG); @@ -1487,8 +1461,8 @@ void * irm_flow_cleaner() } } - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); nanosleep(&timeout, NULL); } @@ -1509,12 +1483,12 @@ void * mainloop() ssize_t count; buffer_t buffer; irm_msg_t ret_msg = IRM_MSG__INIT; - struct port_map_entry * e = NULL; + struct irm_flow * e = NULL; pid_t * apis = NULL; ret_msg.code = IRM_MSG_CODE__IRM_REPLY; - cli_sockfd = accept(instance->sockfd, 0, 0); + cli_sockfd = accept(irmd->sockfd, 0, 0); if (cli_sockfd < 0) { LOG_ERR("Cannot accept new connection."); continue; @@ -1703,17 +1677,17 @@ static struct irm * irm_create() { struct stat st = {0}; - instance = malloc(sizeof(*instance)); - if (instance == NULL) + irmd = malloc(sizeof(*irmd)); + if (irmd == NULL) return NULL; - instance->state = IRMD_NULL; + irmd->state = IRMD_NULL; if (access("/dev/shm/" LOCKFILE_NAME, F_OK) != -1) { struct lockfile * lf = lockfile_open(); if (lf == NULL) { LOG_ERR("Failed to open existing lockfile."); - free(instance); + free(irmd); return NULL; } @@ -1726,42 +1700,42 @@ static struct irm * irm_create() LOG_INFO("IRMd already running (%d), exiting.", lockfile_owner(lf)); lockfile_close(lf); - free(instance); + free(irmd); return NULL; } } - if (pthread_rwlock_init(&instance->state_lock, NULL)) { + if (pthread_rwlock_init(&irmd->state_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); - free(instance); + free(irmd); return NULL; } - if (pthread_rwlock_init(&instance->reg_lock, NULL)) { + if (pthread_rwlock_init(&irmd->reg_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); - free(instance); + free(irmd); return NULL; } - if (pthread_rwlock_init(&instance->flows_lock, NULL)) { + if (pthread_rwlock_init(&irmd->flows_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); - free(instance); + free(irmd); return NULL; } - INIT_LIST_HEAD(&instance->ipcps); - INIT_LIST_HEAD(&instance->spawned_apis); - INIT_LIST_HEAD(&instance->registry); - INIT_LIST_HEAD(&instance->port_map); + INIT_LIST_HEAD(&irmd->ipcps); + INIT_LIST_HEAD(&irmd->spawned_apis); + INIT_LIST_HEAD(&irmd->registry); + INIT_LIST_HEAD(&irmd->irm_flows); - instance->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); - if (instance->port_ids == NULL) { + irmd->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); + if (irmd->port_ids == NULL) { irm_destroy(); return NULL; } - instance->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); - if (instance->threadpool == NULL) { + irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + if (irmd->threadpool == NULL) { irm_destroy(); return NULL; } @@ -1774,8 +1748,8 @@ static struct irm * irm_create() } } - instance->sockfd = server_socket_open(IRM_SOCK_PATH); - if (instance->sockfd < 0) { + irmd->sockfd = server_socket_open(IRM_SOCK_PATH); + if (irmd->sockfd < 0) { irm_destroy(); return NULL; } @@ -1786,19 +1760,19 @@ static struct irm * irm_create() return NULL; } - if ((instance->lf = lockfile_create()) == NULL) { + if ((irmd->lf = lockfile_create()) == NULL) { irm_destroy(); return NULL; } - if ((instance->dum = shm_du_map_create()) == NULL) { + if ((irmd->dum = shm_du_map_create()) == NULL) { irm_destroy(); return NULL; } - instance->state = IRMD_RUNNING; + irmd->state = IRMD_RUNNING; - return instance; + return irmd; } static void usage() @@ -1892,25 +1866,25 @@ int main(int argc, char ** argv) if (sigaction(SIGPIPE, &sig_act, NULL) < 0) exit(EXIT_FAILURE); - instance = irm_create(); - if (instance == NULL) { + irmd = irm_create(); + if (irmd == NULL) { close_logfile(); exit(EXIT_FAILURE); } for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); + pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL); - pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL); - pthread_create(&instance->shm_sanitize, NULL, - shm_du_map_sanitize, instance->dum); + pthread_create(&irmd->cleanup_flows, NULL, irm_flow_cleaner, NULL); + pthread_create(&irmd->shm_sanitize, NULL, + shm_du_map_sanitize, irmd->dum); /* wait for (all of them) to return */ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_join(instance->threadpool[t], NULL); + pthread_join(irmd->threadpool[t], NULL); - pthread_join(instance->shm_sanitize, NULL); - pthread_join(instance->cleanup_flows, NULL); + pthread_join(irmd->shm_sanitize, NULL); + pthread_join(irmd->cleanup_flows, NULL); irm_destroy(); diff --git a/src/irmd/registry.c b/src/irmd/registry.c index 32741460..ab47fede 100644 --- a/src/irmd/registry.c +++ b/src/irmd/registry.c @@ -33,6 +33,7 @@ #include #include #include +#include #define reg_entry_has_auto_binding(e) (reg_entry_get_auto_info(e) != NULL) #define reg_entry_has_api(e, api) (reg_entry_get_reg_api(e, api) != NULL) @@ -52,7 +53,25 @@ struct reg_dif { enum ipcp_type type; }; -struct reg_api * reg_api_create(pid_t api) +enum api_state { + REG_I_NULL = 0, + REG_I_INIT, + REG_I_SLEEP, + REG_I_WAKE, + REG_I_DESTROY +}; + +struct reg_api { + struct list_head next; + pid_t api; + + /* the api will block on this */ + enum api_state state; + pthread_cond_t state_cond; + pthread_mutex_t state_lock; +}; + +static struct reg_api * reg_api_create(pid_t api) { struct reg_api * i; i = malloc(sizeof(*i)); @@ -60,74 +79,86 @@ struct reg_api * reg_api_create(pid_t api) return NULL; i->api = api; - i->state = REG_I_WAKE; + i->state = REG_I_INIT; - pthread_mutex_init(&i->mutex, NULL); - pthread_cond_init(&i->cond_state, NULL); + pthread_mutex_init(&i->state_lock, NULL); + pthread_cond_init(&i->state_cond, NULL); INIT_LIST_HEAD(&i->next); return i; } -void reg_api_destroy(struct reg_api * i) +static void reg_api_destroy(struct reg_api * i) { - pthread_mutex_lock(&i->mutex); + pthread_mutex_lock(&i->state_lock); + + if (i->state != REG_I_NULL) + i->state = REG_I_DESTROY; + + pthread_cond_signal(&i->state_cond); - if (i->state != REG_I_SLEEP) - i->state = REG_I_WAKE; - else - i->state = REG_I_NULL; + pthread_mutex_unlock(&i->state_lock); - pthread_cond_broadcast(&i->cond_state); - pthread_mutex_unlock(&i->mutex); + pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, + (void *) &i->state_lock); + + while (i->state != REG_I_NULL) + pthread_cond_wait(&i->state_cond, &i->state_lock); - while (i->state != REG_I_WAKE) - ; + pthread_cleanup_pop(true); - pthread_mutex_destroy(&i->mutex); + pthread_cond_destroy(&i->state_cond); + pthread_mutex_destroy(&i->state_lock); free(i); } void reg_api_sleep(struct reg_api * i) { - pthread_mutex_lock(&i->mutex); - if (i->state != REG_I_WAKE) { - pthread_mutex_unlock(&i->mutex); + if (i == NULL) + return; + + pthread_mutex_lock(&i->state_lock); + if (i->state != REG_I_INIT) { + pthread_mutex_unlock(&i->state_lock); return; } i->state = REG_I_SLEEP; pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, - (void *) &i->mutex); + (void *) &i->state_lock); while (i->state == REG_I_SLEEP) - pthread_cond_wait(&i->cond_state, &i->mutex); + pthread_cond_wait(&i->state_cond, &i->state_lock); - i->state = REG_I_WAKE; - pthread_cond_signal(&i->cond_state); + i->state = REG_I_NULL; + pthread_cond_signal(&i->state_cond); pthread_cleanup_pop(true); } void reg_api_wake(struct reg_api * i) { - pthread_mutex_lock(&i->mutex); + pthread_mutex_lock(&i->state_lock); if (i->state == REG_I_NULL) { - pthread_mutex_unlock(&i->mutex); + pthread_mutex_unlock(&i->state_lock); return; } i->state = REG_I_WAKE; - pthread_cond_signal(&i->cond_state); - pthread_mutex_unlock(&i->mutex); + pthread_cond_broadcast(&i->state_cond); + + while (i->state == REG_I_WAKE) + pthread_cond_wait(&i->state_cond, &i->state_lock); + + pthread_mutex_unlock(&i->state_lock); } -struct reg_binding * reg_binding_create(char * apn, +static struct reg_binding * reg_binding_create(char * apn, uint32_t flags, char ** argv) { @@ -144,7 +175,7 @@ struct reg_binding * reg_binding_create(char * apn, return b; } -void reg_binding_destroy(struct reg_binding * b) +static void reg_binding_destroy(struct reg_binding * b) { if (b == NULL) return; @@ -160,7 +191,7 @@ void reg_binding_destroy(struct reg_binding * b) free(b); } -struct reg_entry * reg_entry_create() +static struct reg_entry * reg_entry_create() { struct reg_entry * e = malloc(sizeof(*e)); if (e == NULL) @@ -175,7 +206,7 @@ struct reg_entry * reg_entry_create() return e; } -struct reg_entry * reg_entry_init(struct reg_entry * e, +static struct reg_entry * reg_entry_init(struct reg_entry * e, char * name) { if (e == NULL || name == NULL) @@ -188,7 +219,7 @@ struct reg_entry * reg_entry_init(struct reg_entry * e, e->name = name; - if (pthread_cond_init(&e->acc_signal, NULL)) + if (pthread_cond_init(&e->state_cond, NULL)) return NULL; if (pthread_mutex_init(&e->state_lock, NULL)) @@ -199,7 +230,7 @@ struct reg_entry * reg_entry_init(struct reg_entry * e, return e; } -void reg_entry_destroy(struct reg_entry * e) +static void reg_entry_destroy(struct reg_entry * e) { struct list_head * pos = NULL; struct list_head * n = NULL; @@ -211,17 +242,14 @@ void reg_entry_destroy(struct reg_entry * e) pthread_mutex_lock(&e->state_lock); - e->state = REG_NAME_NULL; + e->state = REG_NAME_DESTROY; - pthread_cond_broadcast(&e->acc_signal); + pthread_cond_broadcast(&e->state_cond); pthread_mutex_unlock(&e->state_lock); while (wait) { pthread_mutex_lock(&e->state_lock); - if (pthread_cond_destroy(&e->acc_signal)) - pthread_cond_broadcast(&e->acc_signal); - else - wait = false; + pthread_cond_broadcast(&e->state_cond); pthread_mutex_unlock(&e->state_lock); } @@ -593,7 +621,7 @@ struct reg_api * registry_add_api_name(struct list_head * registry, if (e->state == REG_NAME_IDLE || e->state == REG_NAME_AUTO_ACCEPT || e->state == REG_NAME_AUTO_EXEC) { e->state = REG_NAME_FLOW_ACCEPT; - pthread_cond_signal(&e->acc_signal); + pthread_cond_signal(&e->state_cond); } list_add(&i->next, &e->reg_apis); @@ -636,10 +664,33 @@ void registry_del_api(struct list_head * registry, e->state = REG_NAME_FLOW_ACCEPT; } + pthread_cond_signal(&e->state_cond); + return; } -/* FIXME: optimize this */ +void registry_sanitize_apis(struct list_head * registry) +{ + struct list_head * pos = NULL; + struct list_head * n = NULL; + + struct list_head * pos2 = NULL; + struct list_head * n2 = NULL; + + list_for_each_safe(pos, n, registry) { + struct reg_entry * e = list_entry(pos, struct reg_entry, next); + list_for_each_safe(pos2, n2, &e->reg_apis) { + struct reg_api * r + = list_entry(pos2, struct reg_api, next); + if (kill(r->api, 0) < 0) { + LOG_DBG("Process %d gone, binding removed.", + r->api); + registry_del_api(registry, r->api); + } + } + } +} + char * registry_get_dif_for_dst(struct list_head * registry, char * dst_name) { @@ -673,7 +724,7 @@ char * registry_get_dif_for_dst(struct list_head * registry, return NULL; } else { - LOG_DBGF("No local ap %s found.", dst_name); + LOG_DBG("No local ap %s found.", dst_name); return NULL; } } @@ -700,3 +751,18 @@ void registry_del_name_from_dif(struct list_head * registry, reg_entry_del_local_from_dif(re, dif_name); } + +void registry_destroy(struct list_head * registry) +{ + struct list_head * h = NULL; + struct list_head * t = NULL; + + if (registry == NULL) + return; + + list_for_each_safe(h, t, registry) { + struct reg_entry * e = list_entry(h, struct reg_entry, next); + list_del(&e->next); + reg_entry_destroy(e); + } +} diff --git a/src/irmd/registry.h b/src/irmd/registry.h index 8e9a7af1..fb0dceb7 100644 --- a/src/irmd/registry.h +++ b/src/irmd/registry.h @@ -44,24 +44,11 @@ enum reg_name_state { REG_NAME_AUTO_ACCEPT, REG_NAME_AUTO_EXEC, REG_NAME_FLOW_ACCEPT, - REG_NAME_FLOW_ARRIVED + REG_NAME_FLOW_ARRIVED, + REG_NAME_DESTROY }; -enum reg_i_state { - REG_I_NULL = 0, - REG_I_SLEEP, - REG_I_WAKE -}; - -struct reg_api { - struct list_head next; - pid_t api; - - /* the api will block on this */ - enum reg_i_state state; - pthread_cond_t cond_state; - pthread_mutex_t mutex; -}; +struct reg_api; /* an entry in the registry */ struct reg_entry { @@ -76,31 +63,17 @@ struct reg_entry { /* known instances */ struct list_head reg_apis; - /* flow handling information */ + /* FIXME: flow handling information should not be here */ enum reg_name_state state; char * req_ae_name; int response; - pthread_cond_t acc_signal; + pthread_cond_t state_cond; pthread_mutex_t state_lock; }; -struct reg_api * reg_api_create(pid_t api); -void reg_api_destroy(struct reg_api * i); void reg_api_sleep(struct reg_api * i); void reg_api_wake(struct reg_api * i); - - -struct reg_binding * reg_binding_create(char * apn, - uint32_t opts, - char ** argv); -void reg_binding_destroy(); - -struct reg_entry * reg_entry_create(); -struct reg_entry * reg_entry_init(struct reg_entry * e, - char * name); -void reg_entry_destroy(struct reg_entry * e); - struct reg_binding * reg_entry_get_binding(struct reg_entry * e, char * apn); char ** reg_entry_get_auto_info(struct reg_entry * e); @@ -135,6 +108,7 @@ struct reg_api * registry_add_api_name(struct list_head * registry, char * name); void registry_del_api(struct list_head * registry, pid_t api); +void registry_sanitize_apis(struct list_head * registry); struct reg_api * registry_get_api_by_name(struct list_head * registry, char * name); struct reg_entry * registry_get_entry_by_name(struct list_head * registry, @@ -152,4 +126,6 @@ int registry_add_name_to_dif(struct list_head * registry, void registry_del_name_from_dif(struct list_head * registry, char * name, char * dif_name); +void registry_destroy(struct list_head * registry); + #endif -- cgit v1.2.3