summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-17 09:08:21 +0000
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-17 09:08:21 +0000
commitafb04d957a5322d483e6422f3baf1a651915af10 (patch)
tree038cbefec387fc15af2dd0b8325b063977a08590
parent059dc0f942330b1e8a989f37a7caf3766f8e6c50 (diff)
parentc79ab46894053312f80390bf13a52c238a7d4704 (diff)
downloadouroboros-afb04d957a5322d483e6422f3baf1a651915af10.tar.gz
ouroboros-afb04d957a5322d483e6422f3baf1a651915af10.zip
Merged in dstaesse/ouroboros/be-fp (pull request #265)
lib: Stabilize fast flow deallocation over local IPCP
-rw-r--r--include/ouroboros/local-dev.h7
-rw-r--r--include/ouroboros/shm_ap_rbuff.h2
-rw-r--r--src/ipcpd/local/main.c37
-rw-r--r--src/lib/dev.c49
-rw-r--r--src/lib/shm_ap_rbuff.c31
5 files changed, 72 insertions, 54 deletions
diff --git a/include/ouroboros/local-dev.h b/include/ouroboros/local-dev.h
index b4915672..77ff47e9 100644
--- a/include/ouroboros/local-dev.h
+++ b/include/ouroboros/local-dev.h
@@ -25,10 +25,9 @@
#ifndef OUROBOROS_LOCAL_DEV_H
#define OUROBOROS_LOCAL_DEV_H
-/* returns flow descriptor and rb_entry, no access to du_buff */
-int local_flow_read(struct rb_entry * e);
+struct rb_entry * local_flow_read(int fd);
-int local_flow_write(int fd,
- struct rb_entry * e);
+int local_flow_write(int fd,
+ struct rb_entry * e);
#endif /* OUROBOROS_LOCAL_DEV_H */
diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h
index 1e45ef7f..453e4bf8 100644
--- a/include/ouroboros/shm_ap_rbuff.h
+++ b/include/ouroboros/shm_ap_rbuff.h
@@ -47,7 +47,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);
void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb,
int port_id);
-void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb,
+int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb,
int port_id);
int shm_ap_rbuff_write(struct shm_ap_rbuff * rb,
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 7d23c08d..4e500a8a 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -24,6 +24,8 @@
#include "ipcp.h"
#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
+#include <ouroboros/fcntl.h>
+#include <ouroboros/select.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/local-dev.h>
#define OUROBOROS_PREFIX "ipcpd/local"
@@ -66,8 +68,10 @@ void local_data_fini()
static void * ipcp_local_sdu_loop(void * o)
{
while (true) {
- struct rb_entry e;
- int fd = local_flow_read(&e);
+ int fd;
+ struct rb_entry * e;
+
+ fd = flow_select(NULL, NULL);
pthread_rwlock_rdlock(&ipcpi.state_lock);
@@ -77,13 +81,18 @@ static void * ipcp_local_sdu_loop(void * o)
}
pthread_rwlock_rdlock(&local_data.lock);
+
+ e = local_flow_read(fd);
+
fd = local_data.in_out[fd];
- pthread_rwlock_unlock(&local_data.lock);
if (fd != -1)
- local_flow_write(fd, &e);
+ local_flow_write(fd, e);
+ pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ free(e);
}
return (void *) 1;
@@ -209,8 +218,6 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
int out_fd = -1;
int ret = -1;
- LOG_DBG("Received response for fd %d: %d.", fd, response);
-
if (response)
return 0;
@@ -235,25 +242,23 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
static int ipcp_local_flow_dealloc(int fd)
{
- int out_fd = -1;
+ struct timespec t = {0, 10000};
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_wrlock(&local_data.lock);
+ if (fd < 0)
+ return -EINVAL;
- out_fd = local_data.in_out[fd];
+ while (flow_dealloc(fd) == -EBUSY)
+ nanosleep(&t, NULL);
- if (out_fd != -1) {
- local_data.in_out[out_fd] = -1;
- flow_dealloc(out_fd);
- }
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&local_data.lock);
+ flow_cntl(local_data.in_out[fd], FLOW_F_SETFL, FLOW_O_WRONLY);
local_data.in_out[fd] = -1;
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- flow_dealloc(fd);
-
LOG_INFO("Flow with fd %d deallocated.", fd);
return 0;
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 577fa7a7..77c2d06a 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -548,6 +548,12 @@ int flow_dealloc(int fd)
return -ENOTALLOC;
}
+ if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EBUSY;
+ }
+
msg.port_id = ai.flows[fd].port_id;
port_destroy(&ai.ports[msg.port_id]);
@@ -555,6 +561,7 @@ int flow_dealloc(int fd)
ai.flows[fd].port_id = -1;
shm_ap_rbuff_close(ai.flows[fd].rb);
ai.flows[fd].rb = NULL;
+ ai.flows[fd].oflags = 0;
ai.flows[fd].api = -1;
if (ai.flows[fd].timeout != NULL) {
free(ai.flows[fd].timeout);
@@ -563,8 +570,6 @@ int flow_dealloc(int fd)
bmp_release(ai.fds, fd);
- shm_ap_rbuff_close_port(ai.rb, msg.port_id);
-
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -598,6 +603,10 @@ int flow_cntl(int fd, int cmd, int oflags)
return old;
case FLOW_F_SETFL: /* SET FLOW FLAGS */
ai.flows[fd].oflags = oflags;
+ if (oflags & FLOW_O_WRONLY)
+ shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id);
+ if (oflags & FLOW_O_RDWR)
+ shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return old;
@@ -628,6 +637,12 @@ ssize_t flow_write(int fd, void * buf, size_t count)
return -ENOTALLOC;
}
+ if (ai.flows[fd].oflags & FLOW_O_RDONLY) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EPERM;
+ }
+
if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
idx = shm_rdrbuff_write(ai.rdrb,
ai.flows[fd].api,
@@ -1074,6 +1089,12 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
+ if (ai.flows[fd].oflags & FLOW_O_RDONLY) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EPERM;
+ }
+
if (ai.flows[fd].rb == NULL) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1091,26 +1112,32 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return 0;
}
-int local_flow_read(struct rb_entry * e)
+struct rb_entry * local_flow_read(int fd)
{
- int fd;
-
- *e = *(shm_ap_rbuff_read(ai.rb));
+ int port_id;
+ struct rb_entry * e = NULL;
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- fd = ai.ports[e->port_id].fd;
+ port_id = ai.flows[fd].port_id;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return fd;
+ if (port_id != -1) {
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+ e->index = shm_ap_rbuff_read_port(ai.rb, port_id);
+ }
+
+ return e;
}
int local_flow_write(int fd, struct rb_entry * e)
{
- if (e == NULL)
+ if (e == NULL || fd < 0)
return -EINVAL;
pthread_rwlock_rdlock(&ai.data_lock);
@@ -1135,9 +1162,7 @@ int local_flow_write(int fd, struct rb_entry * e)
int ipcp_read_shim(struct shm_du_buff ** sdb)
{
int fd;
- struct rb_entry * e;
-
- e = shm_ap_rbuff_read(ai.rb);
+ struct rb_entry * e = shm_ap_rbuff_read(ai.rb);
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index ede0b7f7..5cbf5bd0 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -249,18 +249,14 @@ void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id)
pthread_mutex_consistent(rb->lock);
}
#endif
-
-#ifdef OUROBOROS_CONFIG_DEBUG
- if (!rb->acl[port_id])
- LOG_DBG("Trying to open open port.");
-#endif
rb->acl[port_id] = 0; /* open */
pthread_mutex_unlock(rb->lock);
}
-void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id)
+int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id)
{
+ int ret = 0;
assert(rb);
@@ -272,13 +268,14 @@ void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id)
pthread_mutex_consistent(rb->lock);
}
#endif
-#ifdef OUROBOROS_CONFIG_DEBUG
- if (rb->acl[port_id])
- LOG_DBG("Trying to close closed port.");
-#endif
rb->acl[port_id] = -1;
+ if (rb->cntrs[port_id] > 0)
+ ret = -EBUSY;
+
pthread_mutex_unlock(rb->lock);
+
+ return ret;
}
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
@@ -551,6 +548,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
{
ssize_t idx = -1;
+ assert(rb);
+
#ifdef __APPLE__
pthread_mutex_lock(rb->lock);
#else
@@ -559,11 +558,6 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
pthread_mutex_consistent(rb->lock);
}
#endif
- if (rb->acl[port_id]) {
- pthread_mutex_unlock(rb->lock);
- return -ENOTALLOC;
- }
-
if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) {
pthread_mutex_unlock(rb->lock);
return -1;
@@ -597,11 +591,6 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
pthread_mutex_consistent(rb->lock);
}
#endif
- if (rb->acl[port_id]) {
- pthread_mutex_unlock(rb->lock);
- return -ENOTALLOC;
- }
-
if (timeout != NULL) {
idx = -ETIMEDOUT;
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
@@ -650,7 +639,7 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
if (ret != ETIMEDOUT) {
idx = tail_el_ptr(rb)->index;
- --rb->cntrs[port_id];
+ --rb->cntrs[port_id];
*rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
pthread_cond_broadcast(rb->del);