summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/local/main.c18
-rw-r--r--src/ipcpd/shim-eth-llc/main.c54
-rw-r--r--src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto5
-rw-r--r--src/ipcpd/shim-udp/main.c85
-rw-r--r--src/ipcpd/shim-udp/shim_udp_messages.proto1
-rw-r--r--src/irmd/main.c10
-rw-r--r--src/lib/dev.c55
-rw-r--r--src/lib/shm_rbuff.c47
8 files changed, 92 insertions, 183 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index a8d5c273..412795ec 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -110,7 +110,7 @@ static void * ipcp_local_sdu_loop(void * o)
while ((fd = fqueue_next(local_data.fq)) >= 0) {
idx = local_flow_read(fd);
- assert((size_t) idx < (SHM_BUFFER_SIZE));
+ assert(idx < (SHM_BUFFER_SIZE));
fd = local_data.in_out[fd];
@@ -243,13 +243,13 @@ static int ipcp_local_flow_alloc(int fd,
pthread_rwlock_wrlock(&local_data.lock);
- flow_set_add(local_data.flows, fd);
-
out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
local_data.in_out[fd] = out_fd;
local_data.in_out[out_fd] = fd;
+ flow_set_add(local_data.flows, fd);
+
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -291,24 +291,22 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
static int ipcp_local_flow_dealloc(int fd)
{
- struct timespec t = {0, 10000};
-
assert(!(fd < 0));
- flow_set_del(local_data.flows, fd);
-
- while (flow_dealloc(fd) == -EBUSY)
- nanosleep(&t, NULL);
+ ipcp_flow_fini(fd);
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&local_data.lock);
- flow_cntl(local_data.in_out[fd], FLOW_F_SETFL, FLOW_O_WRONLY);
+ flow_set_del(local_data.flows, fd);
+
local_data.in_out[fd] = -1;
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+ flow_dealloc(fd);
+
LOG_INFO("Flow with fd %d deallocated.", fd);
return 0;
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index f6cded2b..b7b9f783 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -348,17 +348,6 @@ static int eth_llc_ipcp_sap_alloc_resp(uint8_t * dst_addr,
return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);
}
-static int eth_llc_ipcp_sap_dealloc(uint8_t * dst_addr, uint8_t ssap)
-{
- shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT;
-
- msg.code = SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC;
- msg.has_ssap = true;
- msg.ssap = ssap;
-
- return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);
-}
-
static int eth_llc_ipcp_sap_req(uint8_t r_sap,
uint8_t * r_addr,
char * dst_name,
@@ -427,29 +416,6 @@ static int eth_llc_ipcp_sap_alloc_reply(uint8_t ssap,
}
-static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap)
-{
- int fd = -1;
-
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
-
- fd = eth_llc_data.ef_to_fd[ssap];
- if (fd < 0) {
- pthread_rwlock_unlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Flow already deallocated.");
- return 0;
- }
-
- pthread_rwlock_unlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
-
- return 0;
-}
-
static int eth_llc_ipcp_name_query_req(char * name, uint8_t * r_addr)
{
shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT;
@@ -509,9 +475,6 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)
msg->dsap,
msg->response);
break;
- case SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC:
- eth_llc_ipcp_flow_dealloc_req(msg->ssap);
- break;
case SHIM_ETH_LLC_MSG_CODE__NAME_QUERY_REQ:
eth_llc_ipcp_name_query_req(msg->dst_name, r_addr);
break;
@@ -1074,25 +1037,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)
static int eth_llc_ipcp_flow_dealloc(int fd)
{
- struct timespec t = {0, 10000};
-
uint8_t sap;
- uint8_t r_sap;
uint8_t addr[MAC_SIZE];
- int ret;
-
- flow_set_del(eth_llc_data.np1_flows, fd);
- while (flow_dealloc(fd) == -EBUSY)
- nanosleep(&t, NULL);
+ ipcp_flow_fini(fd);
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
- r_sap = eth_llc_data.fd_to_ef[fd].r_sap;
+ flow_set_del(eth_llc_data.np1_flows, fd);
+
sap = eth_llc_data.fd_to_ef[fd].sap;
memcpy(addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE);
-
bmp_release(eth_llc_data.saps, sap);
eth_llc_data.fd_to_ef[fd].sap = -1;
eth_llc_data.fd_to_ef[fd].r_sap = -1;
@@ -1103,9 +1059,7 @@ static int eth_llc_ipcp_flow_dealloc(int fd)
pthread_rwlock_unlock(&eth_llc_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- ret = eth_llc_ipcp_sap_dealloc(addr, r_sap);
- if (ret < 0)
- LOG_DBG("Could not notify remote.");
+ flow_dealloc(fd);
LOG_DBG("Flow with fd %d deallocated.", fd);
diff --git a/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto b/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto
index 4d027d98..045db5c2 100644
--- a/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto
+++ b/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto
@@ -1,9 +1,8 @@
enum shim_eth_llc_msg_code {
FLOW_REQ = 1;
FLOW_REPLY = 2;
- FLOW_DEALLOC = 3;
- NAME_QUERY_REQ = 4;
- NAME_QUERY_REPLY = 5;
+ NAME_QUERY_REQ = 3;
+ NAME_QUERY_REPLY = 4;
};
message shim_eth_llc_msg {
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index ba2805c5..e4ab4fac 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -230,17 +230,6 @@ static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr,
return send_shim_udp_msg(&msg, dst_ip_addr);
}
-static int ipcp_udp_port_dealloc(uint32_t dst_ip_addr,
- uint16_t src_udp_port)
-{
- shim_udp_msg_t msg = SHIM_UDP_MSG__INIT;
-
- msg.code = SHIM_UDP_MSG_CODE__FLOW_DEALLOC;
- msg.src_udp_port = src_udp_port;
-
- return send_shim_udp_msg(&msg, dst_ip_addr);
-}
-
static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
char * dst_name,
char * src_ae_name)
@@ -375,46 +364,6 @@ static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port,
return ret;
}
-static int ipcp_udp_flow_dealloc_req(uint16_t udp_port)
-{
- int skfd = -1;
- int fd = -1;
-
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_wrlock(&udp_data.flows_lock);
-
- fd = udp_port_to_fd(udp_port);
- if (fd < 0) {
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Could not find flow on UDP port %d.",
- ntohs(udp_port));
- return 0;
- }
-
- skfd = udp_data.fd_to_uf[fd].skfd;
-
- udp_data.uf_to_fd[skfd] = -1;
- udp_data.fd_to_uf[fd].udp = -1;
- udp_data.fd_to_uf[fd].skfd = -1;
-
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_rdlock(&udp_data.flows_lock);
-
- clr_fd(skfd);
-
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
-
- close(skfd);
-
- LOG_DBG("Flow with fd %d deallocated.", fd);
-
- return 0;
-}
-
static void * ipcp_udp_listener(void * o)
{
uint8_t buf[SHIM_UDP_MSG_SIZE];
@@ -456,9 +405,6 @@ static void * ipcp_udp_listener(void * o)
msg->dst_udp_port,
msg->response);
break;
- case SHIM_UDP_MSG_CODE__FLOW_DEALLOC:
- ipcp_udp_flow_dealloc_req(msg->src_udp_port);
- break;
default:
LOG_ERR("Unknown message received %d.", msg->code);
shim_udp_msg__free_unpacked(msg, NULL);
@@ -1153,15 +1099,10 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)
static int ipcp_udp_flow_dealloc(int fd)
{
int skfd = -1;
- uint16_t remote_udp;
- struct timespec t = {0, 10000};
- struct sockaddr_in r_saddr;
- socklen_t r_saddr_len = sizeof(r_saddr);
- flow_set_del(udp_data.np1_flows, fd);
+ ipcp_flow_fini(fd);
- while (flow_dealloc(fd) == -EBUSY)
- nanosleep(&t, NULL);
+ flow_set_del(udp_data.np1_flows, fd);
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&udp_data.flows_lock);
@@ -1180,28 +1121,10 @@ static int ipcp_udp_flow_dealloc(int fd)
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- if (getpeername(skfd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) {
- LOG_DBG("Socket with fd %d has no peer.", skfd);
- close(skfd);
- return 0;
- }
-
- remote_udp = r_saddr.sin_port;
- r_saddr.sin_port = LISTEN_PORT;
-
- if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) {
- close(skfd);
- return 0 ;
- }
-
- if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr, remote_udp) < 0) {
- LOG_DBG("Could not notify remote.");
- close(skfd);
- return 0;
- }
-
close(skfd);
+ flow_dealloc(fd);
+
LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
diff --git a/src/ipcpd/shim-udp/shim_udp_messages.proto b/src/ipcpd/shim-udp/shim_udp_messages.proto
index bd9bd3aa..bd23f8eb 100644
--- a/src/ipcpd/shim-udp/shim_udp_messages.proto
+++ b/src/ipcpd/shim-udp/shim_udp_messages.proto
@@ -1,7 +1,6 @@
enum shim_udp_msg_code {
FLOW_REQ = 1;
FLOW_REPLY = 2;
- FLOW_DEALLOC = 3;
};
message shim_udp_msg {
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 1ac989de..548ab1db 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1026,6 +1026,8 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name)
if (dst_ae_name != NULL)
*dst_ae_name = re->req_ae_name;
+ LOG_INFO("Flow on port_id %d allocated.", f->port_id);
+
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1156,7 +1158,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_wrlock(&irmd->flows_lock);
port_id = f->port_id = bmp_allocate(irmd->port_ids);
- if (!bmp_is_id_valid(irmd->port_ids, (ssize_t) port_id)) {
+ if (!bmp_is_id_valid(irmd->port_ids, port_id)) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Could not allocate port_id.");
@@ -1233,6 +1235,7 @@ static int flow_alloc_res(int port_id)
}
if (irm_flow_get_state(f) == FLOW_ALLOCATED) {
+ LOG_INFO("Flow on port_id %d allocated.", port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
@@ -1348,7 +1351,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
struct pid_el * c_api;
pid_t h_api = -1;
- int port_id = -1;
LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.",
api, dst_name, ae_name);
@@ -1467,7 +1469,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- port_id = f->port_id = bmp_allocate(irmd->port_ids);
+ f->port_id = bmp_allocate(irmd->port_ids);
if (!bmp_is_id_valid(irmd->port_ids, f->port_id)) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1532,8 +1534,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_mutex_unlock(&re->state_lock);
- LOG_INFO("Flow on port_id %d allocated.", port_id);
-
return f;
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 018cb692..a0c47403 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -612,12 +612,6 @@ int flow_dealloc(int fd)
return -ENOTALLOC;
}
- if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EBUSY;
- }
-
msg.port_id = ai.flows[fd].port_id;
pthread_rwlock_unlock(&ai.flows_lock);
@@ -1285,11 +1279,7 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return -EPERM;
}
- if (ai.flows[fd].tx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].tx_rb);
idx = shm_du_buff_get_idx(sdb);
@@ -1302,9 +1292,38 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return 0;
}
+int ipcp_flow_fini(int fd)
+{
+ struct shm_rbuff * rb;
+
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ rb = ai.flows[fd].rx_rb;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ shm_rbuff_fini(rb);
+
+ return 0;
+}
+
ssize_t local_flow_read(int fd)
{
- return shm_rbuff_read(ai.flows[fd].rx_rb);
+ ssize_t ret;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ ret = shm_rbuff_read(ai.flows[fd].rx_rb);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return ret;
}
int local_flow_write(int fd, size_t idx)
@@ -1315,11 +1334,7 @@ int local_flow_write(int fd, size_t idx)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].tx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].tx_rb);
shm_rbuff_write(ai.flows[fd].tx_rb, idx);
@@ -1338,11 +1353,7 @@ int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].rx_rb == NULL) {
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
- return -EPERM;
- }
+ assert(ai.flows[fd].rx_rb);
idx = shm_rbuff_read(ai.flows[fd].rx_rb);
if (idx < 0) {
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
index 8b2e9229..301669e7 100644
--- a/src/lib/shm_rbuff.c
+++ b/src/lib/shm_rbuff.c
@@ -43,6 +43,8 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
+#define RB_CLOSED -1
+#define RB_OPEN 0
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 2 * sizeof(size_t) + sizeof(int8_t) \
@@ -144,7 +146,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = 0;
+ *rb->acl = RB_OPEN;
*rb->head = 0;
*rb->tail = 0;
@@ -291,6 +293,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
ret = *tail_el_ptr(rb);
*rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);
+ pthread_cond_broadcast(rb->del);
pthread_mutex_unlock(rb->lock);
@@ -353,10 +356,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-int shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_block(struct shm_rbuff * rb)
{
- int ret = 0;
-
assert(rb);
#ifdef __APPLE__
@@ -367,14 +368,9 @@ int shm_rbuff_block(struct shm_rbuff * rb)
pthread_mutex_consistent(rb->lock);
}
#endif
- *rb->acl = -1;
-
- if (!shm_rbuff_empty(rb))
- ret = -EBUSY;
+ *rb->acl = RB_CLOSED;
pthread_mutex_unlock(rb->lock);
-
- return ret;
}
void shm_rbuff_unblock(struct shm_rbuff * rb)
@@ -389,11 +385,40 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
pthread_mutex_consistent(rb->lock);
}
#endif
- *rb->acl = 0; /* open */
+ *rb->acl = RB_OPEN;
pthread_mutex_unlock(rb->lock);
}
+void shm_rbuff_fini(struct shm_rbuff * rb)
+{
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ assert(*rb->acl == RB_CLOSED);
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while (!shm_rbuff_empty(rb))
+#ifdef __APPLE__
+ pthread_cond_wait(rb->del, rb->lock);
+#else
+ if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ pthread_cleanup_pop(true);
+}
+
void shm_rbuff_reset(struct shm_rbuff * rb)
{
assert(rb);