summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri.staessens@ugent.be>2018-06-08 15:54:37 +0200
committerSander Vrijders <sander.vrijders@ugent.be>2018-06-08 16:06:52 +0200
commit1c98f0bedc1d98d468ad0d89e57207535d068844 (patch)
treedff3b95c16c44f70744d2e9f3fac56d5578ccad4
parent22be1ea9cab402a921776a59ff9667bcb5e2c299 (diff)
downloadouroboros-1c98f0bedc1d98d468ad0d89e57207535d068844.tar.gz
ouroboros-1c98f0bedc1d98d468ad0d89e57207535d068844.zip
irmd: Remove shm_sanitizer thread
This removes the sanitizer thread in the IRMd to avoid the IRMd eating the CPU when the buffer is full. The processes will clean the head PDU if there is a broken lock in the rdrbuff. Chances for a lingering tail PDU are slim. Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be> Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
-rw-r--r--include/ouroboros/shm_rdrbuff.h3
-rw-r--r--src/irmd/main.c46
-rw-r--r--src/lib/shm_rdrbuff.c130
3 files changed, 40 insertions, 139 deletions
diff --git a/include/ouroboros/shm_rdrbuff.h b/include/ouroboros/shm_rdrbuff.h
index c27ff24d..277609c5 100644
--- a/include/ouroboros/shm_rdrbuff.h
+++ b/include/ouroboros/shm_rdrbuff.h
@@ -43,9 +43,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb);
void shm_rdrbuff_purge(void);
-int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,
- struct timespec * timeo);
-
/* returns the index of the buffer in the DU map */
ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
size_t headspace,
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 574223b0..aeb43f0d 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -134,7 +134,6 @@ struct {
struct tpm * tpm; /* thread pool manager */
pthread_t irm_sanitize; /* clean up irmd resources */
- pthread_t shm_sanitize; /* keep track of rdrbuff use */
pthread_t acceptor; /* accept new commands */
} irmd;
@@ -1683,43 +1682,6 @@ void irmd_sig_handler(int sig,
}
}
-void * shm_sanitize(void * o)
-{
- struct list_head * p = NULL;
- struct timespec ts = {SHM_SAN_HOLDOFF / 1000,
- (SHM_SAN_HOLDOFF % 1000) * MILLION};
- ssize_t idx;
-
- (void) o;
-
- while (irmd_get_state() == IRMD_RUNNING) {
- if (shm_rdrbuff_wait_full(irmd.rdrb, &ts) == -ETIMEDOUT)
- continue;
-
- pthread_rwlock_wrlock(&irmd.flows_lock);
-
- list_for_each(p, &irmd.irm_flows) {
- struct irm_flow * f =
- list_entry(p, struct irm_flow, next);
- if (kill(f->n_pid, 0) < 0) {
- while ((idx = shm_rbuff_read(f->n_rb)) >= 0)
- shm_rdrbuff_remove(irmd.rdrb, idx);
- continue;
- }
-
- if (kill(f->n_1_pid, 0) < 0) {
- while ((idx = shm_rbuff_read(f->n_1_rb)) >= 0)
- shm_rdrbuff_remove(irmd.rdrb, idx);
- continue;
- }
- }
-
- pthread_rwlock_unlock(&irmd.flows_lock);
- }
-
- return (void *) 0;
-}
-
void * irm_sanitize(void * o)
{
struct timespec now;
@@ -2362,11 +2324,6 @@ int main(int argc,
goto fail_irm_sanitize;
}
- if (pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb)) {
- irmd_set_state(IRMD_NULL);
- goto fail_shm_sanitize;
- }
-
if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL)) {
irmd_set_state(IRMD_NULL);
goto fail_acceptor;
@@ -2374,7 +2331,6 @@ int main(int argc,
pthread_join(irmd.acceptor, NULL);
pthread_join(irmd.irm_sanitize, NULL);
- pthread_join(irmd.shm_sanitize, NULL);
tpm_stop(irmd.tpm);
@@ -2393,8 +2349,6 @@ int main(int argc,
exit(EXIT_SUCCESS);
fail_acceptor:
- pthread_join(irmd.shm_sanitize, NULL);
- fail_shm_sanitize:
pthread_join(irmd.irm_sanitize, NULL);
fail_irm_sanitize:
tpm_stop(irmd.tpm);
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 1dfcb2ca..5ae2085d 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -46,29 +46,17 @@
+ sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
-#ifndef SHM_RDRB_MULTI_BLOCK
-#define WAIT_BLOCKS 1
-#else
-#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4)
-#if WAIT_BLOCKS == 0
-#undef WAIT_BLOCKS
-#define WAIT_BLOCKS 1
-#endif
-#endif
-
#define get_head_ptr(rdrb) \
- ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head \
- * SHM_RDRB_BLOCK_SIZE)))
+ idx_to_du_buff_ptr(rdrb, *rdrb->head)
#define get_tail_ptr(rdrb) \
- ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail \
- * SHM_RDRB_BLOCK_SIZE)))
+ idx_to_du_buff_ptr(rdrb, *rdrb->tail)
#define idx_to_du_buff_ptr(rdrb, idx) \
((struct shm_du_buff *) (rdrb->shm_base + idx * SHM_RDRB_BLOCK_SIZE))
#define shm_rdrb_used(rdrb) \
- ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) \
+ (((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) + 1) \
& ((SHM_BUFFER_SIZE) - 1))
#define shm_rdrb_free(rdrb, i) \
@@ -118,6 +106,13 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)
pthread_cond_broadcast(rdrb->healthy);
}
+static void sanitize(struct shm_rdrbuff * rdrb)
+{
+ get_head_ptr(rdrb)->flags = SDB_NULL;
+ garbage_collect(rdrb);
+ pthread_mutex_consistent(rdrb->lock);
+}
+
static char * rdrb_filename(void)
{
char * str;
@@ -282,51 +277,6 @@ struct shm_rdrbuff * shm_rdrbuff_open()
return rdrb_create(O_RDWR);
}
-int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,
- struct timespec * timeo)
-{
- struct timespec abstime;
-
- if (timeo != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeo, &abstime);
- }
-
-#ifndef HAVE_ROBUST_MUTEX
- pthread_mutex_lock(rdrb->lock);
-#else
- if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
-#endif
-
- while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) {
-#ifndef HAVE_ROBUST_MUTEX
- if (pthread_cond_timedwait(rdrb->full,
- rdrb->lock,
- &abstime) == ETIMEDOUT) {
- pthread_mutex_unlock(rdrb->lock);
- return -ETIMEDOUT;
- }
-#else
- int ret = pthread_cond_timedwait(rdrb->full,
- rdrb->lock,
- &abstime);
- if (ret == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
- if (ret == ETIMEDOUT) {
- pthread_mutex_unlock(rdrb->lock);
- return -ETIMEDOUT;
- }
-#endif
- }
-
- garbage_collect(rdrb);
-
- pthread_mutex_unlock(rdrb->lock);
-
- return 0;
-}
-
void shm_rdrbuff_purge(void)
{
char * shm_rdrb_fn;
@@ -358,19 +308,19 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
#ifndef SHM_RDRB_MULTI_BLOCK
if (sz > SHM_RDRB_BLOCK_SIZE)
return -EMSGSIZE;
+#else
+ while (sz > 0) {
+ sz -= SHM_RDRB_BLOCK_SIZE;
+ ++blocks;
+ }
#endif
#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
#ifdef SHM_RDRB_MULTI_BLOCK
- while (sz > 0) {
- sz -= SHM_RDRB_BLOCK_SIZE;
- ++blocks;
- }
-
if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
@@ -426,39 +376,41 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
const struct timespec * abstime)
{
struct shm_du_buff * sdb;
- size_t size = headspace + len + tailspace;
+ size_t size = headspace + len + tailspace;
#ifdef SHM_RDRB_MULTI_BLOCK
- size_t blocks = 0;
+ size_t blocks = 0;
size_t padblocks = 0;
+ size_t rblocks;
#endif
- ssize_t sz = size + sizeof(*sdb);
- int ret = 0;
+ ssize_t sz = size + sizeof(*sdb);
+ int ret = 0;
assert(rdrb);
#ifndef SHM_RDRB_MULTI_BLOCK
if (sz > SHM_RDRB_BLOCK_SIZE)
return -EMSGSIZE;
+#else
+ while (sz > 0) {
+ sz -= SHM_RDRB_BLOCK_SIZE;
+ ++blocks;
+ }
+ rblocks = blocks;
#endif
#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
(void *) rdrb->lock);
#ifdef SHM_RDRB_MULTI_BLOCK
- while (sz > 0) {
- sz -= SHM_RDRB_BLOCK_SIZE;
- ++blocks;
- }
+ if (blocks > 1)
+ rblocks = blocks << 1;
- if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
- padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
-
- while (!shm_rdrb_free(rdrb, (blocks + padblocks)) && ret != ETIMEDOUT) {
+ while (!shm_rdrb_free(rdrb, rblocks) && ret != ETIMEDOUT) {
#else
while (!shm_rdrb_free(rdrb, 1) && ret != ETIMEDOUT) {
#endif
@@ -469,6 +421,11 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
abstime);
else
ret = pthread_cond_wait(rdrb->healthy, rdrb->lock);
+
+#ifdef SHM_RDRB_MULTI_BLOCK
+ if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
+ padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
+#endif
}
if (ret != ETIMEDOUT) {
@@ -547,21 +504,14 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
- if (shm_rdrb_empty(rdrb)) {
- pthread_mutex_unlock(rdrb->lock);
- return -1;
- }
+ assert(!shm_rdrb_empty(rdrb));
idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL;
- if (idx != *rdrb->tail) {
- pthread_mutex_unlock(rdrb->lock);
- return 0;
- }
-
- garbage_collect(rdrb);
+ if (idx == *rdrb->tail)
+ garbage_collect(rdrb);
pthread_mutex_unlock(rdrb->lock);