diff options
author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-11-17 16:06:08 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-11-17 16:06:08 +0000 |
commit | 4af11ad11303ae2efbd887e81fddaff8155aa23d (patch) | |
tree | 18e6b9f690e7aaf615a91c4ccbc8ab0bc2edb967 /src/lib/shm_rdrbuff.c | |
parent | b0e0c74a14906639f5cd36d942d46b2d793e1fd4 (diff) | |
parent | 2ae032ddc4f5eb6d0e7eaa5400c1ffb80e2c0a8d (diff) | |
download | ouroboros-4af11ad11303ae2efbd887e81fddaff8155aa23d.tar.gz ouroboros-4af11ad11303ae2efbd887e81fddaff8155aa23d.zip |
Merged in dstaesse/ouroboros/be-rdrbuff (pull request #303)
lib: Remove dst_api field from rdrbuff blocks
Diffstat (limited to 'src/lib/shm_rdrbuff.c')
-rw-r--r-- | src/lib/shm_rdrbuff.c | 256 |
1 files changed, 83 insertions, 173 deletions
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index dc1feb10..a8245447 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -42,16 +42,26 @@ #include <ouroboros/logs.h> #define SHM_BLOCKS_SIZE ((SHM_BUFFER_SIZE) * SHM_RDRB_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof(size_t) \ +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof(size_t) \ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) +#ifndef SHM_RDRB_MULTI_BLOCK +#define WAIT_BLOCKS 1 +#else +#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4) +#if WAIT_BLOCKS == 0 +#undef WAIT_BLOCKS +#define WAIT_BLOCKS 1 +#endif +#endif + #define get_head_ptr(rdrb) \ - ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->ptr_head \ + ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head \ * SHM_RDRB_BLOCK_SIZE))) #define get_tail_ptr(rdrb) \ - ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->ptr_tail \ + ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail \ * SHM_RDRB_BLOCK_SIZE))) #define idx_to_du_buff_ptr(rdrb, idx) \ @@ -61,13 +71,19 @@ (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE) #define shm_rdrb_used(rdrb) \ - ((*rdrb->ptr_head + (SHM_BUFFER_SIZE) - *rdrb->ptr_tail) \ + ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) \ & ((SHM_BUFFER_SIZE) - 1)) + #define shm_rdrb_free(rdrb, i) \ (shm_rdrb_used(rdrb) + i < (SHM_BUFFER_SIZE)) #define shm_rdrb_empty(rdrb) \ - (*rdrb->ptr_tail == *rdrb->ptr_head) + (*rdrb->tail == *rdrb->head) + +enum shm_du_buff_flags { + SDB_VALID = 0, + SDB_NULL +}; struct shm_du_buff { size_t size; @@ -76,20 +92,18 @@ struct shm_du_buff { #endif size_t du_head; size_t du_tail; - pid_t dst_api; + size_t flags; size_t idx; }; struct shm_rdrbuff { - uint8_t * shm_base; /* start of blocks */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * lock; /* lock all free space in shm */ - size_t * choked; /* stale sdu detection */ - pthread_cond_t * healthy; /* du map is healthy */ - pthread_cond_t * full; /* run sanitizer when buffer full */ - pid_t * api; /* api of the irmd owner */ - enum qos_cube qos; /* qos id which this buffer serves */ + uint8_t * shm_base; /* start of blocks */ + size_t * head; /* start of ringbuffer head */ + size_t * tail; /* start of ringbuffer tail */ + pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_cond_t * full; /* flag when full */ + pthread_cond_t * healthy; /* flag when SDU is read */ + pid_t * api; /* api of the irmd owner */ }; static void garbage_collect(struct shm_rdrbuff * rdrb) @@ -97,61 +111,31 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) #ifdef SHM_RDRB_MULTI_BLOCK struct shm_du_buff * sdb; while (!shm_rdrb_empty(rdrb) && - (sdb = get_tail_ptr(rdrb))->dst_api == -1) - *rdrb->ptr_tail = (*rdrb->ptr_tail + sdb->blocks) + (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL) + *rdrb->tail = (*rdrb->tail + sdb->blocks) & ((SHM_BUFFER_SIZE) - 1); #else - while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->dst_api == -1) - *rdrb->ptr_tail = - (*rdrb->ptr_tail + 1) & ((SHM_BUFFER_SIZE) - 1); - + while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL) + *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); #endif + pthread_cond_broadcast(rdrb->healthy); } -static void clean_sdus(struct shm_rdrbuff * rdrb, pid_t api) -{ - size_t idx = *rdrb->ptr_tail; - struct shm_du_buff * buf; - - while (idx != *rdrb->ptr_head) { - buf = idx_to_du_buff_ptr(rdrb, idx); - if (buf->dst_api == api) - buf->dst_api = -1; -#ifdef SHM_RDRB_MULTI_BLOCK - idx = (idx + buf->blocks) & ((SHM_BUFFER_SIZE) - 1); -#else - idx = (idx + 1) & ((SHM_BUFFER_SIZE) - 1); -#endif - } - - garbage_collect(rdrb); - - *rdrb->choked = 0; -} - -static char * rdrb_filename(enum qos_cube qos) +static char * rdrb_filename(void) { - size_t chars = 0; char * str; - int qm = QOS_MAX; - do { - qm /= 10; - ++chars; - } while (qm > 0); - - str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1); + str = malloc(strlen(SHM_RDRB_PREFIX) + 1); if (str == NULL) { LOG_ERR("Failed to create shm_rdrbuff: Out of Memory."); return NULL; } - sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos); + sprintf(str, "%s", SHM_RDRB_PREFIX); return str; } -/* FIXME: create a ringbuffer for each qos cube in the system */ struct shm_rdrbuff * shm_rdrbuff_create() { struct shm_rdrbuff * rdrb; @@ -160,8 +144,7 @@ struct shm_rdrbuff * shm_rdrbuff_create() uint8_t * shm_base; pthread_mutexattr_t mattr; pthread_condattr_t cattr; - enum qos_cube qos = QOS_CUBE_BE; - char * shm_rdrb_fn = rdrb_filename(qos); + char * shm_rdrb_fn = rdrb_filename(); if (shm_rdrb_fn == NULL) { LOG_ERR("Could not create rdrbuff. Out of Memory"); return NULL; @@ -212,14 +195,12 @@ struct shm_rdrbuff * shm_rdrbuff_create() } rdrb->shm_base = shm_base; - rdrb->ptr_head = (size_t *) - ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); - rdrb->ptr_tail = rdrb->ptr_head + 1; - rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); - rdrb->choked = (size_t *) (rdrb->lock + 1); - rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); - rdrb->full = rdrb->healthy + 1; - rdrb->api = (pid_t *) (rdrb->full + 1); + rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); + rdrb->tail = rdrb->head + 1; + rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1); + rdrb->full = (pthread_cond_t *) (rdrb->lock + 1); + rdrb->healthy = rdrb->full + 1; + rdrb->api = (pid_t *) (rdrb->healthy + 1); pthread_mutexattr_init(&mattr); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -236,29 +217,22 @@ struct shm_rdrbuff * shm_rdrbuff_create() pthread_cond_init(rdrb->full, &cattr); pthread_cond_init(rdrb->healthy, &cattr); - *rdrb->ptr_head = 0; - *rdrb->ptr_tail = 0; - - *rdrb->choked = 0; + *rdrb->head = 0; + *rdrb->tail = 0; *rdrb->api = getpid(); - rdrb->qos = qos; - free(shm_rdrb_fn); return rdrb; } -/* FIXME: open a ringbuffer for each qos cube in the system */ struct shm_rdrbuff * shm_rdrbuff_open() { struct shm_rdrbuff * rdrb; int shm_fd; uint8_t * shm_base; - - enum qos_cube qos = QOS_CUBE_BE; - char * shm_rdrb_fn = rdrb_filename(qos); + char * shm_rdrb_fn = rdrb_filename(); if (shm_rdrb_fn == NULL) { LOG_ERR("Could not create rdrbuff. Out of Memory"); return NULL; @@ -297,32 +271,20 @@ struct shm_rdrbuff * shm_rdrbuff_open() } rdrb->shm_base = shm_base; - rdrb->ptr_head = (size_t *) - ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); - rdrb->ptr_tail = rdrb->ptr_head + 1; - rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); - rdrb->choked = (size_t *) (rdrb->lock + 1); - rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); - rdrb->full = rdrb->healthy + 1; - rdrb->api = (pid_t *) (rdrb->full + 1); - - rdrb->qos = qos; + rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); + rdrb->tail = rdrb->head + 1; + rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1); + rdrb->full = (pthread_cond_t *) (rdrb->lock + 1); + rdrb->healthy = rdrb->full + 1; + rdrb->api = (pid_t *) (rdrb->healthy + 1); free(shm_rdrb_fn); return rdrb; } -void * shm_rdrbuff_sanitize(void * o) +void shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb) { - struct shm_rdrbuff * rdrb = (struct shm_rdrbuff *) o; - struct timespec intv - = {SHM_DU_TIMEOUT_MICROS / MILLION, - (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; - - pid_t api; - - assert(o); #ifdef __APPLE__ pthread_mutex_lock(rdrb->lock); @@ -332,14 +294,10 @@ void * shm_rdrbuff_sanitize(void * o) pthread_mutex_consistent(rdrb->lock); } #endif - pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, (void *) rdrb->lock); - while (true) { - int ret = 0; - struct timespec now; - struct timespec dl; + while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) { #ifdef __APPLE__ pthread_cond_wait(rdrb->full, rdrb->lock); #else @@ -348,49 +306,11 @@ void * shm_rdrbuff_sanitize(void * o) pthread_mutex_consistent(rdrb->lock); } #endif - *rdrb->choked = 1; - - garbage_collect(rdrb); - - if (shm_rdrb_empty(rdrb)) { - pthread_cond_broadcast(rdrb->healthy); - continue; - } - - api = get_tail_ptr(rdrb)->dst_api; - - if (kill(api, 0)) { - LOG_DBGF("Dead process %d left stale sdu.", api); - clean_sdus(rdrb, api); - pthread_cond_broadcast(rdrb->healthy); - continue; - } - - clock_gettime(CLOCK_REALTIME, &now); - ts_add(&now, &intv, &dl); - while (*rdrb->choked) { - ret = pthread_cond_timedwait(rdrb->healthy, - rdrb->lock, - &dl); - if (!ret) - continue; -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(rdrb->lock); - } -#endif - if (ret == ETIMEDOUT) { - LOG_DBGF("SDU timed out (dst: %d).", api); - clean_sdus(rdrb, api); - } - } - pthread_cond_broadcast(rdrb->healthy); } - pthread_cleanup_pop(true); + garbage_collect(rdrb); - return (void *) 0; + pthread_cleanup_pop(true); } void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) @@ -417,7 +337,7 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); - shm_rdrb_fn = rdrb_filename(rdrb->qos); + shm_rdrb_fn = rdrb_filename(); if (shm_rdrb_fn == NULL) { LOG_ERR("Could not create rdrbuff. Out of Memory"); return; @@ -431,7 +351,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) } ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, - pid_t dst_api, size_t headspace, size_t tailspace, uint8_t * data, @@ -444,7 +363,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, size_t padblocks = 0; #endif ssize_t sz = size + sizeof(*sdb); - uint8_t * write_pos; assert(rdrb); assert(data); @@ -469,14 +387,15 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, ++blocks; } - if (blocks + *rdrb->ptr_head > (SHM_BUFFER_SIZE)) - padblocks = (SHM_BUFFER_SIZE) - *rdrb->ptr_head; + if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) + padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; if (!shm_rdrb_free(rdrb, blocks + padblocks)) { #else if (!shm_rdrb_free(rdrb, 1)) { #endif - pthread_cond_signal(rdrb->full); + LOG_DBG("buffer full, idx = %ld.", *rdrb->tail); + pthread_cond_broadcast(rdrb->full); pthread_mutex_unlock(rdrb->lock); return -1; } @@ -486,31 +405,29 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->dst_api = -1; + sdb->flags = SDB_NULL; sdb->du_head = 0; sdb->du_tail = 0; - sdb->idx = *rdrb->ptr_head; + sdb->idx = *rdrb->head; - *rdrb->ptr_head = 0; + *rdrb->head = 0; } #endif sdb = get_head_ptr(rdrb); sdb->size = size; - sdb->dst_api = dst_api; + sdb->flags = SDB_VALID; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; #endif - write_pos = ((uint8_t *) (sdb + 1)) + headspace; - - memcpy(write_pos, data, len); + memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); - sdb->idx = *rdrb->ptr_head; + sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK - *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & ((SHM_BUFFER_SIZE) - 1); + *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); #else - *rdrb->ptr_head = (*rdrb->ptr_head + 1) & ((SHM_BUFFER_SIZE) - 1); + *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); #endif pthread_mutex_unlock(rdrb->lock); @@ -518,7 +435,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, } ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, - pid_t dst_api, size_t headspace, size_t tailspace, uint8_t * data, @@ -531,7 +447,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, size_t padblocks = 0; #endif ssize_t sz = size + sizeof(*sdb); - uint8_t * write_pos; assert(rdrb); assert(data); @@ -559,14 +474,14 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, ++blocks; } - if (blocks + *rdrb->ptr_head > (SHM_BUFFER_SIZE)) - padblocks = (SHM_BUFFER_SIZE) - *rdrb->ptr_head; + if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) + padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; while (!shm_rdrb_free(rdrb, (blocks + padblocks))) { #else while (!shm_rdrb_free(rdrb, 1)) { #endif - pthread_cond_signal(rdrb->full); + pthread_cond_broadcast(rdrb->full); pthread_cond_wait(rdrb->healthy, rdrb->lock); } @@ -575,31 +490,29 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->dst_api = -1; + sdb->flags = SDB_NULL; sdb->du_head = 0; sdb->du_tail = 0; - sdb->idx = *rdrb->ptr_head; + sdb->idx = *rdrb->head; - *rdrb->ptr_head = 0; + *rdrb->head = 0; } #endif sdb = get_head_ptr(rdrb); sdb->size = size; - sdb->dst_api = dst_api; + sdb->flags = SDB_VALID; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; #endif - write_pos = ((uint8_t *) (sdb + 1)) + headspace; - - memcpy(write_pos, data, len); + memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); - sdb->idx = *rdrb->ptr_head; + sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK - *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & ((SHM_BUFFER_SIZE) - 1); + *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); #else - *rdrb->ptr_head = (*rdrb->ptr_head + 1) & ((SHM_BUFFER_SIZE) - 1); + *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); #endif pthread_cleanup_pop(true); @@ -684,18 +597,15 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, size_t idx) return -1; } - idx_to_du_buff_ptr(rdrb, idx)->dst_api = -1; + idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; - if (idx != *rdrb->ptr_tail) { + if (idx != *rdrb->tail) { pthread_mutex_unlock(rdrb->lock); return 0; } garbage_collect(rdrb); - *rdrb->choked = 0; - - pthread_cond_broadcast(rdrb->healthy); pthread_mutex_unlock(rdrb->lock); return 0; |