diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-10-11 13:39:43 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-10-11 13:45:02 +0200 |
commit | c69dd846c5aa2bed4db16961c5774a20cea7f828 (patch) | |
tree | 7e9eb9f82c4fab120abc11c4b539d4beb4c19982 /src/lib/shm_ap_rbuff.c | |
parent | 99356adf207e0fe81a34ee1acfd8cacc3d2860c7 (diff) | |
download | ouroboros-c69dd846c5aa2bed4db16961c5774a20cea7f828.tar.gz ouroboros-c69dd846c5aa2bed4db16961c5774a20cea7f828.zip |
lib: Track SDUs in the fast path
This will allow to finalize deallocating flows until all SDUs have
been processed. Read and write calls will now block when a flow was
deallocated. Replaces NULL checks in the fast path with asserts.
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 169 |
1 files changed, 121 insertions, 48 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index acbc81a6..ede0b7f7 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -39,28 +39,33 @@ #include <unistd.h> #include <signal.h> #include <sys/stat.h> +#include <assert.h> #define FN_MAX_CHARS 255 #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + + IRMD_MAX_FLOWS * sizeof(int8_t) \ + + IRMD_MAX_FLOWS * sizeof (ssize_t) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) -#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_BUFFER_SIZE - *rb->ptr_tail) \ +#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \ & (SHM_BUFFER_SIZE - 1)) #define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) -#define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->ptr_head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->ptr_tail) +#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) struct shm_ap_rbuff { - struct rb_entry * shm_base; /* start of entry */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * add; /* SDU arrived */ - pthread_cond_t * del; /* SDU removed */ - pid_t api; /* api to which this rb belongs */ + struct rb_entry * shm_base; /* start of entry */ + size_t * head; /* start of ringbuffer head */ + size_t * tail; /* start of ringbuffer tail */ + int8_t * acl; /* start of port_id access table */ + ssize_t * cntrs; /* start of port_id counters */ + pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_cond_t * add; /* SDU arrived */ + pthread_cond_t * del; /* SDU removed */ + pid_t api; /* api to which this rb belongs */ int fd; }; @@ -73,6 +78,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() pthread_condattr_t cattr; char fn[FN_MAX_CHARS]; mode_t mask; + int i; sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); @@ -125,9 +131,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() } rb->shm_base = shm_base; - rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->ptr_tail = rb->ptr_head + 1; - rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1); + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); + rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); rb->add = (pthread_cond_t *) (rb->lock + 1); rb->del = rb->add + 1; @@ -143,11 +151,16 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif + for (i = 0; i < IRMD_MAX_FLOWS; ++i) { + rb->cntrs[i] = 0; + rb->acl[i] = -1; + } + pthread_cond_init(rb->add, &cattr); pthread_cond_init(rb->del, &cattr); - *rb->ptr_head = 0; - *rb->ptr_tail = 0; + *rb->head = 0; + *rb->tail = 0; rb->fd = shm_fd; rb->api = getpid(); @@ -197,9 +210,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) } rb->shm_base = shm_base; - rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->ptr_tail = rb->ptr_head + 1; - rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1); + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); + rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); rb->add = (pthread_cond_t *) (rb->lock + 1); rb->del = rb->add + 1; @@ -211,10 +226,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) { - if (rb == NULL) { - LOG_DBG("Bogus input. Bugging out."); - return; - } + assert(rb); if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); @@ -225,15 +237,56 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) free(rb); } +void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id) +{ + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + +#ifdef OUROBOROS_CONFIG_DEBUG + if (!rb->acl[port_id]) + LOG_DBG("Trying to open open port."); +#endif + rb->acl[port_id] = 0; /* open */ + + pthread_mutex_unlock(rb->lock); +} + +void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) +{ + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif +#ifdef OUROBOROS_CONFIG_DEBUG + if (rb->acl[port_id]) + LOG_DBG("Trying to close closed port."); +#endif + rb->acl[port_id] = -1; + + pthread_mutex_unlock(rb->lock); +} + void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) { char fn[25]; struct lockfile * lf = NULL; - if (rb == NULL) { - LOG_DBG("Bogus input. Bugging out."); - return; - } + assert(rb); if (rb->api != getpid()) { lf = lockfile_open(); @@ -267,8 +320,8 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) { - if (rb == NULL || e == NULL) - return -1; + assert(rb); + assert(e); #ifdef __APPLE__ pthread_mutex_lock(rb->lock); @@ -278,6 +331,11 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) pthread_mutex_consistent(rb->lock); } #endif + if (rb->acl[e->port_id]) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + if (!shm_rbuff_free(rb)) { pthread_mutex_unlock(rb->lock); return -1; @@ -287,7 +345,9 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) pthread_cond_broadcast(rb->add); *head_el_ptr(rb) = *e; - *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1); + *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); + + ++rb->cntrs[e->port_id]; pthread_mutex_unlock(rb->lock); @@ -298,8 +358,8 @@ int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) { int ret = 0; - if (rb == NULL) - return -EINVAL; + assert(rb); + #ifdef __APPLE__ pthread_mutex_lock(rb->lock); #else @@ -314,8 +374,8 @@ int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) } ret = tail_el_ptr(rb)->index; - - *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1); + --rb->cntrs[tail_el_ptr(rb)->port_id]; + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); pthread_mutex_unlock(rb->lock); @@ -328,8 +388,7 @@ static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, struct timespec abstime; int ret = 0; - if (rb == NULL) - return -EINVAL; + assert(rb); if (timeout != NULL) { clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -380,6 +439,8 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, struct timespec abstime; int ret; + assert(rb); + if (set == NULL) return shm_ap_rbuff_peek_b_all(rb, timeout); @@ -453,12 +514,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) { struct rb_entry * e = NULL; - if (rb == NULL) - return NULL; + assert(rb); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); - #ifdef __APPLE__ pthread_mutex_lock(rb->lock); #else @@ -478,8 +537,9 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) #endif e = malloc(sizeof(*e)); if (e != NULL) { - *e = *(rb->shm_base + *rb->ptr_tail); - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + *e = *(rb->shm_base + *rb->tail); + --rb->cntrs[e->port_id]; + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); } pthread_cleanup_pop(true); @@ -499,14 +559,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) pthread_mutex_consistent(rb->lock); } #endif + if (rb->acl[port_id]) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { pthread_mutex_unlock(rb->lock); return -1; } idx = tail_el_ptr(rb)->index; - - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + --rb->cntrs[port_id]; + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); pthread_cond_broadcast(rb->del); pthread_mutex_unlock(rb->lock); @@ -522,6 +587,8 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, int ret = 0; ssize_t idx = -1; + assert(rb); + #ifdef __APPLE__ pthread_mutex_lock(rb->lock); #else @@ -530,7 +597,13 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, pthread_mutex_consistent(rb->lock); } #endif + if (rb->acl[port_id]) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + if (timeout != NULL) { + idx = -ETIMEDOUT; clock_gettime(PTHREAD_COND_CLOCK, &abstime); ts_add(&abstime, timeout, &abstime); } @@ -577,7 +650,8 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, if (ret != ETIMEDOUT) { idx = tail_el_ptr(rb)->index; - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + --rb->cntrs[port_id]; + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); pthread_cond_broadcast(rb->del); } @@ -589,11 +663,10 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) { - if (rb == NULL) - return; + assert(rb); pthread_mutex_lock(rb->lock); - *rb->ptr_tail = 0; - *rb->ptr_head = 0; + *rb->tail = 0; + *rb->head = 0; pthread_mutex_unlock(rb->lock); } |