From 79475a4742bc28e1737044f2300bcb601e6e10bf Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Thu, 30 Jun 2016 23:14:14 +0200 Subject: lib: robust locking in shared memory and crash recovery This PR enhances the shared memory providing recovery if a process crashes. It adds a SHM_DU_TIMEOUT_MICROS variable, setting an expiration time for SDU's when shared memory is full. If an application doesn't read a blocking SDU within this time, the shared memory will be cleansed of all SDU's for this application and the application's rbuff will be cleared. Some refactoring of the API's. Fixed wrong pthread checks in IRMd. Fixes #13 Fixes #14 --- src/lib/dev.c | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) (limited to 'src/lib/dev.c') diff --git a/src/lib/dev.c b/src/lib/dev.c index ac995b2d..19bc90e5 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -40,7 +40,7 @@ struct flow { int port_id; int oflags; - /* don't think this needs locking */ + pid_t api; }; struct ap_data { @@ -93,6 +93,7 @@ int ap_init(char * ap_name) for (i = 0; i < AP_MAX_FLOWS; ++i) { _ap_instance->flows[i].rb = NULL; _ap_instance->flows[i].port_id = -1; + _ap_instance->flows[i].api = 0; /* API_INVALID */ } pthread_rwlock_init(&_ap_instance->flows_lock, NULL); @@ -319,6 +320,8 @@ int flow_alloc(char * dst_name, _ap_instance->flows[fd].port_id = recv_msg->port_id; _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + _ap_instance->flows[fd].api = + shm_ap_rbuff_get_api(_ap_instance->flows[fd].rb); pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -349,7 +352,7 @@ int flow_alloc_res(int fd) return -ENOTALLOC; } - msg.port_id = _ap_instance->flows[fd].port_id; + msg.port_id = _ap_instance->flows[fd].port_id; pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -389,11 +392,12 @@ int flow_dealloc(int fd) return -ENOTALLOC; } - msg.port_id = _ap_instance->flows[fd].port_id; + msg.port_id = _ap_instance->flows[fd].port_id; _ap_instance->flows[fd].port_id = -1; shm_ap_rbuff_close(_ap_instance->flows[fd].rb); _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].api = 0; bmp_release(_ap_instance->fds, fd); @@ -476,12 +480,12 @@ ssize_t flow_write(int fd, void * buf, size_t count) } if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - index = shm_create_du_buff(_ap_instance->dum, - count + DU_BUFF_HEADSPACE + - DU_BUFF_TAILSPACE, - DU_BUFF_HEADSPACE, - (uint8_t *) buf, - count); + index = shm_du_map_write(_ap_instance->dum, + _ap_instance->flows[fd].api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + (uint8_t *) buf, + count); if (index == -1) { pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); @@ -492,18 +496,18 @@ ssize_t flow_write(int fd, void * buf, size_t count) e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { - shm_release_du_buff(_ap_instance->dum, index); + shm_du_map_remove(_ap_instance->dum, index); pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; } } else { /* blocking */ - while ((index = shm_create_du_buff(_ap_instance->dum, - count + DU_BUFF_HEADSPACE + - DU_BUFF_TAILSPACE, - DU_BUFF_HEADSPACE, - (uint8_t *) buf, - count)) < 0) + while ((index = shm_du_map_write(_ap_instance->dum, + _ap_instance->flows[fd].api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + (uint8_t *) buf, + count)) < 0) ; e.index = index; @@ -555,9 +559,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) return -EAGAIN; } - n = shm_du_map_read_sdu(&sdu, - _ap_instance->dum, - idx); + n = shm_du_map_read(&sdu, _ap_instance->dum, idx); if (n < 0) { pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; @@ -565,7 +567,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) memcpy(buf, sdu, MIN(n, count)); - shm_release_du_buff(_ap_instance->dum, idx); + shm_du_map_remove(_ap_instance->dum, idx); pthread_rwlock_unlock(&_ap_instance->data_lock); -- cgit v1.2.3