diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-07-02 19:15:26 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-07-02 19:15:26 +0200 | 
| commit | de63f8b37f82ef6a760c7d3dafe2251160e2c114 (patch) | |
| tree | cd79dba391c0ded80125836069d8187a22f7e5f5 /src/lib | |
| parent | cd4d09aae14afe7b0aa0890c61b0ad43e4f23b28 (diff) | |
| parent | 79475a4742bc28e1737044f2300bcb601e6e10bf (diff) | |
| download | ouroboros-de63f8b37f82ef6a760c7d3dafe2251160e2c114.tar.gz ouroboros-de63f8b37f82ef6a760c7d3dafe2251160e2c114.zip | |
Merged in dstaesse/ouroboros/be-shm-robust (pull request #147)
lib: robust locking in shared memory and crash recovery
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 42 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 70 | ||||
| -rw-r--r-- | src/lib/shm_du_map.c | 257 | 
3 files changed, 279 insertions, 90 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index ac995b2d..19bc90e5 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -40,7 +40,7 @@ struct flow {          int                   port_id;          int                   oflags; -        /* don't think this needs locking */ +        pid_t                 api;  };  struct ap_data { @@ -93,6 +93,7 @@ int ap_init(char * ap_name)          for (i = 0; i < AP_MAX_FLOWS; ++i) {                  _ap_instance->flows[i].rb = NULL;                  _ap_instance->flows[i].port_id = -1; +                _ap_instance->flows[i].api = 0; /* API_INVALID */          }          pthread_rwlock_init(&_ap_instance->flows_lock, NULL); @@ -319,6 +320,8 @@ int flow_alloc(char * dst_name,          _ap_instance->flows[fd].port_id = recv_msg->port_id;          _ap_instance->flows[fd].oflags  = FLOW_O_DEFAULT; +        _ap_instance->flows[fd].api     = +                shm_ap_rbuff_get_api(_ap_instance->flows[fd].rb);          pthread_rwlock_unlock(&_ap_instance->flows_lock);          pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -349,7 +352,7 @@ int flow_alloc_res(int fd)                  return -ENOTALLOC;          } -        msg.port_id      = _ap_instance->flows[fd].port_id; +        msg.port_id = _ap_instance->flows[fd].port_id;          pthread_rwlock_unlock(&_ap_instance->flows_lock);          pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -389,11 +392,12 @@ int flow_dealloc(int fd)                  return -ENOTALLOC;          } -        msg.port_id      = _ap_instance->flows[fd].port_id; +        msg.port_id = _ap_instance->flows[fd].port_id;          _ap_instance->flows[fd].port_id = -1;          shm_ap_rbuff_close(_ap_instance->flows[fd].rb);          _ap_instance->flows[fd].rb = NULL; +        _ap_instance->flows[fd].api = 0;          bmp_release(_ap_instance->fds, fd); @@ -476,12 +480,12 @@ ssize_t flow_write(int fd, void * buf, size_t count)          }          if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { -                index = shm_create_du_buff(_ap_instance->dum, -                                           count + DU_BUFF_HEADSPACE + -                                           DU_BUFF_TAILSPACE, -                                           DU_BUFF_HEADSPACE, -                                           (uint8_t *) buf, -                                           count); +                index = shm_du_map_write(_ap_instance->dum, +                                         _ap_instance->flows[fd].api, +                                         DU_BUFF_HEADSPACE, +                                         DU_BUFF_TAILSPACE, +                                         (uint8_t *) buf, +                                         count);                  if (index == -1) {                          pthread_rwlock_unlock(&_ap_instance->flows_lock);                          pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -492,18 +496,18 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  e.port_id = _ap_instance->flows[fd].port_id;                  if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { -                        shm_release_du_buff(_ap_instance->dum, index); +                        shm_du_map_remove(_ap_instance->dum, index);                          pthread_rwlock_unlock(&_ap_instance->flows_lock);                          pthread_rwlock_unlock(&_ap_instance->data_lock);                          return -1;                  }          } else { /* blocking */ -                while ((index = shm_create_du_buff(_ap_instance->dum, -                                                   count + DU_BUFF_HEADSPACE + -                                                   DU_BUFF_TAILSPACE, -                                                   DU_BUFF_HEADSPACE, -                                                   (uint8_t *) buf, -                                                   count)) < 0) +                while ((index = shm_du_map_write(_ap_instance->dum, +                                                 _ap_instance->flows[fd].api, +                                                 DU_BUFF_HEADSPACE, +                                                 DU_BUFF_TAILSPACE, +                                                 (uint8_t *) buf, +                                                 count)) < 0)                          ;                  e.index   = index; @@ -555,9 +559,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)                  return -EAGAIN;          } -        n = shm_du_map_read_sdu(&sdu, -                                _ap_instance->dum, -                                idx); +        n = shm_du_map_read(&sdu, _ap_instance->dum, idx);          if (n < 0) {                  pthread_rwlock_unlock(&_ap_instance->data_lock);                  return -1; @@ -565,7 +567,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)          memcpy(buf, sdu, MIN(n, count)); -        shm_release_du_buff(_ap_instance->dum, idx); +        shm_du_map_remove(_ap_instance->dum, idx);          pthread_rwlock_unlock(&_ap_instance->data_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 69e96c40..f54627b7 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -26,6 +26,7 @@  #include <ouroboros/logs.h>  #include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_du_map.h>  #include <pthread.h>  #include <sys/mman.h> @@ -34,7 +35,6 @@  #include <string.h>  #include <stdint.h>  #include <unistd.h> -#include <stdbool.h>  #include <errno.h>  #include <sys/stat.h> @@ -127,6 +127,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()          rb->work      = (pthread_cond_t *) (rb->shm_mutex + 1);          pthread_mutexattr_init(&mattr); +        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);          pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);          pthread_mutex_init(rb->shm_mutex, &mattr); @@ -213,6 +214,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)  void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)  {          char fn[25]; +        struct shm_du_map * dum = NULL;          if (rb == NULL) {                  LOG_DBGF("Bogus input. Bugging out."); @@ -220,8 +222,17 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)          }          if (rb->api != getpid()) { -                LOG_ERR("Tried to destroy other AP's rbuff."); -                return; +                dum = shm_du_map_open(); +                if (shm_du_map_owner(dum) == getpid()) { +                        LOG_DBGF("Ringbuffer %d destroyed by IRMd %d.", +                                 rb->api, getpid()); +                        shm_du_map_close(dum); +                } else { +                        LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", +                                getpid(), rb->api); +                        shm_du_map_close(dum); +                        return; +                }          }          if (close(rb->fd) < 0) @@ -243,12 +254,16 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)          if (rb == NULL || e == NULL)                  return -1; -        pthread_mutex_lock(rb->shm_mutex); +        if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rb->shm_mutex); +        }          if (!shm_rbuff_free(rb)) {                  pthread_mutex_unlock(rb->shm_mutex);                  return -1;          } +          if (shm_rbuff_empty(rb))                  pthread_cond_broadcast(rb->work); @@ -269,10 +284,21 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) rb->shm_mutex); -        pthread_mutex_lock(rb->shm_mutex); + +        if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rb->shm_mutex); +        } + +        while (tail_el_ptr->port_id < 0) +                *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);          while(shm_rbuff_empty(rb)) -                pthread_cond_wait(rb->work, rb->shm_mutex); +                if (pthread_cond_wait(rb->work, rb->shm_mutex) +                    == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rb->shm_mutex); +        }          e = malloc(sizeof(*e));          if (e == NULL) { @@ -293,13 +319,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)  {          ssize_t idx = -1; -        pthread_mutex_lock(rb->shm_mutex); +        if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rb->shm_mutex); +        }          if (shm_rbuff_empty(rb)) {                  pthread_mutex_unlock(rb->shm_mutex);                  return -1;          } +        while (tail_el_ptr->port_id < 0) +                *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); +          if (tail_el_ptr->port_id != port_id) {                  pthread_mutex_unlock(rb->shm_mutex);                  return -1; @@ -313,3 +345,27 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)          return idx;  } + +pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff *rb) +{ +        pid_t api = 0; +        if (rb == NULL) +                return 0; + +        pthread_mutex_lock(rb->shm_mutex); +        api = rb->api; +        pthread_mutex_unlock(rb->shm_mutex); + +        return api; +} + +void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) +{ +        if (rb == NULL) +                return; + +        pthread_mutex_lock(rb->shm_mutex); +        *rb->ptr_tail = 0; +        *rb->ptr_head = 0; +        pthread_mutex_unlock(rb->shm_mutex); +} diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index 2a316265..24adac1a 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -23,11 +23,14 @@  #include <ouroboros/config.h>  #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/time_utils.h>  #include <pthread.h>  #include <sys/mman.h>  #include <fcntl.h>  #include <stdlib.h>  #include <string.h> +#include <signal.h>  #include <sys/stat.h>  #define OUROBOROS_PREFIX "shm_du_map" @@ -35,8 +38,8 @@  #include <ouroboros/logs.h>  #define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof (size_t)                   \ -                       + sizeof(pthread_mutex_t) + sizeof(pthread_cond_t)      \ +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t)                   \ +                       + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t)  \                         + sizeof(pid_t))  #define get_head_ptr(dum)                                                      \ @@ -57,6 +60,8 @@                            & (SHM_BLOCKS_IN_MAP - 1))  #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) +  #define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail -            \                              idx_to_du_buff_ptr(dum, idx)->du_head) @@ -66,7 +71,7 @@ struct shm_du_buff {          size_t size;          size_t du_head;          size_t du_tail; -        size_t garbage; +        pid_t  dst_api;  };  struct shm_du_map { @@ -74,11 +79,80 @@ struct shm_du_map {          size_t *          ptr_head;    /* start of ringbuffer head */          size_t *          ptr_tail;    /* start of ringbuffer tail */          pthread_mutex_t * shm_mutex;   /* lock all free space in shm */ -        pthread_cond_t *  sanitize;    /* run sanitizer when buffer full */ +        size_t *          choked;      /* stale sdu detection */ +        pthread_cond_t *  healthy;     /* du map is healthy */ +        pthread_cond_t *  full;        /* run sanitizer when buffer full */          pid_t *           api;         /* api of the irmd owner */          int               fd;  }; +static void garbage_collect(struct shm_du_map * dum) +{ +#ifndef SHM_MAP_SINGLE_BLOCK +        long sz; +        long blocks; +#endif +        while (get_tail_ptr(dum)->dst_api == 0 && +               !shm_map_empty(dum)) { +#ifndef SHM_MAP_SINGLE_BLOCK +                blocks = 0; +                sz = get_tail_ptr(dum)->size + +                        (long) sizeof(struct shm_du_buff); +                while (sz > 0) { +                        sz -= SHM_DU_BUFF_BLOCK_SIZE; +                        ++blocks; +                } + +                *dum->ptr_tail = +                        (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); + +#else +                *dum->ptr_tail = +                        (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); +#endif +        } +} + +static void clean_sdus(struct shm_du_map * dum, pid_t api) +{ +        size_t idx = *dum->ptr_tail; +        struct shm_du_buff * buf; +#ifndef SHM_DU_MAP_SINGLE_BLOCK +        long sz; +        long blocks = 0; +#endif + +        while (idx != *dum->ptr_head) { +                buf = idx_to_du_buff_ptr(dum, idx); +                if (buf->dst_api == api) +                        buf->dst_api = 0; + +#ifndef SHM_DU_MAP_SINGLE_BLOCK +                blocks = 0; +                sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff); +                while (sz > 0) { +                        sz -= SHM_DU_BUFF_BLOCK_SIZE; +                        ++blocks; +                } + +                idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1); +#else +                idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1); +#endif +        } + +        garbage_collect(dum); + +        if (kill(api, 0) == 0) { +                struct shm_ap_rbuff * rb; +                rb = shm_ap_rbuff_open(api); +                shm_ap_rbuff_reset(rb); +                shm_ap_rbuff_close(rb); +        } + +        *dum->choked = 0; +} +  struct shm_du_map * shm_du_map_create()  {          struct shm_du_map * dum; @@ -140,8 +214,10 @@ struct shm_du_map * shm_du_map_create()                  ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);          dum->ptr_tail = dum->ptr_head + 1;          dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); -        dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1); -        dum->api = (pid_t *) (dum->sanitize + 1); +        dum->choked = (size_t *) (dum->shm_mutex + 1); +        dum->healthy = (pthread_cond_t *) (dum->choked + 1); +        dum->full = dum->healthy + 1; +        dum->api = (pid_t *) (dum->full + 1);          pthread_mutexattr_init(&mattr);          pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -150,11 +226,14 @@ struct shm_du_map * shm_du_map_create()          pthread_condattr_init(&cattr);          pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); -        pthread_cond_init(dum->sanitize, &cattr); +        pthread_cond_init(dum->full, &cattr); +        pthread_cond_init(dum->healthy, &cattr);          *dum->ptr_head = 0;          *dum->ptr_tail = 0; +        *dum->choked = 0; +          *dum->api = getpid();          dum->fd = shm_fd; @@ -202,8 +281,10 @@ struct shm_du_map * shm_du_map_open()                  ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);          dum->ptr_tail = dum->ptr_head + 1;          dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); -        dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1); -        dum->api = (pid_t *) (dum->sanitize + 1); +        dum->choked = (size_t *) (dum->shm_mutex + 1); +        dum->healthy = (pthread_cond_t *) (dum->choked + 1); +        dum->full = dum->healthy + 1; +        dum->api = (pid_t *) (dum->full + 1);          dum->fd = shm_fd; @@ -212,12 +293,73 @@ struct shm_du_map * shm_du_map_open()  pid_t shm_du_map_owner(struct shm_du_map * dum)  { +        if (dum == NULL) +                return 0; +          return *dum->api;  }  void * shm_du_map_sanitize(void * o)  { -        LOG_MISSING; +        struct shm_du_map * dum = (struct shm_du_map *) o; +        struct timespec intv +                = {SHM_DU_TIMEOUT_MICROS / MILLION, +                   (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; + +        pid_t   api; + +        if (dum == NULL) +                return (void *) -1; +        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +                LOG_WARN("Recovering dead mutex."); +                pthread_mutex_consistent(dum->shm_mutex); +        } + +        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, +                             (void *) dum->shm_mutex); + +        while (true) { +                int ret = 0; +                if (pthread_cond_wait(dum->full, dum->shm_mutex) +                        == EOWNERDEAD) { +                        LOG_WARN("Recovering dead mutex."); +                        pthread_mutex_consistent(dum->shm_mutex); +                } + +                *dum->choked = 1; + +                api = get_tail_ptr(dum)->dst_api; + +                if (kill(api, 0) == 0) { +                        struct timespec now; +                        struct timespec dl; +                        clock_gettime(CLOCK_REALTIME, &now); +                        ts_add(&now, &intv, &dl); +                        while (*dum->choked) { +                                ret = pthread_cond_timedwait(dum->healthy, +                                                                 dum->shm_mutex, +                                                                 &dl); +                                if (!ret) +                                        continue; + +                                if (ret == EOWNERDEAD) { +                                        LOG_WARN("Recovering dead mutex."); +                                        pthread_mutex_consistent(dum->shm_mutex); +                                } + +                                if (ret == ETIMEDOUT) { +                                        LOG_DBGF("SDU timed out."); +                                        clean_sdus(dum, api); +                                } +                        } +                } else { +                        LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret); +                        clean_sdus(dum, api); +                } +        } + +        pthread_cleanup_pop(true); +          return (void *) 0;  } @@ -256,21 +398,23 @@ void shm_du_map_destroy(struct shm_du_map * dum)          free(dum);  } -ssize_t shm_create_du_buff(struct shm_du_map * dum, -                           size_t              size, -                           size_t              headspace, -                           uint8_t *           data, -                           size_t              len) +ssize_t shm_du_map_write(struct shm_du_map * dum, +                         pid_t               dst_api, +                         size_t              headspace, +                         size_t              tailspace, +                         uint8_t *           data, +                         size_t              len)  {          struct shm_du_buff * sdb;  #ifndef SHM_MAP_SINGLE_BLOCK          long                 blocks = 0; -        int                  sz = size + sizeof *sdb; -        int                  sz2 = headspace + len + sizeof *sdb; +        size_t               size = headspace + len + tailspace; +        int                  sz = headspace + len + sizeof *sdb; +        int                  sz2 = sz + tailspace;          size_t               copy_len;  #endif          uint8_t *            write_pos; -        ssize_t              index; +        ssize_t              index = -1;          if (dum == NULL || data == NULL) {                  LOG_DBGF("Bogus input, bugging out."); @@ -294,36 +438,35 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,                  return -1;          } -        pthread_mutex_lock(dum->shm_mutex); +        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(dum->shm_mutex); +        }  #ifndef SHM_MAP_SINGLE_BLOCK -        while (sz > 0) { -                sz -= SHM_DU_BUFF_BLOCK_SIZE; +        while (sz2 > 0) {                  sz2 -= SHM_DU_BUFF_BLOCK_SIZE; -                if (sz2 < 0 && sz > 0) { +                sz -= SHM_DU_BUFF_BLOCK_SIZE; +                if (sz < 0 && sz2 > 0) {                          pthread_mutex_unlock(dum->shm_mutex); -                        LOG_DBG("Can't handle this packet now"); -                        return -1; +                        LOG_DBG("Can't handle this packet now."); +                        return -EAGAIN;                  }                  ++blocks;          }          if (!shm_map_free(dum, blocks)) { -                pthread_mutex_unlock(dum->shm_mutex); -                pthread_cond_signal(dum->sanitize); -                return -1; -        }  #else          if (!shm_map_free(dum, 1)) { +#endif +                pthread_cond_signal(dum->full);                  pthread_mutex_unlock(dum->shm_mutex); -                ptrhead_cond_signal(dum->sanitize);                  return -1;          } -#endif          sdb = get_head_ptr(dum);          sdb->size = size; -        sdb->garbage = 0; +        sdb->dst_api = dst_api;          sdb->du_head = headspace;          sdb->du_tail = sdb->du_head + len; @@ -333,7 +476,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,          copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb);          while (blocks > 0) {                  memcpy(write_pos, data, copy_len); -                *(dum->ptr_head) = (*dum->ptr_head + 1) +                *dum->ptr_head = (*dum->ptr_head + 1)                          & (SHM_BLOCKS_IN_MAP - 1);                  len -= copy_len;                  copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE); @@ -346,7 +489,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,  #else          memcpy(write_pos, data, len);          index = *dum->ptr_head; -        *(dum->ptr_head) = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); +        *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);  #endif          pthread_mutex_unlock(dum->shm_mutex); @@ -354,18 +497,21 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,  }  /* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ -int shm_du_map_read_sdu(uint8_t **          dst, -                        struct shm_du_map * dum, -                        ssize_t             idx) +int shm_du_map_read(uint8_t **          dst, +                    struct shm_du_map * dum, +                    ssize_t             idx)  {          size_t len = 0;          if (idx > SHM_BLOCKS_IN_MAP)                  return -1; -        pthread_mutex_lock(dum->shm_mutex); +        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(dum->shm_mutex); +        } -        if (*dum->ptr_head == *dum->ptr_tail) { +        if (shm_map_empty(dum)) {                  pthread_mutex_unlock(dum->shm_mutex);                  return -1;          } @@ -380,47 +526,32 @@ int shm_du_map_read_sdu(uint8_t **          dst,          return len;  } -int shm_release_du_buff(struct shm_du_map * dum, ssize_t idx) +int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)  { -#ifndef SHM_MAP_SINGLE_BLOCK -        long sz; -        long blocks = 0; -#endif          if (idx > SHM_BLOCKS_IN_MAP)                  return -1; -        pthread_mutex_lock(dum->shm_mutex); +        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(dum->shm_mutex); +        } -        if (*dum->ptr_head == *dum->ptr_tail) { +        if (shm_map_empty(dum)) {                  pthread_mutex_unlock(dum->shm_mutex);                  return -1;          } -        idx_to_du_buff_ptr(dum, idx)->garbage = 1; +        idx_to_du_buff_ptr(dum, idx)->dst_api = 0;          if (idx != *dum->ptr_tail) {                  pthread_mutex_unlock(dum->shm_mutex);                  return 0;          } -        while (get_tail_ptr(dum)->garbage == 1 && -               *dum->ptr_tail != *dum->ptr_head) { -#ifndef SHM_MAP_SINGLE_BLOCK -                sz = get_tail_ptr(dum)->size; -                while (sz + (long) sizeof(struct shm_du_buff) > 0) { -                        sz -= SHM_DU_BUFF_BLOCK_SIZE; -                        ++blocks; -                } +        garbage_collect(dum); -                *(dum->ptr_tail) = -                        (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); - -                blocks = 0; -#else -                *(dum->ptr_tail) = -                        (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); -#endif -        } +        *dum->choked = 0; +        pthread_cond_signal(dum->healthy);          pthread_mutex_unlock(dum->shm_mutex); | 
