diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-10-26 18:17:36 +0000 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-10-26 18:17:36 +0000 | 
| commit | 6c2164a59ce4d3ed91a65326ac89bb247e9f622f (patch) | |
| tree | 7f9a78e0d57f95d903bcbbf01a00e71482593277 /src/lib | |
| parent | 7848ec4100f8677392fb6b07c42dd47ee6aa9b0d (diff) | |
| parent | 963537079c7d5a9f9fb39355fb0e3b84a78eaa0b (diff) | |
| download | ouroboros-6c2164a59ce4d3ed91a65326ac89bb247e9f622f.tar.gz ouroboros-6c2164a59ce4d3ed91a65326ac89bb247e9f622f.zip | |
Merged in dstaesse/ouroboros/be-more_flow_alloc (pull request #285)
lib, ipcpd: Further stabilization of flows
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 55 | ||||
| -rw-r--r-- | src/lib/shm_rbuff.c | 47 | 
2 files changed, 69 insertions, 33 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index 018cb692..a0c47403 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -612,12 +612,6 @@ int flow_dealloc(int fd)                  return -ENOTALLOC;          } -        if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -EBUSY; -        } -          msg.port_id = ai.flows[fd].port_id;          pthread_rwlock_unlock(&ai.flows_lock); @@ -1285,11 +1279,7 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)                  return -EPERM;          } -        if (ai.flows[fd].tx_rb == NULL) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -EPERM; -        } +        assert(ai.flows[fd].tx_rb);          idx = shm_du_buff_get_idx(sdb); @@ -1302,9 +1292,38 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)          return 0;  } +int ipcp_flow_fini(int fd) +{ +        struct shm_rbuff * rb; + +        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        rb = ai.flows[fd].rx_rb; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        shm_rbuff_fini(rb); + +        return 0; +} +  ssize_t local_flow_read(int fd)  { -        return shm_rbuff_read(ai.flows[fd].rx_rb); +        ssize_t ret; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        ret = shm_rbuff_read(ai.flows[fd].rx_rb); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return ret;  }  int local_flow_write(int fd, size_t idx) @@ -1315,11 +1334,7 @@ int local_flow_write(int fd, size_t idx)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai.flows[fd].tx_rb == NULL) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -EPERM; -        } +        assert(ai.flows[fd].tx_rb);          shm_rbuff_write(ai.flows[fd].tx_rb, idx); @@ -1338,11 +1353,7 @@ int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai.flows[fd].rx_rb == NULL) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -EPERM; -        } +        assert(ai.flows[fd].rx_rb);          idx = shm_rbuff_read(ai.flows[fd].rx_rb);          if (idx < 0) { diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 8b2e9229..301669e7 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -43,6 +43,8 @@  #include <stdbool.h>  #define FN_MAX_CHARS 255 +#define RB_CLOSED -1 +#define RB_OPEN 0  #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t)          \                               + 2 * sizeof(size_t) + sizeof(int8_t)      \ @@ -144,7 +146,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)          pthread_cond_init(rb->add, &cattr);          pthread_cond_init(rb->del, &cattr); -        *rb->acl = 0; +        *rb->acl = RB_OPEN;          *rb->head = 0;          *rb->tail = 0; @@ -291,6 +293,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)          ret = *tail_el_ptr(rb);          *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); +        pthread_cond_broadcast(rb->del);          pthread_mutex_unlock(rb->lock); @@ -353,10 +356,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          return idx;  } -int shm_rbuff_block(struct shm_rbuff * rb) +void shm_rbuff_block(struct shm_rbuff * rb)  { -        int ret = 0; -          assert(rb);  #ifdef __APPLE__ @@ -367,14 +368,9 @@ int shm_rbuff_block(struct shm_rbuff * rb)                  pthread_mutex_consistent(rb->lock);          }  #endif -        *rb->acl = -1; - -        if (!shm_rbuff_empty(rb)) -                ret = -EBUSY; +        *rb->acl = RB_CLOSED;          pthread_mutex_unlock(rb->lock); - -        return ret;  }  void shm_rbuff_unblock(struct shm_rbuff * rb) @@ -389,11 +385,40 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)                  pthread_mutex_consistent(rb->lock);          }  #endif -        *rb->acl = 0; /* open */ +        *rb->acl = RB_OPEN;          pthread_mutex_unlock(rb->lock);  } +void shm_rbuff_fini(struct shm_rbuff * rb) +{ +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        assert(*rb->acl == RB_CLOSED); + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while (!shm_rbuff_empty(rb)) +#ifdef __APPLE__ +                pthread_cond_wait(rb->del, rb->lock); +#else +                if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) { +                        LOG_DBG("Recovering dead mutex."); +                        pthread_mutex_consistent(rb->lock); +                } +#endif +        pthread_cleanup_pop(true); +} +  void shm_rbuff_reset(struct shm_rbuff * rb)  {          assert(rb); | 
