diff options
Diffstat (limited to 'src/irmd')
-rw-r--r-- | src/irmd/main.c | 464 |
1 files changed, 242 insertions, 222 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 5f7c1ddc..50055c4d 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -36,7 +36,6 @@ #include <ouroboros/bitmap.h> #include <ouroboros/flow.h> #include <ouroboros/qos.h> -#include <ouroboros/rw_lock.h> #include <ouroboros/time_utils.h> #include "utils.h" @@ -113,20 +112,21 @@ struct irm { /* FIXME: list of ipcps could be merged with registered names */ struct list_head ipcps; struct list_head reg_names; - rw_lock_t reg_lock; + pthread_rwlock_t reg_lock; /* keep track of all flows in this processing system */ struct bmp * port_ids; /* maps port_ids to pid pair */ struct list_head port_map; - rw_lock_t flows_lock; + pthread_rwlock_t flows_lock; struct shm_du_map * dum; pthread_t * threadpool; int sockfd; - rw_lock_t state_lock; + pthread_rwlock_t state_lock; pthread_t cleanup_flows; + pthread_t shm_sanitize; } * instance = NULL; static struct port_map_entry * port_map_entry_create() @@ -435,18 +435,18 @@ static pid_t create_ipcp(char * ap_name, pid_t pid; struct ipcp_entry * tmp = NULL; - rw_lock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->state_lock); pid = ipcp_create(ap_name, ipcp_type); if (pid == -1) { - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("Failed to create IPCP."); return -1; } tmp = ipcp_entry_create(); if (tmp == NULL) { - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } @@ -455,25 +455,25 @@ static pid_t create_ipcp(char * ap_name, tmp->api = instance_name_create(); if (tmp->api == NULL) { ipcp_entry_destroy(tmp); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) { instance_name_destroy(tmp->api); ipcp_entry_destroy(tmp); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } tmp->dif_name = NULL; - rw_lock_wrlock(&instance->reg_lock); + pthread_rwlock_wrlock(&instance->reg_lock); list_add(&tmp->next, &instance->ipcps); - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_INFO("Created IPCP %s-%d.", ap_name, pid); @@ -489,15 +489,10 @@ static int destroy_ipcp(instance_name_t * api) if (api == NULL) return 0; - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->reg_lock); - if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); LOG_ERR("No such IPCP in the system."); return 0; } @@ -516,8 +511,6 @@ static int destroy_ipcp(instance_name_t * api) } } - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); LOG_INFO("Destroyed IPCP %d.", pid); @@ -529,46 +522,46 @@ static int bootstrap_ipcp(instance_name_t * api, { struct ipcp_entry * entry = NULL; - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->reg_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->reg_lock); if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("No such IPCP in the system."); return -1; } entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api->id, conf)) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.", api->name, api->id, conf->dif_name); @@ -584,21 +577,21 @@ static int enroll_ipcp(instance_name_t * api, ssize_t n_1_difs_size = 0; struct ipcp_entry * entry = NULL; - rw_lock_rdlock(&instance->state_lock); - rw_lock_rdlock(&instance->reg_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->reg_lock); entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("Failed to strdup."); return -1; } @@ -607,8 +600,8 @@ static int enroll_ipcp(instance_name_t * api, if (member == NULL) { free(entry->dif_name); entry->dif_name = NULL; - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } @@ -616,8 +609,8 @@ static int enroll_ipcp(instance_name_t * api, if (n_1_difs_size < 1) { free(entry->dif_name); entry->dif_name = NULL; - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("Could not find N-1 DIFs."); return -1; } @@ -625,14 +618,14 @@ static int enroll_ipcp(instance_name_t * api, if (ipcp_enroll(api->id, member, n_1_difs[0])) { free(entry->dif_name); entry->dif_name = NULL; - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("Could not enroll IPCP."); return -1; } - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", api->name, api->id, dif_name); @@ -657,25 +650,25 @@ static int ap_reg(char * name, instance_name_t * api = NULL; char ** argv_dup = NULL; - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->reg_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->reg_lock); if (instance->ipcps.next == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } api = instance_name_create(); if (api == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } if (instance_name_init_from(api, path_strip(ap_name), ap_id) == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); instance_name_destroy(api); return -1; } @@ -683,8 +676,8 @@ static int ap_reg(char * name, /* check if this name is already registered */ rne = get_reg_name_entry_by_name(name); if (rne != NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); instance_name_destroy(api); return -1; /* can only register one instance for now */ } @@ -711,8 +704,8 @@ static int ap_reg(char * name, } } if (ret == 0) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); instance_name_destroy(api); return -1; } @@ -735,8 +728,8 @@ static int ap_reg(char * name, < 0) LOG_DBGF("Failed to add application %s.", api->name); - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); return ret; } @@ -756,8 +749,8 @@ static int ap_unreg(char * name, if (name == NULL || len == 0 || difs == NULL || difs[0] == NULL) return -1; - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->reg_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->reg_lock); if (!hard && strcmp(difs[0], "*") != 0) { LOG_INFO("Unregistration not complete yet."); @@ -787,8 +780,8 @@ static int ap_unreg(char * name, reg_name_entry_del_name(rne->name); - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); return ret; } @@ -800,13 +793,13 @@ static struct port_map_entry * flow_accept(pid_t pid, struct port_map_entry * pme; struct reg_name_entry * rne = NULL; - rw_lock_rdlock(&instance->state_lock); - rw_lock_rdlock(&instance->reg_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->reg_lock); rne = get_reg_name_entry_by_ap_name(srv_ap_name); if (rne == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_DBGF("AP %s is unknown.", srv_ap_name); return NULL; } @@ -814,16 +807,16 @@ static struct port_map_entry * flow_accept(pid_t pid, if (rne->api->id == 0) { rne->api->id = pid; } else if (rne->api->id != pid) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_DBGF("Can only register one instance."); LOG_MISSING; return NULL; } if (rne->accept) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_DBGF("This AP still has a pending accept()."); return NULL; } @@ -833,18 +826,17 @@ static struct port_map_entry * flow_accept(pid_t pid, pthread_cond_broadcast(&rne->acc_signal); - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); pthread_mutex_lock(&rne->acc_lock); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void*) &rne->acc_lock); + (void *) &rne->acc_lock); while (rne->flow_arrived == -1) pthread_cond_wait(&rne->acc_arr_signal, &rne->acc_lock); - pthread_mutex_unlock(&rne->acc_lock); - pthread_cleanup_pop(0); + pthread_cleanup_pop(true); pthread_mutex_lock(&rne->acc_lock); @@ -856,13 +848,13 @@ static struct port_map_entry * flow_accept(pid_t pid, pthread_mutex_unlock(&rne->acc_lock); - rw_lock_rdlock(&instance->state_lock); - rw_lock_rdlock(&instance->flows_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->flows_lock); pme = get_port_map_entry_n(pid); if (pme == NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("Port_id was not created yet."); return NULL; } @@ -870,8 +862,8 @@ static struct port_map_entry * flow_accept(pid_t pid, if (dst_ae_name != NULL) *dst_ae_name = rne->req_ae_name; - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return pme; } @@ -884,20 +876,20 @@ static int flow_alloc_resp(pid_t n_pid, struct reg_name_entry * rne = NULL; int ret = -1; - rw_lock_rdlock(&instance->state_lock); - rw_lock_rdlock(&instance->reg_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->reg_lock); rne = get_reg_name_entry_by_id(n_pid); if (rne == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } /* FIXME: check all instances associated with the name */ if (!rne->accept) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("No process listening for this name."); return -1; } @@ -914,28 +906,30 @@ static int flow_alloc_resp(pid_t n_pid, pthread_mutex_unlock(&rne->acc_lock); - rw_lock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->reg_lock); if (!response) { - rw_lock_wrlock(&instance->flows_lock); + pthread_rwlock_wrlock(&instance->flows_lock); pme = get_port_map_entry(port_id); if (pme == NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } pme->state = FLOW_ALLOCATED; + + pthread_rwlock_unlock(&instance->flows_lock); + ret = ipcp_flow_alloc_resp(pme->n_1_pid, port_id, pme->n_pid, response); - rw_lock_unlock(&instance->flows_lock); } - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->state_lock); return ret; } @@ -962,30 +956,30 @@ static struct port_map_entry * flow_alloc(pid_t pid, if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) LOG_WARN("Failed to set timestamp."); - rw_lock_rdlock(&instance->state_lock); - rw_lock_rdlock(&instance->reg_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->reg_lock); if (qos != NULL) dif_name = qos->dif_name; ipcp = get_ipcp_by_dst_name(dst_name, dif_name); if (ipcp == NULL) { - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_DBG("Unknown DIF name."); return NULL; } - rw_lock_unlock(&instance->reg_lock); - rw_lock_wrlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_wrlock(&instance->flows_lock); pme->port_id = bmp_allocate(instance->port_ids); pme->n_1_pid = ipcp->id; list_add(&pme->next, &instance->port_map); - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); if (ipcp_flow_alloc(ipcp->id, pme->port_id, @@ -993,12 +987,12 @@ static struct port_map_entry * flow_alloc(pid_t pid, dst_name, src_ae_name, QOS_CUBE_BE) < 0) { - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->flows_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->flows_lock); list_del(&pme->next); bmp_release(instance->port_ids, pme->port_id); - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); free(pme); return NULL; } @@ -1010,30 +1004,30 @@ static int flow_alloc_res(int port_id) { struct port_map_entry * e; - rw_lock_rdlock(&instance->state_lock); - rw_lock_rdlock(&instance->flows_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } if (e->state == FLOW_NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } if (e->state == FLOW_ALLOCATED) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return 0; } - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); while (true) { pthread_mutex_lock(&e->res_lock); @@ -1042,35 +1036,34 @@ static int flow_alloc_res(int port_id) pthread_cond_wait(&e->res_signal, &e->res_lock); - pthread_mutex_unlock(&e->res_lock); - pthread_cleanup_pop(0); + pthread_cleanup_pop(true); - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->flows_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } if (e->state == FLOW_ALLOCATED) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return 0; } if (e->state == FLOW_NULL) { /* don't release the port_id, AP has to call dealloc */ list_del(&e->next); - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); free(e); return -1; } /* still pending, spurious wake */ - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); } return 0; @@ -1083,15 +1076,14 @@ static int flow_dealloc(int port_id) struct port_map_entry * e = NULL; - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->flows_lock); - + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->flows_lock); bmp_release(instance->port_ids, port_id); e = get_port_map_entry(port_id); if (e == NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return 0; } @@ -1099,10 +1091,11 @@ static int flow_dealloc(int port_id) list_del(&e->next); + pthread_rwlock_unlock(&instance->flows_lock); + ret = ipcp_flow_dealloc(n_1_pid, port_id); - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->state_lock); free(e); @@ -1148,17 +1141,17 @@ static struct port_map_entry * flow_req_arr(pid_t pid, pme->state = FLOW_PENDING; pme->n_1_pid = pid; - rw_lock_rdlock(&instance->state_lock); - rw_lock_rdlock(&instance->reg_lock); - rw_lock_wrlock(&instance->flows_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->reg_lock); + pthread_rwlock_wrlock(&instance->flows_lock); pme->port_id = bmp_allocate(instance->port_ids); rne = get_reg_name_entry_by_name(dst_name); if (rne == NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_DBGF("Destination name %s unknown.", dst_name); free(pme); return NULL; @@ -1168,13 +1161,20 @@ static struct port_map_entry * flow_req_arr(pid_t pid, list_add(&pme->next, &instance->port_map); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_mutex_lock(&rne->acc_lock); + pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, + (void *) &rne->acc_lock); + rne->req_ae_name = ae_name; if (rne->accept == false) { if (rne->autoexec) { + pthread_rwlock_wrlock(&instance->flows_lock); pme->n_pid = auto_execute(rne->api->name, rne->argv); + pthread_rwlock_unlock(&instance->flows_lock); while (rne->accept == false) pthread_cond_wait(&rne->acc_signal, &rne->acc_lock); @@ -1188,21 +1188,19 @@ static struct port_map_entry * flow_req_arr(pid_t pid, rne->flow_arrived = 0; - pthread_mutex_unlock(&rne->acc_lock); - if (pthread_cond_signal(&rne->acc_arr_signal)) LOG_ERR("Failed to send signal."); + pthread_cleanup_pop(true); + while (acc_wait) { - sched_yield(); pthread_mutex_lock(&rne->acc_lock); acc_wait = (rne->flow_arrived != -1); pthread_mutex_unlock(&rne->acc_lock); } - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->reg_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); return pme; } @@ -1212,13 +1210,13 @@ static int flow_alloc_reply(int port_id, { struct port_map_entry * e; - rw_lock_rdlock(&instance->state_lock); - rw_lock_rdlock(&instance->flows_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return -1; } @@ -1235,8 +1233,8 @@ static int flow_alloc_reply(int port_id, pthread_mutex_unlock(&e->res_lock); - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return 0; } @@ -1245,48 +1243,46 @@ static int flow_dealloc_ipcp(int port_id) { struct port_map_entry * e = NULL; - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->flows_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); return 0; } list_del(&e->next); - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); free(e); return 0; } -static void irm_destroy(struct irm * irm) +static void irm_destroy() { struct list_head * h; struct list_head * t; - if (irm == NULL) - return; - - rw_lock_wrlock(&irm->state_lock); + pthread_rwlock_wrlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->reg_lock); - if (irm->threadpool != NULL) - free(irm->threadpool); + if (instance->threadpool != NULL) + free(instance->threadpool); - if (irm->port_ids != NULL) - bmp_destroy(irm->port_ids); + if (instance->port_ids != NULL) + bmp_destroy(instance->port_ids); /* clear the lists */ - list_for_each_safe(h, t, &irm->ipcps) { + list_for_each_safe(h, t, &instance->ipcps) { struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); destroy_ipcp(e->api); } - list_for_each_safe(h, t, &irm->reg_names) { + list_for_each_safe(h, t, &instance->reg_names) { struct reg_name_entry * e = list_entry(h, struct reg_name_entry, next); @@ -1294,7 +1290,11 @@ static void irm_destroy(struct irm * irm) reg_name_entry_destroy(e); } - list_for_each_safe(h, t, &irm->port_map) { + pthread_rwlock_unlock(&instance->reg_lock); + + pthread_rwlock_wrlock(&instance->flows_lock); + + list_for_each_safe(h, t, &instance->port_map) { struct port_map_entry * e = list_entry(h, struct port_map_entry, next); @@ -1303,15 +1303,16 @@ static void irm_destroy(struct irm * irm) free(e); } + pthread_rwlock_unlock(&instance->flows_lock); - if (irm->dum != NULL) - shm_du_map_destroy(irm->dum); + if (instance->dum != NULL) + shm_du_map_destroy(instance->dum); - close(irm->sockfd); + close(instance->sockfd); - rw_lock_unlock(&irm->state_lock); + pthread_rwlock_unlock(&instance->state_lock); - free(irm); + free(instance); } @@ -1323,7 +1324,11 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) case SIGINT: case SIGTERM: case SIGHUP: - rw_lock_wrlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->state_lock); + + + + pthread_rwlock_unlock(&instance->state_lock); if (instance->threadpool != NULL) { for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) @@ -1331,9 +1336,9 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) } + pthread_cancel(instance->shm_sanitize); pthread_cancel(instance->cleanup_flows); - rw_lock_unlock(&instance->state_lock); break; case SIGPIPE: @@ -1355,13 +1360,13 @@ void * irm_flow_cleaner() if(clock_gettime(CLOCK_MONOTONIC, &now) < 0) LOG_WARN("Failed to get time."); /* cleanup stale PENDING flows */ - rw_lock_rdlock(&instance->state_lock); - rw_lock_wrlock(&instance->flows_lock); + + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->flows_lock); list_for_each_safe(pos, n, &(instance->port_map)) { struct port_map_entry * e = list_entry(pos, struct port_map_entry, next); - pthread_mutex_lock(&e->res_lock); if (e->state == FLOW_PENDING && @@ -1369,9 +1374,12 @@ void * irm_flow_cleaner() LOG_DBGF("Pending port_id %d timed out.", e->port_id); e->state = FLOW_NULL; - pthread_cond_broadcast(&e->res_signal); + pthread_cond_signal(&e->res_signal); + pthread_mutex_unlock(&e->res_lock); + continue; } + pthread_mutex_unlock(&e->res_lock); if (kill(e->n_pid, 0) < 0) { bmp_release(instance->port_ids, e->port_id); @@ -1388,17 +1396,20 @@ void * irm_flow_cleaner() e->n_1_pid, e->port_id); free(e); } - - pthread_mutex_unlock(&e->res_lock); } - rw_lock_unlock(&instance->flows_lock); - rw_lock_unlock(&instance->state_lock); + pthread_rwlock_unlock(&instance->flows_lock); + pthread_rwlock_unlock(&instance->state_lock); nanosleep(&timeout, NULL); } } +void clean_msg(void * msg) +{ + irm_msg__free_unpacked(msg, NULL); +} + void * mainloop() { uint8_t buf[IRM_MSG_BUF_SIZE]; @@ -1433,6 +1444,8 @@ void * mainloop() continue; } + pthread_cleanup_push(clean_msg, (void *) msg); + api.name = msg->ap_name; if (msg->has_api_id == true) api.id = msg->api_id; @@ -1542,7 +1555,7 @@ void * mainloop() break; } - irm_msg__free_unpacked(msg, NULL); + pthread_cleanup_pop(true); buffer.size = irm_msg__get_packed_size(&ret_msg); if (buffer.size == 0) { @@ -1574,8 +1587,8 @@ static struct irm * irm_create() { struct stat st = {0}; - struct irm * i = malloc(sizeof(*i)); - if (i == NULL) + instance = malloc(sizeof(*instance)); + if (instance == NULL) return NULL; if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) { @@ -1583,6 +1596,7 @@ static struct irm * irm_create() if (dum == NULL) { LOG_ERR("Could not examine existing shm file."); + free(instance); exit(EXIT_FAILURE); } @@ -1592,68 +1606,71 @@ static struct irm * irm_create() LOG_INFO("Stale shm file removed."); } else { LOG_INFO("IRMd already running, exiting."); - free(i); + free(instance); exit(EXIT_SUCCESS); } } - if (rw_lock_init(&i->state_lock)) { - irm_destroy(i); + if (pthread_rwlock_init(&instance->state_lock, NULL)) { + LOG_ERR("Failed to initialize rwlock."); + free(instance); + return NULL; + } + + if (pthread_rwlock_init(&instance->reg_lock, NULL)) { + LOG_ERR("Failed to initialize rwlock."); + free(instance); + return NULL; + } + + if (pthread_rwlock_init(&instance->flows_lock, NULL)) { + LOG_ERR("Failed to initialize rwlock."); + free(instance); return NULL; } - i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); - if (i->threadpool == NULL) { - irm_destroy(i); + instance->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + if (instance->threadpool == NULL) { + irm_destroy(); return NULL; } - if ((i->dum = shm_du_map_create()) == NULL) { - irm_destroy(i); + if ((instance->dum = shm_du_map_create()) == NULL) { + irm_destroy(); return NULL; } - INIT_LIST_HEAD(&i->ipcps); - INIT_LIST_HEAD(&i->reg_names); - INIT_LIST_HEAD(&i->port_map); + INIT_LIST_HEAD(&instance->ipcps); + INIT_LIST_HEAD(&instance->reg_names); + INIT_LIST_HEAD(&instance->port_map); - i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); - if (i->port_ids == NULL) { - irm_destroy(i); + instance->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); + if (instance->port_ids == NULL) { + irm_destroy(); return NULL; } if (stat(SOCK_PATH, &st) == -1) { if (mkdir(SOCK_PATH, 0777)) { LOG_ERR("Failed to create sockets directory."); - irm_destroy(i); + irm_destroy(); return NULL; } } - i->sockfd = server_socket_open(IRM_SOCK_PATH); - if (i->sockfd < 0) { - irm_destroy(i); + instance->sockfd = server_socket_open(IRM_SOCK_PATH); + if (instance->sockfd < 0) { + irm_destroy(); return NULL; } if (chmod(IRM_SOCK_PATH, 0666)) { LOG_ERR("Failed to chmod socket."); - irm_destroy(i); - return NULL; - } - - if (rw_lock_init(&i->reg_lock)) { - irm_destroy(i); + irm_destroy(); return NULL; } - if (rw_lock_init(&i->flows_lock)) { - irm_destroy(i); - return NULL; - } - - return i; + return instance; } int main() @@ -1687,18 +1704,21 @@ int main() if (instance == NULL) exit(EXIT_FAILURE); - pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL); - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); + pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL); + pthread_create(&instance->shm_sanitize, NULL, + shm_du_map_sanitize, NULL); + /* wait for (all of them) to return */ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) pthread_join(instance->threadpool[t], NULL); + pthread_join(instance->shm_sanitize, NULL); pthread_join(instance->cleanup_flows, NULL); - irm_destroy(instance); + irm_destroy(); exit(EXIT_SUCCESS); } |