diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/cdap.c | 26 | 
1 files changed, 19 insertions, 7 deletions
| diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 8ebfbec1..16f2c078 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -309,17 +309,22 @@ static void * sdu_reader(void * o)                  set_proc(instance, true);                  fd = fqueue_next(instance->fq);                  len = flow_read(fd, buf, BUF_SIZE); -                if (len < 0) +                if (len < 0) { +                        set_proc(instance, false);                          continue; +                }                  msg = cdap__unpack(NULL, len, buf); -                if (msg == NULL) +                if (msg == NULL) { +                        set_proc(instance, false);                          continue; +                }                  if (msg->opcode != CDAP_REPLY) {                          rcvd = malloc(sizeof(*rcvd));                          if (rcvd == NULL) {                                  cdap__free_unpacked(msg, NULL); +                                set_proc(instance, false);                                  continue;                          } @@ -331,6 +336,7 @@ static void * sdu_reader(void * o)                          rcvd->key    = next_id(instance);                          if (rcvd->key == INVALID_ID) {                                  cdap__free_unpacked(msg, NULL); +                                set_proc(instance, false);                                  free(rcvd);                                  continue;                          } @@ -341,6 +347,7 @@ static void * sdu_reader(void * o)                          if (rcvd->name == NULL) {                                  release_id(instance, rcvd->key);                                  cdap__free_unpacked(msg, NULL); +                                set_proc(instance, false);                                  free(rcvd);                                  continue;                          } @@ -351,6 +358,7 @@ static void * sdu_reader(void * o)                                  if (rcvd->data == NULL) {                                          release_id(instance, rcvd->key);                                          cdap__free_unpacked(msg, NULL); +                                        set_proc(instance, false);                                          free(rcvd->name);                                          free(rcvd);                                          continue; @@ -371,6 +379,7 @@ static void * sdu_reader(void * o)                          req = cdap_sent_get_by_iid(instance, msg->invoke_id);                          if (req == NULL) {                                  cdap__free_unpacked(msg, NULL); +                                set_proc(instance, false);                                  continue;                          } @@ -379,6 +388,7 @@ static void * sdu_reader(void * o)                                  data.data = malloc(data.len);                                  if (data.data == NULL) {                                          cdap__free_unpacked(msg, NULL); +                                        set_proc(instance, false);                                          continue;                                  }                                  memcpy(data.data, msg->value.data, data.len); @@ -640,8 +650,10 @@ cdap_key_t * cdap_request_send(struct cdap *    instance,          pthread_rwlock_rdlock(&instance->flows_lock);          keys = malloc(sizeof(*keys) * (instance->n_flows + 1)); -        if (keys == NULL) +        if (keys == NULL) { +                pthread_rwlock_unlock(&instance->flows_lock);                  return NULL; +        }          memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1)); @@ -677,35 +689,35 @@ cdap_key_t * cdap_request_send(struct cdap *    instance,                  *key = next_id(instance);                  if (*key == INVALID_ID) { -                        pthread_rwlock_unlock(&instance->flows_lock);                          release_id(instance, iid); +                        pthread_rwlock_unlock(&instance->flows_lock);                          return keys;                  }                  req = cdap_sent_add(instance, e->fd, iid, *key);                  if (req == NULL) { -                        pthread_rwlock_unlock(&instance->flows_lock);                          release_id(instance, *key);                          release_id(instance, iid); +                        pthread_rwlock_unlock(&instance->flows_lock);                          *key = INVALID_CDAP_KEY;                          return keys;                  }                  ret = write_msg(e->fd, &msg);                  if (ret == -ENOMEM) { -                        pthread_rwlock_unlock(&instance->flows_lock);                          cdap_sent_del(instance, req);                          release_id(instance, *key);                          release_id(instance, iid); +                        pthread_rwlock_unlock(&instance->flows_lock);                          *key = INVALID_CDAP_KEY;                          return keys;                  }                  if (ret < 0) { -                        pthread_rwlock_unlock(&instance->flows_lock);                          cdap_sent_del(instance, req);                          release_id(instance, *key);                          release_id(instance, iid); +                        pthread_rwlock_unlock(&instance->flows_lock);                          *key = INVALID_CDAP_KEY;                          return keys;                  } | 
