summaryrefslogtreecommitdiff
path: root/src/lib/shm_flow_set.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_flow_set.c')
-rw-r--r--src/lib/shm_flow_set.c302
1 files changed, 140 insertions, 162 deletions
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index d2107fc3..39913fd1 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2018
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Management of flow_sets for fqueue
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -24,22 +24,21 @@
#include "config.h"
+#include <ouroboros/errno.h>
#include <ouroboros/lockfile.h>
-#include <ouroboros/time_utils.h>
+#include <ouroboros/pthread.h>
#include <ouroboros/shm_flow_set.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/errno.h>
+#include <ouroboros/time.h>
-#include <pthread.h>
-#include <sys/mman.h>
+#include <assert.h>
#include <fcntl.h>
+#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
+#include <string.h>
#include <unistd.h>
-#include <signal.h>
+#include <sys/mman.h>
#include <sys/stat.h>
-#include <string.h>
-#include <assert.h>
/*
* pthread_cond_timedwait has a WONTFIX bug as of glibc 2.25 where it
@@ -53,187 +52,166 @@
#endif
#define FN_MAX_CHARS 255
+#define FS_PROT (PROT_READ | PROT_WRITE)
-#define FQUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(int))
+#define QUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(struct flowevent))
-#define SHM_FLOW_SET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \
- + PROG_MAX_FQUEUES * sizeof(size_t) \
- + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \
- + PROG_MAX_FQUEUES * FQUEUESIZE \
- + sizeof(pthread_mutex_t))
+#define SHM_FSET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \
+ + PROG_MAX_FQUEUES * sizeof(size_t) \
+ + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \
+ + PROG_MAX_FQUEUES * QUEUESIZE \
+ + sizeof(pthread_mutex_t))
#define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx)
struct shm_flow_set {
- ssize_t * mtable;
- size_t * heads;
- pthread_cond_t * conds;
- int * fqueues;
- pthread_mutex_t * lock;
+ ssize_t * mtable;
+ size_t * heads;
+ pthread_cond_t * conds;
+ struct flowevent * fqueues;
+ pthread_mutex_t * lock;
- pid_t pid;
+ pid_t pid;
};
-struct shm_flow_set * shm_flow_set_create()
+static struct shm_flow_set * flow_set_create(pid_t pid,
+ int oflags)
{
struct shm_flow_set * set;
ssize_t * shm_base;
- pthread_mutexattr_t mattr;
- pthread_condattr_t cattr;
char fn[FN_MAX_CHARS];
- mode_t mask;
- int shm_fd;
- int i;
+ int fd;
- sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid());
+ sprintf(fn, SHM_FLOW_SET_PREFIX "%d", pid);
set = malloc(sizeof(*set));
if (set == NULL)
- return NULL;
+ goto fail_malloc;
- mask = umask(0);
+ fd = shm_open(fn, oflags, 0666);
+ if (fd == -1)
+ goto fail_shm_open;
- shm_fd = shm_open(fn, O_CREAT | O_RDWR, 0666);
- if (shm_fd == -1) {
- free(set);
- return NULL;
- }
+ if ((oflags & O_CREAT) && ftruncate(fd, SHM_FSET_FILE_SIZE) < 0)
+ goto fail_truncate;
- umask(mask);
-
- if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) {
- free(set);
- close(shm_fd);
- return NULL;
- }
+ shm_base = mmap(NULL, SHM_FSET_FILE_SIZE, FS_PROT, MAP_SHARED, fd, 0);
+ if (shm_base == MAP_FAILED)
+ goto fail_mmap;
- shm_base = mmap(NULL,
- SHM_FLOW_SET_FILE_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_SHARED,
- shm_fd,
- 0);
-
- close(shm_fd);
-
- if (shm_base == MAP_FAILED) {
- shm_unlink(fn);
- free(set);
- return NULL;
- }
+ close(fd);
set->mtable = shm_base;
set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS);
set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
- set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES);
+ set->fqueues = (struct flowevent *) (set->conds + PROG_MAX_FQUEUES);
set->lock = (pthread_mutex_t *)
(set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE));
- pthread_mutexattr_init(&mattr);
+ return set;
+
+ fail_mmap:
+ if (oflags & O_CREAT)
+ shm_unlink(fn);
+ fail_truncate:
+ close(fd);
+ fail_shm_open:
+ free(set);
+ fail_malloc:
+ return NULL;
+}
+
+struct shm_flow_set * shm_flow_set_create(pid_t pid)
+{
+ struct shm_flow_set * set;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ mode_t mask;
+ int i;
+
+ mask = umask(0);
+
+ set = flow_set_create(pid, O_CREAT | O_RDWR);
+
+ umask(mask);
+
+ if (set == NULL)
+ goto fail_set;
+
+ set->pid = pid;
+
+ if (pthread_mutexattr_init(&mattr))
+ goto fail_mutexattr_init;
+
#ifdef HAVE_ROBUST_MUTEX
- pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+ if (pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST))
+ goto fail_mattr_set;
#endif
- pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
- pthread_mutex_init(set->lock, &mattr);
+ if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED))
+ goto fail_mattr_set;
+
+ if (pthread_mutex_init(set->lock, &mattr))
+ goto fail_mattr_set;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_condattr_init;
+
+ if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED))
+ goto fail_condattr_set;
- pthread_condattr_init(&cattr);
- pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+ if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK))
+ goto fail_condattr_set;
#endif
for (i = 0; i < PROG_MAX_FQUEUES; ++i) {
set->heads[i] = 0;
- pthread_cond_init(&set->conds[i], &cattr);
+ if (pthread_cond_init(&set->conds[i], &cattr))
+ goto fail_init;
}
for (i = 0; i < SYS_MAX_FLOWS; ++i)
set->mtable[i] = -1;
- set->pid = getpid();
-
return set;
+
+ fail_init:
+ while (i-- > 0)
+ pthread_cond_destroy(&set->conds[i]);
+ fail_condattr_set:
+ pthread_condattr_destroy(&cattr);
+ fail_condattr_init:
+ pthread_mutex_destroy(set->lock);
+ fail_mattr_set:
+ pthread_mutexattr_destroy(&mattr);
+ fail_mutexattr_init:
+ shm_flow_set_destroy(set);
+ fail_set:
+ return NULL;
}
struct shm_flow_set * shm_flow_set_open(pid_t pid)
{
- struct shm_flow_set * set;
- ssize_t * shm_base;
- char fn[FN_MAX_CHARS];
- int shm_fd;
-
- sprintf(fn, SHM_FLOW_SET_PREFIX "%d", pid);
-
- set = malloc(sizeof(*set));
- if (set == NULL)
- return NULL;
-
- shm_fd = shm_open(fn, O_RDWR, 0666);
- if (shm_fd == -1) {
- free(set);
- return NULL;
- }
-
- shm_base = mmap(NULL,
- SHM_FLOW_SET_FILE_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_SHARED,
- shm_fd,
- 0);
-
- close(shm_fd);
-
- if (shm_base == MAP_FAILED) {
- shm_unlink(fn);
- free(set);
- return NULL;
- }
-
- set->mtable = shm_base;
- set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS);
- set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
- set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES);
- set->lock = (pthread_mutex_t *)
- (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE));
-
- set->pid = pid;
-
- return set;
+ return flow_set_create(pid, O_RDWR);
}
void shm_flow_set_destroy(struct shm_flow_set * set)
{
- char fn[25];
- struct lockfile * lf = NULL;
+ char fn[FN_MAX_CHARS];
assert(set);
- if (set->pid != getpid()) {
- lf = lockfile_open();
- if (lf == NULL)
- return;
-
- if (lockfile_owner(lf) == getpid()) {
- lockfile_close(lf);
- } else {
- lockfile_close(lf);
- return;
- }
- }
-
sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->pid);
- munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE);
- shm_unlink(fn);
+ shm_flow_set_close(set);
- free(set);
+ shm_unlink(fn);
}
void shm_flow_set_close(struct shm_flow_set * set)
{
assert(set);
- munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE);
-
+ munmap(set->mtable, SHM_FSET_FILE_SIZE);
free(set);
}
@@ -259,20 +237,20 @@ void shm_flow_set_zero(struct shm_flow_set * set,
int shm_flow_set_add(struct shm_flow_set * set,
size_t idx,
- int port_id)
+ int flow_id)
{
assert(set);
- assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
assert(idx < PROG_MAX_FQUEUES);
pthread_mutex_lock(set->lock);
- if (set->mtable[port_id] != -1) {
+ if (set->mtable[flow_id] != -1) {
pthread_mutex_unlock(set->lock);
return -EPERM;
}
- set->mtable[port_id] = idx;
+ set->mtable[flow_id] = idx;
pthread_mutex_unlock(set->lock);
@@ -281,33 +259,33 @@ int shm_flow_set_add(struct shm_flow_set * set,
void shm_flow_set_del(struct shm_flow_set * set,
size_t idx,
- int port_id)
+ int flow_id)
{
assert(set);
- assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
assert(idx < PROG_MAX_FQUEUES);
pthread_mutex_lock(set->lock);
- if (set->mtable[port_id] == (ssize_t) idx)
- set->mtable[port_id] = -1;
+ if (set->mtable[flow_id] == (ssize_t) idx)
+ set->mtable[flow_id] = -1;
pthread_mutex_unlock(set->lock);
}
int shm_flow_set_has(struct shm_flow_set * set,
size_t idx,
- int port_id)
+ int flow_id)
{
int ret = 0;
assert(set);
- assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
assert(idx < PROG_MAX_FQUEUES);
pthread_mutex_lock(set->lock);
- if (set->mtable[port_id] == (ssize_t) idx)
+ if (set->mtable[flow_id] == (ssize_t) idx)
ret = 1;
pthread_mutex_unlock(set->lock);
@@ -316,22 +294,30 @@ int shm_flow_set_has(struct shm_flow_set * set,
}
void shm_flow_set_notify(struct shm_flow_set * set,
- int port_id)
+ int flow_id,
+ int event)
{
+ struct flowevent * e;
+
assert(set);
- assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
pthread_mutex_lock(set->lock);
- if (set->mtable[port_id] == -1) {
+ if (set->mtable[flow_id] == -1) {
pthread_mutex_unlock(set->lock);
return;
}
- *(fqueue_ptr(set, set->mtable[port_id]) +
- (set->heads[set->mtable[port_id]])++) = port_id;
+ e = fqueue_ptr(set, set->mtable[flow_id]) +
+ set->heads[set->mtable[flow_id]];
+
+ e->flow_id = flow_id;
+ e->event = event;
+
+ ++set->heads[set->mtable[flow_id]];
- pthread_cond_signal(&set->conds[set->mtable[port_id]]);
+ pthread_cond_signal(&set->conds[set->mtable[flow_id]]);
pthread_mutex_unlock(set->lock);
}
@@ -339,7 +325,7 @@ void shm_flow_set_notify(struct shm_flow_set * set,
ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
size_t idx,
- int * fqueue,
+ struct flowevent * fqueue,
const struct timespec * abstime)
{
ssize_t ret = 0;
@@ -355,22 +341,14 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
pthread_mutex_consistent(set->lock);
#endif
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) set->lock);
+ pthread_cleanup_push(__cleanup_mutex_unlock, set->lock);
while (set->heads[idx] == 0 && ret != -ETIMEDOUT) {
- if (abstime != NULL) {
- ret = -pthread_cond_timedwait(set->conds + idx,
- set->lock,
- abstime);
+ ret = -__timedwait(set->conds + idx, set->lock, abstime);
#ifdef HAVE_CANCEL_BUG
- if (ret == -ETIMEDOUT)
- pthread_testcancel();
+ if (ret == -ETIMEDOUT)
+ pthread_testcancel();
#endif
- } else {
- ret = -pthread_cond_wait(set->conds + idx,
- set->lock);
- }
#ifdef HAVE_ROBUST_MUTEX
if (ret == -EOWNERDEAD)
pthread_mutex_consistent(set->lock);
@@ -380,7 +358,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
if (ret != -ETIMEDOUT) {
memcpy(fqueue,
fqueue_ptr(set, idx),
- set->heads[idx] * sizeof(int));
+ set->heads[idx] * sizeof(*fqueue));
ret = set->heads[idx];
set->heads[idx] = 0;
}