From c69dd846c5aa2bed4db16961c5774a20cea7f828 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Tue, 11 Oct 2016 13:39:43 +0200 Subject: lib: Track SDUs in the fast path This will allow to finalize deallocating flows until all SDUs have been processed. Read and write calls will now block when a flow was deallocated. Replaces NULL checks in the fast path with asserts. --- include/ouroboros/shm_ap_rbuff.h | 6 ++ src/lib/dev.c | 29 +++++-- src/lib/shm_ap_rbuff.c | 169 ++++++++++++++++++++++++++++----------- src/lib/shm_rdrbuff.c | 64 +++++++-------- 4 files changed, 180 insertions(+), 88 deletions(-) diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index dd82c01c..1e45ef7f 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -44,6 +44,12 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb); void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb); +void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, + int port_id); + +void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, + int port_id); + int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e); diff --git a/src/lib/dev.c b/src/lib/dev.c index e20d23d4..c1f769ad 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -356,6 +356,8 @@ int flow_accept(char ** ae_name) ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; + shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -478,6 +480,8 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); + irm_msg__free_unpacked(recv_msg, NULL); return fd; @@ -559,6 +563,8 @@ int flow_dealloc(int fd) bmp_release(ai.fds, fd); + shm_ap_rbuff_close_port(ai.rb, msg.port_id); + pthread_rwlock_unlock(&ai.flows_lock); send_irm_msg(&msg); @@ -630,10 +636,10 @@ ssize_t flow_write(int fd, void * buf, size_t count) DU_BUFF_TAILSPACE, buf, count); - if (idx == -1) { + if (idx < 0) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return -EAGAIN; + return -idx; } e.index = idx; @@ -643,7 +649,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return -1; + return -ENOTALLOC; } } else { /* blocking */ struct shm_rdrbuff * rdrb = ai.rdrb; @@ -664,8 +670,12 @@ ssize_t flow_write(int fd, void * buf, size_t count) e.index = idx; e.port_id = ai.flows[fd].port_id; - while (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) - ; + if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { + shm_rdrbuff_remove(ai.rdrb, e.index); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -ENOTALLOC; + } } pthread_rwlock_unlock(&ai.flows_lock); @@ -838,6 +848,8 @@ int np1_flow_alloc(pid_t n_api, int port_id) ai.ports[port_id].fd = fd; port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + shm_ap_rbuff_open_port(ai.rb, port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -867,6 +879,8 @@ int np1_flow_dealloc(int port_id) port_destroy(&ai.ports[port_id]); + shm_ap_rbuff_close_port(ai.rb, port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -902,6 +916,8 @@ int np1_flow_resp(pid_t n_api, int port_id) ai.flows[fd].rb = rb; + shm_ap_rbuff_open_port(ai.rb, port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1011,6 +1027,9 @@ int ipcp_flow_alloc_reply(int fd, int response) msg.has_response = true; msg.response = response; + if (response) + shm_ap_rbuff_open_port(ai.rb, msg.port_id); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index acbc81a6..ede0b7f7 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -39,28 +39,33 @@ #include #include #include +#include #define FN_MAX_CHARS 255 #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + + IRMD_MAX_FLOWS * sizeof(int8_t) \ + + IRMD_MAX_FLOWS * sizeof (ssize_t) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) -#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_BUFFER_SIZE - *rb->ptr_tail) \ +#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \ & (SHM_BUFFER_SIZE - 1)) #define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) -#define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->ptr_head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->ptr_tail) +#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) struct shm_ap_rbuff { - struct rb_entry * shm_base; /* start of entry */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * add; /* SDU arrived */ - pthread_cond_t * del; /* SDU removed */ - pid_t api; /* api to which this rb belongs */ + struct rb_entry * shm_base; /* start of entry */ + size_t * head; /* start of ringbuffer head */ + size_t * tail; /* start of ringbuffer tail */ + int8_t * acl; /* start of port_id access table */ + ssize_t * cntrs; /* start of port_id counters */ + pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_cond_t * add; /* SDU arrived */ + pthread_cond_t * del; /* SDU removed */ + pid_t api; /* api to which this rb belongs */ int fd; }; @@ -73,6 +78,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() pthread_condattr_t cattr; char fn[FN_MAX_CHARS]; mode_t mask; + int i; sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); @@ -125,9 +131,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() } rb->shm_base = shm_base; - rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->ptr_tail = rb->ptr_head + 1; - rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1); + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); + rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); rb->add = (pthread_cond_t *) (rb->lock + 1); rb->del = rb->add + 1; @@ -143,11 +151,16 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif + for (i = 0; i < IRMD_MAX_FLOWS; ++i) { + rb->cntrs[i] = 0; + rb->acl[i] = -1; + } + pthread_cond_init(rb->add, &cattr); pthread_cond_init(rb->del, &cattr); - *rb->ptr_head = 0; - *rb->ptr_tail = 0; + *rb->head = 0; + *rb->tail = 0; rb->fd = shm_fd; rb->api = getpid(); @@ -197,9 +210,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) } rb->shm_base = shm_base; - rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->ptr_tail = rb->ptr_head + 1; - rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1); + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); + rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); rb->add = (pthread_cond_t *) (rb->lock + 1); rb->del = rb->add + 1; @@ -211,10 +226,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) { - if (rb == NULL) { - LOG_DBG("Bogus input. Bugging out."); - return; - } + assert(rb); if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); @@ -225,15 +237,56 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) free(rb); } +void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id) +{ + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + +#ifdef OUROBOROS_CONFIG_DEBUG + if (!rb->acl[port_id]) + LOG_DBG("Trying to open open port."); +#endif + rb->acl[port_id] = 0; /* open */ + + pthread_mutex_unlock(rb->lock); +} + +void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) +{ + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif +#ifdef OUROBOROS_CONFIG_DEBUG + if (rb->acl[port_id]) + LOG_DBG("Trying to close closed port."); +#endif + rb->acl[port_id] = -1; + + pthread_mutex_unlock(rb->lock); +} + void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) { char fn[25]; struct lockfile * lf = NULL; - if (rb == NULL) { - LOG_DBG("Bogus input. Bugging out."); - return; - } + assert(rb); if (rb->api != getpid()) { lf = lockfile_open(); @@ -267,8 +320,8 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) { - if (rb == NULL || e == NULL) - return -1; + assert(rb); + assert(e); #ifdef __APPLE__ pthread_mutex_lock(rb->lock); @@ -278,6 +331,11 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) pthread_mutex_consistent(rb->lock); } #endif + if (rb->acl[e->port_id]) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + if (!shm_rbuff_free(rb)) { pthread_mutex_unlock(rb->lock); return -1; @@ -287,7 +345,9 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) pthread_cond_broadcast(rb->add); *head_el_ptr(rb) = *e; - *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1); + *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); + + ++rb->cntrs[e->port_id]; pthread_mutex_unlock(rb->lock); @@ -298,8 +358,8 @@ int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) { int ret = 0; - if (rb == NULL) - return -EINVAL; + assert(rb); + #ifdef __APPLE__ pthread_mutex_lock(rb->lock); #else @@ -314,8 +374,8 @@ int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) } ret = tail_el_ptr(rb)->index; - - *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1); + --rb->cntrs[tail_el_ptr(rb)->port_id]; + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); pthread_mutex_unlock(rb->lock); @@ -328,8 +388,7 @@ static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, struct timespec abstime; int ret = 0; - if (rb == NULL) - return -EINVAL; + assert(rb); if (timeout != NULL) { clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -380,6 +439,8 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, struct timespec abstime; int ret; + assert(rb); + if (set == NULL) return shm_ap_rbuff_peek_b_all(rb, timeout); @@ -453,12 +514,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) { struct rb_entry * e = NULL; - if (rb == NULL) - return NULL; + assert(rb); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); - #ifdef __APPLE__ pthread_mutex_lock(rb->lock); #else @@ -478,8 +537,9 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) #endif e = malloc(sizeof(*e)); if (e != NULL) { - *e = *(rb->shm_base + *rb->ptr_tail); - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + *e = *(rb->shm_base + *rb->tail); + --rb->cntrs[e->port_id]; + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); } pthread_cleanup_pop(true); @@ -499,14 +559,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) pthread_mutex_consistent(rb->lock); } #endif + if (rb->acl[port_id]) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { pthread_mutex_unlock(rb->lock); return -1; } idx = tail_el_ptr(rb)->index; - - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + --rb->cntrs[port_id]; + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); pthread_cond_broadcast(rb->del); pthread_mutex_unlock(rb->lock); @@ -522,6 +587,8 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, int ret = 0; ssize_t idx = -1; + assert(rb); + #ifdef __APPLE__ pthread_mutex_lock(rb->lock); #else @@ -530,7 +597,13 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, pthread_mutex_consistent(rb->lock); } #endif + if (rb->acl[port_id]) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + if (timeout != NULL) { + idx = -ETIMEDOUT; clock_gettime(PTHREAD_COND_CLOCK, &abstime); ts_add(&abstime, timeout, &abstime); } @@ -577,7 +650,8 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, if (ret != ETIMEDOUT) { idx = tail_el_ptr(rb)->index; - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + --rb->cntrs[port_id]; + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); pthread_cond_broadcast(rb->del); } @@ -589,11 +663,10 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) { - if (rb == NULL) - return; + assert(rb); pthread_mutex_lock(rb->lock); - *rb->ptr_tail = 0; - *rb->ptr_head = 0; + *rb->tail = 0; + *rb->head = 0; pthread_mutex_unlock(rb->lock); } diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index fb58a4d6..f6683dc2 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -35,6 +35,7 @@ #include #include #include +#include #define OUROBOROS_PREFIX "shm_rdrbuff" @@ -324,8 +325,8 @@ void * shm_rdrbuff_sanitize(void * o) pid_t api; - if (rdrb == NULL) - return (void *) -1; + assert(o); + #ifdef __APPLE__ pthread_mutex_lock(rdrb->lock); #else @@ -397,10 +398,7 @@ void * shm_rdrbuff_sanitize(void * o) void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) { - if (rdrb == NULL) { - LOG_DBGF("Bogus input. Bugging out."); - return; - } + assert(rdrb); if (close(rdrb->fd) < 0) LOG_DBGF("Couldn't close shared memory."); @@ -415,10 +413,7 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) { char * shm_rdrb_fn; - if (rdrb == NULL) { - LOG_DBGF("Bogus input. Bugging out."); - return; - } + assert(rdrb); if (getpid() != *rdrb->api && kill(*rdrb->api, 0) == 0) { LOG_DBG("Process %d tried to destroy active rdrb.", getpid()); @@ -460,10 +455,8 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, int sz = size + sizeof *sdb; uint8_t * write_pos; - if (rdrb == NULL || data == NULL) { - LOG_DBGF("Bogus input, bugging out."); - return -1; - } + assert(rdrb); + assert(data); #ifndef SHM_RDRB_MULTI_BLOCK if (sz > SHM_RDRB_BLOCK_SIZE) { @@ -534,11 +527,11 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, } ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, - pid_t dst_api, - size_t headspace, - size_t tailspace, - uint8_t * data, - size_t len) + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) { struct shm_du_buff * sdb; size_t size = headspace + len + tailspace; @@ -549,10 +542,8 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, int sz = size + sizeof *sdb; uint8_t * write_pos; - if (rdrb == NULL || data == NULL) { - LOG_DBGF("Bogus input, bugging out."); - return -1; - } + assert(rdrb); + assert(data); #ifndef SHM_RDRB_MULTI_BLOCK if (sz > SHM_RDRB_BLOCK_SIZE) { @@ -631,6 +622,9 @@ int shm_rdrbuff_read(uint8_t ** dst, size_t len = 0; struct shm_du_buff * sdb; + assert(dst); + assert(rdrb); + if (idx > SHM_BUFFER_SIZE) return -1; #ifdef __APPLE__ @@ -659,6 +653,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, ssize_t idx) { struct shm_du_buff * sdb; + assert(rdrb); + if (idx > SHM_BUFFER_SIZE) return NULL; #ifdef __APPLE__ @@ -683,6 +679,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, ssize_t idx) int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) { + assert(rdrb); + if (idx > SHM_BUFFER_SIZE) return -1; #ifdef __APPLE__ @@ -717,21 +715,21 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) size_t shm_du_buff_get_idx(struct shm_du_buff * sdb) { + assert(sdb); + return sdb->idx; } uint8_t * shm_du_buff_head(struct shm_du_buff * sdb) { - if (sdb == NULL) - return NULL; + assert(sdb); return (uint8_t *) (sdb + 1) + sdb->du_head; } uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb) { - if (sdb == NULL) - return NULL; + assert(sdb); return (uint8_t *) (sdb + 1) + sdb->du_tail; } @@ -741,8 +739,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, { uint8_t * buf = NULL; - if (sdb == NULL) - return NULL; + assert(sdb); if ((long) (sdb->du_head - size) < 0) { LOG_ERR("Failed to allocate PCI headspace."); @@ -761,8 +758,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, { uint8_t * buf = NULL; - if (sdb == NULL) - return NULL; + assert(sdb); if (sdb->du_tail + size >= sdb->size) { LOG_ERR("Failed to allocate PCI tailspace."); @@ -779,8 +775,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, int shm_du_buff_head_release(struct shm_du_buff * sdb, size_t size) { - if (sdb == NULL) - return -1; + assert(sdb); if (size > sdb->du_tail - sdb->du_head) { LOG_DBGF("Tried to release beyond SDU boundary."); @@ -795,8 +790,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb, int shm_du_buff_tail_release(struct shm_du_buff * sdb, size_t size) { - if (sdb == NULL) - return -1; + assert(sdb); if (size > sdb->du_tail - sdb->du_head) { LOG_ERR("Tried to release beyond SDU boundary."); -- cgit v1.2.3