summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/flow.h2
-rw-r--r--src/ipcpd/local/main.c2
-rw-r--r--src/ipcpd/shim-eth-llc/main.c19
-rw-r--r--src/ipcpd/shim-udp/main.c20
-rw-r--r--src/irmd/main.c18
-rw-r--r--src/lib/dev.c42
-rw-r--r--src/lib/shm_ap_rbuff.c70
-rw-r--r--src/lib/shm_du_map.c257
8 files changed, 316 insertions, 114 deletions
diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h
index e27882e2..b0f1390a 100644
--- a/src/ipcpd/flow.h
+++ b/src/ipcpd/flow.h
@@ -32,6 +32,8 @@ struct flow {
int port_id;
struct shm_ap_rbuff * rb;
enum flow_state state;
+
+ pid_t api;
};
#endif /* OUROBOROS_FLOW_H */
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 2120e4e8..837cbf8c 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -178,7 +178,6 @@ static int port_id_to_fd(int port_id)
/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
static void * ipcp_local_sdu_loop(void * o)
{
-
while (true) {
struct rb_entry * e;
int fd;
@@ -208,6 +207,7 @@ static void * ipcp_local_sdu_loop(void * o)
while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0)
;
+
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ipcp->state_lock);
}
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 48b6391f..68e7e933 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -722,11 +722,12 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
continue;
}
- while ((index =
- shm_create_du_buff(shim_data(_ipcp)->dum,
- frame_len, 0,
- (uint8_t *) (buf + i),
- frame_len)) < 0)
+ while ((index = shm_du_map_write(shim_data(_ipcp)->dum,
+ ipcp_flow(j)->api,
+ 0,
+ 0,
+ (uint8_t *) (buf + i),
+ frame_len)) < 0)
;
e.index = index;
@@ -769,9 +770,9 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
return (void *) 1; /* -ENOTENROLLED */
}
- len = shm_du_map_read_sdu((uint8_t **) &buf,
- shim_data(_ipcp)->dum,
- e->index);
+ len = shm_du_map_read((uint8_t **) &buf,
+ shim_data(_ipcp)->dum,
+ e->index);
if (len <= 0) {
pthread_rwlock_unlock(&_ipcp->state_lock);
free(e);
@@ -798,7 +799,7 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
if (shim_data(_ipcp)->dum != NULL)
- shm_release_du_buff(shim_data(_ipcp)->dum, e->index);
+ shm_du_map_remove(shim_data(_ipcp)->dum, e->index);
pthread_rwlock_unlock(&_ipcp->state_lock);
}
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index a28c262f..68d393af 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -206,12 +206,12 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
pthread_rwlock_rdlock(&_ap_instance->flows_lock);
- while ((index = shm_create_du_buff(_ap_instance->dum,
- count + DU_BUFF_HEADSPACE +
- DU_BUFF_TAILSPACE,
- DU_BUFF_HEADSPACE,
- (uint8_t *) buf,
- count)) < 0)
+ while ((index = shm_du_map_write(_ap_instance->dum,
+ _ap_instance->flows[fd].api,
+ 0,
+ 0,
+ (uint8_t *) buf,
+ count)) < 0)
;
e.index = index;
@@ -772,9 +772,9 @@ static void * ipcp_udp_sdu_loop(void * o)
return (void *) 1; /* -ENOTENROLLED */
}
- len = shm_du_map_read_sdu((uint8_t **) &buf,
- _ap_instance->dum,
- e->index);
+ len = shm_du_map_read((uint8_t **) &buf,
+ _ap_instance->dum,
+ e->index);
if (len <= 0) {
pthread_rwlock_unlock(&_ipcp->state_lock);
free(e);
@@ -799,7 +799,7 @@ static void * ipcp_udp_sdu_loop(void * o)
pthread_rwlock_rdlock(&_ipcp->state_lock);
if (_ap_instance->dum != NULL)
- shm_release_du_buff(_ap_instance->dum, e->index);
+ shm_du_map_remove(_ap_instance->dum, e->index);
pthread_rwlock_unlock(&_ipcp->state_lock);
}
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 5ff84da1..b4771b89 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -30,6 +30,7 @@
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
#include <ouroboros/irm_config.h>
+#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/shm_du_map.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/flow.h>
@@ -163,7 +164,7 @@ static void reg_instance_destroy(struct reg_instance * i)
while (wait) {
pthread_mutex_lock(&i->mutex);
- if (pthread_cond_destroy(&i->wakeup) < 0)
+ if (pthread_cond_destroy(&i->wakeup))
pthread_cond_broadcast(&i->wakeup);
else
wait = false;
@@ -291,7 +292,7 @@ static void port_map_entry_destroy(struct port_map_entry * e)
while (wait) {
pthread_mutex_lock(&e->res_lock);
- if (pthread_cond_destroy(&e->res_signal) < 0)
+ if (pthread_cond_destroy(&e->res_signal))
pthread_cond_broadcast(&e->res_signal);
else
wait = false;
@@ -477,7 +478,7 @@ static void reg_entry_destroy(struct reg_entry * e)
while (wait) {
pthread_mutex_lock(&e->state_lock);
- if (pthread_cond_destroy(&e->acc_signal) < 0)
+ if (pthread_cond_destroy(&e->acc_signal))
pthread_cond_broadcast(&e->acc_signal);
else
wait = false;
@@ -1942,18 +1943,26 @@ void * irm_flow_cleaner()
pthread_mutex_unlock(&e->res_lock);
if (kill(e->n_api, 0) < 0) {
+ struct shm_ap_rbuff * n_rb =
+ shm_ap_rbuff_open(e->n_api);
bmp_release(instance->port_ids, e->port_id);
list_del(&e->next);
LOG_INFO("Process %d gone, %d deallocated.",
e->n_api, e->port_id);
ipcp_flow_dealloc(e->n_1_api, e->port_id);
+ if (n_rb != NULL)
+ shm_ap_rbuff_destroy(n_rb);
port_map_entry_destroy(e);
}
if (kill(e->n_1_api, 0) < 0) {
+ struct shm_ap_rbuff * n_1_rb =
+ shm_ap_rbuff_open(e->n_1_api);
list_del(&e->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
e->n_1_api, e->port_id);
+ if (n_1_rb != NULL)
+ shm_ap_rbuff_destroy(n_1_rb);
port_map_entry_destroy(e);
}
}
@@ -2205,7 +2214,8 @@ static struct irm * irm_create()
shm_du_map_destroy(dum);
LOG_INFO("Stale shm file removed.");
} else {
- LOG_INFO("IRMd already running, exiting.");
+ LOG_INFO("IRMd already running (%d), exiting.",
+ shm_du_map_owner(dum));
free(instance);
exit(EXIT_SUCCESS);
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index ac995b2d..19bc90e5 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -40,7 +40,7 @@ struct flow {
int port_id;
int oflags;
- /* don't think this needs locking */
+ pid_t api;
};
struct ap_data {
@@ -93,6 +93,7 @@ int ap_init(char * ap_name)
for (i = 0; i < AP_MAX_FLOWS; ++i) {
_ap_instance->flows[i].rb = NULL;
_ap_instance->flows[i].port_id = -1;
+ _ap_instance->flows[i].api = 0; /* API_INVALID */
}
pthread_rwlock_init(&_ap_instance->flows_lock, NULL);
@@ -319,6 +320,8 @@ int flow_alloc(char * dst_name,
_ap_instance->flows[fd].port_id = recv_msg->port_id;
_ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
+ _ap_instance->flows[fd].api =
+ shm_ap_rbuff_get_api(_ap_instance->flows[fd].rb);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
@@ -349,7 +352,7 @@ int flow_alloc_res(int fd)
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = _ap_instance->flows[fd].port_id;
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
@@ -389,11 +392,12 @@ int flow_dealloc(int fd)
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = _ap_instance->flows[fd].port_id;
_ap_instance->flows[fd].port_id = -1;
shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
_ap_instance->flows[fd].rb = NULL;
+ _ap_instance->flows[fd].api = 0;
bmp_release(_ap_instance->fds, fd);
@@ -476,12 +480,12 @@ ssize_t flow_write(int fd, void * buf, size_t count)
}
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- index = shm_create_du_buff(_ap_instance->dum,
- count + DU_BUFF_HEADSPACE +
- DU_BUFF_TAILSPACE,
- DU_BUFF_HEADSPACE,
- (uint8_t *) buf,
- count);
+ index = shm_du_map_write(_ap_instance->dum,
+ _ap_instance->flows[fd].api,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ (uint8_t *) buf,
+ count);
if (index == -1) {
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
@@ -492,18 +496,18 @@ ssize_t flow_write(int fd, void * buf, size_t count)
e.port_id = _ap_instance->flows[fd].port_id;
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
- shm_release_du_buff(_ap_instance->dum, index);
+ shm_du_map_remove(_ap_instance->dum, index);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
return -1;
}
} else { /* blocking */
- while ((index = shm_create_du_buff(_ap_instance->dum,
- count + DU_BUFF_HEADSPACE +
- DU_BUFF_TAILSPACE,
- DU_BUFF_HEADSPACE,
- (uint8_t *) buf,
- count)) < 0)
+ while ((index = shm_du_map_write(_ap_instance->dum,
+ _ap_instance->flows[fd].api,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ (uint8_t *) buf,
+ count)) < 0)
;
e.index = index;
@@ -555,9 +559,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
return -EAGAIN;
}
- n = shm_du_map_read_sdu(&sdu,
- _ap_instance->dum,
- idx);
+ n = shm_du_map_read(&sdu, _ap_instance->dum, idx);
if (n < 0) {
pthread_rwlock_unlock(&_ap_instance->data_lock);
return -1;
@@ -565,7 +567,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
memcpy(buf, sdu, MIN(n, count));
- shm_release_du_buff(_ap_instance->dum, idx);
+ shm_du_map_remove(_ap_instance->dum, idx);
pthread_rwlock_unlock(&_ap_instance->data_lock);
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 69e96c40..f54627b7 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -26,6 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/shm_du_map.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -34,7 +35,6 @@
#include <string.h>
#include <stdint.h>
#include <unistd.h>
-#include <stdbool.h>
#include <errno.h>
#include <sys/stat.h>
@@ -127,6 +127,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
rb->work = (pthread_cond_t *) (rb->shm_mutex + 1);
pthread_mutexattr_init(&mattr);
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(rb->shm_mutex, &mattr);
@@ -213,6 +214,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
{
char fn[25];
+ struct shm_du_map * dum = NULL;
if (rb == NULL) {
LOG_DBGF("Bogus input. Bugging out.");
@@ -220,8 +222,17 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
}
if (rb->api != getpid()) {
- LOG_ERR("Tried to destroy other AP's rbuff.");
- return;
+ dum = shm_du_map_open();
+ if (shm_du_map_owner(dum) == getpid()) {
+ LOG_DBGF("Ringbuffer %d destroyed by IRMd %d.",
+ rb->api, getpid());
+ shm_du_map_close(dum);
+ } else {
+ LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.",
+ getpid(), rb->api);
+ shm_du_map_close(dum);
+ return;
+ }
}
if (close(rb->fd) < 0)
@@ -243,12 +254,16 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
if (rb == NULL || e == NULL)
return -1;
- pthread_mutex_lock(rb->shm_mutex);
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
if (!shm_rbuff_free(rb)) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
}
+
if (shm_rbuff_empty(rb))
pthread_cond_broadcast(rb->work);
@@ -269,10 +284,21 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->shm_mutex);
- pthread_mutex_lock(rb->shm_mutex);
+
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
+
+ while (tail_el_ptr->port_id < 0)
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
while(shm_rbuff_empty(rb))
- pthread_cond_wait(rb->work, rb->shm_mutex);
+ if (pthread_cond_wait(rb->work, rb->shm_mutex)
+ == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
e = malloc(sizeof(*e));
if (e == NULL) {
@@ -293,13 +319,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
{
ssize_t idx = -1;
- pthread_mutex_lock(rb->shm_mutex);
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
if (shm_rbuff_empty(rb)) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
}
+ while (tail_el_ptr->port_id < 0)
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+
if (tail_el_ptr->port_id != port_id) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
@@ -313,3 +345,27 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
return idx;
}
+
+pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff *rb)
+{
+ pid_t api = 0;
+ if (rb == NULL)
+ return 0;
+
+ pthread_mutex_lock(rb->shm_mutex);
+ api = rb->api;
+ pthread_mutex_unlock(rb->shm_mutex);
+
+ return api;
+}
+
+void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb)
+{
+ if (rb == NULL)
+ return;
+
+ pthread_mutex_lock(rb->shm_mutex);
+ *rb->ptr_tail = 0;
+ *rb->ptr_head = 0;
+ pthread_mutex_unlock(rb->shm_mutex);
+}
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index 2a316265..24adac1a 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -23,11 +23,14 @@
#include <ouroboros/config.h>
#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/time_utils.h>
#include <pthread.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
+#include <signal.h>
#include <sys/stat.h>
#define OUROBOROS_PREFIX "shm_du_map"
@@ -35,8 +38,8 @@
#include <ouroboros/logs.h>
#define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE)
-#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof (size_t) \
- + sizeof(pthread_mutex_t) + sizeof(pthread_cond_t) \
+#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \
+ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
#define get_head_ptr(dum) \
@@ -57,6 +60,8 @@
& (SHM_BLOCKS_IN_MAP - 1))
#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP)
+#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head)
+
#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \
idx_to_du_buff_ptr(dum, idx)->du_head)
@@ -66,7 +71,7 @@ struct shm_du_buff {
size_t size;
size_t du_head;
size_t du_tail;
- size_t garbage;
+ pid_t dst_api;
};
struct shm_du_map {
@@ -74,11 +79,80 @@ struct shm_du_map {
size_t * ptr_head; /* start of ringbuffer head */
size_t * ptr_tail; /* start of ringbuffer tail */
pthread_mutex_t * shm_mutex; /* lock all free space in shm */
- pthread_cond_t * sanitize; /* run sanitizer when buffer full */
+ size_t * choked; /* stale sdu detection */
+ pthread_cond_t * healthy; /* du map is healthy */
+ pthread_cond_t * full; /* run sanitizer when buffer full */
pid_t * api; /* api of the irmd owner */
int fd;
};
+static void garbage_collect(struct shm_du_map * dum)
+{
+#ifndef SHM_MAP_SINGLE_BLOCK
+ long sz;
+ long blocks;
+#endif
+ while (get_tail_ptr(dum)->dst_api == 0 &&
+ !shm_map_empty(dum)) {
+#ifndef SHM_MAP_SINGLE_BLOCK
+ blocks = 0;
+ sz = get_tail_ptr(dum)->size +
+ (long) sizeof(struct shm_du_buff);
+ while (sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ *dum->ptr_tail =
+ (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+
+#else
+ *dum->ptr_tail =
+ (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
+#endif
+ }
+}
+
+static void clean_sdus(struct shm_du_map * dum, pid_t api)
+{
+ size_t idx = *dum->ptr_tail;
+ struct shm_du_buff * buf;
+#ifndef SHM_DU_MAP_SINGLE_BLOCK
+ long sz;
+ long blocks = 0;
+#endif
+
+ while (idx != *dum->ptr_head) {
+ buf = idx_to_du_buff_ptr(dum, idx);
+ if (buf->dst_api == api)
+ buf->dst_api = 0;
+
+#ifndef SHM_DU_MAP_SINGLE_BLOCK
+ blocks = 0;
+ sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff);
+ while (sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+#else
+ idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1);
+#endif
+ }
+
+ garbage_collect(dum);
+
+ if (kill(api, 0) == 0) {
+ struct shm_ap_rbuff * rb;
+ rb = shm_ap_rbuff_open(api);
+ shm_ap_rbuff_reset(rb);
+ shm_ap_rbuff_close(rb);
+ }
+
+ *dum->choked = 0;
+}
+
struct shm_du_map * shm_du_map_create()
{
struct shm_du_map * dum;
@@ -140,8 +214,10 @@ struct shm_du_map * shm_du_map_create()
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1);
- dum->api = (pid_t *) (dum->sanitize + 1);
+ dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->healthy = (pthread_cond_t *) (dum->choked + 1);
+ dum->full = dum->healthy + 1;
+ dum->api = (pid_t *) (dum->full + 1);
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -150,11 +226,14 @@ struct shm_du_map * shm_du_map_create()
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
- pthread_cond_init(dum->sanitize, &cattr);
+ pthread_cond_init(dum->full, &cattr);
+ pthread_cond_init(dum->healthy, &cattr);
*dum->ptr_head = 0;
*dum->ptr_tail = 0;
+ *dum->choked = 0;
+
*dum->api = getpid();
dum->fd = shm_fd;
@@ -202,8 +281,10 @@ struct shm_du_map * shm_du_map_open()
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1);
- dum->api = (pid_t *) (dum->sanitize + 1);
+ dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->healthy = (pthread_cond_t *) (dum->choked + 1);
+ dum->full = dum->healthy + 1;
+ dum->api = (pid_t *) (dum->full + 1);
dum->fd = shm_fd;
@@ -212,12 +293,73 @@ struct shm_du_map * shm_du_map_open()
pid_t shm_du_map_owner(struct shm_du_map * dum)
{
+ if (dum == NULL)
+ return 0;
+
return *dum->api;
}
void * shm_du_map_sanitize(void * o)
{
- LOG_MISSING;
+ struct shm_du_map * dum = (struct shm_du_map *) o;
+ struct timespec intv
+ = {SHM_DU_TIMEOUT_MICROS / MILLION,
+ (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000};
+
+ pid_t api;
+
+ if (dum == NULL)
+ return (void *) -1;
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+ (void *) dum->shm_mutex);
+
+ while (true) {
+ int ret = 0;
+ if (pthread_cond_wait(dum->full, dum->shm_mutex)
+ == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ *dum->choked = 1;
+
+ api = get_tail_ptr(dum)->dst_api;
+
+ if (kill(api, 0) == 0) {
+ struct timespec now;
+ struct timespec dl;
+ clock_gettime(CLOCK_REALTIME, &now);
+ ts_add(&now, &intv, &dl);
+ while (*dum->choked) {
+ ret = pthread_cond_timedwait(dum->healthy,
+ dum->shm_mutex,
+ &dl);
+ if (!ret)
+ continue;
+
+ if (ret == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ if (ret == ETIMEDOUT) {
+ LOG_DBGF("SDU timed out.");
+ clean_sdus(dum, api);
+ }
+ }
+ } else {
+ LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret);
+ clean_sdus(dum, api);
+ }
+ }
+
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
@@ -256,21 +398,23 @@ void shm_du_map_destroy(struct shm_du_map * dum)
free(dum);
}
-ssize_t shm_create_du_buff(struct shm_du_map * dum,
- size_t size,
- size_t headspace,
- uint8_t * data,
- size_t len)
+ssize_t shm_du_map_write(struct shm_du_map * dum,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t len)
{
struct shm_du_buff * sdb;
#ifndef SHM_MAP_SINGLE_BLOCK
long blocks = 0;
- int sz = size + sizeof *sdb;
- int sz2 = headspace + len + sizeof *sdb;
+ size_t size = headspace + len + tailspace;
+ int sz = headspace + len + sizeof *sdb;
+ int sz2 = sz + tailspace;
size_t copy_len;
#endif
uint8_t * write_pos;
- ssize_t index;
+ ssize_t index = -1;
if (dum == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -294,36 +438,35 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
return -1;
}
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
#ifndef SHM_MAP_SINGLE_BLOCK
- while (sz > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ while (sz2 > 0) {
sz2 -= SHM_DU_BUFF_BLOCK_SIZE;
- if (sz2 < 0 && sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ if (sz < 0 && sz2 > 0) {
pthread_mutex_unlock(dum->shm_mutex);
- LOG_DBG("Can't handle this packet now");
- return -1;
+ LOG_DBG("Can't handle this packet now.");
+ return -EAGAIN;
}
++blocks;
}
if (!shm_map_free(dum, blocks)) {
- pthread_mutex_unlock(dum->shm_mutex);
- pthread_cond_signal(dum->sanitize);
- return -1;
- }
#else
if (!shm_map_free(dum, 1)) {
+#endif
+ pthread_cond_signal(dum->full);
pthread_mutex_unlock(dum->shm_mutex);
- ptrhead_cond_signal(dum->sanitize);
return -1;
}
-#endif
sdb = get_head_ptr(dum);
sdb->size = size;
- sdb->garbage = 0;
+ sdb->dst_api = dst_api;
sdb->du_head = headspace;
sdb->du_tail = sdb->du_head + len;
@@ -333,7 +476,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb);
while (blocks > 0) {
memcpy(write_pos, data, copy_len);
- *(dum->ptr_head) = (*dum->ptr_head + 1)
+ *dum->ptr_head = (*dum->ptr_head + 1)
& (SHM_BLOCKS_IN_MAP - 1);
len -= copy_len;
copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE);
@@ -346,7 +489,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
#else
memcpy(write_pos, data, len);
index = *dum->ptr_head;
- *(dum->ptr_head) = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
#endif
pthread_mutex_unlock(dum->shm_mutex);
@@ -354,18 +497,21 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
}
/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */
-int shm_du_map_read_sdu(uint8_t ** dst,
- struct shm_du_map * dum,
- ssize_t idx)
+int shm_du_map_read(uint8_t ** dst,
+ struct shm_du_map * dum,
+ ssize_t idx)
{
size_t len = 0;
if (idx > SHM_BLOCKS_IN_MAP)
return -1;
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
- if (*dum->ptr_head == *dum->ptr_tail) {
+ if (shm_map_empty(dum)) {
pthread_mutex_unlock(dum->shm_mutex);
return -1;
}
@@ -380,47 +526,32 @@ int shm_du_map_read_sdu(uint8_t ** dst,
return len;
}
-int shm_release_du_buff(struct shm_du_map * dum, ssize_t idx)
+int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)
{
-#ifndef SHM_MAP_SINGLE_BLOCK
- long sz;
- long blocks = 0;
-#endif
if (idx > SHM_BLOCKS_IN_MAP)
return -1;
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
- if (*dum->ptr_head == *dum->ptr_tail) {
+ if (shm_map_empty(dum)) {
pthread_mutex_unlock(dum->shm_mutex);
return -1;
}
- idx_to_du_buff_ptr(dum, idx)->garbage = 1;
+ idx_to_du_buff_ptr(dum, idx)->dst_api = 0;
if (idx != *dum->ptr_tail) {
pthread_mutex_unlock(dum->shm_mutex);
return 0;
}
- while (get_tail_ptr(dum)->garbage == 1 &&
- *dum->ptr_tail != *dum->ptr_head) {
-#ifndef SHM_MAP_SINGLE_BLOCK
- sz = get_tail_ptr(dum)->size;
- while (sz + (long) sizeof(struct shm_du_buff) > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
- }
+ garbage_collect(dum);
- *(dum->ptr_tail) =
- (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
-
- blocks = 0;
-#else
- *(dum->ptr_tail) =
- (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
-#endif
- }
+ *dum->choked = 0;
+ pthread_cond_signal(dum->healthy);
pthread_mutex_unlock(dum->shm_mutex);