summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-26 19:30:52 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-26 20:09:21 +0200
commit963537079c7d5a9f9fb39355fb0e3b84a78eaa0b (patch)
tree7f9a78e0d57f95d903bcbbf01a00e71482593277 /src/lib
parent7848ec4100f8677392fb6b07c42dd47ee6aa9b0d (diff)
downloadouroboros-963537079c7d5a9f9fb39355fb0e3b84a78eaa0b.tar.gz
ouroboros-963537079c7d5a9f9fb39355fb0e3b84a78eaa0b.zip
lib, ipcpd: Further stabilization of flows
The steps for flow deallocation have been further refined. An operation ipcp_flow_fini() which wait for all SDUs to be read from a flow has been added. The shim IPCPs and the local IPCP have been adapted to this new API. Deallocation messages have been removed from the shim IPCPs, since there is insufficient state synchronisation between them to make this work reliably.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/dev.c55
-rw-r--r--src/lib/shm_rbuff.c47
2 files changed, 69 insertions, 33 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 018cb692..a0c47403 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -612,12 +612,6 @@ int flow_dealloc(int fd)
return -ENOTALLOC;
}
- if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EBUSY;
- }
-
msg.port_id = ai.flows[fd].port_id;
pthread_rwlock_unlock(&ai.flows_lock);
@@ -1285,11 +1279,7 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return -EPERM;
}
- if (ai.flows[fd].tx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].tx_rb);
idx = shm_du_buff_get_idx(sdb);
@@ -1302,9 +1292,38 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return 0;
}
+int ipcp_flow_fini(int fd)
+{
+ struct shm_rbuff * rb;
+
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ rb = ai.flows[fd].rx_rb;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ shm_rbuff_fini(rb);
+
+ return 0;
+}
+
ssize_t local_flow_read(int fd)
{
- return shm_rbuff_read(ai.flows[fd].rx_rb);
+ ssize_t ret;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ ret = shm_rbuff_read(ai.flows[fd].rx_rb);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return ret;
}
int local_flow_write(int fd, size_t idx)
@@ -1315,11 +1334,7 @@ int local_flow_write(int fd, size_t idx)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].tx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].tx_rb);
shm_rbuff_write(ai.flows[fd].tx_rb, idx);
@@ -1338,11 +1353,7 @@ int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].rx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].rx_rb);
idx = shm_rbuff_read(ai.flows[fd].rx_rb);
if (idx < 0) {
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
index 8b2e9229..301669e7 100644
--- a/src/lib/shm_rbuff.c
+++ b/src/lib/shm_rbuff.c
@@ -43,6 +43,8 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
+#define RB_CLOSED -1
+#define RB_OPEN 0
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 2 * sizeof(size_t) + sizeof(int8_t) \
@@ -144,7 +146,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = 0;
+ *rb->acl = RB_OPEN;
*rb->head = 0;
*rb->tail = 0;
@@ -291,6 +293,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
ret = *tail_el_ptr(rb);
*rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);
+ pthread_cond_broadcast(rb->del);
pthread_mutex_unlock(rb->lock);
@@ -353,10 +356,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-int shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_block(struct shm_rbuff * rb)
{
- int ret = 0;
-
assert(rb);
#ifdef __APPLE__
@@ -367,14 +368,9 @@ int shm_rbuff_block(struct shm_rbuff * rb)
pthread_mutex_consistent(rb->lock);
}
#endif
- *rb->acl = -1;
-
- if (!shm_rbuff_empty(rb))
- ret = -EBUSY;
+ *rb->acl = RB_CLOSED;
pthread_mutex_unlock(rb->lock);
-
- return ret;
}
void shm_rbuff_unblock(struct shm_rbuff * rb)
@@ -389,11 +385,40 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
pthread_mutex_consistent(rb->lock);
}
#endif
- *rb->acl = 0; /* open */
+ *rb->acl = RB_OPEN;
pthread_mutex_unlock(rb->lock);
}
+void shm_rbuff_fini(struct shm_rbuff * rb)
+{
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ assert(*rb->acl == RB_CLOSED);
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while (!shm_rbuff_empty(rb))
+#ifdef __APPLE__
+ pthread_cond_wait(rb->del, rb->lock);
+#else
+ if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ pthread_cleanup_pop(true);
+}
+
void shm_rbuff_reset(struct shm_rbuff * rb)
{
assert(rb);