diff options
Diffstat (limited to 'src/lib/shm_du_map.c')
-rw-r--r-- | src/lib/shm_du_map.c | 257 |
1 files changed, 194 insertions, 63 deletions
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index 2a316265..24adac1a 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -23,11 +23,14 @@ #include <ouroboros/config.h> #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/time_utils.h> #include <pthread.h> #include <sys/mman.h> #include <fcntl.h> #include <stdlib.h> #include <string.h> +#include <signal.h> #include <sys/stat.h> #define OUROBOROS_PREFIX "shm_du_map" @@ -35,8 +38,8 @@ #include <ouroboros/logs.h> #define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof (size_t) \ - + sizeof(pthread_mutex_t) + sizeof(pthread_cond_t) \ +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ + + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) #define get_head_ptr(dum) \ @@ -57,6 +60,8 @@ & (SHM_BLOCKS_IN_MAP - 1)) #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) + #define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \ idx_to_du_buff_ptr(dum, idx)->du_head) @@ -66,7 +71,7 @@ struct shm_du_buff { size_t size; size_t du_head; size_t du_tail; - size_t garbage; + pid_t dst_api; }; struct shm_du_map { @@ -74,11 +79,80 @@ struct shm_du_map { size_t * ptr_head; /* start of ringbuffer head */ size_t * ptr_tail; /* start of ringbuffer tail */ pthread_mutex_t * shm_mutex; /* lock all free space in shm */ - pthread_cond_t * sanitize; /* run sanitizer when buffer full */ + size_t * choked; /* stale sdu detection */ + pthread_cond_t * healthy; /* du map is healthy */ + pthread_cond_t * full; /* run sanitizer when buffer full */ pid_t * api; /* api of the irmd owner */ int fd; }; +static void garbage_collect(struct shm_du_map * dum) +{ +#ifndef SHM_MAP_SINGLE_BLOCK + long sz; + long blocks; +#endif + while (get_tail_ptr(dum)->dst_api == 0 && + !shm_map_empty(dum)) { +#ifndef SHM_MAP_SINGLE_BLOCK + blocks = 0; + sz = get_tail_ptr(dum)->size + + (long) sizeof(struct shm_du_buff); + while (sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + *dum->ptr_tail = + (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); + +#else + *dum->ptr_tail = + (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); +#endif + } +} + +static void clean_sdus(struct shm_du_map * dum, pid_t api) +{ + size_t idx = *dum->ptr_tail; + struct shm_du_buff * buf; +#ifndef SHM_DU_MAP_SINGLE_BLOCK + long sz; + long blocks = 0; +#endif + + while (idx != *dum->ptr_head) { + buf = idx_to_du_buff_ptr(dum, idx); + if (buf->dst_api == api) + buf->dst_api = 0; + +#ifndef SHM_DU_MAP_SINGLE_BLOCK + blocks = 0; + sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff); + while (sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1); +#else + idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1); +#endif + } + + garbage_collect(dum); + + if (kill(api, 0) == 0) { + struct shm_ap_rbuff * rb; + rb = shm_ap_rbuff_open(api); + shm_ap_rbuff_reset(rb); + shm_ap_rbuff_close(rb); + } + + *dum->choked = 0; +} + struct shm_du_map * shm_du_map_create() { struct shm_du_map * dum; @@ -140,8 +214,10 @@ struct shm_du_map * shm_du_map_create() ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1); - dum->api = (pid_t *) (dum->sanitize + 1); + dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->healthy = (pthread_cond_t *) (dum->choked + 1); + dum->full = dum->healthy + 1; + dum->api = (pid_t *) (dum->full + 1); pthread_mutexattr_init(&mattr); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -150,11 +226,14 @@ struct shm_du_map * shm_du_map_create() pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); - pthread_cond_init(dum->sanitize, &cattr); + pthread_cond_init(dum->full, &cattr); + pthread_cond_init(dum->healthy, &cattr); *dum->ptr_head = 0; *dum->ptr_tail = 0; + *dum->choked = 0; + *dum->api = getpid(); dum->fd = shm_fd; @@ -202,8 +281,10 @@ struct shm_du_map * shm_du_map_open() ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1); - dum->api = (pid_t *) (dum->sanitize + 1); + dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->healthy = (pthread_cond_t *) (dum->choked + 1); + dum->full = dum->healthy + 1; + dum->api = (pid_t *) (dum->full + 1); dum->fd = shm_fd; @@ -212,12 +293,73 @@ struct shm_du_map * shm_du_map_open() pid_t shm_du_map_owner(struct shm_du_map * dum) { + if (dum == NULL) + return 0; + return *dum->api; } void * shm_du_map_sanitize(void * o) { - LOG_MISSING; + struct shm_du_map * dum = (struct shm_du_map *) o; + struct timespec intv + = {SHM_DU_TIMEOUT_MICROS / MILLION, + (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; + + pid_t api; + + if (dum == NULL) + return (void *) -1; + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, + (void *) dum->shm_mutex); + + while (true) { + int ret = 0; + if (pthread_cond_wait(dum->full, dum->shm_mutex) + == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + *dum->choked = 1; + + api = get_tail_ptr(dum)->dst_api; + + if (kill(api, 0) == 0) { + struct timespec now; + struct timespec dl; + clock_gettime(CLOCK_REALTIME, &now); + ts_add(&now, &intv, &dl); + while (*dum->choked) { + ret = pthread_cond_timedwait(dum->healthy, + dum->shm_mutex, + &dl); + if (!ret) + continue; + + if (ret == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + if (ret == ETIMEDOUT) { + LOG_DBGF("SDU timed out."); + clean_sdus(dum, api); + } + } + } else { + LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret); + clean_sdus(dum, api); + } + } + + pthread_cleanup_pop(true); + return (void *) 0; } @@ -256,21 +398,23 @@ void shm_du_map_destroy(struct shm_du_map * dum) free(dum); } -ssize_t shm_create_du_buff(struct shm_du_map * dum, - size_t size, - size_t headspace, - uint8_t * data, - size_t len) +ssize_t shm_du_map_write(struct shm_du_map * dum, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) { struct shm_du_buff * sdb; #ifndef SHM_MAP_SINGLE_BLOCK long blocks = 0; - int sz = size + sizeof *sdb; - int sz2 = headspace + len + sizeof *sdb; + size_t size = headspace + len + tailspace; + int sz = headspace + len + sizeof *sdb; + int sz2 = sz + tailspace; size_t copy_len; #endif uint8_t * write_pos; - ssize_t index; + ssize_t index = -1; if (dum == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -294,36 +438,35 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, return -1; } - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } #ifndef SHM_MAP_SINGLE_BLOCK - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; + while (sz2 > 0) { sz2 -= SHM_DU_BUFF_BLOCK_SIZE; - if (sz2 < 0 && sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + if (sz < 0 && sz2 > 0) { pthread_mutex_unlock(dum->shm_mutex); - LOG_DBG("Can't handle this packet now"); - return -1; + LOG_DBG("Can't handle this packet now."); + return -EAGAIN; } ++blocks; } if (!shm_map_free(dum, blocks)) { - pthread_mutex_unlock(dum->shm_mutex); - pthread_cond_signal(dum->sanitize); - return -1; - } #else if (!shm_map_free(dum, 1)) { +#endif + pthread_cond_signal(dum->full); pthread_mutex_unlock(dum->shm_mutex); - ptrhead_cond_signal(dum->sanitize); return -1; } -#endif sdb = get_head_ptr(dum); sdb->size = size; - sdb->garbage = 0; + sdb->dst_api = dst_api; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; @@ -333,7 +476,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb); while (blocks > 0) { memcpy(write_pos, data, copy_len); - *(dum->ptr_head) = (*dum->ptr_head + 1) + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); len -= copy_len; copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE); @@ -346,7 +489,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, #else memcpy(write_pos, data, len); index = *dum->ptr_head; - *(dum->ptr_head) = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); #endif pthread_mutex_unlock(dum->shm_mutex); @@ -354,18 +497,21 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, } /* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ -int shm_du_map_read_sdu(uint8_t ** dst, - struct shm_du_map * dum, - ssize_t idx) +int shm_du_map_read(uint8_t ** dst, + struct shm_du_map * dum, + ssize_t idx) { size_t len = 0; if (idx > SHM_BLOCKS_IN_MAP) return -1; - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } - if (*dum->ptr_head == *dum->ptr_tail) { + if (shm_map_empty(dum)) { pthread_mutex_unlock(dum->shm_mutex); return -1; } @@ -380,47 +526,32 @@ int shm_du_map_read_sdu(uint8_t ** dst, return len; } -int shm_release_du_buff(struct shm_du_map * dum, ssize_t idx) +int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) { -#ifndef SHM_MAP_SINGLE_BLOCK - long sz; - long blocks = 0; -#endif if (idx > SHM_BLOCKS_IN_MAP) return -1; - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } - if (*dum->ptr_head == *dum->ptr_tail) { + if (shm_map_empty(dum)) { pthread_mutex_unlock(dum->shm_mutex); return -1; } - idx_to_du_buff_ptr(dum, idx)->garbage = 1; + idx_to_du_buff_ptr(dum, idx)->dst_api = 0; if (idx != *dum->ptr_tail) { pthread_mutex_unlock(dum->shm_mutex); return 0; } - while (get_tail_ptr(dum)->garbage == 1 && - *dum->ptr_tail != *dum->ptr_head) { -#ifndef SHM_MAP_SINGLE_BLOCK - sz = get_tail_ptr(dum)->size; - while (sz + (long) sizeof(struct shm_du_buff) > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } + garbage_collect(dum); - *(dum->ptr_tail) = - (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); - - blocks = 0; -#else - *(dum->ptr_tail) = - (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); -#endif - } + *dum->choked = 0; + pthread_cond_signal(dum->healthy); pthread_mutex_unlock(dum->shm_mutex); |