From b8b05e3b1980146ab8acb40cbe77e0271634c688 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 15 Oct 2016 19:30:27 +0200 Subject: lib: Stabilize fast flow deallocation over local IPCP --- include/ouroboros/local-dev.h | 7 +++---- include/ouroboros/shm_ap_rbuff.h | 2 +- src/ipcpd/local/main.c | 35 +++++++++++++++++++---------------- src/lib/dev.c | 32 ++++++++++++++++++++------------ src/lib/shm_ap_rbuff.c | 31 ++++++++++--------------------- 5 files changed, 53 insertions(+), 54 deletions(-) diff --git a/include/ouroboros/local-dev.h b/include/ouroboros/local-dev.h index b4915672..77ff47e9 100644 --- a/include/ouroboros/local-dev.h +++ b/include/ouroboros/local-dev.h @@ -25,10 +25,9 @@ #ifndef OUROBOROS_LOCAL_DEV_H #define OUROBOROS_LOCAL_DEV_H -/* returns flow descriptor and rb_entry, no access to du_buff */ -int local_flow_read(struct rb_entry * e); +struct rb_entry * local_flow_read(int fd); -int local_flow_write(int fd, - struct rb_entry * e); +int local_flow_write(int fd, + struct rb_entry * e); #endif /* OUROBOROS_LOCAL_DEV_H */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index 1e45ef7f..453e4bf8 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -47,7 +47,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb); void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id); -void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, +int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id); int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 7d23c08d..b8b3335c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -24,6 +24,7 @@ #include "ipcp.h" #include #include +#include #include #include #define OUROBOROS_PREFIX "ipcpd/local" @@ -66,8 +67,10 @@ void local_data_fini() static void * ipcp_local_sdu_loop(void * o) { while (true) { - struct rb_entry e; - int fd = local_flow_read(&e); + int fd; + struct rb_entry * e; + + fd = flow_select(NULL, NULL); pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -77,13 +80,18 @@ static void * ipcp_local_sdu_loop(void * o) } pthread_rwlock_rdlock(&local_data.lock); + + e = local_flow_read(fd); + fd = local_data.in_out[fd]; - pthread_rwlock_unlock(&local_data.lock); if (fd != -1) - local_flow_write(fd, &e); + local_flow_write(fd, e); + pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); + + free(e); } return (void *) 1; @@ -209,8 +217,6 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) int out_fd = -1; int ret = -1; - LOG_DBG("Received response for fd %d: %d.", fd, response); - if (response) return 0; @@ -235,25 +241,22 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) static int ipcp_local_flow_dealloc(int fd) { - int out_fd = -1; + struct timespec t = {0, 10000}; - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_wrlock(&local_data.lock); + if (fd < 0) + return -EINVAL; - out_fd = local_data.in_out[fd]; + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); - if (out_fd != -1) { - local_data.in_out[out_fd] = -1; - flow_dealloc(out_fd); - } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&local_data.lock); local_data.in_out[fd] = -1; pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); - LOG_INFO("Flow with fd %d deallocated.", fd); return 0; diff --git a/src/lib/dev.c b/src/lib/dev.c index 577fa7a7..a3082a7d 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -548,6 +548,12 @@ int flow_dealloc(int fd) return -ENOTALLOC; } + if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EBUSY; + } + msg.port_id = ai.flows[fd].port_id; port_destroy(&ai.ports[msg.port_id]); @@ -563,8 +569,6 @@ int flow_dealloc(int fd) bmp_release(ai.fds, fd); - shm_ap_rbuff_close_port(ai.rb, msg.port_id); - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1091,26 +1095,32 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return 0; } -int local_flow_read(struct rb_entry * e) +struct rb_entry * local_flow_read(int fd) { - int fd; - - *e = *(shm_ap_rbuff_read(ai.rb)); + int port_id; + struct rb_entry * e = NULL; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - fd = ai.ports[e->port_id].fd; + port_id = ai.flows[fd].port_id; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return fd; + if (port_id != -1) { + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + e->index = shm_ap_rbuff_read_port(ai.rb, port_id); + } + + return e; } int local_flow_write(int fd, struct rb_entry * e) { - if (e == NULL) + if (e == NULL || fd < 0) return -EINVAL; pthread_rwlock_rdlock(&ai.data_lock); @@ -1135,9 +1145,7 @@ int local_flow_write(int fd, struct rb_entry * e) int ipcp_read_shim(struct shm_du_buff ** sdb) { int fd; - struct rb_entry * e; - - e = shm_ap_rbuff_read(ai.rb); + struct rb_entry * e = shm_ap_rbuff_read(ai.rb); pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index ede0b7f7..5cbf5bd0 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -249,18 +249,14 @@ void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id) pthread_mutex_consistent(rb->lock); } #endif - -#ifdef OUROBOROS_CONFIG_DEBUG - if (!rb->acl[port_id]) - LOG_DBG("Trying to open open port."); -#endif rb->acl[port_id] = 0; /* open */ pthread_mutex_unlock(rb->lock); } -void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) +int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) { + int ret = 0; assert(rb); @@ -271,14 +267,15 @@ void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) LOG_DBG("Recovering dead mutex."); pthread_mutex_consistent(rb->lock); } -#endif -#ifdef OUROBOROS_CONFIG_DEBUG - if (rb->acl[port_id]) - LOG_DBG("Trying to close closed port."); #endif rb->acl[port_id] = -1; + if (rb->cntrs[port_id] > 0) + ret = -EBUSY; + pthread_mutex_unlock(rb->lock); + + return ret; } void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) @@ -551,6 +548,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) { ssize_t idx = -1; + assert(rb); + #ifdef __APPLE__ pthread_mutex_lock(rb->lock); #else @@ -559,11 +558,6 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) pthread_mutex_consistent(rb->lock); } #endif - if (rb->acl[port_id]) { - pthread_mutex_unlock(rb->lock); - return -ENOTALLOC; - } - if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { pthread_mutex_unlock(rb->lock); return -1; @@ -597,11 +591,6 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, pthread_mutex_consistent(rb->lock); } #endif - if (rb->acl[port_id]) { - pthread_mutex_unlock(rb->lock); - return -ENOTALLOC; - } - if (timeout != NULL) { idx = -ETIMEDOUT; clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -650,7 +639,7 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, if (ret != ETIMEDOUT) { idx = tail_el_ptr(rb)->index; - --rb->cntrs[port_id]; + --rb->cntrs[port_id]; *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); pthread_cond_broadcast(rb->del); -- cgit v1.2.3