diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/normal/enroll.c | 11 | ||||
-rw-r--r-- | src/lib/cdap.c | 65 |
2 files changed, 52 insertions, 24 deletions
diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c index 4e510038..bdf55fe8 100644 --- a/src/ipcpd/normal/enroll.c +++ b/src/ipcpd/normal/enroll.c @@ -106,8 +106,6 @@ static void * enroll_handle(void * o) if (oc != CDAP_READ) { log_warn("Invalid request."); cdap_reply_send(cdap, key, -1, NULL, 0); - cdap_destroy(cdap); - flow_dealloc(conn.flow_info.fd); free(name); continue; } @@ -126,14 +124,10 @@ static void * enroll_handle(void * o) buf[1] = hton64(t.tv_nsec); cdap_reply_send(cdap, key, 0, buf, sizeof(buf)); free(name); - cdap_destroy(cdap); - flow_dealloc(conn.flow_info.fd); continue; } else { log_warn("Illegal read: %s.", name); cdap_reply_send(cdap, key, -1, NULL, 0); - cdap_destroy(cdap); - flow_dealloc(conn.flow_info.fd); free(name); continue; } @@ -142,8 +136,6 @@ static void * enroll_handle(void * o) if (len < 0) { log_err("Failed to pack %s.", name); cdap_reply_send(cdap, key, -1, NULL, 0); - cdap_destroy(cdap); - flow_dealloc(conn.flow_info.fd); free(name); continue; } @@ -154,8 +146,7 @@ static void * enroll_handle(void * o) if (cdap_reply_send(cdap, key, 0, buf, len)) { log_err("Failed to send CDAP reply."); - cdap_destroy(cdap); - flow_dealloc(conn.flow_info.fd); + free(buf); continue; } diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 32169c1b..0d1568b2 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -76,6 +76,8 @@ struct cdap { struct cdap_rcvd { struct list_head next; + int fd; + invoke_id_t iid; enum cdap_opcode opcode; @@ -144,6 +146,32 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, return NULL; } +static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance, + invoke_id_t iid) +{ + struct list_head * p = NULL; + struct list_head * h = NULL; + struct cdap_rcvd * rcvd = NULL; + + assert(instance); + assert(iid >= 0); + + pthread_mutex_lock(&instance->rcvd_lock); + + list_for_each_safe(p, h, &instance->rcvd) { + rcvd = list_entry(p, struct cdap_rcvd, next); + if (rcvd->iid == iid) { + pthread_mutex_unlock(&instance->rcvd_lock); + list_del(&rcvd->next); + return rcvd; + } + } + + pthread_mutex_unlock(&instance->rcvd_lock); + + return NULL; +} + static struct cdap_req * cdap_sent_add(struct cdap * instance, int fd, cdap_key_t key) @@ -221,6 +249,8 @@ static void cdap_rcvd_destroy(struct cdap * instance) free(r); } + pthread_cond_broadcast(&instance->rcvd_cond); + pthread_mutex_unlock(&instance->rcvd_lock); } @@ -240,7 +270,8 @@ static void * sdu_reader(void * o) return (void *) -1; while (flow_event_wait(instance->set, fq, NULL)) { - len = flow_read(fqueue_next(fq), buf, BUF_SIZE); + int fd = fqueue_next(fq); + len = flow_read(fd, buf, BUF_SIZE); if (len < 0) continue; @@ -255,9 +286,12 @@ static void * sdu_reader(void * o) continue; } + assert(msg->name); + rcvd->opcode = msg->opcode; rcvd->iid = msg->invoke_id; rcvd->flags = msg->flags; + rcvd->fd = fd; rcvd->name = strdup(msg->name); if (rcvd->name == NULL) { cdap__free_unpacked(msg, NULL); @@ -561,6 +595,8 @@ cdap_key_t * cdap_request_send(struct cdap * instance, key = keys; + cdap__init(&msg); + msg.opcode = code; msg.name = (char *) name; msg.has_flags = true; @@ -576,7 +612,6 @@ cdap_key_t * cdap_request_send(struct cdap * instance, struct cdap_req * req; invoke_id_t iid; struct fd_el * e; - cdap__init(&msg); iid = next_invoke_id(instance); if (iid == INVALID_INVOKE_ID) { @@ -690,7 +725,6 @@ cdap_key_t cdap_request_wait(struct cdap * instance, uint32_t * flags) { struct cdap_rcvd * rcvd; - invoke_id_t iid; if (instance == NULL || opcode == NULL || name == NULL || data == NULL || len == NULL || flags == NULL) @@ -706,8 +740,6 @@ cdap_key_t cdap_request_wait(struct cdap * instance, rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); - list_del(&rcvd->next); - pthread_cleanup_pop(true); *opcode = rcvd->opcode; @@ -716,11 +748,9 @@ cdap_key_t cdap_request_wait(struct cdap * instance, *len = rcvd->len; *flags = rcvd->flags; - iid = rcvd->iid; + rcvd->name = NULL; - free(rcvd); - - return invoke_id_to_key(iid); + return invoke_id_to_key(rcvd->iid); } int cdap_reply_send(struct cdap * instance, @@ -729,10 +759,11 @@ int cdap_reply_send(struct cdap * instance, const void * data, size_t len) { - cdap_t msg = CDAP__INIT; - invoke_id_t iid = key_to_invoke_id(key); - struct cdap_req * req = cdap_sent_get_by_key(instance, key); - if (req == NULL) + int fd; + cdap_t msg = CDAP__INIT; + invoke_id_t iid = key_to_invoke_id(key); + struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid); + if (rcvd == NULL) return -EINVAL; if (instance == NULL) @@ -749,5 +780,11 @@ int cdap_reply_send(struct cdap * instance, msg.value.len = len; } - return write_msg(req->fd, &msg); + fd = rcvd->fd; + + assert(rcvd->name == NULL); + + free(rcvd); + + return write_msg(fd, &msg); } |