summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ouroboros/shm_flow_set.h2
-rw-r--r--src/irmd/proc_table.c55
-rw-r--r--src/irmd/proc_table.h21
-rw-r--r--src/lib/dev.c14
-rw-r--r--src/lib/shm_flow_set.c147
5 files changed, 112 insertions, 127 deletions
diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h
index 45d372a0..77f64264 100644
--- a/include/ouroboros/shm_flow_set.h
+++ b/include/ouroboros/shm_flow_set.h
@@ -29,7 +29,7 @@
struct shm_flow_set;
-struct shm_flow_set * shm_flow_set_create(void);
+struct shm_flow_set * shm_flow_set_create(pid_t pid);
void shm_flow_set_destroy(struct shm_flow_set * set);
diff --git a/src/irmd/proc_table.c b/src/irmd/proc_table.c
index 6f9d8e20..27fbb505 100644
--- a/src/irmd/proc_table.c
+++ b/src/irmd/proc_table.c
@@ -50,41 +50,44 @@ struct proc_entry * proc_entry_create(pid_t pid,
e = malloc(sizeof(*e));
if (e == NULL)
- return NULL;
+ goto fail_malloc;
- list_head_init(&e->next);
- list_head_init(&e->names);
-
- e->pid = pid;
- e->prog = prog;
- e->daf_name = NULL;
-
- e->re = NULL;
-
- e->state = PROC_INIT;
-
- if (pthread_condattr_init(&cattr)) {
- free(e);
- return NULL;
- }
+ if (pthread_condattr_init(&cattr))
+ goto fail_condattr;
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- if (pthread_mutex_init(&e->lock, NULL)) {
- free(e);
- return NULL;
- }
+ if (pthread_mutex_init(&e->lock, NULL))
+ goto fail_mutex;
+ if (pthread_cond_init(&e->cond, &cattr))
+ goto fail_cond;
- if (pthread_cond_init(&e->cond, &cattr)) {
- pthread_mutex_destroy(&e->lock);
- free(e);
- return NULL;
- }
+ e->set = shm_flow_set_create(pid);
+ if (e->set == NULL)
+ goto fail_set;
+
+ list_head_init(&e->next);
+ list_head_init(&e->names);
+
+ e->pid = pid;
+ e->prog = prog;
+ e->re = NULL;
+ e->state = PROC_INIT;
return e;
+ fail_set:
+ pthread_cond_destroy(&e->cond);;
+ fail_cond:
+ pthread_mutex_destroy(&e->lock);
+ fail_mutex:
+ pthread_condattr_destroy(&cattr);
+ fail_condattr:
+ free(e);
+ fail_malloc:
+ return NULL;
}
static void cancel_proc_entry(void * o)
@@ -124,6 +127,8 @@ void proc_entry_destroy(struct proc_entry * e)
pthread_mutex_unlock(&e->lock);
+ shm_flow_set_destroy(e->set);
+
pthread_cond_destroy(&e->cond);
pthread_mutex_destroy(&e->lock);
diff --git a/src/irmd/proc_table.h b/src/irmd/proc_table.h
index f3ef9aff..a18b0d8c 100644
--- a/src/irmd/proc_table.h
+++ b/src/irmd/proc_table.h
@@ -23,7 +23,8 @@
#ifndef OUROBOROS_IRMD_PROC_TABLE_H
#define OUROBOROS_IRMD_PROC_TABLE_H
-#include "time.h"
+#include <ouroboros/shm_flow_set.h>
+
#include "utils.h"
#include <unistd.h>
@@ -38,18 +39,18 @@ enum proc_state {
};
struct proc_entry {
- struct list_head next;
- pid_t pid;
- char * prog; /* program instantiated */
- char * daf_name; /* DAF this process belongs to */
- struct list_head names; /* names for which process accepts flows */
+ struct list_head next;
+ pid_t pid;
+ char * prog; /* program instantiated */
+ struct list_head names; /* names for which process accepts flows */
+ struct shm_flow_set * set;
- struct reg_entry * re; /* reg_entry for which a flow arrived */
+ struct reg_entry * re; /* reg_entry for which a flow arrived */
/* The process will block on this */
- enum proc_state state;
- pthread_cond_t cond;
- pthread_mutex_t lock;
+ enum proc_state state;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
};
struct proc_entry * proc_entry_create(pid_t proc,
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 42eed7e4..a2ec836f 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -357,10 +357,6 @@ static void init(int argc,
if (ai.fqueues == NULL)
goto fail_fqueues;
- ai.fqset = shm_flow_set_create();
- if (ai.fqset == NULL)
- goto fail_fqset;
-
ai.rdrb = shm_rdrbuff_open();
if (ai.rdrb == NULL)
goto fail_rdrb;
@@ -407,8 +403,14 @@ static void init(int argc,
if (rxmwheel_init())
goto fail_rxmwheel;
+ ai.fqset = shm_flow_set_open(getpid());
+ if (ai.fqset == NULL)
+ goto fail_fqset;
+
return;
+ fail_fqset:
+ rxmwheel_fini();
fail_rxmwheel:
pthread_rwlock_destroy(&ai.lock);
fail_lock:
@@ -426,8 +428,6 @@ static void init(int argc,
fail_flows:
shm_rdrbuff_close(ai.rdrb);
fail_rdrb:
- shm_flow_set_destroy(ai.fqset);
- fail_fqset:
bmp_destroy(ai.fqueues);
fail_fqueues:
bmp_destroy(ai.fds);
@@ -462,7 +462,7 @@ static void fini(void)
}
}
- shm_flow_set_destroy(ai.fqset);
+ shm_flow_set_close(ai.fqset);
for (i = 0; i < SYS_MAX_FLOWS; ++i) {
pthread_mutex_destroy(&ai.ports[i].state_lock);
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index 1c94c599..bab5d86e 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -78,31 +78,24 @@ struct shm_flow_set {
pid_t pid;
};
-struct shm_flow_set * shm_flow_set_create()
+static struct shm_flow_set * flow_set_create(pid_t pid,
+ int flags)
{
struct shm_flow_set * set;
ssize_t * shm_base;
- pthread_mutexattr_t mattr;
- pthread_condattr_t cattr;
char fn[FN_MAX_CHARS];
- mode_t mask;
int shm_fd;
- int i;
- sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid());
+ sprintf(fn, SHM_FLOW_SET_PREFIX "%d", pid);
set = malloc(sizeof(*set));
if (set == NULL)
- return NULL;
+ goto fail_malloc;
- mask = umask(0);
-
- shm_fd = shm_open(fn, O_CREAT | O_RDWR, 0666);
+ shm_fd = shm_open(fn, flags, 0666);
if (shm_fd == -1)
goto fail_shm_open;
- umask(mask);
-
if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) {
close(shm_fd);
goto fail_shm_open;
@@ -127,27 +120,61 @@ struct shm_flow_set * shm_flow_set_create()
set->lock = (pthread_mutex_t *)
(set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE));
+ return set;
+
+ fail_mmap:
+ if (flags & O_CREAT)
+ shm_unlink(fn);
+ fail_shm_open:
+ free(set);
+ fail_malloc:
+ return NULL;
+}
+
+struct shm_flow_set * shm_flow_set_create(pid_t pid)
+{
+ struct shm_flow_set * set;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ mode_t mask;
+ int i;
+
+ mask = umask(0);
+
+ set = flow_set_create(pid, O_CREAT | O_RDWR);
+
+ umask(mask);
+
+ if (set == NULL)
+ goto fail_set;
+
if (pthread_mutexattr_init(&mattr))
- goto fail_mmap;
+ goto fail_mutexattr_init;
#ifdef HAVE_ROBUST_MUTEX
if (pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST))
- goto fail_mmap;
+ goto fail_mattr_set;
#endif
- if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) ||
- pthread_mutex_init(set->lock, &mattr) ||
- pthread_condattr_init(&cattr) ||
- pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED))
- goto fail_mmap;
+ if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED))
+ goto fail_mattr_set;
+
+ if (pthread_mutex_init(set->lock, &mattr))
+ goto fail_mattr_set;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_condattr_init;
+
+ if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED))
+ goto fail_condattr_set;
#ifndef __APPLE__
if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK))
- goto fail_mmap;
+ goto fail_condattr_set;
#endif
for (i = 0; i < PROG_MAX_FQUEUES; ++i) {
set->heads[i] = 0;
if (pthread_cond_init(&set->conds[i], &cattr))
- goto fail_mmap;
+ goto fail_init;
}
for (i = 0; i < SYS_MAX_FLOWS; ++i)
@@ -157,84 +184,37 @@ struct shm_flow_set * shm_flow_set_create()
return set;
- fail_mmap:
- shm_unlink(fn);
- fail_shm_open:
- free(set);
+ fail_init:
+ while (i-- > 0)
+ pthread_cond_destroy(&set->conds[i]);
+ fail_condattr_set:
+ pthread_condattr_destroy(&cattr);
+ fail_condattr_init:
+ pthread_mutex_destroy(set->lock);
+ fail_mattr_set:
+ pthread_mutexattr_destroy(&mattr);
+ fail_mutexattr_init:
+ shm_flow_set_destroy(set);
+ fail_set:
return NULL;
}
struct shm_flow_set * shm_flow_set_open(pid_t pid)
{
- struct shm_flow_set * set;
- ssize_t * shm_base;
- char fn[FN_MAX_CHARS];
- int shm_fd;
-
- sprintf(fn, SHM_FLOW_SET_PREFIX "%d", pid);
-
- set = malloc(sizeof(*set));
- if (set == NULL)
- return NULL;
-
- shm_fd = shm_open(fn, O_RDWR, 0666);
- if (shm_fd == -1) {
- free(set);
- return NULL;
- }
-
- shm_base = mmap(NULL,
- SHM_FLOW_SET_FILE_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_SHARED,
- shm_fd,
- 0);
-
- close(shm_fd);
-
- if (shm_base == MAP_FAILED) {
- shm_unlink(fn);
- free(set);
- return NULL;
- }
-
- set->mtable = shm_base;
- set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS);
- set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
- set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES);
- set->lock = (pthread_mutex_t *)
- (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE));
- set->pid = pid;
-
- return set;
+ return flow_set_create(pid, O_RDWR);
}
void shm_flow_set_destroy(struct shm_flow_set * set)
{
char fn[25];
- struct lockfile * lf = NULL;
assert(set);
- if (set->pid != getpid()) {
- lf = lockfile_open();
- if (lf == NULL)
- return;
-
- if (lockfile_owner(lf) == getpid()) {
- lockfile_close(lf);
- } else {
- lockfile_close(lf);
- return;
- }
- }
-
sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->pid);
- munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE);
- shm_unlink(fn);
+ shm_flow_set_close(set);
- free(set);
+ shm_unlink(fn);
}
void shm_flow_set_close(struct shm_flow_set * set)
@@ -242,7 +222,6 @@ void shm_flow_set_close(struct shm_flow_set * set)
assert(set);
munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE);
-
free(set);
}