diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 23 | ||||
| -rw-r--r-- | src/irmd/ipcp.c | 66 | ||||
| -rw-r--r-- | src/irmd/main.c | 25 | 
3 files changed, 80 insertions, 34 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 74618658..954ca670 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -813,7 +813,14 @@ static void lookup_new_addrs(struct lookup * lu,  static enum lookup_state lookup_wait(struct lookup * lu)  { +        struct timespec   timeo = {KAD_T_RESP, 0}; +        struct timespec   abs;          enum lookup_state state; +        int               ret = 0; + +        clock_gettime(PTHREAD_COND_CLOCK, &abs); + +        ts_add(&abs, &timeo, &abs);          pthread_mutex_lock(&lu->lock); @@ -823,11 +830,14 @@ static enum lookup_state lookup_wait(struct lookup * lu)          pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu);          while (lu->state == LU_PENDING) -                pthread_cond_wait(&lu->cond, &lu->lock); +                ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs);          pthread_cleanup_pop(false); -        state = lu->state; +        if (ret == -ETIMEDOUT) +                state = LU_COMPLETE; +        else +                state = lu->state;          pthread_mutex_unlock(&lu->lock); @@ -1483,6 +1493,7 @@ static struct lookup * kad_lookup(struct dht *    dht,          uint64_t          addrs[KAD_ALPHA + 1];          enum lookup_state state;          struct lookup *   lu; +        size_t            out = 0;          lu = lookup_create(dht, id);          if (lu == NULL) @@ -1498,7 +1509,8 @@ static struct lookup * kad_lookup(struct dht *    dht,                  return NULL;          } -        if (kad_find(dht, id, addrs, code) == 0) { +        out += kad_find(dht, id, addrs, code); +        if (out == 0) {                  pthread_rwlock_wrlock(&dht->lock);                  list_del(&lu->next);                  pthread_rwlock_unlock(&dht->lock); @@ -1507,17 +1519,18 @@ static struct lookup * kad_lookup(struct dht *    dht,          }          while ((state = lookup_wait(lu)) != LU_COMPLETE) { +                --out;                  switch (state) {                  case LU_UPDATE:                          lookup_new_addrs(lu, addrs); -                        if (addrs[0] == 0) { +                        if (addrs[0] == 0 && out == 0) {                                  pthread_rwlock_wrlock(&dht->lock);                                  list_del(&lu->next);                                  pthread_rwlock_unlock(&dht->lock);                                  return lu;                          } -                        kad_find(dht, id, addrs, code); +                        out += kad_find(dht, id, addrs, code);                          break;                  case LU_DESTROY:                          pthread_rwlock_wrlock(&dht->lock); diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 28e91b18..bf71bc3d 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -48,13 +48,12 @@ static void close_ptr(void * o)  ipcp_msg_t * send_recv_ipcp_msg(pid_t        api,                                  ipcp_msg_t * msg)  { -       int sockfd = 0; -       buffer_t buf; -       char * sock_path = NULL; -       ssize_t count = 0; -       ipcp_msg_t * recv_msg = NULL; -       struct timeval tv = {(SOCKET_TIMEOUT / 1000), -                            (SOCKET_TIMEOUT % 1000) * 1000}; +       int            sockfd    = 0; +       buffer_t       buf; +       char *         sock_path = NULL; +       ssize_t        count     = 0; +       ipcp_msg_t *   recv_msg  = NULL; +       struct timeval tv;         if (kill(api, 0) < 0)                 return NULL; @@ -83,6 +82,29 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t        api,                 return NULL;         } +       switch (msg->code) { +       case IPCP_MSG_CODE__IPCP_BOOTSTRAP: +               tv.tv_sec  = BOOTSTRAP_TIMEOUT / 1000; +               tv.tv_usec = (BOOTSTRAP_TIMEOUT % 1000) * 1000; +               break; +       case IPCP_MSG_CODE__IPCP_ENROLL: +               tv.tv_sec  = ENROLL_TIMEOUT / 1000; +               tv.tv_usec = (ENROLL_TIMEOUT % 1000) * 1000; +               break; +       case IPCP_MSG_CODE__IPCP_REG: +               tv.tv_sec  = REG_TIMEOUT / 1000; +               tv.tv_usec = (REG_TIMEOUT % 1000) * 1000; +               break; +       case IPCP_MSG_CODE__IPCP_QUERY: +               tv.tv_sec  = QUERY_TIMEOUT / 1000; +               tv.tv_usec = (QUERY_TIMEOUT % 1000) * 1000; +               break; +       default: +               tv.tv_sec  = SOCKET_TIMEOUT / 1000; +               tv.tv_usec = (SOCKET_TIMEOUT % 1000) * 1000; +               break; +       } +         if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,                        (void *) &tv, sizeof(tv)))                 log_warn("Failed to set timeout on socket."); @@ -187,9 +209,9 @@ int ipcp_destroy(pid_t api)  int ipcp_bootstrap(pid_t              api,                     ipcp_config_msg_t * conf)  { -        ipcp_msg_t msg = IPCP_MSG__INIT; +        ipcp_msg_t   msg      = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL; -        int ret = -1; +        int          ret      = -1;          if (conf == NULL)                  return -EINVAL; @@ -223,7 +245,7 @@ int ipcp_enroll(pid_t             api,          if (dst == NULL)                  return -EINVAL; -        msg.code = IPCP_MSG_CODE__IPCP_ENROLL; +        msg.code     = IPCP_MSG_CODE__IPCP_ENROLL;          msg.dst_name = (char *) dst;          recv_msg = send_recv_ipcp_msg(api, &msg); @@ -259,9 +281,9 @@ int ipcp_reg(pid_t           api,               const uint8_t * hash,               size_t          len)  { -        ipcp_msg_t msg = IPCP_MSG__INIT; +        ipcp_msg_t   msg      = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL; -        int ret = -1; +        int          ret      = -1;          assert(hash); @@ -289,9 +311,9 @@ int ipcp_unreg(pid_t           api,                 const uint8_t * hash,                 size_t          len)  { -        ipcp_msg_t msg = IPCP_MSG__INIT; +        ipcp_msg_t   msg      = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL; -        int ret = -1; +        int          ret      = -1;          msg.code      = IPCP_MSG_CODE__IPCP_UNREG;          msg.has_hash  = true; @@ -317,9 +339,9 @@ int ipcp_query(pid_t           api,                 const uint8_t * hash,                 size_t          len)  { -        ipcp_msg_t msg = IPCP_MSG__INIT; +        ipcp_msg_t   msg      = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL; -        int ret = -1; +        int          ret      = -1;          msg.code      = IPCP_MSG_CODE__IPCP_QUERY;          msg.has_hash  = true; @@ -348,9 +370,9 @@ int ipcp_flow_alloc(pid_t           api,                      size_t          len,                      qoscube_t       cube)  { -        ipcp_msg_t msg = IPCP_MSG__INIT; +        ipcp_msg_t   msg      = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL; -        int ret = -1; +        int          ret      = -1;          assert(dst); @@ -385,9 +407,9 @@ int ipcp_flow_alloc_resp(pid_t api,                           pid_t n_api,                           int   response)  { -        ipcp_msg_t msg = IPCP_MSG__INIT; +        ipcp_msg_t   msg      = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL; -        int ret = -1; +        int          ret      = -1;          msg.code         = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP;          msg.has_port_id  = true; @@ -415,9 +437,9 @@ int ipcp_flow_alloc_resp(pid_t api,  int ipcp_flow_dealloc(pid_t api,                        int   port_id)  { -        ipcp_msg_t msg = IPCP_MSG__INIT; +        ipcp_msg_t   msg      = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL; -        int ret = -1; +        int          ret      = -1;          msg.code        = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC;          msg.has_port_id = true; diff --git a/src/irmd/main.c b/src/irmd/main.c index 96b0b729..3f83ab2c 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -222,7 +222,8 @@ static struct ipcp_entry * get_ipcp_entry_by_name(const char * name)          return NULL;  } -static struct ipcp_entry * get_ipcp_by_dst_name(const char * name) +static struct ipcp_entry * get_ipcp_by_dst_name(const char * name, +                                                pid_t        src)  {          struct list_head * p;          struct list_head * h; @@ -233,7 +234,7 @@ static struct ipcp_entry * get_ipcp_by_dst_name(const char * name)          list_for_each_safe(p, h, &irmd.ipcps) {                  struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next); -                if (e->dif_name == NULL) +                if (e->dif_name == NULL || e->api == src)                          continue;                  hash = malloc(IPCP_HASH_LEN(e)); @@ -1103,7 +1104,7 @@ static int flow_alloc(pid_t              api,          int                 state;          uint8_t *           hash; -        ipcp = get_ipcp_by_dst_name(dst); +        ipcp = get_ipcp_by_dst_name(dst, api);          if (ipcp == NULL) {                  log_info("Destination %s unreachable.", dst);                  return -1; @@ -1199,7 +1200,7 @@ static int flow_dealloc(pid_t api,          if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) {                  list_del(&f->next);                  if ((kill(f->n_api, 0) < 0 && f->n_1_api == -1) || -                    (kill (f->n_1_api, 0) < 0 && f->n_api == -1)) +                    (kill(f->n_1_api, 0) < 0 && f->n_api == -1))                          irm_flow_set_state(f, FLOW_NULL);                  clear_irm_flow(f);                  irm_flow_destroy(f); @@ -1638,6 +1639,8 @@ void * irm_sanitize(void * o)                  pthread_rwlock_wrlock(&irmd.flows_lock);                  list_for_each_safe(p, h, &irmd.irm_flows) { +                        int ipcpi; +                        int port_id;                          struct irm_flow * f =                                  list_entry(p, struct irm_flow, next); @@ -1645,9 +1648,13 @@ void * irm_sanitize(void * o)                              && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {                                  log_dbg("Pending port_id %d timed out.",                                           f->port_id); -                                f->n_1_api = -1; +                                f->n_api = -1;                                  irm_flow_set_state(f, FLOW_DEALLOC_PENDING); -                                ipcp_flow_dealloc(f->n_1_api, f->port_id); +                                ipcpi   = f->n_1_api; +                                port_id = f->port_id; +                                pthread_rwlock_unlock(&irmd.flows_lock); +                                ipcp_flow_dealloc(ipcpi, port_id); +                                pthread_rwlock_wrlock(&irmd.flows_lock);                                  continue;                          } @@ -1660,7 +1667,11 @@ void * irm_sanitize(void * o)                                          shm_flow_set_destroy(set);                                  f->n_api = -1;                                  irm_flow_set_state(f, FLOW_DEALLOC_PENDING); -                                ipcp_flow_dealloc(f->n_1_api, f->port_id); +                                ipcpi   = f->n_1_api; +                                port_id = f->port_id; +                                pthread_rwlock_unlock(&irmd.flows_lock); +                                ipcp_flow_dealloc(ipcpi, port_id); +                                pthread_rwlock_wrlock(&irmd.flows_lock);                                  continue;                          } | 
