diff options
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 27 | ||||
| -rw-r--r-- | src/ipcpd/normal/dir.c | 6 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 9 | ||||
| -rw-r--r-- | src/ipcpd/normal/sdu_sched.c | 68 | 
4 files changed, 63 insertions, 47 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index b79d9480..74618658 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -827,13 +827,6 @@ static enum lookup_state lookup_wait(struct lookup * lu)          pthread_cleanup_pop(false); -        if (lu->state == LU_DESTROY) { -                lu->state = LU_NULL; -                pthread_cond_signal(&lu->cond); -                pthread_mutex_unlock(&lu->lock); -                return -1; -        } -          state = lu->state;          pthread_mutex_unlock(&lu->lock); @@ -1320,7 +1313,7 @@ static int send_msg(struct dht * dht,          ipcp_sdb_release(sdb);  #endif /* __DHT_TEST__ */ -        if (msg->code < KAD_STORE) { +        if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) {                  req = kad_req_create(dht, msg, addr);                  if (req != NULL)                          list_add(&req->next, &dht->requests); @@ -1364,7 +1357,7 @@ static int kad_add(struct dht *              dht,          pthread_rwlock_wrlock(&dht->lock); -        while (--n >= 0) { +        while (n-- > 0) {                  if (contacts[n].id.len != dht->b)                          log_warn("Bad key length in contact data."); @@ -1478,6 +1471,7 @@ static void lookup_set_state(struct lookup *   lu,          pthread_mutex_lock(&lu->lock);          lu->state = state; +        pthread_cond_signal(&lu->cond);          pthread_mutex_unlock(&lu->lock);  } @@ -1526,6 +1520,9 @@ static struct lookup * kad_lookup(struct dht *    dht,                          kad_find(dht, id, addrs, code);                          break;                  case LU_DESTROY: +                        pthread_rwlock_wrlock(&dht->lock); +                        list_del(&lu->next); +                        pthread_rwlock_unlock(&dht->lock);                          lookup_set_state(lu, LU_NULL);                          return NULL;                  default: @@ -1560,7 +1557,7 @@ static void kad_publish(struct dht *    dht,          n = lookup_contact_addrs(lu, addrs); -        while (--n > 0) { +        while (n-- > 0) {                  if (addrs[n] == dht->addr) {                          kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;                          msg.id.data = (uint8_t *) key; @@ -1932,7 +1929,6 @@ static int kad_handle_find_resp(struct dht *     dht,          lu = dht_find_lookup(dht, req->key);          if (lu == NULL) { -                log_dbg("Response for unknown lookup.");                  pthread_rwlock_unlock(&dht->lock);                  return -1;          } @@ -1974,8 +1970,7 @@ static void kad_handle_response(struct dht * dht,          case KAD_FIND_NODE:                  if (dht_get_state(dht) != DHT_RUNNING)                          return; -                if (kad_handle_find_resp(dht, req, msg)) -                        log_dbg("Invalid or outdated response."); +                kad_handle_find_resp(dht, req, msg);                  break;          default:                  break; @@ -2265,8 +2260,10 @@ void dht_post_sdu(void *               ae,                  pthread_rwlock_unlock(&dht->lock);          } -        if (msg->code < KAD_STORE) -                send_msg(dht, &resp_msg, addr); +        if (msg->code < KAD_STORE) { +                if (send_msg(dht, &resp_msg, addr)) +                        log_warn("Failed to send response."); +        }          kad_msg__free_unpacked(msg, NULL); diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 697c02da..69b7e90e 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -114,10 +114,12 @@ int dir_init()                          sleep(ENROL_INTV);                  } +                log_dbg("Directory enrolled."); +                  return 0;          } -        log_dbg("Bootstrapping DHT."); +        log_dbg("Bootstrapping directory.");          /* TODO: get parameters for bootstrap from IRM tool. */          if (dht_bootstrap(dht, KAD_B, 86400)) { @@ -125,6 +127,8 @@ int dir_init()                  return -ENOMEM;          } +        log_dbg("Directory bootstrapped."); +          return 0;  } diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 6e880067..d346f67c 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -98,9 +98,11 @@ static void fa_post_sdu(void *               ae,                                       shm_du_buff_tail(sdb) -                                       shm_du_buff_head(sdb),                                       shm_du_buff_head(sdb)); + +        ipcp_sdb_release(sdb); +          if (msg == NULL) {                  log_err("Failed to unpack flow alloc message."); -                ipcp_sdb_release(sdb);                  return;          } @@ -112,7 +114,6 @@ static void fa_post_sdu(void *               ae,                          log_err("Bad flow request.");                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL); -                        ipcp_sdb_release(sdb);                          return;                  } @@ -126,7 +127,6 @@ static void fa_post_sdu(void *               ae,                          log_dbg("Won't allocate over non-operational IPCP.");                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL); -                        ipcp_sdb_release(sdb);                          return;                  } @@ -140,7 +140,6 @@ static void fa_post_sdu(void *               ae,                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL);                          log_err("Failed to get fd for flow."); -                        ipcp_sdb_release(sdb);                          return;                  } @@ -173,12 +172,10 @@ static void fa_post_sdu(void *               ae,          default:                  log_err("Got an unknown flow allocation message.");                  flow_alloc_msg__free_unpacked(msg, NULL); -                ipcp_sdb_release(sdb);                  return;          }          flow_alloc_msg__free_unpacked(msg, NULL); -        ipcp_sdb_release(sdb);  }  int fa_init(void) diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 63259430..a4b9e074 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -36,11 +36,19 @@  struct sdu_sched {          flow_set_t * set[QOS_CUBE_MAX]; -        fqueue_t *   fqs[QOS_CUBE_MAX];          next_sdu_t   callback; -        pthread_t    sdu_reader; +        pthread_t    sdu_readers[IPCP_SCHED_THREADS];  }; +static void cleanup_reader(void * o) +{ +        int         i; +        fqueue_t ** fqs = (fqueue_t **) o; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                fqueue_destroy(fqs[i]); +} +  static void * sdu_reader(void * o)  {          struct sdu_sched *   sched; @@ -49,14 +57,27 @@ static void * sdu_reader(void * o)          int                  fd;          int                  i = 0;          int                  ret; +        fqueue_t *           fqs[QOS_CUBE_MAX];          sched = (struct sdu_sched *) o; +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                fqs[i] = fqueue_create(); +                if (fqs[i] == NULL) { +                        int j; +                        for (j = 0; j < i; ++j) +                                fqueue_destroy(fqs[j]); +                        return (void *) -1; +                } +        } + +        pthread_cleanup_push(cleanup_reader, fqs); +          while (true) {                  /* FIXME: replace with scheduling policy call */                  i = (i + 1) % QOS_CUBE_MAX; -                ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout); +                ret = flow_event_wait(sched->set[i], fqs[i], &timeout);                  if (ret == -ETIMEDOUT)                          continue; @@ -65,7 +86,7 @@ static void * sdu_reader(void * o)                          continue;                  } -                while ((fd = fqueue_next(sched->fqs[i])) >= 0) { +                while ((fd = fqueue_next(fqs[i])) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  log_warn("Failed to read SDU from fd %d.", fd);                                  continue; @@ -78,6 +99,8 @@ static void * sdu_reader(void * o)                  }          } +        pthread_cleanup_pop(true); +          return (void *) 0;  } @@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)          sdu_sched = malloc(sizeof(*sdu_sched));          if (sdu_sched == NULL) -                return NULL; +                goto fail_malloc;          sdu_sched->callback = callback; @@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)                  if (sdu_sched->set[i] == NULL) {                          for (j = 0; j < i; ++j)                                  flow_set_destroy(sdu_sched->set[j]); -                        goto fail_sdu_sched; +                        goto fail_flow_set;                  }          } -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                sdu_sched->fqs[i] = fqueue_create(); -                if (sdu_sched->fqs[i] == NULL) { -                        for (j = 0; j < i; ++j) -                                fqueue_destroy(sdu_sched->fqs[j]); +        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { +                if (pthread_create(&sdu_sched->sdu_readers[i], NULL, +                                   sdu_reader, sdu_sched)) { +                        int j; +                        for (j = 0; j < i; ++j) { +                                pthread_cancel(sdu_sched->sdu_readers[j]); +                                pthread_join(sdu_sched->sdu_readers[j], NULL); +                        }                          goto fail_flow_set;                  }          } -        pthread_create(&sdu_sched->sdu_reader, -                       NULL, -                       sdu_reader, -                       (void *) sdu_sched); -          return sdu_sched;   fail_flow_set: -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                flow_set_destroy(sdu_sched->set[i]); - fail_sdu_sched:           free(sdu_sched); + fail_malloc:           return NULL;  } @@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)          assert(sdu_sched); -        pthread_cancel(sdu_sched->sdu_reader); - -        pthread_join(sdu_sched->sdu_reader, NULL); +        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { +                pthread_cancel(sdu_sched->sdu_readers[i]); +                pthread_join(sdu_sched->sdu_readers[i], NULL); +        } -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fqueue_destroy(sdu_sched->fqs[i]); +        for (i = 0; i < QOS_CUBE_MAX; ++i)                  flow_set_destroy(sdu_sched->set[i]); -        }          free(sdu_sched);  } | 
