diff options
author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-10-26 18:17:36 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-10-26 18:17:36 +0000 |
commit | 6c2164a59ce4d3ed91a65326ac89bb247e9f622f (patch) | |
tree | 7f9a78e0d57f95d903bcbbf01a00e71482593277 /src/lib | |
parent | 7848ec4100f8677392fb6b07c42dd47ee6aa9b0d (diff) | |
parent | 963537079c7d5a9f9fb39355fb0e3b84a78eaa0b (diff) | |
download | ouroboros-6c2164a59ce4d3ed91a65326ac89bb247e9f622f.tar.gz ouroboros-6c2164a59ce4d3ed91a65326ac89bb247e9f622f.zip |
Merged in dstaesse/ouroboros/be-more_flow_alloc (pull request #285)
lib, ipcpd: Further stabilization of flows
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/dev.c | 55 | ||||
-rw-r--r-- | src/lib/shm_rbuff.c | 47 |
2 files changed, 69 insertions, 33 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 018cb692..a0c47403 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -612,12 +612,6 @@ int flow_dealloc(int fd) return -ENOTALLOC; } - if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -EBUSY; - } - msg.port_id = ai.flows[fd].port_id; pthread_rwlock_unlock(&ai.flows_lock); @@ -1285,11 +1279,7 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return -EPERM; } - if (ai.flows[fd].tx_rb == NULL) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -EPERM; - } + assert(ai.flows[fd].tx_rb); idx = shm_du_buff_get_idx(sdb); @@ -1302,9 +1292,38 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return 0; } +int ipcp_flow_fini(int fd) +{ + struct shm_rbuff * rb; + + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + rb = ai.flows[fd].rx_rb; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + shm_rbuff_fini(rb); + + return 0; +} + ssize_t local_flow_read(int fd) { - return shm_rbuff_read(ai.flows[fd].rx_rb); + ssize_t ret; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + ret = shm_rbuff_read(ai.flows[fd].rx_rb); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return ret; } int local_flow_write(int fd, size_t idx) @@ -1315,11 +1334,7 @@ int local_flow_write(int fd, size_t idx) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].tx_rb == NULL) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -EPERM; - } + assert(ai.flows[fd].tx_rb); shm_rbuff_write(ai.flows[fd].tx_rb, idx); @@ -1338,11 +1353,7 @@ int ipcp_read_shim(int fd, struct shm_du_buff ** sdb) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].rx_rb == NULL) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -EPERM; - } + assert(ai.flows[fd].rx_rb); idx = shm_rbuff_read(ai.flows[fd].rx_rb); if (idx < 0) { diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 8b2e9229..301669e7 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -43,6 +43,8 @@ #include <stdbool.h> #define FN_MAX_CHARS 255 +#define RB_CLOSED -1 +#define RB_OPEN 0 #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \ + 2 * sizeof(size_t) + sizeof(int8_t) \ @@ -144,7 +146,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) pthread_cond_init(rb->add, &cattr); pthread_cond_init(rb->del, &cattr); - *rb->acl = 0; + *rb->acl = RB_OPEN; *rb->head = 0; *rb->tail = 0; @@ -291,6 +293,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) ret = *tail_el_ptr(rb); *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); + pthread_cond_broadcast(rb->del); pthread_mutex_unlock(rb->lock); @@ -353,10 +356,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, return idx; } -int shm_rbuff_block(struct shm_rbuff * rb) +void shm_rbuff_block(struct shm_rbuff * rb) { - int ret = 0; - assert(rb); #ifdef __APPLE__ @@ -367,14 +368,9 @@ int shm_rbuff_block(struct shm_rbuff * rb) pthread_mutex_consistent(rb->lock); } #endif - *rb->acl = -1; - - if (!shm_rbuff_empty(rb)) - ret = -EBUSY; + *rb->acl = RB_CLOSED; pthread_mutex_unlock(rb->lock); - - return ret; } void shm_rbuff_unblock(struct shm_rbuff * rb) @@ -389,11 +385,40 @@ void shm_rbuff_unblock(struct shm_rbuff * rb) pthread_mutex_consistent(rb->lock); } #endif - *rb->acl = 0; /* open */ + *rb->acl = RB_OPEN; pthread_mutex_unlock(rb->lock); } +void shm_rbuff_fini(struct shm_rbuff * rb) +{ + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + assert(*rb->acl == RB_CLOSED); + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (!shm_rbuff_empty(rb)) +#ifdef __APPLE__ + pthread_cond_wait(rb->del, rb->lock); +#else + if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + pthread_cleanup_pop(true); +} + void shm_rbuff_reset(struct shm_rbuff * rb) { assert(rb); |