From 18e440197cae6d537765a4de6a915f074dce4de5 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Mon, 8 Aug 2016 21:15:57 +0200 Subject: irmd: Refactor and bugfixes Refactors the IRMd to extract reg_api and irm_flow structures to their own sources. Fixes some locking bugs. --- src/irmd/main.c | 320 +++++++++++++++++++++++--------------------------------- 1 file changed, 131 insertions(+), 189 deletions(-) (limited to 'src/irmd/main.c') diff --git a/src/irmd/main.c b/src/irmd/main.c index 69ce765c..a79330ef 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -42,6 +42,7 @@ #include "utils.h" #include "registry.h" +#include "irm_flow.h" #include #include @@ -57,11 +58,11 @@ #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ struct ipcp_entry { - struct list_head next; - char * name; - pid_t api; - enum ipcp_type type; - char * dif_name; + struct list_head next; + char * name; + pid_t api; + enum ipcp_type type; + char * dif_name; }; enum irm_state { @@ -76,20 +77,6 @@ struct spawned_api { }; /* keeps track of port_id's between N and N - 1 */ -struct irm_flow { - struct list_head next; - - int port_id; - - pid_t n_api; - pid_t n_1_api; - - struct timespec t0; - - enum flow_state state; - pthread_cond_t state_cond; - pthread_mutex_t state_lock; -}; struct irm { /* FIXME: list of ipcps could be merged into the registry */ @@ -118,59 +105,6 @@ struct irm { pthread_t shm_sanitize; } * irmd = NULL; -static struct irm_flow * irm_flow_create() -{ - struct irm_flow * e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; - - e->n_api = -1; - e->n_1_api = -1; - e->port_id = -1; - e->state = FLOW_NULL; - - if (pthread_cond_init(&e->state_cond, NULL)) { - free(e); - return NULL; - } - - if (pthread_mutex_init(&e->state_lock, NULL)) { - free(e); - return NULL; - } - - e->t0.tv_sec = 0; - e->t0.tv_nsec = 0; - - return e; -} - -static void irm_flow_destroy(struct irm_flow * e) -{ - pthread_mutex_lock(&e->state_lock); - - if (e->state == FLOW_PENDING) - e->state = FLOW_DESTROY; - else - e->state = FLOW_NULL; - - pthread_cond_signal(&e->state_cond); - pthread_mutex_unlock(&e->state_lock); - - pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, - (void *) &e->state_lock); - - while (e->state != FLOW_NULL) - pthread_cond_wait(&e->state_cond, &e->state_lock); - - pthread_cleanup_pop(true); - - pthread_cond_destroy(&e->state_cond); - pthread_mutex_destroy(&e->state_lock); - - free(e); -} - static struct irm_flow * get_irm_flow(int port_id) { struct list_head * pos = NULL; @@ -294,8 +228,8 @@ static pid_t get_ipcp_by_dst_name(char * dst_name) static pid_t create_ipcp(char * name, enum ipcp_type ipcp_type) { - struct spawned_api * api; - struct ipcp_entry * tmp = NULL; + struct spawned_api * api = NULL; + struct ipcp_entry * tmp = NULL; struct list_head * pos; @@ -738,9 +672,9 @@ static struct irm_flow * flow_accept(pid_t api, char * srv_ap_name, char ** dst_ae_name) { - struct irm_flow * pme = NULL; - struct reg_entry * rne = NULL; - struct reg_api * rgi = NULL; + struct irm_flow * f = NULL; + struct reg_entry * rne = NULL; + struct reg_api * rgi = NULL; pthread_rwlock_rdlock(&irmd->state_lock); @@ -759,7 +693,7 @@ static struct irm_flow * flow_accept(pid_t api, return NULL; } - if (!reg_entry_get_reg_api(rne, api)) { + if ((rgi = reg_entry_get_reg_api(rne, api)) == NULL) { rgi = registry_add_api_name(&irmd->registry, api, rne->name); @@ -794,8 +728,8 @@ static struct irm_flow * flow_accept(pid_t api, pthread_rwlock_rdlock(&irmd->flows_lock); - pme = get_irm_flow_n(api); - if (pme == NULL) { + f = get_irm_flow_n(api); + if (f == NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Port_id was not created yet."); @@ -808,15 +742,15 @@ static struct irm_flow * flow_accept(pid_t api, pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - return pme; + return f; } static int flow_alloc_resp(pid_t n_api, int port_id, int response) { - struct irm_flow * pme = NULL; - struct reg_entry * rne = NULL; + struct irm_flow * f = NULL; + struct reg_entry * rne = NULL; int ret = -1; pthread_rwlock_rdlock(&irmd->state_lock); @@ -835,15 +769,16 @@ static int flow_alloc_resp(pid_t n_api, return -1; } + pthread_mutex_lock(&rne->state_lock); + if (rne->state != REG_NAME_FLOW_ARRIVED) { + pthread_mutex_unlock(&rne->state_lock); pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Process not listening for this name."); return -1; } - pthread_mutex_lock(&rne->state_lock); - registry_del_api(&irmd->registry, n_api); pthread_mutex_unlock(&rne->state_lock); @@ -853,20 +788,20 @@ static int flow_alloc_resp(pid_t n_api, if (!response) { pthread_rwlock_wrlock(&irmd->flows_lock); - pme = get_irm_flow(port_id); - if (pme == NULL) { + f = get_irm_flow(port_id); + if (f == NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pme->state = FLOW_ALLOCATED; - pthread_cond_signal(&pme->state_cond); + f->state = FLOW_ALLOCATED; + pthread_cond_signal(&f->state_cond); pthread_rwlock_unlock(&irmd->flows_lock); - ret = ipcp_flow_alloc_resp(pme->n_1_api, + ret = ipcp_flow_alloc_resp(f->n_1_api, port_id, - pme->n_api, + f->n_api, response); } @@ -880,7 +815,7 @@ static struct irm_flow * flow_alloc(pid_t api, char * src_ae_name, struct qos_spec * qos) { - struct irm_flow * pme; + struct irm_flow * f; pid_t ipcp; /* FIXME: Map qos_spec to qos_cube */ @@ -892,16 +827,16 @@ static struct irm_flow * flow_alloc(pid_t api, return NULL; } - pme = irm_flow_create(); - if (pme == NULL) { + f = irm_flow_create(); + if (f == NULL) { pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Failed to create irm_flow."); return NULL; } - pme->n_api = api; - pme->state = FLOW_PENDING; - if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) + f->n_api = api; + f->state = FLOW_PENDING; + if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) LOG_WARN("Failed to set timestamp."); pthread_rwlock_rdlock(&irmd->reg_lock); @@ -917,44 +852,44 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - pme->port_id = bmp_allocate(irmd->port_ids); - pme->n_1_api = ipcp; + f->port_id = bmp_allocate(irmd->port_ids); + f->n_1_api = ipcp; - list_add(&pme->next, &irmd->irm_flows); + list_add(&f->next, &irmd->irm_flows); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); if (ipcp_flow_alloc(ipcp, - pme->port_id, - pme->n_api, + f->port_id, + f->n_api, dst_name, src_ae_name, QOS_CUBE_BE) < 0) { pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - list_del(&pme->next); - bmp_release(irmd->port_ids, pme->port_id); + list_del(&f->next); + bmp_release(irmd->port_ids, f->port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - free(pme); + free(f); return NULL; } - return pme; + return f; } static void cleanup_alloc_res(void * o) { - struct irm_flow * e = (struct irm_flow *) o; - if (e->state == FLOW_PENDING) - e->state = FLOW_NULL; - pthread_mutex_unlock(&e->state_lock); + struct irm_flow * f = (struct irm_flow *) o; + if (f->state == FLOW_PENDING) + f->state = FLOW_NULL; + pthread_mutex_unlock(&f->state_lock); } static int flow_alloc_res(int port_id) { - struct irm_flow * e; + struct irm_flow * f; pthread_rwlock_rdlock(&irmd->state_lock); @@ -964,22 +899,22 @@ static int flow_alloc_res(int port_id) } pthread_rwlock_rdlock(&irmd->flows_lock); - e = get_irm_flow(port_id); - if (e == NULL) { + f = get_irm_flow(port_id); + if (f == NULL) { LOG_ERR("Could not find port %d.", port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return -1; } - if (e->state == FLOW_NULL) { + if (f->state == FLOW_NULL) { LOG_INFO("Port %d is deprecated.", port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return -1; } - if (e->state == FLOW_ALLOCATED) { + if (f->state == FLOW_ALLOCATED) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return 0; @@ -988,28 +923,28 @@ static int flow_alloc_res(int port_id) pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - pthread_mutex_lock(&e->state_lock); - pthread_cleanup_push(cleanup_alloc_res, (void *) e); + pthread_mutex_lock(&f->state_lock); + pthread_cleanup_push(cleanup_alloc_res, (void *) f); - while (e->state == FLOW_PENDING) - pthread_cond_wait(&e->state_cond, &e->state_lock); + while (f->state == FLOW_PENDING) + pthread_cond_wait(&f->state_cond, &f->state_lock); pthread_cleanup_pop(true); pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - pthread_mutex_lock(&e->state_lock); + pthread_mutex_lock(&f->state_lock); - if (e->state == FLOW_ALLOCATED) { - pthread_mutex_unlock(&e->state_lock); + if (f->state == FLOW_ALLOCATED) { + pthread_mutex_unlock(&f->state_lock); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return 0; } - e->state = FLOW_NULL; - pthread_cond_signal(&e->state_cond); - pthread_mutex_unlock(&e->state_lock); + f->state = FLOW_NULL; + pthread_cond_signal(&f->state_cond); + pthread_mutex_unlock(&f->state_lock); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); @@ -1021,22 +956,22 @@ static int flow_dealloc(int port_id) pid_t n_1_api; int ret = 0; - struct irm_flow * e = NULL; + struct irm_flow * f = NULL; pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->flows_lock); bmp_release(irmd->port_ids, port_id); - e = get_irm_flow(port_id); - if (e == NULL) { + f = get_irm_flow(port_id); + if (f == NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return 0; } - n_1_api = e->n_1_api; + n_1_api = f->n_1_api; - list_del(&e->next); + list_del(&f->next); pthread_rwlock_unlock(&irmd->flows_lock); @@ -1044,7 +979,7 @@ static int flow_dealloc(int port_id) pthread_rwlock_unlock(&irmd->state_lock); - irm_flow_destroy(e); + irm_flow_destroy(f); return ret; } @@ -1085,22 +1020,23 @@ static struct irm_flow * flow_req_arr(pid_t api, char * dst_name, char * ae_name) { - struct reg_entry * rne = NULL; - struct irm_flow * pme = NULL; + struct reg_entry * rne = NULL; + struct irm_flow * f = NULL; + struct reg_api * rgi = NULL; enum reg_name_state state; struct spawned_api * c_api; - pme = irm_flow_create(); - if (pme == NULL) { + f = irm_flow_create(); + if (f == NULL) { LOG_ERR("Failed to create irm_flow."); return NULL; } - pme->state = FLOW_PENDING; - pme->n_1_api = api; - if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) + f->state = FLOW_PENDING; + f->n_1_api = api; + if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) LOG_WARN("Failed to set timestamp."); pthread_rwlock_rdlock(&irmd->state_lock); @@ -1111,7 +1047,7 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Unknown name: %s.", dst_name); - free(pme); + free(f); return NULL; } @@ -1124,14 +1060,14 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("No AP's for %s.", dst_name); - free(pme); + free(f); return NULL; case REG_NAME_AUTO_ACCEPT: c_api = malloc(sizeof(*c_api)); if (c_api == NULL) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - free(pme); + free(f); return NULL; } @@ -1146,7 +1082,7 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_mutex_unlock(&rne->state_lock); pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - free(pme); + free(f); free(c_api); return NULL; } @@ -1154,6 +1090,7 @@ static struct irm_flow * flow_req_arr(pid_t api, list_add(&c_api->next, &irmd->spawned_apis); pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); pthread_mutex_lock(&rne->state_lock); pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, @@ -1164,18 +1101,20 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_cleanup_pop(true); + pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_rdlock(&irmd->reg_lock); pthread_mutex_lock(&rne->state_lock); if (rne->state == REG_NAME_DESTROY) { rne->state = REG_NAME_NULL; pthread_mutex_unlock(&rne->state_lock); pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); return NULL; } pthread_mutex_unlock(&rne->state_lock); case REG_NAME_FLOW_ACCEPT: - pme->n_api = reg_entry_resolve_api(rne); - if (pme->n_api == -1) { + f->n_api = reg_entry_resolve_api(rne); + if (f->n_api == -1) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Invalid api returned."); @@ -1187,70 +1126,73 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("IRMd in wrong state."); - free(pme); + free(f); return NULL; } pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - pme->port_id = bmp_allocate(irmd->port_ids); + f->port_id = bmp_allocate(irmd->port_ids); - list_add(&pme->next, &irmd->irm_flows); + list_add(&f->next, &irmd->irm_flows); pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_rdlock(&irmd->reg_lock); pthread_mutex_lock(&rne->state_lock); rne->req_ae_name = ae_name; rne->state = REG_NAME_FLOW_ARRIVED; - reg_api_wake(reg_entry_get_reg_api(rne, pme->n_api)); + rgi = reg_entry_get_reg_api(rne, f->n_api); pthread_mutex_unlock(&rne->state_lock); - + pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); + reg_api_wake(rgi); + + pthread_mutex_lock(&rne->state_lock); pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, (void *) &rne->state_lock); - while (rne->state == REG_NAME_FLOW_ARRIVED && - irmd->state == IRMD_RUNNING) + while (rne->state == REG_NAME_FLOW_ARRIVED) pthread_cond_wait(&rne->state_cond, &rne->state_lock); pthread_cleanup_pop(true); - return pme; + return f; } static int flow_alloc_reply(int port_id, int response) { - struct irm_flow * e; + struct irm_flow * f; pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_rdlock(&irmd->flows_lock); - e = get_irm_flow(port_id); - if (e == NULL) { + f = get_irm_flow(port_id); + if (f == NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return -1; } - pthread_mutex_lock(&e->state_lock); + pthread_mutex_lock(&f->state_lock); if (!response) - e->state = FLOW_ALLOCATED; + f->state = FLOW_ALLOCATED; else - e->state = FLOW_NULL; + f->state = FLOW_NULL; - if (pthread_cond_signal(&e->state_cond)) + if (pthread_cond_signal(&f->state_cond)) LOG_ERR("Failed to send signal."); - pthread_mutex_unlock(&e->state_lock); + pthread_mutex_unlock(&f->state_lock); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); @@ -1260,24 +1202,24 @@ static int flow_alloc_reply(int port_id, static int flow_dealloc_ipcp(int port_id) { - struct irm_flow * e = NULL; + struct irm_flow * f = NULL; pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - e = get_irm_flow(port_id); - if (e == NULL) { + f = get_irm_flow(port_id); + if (f == NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return 0; } - list_del(&e->next); + list_del(&f->next); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - irm_flow_destroy(e); + irm_flow_destroy(f); return 0; } @@ -1324,9 +1266,9 @@ static void irm_destroy() pthread_rwlock_wrlock(&irmd->flows_lock); list_for_each_safe(h, t, &irmd->irm_flows) { - struct irm_flow * e = list_entry(h, struct irm_flow, next); - list_del(&e->next); - irm_flow_destroy(e); + struct irm_flow * f = list_entry(h, struct irm_flow, next); + list_del(&f->next); + irm_flow_destroy(f); } if (irmd->port_ids != NULL) @@ -1405,46 +1347,46 @@ void * irm_flow_cleaner() pthread_rwlock_wrlock(&irmd->flows_lock); list_for_each_safe(pos, n, &(irmd->irm_flows)) { - struct irm_flow * e = + struct irm_flow * f = list_entry(pos, struct irm_flow, next); - pthread_mutex_lock(&e->state_lock); + pthread_mutex_lock(&f->state_lock); - if (e->state == FLOW_PENDING && - ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) { + if (f->state == FLOW_PENDING && + ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { LOG_INFO("Pending port_id %d timed out.", - e->port_id); - e->state = FLOW_NULL; - pthread_cond_signal(&e->state_cond); - pthread_mutex_unlock(&e->state_lock); + f->port_id); + f->state = FLOW_NULL; + pthread_cond_signal(&f->state_cond); + pthread_mutex_unlock(&f->state_lock); continue; } - pthread_mutex_unlock(&e->state_lock); + pthread_mutex_unlock(&f->state_lock); - if (kill(e->n_api, 0) < 0) { + if (kill(f->n_api, 0) < 0) { struct shm_ap_rbuff * n_rb = - shm_ap_rbuff_open(e->n_api); - bmp_release(irmd->port_ids, e->port_id); + shm_ap_rbuff_open(f->n_api); + bmp_release(irmd->port_ids, f->port_id); - list_del(&e->next); + list_del(&f->next); LOG_INFO("Process %d gone, %d deallocated.", - e->n_api, e->port_id); - ipcp_flow_dealloc(e->n_1_api, e->port_id); + f->n_api, f->port_id); + ipcp_flow_dealloc(f->n_1_api, f->port_id); if (n_rb != NULL) shm_ap_rbuff_destroy(n_rb); - irm_flow_destroy(e); + irm_flow_destroy(f); continue; } - if (kill(e->n_1_api, 0) < 0) { + if (kill(f->n_1_api, 0) < 0) { struct shm_ap_rbuff * n_1_rb = - shm_ap_rbuff_open(e->n_1_api); - list_del(&e->next); + shm_ap_rbuff_open(f->n_1_api); + list_del(&f->next); LOG_ERR("IPCP %d gone, flow %d removed.", - e->n_1_api, e->port_id); + f->n_1_api, f->port_id); if (n_1_rb != NULL) shm_ap_rbuff_destroy(n_1_rb); - irm_flow_destroy(e); + irm_flow_destroy(f); } } -- cgit v1.2.3