summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-16 01:44:27 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-16 01:44:27 +0200
commit22ef5ea7feeace47bff9d16434a4b705d1ae4a82 (patch)
tree1c157dbf5d2c1ab5982da6e0b091b2d83365d44f
parent8f79d80e7fe7f52f310edddc73589f4f71457747 (diff)
downloadouroboros-22ef5ea7feeace47bff9d16434a4b705d1ae4a82.tar.gz
ouroboros-22ef5ea7feeace47bff9d16434a4b705d1ae4a82.zip
irmd: new locking implementation
This locking should be more consistent, It now has three locks, one guarding flows and port_id's, one guarding the registered apps and ipcps, and one guarding the overall state of the irmd. There are two additional mutexes guarding the condition variables.
-rw-r--r--src/irmd/main.c396
1 files changed, 221 insertions, 175 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 9c515d2a..b7e1ad18 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -36,6 +36,7 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/flow.h>
#include <ouroboros/qos.h>
+#include <ouroboros/rw_lock.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -102,23 +103,18 @@ struct irm {
/* FIXME: list of ipcps could be merged with registered names */
struct list_head ipcps;
struct list_head reg_names;
-
- int sockfd;
+ rw_lock_t reg_lock;
/* keep track of all flows in this processing system */
struct bmp * port_ids;
-
/* maps port_ids to pid pair */
struct list_head port_map;
+ rw_lock_t flows_lock;
struct shm_du_map * dum;
-
- pthread_t * threadpool;
-
- pthread_mutex_t r_lock;
-
- bool shutdown;
- pthread_mutex_t s_lock;
+ pthread_t * threadpool;
+ int sockfd;
+ rw_lock_t state_lock;
} * instance = NULL;
static struct port_map_entry * port_map_entry_create()
@@ -405,45 +401,45 @@ static pid_t create_ipcp(char * ap_name,
pid_t pid;
struct ipcp_entry * tmp = NULL;
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return -1;
- }
- pthread_mutex_unlock(&instance->s_lock);
-
+ rw_lock_rdlock(&instance->state_lock);
pid = ipcp_create(ap_name, ipcp_type);
if (pid == -1) {
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("Failed to create IPCP.");
return -1;
}
tmp = ipcp_entry_create();
- if (tmp == NULL)
+ if (tmp == NULL) {
+ rw_lock_unlock(&instance->state_lock);
return -1;
+ }
INIT_LIST_HEAD(&tmp->next);
tmp->api = instance_name_create();
if (tmp->api == NULL) {
ipcp_entry_destroy(tmp);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) {
instance_name_destroy(tmp->api);
ipcp_entry_destroy(tmp);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
tmp->dif_name = NULL;
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_wrlock(&instance->reg_lock);
list_add(&tmp->next, &instance->ipcps);
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_INFO("Created IPCP %s-%d.", ap_name, pid);
@@ -456,14 +452,18 @@ static int destroy_ipcp(instance_name_t * api)
struct list_head * n = NULL;
pid_t pid = 0;
-
if (api == NULL)
return 0;
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->reg_lock);
+
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP in the system.");
return 0;
}
@@ -482,6 +482,9 @@ static int destroy_ipcp(instance_name_t * api)
ipcp_entry_destroy(tmp);
}
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
+
LOG_INFO("Destroyed IPCP %d.", pid);
return 0;
@@ -492,47 +495,46 @@ static int bootstrap_ipcp(instance_name_t * api,
{
struct ipcp_entry * entry = NULL;
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return -1;
- }
- pthread_mutex_unlock(&instance->s_lock);
-
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->reg_lock);
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP in the system.");
return -1;
}
entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(conf->dif_name);
if (entry->dif_name == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
if (ipcp_bootstrap(entry->api->id, conf)) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("Could not bootstrap IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.",
api->name, api->id, conf->dif_name);
@@ -548,58 +550,56 @@ static int enroll_ipcp(instance_name_t * api,
ssize_t n_1_difs_size = 0;
struct ipcp_entry * entry = NULL;
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return -1;
- }
- pthread_mutex_unlock(&instance->s_lock);
-
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->reg_lock);
entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(dif_name);
if (entry->dif_name == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
member = da_resolve_daf(dif_name);
if (member == NULL) {
- LOG_ERR("Could not find a member of that DIF.");
free(entry->dif_name);
entry->dif_name = NULL;
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
n_1_difs_size = da_resolve_dap(member, n_1_difs);
if (n_1_difs_size < 1) {
- LOG_ERR("Could not find N-1 DIFs.");
free(entry->dif_name);
entry->dif_name = NULL;
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
+ LOG_ERR("Could not find N-1 DIFs.");
return -1;
}
- pthread_mutex_unlock(&instance->r_lock);
-
if (ipcp_enroll(api->id, member, n_1_difs[0])) {
- LOG_ERR("Could not enroll IPCP.");
- pthread_mutex_lock(&instance->r_lock);
free(entry->dif_name);
entry->dif_name = NULL;
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
+ LOG_ERR("Could not enroll IPCP.");
return -1;
}
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
+
LOG_INFO("Enrolled IPCP %s-%d in DIF %s.",
api->name, api->id, dif_name);
@@ -610,18 +610,19 @@ static int reg_ipcp(instance_name_t * api,
char ** difs,
size_t difs_size)
{
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return -1;
- }
- pthread_mutex_unlock(&instance->s_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->reg_lock);
if (ipcp_reg(api->id, difs, difs_size)) {
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("Could not register IPCP to N-1 DIF(s).");
return -1;
}
+ rw_lock_wrlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
+
return 0;
}
@@ -629,11 +630,16 @@ static int unreg_ipcp(instance_name_t * api,
char ** difs,
size_t difs_size)
{
-
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->reg_lock);
if (ipcp_unreg(api->id, difs, difs_size)) {
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("Could not unregister IPCP from N-1 DIF(s).");
return -1;
}
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return 0;
}
@@ -651,28 +657,25 @@ static int ap_reg(char * ap_name,
instance_name_t * api = NULL;
instance_name_t * ipcpi = NULL;
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return -1;
- }
- pthread_mutex_unlock(&instance->s_lock);
-
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->reg_lock);
if (instance->ipcps.next == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
api = instance_name_create();
if (api == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
instance_name_destroy(api);
return -1;
}
@@ -681,8 +684,9 @@ static int ap_reg(char * ap_name,
rne = get_reg_name_entry_by_name(ap_name);
if (rne != NULL) {
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
instance_name_destroy(api);
- pthread_mutex_unlock(&instance->r_lock);
return -1; /* can only register one instance for now */
}
@@ -721,15 +725,17 @@ static int ap_reg(char * ap_name,
}
if (ret == 0) {
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
instance_name_destroy(api);
- pthread_mutex_unlock(&instance->r_lock);
return -1;
}
/* for now, we register single instances */
ret = reg_name_entry_add_name_instance(strdup(ap_name),
api);
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return ret;
}
@@ -744,22 +750,26 @@ static int ap_unreg(char * ap_name,
struct reg_name_entry * rne = NULL;
struct list_head * pos = NULL;
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->reg_lock);
/* check if ap_name is registered */
rne = get_reg_name_entry_by_id(ap_id);
if (rne == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return 0; /* no such id */
}
if (strcmp(ap_name, rne->api->name)) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return 0;
}
if (instance->ipcps.next == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("No IPCPs in this system.");
return 0;
}
@@ -788,7 +798,8 @@ static int ap_unreg(char * ap_name,
/* FIXME: check if name is not registered in any DIF before removing */
reg_name_entry_del_name(rne->name);
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return ret;
}
@@ -800,22 +811,19 @@ static struct port_map_entry * flow_accept(pid_t pid,
struct port_map_entry * pme;
struct reg_name_entry * rne = NULL;
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return NULL;
- }
- pthread_mutex_unlock(&instance->s_lock);
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->reg_lock);
rne = get_reg_name_entry_by_id(pid);
if (rne == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_DBGF("Unregistered AP calling accept().");
return NULL;
}
if (rne->accept) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_DBGF("This AP still has a pending accept().");
return NULL;
}
@@ -823,7 +831,8 @@ static struct port_map_entry * flow_accept(pid_t pid,
rne->accept = true;
rne->flow_arrived = -1;
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
pthread_mutex_lock(&rne->acc_lock);
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
@@ -840,16 +849,20 @@ static struct port_map_entry * flow_accept(pid_t pid,
/* ap with pending accept being unregistered */
if (rne->flow_arrived == -2 ) {
pthread_mutex_unlock(&rne->acc_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return NULL;
}
pthread_mutex_unlock(&rne->acc_lock);
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->flows_lock);
pme = get_port_map_entry_n(pid);
if (pme == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("Port_id was not created yet.");
return NULL;
}
@@ -858,7 +871,8 @@ static struct port_map_entry * flow_accept(pid_t pid,
if (ae_name != NULL)
*ae_name = rne->req_ae_name;
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
return pme;
}
@@ -869,25 +883,22 @@ static int flow_alloc_resp(pid_t n_pid,
{
struct port_map_entry * pme = NULL;
struct reg_name_entry * rne = NULL;
+ int ret = -1;
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return -1;
- }
- pthread_mutex_unlock(&instance->s_lock);
-
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->reg_lock);
rne = get_reg_name_entry_by_id(n_pid);
if (rne == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
/* FIXME: check all instances associated with the name */
if (!rne->accept) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_ERR("No process listening for this name.");
return -1;
}
@@ -897,20 +908,37 @@ static int flow_alloc_resp(pid_t n_pid,
* once we can handle a list of AP-I's, remove it from the list
*/
+ pthread_mutex_lock(&rne->acc_lock);
+
rne->accept = false;
rne->flow_arrived = -1;
+ pthread_mutex_unlock(&rne->acc_lock);
+
+ rw_lock_unlock(&instance->reg_lock);
+
if (!response) {
+ rw_lock_wrlock(&instance->flows_lock);
+
pme = get_port_map_entry(port_id);
+ if (pme == NULL) {
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
+ return -1;
+ }
+
pme->state = FLOW_ALLOCATED;
+ ret = ipcp_flow_alloc_resp(pme->n_1_pid,
+ port_id,
+ pme->n_pid,
+ response);
+
+ rw_lock_unlock(&instance->flows_lock);
}
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->state_lock);
- return ipcp_flow_alloc_resp(pme->n_1_pid,
- port_id,
- pme->n_pid,
- response);
+ return ret;
}
static struct port_map_entry * flow_alloc(pid_t pid,
@@ -924,13 +952,6 @@ static struct port_map_entry * flow_alloc(pid_t pid,
/* FIXME: Map qos_spec to qos_cube */
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return NULL;
- }
- pthread_mutex_unlock(&instance->s_lock);
-
pme = port_map_entry_create();
if (pme == NULL) {
LOG_ERR("Failed malloc of port_map_entry.");
@@ -940,22 +961,29 @@ static struct port_map_entry * flow_alloc(pid_t pid,
pme->n_pid = pid;
pme->state = FLOW_PENDING;
- pthread_mutex_lock(&instance->r_lock);
-
- pme->port_id = bmp_allocate(instance->port_ids);
- pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
-
- list_add(&pme->next, &instance->port_map);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->reg_lock);
ipcp = get_ipcp_by_dst_name(dst_name);
- pthread_mutex_unlock(&instance->r_lock);
-
if (ipcp == NULL) {
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_DBG("unknown ipcp");
return NULL;
}
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_wrlock(&instance->flows_lock);
+
+ pme->port_id = bmp_allocate(instance->port_ids);
+ pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
+
+ list_add(&pme->next, &instance->port_map);
+
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
+
if (ipcp_flow_alloc(ipcp->id,
pme->port_id,
pme->n_pid,
@@ -963,15 +991,13 @@ static struct port_map_entry * flow_alloc(pid_t pid,
src_ap_name,
src_ae_name,
QOS_CUBE_BE) < 0) {
- pthread_mutex_lock(&instance->r_lock);
-
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->flows_lock);
list_del(&pme->next);
-
- pthread_mutex_unlock(&instance->r_lock);
-
bmp_release(instance->port_ids, pme->port_id);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_rdlock(&instance->state_lock);
free(pme);
-
return NULL;
}
@@ -982,27 +1008,24 @@ static int flow_alloc_res(int port_id)
{
struct port_map_entry * e;
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return -1;
- }
- pthread_mutex_unlock(&instance->s_lock);
-
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
if (e->state == FLOW_ALLOCATED) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
return 0;
}
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
while (true) {
pthread_mutex_lock(&e->res_lock);
@@ -1013,45 +1036,52 @@ static int flow_alloc_res(int port_id)
pthread_mutex_unlock(&e->res_lock);
pthread_cleanup_pop(0);
- pthread_mutex_lock(&instance->r_lock);
+
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
if (e->state == FLOW_ALLOCATED) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_DBGF("Returning 0.");
return 0;
}
if (e->state == FLOW_NULL) {
list_del(&e->next);
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
free(e);
return -1;
}
/* still pending, spurious wake */
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
}
- pthread_mutex_unlock(&instance->r_lock);
-
return 0;
}
static int flow_dealloc(int port_id)
{
- pid_t n_1_pid;
+ pid_t n_1_pid;
+ int ret = 0;
struct port_map_entry * e = NULL;
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_wrlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
return 0;
}
@@ -1061,11 +1091,14 @@ static int flow_dealloc(int port_id)
bmp_release(instance->port_ids, port_id);
- pthread_mutex_unlock(&instance->r_lock);
+ ret = ipcp_flow_dealloc(n_1_pid, port_id);
+
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
free(e);
- return ipcp_flow_dealloc(n_1_pid, port_id);
+ return ret;
}
static struct port_map_entry * flow_req_arr(pid_t pid,
@@ -1076,13 +1109,6 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
struct reg_name_entry * rne;
struct port_map_entry * pme;
- pthread_mutex_lock(&instance->s_lock);
- if (instance->shutdown) {
- pthread_mutex_unlock(&instance->s_lock);
- return NULL;
- }
- pthread_mutex_unlock(&instance->s_lock);
-
pme = malloc(sizeof(*pme));
if (pme == NULL) {
LOG_ERR("Failed malloc of port_map_entry.");
@@ -1092,13 +1118,17 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
pme->state = FLOW_PENDING;
pme->n_1_pid = pid;
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->reg_lock);
+ rw_lock_wrlock(&instance->flows_lock);
pme->port_id = bmp_allocate(instance->port_ids);
rne = get_reg_name_entry_by_name(dst_name);
if (rne == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
LOG_DBGF("Destination name %s unknown.", dst_name);
free(pme);
return NULL;
@@ -1115,12 +1145,14 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
rne->flow_arrived = 0;
+ pthread_mutex_unlock(&rne->acc_lock);
+
if (pthread_cond_signal(&rne->acc_signal))
LOG_ERR("Failed to send signal.");
- pthread_mutex_unlock(&rne->acc_lock);
-
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->reg_lock);
+ rw_lock_unlock(&instance->state_lock);
return pme;
}
@@ -1130,11 +1162,13 @@ static int flow_alloc_reply(int port_id,
{
struct port_map_entry * e;
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
return -1;
}
@@ -1151,7 +1185,8 @@ static int flow_alloc_reply(int port_id,
pthread_mutex_unlock(&e->res_lock);
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
return 0;
}
@@ -1160,17 +1195,20 @@ static int flow_dealloc_ipcp(int port_id)
{
struct port_map_entry * e = NULL;
- pthread_mutex_lock(&instance->r_lock);
+ rw_lock_rdlock(&instance->state_lock);
+ rw_lock_rdlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
return 0;
}
list_del(&e->next);
- pthread_mutex_unlock(&instance->r_lock);
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
free(e);
@@ -1185,9 +1223,7 @@ static void irm_destroy(struct irm * irm)
if (irm == NULL)
return;
- pthread_mutex_lock(&irm->s_lock);
- instance->shutdown = true;
- pthread_mutex_unlock(&irm->s_lock);
+ rw_lock_wrlock(&instance->state_lock);
if (irm->threadpool != NULL)
free(irm->threadpool);
@@ -1221,6 +1257,8 @@ static void irm_destroy(struct irm * irm)
close(irm->sockfd);
free(irm);
+
+ rw_lock_unlock(&instance->state_lock);
}
void irmd_sig_handler(int sig, siginfo_t * info, void * c)
@@ -1231,11 +1269,16 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)
case SIGINT:
case SIGTERM:
case SIGHUP:
+ rw_lock_wrlock(&instance->state_lock);
+
if (instance->threadpool != NULL) {
for (i = 0; i < IRMD_THREADPOOL_SIZE; i++)
pthread_cancel(instance->threadpool[i]);
+
}
+ rw_lock_unlock(&instance->state_lock);
+
case SIGPIPE:
LOG_DBG("Ignoring SIGPIPE.");
default:
@@ -1458,14 +1501,17 @@ static struct irm * irm_create()
return NULL;
}
- if (pthread_mutex_init(&i->r_lock, NULL)) {
+ if (rw_lock_init(&i->state_lock)) {
irm_destroy(i);
return NULL;
}
- i->shutdown = false;
+ if (rw_lock_init(&i->reg_lock)) {
+ irm_destroy(i);
+ return NULL;
+ }
- if (pthread_mutex_init(&i->s_lock, NULL)) {
+ if (rw_lock_init(&i->flows_lock)) {
irm_destroy(i);
return NULL;
}