diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/CMakeLists.txt | 338 | ||||
| -rw-r--r-- | src/lib/config.h.in | 1 | ||||
| -rw-r--r-- | src/lib/shm_rbuff.c | 288 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_ll.c | 249 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_pthr.c | 304 | ||||
| -rw-r--r-- | src/lib/tests/CMakeLists.txt | 2 |
6 files changed, 266 insertions, 916 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt deleted file mode 100644 index 14e89976..00000000 --- a/src/lib/CMakeLists.txt +++ /dev/null @@ -1,338 +0,0 @@ -include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -include_directories(${CMAKE_CURRENT_BINARY_DIR}) - -include_directories(${CMAKE_SOURCE_DIR}/include) -include_directories(${CMAKE_BINARY_DIR}/include) - -protobuf_generate_c(MODEL_PROTO_SRCS MODEL_PROTO_HDRS - pb/model.proto) -protobuf_generate_c(IPCP_CONFIG_PROTO_SRCS IPCP_CONFIG_PROTO_HDRS - pb/ipcp_config.proto) -protobuf_generate_c(ENROLL_PROTO_SRCS ENROLL_PROTO_HDRS - pb/enroll.proto) -protobuf_generate_c(CEP_PROTO_SRCS CEP_PROTO_HDRS - pb/cep.proto) -protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS - pb/irm.proto) -protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS - pb/ipcp.proto) - -if (NOT APPLE) - find_library(LIBRT_LIBRARIES rt) - if (NOT LIBRT_LIBRARIES) - message(FATAL_ERROR "Could not find librt") - endif () -else () - set(LIBRT_LIBRARIES "") -endif () - -find_library(LIBPTHREAD_LIBRARIES pthread) -if (NOT LIBPTHREAD_LIBRARIES) - message(FATAL_ERROR "Could not find libpthread") -endif () - -include(CheckSymbolExists) -list(APPEND CMAKE_REQUIRED_DEFINITIONS -D_POSIX_C_SOURCE=200809L) -list(APPEND CMAKE_REQUIRED_DEFINITIONS -D__XSI_VISIBLE=500) -list(APPEND CMAKE_REQUIRED_LIBRARIES pthread) -check_symbol_exists(pthread_mutexattr_setrobust pthread.h HAVE_ROBUST_MUTEX) - -if (HAVE_ROBUST_MUTEX) - set(DISABLE_ROBUST_MUTEXES FALSE CACHE BOOL "Disable robust mutex support") - if (NOT DISABLE_ROBUST_MUTEXES) - message(STATUS "Robust mutex support enabled") - set(HAVE_ROBUST_MUTEX TRUE) - else () - message(STATUS "Robust mutex support disabled by user") - unset(HAVE_ROBUST_MUTEX) - endif () -else() - message(STATUS "Robust mutex support not available") - unset(HAVE_ROBUST_MUTEX) -endif () - -find_library(FUSE_LIBRARIES fuse QUIET) -if (FUSE_LIBRARIES) - #FIXME: Check for version >= 2.6 - set(DISABLE_FUSE FALSE CACHE BOOL "Disable FUSE support") - if (NOT DISABLE_FUSE) - message(STATUS "FUSE support enabled") - set(FUSE_PREFIX "/tmp/ouroboros" CACHE STRING - "Mountpoint for RIB filesystem") - set(HAVE_FUSE TRUE CACHE INTERNAL "") - else () - message(STATUS "FUSE support disabled by user") - unset(HAVE_FUSE CACHE) - endif () -else () - message(STATUS "Install FUSE version > 2.6 to enable RIB access") -endif () - -if (NOT HAVE_FUSE) - set(FUSE_LIBRARIES "") - set(FUSE_INCLUDE_DIR "") -endif () - -mark_as_advanced(FUSE_LIBRARIES) - -find_library(LIBGCRYPT_LIBRARIES gcrypt QUIET) -if (LIBGCRYPT_LIBRARIES) - find_path(LIBGCRYPT_INCLUDE_DIR gcrypt.h - HINTS /usr/include /usr/local/include) - if (LIBGCRYPT_INCLUDE_DIR) - file(STRINGS ${LIBGCRYPT_INCLUDE_DIR}/gcrypt.h GCSTR - REGEX "^#define GCRYPT_VERSION ") - string(REGEX REPLACE "^#define GCRYPT_VERSION \"(.*)\".*$" "\\1" - GCVER "${GCSTR}") - if (NOT GCVER VERSION_LESS "1.7.0") - set(DISABLE_LIBGCRYPT FALSE CACHE BOOL "Disable libgcrypt support") - if (NOT DISABLE_LIBGCRYPT) - message(STATUS "libgcrypt support enabled") - set(HAVE_LIBGCRYPT TRUE CACHE INTERNAL "") - else () - message(STATUS "libgcrypt support disabled by user") - unset(HAVE_LIBGCRYPT CACHE) - endif() - else () - message(STATUS "Install version >= \"1.7.0\" to enable libgcrypt support " - "(found version \"${GCVER}\")") - endif() - endif () -endif () - -if (NOT HAVE_LIBGCRYPT) - set(LIBGCRYPT_LIBRARIES "") - set(LIBGCRYPT_INCLUDE_DIR "") -endif () - -find_package(OpenSSL QUIET) -if (OPENSSL_FOUND) - set(HAVE_OPENSSL_RNG TRUE) - if (OPENSSL_VERSION VERSION_LESS "1.1.0") - message(STATUS "Install version >= \"1.1.0\" to enable OpenSSL support " - "(found version \"${OPENSSL_VERSION}\")") - else () - set(DISABLE_OPENSSL FALSE CACHE BOOL "Disable OpenSSL support") - if (NOT DISABLE_OPENSSL) - message(STATUS "OpenSSL support enabled") - set(HAVE_OPENSSL TRUE CACHE INTERNAL "") - else() - message(STATUS "OpenSSL support disabled") - unset(HAVE_OPENSSL) - endif() - endif () - set(OPENSSL_SOURCES crypt/openssl.c) -else() - message(STATUS "Install openSSL version >= \"1.1.0\" to enable OpenSSL support") - unset(HAVE_OPENSSL_RNG) - unset(HAVE_OPENSSL) - set(OPENSSL_INCLUDE_DIR "") - set(OPENSSL_LIBRARIES "") - set(OPENSSL_CRYPTO_LIBRARY "") - set(OPENSSL_SOURCES "") -endif () - -if (APPLE OR CMAKE_SYSTEM_NAME STREQUAL "FreeBSD") - set(SYS_RND_HDR "") -else () - find_path(SYS_RND_HDR sys/random.h PATH /usr/include/ /usr/local/include/) - if (SYS_RND_HDR) - message(STATUS "Found sys/random.h in ${SYS_RND_HDR}") - set(HAVE_SYS_RANDOM TRUE) - else () - set(SYS_RND_HDR "") - unset(HAVE_SYS_RANDOM) - endif () -endif() - -if (NOT ((CMAKE_SYSTEM_NAME STREQUAL "FreeBSD") OR APPLE OR - HAVE_SYS_RANDOM OR HAVE_OPENSSL_RNG OR HAVE_LIBGCRYPT)) - message(FATAL_ERROR "No secure random generator found, " - "please install libgcrypt (> 1.7.0) or OpenSSL") -endif () - -mark_as_advanced(LIBRT_LIBRARIES LIBPTHREAD_LIBRARIES - LIBGCRYPT_LIBRARIES OPENSSL_LIBRARIES OPENSSL_CRYPTO_LIBRARY - SYS_RND_INCLUDE_DIR LIBGCRYPT_INCLUDE_DIR SYS_RND_HDR) - -set(SHM_BUFFER_SIZE 16384 CACHE STRING - "Number of blocks in packet buffer, must be a power of 2") -set(SHM_RBUFF_SIZE 1024 CACHE STRING - "Number of blocks in rbuff buffer, must be a power of 2") -set(SYS_MAX_FLOWS 10240 CACHE STRING - "Maximum number of total flows for this system") -set(PROG_MAX_FLOWS 4096 CACHE STRING - "Maximum number of flows in an application") -set(PROG_RES_FDS 64 CACHE STRING - "Number of reserved flow descriptors per application") -set(PROG_MAX_FQUEUES 32 CACHE STRING - "Maximum number of flow sets per application") -set(DU_BUFF_HEADSPACE 256 CACHE STRING - "Bytes of headspace to reserve for future headers") -set(DU_BUFF_TAILSPACE 32 CACHE STRING - "Bytes of tailspace to reserve for future tails") -if (NOT APPLE) - set(PTHREAD_COND_CLOCK "CLOCK_MONOTONIC" CACHE STRING - "Clock to use for condition variable timing") -else () - set(PTHREAD_COND_CLOCK "CLOCK_REALTIME" CACHE INTERNAL - "Clock to use for condition variable timing") -endif () -set(SOCKET_TIMEOUT 500 CACHE STRING - "Default timeout for responses from IPCPs (ms)") -set(SHM_PREFIX "ouroboros" CACHE STRING - "String to prepend to POSIX shared memory filenames") -set(SHM_RBUFF_PREFIX "/${SHM_PREFIX}.rbuff." CACHE INTERNAL - "Prefix for rbuff POSIX shared memory filenames") -set(SHM_LOCKFILE_NAME "/${SHM_PREFIX}.lockfile" CACHE INTERNAL - "Filename for the POSIX shared memory lockfile") -set(SHM_FLOW_SET_PREFIX "/${SHM_PREFIX}.set." CACHE INTERNAL - "Prefix for the POSIX shared memory flow set") -set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL - "Name for the main POSIX shared memory buffer") -set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING - "Packet buffer block size, multiple of pagesize for performance") -set(SHM_RDRB_MULTI_BLOCK TRUE CACHE BOOL - "Packet buffer multiblock packet support") -set(SHM_RBUFF_LOCKLESS FALSE CACHE BOOL - "Enable shared memory lockless rbuff support") -set(QOS_DISABLE_CRC TRUE CACHE BOOL - "Ignores ber setting on all QoS cubes") -set(DELTA_T_MPL 60 CACHE STRING - "Maximum packet lifetime (s)") -set(DELTA_T_ACK 10 CACHE STRING - "Maximum time to acknowledge a packet (s)") -set(DELTA_T_RTX 120 CACHE STRING - "Maximum time to retransmit a packet (s)") -set(FRCT_REORDER_QUEUE_SIZE 256 CACHE STRING - "Size of the reordering queue, must be a power of 2") -set(FRCT_START_WINDOW 64 CACHE STRING - "Start window, must be a power of 2") -set(FRCT_LINUX_RTT_ESTIMATOR TRUE CACHE BOOL - "Use Linux RTT estimator formula instead of the TCP RFC formula") -set(FRCT_RTO_MDEV_MULTIPLIER 2 CACHE STRING - "Multiplier for deviation term in the RTO: RTO = sRTT + (mdev << X)") -set(FRCT_RTO_INC_FACTOR 0 CACHE STRING - "Divisor for RTO increase after timeout: RTO += RTX >> X, 0: Karn/Partridge") -set(FRCT_RTO_MIN 250 CACHE STRING - "Minimum Retransmission Timeout (RTO) for FRCT (us)") -set(FRCT_TICK_TIME 5000 CACHE STRING - "Tick time for FRCT activity (retransmission, acknowledgments) (us)") -set(RXM_BUFFER_ON_HEAP FALSE CACHE BOOL - "Store packets for retransmission on the heap instead of in packet buffer") -set(RXM_BLOCKING TRUE CACHE BOOL - "Use blocking writes for retransmission") -set(RXM_MIN_RESOLUTION 20 CACHE STRING - "Minimum retransmission delay (ns), as a power to 2") -set(RXM_WHEEL_MULTIPLIER 4 CACHE STRING - "Factor for retransmission wheel levels as a power to 2") -set(RXM_WHEEL_LEVELS 3 CACHE STRING - "Number of levels in the retransmission wheel") -set(RXM_WHEEL_SLOTS_PER_LEVEL 256 CACHE STRING - "Number of slots per level in the retransmission wheel, must be a power of 2") -set(ACK_WHEEL_SLOTS 256 CACHE STRING - "Number of slots in the acknowledgment wheel, must be a power of 2") -set(ACK_WHEEL_RESOLUTION 18 CACHE STRING - "Minimum acknowledgment delay (ns), as a power to 2") -set(TPM_DEBUG_REPORT_INTERVAL 0 CACHE STRING - "Interval at wich the TPM will report long running threads (s), 0 disables") -set(TPM_DEBUG_ABORT_TIMEOUT 0 CACHE STRING - "TPM abort process after a thread reaches this timeout (s), 0 disables") - -if (HAVE_FUSE) - set(PROC_FLOW_STATS TRUE CACHE BOOL - "Enable flow statistics tracking for application flows") - if (PROC_FLOW_STATS) - message(STATUS "Application flow statistics enabled") - else () - message(STATUS "Application flow statistics disabled") - endif () -endif () - -set(SOURCE_FILES_DEV - # Add source files here - cep.c - dev.c - ) - -set(SOURCE_FILES_IRM - irm.c -) - -set(SOURCE_FILES_COMMON - bitmap.c - btree.c - crc32.c - crypt.c - hash.c - list.c - lockfile.c - logs.c - md5.c - notifier.c - protobuf.c - qoscube.c - random.c - rib.c - serdes-irm.c - serdes-oep.c - sha3.c - shm_flow_set.c - shm_rbuff.c - shm_rdrbuff.c - sockets.c - tpm.c - utils.c -) - -configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in" - "${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY) - -add_library(ouroboros-common SHARED ${SOURCE_FILES_COMMON} ${IRM_PROTO_SRCS} - ${IPCP_PROTO_SRCS} ${IPCP_CONFIG_PROTO_SRCS} ${MODEL_PROTO_SRCS} - ${ENROLL_PROTO_SRCS} ${OPENSSL_SOURCES}) - -add_library(ouroboros-dev SHARED ${SOURCE_FILES_DEV} ${CEP_PROTO_SRCS}) - -add_library(ouroboros-irm SHARED ${SOURCE_FILES_IRM}) - -set_target_properties(ouroboros-common PROPERTIES - VERSION ${PACKAGE_VERSION} - SOVERSION ${PACKAGE_VERSION_MAJOR}.${PACKAGE_VERSION_MINOR}) -set_target_properties(ouroboros-dev PROPERTIES - VERSION ${PACKAGE_VERSION} - SOVERSION ${PACKAGE_VERSION_MAJOR}.${PACKAGE_VERSION_MINOR}) -set_target_properties(ouroboros-irm PROPERTIES - VERSION ${PACKAGE_VERSION} - SOVERSION ${PACKAGE_VERSION_MAJOR}.${PACKAGE_VERSION_MINOR}) - -include(AddCompileFlags) -if (CMAKE_BUILD_TYPE MATCHES "Debug*") - add_compile_flags(ouroboros-common -DCONFIG_OUROBOROS_DEBUG) - add_compile_flags(ouroboros-dev -DCONFIG_OUROBOROS_DEBUG) - add_compile_flags(ouroboros-irm -DCONFIG_OUROBOROS_DEBUG) -endif () - -target_link_libraries(ouroboros-common ${LIBRT_LIBRARIES} - ${LIBPTHREAD_LIBRARIES} ${PROTOBUF_C_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} - ${LIBGCRYPT_LIBRARIES} ${FUSE_LIBRARIES}) - -target_link_libraries(ouroboros-dev ouroboros-common) -target_link_libraries(ouroboros-irm ouroboros-common) - -install(TARGETS ouroboros-common LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}) -install(TARGETS ouroboros-dev LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}) -install(TARGETS ouroboros-irm LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}) - -target_include_directories(ouroboros-common PUBLIC ${CMAKE_CURRENT_BINARY_DIR} - ${SYS_RND_HDR} ${LIBGCRYPT_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR}) - -target_include_directories(ouroboros-dev PUBLIC ${CMAKE_CURRENT_BINARY_DIR} - ${SYS_RND_HDR} ${LIBGCRYPT_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR}) - -target_include_directories(ouroboros-irm PUBLIC ${CMAKE_CURRENT_BINARY_DIR} - ${SYS_RND_HDR} ${LIBGCRYPT_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR}) - -if(BUILD_TESTS) - add_subdirectory(tests) -endif () diff --git a/src/lib/config.h.in b/src/lib/config.h.in index 8326a332..4533b00e 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -30,7 +30,6 @@ #define SYS_MAX_FLOWS @SYS_MAX_FLOWS@ -#cmakedefine SHM_RBUFF_LOCKLESS #cmakedefine SHM_RDRB_MULTI_BLOCK #cmakedefine QOS_DISABLE_CRC #cmakedefine HAVE_OPENSSL_RNG diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 22cff41c..ec3bd152 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -45,30 +45,59 @@ #define FN_MAX_CHARS 255 -#define SHM_RB_FILE_SIZE ((SHM_RBUFF_SIZE) * sizeof(ssize_t) \ +#define SHM_RBUFF_FILESIZE ((SHM_RBUFF_SIZE) * sizeof(ssize_t) \ + 3 * sizeof(size_t) \ + sizeof(pthread_mutex_t) \ - + 2 * sizeof (pthread_cond_t)) - -#define shm_rbuff_used(rb) ((*rb->head + (SHM_RBUFF_SIZE) - *rb->tail) \ - & ((SHM_RBUFF_SIZE) - 1)) -#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < (SHM_RBUFF_SIZE)) -#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) + + 2 * sizeof(pthread_cond_t)) + +#define HEAD(rb) \ + *(rb->shm_base + *rb->head) +#define TAIL(rb) \ + *(rb->shm_base + *rb->tail) +#define ADVANCE(el) \ + (*(el) = (*(el) + 1) & ((SHM_RBUFF_SIZE) - 1)) +#define QUEUED(rb) \ + ((*rb->head - *rb->tail + (SHM_RBUFF_SIZE)) & (SHM_RBUFF_SIZE - 1)) +#define IS_FULL(rb) \ + (QUEUED(rb) == (SHM_RBUFF_SIZE) - 1) +#define IS_EMPTY(rb) \ + (*rb->head == *rb->tail) struct shm_rbuff { ssize_t * shm_base; /* start of entry */ size_t * head; /* start of ringbuffer head */ size_t * tail; /* start of ringbuffer tail */ size_t * acl; /* access control */ - pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_mutex_t * mtx; /* lock all space in shm */ pthread_cond_t * add; /* packet arrived */ pthread_cond_t * del; /* packet removed */ pid_t pid; /* pid of the owner */ int flow_id; /* flow_id of the flow */ }; +static void robust_mutex_lock(pthread_mutex_t * mtx) +{ +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(mtx); +#else + if (pthread_mutex_lock(mtx) == EOWNERDEAD) + pthread_mutex_consistent(mtx); +#endif +} + +static int robust_wait(pthread_cond_t * cond, + pthread_mutex_t * mtx, + const struct timespec * abstime) +{ + int ret = __timedwait(cond, mtx, abstime); +#ifdef HAVE_ROBUST_MUTEX + if (ret == EOWNERDEAD) + pthread_mutex_consistent(mtx); +#endif + return ret; +} + + #define MM_FLAGS (PROT_READ | PROT_WRITE) static struct shm_rbuff * rbuff_create(pid_t pid, @@ -90,10 +119,10 @@ static struct shm_rbuff * rbuff_create(pid_t pid, if (fd == -1) goto fail_open; - if ((flags & O_CREAT) && ftruncate(fd, SHM_RB_FILE_SIZE) < 0) + if ((flags & O_CREAT) && ftruncate(fd, SHM_RBUFF_FILESIZE) < 0) goto fail_truncate; - shm_base = mmap(NULL, SHM_RB_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0); + shm_base = mmap(NULL, SHM_RBUFF_FILESIZE, MM_FLAGS, MAP_SHARED, fd, 0); if (shm_base == MAP_FAILED) goto fail_truncate; @@ -103,8 +132,8 @@ static struct shm_rbuff * rbuff_create(pid_t pid, rb->head = (size_t *) (rb->shm_base + (SHM_RBUFF_SIZE)); rb->tail = rb->head + 1; rb->acl = rb->tail + 1; - rb->lock = (pthread_mutex_t *) (rb->acl + 1); - rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->mtx = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->mtx + 1); rb->del = rb->add + 1; rb->pid = pid; rb->flow_id = flow_id; @@ -123,7 +152,7 @@ static struct shm_rbuff * rbuff_create(pid_t pid, static void rbuff_destroy(struct shm_rbuff * rb) { - munmap(rb->shm_base, SHM_RB_FILE_SIZE); + munmap(rb->shm_base, SHM_RBUFF_FILESIZE); free(rb); } @@ -152,7 +181,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, #ifdef HAVE_ROBUST_MUTEX pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); #endif - if (pthread_mutex_init(rb->lock, &mattr)) + if (pthread_mutex_init(rb->mtx, &mattr)) goto fail_mutex; if (pthread_condattr_init(&cattr)) @@ -185,7 +214,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, fail_add: pthread_condattr_destroy(&cattr); fail_cattr: - pthread_mutex_destroy(rb->lock); + pthread_mutex_destroy(rb->mtx); fail_mutex: pthread_mutexattr_destroy(&mattr); fail_mattr: @@ -194,6 +223,19 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, return NULL; } +void shm_rbuff_destroy(struct shm_rbuff * rb) +{ + char fn[FN_MAX_CHARS]; + + assert(rb != NULL); + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); + + shm_rbuff_close(rb); + + shm_unlink(fn); +} + struct shm_rbuff * shm_rbuff_open(pid_t pid, int flow_id) { @@ -207,9 +249,209 @@ void shm_rbuff_close(struct shm_rbuff * rb) rbuff_destroy(rb); } -#if (defined(SHM_RBUFF_LOCKLESS) && \ - (defined(__GNUC__) || defined (__clang__))) -#include "shm_rbuff_ll.c" -#else -#include "shm_rbuff_pthr.c" -#endif +int shm_rbuff_write(struct shm_rbuff * rb, + size_t idx) +{ + int ret = 0; + + assert(rb != NULL); + assert(idx < SHM_BUFFER_SIZE); + + robust_mutex_lock(rb->mtx); + + if (*rb->acl != ACL_RDWR) { + if (*rb->acl & ACL_FLOWDOWN) + ret = -EFLOWDOWN; + else if (*rb->acl & ACL_RDONLY) + ret = -ENOTALLOC; + goto err; + } + + if (IS_FULL(rb)) { + ret = -EAGAIN; + goto err; + } + + if (IS_EMPTY(rb)) + pthread_cond_broadcast(rb->add); + + HEAD(rb) = (ssize_t) idx; + ADVANCE(rb->head); + + pthread_mutex_unlock(rb->mtx); + + return 0; + err: + pthread_mutex_unlock(rb->mtx); + return ret; +} + +int shm_rbuff_write_b(struct shm_rbuff * rb, + size_t idx, + const struct timespec * abstime) +{ + int ret = 0; + + assert(rb != NULL); + assert(idx < SHM_BUFFER_SIZE); + + robust_mutex_lock(rb->mtx); + + if (*rb->acl != ACL_RDWR) { + if (*rb->acl & ACL_FLOWDOWN) + ret = -EFLOWDOWN; + else if (*rb->acl & ACL_RDONLY) + ret = -ENOTALLOC; + goto err; + } + + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); + + while (IS_FULL(rb) + && ret != -ETIMEDOUT + && !(*rb->acl & ACL_FLOWDOWN)) { + ret = -robust_wait(rb->del, rb->mtx, abstime); + } + + if (ret != -ETIMEDOUT) { + if (IS_EMPTY(rb)) + pthread_cond_broadcast(rb->add); + HEAD(rb) = (ssize_t) idx; + ADVANCE(rb->head); + } + + pthread_cleanup_pop(true); + + return ret; + err: + pthread_mutex_unlock(rb->mtx); + return ret; +} + +static int check_rb_acl(struct shm_rbuff * rb) +{ + assert(rb != NULL); + + if (*rb->acl & ACL_FLOWDOWN) + return -EFLOWDOWN; + + if (*rb->acl & ACL_FLOWPEER) + return -EFLOWPEER; + + return -EAGAIN; +} + +ssize_t shm_rbuff_read(struct shm_rbuff * rb) +{ + ssize_t ret = 0; + + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + + if (IS_EMPTY(rb)) { + ret = check_rb_acl(rb); + pthread_mutex_unlock(rb->mtx); + return ret; + } + + ret = TAIL(rb); + ADVANCE(rb->tail); + pthread_cond_broadcast(rb->del); + + pthread_mutex_unlock(rb->mtx); + + return ret; +} + +ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, + const struct timespec * abstime) +{ + ssize_t idx = -1; + + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + + if (IS_EMPTY(rb) && (*rb->acl & ACL_FLOWDOWN)) { + pthread_mutex_unlock(rb->mtx); + return -EFLOWDOWN; + } + + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); + + while (IS_EMPTY(rb) && + idx != -ETIMEDOUT && + check_rb_acl(rb) == -EAGAIN) { + idx = -robust_wait(rb->add, rb->mtx, abstime); + } + + if (!IS_EMPTY(rb)) { + idx = TAIL(rb); + ADVANCE(rb->tail); + pthread_cond_broadcast(rb->del); + } else if (idx != -ETIMEDOUT) { + idx = check_rb_acl(rb); + } + + pthread_cleanup_pop(true); + + assert(idx != -EAGAIN); + + return idx; +} + +void shm_rbuff_set_acl(struct shm_rbuff * rb, + uint32_t flags) +{ + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + *rb->acl = (size_t) flags; + + pthread_mutex_unlock(rb->mtx); +} + +uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) +{ + uint32_t flags; + + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + + flags = (uint32_t) *rb->acl; + + pthread_mutex_unlock(rb->mtx); + + return flags; +} + +void shm_rbuff_fini(struct shm_rbuff * rb) +{ + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); + + while (!IS_EMPTY(rb)) + robust_wait(rb->del, rb->mtx, NULL); + + pthread_cleanup_pop(true); +} + +size_t shm_rbuff_queued(struct shm_rbuff * rb) +{ + size_t ret; + + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + + ret = QUEUED(rb); + + pthread_mutex_unlock(rb->mtx); + + return ret; +} diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c deleted file mode 100644 index 46a5314e..00000000 --- a/src/lib/shm_rbuff_ll.c +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2024 - * - * Lockless ring buffer for incoming packets - * - * Dimitri Staessens <dimitri@ouroboros.rocks> - * Sander Vrijders <sander@ouroboros.rocks> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define RB_HEAD __sync_fetch_and_add(rb->head, 0) -#define RB_TAIL __sync_fetch_and_add(rb->tail, 0) - -void shm_rbuff_destroy(struct shm_rbuff * rb) -{ - char fn[FN_MAX_CHARS]; - - assert(rb); - - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); - - __sync_bool_compare_and_swap(rb->acl, *rb->acl, ACL_FLOWDOWN); - - pthread_cond_broadcast(rb->del); - pthread_cond_broadcast(rb->add); - - shm_rbuff_close(rb); - - shm_unlink(fn); -} - -int shm_rbuff_write(struct shm_rbuff * rb, - size_t idx) -{ - size_t ohead; - size_t nhead; - bool was_empty = false; - - assert(rb); - assert(idx < SHM_BUFFER_SIZE); - - if (__sync_fetch_and_add(rb->acl, 0) != ACL_RDWR) { - if (__sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN) - return -EFLOWDOWN; - else if (__sync_fetch_and_add(rb->acl, 0) & ACL_RDONLY) - return -ENOTALLOC; - } - - if (!shm_rbuff_free(rb)) - return -EAGAIN; - - if (shm_rbuff_empty(rb)) - was_empty = true; - - nhead = RB_HEAD; - - *(rb->shm_base + nhead) = (ssize_t) idx; - - do { - ohead = nhead; - nhead = (ohead + 1) & ((SHM_RBUFF_SIZE) - 1); - nhead = __sync_val_compare_and_swap(rb->head, ohead, nhead); - } while (nhead != ohead); - - if (was_empty) - pthread_cond_broadcast(rb->add); - - return 0; -} - -/* FIXME: this is a copy of the pthr implementation */ -int shm_rbuff_write_b(struct shm_rbuff * rb, - size_t idx, - const struct timespec * abstime) -{ - int ret = 0; - - assert(rb); - assert(idx < SHM_BUFFER_SIZE); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - - if (*rb->acl != ACL_RDWR) { - if (*rb->acl & ACL_FLOWDOWN) - ret = -EFLOWDOWN; - else if (*rb->acl & ACL_RDONLY) - ret = -ENOTALLOC; - goto err; - } - - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); - - while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { - ret = -__timedwait(rb->add, rb->lock, abstime); -#ifdef HAVE_ROBUST_MUTEX - if (ret == -EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - } - - if (shm_rbuff_empty(rb)) - pthread_cond_broadcast(rb->add); - - if (ret != -ETIMEDOUT) { - *head_el_ptr(rb) = (ssize_t) idx; - *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) -1); - } - - pthread_cleanup_pop(true); - - return ret; - err: - pthread_mutex_unlock(rb->lock); - return ret; -} - -ssize_t shm_rbuff_read(struct shm_rbuff * rb) -{ - size_t otail; - size_t ntail; - - assert(rb); - - if (shm_rbuff_empty(rb)) { - if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN) - return -EFLOWDOWN; - - if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWPEER) - return -EFLOWPEER; - - return -EAGAIN; - } - - ntail = RB_TAIL; - - do { - otail = ntail; - ntail = (otail + 1) & ((SHM_RBUFF_SIZE) - 1); - ntail = __sync_val_compare_and_swap(rb->tail, otail, ntail); - } while (ntail != otail); - - pthread_cond_broadcast(rb->del); - - return *(rb->shm_base + ntail); -} - -ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, - const struct timespec * abstime) -{ - ssize_t idx = -1; - - assert(rb); - - /* try a non-blocking read first */ - idx = shm_rbuff_read(rb); - if (idx != -EAGAIN) - return idx; - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); - - while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { - idx = -__timedwait(rb->add, rb->lock, abstime); -#ifdef HAVE_ROBUST_MUTEX - if (idx == -EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - } - - if (idx != -ETIMEDOUT) { - /* do a nonblocking read */ - idx = shm_rbuff_read(rb); - assert(idx >= 0); - } - - pthread_cleanup_pop(true); - - return idx; -} - -void shm_rbuff_set_acl(struct shm_rbuff * rb, - uint32_t flags) -{ - assert(rb); - - __sync_bool_compare_and_swap(rb->acl, *rb->acl, flags); -} - -uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) -{ - assert(rb); - - return __sync_fetch_and_add(rb->acl, 0); -} - -void shm_rbuff_fini(struct shm_rbuff * rb) -{ - assert(rb); - - if (shm_rbuff_empty(rb)) - return; - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); - - while (!shm_rbuff_empty(rb)) -#ifndef HAVE_ROBUST_MUTEX - pthread_cond_wait(rb->del, rb->lock); -#else - if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - pthread_cleanup_pop(true); -} - -size_t shm_rbuff_queued(struct shm_rbuff * rb) -{ - assert(rb); - - return shm_rbuff_used(rb); -} diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c deleted file mode 100644 index b543fb07..00000000 --- a/src/lib/shm_rbuff_pthr.c +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2024 - * - * Ring buffer for incoming packets - * - * Dimitri Staessens <dimitri@ouroboros.rocks> - * Sander Vrijders <sander@ouroboros.rocks> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -void shm_rbuff_destroy(struct shm_rbuff * rb) -{ - char fn[FN_MAX_CHARS]; - - assert(rb != NULL); - -#ifdef CONFIG_OUROBOROS_DEBUG - pthread_mutex_lock(rb->lock); - - *rb->acl = *rb->acl & ACL_FLOWDOWN; - - pthread_cond_broadcast(rb->del); - pthread_cond_broadcast(rb->add); - - pthread_mutex_unlock(rb->lock); -#endif - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); - - shm_rbuff_close(rb); - - shm_unlink(fn); -} - -int shm_rbuff_write(struct shm_rbuff * rb, - size_t idx) -{ - int ret = 0; - - assert(rb != NULL); - assert(idx < SHM_BUFFER_SIZE); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - - if (*rb->acl != ACL_RDWR) { - if (*rb->acl & ACL_FLOWDOWN) - ret = -EFLOWDOWN; - else if (*rb->acl & ACL_RDONLY) - ret = -ENOTALLOC; - goto err; - } - - if (!shm_rbuff_free(rb)) { - ret = -EAGAIN; - goto err; - } - - if (shm_rbuff_empty(rb)) - pthread_cond_broadcast(rb->add); - - *head_el_ptr(rb) = (ssize_t) idx; - *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); - - pthread_mutex_unlock(rb->lock); - - return 0; - err: - pthread_mutex_unlock(rb->lock); - return ret; -} - -int shm_rbuff_write_b(struct shm_rbuff * rb, - size_t idx, - const struct timespec * abstime) -{ - int ret = 0; - - assert(rb != NULL); - assert(idx < SHM_BUFFER_SIZE); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - - if (*rb->acl != ACL_RDWR) { - if (*rb->acl & ACL_FLOWDOWN) - ret = -EFLOWDOWN; - else if (*rb->acl & ACL_RDONLY) - ret = -ENOTALLOC; - goto err; - } - - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); - - while (!shm_rbuff_free(rb) - && ret != -ETIMEDOUT - && !(*rb->acl & ACL_FLOWDOWN)) { - ret = -__timedwait(rb->del, rb->lock, abstime); -#ifdef HAVE_ROBUST_MUTEX - if (ret == -EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - } - - if (ret != -ETIMEDOUT) { - if (shm_rbuff_empty(rb)) - pthread_cond_broadcast(rb->add); - *head_el_ptr(rb) = (ssize_t) idx; - *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); - } - - pthread_cleanup_pop(true); - - return ret; - err: - pthread_mutex_unlock(rb->lock); - return ret; -} - -static int check_rb_acl(struct shm_rbuff * rb) -{ - assert(rb != NULL); - - if (*rb->acl & ACL_FLOWDOWN) - return -EFLOWDOWN; - - if (*rb->acl & ACL_FLOWPEER) - return -EFLOWPEER; - - return -EAGAIN; -} - -ssize_t shm_rbuff_read(struct shm_rbuff * rb) -{ - ssize_t ret = 0; - - assert(rb != NULL); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - - if (shm_rbuff_empty(rb)) { - ret = check_rb_acl(rb); - pthread_mutex_unlock(rb->lock); - return ret; - } - - ret = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); - pthread_cond_broadcast(rb->del); - - pthread_mutex_unlock(rb->lock); - - return ret; -} - -ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, - const struct timespec * abstime) -{ - ssize_t idx = -1; - - assert(rb != NULL); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - - if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) { - pthread_mutex_unlock(rb->lock); - return -EFLOWDOWN; - } - - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); - - while (shm_rbuff_empty(rb) && - idx != -ETIMEDOUT && - check_rb_acl(rb) == -EAGAIN) { - idx = -__timedwait(rb->add, rb->lock, abstime); -#ifdef HAVE_ROBUST_MUTEX - if (idx == -EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - } - - if (!shm_rbuff_empty(rb)) { - idx = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); - pthread_cond_broadcast(rb->del); - } else if (idx != -ETIMEDOUT) { - idx = check_rb_acl(rb); - } - - pthread_cleanup_pop(true); - - assert(idx != -EAGAIN); - - return idx; -} - -void shm_rbuff_set_acl(struct shm_rbuff * rb, - uint32_t flags) -{ - assert(rb != NULL); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - *rb->acl = (size_t) flags; - - pthread_cond_broadcast(rb->del); - pthread_cond_broadcast(rb->add); - - pthread_mutex_unlock(rb->lock); -} - -uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) -{ - uint32_t flags; - - assert(rb != NULL); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - flags = (uint32_t) *rb->acl; - - pthread_mutex_unlock(rb->lock); - - return flags; -} - -void shm_rbuff_fini(struct shm_rbuff * rb) -{ - assert(rb != NULL); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); - - while (!shm_rbuff_empty(rb)) -#ifndef HAVE_ROBUST_MUTEX - pthread_cond_wait(rb->del, rb->lock); -#else - if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - pthread_cleanup_pop(true); -} - -size_t shm_rbuff_queued(struct shm_rbuff * rb) -{ - size_t ret; - - assert(rb != NULL); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - - ret = shm_rbuff_used(rb); - - pthread_mutex_unlock(rb->lock); - - return ret; -} diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 752d9065..8af8e9dd 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -32,7 +32,7 @@ endif() foreach (test ${tests_to_run}) get_filename_component(test_name ${test} NAME_WE) - add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) + add_test(${test_name} ${CMAKE_CURRENT_BINARY_DIR}/${PARENT_DIR}_test ${test_name}) endforeach (test) set_property(TEST auth_test PROPERTY SKIP_RETURN_CODE 1) |
