diff options
| -rw-r--r-- | doc/man/flow_read.3 | 3 | ||||
| -rw-r--r-- | include/ouroboros/errno.h | 1 | ||||
| -rw-r--r-- | include/ouroboros/fccntl.h | 2 | ||||
| -rw-r--r-- | include/ouroboros/shm_rbuff.h | 11 | ||||
| -rw-r--r-- | src/ipcpd/local/main.c | 18 | ||||
| -rw-r--r-- | src/ipcpd/normal/connmgr.h | 14 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.c | 12 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 207 | ||||
| -rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/lib/dev.c | 86 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_ll.c | 30 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_pthr.c | 52 | 
12 files changed, 331 insertions, 107 deletions
| diff --git a/doc/man/flow_read.3 b/doc/man/flow_read.3 index f4f94e67..36a506a6 100644 --- a/doc/man/flow_read.3 +++ b/doc/man/flow_read.3 @@ -50,6 +50,9 @@ Invalid flow descriptor passed.  .B -ENOTALLOC  The flow was not allocated. +.B -EFLOWDOWN +The flow has been reported down. +  .SH ATTRIBUTES  For an explanation of the terms used in this section, see \fBattributes\fR(7). diff --git a/include/ouroboros/errno.h b/include/ouroboros/errno.h index 7cd58aa5..1710f7f8 100644 --- a/include/ouroboros/errno.h +++ b/include/ouroboros/errno.h @@ -30,5 +30,6 @@  #define EIRMD        1002 /* Failed to communicate with IRMD */  #define EIPCP        1003 /* Failed to communicate with IPCP */  #define EIPCPSTATE   1004 /* Target in wrong state */ +#define EFLOWDOWN    1005 /* Flow is down */  #endif /* OUROBOROS_ERRNO_H */ diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h index d938439f..3bd09e65 100644 --- a/include/ouroboros/fccntl.h +++ b/include/ouroboros/fccntl.h @@ -33,6 +33,8 @@  #define FLOWFRDWR     00000002 /* Read-write flow       */  #define FLOWFACCMODE  00000003 /* Access mask           */ +#define FLOWFDOWN     00000004 /* Flow is down          */ +  #define FLOWFNONBLOCK 00004000 /* Non-blocking flow     */  #define FLOWFDEFAULT  00000002 /* Default, blocking, rw */ diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index 55d03b41..af50aada 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -26,6 +26,12 @@  #include <sys/types.h>  #include <sys/time.h> +#include <stdint.h> + +#define ACL_RDWR     0000 +#define ACL_RDONLY   0001 +#define ACL_FLOWDOWN 0002 +  struct shm_rbuff;  struct shm_rbuff * shm_rbuff_create(pid_t api, @@ -38,9 +44,10 @@ void               shm_rbuff_close(struct shm_rbuff * rb);  void               shm_rbuff_destroy(struct shm_rbuff * rb); -void               shm_rbuff_block(struct shm_rbuff * rb); +void               shm_rbuff_set_acl(struct shm_rbuff * rb, +                                     uint32_t           flags); -void               shm_rbuff_unblock(struct shm_rbuff * rb); +uint32_t           shm_rbuff_get_acl(struct shm_rbuff * rb);  void               shm_rbuff_fini(struct shm_rbuff * rb); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index fb4e312b..7f44a2c6 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -99,30 +99,28 @@ static void * ipcp_local_sdu_loop(void * o)          (void) o; -        while (true) { -                int fd; +        while (ipcp_get_state() == IPCP_OPERATIONAL) { +                int     fd;                  ssize_t idx; -                if (ipcp_get_state() != IPCP_OPERATIONAL) -                        return (void *) 1; /* -ENOTENROLLED */ -                  fevent(local_data.flows, local_data.fq, &timeout);                  while ((fd = fqueue_next(local_data.fq)) >= 0) { -                        pthread_rwlock_rdlock(&local_data.lock); -                          idx = local_flow_read(fd); +                        if (idx < 0) +                                continue;                          assert(idx < (SHM_BUFFER_SIZE)); +                        pthread_rwlock_rdlock(&local_data.lock); +                          fd = local_data.in_out[fd]; +                        pthread_rwlock_unlock(&local_data.lock); +                          if (fd != -1)                                  local_flow_write(fd, idx); - -                        pthread_rwlock_unlock(&local_data.lock);                  } -          }          return (void *) 0; diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h index ca5288ae..a8edee7d 100644 --- a/src/ipcpd/normal/connmgr.h +++ b/src/ipcpd/normal/connmgr.h @@ -28,12 +28,14 @@  #include "ae.h" -#define NOTIFY_DT_CONN_ADD   0x00D0 -#define NOTIFY_DT_CONN_DEL   0x00D1 -#define NOTIFY_DT_CONN_QOS   0x00D2 - -#define NOTIFY_MGMT_CONN_ADD 0x00F0 -#define NOTIFY_MGMT_CONN_DEL 0x00F1 +#define NOTIFY_DT_CONN_ADD    0x00D0 +#define NOTIFY_DT_CONN_DEL    0x00D1 +#define NOTIFY_DT_CONN_QOS    0x00D2 +#define NOTIFY_DT_CONN_DOWN   0x00D3 + +#define NOTIFY_MGMT_CONN_ADD  0x00F0 +#define NOTIFY_MGMT_CONN_DEL  0x00F1 +#define NOTIFY_MGMT_CONN_DOWN 0x00F2  int         connmgr_init(void); diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 2df17163..56cb5a61 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -93,6 +93,7 @@ static void sdu_handler(int                  fd,                          struct shm_du_buff * sdb)  {          struct dt_pci dt_pci; +        int           ret;          memset(&dt_pci, 0, sizeof(dt_pci)); @@ -112,8 +113,11 @@ static void sdu_handler(int                  fd,                          return;                  } -                if (ipcp_flow_write(fd, sdb)) { +                ret = ipcp_flow_write(fd, sdb); +                if (ret < 0) {                          log_err("Failed to write SDU to fd %d.", fd); +                        if (ret == -EFLOWDOWN) +                                notifier_event(NOTIFY_DT_CONN_DOWN, &fd);                          ipcp_sdb_release(sdb);                          return;                  } @@ -323,6 +327,7 @@ int dt_write_sdu(uint64_t             dst_addr,  {          int           fd;          struct dt_pci dt_pci; +        int           ret;          assert(sdb);          assert(dst_addr != ipcpi.dt_addr); @@ -342,8 +347,11 @@ int dt_write_sdu(uint64_t             dst_addr,                  return -1;          } -        if (ipcp_flow_write(fd, sdb)) { +        ret = ipcp_flow_write(fd, sdb); +        if (ret < 0) {                  log_err("Failed to write SDU to fd %d.", fd); +                if (ret == -EFLOWDOWN) +                        notifier_event(NOTIFY_DT_CONN_DOWN, &fd);                  return -1;          } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 20734d6e..36d101e5 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -43,6 +43,7 @@  #include <ouroboros/fqueue.h>  #include <ouroboros/logs.h>  #include <ouroboros/time_utils.h> +#include <ouroboros/fccntl.h>  #include "ipcp.h"  #include "shim-data.h" @@ -64,6 +65,8 @@  #ifdef __linux__  #include <linux/if_packet.h>  #include <linux/if_ether.h> +#include <linux/netlink.h> +#include <linux/rtnetlink.h>  #endif  #ifdef __FreeBSD__ @@ -154,6 +157,10 @@ struct {          pthread_t          sdu_writer;          pthread_t          sdu_reader; +#ifdef __linux__ +        pthread_t          if_monitor; +#endif +          /* Handle mgmt frames in a different thread */          pthread_t          mgmt_handler;          pthread_mutex_t    mgmt_lock; @@ -771,6 +778,114 @@ static void * eth_llc_ipcp_sdu_writer(void * o)          return (void *) 1;  } +#ifdef __linux__ +static int open_netlink_socket(void) +{ +        struct sockaddr_nl sa; +        int                fd; + +        memset(&sa, 0, sizeof(sa)); +        sa.nl_family = AF_NETLINK; +        sa.nl_pid = getpid(); +        sa.nl_groups = RTMGRP_LINK; + +        fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); +        if (fd < 0) +                return -1; + +        if (bind(fd, (struct sockaddr *) &sa, sizeof(sa))) { +                close(fd); +                return -1; +        } + +        return fd; +} + +static void change_flows_state(bool up) +{ +        int      i; +        uint32_t flags; + +        pthread_rwlock_rdlock(ð_llc_data.flows_lock); + +        for (i = 0; i < MAX_SAPS; i++) { +                if (eth_llc_data.ef_to_fd[i] != -1) { +                        fccntl(i, FLOWGFLAGS, &flags); +                        if (up) +                                fccntl(eth_llc_data.ef_to_fd[i], +                                       FLOWSFLAGS, flags & ~FLOWFDOWN); +                        else +                                fccntl(eth_llc_data.ef_to_fd[i], +                                       FLOWSFLAGS, flags | FLOWFDOWN); +                } +        } + +        pthread_rwlock_unlock(ð_llc_data.flows_lock); +} + +static void * eth_llc_ipcp_if_monitor(void * o) +{ +        int                fd; +        int                status; +        char               buf[4096]; +        struct iovec       iov = {buf, sizeof(buf)}; +        struct sockaddr_nl snl; +        struct msghdr      msg = {(void *) &snl, sizeof(snl), +                                  &iov, 1, NULL, 0, 0}; +        struct nlmsghdr *  h; +        struct ifinfomsg * ifi; + +        (void ) o; + +        fd = open_netlink_socket(); +        if (fd < 0) { +                log_err("Failed to open socket."); +                return (void *) -1; +        } + +        while (ipcp_get_state() == IPCP_OPERATIONAL) { +                status = recvmsg(fd, &msg, 0); +                if (status < 0) +                        continue; + +                for (h = (struct nlmsghdr *) buf; +                     NLMSG_OK(h, (unsigned int) status); +                     h = NLMSG_NEXT(h, status)) { + +                        /* Finish reading */ +                        if (h->nlmsg_type == NLMSG_DONE) +                                break; + +                        /* Message is some kind of error */ +                        if (h->nlmsg_type == NLMSG_ERROR) +                                continue; + +                        /* Only interested in link up/down */ +                        if (h->nlmsg_type != RTM_NEWLINK) +                                continue; + +                        ifi = NLMSG_DATA(h); + +                        /* Not our interface */ +                        if (ifi->ifi_index != eth_llc_data.device.sll_ifindex) +                                continue; + +                        if (ifi->ifi_flags & IFF_UP) { +                                change_flows_state(true); +                                log_dbg("Interface up."); +                        } else { +                                change_flows_state(false); +                                log_dbg("Interface down."); +                        } +                } +        } + +        close(fd); + +        return (void *) 0; +} +#endif +  #if defined (HAVE_BPF) && !defined(HAVE_NETMAP)  static int open_bpf_device(void)  { @@ -844,13 +959,13 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)                  break;          } +        freeifaddrs(ifaddr); +          if (ifa == NULL) {                  log_err("Interface not found."); -                freeifaddrs(ifaddr);                  return -1;          } -        freeifaddrs(ifaddr);  #elif defined(__linux__)          skfd = socket(AF_UNIX, SOCK_STREAM, 0);          if (skfd < 0) { @@ -904,38 +1019,32 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)          if (BPF_BLEN < blen) {                  log_err("BPF buffer too small (is: %ld must be: %d).",                          BPF_BLEN, blen); -                close(eth_llc_data.bpf); -                return -1; +                goto fail_device;          }          if (ioctl(eth_llc_data.bpf, BIOCSETIF, &ifr) < 0) {                  log_err("Failed to set interface."); -                close(eth_llc_data.bpf); -                return -1; +                goto fail_device;          }          if (ioctl(eth_llc_data.bpf, BIOCSHDRCMPLT, &enable) < 0) {                  log_err("Failed to set BIOCSHDRCMPLT."); -                close(eth_llc_data.bpf); -                return -1; +                goto fail_device;          }          if (ioctl(eth_llc_data.bpf, BIOCSSEESENT, &disable) < 0) {                  log_err("Failed to set BIOCSSEESENT."); -                close(eth_llc_data.bpf); -                return -1; +                goto fail_device;          }          if (ioctl(eth_llc_data.bpf, BIOCSRTIMEOUT, &tv) < 0) {                  log_err("Failed to set BIOCSRTIMEOUT."); -                close(eth_llc_data.bpf); -                return -1; +                goto fail_device;          }          if (ioctl(eth_llc_data.bpf, BIOCIMMEDIATE, &enable) < 0) {                  log_err("Failed to set BIOCIMMEDIATE."); -                close(eth_llc_data.bpf); -                return -1; +                goto fail_device;          }          log_info("Using Berkeley Packet Filter."); @@ -958,39 +1067,74 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)          if (bind(eth_llc_data.s_fd, (struct sockaddr *) ð_llc_data.device,                  sizeof(eth_llc_data.device))) {                  log_err("Failed to bind socket to interface"); -                close(eth_llc_data.s_fd); -                return -1; +                goto fail_device;          }          if (setsockopt(eth_llc_data.s_fd, SOL_SOCKET, SO_RCVTIMEO,                         &tv, sizeof(tv))) {                  log_err("Failed to set socket timeout: %s.", strerror(errno)); -                close(eth_llc_data.s_fd); -                return -1; +                goto fail_device;          }  #endif /* HAVE_NETMAP */          ipcp_set_state(IPCP_OPERATIONAL); -        pthread_create(ð_llc_data.mgmt_handler, -                       NULL, -                       eth_llc_ipcp_mgmt_handler, -                       NULL); +#ifdef __linux__ +        if (pthread_create(ð_llc_data.if_monitor, +                           NULL, +                           eth_llc_ipcp_if_monitor, +                           NULL)) { +                ipcp_set_state(IPCP_INIT); +                goto fail_device; +        } +#endif + +        if (pthread_create(ð_llc_data.mgmt_handler, +                           NULL, +                           eth_llc_ipcp_mgmt_handler, +                           NULL)) { +                ipcp_set_state(IPCP_INIT); +                goto fail_mgmt_handler; +        } -        pthread_create(ð_llc_data.sdu_reader, -                       NULL, -                       eth_llc_ipcp_sdu_reader, -                       NULL); +        if (pthread_create(ð_llc_data.sdu_reader, +                           NULL, +                           eth_llc_ipcp_sdu_reader, +                           NULL)) { +                ipcp_set_state(IPCP_INIT); +                goto fail_sdu_reader; +        } -        pthread_create(ð_llc_data.sdu_writer, -                       NULL, -                       eth_llc_ipcp_sdu_writer, -                       NULL); +        if (pthread_create(ð_llc_data.sdu_writer, +                           NULL, +                           eth_llc_ipcp_sdu_writer, +                           NULL)) { +                ipcp_set_state(IPCP_INIT); +                goto fail_sdu_writer; +        }          log_dbg("Bootstrapped shim IPCP over Ethernet with LLC with api %d.",                  getpid());          return 0; + + fail_sdu_writer: +        pthread_join(eth_llc_data.sdu_reader, NULL); + fail_sdu_reader: +        pthread_join(eth_llc_data.mgmt_handler, NULL); + fail_mgmt_handler: +#ifdef __linux__ +        pthread_join(eth_llc_data.if_monitor, NULL); +#endif + fail_device: +#if defined(HAVE_NETMAP) +        nm_close(eth_llc_data.nmd); +#elif defined(HAVE_BPF) +        close(eth_llc_data.bpf); +#elif defined(HAVE_RAW_SOCKETS) +        close(eth_llc_data.s_fd); +#endif +        return -1;  }  static int eth_llc_ipcp_reg(const uint8_t * hash) @@ -1228,6 +1372,9 @@ int main(int    argc,                  pthread_join(eth_llc_data.sdu_writer, NULL);                  pthread_join(eth_llc_data.sdu_reader, NULL);                  pthread_join(eth_llc_data.mgmt_handler, NULL); +#ifdef __linux__ +                pthread_join(eth_llc_data.if_monitor, NULL); +#endif          }          eth_llc_data_fini(); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 26cecb44..29ca4db2 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -163,6 +163,8 @@ set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL    "Name for the main POSIX shared memory buffer")  set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING    "SDU buffer block size, multiple of pagesize for performance") +set(SHM_RBUFF_LOCKLESS 1 CACHE BOOL +  "Enable shared memory rbuff lockless support")  set(SOURCE_FILES    # Add source files here diff --git a/src/lib/dev.c b/src/lib/dev.c index 14ee31f4..b945968d 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -46,10 +46,10 @@  #include <stdio.h>  #include <stdarg.h> -#define BUF_SIZE 1500 +#define BUF_SIZE       1500 -#define TW_ELEMENTS   6000 -#define TW_RESOLUTION 1   /* ms */ +#define TW_ELEMENTS    6000 +#define TW_RESOLUTION  1   /* ms */  #define MPL            2000 /* ms */  #define RQ_SIZE        20 @@ -249,12 +249,15 @@ static int api_announce(char * ap_name)  static int finalize_write(int    fd,                            size_t idx)  { -        if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) -                return -ENOTALLOC; +        int ret; + +        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); +        if (ret < 0) +                return ret;          shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); -        return 0; +        return ret;  }  static int frcti_init(int fd) @@ -305,6 +308,7 @@ static int frcti_send(int                  fd,  {          struct timespec now = {0, 0};          struct frcti *  frcti; +        int             ret;          frcti = &(ai.frcti[fd]); @@ -331,9 +335,10 @@ static int frcti_send(int                  fd,                  return -1;          } -        if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { +        ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); +        if (ret < 0) {                  pthread_rwlock_unlock(&frcti->lock); -                return -ENOTALLOC; +                return ret;          }          pthread_rwlock_unlock(&frcti->lock); @@ -871,12 +876,6 @@ int flow_alloc(const char *            dst_name,          pthread_rwlock_unlock(&ai.lock); -        if (frcti_configure(fd, FRCTFORDERING | FRCTFERRCHCK)) { -                flow_fini(fd); -                bmp_release(ai.fds, fd); -                return -1; -        } -          return fd;  } @@ -931,6 +930,8 @@ int fccntl(int fd,          va_list           l;          struct timespec * timeo;          qosspec_t *       qs; +        uint32_t          rx_acl; +        uint32_t          tx_acl;          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; @@ -988,10 +989,28 @@ int fccntl(int fd,                  break;          case FLOWSFLAGS:                  ai.flows[fd].oflags = va_arg(l, uint32_t); +                rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); +                tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); +                /* +                 * Making our own flow write only means making the +                 * the other side of the flow read only. +                 */                  if (ai.flows[fd].oflags & FLOWFWRONLY) -                        shm_rbuff_block(ai.flows[fd].rx_rb); +                        rx_acl |= ACL_RDONLY;                  if (ai.flows[fd].oflags & FLOWFRDWR) -                        shm_rbuff_unblock(ai.flows[fd].rx_rb); +                        rx_acl |= ACL_RDWR; + +                if (ai.flows[fd].oflags & FLOWFDOWN) { +                        rx_acl |= ACL_FLOWDOWN; +                        tx_acl |= ACL_FLOWDOWN; +                } else { +                        rx_acl &= ~ACL_FLOWDOWN; +                        tx_acl &= ~ACL_FLOWDOWN; +                } + +                shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl); +                shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl); +                  break;          case FLOWGFLAGS:                  fflags = va_arg(l, uint32_t *); @@ -1007,6 +1026,8 @@ int fccntl(int fd,                  if (cflags == NULL)                          goto einval;                  *cflags = ai.frcti[fd].conf_flags; +                if (frcti_configure(fd, ai.frcti[fd].conf_flags)) +                        goto eperm;                  break;          default:                  pthread_rwlock_unlock(&ai.lock); @@ -1036,6 +1057,7 @@ ssize_t flow_write(int          fd,                     size_t       count)  {          ssize_t idx; +        int     ret;          if (buf == NULL)                  return 0; @@ -1079,19 +1101,21 @@ ssize_t flow_write(int          fd,          }          if (!ai.frcti[fd].used) { -                if (finalize_write(fd, idx)) { +                ret = finalize_write(fd, idx); +                if (ret < 0) {                          pthread_rwlock_unlock(&ai.lock);                          shm_rdrbuff_remove(ai.rdrb, idx); -                        return -ENOTALLOC; +                        return ret;                  }                  pthread_rwlock_unlock(&ai.lock);          } else {                  pthread_rwlock_unlock(&ai.lock); -                if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) { +                ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx)); +                if (ret < 0) {                          shm_rdrbuff_remove(ai.rdrb, idx); -                        return -1; +                        return ret;                  }          } @@ -1129,7 +1153,8 @@ ssize_t flow_read(int    fd,                  idx = frcti_read(fd);          if (idx < 0) { -                assert(idx == -EAGAIN || idx == -ETIMEDOUT); +                assert(idx == -EAGAIN || idx == -ETIMEDOUT || +                       idx == -EFLOWDOWN);                  return idx;          } @@ -1509,6 +1534,8 @@ int ipcp_flow_read(int                   fd,  int ipcp_flow_write(int                  fd,                      struct shm_du_buff * sdb)  { +        int ret; +          if (sdb == NULL)                  return -EINVAL; @@ -1527,17 +1554,19 @@ int ipcp_flow_write(int                  fd,          assert(ai.flows[fd].tx_rb);          if (!ai.frcti[fd].used) { -                if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { +                ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); +                if (ret < 0) {                          pthread_rwlock_unlock(&ai.lock); -                        return -ENOTALLOC; +                        return ret;                  }                  pthread_rwlock_unlock(&ai.lock);          } else {                  pthread_rwlock_unlock(&ai.lock); -                if (frcti_write(fd, sdb)) -                        return -1; +                ret = frcti_write(fd, sdb); +                if (ret < 0) +                        return ret;          }          return 0; @@ -1618,6 +1647,8 @@ ssize_t local_flow_read(int fd)  int local_flow_write(int    fd,                       size_t idx)  { +        int ret; +          if (fd < 0)                  return -EINVAL; @@ -1628,9 +1659,10 @@ int local_flow_write(int    fd,                  return -ENOTALLOC;          } -        if (finalize_write(fd, idx)) { +        ret = finalize_write(fd, idx); +        if (ret < 0) {                  pthread_rwlock_unlock(&ai.lock); -                return -ENOTALLOC; +                return ret;          }          pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index ec0199c0..b77c4374 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -26,6 +26,7 @@  #include <ouroboros/lockfile.h>  #include <ouroboros/time_utils.h>  #include <ouroboros/errno.h> +#include <ouroboros/fccntl.h>  #include <pthread.h>  #include <sys/mman.h> @@ -41,8 +42,6 @@  #include <stdbool.h>  #define FN_MAX_CHARS 255 -#define RB_OPEN 0 -#define RB_CLOSED 1  #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t)        \                               + 3 * sizeof(size_t)                       \ @@ -141,7 +140,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)          pthread_cond_init(rb->add, &cattr);          pthread_cond_init(rb->del, &cattr); -        *rb->acl = RB_OPEN; +        *rb->acl = ACL_RDWR;          *rb->head = 0;          *rb->tail = 0; @@ -227,14 +226,17 @@ int shm_rbuff_write(struct shm_rbuff * rb,  {          size_t ohead;          size_t nhead; - -        bool was_empty = false; +        bool   was_empty = false;          assert(rb);          assert(idx < SHM_BUFFER_SIZE); -        if (__sync_fetch_and_add(rb->acl, 0)) /* CLOSED */ -                return -ENOTALLOC; +        if (__sync_fetch_and_add(rb->acl, 0) != ACL_RDWR) { +                if (__sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN) +                        return -EFLOWDOWN; +                else if (__sync_fetch_and_add(rb->acl, 0) & ACL_RDONLY) +                        return -ENOTALLOC; +        }          if (!shm_rbuff_free(rb))                  return -EAGAIN; @@ -266,7 +268,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)          assert(rb);          if (shm_rbuff_empty(rb)) -                return -EAGAIN; +                return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ? +                        -EFLOWDOWN : -EAGAIN;          ntail = RB_TAIL; @@ -327,26 +330,25 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          return idx;  } -void shm_rbuff_block(struct shm_rbuff * rb) +void shm_rbuff_set_acl(struct shm_rbuff * rb, +                       uint32_t           flags)  {          assert(rb); -        __sync_bool_compare_and_swap(rb->acl, RB_OPEN, RB_CLOSED); +        __sync_bool_compare_and_swap(rb->acl, *rb->acl, flags);  } -void shm_rbuff_unblock(struct shm_rbuff * rb) +uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)  {          assert(rb); -        __sync_bool_compare_and_swap(rb->acl, RB_CLOSED, RB_OPEN); +        return __sync_fetch_and_add(rb->acl, 0);  }  void shm_rbuff_fini(struct shm_rbuff * rb)  {          assert(rb); -        assert(__sync_fetch_and_add(rb->acl, 0) == RB_CLOSED); -          if (shm_rbuff_empty(rb))                  return; diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 9567762f..fb183d8f 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -26,6 +26,7 @@  #include <ouroboros/lockfile.h>  #include <ouroboros/time_utils.h>  #include <ouroboros/errno.h> +#include <ouroboros/fccntl.h>  #include <pthread.h>  #include <sys/mman.h> @@ -41,8 +42,6 @@  #include <stdbool.h>  #define FN_MAX_CHARS 255 -#define RB_OPEN 0 -#define RB_CLOSED 1  #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t)          \                               + 3 * sizeof(size_t)                         \ @@ -138,7 +137,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)          pthread_cond_init(rb->add, &cattr);          pthread_cond_init(rb->del, &cattr); -        *rb->acl = RB_OPEN; +        *rb->acl  = ACL_RDWR;          *rb->head = 0;          *rb->tail = 0; @@ -226,8 +225,11 @@ void shm_rbuff_destroy(struct shm_rbuff * rb)          free(rb);  } -int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) +int shm_rbuff_write(struct shm_rbuff * rb, +                    size_t             idx)  { +        int ret = 0; +          assert(rb);          assert(idx < SHM_BUFFER_SIZE); @@ -237,14 +239,18 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)                  pthread_mutex_consistent(rb->lock);  #endif -        if (*rb->acl == RB_CLOSED) { -                pthread_mutex_unlock(rb->lock); -                return -ENOTALLOC; + +        if (*rb->acl != ACL_RDWR) { +                if (*rb->acl & ACL_FLOWDOWN) +                        ret = -EFLOWDOWN; +                else if (*rb->acl & ACL_RDONLY) +                        ret = -ENOTALLOC; +                goto err;          }          if (!shm_rbuff_free(rb)) { -                pthread_mutex_unlock(rb->lock); -                return -EAGAIN; +                ret = -EAGAIN; +                goto err;          }          if (shm_rbuff_empty(rb)) @@ -256,6 +262,9 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)          pthread_mutex_unlock(rb->lock);          return 0; + err: +        pthread_mutex_unlock(rb->lock); +        return ret;  }  ssize_t shm_rbuff_read(struct shm_rbuff * rb) @@ -270,9 +279,11 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)                  pthread_mutex_consistent(rb->lock);  #endif +          if (shm_rbuff_empty(rb)) { +                ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN;                  pthread_mutex_unlock(rb->lock); -                return -EAGAIN; +                return ret;          }          ret = *tail_el_ptr(rb); @@ -297,6 +308,12 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)                  pthread_mutex_consistent(rb->lock);  #endif + +        if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) { +                pthread_mutex_unlock(rb->lock); +                return -EFLOWDOWN; +        } +          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) rb->lock); @@ -324,7 +341,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          return idx;  } -void shm_rbuff_block(struct shm_rbuff * rb) +void shm_rbuff_set_acl(struct shm_rbuff * rb, +                       uint32_t           flags)  {          assert(rb); @@ -334,13 +352,15 @@ void shm_rbuff_block(struct shm_rbuff * rb)          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)                  pthread_mutex_consistent(rb->lock);  #endif -        *rb->acl = RB_CLOSED; +        *rb->acl = (size_t) flags;          pthread_mutex_unlock(rb->lock);  } -void shm_rbuff_unblock(struct shm_rbuff * rb) +uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)  { +        uint32_t flags; +          assert(rb);  #ifndef HAVE_ROBUST_MUTEX @@ -349,9 +369,11 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)                  pthread_mutex_consistent(rb->lock);  #endif -        *rb->acl = RB_OPEN; +        flags = (uint32_t) *rb->acl;          pthread_mutex_unlock(rb->lock); + +        return flags;  }  void shm_rbuff_fini(struct shm_rbuff * rb) @@ -364,8 +386,6 @@ void shm_rbuff_fini(struct shm_rbuff * rb)          if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)                  pthread_mutex_consistent(rb->lock);  #endif -        assert(*rb->acl == RB_CLOSED); -          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) rb->lock); | 
