From 4931526cf9b5e40294e043deab856f25bf56c7cf Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 3 Aug 2016 13:40:16 +0200 Subject: lib: Revise blocking I/O Blocking I/O now uses condition variables in the shared memory instead of busy waiting. Timeouts can be specified. This requires the size of the rbuffs and du_map to be the same, to guarantee that when the shm_du_map is not full, the ap_rbuffs can't be full either. Added the timeout option to the flow for future use. --- include/ouroboros/config.h.in | 3 +- include/ouroboros/shm_ap_rbuff.h | 3 + include/ouroboros/shm_du_map.h | 6 ++ src/ipcpd/shim-eth-llc/main.c | 18 ++-- src/lib/dev.c | 56 ++++++----- src/lib/shm_ap_rbuff.c | 126 +++++++++++++++++------ src/lib/shm_du_map.c | 210 +++++++++++++++++++++++++++------------ 7 files changed, 291 insertions(+), 131 deletions(-) diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 8898699c..46685569 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -40,12 +40,11 @@ #define SHM_DU_MAP_MULTI_BLOCK #define SHM_DU_MAP_FILENAME "/ouroboros.shm" #define LOCKFILE_NAME "/ouroboros.lockfile" -#define SHM_BLOCKS_IN_MAP (1 << 14) +#define SHM_BUFFER_SIZE (1 << 14) #define SHM_DU_TIMEOUT_MICROS 15000 #define DU_BUFF_HEADSPACE 128 #define DU_BUFF_TAILSPACE 0 #define SHM_AP_RBUFF_PREFIX "/ouroboros.rbuff." -#define SHM_RBUFF_SIZE (1 << 14) #define IRMD_MAX_FLOWS 4096 #define IRMD_THREADPOOL_SIZE 5 #define IRMD_FLOW_TIMEOUT 5000 /* ms */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index 257a289d..50b077f0 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -46,6 +46,9 @@ int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb, const struct timespec * timeout); ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id); +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, + int port_id, + const struct timespec * timeout); pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff * rb); void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb); #endif /* OUROBOROS_SHM_AP_RBUFF_H */ diff --git a/include/ouroboros/shm_du_map.h b/include/ouroboros/shm_du_map.h index b9c56cf8..98013fc9 100644 --- a/include/ouroboros/shm_du_map.h +++ b/include/ouroboros/shm_du_map.h @@ -45,6 +45,12 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, size_t tailspace, uint8_t * data, size_t data_len); +ssize_t shm_du_map_write_b(struct shm_du_map * dum, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t data_len); int shm_du_map_read(uint8_t ** dst, struct shm_du_map * dum, ssize_t idx); diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 9e315335..f98799a5 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -386,7 +386,7 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE], shim_data(_ipcp)->tx_offset = (shim_data(_ipcp)->tx_offset + 1) - & (SHM_BLOCKS_IN_MAP -1); + & (SHM_BUFFER_SIZE -1); #else device = (shim_data(_ipcp))->device; @@ -719,7 +719,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o) MAC_SIZE) && memcmp(br_addr, &llc_frame->dst_hwaddr, MAC_SIZE)) { #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) - offset = (offset + 1) & (SHM_BLOCKS_IN_MAP - 1); + offset = (offset + 1) & (SHM_BUFFER_SIZE - 1); header->tp_status = TP_STATUS_KERNEL; #endif continue; @@ -730,7 +730,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o) if (ntohs(length) > SHIM_ETH_LLC_MAX_SDU_SIZE) { /* Not an LLC packet. */ #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) - offset = (offset + 1) & (SHM_BLOCKS_IN_MAP - 1); + offset = (offset + 1) & (SHM_BUFFER_SIZE - 1); header->tp_status = TP_STATUS_KERNEL; #endif continue; @@ -758,7 +758,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o) pthread_rwlock_unlock(&_ipcp->state_lock); #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) offset = (offset + 1) - & (SHM_BLOCKS_IN_MAP - 1); + & (SHM_BUFFER_SIZE - 1); header->tp_status = TP_STATUS_KERNEL; #endif continue; @@ -784,7 +784,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o) pthread_rwlock_unlock(&_ipcp->state_lock); } #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) - offset = (offset + 1) & (SHM_BLOCKS_IN_MAP -1); + offset = (offset + 1) & (SHM_BUFFER_SIZE -1); header->tp_status = TP_STATUS_KERNEL; #endif } @@ -1009,8 +1009,8 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) req.tp_block_size = SHM_DU_BUFF_BLOCK_SIZE; req.tp_frame_size = SHM_DU_BUFF_BLOCK_SIZE; - req.tp_block_nr = SHM_BLOCKS_IN_MAP; - req.tp_frame_nr = SHM_BLOCKS_IN_MAP; + req.tp_block_nr = SHM_BUFFER_SIZE; + req.tp_frame_nr = SHM_BUFFER_SIZE; if (setsockopt(fd, SOL_PACKET, PACKET_RX_RING, (void *) &req, sizeof(req))) { @@ -1036,7 +1036,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) shim_data(_ipcp)->rx_ring = mmap(NULL, 2 * SHM_DU_BUFF_BLOCK_SIZE - * SHM_BLOCKS_IN_MAP, + * SHM_BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (shim_data(_ipcp)->rx_ring == NULL) { @@ -1045,7 +1045,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) return -1; } shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring - + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BLOCKS_IN_MAP); + + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BUFFER_SIZE); #endif diff --git a/src/lib/dev.c b/src/lib/dev.c index 22e77169..ce919263 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -41,6 +41,8 @@ struct flow { int oflags; pid_t api; + + struct timespec * timeout; }; struct ap_data { @@ -93,7 +95,9 @@ int ap_init(char * ap_name) for (i = 0; i < AP_MAX_FLOWS; ++i) { _ap_instance->flows[i].rb = NULL; _ap_instance->flows[i].port_id = -1; + _ap_instance->flows[i].oflags = 0; _ap_instance->flows[i].api = -1; + _ap_instance->flows[i].timeout = NULL; } pthread_rwlock_init(&_ap_instance->flows_lock, NULL); @@ -127,6 +131,9 @@ void ap_fini(void) pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_destroy(&_ap_instance->flows_lock); + pthread_rwlock_destroy(&_ap_instance->data_lock); + free(_ap_instance); } @@ -458,7 +465,7 @@ int flow_cntl(int fd, int cmd, int oflags) ssize_t flow_write(int fd, void * buf, size_t count) { - ssize_t index; + ssize_t idx; struct rb_entry e; if (buf == NULL) @@ -477,37 +484,35 @@ ssize_t flow_write(int fd, void * buf, size_t count) } if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - index = shm_du_map_write(_ap_instance->dum, - _ap_instance->flows[fd].api, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - (uint8_t *) buf, - count); - if (index == -1) { + idx = shm_du_map_write(_ap_instance->dum, + _ap_instance->flows[fd].api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + (uint8_t *) buf, + count); + if (idx == -1) { pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); return -EAGAIN; } - e.index = index; + e.index = idx; e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { - shm_du_map_remove(_ap_instance->dum, index); + shm_du_map_remove(_ap_instance->dum, idx); pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; } } else { /* blocking */ - while ((index = shm_du_map_write(_ap_instance->dum, - _ap_instance->flows[fd].api, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - (uint8_t *) buf, - count)) < 0) - ; - - e.index = index; + idx = shm_du_map_write_b(_ap_instance->dum, + _ap_instance->flows[fd].api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + (uint8_t *) buf, + count); + e.index = idx; e.port_id = _ap_instance->flows[fd].port_id; while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) @@ -546,16 +551,13 @@ ssize_t flow_read(int fd, void * buf, size_t count) return -ENOTALLOC; } - if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { + if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) idx = shm_ap_rbuff_read_port(_ap_instance->rb, _ap_instance->flows[fd].port_id); - } else { /* block */ - while ((idx = - shm_ap_rbuff_read_port(_ap_instance->rb, - _ap_instance-> - flows[fd].port_id)) < 0) - ; - } + else + idx = shm_ap_rbuff_read_port_b(_ap_instance->rb, + _ap_instance->flows[fd].port_id, + _ap_instance->flows[fd].timeout); pthread_rwlock_unlock(&_ap_instance->flows_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index be4cd0c2..56555533 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -42,23 +42,27 @@ #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC -#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \ +#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ - + sizeof (pthread_cond_t)) + + 2 * sizeof (pthread_cond_t)) -#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail) \ - & (SHM_RBUFF_SIZE - 1)) -#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_BUFFER_SIZE - *rb->ptr_tail) \ + & (SHM_BUFFER_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) #define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail) -#define head_el_ptr (rb->shm_base + *rb->ptr_head) -#define tail_el_ptr (rb->shm_base + *rb->ptr_tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->ptr_head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->ptr_tail) +#define clean_sdus(rb) \ + while (!shm_rbuff_empty(rb) && tail_el_ptr(rb)->port_id < 0) \ + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); \ struct shm_ap_rbuff { struct rb_entry * shm_base; /* start of entry */ size_t * ptr_head; /* start of ringbuffer head */ size_t * ptr_tail; /* start of ringbuffer tail */ pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * work; /* threads will wait for a signal */ + pthread_cond_t * add; /* SDU arrived */ + pthread_cond_t * del; /* SDU removed */ pid_t api; /* api to which this rb belongs */ int fd; }; @@ -125,10 +129,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() } rb->shm_base = shm_base; - rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); rb->ptr_tail = rb->ptr_head + 1; rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1); - rb->work = (pthread_cond_t *) (rb->lock + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; pthread_mutexattr_init(&mattr); pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); @@ -138,7 +143,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); - pthread_cond_init(rb->work, &cattr); + pthread_cond_init(rb->add, &cattr); + pthread_cond_init(rb->del, &cattr); *rb->ptr_head = 0; *rb->ptr_tail = 0; @@ -190,10 +196,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) } rb->shm_base = shm_base; - rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); rb->ptr_tail = rb->ptr_head + 1; rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1); - rb->work = (pthread_cond_t *) (rb->lock + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; rb->fd = shm_fd; rb->api = api; @@ -243,7 +250,8 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) } pthread_mutex_destroy(rb->lock); - pthread_cond_destroy(rb->work); + pthread_cond_destroy(rb->add); + pthread_cond_destroy(rb->del); if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); @@ -275,10 +283,10 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) } if (shm_rbuff_empty(rb)) - pthread_cond_broadcast(rb->work); + pthread_cond_broadcast(rb->add); - *head_el_ptr = *e; - *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); + *head_el_ptr(rb) = *e; + *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1); pthread_mutex_unlock(rb->lock); @@ -307,13 +315,17 @@ int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb, pthread_mutex_consistent(rb->lock); } + clean_sdus(rb); + while (shm_rbuff_empty(rb)) { if (timeout != NULL) - ret = pthread_cond_timedwait(rb->work, + ret = pthread_cond_timedwait(rb->add, rb->lock, &abstime); else - ret = pthread_cond_wait(rb->work, rb->lock); + ret = pthread_cond_wait(rb->add, rb->lock); + + clean_sdus(rb); if (ret == EOWNERDEAD) { LOG_DBG("Recovering dead mutex."); @@ -348,11 +360,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) pthread_mutex_consistent(rb->lock); } - while (!shm_rbuff_empty(rb) && tail_el_ptr->port_id < 0) - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + clean_sdus(rb); while (shm_rbuff_empty(rb)) - if (pthread_cond_wait(rb->work, rb->lock) == EOWNERDEAD) { + if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) { LOG_DBG("Recovering dead mutex."); pthread_mutex_consistent(rb->lock); } @@ -365,7 +376,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) *e = *(rb->shm_base + *rb->ptr_tail); - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); pthread_cleanup_pop(true); @@ -381,24 +392,75 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) pthread_mutex_consistent(rb->lock); } - if (shm_rbuff_empty(rb)) { + clean_sdus(rb); + + if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { pthread_mutex_unlock(rb->lock); return -1; } - while (tail_el_ptr->port_id < 0) - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + idx = tail_el_ptr(rb)->index; - if (tail_el_ptr->port_id != port_id) { - pthread_mutex_unlock(rb->lock); - return -1; + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + + pthread_cond_broadcast(rb->del); + + pthread_mutex_unlock(rb->lock); + + return idx; +} + +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, + int port_id, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + ssize_t idx = -1; + + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); } - idx = tail_el_ptr->index; + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + clean_sdus(rb); - pthread_mutex_unlock(rb->lock); + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (tail_el_ptr(rb)->port_id != port_id) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->del, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->del, rb->lock); + + clean_sdus(rb); + + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } + + if (ret == ETIMEDOUT) { + pthread_mutex_unlock(rb->lock); + return -ret; + } + } + + idx = tail_el_ptr(rb)->index; + + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + + pthread_cond_broadcast(rb->del); + + pthread_cleanup_pop(true); return idx; } diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index b090bb74..a12ef223 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -40,7 +40,7 @@ #include -#define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE) +#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE) #define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) @@ -59,9 +59,9 @@ #define block_ptr_to_idx(dum, sdb) \ (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) -#define shm_map_used(dum)((*dum->ptr_head + SHM_BLOCKS_IN_MAP - *dum->ptr_tail)\ - & (SHM_BLOCKS_IN_MAP - 1)) -#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\ + & (SHM_BUFFER_SIZE - 1)) +#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE) #define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) @@ -79,7 +79,7 @@ struct shm_du_map { uint8_t * shm_base; /* start of blocks */ size_t * ptr_head; /* start of ringbuffer head */ size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + pthread_mutex_t * lock; /* lock all free space in shm */ size_t * choked; /* stale sdu detection */ pthread_cond_t * healthy; /* du map is healthy */ pthread_cond_t * full; /* run sanitizer when buffer full */ @@ -94,12 +94,12 @@ static void garbage_collect(struct shm_du_map * dum) while ((sdb = get_tail_ptr(dum))->dst_api == -1 && !shm_map_empty(dum)) *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks) - & (SHM_BLOCKS_IN_MAP - 1); + & (SHM_BUFFER_SIZE - 1); #else while (get_tail_ptr(dum)->dst_api == -1 && !shm_map_empty(dum)) *dum->ptr_tail = - (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); + (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1); #endif } @@ -114,9 +114,9 @@ static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit) if (buf->dst_api == api) buf->dst_api = -1; #ifdef SHM_DU_MAP_MULTI_BLOCK - idx = (idx + buf->blocks) & (SHM_BLOCKS_IN_MAP - 1); + idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1); #else - idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1); + idx = (idx + 1) & (SHM_BUFFER_SIZE - 1); #endif } @@ -194,8 +194,8 @@ struct shm_du_map * shm_du_map_create() dum->ptr_head = (size_t *) ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; - dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); + dum->choked = (size_t *) (dum->lock + 1); dum->healthy = (pthread_cond_t *) (dum->choked + 1); dum->full = dum->healthy + 1; dum->api = (pid_t *) (dum->full + 1); @@ -203,7 +203,7 @@ struct shm_du_map * shm_du_map_create() pthread_mutexattr_init(&mattr); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); - pthread_mutex_init(dum->shm_mutex, &mattr); + pthread_mutex_init(dum->lock, &mattr); pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); @@ -261,8 +261,8 @@ struct shm_du_map * shm_du_map_open() dum->ptr_head = (size_t *) ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; - dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); + dum->choked = (size_t *) (dum->lock + 1); dum->healthy = (pthread_cond_t *) (dum->choked + 1); dum->full = dum->healthy + 1; dum->api = (pid_t *) (dum->full + 1); @@ -283,23 +283,23 @@ void * shm_du_map_sanitize(void * o) if (dum == NULL) return (void *) -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, - (void *) dum->shm_mutex); + (void *) dum->lock); while (true) { int ret = 0; struct timespec now; struct timespec dl; - if (pthread_cond_wait(dum->full, dum->shm_mutex) + if (pthread_cond_wait(dum->full, dum->lock) == EOWNERDEAD) { LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } *dum->choked = 1; @@ -321,14 +321,14 @@ void * shm_du_map_sanitize(void * o) ts_add(&now, &intv, &dl); while (*dum->choked) { ret = pthread_cond_timedwait(dum->healthy, - dum->shm_mutex, + dum->lock, &dl); if (!ret) continue; if (ret == EOWNERDEAD) { LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } if (ret == ETIMEDOUT) { @@ -429,9 +429,9 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, return -1; } #endif - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } #ifdef SHM_DU_MAP_MULTI_BLOCK while (sz > 0) { @@ -439,15 +439,15 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, ++blocks; } - if (blocks + *dum->ptr_head > SHM_BLOCKS_IN_MAP - 1) - padblocks = SHM_BLOCKS_IN_MAP - *dum->ptr_head; + if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1) + padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; if (!shm_map_free(dum, (blocks + padblocks))) { #else if (!shm_map_free(dum, 1)) { #endif pthread_cond_signal(dum->full); - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return -1; } @@ -477,11 +477,99 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, idx = *dum->ptr_head; #ifdef SHM_DU_MAP_MULTI_BLOCK - *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BLOCKS_IN_MAP - 1); + *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); #else - *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); #endif - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); + + return idx; +} + +ssize_t shm_du_map_write_b(struct shm_du_map * dum, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) +{ + struct shm_du_buff * sdb; + size_t size = headspace + len + tailspace; +#ifdef SHM_DU_MAP_MULTI_BLOCK + long blocks = 0; + long padblocks = 0; + int sz = size + sizeof *sdb; +#endif + uint8_t * write_pos; + ssize_t idx = -1; + + if (dum == NULL || data == NULL) { + LOG_DBGF("Bogus input, bugging out."); + return -1; + } + +#ifndef SHM_DU_MAP_MULTI_BLOCK + if (sz > SHM_DU_BUFF_BLOCK_SIZE) { + LOG_DBGF("Multi-block SDU's disabled. Dropping."); + return -1; + } +#endif + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->lock); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) dum->lock); + +#ifdef SHM_DU_MAP_MULTI_BLOCK + while (sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1) + padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; + + while (!shm_map_free(dum, (blocks + padblocks))) { +#else + while (!shm_map_free(dum, 1)) { +#endif + pthread_cond_signal(dum->full); + pthread_cond_wait(dum->healthy, dum->lock); + } + +#ifdef SHM_DU_MAP_MULTI_BLOCK + if (padblocks) { + sdb = get_head_ptr(dum); + sdb->size = 0; + sdb->blocks = padblocks; + sdb->dst_api = -1; + sdb->du_head = 0; + sdb->du_tail = 0; + + *dum->ptr_head = 0; + } +#endif + sdb = get_head_ptr(dum); + sdb->size = size; + sdb->dst_api = dst_api; + sdb->du_head = headspace; + sdb->du_tail = sdb->du_head + len; +#ifdef SHM_DU_MAP_MULTI_BLOCK + sdb->blocks = blocks; +#endif + write_pos = ((uint8_t *) (sdb + 1)) + headspace; + + memcpy(write_pos, data, len); + + idx = *dum->ptr_head; +#ifdef SHM_DU_MAP_MULTI_BLOCK + *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif + pthread_cleanup_pop(true); return idx; } @@ -493,16 +581,16 @@ int shm_du_map_read(uint8_t ** dst, size_t len = 0; struct shm_du_buff * sdb; - if (idx > SHM_BLOCKS_IN_MAP) + if (idx > SHM_BUFFER_SIZE) return -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } if (shm_map_empty(dum)) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return -1; } @@ -510,30 +598,30 @@ int shm_du_map_read(uint8_t ** dst, len = sdb->du_tail - sdb->du_head; *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return len; } int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) { - if (idx > SHM_BLOCKS_IN_MAP) + if (idx > SHM_BUFFER_SIZE) return -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } if (shm_map_empty(dum)) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return -1; } idx_to_du_buff_ptr(dum, idx)->dst_api = -1; if (idx != *dum->ptr_tail) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return 0; } @@ -541,9 +629,9 @@ int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) *dum->choked = 0; - pthread_cond_signal(dum->healthy); + pthread_cond_broadcast(dum->healthy); - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return 0; } @@ -558,18 +646,18 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum, if (dum == NULL) return NULL; - if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) + if (idx < 0 || idx > SHM_BUFFER_SIZE) return NULL; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } sdb = idx_to_du_buff_ptr(dum, idx); if ((long) (sdb->du_head - size) < 0) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); LOG_DBGF("Failed to allocate PCI headspace."); return NULL; } @@ -578,7 +666,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum, buf = (uint8_t *) (sdb + 1) + sdb->du_head; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return buf; } @@ -593,18 +681,18 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum, if (dum == NULL) return NULL; - if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) + if (idx < 0 || idx > SHM_BUFFER_SIZE) return NULL; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } sdb = idx_to_du_buff_ptr(dum, idx); if (sdb->du_tail + size >= sdb->size) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); LOG_DBGF("Failed to allocate PCI tailspace."); return NULL; } @@ -613,7 +701,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum, sdb->du_tail += size; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return buf; } @@ -627,25 +715,25 @@ int shm_du_buff_head_release(struct shm_du_map * dum, if (dum == NULL) return -1; - if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) + if (idx < 0 || idx > SHM_BUFFER_SIZE) return -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } sdb = idx_to_du_buff_ptr(dum, idx); if (size > sdb->du_tail - sdb->du_head) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); LOG_DBGF("Tried to release beyond sdu boundary."); return -EOVERFLOW; } sdb->du_head += size; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return 0; } @@ -659,25 +747,25 @@ int shm_du_buff_tail_release(struct shm_du_map * dum, if (dum == NULL) return -1; - if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) + if (idx < 0 || idx > SHM_BUFFER_SIZE) return -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } sdb = idx_to_du_buff_ptr(dum, idx); if (size > sdb->du_tail - sdb->du_head) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); LOG_DBGF("Tried to release beyond sdu boundary."); return -EOVERFLOW; } sdb->du_tail -= size; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return 0; } -- cgit v1.2.3