diff options
| -rw-r--r-- | src/lib/cdap.c | 166 | ||||
| -rw-r--r-- | src/lib/cdap_req.c | 6 | ||||
| -rw-r--r-- | src/lib/cdap_req.h | 8 | 
3 files changed, 114 insertions, 66 deletions
| diff --git a/src/lib/cdap.c b/src/lib/cdap.c index a60bf8f9..f5e0593d 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -39,12 +39,12 @@  #include "cdap.pb-c.h"  typedef Cdap cdap_t; -typedef int32_t invoke_id_t; +typedef cdap_key_t invoke_id_t;  #define CDAP_REPLY (CDAP_DELETE + 1) -#define INVALID_INVOKE_ID -1 -#define IDS_SIZE 256 +#define INVALID_ID -1 +#define IDS_SIZE 2048  #define BUF_SIZE 2048  struct fd_el { @@ -78,8 +78,10 @@ struct cdap_rcvd {          struct list_head next;          int              fd; +        bool             proc;          invoke_id_t      iid; +        cdap_key_t       key;          enum cdap_opcode opcode;          char *           name; @@ -88,7 +90,7 @@ struct cdap_rcvd {          uint32_t         flags;  }; -static int next_invoke_id(struct cdap * instance) +static int next_id(struct cdap * instance)  {          int ret; @@ -98,15 +100,15 @@ static int next_invoke_id(struct cdap * instance)          ret = bmp_allocate(instance->ids);          if (!bmp_is_id_valid(instance->ids, ret)) -                ret = INVALID_INVOKE_ID; +                ret = INVALID_ID;          pthread_mutex_unlock(&instance->ids_lock);          return ret;  } -static int release_invoke_id(struct cdap * instance, -                             int           id) +static int release_id(struct cdap * instance, +                      int32_t       id)  {          int ret; @@ -130,7 +132,6 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,          struct cdap_req *  req = NULL;          assert(instance); -        assert(key >= 0);          pthread_rwlock_rdlock(&instance->sent_lock); @@ -147,21 +148,43 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,          return NULL;  } -static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance, -                                               invoke_id_t   iid) +static struct cdap_req * cdap_sent_get_by_iid(struct cdap * instance, +                                              invoke_id_t   iid) +{ +        struct list_head * p = NULL; +        struct cdap_req *  req = NULL; + +        assert(instance); + +        pthread_rwlock_rdlock(&instance->sent_lock); + +        list_for_each(p, &instance->sent) { +                req = list_entry(p, struct cdap_req, next); +                if (req->iid == iid) { +                        pthread_rwlock_unlock(&instance->sent_lock); +                        return req; +                } +        } + +        pthread_rwlock_unlock(&instance->sent_lock); + +        return NULL; +} + +static struct cdap_rcvd * cdap_rcvd_get_by_key(struct cdap * instance, +                                               cdap_key_t    key)  {          struct list_head * p = NULL;          struct list_head * h = NULL;          struct cdap_rcvd * rcvd = NULL;          assert(instance); -        assert(iid >= 0);          pthread_mutex_lock(&instance->rcvd_lock);          list_for_each_safe(p, h, &instance->rcvd) {                  rcvd = list_entry(p, struct cdap_rcvd, next); -                if (rcvd->iid == iid) { +                if (rcvd->key == key) {                          pthread_mutex_unlock(&instance->rcvd_lock);                          list_del(&rcvd->next);                          return rcvd; @@ -170,20 +193,22 @@ static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance,          pthread_mutex_unlock(&instance->rcvd_lock); +        assert(false); +          return NULL;  }  static struct cdap_req * cdap_sent_add(struct cdap * instance,                                         int           fd, +                                       invoke_id_t   iid,                                         cdap_key_t    key)  {          struct cdap_req * req;          assert(instance); -        assert(key >= 0);          assert(!cdap_sent_has_key(instance, key)); -        req = cdap_req_create(fd, key); +        req = cdap_req_create(fd, iid, key);          if (req == NULL)                  return NULL; @@ -285,11 +310,20 @@ static void * sdu_reader(void * o)                          assert(msg->name);                          rcvd->opcode = msg->opcode; +                        rcvd->fd     = fd;                          rcvd->iid    = msg->invoke_id; +                        rcvd->key    = next_id(instance); +                        if (rcvd->key == INVALID_ID) { +                                cdap__free_unpacked(msg, NULL); +                                free(rcvd); +                                continue; +                        } +                          rcvd->flags  = msg->flags; -                        rcvd->fd     = fd; +                        rcvd->proc   = false;                          rcvd->name   = strdup(msg->name);                          if (rcvd->name == NULL) { +                                release_id(instance, rcvd->key);                                  cdap__free_unpacked(msg, NULL);                                  free(rcvd);                                  continue; @@ -299,6 +333,7 @@ static void * sdu_reader(void * o)                                  rcvd->len = msg->value.len;                                  rcvd->data = malloc(rcvd->len);                                  if (rcvd->data == NULL) { +                                        release_id(instance, rcvd->key);                                          cdap__free_unpacked(msg, NULL);                                          free(rcvd->name);                                          free(rcvd); @@ -317,7 +352,7 @@ static void * sdu_reader(void * o)                          pthread_cond_signal(&instance->rcvd_cond);                          pthread_mutex_unlock(&instance->rcvd_lock);                  } else  { -                        req = cdap_sent_get_by_key(instance, msg->invoke_id); +                        req = cdap_sent_get_by_iid(instance, msg->invoke_id);                          if (req == NULL) {                                  cdap__free_unpacked(msg, NULL);                                  continue; @@ -569,22 +604,6 @@ static int write_msg(int           fd,          return 0;  } -static cdap_key_t invoke_id_to_key(invoke_id_t iid) -{ -        if (iid == INVALID_INVOKE_ID) -                return INVALID_CDAP_KEY; - -        return (cdap_key_t) iid; -} - -static invoke_id_t key_to_invoke_id(cdap_key_t key) -{ -        if (key == INVALID_CDAP_KEY) -                return INVALID_INVOKE_ID; - -        return (invoke_id_t) key; -} -  cdap_key_t * cdap_request_send(struct cdap *    instance,                                 enum cdap_opcode code,                                 const char *     name, @@ -629,22 +648,28 @@ cdap_key_t * cdap_request_send(struct cdap *    instance,                  invoke_id_t iid;                  struct fd_el * e; -                iid = next_invoke_id(instance); -                if (iid == INVALID_INVOKE_ID) { +                iid = next_id(instance); +                if (iid == INVALID_ID) {                          pthread_rwlock_unlock(&instance->flows_lock);                          return keys;                  }                  msg.invoke_id = iid; -                *key = invoke_id_to_key(iid); -                  e = list_entry(p, struct fd_el, next); -                req = cdap_sent_add(instance, e->fd, *key); +                *key = next_id(instance); +                if (*key == INVALID_ID) { +                        pthread_rwlock_unlock(&instance->flows_lock); +                        release_id(instance, iid); +                        return keys; +                } + +                req = cdap_sent_add(instance, e->fd, iid, *key);                  if (req == NULL) {                          pthread_rwlock_unlock(&instance->flows_lock); -                        release_invoke_id(instance, iid); +                        release_id(instance, *key); +                        release_id(instance, iid);                          return keys;                  } @@ -652,13 +677,15 @@ cdap_key_t * cdap_request_send(struct cdap *    instance,                  if (ret == -ENOMEM) {                          pthread_rwlock_unlock(&instance->flows_lock);                          cdap_sent_del(instance, req); -                        release_invoke_id(instance, iid); +                        release_id(instance, *key); +                        release_id(instance, iid);                          return keys;                  }                  if (ret < 0) {                          cdap_sent_del(instance, req); -                        release_invoke_id(instance, iid); +                        release_id(instance, *key); +                        release_id(instance, iid);                  }                  ++key; @@ -676,10 +703,8 @@ int cdap_reply_wait(struct cdap * instance,  {          int ret;          struct cdap_req * r; -        invoke_id_t iid = key_to_invoke_id(key); -        if (instance == NULL || iid == INVALID_INVOKE_ID -            || (data != NULL && len == NULL)) +        if (instance == NULL || (data != NULL && len == NULL))                  return -EINVAL;          r = cdap_sent_get_by_key(instance, key); @@ -689,7 +714,7 @@ int cdap_reply_wait(struct cdap * instance,          ret = cdap_req_wait(r);          if (ret < 0) {                  cdap_sent_del(instance, r); -                release_invoke_id(instance, iid); +                release_id(instance, r->iid);                  return ret;          } @@ -703,7 +728,7 @@ int cdap_reply_wait(struct cdap * instance,          ret = r->response;          cdap_sent_del(instance, r); -        release_invoke_id(instance, iid); +        release_id(instance, r->iid);          return ret;  } @@ -715,7 +740,7 @@ cdap_key_t cdap_request_wait(struct cdap *      instance,                               size_t *           len,                               uint32_t *         flags)  { -        struct cdap_rcvd * rcvd; +        struct cdap_rcvd * rcv = NULL;          if (instance == NULL || opcode == NULL || name == NULL || data == NULL              || len == NULL || flags == NULL) @@ -726,23 +751,35 @@ cdap_key_t cdap_request_wait(struct cdap *      instance,          pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,                               (void *) &instance->rcvd_lock); -        while (list_is_empty(&instance->rcvd)) -                pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock); +        while (rcv == NULL) { +                while (list_is_empty(&instance->rcvd)) +                        pthread_cond_wait(&instance->rcvd_cond, +                                          &instance->rcvd_lock); -        rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); +                rcv = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); +                if (rcv->proc) { +                        rcv = NULL; +                        pthread_cond_wait(&instance->rcvd_cond, +                                          &instance->rcvd_lock); +                } +        } + +        rcv->proc = true; +        list_del(&rcv->next); +        list_add_tail(&rcv->next, &instance->rcvd);          pthread_cleanup_pop(true); -        *opcode = rcvd->opcode; -        *name   = rcvd->name; -        *data   = rcvd->data; -        *len    = rcvd->len; -        *flags  = rcvd->flags; +        *opcode = rcv->opcode; +        *name   = rcv->name; +        *data   = rcv->data; +        *len    = rcv->len; +        *flags  = rcv->flags; -        rcvd->name = NULL; -        rcvd->data = NULL; +        rcv->name = NULL; +        rcv->data = NULL; -        return invoke_id_to_key(rcvd->iid); +        return rcv->key;  }  int cdap_reply_send(struct cdap * instance, @@ -753,16 +790,17 @@ int cdap_reply_send(struct cdap * instance,  {          int                fd;          cdap_t             msg  = CDAP__INIT; -        invoke_id_t        iid  = key_to_invoke_id(key); -        struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid); -        if (rcvd == NULL) -                return -EINVAL; +        struct cdap_rcvd * rcvd;          if (instance == NULL)                  return -EINVAL; +        rcvd = cdap_rcvd_get_by_key(instance, key); +        if (rcvd == NULL) +                return -1; +          msg.opcode = CDAP_REPLY; -        msg.invoke_id = iid; +        msg.invoke_id = rcvd->iid;          msg.has_result = true;          msg.result = result; @@ -774,7 +812,11 @@ int cdap_reply_send(struct cdap * instance,          fd = rcvd->fd; +        release_id(instance, rcvd->key); + +        assert(rcvd->data == NULL);          assert(rcvd->name == NULL); +        assert(rcvd->proc);          free(rcvd); diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c index b60e73ad..0b77f266 100644 --- a/src/lib/cdap_req.c +++ b/src/lib/cdap_req.c @@ -30,8 +30,9 @@  #include <stdlib.h>  #include <assert.h> -struct cdap_req * cdap_req_create(int        fd, -                                  cdap_key_t key) +struct cdap_req * cdap_req_create(int         fd, +                                  invoke_id_t iid, +                                  cdap_key_t  key)  {          struct cdap_req * creq = malloc(sizeof(*creq));          pthread_condattr_t cattr; @@ -40,6 +41,7 @@ struct cdap_req * cdap_req_create(int        fd,                  return NULL;          creq->fd        = fd; +        creq->iid       = iid;          creq->key       = key;          creq->state     = REQ_INIT;          creq->response  = -1; diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h index fe8e3613..592d26a0 100644 --- a/src/lib/cdap_req.h +++ b/src/lib/cdap_req.h @@ -31,6 +31,8 @@  #include <pthread.h> +typedef cdap_key_t invoke_id_t; +  enum creq_state {          REQ_NULL = 0,          REQ_INIT, @@ -46,6 +48,7 @@ struct cdap_req {          int              fd;          struct timespec  birth;          cdap_key_t       key; +        invoke_id_t      iid;          int              response;          buffer_t         data; @@ -55,8 +58,9 @@ struct cdap_req {          pthread_mutex_t  lock;  }; -struct cdap_req * cdap_req_create(int        fd, -                                  cdap_key_t key); +struct cdap_req * cdap_req_create(int         fd, +                                  cdap_key_t  key, +                                  invoke_id_t iid);  void              cdap_req_destroy(struct cdap_req * creq); | 
