From 4931526cf9b5e40294e043deab856f25bf56c7cf Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 3 Aug 2016 13:40:16 +0200 Subject: lib: Revise blocking I/O Blocking I/O now uses condition variables in the shared memory instead of busy waiting. Timeouts can be specified. This requires the size of the rbuffs and du_map to be the same, to guarantee that when the shm_du_map is not full, the ap_rbuffs can't be full either. Added the timeout option to the flow for future use. --- src/lib/dev.c | 56 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 27 deletions(-) (limited to 'src/lib/dev.c') diff --git a/src/lib/dev.c b/src/lib/dev.c index 22e77169..ce919263 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -41,6 +41,8 @@ struct flow { int oflags; pid_t api; + + struct timespec * timeout; }; struct ap_data { @@ -93,7 +95,9 @@ int ap_init(char * ap_name) for (i = 0; i < AP_MAX_FLOWS; ++i) { _ap_instance->flows[i].rb = NULL; _ap_instance->flows[i].port_id = -1; + _ap_instance->flows[i].oflags = 0; _ap_instance->flows[i].api = -1; + _ap_instance->flows[i].timeout = NULL; } pthread_rwlock_init(&_ap_instance->flows_lock, NULL); @@ -127,6 +131,9 @@ void ap_fini(void) pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_destroy(&_ap_instance->flows_lock); + pthread_rwlock_destroy(&_ap_instance->data_lock); + free(_ap_instance); } @@ -458,7 +465,7 @@ int flow_cntl(int fd, int cmd, int oflags) ssize_t flow_write(int fd, void * buf, size_t count) { - ssize_t index; + ssize_t idx; struct rb_entry e; if (buf == NULL) @@ -477,37 +484,35 @@ ssize_t flow_write(int fd, void * buf, size_t count) } if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - index = shm_du_map_write(_ap_instance->dum, - _ap_instance->flows[fd].api, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - (uint8_t *) buf, - count); - if (index == -1) { + idx = shm_du_map_write(_ap_instance->dum, + _ap_instance->flows[fd].api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + (uint8_t *) buf, + count); + if (idx == -1) { pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); return -EAGAIN; } - e.index = index; + e.index = idx; e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { - shm_du_map_remove(_ap_instance->dum, index); + shm_du_map_remove(_ap_instance->dum, idx); pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; } } else { /* blocking */ - while ((index = shm_du_map_write(_ap_instance->dum, - _ap_instance->flows[fd].api, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - (uint8_t *) buf, - count)) < 0) - ; - - e.index = index; + idx = shm_du_map_write_b(_ap_instance->dum, + _ap_instance->flows[fd].api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + (uint8_t *) buf, + count); + e.index = idx; e.port_id = _ap_instance->flows[fd].port_id; while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) @@ -546,16 +551,13 @@ ssize_t flow_read(int fd, void * buf, size_t count) return -ENOTALLOC; } - if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { + if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) idx = shm_ap_rbuff_read_port(_ap_instance->rb, _ap_instance->flows[fd].port_id); - } else { /* block */ - while ((idx = - shm_ap_rbuff_read_port(_ap_instance->rb, - _ap_instance-> - flows[fd].port_id)) < 0) - ; - } + else + idx = shm_ap_rbuff_read_port_b(_ap_instance->rb, + _ap_instance->flows[fd].port_id, + _ap_instance->flows[fd].timeout); pthread_rwlock_unlock(&_ap_instance->flows_lock); -- cgit v1.2.3