summaryrefslogtreecommitdiff
path: root/src/lib/cdap.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-07-20 12:31:02 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-07-20 12:31:02 +0000
commit3e9cbbabbc70319e44aa12bd6c59746cab3c23f6 (patch)
tree95fb7178310ce4ecc688ba593679220c0145cabf /src/lib/cdap.c
parenta295fd7b24c86f071061aa15e7c82c2463b001b5 (diff)
parent7a993023669945399b174cff5d182ac3dcaadf7f (diff)
downloadouroboros-3e9cbbabbc70319e44aa12bd6c59746cab3c23f6.tar.gz
ouroboros-3e9cbbabbc70319e44aa12bd6c59746cab3c23f6.zip
Merged in dstaesse/ouroboros/testing-cdap (pull request #533)
lib: Fix processing state in CDAP
Diffstat (limited to 'src/lib/cdap.c')
-rw-r--r--src/lib/cdap.c26
1 files changed, 19 insertions, 7 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 8ebfbec1..16f2c078 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -309,17 +309,22 @@ static void * sdu_reader(void * o)
set_proc(instance, true);
fd = fqueue_next(instance->fq);
len = flow_read(fd, buf, BUF_SIZE);
- if (len < 0)
+ if (len < 0) {
+ set_proc(instance, false);
continue;
+ }
msg = cdap__unpack(NULL, len, buf);
- if (msg == NULL)
+ if (msg == NULL) {
+ set_proc(instance, false);
continue;
+ }
if (msg->opcode != CDAP_REPLY) {
rcvd = malloc(sizeof(*rcvd));
if (rcvd == NULL) {
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
continue;
}
@@ -331,6 +336,7 @@ static void * sdu_reader(void * o)
rcvd->key = next_id(instance);
if (rcvd->key == INVALID_ID) {
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
free(rcvd);
continue;
}
@@ -341,6 +347,7 @@ static void * sdu_reader(void * o)
if (rcvd->name == NULL) {
release_id(instance, rcvd->key);
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
free(rcvd);
continue;
}
@@ -351,6 +358,7 @@ static void * sdu_reader(void * o)
if (rcvd->data == NULL) {
release_id(instance, rcvd->key);
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
free(rcvd->name);
free(rcvd);
continue;
@@ -371,6 +379,7 @@ static void * sdu_reader(void * o)
req = cdap_sent_get_by_iid(instance, msg->invoke_id);
if (req == NULL) {
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
continue;
}
@@ -379,6 +388,7 @@ static void * sdu_reader(void * o)
data.data = malloc(data.len);
if (data.data == NULL) {
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
continue;
}
memcpy(data.data, msg->value.data, data.len);
@@ -640,8 +650,10 @@ cdap_key_t * cdap_request_send(struct cdap * instance,
pthread_rwlock_rdlock(&instance->flows_lock);
keys = malloc(sizeof(*keys) * (instance->n_flows + 1));
- if (keys == NULL)
+ if (keys == NULL) {
+ pthread_rwlock_unlock(&instance->flows_lock);
return NULL;
+ }
memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1));
@@ -677,35 +689,35 @@ cdap_key_t * cdap_request_send(struct cdap * instance,
*key = next_id(instance);
if (*key == INVALID_ID) {
- pthread_rwlock_unlock(&instance->flows_lock);
release_id(instance, iid);
+ pthread_rwlock_unlock(&instance->flows_lock);
return keys;
}
req = cdap_sent_add(instance, e->fd, iid, *key);
if (req == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
release_id(instance, *key);
release_id(instance, iid);
+ pthread_rwlock_unlock(&instance->flows_lock);
*key = INVALID_CDAP_KEY;
return keys;
}
ret = write_msg(e->fd, &msg);
if (ret == -ENOMEM) {
- pthread_rwlock_unlock(&instance->flows_lock);
cdap_sent_del(instance, req);
release_id(instance, *key);
release_id(instance, iid);
+ pthread_rwlock_unlock(&instance->flows_lock);
*key = INVALID_CDAP_KEY;
return keys;
}
if (ret < 0) {
- pthread_rwlock_unlock(&instance->flows_lock);
cdap_sent_del(instance, req);
release_id(instance, *key);
release_id(instance, iid);
+ pthread_rwlock_unlock(&instance->flows_lock);
*key = INVALID_CDAP_KEY;
return keys;
}