From f18293b1458ebfb6105812868c15ba9be5661ba2 Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@ugent.be>
Date: Tue, 14 Mar 2017 19:34:18 +0100
Subject: lib: Fix CDAP with multiple fds

---
 src/ipcpd/normal/enroll.c | 11 +-------
 src/lib/cdap.c            | 65 +++++++++++++++++++++++++++++++++++++----------
 2 files changed, 52 insertions(+), 24 deletions(-)

(limited to 'src')

diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c
index 4e510038..bdf55fe8 100644
--- a/src/ipcpd/normal/enroll.c
+++ b/src/ipcpd/normal/enroll.c
@@ -106,8 +106,6 @@ static void * enroll_handle(void * o)
                         if (oc != CDAP_READ) {
                                 log_warn("Invalid request.");
                                 cdap_reply_send(cdap, key, -1, NULL, 0);
-                                cdap_destroy(cdap);
-                                flow_dealloc(conn.flow_info.fd);
                                 free(name);
                                 continue;
                         }
@@ -126,14 +124,10 @@ static void * enroll_handle(void * o)
                                 buf[1] = hton64(t.tv_nsec);
                                 cdap_reply_send(cdap, key, 0, buf, sizeof(buf));
                                 free(name);
-                                cdap_destroy(cdap);
-                                flow_dealloc(conn.flow_info.fd);
                                 continue;
                         } else {
                                 log_warn("Illegal read: %s.", name);
                                 cdap_reply_send(cdap, key, -1, NULL, 0);
-                                cdap_destroy(cdap);
-                                flow_dealloc(conn.flow_info.fd);
                                 free(name);
                                 continue;
                         }
@@ -142,8 +136,6 @@ static void * enroll_handle(void * o)
                         if (len < 0) {
                                 log_err("Failed to pack %s.", name);
                                 cdap_reply_send(cdap, key, -1, NULL, 0);
-                                cdap_destroy(cdap);
-                                flow_dealloc(conn.flow_info.fd);
                                 free(name);
                                 continue;
                         }
@@ -154,8 +146,7 @@ static void * enroll_handle(void * o)
 
                         if (cdap_reply_send(cdap, key, 0, buf, len)) {
                                 log_err("Failed to send CDAP reply.");
-                                cdap_destroy(cdap);
-                                flow_dealloc(conn.flow_info.fd);
+                                free(buf);
                                 continue;
                         }
 
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 32169c1b..0d1568b2 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -76,6 +76,8 @@ struct cdap {
 struct cdap_rcvd {
         struct list_head next;
 
+        int              fd;
+
         invoke_id_t      iid;
 
         enum cdap_opcode opcode;
@@ -144,6 +146,32 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,
         return NULL;
 }
 
+static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance,
+                                               invoke_id_t   iid)
+{
+        struct list_head * p = NULL;
+        struct list_head * h = NULL;
+        struct cdap_rcvd * rcvd = NULL;
+
+        assert(instance);
+        assert(iid >= 0);
+
+        pthread_mutex_lock(&instance->rcvd_lock);
+
+        list_for_each_safe(p, h, &instance->rcvd) {
+                rcvd = list_entry(p, struct cdap_rcvd, next);
+                if (rcvd->iid == iid) {
+                        pthread_mutex_unlock(&instance->rcvd_lock);
+                        list_del(&rcvd->next);
+                        return rcvd;
+                }
+        }
+
+        pthread_mutex_unlock(&instance->rcvd_lock);
+
+        return NULL;
+}
+
 static struct cdap_req * cdap_sent_add(struct cdap * instance,
                                        int           fd,
                                        cdap_key_t    key)
@@ -221,6 +249,8 @@ static void cdap_rcvd_destroy(struct cdap * instance)
                 free(r);
         }
 
+        pthread_cond_broadcast(&instance->rcvd_cond);
+
         pthread_mutex_unlock(&instance->rcvd_lock);
 }
 
@@ -240,7 +270,8 @@ static void * sdu_reader(void * o)
                 return (void *) -1;
 
         while (flow_event_wait(instance->set, fq, NULL)) {
-                len = flow_read(fqueue_next(fq), buf, BUF_SIZE);
+                int fd = fqueue_next(fq);
+                len = flow_read(fd, buf, BUF_SIZE);
                 if (len < 0)
                         continue;
 
@@ -255,9 +286,12 @@ static void * sdu_reader(void * o)
                                 continue;
                         }
 
+                        assert(msg->name);
+
                         rcvd->opcode = msg->opcode;
                         rcvd->iid    = msg->invoke_id;
                         rcvd->flags  = msg->flags;
+                        rcvd->fd     = fd;
                         rcvd->name   = strdup(msg->name);
                         if (rcvd->name == NULL) {
                                 cdap__free_unpacked(msg, NULL);
@@ -561,6 +595,8 @@ cdap_key_t * cdap_request_send(struct cdap *    instance,
 
         key = keys;
 
+        cdap__init(&msg);
+
         msg.opcode = code;
         msg.name = (char *) name;
         msg.has_flags = true;
@@ -576,7 +612,6 @@ cdap_key_t * cdap_request_send(struct cdap *    instance,
                 struct cdap_req * req;
                 invoke_id_t iid;
                 struct fd_el * e;
-                cdap__init(&msg);
 
                 iid = next_invoke_id(instance);
                 if (iid == INVALID_INVOKE_ID) {
@@ -690,7 +725,6 @@ cdap_key_t cdap_request_wait(struct cdap *      instance,
                              uint32_t *         flags)
 {
         struct cdap_rcvd * rcvd;
-        invoke_id_t iid;
 
         if (instance == NULL || opcode == NULL || name == NULL || data == NULL
             || len == NULL || flags == NULL)
@@ -706,8 +740,6 @@ cdap_key_t cdap_request_wait(struct cdap *      instance,
 
         rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
 
-        list_del(&rcvd->next);
-
         pthread_cleanup_pop(true);
 
         *opcode = rcvd->opcode;
@@ -716,11 +748,9 @@ cdap_key_t cdap_request_wait(struct cdap *      instance,
         *len    = rcvd->len;
         *flags  = rcvd->flags;
 
-        iid = rcvd->iid;
+        rcvd->name = NULL;
 
-        free(rcvd);
-
-        return invoke_id_to_key(iid);
+        return invoke_id_to_key(rcvd->iid);
 }
 
 int cdap_reply_send(struct cdap * instance,
@@ -729,10 +759,11 @@ int cdap_reply_send(struct cdap * instance,
                     const void *  data,
                     size_t        len)
 {
-        cdap_t msg = CDAP__INIT;
-        invoke_id_t iid = key_to_invoke_id(key);
-        struct cdap_req * req = cdap_sent_get_by_key(instance, key);
-        if (req == NULL)
+        int                fd;
+        cdap_t             msg  = CDAP__INIT;
+        invoke_id_t        iid  = key_to_invoke_id(key);
+        struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid);
+        if (rcvd == NULL)
                 return -EINVAL;
 
         if (instance == NULL)
@@ -749,5 +780,11 @@ int cdap_reply_send(struct cdap * instance,
                 msg.value.len = len;
         }
 
-        return write_msg(req->fd, &msg);
+        fd = rcvd->fd;
+
+        assert(rcvd->name == NULL);
+
+        free(rcvd);
+
+        return write_msg(fd, &msg);
 }
-- 
cgit v1.2.3