diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-06 09:30:01 +0000 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-04-06 09:30:01 +0000 | 
| commit | 34ef0da6c1b3a3419dbdf2041ed1e3ba107dc915 (patch) | |
| tree | d8e793cffbe829d64855eaa5a429b90ebe3dc3a4 /src | |
| parent | c6ad4f96f8bb2f1ee749e92308e7173523ddd0b8 (diff) | |
| parent | e1c0714d5827cd927961f3a687d9720e6e9aa802 (diff) | |
| download | ouroboros-34ef0da6c1b3a3419dbdf2041ed1e3ba107dc915.tar.gz ouroboros-34ef0da6c1b3a3419dbdf2041ed1e3ba107dc915.zip | |
Merged in dstaesse/ouroboros/be-tim (pull request #464)
lib, irmd: Implement flow allocation timeout
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/ipcp.c | 4 | ||||
| -rw-r--r-- | src/irmd/api_table.c | 85 | ||||
| -rw-r--r-- | src/irmd/api_table.h | 2 | ||||
| -rw-r--r-- | src/irmd/ipcp.c | 14 | ||||
| -rw-r--r-- | src/irmd/irm_flow.c | 37 | ||||
| -rw-r--r-- | src/irmd/irm_flow.h | 5 | ||||
| -rw-r--r-- | src/irmd/main.c | 182 | ||||
| -rw-r--r-- | src/lib/dev.c | 31 | ||||
| -rw-r--r-- | src/lib/irm.c | 4 | ||||
| -rw-r--r-- | src/lib/irmd_messages.proto | 2 | ||||
| -rw-r--r-- | src/lib/sockets.c | 14 | 
11 files changed, 243 insertions, 137 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 4b7da030..f08e4ce7 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -296,7 +296,7 @@ static void * ipcp_main_loop(void * o)                  buffer.len = ipcp_msg__get_packed_size(&ret_msg);                  if (buffer.len == 0) { -                        log_err("Failed to send reply message"); +                        log_err("Failed to pack reply message");                          close(lsockfd);                          thread_inc();                          continue; @@ -304,6 +304,7 @@ static void * ipcp_main_loop(void * o)                  buffer.data = malloc(buffer.len);                  if (buffer.data == NULL) { +                        log_err("Failed to create reply buffer.");                          close(lsockfd);                          thread_inc();                          continue; @@ -312,6 +313,7 @@ static void * ipcp_main_loop(void * o)                  ipcp_msg__pack(&ret_msg, buffer.data);                  if (write(lsockfd, buffer.data, buffer.len) == -1) { +                        log_err("Failed to send reply message");                          free(buffer.data);                          close(lsockfd);                          thread_inc(); diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c index 5ff0fcf6..268f8231 100644 --- a/src/irmd/api_table.c +++ b/src/irmd/api_table.c @@ -31,14 +31,17 @@  #include <stdlib.h>  #include <unistd.h>  #include <limits.h> +#include <assert.h> -struct api_entry * api_entry_create(pid_t api, char * apn) +#define ENTRY_SLEEP_TIMEOUT 10 /* ms */ + +struct api_entry * api_entry_create(pid_t  api, +                                    char * apn)  {          struct api_entry * e;          pthread_condattr_t cattr; -        if (apn == NULL) -                return NULL; +        assert(apn);          e = malloc(sizeof(*e));          if (e == NULL) @@ -84,8 +87,7 @@ void api_entry_destroy(struct api_entry * e)          struct list_head * p;          struct list_head * h; -        if (e == NULL) -                return; +        assert(e);          pthread_mutex_lock(&e->state_lock); @@ -121,11 +123,13 @@ void api_entry_destroy(struct api_entry * e)          free(e);  } -int api_entry_add_name(struct api_entry * e, char * name) +int api_entry_add_name(struct api_entry * e, +                       char *             name)  {          struct str_el * s; -        if (e == NULL || name == NULL) -                return -EINVAL; + +        assert(e); +        assert(name);          s = malloc(sizeof(*s));          if (s == NULL) @@ -137,11 +141,15 @@ int api_entry_add_name(struct api_entry * e, char * name)          return 0;  } -void api_entry_del_name(struct api_entry * e, char * name) +void api_entry_del_name(struct api_entry * e, +                        char *             name)  {          struct list_head * p = NULL;          struct list_head * h = NULL; +        assert(e); +        assert(name); +          list_for_each_safe(p, h, &e->names) {                  struct str_el * s = list_entry(p, struct str_el, next);                  if (!wildcard_match(name, s->str)) { @@ -153,31 +161,34 @@ void api_entry_del_name(struct api_entry * e, char * name)          }  } +void api_entry_cancel(struct api_entry * e) +{ +        pthread_mutex_lock(&e->state_lock); + +        e->state = API_INIT; +        pthread_cond_broadcast(&e->state_cond); + +        pthread_mutex_unlock(&e->state_lock); +} +  int api_entry_sleep(struct api_entry * e)  { -        struct timespec timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), -                                   (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION}; +        struct timespec timeout = {(ENTRY_SLEEP_TIMEOUT / 1000), +                                   (ENTRY_SLEEP_TIMEOUT % 1000) * MILLION};          struct timespec now;          struct timespec dl;          int ret = 0; -        if (e == NULL) -                return -EINVAL; - -        e->re = NULL; +        assert(e);          clock_gettime(PTHREAD_COND_CLOCK, &now); -          ts_add(&now, &timeout, &dl);          pthread_mutex_lock(&e->state_lock); -        if (e->state != API_INIT) { -                pthread_mutex_unlock(&e->state_lock); -                return -EINVAL; -        } -        e->state = API_SLEEP; +        if (e->state != API_WAKE && e->state != API_DESTROY) +                e->state = API_SLEEP;          while (e->state == API_SLEEP && ret != -ETIMEDOUT)                  ret = -pthread_cond_timedwait(&e->state_cond, @@ -190,17 +201,20 @@ int api_entry_sleep(struct api_entry * e)                  ret = -1;          } -        e->state = API_INIT; +        if (ret != -ETIMEDOUT) +                e->state = API_INIT; +          pthread_cond_broadcast(&e->state_cond);          pthread_mutex_unlock(&e->state_lock);          return ret;  } -void api_entry_wake(struct api_entry * e, struct reg_entry * re) +void api_entry_wake(struct api_entry * e, +                    struct reg_entry * re)  { -        if (e == NULL) -                return; +        assert(e); +        assert(re);          pthread_mutex_lock(&e->state_lock); @@ -217,24 +231,32 @@ void api_entry_wake(struct api_entry * e, struct reg_entry * re)          while (e->state == API_WAKE)                  pthread_cond_wait(&e->state_cond, &e->state_lock); +        if (e->state == API_DESTROY) +                e->state = API_INIT; +          pthread_mutex_unlock(&e->state_lock);  } -int api_table_add(struct list_head * api_table, struct api_entry * e) +int api_table_add(struct list_head * api_table, +                  struct api_entry * e)  { -        if (api_table == NULL || e == NULL) -                return -EINVAL; + +        assert(api_table); +        assert(e);          list_add(&e->next, api_table);          return 0;  } -void api_table_del(struct list_head * api_table, pid_t api) +void api_table_del(struct list_head * api_table, +                   pid_t              api)  {          struct list_head * p;          struct list_head * h; +        assert(api_table); +          list_for_each_safe(p, h, api_table) {                  struct api_entry * e = list_entry(p, struct api_entry, next);                  if (api == e->api) { @@ -244,10 +266,13 @@ void api_table_del(struct list_head * api_table, pid_t api)          }  } -struct api_entry * api_table_get(struct list_head * api_table, pid_t api) +struct api_entry * api_table_get(struct list_head * api_table, +                                 pid_t              api)  {          struct list_head * h; +        assert(api_table); +          list_for_each(h, api_table) {                  struct api_entry * e = list_entry(h, struct api_entry, next);                  if (api == e->api) diff --git a/src/irmd/api_table.h b/src/irmd/api_table.h index c7998c7f..f9c4d0aa 100644 --- a/src/irmd/api_table.h +++ b/src/irmd/api_table.h @@ -61,6 +61,8 @@ int                api_entry_sleep(struct api_entry * e);  void               api_entry_wake(struct api_entry * e,                                    struct reg_entry * re); +void               api_entry_cancel(struct api_entry * e); +  int                api_entry_add_name(struct api_entry * e,                                        char *             name); diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index a8263580..eb0c2de0 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -53,10 +53,12 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t        api,         char * sock_path = NULL;         ssize_t count = 0;         ipcp_msg_t * recv_msg = NULL; -         struct timeval tv = {(SOCKET_TIMEOUT / 1000),                              (SOCKET_TIMEOUT % 1000) * 1000}; +       if (kill(api, 0) < 0) +               return NULL; +         sock_path = ipcp_sock_path(api);         if (sock_path == NULL)                 return NULL; @@ -67,10 +69,6 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t        api,                 return NULL;         } -       if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, -                      (void *) &tv, sizeof(tv))) -               log_warn("Failed to set timeout on socket."); -         free(sock_path);         buf.len = ipcp_msg__get_packed_size(msg); @@ -85,6 +83,10 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t        api,                 return NULL;         } +       if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, +                      (void *) &tv, sizeof(tv))) +               log_warn("Failed to set timeout on socket."); +         pthread_cleanup_push(close_ptr, (void *) &sockfd);         pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data); @@ -184,7 +186,7 @@ int ipcp_destroy(pid_t api)          return 0;  } -int ipcp_bootstrap(pid_t api, +int ipcp_bootstrap(pid_t              api,                     dif_config_msg_t * conf)  {          ipcp_msg_t msg = IPCP_MSG__INIT; diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index 7a02b01a..8b85f36f 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -23,7 +23,9 @@  #define OUROBOROS_PREFIX "irm_flow"  #include <ouroboros/config.h> +#include <ouroboros/errno.h>  #include <ouroboros/logs.h> +#include <ouroboros/time_utils.h>  #include "irm_flow.h" @@ -142,31 +144,52 @@ void irm_flow_set_state(struct irm_flow * f,          pthread_mutex_unlock(&f->state_lock);  } -enum flow_state irm_flow_wait_state(struct irm_flow * f, -                                    enum flow_state   state) +int irm_flow_wait_state(struct irm_flow * f, +                        enum flow_state   state, +                        struct timespec * timeo)  { +        int ret = 0; +        int s; + +        struct timespec dl; +          assert(f);          assert(state != FLOW_NULL);          assert(state != FLOW_DESTROY);          assert(state != FLOW_DEALLOC_PENDING); +        if (timeo != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, timeo, &dl); +        } +          pthread_mutex_lock(&f->state_lock);          assert(f->state != FLOW_NULL);          while (!(f->state == state ||                   f->state == FLOW_DESTROY || -                 f->state == FLOW_DEALLOC_PENDING)) -                pthread_cond_wait(&f->state_cond, &f->state_lock); +                 f->state == FLOW_DEALLOC_PENDING) && +               ret != -ETIMEDOUT) { +                if (timeo == NULL) +                        ret = -pthread_cond_wait(&f->state_cond, +                                                 &f->state_lock); +                else +                        ret = -pthread_cond_timedwait(&f->state_cond, +                                                      &f->state_lock, +                                                      &dl); +        } -        if (f->state == FLOW_DESTROY || f->state == FLOW_DEALLOC_PENDING) { +        if (f->state == FLOW_DESTROY || +            f->state == FLOW_DEALLOC_PENDING || +            ret == -ETIMEDOUT) {                  f->state = FLOW_NULL;                  pthread_cond_broadcast(&f->state_cond);          } -        state = f->state; +        s = f->state;          pthread_mutex_unlock(&f->state_lock); -        return state; +        return ret ? ret : s;  } diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index 97770117..8902a6ab 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -71,7 +71,8 @@ enum flow_state   irm_flow_get_state(struct irm_flow * f);  void              irm_flow_set_state(struct irm_flow * f,                                       enum flow_state   state); -enum flow_state   irm_flow_wait_state(struct irm_flow * f, -                                      enum flow_state   state); +int               irm_flow_wait_state(struct irm_flow * f, +                                      enum flow_state   state, +                                      struct timespec * timeo);  #endif /* OUROBOROS_IRMD_IRM_FLOW_H */ diff --git a/src/irmd/main.c b/src/irmd/main.c index 673e39ea..41beb049 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -983,23 +983,33 @@ static int api_announce(pid_t  api,          return 0;  } -static struct irm_flow * flow_accept(pid_t api) +static int flow_accept(pid_t              api, +                       struct timespec *  timeo, +                       struct irm_flow ** fl)  { -        struct irm_flow *  f  = NULL; +        struct irm_flow  * f  = NULL;          struct api_entry * e  = NULL;          struct reg_entry * re = NULL;          struct list_head * p  = NULL; +        struct timespec dl; +        struct timespec now; +          pid_t api_n1;          pid_t api_n;          int   port_id;          int   ret; +        if (timeo != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &now); +                ts_add(&now, timeo, &dl); +        } +          pthread_rwlock_rdlock(&irmd.state_lock);          if (irmd.state != IRMD_RUNNING) {                  pthread_rwlock_unlock(&irmd.state_lock); -                return NULL; +                return -EIRMD;          }          pthread_rwlock_wrlock(&irmd.reg_lock); @@ -1010,7 +1020,7 @@ static struct irm_flow * flow_accept(pid_t api)                  pthread_rwlock_unlock(&irmd.reg_lock);                  pthread_rwlock_unlock(&irmd.state_lock);                  log_err("Unknown instance %d calling accept.", api); -                return NULL; +                return -EINVAL;          }          log_dbg("New instance (%d) of %s added.", api, e->apn); @@ -1027,18 +1037,33 @@ static struct irm_flow * flow_accept(pid_t api)          pthread_rwlock_unlock(&irmd.reg_lock);          pthread_rwlock_unlock(&irmd.state_lock); -        while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) { +        while (true) { +                if (timeo != NULL && ts_diff_ns(&now, &dl) < 0) { +                        log_dbg("Accept timed out."); +                        return -ETIMEDOUT; +                } +                  pthread_rwlock_rdlock(&irmd.state_lock); +                  if (irmd.state != IRMD_RUNNING) {                          pthread_rwlock_unlock(&irmd.state_lock); -                        return NULL; +                        return -EIRMD;                  } +                  pthread_rwlock_unlock(&irmd.state_lock); -        } -        if (ret == -1) { -                /* The process died, we can exit here. */ -                return NULL; +                ret = api_entry_sleep(e); +                if (ret == -ETIMEDOUT) { +                        clock_gettime(PTHREAD_COND_CLOCK, &now); +                        api_entry_cancel(e); +                        continue; +                } + +                if (ret == -1) +                        return -EPIPE; + +                if (ret == 0) +                        break;          }          pthread_rwlock_rdlock(&irmd.state_lock); @@ -1046,7 +1071,7 @@ static struct irm_flow * flow_accept(pid_t api)          if (irmd.state != IRMD_RUNNING) {                  reg_entry_set_state(re, REG_NAME_NULL);                  pthread_rwlock_unlock(&irmd.state_lock); -                return NULL; +                return -EIRMD;          }          pthread_rwlock_rdlock(&irmd.flows_lock); @@ -1056,7 +1081,7 @@ static struct irm_flow * flow_accept(pid_t api)                  pthread_rwlock_unlock(&irmd.flows_lock);                  pthread_rwlock_unlock(&irmd.state_lock);                  log_warn("Port_id was not created yet."); -                return NULL; +                return -EPERM;          }          api_n   = f->n_api; @@ -1079,7 +1104,7 @@ static struct irm_flow * flow_accept(pid_t api)                  irm_flow_set_state(f, FLOW_NULL);                  irm_flow_destroy(f);                  log_dbg("Process gone while accepting flow."); -                return NULL; +                return -EPERM;          }          pthread_mutex_lock(&e->state_lock); @@ -1100,7 +1125,7 @@ static struct irm_flow * flow_accept(pid_t api)                  irm_flow_set_state(f, FLOW_NULL);                  irm_flow_destroy(f);                  log_err("Entry in wrong state."); -                return NULL; +                return -EPERM;          }          registry_del_api(&irmd.registry, api); @@ -1118,29 +1143,34 @@ static struct irm_flow * flow_accept(pid_t api)                  clear_irm_flow(f);                  irm_flow_set_state(f, FLOW_NULL);                  irm_flow_destroy(f); -                return NULL; +                return -EPERM;          }          irm_flow_set_state(f, FLOW_ALLOCATED);          log_info("Flow on port_id %d allocated.", f->port_id); -        return f; +        *fl = f; + +        return 0;  } -static struct irm_flow * flow_alloc(pid_t     api, -                                    char *    dst_name, -                                    qoscube_t cube) +static int flow_alloc(pid_t              api, +                      char *             dst_name, +                      qoscube_t          cube, +                      struct timespec *  timeo, +                      struct irm_flow ** e)  {          struct irm_flow * f; -        pid_t ipcp; -        int port_id; +        pid_t             ipcp; +        int               port_id; +        int               state;          pthread_rwlock_rdlock(&irmd.state_lock);          if (irmd.state != IRMD_RUNNING) {                  pthread_rwlock_unlock(&irmd.state_lock); -                return NULL; +                return -1;          }          pthread_rwlock_rdlock(&irmd.reg_lock); @@ -1150,7 +1180,7 @@ static struct irm_flow * flow_alloc(pid_t     api,                  pthread_rwlock_unlock(&irmd.reg_lock);                  pthread_rwlock_unlock(&irmd.state_lock);                  log_info("Destination unreachable."); -                return NULL; +                return -1;          }          pthread_rwlock_unlock(&irmd.reg_lock); @@ -1160,7 +1190,7 @@ static struct irm_flow * flow_alloc(pid_t     api,                  pthread_rwlock_unlock(&irmd.flows_lock);                  pthread_rwlock_unlock(&irmd.state_lock);                  log_err("Could not allocate port_id."); -                return NULL; +                return -EBADF;          }          f = irm_flow_create(api, ipcp, port_id, cube); @@ -1169,7 +1199,7 @@ static struct irm_flow * flow_alloc(pid_t     api,                  pthread_rwlock_unlock(&irmd.flows_lock);                  pthread_rwlock_unlock(&irmd.state_lock);                  log_err("Could not allocate port_id."); -                return NULL; +                return -ENOMEM;          }          list_add(&f->next, &irmd.irm_flows); @@ -1179,22 +1209,30 @@ static struct irm_flow * flow_alloc(pid_t     api,          assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); -        if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) { +        if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube)) {                  /* sanitizer cleans this */ -                log_info("Failed to respond to alloc."); -                return NULL; +                log_info("Flow_allocation failed."); +                return -EAGAIN;          } -        if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) { -                log_info("Pending flow on port_id %d torn down.", port_id); -                return NULL; +        state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo); +        if (state != FLOW_ALLOCATED) { +                if (state == -ETIMEDOUT) { +                        log_dbg("Flow allocation timed out"); +                        return -ETIMEDOUT; +                } + +                log_info("Pending flow to %s torn down.", dst_name); +                return -EPIPE;          }          assert(irm_flow_get_state(f) == FLOW_ALLOCATED); +        *e = f; +          log_info("Flow on port_id %d allocated.", port_id); -        return f; +        return 0;  }  static int flow_dealloc(pid_t api, @@ -1382,7 +1420,6 @@ static struct irm_flow * flow_req_arr(pid_t     api,                  return NULL;          } -          pthread_rwlock_unlock(&irmd.reg_lock);          pthread_rwlock_wrlock(&irmd.flows_lock);          port_id = bmp_allocate(irmd.port_ids); @@ -1798,15 +1835,17 @@ void * mainloop(void * o)                  struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),                                            (IRMD_ACCEPT_TIMEOUT % 1000) * 1000};  #endif -                int cli_sockfd; -                irm_msg_t * msg; -                ssize_t count; -                buffer_t buffer; -                irm_msg_t ret_msg = IRM_MSG__INIT; -                struct irm_flow * e = NULL; -                pid_t * apis = NULL; -                struct timeval tv = {(SOCKET_TIMEOUT / 1000), -                                     (SOCKET_TIMEOUT % 1000) * 1000}; +                int               cli_sockfd; +                irm_msg_t *       msg; +                ssize_t           count; +                buffer_t          buffer; +                irm_msg_t         ret_msg = IRM_MSG__INIT; +                struct irm_flow * e       = NULL; +                pid_t *           apis    = NULL; +                struct timespec * timeo   = NULL; +                struct timespec   ts      = {0, 0}; +                struct timeval    tv      = {(SOCKET_TIMEOUT / 1000), +                                             (SOCKET_TIMEOUT % 1000) * 1000};                  pthread_rwlock_rdlock(&irmd.state_lock); @@ -1849,6 +1888,14 @@ void * mainloop(void * o)                  thread_dec(); +                if (msg->has_timeo_sec) { +                        assert(msg->has_timeo_nsec); + +                        ts.tv_sec  = msg->timeo_sec; +                        ts.tv_nsec = msg->timeo_nsec; +                        timeo = &ts; +                } +                  switch (msg->code) {                  case IRM_MSG_CODE__IRM_CREATE_IPCP:                          ret_msg.has_result = true; @@ -1897,9 +1944,9 @@ void * mainloop(void * o)                          ret_msg.result = unbind_api(msg->api, msg->dst_name);                          break;                  case IRM_MSG_CODE__IRM_LIST_IPCPS: +                        ret_msg.has_result = true;                          ret_msg.n_apis = list_ipcps(msg->dst_name, &apis);                          ret_msg.apis = apis; -                        ret_msg.has_result = true;                          break;                  case IRM_MSG_CODE__IRM_REG:                          ret_msg.has_result = true; @@ -1914,32 +1961,27 @@ void * mainloop(void * o)                                                      msg->n_dif_name);                          break;                  case IRM_MSG_CODE__IRM_FLOW_ACCEPT: -                        e = flow_accept(msg->api); -                        if (e == NULL) { -                                ret_msg.has_result = true; -                                ret_msg.result = -EIRMD; -                                break; +                        ret_msg.has_result = true; +                        ret_msg.result = flow_accept(msg->api, timeo, &e); +                        if (ret_msg.result == 0) { +                                ret_msg.has_port_id = true; +                                ret_msg.port_id     = e->port_id; +                                ret_msg.has_api     = true; +                                ret_msg.api         = e->n_1_api; +                                ret_msg.has_qoscube = true; +                                ret_msg.qoscube     = e->qc;                          } -                        ret_msg.has_port_id = true; -                        ret_msg.port_id     = e->port_id; -                        ret_msg.has_api     = true; -                        ret_msg.api         = e->n_1_api; -                        ret_msg.has_qoscube = true; -                        ret_msg.qoscube     = e->qc;                          break;                  case IRM_MSG_CODE__IRM_FLOW_ALLOC: -                        e = flow_alloc(msg->api, -                                       msg->dst_name, -                                       msg->qoscube); -                        if (e == NULL) { -                                ret_msg.has_result = true; -                                ret_msg.result = -1; -                                break; +                        ret_msg.has_result = true; +                        ret_msg.result = flow_alloc(msg->api, msg->dst_name, +                                                    msg->qoscube, timeo, &e); +                        if (ret_msg.result == 0) { +                                ret_msg.has_port_id = true; +                                ret_msg.port_id     = e->port_id; +                                ret_msg.has_api     = true; +                                ret_msg.api         = e->n_1_api;                          } -                        ret_msg.has_port_id = true; -                        ret_msg.port_id     = e->port_id; -                        ret_msg.has_api     = true; -                        ret_msg.api         = e->n_1_api;                          break;                  case IRM_MSG_CODE__IRM_FLOW_DEALLOC:                          ret_msg.has_result = true; @@ -1949,8 +1991,8 @@ void * mainloop(void * o)                          e = flow_req_arr(msg->api,                                           msg->dst_name,                                           msg->qoscube); +                        ret_msg.has_result = true;                          if (e == NULL) { -                                ret_msg.has_result = true;                                  ret_msg.result = -1;                                  break;                          } @@ -1971,6 +2013,12 @@ void * mainloop(void * o)                  irm_msg__free_unpacked(msg, NULL); +                if (ret_msg.result == -EPIPE || !ret_msg.has_result) { +                        close(cli_sockfd); +                        thread_inc(); +                        continue; +                } +                  buffer.len = irm_msg__get_packed_size(&ret_msg);                  if (buffer.len == 0) {                          log_err("Failed to calculate length of reply message."); @@ -2065,7 +2113,7 @@ void * threadpoolmgr(void * o)                  if (pthread_cond_timedwait(&irmd.threads_cond,                                             &irmd.threads_lock,                                             &dl) == ETIMEDOUT) -                        if (irmd.threads > IRMD_MIN_AV_THREADS) +                        if (irmd.threads > IRMD_MIN_AV_THREADS )                                  --irmd.max_threads;                  pthread_mutex_unlock(&irmd.threads_lock); diff --git a/src/lib/dev.c b/src/lib/dev.c index c063fd47..389ff278 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -276,7 +276,7 @@ int ap_init(const char * ap_name)                  shm_flow_set_destroy(ai.fqset);                  bmp_destroy(ai.fqueues);                  bmp_destroy(ai.fds); -                return -1; +                return -EIRMD;          }          ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS); @@ -393,9 +393,9 @@ int flow_accept(qosspec_t *       qs,          if (timeo != NULL) {                  msg.has_timeo_sec = true; -                msg.has_timeo_usec = true; +                msg.has_timeo_nsec = true;                  msg.timeo_sec  = timeo->tv_sec; -                msg.timeo_usec = timeo->tv_nsec / 1000; +                msg.timeo_nsec = timeo->tv_nsec;          }          pthread_rwlock_rdlock(&ai.data_lock); @@ -404,15 +404,21 @@ int flow_accept(qosspec_t *       qs,          pthread_rwlock_unlock(&ai.data_lock); -        recv_msg = send_recv_irm_msg_b(&msg); +        recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -EIRMD; -        if (recv_msg->has_result) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -EIRMD;          } +        if (recv_msg->result !=  0) { +                int res =  recv_msg->result; +                irm_msg__free_unpacked(recv_msg, NULL); +                return res; +        } +          if (!recv_msg->has_api || !recv_msg->has_port_id) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1; @@ -496,9 +502,9 @@ int flow_alloc(const char *      dst_name,          if (timeo != NULL) {                  msg.has_timeo_sec = true; -                msg.has_timeo_usec = true; +                msg.has_timeo_nsec = true;                  msg.timeo_sec  = timeo->tv_sec; -                msg.timeo_usec = timeo->tv_nsec / 1000; +                msg.timeo_nsec = timeo->tv_nsec;          }          pthread_rwlock_rdlock(&ai.data_lock); @@ -511,6 +517,17 @@ int flow_alloc(const char *      dst_name,          if (recv_msg == NULL)                  return -EIRMD; +        if (!recv_msg->has_result) { +                irm_msg__free_unpacked(recv_msg, NULL); +                return -EIRMD; +        } + +        if (recv_msg->result !=  0) { +                int res =  recv_msg->result; +                irm_msg__free_unpacked(recv_msg, NULL); +                return res; +        } +          if (!recv_msg->has_api || !recv_msg->has_port_id) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1; diff --git a/src/lib/irm.c b/src/lib/irm.c index 0e4bfc40..57e09369 100644 --- a/src/lib/irm.c +++ b/src/lib/irm.c @@ -177,10 +177,8 @@ ssize_t irm_list_ipcps(const char * name,          msg.dst_name = (char *) name;          recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) { -                free(msg.dif_name); +        if (recv_msg == NULL)                  return -EIRMD; -        }          if (recv_msg->apis == NULL) {                  irm_msg__free_unpacked(recv_msg, NULL); diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 4fbd676e..e218f6f6 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -62,6 +62,6 @@ message irm_msg {          optional uint32 opts         = 12;          repeated sint32 apis         = 13;          optional uint32 timeo_sec    = 14; -        optional uint32 timeo_usec   = 15; +        optional uint32 timeo_nsec   = 15;          optional sint32 result       = 16;  }; diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 3a26a2cf..63f928cf 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -95,23 +95,17 @@ static void close_ptr(void * o)          close(*(int *) o);  } -static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed) +irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)  {          int sockfd;          buffer_t buf;          ssize_t count = 0;          irm_msg_t * recv_msg = NULL; -        struct timeval tv = {(SOCKET_TIMEOUT / 1000), -                             (SOCKET_TIMEOUT % 1000) * 1000};          sockfd = client_socket_open(IRM_SOCK_PATH);          if (sockfd < 0)                  return NULL; -        if (timed) -                setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, -                           (void *) &tv, sizeof(tv)); -          buf.len = irm_msg__get_packed_size(msg);          if (buf.len == 0) {                  close(sockfd); @@ -141,12 +135,6 @@ static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed)          return recv_msg;  } -irm_msg_t * send_recv_irm_msg(irm_msg_t * msg) -{ return send_recv_irm_msg_timed(msg, true); } - -irm_msg_t * send_recv_irm_msg_b(irm_msg_t * msg) -{ return send_recv_irm_msg_timed(msg, false); } -  char * ipcp_sock_path(pid_t api)  {          char * full_name = NULL; | 
