diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/lib/dev.c | 500 | ||||
| -rw-r--r-- | src/lib/lockfile.c | 39 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 661 | ||||
| -rw-r--r-- | src/lib/shm_flow_set.c | 407 | ||||
| -rw-r--r-- | src/lib/shm_rbuff.c | 405 | ||||
| -rw-r--r-- | src/lib/shm_rdrbuff.c | 29 | 
7 files changed, 1165 insertions, 879 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index b94d0eea..20ea473d 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -35,7 +35,8 @@ set(SOURCE_FILES    lockfile.c    logs.c    nsm.c -  shm_ap_rbuff.c +  shm_flow_set.c +  shm_rbuff.c    shm_rdrbuff.c    sockets.c    time_utils.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 77c2d06a..146070b7 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -3,7 +3,8 @@   *   * API for applications   * - *    Sander Vrijders <sander.vrijders@intec.ugent.be> + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be>   *   * This program is free software; you can redistribute it and/or modify   * it under the terms of the GNU General Public License as published by @@ -26,20 +27,24 @@  #include <ouroboros/sockets.h>  #include <ouroboros/fcntl.h>  #include <ouroboros/bitmap.h> +#include <ouroboros/shm_flow_set.h>  #include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_rbuff.h>  #include <ouroboros/utils.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h>  #include <stdlib.h>  #include <string.h>  #include <stdio.h>  struct flow_set { -        bool dirty; -        bool b[IRMD_MAX_FLOWS]; /* working copy */ -        bool s[IRMD_MAX_FLOWS]; /* safe copy */ -        pthread_rwlock_t lock; +        size_t idx; +}; + +struct fqueue { +        int    fqueue[SHM_BUFFER_SIZE]; /* safe copy from shm */ +        size_t fqsize; +        size_t next;  };  enum port_state { @@ -124,7 +129,9 @@ enum port_state port_wait_assign(struct port * p)  }  struct flow { -        struct shm_ap_rbuff * rb; +        struct shm_rbuff *    rx_rb; +        struct shm_rbuff *    tx_rb; +        struct shm_flow_set * set;          int                   port_id;          int                   oflags; @@ -139,10 +146,11 @@ struct {          pid_t                 api;          struct shm_rdrbuff *  rdrb; -        struct shm_ap_rbuff * rb; +        struct shm_flow_set * fqset;          pthread_rwlock_t      data_lock;          struct bmp *          fds; +        struct bmp *          fqueues;          struct flow *         flows;          struct port *         ports; @@ -194,40 +202,52 @@ int ap_init(char * ap_name)          if (ai.fds == NULL)                  return -ENOMEM; -        ai.rdrb = shm_rdrbuff_open(); -        if (ai.rdrb == NULL) { +        ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0); +        if (ai.fqueues == NULL) { +                bmp_destroy(ai.fds); +                return -ENOMEM; +        } + +        ai.fqset = shm_flow_set_create(); +        if (ai.fqset == NULL) { +                bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds);                  return -1;          } -        ai.rb = shm_ap_rbuff_create(); -        if (ai.rb == NULL) { -                shm_rdrbuff_close(ai.rdrb); +        ai.rdrb = shm_rdrbuff_open(); +        if (ai.rdrb == NULL) { +                shm_flow_set_destroy(ai.fqset); +                bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds);                  return -1;          }          ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);          if (ai.flows == NULL) { -                shm_ap_rbuff_destroy(ai.rb);                  shm_rdrbuff_close(ai.rdrb); +                shm_flow_set_destroy(ai.fqset); +                bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds);                  return -1;          }          for (i = 0; i < AP_MAX_FLOWS; ++i) { -                ai.flows[i].rb = NULL; +                ai.flows[i].rx_rb   = NULL; +                ai.flows[i].tx_rb   = NULL; +                ai.flows[i].set     = NULL;                  ai.flows[i].port_id = -1; -                ai.flows[i].oflags = 0; -                ai.flows[i].api = -1; +                ai.flows[i].oflags  = 0; +                ai.flows[i].api     = -1;                  ai.flows[i].timeout = NULL;          }          ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); -        if (ai.flows == NULL) { +        if (ai.ports == NULL) {                  free(ai.flows); -                shm_ap_rbuff_destroy(ai.rb);                  shm_rdrbuff_close(ai.rdrb); +                shm_flow_set_destroy(ai.fqset); +                bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds);                  return -1;          } @@ -253,16 +273,10 @@ void ap_fini()          pthread_rwlock_wrlock(&ai.data_lock); -        /* remove all remaining sdus */ -        while ((i = shm_ap_rbuff_pop_idx(ai.rb)) >= 0) -                shm_rdrbuff_remove(ai.rdrb, i); - -        if (ai.fds != NULL) -                bmp_destroy(ai.fds); -        if (ai.rb != NULL) -                shm_ap_rbuff_destroy(ai.rb); -        if (ai.rdrb != NULL) -                shm_rdrbuff_close(ai.rdrb); +        bmp_destroy(ai.fds); +        bmp_destroy(ai.fqueues); +        shm_flow_set_destroy(ai.fqset); +        shm_rdrbuff_close(ai.rdrb);          if (ai.daf_name != NULL)                  free(ai.daf_name); @@ -270,8 +284,15 @@ void ap_fini()          pthread_rwlock_rdlock(&ai.flows_lock);          for (i = 0; i < AP_MAX_FLOWS; ++i) { -                if (ai.flows[i].rb != NULL) -                        shm_ap_rbuff_close(ai.flows[i].rb); +                if (ai.flows[i].tx_rb != NULL) { +                        int idx; +                        while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) +                                shm_rdrbuff_remove(ai.rdrb, idx); +                        shm_rbuff_close(ai.flows[i].rx_rb); +                        shm_rbuff_close(ai.flows[i].tx_rb); +                        shm_flow_set_close(ai.flows[i].set); +                } +                  if (ai.flows[i].timeout != NULL)                          free(ai.flows[i].timeout);          } @@ -328,8 +349,8 @@ int flow_accept(char ** ae_name)                  return -1;          } -        ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); -        if (ai.flows[fd].rb == NULL) { +        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); +        if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -337,10 +358,24 @@ int flow_accept(char ** ae_name)                  return -1;          } +        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); +        if (ai.flows[fd].set == NULL) { +                bmp_release(ai.fds, fd); +                shm_rbuff_close(ai.flows[fd].rx_rb); +                shm_rbuff_close(ai.flows[fd].tx_rb); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +          if (ae_name != NULL) {                  *ae_name = strdup(recv_msg->ae_name);                  if (*ae_name == NULL) { -                        shm_ap_rbuff_close(ai.flows[fd].rb); +                        shm_rbuff_close(ai.flows[fd].tx_rb); +                        shm_rbuff_close(ai.flows[fd].tx_rb); +                        shm_flow_set_close(ai.flows[fd].set);                          bmp_release(ai.fds, fd);                          pthread_rwlock_unlock(&ai.flows_lock);                          pthread_rwlock_unlock(&ai.data_lock); @@ -356,8 +391,6 @@ int flow_accept(char ** ae_name)          ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; -        shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); -          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -410,6 +443,17 @@ int flow_alloc_resp(int fd, int response)          ret = recv_msg->result; +        pthread_rwlock_wrlock(&ai.flows_lock); + +        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, +                                            ai.flows[fd].port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -461,8 +505,11 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)                  return -1;          } -        ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); -        if (ai.flows[fd].rb == NULL) { +        ai.flows[fd].port_id = recv_msg->port_id; +        ai.flows[fd].oflags  = FLOW_O_DEFAULT; +        ai.flows[fd].api     = recv_msg->api; +        ai.flows[fd].rx_rb   = shm_rbuff_open(ai.api, recv_msg->port_id); +        if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -470,18 +517,12 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)                  return -1;          } -        ai.flows[fd].port_id = recv_msg->port_id; -        ai.flows[fd].oflags  = FLOW_O_DEFAULT; -        ai.flows[fd].api     = recv_msg->api; -          ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); -        shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); -          irm_msg__free_unpacked(recv_msg, NULL);          return fd; @@ -510,6 +551,23 @@ int flow_alloc_res(int fd)          msg.port_id = ai.flows[fd].port_id; +        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, +                                            ai.flows[fd].port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); +        if (ai.flows[fd].set == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } +          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -537,7 +595,7 @@ int flow_dealloc(int fd)          msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC;          msg.has_port_id  = true;          msg.has_api      = true; -        msg.api          = getpid(); +        msg.api          = ai.api;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_wrlock(&ai.flows_lock); @@ -548,7 +606,7 @@ int flow_dealloc(int fd)                  return -ENOTALLOC;          } -        if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) { +        if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return -EBUSY; @@ -559,8 +617,10 @@ int flow_dealloc(int fd)          port_destroy(&ai.ports[msg.port_id]);          ai.flows[fd].port_id = -1; -        shm_ap_rbuff_close(ai.flows[fd].rb); -        ai.flows[fd].rb = NULL; +        shm_rbuff_close(ai.flows[fd].rx_rb); +        ai.flows[fd].rx_rb = NULL; +        shm_rbuff_close(ai.flows[fd].tx_rb); +        ai.flows[fd].tx_rb = NULL;          ai.flows[fd].oflags = 0;          ai.flows[fd].api = -1;          if (ai.flows[fd].timeout != NULL) { @@ -604,9 +664,9 @@ int flow_cntl(int fd, int cmd, int oflags)          case FLOW_F_SETFL: /* SET FLOW FLAGS */                  ai.flows[fd].oflags = oflags;                  if (oflags & FLOW_O_WRONLY) -                        shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id); +                        shm_rbuff_block(ai.flows[fd].rx_rb);                  if (oflags & FLOW_O_RDWR) -                        shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id); +                        shm_rbuff_unblock(ai.flows[fd].rx_rb);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return old; @@ -620,7 +680,6 @@ int flow_cntl(int fd, int cmd, int oflags)  ssize_t flow_write(int fd, void * buf, size_t count)  {          ssize_t idx; -        struct rb_entry e;          if (buf == NULL)                  return 0; @@ -653,13 +712,10 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  if (idx < 0) {                          pthread_rwlock_unlock(&ai.flows_lock);                          pthread_rwlock_unlock(&ai.data_lock); -                        return -idx; +                        return idx;                  } -                e.index   = idx; -                e.port_id = ai.flows[fd].port_id; - -                if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { +                if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {                          shm_rdrbuff_remove(ai.rdrb, idx);                          pthread_rwlock_unlock(&ai.flows_lock);                          pthread_rwlock_unlock(&ai.data_lock); @@ -667,7 +723,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  }          } else { /* blocking */                  struct shm_rdrbuff * rdrb = ai.rdrb; -                pid_t                api = ai.flows[fd].api; +                pid_t                api  = ai.flows[fd].api;                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -681,17 +737,16 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  pthread_rwlock_rdlock(&ai.data_lock);                  pthread_rwlock_rdlock(&ai.flows_lock); -                e.index   = idx; -                e.port_id = ai.flows[fd].port_id; - -                if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { -                        shm_rdrbuff_remove(ai.rdrb, e.index); +                if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { +                        shm_rdrbuff_remove(ai.rdrb, idx);                          pthread_rwlock_unlock(&ai.flows_lock);                          pthread_rwlock_unlock(&ai.data_lock);                          return -ENOTALLOC;                  }          } +        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); +          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -717,15 +772,14 @@ ssize_t flow_read(int fd, void * buf, size_t count)          }          if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id); +                idx = shm_rbuff_read(ai.flows[fd].rx_rb);                  pthread_rwlock_unlock(&ai.flows_lock);          } else { -                struct shm_ap_rbuff * rb      = ai.rb; -                int                   port_id = ai.flows[fd].port_id; -                struct timespec *     timeout = ai.flows[fd].timeout; +                struct shm_rbuff * rb     = ai.flows[fd].rx_rb; +                struct timespec * timeout = ai.flows[fd].timeout;                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); -                idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); +                idx = shm_rbuff_read_b(rb, timeout);                  pthread_rwlock_rdlock(&ai.data_lock);          } @@ -757,79 +811,163 @@ struct flow_set * flow_set_create()          if (set == NULL)                  return NULL; -        if (pthread_rwlock_init(&set->lock, NULL)) { +        assert(ai.fqueues); + +        set->idx = bmp_allocate(ai.fqueues); +        if (!bmp_is_id_valid(ai.fqueues, set->idx)) {                  free(set);                  return NULL;          } -        memset(set->b, 0, IRMD_MAX_FLOWS); -        memset(set->s, 0, IRMD_MAX_FLOWS); +        return set; +} -        set->dirty = true; +void flow_set_destroy(struct flow_set * set) +{ +        if (set == NULL) +                return; -        return set; +        flow_set_zero(set); +        bmp_release(ai.fqueues, set->idx); +        free(set);  } -void flow_set_zero(struct flow_set * set) +struct fqueue * fqueue_create()  { -        pthread_rwlock_wrlock(&set->lock); -        memset(set->b, 0, IRMD_MAX_FLOWS); -        set->dirty = true; -        pthread_rwlock_unlock(&set->lock); +        struct fqueue * fq = malloc(sizeof(*fq)); +        if (fq == NULL) +                return NULL; + +        memset(fq->fqueue, -1, SHM_BUFFER_SIZE); +        fq->fqsize = 0; +        fq->next   = 0; + +        return fq;  } -void flow_set_add(struct flow_set * set, int fd) +void fqueue_destroy(struct fqueue * fq)  { -        pthread_rwlock_wrlock(&set->lock); -        set->b[ai.flows[fd].port_id] = true; -        set->dirty = true; -        pthread_rwlock_unlock(&set->lock); +        if (fq == NULL) +                return +        free(fq);  } -void flow_set_del(struct flow_set * set, int fd) +void flow_set_zero(struct flow_set * set)  { -        pthread_rwlock_wrlock(&set->lock); -        set->b[ai.flows[fd].port_id] = false; -        set->dirty = true; -        pthread_rwlock_unlock(&set->lock); +        if (set == NULL) +                return; + +        pthread_rwlock_rdlock(&ai.data_lock); + +        shm_flow_set_zero(ai.fqset, set->idx); + +        pthread_rwlock_unlock(&ai.data_lock);  } -bool flow_set_has(struct flow_set * set, int fd) +int flow_set_add(struct flow_set * set, int fd)  { -        bool ret; -        pthread_rwlock_rdlock(&set->lock); -        ret = set->b[ai.flows[fd].port_id]; -        pthread_rwlock_unlock(&set->lock); +        int ret; + +        if (set == NULL) +                return -EINVAL; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); +          return ret;  } -void flow_set_destroy(struct flow_set * set) +void flow_set_del(struct flow_set * set, int fd)  { -        pthread_rwlock_destroy(&set->lock); -        free(set); +        if (set == NULL) +                return; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id >= 0) +                shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock);  } -static void flow_set_cpy(struct flow_set * set) +bool flow_set_has(struct flow_set * set, int fd)  { -        pthread_rwlock_rdlock(&set->lock); -        if (set->dirty) -                memcpy(set->s, set->b, IRMD_MAX_FLOWS); -        set->dirty = false; -        pthread_rwlock_unlock(&set->lock); +        bool ret = false; + +        if (set == NULL || fd < 0) +                return false; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return false; +        } + +        ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return ret;  } -int flow_select(struct flow_set * set, const struct timespec * timeout) +int fqueue_next(struct fqueue * fq)  { -        int port_id; -        if (set == NULL) { -                port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout); -        } else { -                flow_set_cpy(set); -                port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout); +        int fd; + +        if (fq == NULL) +                return -EINVAL; + +        if (fq->next == fq->fqsize) { +                fq->fqsize = 0; +                fq->next = 0; +                return -EPERM;          } -        if (port_id < 0) -                return port_id; -        return ai.ports[port_id].fd; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        fd = ai.ports[fq->fqueue[fq->next++]].fd; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return fd; +} + +int flow_event_wait(struct flow_set *       set, +                    struct fqueue *         fq, +                    const struct timespec * timeout) +{ +        int ret; + +        if (set == NULL) +                return -EINVAL; + +        if (fq->fqsize > 0) +                return 0; + +        ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); +        if (ret == -ETIMEDOUT) +                return -ETIMEDOUT; + +        if (ret < 0) +                return ret; + +        fq->fqsize = ret; +        fq->next   = 0; + +        return 0;  }  /* ipcp-dev functions */ @@ -848,8 +986,8 @@ int np1_flow_alloc(pid_t n_api, int port_id)                  return -1;          } -        ai.flows[fd].rb = shm_ap_rbuff_open(n_api); -        if (ai.flows[fd].rb == NULL) { +        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); +        if (ai.flows[fd].rx_rb == NULL) {                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -863,8 +1001,6 @@ int np1_flow_alloc(pid_t n_api, int port_id)          ai.ports[port_id].fd = fd;          port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); -        shm_ap_rbuff_open_port(ai.rb, port_id); -          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -890,7 +1026,6 @@ int np1_flow_dealloc(int port_id)  int np1_flow_resp(pid_t n_api, int port_id)  {          int fd; -        struct shm_ap_rbuff * rb;          port_wait_assign(&ai.ports[port_id]); @@ -904,18 +1039,26 @@ int np1_flow_resp(pid_t n_api, int port_id)                  return fd;          } -        rb = shm_ap_rbuff_open(n_api); -        if (rb == NULL) { +        ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); +        if (ai.flows[fd].tx_rb == NULL) {                  ai.flows[fd].port_id = -1; +                shm_rbuff_close(ai.flows[fd].rx_rb);                  port_destroy(&ai.ports[port_id]);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return -1;          } -        ai.flows[fd].rb = rb; - -        shm_ap_rbuff_open_port(ai.rb, port_id); +        ai.flows[fd].set = shm_flow_set_open(n_api); +        if (ai.flows[fd].set == NULL) { +                shm_rbuff_close(ai.flows[fd].tx_rb); +                ai.flows[fd].port_id = -1; +                shm_rbuff_close(ai.flows[fd].rx_rb); +                port_destroy(&ai.ports[port_id]); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        }          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -929,9 +1072,9 @@ int ipcp_create_r(pid_t api)          irm_msg_t * recv_msg = NULL;          int ret = -1; -        msg.code         = IRM_MSG_CODE__IPCP_CREATE_R; -        msg.has_api      = true; -        msg.api          = api; +        msg.code    = IRM_MSG_CODE__IPCP_CREATE_R; +        msg.has_api = true; +        msg.api     = api;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) @@ -958,11 +1101,11 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)          if (dst_name == NULL || src_ae_name == NULL)                  return -EINVAL; -        msg.code          = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; -        msg.has_api       = true; -        msg.api           = api; -        msg.dst_name      = dst_name; -        msg.ae_name       = src_ae_name; +        msg.code     = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; +        msg.has_api  = true; +        msg.api      = api; +        msg.dst_name = dst_name; +        msg.ae_name  = src_ae_name;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_wrlock(&ai.flows_lock); @@ -974,7 +1117,7 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)                  return -1; /* -ENOMOREFDS */          } -        ai.flows[fd].rb    = NULL; +        ai.flows[fd].tx_rb    = NULL;          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -996,8 +1139,16 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_wrlock(&ai.flows_lock); +        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); +        if (ai.flows[fd].rx_rb == NULL) { +                ai.flows[fd].port_id = -1; +                port_destroy(&ai.ports[port_id]); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } +          ai.flows[fd].port_id = port_id; -        ai.flows[fd].rb      = NULL;          ai.ports[port_id].fd = fd;          port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); @@ -1019,16 +1170,13 @@ int ipcp_flow_alloc_reply(int fd, int response)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        msg.port_id      = ai.flows[fd].port_id; +        msg.port_id = ai.flows[fd].port_id;          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock);          msg.has_response = true;          msg.response     = response; -        if (response) -                shm_ap_rbuff_open_port(ai.rb, msg.port_id); -          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; @@ -1039,6 +1187,26 @@ int ipcp_flow_alloc_reply(int fd, int response)          }          ret = recv_msg->result; + +        pthread_rwlock_wrlock(&ai.flows_lock); + +        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, +                                            ai.flows[fd].port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); +        if (ai.flows[fd].set == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        pthread_rwlock_unlock(&ai.flows_lock); +          irm_msg__free_unpacked(recv_msg, NULL);          return ret; @@ -1061,7 +1229,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb)          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); -        idx = shm_ap_rbuff_read_port(ai.rb, port_id); +        idx = shm_rbuff_read(ai.flows[fd].rx_rb);          if (idx < 0) {                  pthread_rwlock_rdlock(&ai.data_lock);                  pthread_rwlock_rdlock(&ai.flows_lock); @@ -1081,7 +1249,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb)  int ipcp_flow_write(int fd, struct shm_du_buff * sdb)  { -        struct rb_entry e; +        ssize_t idx;          if (sdb == NULL)                  return -EINVAL; @@ -1095,16 +1263,16 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)                  return -EPERM;          } -        if (ai.flows[fd].rb == NULL) { +        if (ai.flows[fd].tx_rb == NULL) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return -EPERM;          } -        e.index = shm_du_buff_get_idx(sdb); -        e.port_id = ai.flows[fd].port_id; +        idx = shm_du_buff_get_idx(sdb); -        shm_ap_rbuff_write(ai.flows[fd].rb, &e); +        shm_rbuff_write(ai.flows[fd].tx_rb, idx); +        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -1112,46 +1280,28 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)          return 0;  } -struct rb_entry * local_flow_read(int fd) +ssize_t local_flow_read(int fd)  { -        int port_id; -        struct rb_entry * e = NULL; - -        pthread_rwlock_rdlock(&ai.data_lock); -        pthread_rwlock_rdlock(&ai.flows_lock); - -        port_id = ai.flows[fd].port_id; - -        pthread_rwlock_unlock(&ai.flows_lock); -        pthread_rwlock_unlock(&ai.data_lock); - -        if (port_id != -1) { -                e = malloc(sizeof(*e)); -                if (e == NULL) -                        return NULL; -                e->index = shm_ap_rbuff_read_port(ai.rb, port_id); -        } - -        return e; +        return shm_rbuff_read(ai.flows[fd].rx_rb);  } -int local_flow_write(int fd, struct rb_entry * e) +int local_flow_write(int fd, ssize_t idx)  { -        if (e == NULL || fd < 0) +        if (fd < 0)                  return -EINVAL;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai.flows[fd].rb == NULL) { +        if (ai.flows[fd].tx_rb == NULL) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock);                  return -EPERM;          } -        e->port_id = ai.flows[fd].port_id; +        shm_rbuff_write(ai.flows[fd].tx_rb, idx); -        shm_ap_rbuff_write(ai.flows[fd].rb, e); +        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -1159,22 +1309,26 @@ int local_flow_write(int fd, struct rb_entry * e)          return 0;  } -int ipcp_read_shim(struct shm_du_buff ** sdb) +int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)  { -        int fd; -        struct rb_entry * e = shm_ap_rbuff_read(ai.rb); +        ssize_t idx;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        fd = ai.ports[e->port_id].fd; +        if (ai.flows[fd].rx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -EPERM; +        } -        *sdb = shm_rdrbuff_get(ai.rdrb, e->index); +        idx = shm_rbuff_read(ai.flows[fd].rx_rb); +        *sdb = shm_rdrbuff_get(ai.rdrb, idx);          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); -        return fd; +        return 0;  }  void ipcp_flow_del(struct shm_du_buff * sdb) diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c index 04ce9324..a0222f18 100644 --- a/src/lib/lockfile.c +++ b/src/lib/lockfile.c @@ -39,10 +39,10 @@  struct lockfile {          pid_t * api; -        int fd;  };  struct lockfile * lockfile_create() { +        int fd;          mode_t mask;          struct lockfile * lf = malloc(sizeof(*lf));          if (lf == NULL) @@ -50,8 +50,8 @@ struct lockfile * lockfile_create() {          mask = umask(0); -        lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); -        if (lf->fd == -1) { +        fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (fd == -1) {                  LOG_DBGF("Could not create lock file.");                  free(lf);                  return NULL; @@ -59,30 +59,24 @@ struct lockfile * lockfile_create() {          umask(mask); -        if (ftruncate(lf->fd, LF_SIZE - 1) < 0) { +        if (ftruncate(fd, LF_SIZE - 1) < 0) {                  LOG_DBGF("Failed to extend lockfile.");                  free(lf);                  return NULL;          } -#ifndef __APPLE__ -        if (write(lf->fd, "", 1) != 1) { -                LOG_DBGF("Failed to finalise lockfile."); -                free(lf); -                return NULL; -        } -#endif +          lf->api = mmap(NULL,                         LF_SIZE, PROT_READ | PROT_WRITE,                         MAP_SHARED, -                       lf->fd, +                       fd,                         0); +        close (fd); +          if (lf->api == MAP_FAILED) {                  LOG_DBGF("Failed to map lockfile."); -                  if (shm_unlink(LOCKFILE_NAME) == -1)                          LOG_DBGF("Failed to remove invalid lockfile."); -                  free(lf);                  return NULL;          } @@ -93,12 +87,13 @@ struct lockfile * lockfile_create() {  }  struct lockfile * lockfile_open() { +        int fd;          struct lockfile * lf = malloc(sizeof(*lf));          if (lf == NULL)                  return NULL; -        lf->fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); -        if (lf->fd < 0) { +        fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); +        if (fd < 0) {                  LOG_DBGF("Could not open lock file.");                  free(lf);                  return NULL; @@ -107,15 +102,15 @@ struct lockfile * lockfile_open() {          lf->api = mmap(NULL,                         LF_SIZE, PROT_READ | PROT_WRITE,                         MAP_SHARED, -                       lf->fd, +                       fd,                         0); +        close(fd); +          if (lf->api == MAP_FAILED) {                  LOG_DBGF("Failed to map lockfile."); -                  if (shm_unlink(LOCKFILE_NAME) == -1)                          LOG_DBGF("Failed to remove invalid lockfile."); -                  free(lf);                  return NULL;          } @@ -130,9 +125,6 @@ void lockfile_close(struct lockfile * lf)                  return;          } -        if (close(lf->fd) < 0) -                LOG_DBGF("Couldn't close lockfile."); -          if (munmap(lf->api, LF_SIZE) == -1)                  LOG_DBGF("Couldn't unmap lockfile."); @@ -151,9 +143,6 @@ void lockfile_destroy(struct lockfile * lf)                  return;          } -        if (close(lf->fd) < 0) -                LOG_DBGF("Couldn't close lockfile."); -          if (munmap(lf->api, LF_SIZE) == -1)                  LOG_DBGF("Couldn't unmap lockfile."); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c deleted file mode 100644 index 5cbf5bd0..00000000 --- a/src/lib/shm_ap_rbuff.c +++ /dev/null @@ -1,661 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Ring buffer for application processes - * - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <ouroboros/config.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/lockfile.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> - -#define OUROBOROS_PREFIX "shm_ap_rbuff" - -#include <ouroboros/logs.h> - -#include <pthread.h> -#include <sys/mman.h> -#include <fcntl.h> -#include <stdlib.h> -#include <string.h> -#include <stdint.h> -#include <unistd.h> -#include <signal.h> -#include <sys/stat.h> -#include <assert.h> - -#define FN_MAX_CHARS 255 - -#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry)         \ -                             + IRMD_MAX_FLOWS * sizeof(int8_t)                 \ -                             + IRMD_MAX_FLOWS * sizeof (ssize_t)               \ -                             + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \ -                             + 2 * sizeof (pthread_cond_t)) - -#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail)   \ -                          & (SHM_BUFFER_SIZE - 1)) -#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) -#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) - -struct shm_ap_rbuff { -        struct rb_entry * shm_base; /* start of entry                */ -        size_t *          head;     /* start of ringbuffer head      */ -        size_t *          tail;     /* start of ringbuffer tail      */ -        int8_t *          acl;      /* start of port_id access table */ -        ssize_t *         cntrs;    /* start of port_id counters     */ -        pthread_mutex_t * lock;     /* lock all free space in shm    */ -        pthread_cond_t *  add;      /* SDU arrived                   */ -        pthread_cond_t *  del;      /* SDU removed                   */ -        pid_t             api;      /* api to which this rb belongs  */ -        int               fd; -}; - -struct shm_ap_rbuff * shm_ap_rbuff_create() -{ -        struct shm_ap_rbuff * rb; -        int                   shm_fd; -        struct rb_entry *     shm_base; -        pthread_mutexattr_t   mattr; -        pthread_condattr_t    cattr; -        char                  fn[FN_MAX_CHARS]; -        mode_t                mask; -        int                   i; - -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); - -        rb = malloc(sizeof(*rb)); -        if (rb == NULL) { -                LOG_DBG("Could not allocate struct."); -                return NULL; -        } - -        mask = umask(0); - -        shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); -        if (shm_fd == -1) { -                LOG_DBG("Failed creating ring buffer."); -                free(rb); -                return NULL; -        } - -        umask(mask); - -        if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { -                LOG_DBG("Failed to extend ringbuffer."); -                free(rb); -                return NULL; -        } -#ifndef __APPLE__ -        if (write(shm_fd, "", 1) != 1) { -                LOG_DBG("Failed to finalise extension of ringbuffer."); -                free(rb); -                return NULL; -        } -#endif -        shm_base = mmap(NULL, -                        SHM_RBUFF_FILE_SIZE, -                        PROT_READ | PROT_WRITE, -                        MAP_SHARED, -                        shm_fd, -                        0); - -        if (shm_base == MAP_FAILED) { -                LOG_DBG("Failed to map shared memory."); -                if (close(shm_fd) == -1) -                        LOG_DBG("Failed to close invalid shm."); - -                if (shm_unlink(fn) == -1) -                        LOG_DBG("Failed to remove invalid shm."); - -                free(rb); -                return NULL; -        } - -        rb->shm_base = shm_base; -        rb->head     = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); -        rb->tail     = rb->head + 1; -        rb->acl      = (int8_t *) (rb->tail + 1); -        rb->cntrs    = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); -        rb->lock     = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); -        rb->add      = (pthread_cond_t *) (rb->lock + 1); -        rb->del      = rb->add + 1; - -        pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ -        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); -#endif -        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); -        pthread_mutex_init(rb->lock, &mattr); - -        pthread_condattr_init(&cattr); -        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ -        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif -        for (i = 0; i < IRMD_MAX_FLOWS; ++i) { -                rb->cntrs[i] = 0; -                rb->acl[i] = -1; -        } - -        pthread_cond_init(rb->add, &cattr); -        pthread_cond_init(rb->del, &cattr); - -        *rb->head = 0; -        *rb->tail = 0; - -        rb->fd  = shm_fd; -        rb->api = getpid(); - -        return rb; -} - -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) -{ -        struct shm_ap_rbuff * rb; -        int                   shm_fd; -        struct rb_entry *     shm_base; -        char                  fn[FN_MAX_CHARS]; - -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); - -        rb = malloc(sizeof(*rb)); -        if (rb == NULL) { -                LOG_DBG("Could not allocate struct."); -                return NULL; -        } - -        shm_fd = shm_open(fn, O_RDWR, 0666); -        if (shm_fd == -1) { -                LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); -                free(rb); -                return NULL; -        } - -        shm_base = mmap(NULL, -                        SHM_RBUFF_FILE_SIZE, -                        PROT_READ | PROT_WRITE, -                        MAP_SHARED, -                        shm_fd, -                        0); - -        if (shm_base == MAP_FAILED) { -                LOG_DBG("Failed to map shared memory."); -                if (close(shm_fd) == -1) -                        LOG_DBG("Failed to close invalid shm."); - -                if (shm_unlink(fn) == -1) -                        LOG_DBG("Failed to remove invalid shm."); - -                free(rb); -                return NULL; -        } - -        rb->shm_base = shm_base; -        rb->head     = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); -        rb->tail     = rb->head + 1; -        rb->acl      = (int8_t *) (rb->tail + 1); -        rb->cntrs    = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); -        rb->lock     = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); -        rb->add      = (pthread_cond_t *) (rb->lock + 1); -        rb->del      = rb->add + 1; - -        rb->fd = shm_fd; -        rb->api = api; - -        return rb; -} - -void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) -{ -        assert(rb); - -        if (close(rb->fd) < 0) -                LOG_DBG("Couldn't close shared memory."); - -        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) -                LOG_DBG("Couldn't unmap shared memory."); - -        free(rb); -} - -void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id) -{ -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        rb->acl[port_id] = 0; /* open */ - -        pthread_mutex_unlock(rb->lock); -} - -int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) -{ -        int ret = 0; - -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        rb->acl[port_id] = -1; - -        if (rb->cntrs[port_id] > 0) -                ret = -EBUSY; - -        pthread_mutex_unlock(rb->lock); - -        return ret; -} - -void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) -{ -        char fn[25]; -        struct lockfile * lf = NULL; - -        assert(rb); - -        if (rb->api != getpid()) { -                lf = lockfile_open(); -                if (lf == NULL) -                        return; -                if (lockfile_owner(lf) == getpid()) { -                        LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", -                                 rb->api, getpid()); -                        lockfile_close(lf); -                } else { -                        LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", -                                getpid(), rb->api); -                        lockfile_close(lf); -                        return; -                } -        } - -        if (close(rb->fd) < 0) -                LOG_DBG("Couldn't close shared memory."); - -        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); - -        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) -                LOG_DBG("Couldn't unmap shared memory."); - -        if (shm_unlink(fn) == -1) -                LOG_DBG("Failed to unlink shm."); - -        free(rb); -} - -int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) -{ -        assert(rb); -        assert(e); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (rb->acl[e->port_id]) { -                pthread_mutex_unlock(rb->lock); -                return -ENOTALLOC; -        } - -        if (!shm_rbuff_free(rb)) { -                pthread_mutex_unlock(rb->lock); -                return -1; -        } - -        if (shm_rbuff_empty(rb)) -                pthread_cond_broadcast(rb->add); - -        *head_el_ptr(rb) = *e; -        *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); - -        ++rb->cntrs[e->port_id]; - -        pthread_mutex_unlock(rb->lock); - -        return 0; -} - -int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) -{ -        int ret = 0; - -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (shm_rbuff_empty(rb)) { -                pthread_mutex_unlock(rb->lock); -                return -1; -        } - -        ret = tail_el_ptr(rb)->index; -        --rb->cntrs[tail_el_ptr(rb)->port_id]; -        *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - -        pthread_mutex_unlock(rb->lock); - -        return ret; -} - -static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, -                                   const struct timespec * timeout) -{ -        struct timespec abstime; -        int ret = 0; - -        assert(rb); - -        if (timeout != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        } - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) rb->lock); -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        while (shm_rbuff_empty(rb)) { -                if (timeout != NULL) -                        ret = pthread_cond_timedwait(rb->add, -                                                     rb->lock, -                                                     &abstime); -                else -                        ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ -                if (ret == EOWNERDEAD) { -                        LOG_DBG("Recovering dead mutex."); -                        pthread_mutex_consistent(rb->lock); -                } -#endif -                if (ret == ETIMEDOUT) -                        break; -        } - -        if (ret != ETIMEDOUT) -                ret = tail_el_ptr(rb)->port_id; -        else -                ret = -ETIMEDOUT; - -        pthread_cleanup_pop(true); - -        return ret; -} - -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff *   rb, -                        bool *                  set, -                        const struct timespec * timeout) -{ -        struct timespec abstime; -        int ret; - -        assert(rb); - -        if (set == NULL) -                return shm_ap_rbuff_peek_b_all(rb, timeout); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (timeout != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        } - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) rb->lock); - -        while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) -               && (ret != ETIMEDOUT)) { -                while (shm_rbuff_empty(rb)) { -                        if (timeout != NULL) -                                ret = pthread_cond_timedwait(rb->add, -                                                             rb->lock, -                                                             &abstime); -                        else -                                ret = pthread_cond_wait(rb->add, rb->lock); - -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_DBG("Recovering dead mutex."); -                                pthread_mutex_consistent(rb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) -                                break; -                } - -                while (!set[tail_el_ptr(rb)->port_id]) { -                        if (timeout != NULL) -                                ret = pthread_cond_timedwait(rb->del, -                                                             rb->lock, -                                                             &abstime); -                        else -                                ret = pthread_cond_wait(rb->del, rb->lock); - -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_DBG("Recovering dead mutex."); -                                pthread_mutex_consistent(rb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) -                                break; -                } -        } - -        if (ret != ETIMEDOUT) -                ret = tail_el_ptr(rb)->port_id; -        else -                ret = -ETIMEDOUT; - -        pthread_cleanup_pop(true); - -        return ret; -} - - -struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) -{ -        struct rb_entry * e = NULL; - -        assert(rb); - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) rb->lock); -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        while (shm_rbuff_empty(rb)) -#ifdef __APPLE__ -                pthread_cond_wait(rb->add, rb->lock); -#else -                if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) { -                        LOG_DBG("Recovering dead mutex."); -                        pthread_mutex_consistent(rb->lock); -                } -#endif -        e = malloc(sizeof(*e)); -        if (e != NULL) { -                *e = *(rb->shm_base + *rb->tail); -                --rb->cntrs[e->port_id]; -                *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); -        } - -        pthread_cleanup_pop(true); - -        return e; -} - -ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) -{ -        ssize_t idx = -1; - -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { -                pthread_mutex_unlock(rb->lock); -                return -1; -        } - -        idx = tail_el_ptr(rb)->index; -        --rb->cntrs[port_id]; -        *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - -        pthread_cond_broadcast(rb->del); -        pthread_mutex_unlock(rb->lock); - -        return idx; -} - -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb, -                                 int                     port_id, -                                 const struct timespec * timeout) -{ -        struct timespec abstime; -        int ret = 0; -        ssize_t idx = -1; - -        assert(rb); - -#ifdef __APPLE__ -        pthread_mutex_lock(rb->lock); -#else -        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { -                LOG_DBG("Recovering dead mutex."); -                pthread_mutex_consistent(rb->lock); -        } -#endif -        if (timeout != NULL) { -                idx = -ETIMEDOUT; -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeout, &abstime); -        } - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) rb->lock); - -        while ((shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) -               && (ret != ETIMEDOUT)) { -                while (shm_rbuff_empty(rb)) { -                        if (timeout != NULL) -                                ret = pthread_cond_timedwait(rb->add, -                                                             rb->lock, -                                                             &abstime); -                        else -                                ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_DBG("Recovering dead mutex."); -                                pthread_mutex_consistent(rb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) -                                break; -                } - -                while (tail_el_ptr(rb)->port_id != port_id) { -                        if (timeout != NULL) -                                ret = pthread_cond_timedwait(rb->del, -                                                             rb->lock, -                                                             &abstime); -                        else -                                ret = pthread_cond_wait(rb->del, rb->lock); -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_DBG("Recovering dead mutex."); -                                pthread_mutex_consistent(rb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) -                                break; -                } -        } - -        if (ret != ETIMEDOUT) { -                idx = tail_el_ptr(rb)->index; -                --rb->cntrs[port_id]; -                *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - -                pthread_cond_broadcast(rb->del); -        } - -        pthread_cleanup_pop(true); - -        return idx; -} - -void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) -{ -        assert(rb); - -        pthread_mutex_lock(rb->lock); -        *rb->tail = 0; -        *rb->head = 0; -        pthread_mutex_unlock(rb->lock); -} diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c new file mode 100644 index 00000000..04de9fc5 --- /dev/null +++ b/src/lib/shm_flow_set.c @@ -0,0 +1,407 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Management of flow_sets for fqueue + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/shm_flow_set.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/errno.h> + +#define OUROBOROS_PREFIX "shm_flow_set" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <unistd.h> +#include <signal.h> +#include <sys/stat.h> +#include <string.h> +#include <assert.h> + +#define FN_MAX_CHARS 255 + +#define FQUEUESIZE (SHM_BUFFER_SIZE * sizeof(int)) + +#define SHM_FLOW_SET_FILE_SIZE (IRMD_MAX_FLOWS * sizeof(ssize_t)          \ +                                + AP_MAX_FQUEUES * sizeof(size_t)         \ +                                + AP_MAX_FQUEUES * sizeof(pthread_cond_t) \ +                                + AP_MAX_FQUEUES * FQUEUESIZE             \ +                                + sizeof(pthread_mutex_t)) + +#define fqueue_ptr(fs, idx) (fs->fqueues + SHM_BUFFER_SIZE * idx) + +struct shm_flow_set { +        ssize_t *         mtable; +        size_t *          heads; +        pthread_cond_t *  conds; +        int *             fqueues; +        pthread_mutex_t * lock; + +        pid_t             api; +}; + +struct shm_flow_set * shm_flow_set_create() +{ +        struct shm_flow_set * set; +        ssize_t *             shm_base; +        pthread_mutexattr_t   mattr; +        pthread_condattr_t    cattr; +        char                  fn[FN_MAX_CHARS]; +        mode_t                mask; +        int                   shm_fd; +        int                   i; + +        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid()); + +        set = malloc(sizeof(*set)); +        if (set == NULL) { +                LOG_DBG("Could not allocate struct."); +                return NULL; +        } + +        mask = umask(0); + +        shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBG("Failed creating flag file."); +                free(set); +                return NULL; +        } + +        umask(mask); + +        if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) { +                LOG_DBG("Failed to extend flag file."); +                free(set); +                close(shm_fd); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_FLOW_SET_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        close(shm_fd); + +        if (shm_base == MAP_FAILED) { +                LOG_DBG("Failed to map shared memory."); +                if (shm_unlink(fn) == -1) +                        LOG_DBG("Failed to remove invalid shm."); + +                free(set); +                return NULL; +        } + +        set->mtable  = shm_base; +        set->heads   = (size_t *) (set->mtable + IRMD_MAX_FLOWS); +        set->conds   = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); +        set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); +        set->lock    = (pthread_mutex_t *) +                (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + +        pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ +        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif +        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +        pthread_mutex_init(set->lock, &mattr); + +        pthread_condattr_init(&cattr); +        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        for (i = 0; i < AP_MAX_FQUEUES; ++i) { +                set->heads[i] = 0; +                pthread_cond_init(&set->conds[i], &cattr); +        } + +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) +                set->mtable[i] = -1; + +        set->api = getpid(); + +        return set; +} + +struct shm_flow_set * shm_flow_set_open(pid_t api) +{ +        struct shm_flow_set * set; +        ssize_t *             shm_base; +        char                  fn[FN_MAX_CHARS]; +        int                   shm_fd; + +        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", api); + +        set = malloc(sizeof(*set)); +        if (set == NULL) { +                LOG_DBG("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(fn, O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); +                free(set); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_FLOW_SET_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        close(shm_fd); + +        if (shm_base == MAP_FAILED) { +                LOG_DBG("Failed to map shared memory."); +                if (shm_unlink(fn) == -1) +                        LOG_DBG("Failed to remove invalid shm."); +                free(set); +                return NULL; +        } + +        set->mtable  = shm_base; +        set->heads   = (size_t *) (set->mtable + IRMD_MAX_FLOWS); +        set->conds   = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); +        set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); +        set->lock    = (pthread_mutex_t *) +                (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + +        set->api = api; + +        return set; +} + +void shm_flow_set_destroy(struct shm_flow_set * set) +{ +        char fn[25]; +        struct lockfile * lf = NULL; + +        assert(set); + +        if (set->api != getpid()) { +                lf = lockfile_open(); +                if (lf == NULL) { +                        LOG_ERR("Failed to open lockfile."); +                        return; +                } + +                if (lockfile_owner(lf) == getpid()) { +                        LOG_DBG("Flow set %d destroyed by IRMd %d.", +                                set->api, getpid()); +                        lockfile_close(lf); +                } else { +                        LOG_ERR("AP-I %d tried to destroy flowset owned by %d.", +                                getpid(), set->api); +                        lockfile_close(lf); +                        return; +                } +        } + +        sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->api); + +        if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        if (shm_unlink(fn) == -1) +                LOG_DBG("Failed to unlink shm."); + +        free(set); +} + +void shm_flow_set_close(struct shm_flow_set * set) +{ +        assert(set); + +        if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        free(set); +} + +void shm_flow_set_zero(struct shm_flow_set * shm_set, +                       ssize_t               idx) +{ +        ssize_t i = 0; + +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +        pthread_mutex_lock(shm_set->lock); + +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) +                if (shm_set->mtable[i] == idx) +                        shm_set->mtable[i] = -1; + +        shm_set->heads[idx] = 0; + +        pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_add(struct shm_flow_set * shm_set, +                     ssize_t               idx, +                     int                   port_id) +{ +        assert(shm_set); +        assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +        pthread_mutex_lock(shm_set->lock); + +        if (shm_set->mtable[port_id] != -1) { +                pthread_mutex_unlock(shm_set->lock); +                return -EPERM; +        } + +        shm_set->mtable[port_id] = idx; + +        pthread_mutex_unlock(shm_set->lock); + +        return 0; +} + +void shm_flow_set_del(struct shm_flow_set * shm_set, +                      ssize_t               idx, +                      int                   port_id) +{ +        assert(shm_set); +        assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +        pthread_mutex_lock(shm_set->lock); + +        if (shm_set->mtable[port_id] == idx) +                shm_set->mtable[port_id] = -1; + +        pthread_mutex_unlock(shm_set->lock); +} + +int shm_flow_set_has(struct shm_flow_set * shm_set, +                     ssize_t               idx, +                     int                   port_id) +{ +        int ret = 0; + +        assert(shm_set); +        assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +        pthread_mutex_lock(shm_set->lock); + +        if (shm_set->mtable[port_id] == idx) +                ret = 1; + +        pthread_mutex_unlock(shm_set->lock); + +        return ret; +} + +void shm_flow_set_notify(struct shm_flow_set * shm_set, int port_id) +{ +        assert(shm_set); +        assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + +        pthread_mutex_lock(shm_set->lock); + +        if (shm_set->mtable[port_id] == -1) { +                pthread_mutex_unlock(shm_set->lock); +                return; +        } + +        *(fqueue_ptr(shm_set, shm_set->mtable[port_id]) + +                     (shm_set->heads[shm_set->mtable[port_id]])++) = port_id; + +        pthread_cond_signal(&shm_set->conds[shm_set->mtable[port_id]]); + +        pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_wait(const struct shm_flow_set * shm_set, +                      ssize_t                     idx, +                      int *                       fqueue, +                      const struct timespec *     timeout) +{ +        int ret = 0; +        struct timespec abstime; + +        assert(shm_set); +        assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +#ifdef __APPLE__ +        pthread_mutex_lock(shm_set->lock); +#else +        if (pthread_mutex_lock(shm_set->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(shm_set->lock); +        } +#endif +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) shm_set->lock); + +        while (shm_set->heads[idx] == 0 && ret != -ETIMEDOUT) { +                if (timeout != NULL) +                        ret = pthread_cond_timedwait(shm_set->conds + idx, +                                                     shm_set->lock, +                                                     &abstime); +                else +                        ret = pthread_cond_wait(shm_set->conds + idx, +                                                shm_set->lock); +#ifndef __APPLE__ +                if (ret == EOWNERDEAD) { +                        LOG_DBG("Recovering dead mutex."); +                        pthread_mutex_consistent(shm_set->lock); +                } +#endif +                if (ret == ETIMEDOUT) { +                        ret = -ETIMEDOUT; +                        break; +                } +        } + +        if (ret != -ETIMEDOUT) { +                memcpy(fqueue, +                       fqueue_ptr(shm_set, idx), +                       shm_set->heads[idx] * sizeof(int)); +                ret = shm_set->heads[idx]; +                shm_set->heads[idx] = 0; +        } + +        pthread_cleanup_pop(true); + +        return ret; +} diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c new file mode 100644 index 00000000..a933fbff --- /dev/null +++ b/src/lib/shm_rbuff.c @@ -0,0 +1,405 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for incoming SDUs + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/shm_rbuff.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/errno.h> + +#define OUROBOROS_PREFIX "shm_rbuff" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <signal.h> +#include <sys/stat.h> +#include <assert.h> +#include <stdbool.h> + +#define FN_MAX_CHARS 255 + +#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(ssize_t)          \ +                             + 2 * sizeof(size_t) + sizeof(int8_t)      \ +                             + sizeof(pthread_mutex_t)                  \ +                             + 2 * sizeof (pthread_cond_t)) + +#define shm_rbuff_used(rb) ((*rb->head + SHM_BUFFER_SIZE - *rb->tail)   \ +                            & (SHM_BUFFER_SIZE - 1)) +#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) +#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) + +struct shm_rbuff { +        ssize_t *         shm_base; /* start of entry                */ +        size_t *          head;     /* start of ringbuffer head      */ +        size_t *          tail;     /* start of ringbuffer tail      */ +        int8_t *          acl;      /* access control                */ +        pthread_mutex_t * lock;     /* lock all free space in shm    */ +        pthread_cond_t *  add;      /* SDU arrived                   */ +        pthread_cond_t *  del;      /* SDU removed                   */ +        pid_t             api;      /* api of the owner              */ +        int               port_id;  /* port_id of the flow           */ +}; + +struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) +{ +        struct shm_rbuff *  rb; +        int                 shm_fd; +        ssize_t *           shm_base; +        pthread_mutexattr_t mattr; +        pthread_condattr_t  cattr; +        char                fn[FN_MAX_CHARS]; +        mode_t              mask; + +        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", api, port_id); + +        rb = malloc(sizeof(*rb)); +        if (rb == NULL) { +                LOG_DBG("Could not allocate struct."); +                return NULL; +        } + +        mask = umask(0); + +        shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBG("Failed creating ring buffer."); +                free(rb); +                return NULL; +        } + +        umask(mask); + +        if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { +                LOG_DBG("Failed to extend ringbuffer."); +                free(rb); +                close(shm_fd); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_RBUFF_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        close(shm_fd); + +        if (shm_base == MAP_FAILED) { +                LOG_DBG("Failed to map shared memory."); +                if (shm_unlink(fn) == -1) +                        LOG_DBG("Failed to remove invalid shm."); +                free(rb); +                return NULL; +        } + +        rb->shm_base = shm_base; +        rb->head     = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); +        rb->tail     = rb->head + 1; +        rb->acl      = (int8_t *) (rb->tail + 1); +        rb->lock     = (pthread_mutex_t *) (rb->acl + 1); +        rb->add      = (pthread_cond_t *) (rb->lock + 1); +        rb->del      = rb->add + 1; + +        pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ +        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif +        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +        pthread_mutex_init(rb->lock, &mattr); + +        pthread_condattr_init(&cattr); +        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        pthread_cond_init(rb->add, &cattr); +        pthread_cond_init(rb->del, &cattr); + +        *rb->acl = 0; +        *rb->head = 0; +        *rb->tail = 0; + +        rb->api = api; +        rb->port_id = port_id; + +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        return rb; +} + +struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id) +{ +        struct shm_rbuff * rb; +        int                shm_fd; +        ssize_t *          shm_base; +        char               fn[FN_MAX_CHARS]; + +        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", api, port_id); + +        rb = malloc(sizeof(*rb)); +        if (rb == NULL) { +                LOG_DBG("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(fn, O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); +                free(rb); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_RBUFF_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        close(shm_fd); + +        if (shm_base == MAP_FAILED) { +                LOG_DBG("Failed to map shared memory."); +                if (shm_unlink(fn) == -1) +                        LOG_DBG("Failed to remove invalid shm."); + +                free(rb); +                return NULL; +        } + +        rb->shm_base = shm_base; +        rb->head     = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); +        rb->tail     = rb->head + 1; +        rb->acl      = (int8_t *) (rb->tail + 1); +        rb->lock     = (pthread_mutex_t *) (rb->acl + 1); +        rb->add      = (pthread_cond_t *) (rb->lock + 1); +        rb->del      = rb->add + 1; + +        rb->api = api; +        rb->port_id = port_id; + +        return rb; +} + +void shm_rbuff_close(struct shm_rbuff * rb) +{ +        assert(rb); + +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        free(rb); +} + +void shm_rbuff_destroy(struct shm_rbuff * rb) +{ +        char fn[25]; + +        if (rb == NULL) +                return; + +        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id); + +        if (shm_unlink(fn) == -1) +                LOG_DBG("Failed to unlink shm %s.", fn); + +        free(rb); +} + +int shm_rbuff_write(struct shm_rbuff * rb, ssize_t idx) +{ +        assert(rb); +        assert(idx >= 0); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (*rb->acl) { +                pthread_mutex_unlock(rb->lock); +                return -ENOTALLOC; +        } + +        if (!shm_rbuff_free(rb)) { +                pthread_mutex_unlock(rb->lock); +                return -1; +        } + +        if (shm_rbuff_empty(rb)) +                pthread_cond_broadcast(rb->add); + +        *head_el_ptr(rb) = idx; +        *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); + +        pthread_mutex_unlock(rb->lock); + +        return 0; +} + +ssize_t shm_rbuff_read(struct shm_rbuff * rb) +{ +        int ret = 0; + +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (shm_rbuff_empty(rb)) { +                pthread_mutex_unlock(rb->lock); +                return -1; +        } + +        ret = *tail_el_ptr(rb); +        *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); + +        pthread_mutex_unlock(rb->lock); + +        return ret; +} + +ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb, +                         const struct timespec * timeout) +{ +        struct timespec abstime; +        int ret = 0; +        ssize_t idx = -1; + +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (timeout != NULL) { +                idx = -ETIMEDOUT; +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while (shm_rbuff_empty(rb) && (ret != ETIMEDOUT)) { +                if (timeout != NULL) +                        ret = pthread_cond_timedwait(rb->add, +                                                     rb->lock, +                                                     &abstime); +                else +                        ret = pthread_cond_wait(rb->add, rb->lock); +#ifndef __APPLE__ +                if (ret == EOWNERDEAD) { +                        LOG_DBG("Recovering dead mutex."); +                        pthread_mutex_consistent(rb->lock); +                } +#endif +                if (ret == ETIMEDOUT) { +                        idx = -ETIMEDOUT; +                        break; +                } +        } + +        if (idx != -ETIMEDOUT) { +                idx = *tail_el_ptr(rb); +                *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); +                pthread_cond_broadcast(rb->del); +        } + +        pthread_cleanup_pop(true); + +        return idx; +} + +int shm_rbuff_block(struct shm_rbuff * rb) +{ +        int ret = 0; + +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        *rb->acl = -1; + +        if (!shm_rbuff_empty(rb)) +                ret = -EBUSY; + +        pthread_mutex_unlock(rb->lock); + +        return ret; +} + +void shm_rbuff_unblock(struct shm_rbuff * rb) +{ +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        *rb->acl = 0; /* open */ + +        pthread_mutex_unlock(rb->lock); +} + +void shm_rbuff_reset(struct shm_rbuff * rb) +{ +        assert(rb); + +        pthread_mutex_lock(rb->lock); +        *rb->tail = 0; +        *rb->head = 0; +        pthread_mutex_unlock(rb->lock); +} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index f6683dc2..e5a37577 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -90,7 +90,6 @@ struct shm_rdrbuff {          pthread_cond_t *  full;        /* run sanitizer when buffer full */          pid_t *           api;         /* api of the irmd owner */          enum qos_cube     qos;         /* qos id which this buffer serves */ -        int               fd;  };  static void garbage_collect(struct shm_rdrbuff * rdrb) @@ -189,17 +188,11 @@ struct shm_rdrbuff * shm_rdrbuff_create()          if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) {                  LOG_DBGF("Failed to extend shared memory map.");                  free(shm_rdrb_fn); +                close(shm_fd);                  free(rdrb);                  return NULL;          } -#ifndef __APPLE -        if (write(shm_fd, "", 1) != 1) { -                LOG_DBGF("Failed to finalise extension of shared memory map."); -                free(shm_rdrb_fn); -                free(rdrb); -                return NULL; -        } -#endif +          shm_base = mmap(NULL,                          SHM_FILE_SIZE,                          PROT_READ | PROT_WRITE, @@ -207,6 +200,8 @@ struct shm_rdrbuff * shm_rdrbuff_create()                          shm_fd,                          0); +        close(shm_fd); +          if (shm_base == MAP_FAILED) {                  LOG_DBGF("Failed to map shared memory.");                  if (shm_unlink(shm_rdrb_fn) == -1) @@ -235,6 +230,9 @@ struct shm_rdrbuff * shm_rdrbuff_create()          pthread_condattr_init(&cattr);          pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif          pthread_cond_init(rdrb->full, &cattr);          pthread_cond_init(rdrb->healthy, &cattr); @@ -246,7 +244,6 @@ struct shm_rdrbuff * shm_rdrbuff_create()          *rdrb->api = getpid();          rdrb->qos = qos; -        rdrb->fd  = shm_fd;          free(shm_rdrb_fn); @@ -287,10 +284,11 @@ struct shm_rdrbuff * shm_rdrbuff_open()                          MAP_SHARED,                          shm_fd,                          0); + +        close(shm_fd); +          if (shm_base == MAP_FAILED) {                  LOG_DBGF("Failed to map shared memory."); -                if (close(shm_fd) == -1) -                        LOG_DBG("Failed to close invalid shm.");                  if (shm_unlink(shm_rdrb_fn) == -1)                          LOG_DBG("Failed to unlink invalid shm.");                  free(shm_rdrb_fn); @@ -309,7 +307,6 @@ struct shm_rdrbuff * shm_rdrbuff_open()          rdrb->api = (pid_t *) (rdrb->full + 1);          rdrb->qos = qos; -        rdrb->fd = shm_fd;          free(shm_rdrb_fn); @@ -400,9 +397,6 @@ void shm_rdrbuff_close(struct shm_rdrbuff * rdrb)  {          assert(rdrb); -        if (close(rdrb->fd) < 0) -                LOG_DBGF("Couldn't close shared memory."); -          if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)                  LOG_DBGF("Couldn't unmap shared memory."); @@ -420,9 +414,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)                  return;          } -        if (close(rdrb->fd) < 0) -                LOG_DBG("Couldn't close shared memory."); -          if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)                  LOG_DBG("Couldn't unmap shared memory.");  | 
