From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 19 Oct 2016 22:25:46 +0200 Subject: lib: Demultiplex the fast path The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API. --- src/lib/CMakeLists.txt | 3 +- src/lib/dev.c | 500 ++++++++++++++++++++++++------------- src/lib/lockfile.c | 39 ++- src/lib/shm_ap_rbuff.c | 661 ------------------------------------------------- src/lib/shm_flow_set.c | 408 ++++++++++++++++++++++++++++++ src/lib/shm_rbuff.c | 424 +++++++++++++++++++++++++++++++ src/lib/shm_rdrbuff.c | 29 +-- 7 files changed, 1187 insertions(+), 877 deletions(-) delete mode 100644 src/lib/shm_ap_rbuff.c create mode 100644 src/lib/shm_flow_set.c create mode 100644 src/lib/shm_rbuff.c (limited to 'src/lib') diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index b94d0eea..20ea473d 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -35,7 +35,8 @@ set(SOURCE_FILES lockfile.c logs.c nsm.c - shm_ap_rbuff.c + shm_flow_set.c + shm_rbuff.c shm_rdrbuff.c sockets.c time_utils.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 77c2d06a..f735e72b 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -3,7 +3,8 @@ * * API for applications * - * Sander Vrijders + * Dimitri Staessens + * Sander Vrijders * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -26,20 +27,24 @@ #include #include #include +#include #include -#include +#include #include -#include +#include #include #include #include struct flow_set { - bool dirty; - bool b[IRMD_MAX_FLOWS]; /* working copy */ - bool s[IRMD_MAX_FLOWS]; /* safe copy */ - pthread_rwlock_t lock; + size_t idx; +}; + +struct fqueue { + int fqueue[SHM_BUFFER_SIZE]; /* safe copy from shm */ + size_t fqsize; + size_t next; }; enum port_state { @@ -124,7 +129,9 @@ enum port_state port_wait_assign(struct port * p) } struct flow { - struct shm_ap_rbuff * rb; + struct shm_rbuff * rx_rb; + struct shm_rbuff * tx_rb; + struct shm_flow_set * set; int port_id; int oflags; @@ -139,10 +146,11 @@ struct { pid_t api; struct shm_rdrbuff * rdrb; - struct shm_ap_rbuff * rb; + struct shm_flow_set * fqset; pthread_rwlock_t data_lock; struct bmp * fds; + struct bmp * fqueues; struct flow * flows; struct port * ports; @@ -194,40 +202,52 @@ int ap_init(char * ap_name) if (ai.fds == NULL) return -ENOMEM; - ai.rdrb = shm_rdrbuff_open(); - if (ai.rdrb == NULL) { + ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0); + if (ai.fqueues == NULL) { + bmp_destroy(ai.fds); + return -ENOMEM; + } + + ai.fqset = shm_flow_set_create(); + if (ai.fqset == NULL) { + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } - ai.rb = shm_ap_rbuff_create(); - if (ai.rb == NULL) { - shm_rdrbuff_close(ai.rdrb); + ai.rdrb = shm_rdrbuff_open(); + if (ai.rdrb == NULL) { + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS); if (ai.flows == NULL) { - shm_ap_rbuff_destroy(ai.rb); shm_rdrbuff_close(ai.rdrb); + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } for (i = 0; i < AP_MAX_FLOWS; ++i) { - ai.flows[i].rb = NULL; + ai.flows[i].rx_rb = NULL; + ai.flows[i].tx_rb = NULL; + ai.flows[i].set = NULL; ai.flows[i].port_id = -1; - ai.flows[i].oflags = 0; - ai.flows[i].api = -1; + ai.flows[i].oflags = 0; + ai.flows[i].api = -1; ai.flows[i].timeout = NULL; } ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); - if (ai.flows == NULL) { + if (ai.ports == NULL) { free(ai.flows); - shm_ap_rbuff_destroy(ai.rb); shm_rdrbuff_close(ai.rdrb); + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } @@ -253,16 +273,10 @@ void ap_fini() pthread_rwlock_wrlock(&ai.data_lock); - /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_pop_idx(ai.rb)) >= 0) - shm_rdrbuff_remove(ai.rdrb, i); - - if (ai.fds != NULL) - bmp_destroy(ai.fds); - if (ai.rb != NULL) - shm_ap_rbuff_destroy(ai.rb); - if (ai.rdrb != NULL) - shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); + bmp_destroy(ai.fqueues); + shm_flow_set_destroy(ai.fqset); + shm_rdrbuff_close(ai.rdrb); if (ai.daf_name != NULL) free(ai.daf_name); @@ -270,8 +284,15 @@ void ap_fini() pthread_rwlock_rdlock(&ai.flows_lock); for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (ai.flows[i].rb != NULL) - shm_ap_rbuff_close(ai.flows[i].rb); + if (ai.flows[i].tx_rb != NULL) { + int idx; + while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) + shm_rdrbuff_remove(ai.rdrb, idx); + shm_rbuff_destroy(ai.flows[i].rx_rb); + shm_rbuff_close(ai.flows[i].tx_rb); + shm_flow_set_close(ai.flows[i].set); + } + if (ai.flows[i].timeout != NULL) free(ai.flows[i].timeout); } @@ -328,8 +349,8 @@ int flow_accept(char ** ae_name) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -337,10 +358,24 @@ int flow_accept(char ** ae_name) return -1; } + ai.flows[fd].set = shm_flow_set_open(recv_msg->api); + if (ai.flows[fd].set == NULL) { + bmp_release(ai.fds, fd); + shm_rbuff_destroy(ai.flows[fd].rx_rb); + shm_rbuff_close(ai.flows[fd].tx_rb); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + if (ae_name != NULL) { *ae_name = strdup(recv_msg->ae_name); if (*ae_name == NULL) { - shm_ap_rbuff_close(ai.flows[fd].rb); + shm_rbuff_destroy(ai.flows[fd].tx_rb); + shm_rbuff_close(ai.flows[fd].tx_rb); + shm_flow_set_close(ai.flows[fd].set); bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -356,8 +391,6 @@ int flow_accept(char ** ae_name) ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -410,6 +443,17 @@ int flow_alloc_resp(int fd, int response) ret = recv_msg->result; + pthread_rwlock_wrlock(&ai.flows_lock); + + ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, + ai.flows[fd].port_id); + if (ai.flows[fd].tx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -461,8 +505,11 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].port_id = recv_msg->port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = recv_msg->api; + ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -470,9 +517,26 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) return -1; } - ai.flows[fd].port_id = recv_msg->port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; - ai.flows[fd].api = recv_msg->api; + ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); + if (ai.flows[fd].tx_rb == NULL) { + shm_rbuff_destroy(ai.flows[fd].rx_rb); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ai.flows[fd].set = shm_flow_set_open(recv_msg->api); + if (ai.flows[fd].set == NULL) { + shm_rbuff_close(ai.flows[fd].tx_rb); + shm_rbuff_destroy(ai.flows[fd].rx_rb); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -480,8 +544,6 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); - irm_msg__free_unpacked(recv_msg, NULL); return fd; @@ -548,7 +610,7 @@ int flow_dealloc(int fd) return -ENOTALLOC; } - if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) { + if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EBUSY; @@ -559,8 +621,10 @@ int flow_dealloc(int fd) port_destroy(&ai.ports[msg.port_id]); ai.flows[fd].port_id = -1; - shm_ap_rbuff_close(ai.flows[fd].rb); - ai.flows[fd].rb = NULL; + shm_rbuff_destroy(ai.flows[fd].rx_rb); + ai.flows[fd].rx_rb = NULL; + shm_rbuff_close(ai.flows[fd].tx_rb); + ai.flows[fd].tx_rb = NULL; ai.flows[fd].oflags = 0; ai.flows[fd].api = -1; if (ai.flows[fd].timeout != NULL) { @@ -604,9 +668,9 @@ int flow_cntl(int fd, int cmd, int oflags) case FLOW_F_SETFL: /* SET FLOW FLAGS */ ai.flows[fd].oflags = oflags; if (oflags & FLOW_O_WRONLY) - shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id); + shm_rbuff_block(ai.flows[fd].rx_rb); if (oflags & FLOW_O_RDWR) - shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id); + shm_rbuff_unblock(ai.flows[fd].rx_rb); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return old; @@ -620,7 +684,6 @@ int flow_cntl(int fd, int cmd, int oflags) ssize_t flow_write(int fd, void * buf, size_t count) { ssize_t idx; - struct rb_entry e; if (buf == NULL) return 0; @@ -653,13 +716,10 @@ ssize_t flow_write(int fd, void * buf, size_t count) if (idx < 0) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return -idx; + return idx; } - e.index = idx; - e.port_id = ai.flows[fd].port_id; - - if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -667,7 +727,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) } } else { /* blocking */ struct shm_rdrbuff * rdrb = ai.rdrb; - pid_t api = ai.flows[fd].api; + pid_t api = ai.flows[fd].api; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -681,17 +741,16 @@ ssize_t flow_write(int fd, void * buf, size_t count) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - e.index = idx; - e.port_id = ai.flows[fd].port_id; - - if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { - shm_rdrbuff_remove(ai.rdrb, e.index); + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } } + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -717,15 +776,14 @@ ssize_t flow_read(int fd, void * buf, size_t count) } if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); pthread_rwlock_unlock(&ai.flows_lock); } else { - struct shm_ap_rbuff * rb = ai.rb; - int port_id = ai.flows[fd].port_id; - struct timespec * timeout = ai.flows[fd].timeout; + struct shm_rbuff * rb = ai.flows[fd].rx_rb; + struct timespec * timeout = ai.flows[fd].timeout; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); + idx = shm_rbuff_read_b(rb, timeout); pthread_rwlock_rdlock(&ai.data_lock); } @@ -757,79 +815,163 @@ struct flow_set * flow_set_create() if (set == NULL) return NULL; - if (pthread_rwlock_init(&set->lock, NULL)) { + assert(ai.fqueues); + + set->idx = bmp_allocate(ai.fqueues); + if (!bmp_is_id_valid(ai.fqueues, set->idx)) { free(set); return NULL; } - memset(set->b, 0, IRMD_MAX_FLOWS); - memset(set->s, 0, IRMD_MAX_FLOWS); + return set; +} - set->dirty = true; +void flow_set_destroy(struct flow_set * set) +{ + if (set == NULL) + return; - return set; + flow_set_zero(set); + bmp_release(ai.fqueues, set->idx); + free(set); } -void flow_set_zero(struct flow_set * set) +struct fqueue * fqueue_create() { - pthread_rwlock_wrlock(&set->lock); - memset(set->b, 0, IRMD_MAX_FLOWS); - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + struct fqueue * fq = malloc(sizeof(*fq)); + if (fq == NULL) + return NULL; + + memset(fq->fqueue, -1, SHM_BUFFER_SIZE); + fq->fqsize = 0; + fq->next = 0; + + return fq; } -void flow_set_add(struct flow_set * set, int fd) +void fqueue_destroy(struct fqueue * fq) { - pthread_rwlock_wrlock(&set->lock); - set->b[ai.flows[fd].port_id] = true; - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + if (fq == NULL) + return + free(fq); } -void flow_set_del(struct flow_set * set, int fd) +void flow_set_zero(struct flow_set * set) { - pthread_rwlock_wrlock(&set->lock); - set->b[ai.flows[fd].port_id] = false; - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + if (set == NULL) + return; + + pthread_rwlock_rdlock(&ai.data_lock); + + shm_flow_set_zero(ai.fqset, set->idx); + + pthread_rwlock_unlock(&ai.data_lock); } -bool flow_set_has(struct flow_set * set, int fd) +int flow_set_add(struct flow_set * set, int fd) { - bool ret; - pthread_rwlock_rdlock(&set->lock); - ret = set->b[ai.flows[fd].port_id]; - pthread_rwlock_unlock(&set->lock); + int ret; + + if (set == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return ret; } -void flow_set_destroy(struct flow_set * set) +void flow_set_del(struct flow_set * set, int fd) { - pthread_rwlock_destroy(&set->lock); - free(set); + if (set == NULL) + return; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].port_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); } -static void flow_set_cpy(struct flow_set * set) +bool flow_set_has(struct flow_set * set, int fd) { - pthread_rwlock_rdlock(&set->lock); - if (set->dirty) - memcpy(set->s, set->b, IRMD_MAX_FLOWS); - set->dirty = false; - pthread_rwlock_unlock(&set->lock); + bool ret = false; + + if (set == NULL || fd < 0) + return false; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return false; + } + + ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return ret; } -int flow_select(struct flow_set * set, const struct timespec * timeout) +int fqueue_next(struct fqueue * fq) { - int port_id; - if (set == NULL) { - port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout); - } else { - flow_set_cpy(set); - port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout); + int fd; + + if (fq == NULL) + return -EINVAL; + + if (fq->next == fq->fqsize) { + fq->fqsize = 0; + fq->next = 0; + return -EPERM; } - if (port_id < 0) - return port_id; - return ai.ports[port_id].fd; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + fd = ai.ports[fq->fqueue[fq->next++]].fd; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int flow_event_wait(struct flow_set * set, + struct fqueue * fq, + const struct timespec * timeout) +{ + int ret; + + if (set == NULL) + return -EINVAL; + + if (fq->fqsize > 0) + return 0; + + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (ret == -ETIMEDOUT) + return -ETIMEDOUT; + + if (ret < 0) + return ret; + + fq->fqsize = ret; + fq->next = 0; + + return 0; } /* ipcp-dev functions */ @@ -848,8 +990,8 @@ int np1_flow_alloc(pid_t n_api, int port_id) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(n_api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].rx_rb = shm_rbuff_create(port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -863,8 +1005,6 @@ int np1_flow_alloc(pid_t n_api, int port_id) ai.ports[port_id].fd = fd; port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); - shm_ap_rbuff_open_port(ai.rb, port_id); - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -890,7 +1030,6 @@ int np1_flow_dealloc(int port_id) int np1_flow_resp(pid_t n_api, int port_id) { int fd; - struct shm_ap_rbuff * rb; port_wait_assign(&ai.ports[port_id]); @@ -904,18 +1043,26 @@ int np1_flow_resp(pid_t n_api, int port_id) return fd; } - rb = shm_ap_rbuff_open(n_api); - if (rb == NULL) { + ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); + if (ai.flows[fd].tx_rb == NULL) { ai.flows[fd].port_id = -1; + shm_rbuff_destroy(ai.flows[fd].rx_rb); port_destroy(&ai.ports[port_id]); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -1; } - ai.flows[fd].rb = rb; - - shm_ap_rbuff_open_port(ai.rb, port_id); + ai.flows[fd].set = shm_flow_set_open(n_api); + if (ai.flows[fd].set == NULL) { + shm_rbuff_close(ai.flows[fd].tx_rb); + ai.flows[fd].port_id = -1; + shm_rbuff_destroy(ai.flows[fd].rx_rb); + port_destroy(&ai.ports[port_id]); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -929,9 +1076,9 @@ int ipcp_create_r(pid_t api) irm_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IRM_MSG_CODE__IPCP_CREATE_R; - msg.has_api = true; - msg.api = api; + msg.code = IRM_MSG_CODE__IPCP_CREATE_R; + msg.has_api = true; + msg.api = api; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) @@ -958,11 +1105,11 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) if (dst_name == NULL || src_ae_name == NULL) return -EINVAL; - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_api = true; - msg.api = api; - msg.dst_name = dst_name; - msg.ae_name = src_ae_name; + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_api = true; + msg.api = api; + msg.dst_name = dst_name; + msg.ae_name = src_ae_name; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_wrlock(&ai.flows_lock); @@ -974,7 +1121,7 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) return -1; /* -ENOMOREFDS */ } - ai.flows[fd].rb = NULL; + ai.flows[fd].tx_rb = NULL; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -996,8 +1143,16 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_wrlock(&ai.flows_lock); + ai.flows[fd].rx_rb = shm_rbuff_create(port_id); + if (ai.flows[fd].rx_rb == NULL) { + ai.flows[fd].port_id = -1; + port_destroy(&ai.ports[port_id]); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + ai.flows[fd].port_id = port_id; - ai.flows[fd].rb = NULL; ai.ports[port_id].fd = fd; port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); @@ -1019,16 +1174,13 @@ int ipcp_flow_alloc_reply(int fd, int response) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - msg.port_id = ai.flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); msg.has_response = true; msg.response = response; - if (response) - shm_ap_rbuff_open_port(ai.rb, msg.port_id); - recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -1039,6 +1191,26 @@ int ipcp_flow_alloc_reply(int fd, int response) } ret = recv_msg->result; + + pthread_rwlock_wrlock(&ai.flows_lock); + + ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, + ai.flows[fd].port_id); + if (ai.flows[fd].tx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); + if (ai.flows[fd].set == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + pthread_rwlock_unlock(&ai.flows_lock); + irm_msg__free_unpacked(recv_msg, NULL); return ret; @@ -1061,7 +1233,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - idx = shm_ap_rbuff_read_port(ai.rb, port_id); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); if (idx < 0) { pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); @@ -1081,7 +1253,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - struct rb_entry e; + ssize_t idx; if (sdb == NULL) return -EINVAL; @@ -1095,16 +1267,16 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return -EPERM; } - if (ai.flows[fd].rb == NULL) { + if (ai.flows[fd].tx_rb == NULL) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EPERM; } - e.index = shm_du_buff_get_idx(sdb); - e.port_id = ai.flows[fd].port_id; + idx = shm_du_buff_get_idx(sdb); - shm_ap_rbuff_write(ai.flows[fd].rb, &e); + shm_rbuff_write(ai.flows[fd].tx_rb, idx); + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1112,46 +1284,28 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return 0; } -struct rb_entry * local_flow_read(int fd) +ssize_t local_flow_read(int fd) { - int port_id; - struct rb_entry * e = NULL; - - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_rdlock(&ai.flows_lock); - - port_id = ai.flows[fd].port_id; - - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - - if (port_id != -1) { - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; - e->index = shm_ap_rbuff_read_port(ai.rb, port_id); - } - - return e; + return shm_rbuff_read(ai.flows[fd].rx_rb); } -int local_flow_write(int fd, struct rb_entry * e) +int local_flow_write(int fd, ssize_t idx) { - if (e == NULL || fd < 0) + if (fd < 0) return -EINVAL; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].rb == NULL) { + if (ai.flows[fd].tx_rb == NULL) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EPERM; } - e->port_id = ai.flows[fd].port_id; + shm_rbuff_write(ai.flows[fd].tx_rb, idx); - shm_ap_rbuff_write(ai.flows[fd].rb, e); + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1159,22 +1313,26 @@ int local_flow_write(int fd, struct rb_entry * e) return 0; } -int ipcp_read_shim(struct shm_du_buff ** sdb) +int ipcp_read_shim(int fd, struct shm_du_buff ** sdb) { - int fd; - struct rb_entry * e = shm_ap_rbuff_read(ai.rb); + ssize_t idx; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - fd = ai.ports[e->port_id].fd; + if (ai.flows[fd].rx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } - *sdb = shm_rdrbuff_get(ai.rdrb, e->index); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return fd; + return 0; } void ipcp_flow_del(struct shm_du_buff * sdb) diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c index 04ce9324..a0222f18 100644 --- a/src/lib/lockfile.c +++ b/src/lib/lockfile.c @@ -39,10 +39,10 @@ struct lockfile { pid_t * api; - int fd; }; struct lockfile * lockfile_create() { + int fd; mode_t mask; struct lockfile * lf = malloc(sizeof(*lf)); if (lf == NULL) @@ -50,8 +50,8 @@ struct lockfile * lockfile_create() { mask = umask(0); - lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); - if (lf->fd == -1) { + fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); + if (fd == -1) { LOG_DBGF("Could not create lock file."); free(lf); return NULL; @@ -59,30 +59,24 @@ struct lockfile * lockfile_create() { umask(mask); - if (ftruncate(lf->fd, LF_SIZE - 1) < 0) { + if (ftruncate(fd, LF_SIZE - 1) < 0) { LOG_DBGF("Failed to extend lockfile."); free(lf); return NULL; } -#ifndef __APPLE__ - if (write(lf->fd, "", 1) != 1) { - LOG_DBGF("Failed to finalise lockfile."); - free(lf); - return NULL; - } -#endif + lf->api = mmap(NULL, LF_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, - lf->fd, + fd, 0); + close (fd); + if (lf->api == MAP_FAILED) { LOG_DBGF("Failed to map lockfile."); - if (shm_unlink(LOCKFILE_NAME) == -1) LOG_DBGF("Failed to remove invalid lockfile."); - free(lf); return NULL; } @@ -93,12 +87,13 @@ struct lockfile * lockfile_create() { } struct lockfile * lockfile_open() { + int fd; struct lockfile * lf = malloc(sizeof(*lf)); if (lf == NULL) return NULL; - lf->fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); - if (lf->fd < 0) { + fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); + if (fd < 0) { LOG_DBGF("Could not open lock file."); free(lf); return NULL; @@ -107,15 +102,15 @@ struct lockfile * lockfile_open() { lf->api = mmap(NULL, LF_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, - lf->fd, + fd, 0); + close(fd); + if (lf->api == MAP_FAILED) { LOG_DBGF("Failed to map lockfile."); - if (shm_unlink(LOCKFILE_NAME) == -1) LOG_DBGF("Failed to remove invalid lockfile."); - free(lf); return NULL; } @@ -130,9 +125,6 @@ void lockfile_close(struct lockfile * lf) return; } - if (close(lf->fd) < 0) - LOG_DBGF("Couldn't close lockfile."); - if (munmap(lf->api, LF_SIZE) == -1) LOG_DBGF("Couldn't unmap lockfile."); @@ -151,9 +143,6 @@ void lockfile_destroy(struct lockfile * lf) return; } - if (close(lf->fd) < 0) - LOG_DBGF("Couldn't close lockfile."); - if (munmap(lf->api, LF_SIZE) == -1) LOG_DBGF("Couldn't unmap lockfile."); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c deleted file mode 100644 index 5cbf5bd0..00000000 --- a/src/lib/shm_ap_rbuff.c +++ /dev/null @@ -1,661 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Ring buffer for application processes - * - * Dimitri Staessens - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include -#include -#include -#include -#include - -#define OUROBOROS_PREFIX "shm_ap_rbuff" - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define FN_MAX_CHARS 255 - -#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ - + IRMD_MAX_FLOWS * sizeof(int8_t) \ - + IRMD_MAX_FLOWS * sizeof (ssize_t) \ - + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ - + 2 * sizeof (pthread_cond_t)) - -#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \ - & (SHM_BUFFER_SIZE - 1)) -#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) -#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) - -struct shm_ap_rbuff { - struct rb_entry * shm_base; /* start of entry */ - size_t * head; /* start of ringbuffer head */ - size_t * tail; /* start of ringbuffer tail */ - int8_t * acl; /* start of port_id access table */ - ssize_t * cntrs; /* start of port_id counters */ - pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * add; /* SDU arrived */ - pthread_cond_t * del; /* SDU removed */ - pid_t api; /* api to which this rb belongs */ - int fd; -}; - -struct shm_ap_rbuff * shm_ap_rbuff_create() -{ - struct shm_ap_rbuff * rb; - int shm_fd; - struct rb_entry * shm_base; - pthread_mutexattr_t mattr; - pthread_condattr_t cattr; - char fn[FN_MAX_CHARS]; - mode_t mask; - int i; - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); - - rb = malloc(sizeof(*rb)); - if (rb == NULL) { - LOG_DBG("Could not allocate struct."); - return NULL; - } - - mask = umask(0); - - shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); - if (shm_fd == -1) { - LOG_DBG("Failed creating ring buffer."); - free(rb); - return NULL; - } - - umask(mask); - - if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { - LOG_DBG("Failed to extend ringbuffer."); - free(rb); - return NULL; - } -#ifndef __APPLE__ - if (write(shm_fd, "", 1) != 1) { - LOG_DBG("Failed to finalise extension of ringbuffer."); - free(rb); - return NULL; - } -#endif - shm_base = mmap(NULL, - SHM_RBUFF_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - - if (shm_base == MAP_FAILED) { - LOG_DBG("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to remove invalid shm."); - - free(rb); - return NULL; - } - - rb->shm_base = shm_base; - rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->tail = rb->head + 1; - rb->acl = (int8_t *) (rb->tail + 1); - rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); - rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); - rb->add = (pthread_cond_t *) (rb->lock + 1); - rb->del = rb->add + 1; - - pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ - pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); -#endif - pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(rb->lock, &mattr); - - pthread_condattr_init(&cattr); - pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - for (i = 0; i < IRMD_MAX_FLOWS; ++i) { - rb->cntrs[i] = 0; - rb->acl[i] = -1; - } - - pthread_cond_init(rb->add, &cattr); - pthread_cond_init(rb->del, &cattr); - - *rb->head = 0; - *rb->tail = 0; - - rb->fd = shm_fd; - rb->api = getpid(); - - return rb; -} - -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) -{ - struct shm_ap_rbuff * rb; - int shm_fd; - struct rb_entry * shm_base; - char fn[FN_MAX_CHARS]; - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); - - rb = malloc(sizeof(*rb)); - if (rb == NULL) { - LOG_DBG("Could not allocate struct."); - return NULL; - } - - shm_fd = shm_open(fn, O_RDWR, 0666); - if (shm_fd == -1) { - LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); - free(rb); - return NULL; - } - - shm_base = mmap(NULL, - SHM_RBUFF_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - - if (shm_base == MAP_FAILED) { - LOG_DBG("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to remove invalid shm."); - - free(rb); - return NULL; - } - - rb->shm_base = shm_base; - rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->tail = rb->head + 1; - rb->acl = (int8_t *) (rb->tail + 1); - rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); - rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); - rb->add = (pthread_cond_t *) (rb->lock + 1); - rb->del = rb->add + 1; - - rb->fd = shm_fd; - rb->api = api; - - return rb; -} - -void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) -{ - assert(rb); - - if (close(rb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - - if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) - LOG_DBG("Couldn't unmap shared memory."); - - free(rb); -} - -void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id) -{ - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - rb->acl[port_id] = 0; /* open */ - - pthread_mutex_unlock(rb->lock); -} - -int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) -{ - int ret = 0; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - rb->acl[port_id] = -1; - - if (rb->cntrs[port_id] > 0) - ret = -EBUSY; - - pthread_mutex_unlock(rb->lock); - - return ret; -} - -void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) -{ - char fn[25]; - struct lockfile * lf = NULL; - - assert(rb); - - if (rb->api != getpid()) { - lf = lockfile_open(); - if (lf == NULL) - return; - if (lockfile_owner(lf) == getpid()) { - LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", - rb->api, getpid()); - lockfile_close(lf); - } else { - LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", - getpid(), rb->api); - lockfile_close(lf); - return; - } - } - - if (close(rb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); - - if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) - LOG_DBG("Couldn't unmap shared memory."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to unlink shm."); - - free(rb); -} - -int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) -{ - assert(rb); - assert(e); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (rb->acl[e->port_id]) { - pthread_mutex_unlock(rb->lock); - return -ENOTALLOC; - } - - if (!shm_rbuff_free(rb)) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - if (shm_rbuff_empty(rb)) - pthread_cond_broadcast(rb->add); - - *head_el_ptr(rb) = *e; - *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); - - ++rb->cntrs[e->port_id]; - - pthread_mutex_unlock(rb->lock); - - return 0; -} - -int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) -{ - int ret = 0; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (shm_rbuff_empty(rb)) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - ret = tail_el_ptr(rb)->index; - --rb->cntrs[tail_el_ptr(rb)->port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_mutex_unlock(rb->lock); - - return ret; -} - -static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret = 0; - - assert(rb); - - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - if (ret != ETIMEDOUT) - ret = tail_el_ptr(rb)->port_id; - else - ret = -ETIMEDOUT; - - pthread_cleanup_pop(true); - - return ret; -} - -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - bool * set, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret; - - assert(rb); - - if (set == NULL) - return shm_ap_rbuff_peek_b_all(rb, timeout); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); - - while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) - && (ret != ETIMEDOUT)) { - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); - -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - while (!set[tail_el_ptr(rb)->port_id]) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->del, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->del, rb->lock); - -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - } - - if (ret != ETIMEDOUT) - ret = tail_el_ptr(rb)->port_id; - else - ret = -ETIMEDOUT; - - pthread_cleanup_pop(true); - - return ret; -} - - -struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) -{ - struct rb_entry * e = NULL; - - assert(rb); - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - while (shm_rbuff_empty(rb)) -#ifdef __APPLE__ - pthread_cond_wait(rb->add, rb->lock); -#else - if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - e = malloc(sizeof(*e)); - if (e != NULL) { - *e = *(rb->shm_base + *rb->tail); - --rb->cntrs[e->port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - } - - pthread_cleanup_pop(true); - - return e; -} - -ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) -{ - ssize_t idx = -1; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - idx = tail_el_ptr(rb)->index; - --rb->cntrs[port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_cond_broadcast(rb->del); - pthread_mutex_unlock(rb->lock); - - return idx; -} - -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret = 0; - ssize_t idx = -1; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (timeout != NULL) { - idx = -ETIMEDOUT; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); - - while ((shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) - && (ret != ETIMEDOUT)) { - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - while (tail_el_ptr(rb)->port_id != port_id) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->del, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->del, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - } - - if (ret != ETIMEDOUT) { - idx = tail_el_ptr(rb)->index; - --rb->cntrs[port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_cond_broadcast(rb->del); - } - - pthread_cleanup_pop(true); - - return idx; -} - -void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) -{ - assert(rb); - - pthread_mutex_lock(rb->lock); - *rb->tail = 0; - *rb->head = 0; - pthread_mutex_unlock(rb->lock); -} diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c new file mode 100644 index 00000000..c960bd25 --- /dev/null +++ b/src/lib/shm_flow_set.c @@ -0,0 +1,408 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Management of flow_sets for fqueue + * + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include +#include + +#define OUROBOROS_PREFIX "shm_flow_set" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define FN_MAX_CHARS 255 + +#define FQUEUESIZE (SHM_BUFFER_SIZE * sizeof(int)) + +#define SHM_FLOW_SET_FILE_SIZE (IRMD_MAX_FLOWS * sizeof(ssize_t) \ + + AP_MAX_FQUEUES * sizeof(size_t) \ + + AP_MAX_FQUEUES * sizeof(pthread_cond_t) \ + + AP_MAX_FQUEUES * FQUEUESIZE \ + + sizeof(pthread_mutex_t)) + +#define fqueue_ptr(fs, idx) (fs->fqueues + SHM_BUFFER_SIZE * idx) + +struct shm_flow_set { + ssize_t * mtable; + size_t * heads; + pthread_cond_t * conds; + int * fqueues; + pthread_mutex_t * lock; + + pid_t api; +}; + +struct shm_flow_set * shm_flow_set_create() +{ + struct shm_flow_set * set; + ssize_t * shm_base; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + char fn[FN_MAX_CHARS]; + mode_t mask; + int shm_fd; + int i; + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid()); + + set = malloc(sizeof(*set)); + if (set == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + mask = umask(0); + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("Failed creating flag file."); + free(set); + return NULL; + } + + umask(mask); + + if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) { + LOG_DBG("Failed to extend flag file."); + free(set); + close(shm_fd); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FLOW_SET_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + + free(set); + return NULL; + } + + set->mtable = shm_base; + set->heads = (size_t *) (set->mtable + IRMD_MAX_FLOWS); + set->conds = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); + set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); + set->lock = (pthread_mutex_t *) + (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + + pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(set->lock, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + for (i = 0; i < AP_MAX_FQUEUES; ++i) { + set->heads[i] = 0; + pthread_cond_init(&set->conds[i], &cattr); + } + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + set->mtable[i] = -1; + + set->api = getpid(); + + return set; +} + +struct shm_flow_set * shm_flow_set_open(pid_t api) +{ + struct shm_flow_set * set; + ssize_t * shm_base; + char fn[FN_MAX_CHARS]; + int shm_fd; + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", api); + + set = malloc(sizeof(*set)); + if (set == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); + free(set); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FLOW_SET_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + free(set); + return NULL; + } + + set->mtable = shm_base; + set->heads = (size_t *) (set->mtable + IRMD_MAX_FLOWS); + set->conds = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); + set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); + set->lock = (pthread_mutex_t *) + (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + + set->api = api; + + return set; +} + +void shm_flow_set_destroy(struct shm_flow_set * set) +{ + char fn[25]; + struct lockfile * lf = NULL; + + assert(set); + + if (set->api != getpid()) { + lf = lockfile_open(); + if (lf == NULL) { + LOG_ERR("Failed to open lockfile."); + return; + } + + if (lockfile_owner(lf) == getpid()) { + LOG_DBG("Flow set %d destroyed by IRMd %d.", + set->api, getpid()); + lockfile_close(lf); + } else { + LOG_ERR("AP-I %d tried to destroy flowset owned by %d.", + getpid(), set->api); + lockfile_close(lf); + return; + } + } + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->api); + + if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to unlink shm."); + + free(set); +} + +void shm_flow_set_close(struct shm_flow_set * set) +{ + assert(set); + + if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + free(set); +} + +void shm_flow_set_zero(struct shm_flow_set * shm_set, + ssize_t idx) +{ + ssize_t i = 0; + + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + if (shm_set->mtable[i] == idx) + shm_set->mtable[i] = -1; + + shm_set->heads[idx] = 0; + + pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_add(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] != -1) { + pthread_mutex_unlock(shm_set->lock); + return -EPERM; + } + + shm_set->mtable[port_id] = idx; + + pthread_mutex_unlock(shm_set->lock); + + return 0; +} + +void shm_flow_set_del(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == idx) + shm_set->mtable[port_id] = -1; + + pthread_mutex_unlock(shm_set->lock); +} + +int shm_flow_set_has(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + int ret = 0; + + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == idx) + ret = 1; + + pthread_mutex_unlock(shm_set->lock); + + return ret; +} + +void shm_flow_set_notify(struct shm_flow_set * shm_set, int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == -1) { + pthread_mutex_unlock(shm_set->lock); + return; + } + + *(fqueue_ptr(shm_set, shm_set->mtable[port_id]) + + (shm_set->heads[shm_set->mtable[port_id]])++) = port_id; + + pthread_cond_signal(&shm_set->conds[shm_set->mtable[port_id]]); + + pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_wait(const struct shm_flow_set * shm_set, + ssize_t idx, + int * fqueue, + const struct timespec * timeout) +{ + int ret = 0; + struct timespec abstime; + + assert(shm_set); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +#ifdef __APPLE__ + pthread_mutex_lock(shm_set->lock); +#else + if (pthread_mutex_lock(shm_set->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(shm_set->lock); + } +#endif + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) shm_set->lock); + + while (shm_set->heads[idx] == 0 && ret != -ETIMEDOUT) { + if (timeout != NULL) + ret = pthread_cond_timedwait(shm_set->conds + idx, + shm_set->lock, + &abstime); + else + ret = pthread_cond_wait(shm_set->conds + idx, + shm_set->lock); +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(shm_set->lock); + } +#endif + if (ret == ETIMEDOUT) { + ret = -ETIMEDOUT; + break; + } + } + + if (ret != -ETIMEDOUT) { + memcpy(fqueue, + fqueue_ptr(shm_set, idx), + shm_set->heads[idx] * sizeof(int)); + ret = shm_set->heads[idx]; + shm_set->heads[idx] = 0; + } + + pthread_cleanup_pop(true); + + return ret; +} diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c new file mode 100644 index 00000000..cf094488 --- /dev/null +++ b/src/lib/shm_rbuff.c @@ -0,0 +1,424 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for incoming SDUs + * + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include + +#define OUROBOROS_PREFIX "shm_rbuff" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define FN_MAX_CHARS 255 + +#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(ssize_t) \ + + 2 * sizeof(size_t) + sizeof(int8_t) \ + + sizeof(pthread_mutex_t) \ + + 2 * sizeof (pthread_cond_t)) + +#define shm_rbuff_used(rb) ((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \ + & (SHM_BUFFER_SIZE - 1)) +#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) +#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) + +struct shm_rbuff { + ssize_t * shm_base; /* start of entry */ + size_t * head; /* start of ringbuffer head */ + size_t * tail; /* start of ringbuffer tail */ + int8_t * acl; /* access control */ + pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_cond_t * add; /* SDU arrived */ + pthread_cond_t * del; /* SDU removed */ + pid_t api; /* api of the owner */ + int port_id; /* port_id of the flow */ +}; + +struct shm_rbuff * shm_rbuff_create(int port_id) +{ + struct shm_rbuff * rb; + int shm_fd; + ssize_t * shm_base; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + char fn[FN_MAX_CHARS]; + mode_t mask; + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", getpid(), port_id); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + mask = umask(0); + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("Failed creating ring buffer."); + free(rb); + return NULL; + } + + umask(mask); + + if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { + LOG_DBG("Failed to extend ringbuffer."); + free(rb); + close(shm_fd); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->lock = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; + + pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(rb->lock, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + pthread_cond_init(rb->add, &cattr); + pthread_cond_init(rb->del, &cattr); + + *rb->acl = 0; + *rb->head = 0; + *rb->tail = 0; + + rb->api = getpid(); + rb->port_id = port_id; + + return rb; +} + +struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id) +{ + struct shm_rbuff * rb; + int shm_fd; + ssize_t * shm_base; + char fn[FN_MAX_CHARS]; + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", api, port_id); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); + free(rb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->lock = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; + + rb->api = api; + rb->port_id = port_id; + + return rb; +} + +void shm_rbuff_close(struct shm_rbuff * rb) +{ + assert(rb); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + free(rb); +} + +void shm_rbuff_destroy(struct shm_rbuff * rb) +{ + char fn[25]; + struct lockfile * lf = NULL; + + assert(rb); + + if (rb->api != getpid()) { + lf = lockfile_open(); + if (lf == NULL) { + LOG_ERR("Failed to open lockfile."); + return; + } + + if (lockfile_owner(lf) == getpid()) { + LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", + rb->api, getpid()); + lockfile_close(lf); + } else { + LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", + getpid(), rb->api); + lockfile_close(lf); + return; + } + } + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to unlink shm."); + + free(rb); +} + +int shm_rbuff_write(struct shm_rbuff * rb, ssize_t idx) +{ + assert(rb); + assert(idx >= 0); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (*rb->acl) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + + if (!shm_rbuff_free(rb)) { + pthread_mutex_unlock(rb->lock); + return -1; + } + + if (shm_rbuff_empty(rb)) + pthread_cond_broadcast(rb->add); + + *head_el_ptr(rb) = idx; + *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); + + pthread_mutex_unlock(rb->lock); + + return 0; +} + +ssize_t shm_rbuff_read(struct shm_rbuff * rb) +{ + int ret = 0; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (shm_rbuff_empty(rb)) { + pthread_mutex_unlock(rb->lock); + return -1; + } + + ret = *tail_el_ptr(rb); + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); + + pthread_mutex_unlock(rb->lock); + + return ret; +} + +ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + ssize_t idx = -1; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (timeout != NULL) { + idx = -ETIMEDOUT; + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (shm_rbuff_empty(rb) && (ret != ETIMEDOUT)) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->add, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->add, rb->lock); +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) { + idx = -ETIMEDOUT; + break; + } + } + + if (idx != -ETIMEDOUT) { + idx = *tail_el_ptr(rb); + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); + pthread_cond_broadcast(rb->del); + } + + pthread_cleanup_pop(true); + + return idx; +} + +int shm_rbuff_block(struct shm_rbuff * rb) +{ + int ret = 0; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + *rb->acl = -1; + + if (!shm_rbuff_empty(rb)) + ret = -EBUSY; + + pthread_mutex_unlock(rb->lock); + + return ret; +} + +void shm_rbuff_unblock(struct shm_rbuff * rb) +{ + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + *rb->acl = 0; /* open */ + + pthread_mutex_unlock(rb->lock); +} + +void shm_rbuff_reset(struct shm_rbuff * rb) +{ + assert(rb); + + pthread_mutex_lock(rb->lock); + *rb->tail = 0; + *rb->head = 0; + pthread_mutex_unlock(rb->lock); +} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index f6683dc2..e5a37577 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -90,7 +90,6 @@ struct shm_rdrbuff { pthread_cond_t * full; /* run sanitizer when buffer full */ pid_t * api; /* api of the irmd owner */ enum qos_cube qos; /* qos id which this buffer serves */ - int fd; }; static void garbage_collect(struct shm_rdrbuff * rdrb) @@ -189,17 +188,11 @@ struct shm_rdrbuff * shm_rdrbuff_create() if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { LOG_DBGF("Failed to extend shared memory map."); free(shm_rdrb_fn); + close(shm_fd); free(rdrb); return NULL; } -#ifndef __APPLE - if (write(shm_fd, "", 1) != 1) { - LOG_DBGF("Failed to finalise extension of shared memory map."); - free(shm_rdrb_fn); - free(rdrb); - return NULL; - } -#endif + shm_base = mmap(NULL, SHM_FILE_SIZE, PROT_READ | PROT_WRITE, @@ -207,6 +200,8 @@ struct shm_rdrbuff * shm_rdrbuff_create() shm_fd, 0); + close(shm_fd); + if (shm_base == MAP_FAILED) { LOG_DBGF("Failed to map shared memory."); if (shm_unlink(shm_rdrb_fn) == -1) @@ -235,6 +230,9 @@ struct shm_rdrbuff * shm_rdrbuff_create() pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif pthread_cond_init(rdrb->full, &cattr); pthread_cond_init(rdrb->healthy, &cattr); @@ -246,7 +244,6 @@ struct shm_rdrbuff * shm_rdrbuff_create() *rdrb->api = getpid(); rdrb->qos = qos; - rdrb->fd = shm_fd; free(shm_rdrb_fn); @@ -287,10 +284,11 @@ struct shm_rdrbuff * shm_rdrbuff_open() MAP_SHARED, shm_fd, 0); + + close(shm_fd); + if (shm_base == MAP_FAILED) { LOG_DBGF("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); if (shm_unlink(shm_rdrb_fn) == -1) LOG_DBG("Failed to unlink invalid shm."); free(shm_rdrb_fn); @@ -309,7 +307,6 @@ struct shm_rdrbuff * shm_rdrbuff_open() rdrb->api = (pid_t *) (rdrb->full + 1); rdrb->qos = qos; - rdrb->fd = shm_fd; free(shm_rdrb_fn); @@ -400,9 +397,6 @@ void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) { assert(rdrb); - if (close(rdrb->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) LOG_DBGF("Couldn't unmap shared memory."); @@ -420,9 +414,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) return; } - if (close(rdrb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); -- cgit v1.2.3