diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/irmd/main.c | 40 | ||||
| -rw-r--r-- | src/lib/shm_rbuff.c | 10 | 
2 files changed, 42 insertions, 8 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index 13bfa052..d8cb27fa 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -30,6 +30,7 @@  #include <ouroboros/utils.h>  #include <ouroboros/irm_config.h>  #include <ouroboros/lockfile.h> +#include <ouroboros/shm_flow_set.h>  #include <ouroboros/shm_rbuff.h>  #include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/bitmap.h> @@ -103,6 +104,18 @@ struct irm {          pthread_t            shm_sanitize;  } * irmd; +static void clear_irm_flow(struct irm_flow * f) { +        ssize_t idx; + +        assert(f); + +        while ((idx = shm_rbuff_read(f->n_rb)) >= 0) +                shm_rdrbuff_remove(irmd->rdrb, idx); + +        while ((idx = shm_rbuff_read(f->n_1_rb)) >= 0) +                shm_rdrbuff_remove(irmd->rdrb, idx); +} +  static struct irm_flow * get_irm_flow(int port_id)  {          struct list_head * pos = NULL; @@ -1170,6 +1183,7 @@ static struct irm_flow * flow_alloc(pid_t  api,                  pthread_rwlock_rdlock(&irmd->state_lock);                  pthread_rwlock_wrlock(&irmd->flows_lock);                  list_del(&f->next); +                clear_irm_flow(f);                  bmp_release(irmd->port_ids, f->port_id);                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock); @@ -1259,6 +1273,7 @@ static int flow_dealloc(pid_t api, int port_id)          if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) {                  list_del(&f->next); +                clear_irm_flow(f);                  irm_flow_destroy(f);                  bmp_release(irmd->port_ids, port_id);                  LOG_INFO("Completed deallocation of port_id %d by AP-I %d.", @@ -1458,6 +1473,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,          if (e == NULL) {                  pthread_rwlock_unlock(&irmd->reg_lock);                  pthread_rwlock_wrlock(&irmd->flows_lock); +                clear_irm_flow(f);                  bmp_release(irmd->port_ids, f->port_id);                  list_del(&f->next);                  pthread_rwlock_unlock(&irmd->flows_lock); @@ -1525,6 +1541,7 @@ static void irm_destroy(void)          list_for_each_safe(p, h, &irmd->irm_flows) {                  struct irm_flow * f = list_entry(p, struct irm_flow, next);                  list_del(&f->next); +                clear_irm_flow(f);                  irm_flow_destroy(f);          } @@ -1734,28 +1751,41 @@ void * irm_sanitize(void * o)                          if (irm_flow_get_state(f) == FLOW_ALLOC_PENDING                              && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { -                                bmp_release(irmd->port_ids, f->port_id);                                  list_del(&f->next);                                  LOG_DBG("Pending port_id %d timed out.",                                           f->port_id); +                                clear_irm_flow(f);                                  ipcp_flow_dealloc(f->n_1_api, f->port_id); +                                bmp_release(irmd->port_ids, f->port_id);                                  irm_flow_destroy(f);                                  continue;                          }                          if (kill(f->n_api, 0) < 0) { -                                bmp_release(irmd->port_ids, f->port_id); -                                list_del(&f->next); +                                struct shm_flow_set * set;                                  LOG_DBG("AP-I %d gone, flow %d deallocated.",                                           f->n_api, f->port_id); +                                set = shm_flow_set_open(f->n_api); +                                if (set != NULL) +                                        shm_flow_set_destroy(set); +                                f->n_api = -1; +                                irm_flow_set_state(f, FLOW_DEALLOC_PENDING);                                  ipcp_flow_dealloc(f->n_1_api, f->port_id); -                                irm_flow_destroy(f); +                                clear_irm_flow(f);                                  continue;                          } +                          if (kill(f->n_1_api, 0) < 0) { +                                struct shm_flow_set * set;                                  list_del(&f->next);                                  LOG_ERR("IPCP %d gone, flow %d removed.",                                          f->n_1_api, f->port_id); +                                set = shm_flow_set_open(f->n_api); +                                if (set != NULL) +                                        shm_flow_set_destroy(set); + +                                clear_irm_flow(f); +                                bmp_release(irmd->port_ids, f->port_id);                                  irm_flow_destroy(f);                          }                  } @@ -2064,7 +2094,6 @@ static int irm_create(void)                  if (kill(lockfile_owner(irmd->lf), 0) < 0) {                          LOG_INFO("IRMd didn't properly shut down last time."); -                        /* FIXME: do this for each QOS_CUBE in the system */                          shm_rdrbuff_destroy(shm_rdrbuff_open());                          LOG_INFO("Stale resources cleaned");                          lockfile_destroy(irmd->lf); @@ -2083,7 +2112,6 @@ static int irm_create(void)                  return -1;          } -        /* FIXME: create an rdrb for each QOS_CUBE in the system */          if ((irmd->rdrb = shm_rdrbuff_create()) == NULL) {                  irm_destroy();                  return -1; diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 1e97364c..5d6d30c7 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -224,9 +224,15 @@ void shm_rbuff_destroy(struct shm_rbuff * rb)  {          char fn[FN_MAX_CHARS]; -        if (rb == NULL) -                return; +        assert(rb); + +#ifdef CONFIG_OUROBOROS_DEBUG +        pthread_mutex_lock(rb->lock); +        assert(shm_rbuff_empty(rb)); + +        pthread_mutex_unlock(rb->lock); +#endif          sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id);          if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) | 
