diff options
author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-07-02 19:15:26 +0200 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-07-02 19:15:26 +0200 |
commit | de63f8b37f82ef6a760c7d3dafe2251160e2c114 (patch) | |
tree | cd79dba391c0ded80125836069d8187a22f7e5f5 /src | |
parent | cd4d09aae14afe7b0aa0890c61b0ad43e4f23b28 (diff) | |
parent | 79475a4742bc28e1737044f2300bcb601e6e10bf (diff) | |
download | ouroboros-de63f8b37f82ef6a760c7d3dafe2251160e2c114.tar.gz ouroboros-de63f8b37f82ef6a760c7d3dafe2251160e2c114.zip |
Merged in dstaesse/ouroboros/be-shm-robust (pull request #147)
lib: robust locking in shared memory and crash recovery
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/flow.h | 2 | ||||
-rw-r--r-- | src/ipcpd/local/main.c | 2 | ||||
-rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 19 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 20 | ||||
-rw-r--r-- | src/irmd/main.c | 18 | ||||
-rw-r--r-- | src/lib/dev.c | 42 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 70 | ||||
-rw-r--r-- | src/lib/shm_du_map.c | 257 |
8 files changed, 316 insertions, 114 deletions
diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h index e27882e2..b0f1390a 100644 --- a/src/ipcpd/flow.h +++ b/src/ipcpd/flow.h @@ -32,6 +32,8 @@ struct flow { int port_id; struct shm_ap_rbuff * rb; enum flow_state state; + + pid_t api; }; #endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 2120e4e8..837cbf8c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -178,7 +178,6 @@ static int port_id_to_fd(int port_id) /* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ static void * ipcp_local_sdu_loop(void * o) { - while (true) { struct rb_entry * e; int fd; @@ -208,6 +207,7 @@ static void * ipcp_local_sdu_loop(void * o) while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0) ; + pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ipcp->state_lock); } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 48b6391f..68e7e933 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -722,11 +722,12 @@ static void * eth_llc_ipcp_sdu_reader(void * o) continue; } - while ((index = - shm_create_du_buff(shim_data(_ipcp)->dum, - frame_len, 0, - (uint8_t *) (buf + i), - frame_len)) < 0) + while ((index = shm_du_map_write(shim_data(_ipcp)->dum, + ipcp_flow(j)->api, + 0, + 0, + (uint8_t *) (buf + i), + frame_len)) < 0) ; e.index = index; @@ -769,9 +770,9 @@ static void * eth_llc_ipcp_sdu_writer(void * o) return (void *) 1; /* -ENOTENROLLED */ } - len = shm_du_map_read_sdu((uint8_t **) &buf, - shim_data(_ipcp)->dum, - e->index); + len = shm_du_map_read((uint8_t **) &buf, + shim_data(_ipcp)->dum, + e->index); if (len <= 0) { pthread_rwlock_unlock(&_ipcp->state_lock); free(e); @@ -798,7 +799,7 @@ static void * eth_llc_ipcp_sdu_writer(void * o) pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); if (shim_data(_ipcp)->dum != NULL) - shm_release_du_buff(shim_data(_ipcp)->dum, e->index); + shm_du_map_remove(shim_data(_ipcp)->dum, e->index); pthread_rwlock_unlock(&_ipcp->state_lock); } diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index a28c262f..68d393af 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -206,12 +206,12 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) pthread_rwlock_rdlock(&_ap_instance->flows_lock); - while ((index = shm_create_du_buff(_ap_instance->dum, - count + DU_BUFF_HEADSPACE + - DU_BUFF_TAILSPACE, - DU_BUFF_HEADSPACE, - (uint8_t *) buf, - count)) < 0) + while ((index = shm_du_map_write(_ap_instance->dum, + _ap_instance->flows[fd].api, + 0, + 0, + (uint8_t *) buf, + count)) < 0) ; e.index = index; @@ -772,9 +772,9 @@ static void * ipcp_udp_sdu_loop(void * o) return (void *) 1; /* -ENOTENROLLED */ } - len = shm_du_map_read_sdu((uint8_t **) &buf, - _ap_instance->dum, - e->index); + len = shm_du_map_read((uint8_t **) &buf, + _ap_instance->dum, + e->index); if (len <= 0) { pthread_rwlock_unlock(&_ipcp->state_lock); free(e); @@ -799,7 +799,7 @@ static void * ipcp_udp_sdu_loop(void * o) pthread_rwlock_rdlock(&_ipcp->state_lock); if (_ap_instance->dum != NULL) - shm_release_du_buff(_ap_instance->dum, e->index); + shm_du_map_remove(_ap_instance->dum, e->index); pthread_rwlock_unlock(&_ipcp->state_lock); } diff --git a/src/irmd/main.c b/src/irmd/main.c index 5ff84da1..b4771b89 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -30,6 +30,7 @@ #include <ouroboros/list.h> #include <ouroboros/utils.h> #include <ouroboros/irm_config.h> +#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/shm_du_map.h> #include <ouroboros/bitmap.h> #include <ouroboros/flow.h> @@ -163,7 +164,7 @@ static void reg_instance_destroy(struct reg_instance * i) while (wait) { pthread_mutex_lock(&i->mutex); - if (pthread_cond_destroy(&i->wakeup) < 0) + if (pthread_cond_destroy(&i->wakeup)) pthread_cond_broadcast(&i->wakeup); else wait = false; @@ -291,7 +292,7 @@ static void port_map_entry_destroy(struct port_map_entry * e) while (wait) { pthread_mutex_lock(&e->res_lock); - if (pthread_cond_destroy(&e->res_signal) < 0) + if (pthread_cond_destroy(&e->res_signal)) pthread_cond_broadcast(&e->res_signal); else wait = false; @@ -477,7 +478,7 @@ static void reg_entry_destroy(struct reg_entry * e) while (wait) { pthread_mutex_lock(&e->state_lock); - if (pthread_cond_destroy(&e->acc_signal) < 0) + if (pthread_cond_destroy(&e->acc_signal)) pthread_cond_broadcast(&e->acc_signal); else wait = false; @@ -1942,18 +1943,26 @@ void * irm_flow_cleaner() pthread_mutex_unlock(&e->res_lock); if (kill(e->n_api, 0) < 0) { + struct shm_ap_rbuff * n_rb = + shm_ap_rbuff_open(e->n_api); bmp_release(instance->port_ids, e->port_id); list_del(&e->next); LOG_INFO("Process %d gone, %d deallocated.", e->n_api, e->port_id); ipcp_flow_dealloc(e->n_1_api, e->port_id); + if (n_rb != NULL) + shm_ap_rbuff_destroy(n_rb); port_map_entry_destroy(e); } if (kill(e->n_1_api, 0) < 0) { + struct shm_ap_rbuff * n_1_rb = + shm_ap_rbuff_open(e->n_1_api); list_del(&e->next); LOG_ERR("IPCP %d gone, flow %d removed.", e->n_1_api, e->port_id); + if (n_1_rb != NULL) + shm_ap_rbuff_destroy(n_1_rb); port_map_entry_destroy(e); } } @@ -2205,7 +2214,8 @@ static struct irm * irm_create() shm_du_map_destroy(dum); LOG_INFO("Stale shm file removed."); } else { - LOG_INFO("IRMd already running, exiting."); + LOG_INFO("IRMd already running (%d), exiting.", + shm_du_map_owner(dum)); free(instance); exit(EXIT_SUCCESS); } diff --git a/src/lib/dev.c b/src/lib/dev.c index ac995b2d..19bc90e5 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -40,7 +40,7 @@ struct flow { int port_id; int oflags; - /* don't think this needs locking */ + pid_t api; }; struct ap_data { @@ -93,6 +93,7 @@ int ap_init(char * ap_name) for (i = 0; i < AP_MAX_FLOWS; ++i) { _ap_instance->flows[i].rb = NULL; _ap_instance->flows[i].port_id = -1; + _ap_instance->flows[i].api = 0; /* API_INVALID */ } pthread_rwlock_init(&_ap_instance->flows_lock, NULL); @@ -319,6 +320,8 @@ int flow_alloc(char * dst_name, _ap_instance->flows[fd].port_id = recv_msg->port_id; _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + _ap_instance->flows[fd].api = + shm_ap_rbuff_get_api(_ap_instance->flows[fd].rb); pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -349,7 +352,7 @@ int flow_alloc_res(int fd) return -ENOTALLOC; } - msg.port_id = _ap_instance->flows[fd].port_id; + msg.port_id = _ap_instance->flows[fd].port_id; pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -389,11 +392,12 @@ int flow_dealloc(int fd) return -ENOTALLOC; } - msg.port_id = _ap_instance->flows[fd].port_id; + msg.port_id = _ap_instance->flows[fd].port_id; _ap_instance->flows[fd].port_id = -1; shm_ap_rbuff_close(_ap_instance->flows[fd].rb); _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].api = 0; bmp_release(_ap_instance->fds, fd); @@ -476,12 +480,12 @@ ssize_t flow_write(int fd, void * buf, size_t count) } if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - index = shm_create_du_buff(_ap_instance->dum, - count + DU_BUFF_HEADSPACE + - DU_BUFF_TAILSPACE, - DU_BUFF_HEADSPACE, - (uint8_t *) buf, - count); + index = shm_du_map_write(_ap_instance->dum, + _ap_instance->flows[fd].api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + (uint8_t *) buf, + count); if (index == -1) { pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -492,18 +496,18 @@ ssize_t flow_write(int fd, void * buf, size_t count) e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { - shm_release_du_buff(_ap_instance->dum, index); + shm_du_map_remove(_ap_instance->dum, index); pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; } } else { /* blocking */ - while ((index = shm_create_du_buff(_ap_instance->dum, - count + DU_BUFF_HEADSPACE + - DU_BUFF_TAILSPACE, - DU_BUFF_HEADSPACE, - (uint8_t *) buf, - count)) < 0) + while ((index = shm_du_map_write(_ap_instance->dum, + _ap_instance->flows[fd].api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + (uint8_t *) buf, + count)) < 0) ; e.index = index; @@ -555,9 +559,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) return -EAGAIN; } - n = shm_du_map_read_sdu(&sdu, - _ap_instance->dum, - idx); + n = shm_du_map_read(&sdu, _ap_instance->dum, idx); if (n < 0) { pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; @@ -565,7 +567,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) memcpy(buf, sdu, MIN(n, count)); - shm_release_du_buff(_ap_instance->dum, idx); + shm_du_map_remove(_ap_instance->dum, idx); pthread_rwlock_unlock(&_ap_instance->data_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 69e96c40..f54627b7 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -26,6 +26,7 @@ #include <ouroboros/logs.h> #include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_du_map.h> #include <pthread.h> #include <sys/mman.h> @@ -34,7 +35,6 @@ #include <string.h> #include <stdint.h> #include <unistd.h> -#include <stdbool.h> #include <errno.h> #include <sys/stat.h> @@ -127,6 +127,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() rb->work = (pthread_cond_t *) (rb->shm_mutex + 1); pthread_mutexattr_init(&mattr); + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(rb->shm_mutex, &mattr); @@ -213,6 +214,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) { char fn[25]; + struct shm_du_map * dum = NULL; if (rb == NULL) { LOG_DBGF("Bogus input. Bugging out."); @@ -220,8 +222,17 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) } if (rb->api != getpid()) { - LOG_ERR("Tried to destroy other AP's rbuff."); - return; + dum = shm_du_map_open(); + if (shm_du_map_owner(dum) == getpid()) { + LOG_DBGF("Ringbuffer %d destroyed by IRMd %d.", + rb->api, getpid()); + shm_du_map_close(dum); + } else { + LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", + getpid(), rb->api); + shm_du_map_close(dum); + return; + } } if (close(rb->fd) < 0) @@ -243,12 +254,16 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) if (rb == NULL || e == NULL) return -1; - pthread_mutex_lock(rb->shm_mutex); + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } if (!shm_rbuff_free(rb)) { pthread_mutex_unlock(rb->shm_mutex); return -1; } + if (shm_rbuff_empty(rb)) pthread_cond_broadcast(rb->work); @@ -269,10 +284,21 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->shm_mutex); - pthread_mutex_lock(rb->shm_mutex); + + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } + + while (tail_el_ptr->port_id < 0) + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); while(shm_rbuff_empty(rb)) - pthread_cond_wait(rb->work, rb->shm_mutex); + if (pthread_cond_wait(rb->work, rb->shm_mutex) + == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } e = malloc(sizeof(*e)); if (e == NULL) { @@ -293,13 +319,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) { ssize_t idx = -1; - pthread_mutex_lock(rb->shm_mutex); + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } if (shm_rbuff_empty(rb)) { pthread_mutex_unlock(rb->shm_mutex); return -1; } + while (tail_el_ptr->port_id < 0) + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + if (tail_el_ptr->port_id != port_id) { pthread_mutex_unlock(rb->shm_mutex); return -1; @@ -313,3 +345,27 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) return idx; } + +pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff *rb) +{ + pid_t api = 0; + if (rb == NULL) + return 0; + + pthread_mutex_lock(rb->shm_mutex); + api = rb->api; + pthread_mutex_unlock(rb->shm_mutex); + + return api; +} + +void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) +{ + if (rb == NULL) + return; + + pthread_mutex_lock(rb->shm_mutex); + *rb->ptr_tail = 0; + *rb->ptr_head = 0; + pthread_mutex_unlock(rb->shm_mutex); +} diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index 2a316265..24adac1a 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -23,11 +23,14 @@ #include <ouroboros/config.h> #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/time_utils.h> #include <pthread.h> #include <sys/mman.h> #include <fcntl.h> #include <stdlib.h> #include <string.h> +#include <signal.h> #include <sys/stat.h> #define OUROBOROS_PREFIX "shm_du_map" @@ -35,8 +38,8 @@ #include <ouroboros/logs.h> #define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof (size_t) \ - + sizeof(pthread_mutex_t) + sizeof(pthread_cond_t) \ +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ + + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) #define get_head_ptr(dum) \ @@ -57,6 +60,8 @@ & (SHM_BLOCKS_IN_MAP - 1)) #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) + #define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \ idx_to_du_buff_ptr(dum, idx)->du_head) @@ -66,7 +71,7 @@ struct shm_du_buff { size_t size; size_t du_head; size_t du_tail; - size_t garbage; + pid_t dst_api; }; struct shm_du_map { @@ -74,11 +79,80 @@ struct shm_du_map { size_t * ptr_head; /* start of ringbuffer head */ size_t * ptr_tail; /* start of ringbuffer tail */ pthread_mutex_t * shm_mutex; /* lock all free space in shm */ - pthread_cond_t * sanitize; /* run sanitizer when buffer full */ + size_t * choked; /* stale sdu detection */ + pthread_cond_t * healthy; /* du map is healthy */ + pthread_cond_t * full; /* run sanitizer when buffer full */ pid_t * api; /* api of the irmd owner */ int fd; }; +static void garbage_collect(struct shm_du_map * dum) +{ +#ifndef SHM_MAP_SINGLE_BLOCK + long sz; + long blocks; +#endif + while (get_tail_ptr(dum)->dst_api == 0 && + !shm_map_empty(dum)) { +#ifndef SHM_MAP_SINGLE_BLOCK + blocks = 0; + sz = get_tail_ptr(dum)->size + + (long) sizeof(struct shm_du_buff); + while (sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + *dum->ptr_tail = + (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); + +#else + *dum->ptr_tail = + (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); +#endif + } +} + +static void clean_sdus(struct shm_du_map * dum, pid_t api) +{ + size_t idx = *dum->ptr_tail; + struct shm_du_buff * buf; +#ifndef SHM_DU_MAP_SINGLE_BLOCK + long sz; + long blocks = 0; +#endif + + while (idx != *dum->ptr_head) { + buf = idx_to_du_buff_ptr(dum, idx); + if (buf->dst_api == api) + buf->dst_api = 0; + +#ifndef SHM_DU_MAP_SINGLE_BLOCK + blocks = 0; + sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff); + while (sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1); +#else + idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1); +#endif + } + + garbage_collect(dum); + + if (kill(api, 0) == 0) { + struct shm_ap_rbuff * rb; + rb = shm_ap_rbuff_open(api); + shm_ap_rbuff_reset(rb); + shm_ap_rbuff_close(rb); + } + + *dum->choked = 0; +} + struct shm_du_map * shm_du_map_create() { struct shm_du_map * dum; @@ -140,8 +214,10 @@ struct shm_du_map * shm_du_map_create() ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1); - dum->api = (pid_t *) (dum->sanitize + 1); + dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->healthy = (pthread_cond_t *) (dum->choked + 1); + dum->full = dum->healthy + 1; + dum->api = (pid_t *) (dum->full + 1); pthread_mutexattr_init(&mattr); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -150,11 +226,14 @@ struct shm_du_map * shm_du_map_create() pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); - pthread_cond_init(dum->sanitize, &cattr); + pthread_cond_init(dum->full, &cattr); + pthread_cond_init(dum->healthy, &cattr); *dum->ptr_head = 0; *dum->ptr_tail = 0; + *dum->choked = 0; + *dum->api = getpid(); dum->fd = shm_fd; @@ -202,8 +281,10 @@ struct shm_du_map * shm_du_map_open() ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1); - dum->api = (pid_t *) (dum->sanitize + 1); + dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->healthy = (pthread_cond_t *) (dum->choked + 1); + dum->full = dum->healthy + 1; + dum->api = (pid_t *) (dum->full + 1); dum->fd = shm_fd; @@ -212,12 +293,73 @@ struct shm_du_map * shm_du_map_open() pid_t shm_du_map_owner(struct shm_du_map * dum) { + if (dum == NULL) + return 0; + return *dum->api; } void * shm_du_map_sanitize(void * o) { - LOG_MISSING; + struct shm_du_map * dum = (struct shm_du_map *) o; + struct timespec intv + = {SHM_DU_TIMEOUT_MICROS / MILLION, + (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; + + pid_t api; + + if (dum == NULL) + return (void *) -1; + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, + (void *) dum->shm_mutex); + + while (true) { + int ret = 0; + if (pthread_cond_wait(dum->full, dum->shm_mutex) + == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + *dum->choked = 1; + + api = get_tail_ptr(dum)->dst_api; + + if (kill(api, 0) == 0) { + struct timespec now; + struct timespec dl; + clock_gettime(CLOCK_REALTIME, &now); + ts_add(&now, &intv, &dl); + while (*dum->choked) { + ret = pthread_cond_timedwait(dum->healthy, + dum->shm_mutex, + &dl); + if (!ret) + continue; + + if (ret == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + if (ret == ETIMEDOUT) { + LOG_DBGF("SDU timed out."); + clean_sdus(dum, api); + } + } + } else { + LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret); + clean_sdus(dum, api); + } + } + + pthread_cleanup_pop(true); + return (void *) 0; } @@ -256,21 +398,23 @@ void shm_du_map_destroy(struct shm_du_map * dum) free(dum); } -ssize_t shm_create_du_buff(struct shm_du_map * dum, - size_t size, - size_t headspace, - uint8_t * data, - size_t len) +ssize_t shm_du_map_write(struct shm_du_map * dum, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) { struct shm_du_buff * sdb; #ifndef SHM_MAP_SINGLE_BLOCK long blocks = 0; - int sz = size + sizeof *sdb; - int sz2 = headspace + len + sizeof *sdb; + size_t size = headspace + len + tailspace; + int sz = headspace + len + sizeof *sdb; + int sz2 = sz + tailspace; size_t copy_len; #endif uint8_t * write_pos; - ssize_t index; + ssize_t index = -1; if (dum == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -294,36 +438,35 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, return -1; } - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } #ifndef SHM_MAP_SINGLE_BLOCK - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; + while (sz2 > 0) { sz2 -= SHM_DU_BUFF_BLOCK_SIZE; - if (sz2 < 0 && sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + if (sz < 0 && sz2 > 0) { pthread_mutex_unlock(dum->shm_mutex); - LOG_DBG("Can't handle this packet now"); - return -1; + LOG_DBG("Can't handle this packet now."); + return -EAGAIN; } ++blocks; } if (!shm_map_free(dum, blocks)) { - pthread_mutex_unlock(dum->shm_mutex); - pthread_cond_signal(dum->sanitize); - return -1; - } #else if (!shm_map_free(dum, 1)) { +#endif + pthread_cond_signal(dum->full); pthread_mutex_unlock(dum->shm_mutex); - ptrhead_cond_signal(dum->sanitize); return -1; } -#endif sdb = get_head_ptr(dum); sdb->size = size; - sdb->garbage = 0; + sdb->dst_api = dst_api; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; @@ -333,7 +476,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb); while (blocks > 0) { memcpy(write_pos, data, copy_len); - *(dum->ptr_head) = (*dum->ptr_head + 1) + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); len -= copy_len; copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE); @@ -346,7 +489,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, #else memcpy(write_pos, data, len); index = *dum->ptr_head; - *(dum->ptr_head) = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); #endif pthread_mutex_unlock(dum->shm_mutex); @@ -354,18 +497,21 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, } /* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ -int shm_du_map_read_sdu(uint8_t ** dst, - struct shm_du_map * dum, - ssize_t idx) +int shm_du_map_read(uint8_t ** dst, + struct shm_du_map * dum, + ssize_t idx) { size_t len = 0; if (idx > SHM_BLOCKS_IN_MAP) return -1; - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } - if (*dum->ptr_head == *dum->ptr_tail) { + if (shm_map_empty(dum)) { pthread_mutex_unlock(dum->shm_mutex); return -1; } @@ -380,47 +526,32 @@ int shm_du_map_read_sdu(uint8_t ** dst, return len; } -int shm_release_du_buff(struct shm_du_map * dum, ssize_t idx) +int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) { -#ifndef SHM_MAP_SINGLE_BLOCK - long sz; - long blocks = 0; -#endif if (idx > SHM_BLOCKS_IN_MAP) return -1; - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } - if (*dum->ptr_head == *dum->ptr_tail) { + if (shm_map_empty(dum)) { pthread_mutex_unlock(dum->shm_mutex); return -1; } - idx_to_du_buff_ptr(dum, idx)->garbage = 1; + idx_to_du_buff_ptr(dum, idx)->dst_api = 0; if (idx != *dum->ptr_tail) { pthread_mutex_unlock(dum->shm_mutex); return 0; } - while (get_tail_ptr(dum)->garbage == 1 && - *dum->ptr_tail != *dum->ptr_head) { -#ifndef SHM_MAP_SINGLE_BLOCK - sz = get_tail_ptr(dum)->size; - while (sz + (long) sizeof(struct shm_du_buff) > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } + garbage_collect(dum); - *(dum->ptr_tail) = - (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); - - blocks = 0; -#else - *(dum->ptr_tail) = - (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); -#endif - } + *dum->choked = 0; + pthread_cond_signal(dum->healthy); pthread_mutex_unlock(dum->shm_mutex); |