summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/man/flow_read.33
-rw-r--r--include/ouroboros/errno.h1
-rw-r--r--include/ouroboros/fccntl.h2
-rw-r--r--include/ouroboros/shm_rbuff.h11
-rw-r--r--src/ipcpd/local/main.c18
-rw-r--r--src/ipcpd/normal/connmgr.h14
-rw-r--r--src/ipcpd/normal/dt.c12
-rw-r--r--src/ipcpd/shim-eth-llc/main.c207
-rw-r--r--src/lib/CMakeLists.txt2
-rw-r--r--src/lib/dev.c86
-rw-r--r--src/lib/shm_rbuff_ll.c30
-rw-r--r--src/lib/shm_rbuff_pthr.c52
12 files changed, 331 insertions, 107 deletions
diff --git a/doc/man/flow_read.3 b/doc/man/flow_read.3
index f4f94e67..36a506a6 100644
--- a/doc/man/flow_read.3
+++ b/doc/man/flow_read.3
@@ -50,6 +50,9 @@ Invalid flow descriptor passed.
.B -ENOTALLOC
The flow was not allocated.
+.B -EFLOWDOWN
+The flow has been reported down.
+
.SH ATTRIBUTES
For an explanation of the terms used in this section, see \fBattributes\fR(7).
diff --git a/include/ouroboros/errno.h b/include/ouroboros/errno.h
index 7cd58aa5..1710f7f8 100644
--- a/include/ouroboros/errno.h
+++ b/include/ouroboros/errno.h
@@ -30,5 +30,6 @@
#define EIRMD 1002 /* Failed to communicate with IRMD */
#define EIPCP 1003 /* Failed to communicate with IPCP */
#define EIPCPSTATE 1004 /* Target in wrong state */
+#define EFLOWDOWN 1005 /* Flow is down */
#endif /* OUROBOROS_ERRNO_H */
diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h
index d938439f..3bd09e65 100644
--- a/include/ouroboros/fccntl.h
+++ b/include/ouroboros/fccntl.h
@@ -33,6 +33,8 @@
#define FLOWFRDWR 00000002 /* Read-write flow */
#define FLOWFACCMODE 00000003 /* Access mask */
+#define FLOWFDOWN 00000004 /* Flow is down */
+
#define FLOWFNONBLOCK 00004000 /* Non-blocking flow */
#define FLOWFDEFAULT 00000002 /* Default, blocking, rw */
diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h
index 55d03b41..af50aada 100644
--- a/include/ouroboros/shm_rbuff.h
+++ b/include/ouroboros/shm_rbuff.h
@@ -26,6 +26,12 @@
#include <sys/types.h>
#include <sys/time.h>
+#include <stdint.h>
+
+#define ACL_RDWR 0000
+#define ACL_RDONLY 0001
+#define ACL_FLOWDOWN 0002
+
struct shm_rbuff;
struct shm_rbuff * shm_rbuff_create(pid_t api,
@@ -38,9 +44,10 @@ void shm_rbuff_close(struct shm_rbuff * rb);
void shm_rbuff_destroy(struct shm_rbuff * rb);
-void shm_rbuff_block(struct shm_rbuff * rb);
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags);
-void shm_rbuff_unblock(struct shm_rbuff * rb);
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb);
void shm_rbuff_fini(struct shm_rbuff * rb);
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index fb4e312b..7f44a2c6 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -99,30 +99,28 @@ static void * ipcp_local_sdu_loop(void * o)
(void) o;
- while (true) {
- int fd;
+ while (ipcp_get_state() == IPCP_OPERATIONAL) {
+ int fd;
ssize_t idx;
- if (ipcp_get_state() != IPCP_OPERATIONAL)
- return (void *) 1; /* -ENOTENROLLED */
-
fevent(local_data.flows, local_data.fq, &timeout);
while ((fd = fqueue_next(local_data.fq)) >= 0) {
- pthread_rwlock_rdlock(&local_data.lock);
-
idx = local_flow_read(fd);
+ if (idx < 0)
+ continue;
assert(idx < (SHM_BUFFER_SIZE));
+ pthread_rwlock_rdlock(&local_data.lock);
+
fd = local_data.in_out[fd];
+ pthread_rwlock_unlock(&local_data.lock);
+
if (fd != -1)
local_flow_write(fd, idx);
-
- pthread_rwlock_unlock(&local_data.lock);
}
-
}
return (void *) 0;
diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h
index ca5288ae..a8edee7d 100644
--- a/src/ipcpd/normal/connmgr.h
+++ b/src/ipcpd/normal/connmgr.h
@@ -28,12 +28,14 @@
#include "ae.h"
-#define NOTIFY_DT_CONN_ADD 0x00D0
-#define NOTIFY_DT_CONN_DEL 0x00D1
-#define NOTIFY_DT_CONN_QOS 0x00D2
-
-#define NOTIFY_MGMT_CONN_ADD 0x00F0
-#define NOTIFY_MGMT_CONN_DEL 0x00F1
+#define NOTIFY_DT_CONN_ADD 0x00D0
+#define NOTIFY_DT_CONN_DEL 0x00D1
+#define NOTIFY_DT_CONN_QOS 0x00D2
+#define NOTIFY_DT_CONN_DOWN 0x00D3
+
+#define NOTIFY_MGMT_CONN_ADD 0x00F0
+#define NOTIFY_MGMT_CONN_DEL 0x00F1
+#define NOTIFY_MGMT_CONN_DOWN 0x00F2
int connmgr_init(void);
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 2df17163..56cb5a61 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -93,6 +93,7 @@ static void sdu_handler(int fd,
struct shm_du_buff * sdb)
{
struct dt_pci dt_pci;
+ int ret;
memset(&dt_pci, 0, sizeof(dt_pci));
@@ -112,8 +113,11 @@ static void sdu_handler(int fd,
return;
}
- if (ipcp_flow_write(fd, sdb)) {
+ ret = ipcp_flow_write(fd, sdb);
+ if (ret < 0) {
log_err("Failed to write SDU to fd %d.", fd);
+ if (ret == -EFLOWDOWN)
+ notifier_event(NOTIFY_DT_CONN_DOWN, &fd);
ipcp_sdb_release(sdb);
return;
}
@@ -323,6 +327,7 @@ int dt_write_sdu(uint64_t dst_addr,
{
int fd;
struct dt_pci dt_pci;
+ int ret;
assert(sdb);
assert(dst_addr != ipcpi.dt_addr);
@@ -342,8 +347,11 @@ int dt_write_sdu(uint64_t dst_addr,
return -1;
}
- if (ipcp_flow_write(fd, sdb)) {
+ ret = ipcp_flow_write(fd, sdb);
+ if (ret < 0) {
log_err("Failed to write SDU to fd %d.", fd);
+ if (ret == -EFLOWDOWN)
+ notifier_event(NOTIFY_DT_CONN_DOWN, &fd);
return -1;
}
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 20734d6e..36d101e5 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -43,6 +43,7 @@
#include <ouroboros/fqueue.h>
#include <ouroboros/logs.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/fccntl.h>
#include "ipcp.h"
#include "shim-data.h"
@@ -64,6 +65,8 @@
#ifdef __linux__
#include <linux/if_packet.h>
#include <linux/if_ether.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
#endif
#ifdef __FreeBSD__
@@ -154,6 +157,10 @@ struct {
pthread_t sdu_writer;
pthread_t sdu_reader;
+#ifdef __linux__
+ pthread_t if_monitor;
+#endif
+
/* Handle mgmt frames in a different thread */
pthread_t mgmt_handler;
pthread_mutex_t mgmt_lock;
@@ -771,6 +778,114 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
return (void *) 1;
}
+#ifdef __linux__
+static int open_netlink_socket(void)
+{
+ struct sockaddr_nl sa;
+ int fd;
+
+ memset(&sa, 0, sizeof(sa));
+ sa.nl_family = AF_NETLINK;
+ sa.nl_pid = getpid();
+ sa.nl_groups = RTMGRP_LINK;
+
+ fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
+ if (fd < 0)
+ return -1;
+
+ if (bind(fd, (struct sockaddr *) &sa, sizeof(sa))) {
+ close(fd);
+ return -1;
+ }
+
+ return fd;
+}
+
+static void change_flows_state(bool up)
+{
+ int i;
+ uint32_t flags;
+
+ pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
+
+ for (i = 0; i < MAX_SAPS; i++) {
+ if (eth_llc_data.ef_to_fd[i] != -1) {
+ fccntl(i, FLOWGFLAGS, &flags);
+ if (up)
+ fccntl(eth_llc_data.ef_to_fd[i],
+ FLOWSFLAGS, flags & ~FLOWFDOWN);
+ else
+ fccntl(eth_llc_data.ef_to_fd[i],
+ FLOWSFLAGS, flags | FLOWFDOWN);
+ }
+ }
+
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+}
+
+static void * eth_llc_ipcp_if_monitor(void * o)
+{
+ int fd;
+ int status;
+ char buf[4096];
+ struct iovec iov = {buf, sizeof(buf)};
+ struct sockaddr_nl snl;
+ struct msghdr msg = {(void *) &snl, sizeof(snl),
+ &iov, 1, NULL, 0, 0};
+ struct nlmsghdr * h;
+ struct ifinfomsg * ifi;
+
+ (void ) o;
+
+ fd = open_netlink_socket();
+ if (fd < 0) {
+ log_err("Failed to open socket.");
+ return (void *) -1;
+ }
+
+ while (ipcp_get_state() == IPCP_OPERATIONAL) {
+ status = recvmsg(fd, &msg, 0);
+ if (status < 0)
+ continue;
+
+ for (h = (struct nlmsghdr *) buf;
+ NLMSG_OK(h, (unsigned int) status);
+ h = NLMSG_NEXT(h, status)) {
+
+ /* Finish reading */
+ if (h->nlmsg_type == NLMSG_DONE)
+ break;
+
+ /* Message is some kind of error */
+ if (h->nlmsg_type == NLMSG_ERROR)
+ continue;
+
+ /* Only interested in link up/down */
+ if (h->nlmsg_type != RTM_NEWLINK)
+ continue;
+
+ ifi = NLMSG_DATA(h);
+
+ /* Not our interface */
+ if (ifi->ifi_index != eth_llc_data.device.sll_ifindex)
+ continue;
+
+ if (ifi->ifi_flags & IFF_UP) {
+ change_flows_state(true);
+ log_dbg("Interface up.");
+ } else {
+ change_flows_state(false);
+ log_dbg("Interface down.");
+ }
+ }
+ }
+
+ close(fd);
+
+ return (void *) 0;
+}
+#endif
+
#if defined (HAVE_BPF) && !defined(HAVE_NETMAP)
static int open_bpf_device(void)
{
@@ -844,13 +959,13 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)
break;
}
+ freeifaddrs(ifaddr);
+
if (ifa == NULL) {
log_err("Interface not found.");
- freeifaddrs(ifaddr);
return -1;
}
- freeifaddrs(ifaddr);
#elif defined(__linux__)
skfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (skfd < 0) {
@@ -904,38 +1019,32 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)
if (BPF_BLEN < blen) {
log_err("BPF buffer too small (is: %ld must be: %d).",
BPF_BLEN, blen);
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCSETIF, &ifr) < 0) {
log_err("Failed to set interface.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCSHDRCMPLT, &enable) < 0) {
log_err("Failed to set BIOCSHDRCMPLT.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCSSEESENT, &disable) < 0) {
log_err("Failed to set BIOCSSEESENT.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCSRTIMEOUT, &tv) < 0) {
log_err("Failed to set BIOCSRTIMEOUT.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCIMMEDIATE, &enable) < 0) {
log_err("Failed to set BIOCIMMEDIATE.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
log_info("Using Berkeley Packet Filter.");
@@ -958,39 +1067,74 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)
if (bind(eth_llc_data.s_fd, (struct sockaddr *) &eth_llc_data.device,
sizeof(eth_llc_data.device))) {
log_err("Failed to bind socket to interface");
- close(eth_llc_data.s_fd);
- return -1;
+ goto fail_device;
}
if (setsockopt(eth_llc_data.s_fd, SOL_SOCKET, SO_RCVTIMEO,
&tv, sizeof(tv))) {
log_err("Failed to set socket timeout: %s.", strerror(errno));
- close(eth_llc_data.s_fd);
- return -1;
+ goto fail_device;
}
#endif /* HAVE_NETMAP */
ipcp_set_state(IPCP_OPERATIONAL);
- pthread_create(&eth_llc_data.mgmt_handler,
- NULL,
- eth_llc_ipcp_mgmt_handler,
- NULL);
+#ifdef __linux__
+ if (pthread_create(&eth_llc_data.if_monitor,
+ NULL,
+ eth_llc_ipcp_if_monitor,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_device;
+ }
+#endif
+
+ if (pthread_create(&eth_llc_data.mgmt_handler,
+ NULL,
+ eth_llc_ipcp_mgmt_handler,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_mgmt_handler;
+ }
- pthread_create(&eth_llc_data.sdu_reader,
- NULL,
- eth_llc_ipcp_sdu_reader,
- NULL);
+ if (pthread_create(&eth_llc_data.sdu_reader,
+ NULL,
+ eth_llc_ipcp_sdu_reader,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_sdu_reader;
+ }
- pthread_create(&eth_llc_data.sdu_writer,
- NULL,
- eth_llc_ipcp_sdu_writer,
- NULL);
+ if (pthread_create(&eth_llc_data.sdu_writer,
+ NULL,
+ eth_llc_ipcp_sdu_writer,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_sdu_writer;
+ }
log_dbg("Bootstrapped shim IPCP over Ethernet with LLC with api %d.",
getpid());
return 0;
+
+ fail_sdu_writer:
+ pthread_join(eth_llc_data.sdu_reader, NULL);
+ fail_sdu_reader:
+ pthread_join(eth_llc_data.mgmt_handler, NULL);
+ fail_mgmt_handler:
+#ifdef __linux__
+ pthread_join(eth_llc_data.if_monitor, NULL);
+#endif
+ fail_device:
+#if defined(HAVE_NETMAP)
+ nm_close(eth_llc_data.nmd);
+#elif defined(HAVE_BPF)
+ close(eth_llc_data.bpf);
+#elif defined(HAVE_RAW_SOCKETS)
+ close(eth_llc_data.s_fd);
+#endif
+ return -1;
}
static int eth_llc_ipcp_reg(const uint8_t * hash)
@@ -1228,6 +1372,9 @@ int main(int argc,
pthread_join(eth_llc_data.sdu_writer, NULL);
pthread_join(eth_llc_data.sdu_reader, NULL);
pthread_join(eth_llc_data.mgmt_handler, NULL);
+#ifdef __linux__
+ pthread_join(eth_llc_data.if_monitor, NULL);
+#endif
}
eth_llc_data_fini();
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 26cecb44..29ca4db2 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -163,6 +163,8 @@ set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL
"Name for the main POSIX shared memory buffer")
set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING
"SDU buffer block size, multiple of pagesize for performance")
+set(SHM_RBUFF_LOCKLESS 1 CACHE BOOL
+ "Enable shared memory rbuff lockless support")
set(SOURCE_FILES
# Add source files here
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 14ee31f4..b945968d 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -46,10 +46,10 @@
#include <stdio.h>
#include <stdarg.h>
-#define BUF_SIZE 1500
+#define BUF_SIZE 1500
-#define TW_ELEMENTS 6000
-#define TW_RESOLUTION 1 /* ms */
+#define TW_ELEMENTS 6000
+#define TW_RESOLUTION 1 /* ms */
#define MPL 2000 /* ms */
#define RQ_SIZE 20
@@ -249,12 +249,15 @@ static int api_announce(char * ap_name)
static int finalize_write(int fd,
size_t idx)
{
- if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0)
- return -ENOTALLOC;
+ int ret;
+
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret < 0)
+ return ret;
shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- return 0;
+ return ret;
}
static int frcti_init(int fd)
@@ -305,6 +308,7 @@ static int frcti_send(int fd,
{
struct timespec now = {0, 0};
struct frcti * frcti;
+ int ret;
frcti = &(ai.frcti[fd]);
@@ -331,9 +335,10 @@ static int frcti_send(int fd,
return -1;
}
- if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+ ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
+ if (ret < 0) {
pthread_rwlock_unlock(&frcti->lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&frcti->lock);
@@ -871,12 +876,6 @@ int flow_alloc(const char * dst_name,
pthread_rwlock_unlock(&ai.lock);
- if (frcti_configure(fd, FRCTFORDERING | FRCTFERRCHCK)) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- return -1;
- }
-
return fd;
}
@@ -931,6 +930,8 @@ int fccntl(int fd,
va_list l;
struct timespec * timeo;
qosspec_t * qs;
+ uint32_t rx_acl;
+ uint32_t tx_acl;
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
@@ -988,10 +989,28 @@ int fccntl(int fd,
break;
case FLOWSFLAGS:
ai.flows[fd].oflags = va_arg(l, uint32_t);
+ rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ /*
+ * Making our own flow write only means making the
+ * the other side of the flow read only.
+ */
if (ai.flows[fd].oflags & FLOWFWRONLY)
- shm_rbuff_block(ai.flows[fd].rx_rb);
+ rx_acl |= ACL_RDONLY;
if (ai.flows[fd].oflags & FLOWFRDWR)
- shm_rbuff_unblock(ai.flows[fd].rx_rb);
+ rx_acl |= ACL_RDWR;
+
+ if (ai.flows[fd].oflags & FLOWFDOWN) {
+ rx_acl |= ACL_FLOWDOWN;
+ tx_acl |= ACL_FLOWDOWN;
+ } else {
+ rx_acl &= ~ACL_FLOWDOWN;
+ tx_acl &= ~ACL_FLOWDOWN;
+ }
+
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl);
+ shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl);
+
break;
case FLOWGFLAGS:
fflags = va_arg(l, uint32_t *);
@@ -1007,6 +1026,8 @@ int fccntl(int fd,
if (cflags == NULL)
goto einval;
*cflags = ai.frcti[fd].conf_flags;
+ if (frcti_configure(fd, ai.frcti[fd].conf_flags))
+ goto eperm;
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -1036,6 +1057,7 @@ ssize_t flow_write(int fd,
size_t count)
{
ssize_t idx;
+ int ret;
if (buf == NULL)
return 0;
@@ -1079,19 +1101,21 @@ ssize_t flow_write(int fd,
}
if (!ai.frcti[fd].used) {
- if (finalize_write(fd, idx)) {
+ ret = finalize_write(fd, idx);
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
} else {
pthread_rwlock_unlock(&ai.lock);
- if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) {
+ ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx));
+ if (ret < 0) {
shm_rdrbuff_remove(ai.rdrb, idx);
- return -1;
+ return ret;
}
}
@@ -1129,7 +1153,8 @@ ssize_t flow_read(int fd,
idx = frcti_read(fd);
if (idx < 0) {
- assert(idx == -EAGAIN || idx == -ETIMEDOUT);
+ assert(idx == -EAGAIN || idx == -ETIMEDOUT ||
+ idx == -EFLOWDOWN);
return idx;
}
@@ -1509,6 +1534,8 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
+ int ret;
+
if (sdb == NULL)
return -EINVAL;
@@ -1527,17 +1554,19 @@ int ipcp_flow_write(int fd,
assert(ai.flows[fd].tx_rb);
if (!ai.frcti[fd].used) {
- if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+ ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
} else {
pthread_rwlock_unlock(&ai.lock);
- if (frcti_write(fd, sdb))
- return -1;
+ ret = frcti_write(fd, sdb);
+ if (ret < 0)
+ return ret;
}
return 0;
@@ -1618,6 +1647,8 @@ ssize_t local_flow_read(int fd)
int local_flow_write(int fd,
size_t idx)
{
+ int ret;
+
if (fd < 0)
return -EINVAL;
@@ -1628,9 +1659,10 @@ int local_flow_write(int fd,
return -ENOTALLOC;
}
- if (finalize_write(fd, idx)) {
+ ret = finalize_write(fd, idx);
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c
index ec0199c0..b77c4374 100644
--- a/src/lib/shm_rbuff_ll.c
+++ b/src/lib/shm_rbuff_ll.c
@@ -26,6 +26,7 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +42,6 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_OPEN 0
-#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 3 * sizeof(size_t) \
@@ -141,7 +140,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = RB_OPEN;
+ *rb->acl = ACL_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -227,14 +226,17 @@ int shm_rbuff_write(struct shm_rbuff * rb,
{
size_t ohead;
size_t nhead;
-
- bool was_empty = false;
+ bool was_empty = false;
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
- if (__sync_fetch_and_add(rb->acl, 0)) /* CLOSED */
- return -ENOTALLOC;
+ if (__sync_fetch_and_add(rb->acl, 0) != ACL_RDWR) {
+ if (__sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN)
+ return -EFLOWDOWN;
+ else if (__sync_fetch_and_add(rb->acl, 0) & ACL_RDONLY)
+ return -ENOTALLOC;
+ }
if (!shm_rbuff_free(rb))
return -EAGAIN;
@@ -266,7 +268,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
assert(rb);
if (shm_rbuff_empty(rb))
- return -EAGAIN;
+ return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ?
+ -EFLOWDOWN : -EAGAIN;
ntail = RB_TAIL;
@@ -327,26 +330,25 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-void shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags)
{
assert(rb);
- __sync_bool_compare_and_swap(rb->acl, RB_OPEN, RB_CLOSED);
+ __sync_bool_compare_and_swap(rb->acl, *rb->acl, flags);
}
-void shm_rbuff_unblock(struct shm_rbuff * rb)
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
{
assert(rb);
- __sync_bool_compare_and_swap(rb->acl, RB_CLOSED, RB_OPEN);
+ return __sync_fetch_and_add(rb->acl, 0);
}
void shm_rbuff_fini(struct shm_rbuff * rb)
{
assert(rb);
- assert(__sync_fetch_and_add(rb->acl, 0) == RB_CLOSED);
-
if (shm_rbuff_empty(rb))
return;
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index 9567762f..fb183d8f 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -26,6 +26,7 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +42,6 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_OPEN 0
-#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 3 * sizeof(size_t) \
@@ -138,7 +137,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = RB_OPEN;
+ *rb->acl = ACL_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -226,8 +225,11 @@ void shm_rbuff_destroy(struct shm_rbuff * rb)
free(rb);
}
-int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
+int shm_rbuff_write(struct shm_rbuff * rb,
+ size_t idx)
{
+ int ret = 0;
+
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
@@ -237,14 +239,18 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- if (*rb->acl == RB_CLOSED) {
- pthread_mutex_unlock(rb->lock);
- return -ENOTALLOC;
+
+ if (*rb->acl != ACL_RDWR) {
+ if (*rb->acl & ACL_FLOWDOWN)
+ ret = -EFLOWDOWN;
+ else if (*rb->acl & ACL_RDONLY)
+ ret = -ENOTALLOC;
+ goto err;
}
if (!shm_rbuff_free(rb)) {
- pthread_mutex_unlock(rb->lock);
- return -EAGAIN;
+ ret = -EAGAIN;
+ goto err;
}
if (shm_rbuff_empty(rb))
@@ -256,6 +262,9 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
pthread_mutex_unlock(rb->lock);
return 0;
+ err:
+ pthread_mutex_unlock(rb->lock);
+ return ret;
}
ssize_t shm_rbuff_read(struct shm_rbuff * rb)
@@ -270,9 +279,11 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
+
if (shm_rbuff_empty(rb)) {
+ ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN;
pthread_mutex_unlock(rb->lock);
- return -EAGAIN;
+ return ret;
}
ret = *tail_el_ptr(rb);
@@ -297,6 +308,12 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
+
+ if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) {
+ pthread_mutex_unlock(rb->lock);
+ return -EFLOWDOWN;
+ }
+
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);
@@ -324,7 +341,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-void shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags)
{
assert(rb);
@@ -334,13 +352,15 @@ void shm_rbuff_block(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- *rb->acl = RB_CLOSED;
+ *rb->acl = (size_t) flags;
pthread_mutex_unlock(rb->lock);
}
-void shm_rbuff_unblock(struct shm_rbuff * rb)
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
{
+ uint32_t flags;
+
assert(rb);
#ifndef HAVE_ROBUST_MUTEX
@@ -349,9 +369,11 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- *rb->acl = RB_OPEN;
+ flags = (uint32_t) *rb->acl;
pthread_mutex_unlock(rb->lock);
+
+ return flags;
}
void shm_rbuff_fini(struct shm_rbuff * rb)
@@ -364,8 +386,6 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- assert(*rb->acl == RB_CLOSED);
-
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);