From 22ef5ea7feeace47bff9d16434a4b705d1ae4a82 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Mon, 16 May 2016 01:44:27 +0200 Subject: irmd: new locking implementation This locking should be more consistent, It now has three locks, one guarding flows and port_id's, one guarding the registered apps and ipcps, and one guarding the overall state of the irmd. There are two additional mutexes guarding the condition variables. --- src/irmd/main.c | 396 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 221 insertions(+), 175 deletions(-) diff --git a/src/irmd/main.c b/src/irmd/main.c index 9c515d2a..b7e1ad18 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -102,23 +103,18 @@ struct irm { /* FIXME: list of ipcps could be merged with registered names */ struct list_head ipcps; struct list_head reg_names; - - int sockfd; + rw_lock_t reg_lock; /* keep track of all flows in this processing system */ struct bmp * port_ids; - /* maps port_ids to pid pair */ struct list_head port_map; + rw_lock_t flows_lock; struct shm_du_map * dum; - - pthread_t * threadpool; - - pthread_mutex_t r_lock; - - bool shutdown; - pthread_mutex_t s_lock; + pthread_t * threadpool; + int sockfd; + rw_lock_t state_lock; } * instance = NULL; static struct port_map_entry * port_map_entry_create() @@ -405,45 +401,45 @@ static pid_t create_ipcp(char * ap_name, pid_t pid; struct ipcp_entry * tmp = NULL; - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return -1; - } - pthread_mutex_unlock(&instance->s_lock); - + rw_lock_rdlock(&instance->state_lock); pid = ipcp_create(ap_name, ipcp_type); if (pid == -1) { + rw_lock_unlock(&instance->state_lock); LOG_ERR("Failed to create IPCP."); return -1; } tmp = ipcp_entry_create(); - if (tmp == NULL) + if (tmp == NULL) { + rw_lock_unlock(&instance->state_lock); return -1; + } INIT_LIST_HEAD(&tmp->next); tmp->api = instance_name_create(); if (tmp->api == NULL) { ipcp_entry_destroy(tmp); + rw_lock_unlock(&instance->state_lock); return -1; } if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) { instance_name_destroy(tmp->api); ipcp_entry_destroy(tmp); + rw_lock_unlock(&instance->state_lock); return -1; } tmp->dif_name = NULL; - pthread_mutex_lock(&instance->r_lock); + rw_lock_wrlock(&instance->reg_lock); list_add(&tmp->next, &instance->ipcps); - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_INFO("Created IPCP %s-%d.", ap_name, pid); @@ -456,14 +452,18 @@ static int destroy_ipcp(instance_name_t * api) struct list_head * n = NULL; pid_t pid = 0; - if (api == NULL) return 0; + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->reg_lock); + if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("No such IPCP in the system."); return 0; } @@ -482,6 +482,9 @@ static int destroy_ipcp(instance_name_t * api) ipcp_entry_destroy(tmp); } + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); + LOG_INFO("Destroyed IPCP %d.", pid); return 0; @@ -492,47 +495,46 @@ static int bootstrap_ipcp(instance_name_t * api, { struct ipcp_entry * entry = NULL; - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return -1; - } - pthread_mutex_unlock(&instance->s_lock); - - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->reg_lock); if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("No such IPCP in the system."); return -1; } entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api->id, conf)) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.", api->name, api->id, conf->dif_name); @@ -548,58 +550,56 @@ static int enroll_ipcp(instance_name_t * api, ssize_t n_1_difs_size = 0; struct ipcp_entry * entry = NULL; - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return -1; - } - pthread_mutex_unlock(&instance->s_lock); - - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->reg_lock); entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("Failed to strdup."); return -1; } member = da_resolve_daf(dif_name); if (member == NULL) { - LOG_ERR("Could not find a member of that DIF."); free(entry->dif_name); entry->dif_name = NULL; - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return -1; } n_1_difs_size = da_resolve_dap(member, n_1_difs); if (n_1_difs_size < 1) { - LOG_ERR("Could not find N-1 DIFs."); free(entry->dif_name); entry->dif_name = NULL; - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); + LOG_ERR("Could not find N-1 DIFs."); return -1; } - pthread_mutex_unlock(&instance->r_lock); - if (ipcp_enroll(api->id, member, n_1_difs[0])) { - LOG_ERR("Could not enroll IPCP."); - pthread_mutex_lock(&instance->r_lock); free(entry->dif_name); entry->dif_name = NULL; - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); + LOG_ERR("Could not enroll IPCP."); return -1; } + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); + LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", api->name, api->id, dif_name); @@ -610,18 +610,19 @@ static int reg_ipcp(instance_name_t * api, char ** difs, size_t difs_size) { - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return -1; - } - pthread_mutex_unlock(&instance->s_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->reg_lock); if (ipcp_reg(api->id, difs, difs_size)) { + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("Could not register IPCP to N-1 DIF(s)."); return -1; } + rw_lock_wrlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); + return 0; } @@ -629,11 +630,16 @@ static int unreg_ipcp(instance_name_t * api, char ** difs, size_t difs_size) { - + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->reg_lock); if (ipcp_unreg(api->id, difs, difs_size)) { + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("Could not unregister IPCP from N-1 DIF(s)."); return -1; } + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return 0; } @@ -651,28 +657,25 @@ static int ap_reg(char * ap_name, instance_name_t * api = NULL; instance_name_t * ipcpi = NULL; - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return -1; - } - pthread_mutex_unlock(&instance->s_lock); - - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->reg_lock); if (instance->ipcps.next == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return -1; } api = instance_name_create(); if (api == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return -1; } if (instance_name_init_from(api, ap_name, ap_id) == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); instance_name_destroy(api); return -1; } @@ -681,8 +684,9 @@ static int ap_reg(char * ap_name, rne = get_reg_name_entry_by_name(ap_name); if (rne != NULL) { + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); instance_name_destroy(api); - pthread_mutex_unlock(&instance->r_lock); return -1; /* can only register one instance for now */ } @@ -721,15 +725,17 @@ static int ap_reg(char * ap_name, } if (ret == 0) { + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); instance_name_destroy(api); - pthread_mutex_unlock(&instance->r_lock); return -1; } /* for now, we register single instances */ ret = reg_name_entry_add_name_instance(strdup(ap_name), api); - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return ret; } @@ -744,22 +750,26 @@ static int ap_unreg(char * ap_name, struct reg_name_entry * rne = NULL; struct list_head * pos = NULL; - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->reg_lock); /* check if ap_name is registered */ rne = get_reg_name_entry_by_id(ap_id); if (rne == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return 0; /* no such id */ } if (strcmp(ap_name, rne->api->name)) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return 0; } if (instance->ipcps.next == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("No IPCPs in this system."); return 0; } @@ -788,7 +798,8 @@ static int ap_unreg(char * ap_name, /* FIXME: check if name is not registered in any DIF before removing */ reg_name_entry_del_name(rne->name); - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return ret; } @@ -800,22 +811,19 @@ static struct port_map_entry * flow_accept(pid_t pid, struct port_map_entry * pme; struct reg_name_entry * rne = NULL; - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return NULL; - } - pthread_mutex_unlock(&instance->s_lock); - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->reg_lock); rne = get_reg_name_entry_by_id(pid); if (rne == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_DBGF("Unregistered AP calling accept()."); return NULL; } if (rne->accept) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_DBGF("This AP still has a pending accept()."); return NULL; } @@ -823,7 +831,8 @@ static struct port_map_entry * flow_accept(pid_t pid, rne->accept = true; rne->flow_arrived = -1; - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); pthread_mutex_lock(&rne->acc_lock); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, @@ -840,16 +849,20 @@ static struct port_map_entry * flow_accept(pid_t pid, /* ap with pending accept being unregistered */ if (rne->flow_arrived == -2 ) { pthread_mutex_unlock(&rne->acc_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return NULL; } pthread_mutex_unlock(&rne->acc_lock); - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->flows_lock); pme = get_port_map_entry_n(pid); if (pme == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("Port_id was not created yet."); return NULL; } @@ -858,7 +871,8 @@ static struct port_map_entry * flow_accept(pid_t pid, if (ae_name != NULL) *ae_name = rne->req_ae_name; - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); return pme; } @@ -869,25 +883,22 @@ static int flow_alloc_resp(pid_t n_pid, { struct port_map_entry * pme = NULL; struct reg_name_entry * rne = NULL; + int ret = -1; - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return -1; - } - pthread_mutex_unlock(&instance->s_lock); - - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->reg_lock); rne = get_reg_name_entry_by_id(n_pid); if (rne == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return -1; } /* FIXME: check all instances associated with the name */ if (!rne->accept) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_ERR("No process listening for this name."); return -1; } @@ -897,20 +908,37 @@ static int flow_alloc_resp(pid_t n_pid, * once we can handle a list of AP-I's, remove it from the list */ + pthread_mutex_lock(&rne->acc_lock); + rne->accept = false; rne->flow_arrived = -1; + pthread_mutex_unlock(&rne->acc_lock); + + rw_lock_unlock(&instance->reg_lock); + if (!response) { + rw_lock_wrlock(&instance->flows_lock); + pme = get_port_map_entry(port_id); + if (pme == NULL) { + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); + return -1; + } + pme->state = FLOW_ALLOCATED; + ret = ipcp_flow_alloc_resp(pme->n_1_pid, + port_id, + pme->n_pid, + response); + + rw_lock_unlock(&instance->flows_lock); } - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->state_lock); - return ipcp_flow_alloc_resp(pme->n_1_pid, - port_id, - pme->n_pid, - response); + return ret; } static struct port_map_entry * flow_alloc(pid_t pid, @@ -924,13 +952,6 @@ static struct port_map_entry * flow_alloc(pid_t pid, /* FIXME: Map qos_spec to qos_cube */ - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return NULL; - } - pthread_mutex_unlock(&instance->s_lock); - pme = port_map_entry_create(); if (pme == NULL) { LOG_ERR("Failed malloc of port_map_entry."); @@ -940,22 +961,29 @@ static struct port_map_entry * flow_alloc(pid_t pid, pme->n_pid = pid; pme->state = FLOW_PENDING; - pthread_mutex_lock(&instance->r_lock); - - pme->port_id = bmp_allocate(instance->port_ids); - pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; - - list_add(&pme->next, &instance->port_map); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->reg_lock); ipcp = get_ipcp_by_dst_name(dst_name); - pthread_mutex_unlock(&instance->r_lock); - if (ipcp == NULL) { + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_DBG("unknown ipcp"); return NULL; } + rw_lock_unlock(&instance->reg_lock); + rw_lock_wrlock(&instance->flows_lock); + + pme->port_id = bmp_allocate(instance->port_ids); + pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + + list_add(&pme->next, &instance->port_map); + + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); + if (ipcp_flow_alloc(ipcp->id, pme->port_id, pme->n_pid, @@ -963,15 +991,13 @@ static struct port_map_entry * flow_alloc(pid_t pid, src_ap_name, src_ae_name, QOS_CUBE_BE) < 0) { - pthread_mutex_lock(&instance->r_lock); - + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->flows_lock); list_del(&pme->next); - - pthread_mutex_unlock(&instance->r_lock); - bmp_release(instance->port_ids, pme->port_id); + rw_lock_unlock(&instance->flows_lock); + rw_lock_rdlock(&instance->state_lock); free(pme); - return NULL; } @@ -982,27 +1008,24 @@ static int flow_alloc_res(int port_id) { struct port_map_entry * e; - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return -1; - } - pthread_mutex_unlock(&instance->s_lock); - - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); return -1; } if (e->state == FLOW_ALLOCATED) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); return 0; } - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); while (true) { pthread_mutex_lock(&e->res_lock); @@ -1013,45 +1036,52 @@ static int flow_alloc_res(int port_id) pthread_mutex_unlock(&e->res_lock); pthread_cleanup_pop(0); - pthread_mutex_lock(&instance->r_lock); + + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); return -1; } if (e->state == FLOW_ALLOCATED) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); LOG_DBGF("Returning 0."); return 0; } if (e->state == FLOW_NULL) { list_del(&e->next); - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); free(e); return -1; } /* still pending, spurious wake */ - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); } - pthread_mutex_unlock(&instance->r_lock); - return 0; } static int flow_dealloc(int port_id) { - pid_t n_1_pid; + pid_t n_1_pid; + int ret = 0; struct port_map_entry * e = NULL; - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_wrlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); return 0; } @@ -1061,11 +1091,14 @@ static int flow_dealloc(int port_id) bmp_release(instance->port_ids, port_id); - pthread_mutex_unlock(&instance->r_lock); + ret = ipcp_flow_dealloc(n_1_pid, port_id); + + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); free(e); - return ipcp_flow_dealloc(n_1_pid, port_id); + return ret; } static struct port_map_entry * flow_req_arr(pid_t pid, @@ -1076,13 +1109,6 @@ static struct port_map_entry * flow_req_arr(pid_t pid, struct reg_name_entry * rne; struct port_map_entry * pme; - pthread_mutex_lock(&instance->s_lock); - if (instance->shutdown) { - pthread_mutex_unlock(&instance->s_lock); - return NULL; - } - pthread_mutex_unlock(&instance->s_lock); - pme = malloc(sizeof(*pme)); if (pme == NULL) { LOG_ERR("Failed malloc of port_map_entry."); @@ -1092,13 +1118,17 @@ static struct port_map_entry * flow_req_arr(pid_t pid, pme->state = FLOW_PENDING; pme->n_1_pid = pid; - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->reg_lock); + rw_lock_wrlock(&instance->flows_lock); pme->port_id = bmp_allocate(instance->port_ids); rne = get_reg_name_entry_by_name(dst_name); if (rne == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); LOG_DBGF("Destination name %s unknown.", dst_name); free(pme); return NULL; @@ -1115,12 +1145,14 @@ static struct port_map_entry * flow_req_arr(pid_t pid, rne->flow_arrived = 0; + pthread_mutex_unlock(&rne->acc_lock); + if (pthread_cond_signal(&rne->acc_signal)) LOG_ERR("Failed to send signal."); - pthread_mutex_unlock(&rne->acc_lock); - - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->reg_lock); + rw_lock_unlock(&instance->state_lock); return pme; } @@ -1130,11 +1162,13 @@ static int flow_alloc_reply(int port_id, { struct port_map_entry * e; - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); return -1; } @@ -1151,7 +1185,8 @@ static int flow_alloc_reply(int port_id, pthread_mutex_unlock(&e->res_lock); - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); return 0; } @@ -1160,17 +1195,20 @@ static int flow_dealloc_ipcp(int port_id) { struct port_map_entry * e = NULL; - pthread_mutex_lock(&instance->r_lock); + rw_lock_rdlock(&instance->state_lock); + rw_lock_rdlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); return 0; } list_del(&e->next); - pthread_mutex_unlock(&instance->r_lock); + rw_lock_unlock(&instance->flows_lock); + rw_lock_unlock(&instance->state_lock); free(e); @@ -1185,9 +1223,7 @@ static void irm_destroy(struct irm * irm) if (irm == NULL) return; - pthread_mutex_lock(&irm->s_lock); - instance->shutdown = true; - pthread_mutex_unlock(&irm->s_lock); + rw_lock_wrlock(&instance->state_lock); if (irm->threadpool != NULL) free(irm->threadpool); @@ -1221,6 +1257,8 @@ static void irm_destroy(struct irm * irm) close(irm->sockfd); free(irm); + + rw_lock_unlock(&instance->state_lock); } void irmd_sig_handler(int sig, siginfo_t * info, void * c) @@ -1231,11 +1269,16 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) case SIGINT: case SIGTERM: case SIGHUP: + rw_lock_wrlock(&instance->state_lock); + if (instance->threadpool != NULL) { for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) pthread_cancel(instance->threadpool[i]); + } + rw_lock_unlock(&instance->state_lock); + case SIGPIPE: LOG_DBG("Ignoring SIGPIPE."); default: @@ -1458,14 +1501,17 @@ static struct irm * irm_create() return NULL; } - if (pthread_mutex_init(&i->r_lock, NULL)) { + if (rw_lock_init(&i->state_lock)) { irm_destroy(i); return NULL; } - i->shutdown = false; + if (rw_lock_init(&i->reg_lock)) { + irm_destroy(i); + return NULL; + } - if (pthread_mutex_init(&i->s_lock, NULL)) { + if (rw_lock_init(&i->flows_lock)) { irm_destroy(i); return NULL; } -- cgit v1.2.3