From f18293b1458ebfb6105812868c15ba9be5661ba2 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Tue, 14 Mar 2017 19:34:18 +0100 Subject: lib: Fix CDAP with multiple fds --- src/lib/cdap.c | 65 +++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 14 deletions(-) (limited to 'src/lib/cdap.c') diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 32169c1b..0d1568b2 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -76,6 +76,8 @@ struct cdap { struct cdap_rcvd { struct list_head next; + int fd; + invoke_id_t iid; enum cdap_opcode opcode; @@ -144,6 +146,32 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, return NULL; } +static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance, + invoke_id_t iid) +{ + struct list_head * p = NULL; + struct list_head * h = NULL; + struct cdap_rcvd * rcvd = NULL; + + assert(instance); + assert(iid >= 0); + + pthread_mutex_lock(&instance->rcvd_lock); + + list_for_each_safe(p, h, &instance->rcvd) { + rcvd = list_entry(p, struct cdap_rcvd, next); + if (rcvd->iid == iid) { + pthread_mutex_unlock(&instance->rcvd_lock); + list_del(&rcvd->next); + return rcvd; + } + } + + pthread_mutex_unlock(&instance->rcvd_lock); + + return NULL; +} + static struct cdap_req * cdap_sent_add(struct cdap * instance, int fd, cdap_key_t key) @@ -221,6 +249,8 @@ static void cdap_rcvd_destroy(struct cdap * instance) free(r); } + pthread_cond_broadcast(&instance->rcvd_cond); + pthread_mutex_unlock(&instance->rcvd_lock); } @@ -240,7 +270,8 @@ static void * sdu_reader(void * o) return (void *) -1; while (flow_event_wait(instance->set, fq, NULL)) { - len = flow_read(fqueue_next(fq), buf, BUF_SIZE); + int fd = fqueue_next(fq); + len = flow_read(fd, buf, BUF_SIZE); if (len < 0) continue; @@ -255,9 +286,12 @@ static void * sdu_reader(void * o) continue; } + assert(msg->name); + rcvd->opcode = msg->opcode; rcvd->iid = msg->invoke_id; rcvd->flags = msg->flags; + rcvd->fd = fd; rcvd->name = strdup(msg->name); if (rcvd->name == NULL) { cdap__free_unpacked(msg, NULL); @@ -561,6 +595,8 @@ cdap_key_t * cdap_request_send(struct cdap * instance, key = keys; + cdap__init(&msg); + msg.opcode = code; msg.name = (char *) name; msg.has_flags = true; @@ -576,7 +612,6 @@ cdap_key_t * cdap_request_send(struct cdap * instance, struct cdap_req * req; invoke_id_t iid; struct fd_el * e; - cdap__init(&msg); iid = next_invoke_id(instance); if (iid == INVALID_INVOKE_ID) { @@ -690,7 +725,6 @@ cdap_key_t cdap_request_wait(struct cdap * instance, uint32_t * flags) { struct cdap_rcvd * rcvd; - invoke_id_t iid; if (instance == NULL || opcode == NULL || name == NULL || data == NULL || len == NULL || flags == NULL) @@ -706,8 +740,6 @@ cdap_key_t cdap_request_wait(struct cdap * instance, rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); - list_del(&rcvd->next); - pthread_cleanup_pop(true); *opcode = rcvd->opcode; @@ -716,11 +748,9 @@ cdap_key_t cdap_request_wait(struct cdap * instance, *len = rcvd->len; *flags = rcvd->flags; - iid = rcvd->iid; + rcvd->name = NULL; - free(rcvd); - - return invoke_id_to_key(iid); + return invoke_id_to_key(rcvd->iid); } int cdap_reply_send(struct cdap * instance, @@ -729,10 +759,11 @@ int cdap_reply_send(struct cdap * instance, const void * data, size_t len) { - cdap_t msg = CDAP__INIT; - invoke_id_t iid = key_to_invoke_id(key); - struct cdap_req * req = cdap_sent_get_by_key(instance, key); - if (req == NULL) + int fd; + cdap_t msg = CDAP__INIT; + invoke_id_t iid = key_to_invoke_id(key); + struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid); + if (rcvd == NULL) return -EINVAL; if (instance == NULL) @@ -749,5 +780,11 @@ int cdap_reply_send(struct cdap * instance, msg.value.len = len; } - return write_msg(req->fd, &msg); + fd = rcvd->fd; + + assert(rcvd->name == NULL); + + free(rcvd); + + return write_msg(fd, &msg); } -- cgit v1.2.3