summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-03-29 02:12:40 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-03-29 02:12:40 +0200
commitb09a2719a7820cef58a251ccb2ce286754114a0a (patch)
treee37cdffce783b846311df03d34df994a946f8597
parentb8ad2438385f7c60630d38524ad0c4c13b53b80a (diff)
downloadouroboros-b09a2719a7820cef58a251ccb2ce286754114a0a.tar.gz
ouroboros-b09a2719a7820cef58a251ccb2ce286754114a0a.zip
lib: Revise handling CDAP messages
Fixes a number of issues in CDAP. CDAP keeps track if a message is being processed, and moves it to the end of the request list if it is. It will now correctly wait for new messages. The invoke_ids are generated locally per CDAP instance, invoke_ids can't be used to track incoming requests, we need to keep track of the fd. The keys are now identifiers (taken from the same local pool as the invoke_ids) that are used to track requests.
-rw-r--r--src/lib/cdap.c166
-rw-r--r--src/lib/cdap_req.c6
-rw-r--r--src/lib/cdap_req.h8
3 files changed, 114 insertions, 66 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index a60bf8f9..f5e0593d 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -39,12 +39,12 @@
#include "cdap.pb-c.h"
typedef Cdap cdap_t;
-typedef int32_t invoke_id_t;
+typedef cdap_key_t invoke_id_t;
#define CDAP_REPLY (CDAP_DELETE + 1)
-#define INVALID_INVOKE_ID -1
-#define IDS_SIZE 256
+#define INVALID_ID -1
+#define IDS_SIZE 2048
#define BUF_SIZE 2048
struct fd_el {
@@ -78,8 +78,10 @@ struct cdap_rcvd {
struct list_head next;
int fd;
+ bool proc;
invoke_id_t iid;
+ cdap_key_t key;
enum cdap_opcode opcode;
char * name;
@@ -88,7 +90,7 @@ struct cdap_rcvd {
uint32_t flags;
};
-static int next_invoke_id(struct cdap * instance)
+static int next_id(struct cdap * instance)
{
int ret;
@@ -98,15 +100,15 @@ static int next_invoke_id(struct cdap * instance)
ret = bmp_allocate(instance->ids);
if (!bmp_is_id_valid(instance->ids, ret))
- ret = INVALID_INVOKE_ID;
+ ret = INVALID_ID;
pthread_mutex_unlock(&instance->ids_lock);
return ret;
}
-static int release_invoke_id(struct cdap * instance,
- int id)
+static int release_id(struct cdap * instance,
+ int32_t id)
{
int ret;
@@ -130,7 +132,6 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,
struct cdap_req * req = NULL;
assert(instance);
- assert(key >= 0);
pthread_rwlock_rdlock(&instance->sent_lock);
@@ -147,21 +148,43 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,
return NULL;
}
-static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance,
- invoke_id_t iid)
+static struct cdap_req * cdap_sent_get_by_iid(struct cdap * instance,
+ invoke_id_t iid)
+{
+ struct list_head * p = NULL;
+ struct cdap_req * req = NULL;
+
+ assert(instance);
+
+ pthread_rwlock_rdlock(&instance->sent_lock);
+
+ list_for_each(p, &instance->sent) {
+ req = list_entry(p, struct cdap_req, next);
+ if (req->iid == iid) {
+ pthread_rwlock_unlock(&instance->sent_lock);
+ return req;
+ }
+ }
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+
+ return NULL;
+}
+
+static struct cdap_rcvd * cdap_rcvd_get_by_key(struct cdap * instance,
+ cdap_key_t key)
{
struct list_head * p = NULL;
struct list_head * h = NULL;
struct cdap_rcvd * rcvd = NULL;
assert(instance);
- assert(iid >= 0);
pthread_mutex_lock(&instance->rcvd_lock);
list_for_each_safe(p, h, &instance->rcvd) {
rcvd = list_entry(p, struct cdap_rcvd, next);
- if (rcvd->iid == iid) {
+ if (rcvd->key == key) {
pthread_mutex_unlock(&instance->rcvd_lock);
list_del(&rcvd->next);
return rcvd;
@@ -170,20 +193,22 @@ static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance,
pthread_mutex_unlock(&instance->rcvd_lock);
+ assert(false);
+
return NULL;
}
static struct cdap_req * cdap_sent_add(struct cdap * instance,
int fd,
+ invoke_id_t iid,
cdap_key_t key)
{
struct cdap_req * req;
assert(instance);
- assert(key >= 0);
assert(!cdap_sent_has_key(instance, key));
- req = cdap_req_create(fd, key);
+ req = cdap_req_create(fd, iid, key);
if (req == NULL)
return NULL;
@@ -285,11 +310,20 @@ static void * sdu_reader(void * o)
assert(msg->name);
rcvd->opcode = msg->opcode;
+ rcvd->fd = fd;
rcvd->iid = msg->invoke_id;
+ rcvd->key = next_id(instance);
+ if (rcvd->key == INVALID_ID) {
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+
rcvd->flags = msg->flags;
- rcvd->fd = fd;
+ rcvd->proc = false;
rcvd->name = strdup(msg->name);
if (rcvd->name == NULL) {
+ release_id(instance, rcvd->key);
cdap__free_unpacked(msg, NULL);
free(rcvd);
continue;
@@ -299,6 +333,7 @@ static void * sdu_reader(void * o)
rcvd->len = msg->value.len;
rcvd->data = malloc(rcvd->len);
if (rcvd->data == NULL) {
+ release_id(instance, rcvd->key);
cdap__free_unpacked(msg, NULL);
free(rcvd->name);
free(rcvd);
@@ -317,7 +352,7 @@ static void * sdu_reader(void * o)
pthread_cond_signal(&instance->rcvd_cond);
pthread_mutex_unlock(&instance->rcvd_lock);
} else {
- req = cdap_sent_get_by_key(instance, msg->invoke_id);
+ req = cdap_sent_get_by_iid(instance, msg->invoke_id);
if (req == NULL) {
cdap__free_unpacked(msg, NULL);
continue;
@@ -569,22 +604,6 @@ static int write_msg(int fd,
return 0;
}
-static cdap_key_t invoke_id_to_key(invoke_id_t iid)
-{
- if (iid == INVALID_INVOKE_ID)
- return INVALID_CDAP_KEY;
-
- return (cdap_key_t) iid;
-}
-
-static invoke_id_t key_to_invoke_id(cdap_key_t key)
-{
- if (key == INVALID_CDAP_KEY)
- return INVALID_INVOKE_ID;
-
- return (invoke_id_t) key;
-}
-
cdap_key_t * cdap_request_send(struct cdap * instance,
enum cdap_opcode code,
const char * name,
@@ -629,22 +648,28 @@ cdap_key_t * cdap_request_send(struct cdap * instance,
invoke_id_t iid;
struct fd_el * e;
- iid = next_invoke_id(instance);
- if (iid == INVALID_INVOKE_ID) {
+ iid = next_id(instance);
+ if (iid == INVALID_ID) {
pthread_rwlock_unlock(&instance->flows_lock);
return keys;
}
msg.invoke_id = iid;
- *key = invoke_id_to_key(iid);
-
e = list_entry(p, struct fd_el, next);
- req = cdap_sent_add(instance, e->fd, *key);
+ *key = next_id(instance);
+ if (*key == INVALID_ID) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ release_id(instance, iid);
+ return keys;
+ }
+
+ req = cdap_sent_add(instance, e->fd, iid, *key);
if (req == NULL) {
pthread_rwlock_unlock(&instance->flows_lock);
- release_invoke_id(instance, iid);
+ release_id(instance, *key);
+ release_id(instance, iid);
return keys;
}
@@ -652,13 +677,15 @@ cdap_key_t * cdap_request_send(struct cdap * instance,
if (ret == -ENOMEM) {
pthread_rwlock_unlock(&instance->flows_lock);
cdap_sent_del(instance, req);
- release_invoke_id(instance, iid);
+ release_id(instance, *key);
+ release_id(instance, iid);
return keys;
}
if (ret < 0) {
cdap_sent_del(instance, req);
- release_invoke_id(instance, iid);
+ release_id(instance, *key);
+ release_id(instance, iid);
}
++key;
@@ -676,10 +703,8 @@ int cdap_reply_wait(struct cdap * instance,
{
int ret;
struct cdap_req * r;
- invoke_id_t iid = key_to_invoke_id(key);
- if (instance == NULL || iid == INVALID_INVOKE_ID
- || (data != NULL && len == NULL))
+ if (instance == NULL || (data != NULL && len == NULL))
return -EINVAL;
r = cdap_sent_get_by_key(instance, key);
@@ -689,7 +714,7 @@ int cdap_reply_wait(struct cdap * instance,
ret = cdap_req_wait(r);
if (ret < 0) {
cdap_sent_del(instance, r);
- release_invoke_id(instance, iid);
+ release_id(instance, r->iid);
return ret;
}
@@ -703,7 +728,7 @@ int cdap_reply_wait(struct cdap * instance,
ret = r->response;
cdap_sent_del(instance, r);
- release_invoke_id(instance, iid);
+ release_id(instance, r->iid);
return ret;
}
@@ -715,7 +740,7 @@ cdap_key_t cdap_request_wait(struct cdap * instance,
size_t * len,
uint32_t * flags)
{
- struct cdap_rcvd * rcvd;
+ struct cdap_rcvd * rcv = NULL;
if (instance == NULL || opcode == NULL || name == NULL || data == NULL
|| len == NULL || flags == NULL)
@@ -726,23 +751,35 @@ cdap_key_t cdap_request_wait(struct cdap * instance,
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) &instance->rcvd_lock);
- while (list_is_empty(&instance->rcvd))
- pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock);
+ while (rcv == NULL) {
+ while (list_is_empty(&instance->rcvd))
+ pthread_cond_wait(&instance->rcvd_cond,
+ &instance->rcvd_lock);
- rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
+ rcv = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
+ if (rcv->proc) {
+ rcv = NULL;
+ pthread_cond_wait(&instance->rcvd_cond,
+ &instance->rcvd_lock);
+ }
+ }
+
+ rcv->proc = true;
+ list_del(&rcv->next);
+ list_add_tail(&rcv->next, &instance->rcvd);
pthread_cleanup_pop(true);
- *opcode = rcvd->opcode;
- *name = rcvd->name;
- *data = rcvd->data;
- *len = rcvd->len;
- *flags = rcvd->flags;
+ *opcode = rcv->opcode;
+ *name = rcv->name;
+ *data = rcv->data;
+ *len = rcv->len;
+ *flags = rcv->flags;
- rcvd->name = NULL;
- rcvd->data = NULL;
+ rcv->name = NULL;
+ rcv->data = NULL;
- return invoke_id_to_key(rcvd->iid);
+ return rcv->key;
}
int cdap_reply_send(struct cdap * instance,
@@ -753,16 +790,17 @@ int cdap_reply_send(struct cdap * instance,
{
int fd;
cdap_t msg = CDAP__INIT;
- invoke_id_t iid = key_to_invoke_id(key);
- struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid);
- if (rcvd == NULL)
- return -EINVAL;
+ struct cdap_rcvd * rcvd;
if (instance == NULL)
return -EINVAL;
+ rcvd = cdap_rcvd_get_by_key(instance, key);
+ if (rcvd == NULL)
+ return -1;
+
msg.opcode = CDAP_REPLY;
- msg.invoke_id = iid;
+ msg.invoke_id = rcvd->iid;
msg.has_result = true;
msg.result = result;
@@ -774,7 +812,11 @@ int cdap_reply_send(struct cdap * instance,
fd = rcvd->fd;
+ release_id(instance, rcvd->key);
+
+ assert(rcvd->data == NULL);
assert(rcvd->name == NULL);
+ assert(rcvd->proc);
free(rcvd);
diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c
index b60e73ad..0b77f266 100644
--- a/src/lib/cdap_req.c
+++ b/src/lib/cdap_req.c
@@ -30,8 +30,9 @@
#include <stdlib.h>
#include <assert.h>
-struct cdap_req * cdap_req_create(int fd,
- cdap_key_t key)
+struct cdap_req * cdap_req_create(int fd,
+ invoke_id_t iid,
+ cdap_key_t key)
{
struct cdap_req * creq = malloc(sizeof(*creq));
pthread_condattr_t cattr;
@@ -40,6 +41,7 @@ struct cdap_req * cdap_req_create(int fd,
return NULL;
creq->fd = fd;
+ creq->iid = iid;
creq->key = key;
creq->state = REQ_INIT;
creq->response = -1;
diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h
index fe8e3613..592d26a0 100644
--- a/src/lib/cdap_req.h
+++ b/src/lib/cdap_req.h
@@ -31,6 +31,8 @@
#include <pthread.h>
+typedef cdap_key_t invoke_id_t;
+
enum creq_state {
REQ_NULL = 0,
REQ_INIT,
@@ -46,6 +48,7 @@ struct cdap_req {
int fd;
struct timespec birth;
cdap_key_t key;
+ invoke_id_t iid;
int response;
buffer_t data;
@@ -55,8 +58,9 @@ struct cdap_req {
pthread_mutex_t lock;
};
-struct cdap_req * cdap_req_create(int fd,
- cdap_key_t key);
+struct cdap_req * cdap_req_create(int fd,
+ cdap_key_t key,
+ invoke_id_t iid);
void cdap_req_destroy(struct cdap_req * creq);