diff options
author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-07-03 11:26:59 +0200 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-07-03 11:26:59 +0200 |
commit | d2a7cb2d27dab595bd2948ad3724016ca948e61e (patch) | |
tree | 2c3c8037b780d60d34a1ac06e0439da79cf2fadb /src/lib | |
parent | de63f8b37f82ef6a760c7d3dafe2251160e2c114 (diff) | |
parent | 04f385a99f5e901598ee4b3d2655e458c92c06d8 (diff) | |
download | ouroboros-0.2.tar.gz ouroboros-0.2.zip |
Merged in dstaesse/ouroboros/be-multiblock-read (pull request #148)0.2
lib: shm_du_map full multi-block support
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/shm_du_map.c | 182 |
1 files changed, 79 insertions, 103 deletions
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index 24adac1a..31fcca8e 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -62,13 +62,11 @@ #define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) -#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \ - idx_to_du_buff_ptr(dum, idx)->du_head) - -#define MIN(a,b)(a < b ? a : b) - struct shm_du_buff { size_t size; +#ifdef SHM_DU_MAP_MULTI_BLOCK + size_t blocks; +#endif size_t du_head; size_t du_tail; pid_t dst_api; @@ -88,54 +86,32 @@ struct shm_du_map { static void garbage_collect(struct shm_du_map * dum) { -#ifndef SHM_MAP_SINGLE_BLOCK - long sz; - long blocks; -#endif - while (get_tail_ptr(dum)->dst_api == 0 && - !shm_map_empty(dum)) { -#ifndef SHM_MAP_SINGLE_BLOCK - blocks = 0; - sz = get_tail_ptr(dum)->size + - (long) sizeof(struct shm_du_buff); - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } - - *dum->ptr_tail = - (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); - +#ifdef SHM_DU_MAP_MULTI_BLOCK + struct shm_du_buff * sdb; + while ((sdb = get_tail_ptr(dum))->dst_api == 0 && + !shm_map_empty(dum)) + *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks) + & (SHM_BLOCKS_IN_MAP - 1); #else + while (get_tail_ptr(dum)->dst_api == 0 && + !shm_map_empty(dum)) *dum->ptr_tail = (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); + #endif - } } static void clean_sdus(struct shm_du_map * dum, pid_t api) { size_t idx = *dum->ptr_tail; struct shm_du_buff * buf; -#ifndef SHM_DU_MAP_SINGLE_BLOCK - long sz; - long blocks = 0; -#endif while (idx != *dum->ptr_head) { buf = idx_to_du_buff_ptr(dum, idx); if (buf->dst_api == api) buf->dst_api = 0; - -#ifndef SHM_DU_MAP_SINGLE_BLOCK - blocks = 0; - sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff); - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } - - idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1); +#ifdef SHM_DU_MAP_MULTI_BLOCK + idx = (idx + buf->blocks) & (SHM_BLOCKS_IN_MAP - 1); #else idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1); #endif @@ -320,6 +296,9 @@ void * shm_du_map_sanitize(void * o) while (true) { int ret = 0; + struct timespec now; + struct timespec dl; + if (pthread_cond_wait(dum->full, dum->shm_mutex) == EOWNERDEAD) { LOG_WARN("Recovering dead mutex."); @@ -328,33 +307,37 @@ void * shm_du_map_sanitize(void * o) *dum->choked = 1; + garbage_collect(dum); + + if (shm_map_empty(dum)) + continue; + api = get_tail_ptr(dum)->dst_api; - if (kill(api, 0) == 0) { - struct timespec now; - struct timespec dl; - clock_gettime(CLOCK_REALTIME, &now); - ts_add(&now, &intv, &dl); - while (*dum->choked) { - ret = pthread_cond_timedwait(dum->healthy, - dum->shm_mutex, - &dl); - if (!ret) - continue; - - if (ret == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); - } - - if (ret == ETIMEDOUT) { - LOG_DBGF("SDU timed out."); - clean_sdus(dum, api); - } - } - } else { - LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret); + if (kill(api, 0)) { + LOG_DBGF("Dead process %d left stale sdu.", api); clean_sdus(dum, api); + continue; + } + + clock_gettime(CLOCK_REALTIME, &now); + ts_add(&now, &intv, &dl); + while (*dum->choked) { + ret = pthread_cond_timedwait(dum->healthy, + dum->shm_mutex, + &dl); + if (!ret) + continue; + + if (ret == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + if (ret == ETIMEDOUT) { + LOG_DBGF("SDU timed out."); + clean_sdus(dum, api); + } } } @@ -406,12 +389,12 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, size_t len) { struct shm_du_buff * sdb; -#ifndef SHM_MAP_SINGLE_BLOCK - long blocks = 0; size_t size = headspace + len + tailspace; +#ifdef SHM_DU_MAP_MULTI_BLOCK + long blocks = 0; + long padblocks = 0; int sz = headspace + len + sizeof *sdb; int sz2 = sz + tailspace; - size_t copy_len; #endif uint8_t * write_pos; ssize_t index = -1; @@ -421,29 +404,17 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, return -1; } -#ifdef SHM_MAP_SINGLE_BLOCK +#ifndef SHM_DU_MAP_MULTI_BLOCK if (size + sizeof *sdb > SHM_DU_BUFF_BLOCK_SIZE) { LOG_DBGF("Multi-block SDU's disabled. Dropping."); return -1; } #endif - - if (headspace >= size) { - LOG_DBGF("Index out of bounds."); - return -1; - } - - if (headspace + len > size) { - LOG_DBGF("Buffer too small for data."); - return -1; - } - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); pthread_mutex_consistent(dum->shm_mutex); } - -#ifndef SHM_MAP_SINGLE_BLOCK +#ifdef SHM_DU_MAP_MULTI_BLOCK while (sz2 > 0) { sz2 -= SHM_DU_BUFF_BLOCK_SIZE; sz -= SHM_DU_BUFF_BLOCK_SIZE; @@ -455,7 +426,10 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, ++blocks; } - if (!shm_map_free(dum, blocks)) { + if (blocks + *dum->ptr_head > SHM_BLOCKS_IN_MAP - 1) + padblocks = SHM_BLOCKS_IN_MAP - *dum->ptr_head; + + if (!shm_map_free(dum, (blocks + padblocks))) { #else if (!shm_map_free(dum, 1)) { #endif @@ -464,31 +438,34 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, return -1; } - sdb = get_head_ptr(dum); - sdb->size = size; +#ifdef SHM_DU_MAP_MULTI_BLOCK + if (padblocks) { + sdb = get_head_ptr(dum); + sdb->size = 0; + sdb->blocks = padblocks; + sdb->dst_api = 0; + sdb->du_head = 0; + sdb->du_tail = 0; + + *dum->ptr_head = 0; + } +#endif + sdb = get_head_ptr(dum); + sdb->size = size; sdb->dst_api = dst_api; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; - +#ifdef SHM_DU_MAP_MULTI_BLOCK + sdb->blocks = blocks; +#endif write_pos = ((uint8_t *) sdb) + sizeof *sdb + headspace; -#ifndef SHM_MAP_SINGLE_BLOCK - copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb); - while (blocks > 0) { - memcpy(write_pos, data, copy_len); - *dum->ptr_head = (*dum->ptr_head + 1) - & (SHM_BLOCKS_IN_MAP - 1); - len -= copy_len; - copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE); - write_pos = (uint8_t *) get_head_ptr(dum); - --blocks; - } - - index = (*dum->ptr_head - 1 + SHM_BLOCKS_IN_MAP) - & (SHM_BLOCKS_IN_MAP - 1); -#else memcpy(write_pos, data, len); + index = *dum->ptr_head; +#ifdef SHM_DU_MAP_MULTI_BLOCK + *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BLOCKS_IN_MAP - 1); +#else *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); #endif pthread_mutex_unlock(dum->shm_mutex); @@ -496,12 +473,12 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, return index; } -/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ int shm_du_map_read(uint8_t ** dst, struct shm_du_map * dum, ssize_t idx) { size_t len = 0; + struct shm_du_buff * sdb; if (idx > SHM_BLOCKS_IN_MAP) return -1; @@ -516,10 +493,9 @@ int shm_du_map_read(uint8_t ** dst, return -1; } - *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) + - sizeof(struct shm_du_buff) + - idx_to_du_buff_ptr(dum, idx)->du_head; - len = sdu_size(dum, idx); + sdb = idx_to_du_buff_ptr(dum, idx); + len = sdb->du_tail - sdb->du_head; + *dst = ((uint8_t *) sdb) + sizeof(struct shm_du_buff) + sdb->du_head; pthread_mutex_unlock(dum->shm_mutex); |