From 963537079c7d5a9f9fb39355fb0e3b84a78eaa0b Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 26 Oct 2016 19:30:52 +0200 Subject: lib, ipcpd: Further stabilization of flows The steps for flow deallocation have been further refined. An operation ipcp_flow_fini() which wait for all SDUs to be read from a flow has been added. The shim IPCPs and the local IPCP have been adapted to this new API. Deallocation messages have been removed from the shim IPCPs, since there is insufficient state synchronisation between them to make this work reliably. --- src/lib/dev.c | 55 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 22 deletions(-) (limited to 'src/lib/dev.c') diff --git a/src/lib/dev.c b/src/lib/dev.c index 018cb692..a0c47403 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -612,12 +612,6 @@ int flow_dealloc(int fd) return -ENOTALLOC; } - if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -EBUSY; - } - msg.port_id = ai.flows[fd].port_id; pthread_rwlock_unlock(&ai.flows_lock); @@ -1285,11 +1279,7 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return -EPERM; } - if (ai.flows[fd].tx_rb == NULL) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -EPERM; - } + assert(ai.flows[fd].tx_rb); idx = shm_du_buff_get_idx(sdb); @@ -1302,9 +1292,38 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return 0; } +int ipcp_flow_fini(int fd) +{ + struct shm_rbuff * rb; + + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + rb = ai.flows[fd].rx_rb; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + shm_rbuff_fini(rb); + + return 0; +} + ssize_t local_flow_read(int fd) { - return shm_rbuff_read(ai.flows[fd].rx_rb); + ssize_t ret; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + ret = shm_rbuff_read(ai.flows[fd].rx_rb); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return ret; } int local_flow_write(int fd, size_t idx) @@ -1315,11 +1334,7 @@ int local_flow_write(int fd, size_t idx) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].tx_rb == NULL) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -EPERM; - } + assert(ai.flows[fd].tx_rb); shm_rbuff_write(ai.flows[fd].tx_rb, idx); @@ -1338,11 +1353,7 @@ int ipcp_read_shim(int fd, struct shm_du_buff ** sdb) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].rx_rb == NULL) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -EPERM; - } + assert(ai.flows[fd].rx_rb); idx = shm_rbuff_read(ai.flows[fd].rx_rb); if (idx < 0) { -- cgit v1.2.3