diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-06 16:46:52 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-06 16:51:02 +0200 |
commit | a4ce5e7d47d27c8b582e27b38ce61c9cb9735992 (patch) | |
tree | ec99dad12704ff9728d408fb93b356d449fe70c8 /src/ipcpd | |
parent | 0f9954dd086834a996d5585d923364b765b752e4 (diff) | |
download | ouroboros-a4ce5e7d47d27c8b582e27b38ce61c9cb9735992.tar.gz ouroboros-a4ce5e7d47d27c8b582e27b38ce61c9cb9735992.zip |
ipcpd: Fix race condition with concurrent allocs
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/ipcp.c | 13 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 2 | ||||
-rw-r--r-- | src/ipcpd/local/main.c | 46 | ||||
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 44 | ||||
-rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 43 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 42 |
6 files changed, 167 insertions, 23 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index f08e4ce7..587d70c5 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -262,11 +262,9 @@ static void * ipcp_main_loop(void * o) } ret_msg.has_result = true; - pthread_mutex_lock(&ipcpi.alloc_lock); ret_msg.result = ipcpi.ops->ipcp_flow_alloc_resp(fd, msg->response); - pthread_mutex_unlock(&ipcpi.alloc_lock); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: if (ipcpi.ops->ipcp_flow_dealloc == NULL) { @@ -455,6 +453,13 @@ int ipcp_init(int argc, goto fail_alloc_lock; } + if (pthread_cond_init(&ipcpi.alloc_cond, NULL)) { + log_err("Failed to init convar."); + goto fail_alloc_cond; + } + + ipcpi.alloc_id = -1; + if (type == IPCP_NORMAL) { pthread_condattr_destroy(&cattr); return 0; @@ -471,6 +476,8 @@ int ipcp_init(int argc, return 0; fail_shim_data: + pthread_cond_destroy(&ipcpi.alloc_cond); + fail_alloc_cond: pthread_mutex_destroy(&ipcpi.alloc_lock); fail_alloc_lock: bmp_destroy(ipcpi.thread_ids); @@ -601,6 +608,8 @@ void ipcp_fini() pthread_mutex_destroy(&ipcpi.threads_lock); pthread_mutex_destroy(&ipcpi.state_mtx); pthread_rwlock_destroy(&ipcpi.state_lock); + pthread_cond_destroy(&ipcpi.alloc_cond); + pthread_mutex_destroy(&ipcpi.alloc_lock); log_fini(); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 145e91f5..e5c4b9af 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -81,6 +81,8 @@ struct ipcp { int sockfd; char * sock_path; + int alloc_id; + pthread_cond_t alloc_cond; pthread_mutex_t alloc_lock; pthread_t * threadpool; diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 88d0912e..897ec3a0 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -72,8 +72,7 @@ static int local_data_init(void) return 0; } -static void local_data_fini(void) -{ +static void local_data_fini(void){ flow_set_destroy(local_data.flows); fqueue_destroy(local_data.fq); pthread_rwlock_destroy(&local_data.lock); @@ -212,18 +211,27 @@ static int ipcp_local_flow_alloc(int fd, char * dst_name, qoscube_t cube) { - int out_fd = -1; + struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000}; + int out_fd = -1; log_dbg("Allocating flow to %s on fd %d.", dst_name, fd); assert(dst_name); + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + if (ipcp_get_state() != IPCP_OPERATIONAL) { log_dbg("Won't allocate over non-operational IPCP."); - return -1; /* -ENOTENROLLED */ + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; } - pthread_mutex_lock(&ipcpi.alloc_lock); + assert(ipcpi.alloc_id == -1); out_fd = ipcp_flow_req_arr(getpid(), dst_name, cube); if (out_fd < 0) { @@ -238,6 +246,10 @@ static int ipcp_local_flow_alloc(int fd, local_data.in_out[out_fd] = fd; pthread_rwlock_unlock(&local_data.lock); + + ipcpi.alloc_id = out_fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + pthread_mutex_unlock(&ipcpi.alloc_lock); flow_set_add(local_data.flows, fd); @@ -250,8 +262,26 @@ static int ipcp_local_flow_alloc(int fd, static int ipcp_local_flow_alloc_resp(int fd, int response) { - int out_fd = -1; - int ret = -1; + struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000}; + int out_fd = -1; + int ret = -1; + + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + ipcpi.alloc_id = -1; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); pthread_rwlock_wrlock(&local_data.lock); @@ -278,7 +308,7 @@ static int ipcp_local_flow_alloc_resp(int fd, log_info("Flow allocation completed, fds (%d, %d).", out_fd, fd); - return ret; + return 0; } static int ipcp_local_flow_dealloc(int fd) diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 3191eac5..19c329af 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -542,6 +542,7 @@ static int np1_flow_dealloc(int fd) int fmgr_np1_alloc_resp(int fd, int response) { + struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; @@ -549,6 +550,23 @@ int fmgr_np1_alloc_resp(int fd, msg.response = response; msg.has_response = true; + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + ipcpi.alloc_id = -1; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + buf.len = flow_alloc_msg__get_packed_size(&msg); if (buf.len == 0) return -1; @@ -601,10 +619,11 @@ int fmgr_np1_dealloc(int fd) int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf) { - int ret = 0; - int fd; + struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; + int ret = 0; + int fd; flow_alloc_msg_t * msg; - qoscube_t cube; + qoscube_t cube; /* Depending on the message call the function in ipcp-dev.h */ @@ -617,6 +636,21 @@ int fmgr_np1_post_buf(cep_id_t cep_id, switch (msg->code) { case FLOW_ALLOC_CODE__FLOW_REQ: pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != -1 && + ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_dbg("Won't allocate over non-operational IPCP."); + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + assert(ipcpi.alloc_id == -1); + fd = ipcp_flow_req_arr(getpid(), msg->dst_name, msg->qoscube); @@ -633,6 +667,10 @@ int fmgr_np1_post_buf(cep_id_t cep_id, fmgr.np1_cep_id_to_fd[cep_id] = fd; pthread_rwlock_unlock(&fmgr.np1_flows_lock); + + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + pthread_mutex_unlock(&ipcpi.alloc_lock); break; diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 1b6e02c2..20189e08 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -352,10 +352,22 @@ static int eth_llc_ipcp_sap_req(uint8_t r_sap, char * dst_name, qoscube_t cube) { - int fd; + struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000}; + int fd; pthread_mutex_lock(&ipcpi.alloc_lock); + while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_dbg("Won't allocate over non-operational IPCP."); + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + /* reply to IRM, called under lock to prevent race */ fd = ipcp_flow_req_arr(getpid(), dst_name, cube); if (fd < 0) { @@ -369,6 +381,9 @@ static int eth_llc_ipcp_sap_req(uint8_t r_sap, eth_llc_data.fd_to_ef[fd].r_sap = r_sap; memcpy(eth_llc_data.fd_to_ef[fd].r_addr, r_addr, MAC_SIZE); + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + pthread_rwlock_unlock(ð_llc_data.flows_lock); pthread_mutex_unlock(&ipcpi.alloc_lock); @@ -630,12 +645,12 @@ static void * eth_llc_ipcp_sdu_reader(void * o) static void * eth_llc_ipcp_sdu_writer(void * o) { + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; int fd; struct shm_du_buff * sdb; uint8_t ssap; uint8_t dsap; uint8_t r_addr[MAC_SIZE]; - struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; (void) o; @@ -956,9 +971,27 @@ static int eth_llc_ipcp_flow_alloc(int fd, static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) { - uint8_t ssap = 0; - uint8_t r_sap = 0; - uint8_t r_addr[MAC_SIZE]; + struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000}; + uint8_t ssap = 0; + uint8_t r_sap = 0; + uint8_t r_addr[MAC_SIZE]; + + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + ipcpi.alloc_id = -1; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); pthread_rwlock_wrlock(ð_llc_data.flows_lock); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index ea3d1f88..b1a88fae 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -232,11 +232,11 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, char * dst_name, qoscube_t cube) { - int skfd; - int fd; - + struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; struct sockaddr_in f_saddr; socklen_t f_saddr_len = sizeof(f_saddr); + int skfd; + int fd; log_dbg("Port request arrived from UDP port %d", ntohs(c_saddr->sin_port)); @@ -271,6 +271,17 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, pthread_mutex_lock(&ipcpi.alloc_lock); + while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_dbg("Won't allocate over non-operational IPCP."); + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + /* reply to IRM */ fd = ipcp_flow_req_arr(getpid(), dst_name, cube); if (fd < 0) { @@ -291,6 +302,9 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, pthread_rwlock_unlock(&ipcpi.state_lock); pthread_mutex_unlock(&ipcpi.alloc_lock); + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + log_dbg("Pending allocation request, fd %d, UDP port (%d, %d).", fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); @@ -1063,14 +1077,32 @@ static int ipcp_udp_flow_alloc(int fd, static int ipcp_udp_flow_alloc_resp(int fd, int response) { - int skfd = -1; + struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; + int skfd = -1; struct sockaddr_in f_saddr; struct sockaddr_in r_saddr; - socklen_t len = sizeof(r_saddr); + socklen_t len = sizeof(r_saddr); if (response) return 0; + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + ipcpi.alloc_id = -1; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&udp_data.flows_lock); |