diff options
-rw-r--r-- | src/lib/cdap.c | 166 | ||||
-rw-r--r-- | src/lib/cdap_req.c | 6 | ||||
-rw-r--r-- | src/lib/cdap_req.h | 8 |
3 files changed, 114 insertions, 66 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c index a60bf8f9..f5e0593d 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -39,12 +39,12 @@ #include "cdap.pb-c.h" typedef Cdap cdap_t; -typedef int32_t invoke_id_t; +typedef cdap_key_t invoke_id_t; #define CDAP_REPLY (CDAP_DELETE + 1) -#define INVALID_INVOKE_ID -1 -#define IDS_SIZE 256 +#define INVALID_ID -1 +#define IDS_SIZE 2048 #define BUF_SIZE 2048 struct fd_el { @@ -78,8 +78,10 @@ struct cdap_rcvd { struct list_head next; int fd; + bool proc; invoke_id_t iid; + cdap_key_t key; enum cdap_opcode opcode; char * name; @@ -88,7 +90,7 @@ struct cdap_rcvd { uint32_t flags; }; -static int next_invoke_id(struct cdap * instance) +static int next_id(struct cdap * instance) { int ret; @@ -98,15 +100,15 @@ static int next_invoke_id(struct cdap * instance) ret = bmp_allocate(instance->ids); if (!bmp_is_id_valid(instance->ids, ret)) - ret = INVALID_INVOKE_ID; + ret = INVALID_ID; pthread_mutex_unlock(&instance->ids_lock); return ret; } -static int release_invoke_id(struct cdap * instance, - int id) +static int release_id(struct cdap * instance, + int32_t id) { int ret; @@ -130,7 +132,6 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, struct cdap_req * req = NULL; assert(instance); - assert(key >= 0); pthread_rwlock_rdlock(&instance->sent_lock); @@ -147,21 +148,43 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, return NULL; } -static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance, - invoke_id_t iid) +static struct cdap_req * cdap_sent_get_by_iid(struct cdap * instance, + invoke_id_t iid) +{ + struct list_head * p = NULL; + struct cdap_req * req = NULL; + + assert(instance); + + pthread_rwlock_rdlock(&instance->sent_lock); + + list_for_each(p, &instance->sent) { + req = list_entry(p, struct cdap_req, next); + if (req->iid == iid) { + pthread_rwlock_unlock(&instance->sent_lock); + return req; + } + } + + pthread_rwlock_unlock(&instance->sent_lock); + + return NULL; +} + +static struct cdap_rcvd * cdap_rcvd_get_by_key(struct cdap * instance, + cdap_key_t key) { struct list_head * p = NULL; struct list_head * h = NULL; struct cdap_rcvd * rcvd = NULL; assert(instance); - assert(iid >= 0); pthread_mutex_lock(&instance->rcvd_lock); list_for_each_safe(p, h, &instance->rcvd) { rcvd = list_entry(p, struct cdap_rcvd, next); - if (rcvd->iid == iid) { + if (rcvd->key == key) { pthread_mutex_unlock(&instance->rcvd_lock); list_del(&rcvd->next); return rcvd; @@ -170,20 +193,22 @@ static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance, pthread_mutex_unlock(&instance->rcvd_lock); + assert(false); + return NULL; } static struct cdap_req * cdap_sent_add(struct cdap * instance, int fd, + invoke_id_t iid, cdap_key_t key) { struct cdap_req * req; assert(instance); - assert(key >= 0); assert(!cdap_sent_has_key(instance, key)); - req = cdap_req_create(fd, key); + req = cdap_req_create(fd, iid, key); if (req == NULL) return NULL; @@ -285,11 +310,20 @@ static void * sdu_reader(void * o) assert(msg->name); rcvd->opcode = msg->opcode; + rcvd->fd = fd; rcvd->iid = msg->invoke_id; + rcvd->key = next_id(instance); + if (rcvd->key == INVALID_ID) { + cdap__free_unpacked(msg, NULL); + free(rcvd); + continue; + } + rcvd->flags = msg->flags; - rcvd->fd = fd; + rcvd->proc = false; rcvd->name = strdup(msg->name); if (rcvd->name == NULL) { + release_id(instance, rcvd->key); cdap__free_unpacked(msg, NULL); free(rcvd); continue; @@ -299,6 +333,7 @@ static void * sdu_reader(void * o) rcvd->len = msg->value.len; rcvd->data = malloc(rcvd->len); if (rcvd->data == NULL) { + release_id(instance, rcvd->key); cdap__free_unpacked(msg, NULL); free(rcvd->name); free(rcvd); @@ -317,7 +352,7 @@ static void * sdu_reader(void * o) pthread_cond_signal(&instance->rcvd_cond); pthread_mutex_unlock(&instance->rcvd_lock); } else { - req = cdap_sent_get_by_key(instance, msg->invoke_id); + req = cdap_sent_get_by_iid(instance, msg->invoke_id); if (req == NULL) { cdap__free_unpacked(msg, NULL); continue; @@ -569,22 +604,6 @@ static int write_msg(int fd, return 0; } -static cdap_key_t invoke_id_to_key(invoke_id_t iid) -{ - if (iid == INVALID_INVOKE_ID) - return INVALID_CDAP_KEY; - - return (cdap_key_t) iid; -} - -static invoke_id_t key_to_invoke_id(cdap_key_t key) -{ - if (key == INVALID_CDAP_KEY) - return INVALID_INVOKE_ID; - - return (invoke_id_t) key; -} - cdap_key_t * cdap_request_send(struct cdap * instance, enum cdap_opcode code, const char * name, @@ -629,22 +648,28 @@ cdap_key_t * cdap_request_send(struct cdap * instance, invoke_id_t iid; struct fd_el * e; - iid = next_invoke_id(instance); - if (iid == INVALID_INVOKE_ID) { + iid = next_id(instance); + if (iid == INVALID_ID) { pthread_rwlock_unlock(&instance->flows_lock); return keys; } msg.invoke_id = iid; - *key = invoke_id_to_key(iid); - e = list_entry(p, struct fd_el, next); - req = cdap_sent_add(instance, e->fd, *key); + *key = next_id(instance); + if (*key == INVALID_ID) { + pthread_rwlock_unlock(&instance->flows_lock); + release_id(instance, iid); + return keys; + } + + req = cdap_sent_add(instance, e->fd, iid, *key); if (req == NULL) { pthread_rwlock_unlock(&instance->flows_lock); - release_invoke_id(instance, iid); + release_id(instance, *key); + release_id(instance, iid); return keys; } @@ -652,13 +677,15 @@ cdap_key_t * cdap_request_send(struct cdap * instance, if (ret == -ENOMEM) { pthread_rwlock_unlock(&instance->flows_lock); cdap_sent_del(instance, req); - release_invoke_id(instance, iid); + release_id(instance, *key); + release_id(instance, iid); return keys; } if (ret < 0) { cdap_sent_del(instance, req); - release_invoke_id(instance, iid); + release_id(instance, *key); + release_id(instance, iid); } ++key; @@ -676,10 +703,8 @@ int cdap_reply_wait(struct cdap * instance, { int ret; struct cdap_req * r; - invoke_id_t iid = key_to_invoke_id(key); - if (instance == NULL || iid == INVALID_INVOKE_ID - || (data != NULL && len == NULL)) + if (instance == NULL || (data != NULL && len == NULL)) return -EINVAL; r = cdap_sent_get_by_key(instance, key); @@ -689,7 +714,7 @@ int cdap_reply_wait(struct cdap * instance, ret = cdap_req_wait(r); if (ret < 0) { cdap_sent_del(instance, r); - release_invoke_id(instance, iid); + release_id(instance, r->iid); return ret; } @@ -703,7 +728,7 @@ int cdap_reply_wait(struct cdap * instance, ret = r->response; cdap_sent_del(instance, r); - release_invoke_id(instance, iid); + release_id(instance, r->iid); return ret; } @@ -715,7 +740,7 @@ cdap_key_t cdap_request_wait(struct cdap * instance, size_t * len, uint32_t * flags) { - struct cdap_rcvd * rcvd; + struct cdap_rcvd * rcv = NULL; if (instance == NULL || opcode == NULL || name == NULL || data == NULL || len == NULL || flags == NULL) @@ -726,23 +751,35 @@ cdap_key_t cdap_request_wait(struct cdap * instance, pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) &instance->rcvd_lock); - while (list_is_empty(&instance->rcvd)) - pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock); + while (rcv == NULL) { + while (list_is_empty(&instance->rcvd)) + pthread_cond_wait(&instance->rcvd_cond, + &instance->rcvd_lock); - rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); + rcv = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); + if (rcv->proc) { + rcv = NULL; + pthread_cond_wait(&instance->rcvd_cond, + &instance->rcvd_lock); + } + } + + rcv->proc = true; + list_del(&rcv->next); + list_add_tail(&rcv->next, &instance->rcvd); pthread_cleanup_pop(true); - *opcode = rcvd->opcode; - *name = rcvd->name; - *data = rcvd->data; - *len = rcvd->len; - *flags = rcvd->flags; + *opcode = rcv->opcode; + *name = rcv->name; + *data = rcv->data; + *len = rcv->len; + *flags = rcv->flags; - rcvd->name = NULL; - rcvd->data = NULL; + rcv->name = NULL; + rcv->data = NULL; - return invoke_id_to_key(rcvd->iid); + return rcv->key; } int cdap_reply_send(struct cdap * instance, @@ -753,16 +790,17 @@ int cdap_reply_send(struct cdap * instance, { int fd; cdap_t msg = CDAP__INIT; - invoke_id_t iid = key_to_invoke_id(key); - struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid); - if (rcvd == NULL) - return -EINVAL; + struct cdap_rcvd * rcvd; if (instance == NULL) return -EINVAL; + rcvd = cdap_rcvd_get_by_key(instance, key); + if (rcvd == NULL) + return -1; + msg.opcode = CDAP_REPLY; - msg.invoke_id = iid; + msg.invoke_id = rcvd->iid; msg.has_result = true; msg.result = result; @@ -774,7 +812,11 @@ int cdap_reply_send(struct cdap * instance, fd = rcvd->fd; + release_id(instance, rcvd->key); + + assert(rcvd->data == NULL); assert(rcvd->name == NULL); + assert(rcvd->proc); free(rcvd); diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c index b60e73ad..0b77f266 100644 --- a/src/lib/cdap_req.c +++ b/src/lib/cdap_req.c @@ -30,8 +30,9 @@ #include <stdlib.h> #include <assert.h> -struct cdap_req * cdap_req_create(int fd, - cdap_key_t key) +struct cdap_req * cdap_req_create(int fd, + invoke_id_t iid, + cdap_key_t key) { struct cdap_req * creq = malloc(sizeof(*creq)); pthread_condattr_t cattr; @@ -40,6 +41,7 @@ struct cdap_req * cdap_req_create(int fd, return NULL; creq->fd = fd; + creq->iid = iid; creq->key = key; creq->state = REQ_INIT; creq->response = -1; diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h index fe8e3613..592d26a0 100644 --- a/src/lib/cdap_req.h +++ b/src/lib/cdap_req.h @@ -31,6 +31,8 @@ #include <pthread.h> +typedef cdap_key_t invoke_id_t; + enum creq_state { REQ_NULL = 0, REQ_INIT, @@ -46,6 +48,7 @@ struct cdap_req { int fd; struct timespec birth; cdap_key_t key; + invoke_id_t iid; int response; buffer_t data; @@ -55,8 +58,9 @@ struct cdap_req { pthread_mutex_t lock; }; -struct cdap_req * cdap_req_create(int fd, - cdap_key_t key); +struct cdap_req * cdap_req_create(int fd, + cdap_key_t key, + invoke_id_t iid); void cdap_req_destroy(struct cdap_req * creq); |