summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib/cdap.c26
1 files changed, 19 insertions, 7 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 8ebfbec1..16f2c078 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -309,17 +309,22 @@ static void * sdu_reader(void * o)
set_proc(instance, true);
fd = fqueue_next(instance->fq);
len = flow_read(fd, buf, BUF_SIZE);
- if (len < 0)
+ if (len < 0) {
+ set_proc(instance, false);
continue;
+ }
msg = cdap__unpack(NULL, len, buf);
- if (msg == NULL)
+ if (msg == NULL) {
+ set_proc(instance, false);
continue;
+ }
if (msg->opcode != CDAP_REPLY) {
rcvd = malloc(sizeof(*rcvd));
if (rcvd == NULL) {
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
continue;
}
@@ -331,6 +336,7 @@ static void * sdu_reader(void * o)
rcvd->key = next_id(instance);
if (rcvd->key == INVALID_ID) {
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
free(rcvd);
continue;
}
@@ -341,6 +347,7 @@ static void * sdu_reader(void * o)
if (rcvd->name == NULL) {
release_id(instance, rcvd->key);
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
free(rcvd);
continue;
}
@@ -351,6 +358,7 @@ static void * sdu_reader(void * o)
if (rcvd->data == NULL) {
release_id(instance, rcvd->key);
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
free(rcvd->name);
free(rcvd);
continue;
@@ -371,6 +379,7 @@ static void * sdu_reader(void * o)
req = cdap_sent_get_by_iid(instance, msg->invoke_id);
if (req == NULL) {
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
continue;
}
@@ -379,6 +388,7 @@ static void * sdu_reader(void * o)
data.data = malloc(data.len);
if (data.data == NULL) {
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
continue;
}
memcpy(data.data, msg->value.data, data.len);
@@ -640,8 +650,10 @@ cdap_key_t * cdap_request_send(struct cdap * instance,
pthread_rwlock_rdlock(&instance->flows_lock);
keys = malloc(sizeof(*keys) * (instance->n_flows + 1));
- if (keys == NULL)
+ if (keys == NULL) {
+ pthread_rwlock_unlock(&instance->flows_lock);
return NULL;
+ }
memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1));
@@ -677,35 +689,35 @@ cdap_key_t * cdap_request_send(struct cdap * instance,
*key = next_id(instance);
if (*key == INVALID_ID) {
- pthread_rwlock_unlock(&instance->flows_lock);
release_id(instance, iid);
+ pthread_rwlock_unlock(&instance->flows_lock);
return keys;
}
req = cdap_sent_add(instance, e->fd, iid, *key);
if (req == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
release_id(instance, *key);
release_id(instance, iid);
+ pthread_rwlock_unlock(&instance->flows_lock);
*key = INVALID_CDAP_KEY;
return keys;
}
ret = write_msg(e->fd, &msg);
if (ret == -ENOMEM) {
- pthread_rwlock_unlock(&instance->flows_lock);
cdap_sent_del(instance, req);
release_id(instance, *key);
release_id(instance, iid);
+ pthread_rwlock_unlock(&instance->flows_lock);
*key = INVALID_CDAP_KEY;
return keys;
}
if (ret < 0) {
- pthread_rwlock_unlock(&instance->flows_lock);
cdap_sent_del(instance, req);
release_id(instance, *key);
release_id(instance, iid);
+ pthread_rwlock_unlock(&instance->flows_lock);
*key = INVALID_CDAP_KEY;
return keys;
}