diff options
| author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-10-07 15:25:22 +0200 | 
|---|---|---|
| committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-10-07 15:42:44 +0200 | 
| commit | 71f10f5efab37f3df3d909d324cff2e098d21c85 (patch) | |
| tree | 90d6031870d02b557107b0bc2623a129c4b1d074 /src | |
| parent | aa0eac4f93b80537d02123715842d594a8ff3aad (diff) | |
| download | ouroboros-71f10f5efab37f3df3d909d324cff2e098d21c85.tar.gz ouroboros-71f10f5efab37f3df3d909d324cff2e098d21c85.zip | |
lib, dev: Add asynchronous deallocation
Flow deallocation from the application will immediately return (void
call). The IRMd will not send a reply message.
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/ipcp.c | 11 | ||||
| -rw-r--r-- | src/irmd/main.c | 45 | ||||
| -rw-r--r-- | src/lib/dev.c | 38 | ||||
| -rw-r--r-- | src/lib/sockets.c | 33 | 
4 files changed, 72 insertions, 55 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index db72b88d..a9f80ee7 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -313,7 +313,8 @@ void * ipcp_main_loop(void * o)                          }                          fd = np1_flow_alloc(msg->api, msg->port_id);                          if (fd < 0) { -                                LOG_ERR("Could not get fd for flow."); +                                LOG_ERR("Could not get fd for port_id. %d", +                                        msg->port_id);                                  ret_msg.has_result = true;                                  ret_msg.result = -1;                                  break; @@ -326,7 +327,7 @@ void * ipcp_main_loop(void * o)                                                              msg->src_ae_name,                                                              msg->qos_cube);                          if (ret_msg.result < 0) { -                                LOG_DBG("Deallocating failed flow on port_id %d.", +                                LOG_DBG("Deallocate failed on port_id %d.",                                          msg->port_id);                                  flow_dealloc(fd);                          } @@ -340,7 +341,8 @@ void * ipcp_main_loop(void * o)                          if (!msg->response) {                                  fd = np1_flow_resp(msg->api, msg->port_id);                                  if (fd < 0) { -                                        LOG_ERR("Could not get fd for flow."); +                                        LOG_ERR("Could not get fd for port_id %d.", +                                                msg->port_id);                                          ret_msg.has_result = true;                                          ret_msg.result = -1;                                          break; @@ -359,7 +361,8 @@ void * ipcp_main_loop(void * o)                          fd = np1_flow_dealloc(msg->port_id);                          if (fd < 0) { -                                LOG_ERR("Could not get fd for flow."); +                                LOG_ERR("Could not deallocate port_id %d.", +                                        msg->port_id);                                  ret_msg.has_result = true;                                  ret_msg.result = -1;                                  break; diff --git a/src/irmd/main.c b/src/irmd/main.c index 523741ef..24a49c49 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -587,8 +587,6 @@ static int bind_api(pid_t  api,          if (name == NULL)                  return -EINVAL; -        LOG_DBG("BIND_API called %d, %s", api, name); -          pthread_rwlock_rdlock(&irmd->state_lock);          if (irmd->state != IRMD_RUNNING) { @@ -1231,29 +1229,32 @@ static int flow_alloc_res(int port_id)  static int flow_dealloc(pid_t api, int port_id)  { -        pid_t n_1_api; +        pid_t n_1_api = -1;          int   ret = 0;          struct irm_flow * f = NULL;          pthread_rwlock_rdlock(&irmd->state_lock);          pthread_rwlock_wrlock(&irmd->flows_lock); -        bmp_release(irmd->port_ids, port_id);          f = get_irm_flow(port_id);          if (f == NULL) { +                bmp_release(irmd->port_ids, port_id);                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  return 0;          } -        n_1_api = f->n_1_api; +        if (api == f->n_api) { +                    bmp_release(irmd->port_ids, port_id); +                    n_1_api = f->n_1_api; +        }          list_del(&f->next);          pthread_rwlock_unlock(&irmd->flows_lock); -        if (api != n_1_api) +        if (n_1_api != -1)                  ret = ipcp_flow_dealloc(n_1_api, port_id);          pthread_rwlock_unlock(&irmd->state_lock); @@ -1772,8 +1773,7 @@ void * mainloop()                          break;                  case IRM_MSG_CODE__IRM_BOOTSTRAP_IPCP:                          ret_msg.has_result = true; -                        ret_msg.result = bootstrap_ipcp(msg->api, -                                                        msg->conf); +                        ret_msg.result = bootstrap_ipcp(msg->api, msg->conf);                          break;                  case IRM_MSG_CODE__IRM_ENROLL_IPCP:                          ret_msg.has_result = true; @@ -1790,27 +1790,22 @@ void * mainloop()                          break;                  case IRM_MSG_CODE__IRM_UNBIND_AP:                          ret_msg.has_result = true; -                        ret_msg.result = unbind_ap(msg->ap_name, -                                                   msg->dst_name); +                        ret_msg.result = unbind_ap(msg->ap_name, msg->dst_name);                          break;                  case IRM_MSG_CODE__IRM_API_ANNOUNCE:                          ret_msg.has_result = true; -                        ret_msg.result = api_announce(msg->api, -                                                      msg->ap_name); +                        ret_msg.result = api_announce(msg->api, msg->ap_name);                          break;                  case IRM_MSG_CODE__IRM_BIND_API:                          ret_msg.has_result = true; -                        ret_msg.result = bind_api(msg->api, -                                                  msg->dst_name); +                        ret_msg.result = bind_api(msg->api, msg->dst_name);                          break;                  case IRM_MSG_CODE__IRM_UNBIND_API:                          ret_msg.has_result = true; -                        ret_msg.result = unbind_api(msg->api, -                                                    msg->dst_name); +                        ret_msg.result = unbind_api(msg->api, msg->dst_name);                          break;                  case IRM_MSG_CODE__IRM_LIST_IPCPS: -                        ret_msg.n_apis = list_ipcps(msg->dst_name, -                                                    &apis); +                        ret_msg.n_apis = list_ipcps(msg->dst_name, &apis);                          ret_msg.apis = apis;                          ret_msg.has_result = true;                          break; @@ -1827,15 +1822,12 @@ void * mainloop()                                                      msg->n_dif_name);                          break;                  case IRM_MSG_CODE__IRM_FLOW_ACCEPT: -                        e = flow_accept(msg->api, -                                        &ret_msg.ae_name); - +                        e = flow_accept(msg->api, &ret_msg.ae_name);                          if (e == NULL) {                                  ret_msg.has_result = true;                                  ret_msg.result = -1;                                  break;                          } -                          ret_msg.has_port_id = true;                          ret_msg.port_id     = e->port_id;                          ret_msg.has_api     = true; @@ -1857,8 +1849,6 @@ void * mainloop()                                  ret_msg.result = -1;                                  break;                          } - -                        /* FIXME: badly timed dealloc may give SEGV */                          ret_msg.has_port_id = true;                          ret_msg.port_id     = e->port_id;                          ret_msg.has_api     = true; @@ -1869,9 +1859,10 @@ void * mainloop()                          ret_msg.result = flow_alloc_res(msg->port_id);                          break;                  case IRM_MSG_CODE__IRM_FLOW_DEALLOC: -                        ret_msg.has_result = true; -                        ret_msg.result = flow_dealloc(msg->api, msg->port_id); -                        break; +                        flow_dealloc(msg->api, msg->port_id); +                        irm_msg__free_unpacked(msg, NULL); +                        close(cli_sockfd); +                        continue;                  case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:                          e = flow_req_arr(msg->api,                                           msg->dst_name, diff --git a/src/lib/dev.c b/src/lib/dev.c index 8556d6e2..d36764ed 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -130,7 +130,7 @@ struct flow {          pid_t                 api; -        struct timespec       timeout; +        struct timespec *     timeout;  };  struct { @@ -220,8 +220,7 @@ int ap_init(char * ap_name)                  ai.flows[i].port_id = -1;                  ai.flows[i].oflags = 0;                  ai.flows[i].api = -1; -                ai.flows[i].timeout.tv_sec  = 0; -                ai.flows[i].timeout.tv_nsec = 0; +                ai.flows[i].timeout = NULL;          }          ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); @@ -270,9 +269,12 @@ void ap_fini()          pthread_rwlock_rdlock(&ai.flows_lock); -        for (i = 0; i < AP_MAX_FLOWS; ++i) +        for (i = 0; i < AP_MAX_FLOWS; ++i) {                  if (ai.flows[i].rb != NULL)                          shm_ap_rbuff_close(ai.flows[i].rb); +                if (ai.flows[i].timeout != NULL) +                        free(ai.flows[i].timeout); +        }          for (i = 0; i < IRMD_MAX_FLOWS; ++i) {                  ai.ports[i].state = PORT_NULL; @@ -527,8 +529,6 @@ int flow_alloc_res(int fd)  int flow_dealloc(int fd)  {          irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int ret = -1;          msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC;          msg.has_port_id  = true; @@ -552,30 +552,20 @@ int flow_dealloc(int fd)          shm_ap_rbuff_close(ai.flows[fd].rb);          ai.flows[fd].rb = NULL;          ai.flows[fd].api = -1; +        if (ai.flows[fd].timeout != NULL) { +                free(ai.flows[fd].timeout); +                ai.flows[fd].timeout = NULL; +        }          bmp_release(ai.fds, fd);          pthread_rwlock_unlock(&ai.flows_lock); -        recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) { -                pthread_rwlock_unlock(&ai.data_lock); -                return -1; -        } - -        if (!recv_msg->has_result) { -                pthread_rwlock_unlock(&ai.data_lock); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        ret = recv_msg->result; +        send_irm_msg(&msg);          pthread_rwlock_unlock(&ai.data_lock); -        irm_msg__free_unpacked(recv_msg, NULL); - -        return ret; +        return 0;  }  int flow_cntl(int fd, int cmd, int oflags) @@ -708,10 +698,10 @@ ssize_t flow_read(int fd, void * buf, size_t count)          } else {                  struct shm_ap_rbuff * rb      = ai.rb;                  int                   port_id = ai.flows[fd].port_id; -                struct timespec       timeout = ai.flows[fd].timeout; +                struct timespec *     timeout = ai.flows[fd].timeout;                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); -                idx = shm_ap_rbuff_read_port_b(rb, port_id, &timeout); +                idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout);                  pthread_rwlock_rdlock(&ai.data_lock);          } diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 408e79e7..c8375c22 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -154,6 +154,39 @@ static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed)          return recv_msg;  } +void send_irm_msg(irm_msg_t * msg) +{ +        int sockfd; +        buffer_t buf; + +        sockfd = client_socket_open(IRM_SOCK_PATH); +        if (sockfd < 0) +                return; + +        buf.len = irm_msg__get_packed_size(msg); +        if (buf.len == 0) { +                close(sockfd); +                return; +        } + +        buf.data = malloc(buf.len); +        if (buf.data == NULL) { +                close(sockfd); +                return; +        } + +        pthread_cleanup_push(close_ptr, &sockfd); +        pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data); + +        irm_msg__pack(msg, buf.data); + +        if (write(sockfd, buf.data, buf.len) < 0) +                return; + +        pthread_cleanup_pop(true); +        pthread_cleanup_pop(true); +} +  irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)  { return send_recv_irm_msg_timed(msg, true); } | 
