diff options
| -rw-r--r-- | src/ipcpd/normal/dht.c | 35 | ||||
| -rw-r--r-- | src/lib/tpm.c | 88 | 
2 files changed, 85 insertions, 38 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index e1c34b6f..c15dacbc 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -119,6 +119,8 @@ struct lookup {          struct list_head  contacts;          size_t            n_contacts; +        size_t            out; +          uint64_t *        addrs;          size_t            n_addrs; @@ -575,6 +577,7 @@ static struct lookup * lookup_create(struct dht *    dht,          lu->state   = LU_INIT;          lu->addrs   = NULL;          lu->n_addrs = 0; +        lu->out     = 0;          lu->key     = dht_dup_key(id, dht->b);          if (lu->key == NULL)                  goto fail_id; @@ -613,6 +616,16 @@ static struct lookup * lookup_create(struct dht *    dht,          return NULL;  } +static void lookup_add_out(struct lookup * lu, +                           size_t          n) +{ +        pthread_mutex_lock(&lu->lock); + +        lu->out += n; + +        pthread_mutex_unlock(&lu->lock); +} +  static void lookup_destroy(struct lookup * lu)  {          struct list_head * p; @@ -680,6 +693,8 @@ static void lookup_update(struct dht *    dht,          pthread_mutex_lock(&lu->lock); +        --lu->out; +          if (msg->n_addrs > 0) {                  if (lu->addrs == NULL) {                          lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); @@ -734,7 +749,11 @@ static void lookup_update(struct dht *    dht,                  }          } -        lu->state = LU_UPDATE; +        if (lu->out == 0) +                lu->state = LU_COMPLETE; +        else +                lu->state = LU_UPDATE; +          pthread_cond_signal(&lu->cond);          pthread_mutex_unlock(&lu->lock);          return; @@ -833,7 +852,7 @@ static enum lookup_state lookup_wait(struct lookup * lu)          pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu); -        while (lu->state == LU_PENDING) +        while (lu->state == LU_PENDING && ret != -ETIMEDOUT)                  ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs);          pthread_cleanup_pop(false); @@ -1497,7 +1516,7 @@ static struct lookup * kad_lookup(struct dht *    dht,          uint64_t          addrs[KAD_ALPHA + 1];          enum lookup_state state;          struct lookup *   lu; -        size_t            out = 0; +        size_t            out;          lu = lookup_create(dht, id);          if (lu == NULL) @@ -1513,7 +1532,7 @@ static struct lookup * kad_lookup(struct dht *    dht,                  return NULL;          } -        out += kad_find(dht, id, addrs, code); +        out = kad_find(dht, id, addrs, code);          if (out == 0) {                  pthread_rwlock_wrlock(&dht->lock);                  list_del(&lu->next); @@ -1522,19 +1541,21 @@ static struct lookup * kad_lookup(struct dht *    dht,                  return lu;          } +        lookup_add_out(lu, out); +          while ((state = lookup_wait(lu)) != LU_COMPLETE) { -                --out;                  switch (state) {                  case LU_UPDATE:                          lookup_new_addrs(lu, addrs); -                        if (addrs[0] == 0 && out == 0) { +                        if (addrs[0] == 0) {                                  pthread_rwlock_wrlock(&dht->lock);                                  list_del(&lu->next);                                  pthread_rwlock_unlock(&dht->lock);                                  return lu;                          } -                        out += kad_find(dht, id, addrs, code); +                        out = kad_find(dht, id, addrs, code); +                        lookup_add_out(lu, out);                          break;                  case LU_DESTROY:                          pthread_rwlock_wrlock(&dht->lock); diff --git a/src/lib/tpm.c b/src/lib/tpm.c index 8298eeb5..f45744ee 100644 --- a/src/lib/tpm.c +++ b/src/lib/tpm.c @@ -29,6 +29,7 @@  #include <pthread.h>  #include <stdlib.h> +#include <assert.h>  #define TPM_TIMEOUT 1000 @@ -36,6 +37,7 @@ struct pthr_el {          struct list_head next;          bool             join; +        bool             kill;          pthread_t        thr;  }; @@ -49,8 +51,8 @@ enum tpm_state {  struct {          size_t           min;          size_t           inc; -        size_t           max;          size_t           cur; +        size_t           wrk;          void * (* func)(void *); @@ -71,9 +73,14 @@ static void tpm_join(void)          list_for_each_safe(p, h, &tpm.pool) {                  struct pthr_el * e = list_entry(p, struct pthr_el, next); -                if (tpm.state != TPM_RUNNING) +                if (tpm.state != TPM_RUNNING) { +                        if (!e->kill) { +                                e->kill = true; +                                --tpm.cur; +                        }                          while (!e->join)                                  pthread_cond_wait(&tpm.cond, &tpm.lock); +                }                  if (e->join) {                          pthread_join(e->thr, NULL); @@ -83,6 +90,37 @@ static void tpm_join(void)          }  } +static struct pthr_el * tpm_pthr_el(pthread_t thr) +{ +        struct list_head * p; +        struct pthr_el *   e; + +        list_for_each(p, &tpm.pool) { +                e = list_entry(p, struct pthr_el, next); +                if (e->thr == thr) +                        return e; + +        } + +        assert(false); + +        return NULL; +} + +static void tpm_kill(void) +{ +        struct list_head * p; + +        list_for_each(p, &tpm.pool) { +                struct pthr_el * e = list_entry(p, struct pthr_el, next); +                if (!e->kill) { +                        e->kill = true; +                        --tpm.cur; +                        return; +                } +        } +} +  static void * tpmgr(void * o)  {          struct timespec dl; @@ -96,39 +134,40 @@ static void * tpmgr(void * o)                  pthread_mutex_lock(&tpm.lock); -                tpm_join(); -                  if (tpm.state != TPM_RUNNING) { -                        tpm.max = 0;                          tpm_join();                          pthread_mutex_unlock(&tpm.lock);                          break;                  } -                if (tpm.cur < tpm.min) { -                        tpm.max = tpm.inc; +                tpm_join(); -                        while (tpm.cur < tpm.max) { +                if (tpm.cur - tpm.wrk < tpm.min) { +                        size_t i; +                        for (i = 0; i < tpm.inc; ++i) {                                  struct pthr_el * e = malloc(sizeof(*e));                                  if (e == NULL)                                          break;                                  e->join = false; +                                e->kill = false;                                  if (pthread_create(&e->thr, NULL,                                                     tpm.func, NULL)) {                                          free(e); -                                } else { -                                        list_add(&e->next, &tpm.pool); -                                        ++tpm.cur; +                                        break;                                  } + +                                list_add(&e->next, &tpm.pool);                          } + +                        tpm.cur += tpm.inc;                  }                  if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl)                      == ETIMEDOUT) -                        if (tpm.cur > tpm.min ) -                                --tpm.max; +                        if (tpm.cur > tpm.min) +                                tpm_kill();                  pthread_mutex_unlock(&tpm.lock);          } @@ -162,8 +201,8 @@ int tpm_init(size_t min,          tpm.func  = func;          tpm.min   = min;          tpm.inc   = inc; -        tpm.max   = 0;          tpm.cur   = 0; +        tpm.wrk   = 0;          return 0; @@ -214,7 +253,7 @@ bool tpm_check(void)          pthread_mutex_lock(&tpm.lock); -        ret = tpm.cur > tpm.max; +        ret = tpm_pthr_el(pthread_self())->kill;          pthread_mutex_unlock(&tpm.lock); @@ -225,7 +264,7 @@ void tpm_inc(void)  {          pthread_mutex_lock(&tpm.lock); -        ++tpm.cur; +        --tpm.wrk;          pthread_mutex_unlock(&tpm.lock);  } @@ -234,7 +273,7 @@ void tpm_dec(void)  {          pthread_mutex_lock(&tpm.lock); -        --tpm.cur; +        ++tpm.wrk;          pthread_cond_signal(&tpm.cond); @@ -243,22 +282,9 @@ void tpm_dec(void)  void tpm_exit(void)  { -        struct list_head * p; -        pthread_t          id; - -        id = pthread_self(); -          pthread_mutex_lock(&tpm.lock); -        --tpm.cur; - -        list_for_each(p, &tpm.pool) { -                struct pthr_el * e = list_entry(p, struct pthr_el, next); -                if (e->thr == id) { -                        e->join = true; -                        break; -                } -        } +        tpm_pthr_el(pthread_self())->join = true;          pthread_cond_signal(&tpm.cond);  | 
