author    | dimitri staessens <dimitri.staessens@ugent.be> | 2017-07-30 10:11:35 +0000
committer | Sander Vrijders <sander.vrijders@ugent.be>     | 2017-07-30 10:11:35 +0000
commit    | 490ebfdb483c6f13aa45e5d80610f55ee95f9fae (patch)
tree      | a30f99b02b460edeb2f3db094f89a431eddfa1f6
parent    | bddac9e135e1a412d60de39cf17249507107499d (diff)
parent    | 396cd99e3bc51bbc23387259da028d109349374c (diff)
download  | ouroboros-490ebfdb483c6f13aa45e5d80610f55ee95f9fae.tar.gz  ouroboros-490ebfdb483c6f13aa45e5d80610f55ee95f9fae.zip
Merged in dstaesse/ouroboros/be-fixes (pull request #537)
Be fixes
-rw-r--r-- | include/ouroboros/config.h.in |  2
-rw-r--r-- | src/ipcpd/normal/dht.c        | 27
-rw-r--r-- | src/ipcpd/normal/dir.c        |  6
-rw-r--r-- | src/ipcpd/normal/fa.c         |  9
-rw-r--r-- | src/ipcpd/normal/sdu_sched.c  | 68
5 files changed, 64 insertions, 48 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 736ba5b3..e8341ee2 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -56,7 +56,7 @@
 /* IPCP dynamic threadpooling */
 #define IPCP_MIN_THREADS    4
 #define IPCP_ADD_THREADS    16
-
+#define IPCP_SCHED_THREADS  8
 #define IPCPD_MAX_CONNS     IRMD_MAX_FLOWS
 #define PTHREAD_COND_CLOCK  CLOCK_MONOTONIC
 #define PFT_SIZE            1 << 12
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index b79d9480..74618658 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -827,13 +827,6 @@ static enum lookup_state lookup_wait(struct lookup * lu)
 
         pthread_cleanup_pop(false);
 
-        if (lu->state == LU_DESTROY) {
-                lu->state = LU_NULL;
-                pthread_cond_signal(&lu->cond);
-                pthread_mutex_unlock(&lu->lock);
-                return -1;
-        }
-
         state = lu->state;
 
         pthread_mutex_unlock(&lu->lock);
@@ -1320,7 +1313,7 @@ static int send_msg(struct dht * dht,
         ipcp_sdb_release(sdb);
 #endif /* __DHT_TEST__ */
 
-        if (msg->code < KAD_STORE) {
+        if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) {
                 req = kad_req_create(dht, msg, addr);
                 if (req != NULL)
                         list_add(&req->next, &dht->requests);
@@ -1364,7 +1357,7 @@ static int kad_add(struct dht * dht,
 
         pthread_rwlock_wrlock(&dht->lock);
 
-        while (--n >= 0) {
+        while (n-- > 0) {
                 if (contacts[n].id.len != dht->b)
                         log_warn("Bad key length in contact data.");
 
@@ -1478,6 +1471,7 @@ static void lookup_set_state(struct lookup * lu,
         pthread_mutex_lock(&lu->lock);
 
         lu->state = state;
+        pthread_cond_signal(&lu->cond);
 
         pthread_mutex_unlock(&lu->lock);
 }
@@ -1526,6 +1520,9 @@ static struct lookup * kad_lookup(struct dht * dht,
                         kad_find(dht, id, addrs, code);
                         break;
                 case LU_DESTROY:
+                        pthread_rwlock_wrlock(&dht->lock);
+                        list_del(&lu->next);
+                        pthread_rwlock_unlock(&dht->lock);
                         lookup_set_state(lu, LU_NULL);
                         return NULL;
                 default:
@@ -1560,7 +1557,7 @@ static void kad_publish(struct dht * dht,
 
         n = lookup_contact_addrs(lu, addrs);
 
-        while (--n > 0) {
+        while (n-- > 0) {
                 if (addrs[n] == dht->addr) {
                         kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;
                         msg.id.data = (uint8_t *) key;
@@ -1932,7 +1929,6 @@ static int kad_handle_find_resp(struct dht * dht,
 
         lu = dht_find_lookup(dht, req->key);
         if (lu == NULL) {
-                log_dbg("Response for unknown lookup.");
                 pthread_rwlock_unlock(&dht->lock);
                 return -1;
         }
@@ -1974,8 +1970,7 @@ static void kad_handle_response(struct dht * dht,
         case KAD_FIND_NODE:
                 if (dht_get_state(dht) != DHT_RUNNING)
                         return;
-                if (kad_handle_find_resp(dht, req, msg))
-                        log_dbg("Invalid or outdated response.");
+                kad_handle_find_resp(dht, req, msg);
                 break;
         default:
                 break;
@@ -2265,8 +2260,10 @@ void dht_post_sdu(void * ae,
                 pthread_rwlock_unlock(&dht->lock);
         }
 
-        if (msg->code < KAD_STORE)
-                send_msg(dht, &resp_msg, addr);
+        if (msg->code < KAD_STORE) {
+                if (send_msg(dht, &resp_msg, addr))
+                        log_warn("Failed to send response.");
+        }
 
         kad_msg__free_unpacked(msg, NULL);
 
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index 697c02da..69b7e90e 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -114,10 +114,12 @@ int dir_init()
                         sleep(ENROL_INTV);
                 }
 
+                log_dbg("Directory enrolled.");
+
                 return 0;
         }
 
-        log_dbg("Bootstrapping DHT.");
+        log_dbg("Bootstrapping directory.");
 
         /* TODO: get parameters for bootstrap from IRM tool. */
         if (dht_bootstrap(dht, KAD_B, 86400)) {
@@ -125,6 +127,8 @@ int dir_init()
                 return -ENOMEM;
         }
 
+        log_dbg("Directory bootstrapped.");
+
         return 0;
 }
 
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 6e880067..d346f67c 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -98,9 +98,11 @@ static void fa_post_sdu(void * ae,
                                      shm_du_buff_tail(sdb) -
                                      shm_du_buff_head(sdb),
                                      shm_du_buff_head(sdb));
+
+        ipcp_sdb_release(sdb);
+
         if (msg == NULL) {
                 log_err("Failed to unpack flow alloc message.");
-                ipcp_sdb_release(sdb);
                 return;
         }
 
@@ -112,7 +114,6 @@ static void fa_post_sdu(void * ae,
                         log_err("Bad flow request.");
                         pthread_mutex_unlock(&ipcpi.alloc_lock);
                         flow_alloc_msg__free_unpacked(msg, NULL);
-                        ipcp_sdb_release(sdb);
                         return;
                 }
 
@@ -126,7 +127,6 @@ static void fa_post_sdu(void * ae,
                         log_dbg("Won't allocate over non-operational IPCP.");
                         pthread_mutex_unlock(&ipcpi.alloc_lock);
                         flow_alloc_msg__free_unpacked(msg, NULL);
-                        ipcp_sdb_release(sdb);
                         return;
                 }
 
@@ -140,7 +140,6 @@ static void fa_post_sdu(void * ae,
                         pthread_mutex_unlock(&ipcpi.alloc_lock);
                         flow_alloc_msg__free_unpacked(msg, NULL);
                         log_err("Failed to get fd for flow.");
-                        ipcp_sdb_release(sdb);
                         return;
                 }
 
@@ -173,12 +172,10 @@ static void fa_post_sdu(void * ae,
         default:
                 log_err("Got an unknown flow allocation message.");
                 flow_alloc_msg__free_unpacked(msg, NULL);
-                ipcp_sdb_release(sdb);
                 return;
         }
 
         flow_alloc_msg__free_unpacked(msg, NULL);
-        ipcp_sdb_release(sdb);
 }
 
 int fa_init(void)
diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c
index 63259430..a4b9e074 100644
--- a/src/ipcpd/normal/sdu_sched.c
+++ b/src/ipcpd/normal/sdu_sched.c
@@ -36,11 +36,19 @@ struct sdu_sched {
         flow_set_t * set[QOS_CUBE_MAX];
-        fqueue_t *   fqs[QOS_CUBE_MAX];
         next_sdu_t   callback;
-        pthread_t    sdu_reader;
+        pthread_t    sdu_readers[IPCP_SCHED_THREADS];
 };
 
+static void cleanup_reader(void * o)
+{
+        int         i;
+        fqueue_t ** fqs = (fqueue_t **) o;
+
+        for (i = 0; i < QOS_CUBE_MAX; ++i)
+                fqueue_destroy(fqs[i]);
+}
+
 static void * sdu_reader(void * o)
 {
         struct sdu_sched * sched;
@@ -49,14 +57,27 @@ static void * sdu_reader(void * o)
         int                fd;
         int                i = 0;
         int                ret;
+        fqueue_t *         fqs[QOS_CUBE_MAX];
 
         sched = (struct sdu_sched *) o;
 
+        for (i = 0; i < QOS_CUBE_MAX; ++i) {
+                fqs[i] = fqueue_create();
+                if (fqs[i] == NULL) {
+                        int j;
+                        for (j = 0; j < i; ++j)
+                                fqueue_destroy(fqs[j]);
+                        return (void *) -1;
+                }
+        }
+
+        pthread_cleanup_push(cleanup_reader, fqs);
+
         while (true) {
                 /* FIXME: replace with scheduling policy call */
                 i = (i + 1) % QOS_CUBE_MAX;
 
-                ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout);
+                ret = flow_event_wait(sched->set[i], fqs[i], &timeout);
                 if (ret == -ETIMEDOUT)
                         continue;
 
@@ -65,7 +86,7 @@ static void * sdu_reader(void * o)
                         continue;
                 }
 
-                while ((fd = fqueue_next(sched->fqs[i])) >= 0) {
+                while ((fd = fqueue_next(fqs[i])) >= 0) {
                         if (ipcp_flow_read(fd, &sdb)) {
                                 log_warn("Failed to read SDU from fd %d.", fd);
                                 continue;
                         }
@@ -78,6 +99,8 @@
                 }
         }
 
+        pthread_cleanup_pop(true);
+
         return (void *) 0;
 }
 
@@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)
 
         sdu_sched = malloc(sizeof(*sdu_sched));
         if (sdu_sched == NULL)
-                return NULL;
+                goto fail_malloc;
 
         sdu_sched->callback = callback;
 
@@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)
                 if (sdu_sched->set[i] == NULL) {
                         for (j = 0; j < i; ++j)
                                 flow_set_destroy(sdu_sched->set[j]);
-                        goto fail_sdu_sched;
+                        goto fail_flow_set;
                 }
         }
 
-        for (i = 0; i < QOS_CUBE_MAX; ++i) {
-                sdu_sched->fqs[i] = fqueue_create();
-                if (sdu_sched->fqs[i] == NULL) {
-                        for (j = 0; j < i; ++j)
-                                fqueue_destroy(sdu_sched->fqs[j]);
+        for (i = 0; i < IPCP_SCHED_THREADS; ++i) {
+                if (pthread_create(&sdu_sched->sdu_readers[i], NULL,
+                                   sdu_reader, sdu_sched)) {
+                        int j;
+                        for (j = 0; j < i; ++j) {
+                                pthread_cancel(sdu_sched->sdu_readers[j]);
+                                pthread_join(sdu_sched->sdu_readers[j], NULL);
+                        }
                         goto fail_flow_set;
                 }
         }
 
-        pthread_create(&sdu_sched->sdu_reader,
-                       NULL,
-                       sdu_reader,
-                       (void *) sdu_sched);
-
         return sdu_sched;
 
  fail_flow_set:
-        for (i = 0; i < QOS_CUBE_MAX; ++i)
-                flow_set_destroy(sdu_sched->set[i]);
- fail_sdu_sched:
         free(sdu_sched);
+ fail_malloc:
         return NULL;
 }
 
@@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)
 
         assert(sdu_sched);
 
-        pthread_cancel(sdu_sched->sdu_reader);
-
-        pthread_join(sdu_sched->sdu_reader, NULL);
+        for (i = 0; i < IPCP_SCHED_THREADS; ++i) {
+                pthread_cancel(sdu_sched->sdu_readers[i]);
+                pthread_join(sdu_sched->sdu_readers[i], NULL);
+        }
 
-        for (i = 0; i < QOS_CUBE_MAX; ++i) {
-                fqueue_destroy(sdu_sched->fqs[i]);
+        for (i = 0; i < QOS_CUBE_MAX; ++i)
                 flow_set_destroy(sdu_sched->set[i]);
-        }
 
         free(sdu_sched);
 }
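
The dht.c hunks above make lookup_set_state() signal lu->cond on every transition, which is what lets lookup_wait() drop its special-case handling of LU_DESTROY: the waiter simply wakes up and reads whatever state the setter left. What follows is a minimal, self-contained sketch of that monitor pattern using plain POSIX threads; the lookup struct, state names, and helper functions here are illustrative stand-ins, not the Ouroboros DHT code.

#include <pthread.h>
#include <stdio.h>

enum lu_state { LU_NULL = 0, LU_PENDING, LU_COMPLETE, LU_DESTROY };

struct lookup {
        pthread_mutex_t lock;
        pthread_cond_t  cond;
        enum lu_state   state;
};

/* Every transition signals the waiter, as lookup_set_state() now does. */
static void set_state(struct lookup * lu, enum lu_state state)
{
        pthread_mutex_lock(&lu->lock);
        lu->state = state;
        pthread_cond_signal(&lu->cond);
        pthread_mutex_unlock(&lu->lock);
}

/* Block until the lookup leaves LU_PENDING, then report where it went. */
static enum lu_state wait_state(struct lookup * lu)
{
        enum lu_state state;

        pthread_mutex_lock(&lu->lock);
        while (lu->state == LU_PENDING)
                pthread_cond_wait(&lu->cond, &lu->lock);
        state = lu->state;
        pthread_mutex_unlock(&lu->lock);

        return state;
}

static void * destroyer(void * o)
{
        set_state((struct lookup *) o, LU_DESTROY);
        return NULL;
}

int main(void)
{
        static struct lookup lu = { PTHREAD_MUTEX_INITIALIZER,
                                    PTHREAD_COND_INITIALIZER,
                                    LU_PENDING };
        pthread_t t;

        pthread_create(&t, NULL, destroyer, &lu);

        printf("lookup ended in state %d.\n", wait_state(&lu));

        pthread_join(t, NULL);

        return 0;
}

Because the setter always signals while holding the lock, a waiter blocked in pthread_cond_wait() cannot miss a transition to LU_DESTROY or any other state.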
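The sdu_sched.c hunks replace a single reader thread, with fqueues stored in struct sdu_sched, by IPCP_SCHED_THREADS identical readers that each create their own queues and register a cancellation cleanup handler, so sdu_sched_destroy() can tear them down with pthread_cancel() followed by pthread_join(). Below is a minimal sketch of that worker pattern in plain POSIX threads; the queue type, the counts, and all names are placeholders rather than the Ouroboros fqueue/flow_set API.

#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

#define N_THREADS 8   /* stands in for IPCP_SCHED_THREADS */
#define N_QUEUES  4   /* stands in for QOS_CUBE_MAX       */

/* Cleanup handler: runs on pthread_cancel() or pthread_cleanup_pop(1). */
static void cleanup_queues(void * o)
{
        void ** qs = (void **) o;
        int     i;

        for (i = 0; i < N_QUEUES; ++i)
                free(qs[i]);
}

static void * reader(void * o)
{
        void * qs[N_QUEUES];   /* thread-local stand-ins for the fqueues */
        int    i;

        (void) o;

        for (i = 0; i < N_QUEUES; ++i)
                qs[i] = calloc(1, 64);

        pthread_cleanup_push(cleanup_queues, qs);

        while (1)              /* poll the queues round-robin */
                sleep(1);      /* sleep() is a cancellation point */

        pthread_cleanup_pop(1); /* unreachable, but keeps push/pop paired */

        return NULL;
}

int main(void)
{
        pthread_t readers[N_THREADS];
        int       i;

        for (i = 0; i < N_THREADS; ++i)
                if (pthread_create(&readers[i], NULL, reader, NULL))
                        return 1;

        sleep(1);

        /* Teardown mirrors sdu_sched_destroy(): cancel, then join. */
        for (i = 0; i < N_THREADS; ++i) {
                pthread_cancel(readers[i]);
                pthread_join(readers[i], NULL);
        }

        return 0;
}

Keeping the per-thread state on the worker's own stack and releasing it from the cleanup handler is what makes the cancel-then-join teardown safe: no other thread ever touches those queues.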