diff options
Diffstat (limited to 'src/ipcpd/shim-udp')
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 97 | 
1 files changed, 57 insertions, 40 deletions
| diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 819c8149..a75f3ce3 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -90,7 +90,6 @@ struct shim_ap_data {          struct shm_du_map *   dum;          struct bmp *          fds;          struct shm_ap_rbuff * rb; -        rw_lock_t             data_lock;          struct flow           flows[AP_MAX_FLOWS];          rw_lock_t             flows_lock; @@ -99,6 +98,9 @@ struct shim_ap_data {          pthread_t             sduloop;          pthread_t             handler;          pthread_t             sdu_reader; + +        bool                  fd_set_sync; +        pthread_mutex_t       fd_set_lock;  } * _ap_instance;  static int shim_ap_init(char * ap_name) @@ -155,6 +157,7 @@ static int shim_ap_init(char * ap_name)          }          rw_lock_init(&_ap_instance->flows_lock); +        pthread_mutex_init(&_ap_instance->fd_set_lock, NULL);          return 0;  } @@ -226,7 +229,7 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)          e.index = index; -        rw_lock_rdlock(&_ap_instance->flows_lock); +        rw_lock_wrlock(&_ap_instance->flows_lock);          e.port_id = _ap_instance->flows[fd].port_id; @@ -492,52 +495,47 @@ static int ipcp_udp_port_alloc_reply(int src_udp_port,          port_id = _ap_instance->flows[fd].port_id; -        rw_lock_unlock(&_ap_instance->flows_lock); -        rw_lock_unlock(&_ipcp->state_lock); - -        if ((ret = ipcp_flow_alloc_reply(getpid(), -                                         port_id, -                                         response)) < 0) { -                return -1; /* -EPIPE */ -        } - -        rw_lock_rdlock(&_ipcp->state_lock); -        rw_lock_wrlock(&_ap_instance->flows_lock); -          if (response) {                  _ap_instance->flows[fd].port_id = -1;                  _ap_instance->flows[fd].rb      = NULL; +                shm_ap_rbuff_close(_ap_instance->flows[fd].rb);                  _ap_instance->flows[fd].state   = FLOW_NULL; -                rw_lock_unlock(&_ap_instance->flows_lock); -                rw_lock_unlock(&_ipcp->state_lock); -                close(fd); -                return 0; -        } +        } else { +                /* get the original address with the LISTEN PORT */ +                if (getpeername(fd, +                                (struct sockaddr *) &t_saddr, +                                &t_saddr_len) < 0) { +                        rw_lock_unlock(&_ap_instance->flows_lock); +                        rw_lock_unlock(&_ipcp->state_lock); +                        LOG_DBGF("Flow with port_id %d has no peer.", port_id); +                        return -1; +                } -        /* get the original address with the LISTEN PORT */ -        if (getpeername(fd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { -                rw_lock_unlock(&_ap_instance->flows_lock); -                rw_lock_unlock(&_ipcp->state_lock); -                LOG_DBGF("Flow with port_id %d has no peer.", port_id); -                return 0; -        }; +                /* connect to the flow udp port */ +                t_saddr.sin_port = src_udp_port; -        /* connect to the flow udp port */ -        t_saddr.sin_port = src_udp_port; +                if (connect(fd, +                            (struct sockaddr *) &t_saddr, +                            sizeof(t_saddr)) < 0) { +                        rw_lock_unlock(&_ap_instance->flows_lock); +                        rw_lock_unlock(&_ipcp->state_lock); +                        close(fd); +                        return -1; +                } -        if (connect(fd, -                    (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { -                rw_lock_unlock(&_ap_instance->flows_lock); -                rw_lock_unlock(&_ipcp->state_lock); -                close(fd); -                return -1; +                _ap_instance->flows[fd].state   = FLOW_ALLOCATED;          } -        _ap_instance->flows[fd].state = FLOW_ALLOCATED; -          rw_lock_unlock(&_ap_instance->flows_lock);          rw_lock_unlock(&_ipcp->state_lock); + +        if ((ret = ipcp_flow_alloc_reply(getpid(), +                                         port_id, +                                         response)) < 0) { +                return -1; /* -EPIPE */ +        } +          LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).",                   ntohs(src_udp_port), ntohs(dst_udp_port)); @@ -635,7 +633,12 @@ static void * ipcp_udp_sdu_reader()                  rw_lock_rdlock(&_ap_instance->flows_lock); +                pthread_mutex_lock(&_ap_instance->fd_set_lock); +                  read_fds = shim_data(_ipcp)->flow_fd_s; +                _ap_instance->fd_set_sync = false; + +                pthread_mutex_unlock(&_ap_instance->fd_set_lock);                  rw_lock_unlock(&_ap_instance->flows_lock);                  rw_lock_unlock(&_ipcp->state_lock); @@ -1240,7 +1243,7 @@ static int ipcp_udp_flow_alloc(pid_t         n_pid,                  _ap_instance->flows[fd].port_id = -1;                  _ap_instance->flows[fd].state   = FLOW_NULL;                  shm_ap_rbuff_close(_ap_instance->flows[fd].rb); -                _ap_instance->flows[fd].rb      = NULL; +                 _ap_instance->flows[fd].rb     = NULL;                  rw_lock_unlock(&_ap_instance->flows_lock);                  rw_lock_unlock(&_ipcp->state_lock); @@ -1260,6 +1263,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid,          struct sockaddr_in f_saddr;          struct sockaddr_in r_saddr;          socklen_t len = sizeof(r_saddr); +        bool fd_wait = true;          if (response)                  return 0; @@ -1316,8 +1320,20 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid,          _ap_instance->flows[fd].state = FLOW_ALLOCATED;          _ap_instance->flows[fd].rb    = rb; +        pthread_mutex_lock(&_ap_instance->fd_set_lock); + +        _ap_instance->fd_set_sync =  true;          FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); +        pthread_mutex_unlock(&_ap_instance->fd_set_lock); + +        while (fd_wait) { +                sched_yield(); +                pthread_mutex_lock(&_ap_instance->fd_set_lock); +                fd_wait = _ap_instance->fd_set_sync; +                pthread_mutex_unlock(&_ap_instance->fd_set_lock); +        } +          rw_lock_unlock(&_ap_instance->flows_lock);          rw_lock_unlock(&_ipcp->state_lock); @@ -1369,12 +1385,13 @@ static int ipcp_udp_flow_dealloc(int port_id)          rb = _ap_instance->flows[fd].rb;          _ap_instance->flows[fd].rb      = NULL; -        if (rb != NULL) -                shm_ap_rbuff_close(rb); -          FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);          rw_lock_unlock(&_ap_instance->flows_lock); + +        if (rb != NULL) +                shm_ap_rbuff_close(rb); +          rw_lock_unlock(&_ipcp->state_lock);          close(fd); | 
