diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/local/main.c | 18 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 54 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto | 5 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 85 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/shim_udp_messages.proto | 1 | ||||
| -rw-r--r-- | src/irmd/main.c | 10 | ||||
| -rw-r--r-- | src/lib/dev.c | 55 | ||||
| -rw-r--r-- | src/lib/shm_rbuff.c | 47 | 
8 files changed, 92 insertions, 183 deletions
| diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index a8d5c273..412795ec 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -110,7 +110,7 @@ static void * ipcp_local_sdu_loop(void * o)                  while ((fd = fqueue_next(local_data.fq)) >= 0) {                          idx = local_flow_read(fd); -                        assert((size_t) idx < (SHM_BUFFER_SIZE)); +                        assert(idx < (SHM_BUFFER_SIZE));                          fd = local_data.in_out[fd]; @@ -243,13 +243,13 @@ static int ipcp_local_flow_alloc(int           fd,          pthread_rwlock_wrlock(&local_data.lock); -        flow_set_add(local_data.flows, fd); -          out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);          local_data.in_out[fd]  = out_fd;          local_data.in_out[out_fd] = fd; +        flow_set_add(local_data.flows, fd); +          pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -291,24 +291,22 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)  static int ipcp_local_flow_dealloc(int fd)  { -        struct timespec t = {0, 10000}; -          assert(!(fd < 0)); -        flow_set_del(local_data.flows, fd); - -        while (flow_dealloc(fd) == -EBUSY) -                nanosleep(&t, NULL); +        ipcp_flow_fini(fd);          pthread_rwlock_rdlock(&ipcpi.state_lock);          pthread_rwlock_wrlock(&local_data.lock); -        flow_cntl(local_data.in_out[fd], FLOW_F_SETFL, FLOW_O_WRONLY); +        flow_set_del(local_data.flows, fd); +          local_data.in_out[fd] = -1;          pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock); +        flow_dealloc(fd); +          LOG_INFO("Flow with fd %d deallocated.", fd);          return 0; diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index f6cded2b..b7b9f783 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -348,17 +348,6 @@ static int eth_llc_ipcp_sap_alloc_resp(uint8_t * dst_addr,          return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);  } -static int eth_llc_ipcp_sap_dealloc(uint8_t * dst_addr, uint8_t ssap) -{ -        shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT; - -        msg.code     = SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC; -        msg.has_ssap = true; -        msg.ssap     = ssap; - -        return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr); -} -  static int eth_llc_ipcp_sap_req(uint8_t   r_sap,                                  uint8_t * r_addr,                                  char *    dst_name, @@ -427,29 +416,6 @@ static int eth_llc_ipcp_sap_alloc_reply(uint8_t   ssap,  } -static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap) -{ -        int fd = -1; - -        pthread_rwlock_rdlock(&ipcpi.state_lock); -        pthread_rwlock_wrlock(ð_llc_data.flows_lock); - -        fd = eth_llc_data.ef_to_fd[ssap]; -        if (fd < 0) { -                pthread_rwlock_unlock(ð_llc_data.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_DBG("Flow already deallocated."); -                return 0; -        } - -        pthread_rwlock_unlock(ð_llc_data.flows_lock); -        pthread_rwlock_unlock(&ipcpi.state_lock); - -        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); - -        return 0; -} -  static int eth_llc_ipcp_name_query_req(char * name, uint8_t * r_addr)  {          shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT; @@ -509,9 +475,6 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)                                               msg->dsap,                                               msg->response);                  break; -        case SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC: -                eth_llc_ipcp_flow_dealloc_req(msg->ssap); -                break;          case SHIM_ETH_LLC_MSG_CODE__NAME_QUERY_REQ:                  eth_llc_ipcp_name_query_req(msg->dst_name, r_addr);                  break; @@ -1074,25 +1037,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)  static int eth_llc_ipcp_flow_dealloc(int fd)  { -        struct timespec t = {0, 10000}; -          uint8_t sap; -        uint8_t r_sap;          uint8_t addr[MAC_SIZE]; -        int ret; - -        flow_set_del(eth_llc_data.np1_flows, fd); -        while (flow_dealloc(fd) == -EBUSY) -                nanosleep(&t, NULL); +        ipcp_flow_fini(fd);          pthread_rwlock_rdlock(&ipcpi.state_lock);          pthread_rwlock_wrlock(ð_llc_data.flows_lock); -        r_sap = eth_llc_data.fd_to_ef[fd].r_sap; +        flow_set_del(eth_llc_data.np1_flows, fd); +          sap = eth_llc_data.fd_to_ef[fd].sap;          memcpy(addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); -          bmp_release(eth_llc_data.saps, sap);          eth_llc_data.fd_to_ef[fd].sap = -1;          eth_llc_data.fd_to_ef[fd].r_sap = -1; @@ -1103,9 +1059,7 @@ static int eth_llc_ipcp_flow_dealloc(int fd)          pthread_rwlock_unlock(ð_llc_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        ret = eth_llc_ipcp_sap_dealloc(addr, r_sap); -        if (ret < 0) -                LOG_DBG("Could not notify remote."); +        flow_dealloc(fd);          LOG_DBG("Flow with fd %d deallocated.", fd); diff --git a/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto b/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto index 4d027d98..045db5c2 100644 --- a/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto +++ b/src/ipcpd/shim-eth-llc/shim_eth_llc_messages.proto @@ -1,9 +1,8 @@  enum shim_eth_llc_msg_code {          FLOW_REQ         = 1;          FLOW_REPLY       = 2; -        FLOW_DEALLOC     = 3; -        NAME_QUERY_REQ   = 4; -        NAME_QUERY_REPLY = 5; +        NAME_QUERY_REQ   = 3; +        NAME_QUERY_REPLY = 4;  };  message shim_eth_llc_msg { diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index ba2805c5..e4ab4fac 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -230,17 +230,6 @@ static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr,          return send_shim_udp_msg(&msg, dst_ip_addr);  } -static int ipcp_udp_port_dealloc(uint32_t dst_ip_addr, -                                 uint16_t src_udp_port) -{ -        shim_udp_msg_t msg = SHIM_UDP_MSG__INIT; - -        msg.code             = SHIM_UDP_MSG_CODE__FLOW_DEALLOC; -        msg.src_udp_port     = src_udp_port; - -        return send_shim_udp_msg(&msg, dst_ip_addr); -} -  static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,                               char * dst_name,                               char * src_ae_name) @@ -375,46 +364,6 @@ static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port,          return ret;  } -static int ipcp_udp_flow_dealloc_req(uint16_t udp_port) -{ -        int skfd = -1; -        int fd   = -1; - -        pthread_rwlock_rdlock(&ipcpi.state_lock); -        pthread_rwlock_wrlock(&udp_data.flows_lock); - -        fd = udp_port_to_fd(udp_port); -        if (fd < 0) { -                pthread_rwlock_unlock(&udp_data.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_DBG("Could not find flow on UDP port %d.", -                         ntohs(udp_port)); -                return 0; -        } - -        skfd = udp_data.fd_to_uf[fd].skfd; - -        udp_data.uf_to_fd[skfd]    = -1; -        udp_data.fd_to_uf[fd].udp  = -1; -        udp_data.fd_to_uf[fd].skfd = -1; - -        pthread_rwlock_unlock(&udp_data.flows_lock); -        pthread_rwlock_rdlock(&udp_data.flows_lock); - -        clr_fd(skfd); - -        pthread_rwlock_unlock(&udp_data.flows_lock); -        pthread_rwlock_unlock(&ipcpi.state_lock); - -        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); - -        close(skfd); - -        LOG_DBG("Flow with fd %d deallocated.", fd); - -        return 0; -} -  static void * ipcp_udp_listener(void * o)  {          uint8_t buf[SHIM_UDP_MSG_SIZE]; @@ -456,9 +405,6 @@ static void * ipcp_udp_listener(void * o)                                                    msg->dst_udp_port,                                                    msg->response);                          break; -                case SHIM_UDP_MSG_CODE__FLOW_DEALLOC: -                        ipcp_udp_flow_dealloc_req(msg->src_udp_port); -                        break;                  default:                          LOG_ERR("Unknown message received %d.", msg->code);                          shim_udp_msg__free_unpacked(msg, NULL); @@ -1153,15 +1099,10 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)  static int ipcp_udp_flow_dealloc(int fd)  {          int skfd = -1; -        uint16_t remote_udp; -        struct timespec t = {0, 10000}; -        struct sockaddr_in    r_saddr; -        socklen_t             r_saddr_len = sizeof(r_saddr); -        flow_set_del(udp_data.np1_flows, fd); +        ipcp_flow_fini(fd); -        while (flow_dealloc(fd) == -EBUSY) -                nanosleep(&t, NULL); +        flow_set_del(udp_data.np1_flows, fd);          pthread_rwlock_rdlock(&ipcpi.state_lock);          pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1180,28 +1121,10 @@ static int ipcp_udp_flow_dealloc(int fd)          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        if (getpeername(skfd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) { -                LOG_DBG("Socket with fd %d has no peer.", skfd); -                close(skfd); -                return 0; -        } - -        remote_udp       = r_saddr.sin_port; -        r_saddr.sin_port = LISTEN_PORT; - -        if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { -                close(skfd); -                return 0 ; -        } - -        if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr, remote_udp) < 0) { -                LOG_DBG("Could not notify remote."); -                close(skfd); -                return 0; -        } -          close(skfd); +        flow_dealloc(fd); +          LOG_DBG("Flow with fd %d deallocated.", fd);          return 0; diff --git a/src/ipcpd/shim-udp/shim_udp_messages.proto b/src/ipcpd/shim-udp/shim_udp_messages.proto index bd9bd3aa..bd23f8eb 100644 --- a/src/ipcpd/shim-udp/shim_udp_messages.proto +++ b/src/ipcpd/shim-udp/shim_udp_messages.proto @@ -1,7 +1,6 @@  enum shim_udp_msg_code {          FLOW_REQ     = 1;          FLOW_REPLY   = 2; -        FLOW_DEALLOC = 3;  };  message shim_udp_msg { diff --git a/src/irmd/main.c b/src/irmd/main.c index 1ac989de..548ab1db 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1026,6 +1026,8 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name)          if (dst_ae_name != NULL)                  *dst_ae_name = re->req_ae_name; +        LOG_INFO("Flow on port_id %d allocated.", f->port_id); +          pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_unlock(&irmd->state_lock); @@ -1156,7 +1158,7 @@ static struct irm_flow * flow_alloc(pid_t  api,          pthread_rwlock_wrlock(&irmd->flows_lock);          port_id = f->port_id = bmp_allocate(irmd->port_ids); -        if (!bmp_is_id_valid(irmd->port_ids, (ssize_t) port_id)) { +        if (!bmp_is_id_valid(irmd->port_ids, port_id)) {                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  LOG_ERR("Could not allocate port_id."); @@ -1233,6 +1235,7 @@ static int flow_alloc_res(int port_id)          }          if (irm_flow_get_state(f) == FLOW_ALLOCATED) { +                LOG_INFO("Flow on port_id %d allocated.", port_id);                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  return 0; @@ -1348,7 +1351,6 @@ static struct irm_flow * flow_req_arr(pid_t  api,          struct pid_el * c_api;          pid_t h_api = -1; -        int port_id = -1;          LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.",                   api, dst_name, ae_name); @@ -1467,7 +1469,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,          pthread_rwlock_unlock(&irmd->reg_lock);          pthread_rwlock_wrlock(&irmd->flows_lock); -        port_id = f->port_id = bmp_allocate(irmd->port_ids); +        f->port_id = bmp_allocate(irmd->port_ids);          if (!bmp_is_id_valid(irmd->port_ids, f->port_id)) {                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock); @@ -1532,8 +1534,6 @@ static struct irm_flow * flow_req_arr(pid_t  api,          pthread_mutex_unlock(&re->state_lock); -        LOG_INFO("Flow on port_id %d allocated.", port_id); -          return f;  } diff --git a/src/lib/dev.c b/src/lib/dev.c index 018cb692..a0c47403 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -612,12 +612,6 @@ int flow_dealloc(int fd)                  return -ENOTALLOC;          } -        if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -EBUSY; -        } -          msg.port_id = ai.flows[fd].port_id;          pthread_rwlock_unlock(&ai.flows_lock); @@ -1285,11 +1279,7 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)                  return -EPERM;          } -        if (ai.flows[fd].tx_rb == NULL) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -EPERM; -        } +        assert(ai.flows[fd].tx_rb);          idx = shm_du_buff_get_idx(sdb); @@ -1302,9 +1292,38 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)          return 0;  } +int ipcp_flow_fini(int fd) +{ +        struct shm_rbuff * rb; + +        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        rb = ai.flows[fd].rx_rb; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        shm_rbuff_fini(rb); + +        return 0; +} +  ssize_t local_flow_read(int fd)  { -        return shm_rbuff_read(ai.flows[fd].rx_rb); +        ssize_t ret; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        ret = shm_rbuff_read(ai.flows[fd].rx_rb); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return ret;  }  int local_flow_write(int fd, size_t idx) @@ -1315,11 +1334,7 @@ int local_flow_write(int fd, size_t idx)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai.flows[fd].tx_rb == NULL) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -EPERM; -        } +        assert(ai.flows[fd].tx_rb);          shm_rbuff_write(ai.flows[fd].tx_rb, idx); @@ -1338,11 +1353,7 @@ int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai.flows[fd].rx_rb == NULL) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -EPERM; -        } +        assert(ai.flows[fd].rx_rb);          idx = shm_rbuff_read(ai.flows[fd].rx_rb);          if (idx < 0) { diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 8b2e9229..301669e7 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -43,6 +43,8 @@  #include <stdbool.h>  #define FN_MAX_CHARS 255 +#define RB_CLOSED -1 +#define RB_OPEN 0  #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t)          \                               + 2 * sizeof(size_t) + sizeof(int8_t)      \ @@ -144,7 +146,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)          pthread_cond_init(rb->add, &cattr);          pthread_cond_init(rb->del, &cattr); -        *rb->acl = 0; +        *rb->acl = RB_OPEN;          *rb->head = 0;          *rb->tail = 0; @@ -291,6 +293,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)          ret = *tail_el_ptr(rb);          *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); +        pthread_cond_broadcast(rb->del);          pthread_mutex_unlock(rb->lock); @@ -353,10 +356,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          return idx;  } -int shm_rbuff_block(struct shm_rbuff * rb) +void shm_rbuff_block(struct shm_rbuff * rb)  { -        int ret = 0; -          assert(rb);  #ifdef __APPLE__ @@ -367,14 +368,9 @@ int shm_rbuff_block(struct shm_rbuff * rb)                  pthread_mutex_consistent(rb->lock);          }  #endif -        *rb->acl = -1; - -        if (!shm_rbuff_empty(rb)) -                ret = -EBUSY; +        *rb->acl = RB_CLOSED;          pthread_mutex_unlock(rb->lock); - -        return ret;  }  void shm_rbuff_unblock(struct shm_rbuff * rb) @@ -389,11 +385,40 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)                  pthread_mutex_consistent(rb->lock);          }  #endif -        *rb->acl = 0; /* open */ +        *rb->acl = RB_OPEN;          pthread_mutex_unlock(rb->lock);  } +void shm_rbuff_fini(struct shm_rbuff * rb) +{ +        assert(rb); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        assert(*rb->acl == RB_CLOSED); + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while (!shm_rbuff_empty(rb)) +#ifdef __APPLE__ +                pthread_cond_wait(rb->del, rb->lock); +#else +                if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) { +                        LOG_DBG("Recovering dead mutex."); +                        pthread_mutex_consistent(rb->lock); +                } +#endif +        pthread_cleanup_pop(true); +} +  void shm_rbuff_reset(struct shm_rbuff * rb)  {          assert(rb); | 
