From 79475a4742bc28e1737044f2300bcb601e6e10bf Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Thu, 30 Jun 2016 23:14:14 +0200 Subject: lib: robust locking in shared memory and crash recovery This PR enhances the shared memory providing recovery if a process crashes. It adds a SHM_DU_TIMEOUT_MICROS variable, setting an expiration time for SDU's when shared memory is full. If an application doesn't read a blocking SDU within this time, the shared memory will be cleansed of all SDU's for this application and the application's rbuff will be cleared. Some refactoring of the API's. Fixed wrong pthread checks in IRMd. Fixes #13 Fixes #14 --- src/ipcpd/flow.h | 2 ++ src/ipcpd/local/main.c | 2 +- src/ipcpd/shim-eth-llc/main.c | 19 ++++++++++--------- src/ipcpd/shim-udp/main.c | 20 ++++++++++---------- 4 files changed, 23 insertions(+), 20 deletions(-) (limited to 'src/ipcpd') diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h index e27882e2..b0f1390a 100644 --- a/src/ipcpd/flow.h +++ b/src/ipcpd/flow.h @@ -32,6 +32,8 @@ struct flow { int port_id; struct shm_ap_rbuff * rb; enum flow_state state; + + pid_t api; }; #endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 2120e4e8..837cbf8c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -178,7 +178,6 @@ static int port_id_to_fd(int port_id) /* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ static void * ipcp_local_sdu_loop(void * o) { - while (true) { struct rb_entry * e; int fd; @@ -208,6 +207,7 @@ static void * ipcp_local_sdu_loop(void * o) while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0) ; + pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ipcp->state_lock); } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 48b6391f..68e7e933 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -722,11 +722,12 @@ static void * eth_llc_ipcp_sdu_reader(void * o) continue; } - while ((index = - shm_create_du_buff(shim_data(_ipcp)->dum, - frame_len, 0, - (uint8_t *) (buf + i), - frame_len)) < 0) + while ((index = shm_du_map_write(shim_data(_ipcp)->dum, + ipcp_flow(j)->api, + 0, + 0, + (uint8_t *) (buf + i), + frame_len)) < 0) ; e.index = index; @@ -769,9 +770,9 @@ static void * eth_llc_ipcp_sdu_writer(void * o) return (void *) 1; /* -ENOTENROLLED */ } - len = shm_du_map_read_sdu((uint8_t **) &buf, - shim_data(_ipcp)->dum, - e->index); + len = shm_du_map_read((uint8_t **) &buf, + shim_data(_ipcp)->dum, + e->index); if (len <= 0) { pthread_rwlock_unlock(&_ipcp->state_lock); free(e); @@ -798,7 +799,7 @@ static void * eth_llc_ipcp_sdu_writer(void * o) pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); if (shim_data(_ipcp)->dum != NULL) - shm_release_du_buff(shim_data(_ipcp)->dum, e->index); + shm_du_map_remove(shim_data(_ipcp)->dum, e->index); pthread_rwlock_unlock(&_ipcp->state_lock); } diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index a28c262f..68d393af 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -206,12 +206,12 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) pthread_rwlock_rdlock(&_ap_instance->flows_lock); - while ((index = shm_create_du_buff(_ap_instance->dum, - count + DU_BUFF_HEADSPACE + - DU_BUFF_TAILSPACE, - DU_BUFF_HEADSPACE, - (uint8_t *) buf, - count)) < 0) + while ((index = shm_du_map_write(_ap_instance->dum, + _ap_instance->flows[fd].api, + 0, + 0, + (uint8_t *) buf, + count)) < 0) ; e.index = index; @@ -772,9 +772,9 @@ static void * ipcp_udp_sdu_loop(void * o) return (void *) 1; /* -ENOTENROLLED */ } - len = shm_du_map_read_sdu((uint8_t **) &buf, - _ap_instance->dum, - e->index); + len = shm_du_map_read((uint8_t **) &buf, + _ap_instance->dum, + e->index); if (len <= 0) { pthread_rwlock_unlock(&_ipcp->state_lock); free(e); @@ -799,7 +799,7 @@ static void * ipcp_udp_sdu_loop(void * o) pthread_rwlock_rdlock(&_ipcp->state_lock); if (_ap_instance->dum != NULL) - shm_release_du_buff(_ap_instance->dum, e->index); + shm_du_map_remove(_ap_instance->dum, e->index); pthread_rwlock_unlock(&_ipcp->state_lock); } -- cgit v1.2.3