-rw-r--r--  src/ipcpd/ipcp.c              | 13
-rw-r--r--  src/ipcpd/ipcp.h              |  2
-rw-r--r--  src/ipcpd/local/main.c        | 46
-rw-r--r--  src/ipcpd/normal/fmgr.c       | 44
-rw-r--r--  src/ipcpd/shim-eth-llc/main.c | 43
-rw-r--r--  src/ipcpd/shim-udp/main.c     | 42
6 files changed, 167 insertions(+), 23 deletions(-)
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index f08e4ce7..587d70c5 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -262,11 +262,9 @@ static void * ipcp_main_loop(void * o)
}
ret_msg.has_result = true;
- pthread_mutex_lock(&ipcpi.alloc_lock);
ret_msg.result =
ipcpi.ops->ipcp_flow_alloc_resp(fd,
msg->response);
- pthread_mutex_unlock(&ipcpi.alloc_lock);
break;
case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC:
if (ipcpi.ops->ipcp_flow_dealloc == NULL) {
@@ -455,6 +453,13 @@ int ipcp_init(int argc,
goto fail_alloc_lock;
}
+ if (pthread_cond_init(&ipcpi.alloc_cond, NULL)) {
+ log_err("Failed to init convar.");
+ goto fail_alloc_cond;
+ }
+
+ ipcpi.alloc_id = -1;
+
if (type == IPCP_NORMAL) {
pthread_condattr_destroy(&cattr);
return 0;
@@ -471,6 +476,8 @@ int ipcp_init(int argc,
return 0;
fail_shim_data:
+ pthread_cond_destroy(&ipcpi.alloc_cond);
+ fail_alloc_cond:
pthread_mutex_destroy(&ipcpi.alloc_lock);
fail_alloc_lock:
bmp_destroy(ipcpi.thread_ids);
@@ -601,6 +608,8 @@ void ipcp_fini()
pthread_mutex_destroy(&ipcpi.threads_lock);
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_rwlock_destroy(&ipcpi.state_lock);
+ pthread_cond_destroy(&ipcpi.alloc_cond);
+ pthread_mutex_destroy(&ipcpi.alloc_lock);
log_fini();
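
Note on the ipcp.c hunks above: the new condition variable follows the file's existing init/unwind pattern, getting its own failure label in ipcp_init() and being destroyed again, together with alloc_lock, in ipcp_fini(). Below is a minimal, self-contained sketch of that idiom under illustrative names (struct demo, demo_init and demo_fini are not part of the tree):

#include <pthread.h>

struct demo {
        pthread_mutex_t lock;
        pthread_cond_t  cond;
        int             id;
};

static int demo_init(struct demo * d)
{
        if (pthread_mutex_init(&d->lock, NULL))
                goto fail_lock;

        if (pthread_cond_init(&d->cond, NULL))
                goto fail_cond;

        d->id = -1;             /* -1: no allocation pending */

        return 0;

 fail_cond:
        pthread_mutex_destroy(&d->lock);
 fail_lock:
        return -1;
}

static void demo_fini(struct demo * d)
{
        /* tear down in reverse order of creation */
        pthread_cond_destroy(&d->cond);
        pthread_mutex_destroy(&d->lock);
}
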
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 145e91f5..e5c4b9af 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -81,6 +81,8 @@ struct ipcp {
int sockfd;
char * sock_path;
+ int alloc_id;
+ pthread_cond_t alloc_cond;
pthread_mutex_t alloc_lock;
pthread_t * threadpool;
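
The two fields added to struct ipcp above carry a small handshake: a request path publishes the fd of the pending flow allocation in alloc_id and signals alloc_cond, and the matching response path waits until that fd is the pending one before setting it back to -1 so the next request can proceed. A minimal, self-contained sketch of that handshake follows; struct alloc_sync, publish_request, consume_request and the demo main() are illustrative names only and call nothing from the ouroboros tree.

#include <pthread.h>
#include <stdio.h>

struct alloc_sync {
        pthread_mutex_t lock;
        pthread_cond_t  cond;
        int             alloc_id;  /* -1: no request pending */
};

static struct alloc_sync st = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        -1
};

/* request side: wait for the slot to be free, then publish our fd */
static void publish_request(int fd)
{
        pthread_mutex_lock(&st.lock);

        while (st.alloc_id != -1)
                pthread_cond_wait(&st.cond, &st.lock);

        st.alloc_id = fd;
        pthread_cond_broadcast(&st.cond);

        pthread_mutex_unlock(&st.lock);
}

/* response side: wait until our fd is the pending one, then release it */
static void consume_request(int fd)
{
        pthread_mutex_lock(&st.lock);

        while (st.alloc_id != fd)
                pthread_cond_wait(&st.cond, &st.lock);

        st.alloc_id = -1;
        pthread_cond_broadcast(&st.cond);

        pthread_mutex_unlock(&st.lock);
}

static void * requester(void * o)
{
        publish_request(*(int *) o);
        return NULL;
}

int main(void)
{
        pthread_t t;
        int       fd = 42;

        pthread_create(&t, NULL, requester, &fd);
        consume_request(fd);   /* returns once fd 42 was published and claimed */
        pthread_join(t, NULL);

        printf("handshake done, alloc_id is %d again\n", st.alloc_id);

        return 0;
}
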
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 88d0912e..897ec3a0 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -72,8 +72,7 @@ static int local_data_init(void)
return 0;
}
-static void local_data_fini(void)
-{
+static void local_data_fini(void){
flow_set_destroy(local_data.flows);
fqueue_destroy(local_data.fq);
pthread_rwlock_destroy(&local_data.lock);
@@ -212,18 +211,27 @@ static int ipcp_local_flow_alloc(int fd,
char * dst_name,
qoscube_t cube)
{
- int out_fd = -1;
+ struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000};
+ int out_fd = -1;
log_dbg("Allocating flow to %s on fd %d.", dst_name, fd);
assert(dst_name);
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
if (ipcp_get_state() != IPCP_OPERATIONAL) {
log_dbg("Won't allocate over non-operational IPCP.");
- return -1; /* -ENOTENROLLED */
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
}
- pthread_mutex_lock(&ipcpi.alloc_lock);
+ assert(ipcpi.alloc_id == -1);
out_fd = ipcp_flow_req_arr(getpid(), dst_name, cube);
if (out_fd < 0) {
@@ -238,6 +246,10 @@ static int ipcp_local_flow_alloc(int fd,
local_data.in_out[out_fd] = fd;
pthread_rwlock_unlock(&local_data.lock);
+
+ ipcpi.alloc_id = out_fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_set_add(local_data.flows, fd);
@@ -250,8 +262,26 @@ static int ipcp_local_flow_alloc(int fd,
static int ipcp_local_flow_alloc_resp(int fd,
int response)
{
- int out_fd = -1;
- int ret = -1;
+ struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000};
+ int out_fd = -1;
+ int ret = -1;
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
pthread_rwlock_wrlock(&local_data.lock);
@@ -278,7 +308,7 @@ static int ipcp_local_flow_alloc_resp(int fd,
log_info("Flow allocation completed, fds (%d, %d).", out_fd, fd);
- return ret;
+ return 0;
}
static int ipcp_local_flow_dealloc(int fd)
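
The local/main.c hunks bound each wait on alloc_cond with a pthread_cond_timedwait() loop that also re-checks the IPCP state, so a state change is noticed even if no broadcast arrives. A sketch of the general shape of such a bounded wait is below; it assumes EVENT_WAIT_TIMEOUT is a relative timeout in microseconds and converts it to the absolute CLOCK_REALTIME deadline that pthread_cond_timedwait() expects. Apart from the pthread and clock_gettime() calls, all names are illustrative.

#include <pthread.h>
#include <stdbool.h>
#include <time.h>

#define EVENT_WAIT_TIMEOUT 100 /* assumption: relative timeout in us */

static pthread_mutex_t lock        = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond        = PTHREAD_COND_INITIALIZER;
static int             pending_id  = -1;
static bool            operational = true;

/* wait until 'want' is pending, re-checking the state every timeout */
int wait_pending_id(int want)
{
        struct timespec abst;
        int             ret = 0;

        pthread_mutex_lock(&lock);

        while (pending_id != want && operational) {
                clock_gettime(CLOCK_REALTIME, &abst);
                abst.tv_nsec += EVENT_WAIT_TIMEOUT * 1000;
                abst.tv_sec  += abst.tv_nsec / 1000000000;
                abst.tv_nsec %= 1000000000;
                pthread_cond_timedwait(&cond, &lock, &abst);
        }

        if (!operational) {
                ret = -1;        /* gave up: IPCP no longer operational */
        } else {
                pending_id = -1; /* claim the request, as the resp paths do */
                pthread_cond_broadcast(&cond);
        }

        pthread_mutex_unlock(&lock);

        return ret;
}
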
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 3191eac5..19c329af 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -542,6 +542,7 @@ static int np1_flow_dealloc(int fd)
int fmgr_np1_alloc_resp(int fd,
int response)
{
+ struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
@@ -549,6 +550,23 @@ int fmgr_np1_alloc_resp(int fd,
msg.response = response;
msg.has_response = true;
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
buf.len = flow_alloc_msg__get_packed_size(&msg);
if (buf.len == 0)
return -1;
@@ -601,10 +619,11 @@ int fmgr_np1_dealloc(int fd)
int fmgr_np1_post_buf(cep_id_t cep_id,
buffer_t * buf)
{
- int ret = 0;
- int fd;
+ struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
+ int ret = 0;
+ int fd;
flow_alloc_msg_t * msg;
- qoscube_t cube;
+ qoscube_t cube;
/* Depending on the message call the function in ipcp-dev.h */
@@ -617,6 +636,21 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
switch (msg->code) {
case FLOW_ALLOC_CODE__FLOW_REQ:
pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != -1 &&
+ ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ log_dbg("Won't allocate over non-operational IPCP.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ assert(ipcpi.alloc_id == -1);
+
fd = ipcp_flow_req_arr(getpid(),
msg->dst_name,
msg->qoscube);
@@ -633,6 +667,10 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
fmgr.np1_cep_id_to_fd[cep_id] = fd;
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
pthread_mutex_unlock(&ipcpi.alloc_lock);
break;
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 1b6e02c2..20189e08 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -352,10 +352,22 @@ static int eth_llc_ipcp_sap_req(uint8_t r_sap,
char * dst_name,
qoscube_t cube)
{
- int fd;
+ struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000};
+ int fd;
pthread_mutex_lock(&ipcpi.alloc_lock);
+ while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ log_dbg("Won't allocate over non-operational IPCP.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
/* reply to IRM, called under lock to prevent race */
fd = ipcp_flow_req_arr(getpid(), dst_name, cube);
if (fd < 0) {
@@ -369,6 +381,9 @@ static int eth_llc_ipcp_sap_req(uint8_t r_sap,
eth_llc_data.fd_to_ef[fd].r_sap = r_sap;
memcpy(eth_llc_data.fd_to_ef[fd].r_addr, r_addr, MAC_SIZE);
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
pthread_rwlock_unlock(&eth_llc_data.flows_lock);
pthread_mutex_unlock(&ipcpi.alloc_lock);
@@ -630,12 +645,12 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
static void * eth_llc_ipcp_sdu_writer(void * o)
{
+ struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
int fd;
struct shm_du_buff * sdb;
uint8_t ssap;
uint8_t dsap;
uint8_t r_addr[MAC_SIZE];
- struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
(void) o;
@@ -956,9 +971,27 @@ static int eth_llc_ipcp_flow_alloc(int fd,
static int eth_llc_ipcp_flow_alloc_resp(int fd,
int response)
{
- uint8_t ssap = 0;
- uint8_t r_sap = 0;
- uint8_t r_addr[MAC_SIZE];
+ struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000};
+ uint8_t ssap = 0;
+ uint8_t r_sap = 0;
+ uint8_t r_addr[MAC_SIZE];
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index ea3d1f88..b1a88fae 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -232,11 +232,11 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
char * dst_name,
qoscube_t cube)
{
- int skfd;
- int fd;
-
+ struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
struct sockaddr_in f_saddr;
socklen_t f_saddr_len = sizeof(f_saddr);
+ int skfd;
+ int fd;
log_dbg("Port request arrived from UDP port %d",
ntohs(c_saddr->sin_port));
@@ -271,6 +271,17 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
pthread_mutex_lock(&ipcpi.alloc_lock);
+ while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ log_dbg("Won't allocate over non-operational IPCP.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
/* reply to IRM */
fd = ipcp_flow_req_arr(getpid(), dst_name, cube);
if (fd < 0) {
@@ -291,6 +302,9 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
pthread_rwlock_unlock(&ipcpi.state_lock);
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
pthread_mutex_unlock(&ipcpi.alloc_lock);
log_dbg("Pending allocation request, fd %d, UDP port (%d, %d).",
fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port));
@@ -1063,14 +1077,32 @@ static int ipcp_udp_flow_alloc(int fd,
static int ipcp_udp_flow_alloc_resp(int fd,
int response)
{
- int skfd = -1;
+ struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
+ int skfd = -1;
struct sockaddr_in f_saddr;
struct sockaddr_in r_saddr;
- socklen_t len = sizeof(r_saddr);
+ socklen_t len = sizeof(r_saddr);
if (response)
return 0;
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&udp_data.flows_lock);