summaryrefslogtreecommitdiff
path: root/src/lib/shm_du_map.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_du_map.c')
-rw-r--r--src/lib/shm_du_map.c257
1 files changed, 194 insertions, 63 deletions
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index 2a316265..24adac1a 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -23,11 +23,14 @@
#include <ouroboros/config.h>
#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/time_utils.h>
#include <pthread.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
+#include <signal.h>
#include <sys/stat.h>
#define OUROBOROS_PREFIX "shm_du_map"
@@ -35,8 +38,8 @@
#include <ouroboros/logs.h>
#define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE)
-#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof (size_t) \
- + sizeof(pthread_mutex_t) + sizeof(pthread_cond_t) \
+#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \
+ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
#define get_head_ptr(dum) \
@@ -57,6 +60,8 @@
& (SHM_BLOCKS_IN_MAP - 1))
#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP)
+#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head)
+
#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \
idx_to_du_buff_ptr(dum, idx)->du_head)
@@ -66,7 +71,7 @@ struct shm_du_buff {
size_t size;
size_t du_head;
size_t du_tail;
- size_t garbage;
+ pid_t dst_api;
};
struct shm_du_map {
@@ -74,11 +79,80 @@ struct shm_du_map {
size_t * ptr_head; /* start of ringbuffer head */
size_t * ptr_tail; /* start of ringbuffer tail */
pthread_mutex_t * shm_mutex; /* lock all free space in shm */
- pthread_cond_t * sanitize; /* run sanitizer when buffer full */
+ size_t * choked; /* stale sdu detection */
+ pthread_cond_t * healthy; /* du map is healthy */
+ pthread_cond_t * full; /* run sanitizer when buffer full */
pid_t * api; /* api of the irmd owner */
int fd;
};
+static void garbage_collect(struct shm_du_map * dum)
+{
+#ifndef SHM_MAP_SINGLE_BLOCK
+ long sz;
+ long blocks;
+#endif
+ while (get_tail_ptr(dum)->dst_api == 0 &&
+ !shm_map_empty(dum)) {
+#ifndef SHM_MAP_SINGLE_BLOCK
+ blocks = 0;
+ sz = get_tail_ptr(dum)->size +
+ (long) sizeof(struct shm_du_buff);
+ while (sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ *dum->ptr_tail =
+ (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+
+#else
+ *dum->ptr_tail =
+ (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
+#endif
+ }
+}
+
+static void clean_sdus(struct shm_du_map * dum, pid_t api)
+{
+ size_t idx = *dum->ptr_tail;
+ struct shm_du_buff * buf;
+#ifndef SHM_DU_MAP_SINGLE_BLOCK
+ long sz;
+ long blocks = 0;
+#endif
+
+ while (idx != *dum->ptr_head) {
+ buf = idx_to_du_buff_ptr(dum, idx);
+ if (buf->dst_api == api)
+ buf->dst_api = 0;
+
+#ifndef SHM_DU_MAP_SINGLE_BLOCK
+ blocks = 0;
+ sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff);
+ while (sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+#else
+ idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1);
+#endif
+ }
+
+ garbage_collect(dum);
+
+ if (kill(api, 0) == 0) {
+ struct shm_ap_rbuff * rb;
+ rb = shm_ap_rbuff_open(api);
+ shm_ap_rbuff_reset(rb);
+ shm_ap_rbuff_close(rb);
+ }
+
+ *dum->choked = 0;
+}
+
struct shm_du_map * shm_du_map_create()
{
struct shm_du_map * dum;
@@ -140,8 +214,10 @@ struct shm_du_map * shm_du_map_create()
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1);
- dum->api = (pid_t *) (dum->sanitize + 1);
+ dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->healthy = (pthread_cond_t *) (dum->choked + 1);
+ dum->full = dum->healthy + 1;
+ dum->api = (pid_t *) (dum->full + 1);
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -150,11 +226,14 @@ struct shm_du_map * shm_du_map_create()
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
- pthread_cond_init(dum->sanitize, &cattr);
+ pthread_cond_init(dum->full, &cattr);
+ pthread_cond_init(dum->healthy, &cattr);
*dum->ptr_head = 0;
*dum->ptr_tail = 0;
+ *dum->choked = 0;
+
*dum->api = getpid();
dum->fd = shm_fd;
@@ -202,8 +281,10 @@ struct shm_du_map * shm_du_map_open()
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1);
- dum->api = (pid_t *) (dum->sanitize + 1);
+ dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->healthy = (pthread_cond_t *) (dum->choked + 1);
+ dum->full = dum->healthy + 1;
+ dum->api = (pid_t *) (dum->full + 1);
dum->fd = shm_fd;
@@ -212,12 +293,73 @@ struct shm_du_map * shm_du_map_open()
pid_t shm_du_map_owner(struct shm_du_map * dum)
{
+ if (dum == NULL)
+ return 0;
+
return *dum->api;
}
void * shm_du_map_sanitize(void * o)
{
- LOG_MISSING;
+ struct shm_du_map * dum = (struct shm_du_map *) o;
+ struct timespec intv
+ = {SHM_DU_TIMEOUT_MICROS / MILLION,
+ (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000};
+
+ pid_t api;
+
+ if (dum == NULL)
+ return (void *) -1;
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+ (void *) dum->shm_mutex);
+
+ while (true) {
+ int ret = 0;
+ if (pthread_cond_wait(dum->full, dum->shm_mutex)
+ == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ *dum->choked = 1;
+
+ api = get_tail_ptr(dum)->dst_api;
+
+ if (kill(api, 0) == 0) {
+ struct timespec now;
+ struct timespec dl;
+ clock_gettime(CLOCK_REALTIME, &now);
+ ts_add(&now, &intv, &dl);
+ while (*dum->choked) {
+ ret = pthread_cond_timedwait(dum->healthy,
+ dum->shm_mutex,
+ &dl);
+ if (!ret)
+ continue;
+
+ if (ret == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ if (ret == ETIMEDOUT) {
+ LOG_DBGF("SDU timed out.");
+ clean_sdus(dum, api);
+ }
+ }
+ } else {
+ LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret);
+ clean_sdus(dum, api);
+ }
+ }
+
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
@@ -256,21 +398,23 @@ void shm_du_map_destroy(struct shm_du_map * dum)
free(dum);
}
-ssize_t shm_create_du_buff(struct shm_du_map * dum,
- size_t size,
- size_t headspace,
- uint8_t * data,
- size_t len)
+ssize_t shm_du_map_write(struct shm_du_map * dum,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t len)
{
struct shm_du_buff * sdb;
#ifndef SHM_MAP_SINGLE_BLOCK
long blocks = 0;
- int sz = size + sizeof *sdb;
- int sz2 = headspace + len + sizeof *sdb;
+ size_t size = headspace + len + tailspace;
+ int sz = headspace + len + sizeof *sdb;
+ int sz2 = sz + tailspace;
size_t copy_len;
#endif
uint8_t * write_pos;
- ssize_t index;
+ ssize_t index = -1;
if (dum == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -294,36 +438,35 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
return -1;
}
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
#ifndef SHM_MAP_SINGLE_BLOCK
- while (sz > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ while (sz2 > 0) {
sz2 -= SHM_DU_BUFF_BLOCK_SIZE;
- if (sz2 < 0 && sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ if (sz < 0 && sz2 > 0) {
pthread_mutex_unlock(dum->shm_mutex);
- LOG_DBG("Can't handle this packet now");
- return -1;
+ LOG_DBG("Can't handle this packet now.");
+ return -EAGAIN;
}
++blocks;
}
if (!shm_map_free(dum, blocks)) {
- pthread_mutex_unlock(dum->shm_mutex);
- pthread_cond_signal(dum->sanitize);
- return -1;
- }
#else
if (!shm_map_free(dum, 1)) {
+#endif
+ pthread_cond_signal(dum->full);
pthread_mutex_unlock(dum->shm_mutex);
- ptrhead_cond_signal(dum->sanitize);
return -1;
}
-#endif
sdb = get_head_ptr(dum);
sdb->size = size;
- sdb->garbage = 0;
+ sdb->dst_api = dst_api;
sdb->du_head = headspace;
sdb->du_tail = sdb->du_head + len;
@@ -333,7 +476,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb);
while (blocks > 0) {
memcpy(write_pos, data, copy_len);
- *(dum->ptr_head) = (*dum->ptr_head + 1)
+ *dum->ptr_head = (*dum->ptr_head + 1)
& (SHM_BLOCKS_IN_MAP - 1);
len -= copy_len;
copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE);
@@ -346,7 +489,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
#else
memcpy(write_pos, data, len);
index = *dum->ptr_head;
- *(dum->ptr_head) = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
#endif
pthread_mutex_unlock(dum->shm_mutex);
@@ -354,18 +497,21 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
}
/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */
-int shm_du_map_read_sdu(uint8_t ** dst,
- struct shm_du_map * dum,
- ssize_t idx)
+int shm_du_map_read(uint8_t ** dst,
+ struct shm_du_map * dum,
+ ssize_t idx)
{
size_t len = 0;
if (idx > SHM_BLOCKS_IN_MAP)
return -1;
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
- if (*dum->ptr_head == *dum->ptr_tail) {
+ if (shm_map_empty(dum)) {
pthread_mutex_unlock(dum->shm_mutex);
return -1;
}
@@ -380,47 +526,32 @@ int shm_du_map_read_sdu(uint8_t ** dst,
return len;
}
-int shm_release_du_buff(struct shm_du_map * dum, ssize_t idx)
+int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)
{
-#ifndef SHM_MAP_SINGLE_BLOCK
- long sz;
- long blocks = 0;
-#endif
if (idx > SHM_BLOCKS_IN_MAP)
return -1;
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
- if (*dum->ptr_head == *dum->ptr_tail) {
+ if (shm_map_empty(dum)) {
pthread_mutex_unlock(dum->shm_mutex);
return -1;
}
- idx_to_du_buff_ptr(dum, idx)->garbage = 1;
+ idx_to_du_buff_ptr(dum, idx)->dst_api = 0;
if (idx != *dum->ptr_tail) {
pthread_mutex_unlock(dum->shm_mutex);
return 0;
}
- while (get_tail_ptr(dum)->garbage == 1 &&
- *dum->ptr_tail != *dum->ptr_head) {
-#ifndef SHM_MAP_SINGLE_BLOCK
- sz = get_tail_ptr(dum)->size;
- while (sz + (long) sizeof(struct shm_du_buff) > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
- }
+ garbage_collect(dum);
- *(dum->ptr_tail) =
- (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
-
- blocks = 0;
-#else
- *(dum->ptr_tail) =
- (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
-#endif
- }
+ *dum->choked = 0;
+ pthread_cond_signal(dum->healthy);
pthread_mutex_unlock(dum->shm_mutex);