diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/ipcp.c | 16 | ||||
| -rw-r--r-- | src/ipcpd/local/main.c | 20 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 30 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 58 | ||||
| -rw-r--r-- | src/lib/dev.c | 185 | 
5 files changed, 168 insertions, 141 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index b7eff3fb..4dff86f4 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -114,7 +114,6 @@ void ipcp_fini()          ipcp_data_destroy(ipcpi.data);          pthread_cond_destroy(&ipcpi.state_cond); -        pthread_mutex_destroy(&ipcpi.state_mtx);          pthread_rwlock_destroy(&ipcpi.state_lock);  } @@ -349,7 +348,7 @@ void * ipcp_main_loop(void * o)                          }                          fd = np1_flow_alloc(msg->api, msg->port_id);                          if (fd < 0) { -                                LOG_ERR("Could not get fd for port_id. %d", +                                LOG_ERR("Failed allocating fd on port_id %d.",                                          msg->port_id);                                  ret_msg.has_result = true;                                  ret_msg.result = -1; @@ -362,11 +361,6 @@ void * ipcp_main_loop(void * o)                                                             msg->dst_name,                                                             msg->src_ae_name,                                                             msg->qos_cube); -                        if (ret_msg.result < 0) { -                                LOG_DBG("Deallocate failed on port_id %d.", -                                        msg->port_id); -                                flow_dealloc(fd); -                        }                          break;                  case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP:                          if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) { @@ -375,10 +369,10 @@ void * ipcp_main_loop(void * o)                          }                          if (!msg->response) { -                                fd = np1_flow_resp(msg->api, msg->port_id); +                                fd = np1_flow_resp(msg->port_id);                                  if (fd < 0) { -                                        LOG_ERR("Could not get fd for port_id %d.", -                                                msg->port_id); +                                        LOG_WARN("Port_id %d is not known.", +                                                 msg->port_id);                                          ret_msg.has_result = true;                                          ret_msg.result = -1;                                          break; @@ -397,7 +391,7 @@ void * ipcp_main_loop(void * o)                          fd = np1_flow_dealloc(msg->port_id);                          if (fd < 0) { -                                LOG_ERR("Could not deallocate port_id %d.", +                                LOG_WARN("Could not deallocate port_id %d.",                                          msg->port_id);                                  ret_msg.has_result = true;                                  ret_msg.result = -1; diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index f95eeae4..f0c85084 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -88,14 +88,10 @@ static void * ipcp_local_sdu_loop(void * o)          (void) o; -        while (true) { +        while (flow_event_wait(local_data.flows, local_data.fq, &timeout)) {                  int fd;                  ssize_t idx; -                if (flow_event_wait(local_data.flows, local_data.fq, &timeout) -                    == -ETIMEDOUT) -                        continue; -                  pthread_rwlock_rdlock(&ipcpi.state_lock);                  if (ipcp_get_state() != IPCP_ENROLLED) { @@ -233,7 +229,8 @@ static int ipcp_local_flow_alloc(int           fd,          LOG_DBG("Allocating flow to %s on fd %d.", dst_name, fd); -        assert(dst_name || src_ae_name); +        assert(dst_name); +        assert(src_ae_name);          pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -298,17 +295,24 @@ static int ipcp_local_flow_dealloc(int fd)          ipcp_flow_fini(fd);          pthread_rwlock_rdlock(&ipcpi.state_lock); + +        if (ipcp_get_state() != IPCP_ENROLLED) { +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_DBG("Won't register with non-enrolled IPCP."); +                return -1; /* -ENOTENROLLED */ +        } +          pthread_rwlock_wrlock(&local_data.lock);          flow_set_del(local_data.flows, fd);          local_data.in_out[fd] = -1; +        flow_dealloc(fd); +          pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        flow_dealloc(fd); -          LOG_INFO("Flow with fd %d deallocated.", fd);          return 0; diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 0d4c3903..3f3c0e1e 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -624,18 +624,22 @@ static void * eth_llc_ipcp_sdu_writer(void * o)          (void) o; -        while (true) { -                if (flow_event_wait(eth_llc_data.np1_flows, -                                    eth_llc_data.fq, -                                    &timeout) == -ETIMEDOUT) -                        continue; +        while (flow_event_wait(eth_llc_data.np1_flows, +                               eth_llc_data.fq, +                               &timeout)) { +                pthread_rwlock_rdlock(&ipcpi.state_lock); + +                if (ipcp_get_state() != IPCP_ENROLLED) { +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        return (void *) -1; /* -ENOTENROLLED */ +                }                  while ((fd = fqueue_next(eth_llc_data.fq)) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  LOG_ERR("Bad read from fd %d.", fd);                                  continue;                          } -                        pthread_rwlock_rdlock(&ipcpi.state_lock); +                          pthread_rwlock_rdlock(ð_llc_data.flows_lock);                          ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); @@ -645,7 +649,6 @@ static void * eth_llc_ipcp_sdu_writer(void * o)                                 MAC_SIZE);                          pthread_rwlock_unlock(ð_llc_data.flows_lock); -                        pthread_rwlock_unlock(&ipcpi.state_lock);                          eth_llc_ipcp_send_frame(r_addr, dsap, ssap,                                                  shm_du_buff_head(sdb), @@ -653,6 +656,8 @@ static void * eth_llc_ipcp_sdu_writer(void * o)                                                  - shm_du_buff_head(sdb));                          ipcp_flow_del(sdb);                  } + +                pthread_rwlock_unlock(&ipcpi.state_lock);          }          return (void *) 1; @@ -1045,6 +1050,13 @@ static int eth_llc_ipcp_flow_dealloc(int fd)          ipcp_flow_fini(fd);          pthread_rwlock_rdlock(&ipcpi.state_lock); + +        if (ipcp_get_state() != IPCP_ENROLLED) { +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_DBG("Won't register with non-enrolled IPCP."); +                return -1; /* -ENOTENROLLED */ +        } +          pthread_rwlock_wrlock(ð_llc_data.flows_lock);          flow_set_del(eth_llc_data.np1_flows, fd); @@ -1058,11 +1070,11 @@ static int eth_llc_ipcp_flow_dealloc(int fd)          eth_llc_data.ef_to_fd[sap] = -1; +        flow_dealloc(fd); +          pthread_rwlock_unlock(ð_llc_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        flow_dealloc(fd); -          LOG_DBG("Flow with fd %d deallocated.", fd);          return 0; diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 503dbd0b..eff0bd94 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -452,7 +452,6 @@ static void * ipcp_udp_sdu_reader(void * o)                                  continue;                          flags = fcntl(skfd, F_GETFL, 0);                          fcntl(skfd, F_SETFL, flags | O_NONBLOCK); -                        fd = udp_data.uf_to_fd[skfd];                          n = sizeof(r_saddr);                          if ((n = recvfrom(skfd,                                            &buf, @@ -462,7 +461,14 @@ static void * ipcp_udp_sdu_reader(void * o)                                            (unsigned *) &n)) <= 0)                                  continue; +                        pthread_rwlock_rdlock(&ipcpi.state_lock); +                        pthread_rwlock_rdlock(&udp_data.flows_lock); + +                        fd = udp_data.uf_to_fd[skfd];                          flow_write(fd, buf, n); + +                        pthread_rwlock_unlock(&udp_data.flows_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                  }          } @@ -477,11 +483,16 @@ static void * ipcp_udp_sdu_loop(void * o)          (void) o; -        while (true) { -                if (flow_event_wait(udp_data.np1_flows, -                                    udp_data.fq, -                                    &timeout)  == -ETIMEDOUT) -                        continue; +        while (flow_event_wait(udp_data.np1_flows, udp_data.fq, &timeout)) { +                pthread_rwlock_rdlock(&ipcpi.state_lock); + +                if (ipcp_get_state() != IPCP_ENROLLED) { +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        return (void *) -1; /* -ENOTENROLLED */ +                } + + +                pthread_rwlock_rdlock(&udp_data.flows_lock);                  while ((fd = fqueue_next(udp_data.fq)) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) { @@ -489,19 +500,18 @@ static void * ipcp_udp_sdu_loop(void * o)                                  continue;                          } -                        pthread_rwlock_rdlock(&ipcpi.state_lock); -                        pthread_rwlock_rdlock(&udp_data.flows_lock); -                          if (send(udp_data.fd_to_uf[fd].skfd,                                   shm_du_buff_head(sdb),                                   shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),                                   0) < 0)                                  LOG_ERR("Failed to send SDU."); -                        pthread_rwlock_unlock(&udp_data.flows_lock); -                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        ipcp_flow_del(sdb); +                } -                        ipcp_flow_del(sdb);                } + +                pthread_rwlock_unlock(&udp_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);          }          return (void *) 1; @@ -948,8 +958,8 @@ static int ipcp_udp_flow_alloc(int           fd,          LOG_DBG("Allocating flow to %s.", dst_name); -        if (dst_name == NULL || src_ae_name == NULL) -                return -1; +        assert(dst_name); +        assert(src_ae_name);          if (strlen(dst_name) > 255              || strlen(src_ae_name) > 255) { @@ -1101,32 +1111,36 @@ static int ipcp_udp_flow_dealloc(int fd)          ipcp_flow_fini(fd); -        flow_set_del(udp_data.np1_flows, fd); -          pthread_rwlock_rdlock(&ipcpi.state_lock); + +        if (ipcp_get_state() != IPCP_ENROLLED) { +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_DBG("Won't register with non-enrolled IPCP."); +                return -1; /* -ENOTENROLLED */ +        } +          pthread_rwlock_wrlock(&udp_data.flows_lock); +        flow_set_del(udp_data.np1_flows, fd); +          skfd = udp_data.fd_to_uf[fd].skfd;          udp_data.uf_to_fd[skfd]    = -1;          udp_data.fd_to_uf[fd].udp  = -1;          udp_data.fd_to_uf[fd].skfd = -1; +        close(skfd); +          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_rdlock(&udp_data.flows_lock);          clr_fd(skfd); -        pthread_rwlock_unlock(&udp_data.flows_lock); -        pthread_rwlock_wrlock(&udp_data.flows_lock); - -        close(skfd); +        flow_dealloc(fd);          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        flow_dealloc(fd); -          LOG_DBG("Flow with fd %d deallocated.", fd);          return 0; diff --git a/src/lib/dev.c b/src/lib/dev.c index 4b97428e..fc8739a2 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -388,6 +388,14 @@ int flow_accept(char ** ae_name, struct qos_spec * qos)                  return -1;          } +        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                reset_flow(fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } +          ai.flows[fd].set = shm_flow_set_open(recv_msg->api);          if (ai.flows[fd].set == NULL) {                  reset_flow(fd); @@ -398,7 +406,6 @@ int flow_accept(char ** ae_name, struct qos_spec * qos)                  return -1;          } -          if (ae_name != NULL) {                  *ae_name = strdup(recv_msg->ae_name);                  if (*ae_name == NULL) { @@ -452,40 +459,34 @@ int flow_alloc_resp(int fd, int response)          msg.port_id      = ai.flows[fd].port_id;          pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock);          msg.has_response = true;          msg.response     = response;          recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) { -                pthread_rwlock_unlock(&ai.data_lock); +        if (recv_msg == NULL)                  return -1; -        }          if (!recv_msg->has_result) { -                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ret = recv_msg->result; -        pthread_rwlock_wrlock(&ai.flows_lock); +        irm_msg__free_unpacked(recv_msg, NULL); + +        if (response) { +                pthread_rwlock_rdlock(&ai.data_lock); +                pthread_rwlock_wrlock(&ai.flows_lock); -        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, -                                            ai.flows[fd].port_id); -        if (ai.flows[fd].tx_rb == NULL) {                  reset_flow(fd); +                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); -                return -1;          } -        pthread_rwlock_unlock(&ai.flows_lock); -        pthread_rwlock_unlock(&ai.data_lock); - -        irm_msg__free_unpacked(recv_msg, NULL); -          return ret;  } @@ -535,9 +536,6 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)                  return -1;          } -        ai.flows[fd].port_id = recv_msg->port_id; -        ai.flows[fd].oflags  = FLOW_O_DEFAULT; -        ai.flows[fd].api     = recv_msg->api;          ai.flows[fd].rx_rb   = shm_rbuff_open(ai.api, recv_msg->port_id);          if (ai.flows[fd].rx_rb == NULL) {                  reset_flow(fd); @@ -548,6 +546,25 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)                  return -1;          } +        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); +        if (ai.flows[fd].set == NULL) { +                reset_flow(fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].port_id = recv_msg->port_id; +        ai.flows[fd].oflags  = FLOW_O_DEFAULT; +        ai.flows[fd].api     = recv_msg->api; +          ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -582,22 +599,6 @@ int flow_alloc_res(int fd)          msg.port_id = ai.flows[fd].port_id; -        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, -                                            ai.flows[fd].port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -1; -        } - -        ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); -        if (ai.flows[fd].set == NULL) { -                reset_flow(fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -1; -        } -          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -764,10 +765,14 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  }          } else { /* blocking */                  struct shm_rdrbuff * rdrb = ai.rdrb; -                pid_t                api  = ai.flows[fd].api; +                struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; +                pid_t api  = ai.flows[fd].api; +                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); +                assert(tx_rb); +                  idx = shm_rdrbuff_write_b(rdrb,                                            api,                                            DU_BUFF_HEADSPACE, @@ -775,15 +780,13 @@ ssize_t flow_write(int fd, void * buf, size_t count)                                            buf,                                            count); -                pthread_rwlock_rdlock(&ai.data_lock); -                pthread_rwlock_rdlock(&ai.flows_lock); - -                if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { -                        shm_rdrbuff_remove(ai.rdrb, idx); -                        pthread_rwlock_unlock(&ai.flows_lock); -                        pthread_rwlock_unlock(&ai.data_lock); +                if (shm_rbuff_write(tx_rb, idx) < 0) { +                        shm_rdrbuff_remove(rdrb, idx);                          return -ENOTALLOC;                  } + +                pthread_rwlock_rdlock(&ai.data_lock); +                pthread_rwlock_rdlock(&ai.flows_lock);          }          shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); @@ -993,7 +996,7 @@ int flow_event_wait(struct flow_set *       set,  {          ssize_t ret; -        if (set == NULL) +        if (set == NULL || fq == NULL)                  return -EINVAL;          if (fq->fqsize > 0) @@ -1038,6 +1041,22 @@ int np1_flow_alloc(pid_t n_api, int port_id)                  return -1;          } +        ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                reset_flow(fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].set = shm_flow_set_open(n_api); +        if (ai.flows[fd].set == NULL) { +                reset_flow(fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } +          ai.flows[fd].port_id = port_id;          ai.flows[fd].oflags  = FLOW_O_DEFAULT;          ai.flows[fd].api     = n_api; @@ -1066,8 +1085,7 @@ int np1_flow_dealloc(int port_id)          return fd;  } - -int np1_flow_resp(pid_t n_api, int port_id) +int np1_flow_resp(int port_id)  {          int fd; @@ -1077,28 +1095,6 @@ int np1_flow_resp(pid_t n_api, int port_id)          pthread_rwlock_wrlock(&ai.flows_lock);          fd = ai.ports[port_id].fd; -        if (fd < 0) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return fd; -        } - -        ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                reset_flow(fd); -                port_destroy(&ai.ports[port_id]); -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -1; -        } - -        ai.flows[fd].set = shm_flow_set_open(n_api); -        if (ai.flows[fd].set == NULL) { -                reset_flow(fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -1; -        }          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -1166,21 +1162,47 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)          if (recv_msg == NULL)                  return -1; -        if (!recv_msg->has_port_id) { +        if (!recv_msg->has_port_id || !recv_msg->has_api) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } +        if (recv_msg->has_result && recv_msg->result) { +                   irm_msg__free_unpacked(recv_msg, NULL); +                   return -1; +        } +          port_id = recv_msg->port_id; -        irm_msg__free_unpacked(recv_msg, NULL); -        if (port_id < 0) +        if (port_id < 0) { +                irm_msg__free_unpacked(recv_msg, NULL);                  return -1; +        }          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_wrlock(&ai.flows_lock);          ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);          if (ai.flows[fd].rx_rb == NULL) { +                irm_msg__free_unpacked(recv_msg, NULL); +                reset_flow(fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id); +        if (ai.flows[fd].tx_rb == NULL) { +                irm_msg__free_unpacked(recv_msg, NULL); +                reset_flow(fd); +                port_destroy(&ai.ports[port_id]); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].set = shm_flow_set_open(recv_msg->api); +        if (ai.flows[fd].set == NULL) { +                irm_msg__free_unpacked(recv_msg, NULL);                  reset_flow(fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -1196,6 +1218,8 @@ int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name)          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); +        irm_msg__free_unpacked(recv_msg, NULL); +          return fd;  } @@ -1228,27 +1252,6 @@ int ipcp_flow_alloc_reply(int fd, int response)          ret = recv_msg->result; -        pthread_rwlock_wrlock(&ai.flows_lock); - -        ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, -                                            ai.flows[fd].port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                reset_flow(fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -1; -        } - -        ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); -        if (ai.flows[fd].set == NULL) { -                reset_flow(fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -1; -        } - -        pthread_rwlock_unlock(&ai.flows_lock); -          irm_msg__free_unpacked(recv_msg, NULL);          return ret; | 
