diff options
| -rw-r--r-- | include/ouroboros/shm_rbuff.h | 2 | ||||
| -rw-r--r-- | src/irmd/irm_flow.c | 6 | ||||
| -rw-r--r-- | src/irmd/irm_flow.h | 20 | ||||
| -rw-r--r-- | src/irmd/main.c | 65 | ||||
| -rw-r--r-- | src/lib/dev.c | 60 | ||||
| -rw-r--r-- | src/lib/shm_flow_set.c | 1 | ||||
| -rw-r--r-- | src/lib/shm_rbuff.c | 37 | 
7 files changed, 106 insertions, 85 deletions
| diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index 03660b88..4c4e8c64 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -28,7 +28,7 @@  struct shm_rbuff; -struct shm_rbuff * shm_rbuff_create(int port_id); +struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id);  struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id); diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index df1302b4..dc5d22d8 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -36,6 +36,9 @@ struct irm_flow * irm_flow_create()          f->n_api   = -1;          f->n_1_api = -1;          f->port_id = -1; +        f->n_rb    = NULL; +        f->n_1_rb  = NULL; +          f->state   = FLOW_NULL;          if (pthread_cond_init(&f->state_cond, NULL)) { @@ -78,6 +81,9 @@ void irm_flow_destroy(struct irm_flow * f)          pthread_cond_destroy(&f->state_cond);          pthread_mutex_destroy(&f->state_lock); +        shm_rbuff_destroy(f->n_rb); +        shm_rbuff_destroy(f->n_1_rb); +          free(f);  } diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index 5ec6d90e..507295bd 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -24,6 +24,7 @@  #define OUROBOROS_IRMD_IRM_FLOW_H  #include <ouroboros/list.h> +#include <ouroboros/shm_rbuff.h>  #include <sys/types.h>  #include <pthread.h> @@ -38,18 +39,21 @@ enum flow_state {  };  struct irm_flow { -        struct list_head next; +        struct list_head   next; -        int              port_id; +        int                port_id; -        pid_t            n_api; -        pid_t            n_1_api; +        pid_t              n_api; +        pid_t              n_1_api; -        struct timespec  t0; +        struct shm_rbuff * n_rb; +        struct shm_rbuff * n_1_rb; -        enum flow_state  state; -        pthread_cond_t   state_cond; -        pthread_mutex_t  state_lock; +        struct timespec    t0; + +        enum flow_state    state; +        pthread_cond_t     state_cond; +        pthread_mutex_t    state_lock;  };  struct irm_flow * irm_flow_create(); diff --git a/src/irmd/main.c b/src/irmd/main.c index 67941e41..8d9d04ac 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1164,6 +1164,24 @@ static struct irm_flow * flow_alloc(pid_t  api,          port_id = f->port_id = bmp_allocate(irmd->port_ids);          f->n_1_api = ipcp; +        f->n_rb = shm_rbuff_create(api, port_id); +        if (f->n_rb == NULL) { +                pthread_rwlock_unlock(&irmd->flows_lock); +                pthread_rwlock_unlock(&irmd->state_lock); +                LOG_ERR("Could not create ringbuffer for AP-I %d.", api); +                irm_flow_destroy(f); +                return NULL; +        } + +        f->n_1_rb = shm_rbuff_create(ipcp, port_id); +        if (f->n_1_rb == NULL) { +                pthread_rwlock_unlock(&irmd->flows_lock); +                pthread_rwlock_unlock(&irmd->state_lock); +                LOG_ERR("Could not create ringbuffer for AP-I %d.", ipcp); +                irm_flow_destroy(f); +                return NULL; +        } +          list_add(&f->next, &irmd->irm_flows);          pthread_rwlock_unlock(&irmd->flows_lock); @@ -1346,7 +1364,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,                  pthread_rwlock_unlock(&irmd->reg_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  LOG_ERR("Unknown name: %s.", dst_name); -                free(f); +                irm_flow_destroy(f);                  return NULL;          } @@ -1359,14 +1377,14 @@ static struct irm_flow * flow_req_arr(pid_t  api,                  pthread_rwlock_unlock(&irmd->reg_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  LOG_ERR("No AP's for %s.", dst_name); -                free(f); +                irm_flow_destroy(f);                  return NULL;          case REG_NAME_AUTO_ACCEPT:                  c_api = malloc(sizeof(*c_api));                  if (c_api == NULL) {                          pthread_rwlock_unlock(&irmd->reg_lock);                          pthread_rwlock_unlock(&irmd->state_lock); -                        free(f); +                        irm_flow_destroy(f);                          return NULL;                  } @@ -1384,7 +1402,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,                          pthread_rwlock_unlock(&irmd->state_lock);                          LOG_ERR("Could not get start apn for reg_entry %s.",                                  re->name); -                        free(f); +                        irm_flow_destroy(f);                          free(c_api);                          return NULL;                  } @@ -1411,6 +1429,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,                          pthread_mutex_unlock(&re->state_lock);                          pthread_rwlock_unlock(&irmd->reg_lock);                          pthread_rwlock_unlock(&irmd->state_lock); +                        irm_flow_destroy(f);                          return NULL;                  } @@ -1424,6 +1443,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,                          pthread_rwlock_unlock(&irmd->reg_lock);                          pthread_rwlock_unlock(&irmd->state_lock);                          LOG_ERR("Invalid api returned."); +                        irm_flow_destroy(f);                          return NULL;                  } @@ -1432,7 +1452,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,                  pthread_rwlock_unlock(&irmd->reg_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  LOG_ERR("IRMd in wrong state."); -                free(f); +                irm_flow_destroy(f);                  return NULL;          } @@ -1441,6 +1461,26 @@ static struct irm_flow * flow_req_arr(pid_t  api,          pthread_rwlock_wrlock(&irmd->flows_lock);          f->port_id = bmp_allocate(irmd->port_ids); +        f->n_rb = shm_rbuff_create(f->n_api, f->port_id); +        if (f->n_rb == NULL) { +                bmp_release(irmd->port_ids, f->port_id); +                pthread_rwlock_unlock(&irmd->flows_lock); +                pthread_rwlock_unlock(&irmd->state_lock); +                LOG_ERR("Could not create ringbuffer for AP-I %d.", f->n_api); +                irm_flow_destroy(f); +                return NULL; +        } + +        f->n_1_rb = shm_rbuff_create(f->n_1_api, f->port_id); +        if (f->n_1_rb == NULL) { +                bmp_release(irmd->port_ids, f->port_id); +                pthread_rwlock_unlock(&irmd->flows_lock); +                pthread_rwlock_unlock(&irmd->state_lock); +                LOG_ERR("Could not create ringbuffer for AP-I %d.", f->n_1_api); +                irm_flow_destroy(f); +                return NULL; +        } +          list_add(&f->next, &irmd->irm_flows);          pthread_rwlock_unlock(&irmd->flows_lock); @@ -1455,10 +1495,13 @@ static struct irm_flow * flow_req_arr(pid_t  api,          e = api_table_get(&irmd->api_table, h_api);          if (e == NULL) { -                LOG_ERR("Could not get api table entry for %d.", h_api);                  pthread_rwlock_unlock(&irmd->reg_lock); +                pthread_rwlock_wrlock(&irmd->flows_lock); +                bmp_release(irmd->port_ids, f->port_id); +                pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock); -                free(f); +                LOG_ERR("Could not get api table entry for %d.", h_api); +                irm_flow_destroy(f);                  return NULL;          } @@ -1692,26 +1735,18 @@ void * irm_sanitize()                          }                          if (kill(f->n_api, 0) < 0) { -                                struct shm_rbuff * rb = -                                        shm_rbuff_open(f->n_api, f->port_id);                                  bmp_release(irmd->port_ids, f->port_id);                                  list_del(&f->next);                                  LOG_INFO("AP-I %d gone, flow %d deallocated.",                                           f->n_api, f->port_id);                                  ipcp_flow_dealloc(f->n_1_api, f->port_id); -                                if (rb != NULL) -                                        shm_rbuff_destroy(rb);                                  irm_flow_destroy(f);                                  continue;                          }                          if (kill(f->n_1_api, 0) < 0) { -                                struct shm_rbuff * rb = -                                        shm_rbuff_open(f->n_1_api, f->port_id);                                  list_del(&f->next);                                  LOG_ERR("IPCP %d gone, flow %d removed.",                                          f->n_1_api, f->port_id); -                                if (rb != NULL) -                                        shm_rbuff_destroy(rb);                                  irm_flow_destroy(f);                          }                  } diff --git a/src/lib/dev.c b/src/lib/dev.c index f735e72b..146070b7 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -288,7 +288,7 @@ void ap_fini()                          int idx;                          while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)                                  shm_rdrbuff_remove(ai.rdrb, idx); -                        shm_rbuff_destroy(ai.flows[i].rx_rb); +                        shm_rbuff_close(ai.flows[i].rx_rb);                          shm_rbuff_close(ai.flows[i].tx_rb);                          shm_flow_set_close(ai.flows[i].set);                  } @@ -349,7 +349,7 @@ int flow_accept(char ** ae_name)                  return -1;          } -        ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id); +        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);          if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock); @@ -361,7 +361,7 @@ int flow_accept(char ** ae_name)          ai.flows[fd].set = shm_flow_set_open(recv_msg->api);          if (ai.flows[fd].set == NULL) {                  bmp_release(ai.fds, fd); -                shm_rbuff_destroy(ai.flows[fd].rx_rb); +                shm_rbuff_close(ai.flows[fd].rx_rb);                  shm_rbuff_close(ai.flows[fd].tx_rb);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -373,7 +373,7 @@ int flow_accept(char ** ae_name)          if (ae_name != NULL) {                  *ae_name = strdup(recv_msg->ae_name);                  if (*ae_name == NULL) { -                        shm_rbuff_destroy(ai.flows[fd].tx_rb); +                        shm_rbuff_close(ai.flows[fd].tx_rb);                          shm_rbuff_close(ai.flows[fd].tx_rb);                          shm_flow_set_close(ai.flows[fd].set);                          bmp_release(ai.fds, fd); @@ -508,7 +508,7 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)          ai.flows[fd].port_id = recv_msg->port_id;          ai.flows[fd].oflags  = FLOW_O_DEFAULT;          ai.flows[fd].api     = recv_msg->api; -        ai.flows[fd].rx_rb   = shm_rbuff_create(recv_msg->port_id); +        ai.flows[fd].rx_rb   = shm_rbuff_open(ai.api, recv_msg->port_id);          if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock); @@ -517,27 +517,6 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)                  return -1;          } -        ai.flows[fd].tx_rb   = shm_rbuff_open(recv_msg->api, recv_msg->port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                shm_rbuff_destroy(ai.flows[fd].rx_rb); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); -        if (ai.flows[fd].set == NULL) { -                shm_rbuff_close(ai.flows[fd].tx_rb); -                shm_rbuff_destroy(ai.flows[fd].rx_rb); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } -          ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -572,6 +551,23 @@ int flow_alloc_res(int fd)          msg.port_id = ai.flows[fd].port_id; +        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, +                                            ai.flows[fd].port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); +        if (ai.flows[fd].set == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } +          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -599,7 +595,7 @@ int flow_dealloc(int fd)          msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC;          msg.has_port_id  = true;          msg.has_api      = true; -        msg.api          = getpid(); +        msg.api          = ai.api;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_wrlock(&ai.flows_lock); @@ -621,7 +617,7 @@ int flow_dealloc(int fd)          port_destroy(&ai.ports[msg.port_id]);          ai.flows[fd].port_id = -1; -        shm_rbuff_destroy(ai.flows[fd].rx_rb); +        shm_rbuff_close(ai.flows[fd].rx_rb);          ai.flows[fd].rx_rb = NULL;          shm_rbuff_close(ai.flows[fd].tx_rb);          ai.flows[fd].tx_rb = NULL; @@ -990,7 +986,7 @@ int np1_flow_alloc(pid_t n_api, int port_id)                  return -1;          } -        ai.flows[fd].rx_rb = shm_rbuff_create(port_id); +        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);          if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock); @@ -1046,7 +1042,7 @@ int np1_flow_resp(pid_t n_api, int port_id)          ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id);          if (ai.flows[fd].tx_rb == NULL) {                  ai.flows[fd].port_id = -1; -                shm_rbuff_destroy(ai.flows[fd].rx_rb); +                shm_rbuff_close(ai.flows[fd].rx_rb);                  port_destroy(&ai.ports[port_id]);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -1057,7 +1053,7 @@ int np1_flow_resp(pid_t n_api, int port_id)          if (ai.flows[fd].set == NULL) {                  shm_rbuff_close(ai.flows[fd].tx_rb);                  ai.flows[fd].port_id = -1; -                shm_rbuff_destroy(ai.flows[fd].rx_rb); +                shm_rbuff_close(ai.flows[fd].rx_rb);                  port_destroy(&ai.ports[port_id]);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -1143,7 +1139,7 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_wrlock(&ai.flows_lock); -        ai.flows[fd].rx_rb = shm_rbuff_create(port_id); +        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);          if (ai.flows[fd].rx_rb == NULL) {                  ai.flows[fd].port_id = -1;                  port_destroy(&ai.ports[port_id]); diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index c960bd25..04de9fc5 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -315,7 +315,6 @@ int shm_flow_set_has(struct shm_flow_set * shm_set,          assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS);          assert(!(idx < 0) && idx < AP_MAX_FQUEUES); -          pthread_mutex_lock(shm_set->lock);          if (shm_set->mtable[port_id] == idx) diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index cf094488..a933fbff 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -68,7 +68,7 @@ struct shm_rbuff {          int               port_id;  /* port_id of the flow           */  }; -struct shm_rbuff * shm_rbuff_create(int port_id) +struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)  {          struct shm_rbuff *  rb;          int                 shm_fd; @@ -78,7 +78,7 @@ struct shm_rbuff * shm_rbuff_create(int port_id)          char                fn[FN_MAX_CHARS];          mode_t              mask; -        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", getpid(), port_id); +        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", api, port_id);          rb = malloc(sizeof(*rb));          if (rb == NULL) { @@ -148,9 +148,12 @@ struct shm_rbuff * shm_rbuff_create(int port_id)          *rb->head = 0;          *rb->tail = 0; -        rb->api = getpid(); +        rb->api = api;          rb->port_id = port_id; +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); +          return rb;  } @@ -221,36 +224,14 @@ void shm_rbuff_close(struct shm_rbuff * rb)  void shm_rbuff_destroy(struct shm_rbuff * rb)  {          char fn[25]; -        struct lockfile * lf = NULL; - -        assert(rb); -        if (rb->api != getpid()) { -                lf = lockfile_open(); -                if (lf == NULL) { -                        LOG_ERR("Failed to open lockfile."); -                        return; -                } - -                if (lockfile_owner(lf) == getpid()) { -                        LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", -                                 rb->api, getpid()); -                        lockfile_close(lf); -                } else { -                        LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", -                                getpid(), rb->api); -                        lockfile_close(lf); -                        return; -                } -        } +        if (rb == NULL) +                return;          sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id); -        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) -                LOG_DBG("Couldn't unmap shared memory."); -          if (shm_unlink(fn) == -1) -                LOG_DBG("Failed to unlink shm."); +                LOG_DBG("Failed to unlink shm %s.", fn);          free(rb);  } | 
