diff options
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 97 |
1 files changed, 57 insertions, 40 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 819c8149..a75f3ce3 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -90,7 +90,6 @@ struct shim_ap_data { struct shm_du_map * dum; struct bmp * fds; struct shm_ap_rbuff * rb; - rw_lock_t data_lock; struct flow flows[AP_MAX_FLOWS]; rw_lock_t flows_lock; @@ -99,6 +98,9 @@ struct shim_ap_data { pthread_t sduloop; pthread_t handler; pthread_t sdu_reader; + + bool fd_set_sync; + pthread_mutex_t fd_set_lock; } * _ap_instance; static int shim_ap_init(char * ap_name) @@ -155,6 +157,7 @@ static int shim_ap_init(char * ap_name) } rw_lock_init(&_ap_instance->flows_lock); + pthread_mutex_init(&_ap_instance->fd_set_lock, NULL); return 0; } @@ -226,7 +229,7 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) e.index = index; - rw_lock_rdlock(&_ap_instance->flows_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); e.port_id = _ap_instance->flows[fd].port_id; @@ -492,52 +495,47 @@ static int ipcp_udp_port_alloc_reply(int src_udp_port, port_id = _ap_instance->flows[fd].port_id; - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - - if ((ret = ipcp_flow_alloc_reply(getpid(), - port_id, - response)) < 0) { - return -1; /* -EPIPE */ - } - - rw_lock_rdlock(&_ipcp->state_lock); - rw_lock_wrlock(&_ap_instance->flows_lock); - if (response) { _ap_instance->flows[fd].port_id = -1; _ap_instance->flows[fd].rb = NULL; + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); _ap_instance->flows[fd].state = FLOW_NULL; - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - close(fd); - return 0; - } + } else { + /* get the original address with the LISTEN PORT */ + if (getpeername(fd, + (struct sockaddr *) &t_saddr, + &t_saddr_len) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d has no peer.", port_id); + return -1; + } - /* get the original address with the LISTEN PORT */ - if (getpeername(fd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - LOG_DBGF("Flow with port_id %d has no peer.", port_id); - return 0; - }; + /* connect to the flow udp port */ + t_saddr.sin_port = src_udp_port; - /* connect to the flow udp port */ - t_saddr.sin_port = src_udp_port; + if (connect(fd, + (struct sockaddr *) &t_saddr, + sizeof(t_saddr)) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + close(fd); + return -1; + } - if (connect(fd, - (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - close(fd); - return -1; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; } - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); + + if ((ret = ipcp_flow_alloc_reply(getpid(), + port_id, + response)) < 0) { + return -1; /* -EPIPE */ + } + LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).", ntohs(src_udp_port), ntohs(dst_udp_port)); @@ -635,7 +633,12 @@ static void * ipcp_udp_sdu_reader() rw_lock_rdlock(&_ap_instance->flows_lock); + pthread_mutex_lock(&_ap_instance->fd_set_lock); + read_fds = shim_data(_ipcp)->flow_fd_s; + _ap_instance->fd_set_sync = false; + + pthread_mutex_unlock(&_ap_instance->fd_set_lock); rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); @@ -1240,7 +1243,7 @@ static int ipcp_udp_flow_alloc(pid_t n_pid, _ap_instance->flows[fd].port_id = -1; _ap_instance->flows[fd].state = FLOW_NULL; shm_ap_rbuff_close(_ap_instance->flows[fd].rb); - _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].rb = NULL; rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); @@ -1260,6 +1263,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid, struct sockaddr_in f_saddr; struct sockaddr_in r_saddr; socklen_t len = sizeof(r_saddr); + bool fd_wait = true; if (response) return 0; @@ -1316,8 +1320,20 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid, _ap_instance->flows[fd].state = FLOW_ALLOCATED; _ap_instance->flows[fd].rb = rb; + pthread_mutex_lock(&_ap_instance->fd_set_lock); + + _ap_instance->fd_set_sync = true; FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + pthread_mutex_unlock(&_ap_instance->fd_set_lock); + + while (fd_wait) { + sched_yield(); + pthread_mutex_lock(&_ap_instance->fd_set_lock); + fd_wait = _ap_instance->fd_set_sync; + pthread_mutex_unlock(&_ap_instance->fd_set_lock); + } + rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); @@ -1369,12 +1385,13 @@ static int ipcp_udp_flow_dealloc(int port_id) rb = _ap_instance->flows[fd].rb; _ap_instance->flows[fd].rb = NULL; - if (rb != NULL) - shm_ap_rbuff_close(rb); - FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); rw_lock_unlock(&_ap_instance->flows_lock); + + if (rb != NULL) + shm_ap_rbuff_close(rb); + rw_lock_unlock(&_ipcp->state_lock); close(fd); |