summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-26 19:30:52 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-26 20:09:21 +0200
commit963537079c7d5a9f9fb39355fb0e3b84a78eaa0b (patch)
tree7f9a78e0d57f95d903bcbbf01a00e71482593277 /src
parent7848ec4100f8677392fb6b07c42dd47ee6aa9b0d (diff)
downloadouroboros-963537079c7d5a9f9fb39355fb0e3b84a78eaa0b.tar.gz
ouroboros-963537079c7d5a9f9fb39355fb0e3b84a78eaa0b.zip
lib, ipcpd: Further stabilization of flows
The steps for flow deallocation have been further refined. An operation ipcp_flow_fini() which wait for all SDUs to be read from a flow has been added. The shim IPCPs and the local IPCP have been adapted to this new API. Deallocation messages have been removed from the shim IPCPs, since there is insufficient state synchronisation between them to make this work reliably.
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/local/main.c18
-rw-r--r--src/ipcpd/shim-eth-llc/main.c54
-rw-r--r--src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto5
-rw-r--r--src/ipcpd/shim-udp/main.c85
-rw-r--r--src/ipcpd/shim-udp/shim_udp_messages.proto1
-rw-r--r--src/irmd/main.c10
-rw-r--r--src/lib/dev.c55
-rw-r--r--src/lib/shm_rbuff.c47
8 files changed, 92 insertions, 183 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index a8d5c273..412795ec 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -110,7 +110,7 @@ static void * ipcp_local_sdu_loop(void * o)
while ((fd = fqueue_next(local_data.fq)) >= 0) {
idx = local_flow_read(fd);
- assert((size_t) idx < (SHM_BUFFER_SIZE));
+ assert(idx < (SHM_BUFFER_SIZE));
fd = local_data.in_out[fd];
@@ -243,13 +243,13 @@ static int ipcp_local_flow_alloc(int fd,
pthread_rwlock_wrlock(&local_data.lock);
- flow_set_add(local_data.flows, fd);
-
out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
local_data.in_out[fd] = out_fd;
local_data.in_out[out_fd] = fd;
+ flow_set_add(local_data.flows, fd);
+
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -291,24 +291,22 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
static int ipcp_local_flow_dealloc(int fd)
{
- struct timespec t = {0, 10000};
-
assert(!(fd < 0));
- flow_set_del(local_data.flows, fd);
-
- while (flow_dealloc(fd) == -EBUSY)
- nanosleep(&t, NULL);
+ ipcp_flow_fini(fd);
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&local_data.lock);
- flow_cntl(local_data.in_out[fd], FLOW_F_SETFL, FLOW_O_WRONLY);
+ flow_set_del(local_data.flows, fd);
+
local_data.in_out[fd] = -1;
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+ flow_dealloc(fd);
+
LOG_INFO("Flow with fd %d deallocated.", fd);
return 0;
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index f6cded2b..b7b9f783 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -348,17 +348,6 @@ static int eth_llc_ipcp_sap_alloc_resp(uint8_t * dst_addr,
return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);
}
-static int eth_llc_ipcp_sap_dealloc(uint8_t * dst_addr, uint8_t ssap)
-{
- shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT;
-
- msg.code = SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC;
- msg.has_ssap = true;
- msg.ssap = ssap;
-
- return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);
-}
-
static int eth_llc_ipcp_sap_req(uint8_t r_sap,
uint8_t * r_addr,
char * dst_name,
@@ -427,29 +416,6 @@ static int eth_llc_ipcp_sap_alloc_reply(uint8_t ssap,
}
-static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap)
-{
- int fd = -1;
-
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
-
- fd = eth_llc_data.ef_to_fd[ssap];
- if (fd < 0) {
- pthread_rwlock_unlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Flow already deallocated.");
- return 0;
- }
-
- pthread_rwlock_unlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
-
- return 0;
-}
-
static int eth_llc_ipcp_name_query_req(char * name, uint8_t * r_addr)
{
shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT;
@@ -509,9 +475,6 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)
msg->dsap,
msg->response);
break;
- case SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC:
- eth_llc_ipcp_flow_dealloc_req(msg->ssap);
- break;
case SHIM_ETH_LLC_MSG_CODE__NAME_QUERY_REQ:
eth_llc_ipcp_name_query_req(msg->dst_name, r_addr);
break;
@@ -1074,25 +1037,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)
static int eth_llc_ipcp_flow_dealloc(int fd)
{
- struct timespec t = {0, 10000};
-
uint8_t sap;
- uint8_t r_sap;
uint8_t addr[MAC_SIZE];
- int ret;
-
- flow_set_del(eth_llc_data.np1_flows, fd);
- while (flow_dealloc(fd) == -EBUSY)
- nanosleep(&t, NULL);
+ ipcp_flow_fini(fd);
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
- r_sap = eth_llc_data.fd_to_ef[fd].r_sap;
+ flow_set_del(eth_llc_data.np1_flows, fd);
+
sap = eth_llc_data.fd_to_ef[fd].sap;
memcpy(addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE);
-
bmp_release(eth_llc_data.saps, sap);
eth_llc_data.fd_to_ef[fd].sap = -1;
eth_llc_data.fd_to_ef[fd].r_sap = -1;
@@ -1103,9 +1059,7 @@ static int eth_llc_ipcp_flow_dealloc(int fd)
pthread_rwlock_unlock(&eth_llc_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- ret = eth_llc_ipcp_sap_dealloc(addr, r_sap);
- if (ret < 0)
- LOG_DBG("Could not notify remote.");
+ flow_dealloc(fd);
LOG_DBG("Flow with fd %d deallocated.", fd);
diff --git a/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto b/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto
index 4d027d98..045db5c2 100644
--- a/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto
+++ b/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto
@@ -1,9 +1,8 @@
enum shim_eth_llc_msg_code {
FLOW_REQ = 1;
FLOW_REPLY = 2;
- FLOW_DEALLOC = 3;
- NAME_QUERY_REQ = 4;
- NAME_QUERY_REPLY = 5;
+ NAME_QUERY_REQ = 3;
+ NAME_QUERY_REPLY = 4;
};
message shim_eth_llc_msg {
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index ba2805c5..e4ab4fac 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -230,17 +230,6 @@ static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr,
return send_shim_udp_msg(&msg, dst_ip_addr);
}
-static int ipcp_udp_port_dealloc(uint32_t dst_ip_addr,
- uint16_t src_udp_port)
-{
- shim_udp_msg_t msg = SHIM_UDP_MSG__INIT;
-
- msg.code = SHIM_UDP_MSG_CODE__FLOW_DEALLOC;
- msg.src_udp_port = src_udp_port;
-
- return send_shim_udp_msg(&msg, dst_ip_addr);
-}
-
static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
char * dst_name,
char * src_ae_name)
@@ -375,46 +364,6 @@ static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port,
return ret;
}
-static int ipcp_udp_flow_dealloc_req(uint16_t udp_port)
-{
- int skfd = -1;
- int fd = -1;
-
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_wrlock(&udp_data.flows_lock);
-
- fd = udp_port_to_fd(udp_port);
- if (fd < 0) {
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Could not find flow on UDP port %d.",
- ntohs(udp_port));
- return 0;
- }
-
- skfd = udp_data.fd_to_uf[fd].skfd;
-
- udp_data.uf_to_fd[skfd] = -1;
- udp_data.fd_to_uf[fd].udp = -1;
- udp_data.fd_to_uf[fd].skfd = -1;
-
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_rdlock(&udp_data.flows_lock);
-
- clr_fd(skfd);
-
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
-
- close(skfd);
-
- LOG_DBG("Flow with fd %d deallocated.", fd);
-
- return 0;
-}
-
static void * ipcp_udp_listener(void * o)
{
uint8_t buf[SHIM_UDP_MSG_SIZE];
@@ -456,9 +405,6 @@ static void * ipcp_udp_listener(void * o)
msg->dst_udp_port,
msg->response);
break;
- case SHIM_UDP_MSG_CODE__FLOW_DEALLOC:
- ipcp_udp_flow_dealloc_req(msg->src_udp_port);
- break;
default:
LOG_ERR("Unknown message received %d.", msg->code);
shim_udp_msg__free_unpacked(msg, NULL);
@@ -1153,15 +1099,10 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)
static int ipcp_udp_flow_dealloc(int fd)
{
int skfd = -1;
- uint16_t remote_udp;
- struct timespec t = {0, 10000};
- struct sockaddr_in r_saddr;
- socklen_t r_saddr_len = sizeof(r_saddr);
- flow_set_del(udp_data.np1_flows, fd);
+ ipcp_flow_fini(fd);
- while (flow_dealloc(fd) == -EBUSY)
- nanosleep(&t, NULL);
+ flow_set_del(udp_data.np1_flows, fd);
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&udp_data.flows_lock);
@@ -1180,28 +1121,10 @@ static int ipcp_udp_flow_dealloc(int fd)
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- if (getpeername(skfd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) {
- LOG_DBG("Socket with fd %d has no peer.", skfd);
- close(skfd);
- return 0;
- }
-
- remote_udp = r_saddr.sin_port;
- r_saddr.sin_port = LISTEN_PORT;
-
- if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) {
- close(skfd);
- return 0 ;
- }
-
- if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr, remote_udp) < 0) {
- LOG_DBG("Could not notify remote.");
- close(skfd);
- return 0;
- }
-
close(skfd);
+ flow_dealloc(fd);
+
LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
diff --git a/src/ipcpd/shim-udp/shim_udp_messages.proto b/src/ipcpd/shim-udp/shim_udp_messages.proto
index bd9bd3aa..bd23f8eb 100644
--- a/src/ipcpd/shim-udp/shim_udp_messages.proto
+++ b/src/ipcpd/shim-udp/shim_udp_messages.proto
@@ -1,7 +1,6 @@
enum shim_udp_msg_code {
FLOW_REQ = 1;
FLOW_REPLY = 2;
- FLOW_DEALLOC = 3;
};
message shim_udp_msg {
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 1ac989de..548ab1db 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1026,6 +1026,8 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name)
if (dst_ae_name != NULL)
*dst_ae_name = re->req_ae_name;
+ LOG_INFO("Flow on port_id %d allocated.", f->port_id);
+
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1156,7 +1158,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_wrlock(&irmd->flows_lock);
port_id = f->port_id = bmp_allocate(irmd->port_ids);
- if (!bmp_is_id_valid(irmd->port_ids, (ssize_t) port_id)) {
+ if (!bmp_is_id_valid(irmd->port_ids, port_id)) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Could not allocate port_id.");
@@ -1233,6 +1235,7 @@ static int flow_alloc_res(int port_id)
}
if (irm_flow_get_state(f) == FLOW_ALLOCATED) {
+ LOG_INFO("Flow on port_id %d allocated.", port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
@@ -1348,7 +1351,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
struct pid_el * c_api;
pid_t h_api = -1;
- int port_id = -1;
LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.",
api, dst_name, ae_name);
@@ -1467,7 +1469,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- port_id = f->port_id = bmp_allocate(irmd->port_ids);
+ f->port_id = bmp_allocate(irmd->port_ids);
if (!bmp_is_id_valid(irmd->port_ids, f->port_id)) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1532,8 +1534,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_mutex_unlock(&re->state_lock);
- LOG_INFO("Flow on port_id %d allocated.", port_id);
-
return f;
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 018cb692..a0c47403 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -612,12 +612,6 @@ int flow_dealloc(int fd)
return -ENOTALLOC;
}
- if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EBUSY;
- }
-
msg.port_id = ai.flows[fd].port_id;
pthread_rwlock_unlock(&ai.flows_lock);
@@ -1285,11 +1279,7 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return -EPERM;
}
- if (ai.flows[fd].tx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].tx_rb);
idx = shm_du_buff_get_idx(sdb);
@@ -1302,9 +1292,38 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return 0;
}
+int ipcp_flow_fini(int fd)
+{
+ struct shm_rbuff * rb;
+
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ rb = ai.flows[fd].rx_rb;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ shm_rbuff_fini(rb);
+
+ return 0;
+}
+
ssize_t local_flow_read(int fd)
{
- return shm_rbuff_read(ai.flows[fd].rx_rb);
+ ssize_t ret;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ ret = shm_rbuff_read(ai.flows[fd].rx_rb);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return ret;
}
int local_flow_write(int fd, size_t idx)
@@ -1315,11 +1334,7 @@ int local_flow_write(int fd, size_t idx)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].tx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].tx_rb);
shm_rbuff_write(ai.flows[fd].tx_rb, idx);
@@ -1338,11 +1353,7 @@ int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].rx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].rx_rb);
idx = shm_rbuff_read(ai.flows[fd].rx_rb);
if (idx < 0) {
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
index 8b2e9229..301669e7 100644
--- a/src/lib/shm_rbuff.c
+++ b/src/lib/shm_rbuff.c
@@ -43,6 +43,8 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
+#define RB_CLOSED -1
+#define RB_OPEN 0
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 2 * sizeof(size_t) + sizeof(int8_t) \
@@ -144,7 +146,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = 0;
+ *rb->acl = RB_OPEN;
*rb->head = 0;
*rb->tail = 0;
@@ -291,6 +293,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
ret = *tail_el_ptr(rb);
*rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);
+ pthread_cond_broadcast(rb->del);
pthread_mutex_unlock(rb->lock);
@@ -353,10 +356,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-int shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_block(struct shm_rbuff * rb)
{
- int ret = 0;
-
assert(rb);
#ifdef __APPLE__
@@ -367,14 +368,9 @@ int shm_rbuff_block(struct shm_rbuff * rb)
pthread_mutex_consistent(rb->lock);
}
#endif
- *rb->acl = -1;
-
- if (!shm_rbuff_empty(rb))
- ret = -EBUSY;
+ *rb->acl = RB_CLOSED;
pthread_mutex_unlock(rb->lock);
-
- return ret;
}
void shm_rbuff_unblock(struct shm_rbuff * rb)
@@ -389,11 +385,40 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
pthread_mutex_consistent(rb->lock);
}
#endif
- *rb->acl = 0; /* open */
+ *rb->acl = RB_OPEN;
pthread_mutex_unlock(rb->lock);
}
+void shm_rbuff_fini(struct shm_rbuff * rb)
+{
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ assert(*rb->acl == RB_CLOSED);
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while (!shm_rbuff_empty(rb))
+#ifdef __APPLE__
+ pthread_cond_wait(rb->del, rb->lock);
+#else
+ if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ pthread_cleanup_pop(true);
+}
+
void shm_rbuff_reset(struct shm_rbuff * rb)
{
assert(rb);