diff options
27 files changed, 1642 insertions, 1145 deletions
| diff --git a/CMakeLists.txt b/CMakeLists.txt index 0bfb46d8..95ed6b8a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,6 +43,7 @@ test_and_set_c_compiler_flag_global(-std=c89)  test_and_set_c_compiler_flag_global(-Wall)  test_and_set_c_compiler_flag_global(-Werror)  test_and_set_c_compiler_flag_global(-Wundef) +test_and_set_c_compiler_flag_global(-Wdeclaration-after-statement)  test_and_set_c_compiler_flag_global(-fmax-errors=5)  configure_file( diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt index f24857ed..41feb65e 100644 --- a/include/ouroboros/CMakeLists.txt +++ b/include/ouroboros/CMakeLists.txt @@ -7,11 +7,11 @@ set(HEADER_FILES    dev.h    errno.h    fcntl.h +  fqueue.h    irm.h    irm_config.h    nsm.h -  qos.h -  select.h) +  qos.h)  install(FILES ${HEADER_FILES} DESTINATION usr/include/ouroboros) diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 143ae7c8..a9d65aec 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -36,6 +36,7 @@  #define IPCP_NORMAL_EXEC       "@IPCP_NORMAL_TARGET@"  #define IPCP_LOCAL_EXEC        "@IPCP_LOCAL_TARGET@"  #define AP_MAX_FLOWS           256 +#define AP_MAX_FQUEUES         64  #define SHM_RDRB_BLOCK_SIZE    sysconf(_SC_PAGESIZE)  #define SHM_RDRB_MULTI_BLOCK  #define SHM_RDRB_PREFIX        "/ouroboros.rdrb." @@ -43,7 +44,8 @@  #define SHM_BUFFER_SIZE        (1 << 14)  #define DU_BUFF_HEADSPACE      128  #define DU_BUFF_TAILSPACE      0 -#define SHM_AP_RBUFF_PREFIX    "/ouroboros.rbuff." +#define SHM_RBUFF_PREFIX       "/ouroboros.rbuff." +#define SHM_FLOW_SET_PREFIX    "/ouroboros.sets."  #define IRMD_MAX_FLOWS         4096  #define IRMD_THREADPOOL_SIZE   5  #define LOG_DIR                "/@LOG_DIR@/" diff --git a/include/ouroboros/select.h b/include/ouroboros/fqueue.h index de309b8d..943d6510 100644 --- a/include/ouroboros/select.h +++ b/include/ouroboros/fqueue.h @@ -1,7 +1,7 @@  /*   * Ouroboros - Copyright (C) 2016   * - * A select call for flows + * Flow queues   *   *    Dimitri Staessens <dimitri.staessens@intec.ugent.be>   *    Sander Vrijders   <sander.vrijders@intec.ugent.be> @@ -21,32 +21,42 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#ifndef OUROBOROS_SELECT_H -#define OUROBOROS_SELECT_H +#ifndef OUROBOROS_FQUEUE_H +#define OUROBOROS_FQUEUE_H  #include <stdbool.h>  #include <time.h>  struct flow_set; +struct fqueue; +  typedef struct flow_set flow_set_t; +typedef struct fqueue fqueue_t;  flow_set_t * flow_set_create();  void         flow_set_destroy(flow_set_t * set); +fqueue_t *   fqueue_create(); + +void         fqueue_destroy(struct fqueue * fq); +  void         flow_set_zero(flow_set_t * set); -void         flow_set_add(flow_set_t * set, +int          flow_set_add(flow_set_t * set,                            int          fd); -void         flow_set_del(flow_set_t * set, +bool         flow_set_has(flow_set_t * set,                            int          fd); -bool         flow_set_has(flow_set_t * set, +void         flow_set_del(flow_set_t * set,                            int          fd); -int          flow_select(flow_set_t *            set, -                         const struct timespec * timeout); +int          fqueue_next(fqueue_t * fq); + +int          flow_event_wait(flow_set_t *            set, +                             fqueue_t *              fq, +                             const struct timespec * timeout);  #endif /* OUROBOROS_SELECT_H */ diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 9343aeaa..3ab05bd7 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -47,7 +47,4 @@ int  ipcp_flow_write(int                  fd,  void ipcp_flow_del(struct shm_du_buff * sdb); -/* returns flow descriptor and du buff */ -int  ipcp_read_shim(struct shm_du_buff ** sdb); -  #endif /* OUROBOROS_IPCP_DEV_H */ diff --git a/include/ouroboros/local-dev.h b/include/ouroboros/local-dev.h index 77ff47e9..30f440b1 100644 --- a/include/ouroboros/local-dev.h +++ b/include/ouroboros/local-dev.h @@ -20,14 +20,12 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#include <ouroboros/shm_ap_rbuff.h> -  #ifndef OUROBOROS_LOCAL_DEV_H  #define OUROBOROS_LOCAL_DEV_H -struct rb_entry * local_flow_read(int fd); +ssize_t local_flow_read(int fd); -int               local_flow_write(int               fd, -                                   struct rb_entry * e); +int     local_flow_write(int     fd, +                         ssize_t idx);  #endif /* OUROBOROS_LOCAL_DEV_H */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h deleted file mode 100644 index 453e4bf8..00000000 --- a/include/ouroboros/shm_ap_rbuff.h +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Ring buffer for application processes - * - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - *    Sander Vrijders   <sander.vrijders@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_SHM_AP_RBUFF_H -#define OUROBOROS_SHM_AP_RBUFF_H - -#include <ouroboros/select.h> -#include <sys/types.h> -#include <sys/time.h> -#include <stdbool.h> - -struct shm_ap_rbuff; - -struct rb_entry { -        ssize_t index; -        int     port_id; -}; - -struct shm_ap_rbuff * shm_ap_rbuff_create(); - -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api); - -void                  shm_ap_rbuff_close(struct shm_ap_rbuff * rb); - -void                  shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb); - -void                  shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, -                                             int                   port_id); - -int                   shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, -                                              int                   port_id); - -int                   shm_ap_rbuff_write(struct shm_ap_rbuff * rb, -                                         struct rb_entry *     e); - -struct rb_entry *     shm_ap_rbuff_read(struct shm_ap_rbuff * rb); - -int                   shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb); - -int                   shm_ap_rbuff_peek_b(struct shm_ap_rbuff *   rb, -                                          bool *                  set, -                                          const struct timespec * timeout); - -ssize_t               shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, -                                             int                   port_id); - -ssize_t               shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb, -                                               int                     port_id, -                                               const struct timespec * timeout); - -void                  shm_ap_rbuff_reset(struct shm_ap_rbuff * rb); - -#endif /* OUROBOROS_SHM_AP_RBUFF_H */ diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h new file mode 100644 index 00000000..32db5d36 --- /dev/null +++ b/include/ouroboros/shm_flow_set.h @@ -0,0 +1,63 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Management of flow_sets for fqueue + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SHM_FLOW_SET_H +#define OUROBOROS_SHM_FLOW_SET_H + +#include <ouroboros/fqueue.h> + +#include <sys/time.h> + +struct shm_flow_set; + +struct shm_flow_set * shm_flow_set_create(); + +void                  shm_flow_set_destroy(struct shm_flow_set * set); + +struct shm_flow_set * shm_flow_set_open(pid_t api); + +void                  shm_flow_set_close(struct shm_flow_set * set); + +void                  shm_flow_set_zero(struct shm_flow_set * shm_set, +                                        ssize_t               idx); + +int                   shm_flow_set_add(struct shm_flow_set * shm_set, +                                       ssize_t               idx, +                                       int                   port_id); + +int                   shm_flow_set_has(struct shm_flow_set * shm_set, +                                       ssize_t               idx, +                                       int                   port_id); + +void                  shm_flow_set_del(struct shm_flow_set * shm_set, +                                       ssize_t               idx, +                                       int                   port_id); + +void                  shm_flow_set_notify(struct shm_flow_set * set, +                                          int                   port_id); + +int                   shm_flow_set_wait(const struct shm_flow_set * shm_set, +                                        ssize_t                     idx, +                                        int *                       fqueue, +                                        const struct timespec *     timeout); + +#endif /* OUROBOROS_SHM_FLOW_SET_H */ diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h new file mode 100644 index 00000000..03660b88 --- /dev/null +++ b/include/ouroboros/shm_rbuff.h @@ -0,0 +1,53 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for incoming SDUs + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SHM_RBUFF_H +#define OUROBOROS_SHM_RBUFF_H + +#include <sys/types.h> +#include <sys/time.h> + +struct shm_rbuff; + +struct shm_rbuff * shm_rbuff_create(int port_id); + +struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id); + +void               shm_rbuff_close(struct shm_rbuff * rb); + +void               shm_rbuff_destroy(struct shm_rbuff * rb); + +int                shm_rbuff_block(struct shm_rbuff * rb); + +void               shm_rbuff_unblock(struct shm_rbuff * rb); + +int                shm_rbuff_write(struct shm_rbuff * rb, +                                   ssize_t            idx); + +ssize_t            shm_rbuff_read(struct shm_rbuff * rb); + +ssize_t            shm_rbuff_read_b(struct shm_rbuff *      rb, +                                    const struct timespec * timeout); + +void               shm_rbuff_reset(struct shm_rbuff * rb); + +#endif /* OUROBOROS_SHM_RBUFF_H */ diff --git a/include/ouroboros/wrap/ouroboros.i b/include/ouroboros/wrap/ouroboros.i index 394b505a..26cc6076 100644 --- a/include/ouroboros/wrap/ouroboros.i +++ b/include/ouroboros/wrap/ouroboros.i @@ -26,11 +26,11 @@  #include "ouroboros/dev.h"  #include "ouroboros/errno.h"  #include "ouroboros/fcntl.h" +#include "ouroboros/fqueue.h"  #include "ouroboros/irm.h"  #include "ouroboros/irm_config.h"  #include "ouroboros/nsm.h"  #include "ouroboros/qos.h" -#include "ouroboros/select.h"  %}  typedef int pid_t; @@ -39,8 +39,8 @@ typedef int pid_t;  %include "ouroboros/dev.h"  %include "ouroboros/errno.h"  %include "ouroboros/fcntl.h" +%include "ouroboros/fqueue.h"  %include "ouroboros/irm.h"  %include "ouroboros/irm_config.h"  %include "ouroboros/nsm.h"  %include "ouroboros/qos.h" -%include "ouroboros/select.h" diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a9f80ee7..f9246c7a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -323,9 +323,9 @@ void * ipcp_main_loop(void * o)                          ret_msg.has_result = true;                          ret_msg.result =                                  ipcpi.ops->ipcp_flow_alloc(fd, -                                                            msg->dst_name, -                                                            msg->src_ae_name, -                                                            msg->qos_cube); +                                                           msg->dst_name, +                                                           msg->src_ae_name, +                                                           msg->qos_cube);                          if (ret_msg.result < 0) {                                  LOG_DBG("Deallocate failed on port_id %d.",                                          msg->port_id); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 4e500a8a..68c9ae8c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -25,7 +25,7 @@  #include <ouroboros/errno.h>  #include <ouroboros/dev.h>  #include <ouroboros/fcntl.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h>  #include <ouroboros/ipcp-dev.h>  #include <ouroboros/local-dev.h>  #define OUROBOROS_PREFIX "ipcpd/local" @@ -39,6 +39,7 @@  #include <sys/wait.h>  #include <fcntl.h> +#define EVENT_WAIT_TIMEOUT 100 /* us */  #define THIS_TYPE IPCP_LOCAL  /* global for trapping signal */ @@ -46,18 +47,25 @@ int irmd_api;  struct {          int                   in_out[IRMD_MAX_FLOWS]; +        flow_set_t *          flows;          pthread_rwlock_t      lock;          pthread_t             sduloop;  } local_data; -void local_data_init() +int local_data_init()  {          int i;          for (i = 0; i < IRMD_MAX_FLOWS; ++i)                  local_data.in_out[i] = -1; +        local_data.flows = flow_set_create(); +        if (local_data.flows == NULL) +                return -ENFILE; +          pthread_rwlock_init(&local_data.lock, NULL); + +        return 0;  }  void local_data_fini() @@ -67,11 +75,24 @@ void local_data_fini()  static void * ipcp_local_sdu_loop(void * o)  { +        struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1; +          while (true) {                  int fd; -                struct rb_entry * e; +                int ret; +                ssize_t idx; + +                ret = flow_event_wait(local_data.flows, fq, &timeout); +                if (ret == -ETIMEDOUT) +                        continue; -                fd = flow_select(NULL, NULL); +                if (ret < 0) { +                        LOG_ERR("Event wait returned error code %d.", -ret); +                        continue; +                }                  pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o)                  pthread_rwlock_rdlock(&local_data.lock); -                e = local_flow_read(fd); +                while ((fd = fqueue_next(fq)) >= 0) { +                        idx = local_flow_read(fd); -                fd = local_data.in_out[fd]; +                        fd = local_data.in_out[fd]; -                if (fd != -1) -                        local_flow_write(fd, e); +                        if (fd != -1) +                                local_flow_write(fd, idx); +                }                  pthread_rwlock_unlock(&local_data.lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); - -                free(e);          } -        return (void *) 1; +        return (void *) 0;  }  void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name)          if (ipcp_data_add_reg_entry(ipcpi.data, name)) {                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_DBGF("Failed to add %s to local registry.", name); +                LOG_DBG("Failed to add %s to local registry.", name);                  return -1;          } @@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int           fd,          if (ipcp_get_state() != IPCP_ENROLLED) {                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_DBGF("Won't register with non-enrolled IPCP."); +                LOG_DBG("Won't register with non-enrolled IPCP.");                  return -1; /* -ENOTENROLLED */          }          pthread_rwlock_wrlock(&local_data.lock); +        flow_set_add(local_data.flows, fd); +          out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);          local_data.in_out[fd]  = out_fd; @@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)                  return 0;          pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_rdlock(&local_data.lock);          out_fd = local_data.in_out[fd];          if (out_fd < 0) { @@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)                  return -1;          } +        flow_set_add(local_data.flows, fd); + +        pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock);          if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) @@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd)          if (fd < 0)                  return -EINVAL; +        flow_set_del(local_data.flows, fd); +          while (flow_dealloc(fd) == -EBUSY)                  nanosleep(&t, NULL); @@ -289,9 +318,14 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        local_data_init(); -          if (ap_init(NULL) < 0) { +                LOG_ERR("Failed to init application."); +                close_logfile(); +                exit(EXIT_FAILURE); +        } + +        if (local_data_init() < 0) { +                LOG_ERR("Failed to init local data.");                  close_logfile();                  exit(EXIT_FAILURE);          } @@ -331,10 +365,10 @@ int main(int argc, char * argv[])          pthread_cancel(local_data.sduloop);          pthread_join(local_data.sduloop, NULL); -        ap_fini(); -          local_data_fini(); +        ap_fini(); +          close_logfile();          exit(EXIT_SUCCESS); diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8c627641..2800dcb2 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,7 +27,7 @@  #include <ouroboros/dev.h>  #include <ouroboros/list.h>  #include <ouroboros/ipcp-dev.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h>  #include <ouroboros/errno.h>  #include <stdlib.h> @@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o)          struct shm_du_buff * sdb;          struct timespec timeout = {0, FD_UPDATE_TIMEOUT};          struct np1_flow * flow; +        int fd; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1;          while (true) { -                int fd = flow_select(fmgr.np1_set, &timeout); -                if (fd == -ETIMEDOUT) +                int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); +                if (ret == -ETIMEDOUT)                          continue; -                if (fd < 0) { -                        LOG_ERR("Failed to get active fd."); +                if (ret < 0) { +                        LOG_ERR("Event error: %d.", ret);                          continue;                  } -                if (ipcp_flow_read(fd, &sdb)) { -                        LOG_ERR("Failed to read SDU from fd %d.", fd); -                        continue; -                } +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                LOG_ERR("Failed to read SDU from fd %d.", fd); +                                continue; +                        } -                pthread_rwlock_rdlock(&fmgr.np1_flows_lock); -                flow = fmgr.np1_flows[fd]; -                if (flow == NULL) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        ipcp_flow_del(sdb); -                        LOG_ERR("Failed to retrieve flow."); -                        continue; -                } +                        pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + +                        flow = fmgr.np1_flows[fd]; +                        if (flow == NULL) { +                                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                                ipcp_flow_del(sdb); +                                LOG_ERR("Failed to retrieve flow."); +                                continue; +                        } + +                        if (frct_i_write_sdu(flow->cep_id, sdb)) { +                                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                                ipcp_flow_del(sdb); +                                LOG_ERR("Failed to hand SDU to FRCT."); +                                continue; +                        } -                if (frct_i_write_sdu(flow->cep_id, sdb)) {                          pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        ipcp_flow_del(sdb); -                        LOG_ERR("Failed to hand SDU to FRCT."); -                        continue; -                } -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                }          }          return (void *) 0; @@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o)          struct timespec timeout = {0, FD_UPDATE_TIMEOUT};          struct shm_du_buff * sdb;          struct pci * pci; - +        int fd; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1;          while (true) { -                int fd = flow_select(fmgr.nm1_set, &timeout); -                if (fd == -ETIMEDOUT) -                        continue; - -                if (fd < 0) { -                        LOG_ERR("Failed to get active fd."); +                int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); +                if (ret == -ETIMEDOUT)                          continue; -                } -                if (ipcp_flow_read(fd, &sdb)) { -                        LOG_ERR("Failed to read SDU from fd %d.", fd); +                if (ret < 0) { +                        LOG_ERR("Event error: %d.", ret);                          continue;                  } -                pci = shm_pci_des(sdb); -                if (pci == NULL) { -                        LOG_ERR("Failed to get PCI."); -                        ipcp_flow_del(sdb); -                        continue; -                } +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                LOG_ERR("Failed to read SDU from fd %d.", fd); +                                continue; +                        } -                if (pci->dst_addr != ribmgr_address()) { -                        LOG_DBG("PDU needs to be forwarded."); +                        pci = shm_pci_des(sdb); +                        if (pci == NULL) { +                                LOG_ERR("Failed to get PCI."); +                                ipcp_flow_del(sdb); +                                continue; +                        } -                        if (pci->ttl == 0) { -                                LOG_DBG("TTL was zero."); +                        if (pci->dst_addr != ribmgr_address()) { +                                LOG_DBG("PDU needs to be forwarded."); + +                                if (pci->ttl == 0) { +                                        LOG_DBG("TTL was zero."); +                                        ipcp_flow_del(sdb); +                                        free(pci); +                                        continue; +                                } + +                                if (shm_pci_dec_ttl(sdb)) { +                                        LOG_ERR("Failed to decrease TTL."); +                                        ipcp_flow_del(sdb); +                                        free(pci); +                                        continue; +                                } +                                /* +                                 * FIXME: Dropping for now, since +                                 * we don't have a PFF yet +                                 */                                  ipcp_flow_del(sdb);                                  free(pci);                                  continue;                          } -                        if (shm_pci_dec_ttl(sdb)) { -                                LOG_ERR("Failed to decrease TTL."); +                        if (shm_pci_shrink(sdb)) { +                                LOG_ERR("Failed to shrink PDU.");                                  ipcp_flow_del(sdb);                                  free(pci);                                  continue;                          } -                        /* -                         * FIXME: Dropping for now, since -                         * we don't have a PFF yet -                         */ -                        ipcp_flow_del(sdb); -                        free(pci); -                        continue; -                } - -                if (shm_pci_shrink(sdb)) { -                        LOG_ERR("Failed to shrink PDU."); -                        ipcp_flow_del(sdb); -                        free(pci); -                        continue; -                } -                if (frct_nm1_post_sdu(pci, sdb)) { -                        LOG_ERR("Failed to hand PDU to FRCT."); -                        ipcp_flow_del(sdb); -                        free(pci); -                        continue; +                        if (frct_nm1_post_sdu(pci, sdb)) { +                                LOG_ERR("Failed to hand PDU to FRCT."); +                                ipcp_flow_del(sdb); +                                free(pci); +                                continue; +                        }                  }          } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 399d3dc8..db258c8b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -30,6 +30,8 @@  #include <ouroboros/bitmap.h>  #include <ouroboros/dev.h>  #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/fqueue.h>  #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc" @@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t;  #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \                          + SHIM_ETH_LLC_MAX_SDU_SIZE) +#define EVENT_WAIT_TIMEOUT 100 /* us */ +  /* global for trapping signal */  int irmd_api; @@ -110,6 +114,7 @@ struct {          uint8_t *          tx_ring;          int                tx_offset;  #endif +        flow_set_t *       np1_flows;          int *              ef_to_fd;          struct ef *        fd_to_ef;          pthread_rwlock_t   flows_lock; @@ -139,6 +144,14 @@ static int eth_llc_data_init()                  return -ENOMEM;          } +        eth_llc_data.np1_flows = flow_set_create(); +        if (eth_llc_data.np1_flows == NULL) { +                bmp_destroy(eth_llc_data.saps); +                free(eth_llc_data.ef_to_fd); +                free(eth_llc_data.fd_to_ef); +                return -ENOMEM; +        } +          for (i = 0; i < MAX_SAPS; ++i)                  eth_llc_data.ef_to_fd[i] = -1; @@ -156,6 +169,7 @@ static int eth_llc_data_init()  void eth_llc_data_fini()  {          bmp_destroy(eth_llc_data.saps); +        flow_set_destroy(eth_llc_data.np1_flows);          free(eth_llc_data.fd_to_ef);          free(eth_llc_data.ef_to_fd);          pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr)                  return 0;          } -        bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); -          pthread_rwlock_unlock(ð_llc_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        flow_dealloc(fd); - -        LOG_DBG("Flow with fd %d deallocated.", fd); +        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);          return 0;  }  static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)  { -        shim_eth_llc_msg_t * msg = NULL; - -        msg = shim_eth_llc_msg__unpack(NULL, len, buf); +        shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf);          if (msg == NULL) {                  LOG_ERR("Failed to unpack.");                  return -1; @@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o)  static void * eth_llc_ipcp_sdu_writer(void * o)  { +        int fd; +        struct shm_du_buff * sdb; +        uint8_t ssap; +        uint8_t dsap; +        uint8_t r_addr[MAC_SIZE]; +        struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1; +          while (true) { -                int fd; -                struct shm_du_buff * sdb; -                uint8_t ssap; -                uint8_t dsap; -                uint8_t r_addr[MAC_SIZE]; - -                fd = ipcp_read_shim(&sdb); -                if (fd < 0) +                int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); +                if (ret == -ETIMEDOUT)                          continue; -                pthread_rwlock_rdlock(&ipcpi.state_lock); -                pthread_rwlock_rdlock(ð_llc_data.flows_lock); +                if (ret < 0) { +                        LOG_ERR("Event wait returned error code %d.", -ret); +                        continue; +                } -                ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); -                dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); -                memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                LOG_ERR("Bad read from fd %d.", fd); +                                continue; +                        } +                        pthread_rwlock_rdlock(&ipcpi.state_lock); +                        pthread_rwlock_rdlock(ð_llc_data.flows_lock); -                pthread_rwlock_unlock(ð_llc_data.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock); +                        ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); +                        dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); +                        memcpy(r_addr, +                               eth_llc_data.fd_to_ef[fd].r_addr, +                               MAC_SIZE); + +                        pthread_rwlock_unlock(ð_llc_data.flows_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock); -                eth_llc_ipcp_send_frame(r_addr, dsap, ssap, -                                        shm_du_buff_head(sdb), -                                        shm_du_buff_tail(sdb) -                                        - shm_du_buff_head(sdb)); -                ipcp_flow_del(sdb); +                        eth_llc_ipcp_send_frame(r_addr, dsap, ssap, +                                                shm_du_buff_head(sdb), +                                                shm_du_buff_tail(sdb) +                                                - shm_du_buff_head(sdb)); +                        ipcp_flow_del(sdb); +                }          }          return (void *) 1; @@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int           fd,          uint8_t ssap = 0;          uint8_t r_addr[MAC_SIZE]; -        LOG_INFO("Allocating flow to %s.", dst_name); +        LOG_DBG("Allocating flow to %s.", dst_name);          if (dst_name == NULL || src_ae_name == NULL)                  return -1; @@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int           fd,                  return -1;          } +        flow_set_add(eth_llc_data.np1_flows, fd); +          LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap);          return 0; @@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)                  return -1;          } +        flow_set_add(eth_llc_data.np1_flows, fd); +          LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap);          return 0; @@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)  static int eth_llc_ipcp_flow_dealloc(int fd)  { +        struct timespec t = {0, 10000}; +          uint8_t sap;          uint8_t r_sap;          uint8_t addr[MAC_SIZE];          int ret; +        flow_set_del(eth_llc_data.np1_flows, fd); + +        while (flow_dealloc(fd) == -EBUSY) +                nanosleep(&t, NULL); +          pthread_rwlock_rdlock(&ipcpi.state_lock);          pthread_rwlock_wrlock(ð_llc_data.flows_lock); @@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd)          if (ret < 0)                  LOG_DBG("Could not notify remote."); -        flow_dealloc(fd); -          LOG_DBG("Flow with fd %d deallocated.", fd);          return 0; @@ -1008,10 +1042,12 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        if (eth_llc_data_init() < 0) +        if (ap_init(NULL) < 0) { +                close_logfile();                  exit(EXIT_FAILURE); +        } -        if (ap_init(NULL) < 0) { +        if (eth_llc_data_init() < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } @@ -1054,10 +1090,10 @@ int main(int argc, char * argv[])          pthread_join(eth_llc_data.sdu_writer, NULL);          pthread_join(eth_llc_data.sdu_reader, NULL); -        ap_fini(); -          eth_llc_data_fini(); +        ap_fini(); +          close_logfile();          exit(EXIT_SUCCESS); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 7c109a8a..050623e4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -27,6 +27,9 @@  #include <ouroboros/utils.h>  #include <ouroboros/dev.h>  #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/errno.h>  #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -75,6 +78,7 @@ struct {          struct sockaddr_in s_saddr;          int                s_fd; +        flow_set_t *       np1_flows;          fd_set             flow_fd_s;          /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */          int                uf_to_fd[FD_SETSIZE]; @@ -90,7 +94,7 @@ struct {          pthread_mutex_t    fd_set_lock;  } udp_data; -static void udp_data_init() +static int udp_data_init()  {          int i; @@ -104,13 +108,21 @@ static void udp_data_init()          FD_ZERO(&udp_data.flow_fd_s); +        udp_data.np1_flows = flow_set_create(); +        if (udp_data.np1_flows == NULL) +                return -ENOMEM; +          pthread_rwlock_init(&udp_data.flows_lock, NULL);          pthread_cond_init(&udp_data.fd_set_cond, NULL);          pthread_mutex_init(&udp_data.fd_set_lock, NULL); + +        return 0;  }  static void udp_data_fini()  { +        flow_set_destroy(udp_data.np1_flows); +          pthread_rwlock_destroy(&udp_data.flows_lock);          pthread_mutex_destroy(&udp_data.fd_set_lock);          pthread_cond_destroy(&udp_data.fd_set_cond); @@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port)          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        flow_dealloc(fd); +        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);          close(skfd); @@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader()  static void * ipcp_udp_sdu_loop(void * o)  { +        int fd; +        struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; +        struct shm_du_buff * sdb; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1;          while (true) { -                int fd; -                struct shm_du_buff * sdb; +                int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); +                if (ret == -ETIMEDOUT) +                        continue; -                fd = ipcp_read_shim(&sdb); -                if (fd < 0) +                if (ret < 0) { +                        LOG_ERR("Event wait returned error code %d.", -ret);                          continue; +                } -                pthread_rwlock_rdlock(&ipcpi.state_lock); -                pthread_rwlock_rdlock(&udp_data.flows_lock); +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                LOG_ERR("Bad read from fd %d.", fd); +                                continue; +                        } -                fd = udp_data.fd_to_uf[fd].skfd; +                        pthread_rwlock_rdlock(&ipcpi.state_lock); +                        pthread_rwlock_rdlock(&udp_data.flows_lock); -                pthread_rwlock_unlock(&udp_data.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock); +                        fd = udp_data.fd_to_uf[fd].skfd; + +                        pthread_rwlock_unlock(&udp_data.flows_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock); -                if (send(fd, -                         shm_du_buff_head(sdb), -                         shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), -                         0) < 0) -                        LOG_ERR("Failed to send SDU."); +                        if (send(fd, +                                 shm_du_buff_head(sdb), +                                 shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), +                                 0) < 0) +                                LOG_ERR("Failed to send SDU."); -                ipcp_flow_del(sdb); +                        ipcp_flow_del(sdb); +                }          }          return (void *) 1; @@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int           fd,          udp_data.fd_to_uf[fd].skfd = skfd;          udp_data.uf_to_fd[skfd]    = fd; +        flow_set_add(udp_data.np1_flows, fd); +          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)          set_fd(skfd); +        flow_set_add(udp_data.np1_flows, fd); +          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd)  {          int skfd = -1;          int remote_udp = -1; +        struct timespec t = {0, 10000};          struct sockaddr_in    r_saddr;          socklen_t             r_saddr_len = sizeof(r_saddr); +        flow_set_del(udp_data.np1_flows, fd); + +        while (flow_dealloc(fd) == -EBUSY) +                nanosleep(&t, NULL); +          pthread_rwlock_rdlock(&ipcpi.state_lock);          pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd)          close(skfd); -        flow_dealloc(fd); -          LOG_DBG("Flow with fd %d deallocated.", fd);          return 0; @@ -1149,13 +1184,16 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        udp_data_init(); -          if (ap_init(NULL) < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } +        if (udp_data_init() < 0) { +                close_logfile(); +                exit(EXIT_FAILURE); +        } +          /* store the process id of the irmd */          irmd_api = atoi(argv[1]); @@ -1196,10 +1234,10 @@ int main(int argc, char * argv[])          pthread_join(udp_data.handler, NULL);          pthread_join(udp_data.sdu_reader, NULL); -        ap_fini(); -          udp_data_fini(); +        ap_fini(); +          close_logfile();          exit(EXIT_SUCCESS); diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index f79e6caf..33f7650a 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -111,6 +111,7 @@ pid_t ipcp_create(enum ipcp_type ipcp_type)          char * full_name = NULL;          char * exec_name = NULL;          char * log_file = NULL; +        char * argv[4];          sprintf(irmd_api, "%u", getpid()); @@ -161,14 +162,12 @@ pid_t ipcp_create(enum ipcp_type ipcp_type)          }          /* log_file to be placed at the end */ -        char * argv[] = {full_name, -                         irmd_api, -                         log_file, -                         0}; +        argv[0] = full_name; +        argv[1] = irmd_api; +        argv[2] = log_file; +        argv[3] = NULL; -        char * envp[] = {0}; - -        execve(argv[0], &argv[0], envp); +        execv(argv[0], &argv[0]);          LOG_DBG("%s", strerror(errno));          LOG_ERR("Failed to load IPCP daemon"); diff --git a/src/irmd/main.c b/src/irmd/main.c index 157fd8eb..67941e41 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -28,7 +28,7 @@  #include <ouroboros/utils.h>  #include <ouroboros/irm_config.h>  #include <ouroboros/lockfile.h> -#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_rbuff.h>  #include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/qos.h> @@ -1692,26 +1692,26 @@ void * irm_sanitize()                          }                          if (kill(f->n_api, 0) < 0) { -                                struct shm_ap_rbuff * rb = -                                        shm_ap_rbuff_open(f->n_api); +                                struct shm_rbuff * rb = +                                        shm_rbuff_open(f->n_api, f->port_id);                                  bmp_release(irmd->port_ids, f->port_id);                                  list_del(&f->next);                                  LOG_INFO("AP-I %d gone, flow %d deallocated.",                                           f->n_api, f->port_id);                                  ipcp_flow_dealloc(f->n_1_api, f->port_id);                                  if (rb != NULL) -                                        shm_ap_rbuff_destroy(rb); +                                        shm_rbuff_destroy(rb);                                  irm_flow_destroy(f);                                  continue;                          }                          if (kill(f->n_1_api, 0) < 0) { -                                struct shm_ap_rbuff * rb = -                                        shm_ap_rbuff_open(f->n_1_api); +                                struct shm_rbuff * rb = +                                        shm_rbuff_open(f->n_1_api, f->port_id);                                  list_del(&f->next);                                  LOG_ERR("IPCP %d gone, flow %d removed.",                                          f->n_1_api, f->port_id);                                  if (rb != NULL) -                                        shm_ap_rbuff_destroy(rb); +                                        shm_rbuff_destroy(rb);                                  irm_flow_destroy(f);                          }                  } diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index b94d0eea..20ea473d 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -35,7 +35,8 @@ set(SOURCE_FILES    lockfile.c    logs.c    nsm.c -  shm_ap_rbuff.c +  shm_flow_set.c +  shm_rbuff.c    shm_rdrbuff.c    sockets.c    time_utils.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 77c2d06a..f735e72b 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -3,7 +3,8 @@   *   * API for applications   * - *    Sander Vrijders <sander.vrijders@intec.ugent.be> + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be>   *   * This program is free software; you can redistribute it and/or modify   * it under the terms of the GNU General Public License as published by @@ -26,20 +27,24 @@  #include <ouroboros/sockets.h>  #include <ouroboros/fcntl.h>  #include <ouroboros/bitmap.h> +#include <ouroboros/shm_flow_set.h>  #include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_rbuff.h>  #include <ouroboros/utils.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h>  #include <stdlib.h>  #include <string.h>  #include <stdio.h>  struct flow_set { -        bool dirty; -        bool b[IRMD_MAX_FLOWS]; /* working copy */ -        bool s[IRMD_MAX_FLOWS]; /* safe copy */ -        pthread_rwlock_t lock; +        size_t idx; +}; + +struct fqueue { +        int    fqueue[SHM_BUFFER_SIZE]; /* safe copy from shm */ +        size_t fqsize; +        size_t next;  };  enum port_state { @@ -124,7 +129,9 @@ enum port_state port_wait_assign(struct port * p)  }  struct flow { -        struct shm_ap_rbuff * rb; +        struct shm_rbuff *    rx_rb; +        struct shm_rbuff *    tx_rb; +        struct shm_flow_set * set;          int                   port_id;          int                   oflags; @@ -139,10 +146,11 @@ struct {          pid_t                 api;          struct shm_rdrbuff *  rdrb; -        struct shm_ap_rbuff * rb; +        struct shm_flow_set * fqset;          pthread_rwlock_t      data_lock;          struct bmp *          fds; +        struct bmp *          fqueues;          struct flow *         flows;          struct port *         ports; @@ -194,40 +202,52 @@ int ap_init(char * ap_name)          if (ai.fds == NULL)                  return -ENOMEM; -        ai.rdrb = shm_rdrbuff_open(); -        if (ai.rdrb == NULL) { +        ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0); +        if (ai.fqueues == NULL) { +                bmp_destroy(ai.fds); +                return -ENOMEM; +        } + +        ai.fqset = shm_flow_set_create(); +        if (ai.fqset == NULL) { +                bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds);                  return -1;          } -        ai.rb = shm_ap_rbuff_create(); -        if (ai.rb == NULL) { -                shm_rdrbuff_close(ai.rdrb); +        ai.rdrb = shm_rdrbuff_open(); +        if (ai.rdrb == NULL) { +                shm_flow_set_destroy(ai.fqset); +                bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds);                  return -1;          }          ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);          if (ai.flows == NULL) { -                shm_ap_rbuff_destroy(ai.rb);                  shm_rdrbuff_close(ai.rdrb); +                shm_flow_set_destroy(ai.fqset); +                bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds);                  return -1;          }          for (i = 0; i < AP_MAX_FLOWS; ++i) { -                ai.flows[i].rb = NULL; +                ai.flows[i].rx_rb   = NULL; +                ai.flows[i].tx_rb   = NULL; +                ai.flows[i].set     = NULL;                  ai.flows[i].port_id = -1; -                ai.flows[i].oflags = 0; -                ai.flows[i].api = -1; +                ai.flows[i].oflags  = 0; +                ai.flows[i].api     = -1;                  ai.flows[i].timeout = NULL;          }          ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); -        if (ai.flows == NULL) { +        if (ai.ports == NULL) {                  free(ai.flows); -                shm_ap_rbuff_destroy(ai.rb);                  shm_rdrbuff_close(ai.rdrb); +                shm_flow_set_destroy(ai.fqset); +                bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds);                  return -1;          } @@ -253,16 +273,10 @@ void ap_fini()          pthread_rwlock_wrlock(&ai.data_lock); -        /* remove all remaining sdus */ -        while ((i = shm_ap_rbuff_pop_idx(ai.rb)) >= 0) -                shm_rdrbuff_remove(ai.rdrb, i); - -        if (ai.fds != NULL) -                bmp_destroy(ai.fds); -        if (ai.rb != NULL) -                shm_ap_rbuff_destroy(ai.rb); -        if (ai.rdrb != NULL) -                shm_rdrbuff_close(ai.rdrb); +        bmp_destroy(ai.fds); +        bmp_destroy(ai.fqueues); +        shm_flow_set_destroy(ai.fqset); +        shm_rdrbuff_close(ai.rdrb);          if (ai.daf_name != NULL)                  free(ai.daf_name); @@ -270,8 +284,15 @@ void ap_fini()          pthread_rwlock_rdlock(&ai.flows_lock);          for (i = 0; i < AP_MAX_FLOWS; ++i) { -                if (ai.flows[i].rb != NULL) -                        shm_ap_rbuff_close(ai.flows[i].rb); +                if (ai.flows[i].tx_rb != NULL) { +                        int idx; +                        while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) +                                shm_rdrbuff_remove(ai.rdrb, idx); +                        shm_rbuff_destroy(ai.flows[i].rx_rb); +                        shm_rbuff_close(ai.flows[i].tx_rb); +                        shm_flow_set_close(ai.flows[i].set); +                } +                  if (ai.flows[i].timeout != NULL)                          free(ai.flows[i].timeout);          } @@ -328,8 +349,8 @@ int flow_accept(char ** ae_name)                  return -1;          } -        ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); -        if (ai.flows[fd].rb == NULL) { +        ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id); +        if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -337,10 +358,24 @@ int flow_accept(char ** ae_name)                  return -1;          } +        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); +        if (ai.flows[fd].set == NULL) { +                bmp_release(ai.fds, fd); +                shm_rbuff_destroy(ai.flows[fd].rx_rb); +                shm_rbuff_close(ai.flows[fd].tx_rb); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +          if (ae_name != NULL) {                  *ae_name = strdup(recv_msg->ae_name);                  if (*ae_name == NULL) { -                        shm_ap_rbuff_close(ai.flows[fd].rb); +                        shm_rbuff_destroy(ai.flows[fd].tx_rb); +                        shm_rbuff_close(ai.flows[fd].tx_rb); +                        shm_flow_set_close(ai.flows[fd].set);                          bmp_release(ai.fds, fd);                          pthread_rwlock_unlock(&ai.flows_lock);                          pthread_rwlock_unlock(&ai.data_lock); @@ -356,8 +391,6 @@ int flow_accept(char ** ae_name)          ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; -        shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); -          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -410,6 +443,17 @@ int flow_alloc_resp(int fd, int response)          ret = recv_msg->result; +        pthread_rwlock_wrlock(&ai.flows_lock); + +        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, +                                            ai.flows[fd].port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -461,8 +505,11 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)                  return -1;          } -        ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); -        if (ai.flows[fd].rb == NULL) { +        ai.flows[fd].port_id = recv_msg->port_id; +        ai.flows[fd].oflags  = FLOW_O_DEFAULT; +        ai.flows[fd].api     = recv_msg->api; +        ai.flows[fd].rx_rb   = shm_rbuff_create(recv_msg->port_id); +        if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -470,9 +517,26 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)                  return -1;          } -        ai.flows[fd].port_id = recv_msg->port_id; -        ai.flows[fd].oflags  = FLOW_O_DEFAULT; -        ai.flows[fd].api     = recv_msg->api; +        ai.flows[fd].tx_rb   = shm_rbuff_open(recv_msg->api, recv_msg->port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                shm_rbuff_destroy(ai.flows[fd].rx_rb); +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); +        if (ai.flows[fd].set == NULL) { +                shm_rbuff_close(ai.flows[fd].tx_rb); +                shm_rbuff_destroy(ai.flows[fd].rx_rb); +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        }          ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -480,8 +544,6 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); -        shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); -          irm_msg__free_unpacked(recv_msg, NULL);          return fd; @@ -548,7 +610,7 @@ int flow_dealloc(int fd)                  return -ENOTALLOC;          } -        if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) { +        if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return -EBUSY; @@ -559,8 +621,10 @@ int flow_dealloc(int fd)          port_destroy(&ai.ports[msg.port_id]);          ai.flows[fd].port_id = -1; -        shm_ap_rbuff_close(ai.flows[fd].rb); -        ai.flows[fd].rb = NULL; +        shm_rbuff_destroy(ai.flows[fd].rx_rb); +        ai.flows[fd].rx_rb = NULL; +        shm_rbuff_close(ai.flows[fd].tx_rb); +        ai.flows[fd].tx_rb = NULL;          ai.flows[fd].oflags = 0;          ai.flows[fd].api = -1;          if (ai.flows[fd].timeout != NULL) { @@ -604,9 +668,9 @@ int flow_cntl(int fd, int cmd, int oflags)          case FLOW_F_SETFL: /* SET FLOW FLAGS */                  ai.flows[fd].oflags = oflags;                  if (oflags & FLOW_O_WRONLY) -                        shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id); +                        shm_rbuff_block(ai.flows[fd].rx_rb);                  if (oflags & FLOW_O_RDWR) -                        shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id); +                        shm_rbuff_unblock(ai.flows[fd].rx_rb);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return old; @@ -620,7 +684,6 @@ int flow_cntl(int fd, int cmd, int oflags)  ssize_t flow_write(int fd, void * buf, size_t count)  {          ssize_t idx; -        struct rb_entry e;          if (buf == NULL)                  return 0; @@ -653,13 +716,10 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  if (idx < 0) {                          pthread_rwlock_unlock(&ai.flows_lock);                          pthread_rwlock_unlock(&ai.data_lock); -                        return -idx; +                        return idx;                  } -                e.index   = idx; -                e.port_id = ai.flows[fd].port_id; - -                if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { +                if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {                          shm_rdrbuff_remove(ai.rdrb, idx);                          pthread_rwlock_unlock(&ai.flows_lock);                          pthread_rwlock_unlock(&ai.data_lock); @@ -667,7 +727,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  }          } else { /* blocking */                  struct shm_rdrbuff * rdrb = ai.rdrb; -                pid_t                api = ai.flows[fd].api; +                pid_t                api  = ai.flows[fd].api;                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -681,17 +741,16 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  pthread_rwlock_rdlock(&ai.data_lock);                  pthread_rwlock_rdlock(&ai.flows_lock); -                e.index   = idx; -                e.port_id = ai.flows[fd].port_id; - -                if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { -                        shm_rdrbuff_remove(ai.rdrb, e.index); +                if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { +                        shm_rdrbuff_remove(ai.rdrb, idx);                          pthread_rwlock_unlock(&ai.flows_lock);                          pthread_rwlock_unlock(&ai.data_lock);                          return -ENOTALLOC;                  }          } +        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); +          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -717,15 +776,14 @@ ssize_t flow_read(int fd, void * buf, size_t count)          }          if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id); +                idx = shm_rbuff_read(ai.flows[fd].rx_rb);                  pthread_rwlock_unlock(&ai.flows_lock);          } else { -                struct shm_ap_rbuff * rb      = ai.rb; -                int                   port_id = ai.flows[fd].port_id; -                struct timespec *     timeout = ai.flows[fd].timeout; +                struct shm_rbuff * rb     = ai.flows[fd].rx_rb; +                struct timespec * timeout = ai.flows[fd].timeout;                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); -                idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); +                idx = shm_rbuff_read_b(rb, timeout);                  pthread_rwlock_rdlock(&ai.data_lock);          } @@ -757,79 +815,163 @@ struct flow_set * flow_set_create()          if (set == NULL)                  return NULL; -        if (pthread_rwlock_init(&set->lock, NULL)) { +        assert(ai.fqueues); + +        set->idx = bmp_allocate(ai.fqueues); +        if (!bmp_is_id_valid(ai.fqueues, set->idx)) {                  free(set);                  return NULL;          } -        memset(set->b, 0, IRMD_MAX_FLOWS); -        memset(set->s, 0, IRMD_MAX_FLOWS); +        return set; +} -        set->dirty = true; +void flow_set_destroy(struct flow_set * set) +{ +        if (set == NULL) +                return; -        return set; +        flow_set_zero(set); +        bmp_release(ai.fqueues, set->idx); +        free(set);  } -void flow_set_zero(struct flow_set * set) +struct fqueue * fqueue_create()  { -        pthread_rwlock_wrlock(&set->lock); -        memset(set->b, 0, IRMD_MAX_FLOWS); -        set->dirty = true; -        pthread_rwlock_unlock(&set->lock); +        struct fqueue * fq = malloc(sizeof(*fq)); +        if (fq == NULL) +                return NULL; + +        memset(fq->fqueue, -1, SHM_BUFFER_SIZE); +        fq->fqsize = 0; +        fq->next   = 0; + +        return fq;  } -void flow_set_add(struct flow_set * set, int fd) +void fqueue_destroy(struct fqueue * fq)  { -        pthread_rwlock_wrlock(&set->lock); -        set->b[ai.flows[fd].port_id] = true; -        set->dirty = true; -        pthread_rwlock_unlock(&set->lock); +        if (fq == NULL) +                return +        free(fq);  } -void flow_set_del(struct flow_set * set, int fd) +void flow_set_zero(struct flow_set * set)  { -        pthread_rwlock_wrlock(&set->lock); -        set->b[ai.flows[fd].port_id] = false; -        set->dirty = true; -        pthread_rwlock_unlock(&set->lock); +        if (set == NULL) +                return; + +        pthread_rwlock_rdlock(&ai.data_lock); + +        shm_flow_set_zero(ai.fqset, set->idx); + +        pthread_rwlock_unlock(&ai.data_lock);  } -bool flow_set_has(struct flow_set * set, int fd) +int flow_set_add(struct flow_set * set, int fd)  { -        bool ret; -        pthread_rwlock_rdlock(&set->lock); -        ret = set->b[ai.flows[fd].port_id]; -        pthread_rwlock_unlock(&set->lock); +        int ret; + +        if (set == NULL) +                return -EINVAL; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); +          return ret;  } -void flow_set_destroy(struct flow_set * set) +void flow_set_del(struct flow_set * set, int fd)  { -        pthread_rwlock_destroy(&set->lock); -        free(set); +        if (set == NULL) +                return; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id >= 0) +                shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock);  } -static void flow_set_cpy(struct flow_set * set) +bool flow_set_has(struct flow_set * set, int fd)  { -        pthread_rwlock_rdlock(&set->lock); -        if (set->dirty) -                memcpy(set->s, set->b, IRMD_MAX_FLOWS); -        set->dirty = false; -        pthread_rwlock_unlock(&set->lock); +        bool ret = false; + +        if (set == NULL || fd < 0) +                return false; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return false; +        } + +        ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return ret;  } -int flow_select(struct flow_set * set, const struct timespec * timeout) +int fqueue_next(struct fqueue * fq)  { -        int port_id; -        if (set == NULL) { -                port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout); -        } else { -                flow_set_cpy(set); -                port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout); +        int fd; + +        if (fq == NULL) +                return -EINVAL; + +        if (fq->next == fq->fqsize) { +                fq->fqsize = 0; +                fq->next = 0; +                return -EPERM;          } -        if (port_id < 0) -                return port_id; -        return ai.ports[port_id].fd; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        fd = ai.ports[fq->fqueue[fq->next++]].fd; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return fd; +} + +int flow_event_wait(struct flow_set *       set, +                    struct fqueue *         fq, +                    const struct timespec * timeout) +{ +        int ret; + +        if (set == NULL) +                return -EINVAL; + +        if (fq->fqsize > 0) +                return 0; + +        ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); +        if (ret == -ETIMEDOUT) +                return -ETIMEDOUT; + +        if (ret < 0) +                return ret; + +        fq->fqsize = ret; +        fq->next   = 0; + +        return 0;  }  /* ipcp-dev functions */ @@ -848,8 +990,8 @@ int np1_flow_alloc(pid_t n_api, int port_id)                  return -1;          } -        ai.flows[fd].rb = shm_ap_rbuff_open(n_api); -        if (ai.flows[fd].rb == NULL) { +        ai.flows[fd].rx_rb = shm_rbuff_create(port_id); +        if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -863,8 +1005,6 @@ int np1_flow_alloc(pid_t n_api, int port_id)          ai.ports[port_id].fd = fd;          port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); -        shm_ap_rbuff_open_port(ai.rb, port_id); -          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -890,7 +1030,6 @@ int np1_flow_dealloc(int port_id)  int np1_flow_resp(pid_t n_api, int port_id)  {          int fd; -        struct shm_ap_rbuff * rb;          port_wait_assign(&ai.ports[port_id]); @@ -904,18 +1043,26 @@ int np1_flow_resp(pid_t n_api, int port_id)                  return fd;          } -        rb = shm_ap_rbuff_open(n_api); -        if (rb == NULL) { +        ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); +        if (ai.flows[fd].tx_rb == NULL) {                  ai.flows[fd].port_id = -1; +                shm_rbuff_destroy(ai.flows[fd].rx_rb);                  port_destroy(&ai.ports[port_id]);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return -1;          } -        ai.flows[fd].rb = rb; - -        shm_ap_rbuff_open_port(ai.rb, port_id); +        ai.flows[fd].set = shm_flow_set_open(n_api); +        if (ai.flows[fd].set == NULL) { +                shm_rbuff_close(ai.flows[fd].tx_rb); +                ai.flows[fd].port_id = -1; +                shm_rbuff_destroy(ai.flows[fd].rx_rb); +                port_destroy(&ai.ports[port_id]); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        }          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -929,9 +1076,9 @@ int ipcp_create_r(pid_t api)          irm_msg_t * recv_msg = NULL;          int ret = -1; -        msg.code         = IRM_MSG_CODE__IPCP_CREATE_R; -        msg.has_api      = true; -        msg.api          = api; +        msg.code    = IRM_MSG_CODE__IPCP_CREATE_R; +        msg.has_api = true; +        msg.api     = api;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) @@ -958,11 +1105,11 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)          if (dst_name == NULL || src_ae_name == NULL)                  return -EINVAL; -        msg.code          = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; -        msg.has_api       = true; -        msg.api           = api; -        msg.dst_name      = dst_name; -        msg.ae_name       = src_ae_name; +        msg.code     = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; +        msg.has_api  = true; +        msg.api      = api; +        msg.dst_name = dst_name; +        msg.ae_name  = src_ae_name;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_wrlock(&ai.flows_lock); @@ -974,7 +1121,7 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)                  return -1; /* -ENOMOREFDS */          } -        ai.flows[fd].rb    = NULL; +        ai.flows[fd].tx_rb    = NULL;          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -996,8 +1143,16 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_wrlock(&ai.flows_lock); +        ai.flows[fd].rx_rb = shm_rbuff_create(port_id); +        if (ai.flows[fd].rx_rb == NULL) { +                ai.flows[fd].port_id = -1; +                port_destroy(&ai.ports[port_id]); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } +          ai.flows[fd].port_id = port_id; -        ai.flows[fd].rb      = NULL;          ai.ports[port_id].fd = fd;          port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); @@ -1019,16 +1174,13 @@ int ipcp_flow_alloc_reply(int fd, int response)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        msg.port_id      = ai.flows[fd].port_id; +        msg.port_id = ai.flows[fd].port_id;          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock);          msg.has_response = true;          msg.response     = response; -        if (response) -                shm_ap_rbuff_open_port(ai.rb, msg.port_id); -          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; @@ -1039,6 +1191,26 @@ int ipcp_flow_alloc_reply(int fd, int response)          }          ret = recv_msg->result; + +        pthread_rwlock_wrlock(&ai.flows_lock); + +        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, +                                            ai.flows[fd].port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); +        if (ai.flows[fd].set == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        pthread_rwlock_unlock(&ai.flows_lock); +          irm_msg__free_unpacked(recv_msg, NULL);          return ret; @@ -1061,7 +1233,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb)          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); -        idx = shm_ap_rbuff_read_port(ai.rb, port_id); +        idx = shm_rbuff_read(ai.flows[fd].rx_rb);          if (idx < 0) {                  pthread_rwlock_rdlock(&ai.data_lock);                  pthread_rwlock_rdlock(&ai.flows_lock); @@ -1081,7 +1253,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb)  int ipcp_flow_write(int fd, struct shm_du_buff * sdb)  { -        struct rb_entry e; +        ssize_t idx;          if (sdb == NULL)                  return -EINVAL; @@ -1095,16 +1267,16 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)                  return -EPERM;          } -        if (ai.flows[fd].rb == NULL) { +        if (ai.flows[fd].tx_rb == NULL) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return -EPERM;          } -        e.index = shm_du_buff_get_idx(sdb); -        e.port_id = ai.flows[fd].port_id; +        idx = shm_du_buff_get_idx(sdb); -        shm_ap_rbuff_write(ai.flows[fd].rb, &e); +        shm_rbuff_write(ai.flows[fd].tx_rb, idx); +        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -1112,46 +1284,28 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)          return 0;  } -struct rb_entry * local_flow_read(int fd) +ssize_t local_flow_read(int fd)  { -        int port_id; -        struct rb_entry * e = NULL; - -        pthread_rwlock_rdlock(&ai.data_lock); -        pthread_rwlock_rdlock(&ai.flows_lock); - -        port_id = ai.flows[fd].port_id; - -        pthread_rwlock_unlock(&ai.flows_lock); -        pthread_rwlock_unlock(&ai.data_lock); - -        if (port_id != -1) { -                e = malloc(sizeof(*e)); -                if (e == NULL) -                        return NULL; -                e->index = shm_ap_rbuff_read_port(ai.rb, port_id); -        } - -        return e; +        return shm_rbuff_read(ai.flows[fd].rx_rb);  } -int local_flow_write(int fd, struct rb_entry * e) +int local_flow_write(int fd, ssize_t idx)  { -        if (e == NULL || fd < 0) +        if (fd < 0)                  return -EINVAL;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai.flows[fd].rb == NULL) { +        if (ai.flows[fd].tx_rb == NULL) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return -EPERM;          } -        e->port_id = ai.flows[fd].port_id; +        shm_rbuff_write(ai.flows[fd].tx_rb, idx); -        shm_ap_rbuff_write(ai.flows[fd].rb, e); +        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -1159,22 +1313,26 @@ int local_flow_write(int fd, struct rb_entry * e)          return 0;  } -int ipcp_read_shim(struct shm_du_buff ** sdb) +int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)  { -        int fd; -        struct rb_entry * e = shm_ap_rbuff_read(ai.rb); +        ssize_t idx;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        fd = ai.ports[e->port_id].fd; +        if (ai.flows[fd].rx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -EPERM; +        } -        *sdb = shm_rdrbuff_get(ai.rdrb, e->index); +        idx = shm_rbuff_read(ai.flows[fd].rx_rb); +        *sdb = shm_rdrbuff_get(ai.rdrb, idx);          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); -        return fd; +        return 0;  }  void ipcp_flow_del(struct shm_du_buff * sdb) diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c index 04ce9324..a0222f18 100644 --- a/src/lib/lockfile.c +++ b/src/lib/lockfile.c @@ -39,10 +39,10 @@  struct lockfile {          pid_t * api; -        int fd;  };  struct lockfile * lockfile_create() { +        int fd;          mode_t mask;          struct lockfile * lf = malloc(sizeof(*lf));          if (lf == NULL) @@ -50,8 +50,8 @@ struct lockfile * lockfile_create() {          mask = umask(0); -        lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); -        if (lf->fd == -1) { +        fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (fd == -1) {                  LOG_DBGF("Could not create lock file.");                  free(lf);                  return NULL; @@ -59,30 +59,24 @@ struct lockfile * lockfile_create() {          umask(mask); -        if (ftruncate(lf->fd, LF_SIZE - 1) < 0) { +        if (ftruncate(fd, LF_SIZE - 1) < 0) {                  LOG_DBGF("Failed to extend lockfile.");                  free(lf);                  return NULL;          } -#ifndef __APPLE__ -        if (write(lf->fd, "", 1) != 1) { -                LOG_DBGF("Failed to finalise lockfile."); -                free(lf); -                return NULL; -        } -#endif +          lf->api = mmap(NULL,                         LF_SIZE, PROT_READ | PROT_WRITE,                         MAP_SHARED, -                       lf->fd, +                       fd,                         0); +        close (fd); +          if (lf->api == MAP_FAILED) {                  LOG_DBGF("Failed to map lockfile."); -                  if (shm_unlink(LOCKFILE_NAME) == -1)                          LOG_DBGF("Failed to remove invalid lockfile."); -                  free(lf);                  return NULL;          } @@ -93,12 +87,13 @@ struct lockfile * lockfile_create() {  }  struct lockfile * lockfile_open() { +        int fd;          struct lockfile * lf = malloc(sizeof(*lf));          if (lf == NULL)                  return NULL; -        lf->fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); -        if (lf->fd < 0) { +        fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); +        if (fd < 0) {                  LOG_DBGF("Could not open lock file.");                  free(lf);                  return NULL; @@ -107,15 +102,15 @@ struct lockfile * lockfile_open() {          lf->api = mmap(NULL,                         LF_SIZE, PROT_READ | PROT_WRITE,                         MAP_SHARED, -                       lf->fd, +                       fd,                         0); +        close(fd); +          if (lf->api == MAP_FAILED) {                  LOG_DBGF("Failed to map lockfile."); -                  if (shm_unlink(LOCKFILE_NAME) == -1)                          LOG_DBGF("Failed to remove invalid lockfile."); -                  free(lf);                  return NULL;          } @@ -130,9 +125,6 @@ void lockfile_close(struct lockfile * lf)                  return;          } -        if (close(lf->fd) < 0) -                LOG_DBGF("Couldn't close lockfile."); -          if (munmap(lf->api, LF_SIZE) == -1)                  LOG_DBGF("Couldn't unmap lockfile."); @@ -151,9 +143,6 @@ void lockfile_destroy(struct lockfile * lf)                  return;          } -        if (close(lf->fd) < 0) -                LOG_DBGF("Couldn't close lockfile."); -          if (munmap(lf->api, LF_SIZE) == -1)                  LOG_DBGF("Couldn't unmap lockfile."); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c deleted file mode 100644 index 5cbf5bd0..00000000 --- a/src/lib/shm_ap_rbuff.c +++ /dev/null @@ -1,661 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Ring buffer for application processes - * - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <ouroboros/config.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/lockfile.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> - -#define OUROBOROS_PREFIX "shm_ap_rbuff" - -#include <ouroboros/logs.h> - -#include <pthread.h> -#include <sys/mman.h> -#include <fcntl.h> -#include <stdlib.h> -#include <string.h> -#include <stdint.h> -#include <unistd.h> -#include <signal.h> -#include <sys/stat.h> -#include <assert.h> - -#define FN_MAX_CHARS 255 - -#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry)         \ -                             + IRMD_MAX_FLOWS * sizeof(int8_t)                 \ -                             + IRMD_MAX_FLOWS * sizeof (ssize_t)               \ -                             + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \ -                             + 2 * sizeof (pthread_cond_t)) - -#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail)   \ -                          & (SHM_BUFFER_SIZE - 1)) -#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) -#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) - -struct shm_ap_rbuff { -        struct rb_entry * shm_base; /* start of entry                */ -        size_t *          head;     /* start of ringbuffer head      */ -        size_t *          tail;     /* start of ringbuffer tail      */ -        int8_t *          acl;      /* start of port_id access table */ -        ssize_t *         cntrs;    /* start of port_id counters     */ -        pthread_mutex_t * lock;     /* lock all free space in shm    */ -        pthread_cond_t *  add;      /* SDU arrived                   */ -        pthread_cond_t *  del;      /* SDU removed                   */ -        pid_t             api;      /* api to which this rb belongs  */ -        int               fd; -}; - -struct shm_ap_rbuff * shm_ap_rbuff_create() -{ -        struct shm_ap_rbuff * rb; -        int                   shm_fd; -        struct rb_entry *     shm_base; -        pthread_mutexattr_t   mattr; -        pthread_condattr_t    cattr; -        char                  fn[FN_MAX_CHARS]; -        mode_t                mask; -        int                   i; - -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); - -        rb = malloc(sizeof(*rb)); -        if (rb == NULL) { -                LOG_DBG("Could not allocate struct."); -                return NULL; -        } - -        mask = umask(0); - -        shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); -        if (shm_fd == -1) { -                LOG_DBG("Failed creating ring buffer."); -                free(rb); -                return NULL; -        } - -        umask(mask); - -        if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { -                LOG_DBG("Failed to extend ringbuffer."); -                free(rb); -                return NULL; -        } -#ifndef __APPLE__ -        if (write(shm_fd, "", 1) != 1) { -                LOG_DBG("Failed to finalise extension of ringbuffer."); -                free(rb); -                return NULL; -        } -#endif -        shm_base = mmap(NULL, -                        SHM_RBUFF_FILE_SIZE, -                        PROT_READ | PROT_WRITE, -                        MAP_SHARED, -                        shm_fd, -                        0); - -        if (shm_base == MAP_FAILED) { -                LOG_DBG("Failed to map shared memory."); -                if (close(shm_fd) == -1) -                        LOG_DBG("Failed to close invalid shm."); - -                if (shm_unlink(fn) == -1) -                        LOG_DBG("Failed to remove invalid shm."); - -                free(rb); -                return NULL; -        } - -        rb->shm_base = shm_base; -        rb->head     = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); -        rb->tail     = rb->head + 1; -        rb->acl      = (int8_t *) (rb->tail + 1); -        rb->cntrs    = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); -        rb->lock     = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); -        rb->add      = (pthread_cond_t *) (rb->lock + 1); -        rb->del      = rb->add + 1; - -        pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ -        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); -#endif -        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); -        pthread_mutex_init(rb->lock, &mattr); - -        pthread_condattr_init(&cattr); -        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ -        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif -        for (i = 0; i < IRMD_MAX_FLOWS; ++i) { -                rb->cntrs[i] = 0; -                rb->acl[i] = -1; -        } - -        pthread_cond_init(rb->add, &cattr); -        pthread_cond_init(rb->del, &cattr); - -        *rb->head = 0; -        *rb->tail = 0; - -        rb->fd  = shm_fd; -        rb->api = getpid(); - -        return rb; -} - -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) -{ -        struct shm_ap_rbuff * rb; -        int                   shm_fd; -        struct rb_entry *     shm_base; -        char                  fn[FN_MAX_CHARS]; - -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); - -        rb = malloc(sizeof(*rb)); -        if (rb == NULL) { -                LOG_DBG("Could not allocate struct."); -                return NULL; -        } - -        shm_fd = shm_open(fn, O_RDWR, 0666); -        if (shm_fd == -1) { -                LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); -                free(rb); -                return NULL; -        } - -        shm_base = mmap(NULL, -                        SHM_RBUFF_FILE_SIZE, -                        PROT_READ | PROT_WRITE, -                        MAP_SHARED, -                        shm_fd, -                        0); - -        if (shm_base == MAP_FAILED) { -                LOG_DBG("Failed to map shared memory."); -                if (close(shm_fd) == -1) -                        LOG_DBG("Failed to close invalid shm."); - -                if (shm_unlink(fn) == -1) -                        LOG_DBG("Failed to remove invalid shm."); - -                free(rb); -                return NULL; -        } - -        rb->shm_base = shm_base; -        rb->head     = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); -        rb->tail     = rb->head + 1; -        rb->acl      = (int8_t *) (rb->tail + 1); -        rb->cntrs    = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); -        rb->lock     = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); -        rb->add      = (pthread_cond_t *) (rb->lock + 1); -        rb->del      = rb->add + 1; - -        rb->fd = shm_fd; -        rb->api = api; - -        return rb; -} - -void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) -{ -        assert(rb); - -        if (close(rb->fd) < 0) -                LOG_DBG("Couldn't close shared memory."); - -        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) -                LOG_DBG("Couldn't unmap shared memory."); - -        free(rb); -} - -void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id) -{ -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        rb->acl[port_id] = 0; /* open */ - -        pthread_mutex_unlock(rb->lock); -} - -int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) -{ -        int ret = 0; - -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        rb->acl[port_id] = -1; - -        if (rb->cntrs[port_id] > 0) -                ret = -EBUSY; - -        pthread_mutex_unlock(rb->lock); - -        return ret; -} - -void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) -{ -        char fn[25]; -        struct lockfile * lf = NULL; - -        assert(rb); - -        if (rb->api != getpid()) { -                lf = lockfile_open(); -                if (lf == NULL) -                        return; -                if (lockfile_owner(lf) == getpid()) { -                        LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", -                                 rb->api, getpid()); -                        lockfile_close(lf); -                } else { -                        LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", -                                getpid(), rb->api); -                        lockfile_close(lf); -                        return; -                } -        } - -        if (close(rb->fd) < 0) -                LOG_DBG("Couldn't close shared memory."); - -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); - -        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) -                LOG_DBG("Couldn't unmap shared memory."); - -        if (shm_unlink(fn) == -1) -                LOG_DBG("Failed to unlink shm."); - -        free(rb); -} - -int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) -{ -        assert(rb); -        assert(e); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (rb->acl[e->port_id]) { -                pthread_mutex_unlock(rb->lock); -                return -ENOTALLOC; -        } - -        if (!shm_rbuff_free(rb)) { -                pthread_mutex_unlock(rb->lock); -                return -1; -        } - -        if (shm_rbuff_empty(rb)) -                pthread_cond_broadcast(rb->add); - -        *head_el_ptr(rb) = *e; -        *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); - -        ++rb->cntrs[e->port_id]; - -        pthread_mutex_unlock(rb->lock); - -        return 0; -} - -int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) -{ -        int ret = 0; - -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (shm_rbuff_empty(rb)) { -                pthread_mutex_unlock(rb->lock); -                return -1; -        } - -        ret = tail_el_ptr(rb)->index; -        --rb->cntrs[tail_el_ptr(rb)->port_id]; -        *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - -        pthread_mutex_unlock(rb->lock); - -        return ret; -} - -static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, -                                   const struct timespec * timeout) -{ -        struct timespec abstime; -        int ret = 0; - -        assert(rb); - -        if (timeout != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        } - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) rb->lock); -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        while (shm_rbuff_empty(rb)) { -                if (timeout != NULL) -                        ret = pthread_cond_timedwait(rb->add, -                                                     rb->lock, -                                                     &abstime); -                else -                        ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ -                if (ret == EOWNERDEAD) { -                        LOG_DBG("Recovering dead mutex."); -                        pthread_mutex_consistent(rb->lock); -                } -#endif -                if (ret == ETIMEDOUT) -                        break; -        } - -        if (ret != ETIMEDOUT) -                ret = tail_el_ptr(rb)->port_id; -        else -                ret = -ETIMEDOUT; - -        pthread_cleanup_pop(true); - -        return ret; -} - -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff *   rb, -                        bool *                  set, -                        const struct timespec * timeout) -{ -        struct timespec abstime; -        int ret; - -        assert(rb); - -        if (set == NULL) -                return shm_ap_rbuff_peek_b_all(rb, timeout); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (timeout != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        } - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) rb->lock); - -        while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) -               && (ret != ETIMEDOUT)) { -                while (shm_rbuff_empty(rb)) { -                        if (timeout != NULL) -                                ret = pthread_cond_timedwait(rb->add, -                                                             rb->lock, -                                                             &abstime); -                        else -                                ret = pthread_cond_wait(rb->add, rb->lock); - -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_DBG("Recovering dead mutex."); -                                pthread_mutex_consistent(rb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) -                                break; -                } - -                while (!set[tail_el_ptr(rb)->port_id]) { -                        if (timeout != NULL) -                                ret = pthread_cond_timedwait(rb->del, -                                                             rb->lock, -                                                             &abstime); -                        else -                                ret = pthread_cond_wait(rb->del, rb->lock); - -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_DBG("Recovering dead mutex."); -                                pthread_mutex_consistent(rb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) -                                break; -                } -        } - -        if (ret != ETIMEDOUT) -                ret = tail_el_ptr(rb)->port_id; -        else -                ret = -ETIMEDOUT; - -        pthread_cleanup_pop(true); - -        return ret; -} - - -struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) -{ -        struct rb_entry * e = NULL; - -        assert(rb); - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) rb->lock); -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        while (shm_rbuff_empty(rb)) -#ifdef __APPLE__ -                pthread_cond_wait(rb->add, rb->lock); -#else -                if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) { -                        LOG_DBG("Recovering dead mutex."); -                        pthread_mutex_consistent(rb->lock); -                } -#endif -        e = malloc(sizeof(*e)); -        if (e != NULL) { -                *e = *(rb->shm_base + *rb->tail); -                --rb->cntrs[e->port_id]; -                *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); -        } - -        pthread_cleanup_pop(true); - -        return e; -} - -ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) -{ -        ssize_t idx = -1; - -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { -                pthread_mutex_unlock(rb->lock); -                return -1; -        } - -        idx = tail_el_ptr(rb)->index; -        --rb->cntrs[port_id]; -        *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - -        pthread_cond_broadcast(rb->del); -        pthread_mutex_unlock(rb->lock); - -        return idx; -} - -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb, -                                 int                     port_id, -                                 const struct timespec * timeout) -{ -        struct timespec abstime; -        int ret = 0; -        ssize_t idx = -1; - -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (timeout != NULL) { -                idx = -ETIMEDOUT; -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        } - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) rb->lock); - -        while ((shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) -               && (ret != ETIMEDOUT)) { -                while (shm_rbuff_empty(rb)) { -                        if (timeout != NULL) -                                ret = pthread_cond_timedwait(rb->add, -                                                             rb->lock, -                                                             &abstime); -                        else -                                ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_DBG("Recovering dead mutex."); -                                pthread_mutex_consistent(rb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) -                                break; -                } - -                while (tail_el_ptr(rb)->port_id != port_id) { -                        if (timeout != NULL) -                                ret = pthread_cond_timedwait(rb->del, -                                                             rb->lock, -                                                             &abstime); -                        else -                                ret = pthread_cond_wait(rb->del, rb->lock); -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_DBG("Recovering dead mutex."); -                                pthread_mutex_consistent(rb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) -                                break; -                } -        } - -        if (ret != ETIMEDOUT) { -                idx = tail_el_ptr(rb)->index; -                --rb->cntrs[port_id]; -                *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - -                pthread_cond_broadcast(rb->del); -        } - -        pthread_cleanup_pop(true); - -        return idx; -} - -void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) -{ -        assert(rb); - -        pthread_mutex_lock(rb->lock); -        *rb->tail = 0; -        *rb->head = 0; -        pthread_mutex_unlock(rb->lock); -} diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c new file mode 100644 index 00000000..c960bd25 --- /dev/null +++ b/src/lib/shm_flow_set.c @@ -0,0 +1,408 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Management of flow_sets for fqueue + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/shm_flow_set.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/errno.h> + +#define OUROBOROS_PREFIX "shm_flow_set" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <unistd.h> +#include <signal.h> +#include <sys/stat.h> +#include <string.h> +#include <assert.h> + +#define FN_MAX_CHARS 255 + +#define FQUEUESIZE (SHM_BUFFER_SIZE * sizeof(int)) + +#define SHM_FLOW_SET_FILE_SIZE (IRMD_MAX_FLOWS * sizeof(ssize_t)          \ +                                + AP_MAX_FQUEUES * sizeof(size_t)         \ +                                + AP_MAX_FQUEUES * sizeof(pthread_cond_t) \ +                                + AP_MAX_FQUEUES * FQUEUESIZE             \ +                                + sizeof(pthread_mutex_t)) + +#define fqueue_ptr(fs, idx) (fs->fqueues + SHM_BUFFER_SIZE * idx) + +struct shm_flow_set { +        ssize_t *         mtable; +        size_t *          heads; +        pthread_cond_t *  conds; +        int *             fqueues; +        pthread_mutex_t * lock; + +        pid_t             api; +}; + +struct shm_flow_set * shm_flow_set_create() +{ +        struct shm_flow_set * set; +        ssize_t *             shm_base; +        pthread_mutexattr_t   mattr; +        pthread_condattr_t    cattr; +        char                  fn[FN_MAX_CHARS]; +        mode_t                mask; +        int                   shm_fd; +        int                   i; + +        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid()); + +        set = malloc(sizeof(*set)); +        if (set == NULL) { +                LOG_DBG("Could not allocate struct."); +                return NULL; +        } + +        mask = umask(0); + +        shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBG("Failed creating flag file."); +                free(set); +                return NULL; +        } + +        umask(mask); + +        if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) { +                LOG_DBG("Failed to extend flag file."); +                free(set); +                close(shm_fd); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_FLOW_SET_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        close(shm_fd); + +        if (shm_base == MAP_FAILED) { +                LOG_DBG("Failed to map shared memory."); +                if (shm_unlink(fn) == -1) +                        LOG_DBG("Failed to remove invalid shm."); + +                free(set); +                return NULL; +        } + +        set->mtable  = shm_base; +        set->heads   = (size_t *) (set->mtable + IRMD_MAX_FLOWS); +        set->conds   = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); +        set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); +        set->lock    = (pthread_mutex_t *) +                (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + +        pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ +        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif +        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +        pthread_mutex_init(set->lock, &mattr); + +        pthread_condattr_init(&cattr); +        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        for (i = 0; i < AP_MAX_FQUEUES; ++i) { +                set->heads[i] = 0; +                pthread_cond_init(&set->conds[i], &cattr); +        } + +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) +                set->mtable[i] = -1; + +        set->api = getpid(); + +        return set; +} + +struct shm_flow_set * shm_flow_set_open(pid_t api) +{ +        struct shm_flow_set * set; +        ssize_t *             shm_base; +        char                  fn[FN_MAX_CHARS]; +        int                   shm_fd; + +        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", api); + +        set = malloc(sizeof(*set)); +        if (set == NULL) { +                LOG_DBG("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(fn, O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); +                free(set); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_FLOW_SET_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        close(shm_fd); + +        if (shm_base == MAP_FAILED) { +                LOG_DBG("Failed to map shared memory."); +                if (shm_unlink(fn) == -1) +                        LOG_DBG("Failed to remove invalid shm."); +                free(set); +                return NULL; +        } + +        set->mtable  = shm_base; +        set->heads   = (size_t *) (set->mtable + IRMD_MAX_FLOWS); +        set->conds   = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); +        set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); +        set->lock    = (pthread_mutex_t *) +                (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + +        set->api = api; + +        return set; +} + +void shm_flow_set_destroy(struct shm_flow_set * set) +{ +        char fn[25]; +        struct lockfile * lf = NULL; + +        assert(set); + +        if (set->api != getpid()) { +                lf = lockfile_open(); +                if (lf == NULL) { +                        LOG_ERR("Failed to open lockfile."); +                        return; +                } + +                if (lockfile_owner(lf) == getpid()) { +                        LOG_DBG("Flow set %d destroyed by IRMd %d.", +                                set->api, getpid()); +                        lockfile_close(lf); +                } else { +                        LOG_ERR("AP-I %d tried to destroy flowset owned by %d.", +                                getpid(), set->api); +                        lockfile_close(lf); +                        return; +                } +        } + +        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->api); + +        if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        if (shm_unlink(fn) == -1) +                LOG_DBG("Failed to unlink shm."); + +        free(set); +} + +void shm_flow_set_close(struct shm_flow_set * set) +{ +        assert(set); + +        if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        free(set); +} + +void shm_flow_set_zero(struct shm_flow_set * shm_set, +                       ssize_t               idx) +{ +        ssize_t i = 0; + +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +        pthread_mutex_lock(shm_set->lock); + +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) +                if (shm_set->mtable[i] == idx) +                        shm_set->mtable[i] = -1; + +        shm_set->heads[idx] = 0; + +        pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_add(struct shm_flow_set * shm_set, +                     ssize_t               idx, +                     int                   port_id) +{ +        assert(shm_set); +        assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +        pthread_mutex_lock(shm_set->lock); + +        if (shm_set->mtable[port_id] != -1) { +                pthread_mutex_unlock(shm_set->lock); +                return -EPERM; +        } + +        shm_set->mtable[port_id] = idx; + +        pthread_mutex_unlock(shm_set->lock); + +        return 0; +} + +void shm_flow_set_del(struct shm_flow_set * shm_set, +                      ssize_t               idx, +                      int                   port_id) +{ +        assert(shm_set); +        assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +        pthread_mutex_lock(shm_set->lock); + +        if (shm_set->mtable[port_id] == idx) +                shm_set->mtable[port_id] = -1; + +        pthread_mutex_unlock(shm_set->lock); +} + +int shm_flow_set_has(struct shm_flow_set * shm_set, +                     ssize_t               idx, +                     int                   port_id) +{ +        int ret = 0; + +        assert(shm_set); +        assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + +        pthread_mutex_lock(shm_set->lock); + +        if (shm_set->mtable[port_id] == idx) +                ret = 1; + +        pthread_mutex_unlock(shm_set->lock); + +        return ret; +} + +void shm_flow_set_notify(struct shm_flow_set * shm_set, int port_id) +{ +        assert(shm_set); +        assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + +        pthread_mutex_lock(shm_set->lock); + +        if (shm_set->mtable[port_id] == -1) { +                pthread_mutex_unlock(shm_set->lock); +                return; +        } + +        *(fqueue_ptr(shm_set, shm_set->mtable[port_id]) + +                     (shm_set->heads[shm_set->mtable[port_id]])++) = port_id; + +        pthread_cond_signal(&shm_set->conds[shm_set->mtable[port_id]]); + +        pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_wait(const struct shm_flow_set * shm_set, +                      ssize_t                     idx, +                      int *                       fqueue, +                      const struct timespec *     timeout) +{ +        int ret = 0; +        struct timespec abstime; + +        assert(shm_set); +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +#ifdef __APPLE__ +        pthread_mutex_lock(shm_set->lock); +#else +        if (pthread_mutex_lock(shm_set->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(shm_set->lock); +        } +#endif +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) shm_set->lock); + +        while (shm_set->heads[idx] == 0 && ret != -ETIMEDOUT) { +                if (timeout != NULL) +                        ret = pthread_cond_timedwait(shm_set->conds + idx, +                                                     shm_set->lock, +                                                     &abstime); +                else +                        ret = pthread_cond_wait(shm_set->conds + idx, +                                                shm_set->lock); +#ifndef __APPLE__ +                if (ret == EOWNERDEAD) { +                        LOG_DBG("Recovering dead mutex."); +                        pthread_mutex_consistent(shm_set->lock); +                } +#endif +                if (ret == ETIMEDOUT) { +                        ret = -ETIMEDOUT; +                        break; +                } +        } + +        if (ret != -ETIMEDOUT) { +                memcpy(fqueue, +                       fqueue_ptr(shm_set, idx), +                       shm_set->heads[idx] * sizeof(int)); +                ret = shm_set->heads[idx]; +                shm_set->heads[idx] = 0; +        } + +        pthread_cleanup_pop(true); + +        return ret; +} diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c new file mode 100644 index 00000000..cf094488 --- /dev/null +++ b/src/lib/shm_rbuff.c @@ -0,0 +1,424 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for incoming SDUs + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/shm_rbuff.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/errno.h> + +#define OUROBOROS_PREFIX "shm_rbuff" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <signal.h> +#include <sys/stat.h> +#include <assert.h> +#include <stdbool.h> + +#define FN_MAX_CHARS 255 + +#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(ssize_t)          \ +                             + 2 * sizeof(size_t) + sizeof(int8_t)      \ +                             + sizeof(pthread_mutex_t)                  \ +                             + 2 * sizeof (pthread_cond_t)) + +#define shm_rbuff_used(rb) ((*rb->head + SHM_BUFFER_SIZE - *rb->tail)   \ +                            & (SHM_BUFFER_SIZE - 1)) +#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) +#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) + +struct shm_rbuff { +        ssize_t *         shm_base; /* start of entry                */ +        size_t *          head;     /* start of ringbuffer head      */ +        size_t *          tail;     /* start of ringbuffer tail      */ +        int8_t *          acl;      /* access control                */ +        pthread_mutex_t * lock;     /* lock all free space in shm    */ +        pthread_cond_t *  add;      /* SDU arrived                   */ +        pthread_cond_t *  del;      /* SDU removed                   */ +        pid_t             api;      /* api of the owner              */ +        int               port_id;  /* port_id of the flow           */ +}; + +struct shm_rbuff * shm_rbuff_create(int port_id) +{ +        struct shm_rbuff *  rb; +        int                 shm_fd; +        ssize_t *           shm_base; +        pthread_mutexattr_t mattr; +        pthread_condattr_t  cattr; +        char                fn[FN_MAX_CHARS]; +        mode_t              mask; + +        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", getpid(), port_id); + +        rb = malloc(sizeof(*rb)); +        if (rb == NULL) { +                LOG_DBG("Could not allocate struct."); +                return NULL; +        } + +        mask = umask(0); + +        shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBG("Failed creating ring buffer."); +                free(rb); +                return NULL; +        } + +        umask(mask); + +        if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { +                LOG_DBG("Failed to extend ringbuffer."); +                free(rb); +                close(shm_fd); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_RBUFF_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        close(shm_fd); + +        if (shm_base == MAP_FAILED) { +                LOG_DBG("Failed to map shared memory."); +                if (shm_unlink(fn) == -1) +                        LOG_DBG("Failed to remove invalid shm."); +                free(rb); +                return NULL; +        } + +        rb->shm_base = shm_base; +        rb->head     = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); +        rb->tail     = rb->head + 1; +        rb->acl      = (int8_t *) (rb->tail + 1); +        rb->lock     = (pthread_mutex_t *) (rb->acl + 1); +        rb->add      = (pthread_cond_t *) (rb->lock + 1); +        rb->del      = rb->add + 1; + +        pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ +        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif +        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +        pthread_mutex_init(rb->lock, &mattr); + +        pthread_condattr_init(&cattr); +        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        pthread_cond_init(rb->add, &cattr); +        pthread_cond_init(rb->del, &cattr); + +        *rb->acl = 0; +        *rb->head = 0; +        *rb->tail = 0; + +        rb->api = getpid(); +        rb->port_id = port_id; + +        return rb; +} + +struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id) +{ +        struct shm_rbuff * rb; +        int                shm_fd; +        ssize_t *          shm_base; +        char               fn[FN_MAX_CHARS]; + +        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", api, port_id); + +        rb = malloc(sizeof(*rb)); +        if (rb == NULL) { +                LOG_DBG("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(fn, O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); +                free(rb); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_RBUFF_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        close(shm_fd); + +        if (shm_base == MAP_FAILED) { +                LOG_DBG("Failed to map shared memory."); +                if (shm_unlink(fn) == -1) +                        LOG_DBG("Failed to remove invalid shm."); + +                free(rb); +                return NULL; +        } + +        rb->shm_base = shm_base; +        rb->head     = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); +        rb->tail     = rb->head + 1; +        rb->acl      = (int8_t *) (rb->tail + 1); +        rb->lock     = (pthread_mutex_t *) (rb->acl + 1); +        rb->add      = (pthread_cond_t *) (rb->lock + 1); +        rb->del      = rb->add + 1; + +        rb->api = api; +        rb->port_id = port_id; + +        return rb; +} + +void shm_rbuff_close(struct shm_rbuff * rb) +{ +        assert(rb); + +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        free(rb); +} + +void shm_rbuff_destroy(struct shm_rbuff * rb) +{ +        char fn[25]; +        struct lockfile * lf = NULL; + +        assert(rb); + +        if (rb->api != getpid()) { +                lf = lockfile_open(); +                if (lf == NULL) { +                        LOG_ERR("Failed to open lockfile."); +                        return; +                } + +                if (lockfile_owner(lf) == getpid()) { +                        LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", +                                 rb->api, getpid()); +                        lockfile_close(lf); +                } else { +                        LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", +                                getpid(), rb->api); +                        lockfile_close(lf); +                        return; +                } +        } + +        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id); + +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        if (shm_unlink(fn) == -1) +                LOG_DBG("Failed to unlink shm."); + +        free(rb); +} + +int shm_rbuff_write(struct shm_rbuff * rb, ssize_t idx) +{ +        assert(rb); +        assert(idx >= 0); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (*rb->acl) { +                pthread_mutex_unlock(rb->lock); +                return -ENOTALLOC; +        } + +        if (!shm_rbuff_free(rb)) { +                pthread_mutex_unlock(rb->lock); +                return -1; +        } + +        if (shm_rbuff_empty(rb)) +                pthread_cond_broadcast(rb->add); + +        *head_el_ptr(rb) = idx; +        *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); + +        pthread_mutex_unlock(rb->lock); + +        return 0; +} + +ssize_t shm_rbuff_read(struct shm_rbuff * rb) +{ +        int ret = 0; + +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (shm_rbuff_empty(rb)) { +                pthread_mutex_unlock(rb->lock); +                return -1; +        } + +        ret = *tail_el_ptr(rb); +        *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); + +        pthread_mutex_unlock(rb->lock); + +        return ret; +} + +ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb, +                         const struct timespec * timeout) +{ +        struct timespec abstime; +        int ret = 0; +        ssize_t idx = -1; + +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (timeout != NULL) { +                idx = -ETIMEDOUT; +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while (shm_rbuff_empty(rb) && (ret != ETIMEDOUT)) { +                if (timeout != NULL) +                        ret = pthread_cond_timedwait(rb->add, +                                                     rb->lock, +                                                     &abstime); +                else +                        ret = pthread_cond_wait(rb->add, rb->lock); +#ifndef __APPLE__ +                if (ret == EOWNERDEAD) { +                        LOG_DBG("Recovering dead mutex."); +                        pthread_mutex_consistent(rb->lock); +                } +#endif +                if (ret == ETIMEDOUT) { +                        idx = -ETIMEDOUT; +                        break; +                } +        } + +        if (idx != -ETIMEDOUT) { +                idx = *tail_el_ptr(rb); +                *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); +                pthread_cond_broadcast(rb->del); +        } + +        pthread_cleanup_pop(true); + +        return idx; +} + +int shm_rbuff_block(struct shm_rbuff * rb) +{ +        int ret = 0; + +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        *rb->acl = -1; + +        if (!shm_rbuff_empty(rb)) +                ret = -EBUSY; + +        pthread_mutex_unlock(rb->lock); + +        return ret; +} + +void shm_rbuff_unblock(struct shm_rbuff * rb) +{ +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        *rb->acl = 0; /* open */ + +        pthread_mutex_unlock(rb->lock); +} + +void shm_rbuff_reset(struct shm_rbuff * rb) +{ +        assert(rb); + +        pthread_mutex_lock(rb->lock); +        *rb->tail = 0; +        *rb->head = 0; +        pthread_mutex_unlock(rb->lock); +} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index f6683dc2..e5a37577 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -90,7 +90,6 @@ struct shm_rdrbuff {          pthread_cond_t *  full;        /* run sanitizer when buffer full */          pid_t *           api;         /* api of the irmd owner */          enum qos_cube     qos;         /* qos id which this buffer serves */ -        int               fd;  };  static void garbage_collect(struct shm_rdrbuff * rdrb) @@ -189,17 +188,11 @@ struct shm_rdrbuff * shm_rdrbuff_create()          if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) {                  LOG_DBGF("Failed to extend shared memory map.");                  free(shm_rdrb_fn); +                close(shm_fd);                  free(rdrb);                  return NULL;          } -#ifndef __APPLE -        if (write(shm_fd, "", 1) != 1) { -                LOG_DBGF("Failed to finalise extension of shared memory map."); -                free(shm_rdrb_fn); -                free(rdrb); -                return NULL; -        } -#endif +          shm_base = mmap(NULL,                          SHM_FILE_SIZE,                          PROT_READ | PROT_WRITE, @@ -207,6 +200,8 @@ struct shm_rdrbuff * shm_rdrbuff_create()                          shm_fd,                          0); +        close(shm_fd); +          if (shm_base == MAP_FAILED) {                  LOG_DBGF("Failed to map shared memory.");                  if (shm_unlink(shm_rdrb_fn) == -1) @@ -235,6 +230,9 @@ struct shm_rdrbuff * shm_rdrbuff_create()          pthread_condattr_init(&cattr);          pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif          pthread_cond_init(rdrb->full, &cattr);          pthread_cond_init(rdrb->healthy, &cattr); @@ -246,7 +244,6 @@ struct shm_rdrbuff * shm_rdrbuff_create()          *rdrb->api = getpid();          rdrb->qos = qos; -        rdrb->fd  = shm_fd;          free(shm_rdrb_fn); @@ -287,10 +284,11 @@ struct shm_rdrbuff * shm_rdrbuff_open()                          MAP_SHARED,                          shm_fd,                          0); + +        close(shm_fd); +          if (shm_base == MAP_FAILED) {                  LOG_DBGF("Failed to map shared memory."); -                if (close(shm_fd) == -1) -                        LOG_DBG("Failed to close invalid shm.");                  if (shm_unlink(shm_rdrb_fn) == -1)                          LOG_DBG("Failed to unlink invalid shm.");                  free(shm_rdrb_fn); @@ -309,7 +307,6 @@ struct shm_rdrbuff * shm_rdrbuff_open()          rdrb->api = (pid_t *) (rdrb->full + 1);          rdrb->qos = qos; -        rdrb->fd = shm_fd;          free(shm_rdrb_fn); @@ -400,9 +397,6 @@ void shm_rdrbuff_close(struct shm_rdrbuff * rdrb)  {          assert(rdrb); -        if (close(rdrb->fd) < 0) -                LOG_DBGF("Couldn't close shared memory."); -          if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)                  LOG_DBGF("Couldn't unmap shared memory."); @@ -420,9 +414,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)                  return;          } -        if (close(rdrb->fd) < 0) -                LOG_DBG("Couldn't close shared memory."); -          if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)                  LOG_DBG("Couldn't unmap shared memory."); diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 7d41b497..0ca40326 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -23,7 +23,7 @@  #define _POSIX_C_SOURCE 199506L -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h>  #include <ouroboros/dev.h>  #include <stdio.h> @@ -53,6 +53,8 @@ struct c {          float  rtt_avg;          float  rtt_m2; +        flow_set_t * flows; +          /* needs locking */          struct timespec * times;          pthread_mutex_t lock; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 4742d0de..40f75785 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -55,20 +55,21 @@ void * reader(void * o)          struct timespec timeout = {2, 0};          struct timespec now = {0, 0}; -        struct oping_msg * msg;          char buf[OPING_BUF_SIZE]; +        struct oping_msg * msg = (struct oping_msg *) buf;          int fd = 0;          int msg_len = 0;          float ms = 0;          float d = 0; - -        msg = (struct oping_msg *) buf; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1;          /* FIXME: use flow timeout option once we have it */ -        while(client.rcvd != client.count && -              (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) { -                flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK); -                while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { +        while (client.rcvd != client.count +               && flow_event_wait(client.flows, fq, &timeout) != -ETIMEDOUT) { +                while ((fd = fqueue_next(fq)) >= 0) { +                        msg_len = flow_read(fd, buf, OPING_BUF_SIZE);                          if (msg_len < 0)                                  continue; @@ -165,12 +166,20 @@ int client_main()          struct timespec tic;          struct timespec toc; -        int fd = flow_alloc(client.s_apn, NULL, NULL); +        int fd; + +        client.flows = flow_set_create(); +        if (client.flows == NULL) +                return 0; + +        fd = flow_alloc(client.s_apn, NULL, NULL);          if (fd < 0) {                  printf("Failed to allocate flow.\n");                  return -1;          } +        flow_set_add(client.flows, fd); +          if (flow_alloc_res(fd)) {                  printf("Flow allocation refused.\n");                  flow_dealloc(fd); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 845f0cbd..8a5a3512 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -69,16 +69,23 @@ void * server_thread(void *o)          struct oping_msg * msg = (struct oping_msg *) buf;          struct timespec now = {0, 0};          struct timespec timeout = {0, 100 * MILLION}; +        int fd; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1;          while (true) { -                int fd = flow_select(server.flows, &timeout); -                if (fd == -ETIMEDOUT) -                        continue; -                if (fd < 0) { -                        printf("Failed to get active fd.\n"); +                int ret = flow_event_wait(server.flows, fq, &timeout); +                if (ret == -ETIMEDOUT)                          continue; + +                if (ret < 0) { +                        printf("Event error.\n"); +                        break;                  } -                while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { + +                while ((fd = fqueue_next(fq)) >= 0) { +                        msg_len = flow_read(fd, buf, OPING_BUF_SIZE);                          if (msg_len < 0)                                  continue; @@ -160,8 +167,6 @@ int server_main()          if (server.flows == NULL)                  return 0; -        flow_set_zero(server.flows); -          pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);          pthread_create(&server.accept_pt, NULL, accept_thread, NULL);          pthread_create(&server.server_pt, NULL, server_thread, NULL); | 
