diff options
| -rw-r--r-- | include/ouroboros/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | include/ouroboros/dev.h | 19 | ||||
| -rw-r--r-- | include/ouroboros/select.h | 50 | ||||
| -rw-r--r-- | include/ouroboros/shm_ap_rbuff.h | 35 | ||||
| -rw-r--r-- | include/ouroboros/wrap/ouroboros.i | 2 | ||||
| -rw-r--r-- | src/ipcpd/local/main.c | 6 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/main.c | 2 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 7 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 6 | ||||
| -rw-r--r-- | src/irmd/main.c | 18 | ||||
| -rw-r--r-- | src/lib/dev.c | 489 | ||||
| -rw-r--r-- | src/lib/lockfile.c | 9 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 148 | ||||
| -rw-r--r-- | src/lib/shm_rdrbuff.c | 18 | ||||
| -rw-r--r-- | src/tools/cbr/cbr.c | 8 | ||||
| -rw-r--r-- | src/tools/oping/oping.c | 9 | ||||
| -rw-r--r-- | src/tools/oping/oping_client.c | 4 | ||||
| -rw-r--r-- | src/tools/oping/oping_server.c | 26 | 
19 files changed, 581 insertions, 282 deletions
diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt index ae922b89..78a7bb9c 100644 --- a/include/ouroboros/CMakeLists.txt +++ b/include/ouroboros/CMakeLists.txt @@ -10,7 +10,8 @@ set(HEADER_FILES    irm.h    irm_config.h    nsm.h -  qos.h) +  qos.h +  select.h)  install(FILES ${HEADER_FILES} DESTINATION usr/include/ouroboros) diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index d5fb744b..fe5ff4b5 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -22,7 +22,6 @@  #include <unistd.h>  #include <stdint.h> -#include <time.h>  #include <ouroboros/qos.h>  #include <ouroboros/flow.h> @@ -34,10 +33,12 @@  /* These calls should be removed once we write the ouroboros OS. */  int     ap_init(char * ap_name); +  void    ap_fini(void);  /* Returns file descriptor (> 0) and client AE name. */  int     flow_accept(char ** ae_name); +  int     flow_alloc_resp(int fd, int result);  /* @@ -47,13 +48,21 @@ int     flow_alloc_resp(int fd, int result);  int     flow_alloc(char * dst_name,                     char * src_ae_name,                     struct qos_spec * qos); +  int     flow_alloc_res(int fd);  int     flow_dealloc(int fd); -int     flow_cntl(int fd, int cmd, int oflags); -ssize_t flow_write(int fd, void * buf, size_t count); -ssize_t flow_read(int fd, void * buf, size_t count); -int     flow_select(const struct timespec * timeout); +int     flow_cntl(int fd, +                  int cmd, +                  int oflags); + +ssize_t flow_write(int fd, +                   void * buf, +                   size_t count); + +ssize_t flow_read(int fd, +                  void * buf, +                  size_t count);  #endif diff --git a/include/ouroboros/select.h b/include/ouroboros/select.h new file mode 100644 index 00000000..9e0b8fec --- /dev/null +++ b/include/ouroboros/select.h @@ -0,0 +1,50 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * A select call for flows + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SELECT_H +#define OUROBOROS_SELECT_H + +#include <stdbool.h> +#include <time.h> + +struct flow_set; + +struct flow_set * flow_set_create(); + +void              flow_set_destroy(struct flow_set * set); + +void              flow_set_zero(struct flow_set * set); + +void              flow_set_add(struct flow_set * set, +                               int               fd); + +void              flow_set_del(struct flow_set * set, +                               int               fd); + +bool              flow_set_has(struct flow_set * set, +                               int               fd); + +int               flow_select(struct flow_set *       set, +                              const struct timespec * timeout); + +#endif /* OUROBOROS_SELECT_H */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index 9dad0863..6b11fd2d 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -24,6 +24,7 @@  #ifndef OUROBOROS_SHM_AP_RBUFF_H  #define OUROBOROS_SHM_AP_RBUFF_H +#include <ouroboros/select.h>  #include <sys/types.h>  #include <sys/time.h>  #include <stdbool.h> @@ -35,20 +36,40 @@ struct rb_entry {          int     port_id;  }; -struct shm_ap_rbuff * shm_ap_rbuff_create(); -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api); +/* recv SDUs from N + 1 */ +struct shm_ap_rbuff * shm_ap_rbuff_create_n(); + +/* recv SDUs from N - 1 */ +struct shm_ap_rbuff * shm_ap_rbuff_create_s(); + +/* write SDUs to N - 1 */ +struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api); + +/* write SDUs to N + 1 */ +struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api); +  void                  shm_ap_rbuff_close(struct shm_ap_rbuff * rb); +  void                  shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb); +  int                   shm_ap_rbuff_write(struct shm_ap_rbuff * rb, -                                         struct rb_entry * e); +                                         struct rb_entry *     e); +  struct rb_entry *     shm_ap_rbuff_read(struct shm_ap_rbuff * rb); +  int                   shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb); -int                   shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, + +int                   shm_ap_rbuff_peek_b(struct shm_ap_rbuff *   rb, +                                          bool *                  set,                                            const struct timespec * timeout); +  ssize_t               shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, -                                             int port_id); -ssize_t               shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, -                                               int port_id, +                                             int                   port_id); + +ssize_t               shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb, +                                               int                     port_id,                                                 const struct timespec * timeout); +  void                  shm_ap_rbuff_reset(struct shm_ap_rbuff * rb); +  #endif /* OUROBOROS_SHM_AP_RBUFF_H */ diff --git a/include/ouroboros/wrap/ouroboros.i b/include/ouroboros/wrap/ouroboros.i index 386c21cc..2f66aa16 100644 --- a/include/ouroboros/wrap/ouroboros.i +++ b/include/ouroboros/wrap/ouroboros.i @@ -30,6 +30,7 @@  #include "ouroboros/irm_config.h"  #include "ouroboros/nsm.h"  #include "ouroboros/qos.h" +#include "ouroboros/select.h"  %}  typedef int pid_t; @@ -42,3 +43,4 @@ typedef int pid_t;  %include "ouroboros/irm_config.h"  %include "ouroboros/nsm.h"  %include "ouroboros/qos.h" +%include "ouroboros/select.h" diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index f1b6dd9e..c0809429 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -105,7 +105,7 @@ static int shim_ap_init()                  return -1;          } -        _ap_instance->rb = shm_ap_rbuff_create(); +        _ap_instance->rb = shm_ap_rbuff_create_n();          if (_ap_instance->rb == NULL) {                  shm_rdrbuff_close(_ap_instance->rdrb);                  bmp_destroy(_ap_instance->fds); @@ -331,7 +331,7 @@ static int ipcp_local_flow_alloc(pid_t         n_api,                  return -1; /* -ENOTENROLLED */          } -        rb = shm_ap_rbuff_open(n_api); +        rb = shm_ap_rbuff_open_s(n_api);          if (rb == NULL) {                  pthread_rwlock_unlock(&_ipcp->state_lock);                  return -1; /* -ENORBUFF */ @@ -421,7 +421,7 @@ static int ipcp_local_flow_alloc_resp(pid_t n_api,                  return -1;          } -        rb = shm_ap_rbuff_open(n_api); +        rb = shm_ap_rbuff_open_s(n_api);          if (rb == NULL) {                  LOG_ERR("Could not open N + 1 ringbuffer.");                  _ap_instance->flows[in_fd].state   = FLOW_NULL; diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 5d54842e..79b1bb4b 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -367,7 +367,7 @@ int fmgr_flow_alloc(pid_t         n_api,          free(buf.data); -        flow->flow.rb = shm_ap_rbuff_open(n_api); +        flow->flow.rb = shm_ap_rbuff_open_s(n_api);          if (flow->flow.rb == NULL) {                  pthread_mutex_unlock(&fmgr->n_flows_lock);                  free(flow); @@ -478,7 +478,7 @@ int fmgr_flow_alloc_resp(pid_t n_api,                  flow->flow.state = FLOW_ALLOCATED;                  flow->flow.api = n_api; -                flow->flow.rb = shm_ap_rbuff_open(n_api); +                flow->flow.rb = shm_ap_rbuff_open_s(n_api);                  if (flow->flow.rb == NULL) {                          n_flow_dealloc(port_id);                          pthread_mutex_unlock(&fmgr->n_flows_lock); diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index cf4ae3f1..082973f4 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -212,7 +212,7 @@ struct normal_ipcp_data * normal_ipcp_data_create()                  return NULL;          } -        normal_data->rb = shm_ap_rbuff_open(getpid()); +        normal_data->rb = shm_ap_rbuff_create_n();          if (normal_data->rb == NULL) {                  shm_rdrbuff_close(normal_data->rdrb);                  free(normal_data); diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index a1ded117..d74984cc 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -62,6 +62,7 @@  #ifdef __FreeBSD__  #include <net/if_dl.h>  #include <netinet/if_ether.h> +#include <ifaddrs.h>  #endif  #include <poll.h> @@ -161,7 +162,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()                  return NULL;          } -        eth_llc_data->rb = shm_ap_rbuff_create(); +        eth_llc_data->rb = shm_ap_rbuff_create_n();          if (eth_llc_data->rb == NULL) {                  shm_rdrbuff_close(eth_llc_data->rdrb);                  free(eth_llc_data); @@ -1084,7 +1085,7 @@ static int eth_llc_ipcp_flow_alloc(pid_t         n_api,          if (qos != QOS_CUBE_BE)                  LOG_DBGF("QoS requested. Ethernet LLC can't do that. For now."); -        rb = shm_ap_rbuff_open(n_api); +        rb = shm_ap_rbuff_open_s(n_api);          if (rb == NULL)                  return -1; /* -ENORBUFF */ @@ -1169,7 +1170,7 @@ static int eth_llc_ipcp_flow_alloc_resp(pid_t n_api,                  return -1;          } -        rb = shm_ap_rbuff_open(n_api); +        rb = shm_ap_rbuff_open_s(n_api);          if (rb == NULL) {                  LOG_ERR("Could not open N + 1 ringbuffer.");                  ipcp_flow(index)->state = FLOW_NULL; diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 34af71a7..c35bd244 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -128,7 +128,7 @@ static int shim_ap_init()                  return -1;          } -        _ap_instance->rb = shm_ap_rbuff_create(); +        _ap_instance->rb = shm_ap_rbuff_create_n();          if (_ap_instance->rb == NULL) {                  shm_rdrbuff_close(_ap_instance->rdrb);                  bmp_destroy(_ap_instance->fds); @@ -1179,7 +1179,7 @@ static int ipcp_udp_flow_alloc(pid_t         n_api,          if (qos != QOS_CUBE_BE)                  LOG_DBG("QoS requested. UDP/IP can't do that."); -        rb = shm_ap_rbuff_open(n_api); +        rb = shm_ap_rbuff_open_s(n_api);          if (rb == NULL)                  return -1; /* -ENORBUFF */ @@ -1333,7 +1333,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_api,                  return -1;          } -        rb = shm_ap_rbuff_open(n_api); +        rb = shm_ap_rbuff_open_s(n_api);          if (rb == NULL) {                  LOG_ERR("Could not open N + 1 ringbuffer.");                  _ap_instance->flows[fd].state   = FLOW_NULL; diff --git a/src/irmd/main.c b/src/irmd/main.c index a69dd526..cc9160bf 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -212,6 +212,9 @@ static pid_t get_ipcp_by_dst_name(char * dst_name)          list_for_each(p, &irmd->ipcps) {                  struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next); +                if (e->dif_name == NULL) +                        continue; +                  if (strcmp(e->dif_name, dif_name) == 0)                          return e->api;          } @@ -1742,7 +1745,7 @@ void * irm_sanitize()                          if (kill(f->n_api, 0) < 0) {                                  struct shm_ap_rbuff * n_rb = -                                        shm_ap_rbuff_open(f->n_api); +                                        shm_ap_rbuff_open_s(f->n_api);                                  bmp_release(irmd->port_ids, f->port_id);                                  list_del(&f->next); @@ -1755,13 +1758,17 @@ void * irm_sanitize()                                  continue;                          }                          if (kill(f->n_1_api, 0) < 0) { -                                struct shm_ap_rbuff * n_1_rb = -                                        shm_ap_rbuff_open(f->n_1_api); +                                struct shm_ap_rbuff * n_1_rb_s = +                                        shm_ap_rbuff_open_s(f->n_1_api); +                                struct shm_ap_rbuff * n_1_rb_n = +                                        shm_ap_rbuff_open_n(f->n_1_api);                                  list_del(&f->next);                                  LOG_ERR("IPCP %d gone, flow %d removed.",                                          f->n_1_api, f->port_id); -                                if (n_1_rb != NULL) -                                        shm_ap_rbuff_destroy(n_1_rb); +                                if (n_1_rb_n != NULL) +                                        shm_ap_rbuff_destroy(n_1_rb_n); +                                if (n_1_rb_s != NULL) +                                        shm_ap_rbuff_destroy(n_1_rb_s);                                  irm_flow_destroy(f);                          }                  } @@ -2152,7 +2159,6 @@ int main(int argc, char ** argv)                  }          } -          if (!use_stdout &&              (log_dir = opendir(INSTALL_PREFIX LOG_DIR)) != NULL) {                  while ((ent = readdir(log_dir)) != NULL) { diff --git a/src/lib/dev.c b/src/lib/dev.c index 17c473ed..391563da 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -28,9 +28,18 @@  #include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/utils.h> +#include <ouroboros/select.h>  #include <stdlib.h>  #include <string.h> +#include <stdio.h> + +struct flow_set { +        bool dirty; +        bool b[IRMD_MAX_FLOWS]; /* working copy */ +        bool s[IRMD_MAX_FLOWS]; /* safe copy */ +        pthread_rwlock_t lock; +};  struct flow {          struct shm_ap_rbuff * rb; @@ -42,17 +51,21 @@ struct flow {          struct timespec *     timeout;  }; -struct ap_data { +struct ap_instance {          char *                ap_name; +        char *                daf_name;          pid_t                 api; +          struct shm_rdrbuff *  rdrb;          struct bmp *          fds;          struct shm_ap_rbuff * rb;          pthread_rwlock_t      data_lock;          struct flow           flows[AP_MAX_FLOWS]; +        int                   ports[AP_MAX_FLOWS]; +          pthread_rwlock_t      flows_lock; -} * _ap_instance; +} * ai;  static int api_announce(char * ap_name)  { @@ -63,12 +76,12 @@ static int api_announce(char * ap_name)          msg.code    = IRM_MSG_CODE__IRM_API_ANNOUNCE;          msg.has_api = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); +        pthread_rwlock_rdlock(&ai->data_lock); -        msg.api = _ap_instance->api; +        msg.api = ai->api;          msg.ap_name = ap_name; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { @@ -91,45 +104,47 @@ int ap_init(char * ap_name)          ap_name = path_strip(ap_name); -        _ap_instance = malloc(sizeof(struct ap_data)); -        if (_ap_instance == NULL) { +        ai = malloc(sizeof(*ai)); +        if (ai == NULL) {                  return -ENOMEM;          } -        _ap_instance->api = getpid(); -        _ap_instance->ap_name = ap_name; +        ai->api = getpid(); +        ai->ap_name = ap_name; +        ai->daf_name = NULL; -        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); -        if (_ap_instance->fds == NULL) { -                free(_ap_instance); +        ai->fds = bmp_create(AP_MAX_FLOWS, 0); +        if (ai->fds == NULL) { +                free(ai);                  return -ENOMEM;          } -        _ap_instance->rdrb = shm_rdrbuff_open(); -        if (_ap_instance->rdrb == NULL) { -                bmp_destroy(_ap_instance->fds); -                free(_ap_instance); +        ai->rdrb = shm_rdrbuff_open(); +        if (ai->rdrb == NULL) { +                bmp_destroy(ai->fds); +                free(ai);                  return -1;          } -        _ap_instance->rb = shm_ap_rbuff_create(); -        if (_ap_instance->rb == NULL) { -                shm_rdrbuff_close(_ap_instance->rdrb); -                bmp_destroy(_ap_instance->fds); -                free(_ap_instance); +        ai->rb = shm_ap_rbuff_create_s(); +        if (ai->rb == NULL) { +                shm_rdrbuff_close(ai->rdrb); +                bmp_destroy(ai->fds); +                free(ai);                  return -1;          }          for (i = 0; i < AP_MAX_FLOWS; ++i) { -                _ap_instance->flows[i].rb = NULL; -                _ap_instance->flows[i].port_id = -1; -                _ap_instance->flows[i].oflags = 0; -                _ap_instance->flows[i].api = -1; -                _ap_instance->flows[i].timeout = NULL; +                ai->flows[i].rb = NULL; +                ai->flows[i].port_id = -1; +                ai->flows[i].oflags = 0; +                ai->flows[i].api = -1; +                ai->flows[i].timeout = NULL; +                ai->ports[i] = -1;          } -        pthread_rwlock_init(&_ap_instance->flows_lock, NULL); -        pthread_rwlock_init(&_ap_instance->data_lock, NULL); +        pthread_rwlock_init(&ai->flows_lock, NULL); +        pthread_rwlock_init(&ai->data_lock, NULL);          if (ap_name != NULL)                  return api_announce(ap_name); @@ -141,60 +156,56 @@ void ap_fini(void)  {          int i = 0; -        if (_ap_instance == NULL) +        if (ai == NULL)                  return; -        pthread_rwlock_wrlock(&_ap_instance->data_lock); +        pthread_rwlock_wrlock(&ai->data_lock);          /* remove all remaining sdus */ -        while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) -                shm_rdrbuff_remove(_ap_instance->rdrb, i); +        while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0) +                shm_rdrbuff_remove(ai->rdrb, i); -        if (_ap_instance->fds != NULL) -                bmp_destroy(_ap_instance->fds); -        if (_ap_instance->rb != NULL) -                shm_ap_rbuff_destroy(_ap_instance->rb); -        if (_ap_instance->rdrb != NULL) -                shm_rdrbuff_close(_ap_instance->rdrb); +        if (ai->fds != NULL) +                bmp_destroy(ai->fds); +        if (ai->rb != NULL) +                shm_ap_rbuff_destroy(ai->rb); +        if (ai->rdrb != NULL) +                shm_rdrbuff_close(ai->rdrb); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        if (ai->daf_name != NULL) +                free(ai->daf_name); -        for (i = 0; i < AP_MAX_FLOWS; ++i) -                if (_ap_instance->flows[i].rb != NULL) -                        shm_ap_rbuff_close(_ap_instance->flows[i].rb); +        pthread_rwlock_rdlock(&ai->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        for (i = 0; i < AP_MAX_FLOWS; ++i) { +                if (ai->flows[i].rb != NULL) +                        shm_ap_rbuff_close(ai->flows[i].rb); +                ai->ports[ai->flows[i].port_id] = -1; +        } -        pthread_rwlock_destroy(&_ap_instance->flows_lock); -        pthread_rwlock_destroy(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock); -        free(_ap_instance); -} +        pthread_rwlock_destroy(&ai->flows_lock); +        pthread_rwlock_destroy(&ai->data_lock); -static int port_id_to_fd(int port_id) -{ -        int i; -        for (i = 0; i < AP_MAX_FLOWS; ++i) -                if (_ap_instance->flows[i].port_id == port_id) -                        return i; -        return -1; +        free(ai);  }  int flow_accept(char ** ae_name)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int cfd = -1; +        int fd = -1;          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_api = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); +        pthread_rwlock_rdlock(&ai->data_lock); -        msg.api     = _ap_instance->api; +        msg.api     = ai->api; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          recv_msg = send_recv_irm_msg_b(&msg);          if (recv_msg == NULL) @@ -205,22 +216,22 @@ int flow_accept(char ** ae_name)                  return -1;          } -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai->flows_lock); -        cfd = bmp_allocate(_ap_instance->fds); -        if (!bmp_is_id_valid(_ap_instance->fds, cfd)) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        fd = bmp_allocate(ai->fds); +        if (!bmp_is_id_valid(ai->fds, fd)) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->api); -        if (_ap_instance->flows[cfd].rb == NULL) { -                bmp_release(_ap_instance->fds, cfd); -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); +        if (ai->flows[fd].rb == NULL) { +                bmp_release(ai->fds, fd); +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -228,25 +239,27 @@ int flow_accept(char ** ae_name)          if (ae_name != NULL) {                  *ae_name = strdup(recv_msg->ae_name);                  if (*ae_name == NULL) { -                        shm_ap_rbuff_close(_ap_instance->flows[cfd].rb); -                        bmp_release(_ap_instance->fds, cfd); -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ap_instance->data_lock); +                        shm_ap_rbuff_close(ai->flows[fd].rb); +                        bmp_release(ai->fds, fd); +                        pthread_rwlock_unlock(&ai->flows_lock); +                        pthread_rwlock_unlock(&ai->data_lock);                          irm_msg__free_unpacked(recv_msg, NULL);                          return -ENOMEM;                  }          } -        _ap_instance->flows[cfd].port_id = recv_msg->port_id; -        _ap_instance->flows[cfd].oflags  = FLOW_O_DEFAULT; -        _ap_instance->flows[cfd].api     = recv_msg->api; +        ai->flows[fd].port_id = recv_msg->port_id; +        ai->flows[fd].oflags  = FLOW_O_DEFAULT; +        ai->flows[fd].api     = recv_msg->api; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        ai->ports[recv_msg->port_id] = fd; + +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock);          irm_msg__free_unpacked(recv_msg, NULL); -        return cfd; +        return fd;  }  int flow_alloc_resp(int fd, @@ -261,40 +274,40 @@ int flow_alloc_resp(int fd,          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;          msg.has_api      = true; -        msg.api          = _ap_instance->api; +        msg.api          = ai->api;          msg.has_port_id  = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        msg.port_id      = _ap_instance->flows[fd].port_id; +        msg.port_id      = ai->flows[fd].port_id; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); +        pthread_rwlock_unlock(&ai->flows_lock);          msg.has_response = true;          msg.response     = response;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -1;          }          if (!recv_msg->has_result) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ret = recv_msg->result; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -320,11 +333,11 @@ int flow_alloc(char * dst_name,          msg.ae_name     = src_ae_name;          msg.has_api     = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); +        pthread_rwlock_rdlock(&ai->data_lock); -        msg.api         = _ap_instance->api; +        msg.api         = ai->api; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { @@ -336,32 +349,34 @@ int flow_alloc(char * dst_name,                  return -1;          } -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai->flows_lock); -        fd = bmp_allocate(_ap_instance->fds); -        if (!bmp_is_id_valid(_ap_instance->fds, fd)) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        fd = bmp_allocate(ai->fds); +        if (!bmp_is_id_valid(ai->fds, fd)) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); -        if (_ap_instance->flows[fd].rb == NULL) { -                bmp_release(_ap_instance->fds, fd); -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); +        if (ai->flows[fd].rb == NULL) { +                bmp_release(ai->fds, fd); +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        _ap_instance->flows[fd].port_id = recv_msg->port_id; -        _ap_instance->flows[fd].oflags  = FLOW_O_DEFAULT; -        _ap_instance->flows[fd].api     = recv_msg->api; +        ai->flows[fd].port_id = recv_msg->port_id; +        ai->flows[fd].oflags  = FLOW_O_DEFAULT; +        ai->flows[fd].api     = recv_msg->api; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        ai->ports[recv_msg->port_id] = fd; + +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -380,19 +395,19 @@ int flow_alloc_res(int fd)          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;          msg.has_port_id  = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        msg.port_id = _ap_instance->flows[fd].port_id; +        msg.port_id = ai->flows[fd].port_id; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock);          recv_msg = send_recv_irm_msg_b(&msg);          if (recv_msg == NULL) { @@ -422,41 +437,43 @@ int flow_dealloc(int fd)          msg.has_api      = true;          msg.api          = getpid(); -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        msg.port_id = _ap_instance->flows[fd].port_id; +        msg.port_id = ai->flows[fd].port_id; + +        ai->ports[msg.port_id] = -1; -        _ap_instance->flows[fd].port_id = -1; -        shm_ap_rbuff_close(_ap_instance->flows[fd].rb); -        _ap_instance->flows[fd].rb = NULL; -        _ap_instance->flows[fd].api = -1; +        ai->flows[fd].port_id = -1; +        shm_ap_rbuff_close(ai->flows[fd].rb); +        ai->flows[fd].rb = NULL; +        ai->flows[fd].api = -1; -        bmp_release(_ap_instance->fds, fd); +        bmp_release(ai->fds, fd); -        pthread_rwlock_unlock(&_ap_instance->flows_lock); +        pthread_rwlock_unlock(&ai->flows_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -1;          }          if (!recv_msg->has_result) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ret = recv_msg->result; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -470,30 +487,30 @@ int flow_cntl(int fd, int cmd, int oflags)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        old = _ap_instance->flows[fd].oflags; +        old = ai->flows[fd].oflags;          switch (cmd) {          case FLOW_F_GETFL: /* GET FLOW FLAGS */ -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return old;          case FLOW_F_SETFL: /* SET FLOW FLAGS */ -                _ap_instance->flows[fd].oflags = oflags; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                ai->flows[fd].oflags = oflags; +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return old;          default: -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return FLOW_O_INVALID; /* unknown command */          }  } @@ -509,42 +526,42 @@ ssize_t flow_write(int fd, void * buf, size_t count)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_rdrbuff_write(_ap_instance->rdrb, -                                       _ap_instance->flows[fd].api, +        if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { +                idx = shm_rdrbuff_write(ai->rdrb, +                                       ai->flows[fd].api,                                         DU_BUFF_HEADSPACE,                                         DU_BUFF_TAILSPACE,                                         (uint8_t *) buf,                                         count);                  if (idx == -1) { -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ap_instance->data_lock); +                        pthread_rwlock_unlock(&ai->flows_lock); +                        pthread_rwlock_unlock(&ai->data_lock);                          return -EAGAIN;                  }                  e.index   = idx; -                e.port_id = _ap_instance->flows[fd].port_id; +                e.port_id = ai->flows[fd].port_id; -                if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { -                        shm_rdrbuff_remove(_ap_instance->rdrb, idx); -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ap_instance->data_lock); +                if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) { +                        shm_rdrbuff_remove(ai->rdrb, idx); +                        pthread_rwlock_unlock(&ai->flows_lock); +                        pthread_rwlock_unlock(&ai->data_lock);                          return -1;                  }          } else { /* blocking */ -                struct shm_rdrbuff * rdrb = _ap_instance->rdrb; -                pid_t                api = _ap_instance->flows[fd].api; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                struct shm_rdrbuff * rdrb = ai->rdrb; +                pid_t                api = ai->flows[fd].api; +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  idx = shm_rdrbuff_write_b(rdrb,                                           api, @@ -553,30 +570,22 @@ ssize_t flow_write(int fd, void * buf, size_t count)                                           (uint8_t *) buf,                                           count); -                pthread_rwlock_rdlock(&_ap_instance->data_lock); -                pthread_rwlock_rdlock(&_ap_instance->flows_lock); +                pthread_rwlock_rdlock(&ai->data_lock); +                pthread_rwlock_rdlock(&ai->flows_lock);                  e.index   = idx; -                e.port_id = _ap_instance->flows[fd].port_id; +                e.port_id = ai->flows[fd].port_id; -                while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) +                while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0)                          ;          } -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock);          return 0;  } -int flow_select(const struct timespec * timeout) -{ -        int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout); -        if (port_id < 0) -                return port_id; -        return port_id_to_fd(port_id); -} -  ssize_t flow_read(int fd, void * buf, size_t count)  {          int idx = -1; @@ -586,47 +595,129 @@ ssize_t flow_read(int fd, void * buf, size_t count)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_ap_rbuff_read_port(_ap_instance->rb, -                                             _ap_instance->flows[fd].port_id); -                pthread_rwlock_unlock(&_ap_instance->flows_lock); +        if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { +                idx = shm_ap_rbuff_read_port(ai->rb, +                                             ai->flows[fd].port_id); +                pthread_rwlock_unlock(&ai->flows_lock);          } else { -                struct shm_ap_rbuff * rb      = _ap_instance->rb; -                int                   port_id = _ap_instance->flows[fd].port_id; -                struct timespec *     timeout = _ap_instance->flows[fd].timeout; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                struct shm_ap_rbuff * rb      = ai->rb; +                int                   port_id = ai->flows[fd].port_id; +                struct timespec *     timeout = ai->flows[fd].timeout; +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); -                pthread_rwlock_rdlock(&_ap_instance->data_lock); +                pthread_rwlock_rdlock(&ai->data_lock);          }          if (idx < 0) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -EAGAIN;          } -        n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx); +        n = shm_rdrbuff_read(&sdu, ai->rdrb, idx);          if (n < 0) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -1;          }          memcpy(buf, sdu, MIN(n, count)); -        shm_rdrbuff_remove(_ap_instance->rdrb, idx); +        shm_rdrbuff_remove(ai->rdrb, idx); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          return n;  } + +/* select functions */ + +struct flow_set * flow_set_create() +{ +        struct flow_set * set = malloc(sizeof(*set)); +        if (set == NULL) +                return NULL; + +        if (pthread_rwlock_init(&set->lock, NULL)) { +                free(set); +                return NULL; +        } + +        memset(&set->b, 0, sizeof(set->b)); + +        set->dirty = true; + +        return set; +} + +void flow_set_zero(struct flow_set * set) +{ +        pthread_rwlock_wrlock(&set->lock); +        memset(&set->b, 0, sizeof(set->b)); +        set->dirty = true; +        pthread_rwlock_unlock(&set->lock); +} + +void flow_set_add(struct flow_set * set, int fd) +{ +        pthread_rwlock_wrlock(&set->lock); +        set->b[ai->flows[fd].port_id] = true; +        set->dirty = true; +        pthread_rwlock_unlock(&set->lock); +} + +void flow_set_del(struct flow_set * set, int fd) +{ +        pthread_rwlock_wrlock(&set->lock); +        set->b[ai->flows[fd].port_id] = false; +        set->dirty = true; +        pthread_rwlock_unlock(&set->lock); +} + +bool flow_set_has(struct flow_set * set, int fd) +{ +        bool ret; +        pthread_rwlock_rdlock(&set->lock); +        ret = set->b[ai->flows[fd].port_id]; +        pthread_rwlock_unlock(&set->lock); +        return ret; +} + +void flow_set_destroy(struct flow_set * set) +{ +        pthread_rwlock_destroy(&set->lock); +        free(set); +} + +static void flow_set_cpy(struct flow_set * set) +{ +        pthread_rwlock_rdlock(&set->lock); +        if (set->dirty) +                memcpy(set->s, set->b, IRMD_MAX_FLOWS); +        set->dirty = false; +        pthread_rwlock_unlock(&set->lock); +} + +int flow_select(struct flow_set * set, const struct timespec * timeout) +{ +        int port_id; +        if (set == NULL) { +                port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout); +        } else { +                flow_set_cpy(set); +                port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout); +        } +        if (port_id < 0) +                return port_id; +        return ai->ports[port_id]; +} diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c index 75ee2085..81bed687 100644 --- a/src/lib/lockfile.c +++ b/src/lib/lockfile.c @@ -43,10 +43,13 @@ struct lockfile {  };  struct lockfile * lockfile_create() { +        mode_t mask;          struct lockfile * lf = malloc(sizeof(*lf));          if (lf == NULL)                  return NULL; +        mask = umask(0); +          lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666);          if (lf->fd == -1) {                  LOG_DBGF("Could not create lock file."); @@ -54,11 +57,7 @@ struct lockfile * lockfile_create() {                  return NULL;          } -        if (fchmod(lf->fd, 0666)) { -                LOG_DBGF("Failed to chmod lockfile."); -                free(lf); -                return NULL; -        } +        umask(mask);          if (ftruncate(lf->fd, LF_SIZE - 1) < 0) {                  LOG_DBGF("Failed to extend lockfile."); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 77e288a8..473894d5 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -40,6 +40,10 @@  #include <signal.h>  #include <sys/stat.h> +#define FN_MAX_CHARS 255 +#define NORTH false +#define SOUTH true +  #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry)         \                               + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \                               + 2 * sizeof (pthread_cond_t)) @@ -59,19 +63,24 @@ struct shm_ap_rbuff {          pthread_cond_t *  add;         /* SDU arrived */          pthread_cond_t *  del;         /* SDU removed */          pid_t             api;         /* api to which this rb belongs */ +        bool              dir;         /* direction, false = N */          int               fd;  }; -struct shm_ap_rbuff * shm_ap_rbuff_create() +static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)  {          struct shm_ap_rbuff * rb;          int                   shm_fd;          struct rb_entry *     shm_base;          pthread_mutexattr_t   mattr;          pthread_condattr_t    cattr; -        char                  fn[25]; +        char                  fn[FN_MAX_CHARS]; +        mode_t                mask; -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); +        if (dir == SOUTH) +                sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid()); +        else +                sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid());          rb = malloc(sizeof(*rb));          if (rb == NULL) { @@ -79,6 +88,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()                  return NULL;          } +        mask = umask(0); +          shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);          if (shm_fd == -1) {                  LOG_DBG("Failed creating ring buffer."); @@ -86,11 +97,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()                  return NULL;          } -        if (fchmod(shm_fd, 0666)) { -                LOG_DBG("Failed to chmod shared memory."); -                free(rb); -                return NULL; -        } +        umask(mask);          if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) {                  LOG_DBG("Failed to extend ringbuffer."); @@ -150,18 +157,22 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()          rb->fd  = shm_fd;          rb->api = getpid(); +        rb->dir = dir;          return rb;  } -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) +static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)  {          struct shm_ap_rbuff * rb;          int                   shm_fd;          struct rb_entry *     shm_base; -        char                  fn[25]; +        char                  fn[FN_MAX_CHARS]; -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); +        if (dir == SOUTH) +                sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api); +        else +                sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api);          rb = malloc(sizeof(*rb));          if (rb == NULL) { @@ -204,9 +215,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)          rb->fd = shm_fd;          rb->api = api; +        rb->dir = dir;          return rb;  } + +struct shm_ap_rbuff * shm_ap_rbuff_create_n() +{ +        return shm_ap_rbuff_create(NORTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_create_s() +{ +        return shm_ap_rbuff_create(SOUTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api) +{ +        return shm_ap_rbuff_open(api, NORTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api) +{ +        return shm_ap_rbuff_open(api, SOUTH); +} +  void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)  {          if (rb == NULL) { @@ -252,7 +285,10 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)          if (close(rb->fd) < 0)                  LOG_DBG("Couldn't close shared memory."); -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); +        if (rb->dir == SOUTH) +                sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api); +        else +                sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api);          if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)                  LOG_DBG("Couldn't unmap shared memory."); @@ -311,15 +347,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)                  return -1;          } -        ret = (rb->shm_base + *rb->ptr_tail)->index; +        ret = tail_el_ptr(rb)->index;          pthread_mutex_unlock(rb->lock);          return ret;  } -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, -                        const struct timespec * timeout) +static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, +                                   const struct timespec * timeout)  {          struct timespec abstime;          int ret = 0; @@ -360,7 +396,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,          }          if (ret != ETIMEDOUT) -                ret = (rb->shm_base + *rb->ptr_tail)->port_id; +                ret = tail_el_ptr(rb)->port_id; +        else +                ret = -ETIMEDOUT; + +        pthread_cleanup_pop(true); + +        return ret; +} + +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff *   rb, +                        bool *                  set, +                        const struct timespec * timeout) +{ +        struct timespec abstime; +        int ret; + +        if (set == NULL) +                return shm_ap_rbuff_peek_b_all(rb, timeout); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) +               && (ret != ETIMEDOUT)) { +                while (shm_rbuff_empty(rb)) { +                        if (timeout != NULL) +                                ret = pthread_cond_timedwait(rb->add, +                                                             rb->lock, +                                                             &abstime); +                        else +                                ret = pthread_cond_wait(rb->add, rb->lock); + +#ifndef __APPLE__ +                        if (ret == EOWNERDEAD) { +                                LOG_DBG("Recovering dead mutex."); +                                pthread_mutex_consistent(rb->lock); +                        } +#endif +                        if (ret == ETIMEDOUT) +                                break; +                } + +                while (!set[tail_el_ptr(rb)->port_id]) { +                        if (timeout != NULL) +                                ret = pthread_cond_timedwait(rb->del, +                                                             rb->lock, +                                                             &abstime); +                        else +                                ret = pthread_cond_wait(rb->del, rb->lock); + +#ifndef __APPLE__ +                        if (ret == EOWNERDEAD) { +                                LOG_DBG("Recovering dead mutex."); +                                pthread_mutex_consistent(rb->lock); +                        } +#endif +                        if (ret == ETIMEDOUT) +                                break; +                } +        } + +        if (ret != ETIMEDOUT) +                ret = tail_el_ptr(rb)->port_id;          else                  ret = -ETIMEDOUT; @@ -369,6 +480,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,          return ret;  } +  struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)  {          struct rb_entry * e = NULL; @@ -434,8 +546,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)          return idx;  } -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, -                                 int port_id, +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb, +                                 int                     port_id,                                   const struct timespec * timeout)  {          struct timespec abstime; diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index b0d295d9..7c4927fc 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -139,13 +139,13 @@ static char * rdrb_filename(enum qos_cube qos)                  ++chars;          } while (qm > 0); -        str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2); +        str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1);          if (str == NULL) {                  LOG_ERR("Failed to create shm_rdrbuff: Out of Memory.");                  return NULL;          } -        sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos); +        sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos);          return str;  } @@ -154,6 +154,7 @@ static char * rdrb_filename(enum qos_cube qos)  struct shm_rdrbuff * shm_rdrbuff_create()  {          struct shm_rdrbuff * rdrb; +        mode_t               mask;          int                  shm_fd;          uint8_t *            shm_base;          pthread_mutexattr_t  mattr; @@ -171,6 +172,8 @@ struct shm_rdrbuff * shm_rdrbuff_create()                  return NULL;          } +        mask = umask(0); +          shm_fd = shm_open(shm_rdrb_fn, O_CREAT | O_EXCL | O_RDWR, 0666);          if (shm_fd == -1) {                  LOG_DBGF("Failed creating shared memory map."); @@ -179,12 +182,7 @@ struct shm_rdrbuff * shm_rdrbuff_create()                  return NULL;          } -        if (fchmod(shm_fd, 0666)) { -                LOG_DBGF("Failed to chmod shared memory map."); -                free(shm_rdrb_fn); -                free(rdrb); -                return NULL; -        } +        umask(mask);          if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) {                  LOG_DBGF("Failed to extend shared memory map."); @@ -469,7 +467,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,  #ifndef SHM_RDRB_MULTI_BLOCK          if (sz > SHM_RDRB_BLOCK_SIZE) { -                LOG_DBGF("Multi-block SDU's disabled. Dropping."); +                LOG_DBGF("Multi-block SDUs disabled. Dropping.");                  return -1;          }  #endif @@ -558,7 +556,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,  #ifndef SHM_RDRB_MULTI_BLOCK          if (sz > SHM_RDRB_BLOCK_SIZE) { -                LOG_DBGF("Multi-block SDU's disabled. Dropping."); +                LOG_DBGF("Multi-block SDUs disabled. Dropping.");                  return -1;          }  #endif diff --git a/src/tools/cbr/cbr.c b/src/tools/cbr/cbr.c index e42492df..27c51586 100644 --- a/src/tools/cbr/cbr.c +++ b/src/tools/cbr/cbr.c @@ -44,7 +44,7 @@ struct s {  static void usage(void)  {          printf("Usage: cbr [OPTION]...\n" -               "Sends SDU's from client to server at a constant bit rate.\n\n" +               "Sends SDUs from client to server at a constant bit rate.\n\n"                 "  -l, --listen              Run in server mode\n"                 "\n"                 "Server options:\n" @@ -54,10 +54,10 @@ static void usage(void)                 "Client options:\n"                 "  -n, --server_apn          Specify the name of the server.\n"                 "  -d, --duration            Duration for sending (s)\n" -               "  -f, --flood               Send SDU's as fast as possible\n" +               "  -f, --flood               Send SDUs as fast as possible\n"                 "  -s, --size                SDU size (B)\n"                 "  -r, --rate                Rate (b/s)\n" -               "      --sleep               Sleep in between sending sdu's\n" +               "      --sleep               Sleep in between sending SDUs\n"                 "\n\n"                 "      --help                Display this help text and exit\n");  } @@ -65,7 +65,7 @@ static void usage(void)  int main(int argc, char ** argv)  {          int    duration = 60;      /* One minute test */ -        int    size = 1000;    /* 1000 byte SDU's */ +        int    size = 1000;    /* 1000 byte SDUs */          long   rate = 1000000; /* 1 Mb/s */          bool   flood = false;          bool   sleep = false; diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 2871e79e..7d2edf33 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -23,6 +23,9 @@  #define _POSIX_C_SOURCE 199506L +#include <ouroboros/select.h> +#include <ouroboros/dev.h> +  #include <stdio.h>  #include <string.h>  #include <pthread.h> @@ -59,9 +62,9 @@ struct c {  } client;  struct s { -        struct timespec times[OPING_MAX_FLOWS]; -        bool            flows[OPING_MAX_FLOWS]; -        pthread_mutex_t lock; +        struct timespec   times[OPING_MAX_FLOWS]; +        struct flow_set * flows; +        pthread_mutex_t   lock;          pthread_t cleaner_pt;          pthread_t accept_pt; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 0d4a10af..3a254984 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -65,7 +65,7 @@ void * reader(void * o)          /* FIXME: use flow timeout option once we have it */          while(client.rcvd != client.count && -              (fd = flow_select(&timeout)) != -ETIMEDOUT) { +              (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) {                  flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK);                  while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {                          if (msg_len < 0) @@ -216,7 +216,7 @@ int client_main()          printf("\n");          printf("--- %s ping statistics ---\n", client.s_apn); -        printf("%d SDU's transmitted, ", client.sent); +        printf("%d SDUs transmitted, ", client.sent);          printf("%d received, ", client.rcvd);          printf("%d%% packet loss, ", client.sent == 0 ? 0 :                 100 - ((100 * client.rcvd) / client.sent)); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 7761110d..9c7b1be7 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -21,8 +21,6 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#include <ouroboros/dev.h> -  #ifdef __FreeBSD__  #define __XSI_VISIBLE 500  #endif @@ -53,9 +51,9 @@ void * cleaner_thread(void * o)                  clock_gettime(CLOCK_REALTIME, &now);                  pthread_mutex_lock(&server.lock);                  for (i = 0; i < OPING_MAX_FLOWS; ++i) -                        if (server.flows[i] && +                        if (flow_set_has(server.flows, i) &&                              ts_diff_ms(&server.times[i], &now) > deadline_ms) { -                                server.flows[i] = false; +                                flow_set_del(server.flows, i);                                  flow_dealloc(i);                          } @@ -70,10 +68,16 @@ void * server_thread(void *o)          int msg_len = 0;          struct oping_msg * msg = (struct oping_msg *) buf;          struct timespec now = {0, 0}; +        struct timespec timeout = {0, 100 * MILLION};          while (true) { - -                int fd = flow_select(NULL); +                int fd = flow_select(server.flows, &timeout); +                if (fd == -ETIMEDOUT) +                        continue; +                if (fd < 0) { +                        printf("Failed to get active fd.\n"); +                        continue; +                }                  while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {                          if (msg_len < 0)                                  continue; @@ -126,7 +130,7 @@ void * accept_thread(void * o)                  clock_gettime(CLOCK_REALTIME, &now);                  pthread_mutex_lock(&server.lock); -                server.flows[fd] = true; +                flow_set_add(server.flows, fd);                  server.times[fd] = now;                  pthread_mutex_unlock(&server.lock); @@ -139,7 +143,6 @@ void * accept_thread(void * o)  int server_main()  {          struct sigaction sig_act; -        int i = 0;          memset(&sig_act, 0, sizeof sig_act);          sig_act.sa_sigaction = &shutdown_server; @@ -153,8 +156,11 @@ int server_main()                  return -1;          } -        for (i = 0; i < OPING_MAX_FLOWS; ++i) -                server.flows[i] = false; +        server.flows = flow_set_create(); +        if (server.flows == NULL) +                return 0; + +        flow_set_zero(server.flows);          pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);          pthread_create(&server.accept_pt, NULL, accept_thread, NULL);  | 
