summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-19 22:25:46 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-21 14:17:51 +0200
commitf516b51169020ea1957010fbd1005d746f01b1d9 (patch)
tree03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/lib
parentc79ab46894053312f80390bf13a52c238a7d4704 (diff)
downloadouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz
ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/CMakeLists.txt3
-rw-r--r--src/lib/dev.c500
-rw-r--r--src/lib/lockfile.c39
-rw-r--r--src/lib/shm_ap_rbuff.c661
-rw-r--r--src/lib/shm_flow_set.c408
-rw-r--r--src/lib/shm_rbuff.c424
-rw-r--r--src/lib/shm_rdrbuff.c29
7 files changed, 1187 insertions, 877 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index b94d0eea..20ea473d 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -35,7 +35,8 @@ set(SOURCE_FILES
lockfile.c
logs.c
nsm.c
- shm_ap_rbuff.c
+ shm_flow_set.c
+ shm_rbuff.c
shm_rdrbuff.c
sockets.c
time_utils.c
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 77c2d06a..f735e72b 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -3,7 +3,8 @@
*
* API for applications
*
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -26,20 +27,24 @@
#include <ouroboros/sockets.h>
#include <ouroboros/fcntl.h>
#include <ouroboros/bitmap.h>
+#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/shm_rbuff.h>
#include <ouroboros/utils.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
struct flow_set {
- bool dirty;
- bool b[IRMD_MAX_FLOWS]; /* working copy */
- bool s[IRMD_MAX_FLOWS]; /* safe copy */
- pthread_rwlock_t lock;
+ size_t idx;
+};
+
+struct fqueue {
+ int fqueue[SHM_BUFFER_SIZE]; /* safe copy from shm */
+ size_t fqsize;
+ size_t next;
};
enum port_state {
@@ -124,7 +129,9 @@ enum port_state port_wait_assign(struct port * p)
}
struct flow {
- struct shm_ap_rbuff * rb;
+ struct shm_rbuff * rx_rb;
+ struct shm_rbuff * tx_rb;
+ struct shm_flow_set * set;
int port_id;
int oflags;
@@ -139,10 +146,11 @@ struct {
pid_t api;
struct shm_rdrbuff * rdrb;
- struct shm_ap_rbuff * rb;
+ struct shm_flow_set * fqset;
pthread_rwlock_t data_lock;
struct bmp * fds;
+ struct bmp * fqueues;
struct flow * flows;
struct port * ports;
@@ -194,40 +202,52 @@ int ap_init(char * ap_name)
if (ai.fds == NULL)
return -ENOMEM;
- ai.rdrb = shm_rdrbuff_open();
- if (ai.rdrb == NULL) {
+ ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0);
+ if (ai.fqueues == NULL) {
+ bmp_destroy(ai.fds);
+ return -ENOMEM;
+ }
+
+ ai.fqset = shm_flow_set_create();
+ if (ai.fqset == NULL) {
+ bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
return -1;
}
- ai.rb = shm_ap_rbuff_create();
- if (ai.rb == NULL) {
- shm_rdrbuff_close(ai.rdrb);
+ ai.rdrb = shm_rdrbuff_open();
+ if (ai.rdrb == NULL) {
+ shm_flow_set_destroy(ai.fqset);
+ bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
return -1;
}
ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);
if (ai.flows == NULL) {
- shm_ap_rbuff_destroy(ai.rb);
shm_rdrbuff_close(ai.rdrb);
+ shm_flow_set_destroy(ai.fqset);
+ bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
return -1;
}
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- ai.flows[i].rb = NULL;
+ ai.flows[i].rx_rb = NULL;
+ ai.flows[i].tx_rb = NULL;
+ ai.flows[i].set = NULL;
ai.flows[i].port_id = -1;
- ai.flows[i].oflags = 0;
- ai.flows[i].api = -1;
+ ai.flows[i].oflags = 0;
+ ai.flows[i].api = -1;
ai.flows[i].timeout = NULL;
}
ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
- if (ai.flows == NULL) {
+ if (ai.ports == NULL) {
free(ai.flows);
- shm_ap_rbuff_destroy(ai.rb);
shm_rdrbuff_close(ai.rdrb);
+ shm_flow_set_destroy(ai.fqset);
+ bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
return -1;
}
@@ -253,16 +273,10 @@ void ap_fini()
pthread_rwlock_wrlock(&ai.data_lock);
- /* remove all remaining sdus */
- while ((i = shm_ap_rbuff_pop_idx(ai.rb)) >= 0)
- shm_rdrbuff_remove(ai.rdrb, i);
-
- if (ai.fds != NULL)
- bmp_destroy(ai.fds);
- if (ai.rb != NULL)
- shm_ap_rbuff_destroy(ai.rb);
- if (ai.rdrb != NULL)
- shm_rdrbuff_close(ai.rdrb);
+ bmp_destroy(ai.fds);
+ bmp_destroy(ai.fqueues);
+ shm_flow_set_destroy(ai.fqset);
+ shm_rdrbuff_close(ai.rdrb);
if (ai.daf_name != NULL)
free(ai.daf_name);
@@ -270,8 +284,15 @@ void ap_fini()
pthread_rwlock_rdlock(&ai.flows_lock);
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (ai.flows[i].rb != NULL)
- shm_ap_rbuff_close(ai.flows[i].rb);
+ if (ai.flows[i].tx_rb != NULL) {
+ int idx;
+ while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ shm_rbuff_destroy(ai.flows[i].rx_rb);
+ shm_rbuff_close(ai.flows[i].tx_rb);
+ shm_flow_set_close(ai.flows[i].set);
+ }
+
if (ai.flows[i].timeout != NULL)
free(ai.flows[i].timeout);
}
@@ -328,8 +349,8 @@ int flow_accept(char ** ae_name)
return -1;
}
- ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
- if (ai.flows[fd].rb == NULL) {
+ ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -337,10 +358,24 @@ int flow_accept(char ** ae_name)
return -1;
}
+ ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
+ if (ai.flows[fd].set == NULL) {
+ bmp_release(ai.fds, fd);
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+
if (ae_name != NULL) {
*ae_name = strdup(recv_msg->ae_name);
if (*ae_name == NULL) {
- shm_ap_rbuff_close(ai.flows[fd].rb);
+ shm_rbuff_destroy(ai.flows[fd].tx_rb);
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ shm_flow_set_close(ai.flows[fd].set);
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -356,8 +391,6 @@ int flow_accept(char ** ae_name)
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
- shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id);
-
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -410,6 +443,17 @@ int flow_alloc_resp(int fd, int response)
ret = recv_msg->result;
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api,
+ ai.flows[fd].port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -461,8 +505,11 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
return -1;
}
- ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
- if (ai.flows[fd].rb == NULL) {
+ ai.flows[fd].port_id = recv_msg->port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = recv_msg->api;
+ ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -470,9 +517,26 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
return -1;
}
- ai.flows[fd].port_id = recv_msg->port_id;
- ai.flows[fd].oflags = FLOW_O_DEFAULT;
- ai.flows[fd].api = recv_msg->api;
+ ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
+ if (ai.flows[fd].set == NULL) {
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
@@ -480,8 +544,6 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id);
-
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
@@ -548,7 +610,7 @@ int flow_dealloc(int fd)
return -ENOTALLOC;
}
- if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) {
+ if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -EBUSY;
@@ -559,8 +621,10 @@ int flow_dealloc(int fd)
port_destroy(&ai.ports[msg.port_id]);
ai.flows[fd].port_id = -1;
- shm_ap_rbuff_close(ai.flows[fd].rb);
- ai.flows[fd].rb = NULL;
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ ai.flows[fd].rx_rb = NULL;
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ ai.flows[fd].tx_rb = NULL;
ai.flows[fd].oflags = 0;
ai.flows[fd].api = -1;
if (ai.flows[fd].timeout != NULL) {
@@ -604,9 +668,9 @@ int flow_cntl(int fd, int cmd, int oflags)
case FLOW_F_SETFL: /* SET FLOW FLAGS */
ai.flows[fd].oflags = oflags;
if (oflags & FLOW_O_WRONLY)
- shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id);
+ shm_rbuff_block(ai.flows[fd].rx_rb);
if (oflags & FLOW_O_RDWR)
- shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id);
+ shm_rbuff_unblock(ai.flows[fd].rx_rb);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return old;
@@ -620,7 +684,6 @@ int flow_cntl(int fd, int cmd, int oflags)
ssize_t flow_write(int fd, void * buf, size_t count)
{
ssize_t idx;
- struct rb_entry e;
if (buf == NULL)
return 0;
@@ -653,13 +716,10 @@ ssize_t flow_write(int fd, void * buf, size_t count)
if (idx < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return -idx;
+ return idx;
}
- e.index = idx;
- e.port_id = ai.flows[fd].port_id;
-
- if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) {
+ if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {
shm_rdrbuff_remove(ai.rdrb, idx);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -667,7 +727,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
}
} else { /* blocking */
struct shm_rdrbuff * rdrb = ai.rdrb;
- pid_t api = ai.flows[fd].api;
+ pid_t api = ai.flows[fd].api;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -681,17 +741,16 @@ ssize_t flow_write(int fd, void * buf, size_t count)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- e.index = idx;
- e.port_id = ai.flows[fd].port_id;
-
- if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) {
- shm_rdrbuff_remove(ai.rdrb, e.index);
+ if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
}
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -717,15 +776,14 @@ ssize_t flow_read(int fd, void * buf, size_t count)
}
if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id);
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
pthread_rwlock_unlock(&ai.flows_lock);
} else {
- struct shm_ap_rbuff * rb = ai.rb;
- int port_id = ai.flows[fd].port_id;
- struct timespec * timeout = ai.flows[fd].timeout;
+ struct shm_rbuff * rb = ai.flows[fd].rx_rb;
+ struct timespec * timeout = ai.flows[fd].timeout;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout);
+ idx = shm_rbuff_read_b(rb, timeout);
pthread_rwlock_rdlock(&ai.data_lock);
}
@@ -757,79 +815,163 @@ struct flow_set * flow_set_create()
if (set == NULL)
return NULL;
- if (pthread_rwlock_init(&set->lock, NULL)) {
+ assert(ai.fqueues);
+
+ set->idx = bmp_allocate(ai.fqueues);
+ if (!bmp_is_id_valid(ai.fqueues, set->idx)) {
free(set);
return NULL;
}
- memset(set->b, 0, IRMD_MAX_FLOWS);
- memset(set->s, 0, IRMD_MAX_FLOWS);
+ return set;
+}
- set->dirty = true;
+void flow_set_destroy(struct flow_set * set)
+{
+ if (set == NULL)
+ return;
- return set;
+ flow_set_zero(set);
+ bmp_release(ai.fqueues, set->idx);
+ free(set);
}
-void flow_set_zero(struct flow_set * set)
+struct fqueue * fqueue_create()
{
- pthread_rwlock_wrlock(&set->lock);
- memset(set->b, 0, IRMD_MAX_FLOWS);
- set->dirty = true;
- pthread_rwlock_unlock(&set->lock);
+ struct fqueue * fq = malloc(sizeof(*fq));
+ if (fq == NULL)
+ return NULL;
+
+ memset(fq->fqueue, -1, SHM_BUFFER_SIZE);
+ fq->fqsize = 0;
+ fq->next = 0;
+
+ return fq;
}
-void flow_set_add(struct flow_set * set, int fd)
+void fqueue_destroy(struct fqueue * fq)
{
- pthread_rwlock_wrlock(&set->lock);
- set->b[ai.flows[fd].port_id] = true;
- set->dirty = true;
- pthread_rwlock_unlock(&set->lock);
+ if (fq == NULL)
+ return
+ free(fq);
}
-void flow_set_del(struct flow_set * set, int fd)
+void flow_set_zero(struct flow_set * set)
{
- pthread_rwlock_wrlock(&set->lock);
- set->b[ai.flows[fd].port_id] = false;
- set->dirty = true;
- pthread_rwlock_unlock(&set->lock);
+ if (set == NULL)
+ return;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+
+ shm_flow_set_zero(ai.fqset, set->idx);
+
+ pthread_rwlock_unlock(&ai.data_lock);
}
-bool flow_set_has(struct flow_set * set, int fd)
+int flow_set_add(struct flow_set * set, int fd)
{
- bool ret;
- pthread_rwlock_rdlock(&set->lock);
- ret = set->b[ai.flows[fd].port_id];
- pthread_rwlock_unlock(&set->lock);
+ int ret;
+
+ if (set == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
return ret;
}
-void flow_set_destroy(struct flow_set * set)
+void flow_set_del(struct flow_set * set, int fd)
{
- pthread_rwlock_destroy(&set->lock);
- free(set);
+ if (set == NULL)
+ return;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
}
-static void flow_set_cpy(struct flow_set * set)
+bool flow_set_has(struct flow_set * set, int fd)
{
- pthread_rwlock_rdlock(&set->lock);
- if (set->dirty)
- memcpy(set->s, set->b, IRMD_MAX_FLOWS);
- set->dirty = false;
- pthread_rwlock_unlock(&set->lock);
+ bool ret = false;
+
+ if (set == NULL || fd < 0)
+ return false;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return false;
+ }
+
+ ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return ret;
}
-int flow_select(struct flow_set * set, const struct timespec * timeout)
+int fqueue_next(struct fqueue * fq)
{
- int port_id;
- if (set == NULL) {
- port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout);
- } else {
- flow_set_cpy(set);
- port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout);
+ int fd;
+
+ if (fq == NULL)
+ return -EINVAL;
+
+ if (fq->next == fq->fqsize) {
+ fq->fqsize = 0;
+ fq->next = 0;
+ return -EPERM;
}
- if (port_id < 0)
- return port_id;
- return ai.ports[port_id].fd;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ fd = ai.ports[fq->fqueue[fq->next++]].fd;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int flow_event_wait(struct flow_set * set,
+ struct fqueue * fq,
+ const struct timespec * timeout)
+{
+ int ret;
+
+ if (set == NULL)
+ return -EINVAL;
+
+ if (fq->fqsize > 0)
+ return 0;
+
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout);
+ if (ret == -ETIMEDOUT)
+ return -ETIMEDOUT;
+
+ if (ret < 0)
+ return ret;
+
+ fq->fqsize = ret;
+ fq->next = 0;
+
+ return 0;
}
/* ipcp-dev functions */
@@ -848,8 +990,8 @@ int np1_flow_alloc(pid_t n_api, int port_id)
return -1;
}
- ai.flows[fd].rb = shm_ap_rbuff_open(n_api);
- if (ai.flows[fd].rb == NULL) {
+ ai.flows[fd].rx_rb = shm_rbuff_create(port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -863,8 +1005,6 @@ int np1_flow_alloc(pid_t n_api, int port_id)
ai.ports[port_id].fd = fd;
port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
- shm_ap_rbuff_open_port(ai.rb, port_id);
-
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -890,7 +1030,6 @@ int np1_flow_dealloc(int port_id)
int np1_flow_resp(pid_t n_api, int port_id)
{
int fd;
- struct shm_ap_rbuff * rb;
port_wait_assign(&ai.ports[port_id]);
@@ -904,18 +1043,26 @@ int np1_flow_resp(pid_t n_api, int port_id)
return fd;
}
- rb = shm_ap_rbuff_open(n_api);
- if (rb == NULL) {
+ ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
ai.flows[fd].port_id = -1;
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
port_destroy(&ai.ports[port_id]);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
- ai.flows[fd].rb = rb;
-
- shm_ap_rbuff_open_port(ai.rb, port_id);
+ ai.flows[fd].set = shm_flow_set_open(n_api);
+ if (ai.flows[fd].set == NULL) {
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ ai.flows[fd].port_id = -1;
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ port_destroy(&ai.ports[port_id]);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -929,9 +1076,9 @@ int ipcp_create_r(pid_t api)
irm_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.has_api = true;
- msg.api = api;
+ msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
+ msg.has_api = true;
+ msg.api = api;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
@@ -958,11 +1105,11 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name)
if (dst_name == NULL || src_ae_name == NULL)
return -EINVAL;
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_api = true;
- msg.api = api;
- msg.dst_name = dst_name;
- msg.ae_name = src_ae_name;
+ msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg.has_api = true;
+ msg.api = api;
+ msg.dst_name = dst_name;
+ msg.ae_name = src_ae_name;
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_wrlock(&ai.flows_lock);
@@ -974,7 +1121,7 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name)
return -1; /* -ENOMOREFDS */
}
- ai.flows[fd].rb = NULL;
+ ai.flows[fd].tx_rb = NULL;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -996,8 +1143,16 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_wrlock(&ai.flows_lock);
+ ai.flows[fd].rx_rb = shm_rbuff_create(port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
+ ai.flows[fd].port_id = -1;
+ port_destroy(&ai.ports[port_id]);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
ai.flows[fd].port_id = port_id;
- ai.flows[fd].rb = NULL;
ai.ports[port_id].fd = fd;
port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED);
@@ -1019,16 +1174,13 @@ int ipcp_flow_alloc_reply(int fd, int response)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- msg.port_id = ai.flows[fd].port_id;
+ msg.port_id = ai.flows[fd].port_id;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
msg.has_response = true;
msg.response = response;
- if (response)
- shm_ap_rbuff_open_port(ai.rb, msg.port_id);
-
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -1039,6 +1191,26 @@ int ipcp_flow_alloc_reply(int fd, int response)
}
ret = recv_msg->result;
+
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api,
+ ai.flows[fd].port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api);
+ if (ai.flows[fd].set == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+
irm_msg__free_unpacked(recv_msg, NULL);
return ret;
@@ -1061,7 +1233,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb)
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- idx = shm_ap_rbuff_read_port(ai.rb, port_id);
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
if (idx < 0) {
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
@@ -1081,7 +1253,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb)
int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
{
- struct rb_entry e;
+ ssize_t idx;
if (sdb == NULL)
return -EINVAL;
@@ -1095,16 +1267,16 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return -EPERM;
}
- if (ai.flows[fd].rb == NULL) {
+ if (ai.flows[fd].tx_rb == NULL) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -EPERM;
}
- e.index = shm_du_buff_get_idx(sdb);
- e.port_id = ai.flows[fd].port_id;
+ idx = shm_du_buff_get_idx(sdb);
- shm_ap_rbuff_write(ai.flows[fd].rb, &e);
+ shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1112,46 +1284,28 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return 0;
}
-struct rb_entry * local_flow_read(int fd)
+ssize_t local_flow_read(int fd)
{
- int port_id;
- struct rb_entry * e = NULL;
-
- pthread_rwlock_rdlock(&ai.data_lock);
- pthread_rwlock_rdlock(&ai.flows_lock);
-
- port_id = ai.flows[fd].port_id;
-
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
-
- if (port_id != -1) {
- e = malloc(sizeof(*e));
- if (e == NULL)
- return NULL;
- e->index = shm_ap_rbuff_read_port(ai.rb, port_id);
- }
-
- return e;
+ return shm_rbuff_read(ai.flows[fd].rx_rb);
}
-int local_flow_write(int fd, struct rb_entry * e)
+int local_flow_write(int fd, ssize_t idx)
{
- if (e == NULL || fd < 0)
+ if (fd < 0)
return -EINVAL;
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].rb == NULL) {
+ if (ai.flows[fd].tx_rb == NULL) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -EPERM;
}
- e->port_id = ai.flows[fd].port_id;
+ shm_rbuff_write(ai.flows[fd].tx_rb, idx);
- shm_ap_rbuff_write(ai.flows[fd].rb, e);
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1159,22 +1313,26 @@ int local_flow_write(int fd, struct rb_entry * e)
return 0;
}
-int ipcp_read_shim(struct shm_du_buff ** sdb)
+int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)
{
- int fd;
- struct rb_entry * e = shm_ap_rbuff_read(ai.rb);
+ ssize_t idx;
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- fd = ai.ports[e->port_id].fd;
+ if (ai.flows[fd].rx_rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EPERM;
+ }
- *sdb = shm_rdrbuff_get(ai.rdrb, e->index);
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return fd;
+ return 0;
}
void ipcp_flow_del(struct shm_du_buff * sdb)
diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c
index 04ce9324..a0222f18 100644
--- a/src/lib/lockfile.c
+++ b/src/lib/lockfile.c
@@ -39,10 +39,10 @@
struct lockfile {
pid_t * api;
- int fd;
};
struct lockfile * lockfile_create() {
+ int fd;
mode_t mask;
struct lockfile * lf = malloc(sizeof(*lf));
if (lf == NULL)
@@ -50,8 +50,8 @@ struct lockfile * lockfile_create() {
mask = umask(0);
- lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666);
- if (lf->fd == -1) {
+ fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666);
+ if (fd == -1) {
LOG_DBGF("Could not create lock file.");
free(lf);
return NULL;
@@ -59,30 +59,24 @@ struct lockfile * lockfile_create() {
umask(mask);
- if (ftruncate(lf->fd, LF_SIZE - 1) < 0) {
+ if (ftruncate(fd, LF_SIZE - 1) < 0) {
LOG_DBGF("Failed to extend lockfile.");
free(lf);
return NULL;
}
-#ifndef __APPLE__
- if (write(lf->fd, "", 1) != 1) {
- LOG_DBGF("Failed to finalise lockfile.");
- free(lf);
- return NULL;
- }
-#endif
+
lf->api = mmap(NULL,
LF_SIZE, PROT_READ | PROT_WRITE,
MAP_SHARED,
- lf->fd,
+ fd,
0);
+ close (fd);
+
if (lf->api == MAP_FAILED) {
LOG_DBGF("Failed to map lockfile.");
-
if (shm_unlink(LOCKFILE_NAME) == -1)
LOG_DBGF("Failed to remove invalid lockfile.");
-
free(lf);
return NULL;
}
@@ -93,12 +87,13 @@ struct lockfile * lockfile_create() {
}
struct lockfile * lockfile_open() {
+ int fd;
struct lockfile * lf = malloc(sizeof(*lf));
if (lf == NULL)
return NULL;
- lf->fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666);
- if (lf->fd < 0) {
+ fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666);
+ if (fd < 0) {
LOG_DBGF("Could not open lock file.");
free(lf);
return NULL;
@@ -107,15 +102,15 @@ struct lockfile * lockfile_open() {
lf->api = mmap(NULL,
LF_SIZE, PROT_READ | PROT_WRITE,
MAP_SHARED,
- lf->fd,
+ fd,
0);
+ close(fd);
+
if (lf->api == MAP_FAILED) {
LOG_DBGF("Failed to map lockfile.");
-
if (shm_unlink(LOCKFILE_NAME) == -1)
LOG_DBGF("Failed to remove invalid lockfile.");
-
free(lf);
return NULL;
}
@@ -130,9 +125,6 @@ void lockfile_close(struct lockfile * lf)
return;
}
- if (close(lf->fd) < 0)
- LOG_DBGF("Couldn't close lockfile.");
-
if (munmap(lf->api, LF_SIZE) == -1)
LOG_DBGF("Couldn't unmap lockfile.");
@@ -151,9 +143,6 @@ void lockfile_destroy(struct lockfile * lf)
return;
}
- if (close(lf->fd) < 0)
- LOG_DBGF("Couldn't close lockfile.");
-
if (munmap(lf->api, LF_SIZE) == -1)
LOG_DBGF("Couldn't unmap lockfile.");
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
deleted file mode 100644
index 5cbf5bd0..00000000
--- a/src/lib/shm_ap_rbuff.c
+++ /dev/null
@@ -1,661 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016
- *
- * Ring buffer for application processes
- *
- * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#include <ouroboros/config.h>
-#include <ouroboros/shm_ap_rbuff.h>
-#include <ouroboros/lockfile.h>
-#include <ouroboros/time_utils.h>
-#include <ouroboros/errno.h>
-
-#define OUROBOROS_PREFIX "shm_ap_rbuff"
-
-#include <ouroboros/logs.h>
-
-#include <pthread.h>
-#include <sys/mman.h>
-#include <fcntl.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdint.h>
-#include <unistd.h>
-#include <signal.h>
-#include <sys/stat.h>
-#include <assert.h>
-
-#define FN_MAX_CHARS 255
-
-#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
- + IRMD_MAX_FLOWS * sizeof(int8_t) \
- + IRMD_MAX_FLOWS * sizeof (ssize_t) \
- + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
- + 2 * sizeof (pthread_cond_t))
-
-#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \
- & (SHM_BUFFER_SIZE - 1))
-#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE)
-#define shm_rbuff_empty(rb) (*rb->head == *rb->tail)
-#define head_el_ptr(rb) (rb->shm_base + *rb->head)
-#define tail_el_ptr(rb) (rb->shm_base + *rb->tail)
-
-struct shm_ap_rbuff {
- struct rb_entry * shm_base; /* start of entry */
- size_t * head; /* start of ringbuffer head */
- size_t * tail; /* start of ringbuffer tail */
- int8_t * acl; /* start of port_id access table */
- ssize_t * cntrs; /* start of port_id counters */
- pthread_mutex_t * lock; /* lock all free space in shm */
- pthread_cond_t * add; /* SDU arrived */
- pthread_cond_t * del; /* SDU removed */
- pid_t api; /* api to which this rb belongs */
- int fd;
-};
-
-struct shm_ap_rbuff * shm_ap_rbuff_create()
-{
- struct shm_ap_rbuff * rb;
- int shm_fd;
- struct rb_entry * shm_base;
- pthread_mutexattr_t mattr;
- pthread_condattr_t cattr;
- char fn[FN_MAX_CHARS];
- mode_t mask;
- int i;
-
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
-
- rb = malloc(sizeof(*rb));
- if (rb == NULL) {
- LOG_DBG("Could not allocate struct.");
- return NULL;
- }
-
- mask = umask(0);
-
- shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
- if (shm_fd == -1) {
- LOG_DBG("Failed creating ring buffer.");
- free(rb);
- return NULL;
- }
-
- umask(mask);
-
- if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) {
- LOG_DBG("Failed to extend ringbuffer.");
- free(rb);
- return NULL;
- }
-#ifndef __APPLE__
- if (write(shm_fd, "", 1) != 1) {
- LOG_DBG("Failed to finalise extension of ringbuffer.");
- free(rb);
- return NULL;
- }
-#endif
- shm_base = mmap(NULL,
- SHM_RBUFF_FILE_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_SHARED,
- shm_fd,
- 0);
-
- if (shm_base == MAP_FAILED) {
- LOG_DBG("Failed to map shared memory.");
- if (close(shm_fd) == -1)
- LOG_DBG("Failed to close invalid shm.");
-
- if (shm_unlink(fn) == -1)
- LOG_DBG("Failed to remove invalid shm.");
-
- free(rb);
- return NULL;
- }
-
- rb->shm_base = shm_base;
- rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
- rb->tail = rb->head + 1;
- rb->acl = (int8_t *) (rb->tail + 1);
- rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS);
- rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS);
- rb->add = (pthread_cond_t *) (rb->lock + 1);
- rb->del = rb->add + 1;
-
- pthread_mutexattr_init(&mattr);
-#ifndef __APPLE__
- pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
-#endif
- pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
- pthread_mutex_init(rb->lock, &mattr);
-
- pthread_condattr_init(&cattr);
- pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
- rb->cntrs[i] = 0;
- rb->acl[i] = -1;
- }
-
- pthread_cond_init(rb->add, &cattr);
- pthread_cond_init(rb->del, &cattr);
-
- *rb->head = 0;
- *rb->tail = 0;
-
- rb->fd = shm_fd;
- rb->api = getpid();
-
- return rb;
-}
-
-struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
-{
- struct shm_ap_rbuff * rb;
- int shm_fd;
- struct rb_entry * shm_base;
- char fn[FN_MAX_CHARS];
-
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api);
-
- rb = malloc(sizeof(*rb));
- if (rb == NULL) {
- LOG_DBG("Could not allocate struct.");
- return NULL;
- }
-
- shm_fd = shm_open(fn, O_RDWR, 0666);
- if (shm_fd == -1) {
- LOG_DBG("%d failed opening shared memory %s.", getpid(), fn);
- free(rb);
- return NULL;
- }
-
- shm_base = mmap(NULL,
- SHM_RBUFF_FILE_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_SHARED,
- shm_fd,
- 0);
-
- if (shm_base == MAP_FAILED) {
- LOG_DBG("Failed to map shared memory.");
- if (close(shm_fd) == -1)
- LOG_DBG("Failed to close invalid shm.");
-
- if (shm_unlink(fn) == -1)
- LOG_DBG("Failed to remove invalid shm.");
-
- free(rb);
- return NULL;
- }
-
- rb->shm_base = shm_base;
- rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
- rb->tail = rb->head + 1;
- rb->acl = (int8_t *) (rb->tail + 1);
- rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS);
- rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS);
- rb->add = (pthread_cond_t *) (rb->lock + 1);
- rb->del = rb->add + 1;
-
- rb->fd = shm_fd;
- rb->api = api;
-
- return rb;
-}
-
-void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
-{
- assert(rb);
-
- if (close(rb->fd) < 0)
- LOG_DBG("Couldn't close shared memory.");
-
- if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
- LOG_DBG("Couldn't unmap shared memory.");
-
- free(rb);
-}
-
-void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id)
-{
- assert(rb);
-
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- rb->acl[port_id] = 0; /* open */
-
- pthread_mutex_unlock(rb->lock);
-}
-
-int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id)
-{
- int ret = 0;
-
- assert(rb);
-
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- rb->acl[port_id] = -1;
-
- if (rb->cntrs[port_id] > 0)
- ret = -EBUSY;
-
- pthread_mutex_unlock(rb->lock);
-
- return ret;
-}
-
-void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
-{
- char fn[25];
- struct lockfile * lf = NULL;
-
- assert(rb);
-
- if (rb->api != getpid()) {
- lf = lockfile_open();
- if (lf == NULL)
- return;
- if (lockfile_owner(lf) == getpid()) {
- LOG_DBG("Ringbuffer %d destroyed by IRMd %d.",
- rb->api, getpid());
- lockfile_close(lf);
- } else {
- LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.",
- getpid(), rb->api);
- lockfile_close(lf);
- return;
- }
- }
-
- if (close(rb->fd) < 0)
- LOG_DBG("Couldn't close shared memory.");
-
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api);
-
- if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
- LOG_DBG("Couldn't unmap shared memory.");
-
- if (shm_unlink(fn) == -1)
- LOG_DBG("Failed to unlink shm.");
-
- free(rb);
-}
-
-int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
-{
- assert(rb);
- assert(e);
-
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (rb->acl[e->port_id]) {
- pthread_mutex_unlock(rb->lock);
- return -ENOTALLOC;
- }
-
- if (!shm_rbuff_free(rb)) {
- pthread_mutex_unlock(rb->lock);
- return -1;
- }
-
- if (shm_rbuff_empty(rb))
- pthread_cond_broadcast(rb->add);
-
- *head_el_ptr(rb) = *e;
- *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1);
-
- ++rb->cntrs[e->port_id];
-
- pthread_mutex_unlock(rb->lock);
-
- return 0;
-}
-
-int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb)
-{
- int ret = 0;
-
- assert(rb);
-
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (shm_rbuff_empty(rb)) {
- pthread_mutex_unlock(rb->lock);
- return -1;
- }
-
- ret = tail_el_ptr(rb)->index;
- --rb->cntrs[tail_el_ptr(rb)->port_id];
- *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
-
- pthread_mutex_unlock(rb->lock);
-
- return ret;
-}
-
-static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb,
- const struct timespec * timeout)
-{
- struct timespec abstime;
- int ret = 0;
-
- assert(rb);
-
- if (timeout != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeout, &abstime);
- }
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) rb->lock);
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- while (shm_rbuff_empty(rb)) {
- if (timeout != NULL)
- ret = pthread_cond_timedwait(rb->add,
- rb->lock,
- &abstime);
- else
- ret = pthread_cond_wait(rb->add, rb->lock);
-#ifndef __APPLE__
- if (ret == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (ret == ETIMEDOUT)
- break;
- }
-
- if (ret != ETIMEDOUT)
- ret = tail_el_ptr(rb)->port_id;
- else
- ret = -ETIMEDOUT;
-
- pthread_cleanup_pop(true);
-
- return ret;
-}
-
-int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
- bool * set,
- const struct timespec * timeout)
-{
- struct timespec abstime;
- int ret;
-
- assert(rb);
-
- if (set == NULL)
- return shm_ap_rbuff_peek_b_all(rb, timeout);
-
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (timeout != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeout, &abstime);
- }
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) rb->lock);
-
- while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id])
- && (ret != ETIMEDOUT)) {
- while (shm_rbuff_empty(rb)) {
- if (timeout != NULL)
- ret = pthread_cond_timedwait(rb->add,
- rb->lock,
- &abstime);
- else
- ret = pthread_cond_wait(rb->add, rb->lock);
-
-#ifndef __APPLE__
- if (ret == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (ret == ETIMEDOUT)
- break;
- }
-
- while (!set[tail_el_ptr(rb)->port_id]) {
- if (timeout != NULL)
- ret = pthread_cond_timedwait(rb->del,
- rb->lock,
- &abstime);
- else
- ret = pthread_cond_wait(rb->del, rb->lock);
-
-#ifndef __APPLE__
- if (ret == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (ret == ETIMEDOUT)
- break;
- }
- }
-
- if (ret != ETIMEDOUT)
- ret = tail_el_ptr(rb)->port_id;
- else
- ret = -ETIMEDOUT;
-
- pthread_cleanup_pop(true);
-
- return ret;
-}
-
-
-struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
-{
- struct rb_entry * e = NULL;
-
- assert(rb);
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) rb->lock);
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- while (shm_rbuff_empty(rb))
-#ifdef __APPLE__
- pthread_cond_wait(rb->add, rb->lock);
-#else
- if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- e = malloc(sizeof(*e));
- if (e != NULL) {
- *e = *(rb->shm_base + *rb->tail);
- --rb->cntrs[e->port_id];
- *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
- }
-
- pthread_cleanup_pop(true);
-
- return e;
-}
-
-ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
-{
- ssize_t idx = -1;
-
- assert(rb);
-
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) {
- pthread_mutex_unlock(rb->lock);
- return -1;
- }
-
- idx = tail_el_ptr(rb)->index;
- --rb->cntrs[port_id];
- *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
-
- pthread_cond_broadcast(rb->del);
- pthread_mutex_unlock(rb->lock);
-
- return idx;
-}
-
-ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
- int port_id,
- const struct timespec * timeout)
-{
- struct timespec abstime;
- int ret = 0;
- ssize_t idx = -1;
-
- assert(rb);
-
-#ifdef __APPLE__
- pthread_mutex_lock(rb->lock);
-#else
- if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (timeout != NULL) {
- idx = -ETIMEDOUT;
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeout, &abstime);
- }
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) rb->lock);
-
- while ((shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id)
- && (ret != ETIMEDOUT)) {
- while (shm_rbuff_empty(rb)) {
- if (timeout != NULL)
- ret = pthread_cond_timedwait(rb->add,
- rb->lock,
- &abstime);
- else
- ret = pthread_cond_wait(rb->add, rb->lock);
-#ifndef __APPLE__
- if (ret == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (ret == ETIMEDOUT)
- break;
- }
-
- while (tail_el_ptr(rb)->port_id != port_id) {
- if (timeout != NULL)
- ret = pthread_cond_timedwait(rb->del,
- rb->lock,
- &abstime);
- else
- ret = pthread_cond_wait(rb->del, rb->lock);
-#ifndef __APPLE__
- if (ret == EOWNERDEAD) {
- LOG_DBG("Recovering dead mutex.");
- pthread_mutex_consistent(rb->lock);
- }
-#endif
- if (ret == ETIMEDOUT)
- break;
- }
- }
-
- if (ret != ETIMEDOUT) {
- idx = tail_el_ptr(rb)->index;
- --rb->cntrs[port_id];
- *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
-
- pthread_cond_broadcast(rb->del);
- }
-
- pthread_cleanup_pop(true);
-
- return idx;
-}
-
-void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb)
-{
- assert(rb);
-
- pthread_mutex_lock(rb->lock);
- *rb->tail = 0;
- *rb->head = 0;
- pthread_mutex_unlock(rb->lock);
-}
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
new file mode 100644
index 00000000..c960bd25
--- /dev/null
+++ b/src/lib/shm_flow_set.c
@@ -0,0 +1,408 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Management of flow_sets for fqueue
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/lockfile.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/shm_flow_set.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/errno.h>
+
+#define OUROBOROS_PREFIX "shm_flow_set"
+
+#include <ouroboros/logs.h>
+
+#include <pthread.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <string.h>
+#include <assert.h>
+
+#define FN_MAX_CHARS 255
+
+#define FQUEUESIZE (SHM_BUFFER_SIZE * sizeof(int))
+
+#define SHM_FLOW_SET_FILE_SIZE (IRMD_MAX_FLOWS * sizeof(ssize_t) \
+ + AP_MAX_FQUEUES * sizeof(size_t) \
+ + AP_MAX_FQUEUES * sizeof(pthread_cond_t) \
+ + AP_MAX_FQUEUES * FQUEUESIZE \
+ + sizeof(pthread_mutex_t))
+
+#define fqueue_ptr(fs, idx) (fs->fqueues + SHM_BUFFER_SIZE * idx)
+
+struct shm_flow_set {
+ ssize_t * mtable;
+ size_t * heads;
+ pthread_cond_t * conds;
+ int * fqueues;
+ pthread_mutex_t * lock;
+
+ pid_t api;
+};
+
+struct shm_flow_set * shm_flow_set_create()
+{
+ struct shm_flow_set * set;
+ ssize_t * shm_base;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ char fn[FN_MAX_CHARS];
+ mode_t mask;
+ int shm_fd;
+ int i;
+
+ sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid());
+
+ set = malloc(sizeof(*set));
+ if (set == NULL) {
+ LOG_DBG("Could not allocate struct.");
+ return NULL;
+ }
+
+ mask = umask(0);
+
+ shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBG("Failed creating flag file.");
+ free(set);
+ return NULL;
+ }
+
+ umask(mask);
+
+ if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) {
+ LOG_DBG("Failed to extend flag file.");
+ free(set);
+ close(shm_fd);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_FLOW_SET_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ close(shm_fd);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBG("Failed to map shared memory.");
+ if (shm_unlink(fn) == -1)
+ LOG_DBG("Failed to remove invalid shm.");
+
+ free(set);
+ return NULL;
+ }
+
+ set->mtable = shm_base;
+ set->heads = (size_t *) (set->mtable + IRMD_MAX_FLOWS);
+ set->conds = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES);
+ set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES);
+ set->lock = (pthread_mutex_t *)
+ (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE);
+
+ pthread_mutexattr_init(&mattr);
+#ifndef __APPLE__
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+#endif
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(set->lock, &mattr);
+
+ pthread_condattr_init(&cattr);
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ for (i = 0; i < AP_MAX_FQUEUES; ++i) {
+ set->heads[i] = 0;
+ pthread_cond_init(&set->conds[i], &cattr);
+ }
+
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i)
+ set->mtable[i] = -1;
+
+ set->api = getpid();
+
+ return set;
+}
+
+struct shm_flow_set * shm_flow_set_open(pid_t api)
+{
+ struct shm_flow_set * set;
+ ssize_t * shm_base;
+ char fn[FN_MAX_CHARS];
+ int shm_fd;
+
+ sprintf(fn, SHM_FLOW_SET_PREFIX "%d", api);
+
+ set = malloc(sizeof(*set));
+ if (set == NULL) {
+ LOG_DBG("Could not allocate struct.");
+ return NULL;
+ }
+
+ shm_fd = shm_open(fn, O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBG("%d failed opening shared memory %s.", getpid(), fn);
+ free(set);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_FLOW_SET_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ close(shm_fd);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBG("Failed to map shared memory.");
+ if (shm_unlink(fn) == -1)
+ LOG_DBG("Failed to remove invalid shm.");
+ free(set);
+ return NULL;
+ }
+
+ set->mtable = shm_base;
+ set->heads = (size_t *) (set->mtable + IRMD_MAX_FLOWS);
+ set->conds = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES);
+ set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES);
+ set->lock = (pthread_mutex_t *)
+ (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE);
+
+ set->api = api;
+
+ return set;
+}
+
+void shm_flow_set_destroy(struct shm_flow_set * set)
+{
+ char fn[25];
+ struct lockfile * lf = NULL;
+
+ assert(set);
+
+ if (set->api != getpid()) {
+ lf = lockfile_open();
+ if (lf == NULL) {
+ LOG_ERR("Failed to open lockfile.");
+ return;
+ }
+
+ if (lockfile_owner(lf) == getpid()) {
+ LOG_DBG("Flow set %d destroyed by IRMd %d.",
+ set->api, getpid());
+ lockfile_close(lf);
+ } else {
+ LOG_ERR("AP-I %d tried to destroy flowset owned by %d.",
+ getpid(), set->api);
+ lockfile_close(lf);
+ return;
+ }
+ }
+
+ sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->api);
+
+ if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1)
+ LOG_DBG("Couldn't unmap shared memory.");
+
+ if (shm_unlink(fn) == -1)
+ LOG_DBG("Failed to unlink shm.");
+
+ free(set);
+}
+
+void shm_flow_set_close(struct shm_flow_set * set)
+{
+ assert(set);
+
+ if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1)
+ LOG_DBG("Couldn't unmap shared memory.");
+
+ free(set);
+}
+
+void shm_flow_set_zero(struct shm_flow_set * shm_set,
+ ssize_t idx)
+{
+ ssize_t i = 0;
+
+ assert(!(idx < 0) && idx < AP_MAX_FQUEUES);
+
+ pthread_mutex_lock(shm_set->lock);
+
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i)
+ if (shm_set->mtable[i] == idx)
+ shm_set->mtable[i] = -1;
+
+ shm_set->heads[idx] = 0;
+
+ pthread_mutex_unlock(shm_set->lock);
+}
+
+
+int shm_flow_set_add(struct shm_flow_set * shm_set,
+ ssize_t idx,
+ int port_id)
+{
+ assert(shm_set);
+ assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS);
+ assert(!(idx < 0) && idx < AP_MAX_FQUEUES);
+
+ pthread_mutex_lock(shm_set->lock);
+
+ if (shm_set->mtable[port_id] != -1) {
+ pthread_mutex_unlock(shm_set->lock);
+ return -EPERM;
+ }
+
+ shm_set->mtable[port_id] = idx;
+
+ pthread_mutex_unlock(shm_set->lock);
+
+ return 0;
+}
+
+void shm_flow_set_del(struct shm_flow_set * shm_set,
+ ssize_t idx,
+ int port_id)
+{
+ assert(shm_set);
+ assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS);
+ assert(!(idx < 0) && idx < AP_MAX_FQUEUES);
+
+ pthread_mutex_lock(shm_set->lock);
+
+ if (shm_set->mtable[port_id] == idx)
+ shm_set->mtable[port_id] = -1;
+
+ pthread_mutex_unlock(shm_set->lock);
+}
+
+int shm_flow_set_has(struct shm_flow_set * shm_set,
+ ssize_t idx,
+ int port_id)
+{
+ int ret = 0;
+
+ assert(shm_set);
+ assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS);
+ assert(!(idx < 0) && idx < AP_MAX_FQUEUES);
+
+
+ pthread_mutex_lock(shm_set->lock);
+
+ if (shm_set->mtable[port_id] == idx)
+ ret = 1;
+
+ pthread_mutex_unlock(shm_set->lock);
+
+ return ret;
+}
+
+void shm_flow_set_notify(struct shm_flow_set * shm_set, int port_id)
+{
+ assert(shm_set);
+ assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS);
+
+ pthread_mutex_lock(shm_set->lock);
+
+ if (shm_set->mtable[port_id] == -1) {
+ pthread_mutex_unlock(shm_set->lock);
+ return;
+ }
+
+ *(fqueue_ptr(shm_set, shm_set->mtable[port_id]) +
+ (shm_set->heads[shm_set->mtable[port_id]])++) = port_id;
+
+ pthread_cond_signal(&shm_set->conds[shm_set->mtable[port_id]]);
+
+ pthread_mutex_unlock(shm_set->lock);
+}
+
+
+int shm_flow_set_wait(const struct shm_flow_set * shm_set,
+ ssize_t idx,
+ int * fqueue,
+ const struct timespec * timeout)
+{
+ int ret = 0;
+ struct timespec abstime;
+
+ assert(shm_set);
+ assert(!(idx < 0) && idx < AP_MAX_FQUEUES);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(shm_set->lock);
+#else
+ if (pthread_mutex_lock(shm_set->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(shm_set->lock);
+ }
+#endif
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) shm_set->lock);
+
+ while (shm_set->heads[idx] == 0 && ret != -ETIMEDOUT) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(shm_set->conds + idx,
+ shm_set->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(shm_set->conds + idx,
+ shm_set->lock);
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(shm_set->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT) {
+ ret = -ETIMEDOUT;
+ break;
+ }
+ }
+
+ if (ret != -ETIMEDOUT) {
+ memcpy(fqueue,
+ fqueue_ptr(shm_set, idx),
+ shm_set->heads[idx] * sizeof(int));
+ ret = shm_set->heads[idx];
+ shm_set->heads[idx] = 0;
+ }
+
+ pthread_cleanup_pop(true);
+
+ return ret;
+}
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
new file mode 100644
index 00000000..cf094488
--- /dev/null
+++ b/src/lib/shm_rbuff.c
@@ -0,0 +1,424 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Ring buffer for incoming SDUs
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/shm_rbuff.h>
+#include <ouroboros/lockfile.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/errno.h>
+
+#define OUROBOROS_PREFIX "shm_rbuff"
+
+#include <ouroboros/logs.h>
+
+#include <pthread.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <assert.h>
+#include <stdbool.h>
+
+#define FN_MAX_CHARS 255
+
+#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(ssize_t) \
+ + 2 * sizeof(size_t) + sizeof(int8_t) \
+ + sizeof(pthread_mutex_t) \
+ + 2 * sizeof (pthread_cond_t))
+
+#define shm_rbuff_used(rb) ((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \
+ & (SHM_BUFFER_SIZE - 1))
+#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE)
+#define shm_rbuff_empty(rb) (*rb->head == *rb->tail)
+#define head_el_ptr(rb) (rb->shm_base + *rb->head)
+#define tail_el_ptr(rb) (rb->shm_base + *rb->tail)
+
+struct shm_rbuff {
+ ssize_t * shm_base; /* start of entry */
+ size_t * head; /* start of ringbuffer head */
+ size_t * tail; /* start of ringbuffer tail */
+ int8_t * acl; /* access control */
+ pthread_mutex_t * lock; /* lock all free space in shm */
+ pthread_cond_t * add; /* SDU arrived */
+ pthread_cond_t * del; /* SDU removed */
+ pid_t api; /* api of the owner */
+ int port_id; /* port_id of the flow */
+};
+
+struct shm_rbuff * shm_rbuff_create(int port_id)
+{
+ struct shm_rbuff * rb;
+ int shm_fd;
+ ssize_t * shm_base;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ char fn[FN_MAX_CHARS];
+ mode_t mask;
+
+ sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", getpid(), port_id);
+
+ rb = malloc(sizeof(*rb));
+ if (rb == NULL) {
+ LOG_DBG("Could not allocate struct.");
+ return NULL;
+ }
+
+ mask = umask(0);
+
+ shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBG("Failed creating ring buffer.");
+ free(rb);
+ return NULL;
+ }
+
+ umask(mask);
+
+ if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) {
+ LOG_DBG("Failed to extend ringbuffer.");
+ free(rb);
+ close(shm_fd);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_RBUFF_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ close(shm_fd);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBG("Failed to map shared memory.");
+ if (shm_unlink(fn) == -1)
+ LOG_DBG("Failed to remove invalid shm.");
+ free(rb);
+ return NULL;
+ }
+
+ rb->shm_base = shm_base;
+ rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
+ rb->tail = rb->head + 1;
+ rb->acl = (int8_t *) (rb->tail + 1);
+ rb->lock = (pthread_mutex_t *) (rb->acl + 1);
+ rb->add = (pthread_cond_t *) (rb->lock + 1);
+ rb->del = rb->add + 1;
+
+ pthread_mutexattr_init(&mattr);
+#ifndef __APPLE__
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+#endif
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(rb->lock, &mattr);
+
+ pthread_condattr_init(&cattr);
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ pthread_cond_init(rb->add, &cattr);
+ pthread_cond_init(rb->del, &cattr);
+
+ *rb->acl = 0;
+ *rb->head = 0;
+ *rb->tail = 0;
+
+ rb->api = getpid();
+ rb->port_id = port_id;
+
+ return rb;
+}
+
+struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id)
+{
+ struct shm_rbuff * rb;
+ int shm_fd;
+ ssize_t * shm_base;
+ char fn[FN_MAX_CHARS];
+
+ sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", api, port_id);
+
+ rb = malloc(sizeof(*rb));
+ if (rb == NULL) {
+ LOG_DBG("Could not allocate struct.");
+ return NULL;
+ }
+
+ shm_fd = shm_open(fn, O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBG("%d failed opening shared memory %s.", getpid(), fn);
+ free(rb);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_RBUFF_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ close(shm_fd);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBG("Failed to map shared memory.");
+ if (shm_unlink(fn) == -1)
+ LOG_DBG("Failed to remove invalid shm.");
+
+ free(rb);
+ return NULL;
+ }
+
+ rb->shm_base = shm_base;
+ rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
+ rb->tail = rb->head + 1;
+ rb->acl = (int8_t *) (rb->tail + 1);
+ rb->lock = (pthread_mutex_t *) (rb->acl + 1);
+ rb->add = (pthread_cond_t *) (rb->lock + 1);
+ rb->del = rb->add + 1;
+
+ rb->api = api;
+ rb->port_id = port_id;
+
+ return rb;
+}
+
+void shm_rbuff_close(struct shm_rbuff * rb)
+{
+ assert(rb);
+
+ if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
+ LOG_DBG("Couldn't unmap shared memory.");
+
+ free(rb);
+}
+
+void shm_rbuff_destroy(struct shm_rbuff * rb)
+{
+ char fn[25];
+ struct lockfile * lf = NULL;
+
+ assert(rb);
+
+ if (rb->api != getpid()) {
+ lf = lockfile_open();
+ if (lf == NULL) {
+ LOG_ERR("Failed to open lockfile.");
+ return;
+ }
+
+ if (lockfile_owner(lf) == getpid()) {
+ LOG_DBG("Ringbuffer %d destroyed by IRMd %d.",
+ rb->api, getpid());
+ lockfile_close(lf);
+ } else {
+ LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.",
+ getpid(), rb->api);
+ lockfile_close(lf);
+ return;
+ }
+ }
+
+ sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id);
+
+ if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
+ LOG_DBG("Couldn't unmap shared memory.");
+
+ if (shm_unlink(fn) == -1)
+ LOG_DBG("Failed to unlink shm.");
+
+ free(rb);
+}
+
+int shm_rbuff_write(struct shm_rbuff * rb, ssize_t idx)
+{
+ assert(rb);
+ assert(idx >= 0);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (*rb->acl) {
+ pthread_mutex_unlock(rb->lock);
+ return -ENOTALLOC;
+ }
+
+ if (!shm_rbuff_free(rb)) {
+ pthread_mutex_unlock(rb->lock);
+ return -1;
+ }
+
+ if (shm_rbuff_empty(rb))
+ pthread_cond_broadcast(rb->add);
+
+ *head_el_ptr(rb) = idx;
+ *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1);
+
+ pthread_mutex_unlock(rb->lock);
+
+ return 0;
+}
+
+ssize_t shm_rbuff_read(struct shm_rbuff * rb)
+{
+ int ret = 0;
+
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (shm_rbuff_empty(rb)) {
+ pthread_mutex_unlock(rb->lock);
+ return -1;
+ }
+
+ ret = *tail_el_ptr(rb);
+ *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
+
+ pthread_mutex_unlock(rb->lock);
+
+ return ret;
+}
+
+ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret = 0;
+ ssize_t idx = -1;
+
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (timeout != NULL) {
+ idx = -ETIMEDOUT;
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while (shm_rbuff_empty(rb) && (ret != ETIMEDOUT)) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->add,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->add, rb->lock);
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT) {
+ idx = -ETIMEDOUT;
+ break;
+ }
+ }
+
+ if (idx != -ETIMEDOUT) {
+ idx = *tail_el_ptr(rb);
+ *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
+ pthread_cond_broadcast(rb->del);
+ }
+
+ pthread_cleanup_pop(true);
+
+ return idx;
+}
+
+int shm_rbuff_block(struct shm_rbuff * rb)
+{
+ int ret = 0;
+
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ *rb->acl = -1;
+
+ if (!shm_rbuff_empty(rb))
+ ret = -EBUSY;
+
+ pthread_mutex_unlock(rb->lock);
+
+ return ret;
+}
+
+void shm_rbuff_unblock(struct shm_rbuff * rb)
+{
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ *rb->acl = 0; /* open */
+
+ pthread_mutex_unlock(rb->lock);
+}
+
+void shm_rbuff_reset(struct shm_rbuff * rb)
+{
+ assert(rb);
+
+ pthread_mutex_lock(rb->lock);
+ *rb->tail = 0;
+ *rb->head = 0;
+ pthread_mutex_unlock(rb->lock);
+}
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index f6683dc2..e5a37577 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -90,7 +90,6 @@ struct shm_rdrbuff {
pthread_cond_t * full; /* run sanitizer when buffer full */
pid_t * api; /* api of the irmd owner */
enum qos_cube qos; /* qos id which this buffer serves */
- int fd;
};
static void garbage_collect(struct shm_rdrbuff * rdrb)
@@ -189,17 +188,11 @@ struct shm_rdrbuff * shm_rdrbuff_create()
if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) {
LOG_DBGF("Failed to extend shared memory map.");
free(shm_rdrb_fn);
+ close(shm_fd);
free(rdrb);
return NULL;
}
-#ifndef __APPLE
- if (write(shm_fd, "", 1) != 1) {
- LOG_DBGF("Failed to finalise extension of shared memory map.");
- free(shm_rdrb_fn);
- free(rdrb);
- return NULL;
- }
-#endif
+
shm_base = mmap(NULL,
SHM_FILE_SIZE,
PROT_READ | PROT_WRITE,
@@ -207,6 +200,8 @@ struct shm_rdrbuff * shm_rdrbuff_create()
shm_fd,
0);
+ close(shm_fd);
+
if (shm_base == MAP_FAILED) {
LOG_DBGF("Failed to map shared memory.");
if (shm_unlink(shm_rdrb_fn) == -1)
@@ -235,6 +230,9 @@ struct shm_rdrbuff * shm_rdrbuff_create()
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
pthread_cond_init(rdrb->full, &cattr);
pthread_cond_init(rdrb->healthy, &cattr);
@@ -246,7 +244,6 @@ struct shm_rdrbuff * shm_rdrbuff_create()
*rdrb->api = getpid();
rdrb->qos = qos;
- rdrb->fd = shm_fd;
free(shm_rdrb_fn);
@@ -287,10 +284,11 @@ struct shm_rdrbuff * shm_rdrbuff_open()
MAP_SHARED,
shm_fd,
0);
+
+ close(shm_fd);
+
if (shm_base == MAP_FAILED) {
LOG_DBGF("Failed to map shared memory.");
- if (close(shm_fd) == -1)
- LOG_DBG("Failed to close invalid shm.");
if (shm_unlink(shm_rdrb_fn) == -1)
LOG_DBG("Failed to unlink invalid shm.");
free(shm_rdrb_fn);
@@ -309,7 +307,6 @@ struct shm_rdrbuff * shm_rdrbuff_open()
rdrb->api = (pid_t *) (rdrb->full + 1);
rdrb->qos = qos;
- rdrb->fd = shm_fd;
free(shm_rdrb_fn);
@@ -400,9 +397,6 @@ void shm_rdrbuff_close(struct shm_rdrbuff * rdrb)
{
assert(rdrb);
- if (close(rdrb->fd) < 0)
- LOG_DBGF("Couldn't close shared memory.");
-
if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)
LOG_DBGF("Couldn't unmap shared memory.");
@@ -420,9 +414,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)
return;
}
- if (close(rdrb->fd) < 0)
- LOG_DBG("Couldn't close shared memory.");
-
if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)
LOG_DBG("Couldn't unmap shared memory.");