diff options
-rw-r--r-- | include/ouroboros/shm_rdrbuff.h | 3 | ||||
-rw-r--r-- | src/irmd/main.c | 46 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 130 |
3 files changed, 40 insertions, 139 deletions
diff --git a/include/ouroboros/shm_rdrbuff.h b/include/ouroboros/shm_rdrbuff.h index c27ff24d..277609c5 100644 --- a/include/ouroboros/shm_rdrbuff.h +++ b/include/ouroboros/shm_rdrbuff.h @@ -43,9 +43,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb); void shm_rdrbuff_purge(void); -int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb, - struct timespec * timeo); - /* returns the index of the buffer in the DU map */ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, size_t headspace, diff --git a/src/irmd/main.c b/src/irmd/main.c index 574223b0..aeb43f0d 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -134,7 +134,6 @@ struct { struct tpm * tpm; /* thread pool manager */ pthread_t irm_sanitize; /* clean up irmd resources */ - pthread_t shm_sanitize; /* keep track of rdrbuff use */ pthread_t acceptor; /* accept new commands */ } irmd; @@ -1683,43 +1682,6 @@ void irmd_sig_handler(int sig, } } -void * shm_sanitize(void * o) -{ - struct list_head * p = NULL; - struct timespec ts = {SHM_SAN_HOLDOFF / 1000, - (SHM_SAN_HOLDOFF % 1000) * MILLION}; - ssize_t idx; - - (void) o; - - while (irmd_get_state() == IRMD_RUNNING) { - if (shm_rdrbuff_wait_full(irmd.rdrb, &ts) == -ETIMEDOUT) - continue; - - pthread_rwlock_wrlock(&irmd.flows_lock); - - list_for_each(p, &irmd.irm_flows) { - struct irm_flow * f = - list_entry(p, struct irm_flow, next); - if (kill(f->n_pid, 0) < 0) { - while ((idx = shm_rbuff_read(f->n_rb)) >= 0) - shm_rdrbuff_remove(irmd.rdrb, idx); - continue; - } - - if (kill(f->n_1_pid, 0) < 0) { - while ((idx = shm_rbuff_read(f->n_1_rb)) >= 0) - shm_rdrbuff_remove(irmd.rdrb, idx); - continue; - } - } - - pthread_rwlock_unlock(&irmd.flows_lock); - } - - return (void *) 0; -} - void * irm_sanitize(void * o) { struct timespec now; @@ -2362,11 +2324,6 @@ int main(int argc, goto fail_irm_sanitize; } - if (pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb)) { - irmd_set_state(IRMD_NULL); - goto fail_shm_sanitize; - } - if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL)) { irmd_set_state(IRMD_NULL); goto fail_acceptor; @@ -2374,7 +2331,6 @@ int main(int argc, pthread_join(irmd.acceptor, NULL); pthread_join(irmd.irm_sanitize, NULL); - pthread_join(irmd.shm_sanitize, NULL); tpm_stop(irmd.tpm); @@ -2393,8 +2349,6 @@ int main(int argc, exit(EXIT_SUCCESS); fail_acceptor: - pthread_join(irmd.shm_sanitize, NULL); - fail_shm_sanitize: pthread_join(irmd.irm_sanitize, NULL); fail_irm_sanitize: tpm_stop(irmd.tpm); diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 1dfcb2ca..5ae2085d 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -46,29 +46,17 @@ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) -#ifndef SHM_RDRB_MULTI_BLOCK -#define WAIT_BLOCKS 1 -#else -#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4) -#if WAIT_BLOCKS == 0 -#undef WAIT_BLOCKS -#define WAIT_BLOCKS 1 -#endif -#endif - #define get_head_ptr(rdrb) \ - ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head \ - * SHM_RDRB_BLOCK_SIZE))) + idx_to_du_buff_ptr(rdrb, *rdrb->head) #define get_tail_ptr(rdrb) \ - ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail \ - * SHM_RDRB_BLOCK_SIZE))) + idx_to_du_buff_ptr(rdrb, *rdrb->tail) #define idx_to_du_buff_ptr(rdrb, idx) \ ((struct shm_du_buff *) (rdrb->shm_base + idx * SHM_RDRB_BLOCK_SIZE)) #define shm_rdrb_used(rdrb) \ - ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) \ + (((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) + 1) \ & ((SHM_BUFFER_SIZE) - 1)) #define shm_rdrb_free(rdrb, i) \ @@ -118,6 +106,13 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) pthread_cond_broadcast(rdrb->healthy); } +static void sanitize(struct shm_rdrbuff * rdrb) +{ + get_head_ptr(rdrb)->flags = SDB_NULL; + garbage_collect(rdrb); + pthread_mutex_consistent(rdrb->lock); +} + static char * rdrb_filename(void) { char * str; @@ -282,51 +277,6 @@ struct shm_rdrbuff * shm_rdrbuff_open() return rdrb_create(O_RDWR); } -int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb, - struct timespec * timeo) -{ - struct timespec abstime; - - if (timeo != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeo, &abstime); - } - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rdrb->lock); -#else - if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); -#endif - - while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) { -#ifndef HAVE_ROBUST_MUTEX - if (pthread_cond_timedwait(rdrb->full, - rdrb->lock, - &abstime) == ETIMEDOUT) { - pthread_mutex_unlock(rdrb->lock); - return -ETIMEDOUT; - } -#else - int ret = pthread_cond_timedwait(rdrb->full, - rdrb->lock, - &abstime); - if (ret == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); - if (ret == ETIMEDOUT) { - pthread_mutex_unlock(rdrb->lock); - return -ETIMEDOUT; - } -#endif - } - - garbage_collect(rdrb); - - pthread_mutex_unlock(rdrb->lock); - - return 0; -} - void shm_rdrbuff_purge(void) { char * shm_rdrb_fn; @@ -358,19 +308,19 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, #ifndef SHM_RDRB_MULTI_BLOCK if (sz > SHM_RDRB_BLOCK_SIZE) return -EMSGSIZE; +#else + while (sz > 0) { + sz -= SHM_RDRB_BLOCK_SIZE; + ++blocks; + } #endif #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); + sanitize(rdrb); #endif #ifdef SHM_RDRB_MULTI_BLOCK - while (sz > 0) { - sz -= SHM_RDRB_BLOCK_SIZE; - ++blocks; - } - if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; @@ -426,39 +376,41 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, const struct timespec * abstime) { struct shm_du_buff * sdb; - size_t size = headspace + len + tailspace; + size_t size = headspace + len + tailspace; #ifdef SHM_RDRB_MULTI_BLOCK - size_t blocks = 0; + size_t blocks = 0; size_t padblocks = 0; + size_t rblocks; #endif - ssize_t sz = size + sizeof(*sdb); - int ret = 0; + ssize_t sz = size + sizeof(*sdb); + int ret = 0; assert(rdrb); #ifndef SHM_RDRB_MULTI_BLOCK if (sz > SHM_RDRB_BLOCK_SIZE) return -EMSGSIZE; +#else + while (sz > 0) { + sz -= SHM_RDRB_BLOCK_SIZE; + ++blocks; + } + rblocks = blocks; #endif #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); + sanitize(rdrb); #endif pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, (void *) rdrb->lock); #ifdef SHM_RDRB_MULTI_BLOCK - while (sz > 0) { - sz -= SHM_RDRB_BLOCK_SIZE; - ++blocks; - } + if (blocks > 1) + rblocks = blocks << 1; - if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) - padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; - - while (!shm_rdrb_free(rdrb, (blocks + padblocks)) && ret != ETIMEDOUT) { + while (!shm_rdrb_free(rdrb, rblocks) && ret != ETIMEDOUT) { #else while (!shm_rdrb_free(rdrb, 1) && ret != ETIMEDOUT) { #endif @@ -469,6 +421,11 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, abstime); else ret = pthread_cond_wait(rdrb->healthy, rdrb->lock); + +#ifdef SHM_RDRB_MULTI_BLOCK + if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) + padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; +#endif } if (ret != ETIMEDOUT) { @@ -547,21 +504,14 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); + sanitize(rdrb); #endif - if (shm_rdrb_empty(rdrb)) { - pthread_mutex_unlock(rdrb->lock); - return -1; - } + assert(!shm_rdrb_empty(rdrb)); idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; - if (idx != *rdrb->tail) { - pthread_mutex_unlock(rdrb->lock); - return 0; - } - - garbage_collect(rdrb); + if (idx == *rdrb->tail) + garbage_collect(rdrb); pthread_mutex_unlock(rdrb->lock); |