diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-06-21 09:54:37 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-06-21 09:54:37 +0200 | 
| commit | aa6255a605cac034089c78562c0d000aacd0af1e (patch) | |
| tree | 9a76d884afc10c276c0b4e3af989d68a9556551b /src/irmd | |
| parent | 34f96731f5fb8ab8a1f7018366fc28fd041d73e2 (diff) | |
| parent | a46114ec01e8d174a41744f4f1b49905613847dc (diff) | |
| download | ouroboros-aa6255a605cac034089c78562c0d000aacd0af1e.tar.gz ouroboros-aa6255a605cac034089c78562c0d000aacd0af1e.zip | |
Merged in dstaesse/ouroboros/be-robust (pull request #135)
lib, irmd, ipcp: robust mutexes
Diffstat (limited to 'src/irmd')
| -rw-r--r-- | src/irmd/main.c | 464 | 
1 files changed, 242 insertions, 222 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index 5f7c1ddc..50055c4d 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -36,7 +36,6 @@  #include <ouroboros/bitmap.h>  #include <ouroboros/flow.h>  #include <ouroboros/qos.h> -#include <ouroboros/rw_lock.h>  #include <ouroboros/time_utils.h>  #include "utils.h" @@ -113,20 +112,21 @@ struct irm {          /* FIXME: list of ipcps could be merged with registered names */          struct list_head ipcps;          struct list_head reg_names; -        rw_lock_t  reg_lock; +        pthread_rwlock_t reg_lock;          /* keep track of all flows in this processing system */          struct bmp * port_ids;          /* maps port_ids to pid pair */          struct list_head port_map; -        rw_lock_t  flows_lock; +        pthread_rwlock_t  flows_lock;          struct shm_du_map * dum;          pthread_t *         threadpool;          int                 sockfd; -        rw_lock_t           state_lock; +        pthread_rwlock_t    state_lock;          pthread_t           cleanup_flows; +        pthread_t           shm_sanitize;  } * instance = NULL;  static struct port_map_entry * port_map_entry_create() @@ -435,18 +435,18 @@ static pid_t create_ipcp(char *         ap_name,          pid_t pid;          struct ipcp_entry * tmp = NULL; -        rw_lock_rdlock(&instance->state_lock); +        pthread_rwlock_rdlock(&instance->state_lock);          pid = ipcp_create(ap_name, ipcp_type);          if (pid == -1) { -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("Failed to create IPCP.");                  return -1;          }          tmp = ipcp_entry_create();          if (tmp == NULL) { -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          } @@ -455,25 +455,25 @@ static pid_t create_ipcp(char *         ap_name,          tmp->api = instance_name_create();          if (tmp->api == NULL) {                  ipcp_entry_destroy(tmp); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          }          if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) {                  instance_name_destroy(tmp->api);                  ipcp_entry_destroy(tmp); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          }          tmp->dif_name = NULL; -        rw_lock_wrlock(&instance->reg_lock); +        pthread_rwlock_wrlock(&instance->reg_lock);          list_add(&tmp->next, &instance->ipcps); -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->reg_lock); +        pthread_rwlock_unlock(&instance->state_lock);          LOG_INFO("Created IPCP %s-%d.", ap_name, pid); @@ -489,15 +489,10 @@ static int destroy_ipcp(instance_name_t * api)          if (api == NULL)                  return 0; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_wrlock(&instance->reg_lock); -          if (api->id == 0)                  api = get_ipcp_by_name(api->name);          if (api == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock);                  LOG_ERR("No such IPCP in the system.");                  return 0;          } @@ -516,8 +511,6 @@ static int destroy_ipcp(instance_name_t * api)                  }          } -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_unlock(&instance->state_lock);          LOG_INFO("Destroyed IPCP %d.", pid); @@ -529,46 +522,46 @@ static int bootstrap_ipcp(instance_name_t *  api,  {          struct ipcp_entry * entry = NULL; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_wrlock(&instance->reg_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_wrlock(&instance->reg_lock);          if (api->id == 0)                  api = get_ipcp_by_name(api->name);          if (api == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("No such IPCP in the system.");                  return -1;          }          entry = get_ipcp_entry_by_name(api);          if (entry == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("No such IPCP.");                  return -1;          }          entry->dif_name = strdup(conf->dif_name);          if (entry->dif_name == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("Failed to strdup.");                  return -1;          }          if (ipcp_bootstrap(entry->api->id, conf)) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("Could not bootstrap IPCP.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1;          } -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->reg_lock); +        pthread_rwlock_unlock(&instance->state_lock);          LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.",                   api->name, api->id, conf->dif_name); @@ -584,21 +577,21 @@ static int enroll_ipcp(instance_name_t  * api,          ssize_t n_1_difs_size = 0;          struct ipcp_entry * entry = NULL; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_rdlock(&instance->reg_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_rdlock(&instance->reg_lock);          entry = get_ipcp_entry_by_name(api);          if (entry == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("No such IPCP.");                  return -1;          }          entry->dif_name = strdup(dif_name);          if (entry->dif_name == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("Failed to strdup.");                  return -1;          } @@ -607,8 +600,8 @@ static int enroll_ipcp(instance_name_t  * api,          if (member == NULL) {                  free(entry->dif_name);                  entry->dif_name = NULL; -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          } @@ -616,8 +609,8 @@ static int enroll_ipcp(instance_name_t  * api,          if (n_1_difs_size < 1) {                  free(entry->dif_name);                  entry->dif_name = NULL; -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("Could not find N-1 DIFs.");                  return -1;          } @@ -625,14 +618,14 @@ static int enroll_ipcp(instance_name_t  * api,          if (ipcp_enroll(api->id, member, n_1_difs[0])) {                  free(entry->dif_name);                  entry->dif_name = NULL; -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("Could not enroll IPCP.");                  return -1;          } -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->reg_lock); +        pthread_rwlock_unlock(&instance->state_lock);          LOG_INFO("Enrolled IPCP %s-%d in DIF %s.",                   api->name, api->id, dif_name); @@ -657,25 +650,25 @@ static int ap_reg(char *  name,          instance_name_t * api   = NULL;          char ** argv_dup = NULL; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_wrlock(&instance->reg_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_wrlock(&instance->reg_lock);          if (instance->ipcps.next == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          }          api = instance_name_create();          if (api == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          }          if (instance_name_init_from(api, path_strip(ap_name), ap_id) == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  instance_name_destroy(api);                  return -1;          } @@ -683,8 +676,8 @@ static int ap_reg(char *  name,          /* check if this name is already registered */          rne = get_reg_name_entry_by_name(name);          if (rne != NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  instance_name_destroy(api);                  return -1; /* can only register one instance for now */          } @@ -711,8 +704,8 @@ static int ap_reg(char *  name,                  }          }          if (ret ==  0) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  instance_name_destroy(api);                  return -1;          } @@ -735,8 +728,8 @@ static int ap_reg(char *  name,              < 0)                  LOG_DBGF("Failed to add application %s.", api->name); -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->reg_lock); +        pthread_rwlock_unlock(&instance->state_lock);          return ret;  } @@ -756,8 +749,8 @@ static int ap_unreg(char *  name,          if (name == NULL || len == 0 || difs == NULL || difs[0] == NULL)                  return -1; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_wrlock(&instance->reg_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_wrlock(&instance->reg_lock);          if (!hard && strcmp(difs[0], "*") != 0) {                  LOG_INFO("Unregistration not complete yet."); @@ -787,8 +780,8 @@ static int ap_unreg(char *  name,          reg_name_entry_del_name(rne->name); -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->reg_lock); +        pthread_rwlock_unlock(&instance->state_lock);          return ret;  } @@ -800,13 +793,13 @@ static struct port_map_entry * flow_accept(pid_t    pid,          struct port_map_entry * pme;          struct reg_name_entry * rne = NULL; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_rdlock(&instance->reg_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_wrlock(&instance->reg_lock);          rne = get_reg_name_entry_by_ap_name(srv_ap_name);          if (rne == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_DBGF("AP %s is unknown.", srv_ap_name);                  return NULL;          } @@ -814,16 +807,16 @@ static struct port_map_entry * flow_accept(pid_t    pid,          if (rne->api->id == 0) {                  rne->api->id = pid;          } else if (rne->api->id != pid) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_DBGF("Can only register one instance.");                  LOG_MISSING;                  return NULL;          }          if (rne->accept) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_DBGF("This AP still has a pending accept().");                  return NULL;          } @@ -833,18 +826,17 @@ static struct port_map_entry * flow_accept(pid_t    pid,          pthread_cond_broadcast(&rne->acc_signal); -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->reg_lock); +        pthread_rwlock_unlock(&instance->state_lock);          pthread_mutex_lock(&rne->acc_lock);          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void*) &rne->acc_lock); +                             (void *) &rne->acc_lock);          while (rne->flow_arrived == -1)                  pthread_cond_wait(&rne->acc_arr_signal, &rne->acc_lock); -        pthread_mutex_unlock(&rne->acc_lock); -        pthread_cleanup_pop(0); +        pthread_cleanup_pop(true);          pthread_mutex_lock(&rne->acc_lock); @@ -856,13 +848,13 @@ static struct port_map_entry * flow_accept(pid_t    pid,          pthread_mutex_unlock(&rne->acc_lock); -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_rdlock(&instance->flows_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_rdlock(&instance->flows_lock);          pme = get_port_map_entry_n(pid);          if (pme == NULL) { -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("Port_id was not created yet.");                  return NULL;          } @@ -870,8 +862,8 @@ static struct port_map_entry * flow_accept(pid_t    pid,          if (dst_ae_name != NULL)                  *dst_ae_name = rne->req_ae_name; -        rw_lock_unlock(&instance->flows_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->flows_lock); +        pthread_rwlock_unlock(&instance->state_lock);          return pme;  } @@ -884,20 +876,20 @@ static int flow_alloc_resp(pid_t n_pid,          struct reg_name_entry * rne = NULL;          int ret = -1; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_rdlock(&instance->reg_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_rdlock(&instance->reg_lock);          rne = get_reg_name_entry_by_id(n_pid);          if (rne == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          }          /* FIXME: check all instances associated with the name */          if (!rne->accept) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_ERR("No process listening for this name.");                  return -1;          } @@ -914,28 +906,30 @@ static int flow_alloc_resp(pid_t n_pid,          pthread_mutex_unlock(&rne->acc_lock); -        rw_lock_unlock(&instance->reg_lock); +        pthread_rwlock_unlock(&instance->reg_lock);          if (!response) { -                rw_lock_wrlock(&instance->flows_lock); +                pthread_rwlock_wrlock(&instance->flows_lock);                  pme = get_port_map_entry(port_id);                  if (pme == NULL) { -                        rw_lock_unlock(&instance->flows_lock); -                        rw_lock_unlock(&instance->state_lock); +                        pthread_rwlock_unlock(&instance->flows_lock); +                        pthread_rwlock_unlock(&instance->state_lock);                          return -1;                  }                  pme->state = FLOW_ALLOCATED; + +                pthread_rwlock_unlock(&instance->flows_lock); +                  ret = ipcp_flow_alloc_resp(pme->n_1_pid,                                             port_id,                                             pme->n_pid,                                             response); -                rw_lock_unlock(&instance->flows_lock);          } -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->state_lock);          return ret;  } @@ -962,30 +956,30 @@ static struct port_map_entry * flow_alloc(pid_t  pid,          if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)                  LOG_WARN("Failed to set timestamp."); -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_rdlock(&instance->reg_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_rdlock(&instance->reg_lock);          if (qos != NULL)                  dif_name = qos->dif_name;          ipcp = get_ipcp_by_dst_name(dst_name, dif_name);          if (ipcp == NULL) { -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_DBG("Unknown DIF name.");                  return NULL;          } -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_wrlock(&instance->flows_lock); +        pthread_rwlock_unlock(&instance->reg_lock); +        pthread_rwlock_wrlock(&instance->flows_lock);          pme->port_id = bmp_allocate(instance->port_ids);          pme->n_1_pid = ipcp->id;          list_add(&pme->next, &instance->port_map); -        rw_lock_unlock(&instance->flows_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->flows_lock); +        pthread_rwlock_unlock(&instance->state_lock);          if (ipcp_flow_alloc(ipcp->id,                              pme->port_id, @@ -993,12 +987,12 @@ static struct port_map_entry * flow_alloc(pid_t  pid,                              dst_name,                              src_ae_name,                              QOS_CUBE_BE) < 0) { -                rw_lock_rdlock(&instance->state_lock); -                rw_lock_wrlock(&instance->flows_lock); +                pthread_rwlock_rdlock(&instance->state_lock); +                pthread_rwlock_wrlock(&instance->flows_lock);                  list_del(&pme->next);                  bmp_release(instance->port_ids, pme->port_id); -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  free(pme);                  return NULL;          } @@ -1010,30 +1004,30 @@ static int flow_alloc_res(int port_id)  {          struct port_map_entry * e; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_rdlock(&instance->flows_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_rdlock(&instance->flows_lock);          e = get_port_map_entry(port_id);          if (e == NULL) { -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          }          if (e->state == FLOW_NULL) { -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          }          if (e->state == FLOW_ALLOCATED) { -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return 0;          } -        rw_lock_unlock(&instance->flows_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->flows_lock); +        pthread_rwlock_unlock(&instance->state_lock);          while (true) {                  pthread_mutex_lock(&e->res_lock); @@ -1042,35 +1036,34 @@ static int flow_alloc_res(int port_id)                  pthread_cond_wait(&e->res_signal, &e->res_lock); -                pthread_mutex_unlock(&e->res_lock); -                pthread_cleanup_pop(0); +                pthread_cleanup_pop(true); -                rw_lock_rdlock(&instance->state_lock); -                rw_lock_wrlock(&instance->flows_lock); +                pthread_rwlock_rdlock(&instance->state_lock); +                pthread_rwlock_wrlock(&instance->flows_lock);                  e = get_port_map_entry(port_id);                  if (e == NULL) { -                        rw_lock_unlock(&instance->flows_lock); -                        rw_lock_unlock(&instance->state_lock); +                        pthread_rwlock_unlock(&instance->flows_lock); +                        pthread_rwlock_unlock(&instance->state_lock);                          return -1;                  }                  if (e->state == FLOW_ALLOCATED) { -                        rw_lock_unlock(&instance->flows_lock); -                        rw_lock_unlock(&instance->state_lock); +                        pthread_rwlock_unlock(&instance->flows_lock); +                        pthread_rwlock_unlock(&instance->state_lock);                          return 0;                  }                  if (e->state == FLOW_NULL) {                          /* don't release the port_id, AP has to call dealloc */                          list_del(&e->next); -                        rw_lock_unlock(&instance->flows_lock); -                        rw_lock_unlock(&instance->state_lock); +                        pthread_rwlock_unlock(&instance->flows_lock); +                        pthread_rwlock_unlock(&instance->state_lock);                          free(e);                          return -1;                  }                  /* still pending, spurious wake */ -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);          }          return 0; @@ -1083,15 +1076,14 @@ static int flow_dealloc(int port_id)          struct port_map_entry * e = NULL; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_wrlock(&instance->flows_lock); - +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_wrlock(&instance->flows_lock);          bmp_release(instance->port_ids, port_id);          e = get_port_map_entry(port_id);          if (e == NULL) { -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return 0;          } @@ -1099,10 +1091,11 @@ static int flow_dealloc(int port_id)          list_del(&e->next); +        pthread_rwlock_unlock(&instance->flows_lock); +          ret = ipcp_flow_dealloc(n_1_pid, port_id); -        rw_lock_unlock(&instance->flows_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->state_lock);          free(e); @@ -1148,17 +1141,17 @@ static struct port_map_entry * flow_req_arr(pid_t  pid,          pme->state   = FLOW_PENDING;          pme->n_1_pid = pid; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_rdlock(&instance->reg_lock); -        rw_lock_wrlock(&instance->flows_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_rdlock(&instance->reg_lock); +        pthread_rwlock_wrlock(&instance->flows_lock);          pme->port_id = bmp_allocate(instance->port_ids);          rne = get_reg_name_entry_by_name(dst_name);          if (rne == NULL) { -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->reg_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->reg_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  LOG_DBGF("Destination name %s unknown.", dst_name);                  free(pme);                  return NULL; @@ -1168,13 +1161,20 @@ static struct port_map_entry * flow_req_arr(pid_t  pid,          list_add(&pme->next, &instance->port_map); +        pthread_rwlock_unlock(&instance->flows_lock); +          pthread_mutex_lock(&rne->acc_lock); +        pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, +                             (void *) &rne->acc_lock); +          rne->req_ae_name = ae_name;          if (rne->accept == false) {                  if (rne->autoexec) { +                        pthread_rwlock_wrlock(&instance->flows_lock);                          pme->n_pid = auto_execute(rne->api->name, rne->argv); +                        pthread_rwlock_unlock(&instance->flows_lock);                          while (rne->accept == false)                                  pthread_cond_wait(&rne->acc_signal,                                                    &rne->acc_lock); @@ -1188,21 +1188,19 @@ static struct port_map_entry * flow_req_arr(pid_t  pid,          rne->flow_arrived = 0; -        pthread_mutex_unlock(&rne->acc_lock); -          if (pthread_cond_signal(&rne->acc_arr_signal))                  LOG_ERR("Failed to send signal."); +        pthread_cleanup_pop(true); +          while (acc_wait) { -                sched_yield();                  pthread_mutex_lock(&rne->acc_lock);                  acc_wait = (rne->flow_arrived != -1);                  pthread_mutex_unlock(&rne->acc_lock);          } -        rw_lock_unlock(&instance->flows_lock); -        rw_lock_unlock(&instance->reg_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->reg_lock); +        pthread_rwlock_unlock(&instance->state_lock);          return pme;  } @@ -1212,13 +1210,13 @@ static int flow_alloc_reply(int port_id,  {          struct port_map_entry * e; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_rdlock(&instance->flows_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_rdlock(&instance->flows_lock);          e = get_port_map_entry(port_id);          if (e == NULL) { -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return -1;          } @@ -1235,8 +1233,8 @@ static int flow_alloc_reply(int port_id,          pthread_mutex_unlock(&e->res_lock); -        rw_lock_unlock(&instance->flows_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->flows_lock); +        pthread_rwlock_unlock(&instance->state_lock);          return 0;  } @@ -1245,48 +1243,46 @@ static int flow_dealloc_ipcp(int port_id)  {          struct port_map_entry * e = NULL; -        rw_lock_rdlock(&instance->state_lock); -        rw_lock_wrlock(&instance->flows_lock); +        pthread_rwlock_rdlock(&instance->state_lock); +        pthread_rwlock_wrlock(&instance->flows_lock);          e = get_port_map_entry(port_id);          if (e == NULL) { -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  return 0;          }          list_del(&e->next); -        rw_lock_unlock(&instance->flows_lock); -        rw_lock_unlock(&instance->state_lock); +        pthread_rwlock_unlock(&instance->flows_lock); +        pthread_rwlock_unlock(&instance->state_lock);          free(e);          return 0;  } -static void irm_destroy(struct irm * irm) +static void irm_destroy()  {          struct list_head * h;          struct list_head * t; -        if (irm == NULL) -                return; - -        rw_lock_wrlock(&irm->state_lock); +        pthread_rwlock_wrlock(&instance->state_lock); +        pthread_rwlock_wrlock(&instance->reg_lock); -        if (irm->threadpool != NULL) -                free(irm->threadpool); +        if (instance->threadpool != NULL) +                free(instance->threadpool); -        if (irm->port_ids != NULL) -                bmp_destroy(irm->port_ids); +        if (instance->port_ids != NULL) +                bmp_destroy(instance->port_ids);          /* clear the lists */ -        list_for_each_safe(h, t, &irm->ipcps) { +        list_for_each_safe(h, t, &instance->ipcps) {                  struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next);                  destroy_ipcp(e->api);          } -        list_for_each_safe(h, t, &irm->reg_names) { +        list_for_each_safe(h, t, &instance->reg_names) {                  struct reg_name_entry * e = list_entry(h,                                                         struct reg_name_entry,                                                         next); @@ -1294,7 +1290,11 @@ static void irm_destroy(struct irm * irm)                  reg_name_entry_destroy(e);          } -        list_for_each_safe(h, t, &irm->port_map) { +        pthread_rwlock_unlock(&instance->reg_lock); + +        pthread_rwlock_wrlock(&instance->flows_lock); + +        list_for_each_safe(h, t, &instance->port_map) {                  struct port_map_entry * e = list_entry(h,                                                         struct port_map_entry,                                                         next); @@ -1303,15 +1303,16 @@ static void irm_destroy(struct irm * irm)                  free(e);          } +        pthread_rwlock_unlock(&instance->flows_lock); -        if (irm->dum != NULL) -                shm_du_map_destroy(irm->dum); +        if (instance->dum != NULL) +                shm_du_map_destroy(instance->dum); -        close(irm->sockfd); +        close(instance->sockfd); -        rw_lock_unlock(&irm->state_lock); +        pthread_rwlock_unlock(&instance->state_lock); -        free(irm); +        free(instance);  } @@ -1323,7 +1324,11 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)          case SIGINT:          case SIGTERM:          case SIGHUP: -                rw_lock_wrlock(&instance->state_lock); +                pthread_rwlock_wrlock(&instance->state_lock); + + + +                pthread_rwlock_unlock(&instance->state_lock);                  if (instance->threadpool != NULL) {                          for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) @@ -1331,9 +1336,9 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)                  } +                pthread_cancel(instance->shm_sanitize);                  pthread_cancel(instance->cleanup_flows); -                rw_lock_unlock(&instance->state_lock);                  break;          case SIGPIPE: @@ -1355,13 +1360,13 @@ void * irm_flow_cleaner()                  if(clock_gettime(CLOCK_MONOTONIC, &now) < 0)                          LOG_WARN("Failed to get time.");                  /* cleanup stale PENDING flows */ -                rw_lock_rdlock(&instance->state_lock); -                rw_lock_wrlock(&instance->flows_lock); + +                pthread_rwlock_rdlock(&instance->state_lock); +                pthread_rwlock_wrlock(&instance->flows_lock);                  list_for_each_safe(pos, n, &(instance->port_map)) {                          struct port_map_entry * e =                                  list_entry(pos, struct port_map_entry, next); -                          pthread_mutex_lock(&e->res_lock);                          if (e->state == FLOW_PENDING && @@ -1369,9 +1374,12 @@ void * irm_flow_cleaner()                                  LOG_DBGF("Pending port_id %d timed out.",                                           e->port_id);                                  e->state = FLOW_NULL; -                                pthread_cond_broadcast(&e->res_signal); +                                pthread_cond_signal(&e->res_signal); +                                pthread_mutex_unlock(&e->res_lock); +                                continue;                          } +                        pthread_mutex_unlock(&e->res_lock);                          if (kill(e->n_pid, 0) < 0) {                                  bmp_release(instance->port_ids, e->port_id); @@ -1388,17 +1396,20 @@ void * irm_flow_cleaner()                                          e->n_1_pid, e->port_id);                                  free(e);                          } - -                        pthread_mutex_unlock(&e->res_lock);                  } -                rw_lock_unlock(&instance->flows_lock); -                rw_lock_unlock(&instance->state_lock); +                pthread_rwlock_unlock(&instance->flows_lock); +                pthread_rwlock_unlock(&instance->state_lock);                  nanosleep(&timeout, NULL);          }  } +void clean_msg(void * msg) +{ +        irm_msg__free_unpacked(msg, NULL); +} +  void * mainloop()  {          uint8_t buf[IRM_MSG_BUF_SIZE]; @@ -1433,6 +1444,8 @@ void * mainloop()                          continue;                  } +                pthread_cleanup_push(clean_msg, (void *) msg); +                  api.name = msg->ap_name;                  if (msg->has_api_id == true)                          api.id = msg->api_id; @@ -1542,7 +1555,7 @@ void * mainloop()                          break;                  } -                irm_msg__free_unpacked(msg, NULL); +                pthread_cleanup_pop(true);                  buffer.size = irm_msg__get_packed_size(&ret_msg);                  if (buffer.size == 0) { @@ -1574,8 +1587,8 @@ static struct irm * irm_create()  {          struct stat st = {0}; -        struct irm * i = malloc(sizeof(*i)); -        if (i == NULL) +        instance = malloc(sizeof(*instance)); +        if (instance == NULL)                  return NULL;          if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) { @@ -1583,6 +1596,7 @@ static struct irm * irm_create()                  if (dum == NULL) {                          LOG_ERR("Could not examine existing shm file."); +                        free(instance);                          exit(EXIT_FAILURE);                  } @@ -1592,68 +1606,71 @@ static struct irm * irm_create()                          LOG_INFO("Stale shm file removed.");                  } else {                          LOG_INFO("IRMd already running, exiting."); -                        free(i); +                        free(instance);                          exit(EXIT_SUCCESS);                  }          } -        if (rw_lock_init(&i->state_lock)) { -                irm_destroy(i); +        if (pthread_rwlock_init(&instance->state_lock, NULL)) { +                LOG_ERR("Failed to initialize rwlock."); +                free(instance); +                return NULL; +        } + +        if (pthread_rwlock_init(&instance->reg_lock, NULL)) { +                LOG_ERR("Failed to initialize rwlock."); +                free(instance); +                return NULL; +        } + +        if (pthread_rwlock_init(&instance->flows_lock, NULL)) { +                LOG_ERR("Failed to initialize rwlock."); +                free(instance);                  return NULL;          } -        i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); -        if (i->threadpool == NULL) { -                irm_destroy(i); +        instance->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); +        if (instance->threadpool == NULL) { +                irm_destroy();                  return NULL;          } -        if ((i->dum = shm_du_map_create()) == NULL) { -                irm_destroy(i); +        if ((instance->dum = shm_du_map_create()) == NULL) { +                irm_destroy();                  return NULL;          } -        INIT_LIST_HEAD(&i->ipcps); -        INIT_LIST_HEAD(&i->reg_names); -        INIT_LIST_HEAD(&i->port_map); +        INIT_LIST_HEAD(&instance->ipcps); +        INIT_LIST_HEAD(&instance->reg_names); +        INIT_LIST_HEAD(&instance->port_map); -        i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); -        if (i->port_ids == NULL) { -                irm_destroy(i); +        instance->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); +        if (instance->port_ids == NULL) { +                irm_destroy();                  return NULL;          }          if (stat(SOCK_PATH, &st) == -1) {                  if (mkdir(SOCK_PATH, 0777)) {                          LOG_ERR("Failed to create sockets directory."); -                        irm_destroy(i); +                        irm_destroy();                          return NULL;                  }          } -        i->sockfd = server_socket_open(IRM_SOCK_PATH); -        if (i->sockfd < 0) { -                irm_destroy(i); +        instance->sockfd = server_socket_open(IRM_SOCK_PATH); +        if (instance->sockfd < 0) { +                irm_destroy();                  return NULL;          }          if (chmod(IRM_SOCK_PATH, 0666)) {                  LOG_ERR("Failed to chmod socket."); -                irm_destroy(i); -                return NULL; -        } - -        if (rw_lock_init(&i->reg_lock)) { -                irm_destroy(i); +                irm_destroy();                  return NULL;          } -        if (rw_lock_init(&i->flows_lock)) { -                irm_destroy(i); -                return NULL; -        } - -        return i; +        return instance;  }  int main() @@ -1687,18 +1704,21 @@ int main()          if (instance == NULL)                  exit(EXIT_FAILURE); -        pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL); -          for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)                  pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); +        pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL); +        pthread_create(&instance->shm_sanitize, NULL, +                       shm_du_map_sanitize, NULL); +          /* wait for (all of them) to return */          for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)                  pthread_join(instance->threadpool[t], NULL); +        pthread_join(instance->shm_sanitize, NULL);          pthread_join(instance->cleanup_flows, NULL); -        irm_destroy(instance); +        irm_destroy();          exit(EXIT_SUCCESS);  } | 
