diff options
| -rw-r--r-- | include/ouroboros/shm_flow_set.h | 2 | ||||
| -rw-r--r-- | src/irmd/proc_table.c | 55 | ||||
| -rw-r--r-- | src/irmd/proc_table.h | 21 | ||||
| -rw-r--r-- | src/lib/dev.c | 14 | ||||
| -rw-r--r-- | src/lib/shm_flow_set.c | 147 | 
5 files changed, 112 insertions, 127 deletions
| diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h index 45d372a0..77f64264 100644 --- a/include/ouroboros/shm_flow_set.h +++ b/include/ouroboros/shm_flow_set.h @@ -29,7 +29,7 @@  struct shm_flow_set; -struct shm_flow_set * shm_flow_set_create(void); +struct shm_flow_set * shm_flow_set_create(pid_t pid);  void                  shm_flow_set_destroy(struct shm_flow_set * set); diff --git a/src/irmd/proc_table.c b/src/irmd/proc_table.c index 6f9d8e20..27fbb505 100644 --- a/src/irmd/proc_table.c +++ b/src/irmd/proc_table.c @@ -50,41 +50,44 @@ struct proc_entry * proc_entry_create(pid_t  pid,          e = malloc(sizeof(*e));          if (e == NULL) -                return NULL; +                goto fail_malloc; -        list_head_init(&e->next); -        list_head_init(&e->names); - -        e->pid      = pid; -        e->prog     = prog; -        e->daf_name = NULL; - -        e->re       = NULL; - -        e->state    = PROC_INIT; - -        if (pthread_condattr_init(&cattr)) { -                free(e); -                return NULL; -        } +        if (pthread_condattr_init(&cattr)) +                goto fail_condattr;  #ifndef __APPLE__          pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);  #endif -        if (pthread_mutex_init(&e->lock, NULL)) { -                free(e); -                return NULL; -        } +        if (pthread_mutex_init(&e->lock, NULL)) +                goto fail_mutex; +        if (pthread_cond_init(&e->cond, &cattr)) +                goto fail_cond; -        if (pthread_cond_init(&e->cond, &cattr)) { -                pthread_mutex_destroy(&e->lock); -                free(e); -                return NULL; -        } +        e->set = shm_flow_set_create(pid); +        if (e->set == NULL) +                goto fail_set; + +        list_head_init(&e->next); +        list_head_init(&e->names); + +        e->pid      = pid; +        e->prog     = prog; +        e->re       = NULL; +        e->state    = PROC_INIT;          return e; + fail_set: +        pthread_cond_destroy(&e->cond);; + fail_cond: +        pthread_mutex_destroy(&e->lock); + fail_mutex: +        pthread_condattr_destroy(&cattr); + fail_condattr: +        free(e); + fail_malloc: +        return NULL;  }  static void cancel_proc_entry(void * o) @@ -124,6 +127,8 @@ void proc_entry_destroy(struct proc_entry * e)          pthread_mutex_unlock(&e->lock); +        shm_flow_set_destroy(e->set); +          pthread_cond_destroy(&e->cond);          pthread_mutex_destroy(&e->lock); diff --git a/src/irmd/proc_table.h b/src/irmd/proc_table.h index f3ef9aff..a18b0d8c 100644 --- a/src/irmd/proc_table.h +++ b/src/irmd/proc_table.h @@ -23,7 +23,8 @@  #ifndef OUROBOROS_IRMD_PROC_TABLE_H  #define OUROBOROS_IRMD_PROC_TABLE_H -#include "time.h" +#include <ouroboros/shm_flow_set.h> +  #include "utils.h"  #include <unistd.h> @@ -38,18 +39,18 @@ enum proc_state {  };  struct proc_entry { -        struct list_head   next; -        pid_t              pid; -        char *             prog;     /* program instantiated */ -        char *             daf_name; /* DAF this process belongs to */ -        struct list_head   names;    /* names for which process accepts flows */ +        struct list_head      next; +        pid_t                 pid; +        char *                prog;  /* program instantiated */ +        struct list_head      names; /* names for which process accepts flows */ +        struct shm_flow_set * set; -        struct reg_entry * re;       /* reg_entry for which a flow arrived */ +        struct reg_entry *    re;    /* reg_entry for which a flow arrived */          /* The process will block on this */ -        enum proc_state    state; -        pthread_cond_t     cond; -        pthread_mutex_t    lock; +        enum proc_state       state; +        pthread_cond_t        cond; +        pthread_mutex_t       lock;  };  struct proc_entry * proc_entry_create(pid_t  proc, diff --git a/src/lib/dev.c b/src/lib/dev.c index 42eed7e4..a2ec836f 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -357,10 +357,6 @@ static void init(int     argc,          if (ai.fqueues == NULL)                  goto fail_fqueues; -        ai.fqset = shm_flow_set_create(); -        if (ai.fqset == NULL) -                goto fail_fqset; -          ai.rdrb = shm_rdrbuff_open();          if (ai.rdrb == NULL)                  goto fail_rdrb; @@ -407,8 +403,14 @@ static void init(int     argc,          if (rxmwheel_init())                  goto fail_rxmwheel; +        ai.fqset = shm_flow_set_open(getpid()); +        if (ai.fqset == NULL) +                goto fail_fqset; +          return; + fail_fqset: +        rxmwheel_fini();   fail_rxmwheel:          pthread_rwlock_destroy(&ai.lock);   fail_lock: @@ -426,8 +428,6 @@ static void init(int     argc,   fail_flows:          shm_rdrbuff_close(ai.rdrb);   fail_rdrb: -        shm_flow_set_destroy(ai.fqset); - fail_fqset:          bmp_destroy(ai.fqueues);   fail_fqueues:          bmp_destroy(ai.fds); @@ -462,7 +462,7 @@ static void fini(void)                  }          } -        shm_flow_set_destroy(ai.fqset); +        shm_flow_set_close(ai.fqset);          for (i = 0; i < SYS_MAX_FLOWS; ++i) {                  pthread_mutex_destroy(&ai.ports[i].state_lock); diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 1c94c599..bab5d86e 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -78,31 +78,24 @@ struct shm_flow_set {          pid_t pid;  }; -struct shm_flow_set * shm_flow_set_create() +static struct shm_flow_set * flow_set_create(pid_t pid, +                                             int   flags)  {          struct shm_flow_set * set;          ssize_t *             shm_base; -        pthread_mutexattr_t   mattr; -        pthread_condattr_t    cattr;          char                  fn[FN_MAX_CHARS]; -        mode_t                mask;          int                   shm_fd; -        int                   i; -        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid()); +        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", pid);          set = malloc(sizeof(*set));          if (set == NULL) -                return NULL; +                goto fail_malloc; -        mask = umask(0); - -        shm_fd = shm_open(fn, O_CREAT | O_RDWR, 0666); +        shm_fd = shm_open(fn, flags, 0666);          if (shm_fd == -1)                  goto fail_shm_open; -        umask(mask); -          if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) {                  close(shm_fd);                  goto fail_shm_open; @@ -127,27 +120,61 @@ struct shm_flow_set * shm_flow_set_create()          set->lock    = (pthread_mutex_t *)                  (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); +        return set; + + fail_mmap: +        if (flags & O_CREAT) +                shm_unlink(fn); + fail_shm_open: +        free(set); + fail_malloc: +        return NULL; +} + +struct shm_flow_set * shm_flow_set_create(pid_t pid) +{ +        struct shm_flow_set * set; +        pthread_mutexattr_t   mattr; +        pthread_condattr_t    cattr; +        mode_t                mask; +        int                   i; + +        mask = umask(0); + +        set = flow_set_create(pid, O_CREAT | O_RDWR); + +        umask(mask); + +        if (set == NULL) +                goto fail_set; +          if (pthread_mutexattr_init(&mattr)) -                goto fail_mmap; +                goto fail_mutexattr_init;  #ifdef HAVE_ROBUST_MUTEX          if (pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST)) -                goto fail_mmap; +                goto fail_mattr_set;  #endif -        if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) || -            pthread_mutex_init(set->lock, &mattr) || -            pthread_condattr_init(&cattr) || -            pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED)) -                goto fail_mmap; +        if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) +                goto fail_mattr_set; + +        if (pthread_mutex_init(set->lock, &mattr)) +                goto fail_mattr_set; + +        if (pthread_condattr_init(&cattr)) +                goto fail_condattr_init; + +        if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED)) +                goto fail_condattr_set;  #ifndef __APPLE__          if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) -                goto fail_mmap; +                goto fail_condattr_set;  #endif          for (i = 0; i < PROG_MAX_FQUEUES; ++i) {                  set->heads[i] = 0;                  if (pthread_cond_init(&set->conds[i], &cattr)) -                        goto fail_mmap; +                        goto fail_init;          }          for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -157,84 +184,37 @@ struct shm_flow_set * shm_flow_set_create()          return set; - fail_mmap: -        shm_unlink(fn); - fail_shm_open: -        free(set); + fail_init: +        while (i-- > 0) +                pthread_cond_destroy(&set->conds[i]); + fail_condattr_set: +        pthread_condattr_destroy(&cattr); + fail_condattr_init: +        pthread_mutex_destroy(set->lock); + fail_mattr_set: +        pthread_mutexattr_destroy(&mattr); + fail_mutexattr_init: +        shm_flow_set_destroy(set); + fail_set:          return NULL;  }  struct shm_flow_set * shm_flow_set_open(pid_t pid)  { -        struct shm_flow_set * set; -        ssize_t *             shm_base; -        char                  fn[FN_MAX_CHARS]; -        int                   shm_fd; - -        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", pid); - -        set = malloc(sizeof(*set)); -        if (set == NULL) -                return NULL; - -        shm_fd = shm_open(fn, O_RDWR, 0666); -        if (shm_fd == -1) { -                free(set); -                return NULL; -        } - -        shm_base = mmap(NULL, -                        SHM_FLOW_SET_FILE_SIZE, -                        PROT_READ | PROT_WRITE, -                        MAP_SHARED, -                        shm_fd, -                        0); - -        close(shm_fd); - -        if (shm_base == MAP_FAILED) { -                shm_unlink(fn); -                free(set); -                return NULL; -        } - -        set->mtable  = shm_base; -        set->heads   = (size_t *) (set->mtable + SYS_MAX_FLOWS); -        set->conds   = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); -        set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES); -        set->lock    = (pthread_mutex_t *) -                (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); -        set->pid = pid; - -        return set; +        return flow_set_create(pid, O_RDWR);  }  void shm_flow_set_destroy(struct shm_flow_set * set)  {          char fn[25]; -        struct lockfile * lf = NULL;          assert(set); -        if (set->pid != getpid()) { -                lf = lockfile_open(); -                if (lf == NULL) -                        return; - -                if (lockfile_owner(lf) == getpid()) { -                        lockfile_close(lf); -                } else { -                        lockfile_close(lf); -                        return; -                } -        } -          sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->pid); -        munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE); -        shm_unlink(fn); +        shm_flow_set_close(set); -        free(set); +        shm_unlink(fn);  }  void shm_flow_set_close(struct shm_flow_set * set) @@ -242,7 +222,6 @@ void shm_flow_set_close(struct shm_flow_set * set)          assert(set);          munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE); -          free(set);  } | 
