Diffstat (limited to 'src/lib')

 src/lib/CMakeLists.txt          |   7
 src/lib/dev.c                   | 371
 src/lib/frct_enroll.proto       |  32
 src/lib/shm_flow_set.c          |   6
 src/lib/shm_rbuff_ll.c          |  10
 src/lib/shm_rbuff_pthr.c        |  20
 src/lib/shm_rdrbuff.c           |  14
 src/lib/tests/time_utils_test.c |   4
 src/lib/tpm.c                   | 266

 9 files changed, 376 insertions(+), 354 deletions(-)
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index e08869b8..fe4dd88c 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -11,7 +11,6 @@ protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS
 protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS cdap.proto)
 protobuf_generate_c(RO_PROTO_SRCS RO_PROTO_HDRS ro.proto)
 protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto)
-protobuf_generate_c(FRCT_ENROLL_SRCS FRCT_ENROLL_HDRS frct_enroll.proto)

 if (NOT APPLE)
   find_library(LIBRT_LIBRARIES rt)
@@ -59,12 +58,13 @@ set(SOURCE_FILES
   shm_rdrbuff.c
   sockets.c
   time_utils.c
+  tpm.c
   utils.c
   )

 add_library(ouroboros SHARED ${SOURCE_FILES} ${IRM_PROTO_SRCS}
   ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS}
-  ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS} ${FRCT_ENROLL_SRCS})
+  ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS})

 include(AddCompileFlags)
 if (CMAKE_BUILD_TYPE MATCHES Debug)
@@ -77,9 +77,8 @@ else()
   if (${LINUX_RND_HDR} STREQUAL "LINUX_RND_HDR-NOTFOUND")
     find_package(OpenSSL)
     if (NOT OPENSSL_FOUND)
-      message(STATUS "No secure random generation, please install OpenSSL.")
+      message(FATAL_ERROR "No secure random generation, please install libssl.")
     else()
-      message(STATUS "OpenSSL found")
       include_directories(${OPENSSL_INCLUDE_DIR})
       add_compile_flags(ouroboros -DHAVE_OPENSSL)
     endif()
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 14971528..c8e43778 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -85,7 +85,6 @@ struct flow {

 struct {
         char *                ap_name;
-        char *                daf_name;
         pid_t                 api;

         struct shm_rdrbuff *  rdrb;
@@ -205,7 +204,7 @@ static int api_announce(char * ap_name)
         return ret;
 }

-static void init_flow(int fd)
+static void flow_clear(int fd)
 {
         assert(!(fd < 0));

@@ -216,9 +215,9 @@ static void init_flow(int fd)
         ai.flows[fd].cube     = QOS_CUBE_BE;
 }

-static void reset_flow(int fd)
+static void flow_fini(int fd)
 {
-        assert (!(fd < 0));
+        assert(!(fd < 0));

         if (ai.flows[fd].port_id != -1)
                 port_destroy(&ai.ports[ai.flows[fd].port_id]);
@@ -232,7 +231,59 @@ static void reset_flow(int fd)
         if (ai.flows[fd].set != NULL)
                 shm_flow_set_close(ai.flows[fd].set);

-        init_flow(fd);
+        flow_clear(fd);
+}
+
+static int flow_init(int       port_id,
+                     pid_t     api,
+                     qoscube_t qc)
+{
+        int fd;
+
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        fd = bmp_allocate(ai.fds);
+        if (!bmp_is_id_valid(ai.fds, fd)) {
+                pthread_rwlock_unlock(&ai.flows_lock);
+                return -EBADF;
+        }
+
+        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
+        if (ai.flows[fd].rx_rb == NULL) {
+                bmp_release(ai.fds, fd);
+                pthread_rwlock_unlock(&ai.flows_lock);
+                return -ENOMEM;
+        }
+
+        ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id);
+        if (ai.flows[fd].tx_rb == NULL) {
+                flow_fini(fd);
+                bmp_release(ai.fds, fd);
+                pthread_rwlock_unlock(&ai.flows_lock);
+                return -ENOMEM;
+        }
+
+        ai.flows[fd].set = shm_flow_set_open(api);
+        if (ai.flows[fd].set == NULL) {
+                flow_fini(fd);
+                bmp_release(ai.fds, fd);
+                pthread_rwlock_unlock(&ai.flows_lock);
+                return -ENOMEM;
+        }
+
+        ai.flows[fd].port_id = port_id;
+        ai.flows[fd].oflags  = FLOW_O_DEFAULT;
+        ai.flows[fd].api     = api;
+        ai.flows[fd].cube    = qc;
+        ai.flows[fd].spec    = qos_cube_to_spec(qc);
+
+        ai.ports[port_id].fd = fd;
+
+        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+
+        return fd;
 }

 int ouroboros_init(const char * ap_name)
@@ -242,7 +293,6 @@ int ouroboros_init(const char * ap_name)
         assert(ai.ap_name == NULL);

         ai.api = getpid();
-        ai.daf_name = NULL;

         ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS);
         if (ai.fds == NULL)
@@ -279,7 +329,7 @@ int ouroboros_init(const char * ap_name)
         }

         for (i = 0; i < AP_MAX_FLOWS; ++i)
-                init_flow(i);
+                flow_clear(i);

         ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
         if (ai.ports == NULL) {
@@ -333,9 +383,6 @@ void ouroboros_fini()

         shm_flow_set_destroy(ai.fqset);

-        if (ai.daf_name != NULL)
-                free(ai.daf_name);
-
         if (ai.ap_name != NULL)
                 free(ai.ap_name);

@@ -346,7 +393,7 @@ void ouroboros_fini()
                         ssize_t idx;
                         while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
                                 shm_rdrbuff_remove(ai.rdrb, idx);
-                        reset_flow(i);
+                        flow_fini(i);
                 }
         }

@@ -368,13 +415,9 @@ void ouroboros_fini()
 int flow_accept(qosspec_t *             qs,
                 const struct timespec * timeo)
 {
-        irm_msg_t           msg      = IRM_MSG__INIT;
-        irm_msg_t *         recv_msg = NULL;
-        int                 fd       = -1;
-        frct_enroll_msg_t * frct_enroll;
-        qosspec_t           spec;
-        uint8_t             data[BUF_SIZE];
-        ssize_t             n;
+        irm_msg_t   msg      = IRM_MSG__INIT;
+        irm_msg_t * recv_msg = NULL;
+        int         fd       = -1;

         msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
         msg.has_api = true;
@@ -408,83 +451,15 @@ int flow_accept(qosspec_t *             qs,
                 return -EIRMD;
         }

-        pthread_rwlock_wrlock(&ai.flows_lock);
-
-        fd = bmp_allocate(ai.fds);
-        if (!bmp_is_id_valid(ai.fds, fd)) {
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -EBADF;
-        }
-
-        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);
-        if (ai.flows[fd].rx_rb == NULL) {
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -ENOMEM;
-        }
-
-        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);
-        if (ai.flows[fd].tx_rb == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -ENOMEM;
-        }
-
-        ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
-        if (ai.flows[fd].set == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -ENOMEM;
-        }
-
-        ai.flows[fd].port_id = recv_msg->port_id;
-        ai.flows[fd].oflags  = FLOW_O_DEFAULT;
-        ai.flows[fd].api     = recv_msg->api;
-        ai.flows[fd].cube    = recv_msg->qoscube;
-
-        assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT);
-
-        spec = qos_cube_to_spec(recv_msg->qoscube);
-
-        ai.ports[recv_msg->port_id].fd    = fd;
-        ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
-
-        pthread_rwlock_unlock(&ai.flows_lock);
+        fd = flow_init(recv_msg->port_id, recv_msg->api, recv_msg->qoscube);

         irm_msg__free_unpacked(recv_msg, NULL);

-        n = flow_read(fd, data, BUF_SIZE);
-        if (n < 0) {
-                flow_dealloc(fd);
-                return n;
-        }
-
-        frct_enroll = frct_enroll_msg__unpack(NULL, n, data);
-        if (frct_enroll == NULL) {
-                flow_dealloc(fd);
-                return -1;
-        }
-
-        spec.resource_control = frct_enroll->resource_control;
-        spec.reliable = frct_enroll->reliable;
-        spec.error_check = frct_enroll->error_check;
-        spec.ordered = frct_enroll->ordered;
-        spec.partial = frct_enroll->partial;
-
-        frct_enroll_msg__free_unpacked(frct_enroll, NULL);
-
-        pthread_rwlock_wrlock(&ai.flows_lock);
-        ai.flows[fd].spec = spec;
-        pthread_rwlock_unlock(&ai.flows_lock);
+        if (fd < 0)
+                return fd;

         if (qs != NULL)
-                *qs = spec;
+                *qs = ai.flows[fd].spec;

         return fd;
 }
@@ -493,14 +468,10 @@ int flow_alloc(const char *            dst_name,
                qosspec_t *             qs,
                const struct timespec * timeo)
 {
-        irm_msg_t         msg         = IRM_MSG__INIT;
-        frct_enroll_msg_t frct_enroll = FRCT_ENROLL_MSG__INIT;
-        irm_msg_t *       recv_msg    = NULL;
-        qoscube_t         qc          = QOS_CUBE_BE;
-        int               fd;
-        ssize_t           len;
-        uint8_t *         data;
-        int               ret;
+        irm_msg_t   msg      = IRM_MSG__INIT;
+        irm_msg_t * recv_msg = NULL;
+        qoscube_t   qc       = QOS_CUBE_BE;
+        int         fd;

         msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;
         msg.dst_name    = (char *) dst_name;
@@ -508,15 +479,8 @@ int flow_alloc(const char *            dst_name,
         msg.has_qoscube = true;
         msg.api         = ai.api;

-        if (qs != NULL) {
-                frct_enroll.resource_control = qs->resource_control;
-                frct_enroll.reliable = qs->reliable;
-                frct_enroll.error_check = qs->error_check;
-                frct_enroll.ordered = qs->ordered;
-                frct_enroll.partial = qs->partial;
-
+        if (qs != NULL)
                 qc = qos_spec_to_cube(*qs);
-        }

         msg.qoscube = qc;

@@ -547,78 +511,10 @@ int flow_alloc(const char *            dst_name,
                 return -EIRMD;
         }

-        pthread_rwlock_wrlock(&ai.flows_lock);
-
-        fd = bmp_allocate(ai.fds);
-        if (!bmp_is_id_valid(ai.fds, fd)) {
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -EBADF;
-        }
-
-        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);
-        if (ai.flows[fd].rx_rb == NULL) {
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -ENOMEM;
-        }
-
-        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);
-        if (ai.flows[fd].tx_rb == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -ENOMEM;
-        }
-
-        ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
-        if (ai.flows[fd].set == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -ENOMEM;
-        }
-
-        ai.flows[fd].port_id = recv_msg->port_id;
-        ai.flows[fd].oflags  = FLOW_O_DEFAULT;
-        ai.flows[fd].api     = recv_msg->api;
-        ai.flows[fd].cube    = recv_msg->qoscube;
-
-        assert(ai.ports[recv_msg->port_id].state == PORT_INIT);
-
-        ai.ports[recv_msg->port_id].fd    = fd;
-        ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
+        fd = flow_init(recv_msg->port_id, recv_msg->api, qc);

         irm_msg__free_unpacked(recv_msg, NULL);

-        pthread_rwlock_unlock(&ai.flows_lock);
-
-        len = frct_enroll_msg__get_packed_size(&frct_enroll);
-        if (len < 0) {
-                flow_dealloc(fd);
-                return -1;
-        }
-
-        data = malloc(len);
-        if (data == NULL) {
-                flow_dealloc(fd);
-                return -ENOMEM;
-        }
-
-        frct_enroll_msg__pack(&frct_enroll, data);
-
-        ret = flow_write(fd, data, len);
-        if (ret < 0) {
-                flow_dealloc(fd);
-                free(data);
-                return ret;
-        }
-
-        free(data);
-
         return fd;
 }

@@ -657,7 +553,7 @@ int flow_dealloc(int fd)

         pthread_rwlock_wrlock(&ai.flows_lock);

-        reset_flow(fd);
+        flow_fini(fd);
         bmp_release(ai.fds, fd);

         pthread_rwlock_unlock(&ai.flows_lock);
@@ -1073,53 +969,11 @@ int flow_event_wait(struct flow_set *       set,

 /* ipcp-dev functions */

-int np1_flow_alloc(pid_t n_api,
-                   int   port_id)
+int np1_flow_alloc(pid_t     n_api,
+                   int       port_id,
+                   qoscube_t qc)
 {
-        int fd;
-
-        pthread_rwlock_wrlock(&ai.flows_lock);
-
-        fd = bmp_allocate(ai.fds);
-        if (!bmp_is_id_valid(ai.fds, fd)) {
-                pthread_rwlock_unlock(&ai.flows_lock);
-                return -1;
-        }
-
-        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
-        if (ai.flows[fd].rx_rb == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                return -1;
-        }
-
-        ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id);
-        if (ai.flows[fd].tx_rb == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                return -1;
-        }
-
-        ai.flows[fd].set = shm_flow_set_open(n_api);
-        if (ai.flows[fd].set == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                return -1;
-        }
-
-        ai.flows[fd].port_id = port_id;
-        ai.flows[fd].oflags  = FLOW_O_DEFAULT;
-        ai.flows[fd].api     = n_api;
-
-        ai.ports[port_id].fd    = fd;
-        ai.ports[port_id].state = PORT_ID_ASSIGNED;
-
-        pthread_rwlock_unlock(&ai.flows_lock);
-
-        return fd;
+        return flow_init(port_id, n_api, qc);
 }

 int np1_flow_dealloc(int port_id)
@@ -1182,11 +1036,10 @@ int ipcp_create_r(pid_t api,
 int ipcp_flow_req_arr(pid_t           api,
                       const uint8_t * dst,
                       size_t          len,
-                      qoscube_t       cube)
+                      qoscube_t       qc)
 {
         irm_msg_t msg = IRM_MSG__INIT;
         irm_msg_t * recv_msg = NULL;
-        int port_id = -1;
         int fd = -1;

         if (dst == NULL)
@@ -1199,88 +1052,24 @@ int ipcp_flow_req_arr(pid_t           api,
         msg.hash.len    = len;
         msg.hash.data   = (uint8_t *) dst;
         msg.has_qoscube = true;
-        msg.qoscube     = cube;
-
-        pthread_rwlock_wrlock(&ai.flows_lock);
-
-        fd = bmp_allocate(ai.fds);
-        if (!bmp_is_id_valid(ai.fds, fd)) {
-                pthread_rwlock_unlock(&ai.flows_lock);
-                return -1; /* -ENOMOREFDS */
-        }
-
-        pthread_rwlock_unlock(&ai.flows_lock);
+        msg.qoscube     = qc;

         recv_msg = send_recv_irm_msg(&msg);

-        pthread_rwlock_wrlock(&ai.flows_lock);
-
-        if (recv_msg == NULL) {
-                ai.ports[fd].state = PORT_INIT;
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
+        if (recv_msg == NULL)
                 return -EIRMD;
-        }

         if (!recv_msg->has_port_id || !recv_msg->has_api) {
-                ai.ports[fd].state = PORT_INIT;
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
                 irm_msg__free_unpacked(recv_msg, NULL);
                 return -1;
         }

         if (recv_msg->has_result && recv_msg->result) {
-                ai.ports[fd].state = PORT_INIT;
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
-
-        port_id = recv_msg->port_id;
-        if (port_id < 0) {
-                ai.ports[fd].state = PORT_INIT;
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
                 irm_msg__free_unpacked(recv_msg, NULL);
                 return -1;
         }

-        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
-        if (ai.flows[fd].rx_rb == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
-
-        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id);
-        if (ai.flows[fd].tx_rb == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
-
-        ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
-        if (ai.flows[fd].set == NULL) {
-                reset_flow(fd);
-                bmp_release(ai.fds, fd);
-                pthread_rwlock_unlock(&ai.flows_lock);
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
-
-        ai.flows[fd].port_id = port_id;
-        ai.flows[fd].oflags  = FLOW_O_DEFAULT;
-
-        ai.ports[port_id].fd = fd;
-        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
-
-        pthread_rwlock_unlock(&ai.flows_lock);
+        fd = flow_init(recv_msg->port_id, recv_msg->api, qc);

         irm_msg__free_unpacked(recv_msg, NULL);
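The dev.c changes above collapse the four copies of flow bookkeeping in flow_accept(), flow_alloc(), np1_flow_alloc() and ipcp_flow_req_arr() into the new flow_init()/flow_fini() pair, and drop the in-band FRCT enrollment round-trip: each side now derives the qosspec_t locally with qos_cube_to_spec() from the negotiated qoscube (the message definition is deleted in the next file). From an application's point of view the allocation path reduces to the sketch below; only the function signatures come from this diff, the destination name and payload are invented for illustration.

        #include <ouroboros/dev.h>

        /* Hypothetical caller of the simplified allocation path. */
        int send_greeting(void)
        {
                int fd;

                /* A NULL spec falls back to the best-effort cube. */
                fd = flow_alloc("dst.server", NULL, NULL);
                if (fd < 0)
                        return fd;

                /* Since this commit no frct_enroll message follows the
                 * allocation; flow_init() already set the QoS spec from
                 * the negotiated cube on both sides. */
                if (flow_write(fd, "hi", 2) < 0) {
                        flow_dealloc(fd);
                        return -1;
                }

                return flow_dealloc(fd);
        }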
diff --git a/src/lib/frct_enroll.proto b/src/lib/frct_enroll.proto
deleted file mode 100644
index 497d6acc..00000000
--- a/src/lib/frct_enroll.proto
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * QoS messages
- *
- *    Dimitri Staessens <dimitri.staessens@intec.ugent.be>
- *    Sander Vrijders   <sander.vrijders@intec.ugent.be>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA
- */
-
-syntax = "proto2";
-
-message frct_enroll_msg {
-        required bool resource_control = 1;
-        required bool reliable         = 2;
-        required bool error_check      = 3;
-        required bool ordered          = 4;
-        required bool partial          = 5;
-};
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index 67abbb5b..7660b1dd 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -117,7 +117,7 @@ struct shm_flow_set * shm_flow_set_create()
                 (set->fqueues + AP_MAX_FQUEUES * (SHM_BUFFER_SIZE));

         pthread_mutexattr_init(&mattr);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
         pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
 #endif
         pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -336,7 +336,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
         assert(idx < AP_MAX_FQUEUES);
         assert(fqueue);

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(set->lock);
 #else
         if (pthread_mutex_lock(set->lock) == EOWNERDEAD)
@@ -358,7 +358,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
                 else
                         ret = -pthread_cond_wait(set->conds + idx,
                                                  set->lock);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
                 if (ret == -EOWNERDEAD)
                         pthread_mutex_consistent(set->lock);
 #endif
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c
index 1e072b21..757f65c8 100644
--- a/src/lib/shm_rbuff_ll.c
+++ b/src/lib/shm_rbuff_ll.c
@@ -127,7 +127,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
         rb->del      = rb->add + 1;

         pthread_mutexattr_init(&mattr);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
         pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
 #endif
         pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -299,7 +299,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,
                 ts_add(&abstime, timeout, &abstime);
         }

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -315,7 +315,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,
                                                       &abstime);
                 else
                         idx = -pthread_cond_wait(rb->add, rb->lock);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
                 if (idx == -EOWNERDEAD)
                         pthread_mutex_consistent(rb->lock);
 #endif
@@ -356,7 +356,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
         if (shm_rbuff_empty(rb))
                 return;

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -367,7 +367,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
                              (void *) rb->lock);

         while (!shm_rbuff_empty(rb))
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
                 pthread_cond_wait(rb->del, rb->lock);
 #else
                 if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD)
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index d3c1e143..1b9f07d1 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -124,7 +124,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
         rb->del      = rb->add + 1;

         pthread_mutexattr_init(&mattr);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
         pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
 #endif
         pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -231,7 +231,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
         assert(rb);
         assert(idx < SHM_BUFFER_SIZE);

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -264,7 +264,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)

         assert(rb);

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -297,7 +297,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,
                 ts_add(&abstime, timeout, &abstime);
         }

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -313,7 +313,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,
                                                       &abstime);
                 else
                         idx = -pthread_cond_wait(rb->add, rb->lock);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
                 if (idx == -EOWNERDEAD)
                         pthread_mutex_consistent(rb->lock);
 #endif
@@ -334,7 +334,7 @@ void shm_rbuff_block(struct shm_rbuff * rb)
 {
         assert(rb);

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -349,7 +349,7 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
 {
         assert(rb);

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -364,7 +364,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
 {
         assert(rb);

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
@@ -376,7 +376,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
                              (void *) rb->lock);

         while (!shm_rbuff_empty(rb))
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
                 pthread_cond_wait(rb->del, rb->lock);
 #else
                 if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD)
@@ -391,7 +391,7 @@ size_t shm_rbuff_queued(struct shm_rbuff * rb)

         assert(rb);

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rb->lock);
 #else
         if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 1f999f93..d454fef8 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -192,7 +192,7 @@ struct shm_rdrbuff * shm_rdrbuff_create()

         pthread_mutexattr_init(&mattr);
         pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
-#ifndef __APPLE__
+#ifdef HAVE_ROBUST_MUTEX
         pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
 #endif
         pthread_mutex_init(rdrb->lock, &mattr);
@@ -274,7 +274,7 @@ int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,
                 ts_add(&abstime, timeo, &abstime);
         }

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rdrb->lock);
 #else
         if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
@@ -282,9 +282,9 @@ int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,
 #endif

         while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) {
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
                 if (pthread_cond_timedwait(rdrb->full,
-                                           rdrb->lock
+                                           rdrb->lock,
                                            &abstime) == ETIMEDOUT) {
                         pthread_mutex_unlock(rdrb->lock);
                         return -ETIMEDOUT;
@@ -358,7 +358,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
         if (sz > SHM_RDRB_BLOCK_SIZE)
                 return -EMSGSIZE;
 #endif
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rdrb->lock);
 #else
         if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
@@ -437,7 +437,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
         if (sz > SHM_RDRB_BLOCK_SIZE)
                 return -EMSGSIZE;
 #endif
-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rdrb->lock);
 #else
         if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
@@ -535,7 +535,7 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
         assert(rdrb);
         assert(idx < (SHM_BUFFER_SIZE));

-#ifdef __APPLE__
+#ifndef HAVE_ROBUST_MUTEX
         pthread_mutex_lock(rdrb->lock);
 #else
         if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
diff --git a/src/lib/tests/time_utils_test.c b/src/lib/tests/time_utils_test.c
index 6d1ae32f..86636c15 100644
--- a/src/lib/tests/time_utils_test.c
+++ b/src/lib/tests/time_utils_test.c
@@ -28,12 +28,12 @@

 static void ts_print(struct timespec * s)
 {
-        printf("timespec is %zd:%zd.\n", s->tv_sec, s->tv_nsec);
+        printf("timespec is %zd:%ld.\n", (ssize_t) s->tv_sec, s->tv_nsec);
 }

 static void tv_print(struct timeval * v)
 {
-        printf("timeval is %zd:%zd.\n", v->tv_sec, v->tv_usec);
+        printf("timeval is %zd:%ld.\n", (ssize_t) v->tv_sec, v->tv_usec);
 }

 static void ts_init(struct timespec * s,
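The shared-memory files above (shm_flow_set.c, shm_rbuff_ll.c, shm_rbuff_pthr.c, shm_rdrbuff.c) all get the same mechanical change: the guard around robust-mutex handling flips from the platform test #ifndef __APPLE__ to the feature test #ifdef HAVE_ROBUST_MUTEX, so the plain-lock fallback now covers any platform without robust mutexes, not macOS alone. The idiom the guards select between, shown in isolation (a sketch; the helper name is invented, and it assumes the build defines HAVE_ROBUST_MUTEX when pthread_mutexattr_setrobust() is available):

        #include <errno.h>
        #include <pthread.h>

        static void lock_shared(pthread_mutex_t * lock)
        {
        #ifndef HAVE_ROBUST_MUTEX
                pthread_mutex_lock(lock);       /* no owner-death recovery */
        #else
                /* A robust mutex reports that its last owner died while
                 * holding it; mark the state consistent and carry on
                 * instead of deadlocking every other process. */
                if (pthread_mutex_lock(lock) == EOWNERDEAD)
                        pthread_mutex_consistent(lock);
        #endif
        }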
diff --git a/src/lib/tpm.c b/src/lib/tpm.c
new file mode 100644
index 00000000..8298eeb5
--- /dev/null
+++ b/src/lib/tpm.c
@@ -0,0 +1,266 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Threadpool management
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/list.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
+
+#include <pthread.h>
+#include <stdlib.h>
+
+#define TPM_TIMEOUT 1000
+
+struct pthr_el {
+        struct list_head next;
+
+        bool             join;
+
+        pthread_t        thr;
+};
+
+enum tpm_state {
+        TPM_NULL = 0,
+        TPM_INIT,
+        TPM_RUNNING
+};
+
+struct {
+        size_t           min;
+        size_t           inc;
+        size_t           max;
+        size_t           cur;
+
+        void * (* func)(void *);
+
+        struct list_head pool;
+
+        enum tpm_state   state;
+
+        pthread_cond_t   cond;
+        pthread_mutex_t  lock;
+
+        pthread_t        mgr;
+} tpm;
+
+static void tpm_join(void)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        list_for_each_safe(p, h, &tpm.pool) {
+                struct pthr_el * e = list_entry(p, struct pthr_el, next);
+                if (tpm.state != TPM_RUNNING)
+                        while (!e->join)
+                                pthread_cond_wait(&tpm.cond, &tpm.lock);
+
+                if (e->join) {
+                        pthread_join(e->thr, NULL);
+                        list_del(&e->next);
+                        free(e);
+                }
+        }
+}
+
+static void * tpmgr(void * o)
+{
+        struct timespec dl;
+        struct timespec to = {(TPM_TIMEOUT / 1000),
+                              (TPM_TIMEOUT % 1000) * MILLION};
+        (void) o;
+
+        while (true) {
+                clock_gettime(PTHREAD_COND_CLOCK, &dl);
+                ts_add(&dl, &to, &dl);
+
+                pthread_mutex_lock(&tpm.lock);
+
+                tpm_join();
+
+                if (tpm.state != TPM_RUNNING) {
+                        tpm.max = 0;
+                        tpm_join();
+                        pthread_mutex_unlock(&tpm.lock);
+                        break;
+                }
+
+                if (tpm.cur < tpm.min) {
+                        tpm.max = tpm.inc;
+
+                        while (tpm.cur < tpm.max) {
+                                struct pthr_el * e = malloc(sizeof(*e));
+                                if (e == NULL)
+                                        break;
+
+                                e->join = false;
+
+                                if (pthread_create(&e->thr, NULL,
+                                                   tpm.func, NULL)) {
+                                        free(e);
+                                } else {
+                                        list_add(&e->next, &tpm.pool);
+                                        ++tpm.cur;
+                                }
+                        }
+                }
+
+                if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl)
+                    == ETIMEDOUT)
+                        if (tpm.cur > tpm.min )
+                                --tpm.max;
+
+                pthread_mutex_unlock(&tpm.lock);
+        }
+
+        return (void *) 0;
+}
+
+int tpm_init(size_t min,
+             size_t inc,
+             void * (* func)(void *))
+{
+        pthread_condattr_t cattr;
+
+        if (pthread_mutex_init(&tpm.lock, NULL))
+                goto fail_lock;
+
+        if (pthread_condattr_init(&cattr))
+                goto fail_cattr;
+
+#ifndef __APPLE__
+        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+        if (pthread_cond_init(&tpm.cond, &cattr))
+                goto fail_cond;
+
+        list_head_init(&tpm.pool);
+
+        pthread_condattr_destroy(&cattr);
+
+        tpm.state = TPM_INIT;
+        tpm.func  = func;
+        tpm.min   = min;
+        tpm.inc   = inc;
+        tpm.max   = 0;
+        tpm.cur   = 0;
+
+        return 0;
+
+ fail_cond:
+        pthread_condattr_destroy(&cattr);
+ fail_cattr:
+        pthread_mutex_destroy(&tpm.lock);
+ fail_lock:
+        return -1;
+}
+
+int tpm_start(void)
+{
+        pthread_mutex_lock(&tpm.lock);
+
+        if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) {
+                pthread_mutex_unlock(&tpm.lock);
+                return -1;
+        }
+
+        tpm.state = TPM_RUNNING;
+
+        pthread_mutex_unlock(&tpm.lock);
+
+        return 0;
+}
+
+void tpm_stop(void)
+{
+        pthread_mutex_lock(&tpm.lock);
+
+        tpm.state = TPM_NULL;
+
+        pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_fini(void)
+{
+        pthread_join(tpm.mgr, NULL);
+
+        pthread_mutex_destroy(&tpm.lock);
+        pthread_cond_destroy(&tpm.cond);
+}
+
+bool tpm_check(void)
+{
+        bool ret;
+
+        pthread_mutex_lock(&tpm.lock);
+
+        ret = tpm.cur > tpm.max;
+
+        pthread_mutex_unlock(&tpm.lock);
+
+        return ret;
+}
+
+void tpm_inc(void)
+{
+        pthread_mutex_lock(&tpm.lock);
+
+        ++tpm.cur;
+
+        pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_dec(void)
+{
+        pthread_mutex_lock(&tpm.lock);
+
+        --tpm.cur;
+
+        pthread_cond_signal(&tpm.cond);
+
+        pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_exit(void)
+{
+        struct list_head * p;
+        pthread_t          id;
+
+        id = pthread_self();
+
+        pthread_mutex_lock(&tpm.lock);
+
+        --tpm.cur;
+
+        list_for_each(p, &tpm.pool) {
+                struct pthr_el * e = list_entry(p, struct pthr_el, next);
+                if (e->thr == id) {
+                        e->join = true;
+                        break;
+                }
+        }
+
+        pthread_cond_signal(&tpm.cond);
+
+        pthread_mutex_unlock(&tpm.lock);
+}
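The new tpm.c gives the library a single process-wide threadpool: the manager thread tpmgr() joins finished workers, tops the pool up to inc threads whenever fewer than min are accounted for, and on every TPM_TIMEOUT ms without a signal lets the ceiling tpm.max decay while more than min threads remain, so surplus threads retire through tpm_check()/tpm_exit(). The commit contains no caller, so the driving pattern below is only our reading of the counter semantics; the worker body, pool sizes and main() are invented.

        #include <ouroboros/tpm.h>

        #include <stdbool.h>

        static void * worker(void * o)
        {
                (void) o;

                while (true) {
                        if (tpm_check()) {  /* cur > max: pool shrinking */
                                tpm_exit(); /* drop count, mark joinable */
                                break;
                        }

                        /* ... block until a job arrives (not shown) ... */

                        tpm_dec();  /* going busy: manager may spawn more */
                        /* ... handle the job ... */
                        tpm_inc();  /* available again */
                }

                return (void *) 0;
        }

        int main(void)
        {
                if (tpm_init(5, 20, worker)) /* refill to 20 when < 5 left */
                        return -1;

                if (tpm_start())
                        return -1;

                /* ... serve ... */

                tpm_stop(); /* tpmgr drops max to 0 and joins the workers */
                tpm_fini(); /* join the manager; destroy lock and condvar */

                return 0;
        }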
