summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-08-03 13:40:16 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-08-03 13:40:16 +0200
commit4931526cf9b5e40294e043deab856f25bf56c7cf (patch)
treefff3eeadb6eb04edee21340ecdcdfc13da3115b4
parentca494922f3815077efbcd28da3748df38c8a6961 (diff)
downloadouroboros-4931526cf9b5e40294e043deab856f25bf56c7cf.tar.gz
ouroboros-4931526cf9b5e40294e043deab856f25bf56c7cf.zip
lib: Revise blocking I/O
Blocking I/O now uses condition variables in the shared memory instead of busy waiting. Timeouts can be specified. This requires the size of the rbuffs and du_map to be the same, to guarantee that when the shm_du_map is not full, the ap_rbuffs can't be full either. Added the timeout option to the flow for future use.
-rw-r--r--include/ouroboros/config.h.in3
-rw-r--r--include/ouroboros/shm_ap_rbuff.h3
-rw-r--r--include/ouroboros/shm_du_map.h6
-rw-r--r--src/ipcpd/shim-eth-llc/main.c18
-rw-r--r--src/lib/dev.c56
-rw-r--r--src/lib/shm_ap_rbuff.c126
-rw-r--r--src/lib/shm_du_map.c210
7 files changed, 291 insertions, 131 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 8898699c..46685569 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -40,12 +40,11 @@
#define SHM_DU_MAP_MULTI_BLOCK
#define SHM_DU_MAP_FILENAME "/ouroboros.shm"
#define LOCKFILE_NAME "/ouroboros.lockfile"
-#define SHM_BLOCKS_IN_MAP (1 << 14)
+#define SHM_BUFFER_SIZE (1 << 14)
#define SHM_DU_TIMEOUT_MICROS 15000
#define DU_BUFF_HEADSPACE 128
#define DU_BUFF_TAILSPACE 0
#define SHM_AP_RBUFF_PREFIX "/ouroboros.rbuff."
-#define SHM_RBUFF_SIZE (1 << 14)
#define IRMD_MAX_FLOWS 4096
#define IRMD_THREADPOOL_SIZE 5
#define IRMD_FLOW_TIMEOUT 5000 /* ms */
diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h
index 257a289d..50b077f0 100644
--- a/include/ouroboros/shm_ap_rbuff.h
+++ b/include/ouroboros/shm_ap_rbuff.h
@@ -46,6 +46,9 @@ int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb,
const struct timespec * timeout);
ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb,
int port_id);
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
+ const struct timespec * timeout);
pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff * rb);
void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb);
#endif /* OUROBOROS_SHM_AP_RBUFF_H */
diff --git a/include/ouroboros/shm_du_map.h b/include/ouroboros/shm_du_map.h
index b9c56cf8..98013fc9 100644
--- a/include/ouroboros/shm_du_map.h
+++ b/include/ouroboros/shm_du_map.h
@@ -45,6 +45,12 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
size_t tailspace,
uint8_t * data,
size_t data_len);
+ssize_t shm_du_map_write_b(struct shm_du_map * dum,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t data_len);
int shm_du_map_read(uint8_t ** dst,
struct shm_du_map * dum,
ssize_t idx);
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 9e315335..f98799a5 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -386,7 +386,7 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE],
shim_data(_ipcp)->tx_offset =
(shim_data(_ipcp)->tx_offset + 1)
- & (SHM_BLOCKS_IN_MAP -1);
+ & (SHM_BUFFER_SIZE -1);
#else
device = (shim_data(_ipcp))->device;
@@ -719,7 +719,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
MAC_SIZE) &&
memcmp(br_addr, &llc_frame->dst_hwaddr, MAC_SIZE)) {
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
- offset = (offset + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ offset = (offset + 1) & (SHM_BUFFER_SIZE - 1);
header->tp_status = TP_STATUS_KERNEL;
#endif
continue;
@@ -730,7 +730,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
if (ntohs(length) > SHIM_ETH_LLC_MAX_SDU_SIZE) {
/* Not an LLC packet. */
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
- offset = (offset + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ offset = (offset + 1) & (SHM_BUFFER_SIZE - 1);
header->tp_status = TP_STATUS_KERNEL;
#endif
continue;
@@ -758,7 +758,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
pthread_rwlock_unlock(&_ipcp->state_lock);
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
offset = (offset + 1)
- & (SHM_BLOCKS_IN_MAP - 1);
+ & (SHM_BUFFER_SIZE - 1);
header->tp_status = TP_STATUS_KERNEL;
#endif
continue;
@@ -784,7 +784,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
pthread_rwlock_unlock(&_ipcp->state_lock);
}
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
- offset = (offset + 1) & (SHM_BLOCKS_IN_MAP -1);
+ offset = (offset + 1) & (SHM_BUFFER_SIZE -1);
header->tp_status = TP_STATUS_KERNEL;
#endif
}
@@ -1009,8 +1009,8 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
req.tp_block_size = SHM_DU_BUFF_BLOCK_SIZE;
req.tp_frame_size = SHM_DU_BUFF_BLOCK_SIZE;
- req.tp_block_nr = SHM_BLOCKS_IN_MAP;
- req.tp_frame_nr = SHM_BLOCKS_IN_MAP;
+ req.tp_block_nr = SHM_BUFFER_SIZE;
+ req.tp_frame_nr = SHM_BUFFER_SIZE;
if (setsockopt(fd, SOL_PACKET, PACKET_RX_RING,
(void *) &req, sizeof(req))) {
@@ -1036,7 +1036,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
shim_data(_ipcp)->rx_ring = mmap(NULL,
2 * SHM_DU_BUFF_BLOCK_SIZE
- * SHM_BLOCKS_IN_MAP,
+ * SHM_BUFFER_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED,
fd, 0);
if (shim_data(_ipcp)->rx_ring == NULL) {
@@ -1045,7 +1045,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
return -1;
}
shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring
- + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BLOCKS_IN_MAP);
+ + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BUFFER_SIZE);
#endif
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 22e77169..ce919263 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -41,6 +41,8 @@ struct flow {
int oflags;
pid_t api;
+
+ struct timespec * timeout;
};
struct ap_data {
@@ -93,7 +95,9 @@ int ap_init(char * ap_name)
for (i = 0; i < AP_MAX_FLOWS; ++i) {
_ap_instance->flows[i].rb = NULL;
_ap_instance->flows[i].port_id = -1;
+ _ap_instance->flows[i].oflags = 0;
_ap_instance->flows[i].api = -1;
+ _ap_instance->flows[i].timeout = NULL;
}
pthread_rwlock_init(&_ap_instance->flows_lock, NULL);
@@ -127,6 +131,9 @@ void ap_fini(void)
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_destroy(&_ap_instance->flows_lock);
+ pthread_rwlock_destroy(&_ap_instance->data_lock);
+
free(_ap_instance);
}
@@ -458,7 +465,7 @@ int flow_cntl(int fd, int cmd, int oflags)
ssize_t flow_write(int fd, void * buf, size_t count)
{
- ssize_t index;
+ ssize_t idx;
struct rb_entry e;
if (buf == NULL)
@@ -477,37 +484,35 @@ ssize_t flow_write(int fd, void * buf, size_t count)
}
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- index = shm_du_map_write(_ap_instance->dum,
- _ap_instance->flows[fd].api,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- (uint8_t *) buf,
- count);
- if (index == -1) {
+ idx = shm_du_map_write(_ap_instance->dum,
+ _ap_instance->flows[fd].api,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ (uint8_t *) buf,
+ count);
+ if (idx == -1) {
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
return -EAGAIN;
}
- e.index = index;
+ e.index = idx;
e.port_id = _ap_instance->flows[fd].port_id;
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
- shm_du_map_remove(_ap_instance->dum, index);
+ shm_du_map_remove(_ap_instance->dum, idx);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
return -1;
}
} else { /* blocking */
- while ((index = shm_du_map_write(_ap_instance->dum,
- _ap_instance->flows[fd].api,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- (uint8_t *) buf,
- count)) < 0)
- ;
-
- e.index = index;
+ idx = shm_du_map_write_b(_ap_instance->dum,
+ _ap_instance->flows[fd].api,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ (uint8_t *) buf,
+ count);
+ e.index = idx;
e.port_id = _ap_instance->flows[fd].port_id;
while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0)
@@ -546,16 +551,13 @@ ssize_t flow_read(int fd, void * buf, size_t count)
return -ENOTALLOC;
}
- if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
+ if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK)
idx = shm_ap_rbuff_read_port(_ap_instance->rb,
_ap_instance->flows[fd].port_id);
- } else { /* block */
- while ((idx =
- shm_ap_rbuff_read_port(_ap_instance->rb,
- _ap_instance->
- flows[fd].port_id)) < 0)
- ;
- }
+ else
+ idx = shm_ap_rbuff_read_port_b(_ap_instance->rb,
+ _ap_instance->flows[fd].port_id,
+ _ap_instance->flows[fd].timeout);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index be4cd0c2..56555533 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -42,23 +42,27 @@
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
-#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \
+#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
- + sizeof (pthread_cond_t))
+ + 2 * sizeof (pthread_cond_t))
-#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail) \
- & (SHM_RBUFF_SIZE - 1))
-#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE)
+#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_BUFFER_SIZE - *rb->ptr_tail) \
+ & (SHM_BUFFER_SIZE - 1))
+#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE)
#define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail)
-#define head_el_ptr (rb->shm_base + *rb->ptr_head)
-#define tail_el_ptr (rb->shm_base + *rb->ptr_tail)
+#define head_el_ptr(rb) (rb->shm_base + *rb->ptr_head)
+#define tail_el_ptr(rb) (rb->shm_base + *rb->ptr_tail)
+#define clean_sdus(rb) \
+ while (!shm_rbuff_empty(rb) && tail_el_ptr(rb)->port_id < 0) \
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); \
struct shm_ap_rbuff {
struct rb_entry * shm_base; /* start of entry */
size_t * ptr_head; /* start of ringbuffer head */
size_t * ptr_tail; /* start of ringbuffer tail */
pthread_mutex_t * lock; /* lock all free space in shm */
- pthread_cond_t * work; /* threads will wait for a signal */
+ pthread_cond_t * add; /* SDU arrived */
+ pthread_cond_t * del; /* SDU removed */
pid_t api; /* api to which this rb belongs */
int fd;
};
@@ -125,10 +129,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
}
rb->shm_base = shm_base;
- rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+ rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
rb->ptr_tail = rb->ptr_head + 1;
rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1);
- rb->work = (pthread_cond_t *) (rb->lock + 1);
+ rb->add = (pthread_cond_t *) (rb->lock + 1);
+ rb->del = rb->add + 1;
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
@@ -138,7 +143,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
- pthread_cond_init(rb->work, &cattr);
+ pthread_cond_init(rb->add, &cattr);
+ pthread_cond_init(rb->del, &cattr);
*rb->ptr_head = 0;
*rb->ptr_tail = 0;
@@ -190,10 +196,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
}
rb->shm_base = shm_base;
- rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+ rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
rb->ptr_tail = rb->ptr_head + 1;
rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1);
- rb->work = (pthread_cond_t *) (rb->lock + 1);
+ rb->add = (pthread_cond_t *) (rb->lock + 1);
+ rb->del = rb->add + 1;
rb->fd = shm_fd;
rb->api = api;
@@ -243,7 +250,8 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
}
pthread_mutex_destroy(rb->lock);
- pthread_cond_destroy(rb->work);
+ pthread_cond_destroy(rb->add);
+ pthread_cond_destroy(rb->del);
if (close(rb->fd) < 0)
LOG_DBG("Couldn't close shared memory.");
@@ -275,10 +283,10 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
}
if (shm_rbuff_empty(rb))
- pthread_cond_broadcast(rb->work);
+ pthread_cond_broadcast(rb->add);
- *head_el_ptr = *e;
- *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1);
+ *head_el_ptr(rb) = *e;
+ *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1);
pthread_mutex_unlock(rb->lock);
@@ -307,13 +315,17 @@ int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb,
pthread_mutex_consistent(rb->lock);
}
+ clean_sdus(rb);
+
while (shm_rbuff_empty(rb)) {
if (timeout != NULL)
- ret = pthread_cond_timedwait(rb->work,
+ ret = pthread_cond_timedwait(rb->add,
rb->lock,
&abstime);
else
- ret = pthread_cond_wait(rb->work, rb->lock);
+ ret = pthread_cond_wait(rb->add, rb->lock);
+
+ clean_sdus(rb);
if (ret == EOWNERDEAD) {
LOG_DBG("Recovering dead mutex.");
@@ -348,11 +360,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
pthread_mutex_consistent(rb->lock);
}
- while (!shm_rbuff_empty(rb) && tail_el_ptr->port_id < 0)
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+ clean_sdus(rb);
while (shm_rbuff_empty(rb))
- if (pthread_cond_wait(rb->work, rb->lock) == EOWNERDEAD) {
+ if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) {
LOG_DBG("Recovering dead mutex.");
pthread_mutex_consistent(rb->lock);
}
@@ -365,7 +376,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
*e = *(rb->shm_base + *rb->ptr_tail);
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
pthread_cleanup_pop(true);
@@ -381,24 +392,75 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
pthread_mutex_consistent(rb->lock);
}
- if (shm_rbuff_empty(rb)) {
+ clean_sdus(rb);
+
+ if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) {
pthread_mutex_unlock(rb->lock);
return -1;
}
- while (tail_el_ptr->port_id < 0)
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+ idx = tail_el_ptr(rb)->index;
- if (tail_el_ptr->port_id != port_id) {
- pthread_mutex_unlock(rb->lock);
- return -1;
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
+
+ pthread_cond_broadcast(rb->del);
+
+ pthread_mutex_unlock(rb->lock);
+
+ return idx;
+}
+
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret = 0;
+ ssize_t idx = -1;
+
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
}
- idx = tail_el_ptr->index;
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+ clean_sdus(rb);
- pthread_mutex_unlock(rb->lock);
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while (tail_el_ptr(rb)->port_id != port_id) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->del,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->del, rb->lock);
+
+ clean_sdus(rb);
+
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+
+ if (ret == ETIMEDOUT) {
+ pthread_mutex_unlock(rb->lock);
+ return -ret;
+ }
+ }
+
+ idx = tail_el_ptr(rb)->index;
+
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
+
+ pthread_cond_broadcast(rb->del);
+
+ pthread_cleanup_pop(true);
return idx;
}
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index b090bb74..a12ef223 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -40,7 +40,7 @@
#include <ouroboros/logs.h>
-#define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE)
+#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE)
#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \
+ sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
@@ -59,9 +59,9 @@
#define block_ptr_to_idx(dum, sdb) \
(((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE)
-#define shm_map_used(dum)((*dum->ptr_head + SHM_BLOCKS_IN_MAP - *dum->ptr_tail)\
- & (SHM_BLOCKS_IN_MAP - 1))
-#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP)
+#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\
+ & (SHM_BUFFER_SIZE - 1))
+#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE)
#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head)
@@ -79,7 +79,7 @@ struct shm_du_map {
uint8_t * shm_base; /* start of blocks */
size_t * ptr_head; /* start of ringbuffer head */
size_t * ptr_tail; /* start of ringbuffer tail */
- pthread_mutex_t * shm_mutex; /* lock all free space in shm */
+ pthread_mutex_t * lock; /* lock all free space in shm */
size_t * choked; /* stale sdu detection */
pthread_cond_t * healthy; /* du map is healthy */
pthread_cond_t * full; /* run sanitizer when buffer full */
@@ -94,12 +94,12 @@ static void garbage_collect(struct shm_du_map * dum)
while ((sdb = get_tail_ptr(dum))->dst_api == -1 &&
!shm_map_empty(dum))
*dum->ptr_tail = (*dum->ptr_tail + sdb->blocks)
- & (SHM_BLOCKS_IN_MAP - 1);
+ & (SHM_BUFFER_SIZE - 1);
#else
while (get_tail_ptr(dum)->dst_api == -1 &&
!shm_map_empty(dum))
*dum->ptr_tail =
- (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1);
#endif
}
@@ -114,9 +114,9 @@ static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit)
if (buf->dst_api == api)
buf->dst_api = -1;
#ifdef SHM_DU_MAP_MULTI_BLOCK
- idx = (idx + buf->blocks) & (SHM_BLOCKS_IN_MAP - 1);
+ idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1);
#else
- idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ idx = (idx + 1) & (SHM_BUFFER_SIZE - 1);
#endif
}
@@ -194,8 +194,8 @@ struct shm_du_map * shm_du_map_create()
dum->ptr_head = (size_t *)
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
- dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1);
+ dum->choked = (size_t *) (dum->lock + 1);
dum->healthy = (pthread_cond_t *) (dum->choked + 1);
dum->full = dum->healthy + 1;
dum->api = (pid_t *) (dum->full + 1);
@@ -203,7 +203,7 @@ struct shm_du_map * shm_du_map_create()
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
- pthread_mutex_init(dum->shm_mutex, &mattr);
+ pthread_mutex_init(dum->lock, &mattr);
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
@@ -261,8 +261,8 @@ struct shm_du_map * shm_du_map_open()
dum->ptr_head = (size_t *)
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
- dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1);
+ dum->choked = (size_t *) (dum->lock + 1);
dum->healthy = (pthread_cond_t *) (dum->choked + 1);
dum->full = dum->healthy + 1;
dum->api = (pid_t *) (dum->full + 1);
@@ -283,23 +283,23 @@ void * shm_du_map_sanitize(void * o)
if (dum == NULL)
return (void *) -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
- (void *) dum->shm_mutex);
+ (void *) dum->lock);
while (true) {
int ret = 0;
struct timespec now;
struct timespec dl;
- if (pthread_cond_wait(dum->full, dum->shm_mutex)
+ if (pthread_cond_wait(dum->full, dum->lock)
== EOWNERDEAD) {
LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
*dum->choked = 1;
@@ -321,14 +321,14 @@ void * shm_du_map_sanitize(void * o)
ts_add(&now, &intv, &dl);
while (*dum->choked) {
ret = pthread_cond_timedwait(dum->healthy,
- dum->shm_mutex,
+ dum->lock,
&dl);
if (!ret)
continue;
if (ret == EOWNERDEAD) {
LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
if (ret == ETIMEDOUT) {
@@ -429,9 +429,9 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
return -1;
}
#endif
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
#ifdef SHM_DU_MAP_MULTI_BLOCK
while (sz > 0) {
@@ -439,15 +439,15 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
++blocks;
}
- if (blocks + *dum->ptr_head > SHM_BLOCKS_IN_MAP - 1)
- padblocks = SHM_BLOCKS_IN_MAP - *dum->ptr_head;
+ if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1)
+ padblocks = SHM_BUFFER_SIZE - *dum->ptr_head;
if (!shm_map_free(dum, (blocks + padblocks))) {
#else
if (!shm_map_free(dum, 1)) {
#endif
pthread_cond_signal(dum->full);
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return -1;
}
@@ -477,11 +477,99 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
idx = *dum->ptr_head;
#ifdef SHM_DU_MAP_MULTI_BLOCK
- *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+ *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
#else
- *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);
#endif
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
+
+ return idx;
+}
+
+ssize_t shm_du_map_write_b(struct shm_du_map * dum,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t len)
+{
+ struct shm_du_buff * sdb;
+ size_t size = headspace + len + tailspace;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ long blocks = 0;
+ long padblocks = 0;
+ int sz = size + sizeof *sdb;
+#endif
+ uint8_t * write_pos;
+ ssize_t idx = -1;
+
+ if (dum == NULL || data == NULL) {
+ LOG_DBGF("Bogus input, bugging out.");
+ return -1;
+ }
+
+#ifndef SHM_DU_MAP_MULTI_BLOCK
+ if (sz > SHM_DU_BUFF_BLOCK_SIZE) {
+ LOG_DBGF("Multi-block SDU's disabled. Dropping.");
+ return -1;
+ }
+#endif
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->lock);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) dum->lock);
+
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ while (sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1)
+ padblocks = SHM_BUFFER_SIZE - *dum->ptr_head;
+
+ while (!shm_map_free(dum, (blocks + padblocks))) {
+#else
+ while (!shm_map_free(dum, 1)) {
+#endif
+ pthread_cond_signal(dum->full);
+ pthread_cond_wait(dum->healthy, dum->lock);
+ }
+
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ if (padblocks) {
+ sdb = get_head_ptr(dum);
+ sdb->size = 0;
+ sdb->blocks = padblocks;
+ sdb->dst_api = -1;
+ sdb->du_head = 0;
+ sdb->du_tail = 0;
+
+ *dum->ptr_head = 0;
+ }
+#endif
+ sdb = get_head_ptr(dum);
+ sdb->size = size;
+ sdb->dst_api = dst_api;
+ sdb->du_head = headspace;
+ sdb->du_tail = sdb->du_head + len;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ sdb->blocks = blocks;
+#endif
+ write_pos = ((uint8_t *) (sdb + 1)) + headspace;
+
+ memcpy(write_pos, data, len);
+
+ idx = *dum->ptr_head;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
+#else
+ *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);
+#endif
+ pthread_cleanup_pop(true);
return idx;
}
@@ -493,16 +581,16 @@ int shm_du_map_read(uint8_t ** dst,
size_t len = 0;
struct shm_du_buff * sdb;
- if (idx > SHM_BLOCKS_IN_MAP)
+ if (idx > SHM_BUFFER_SIZE)
return -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
if (shm_map_empty(dum)) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return -1;
}
@@ -510,30 +598,30 @@ int shm_du_map_read(uint8_t ** dst,
len = sdb->du_tail - sdb->du_head;
*dst = ((uint8_t *) (sdb + 1)) + sdb->du_head;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return len;
}
int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)
{
- if (idx > SHM_BLOCKS_IN_MAP)
+ if (idx > SHM_BUFFER_SIZE)
return -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
if (shm_map_empty(dum)) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return -1;
}
idx_to_du_buff_ptr(dum, idx)->dst_api = -1;
if (idx != *dum->ptr_tail) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return 0;
}
@@ -541,9 +629,9 @@ int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)
*dum->choked = 0;
- pthread_cond_signal(dum->healthy);
+ pthread_cond_broadcast(dum->healthy);
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return 0;
}
@@ -558,18 +646,18 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum,
if (dum == NULL)
return NULL;
- if (idx < 0 || idx > SHM_BLOCKS_IN_MAP)
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
return NULL;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
sdb = idx_to_du_buff_ptr(dum, idx);
if ((long) (sdb->du_head - size) < 0) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
LOG_DBGF("Failed to allocate PCI headspace.");
return NULL;
}
@@ -578,7 +666,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum,
buf = (uint8_t *) (sdb + 1) + sdb->du_head;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return buf;
}
@@ -593,18 +681,18 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum,
if (dum == NULL)
return NULL;
- if (idx < 0 || idx > SHM_BLOCKS_IN_MAP)
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
return NULL;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
sdb = idx_to_du_buff_ptr(dum, idx);
if (sdb->du_tail + size >= sdb->size) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
LOG_DBGF("Failed to allocate PCI tailspace.");
return NULL;
}
@@ -613,7 +701,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum,
sdb->du_tail += size;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return buf;
}
@@ -627,25 +715,25 @@ int shm_du_buff_head_release(struct shm_du_map * dum,
if (dum == NULL)
return -1;
- if (idx < 0 || idx > SHM_BLOCKS_IN_MAP)
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
return -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
sdb = idx_to_du_buff_ptr(dum, idx);
if (size > sdb->du_tail - sdb->du_head) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
LOG_DBGF("Tried to release beyond sdu boundary.");
return -EOVERFLOW;
}
sdb->du_head += size;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return 0;
}
@@ -659,25 +747,25 @@ int shm_du_buff_tail_release(struct shm_du_map * dum,
if (dum == NULL)
return -1;
- if (idx < 0 || idx > SHM_BLOCKS_IN_MAP)
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
return -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
sdb = idx_to_du_buff_ptr(dum, idx);
if (size > sdb->du_tail - sdb->du_head) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
LOG_DBGF("Tried to release beyond sdu boundary.");
return -EOVERFLOW;
}
sdb->du_tail -= size;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return 0;
}