From b8b05e3b1980146ab8acb40cbe77e0271634c688 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 15 Oct 2016 19:30:27 +0200 Subject: lib: Stabilize fast flow deallocation over local IPCP --- src/ipcpd/local/main.c | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) (limited to 'src/ipcpd/local') diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 7d23c08d..b8b3335c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -24,6 +24,7 @@ #include "ipcp.h" #include #include +#include #include #include #define OUROBOROS_PREFIX "ipcpd/local" @@ -66,8 +67,10 @@ void local_data_fini() static void * ipcp_local_sdu_loop(void * o) { while (true) { - struct rb_entry e; - int fd = local_flow_read(&e); + int fd; + struct rb_entry * e; + + fd = flow_select(NULL, NULL); pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -77,13 +80,18 @@ static void * ipcp_local_sdu_loop(void * o) } pthread_rwlock_rdlock(&local_data.lock); + + e = local_flow_read(fd); + fd = local_data.in_out[fd]; - pthread_rwlock_unlock(&local_data.lock); if (fd != -1) - local_flow_write(fd, &e); + local_flow_write(fd, e); + pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); + + free(e); } return (void *) 1; @@ -209,8 +217,6 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) int out_fd = -1; int ret = -1; - LOG_DBG("Received response for fd %d: %d.", fd, response); - if (response) return 0; @@ -235,25 +241,22 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) static int ipcp_local_flow_dealloc(int fd) { - int out_fd = -1; + struct timespec t = {0, 10000}; - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_wrlock(&local_data.lock); + if (fd < 0) + return -EINVAL; - out_fd = local_data.in_out[fd]; + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); - if (out_fd != -1) { - local_data.in_out[out_fd] = -1; - flow_dealloc(out_fd); - } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&local_data.lock); local_data.in_out[fd] = -1; pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); - LOG_INFO("Flow with fd %d deallocated.", fd); return 0; -- cgit v1.2.3 From c79ab46894053312f80390bf13a52c238a7d4704 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 16 Oct 2016 09:38:01 +0200 Subject: lib, dev: Implement read/write options for flows Added the missing implementation of setting read/write options for flows. This allows applications to block the fast path for remotes. IPCPs can use this to block the fast path for the N + 1 flow when receiving remote deallocation requests. --- src/ipcpd/local/main.c | 2 ++ src/lib/dev.c | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) (limited to 'src/ipcpd/local') diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index b8b3335c..4e500a8a 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -24,6 +24,7 @@ #include "ipcp.h" #include #include +#include #include #include #include @@ -252,6 +253,7 @@ static int ipcp_local_flow_dealloc(int fd) pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&local_data.lock); + flow_cntl(local_data.in_out[fd], FLOW_F_SETFL, FLOW_O_WRONLY); local_data.in_out[fd] = -1; pthread_rwlock_unlock(&local_data.lock); diff --git a/src/lib/dev.c b/src/lib/dev.c index a3082a7d..77c2d06a 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -561,6 +561,7 @@ int flow_dealloc(int fd) ai.flows[fd].port_id = -1; shm_ap_rbuff_close(ai.flows[fd].rb); ai.flows[fd].rb = NULL; + ai.flows[fd].oflags = 0; ai.flows[fd].api = -1; if (ai.flows[fd].timeout != NULL) { free(ai.flows[fd].timeout); @@ -602,6 +603,10 @@ int flow_cntl(int fd, int cmd, int oflags) return old; case FLOW_F_SETFL: /* SET FLOW FLAGS */ ai.flows[fd].oflags = oflags; + if (oflags & FLOW_O_WRONLY) + shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id); + if (oflags & FLOW_O_RDWR) + shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return old; @@ -632,6 +637,12 @@ ssize_t flow_write(int fd, void * buf, size_t count) return -ENOTALLOC; } + if (ai.flows[fd].oflags & FLOW_O_RDONLY) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } + if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { idx = shm_rdrbuff_write(ai.rdrb, ai.flows[fd].api, @@ -1078,6 +1089,12 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); + if (ai.flows[fd].oflags & FLOW_O_RDONLY) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } + if (ai.flows[fd].rb == NULL) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); -- cgit v1.2.3