summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-09-20 21:28:56 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-09-20 21:40:45 +0200
commit1e1de177e7da8075b016b703168f3473f58e0568 (patch)
treed579d1f7ab0d63000569d78e22f84888382acca7
parent36962003f216b8176f1799bf22af4d258ee8e542 (diff)
downloadouroboros-1e1de177e7da8075b016b703168f3473f58e0568.tar.gz
ouroboros-1e1de177e7da8075b016b703168f3473f58e0568.zip
ipcpd: Handle DHT SDUs in different thread
The DHT will now spawn a thread when receiving SDUs to avoid starvation of sdu scheduler threads. Also fixes some locking issues.
-rw-r--r--src/ipcpd/normal/dht.c73
1 files changed, 58 insertions, 15 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index e7ba4bf3..996b122a 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -227,6 +227,11 @@ struct join_info {
uint64_t addr;
};
+struct sdu_info {
+ struct dht * dht;
+ struct shm_du_buff * sdb;
+};
+
static uint8_t * dht_dup_key(const uint8_t * key,
size_t len)
{
@@ -1665,11 +1670,16 @@ static int kad_join(struct dht * dht,
msg.has_t_refresh = true;
msg.has_t_replicate = true;
msg.alpha = KAD_ALPHA;
- msg.b = dht->b;
msg.k = KAD_K;
msg.t_refresh = KAD_T_REFR;
msg.t_replicate = KAD_T_REPL;
+ pthread_rwlock_rdlock(&dht->lock);
+
+ msg.b = dht->b;
+
+ pthread_rwlock_unlock(&dht->lock);
+
if (send_msg(dht, &msg, addr))
return -1;
@@ -1798,12 +1808,16 @@ static ssize_t dht_get_contacts(struct dht * dht,
pthread_rwlock_rdlock(&dht->lock);
len = dht_contact_list(dht, &l, key);
- if (len == 0)
+ if (len == 0) {
+ pthread_rwlock_unlock(&dht->lock);
return 0;
+ }
*msgs = malloc(len * sizeof(**msgs));
- if (*msgs == NULL)
+ if (*msgs == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
return 0;
+ }
list_for_each_safe(p, h, &l) {
struct contact * c = list_entry(p, struct contact, next);
@@ -1976,8 +1990,6 @@ static int kad_handle_join_resp(struct dht * dht,
return -1;
}
- dht->state = DHT_RUNNING;
-
kad_req_respond(req);
dht_update_bucket(dht, msg->s_id.data, msg->s_addr);
@@ -2201,10 +2213,10 @@ uint64_t dht_query(struct dht * dht,
return 0;
}
-void dht_post_sdu(void * ae,
- struct shm_du_buff * sdb)
+static void * dht_handle_sdu(void * o)
{
struct dht * dht;
+ struct shm_du_buff * sdb;
kad_msg_t * msg;
kad_contact_msg_t ** cmsgs;
kad_msg_t resp_msg = KAD_MSG__INIT;
@@ -2212,12 +2224,15 @@ void dht_post_sdu(void * ae,
buffer_t buf;
size_t i;
- assert(ae);
- assert(sdb);
+ assert(o);
memset(&buf, 0, sizeof(buf));
- dht = (struct dht *) ae;
+ dht = ((struct sdu_info *) o)->dht;
+ sdb = ((struct sdu_info *) o)->sdb;
+
+ assert(dht);
+ assert(sdb);
msg = kad_msg__unpack(NULL,
shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
@@ -2225,27 +2240,29 @@ void dht_post_sdu(void * ae,
ipcp_sdb_release(sdb);
+ free((struct sdu_info *) o);
+
if (msg == NULL) {
log_err("Failed to unpack message.");
- return;
+ return (void *) -1;
}
if (msg->has_key && msg->key.len != dht->b) {
kad_msg__free_unpacked(msg, NULL);
log_warn("Bad key in message.");
- return;
+ return (void *) -1;
}
if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) {
kad_msg__free_unpacked(msg, NULL);
log_warn("Bad source ID in message of type %d.", msg->code);
- return;
+ return (void *) -1;
}
if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
kad_msg__free_unpacked(msg, NULL);
log_dbg("Got a request message when not running.");
- return;
+ return (void *) -1;
}
addr = msg->s_addr;
@@ -2341,11 +2358,34 @@ void dht_post_sdu(void * ae,
free(resp_msg.addrs);
if (resp_msg.n_contacts == 0)
- return;
+ return (void *) -1;
for (i = 0; i < resp_msg.n_contacts; ++i)
kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL);
free(resp_msg.contacts);
+
+ return (void *) 0;
+}
+
+static void dht_post_sdu(void * comp,
+ struct shm_du_buff * sdb)
+{
+ pthread_t thr;
+ struct sdu_info * info;
+
+ info = malloc(sizeof(*info));
+ if (info == NULL)
+ return;
+
+ info->dht = (struct dht *) comp;
+ info->sdb = sdb;
+
+ if (pthread_create(&thr, NULL, dht_handle_sdu, info)) {
+ free(info);
+ return;
+ }
+
+ pthread_detach(thr);
}
void dht_destroy(struct dht * dht)
@@ -2421,6 +2461,8 @@ static void * join_thr(void * o)
sleep(KAD_JOIN_INTV);
}
+ dht_set_state(info->dht, DHT_RUNNING);
+
lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);
if (lu != NULL)
lookup_destroy(lu);
@@ -2509,6 +2551,7 @@ struct dht * dht_create(uint64_t addr)
notifier_reg(handle_event, dht);
#else
(void) handle_event;
+ (void) dht_post_sdu;
#endif
dht->state = DHT_INIT;