summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-06-30 23:14:14 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-07-02 19:11:12 +0200
commit79475a4742bc28e1737044f2300bcb601e6e10bf (patch)
treecd79dba391c0ded80125836069d8187a22f7e5f5
parentd85f211d53a0cb35a756d0c44a2b28807eff4e5d (diff)
downloadouroboros-79475a4742bc28e1737044f2300bcb601e6e10bf.tar.gz
ouroboros-79475a4742bc28e1737044f2300bcb601e6e10bf.zip
lib: robust locking in shared memory and crash recovery
This PR enhances the shared memory providing recovery if a process crashes. It adds a SHM_DU_TIMEOUT_MICROS variable, setting an expiration time for SDU's when shared memory is full. If an application doesn't read a blocking SDU within this time, the shared memory will be cleansed of all SDU's for this application and the application's rbuff will be cleared. Some refactoring of the API's. Fixed wrong pthread checks in IRMd. Fixes #13 Fixes #14
-rw-r--r--include/ouroboros/config.h.in8
-rw-r--r--include/ouroboros/shm_ap_rbuff.h14
-rw-r--r--include/ouroboros/shm_du_map.h34
-rw-r--r--src/ipcpd/flow.h2
-rw-r--r--src/ipcpd/local/main.c2
-rw-r--r--src/ipcpd/shim-eth-llc/main.c19
-rw-r--r--src/ipcpd/shim-udp/main.c20
-rw-r--r--src/irmd/main.c18
-rw-r--r--src/lib/dev.c42
-rw-r--r--src/lib/shm_ap_rbuff.c70
-rw-r--r--src/lib/shm_du_map.c257
11 files changed, 338 insertions, 148 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 0dce7acd..6861c6cb 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -26,13 +26,19 @@
#define PROJECT_NAME "@CMAKE_PROJECT_NAME@"
#define PROJECT_VERSION "@PACKAGE_VERSION@"
#define INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@"
-#define _POSIX_C_SOURCE 200112L
+#define _POSIX_C_SOURCE 200809L
#define IPCP_SHIM_UDP_EXEC "@IPCP_SHIM_UDP_TARGET@"
#define IPCP_SHIM_ETH_LLC_EXEC "@IPCP_SHIM_ETH_LLC_TARGET@"
#define IPCP_NORMAL_EXEC "@IPCP_NORMAL_TARGET@"
#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@"
#define AP_MAX_FLOWS 256
+#define SHM_DU_BUFF_BLOCK_SIZE sysconf(_SC_PAGESIZE)
+#define SHM_DU_MAP_FILENAME "ouroboros_du_map"
+#define SHM_BLOCKS_IN_MAP (1 << 14)
+#define SHM_DU_TIMEOUT_MICROS 2000
#define DU_BUFF_HEADSPACE 128
#define DU_BUFF_TAILSPACE 0
+#define SHM_AP_RBUFF_PREFIX "ouroboros_rb_"
+#define SHM_RBUFF_SIZE (1 << 14)
#endif
diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h
index 053709bb..78926869 100644
--- a/include/ouroboros/shm_ap_rbuff.h
+++ b/include/ouroboros/shm_ap_rbuff.h
@@ -24,15 +24,8 @@
#ifndef OUROBOROS_SHM_AP_RBUFF_H
#define OUROBOROS_SHM_AP_RBUFF_H
-#ifndef SHM_AP_RBUFF
-#define SHM_AP_RBUFF_PREFIX "ouroboros_rb_"
-#endif
-
-#ifndef SHM_RBUFF_SIZE
-#define SHM_RBUFF_SIZE (1 << 14)
-#endif
-
#include <sys/types.h>
+#include <stdbool.h>
struct shm_ap_rbuff;
@@ -42,7 +35,7 @@ struct rb_entry {
};
struct shm_ap_rbuff * shm_ap_rbuff_create();
-struct shm_ap_rbuff * shm_ap_rbuff_open();
+struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api);
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb);
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);
int shm_ap_rbuff_write(struct shm_ap_rbuff * rb,
@@ -50,5 +43,6 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb,
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb);
ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb,
int port_id);
-
+pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff * rb);
+void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb);
#endif /* OUROBOROS_SHM_AP_RBUFF_H */
diff --git a/include/ouroboros/shm_du_map.h b/include/ouroboros/shm_du_map.h
index 2d215651..e8934bae 100644
--- a/include/ouroboros/shm_du_map.h
+++ b/include/ouroboros/shm_du_map.h
@@ -24,18 +24,6 @@
#ifndef OUROBOROS_SHM_DU_MAP_H
#define OUROBOROS_SHM_DU_MAP_H
-#ifndef SHM_DU_BUFF_BLOCK_SIZE
-#define SHM_DU_BUFF_BLOCK_SIZE sysconf(_SC_PAGESIZE)
-#endif
-
-#ifndef SHM_DU_MAP_FILENAME
-#define SHM_DU_MAP_FILENAME "ouroboros_du_map"
-#endif
-
-#ifndef SHM_BLOCKS_IN_MAP
-#define SHM_BLOCKS_IN_MAP (1 << 14)
-#endif
-
#include "common.h"
#include <sys/types.h>
#include <pthread.h>
@@ -51,18 +39,18 @@ pid_t shm_du_map_owner(struct shm_du_map * dum);
void * shm_du_map_sanitize(void * o);
/* returns the index of the buffer in the DU map */
-ssize_t shm_create_du_buff(struct shm_du_map * dum,
- size_t size,
- size_t headspace,
- uint8_t * data,
- size_t len);
+ssize_t shm_du_map_write(struct shm_du_map * dum,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t data_len);
-/* FIXME: revise these */
-int shm_du_map_read_sdu(uint8_t ** dst,
- struct shm_du_map * dum,
- ssize_t idx);
-int shm_release_du_buff(struct shm_du_map * dum,
- ssize_t idx);
+int shm_du_map_read(uint8_t ** dst,
+ struct shm_du_map * dum,
+ ssize_t idx);
+int shm_du_map_remove(struct shm_du_map * dum,
+ ssize_t idx);
/* FIXME: use shm_du_map * and index */
uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,
diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h
index e27882e2..b0f1390a 100644
--- a/src/ipcpd/flow.h
+++ b/src/ipcpd/flow.h
@@ -32,6 +32,8 @@ struct flow {
int port_id;
struct shm_ap_rbuff * rb;
enum flow_state state;
+
+ pid_t api;
};
#endif /* OUROBOROS_FLOW_H */
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 2120e4e8..837cbf8c 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -178,7 +178,6 @@ static int port_id_to_fd(int port_id)
/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
static void * ipcp_local_sdu_loop(void * o)
{
-
while (true) {
struct rb_entry * e;
int fd;
@@ -208,6 +207,7 @@ static void * ipcp_local_sdu_loop(void * o)
while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0)
;
+
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ipcp->state_lock);
}
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 48b6391f..68e7e933 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -722,11 +722,12 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
continue;
}
- while ((index =
- shm_create_du_buff(shim_data(_ipcp)->dum,
- frame_len, 0,
- (uint8_t *) (buf + i),
- frame_len)) < 0)
+ while ((index = shm_du_map_write(shim_data(_ipcp)->dum,
+ ipcp_flow(j)->api,
+ 0,
+ 0,
+ (uint8_t *) (buf + i),
+ frame_len)) < 0)
;
e.index = index;
@@ -769,9 +770,9 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
return (void *) 1; /* -ENOTENROLLED */
}
- len = shm_du_map_read_sdu((uint8_t **) &buf,
- shim_data(_ipcp)->dum,
- e->index);
+ len = shm_du_map_read((uint8_t **) &buf,
+ shim_data(_ipcp)->dum,
+ e->index);
if (len <= 0) {
pthread_rwlock_unlock(&_ipcp->state_lock);
free(e);
@@ -798,7 +799,7 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
if (shim_data(_ipcp)->dum != NULL)
- shm_release_du_buff(shim_data(_ipcp)->dum, e->index);
+ shm_du_map_remove(shim_data(_ipcp)->dum, e->index);
pthread_rwlock_unlock(&_ipcp->state_lock);
}
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index a28c262f..68d393af 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -206,12 +206,12 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
pthread_rwlock_rdlock(&_ap_instance->flows_lock);
- while ((index = shm_create_du_buff(_ap_instance->dum,
- count + DU_BUFF_HEADSPACE +
- DU_BUFF_TAILSPACE,
- DU_BUFF_HEADSPACE,
- (uint8_t *) buf,
- count)) < 0)
+ while ((index = shm_du_map_write(_ap_instance->dum,
+ _ap_instance->flows[fd].api,
+ 0,
+ 0,
+ (uint8_t *) buf,
+ count)) < 0)
;
e.index = index;
@@ -772,9 +772,9 @@ static void * ipcp_udp_sdu_loop(void * o)
return (void *) 1; /* -ENOTENROLLED */
}
- len = shm_du_map_read_sdu((uint8_t **) &buf,
- _ap_instance->dum,
- e->index);
+ len = shm_du_map_read((uint8_t **) &buf,
+ _ap_instance->dum,
+ e->index);
if (len <= 0) {
pthread_rwlock_unlock(&_ipcp->state_lock);
free(e);
@@ -799,7 +799,7 @@ static void * ipcp_udp_sdu_loop(void * o)
pthread_rwlock_rdlock(&_ipcp->state_lock);
if (_ap_instance->dum != NULL)
- shm_release_du_buff(_ap_instance->dum, e->index);
+ shm_du_map_remove(_ap_instance->dum, e->index);
pthread_rwlock_unlock(&_ipcp->state_lock);
}
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 5ff84da1..b4771b89 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -30,6 +30,7 @@
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
#include <ouroboros/irm_config.h>
+#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/shm_du_map.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/flow.h>
@@ -163,7 +164,7 @@ static void reg_instance_destroy(struct reg_instance * i)
while (wait) {
pthread_mutex_lock(&i->mutex);
- if (pthread_cond_destroy(&i->wakeup) < 0)
+ if (pthread_cond_destroy(&i->wakeup))
pthread_cond_broadcast(&i->wakeup);
else
wait = false;
@@ -291,7 +292,7 @@ static void port_map_entry_destroy(struct port_map_entry * e)
while (wait) {
pthread_mutex_lock(&e->res_lock);
- if (pthread_cond_destroy(&e->res_signal) < 0)
+ if (pthread_cond_destroy(&e->res_signal))
pthread_cond_broadcast(&e->res_signal);
else
wait = false;
@@ -477,7 +478,7 @@ static void reg_entry_destroy(struct reg_entry * e)
while (wait) {
pthread_mutex_lock(&e->state_lock);
- if (pthread_cond_destroy(&e->acc_signal) < 0)
+ if (pthread_cond_destroy(&e->acc_signal))
pthread_cond_broadcast(&e->acc_signal);
else
wait = false;
@@ -1942,18 +1943,26 @@ void * irm_flow_cleaner()
pthread_mutex_unlock(&e->res_lock);
if (kill(e->n_api, 0) < 0) {
+ struct shm_ap_rbuff * n_rb =
+ shm_ap_rbuff_open(e->n_api);
bmp_release(instance->port_ids, e->port_id);
list_del(&e->next);
LOG_INFO("Process %d gone, %d deallocated.",
e->n_api, e->port_id);
ipcp_flow_dealloc(e->n_1_api, e->port_id);
+ if (n_rb != NULL)
+ shm_ap_rbuff_destroy(n_rb);
port_map_entry_destroy(e);
}
if (kill(e->n_1_api, 0) < 0) {
+ struct shm_ap_rbuff * n_1_rb =
+ shm_ap_rbuff_open(e->n_1_api);
list_del(&e->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
e->n_1_api, e->port_id);
+ if (n_1_rb != NULL)
+ shm_ap_rbuff_destroy(n_1_rb);
port_map_entry_destroy(e);
}
}
@@ -2205,7 +2214,8 @@ static struct irm * irm_create()
shm_du_map_destroy(dum);
LOG_INFO("Stale shm file removed.");
} else {
- LOG_INFO("IRMd already running, exiting.");
+ LOG_INFO("IRMd already running (%d), exiting.",
+ shm_du_map_owner(dum));
free(instance);
exit(EXIT_SUCCESS);
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index ac995b2d..19bc90e5 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -40,7 +40,7 @@ struct flow {
int port_id;
int oflags;
- /* don't think this needs locking */
+ pid_t api;
};
struct ap_data {
@@ -93,6 +93,7 @@ int ap_init(char * ap_name)
for (i = 0; i < AP_MAX_FLOWS; ++i) {
_ap_instance->flows[i].rb = NULL;
_ap_instance->flows[i].port_id = -1;
+ _ap_instance->flows[i].api = 0; /* API_INVALID */
}
pthread_rwlock_init(&_ap_instance->flows_lock, NULL);
@@ -319,6 +320,8 @@ int flow_alloc(char * dst_name,
_ap_instance->flows[fd].port_id = recv_msg->port_id;
_ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
+ _ap_instance->flows[fd].api =
+ shm_ap_rbuff_get_api(_ap_instance->flows[fd].rb);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
@@ -349,7 +352,7 @@ int flow_alloc_res(int fd)
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = _ap_instance->flows[fd].port_id;
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
@@ -389,11 +392,12 @@ int flow_dealloc(int fd)
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = _ap_instance->flows[fd].port_id;
_ap_instance->flows[fd].port_id = -1;
shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
_ap_instance->flows[fd].rb = NULL;
+ _ap_instance->flows[fd].api = 0;
bmp_release(_ap_instance->fds, fd);
@@ -476,12 +480,12 @@ ssize_t flow_write(int fd, void * buf, size_t count)
}
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- index = shm_create_du_buff(_ap_instance->dum,
- count + DU_BUFF_HEADSPACE +
- DU_BUFF_TAILSPACE,
- DU_BUFF_HEADSPACE,
- (uint8_t *) buf,
- count);
+ index = shm_du_map_write(_ap_instance->dum,
+ _ap_instance->flows[fd].api,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ (uint8_t *) buf,
+ count);
if (index == -1) {
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
@@ -492,18 +496,18 @@ ssize_t flow_write(int fd, void * buf, size_t count)
e.port_id = _ap_instance->flows[fd].port_id;
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
- shm_release_du_buff(_ap_instance->dum, index);
+ shm_du_map_remove(_ap_instance->dum, index);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
return -1;
}
} else { /* blocking */
- while ((index = shm_create_du_buff(_ap_instance->dum,
- count + DU_BUFF_HEADSPACE +
- DU_BUFF_TAILSPACE,
- DU_BUFF_HEADSPACE,
- (uint8_t *) buf,
- count)) < 0)
+ while ((index = shm_du_map_write(_ap_instance->dum,
+ _ap_instance->flows[fd].api,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ (uint8_t *) buf,
+ count)) < 0)
;
e.index = index;
@@ -555,9 +559,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
return -EAGAIN;
}
- n = shm_du_map_read_sdu(&sdu,
- _ap_instance->dum,
- idx);
+ n = shm_du_map_read(&sdu, _ap_instance->dum, idx);
if (n < 0) {
pthread_rwlock_unlock(&_ap_instance->data_lock);
return -1;
@@ -565,7 +567,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
memcpy(buf, sdu, MIN(n, count));
- shm_release_du_buff(_ap_instance->dum, idx);
+ shm_du_map_remove(_ap_instance->dum, idx);
pthread_rwlock_unlock(&_ap_instance->data_lock);
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 69e96c40..f54627b7 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -26,6 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/shm_du_map.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -34,7 +35,6 @@
#include <string.h>
#include <stdint.h>
#include <unistd.h>
-#include <stdbool.h>
#include <errno.h>
#include <sys/stat.h>
@@ -127,6 +127,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
rb->work = (pthread_cond_t *) (rb->shm_mutex + 1);
pthread_mutexattr_init(&mattr);
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(rb->shm_mutex, &mattr);
@@ -213,6 +214,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
{
char fn[25];
+ struct shm_du_map * dum = NULL;
if (rb == NULL) {
LOG_DBGF("Bogus input. Bugging out.");
@@ -220,8 +222,17 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
}
if (rb->api != getpid()) {
- LOG_ERR("Tried to destroy other AP's rbuff.");
- return;
+ dum = shm_du_map_open();
+ if (shm_du_map_owner(dum) == getpid()) {
+ LOG_DBGF("Ringbuffer %d destroyed by IRMd %d.",
+ rb->api, getpid());
+ shm_du_map_close(dum);
+ } else {
+ LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.",
+ getpid(), rb->api);
+ shm_du_map_close(dum);
+ return;
+ }
}
if (close(rb->fd) < 0)
@@ -243,12 +254,16 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
if (rb == NULL || e == NULL)
return -1;
- pthread_mutex_lock(rb->shm_mutex);
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
if (!shm_rbuff_free(rb)) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
}
+
if (shm_rbuff_empty(rb))
pthread_cond_broadcast(rb->work);
@@ -269,10 +284,21 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->shm_mutex);
- pthread_mutex_lock(rb->shm_mutex);
+
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
+
+ while (tail_el_ptr->port_id < 0)
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
while(shm_rbuff_empty(rb))
- pthread_cond_wait(rb->work, rb->shm_mutex);
+ if (pthread_cond_wait(rb->work, rb->shm_mutex)
+ == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
e = malloc(sizeof(*e));
if (e == NULL) {
@@ -293,13 +319,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
{
ssize_t idx = -1;
- pthread_mutex_lock(rb->shm_mutex);
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
if (shm_rbuff_empty(rb)) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
}
+ while (tail_el_ptr->port_id < 0)
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+
if (tail_el_ptr->port_id != port_id) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
@@ -313,3 +345,27 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
return idx;
}
+
+pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff *rb)
+{
+ pid_t api = 0;
+ if (rb == NULL)
+ return 0;
+
+ pthread_mutex_lock(rb->shm_mutex);
+ api = rb->api;
+ pthread_mutex_unlock(rb->shm_mutex);
+
+ return api;
+}
+
+void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb)
+{
+ if (rb == NULL)
+ return;
+
+ pthread_mutex_lock(rb->shm_mutex);
+ *rb->ptr_tail = 0;
+ *rb->ptr_head = 0;
+ pthread_mutex_unlock(rb->shm_mutex);
+}
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index 2a316265..24adac1a 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -23,11 +23,14 @@
#include <ouroboros/config.h>
#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/time_utils.h>
#include <pthread.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
+#include <signal.h>
#include <sys/stat.h>
#define OUROBOROS_PREFIX "shm_du_map"
@@ -35,8 +38,8 @@
#include <ouroboros/logs.h>
#define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE)
-#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof (size_t) \
- + sizeof(pthread_mutex_t) + sizeof(pthread_cond_t) \
+#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \
+ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
#define get_head_ptr(dum) \
@@ -57,6 +60,8 @@
& (SHM_BLOCKS_IN_MAP - 1))
#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP)
+#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head)
+
#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \
idx_to_du_buff_ptr(dum, idx)->du_head)
@@ -66,7 +71,7 @@ struct shm_du_buff {
size_t size;
size_t du_head;
size_t du_tail;
- size_t garbage;
+ pid_t dst_api;
};
struct shm_du_map {
@@ -74,11 +79,80 @@ struct shm_du_map {
size_t * ptr_head; /* start of ringbuffer head */
size_t * ptr_tail; /* start of ringbuffer tail */
pthread_mutex_t * shm_mutex; /* lock all free space in shm */
- pthread_cond_t * sanitize; /* run sanitizer when buffer full */
+ size_t * choked; /* stale sdu detection */
+ pthread_cond_t * healthy; /* du map is healthy */
+ pthread_cond_t * full; /* run sanitizer when buffer full */
pid_t * api; /* api of the irmd owner */
int fd;
};
+static void garbage_collect(struct shm_du_map * dum)
+{
+#ifndef SHM_MAP_SINGLE_BLOCK
+ long sz;
+ long blocks;
+#endif
+ while (get_tail_ptr(dum)->dst_api == 0 &&
+ !shm_map_empty(dum)) {
+#ifndef SHM_MAP_SINGLE_BLOCK
+ blocks = 0;
+ sz = get_tail_ptr(dum)->size +
+ (long) sizeof(struct shm_du_buff);
+ while (sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ *dum->ptr_tail =
+ (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+
+#else
+ *dum->ptr_tail =
+ (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
+#endif
+ }
+}
+
+static void clean_sdus(struct shm_du_map * dum, pid_t api)
+{
+ size_t idx = *dum->ptr_tail;
+ struct shm_du_buff * buf;
+#ifndef SHM_DU_MAP_SINGLE_BLOCK
+ long sz;
+ long blocks = 0;
+#endif
+
+ while (idx != *dum->ptr_head) {
+ buf = idx_to_du_buff_ptr(dum, idx);
+ if (buf->dst_api == api)
+ buf->dst_api = 0;
+
+#ifndef SHM_DU_MAP_SINGLE_BLOCK
+ blocks = 0;
+ sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff);
+ while (sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+#else
+ idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1);
+#endif
+ }
+
+ garbage_collect(dum);
+
+ if (kill(api, 0) == 0) {
+ struct shm_ap_rbuff * rb;
+ rb = shm_ap_rbuff_open(api);
+ shm_ap_rbuff_reset(rb);
+ shm_ap_rbuff_close(rb);
+ }
+
+ *dum->choked = 0;
+}
+
struct shm_du_map * shm_du_map_create()
{
struct shm_du_map * dum;
@@ -140,8 +214,10 @@ struct shm_du_map * shm_du_map_create()
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1);
- dum->api = (pid_t *) (dum->sanitize + 1);
+ dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->healthy = (pthread_cond_t *) (dum->choked + 1);
+ dum->full = dum->healthy + 1;
+ dum->api = (pid_t *) (dum->full + 1);
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -150,11 +226,14 @@ struct shm_du_map * shm_du_map_create()
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
- pthread_cond_init(dum->sanitize, &cattr);
+ pthread_cond_init(dum->full, &cattr);
+ pthread_cond_init(dum->healthy, &cattr);
*dum->ptr_head = 0;
*dum->ptr_tail = 0;
+ *dum->choked = 0;
+
*dum->api = getpid();
dum->fd = shm_fd;
@@ -202,8 +281,10 @@ struct shm_du_map * shm_du_map_open()
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1);
- dum->api = (pid_t *) (dum->sanitize + 1);
+ dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->healthy = (pthread_cond_t *) (dum->choked + 1);
+ dum->full = dum->healthy + 1;
+ dum->api = (pid_t *) (dum->full + 1);
dum->fd = shm_fd;
@@ -212,12 +293,73 @@ struct shm_du_map * shm_du_map_open()
pid_t shm_du_map_owner(struct shm_du_map * dum)
{
+ if (dum == NULL)
+ return 0;
+
return *dum->api;
}
void * shm_du_map_sanitize(void * o)
{
- LOG_MISSING;
+ struct shm_du_map * dum = (struct shm_du_map *) o;
+ struct timespec intv
+ = {SHM_DU_TIMEOUT_MICROS / MILLION,
+ (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000};
+
+ pid_t api;
+
+ if (dum == NULL)
+ return (void *) -1;
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+ (void *) dum->shm_mutex);
+
+ while (true) {
+ int ret = 0;
+ if (pthread_cond_wait(dum->full, dum->shm_mutex)
+ == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ *dum->choked = 1;
+
+ api = get_tail_ptr(dum)->dst_api;
+
+ if (kill(api, 0) == 0) {
+ struct timespec now;
+ struct timespec dl;
+ clock_gettime(CLOCK_REALTIME, &now);
+ ts_add(&now, &intv, &dl);
+ while (*dum->choked) {
+ ret = pthread_cond_timedwait(dum->healthy,
+ dum->shm_mutex,
+ &dl);
+ if (!ret)
+ continue;
+
+ if (ret == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
+
+ if (ret == ETIMEDOUT) {
+ LOG_DBGF("SDU timed out.");
+ clean_sdus(dum, api);
+ }
+ }
+ } else {
+ LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret);
+ clean_sdus(dum, api);
+ }
+ }
+
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
@@ -256,21 +398,23 @@ void shm_du_map_destroy(struct shm_du_map * dum)
free(dum);
}
-ssize_t shm_create_du_buff(struct shm_du_map * dum,
- size_t size,
- size_t headspace,
- uint8_t * data,
- size_t len)
+ssize_t shm_du_map_write(struct shm_du_map * dum,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t len)
{
struct shm_du_buff * sdb;
#ifndef SHM_MAP_SINGLE_BLOCK
long blocks = 0;
- int sz = size + sizeof *sdb;
- int sz2 = headspace + len + sizeof *sdb;
+ size_t size = headspace + len + tailspace;
+ int sz = headspace + len + sizeof *sdb;
+ int sz2 = sz + tailspace;
size_t copy_len;
#endif
uint8_t * write_pos;
- ssize_t index;
+ ssize_t index = -1;
if (dum == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -294,36 +438,35 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
return -1;
}
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
#ifndef SHM_MAP_SINGLE_BLOCK
- while (sz > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ while (sz2 > 0) {
sz2 -= SHM_DU_BUFF_BLOCK_SIZE;
- if (sz2 < 0 && sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ if (sz < 0 && sz2 > 0) {
pthread_mutex_unlock(dum->shm_mutex);
- LOG_DBG("Can't handle this packet now");
- return -1;
+ LOG_DBG("Can't handle this packet now.");
+ return -EAGAIN;
}
++blocks;
}
if (!shm_map_free(dum, blocks)) {
- pthread_mutex_unlock(dum->shm_mutex);
- pthread_cond_signal(dum->sanitize);
- return -1;
- }
#else
if (!shm_map_free(dum, 1)) {
+#endif
+ pthread_cond_signal(dum->full);
pthread_mutex_unlock(dum->shm_mutex);
- ptrhead_cond_signal(dum->sanitize);
return -1;
}
-#endif
sdb = get_head_ptr(dum);
sdb->size = size;
- sdb->garbage = 0;
+ sdb->dst_api = dst_api;
sdb->du_head = headspace;
sdb->du_tail = sdb->du_head + len;
@@ -333,7 +476,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb);
while (blocks > 0) {
memcpy(write_pos, data, copy_len);
- *(dum->ptr_head) = (*dum->ptr_head + 1)
+ *dum->ptr_head = (*dum->ptr_head + 1)
& (SHM_BLOCKS_IN_MAP - 1);
len -= copy_len;
copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE);
@@ -346,7 +489,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
#else
memcpy(write_pos, data, len);
index = *dum->ptr_head;
- *(dum->ptr_head) = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
#endif
pthread_mutex_unlock(dum->shm_mutex);
@@ -354,18 +497,21 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum,
}
/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */
-int shm_du_map_read_sdu(uint8_t ** dst,
- struct shm_du_map * dum,
- ssize_t idx)
+int shm_du_map_read(uint8_t ** dst,
+ struct shm_du_map * dum,
+ ssize_t idx)
{
size_t len = 0;
if (idx > SHM_BLOCKS_IN_MAP)
return -1;
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
- if (*dum->ptr_head == *dum->ptr_tail) {
+ if (shm_map_empty(dum)) {
pthread_mutex_unlock(dum->shm_mutex);
return -1;
}
@@ -380,47 +526,32 @@ int shm_du_map_read_sdu(uint8_t ** dst,
return len;
}
-int shm_release_du_buff(struct shm_du_map * dum, ssize_t idx)
+int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)
{
-#ifndef SHM_MAP_SINGLE_BLOCK
- long sz;
- long blocks = 0;
-#endif
if (idx > SHM_BLOCKS_IN_MAP)
return -1;
- pthread_mutex_lock(dum->shm_mutex);
+ if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->shm_mutex);
+ }
- if (*dum->ptr_head == *dum->ptr_tail) {
+ if (shm_map_empty(dum)) {
pthread_mutex_unlock(dum->shm_mutex);
return -1;
}
- idx_to_du_buff_ptr(dum, idx)->garbage = 1;
+ idx_to_du_buff_ptr(dum, idx)->dst_api = 0;
if (idx != *dum->ptr_tail) {
pthread_mutex_unlock(dum->shm_mutex);
return 0;
}
- while (get_tail_ptr(dum)->garbage == 1 &&
- *dum->ptr_tail != *dum->ptr_head) {
-#ifndef SHM_MAP_SINGLE_BLOCK
- sz = get_tail_ptr(dum)->size;
- while (sz + (long) sizeof(struct shm_du_buff) > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
- }
+ garbage_collect(dum);
- *(dum->ptr_tail) =
- (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
-
- blocks = 0;
-#else
- *(dum->ptr_tail) =
- (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
-#endif
- }
+ *dum->choked = 0;
+ pthread_cond_signal(dum->healthy);
pthread_mutex_unlock(dum->shm_mutex);