diff options
author | Sander Vrijders <sander.vrijders@ugent.be> | 2017-09-19 14:53:11 +0000 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-19 14:53:11 +0000 |
commit | 115431af51795dfd583e24a051a7749c58a900b3 (patch) | |
tree | a5817c5bd030b8a07713dcaa7dde95edbd0392d2 /src | |
parent | 669a8d4edfcc0fb2a7cd6f93e0ad2d0de820371a (diff) | |
parent | 541b1c5eeb5fe9f9aaa4aa6487852e5c59761139 (diff) | |
download | ouroboros-115431af51795dfd583e24a051a7749c58a900b3.tar.gz ouroboros-115431af51795dfd583e24a051a7749c58a900b3.zip |
Merged in sandervrijders/ouroboros/be-flow-down (pull request #596)
ipcpd, lib: Add flow down events
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/local/main.c | 18 | ||||
-rw-r--r-- | src/ipcpd/normal/connmgr.h | 14 | ||||
-rw-r--r-- | src/ipcpd/normal/dt.c | 12 | ||||
-rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 207 | ||||
-rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/lib/dev.c | 86 | ||||
-rw-r--r-- | src/lib/shm_rbuff_ll.c | 30 | ||||
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 52 |
8 files changed, 316 insertions, 105 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index fb4e312b..7f44a2c6 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -99,30 +99,28 @@ static void * ipcp_local_sdu_loop(void * o) (void) o; - while (true) { - int fd; + while (ipcp_get_state() == IPCP_OPERATIONAL) { + int fd; ssize_t idx; - if (ipcp_get_state() != IPCP_OPERATIONAL) - return (void *) 1; /* -ENOTENROLLED */ - fevent(local_data.flows, local_data.fq, &timeout); while ((fd = fqueue_next(local_data.fq)) >= 0) { - pthread_rwlock_rdlock(&local_data.lock); - idx = local_flow_read(fd); + if (idx < 0) + continue; assert(idx < (SHM_BUFFER_SIZE)); + pthread_rwlock_rdlock(&local_data.lock); + fd = local_data.in_out[fd]; + pthread_rwlock_unlock(&local_data.lock); + if (fd != -1) local_flow_write(fd, idx); - - pthread_rwlock_unlock(&local_data.lock); } - } return (void *) 0; diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h index ca5288ae..a8edee7d 100644 --- a/src/ipcpd/normal/connmgr.h +++ b/src/ipcpd/normal/connmgr.h @@ -28,12 +28,14 @@ #include "ae.h" -#define NOTIFY_DT_CONN_ADD 0x00D0 -#define NOTIFY_DT_CONN_DEL 0x00D1 -#define NOTIFY_DT_CONN_QOS 0x00D2 - -#define NOTIFY_MGMT_CONN_ADD 0x00F0 -#define NOTIFY_MGMT_CONN_DEL 0x00F1 +#define NOTIFY_DT_CONN_ADD 0x00D0 +#define NOTIFY_DT_CONN_DEL 0x00D1 +#define NOTIFY_DT_CONN_QOS 0x00D2 +#define NOTIFY_DT_CONN_DOWN 0x00D3 + +#define NOTIFY_MGMT_CONN_ADD 0x00F0 +#define NOTIFY_MGMT_CONN_DEL 0x00F1 +#define NOTIFY_MGMT_CONN_DOWN 0x00F2 int connmgr_init(void); diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 2df17163..56cb5a61 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -93,6 +93,7 @@ static void sdu_handler(int fd, struct shm_du_buff * sdb) { struct dt_pci dt_pci; + int ret; memset(&dt_pci, 0, sizeof(dt_pci)); @@ -112,8 +113,11 @@ static void sdu_handler(int fd, return; } - if (ipcp_flow_write(fd, sdb)) { + ret = ipcp_flow_write(fd, sdb); + if (ret < 0) { log_err("Failed to write SDU to fd %d.", fd); + if (ret == -EFLOWDOWN) + notifier_event(NOTIFY_DT_CONN_DOWN, &fd); ipcp_sdb_release(sdb); return; } @@ -323,6 +327,7 @@ int dt_write_sdu(uint64_t dst_addr, { int fd; struct dt_pci dt_pci; + int ret; assert(sdb); assert(dst_addr != ipcpi.dt_addr); @@ -342,8 +347,11 @@ int dt_write_sdu(uint64_t dst_addr, return -1; } - if (ipcp_flow_write(fd, sdb)) { + ret = ipcp_flow_write(fd, sdb); + if (ret < 0) { log_err("Failed to write SDU to fd %d.", fd); + if (ret == -EFLOWDOWN) + notifier_event(NOTIFY_DT_CONN_DOWN, &fd); return -1; } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 20734d6e..36d101e5 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -43,6 +43,7 @@ #include <ouroboros/fqueue.h> #include <ouroboros/logs.h> #include <ouroboros/time_utils.h> +#include <ouroboros/fccntl.h> #include "ipcp.h" #include "shim-data.h" @@ -64,6 +65,8 @@ #ifdef __linux__ #include <linux/if_packet.h> #include <linux/if_ether.h> +#include <linux/netlink.h> +#include <linux/rtnetlink.h> #endif #ifdef __FreeBSD__ @@ -154,6 +157,10 @@ struct { pthread_t sdu_writer; pthread_t sdu_reader; +#ifdef __linux__ + pthread_t if_monitor; +#endif + /* Handle mgmt frames in a different thread */ pthread_t mgmt_handler; pthread_mutex_t mgmt_lock; @@ -771,6 +778,114 @@ static void * eth_llc_ipcp_sdu_writer(void * o) return (void *) 1; } +#ifdef __linux__ +static int open_netlink_socket(void) +{ + struct sockaddr_nl sa; + int fd; + + memset(&sa, 0, sizeof(sa)); + sa.nl_family = AF_NETLINK; + sa.nl_pid = getpid(); + sa.nl_groups = RTMGRP_LINK; + + fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); + if (fd < 0) + return -1; + + if (bind(fd, (struct sockaddr *) &sa, sizeof(sa))) { + close(fd); + return -1; + } + + return fd; +} + +static void change_flows_state(bool up) +{ + int i; + uint32_t flags; + + pthread_rwlock_rdlock(ð_llc_data.flows_lock); + + for (i = 0; i < MAX_SAPS; i++) { + if (eth_llc_data.ef_to_fd[i] != -1) { + fccntl(i, FLOWGFLAGS, &flags); + if (up) + fccntl(eth_llc_data.ef_to_fd[i], + FLOWSFLAGS, flags & ~FLOWFDOWN); + else + fccntl(eth_llc_data.ef_to_fd[i], + FLOWSFLAGS, flags | FLOWFDOWN); + } + } + + pthread_rwlock_unlock(ð_llc_data.flows_lock); +} + +static void * eth_llc_ipcp_if_monitor(void * o) +{ + int fd; + int status; + char buf[4096]; + struct iovec iov = {buf, sizeof(buf)}; + struct sockaddr_nl snl; + struct msghdr msg = {(void *) &snl, sizeof(snl), + &iov, 1, NULL, 0, 0}; + struct nlmsghdr * h; + struct ifinfomsg * ifi; + + (void ) o; + + fd = open_netlink_socket(); + if (fd < 0) { + log_err("Failed to open socket."); + return (void *) -1; + } + + while (ipcp_get_state() == IPCP_OPERATIONAL) { + status = recvmsg(fd, &msg, 0); + if (status < 0) + continue; + + for (h = (struct nlmsghdr *) buf; + NLMSG_OK(h, (unsigned int) status); + h = NLMSG_NEXT(h, status)) { + + /* Finish reading */ + if (h->nlmsg_type == NLMSG_DONE) + break; + + /* Message is some kind of error */ + if (h->nlmsg_type == NLMSG_ERROR) + continue; + + /* Only interested in link up/down */ + if (h->nlmsg_type != RTM_NEWLINK) + continue; + + ifi = NLMSG_DATA(h); + + /* Not our interface */ + if (ifi->ifi_index != eth_llc_data.device.sll_ifindex) + continue; + + if (ifi->ifi_flags & IFF_UP) { + change_flows_state(true); + log_dbg("Interface up."); + } else { + change_flows_state(false); + log_dbg("Interface down."); + } + } + } + + close(fd); + + return (void *) 0; +} +#endif + #if defined (HAVE_BPF) && !defined(HAVE_NETMAP) static int open_bpf_device(void) { @@ -844,13 +959,13 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf) break; } + freeifaddrs(ifaddr); + if (ifa == NULL) { log_err("Interface not found."); - freeifaddrs(ifaddr); return -1; } - freeifaddrs(ifaddr); #elif defined(__linux__) skfd = socket(AF_UNIX, SOCK_STREAM, 0); if (skfd < 0) { @@ -904,38 +1019,32 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf) if (BPF_BLEN < blen) { log_err("BPF buffer too small (is: %ld must be: %d).", BPF_BLEN, blen); - close(eth_llc_data.bpf); - return -1; + goto fail_device; } if (ioctl(eth_llc_data.bpf, BIOCSETIF, &ifr) < 0) { log_err("Failed to set interface."); - close(eth_llc_data.bpf); - return -1; + goto fail_device; } if (ioctl(eth_llc_data.bpf, BIOCSHDRCMPLT, &enable) < 0) { log_err("Failed to set BIOCSHDRCMPLT."); - close(eth_llc_data.bpf); - return -1; + goto fail_device; } if (ioctl(eth_llc_data.bpf, BIOCSSEESENT, &disable) < 0) { log_err("Failed to set BIOCSSEESENT."); - close(eth_llc_data.bpf); - return -1; + goto fail_device; } if (ioctl(eth_llc_data.bpf, BIOCSRTIMEOUT, &tv) < 0) { log_err("Failed to set BIOCSRTIMEOUT."); - close(eth_llc_data.bpf); - return -1; + goto fail_device; } if (ioctl(eth_llc_data.bpf, BIOCIMMEDIATE, &enable) < 0) { log_err("Failed to set BIOCIMMEDIATE."); - close(eth_llc_data.bpf); - return -1; + goto fail_device; } log_info("Using Berkeley Packet Filter."); @@ -958,39 +1067,74 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf) if (bind(eth_llc_data.s_fd, (struct sockaddr *) ð_llc_data.device, sizeof(eth_llc_data.device))) { log_err("Failed to bind socket to interface"); - close(eth_llc_data.s_fd); - return -1; + goto fail_device; } if (setsockopt(eth_llc_data.s_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) { log_err("Failed to set socket timeout: %s.", strerror(errno)); - close(eth_llc_data.s_fd); - return -1; + goto fail_device; } #endif /* HAVE_NETMAP */ ipcp_set_state(IPCP_OPERATIONAL); - pthread_create(ð_llc_data.mgmt_handler, - NULL, - eth_llc_ipcp_mgmt_handler, - NULL); +#ifdef __linux__ + if (pthread_create(ð_llc_data.if_monitor, + NULL, + eth_llc_ipcp_if_monitor, + NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_device; + } +#endif + + if (pthread_create(ð_llc_data.mgmt_handler, + NULL, + eth_llc_ipcp_mgmt_handler, + NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_mgmt_handler; + } - pthread_create(ð_llc_data.sdu_reader, - NULL, - eth_llc_ipcp_sdu_reader, - NULL); + if (pthread_create(ð_llc_data.sdu_reader, + NULL, + eth_llc_ipcp_sdu_reader, + NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_sdu_reader; + } - pthread_create(ð_llc_data.sdu_writer, - NULL, - eth_llc_ipcp_sdu_writer, - NULL); + if (pthread_create(ð_llc_data.sdu_writer, + NULL, + eth_llc_ipcp_sdu_writer, + NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_sdu_writer; + } log_dbg("Bootstrapped shim IPCP over Ethernet with LLC with api %d.", getpid()); return 0; + + fail_sdu_writer: + pthread_join(eth_llc_data.sdu_reader, NULL); + fail_sdu_reader: + pthread_join(eth_llc_data.mgmt_handler, NULL); + fail_mgmt_handler: +#ifdef __linux__ + pthread_join(eth_llc_data.if_monitor, NULL); +#endif + fail_device: +#if defined(HAVE_NETMAP) + nm_close(eth_llc_data.nmd); +#elif defined(HAVE_BPF) + close(eth_llc_data.bpf); +#elif defined(HAVE_RAW_SOCKETS) + close(eth_llc_data.s_fd); +#endif + return -1; } static int eth_llc_ipcp_reg(const uint8_t * hash) @@ -1228,6 +1372,9 @@ int main(int argc, pthread_join(eth_llc_data.sdu_writer, NULL); pthread_join(eth_llc_data.sdu_reader, NULL); pthread_join(eth_llc_data.mgmt_handler, NULL); +#ifdef __linux__ + pthread_join(eth_llc_data.if_monitor, NULL); +#endif } eth_llc_data_fini(); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 26cecb44..29ca4db2 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -163,6 +163,8 @@ set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL "Name for the main POSIX shared memory buffer") set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING "SDU buffer block size, multiple of pagesize for performance") +set(SHM_RBUFF_LOCKLESS 1 CACHE BOOL + "Enable shared memory rbuff lockless support") set(SOURCE_FILES # Add source files here diff --git a/src/lib/dev.c b/src/lib/dev.c index 14ee31f4..b945968d 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -46,10 +46,10 @@ #include <stdio.h> #include <stdarg.h> -#define BUF_SIZE 1500 +#define BUF_SIZE 1500 -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ +#define TW_ELEMENTS 6000 +#define TW_RESOLUTION 1 /* ms */ #define MPL 2000 /* ms */ #define RQ_SIZE 20 @@ -249,12 +249,15 @@ static int api_announce(char * ap_name) static int finalize_write(int fd, size_t idx) { - if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) - return -ENOTALLOC; + int ret; + + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret < 0) + return ret; shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - return 0; + return ret; } static int frcti_init(int fd) @@ -305,6 +308,7 @@ static int frcti_send(int fd, { struct timespec now = {0, 0}; struct frcti * frcti; + int ret; frcti = &(ai.frcti[fd]); @@ -331,9 +335,10 @@ static int frcti_send(int fd, return -1; } - if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { + ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); + if (ret < 0) { pthread_rwlock_unlock(&frcti->lock); - return -ENOTALLOC; + return ret; } pthread_rwlock_unlock(&frcti->lock); @@ -871,12 +876,6 @@ int flow_alloc(const char * dst_name, pthread_rwlock_unlock(&ai.lock); - if (frcti_configure(fd, FRCTFORDERING | FRCTFERRCHCK)) { - flow_fini(fd); - bmp_release(ai.fds, fd); - return -1; - } - return fd; } @@ -931,6 +930,8 @@ int fccntl(int fd, va_list l; struct timespec * timeo; qosspec_t * qs; + uint32_t rx_acl; + uint32_t tx_acl; if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; @@ -988,10 +989,28 @@ int fccntl(int fd, break; case FLOWSFLAGS: ai.flows[fd].oflags = va_arg(l, uint32_t); + rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); + tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); + /* + * Making our own flow write only means making the + * the other side of the flow read only. + */ if (ai.flows[fd].oflags & FLOWFWRONLY) - shm_rbuff_block(ai.flows[fd].rx_rb); + rx_acl |= ACL_RDONLY; if (ai.flows[fd].oflags & FLOWFRDWR) - shm_rbuff_unblock(ai.flows[fd].rx_rb); + rx_acl |= ACL_RDWR; + + if (ai.flows[fd].oflags & FLOWFDOWN) { + rx_acl |= ACL_FLOWDOWN; + tx_acl |= ACL_FLOWDOWN; + } else { + rx_acl &= ~ACL_FLOWDOWN; + tx_acl &= ~ACL_FLOWDOWN; + } + + shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl); + shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl); + break; case FLOWGFLAGS: fflags = va_arg(l, uint32_t *); @@ -1007,6 +1026,8 @@ int fccntl(int fd, if (cflags == NULL) goto einval; *cflags = ai.frcti[fd].conf_flags; + if (frcti_configure(fd, ai.frcti[fd].conf_flags)) + goto eperm; break; default: pthread_rwlock_unlock(&ai.lock); @@ -1036,6 +1057,7 @@ ssize_t flow_write(int fd, size_t count) { ssize_t idx; + int ret; if (buf == NULL) return 0; @@ -1079,19 +1101,21 @@ ssize_t flow_write(int fd, } if (!ai.frcti[fd].used) { - if (finalize_write(fd, idx)) { + ret = finalize_write(fd, idx); + if (ret < 0) { pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); - return -ENOTALLOC; + return ret; } pthread_rwlock_unlock(&ai.lock); } else { pthread_rwlock_unlock(&ai.lock); - if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) { + ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx)); + if (ret < 0) { shm_rdrbuff_remove(ai.rdrb, idx); - return -1; + return ret; } } @@ -1129,7 +1153,8 @@ ssize_t flow_read(int fd, idx = frcti_read(fd); if (idx < 0) { - assert(idx == -EAGAIN || idx == -ETIMEDOUT); + assert(idx == -EAGAIN || idx == -ETIMEDOUT || + idx == -EFLOWDOWN); return idx; } @@ -1509,6 +1534,8 @@ int ipcp_flow_read(int fd, int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { + int ret; + if (sdb == NULL) return -EINVAL; @@ -1527,17 +1554,19 @@ int ipcp_flow_write(int fd, assert(ai.flows[fd].tx_rb); if (!ai.frcti[fd].used) { - if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { + ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); + if (ret < 0) { pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; + return ret; } pthread_rwlock_unlock(&ai.lock); } else { pthread_rwlock_unlock(&ai.lock); - if (frcti_write(fd, sdb)) - return -1; + ret = frcti_write(fd, sdb); + if (ret < 0) + return ret; } return 0; @@ -1618,6 +1647,8 @@ ssize_t local_flow_read(int fd) int local_flow_write(int fd, size_t idx) { + int ret; + if (fd < 0) return -EINVAL; @@ -1628,9 +1659,10 @@ int local_flow_write(int fd, return -ENOTALLOC; } - if (finalize_write(fd, idx)) { + ret = finalize_write(fd, idx); + if (ret < 0) { pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; + return ret; } pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index ec0199c0..b77c4374 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -26,6 +26,7 @@ #include <ouroboros/lockfile.h> #include <ouroboros/time_utils.h> #include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> #include <pthread.h> #include <sys/mman.h> @@ -41,8 +42,6 @@ #include <stdbool.h> #define FN_MAX_CHARS 255 -#define RB_OPEN 0 -#define RB_CLOSED 1 #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \ + 3 * sizeof(size_t) \ @@ -141,7 +140,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) pthread_cond_init(rb->add, &cattr); pthread_cond_init(rb->del, &cattr); - *rb->acl = RB_OPEN; + *rb->acl = ACL_RDWR; *rb->head = 0; *rb->tail = 0; @@ -227,14 +226,17 @@ int shm_rbuff_write(struct shm_rbuff * rb, { size_t ohead; size_t nhead; - - bool was_empty = false; + bool was_empty = false; assert(rb); assert(idx < SHM_BUFFER_SIZE); - if (__sync_fetch_and_add(rb->acl, 0)) /* CLOSED */ - return -ENOTALLOC; + if (__sync_fetch_and_add(rb->acl, 0) != ACL_RDWR) { + if (__sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN) + return -EFLOWDOWN; + else if (__sync_fetch_and_add(rb->acl, 0) & ACL_RDONLY) + return -ENOTALLOC; + } if (!shm_rbuff_free(rb)) return -EAGAIN; @@ -266,7 +268,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) assert(rb); if (shm_rbuff_empty(rb)) - return -EAGAIN; + return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ? + -EFLOWDOWN : -EAGAIN; ntail = RB_TAIL; @@ -327,26 +330,25 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, return idx; } -void shm_rbuff_block(struct shm_rbuff * rb) +void shm_rbuff_set_acl(struct shm_rbuff * rb, + uint32_t flags) { assert(rb); - __sync_bool_compare_and_swap(rb->acl, RB_OPEN, RB_CLOSED); + __sync_bool_compare_and_swap(rb->acl, *rb->acl, flags); } -void shm_rbuff_unblock(struct shm_rbuff * rb) +uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) { assert(rb); - __sync_bool_compare_and_swap(rb->acl, RB_CLOSED, RB_OPEN); + return __sync_fetch_and_add(rb->acl, 0); } void shm_rbuff_fini(struct shm_rbuff * rb) { assert(rb); - assert(__sync_fetch_and_add(rb->acl, 0) == RB_CLOSED); - if (shm_rbuff_empty(rb)) return; diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 9567762f..fb183d8f 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -26,6 +26,7 @@ #include <ouroboros/lockfile.h> #include <ouroboros/time_utils.h> #include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> #include <pthread.h> #include <sys/mman.h> @@ -41,8 +42,6 @@ #include <stdbool.h> #define FN_MAX_CHARS 255 -#define RB_OPEN 0 -#define RB_CLOSED 1 #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \ + 3 * sizeof(size_t) \ @@ -138,7 +137,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) pthread_cond_init(rb->add, &cattr); pthread_cond_init(rb->del, &cattr); - *rb->acl = RB_OPEN; + *rb->acl = ACL_RDWR; *rb->head = 0; *rb->tail = 0; @@ -226,8 +225,11 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) free(rb); } -int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) +int shm_rbuff_write(struct shm_rbuff * rb, + size_t idx) { + int ret = 0; + assert(rb); assert(idx < SHM_BUFFER_SIZE); @@ -237,14 +239,18 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - if (*rb->acl == RB_CLOSED) { - pthread_mutex_unlock(rb->lock); - return -ENOTALLOC; + + if (*rb->acl != ACL_RDWR) { + if (*rb->acl & ACL_FLOWDOWN) + ret = -EFLOWDOWN; + else if (*rb->acl & ACL_RDONLY) + ret = -ENOTALLOC; + goto err; } if (!shm_rbuff_free(rb)) { - pthread_mutex_unlock(rb->lock); - return -EAGAIN; + ret = -EAGAIN; + goto err; } if (shm_rbuff_empty(rb)) @@ -256,6 +262,9 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) pthread_mutex_unlock(rb->lock); return 0; + err: + pthread_mutex_unlock(rb->lock); + return ret; } ssize_t shm_rbuff_read(struct shm_rbuff * rb) @@ -270,9 +279,11 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif + if (shm_rbuff_empty(rb)) { + ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN; pthread_mutex_unlock(rb->lock); - return -EAGAIN; + return ret; } ret = *tail_el_ptr(rb); @@ -297,6 +308,12 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif + + if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) { + pthread_mutex_unlock(rb->lock); + return -EFLOWDOWN; + } + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); @@ -324,7 +341,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, return idx; } -void shm_rbuff_block(struct shm_rbuff * rb) +void shm_rbuff_set_acl(struct shm_rbuff * rb, + uint32_t flags) { assert(rb); @@ -334,13 +352,15 @@ void shm_rbuff_block(struct shm_rbuff * rb) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - *rb->acl = RB_CLOSED; + *rb->acl = (size_t) flags; pthread_mutex_unlock(rb->lock); } -void shm_rbuff_unblock(struct shm_rbuff * rb) +uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) { + uint32_t flags; + assert(rb); #ifndef HAVE_ROBUST_MUTEX @@ -349,9 +369,11 @@ void shm_rbuff_unblock(struct shm_rbuff * rb) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - *rb->acl = RB_OPEN; + flags = (uint32_t) *rb->acl; pthread_mutex_unlock(rb->lock); + + return flags; } void shm_rbuff_fini(struct shm_rbuff * rb) @@ -364,8 +386,6 @@ void shm_rbuff_fini(struct shm_rbuff * rb) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - assert(*rb->acl == RB_CLOSED); - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); |