summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/cdap.c65
1 files changed, 51 insertions, 14 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 32169c1b..0d1568b2 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -76,6 +76,8 @@ struct cdap {
struct cdap_rcvd {
struct list_head next;
+ int fd;
+
invoke_id_t iid;
enum cdap_opcode opcode;
@@ -144,6 +146,32 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,
return NULL;
}
+static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance,
+ invoke_id_t iid)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+ struct cdap_rcvd * rcvd = NULL;
+
+ assert(instance);
+ assert(iid >= 0);
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ list_for_each_safe(p, h, &instance->rcvd) {
+ rcvd = list_entry(p, struct cdap_rcvd, next);
+ if (rcvd->iid == iid) {
+ pthread_mutex_unlock(&instance->rcvd_lock);
+ list_del(&rcvd->next);
+ return rcvd;
+ }
+ }
+
+ pthread_mutex_unlock(&instance->rcvd_lock);
+
+ return NULL;
+}
+
static struct cdap_req * cdap_sent_add(struct cdap * instance,
int fd,
cdap_key_t key)
@@ -221,6 +249,8 @@ static void cdap_rcvd_destroy(struct cdap * instance)
free(r);
}
+ pthread_cond_broadcast(&instance->rcvd_cond);
+
pthread_mutex_unlock(&instance->rcvd_lock);
}
@@ -240,7 +270,8 @@ static void * sdu_reader(void * o)
return (void *) -1;
while (flow_event_wait(instance->set, fq, NULL)) {
- len = flow_read(fqueue_next(fq), buf, BUF_SIZE);
+ int fd = fqueue_next(fq);
+ len = flow_read(fd, buf, BUF_SIZE);
if (len < 0)
continue;
@@ -255,9 +286,12 @@ static void * sdu_reader(void * o)
continue;
}
+ assert(msg->name);
+
rcvd->opcode = msg->opcode;
rcvd->iid = msg->invoke_id;
rcvd->flags = msg->flags;
+ rcvd->fd = fd;
rcvd->name = strdup(msg->name);
if (rcvd->name == NULL) {
cdap__free_unpacked(msg, NULL);
@@ -561,6 +595,8 @@ cdap_key_t * cdap_request_send(struct cdap * instance,
key = keys;
+ cdap__init(&msg);
+
msg.opcode = code;
msg.name = (char *) name;
msg.has_flags = true;
@@ -576,7 +612,6 @@ cdap_key_t * cdap_request_send(struct cdap * instance,
struct cdap_req * req;
invoke_id_t iid;
struct fd_el * e;
- cdap__init(&msg);
iid = next_invoke_id(instance);
if (iid == INVALID_INVOKE_ID) {
@@ -690,7 +725,6 @@ cdap_key_t cdap_request_wait(struct cdap * instance,
uint32_t * flags)
{
struct cdap_rcvd * rcvd;
- invoke_id_t iid;
if (instance == NULL || opcode == NULL || name == NULL || data == NULL
|| len == NULL || flags == NULL)
@@ -706,8 +740,6 @@ cdap_key_t cdap_request_wait(struct cdap * instance,
rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
- list_del(&rcvd->next);
-
pthread_cleanup_pop(true);
*opcode = rcvd->opcode;
@@ -716,11 +748,9 @@ cdap_key_t cdap_request_wait(struct cdap * instance,
*len = rcvd->len;
*flags = rcvd->flags;
- iid = rcvd->iid;
+ rcvd->name = NULL;
- free(rcvd);
-
- return invoke_id_to_key(iid);
+ return invoke_id_to_key(rcvd->iid);
}
int cdap_reply_send(struct cdap * instance,
@@ -729,10 +759,11 @@ int cdap_reply_send(struct cdap * instance,
const void * data,
size_t len)
{
- cdap_t msg = CDAP__INIT;
- invoke_id_t iid = key_to_invoke_id(key);
- struct cdap_req * req = cdap_sent_get_by_key(instance, key);
- if (req == NULL)
+ int fd;
+ cdap_t msg = CDAP__INIT;
+ invoke_id_t iid = key_to_invoke_id(key);
+ struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid);
+ if (rcvd == NULL)
return -EINVAL;
if (instance == NULL)
@@ -749,5 +780,11 @@ int cdap_reply_send(struct cdap * instance,
msg.value.len = len;
}
- return write_msg(req->fd, &msg);
+ fd = rcvd->fd;
+
+ assert(rcvd->name == NULL);
+
+ free(rcvd);
+
+ return write_msg(fd, &msg);
}