summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/dht.c27
-rw-r--r--src/ipcpd/normal/dir.c6
-rw-r--r--src/ipcpd/normal/fa.c9
-rw-r--r--src/ipcpd/normal/sdu_sched.c68
4 files changed, 63 insertions, 47 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index b79d9480..74618658 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -827,13 +827,6 @@ static enum lookup_state lookup_wait(struct lookup * lu)
pthread_cleanup_pop(false);
- if (lu->state == LU_DESTROY) {
- lu->state = LU_NULL;
- pthread_cond_signal(&lu->cond);
- pthread_mutex_unlock(&lu->lock);
- return -1;
- }
-
state = lu->state;
pthread_mutex_unlock(&lu->lock);
@@ -1320,7 +1313,7 @@ static int send_msg(struct dht * dht,
ipcp_sdb_release(sdb);
#endif /* __DHT_TEST__ */
- if (msg->code < KAD_STORE) {
+ if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) {
req = kad_req_create(dht, msg, addr);
if (req != NULL)
list_add(&req->next, &dht->requests);
@@ -1364,7 +1357,7 @@ static int kad_add(struct dht * dht,
pthread_rwlock_wrlock(&dht->lock);
- while (--n >= 0) {
+ while (n-- > 0) {
if (contacts[n].id.len != dht->b)
log_warn("Bad key length in contact data.");
@@ -1478,6 +1471,7 @@ static void lookup_set_state(struct lookup * lu,
pthread_mutex_lock(&lu->lock);
lu->state = state;
+ pthread_cond_signal(&lu->cond);
pthread_mutex_unlock(&lu->lock);
}
@@ -1526,6 +1520,9 @@ static struct lookup * kad_lookup(struct dht * dht,
kad_find(dht, id, addrs, code);
break;
case LU_DESTROY:
+ pthread_rwlock_wrlock(&dht->lock);
+ list_del(&lu->next);
+ pthread_rwlock_unlock(&dht->lock);
lookup_set_state(lu, LU_NULL);
return NULL;
default:
@@ -1560,7 +1557,7 @@ static void kad_publish(struct dht * dht,
n = lookup_contact_addrs(lu, addrs);
- while (--n > 0) {
+ while (n-- > 0) {
if (addrs[n] == dht->addr) {
kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;
msg.id.data = (uint8_t *) key;
@@ -1932,7 +1929,6 @@ static int kad_handle_find_resp(struct dht * dht,
lu = dht_find_lookup(dht, req->key);
if (lu == NULL) {
- log_dbg("Response for unknown lookup.");
pthread_rwlock_unlock(&dht->lock);
return -1;
}
@@ -1974,8 +1970,7 @@ static void kad_handle_response(struct dht * dht,
case KAD_FIND_NODE:
if (dht_get_state(dht) != DHT_RUNNING)
return;
- if (kad_handle_find_resp(dht, req, msg))
- log_dbg("Invalid or outdated response.");
+ kad_handle_find_resp(dht, req, msg);
break;
default:
break;
@@ -2265,8 +2260,10 @@ void dht_post_sdu(void * ae,
pthread_rwlock_unlock(&dht->lock);
}
- if (msg->code < KAD_STORE)
- send_msg(dht, &resp_msg, addr);
+ if (msg->code < KAD_STORE) {
+ if (send_msg(dht, &resp_msg, addr))
+ log_warn("Failed to send response.");
+ }
kad_msg__free_unpacked(msg, NULL);
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index 697c02da..69b7e90e 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -114,10 +114,12 @@ int dir_init()
sleep(ENROL_INTV);
}
+ log_dbg("Directory enrolled.");
+
return 0;
}
- log_dbg("Bootstrapping DHT.");
+ log_dbg("Bootstrapping directory.");
/* TODO: get parameters for bootstrap from IRM tool. */
if (dht_bootstrap(dht, KAD_B, 86400)) {
@@ -125,6 +127,8 @@ int dir_init()
return -ENOMEM;
}
+ log_dbg("Directory bootstrapped.");
+
return 0;
}
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 6e880067..d346f67c 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -98,9 +98,11 @@ static void fa_post_sdu(void * ae,
shm_du_buff_tail(sdb) -
shm_du_buff_head(sdb),
shm_du_buff_head(sdb));
+
+ ipcp_sdb_release(sdb);
+
if (msg == NULL) {
log_err("Failed to unpack flow alloc message.");
- ipcp_sdb_release(sdb);
return;
}
@@ -112,7 +114,6 @@ static void fa_post_sdu(void * ae,
log_err("Bad flow request.");
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
- ipcp_sdb_release(sdb);
return;
}
@@ -126,7 +127,6 @@ static void fa_post_sdu(void * ae,
log_dbg("Won't allocate over non-operational IPCP.");
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
- ipcp_sdb_release(sdb);
return;
}
@@ -140,7 +140,6 @@ static void fa_post_sdu(void * ae,
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
log_err("Failed to get fd for flow.");
- ipcp_sdb_release(sdb);
return;
}
@@ -173,12 +172,10 @@ static void fa_post_sdu(void * ae,
default:
log_err("Got an unknown flow allocation message.");
flow_alloc_msg__free_unpacked(msg, NULL);
- ipcp_sdb_release(sdb);
return;
}
flow_alloc_msg__free_unpacked(msg, NULL);
- ipcp_sdb_release(sdb);
}
int fa_init(void)
diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c
index 63259430..a4b9e074 100644
--- a/src/ipcpd/normal/sdu_sched.c
+++ b/src/ipcpd/normal/sdu_sched.c
@@ -36,11 +36,19 @@
struct sdu_sched {
flow_set_t * set[QOS_CUBE_MAX];
- fqueue_t * fqs[QOS_CUBE_MAX];
next_sdu_t callback;
- pthread_t sdu_reader;
+ pthread_t sdu_readers[IPCP_SCHED_THREADS];
};
+static void cleanup_reader(void * o)
+{
+ int i;
+ fqueue_t ** fqs = (fqueue_t **) o;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ fqueue_destroy(fqs[i]);
+}
+
static void * sdu_reader(void * o)
{
struct sdu_sched * sched;
@@ -49,14 +57,27 @@ static void * sdu_reader(void * o)
int fd;
int i = 0;
int ret;
+ fqueue_t * fqs[QOS_CUBE_MAX];
sched = (struct sdu_sched *) o;
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fqs[i] = fqueue_create();
+ if (fqs[i] == NULL) {
+ int j;
+ for (j = 0; j < i; ++j)
+ fqueue_destroy(fqs[j]);
+ return (void *) -1;
+ }
+ }
+
+ pthread_cleanup_push(cleanup_reader, fqs);
+
while (true) {
/* FIXME: replace with scheduling policy call */
i = (i + 1) % QOS_CUBE_MAX;
- ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout);
+ ret = flow_event_wait(sched->set[i], fqs[i], &timeout);
if (ret == -ETIMEDOUT)
continue;
@@ -65,7 +86,7 @@ static void * sdu_reader(void * o)
continue;
}
- while ((fd = fqueue_next(sched->fqs[i])) >= 0) {
+ while ((fd = fqueue_next(fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
log_warn("Failed to read SDU from fd %d.", fd);
continue;
@@ -78,6 +99,8 @@ static void * sdu_reader(void * o)
}
}
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
@@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)
sdu_sched = malloc(sizeof(*sdu_sched));
if (sdu_sched == NULL)
- return NULL;
+ goto fail_malloc;
sdu_sched->callback = callback;
@@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)
if (sdu_sched->set[i] == NULL) {
for (j = 0; j < i; ++j)
flow_set_destroy(sdu_sched->set[j]);
- goto fail_sdu_sched;
+ goto fail_flow_set;
}
}
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- sdu_sched->fqs[i] = fqueue_create();
- if (sdu_sched->fqs[i] == NULL) {
- for (j = 0; j < i; ++j)
- fqueue_destroy(sdu_sched->fqs[j]);
+ for (i = 0; i < IPCP_SCHED_THREADS; ++i) {
+ if (pthread_create(&sdu_sched->sdu_readers[i], NULL,
+ sdu_reader, sdu_sched)) {
+ int j;
+ for (j = 0; j < i; ++j) {
+ pthread_cancel(sdu_sched->sdu_readers[j]);
+ pthread_join(sdu_sched->sdu_readers[j], NULL);
+ }
goto fail_flow_set;
}
}
- pthread_create(&sdu_sched->sdu_reader,
- NULL,
- sdu_reader,
- (void *) sdu_sched);
-
return sdu_sched;
fail_flow_set:
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- flow_set_destroy(sdu_sched->set[i]);
- fail_sdu_sched:
free(sdu_sched);
+ fail_malloc:
return NULL;
}
@@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)
assert(sdu_sched);
- pthread_cancel(sdu_sched->sdu_reader);
-
- pthread_join(sdu_sched->sdu_reader, NULL);
+ for (i = 0; i < IPCP_SCHED_THREADS; ++i) {
+ pthread_cancel(sdu_sched->sdu_readers[i]);
+ pthread_join(sdu_sched->sdu_readers[i], NULL);
+ }
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fqueue_destroy(sdu_sched->fqs[i]);
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
flow_set_destroy(sdu_sched->set[i]);
- }
free(sdu_sched);
}