summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-07-03 11:26:59 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-07-03 11:26:59 +0200
commitd2a7cb2d27dab595bd2948ad3724016ca948e61e (patch)
tree2c3c8037b780d60d34a1ac06e0439da79cf2fadb
parentde63f8b37f82ef6a760c7d3dafe2251160e2c114 (diff)
parent04f385a99f5e901598ee4b3d2655e458c92c06d8 (diff)
downloadouroboros-0.2.tar.gz
ouroboros-0.2.zip
Merged in dstaesse/ouroboros/be-multiblock-read (pull request #148)0.2
lib: shm_du_map full multi-block support
-rw-r--r--include/ouroboros/config.h.in3
-rw-r--r--src/lib/shm_du_map.c182
2 files changed, 81 insertions, 104 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 6861c6cb..a3f6e87c 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -33,9 +33,10 @@
#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@"
#define AP_MAX_FLOWS 256
#define SHM_DU_BUFF_BLOCK_SIZE sysconf(_SC_PAGESIZE)
+#define SHM_DU_MAP_MULTI_BLOCK
#define SHM_DU_MAP_FILENAME "ouroboros_du_map"
#define SHM_BLOCKS_IN_MAP (1 << 14)
-#define SHM_DU_TIMEOUT_MICROS 2000
+#define SHM_DU_TIMEOUT_MICROS 15000
#define DU_BUFF_HEADSPACE 128
#define DU_BUFF_TAILSPACE 0
#define SHM_AP_RBUFF_PREFIX "ouroboros_rb_"
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index 24adac1a..31fcca8e 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -62,13 +62,11 @@
#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head)
-#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \
- idx_to_du_buff_ptr(dum, idx)->du_head)
-
-#define MIN(a,b)(a < b ? a : b)
-
struct shm_du_buff {
size_t size;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ size_t blocks;
+#endif
size_t du_head;
size_t du_tail;
pid_t dst_api;
@@ -88,54 +86,32 @@ struct shm_du_map {
static void garbage_collect(struct shm_du_map * dum)
{
-#ifndef SHM_MAP_SINGLE_BLOCK
- long sz;
- long blocks;
-#endif
- while (get_tail_ptr(dum)->dst_api == 0 &&
- !shm_map_empty(dum)) {
-#ifndef SHM_MAP_SINGLE_BLOCK
- blocks = 0;
- sz = get_tail_ptr(dum)->size +
- (long) sizeof(struct shm_du_buff);
- while (sz > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
- }
-
- *dum->ptr_tail =
- (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
-
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ struct shm_du_buff * sdb;
+ while ((sdb = get_tail_ptr(dum))->dst_api == 0 &&
+ !shm_map_empty(dum))
+ *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks)
+ & (SHM_BLOCKS_IN_MAP - 1);
#else
+ while (get_tail_ptr(dum)->dst_api == 0 &&
+ !shm_map_empty(dum))
*dum->ptr_tail =
(*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
+
#endif
- }
}
static void clean_sdus(struct shm_du_map * dum, pid_t api)
{
size_t idx = *dum->ptr_tail;
struct shm_du_buff * buf;
-#ifndef SHM_DU_MAP_SINGLE_BLOCK
- long sz;
- long blocks = 0;
-#endif
while (idx != *dum->ptr_head) {
buf = idx_to_du_buff_ptr(dum, idx);
if (buf->dst_api == api)
buf->dst_api = 0;
-
-#ifndef SHM_DU_MAP_SINGLE_BLOCK
- blocks = 0;
- sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff);
- while (sz > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
- }
-
- idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ idx = (idx + buf->blocks) & (SHM_BLOCKS_IN_MAP - 1);
#else
idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1);
#endif
@@ -320,6 +296,9 @@ void * shm_du_map_sanitize(void * o)
while (true) {
int ret = 0;
+ struct timespec now;
+ struct timespec dl;
+
if (pthread_cond_wait(dum->full, dum->shm_mutex)
== EOWNERDEAD) {
LOG_WARN("Recovering dead mutex.");
@@ -328,33 +307,37 @@ void * shm_du_map_sanitize(void * o)
*dum->choked = 1;
+ garbage_collect(dum);
+
+ if (shm_map_empty(dum))
+ continue;
+
api = get_tail_ptr(dum)->dst_api;
- if (kill(api, 0) == 0) {
- struct timespec now;
- struct timespec dl;
- clock_gettime(CLOCK_REALTIME, &now);
- ts_add(&now, &intv, &dl);
- while (*dum->choked) {
- ret = pthread_cond_timedwait(dum->healthy,
- dum->shm_mutex,
- &dl);
- if (!ret)
- continue;
-
- if (ret == EOWNERDEAD) {
- LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
- }
-
- if (ret == ETIMEDOUT) {
- LOG_DBGF("SDU timed out.");
- clean_sdus(dum, api);
- }
- }
- } else {
- LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret);
+ if (kill(api, 0)) {
+ LOG_DBGF("Dead process %d left stale sdu.", api);
clean_sdus(dum, api);
+ continue;
+ }
+
+ clock_gettime(CLOCK_REALTIME, &now);
+ ts_add(&now, &intv, &dl);
+ while (*dum->choked) {
+ ret = pthread_cond_timedwait(dum->healthy,
+ dum->shm_mutex,
+ &dl);
+ if (!ret)
+ continue;
+
+ if (ret == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ if (ret == ETIMEDOUT) {
+ LOG_DBGF("SDU timed out.");
+ clean_sdus(dum, api);
+ }
}
}
@@ -406,12 +389,12 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
size_t len)
{
struct shm_du_buff * sdb;
-#ifndef SHM_MAP_SINGLE_BLOCK
- long blocks = 0;
size_t size = headspace + len + tailspace;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ long blocks = 0;
+ long padblocks = 0;
int sz = headspace + len + sizeof *sdb;
int sz2 = sz + tailspace;
- size_t copy_len;
#endif
uint8_t * write_pos;
ssize_t index = -1;
@@ -421,29 +404,17 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
return -1;
}
-#ifdef SHM_MAP_SINGLE_BLOCK
+#ifndef SHM_DU_MAP_MULTI_BLOCK
if (size + sizeof *sdb > SHM_DU_BUFF_BLOCK_SIZE) {
LOG_DBGF("Multi-block SDU's disabled. Dropping.");
return -1;
}
#endif
-
- if (headspace >= size) {
- LOG_DBGF("Index out of bounds.");
- return -1;
- }
-
- if (headspace + len > size) {
- LOG_DBGF("Buffer too small for data.");
- return -1;
- }
-
if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
pthread_mutex_consistent(dum->shm_mutex);
}
-
-#ifndef SHM_MAP_SINGLE_BLOCK
+#ifdef SHM_DU_MAP_MULTI_BLOCK
while (sz2 > 0) {
sz2 -= SHM_DU_BUFF_BLOCK_SIZE;
sz -= SHM_DU_BUFF_BLOCK_SIZE;
@@ -455,7 +426,10 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
++blocks;
}
- if (!shm_map_free(dum, blocks)) {
+ if (blocks + *dum->ptr_head > SHM_BLOCKS_IN_MAP - 1)
+ padblocks = SHM_BLOCKS_IN_MAP - *dum->ptr_head;
+
+ if (!shm_map_free(dum, (blocks + padblocks))) {
#else
if (!shm_map_free(dum, 1)) {
#endif
@@ -464,31 +438,34 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
return -1;
}
- sdb = get_head_ptr(dum);
- sdb->size = size;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ if (padblocks) {
+ sdb = get_head_ptr(dum);
+ sdb->size = 0;
+ sdb->blocks = padblocks;
+ sdb->dst_api = 0;
+ sdb->du_head = 0;
+ sdb->du_tail = 0;
+
+ *dum->ptr_head = 0;
+ }
+#endif
+ sdb = get_head_ptr(dum);
+ sdb->size = size;
sdb->dst_api = dst_api;
sdb->du_head = headspace;
sdb->du_tail = sdb->du_head + len;
-
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ sdb->blocks = blocks;
+#endif
write_pos = ((uint8_t *) sdb) + sizeof *sdb + headspace;
-#ifndef SHM_MAP_SINGLE_BLOCK
- copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb);
- while (blocks > 0) {
- memcpy(write_pos, data, copy_len);
- *dum->ptr_head = (*dum->ptr_head + 1)
- & (SHM_BLOCKS_IN_MAP - 1);
- len -= copy_len;
- copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE);
- write_pos = (uint8_t *) get_head_ptr(dum);
- --blocks;
- }
-
- index = (*dum->ptr_head - 1 + SHM_BLOCKS_IN_MAP)
- & (SHM_BLOCKS_IN_MAP - 1);
-#else
memcpy(write_pos, data, len);
+
index = *dum->ptr_head;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+#else
*dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
#endif
pthread_mutex_unlock(dum->shm_mutex);
@@ -496,12 +473,12 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
return index;
}
-/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */
int shm_du_map_read(uint8_t ** dst,
struct shm_du_map * dum,
ssize_t idx)
{
size_t len = 0;
+ struct shm_du_buff * sdb;
if (idx > SHM_BLOCKS_IN_MAP)
return -1;
@@ -516,10 +493,9 @@ int shm_du_map_read(uint8_t ** dst,
return -1;
}
- *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) +
- sizeof(struct shm_du_buff) +
- idx_to_du_buff_ptr(dum, idx)->du_head;
- len = sdu_size(dum, idx);
+ sdb = idx_to_du_buff_ptr(dum, idx);
+ len = sdb->du_tail - sdb->du_head;
+ *dst = ((uint8_t *) sdb) + sizeof(struct shm_du_buff) + sdb->du_head;
pthread_mutex_unlock(dum->shm_mutex);