summaryrefslogtreecommitdiff
path: root/src/irmd
diff options
context:
space:
mode:
Diffstat (limited to 'src/irmd')
-rw-r--r--src/irmd/main.c464
1 files changed, 242 insertions, 222 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 5f7c1ddc..50055c4d 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -36,7 +36,6 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/flow.h>
#include <ouroboros/qos.h>
-#include <ouroboros/rw_lock.h>
#include <ouroboros/time_utils.h>
#include "utils.h"
@@ -113,20 +112,21 @@ struct irm {
/* FIXME: list of ipcps could be merged with registered names */
struct list_head ipcps;
struct list_head reg_names;
- rw_lock_t reg_lock;
+ pthread_rwlock_t reg_lock;
/* keep track of all flows in this processing system */
struct bmp * port_ids;
/* maps port_ids to pid pair */
struct list_head port_map;
- rw_lock_t flows_lock;
+ pthread_rwlock_t flows_lock;
struct shm_du_map * dum;
pthread_t * threadpool;
int sockfd;
- rw_lock_t state_lock;
+ pthread_rwlock_t state_lock;
pthread_t cleanup_flows;
+ pthread_t shm_sanitize;
} * instance = NULL;
static struct port_map_entry * port_map_entry_create()
@@ -435,18 +435,18 @@ static pid_t create_ipcp(char * ap_name,
pid_t pid;
struct ipcp_entry * tmp = NULL;
- rw_lock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
pid = ipcp_create(ap_name, ipcp_type);
if (pid == -1) {
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("Failed to create IPCP.");
return -1;
}
tmp = ipcp_entry_create();
if (tmp == NULL) {
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
@@ -455,25 +455,25 @@ static pid_t create_ipcp(char * ap_name,
tmp->api = instance_name_create();
if (tmp->api == NULL) {
ipcp_entry_destroy(tmp);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) {
instance_name_destroy(tmp->api);
ipcp_entry_destroy(tmp);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
tmp->dif_name = NULL;
- rw_lock_wrlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&instance->reg_lock);
list_add(&tmp->next, &instance->ipcps);
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_INFO("Created IPCP %s-%d.", ap_name, pid);
@@ -489,15 +489,10 @@ static int destroy_ipcp(instance_name_t * api)
if (api == NULL)
return 0;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->reg_lock);
-
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP in the system.");
return 0;
}
@@ -516,8 +511,6 @@ static int destroy_ipcp(instance_name_t * api)
}
}
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
LOG_INFO("Destroyed IPCP %d.", pid);
@@ -529,46 +522,46 @@ static int bootstrap_ipcp(instance_name_t * api,
{
struct ipcp_entry * entry = NULL;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->reg_lock);
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP in the system.");
return -1;
}
entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(conf->dif_name);
if (entry->dif_name == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
if (ipcp_bootstrap(entry->api->id, conf)) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("Could not bootstrap IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.",
api->name, api->id, conf->dif_name);
@@ -584,21 +577,21 @@ static int enroll_ipcp(instance_name_t * api,
ssize_t n_1_difs_size = 0;
struct ipcp_entry * entry = NULL;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_rdlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->reg_lock);
entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(dif_name);
if (entry->dif_name == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
@@ -607,8 +600,8 @@ static int enroll_ipcp(instance_name_t * api,
if (member == NULL) {
free(entry->dif_name);
entry->dif_name = NULL;
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
@@ -616,8 +609,8 @@ static int enroll_ipcp(instance_name_t * api,
if (n_1_difs_size < 1) {
free(entry->dif_name);
entry->dif_name = NULL;
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("Could not find N-1 DIFs.");
return -1;
}
@@ -625,14 +618,14 @@ static int enroll_ipcp(instance_name_t * api,
if (ipcp_enroll(api->id, member, n_1_difs[0])) {
free(entry->dif_name);
entry->dif_name = NULL;
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("Could not enroll IPCP.");
return -1;
}
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_INFO("Enrolled IPCP %s-%d in DIF %s.",
api->name, api->id, dif_name);
@@ -657,25 +650,25 @@ static int ap_reg(char * name,
instance_name_t * api = NULL;
char ** argv_dup = NULL;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->reg_lock);
if (instance->ipcps.next == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
api = instance_name_create();
if (api == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
if (instance_name_init_from(api, path_strip(ap_name), ap_id) == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
instance_name_destroy(api);
return -1;
}
@@ -683,8 +676,8 @@ static int ap_reg(char * name,
/* check if this name is already registered */
rne = get_reg_name_entry_by_name(name);
if (rne != NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
instance_name_destroy(api);
return -1; /* can only register one instance for now */
}
@@ -711,8 +704,8 @@ static int ap_reg(char * name,
}
}
if (ret == 0) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
instance_name_destroy(api);
return -1;
}
@@ -735,8 +728,8 @@ static int ap_reg(char * name,
< 0)
LOG_DBGF("Failed to add application %s.", api->name);
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return ret;
}
@@ -756,8 +749,8 @@ static int ap_unreg(char * name,
if (name == NULL || len == 0 || difs == NULL || difs[0] == NULL)
return -1;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->reg_lock);
if (!hard && strcmp(difs[0], "*") != 0) {
LOG_INFO("Unregistration not complete yet.");
@@ -787,8 +780,8 @@ static int ap_unreg(char * name,
reg_name_entry_del_name(rne->name);
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return ret;
}
@@ -800,13 +793,13 @@ static struct port_map_entry * flow_accept(pid_t pid,
struct port_map_entry * pme;
struct reg_name_entry * rne = NULL;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_rdlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->reg_lock);
rne = get_reg_name_entry_by_ap_name(srv_ap_name);
if (rne == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_DBGF("AP %s is unknown.", srv_ap_name);
return NULL;
}
@@ -814,16 +807,16 @@ static struct port_map_entry * flow_accept(pid_t pid,
if (rne->api->id == 0) {
rne->api->id = pid;
} else if (rne->api->id != pid) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_DBGF("Can only register one instance.");
LOG_MISSING;
return NULL;
}
if (rne->accept) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_DBGF("This AP still has a pending accept().");
return NULL;
}
@@ -833,18 +826,17 @@ static struct port_map_entry * flow_accept(pid_t pid,
pthread_cond_broadcast(&rne->acc_signal);
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
pthread_mutex_lock(&rne->acc_lock);
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void*) &rne->acc_lock);
+ (void *) &rne->acc_lock);
while (rne->flow_arrived == -1)
pthread_cond_wait(&rne->acc_arr_signal, &rne->acc_lock);
- pthread_mutex_unlock(&rne->acc_lock);
- pthread_cleanup_pop(0);
+ pthread_cleanup_pop(true);
pthread_mutex_lock(&rne->acc_lock);
@@ -856,13 +848,13 @@ static struct port_map_entry * flow_accept(pid_t pid,
pthread_mutex_unlock(&rne->acc_lock);
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_rdlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->flows_lock);
pme = get_port_map_entry_n(pid);
if (pme == NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("Port_id was not created yet.");
return NULL;
}
@@ -870,8 +862,8 @@ static struct port_map_entry * flow_accept(pid_t pid,
if (dst_ae_name != NULL)
*dst_ae_name = rne->req_ae_name;
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return pme;
}
@@ -884,20 +876,20 @@ static int flow_alloc_resp(pid_t n_pid,
struct reg_name_entry * rne = NULL;
int ret = -1;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_rdlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->reg_lock);
rne = get_reg_name_entry_by_id(n_pid);
if (rne == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
/* FIXME: check all instances associated with the name */
if (!rne->accept) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("No process listening for this name.");
return -1;
}
@@ -914,28 +906,30 @@ static int flow_alloc_resp(pid_t n_pid,
pthread_mutex_unlock(&rne->acc_lock);
- rw_lock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
if (!response) {
- rw_lock_wrlock(&instance->flows_lock);
+ pthread_rwlock_wrlock(&instance->flows_lock);
pme = get_port_map_entry(port_id);
if (pme == NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
pme->state = FLOW_ALLOCATED;
+
+ pthread_rwlock_unlock(&instance->flows_lock);
+
ret = ipcp_flow_alloc_resp(pme->n_1_pid,
port_id,
pme->n_pid,
response);
- rw_lock_unlock(&instance->flows_lock);
}
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return ret;
}
@@ -962,30 +956,30 @@ static struct port_map_entry * flow_alloc(pid_t pid,
if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)
LOG_WARN("Failed to set timestamp.");
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_rdlock(&instance->reg_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->reg_lock);
if (qos != NULL)
dif_name = qos->dif_name;
ipcp = get_ipcp_by_dst_name(dst_name, dif_name);
if (ipcp == NULL) {
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_DBG("Unknown DIF name.");
return NULL;
}
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_wrlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&instance->flows_lock);
pme->port_id = bmp_allocate(instance->port_ids);
pme->n_1_pid = ipcp->id;
list_add(&pme->next, &instance->port_map);
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
if (ipcp_flow_alloc(ipcp->id,
pme->port_id,
@@ -993,12 +987,12 @@ static struct port_map_entry * flow_alloc(pid_t pid,
dst_name,
src_ae_name,
QOS_CUBE_BE) < 0) {
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->flows_lock);
list_del(&pme->next);
bmp_release(instance->port_ids, pme->port_id);
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
free(pme);
return NULL;
}
@@ -1010,30 +1004,30 @@ static int flow_alloc_res(int port_id)
{
struct port_map_entry * e;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_rdlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
if (e->state == FLOW_NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
if (e->state == FLOW_ALLOCATED) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return 0;
}
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
while (true) {
pthread_mutex_lock(&e->res_lock);
@@ -1042,35 +1036,34 @@ static int flow_alloc_res(int port_id)
pthread_cond_wait(&e->res_signal, &e->res_lock);
- pthread_mutex_unlock(&e->res_lock);
- pthread_cleanup_pop(0);
+ pthread_cleanup_pop(true);
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
if (e->state == FLOW_ALLOCATED) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return 0;
}
if (e->state == FLOW_NULL) {
/* don't release the port_id, AP has to call dealloc */
list_del(&e->next);
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
free(e);
return -1;
}
/* still pending, spurious wake */
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
}
return 0;
@@ -1083,15 +1076,14 @@ static int flow_dealloc(int port_id)
struct port_map_entry * e = NULL;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->flows_lock);
-
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->flows_lock);
bmp_release(instance->port_ids, port_id);
e = get_port_map_entry(port_id);
if (e == NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return 0;
}
@@ -1099,10 +1091,11 @@ static int flow_dealloc(int port_id)
list_del(&e->next);
+ pthread_rwlock_unlock(&instance->flows_lock);
+
ret = ipcp_flow_dealloc(n_1_pid, port_id);
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
free(e);
@@ -1148,17 +1141,17 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
pme->state = FLOW_PENDING;
pme->n_1_pid = pid;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_rdlock(&instance->reg_lock);
- rw_lock_wrlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->reg_lock);
+ pthread_rwlock_wrlock(&instance->flows_lock);
pme->port_id = bmp_allocate(instance->port_ids);
rne = get_reg_name_entry_by_name(dst_name);
if (rne == NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_DBGF("Destination name %s unknown.", dst_name);
free(pme);
return NULL;
@@ -1168,13 +1161,20 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
list_add(&pme->next, &instance->port_map);
+ pthread_rwlock_unlock(&instance->flows_lock);
+
pthread_mutex_lock(&rne->acc_lock);
+ pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
+ (void *) &rne->acc_lock);
+
rne->req_ae_name = ae_name;
if (rne->accept == false) {
if (rne->autoexec) {
+ pthread_rwlock_wrlock(&instance->flows_lock);
pme->n_pid = auto_execute(rne->api->name, rne->argv);
+ pthread_rwlock_unlock(&instance->flows_lock);
while (rne->accept == false)
pthread_cond_wait(&rne->acc_signal,
&rne->acc_lock);
@@ -1188,21 +1188,19 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
rne->flow_arrived = 0;
- pthread_mutex_unlock(&rne->acc_lock);
-
if (pthread_cond_signal(&rne->acc_arr_signal))
LOG_ERR("Failed to send signal.");
+ pthread_cleanup_pop(true);
+
while (acc_wait) {
- sched_yield();
pthread_mutex_lock(&rne->acc_lock);
acc_wait = (rne->flow_arrived != -1);
pthread_mutex_unlock(&rne->acc_lock);
}
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->reg_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return pme;
}
@@ -1212,13 +1210,13 @@ static int flow_alloc_reply(int port_id,
{
struct port_map_entry * e;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_rdlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
@@ -1235,8 +1233,8 @@ static int flow_alloc_reply(int port_id,
pthread_mutex_unlock(&e->res_lock);
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return 0;
}
@@ -1245,48 +1243,46 @@ static int flow_dealloc_ipcp(int port_id)
{
struct port_map_entry * e = NULL;
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->flows_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return 0;
}
list_del(&e->next);
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
free(e);
return 0;
}
-static void irm_destroy(struct irm * irm)
+static void irm_destroy()
{
struct list_head * h;
struct list_head * t;
- if (irm == NULL)
- return;
-
- rw_lock_wrlock(&irm->state_lock);
+ pthread_rwlock_wrlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->reg_lock);
- if (irm->threadpool != NULL)
- free(irm->threadpool);
+ if (instance->threadpool != NULL)
+ free(instance->threadpool);
- if (irm->port_ids != NULL)
- bmp_destroy(irm->port_ids);
+ if (instance->port_ids != NULL)
+ bmp_destroy(instance->port_ids);
/* clear the lists */
- list_for_each_safe(h, t, &irm->ipcps) {
+ list_for_each_safe(h, t, &instance->ipcps) {
struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next);
destroy_ipcp(e->api);
}
- list_for_each_safe(h, t, &irm->reg_names) {
+ list_for_each_safe(h, t, &instance->reg_names) {
struct reg_name_entry * e = list_entry(h,
struct reg_name_entry,
next);
@@ -1294,7 +1290,11 @@ static void irm_destroy(struct irm * irm)
reg_name_entry_destroy(e);
}
- list_for_each_safe(h, t, &irm->port_map) {
+ pthread_rwlock_unlock(&instance->reg_lock);
+
+ pthread_rwlock_wrlock(&instance->flows_lock);
+
+ list_for_each_safe(h, t, &instance->port_map) {
struct port_map_entry * e = list_entry(h,
struct port_map_entry,
next);
@@ -1303,15 +1303,16 @@ static void irm_destroy(struct irm * irm)
free(e);
}
+ pthread_rwlock_unlock(&instance->flows_lock);
- if (irm->dum != NULL)
- shm_du_map_destroy(irm->dum);
+ if (instance->dum != NULL)
+ shm_du_map_destroy(instance->dum);
- close(irm->sockfd);
+ close(instance->sockfd);
- rw_lock_unlock(&irm->state_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
- free(irm);
+ free(instance);
}
@@ -1323,7 +1324,11 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)
case SIGINT:
case SIGTERM:
case SIGHUP:
- rw_lock_wrlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->state_lock);
+
+
+
+ pthread_rwlock_unlock(&instance->state_lock);
if (instance->threadpool != NULL) {
for (i = 0; i < IRMD_THREADPOOL_SIZE; i++)
@@ -1331,9 +1336,9 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)
}
+ pthread_cancel(instance->shm_sanitize);
pthread_cancel(instance->cleanup_flows);
- rw_lock_unlock(&instance->state_lock);
break;
case SIGPIPE:
@@ -1355,13 +1360,13 @@ void * irm_flow_cleaner()
if(clock_gettime(CLOCK_MONOTONIC, &now) < 0)
LOG_WARN("Failed to get time.");
/* cleanup stale PENDING flows */
- rw_lock_rdlock(&instance->state_lock);
- rw_lock_wrlock(&instance->flows_lock);
+
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->flows_lock);
list_for_each_safe(pos, n, &(instance->port_map)) {
struct port_map_entry * e =
list_entry(pos, struct port_map_entry, next);
-
pthread_mutex_lock(&e->res_lock);
if (e->state == FLOW_PENDING &&
@@ -1369,9 +1374,12 @@ void * irm_flow_cleaner()
LOG_DBGF("Pending port_id %d timed out.",
e->port_id);
e->state = FLOW_NULL;
- pthread_cond_broadcast(&e->res_signal);
+ pthread_cond_signal(&e->res_signal);
+ pthread_mutex_unlock(&e->res_lock);
+ continue;
}
+ pthread_mutex_unlock(&e->res_lock);
if (kill(e->n_pid, 0) < 0) {
bmp_release(instance->port_ids, e->port_id);
@@ -1388,17 +1396,20 @@ void * irm_flow_cleaner()
e->n_1_pid, e->port_id);
free(e);
}
-
- pthread_mutex_unlock(&e->res_lock);
}
- rw_lock_unlock(&instance->flows_lock);
- rw_lock_unlock(&instance->state_lock);
+ pthread_rwlock_unlock(&instance->flows_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
nanosleep(&timeout, NULL);
}
}
+void clean_msg(void * msg)
+{
+ irm_msg__free_unpacked(msg, NULL);
+}
+
void * mainloop()
{
uint8_t buf[IRM_MSG_BUF_SIZE];
@@ -1433,6 +1444,8 @@ void * mainloop()
continue;
}
+ pthread_cleanup_push(clean_msg, (void *) msg);
+
api.name = msg->ap_name;
if (msg->has_api_id == true)
api.id = msg->api_id;
@@ -1542,7 +1555,7 @@ void * mainloop()
break;
}
- irm_msg__free_unpacked(msg, NULL);
+ pthread_cleanup_pop(true);
buffer.size = irm_msg__get_packed_size(&ret_msg);
if (buffer.size == 0) {
@@ -1574,8 +1587,8 @@ static struct irm * irm_create()
{
struct stat st = {0};
- struct irm * i = malloc(sizeof(*i));
- if (i == NULL)
+ instance = malloc(sizeof(*instance));
+ if (instance == NULL)
return NULL;
if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) {
@@ -1583,6 +1596,7 @@ static struct irm * irm_create()
if (dum == NULL) {
LOG_ERR("Could not examine existing shm file.");
+ free(instance);
exit(EXIT_FAILURE);
}
@@ -1592,68 +1606,71 @@ static struct irm * irm_create()
LOG_INFO("Stale shm file removed.");
} else {
LOG_INFO("IRMd already running, exiting.");
- free(i);
+ free(instance);
exit(EXIT_SUCCESS);
}
}
- if (rw_lock_init(&i->state_lock)) {
- irm_destroy(i);
+ if (pthread_rwlock_init(&instance->state_lock, NULL)) {
+ LOG_ERR("Failed to initialize rwlock.");
+ free(instance);
+ return NULL;
+ }
+
+ if (pthread_rwlock_init(&instance->reg_lock, NULL)) {
+ LOG_ERR("Failed to initialize rwlock.");
+ free(instance);
+ return NULL;
+ }
+
+ if (pthread_rwlock_init(&instance->flows_lock, NULL)) {
+ LOG_ERR("Failed to initialize rwlock.");
+ free(instance);
return NULL;
}
- i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
- if (i->threadpool == NULL) {
- irm_destroy(i);
+ instance->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
+ if (instance->threadpool == NULL) {
+ irm_destroy();
return NULL;
}
- if ((i->dum = shm_du_map_create()) == NULL) {
- irm_destroy(i);
+ if ((instance->dum = shm_du_map_create()) == NULL) {
+ irm_destroy();
return NULL;
}
- INIT_LIST_HEAD(&i->ipcps);
- INIT_LIST_HEAD(&i->reg_names);
- INIT_LIST_HEAD(&i->port_map);
+ INIT_LIST_HEAD(&instance->ipcps);
+ INIT_LIST_HEAD(&instance->reg_names);
+ INIT_LIST_HEAD(&instance->port_map);
- i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0);
- if (i->port_ids == NULL) {
- irm_destroy(i);
+ instance->port_ids = bmp_create(IRMD_MAX_FLOWS, 0);
+ if (instance->port_ids == NULL) {
+ irm_destroy();
return NULL;
}
if (stat(SOCK_PATH, &st) == -1) {
if (mkdir(SOCK_PATH, 0777)) {
LOG_ERR("Failed to create sockets directory.");
- irm_destroy(i);
+ irm_destroy();
return NULL;
}
}
- i->sockfd = server_socket_open(IRM_SOCK_PATH);
- if (i->sockfd < 0) {
- irm_destroy(i);
+ instance->sockfd = server_socket_open(IRM_SOCK_PATH);
+ if (instance->sockfd < 0) {
+ irm_destroy();
return NULL;
}
if (chmod(IRM_SOCK_PATH, 0666)) {
LOG_ERR("Failed to chmod socket.");
- irm_destroy(i);
- return NULL;
- }
-
- if (rw_lock_init(&i->reg_lock)) {
- irm_destroy(i);
+ irm_destroy();
return NULL;
}
- if (rw_lock_init(&i->flows_lock)) {
- irm_destroy(i);
- return NULL;
- }
-
- return i;
+ return instance;
}
int main()
@@ -1687,18 +1704,21 @@ int main()
if (instance == NULL)
exit(EXIT_FAILURE);
- pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL);
-
for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
pthread_create(&instance->threadpool[t], NULL, mainloop, NULL);
+ pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL);
+ pthread_create(&instance->shm_sanitize, NULL,
+ shm_du_map_sanitize, NULL);
+
/* wait for (all of them) to return */
for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
pthread_join(instance->threadpool[t], NULL);
+ pthread_join(instance->shm_sanitize, NULL);
pthread_join(instance->cleanup_flows, NULL);
- irm_destroy(instance);
+ irm_destroy();
exit(EXIT_SUCCESS);
}