diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/normal/enroll.c | 11 | ||||
| -rw-r--r-- | src/lib/cdap.c | 65 | 
2 files changed, 52 insertions, 24 deletions
| diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c index 4e510038..bdf55fe8 100644 --- a/src/ipcpd/normal/enroll.c +++ b/src/ipcpd/normal/enroll.c @@ -106,8 +106,6 @@ static void * enroll_handle(void * o)                          if (oc != CDAP_READ) {                                  log_warn("Invalid request.");                                  cdap_reply_send(cdap, key, -1, NULL, 0); -                                cdap_destroy(cdap); -                                flow_dealloc(conn.flow_info.fd);                                  free(name);                                  continue;                          } @@ -126,14 +124,10 @@ static void * enroll_handle(void * o)                                  buf[1] = hton64(t.tv_nsec);                                  cdap_reply_send(cdap, key, 0, buf, sizeof(buf));                                  free(name); -                                cdap_destroy(cdap); -                                flow_dealloc(conn.flow_info.fd);                                  continue;                          } else {                                  log_warn("Illegal read: %s.", name);                                  cdap_reply_send(cdap, key, -1, NULL, 0); -                                cdap_destroy(cdap); -                                flow_dealloc(conn.flow_info.fd);                                  free(name);                                  continue;                          } @@ -142,8 +136,6 @@ static void * enroll_handle(void * o)                          if (len < 0) {                                  log_err("Failed to pack %s.", name);                                  cdap_reply_send(cdap, key, -1, NULL, 0); -                                cdap_destroy(cdap); -                                flow_dealloc(conn.flow_info.fd);                                  free(name);                                  continue;                          } @@ -154,8 +146,7 @@ static void * enroll_handle(void * o)                          if (cdap_reply_send(cdap, key, 0, buf, len)) {                                  log_err("Failed to send CDAP reply."); -                                cdap_destroy(cdap); -                                flow_dealloc(conn.flow_info.fd); +                                free(buf);                                  continue;                          } diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 32169c1b..0d1568b2 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -76,6 +76,8 @@ struct cdap {  struct cdap_rcvd {          struct list_head next; +        int              fd; +          invoke_id_t      iid;          enum cdap_opcode opcode; @@ -144,6 +146,32 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,          return NULL;  } +static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance, +                                               invoke_id_t   iid) +{ +        struct list_head * p = NULL; +        struct list_head * h = NULL; +        struct cdap_rcvd * rcvd = NULL; + +        assert(instance); +        assert(iid >= 0); + +        pthread_mutex_lock(&instance->rcvd_lock); + +        list_for_each_safe(p, h, &instance->rcvd) { +                rcvd = list_entry(p, struct cdap_rcvd, next); +                if (rcvd->iid == iid) { +                        pthread_mutex_unlock(&instance->rcvd_lock); +                        list_del(&rcvd->next); +                        return rcvd; +                } +        } + +        pthread_mutex_unlock(&instance->rcvd_lock); + +        return NULL; +} +  static struct cdap_req * cdap_sent_add(struct cdap * instance,                                         int           fd,                                         cdap_key_t    key) @@ -221,6 +249,8 @@ static void cdap_rcvd_destroy(struct cdap * instance)                  free(r);          } +        pthread_cond_broadcast(&instance->rcvd_cond); +          pthread_mutex_unlock(&instance->rcvd_lock);  } @@ -240,7 +270,8 @@ static void * sdu_reader(void * o)                  return (void *) -1;          while (flow_event_wait(instance->set, fq, NULL)) { -                len = flow_read(fqueue_next(fq), buf, BUF_SIZE); +                int fd = fqueue_next(fq); +                len = flow_read(fd, buf, BUF_SIZE);                  if (len < 0)                          continue; @@ -255,9 +286,12 @@ static void * sdu_reader(void * o)                                  continue;                          } +                        assert(msg->name); +                          rcvd->opcode = msg->opcode;                          rcvd->iid    = msg->invoke_id;                          rcvd->flags  = msg->flags; +                        rcvd->fd     = fd;                          rcvd->name   = strdup(msg->name);                          if (rcvd->name == NULL) {                                  cdap__free_unpacked(msg, NULL); @@ -561,6 +595,8 @@ cdap_key_t * cdap_request_send(struct cdap *    instance,          key = keys; +        cdap__init(&msg); +          msg.opcode = code;          msg.name = (char *) name;          msg.has_flags = true; @@ -576,7 +612,6 @@ cdap_key_t * cdap_request_send(struct cdap *    instance,                  struct cdap_req * req;                  invoke_id_t iid;                  struct fd_el * e; -                cdap__init(&msg);                  iid = next_invoke_id(instance);                  if (iid == INVALID_INVOKE_ID) { @@ -690,7 +725,6 @@ cdap_key_t cdap_request_wait(struct cdap *      instance,                               uint32_t *         flags)  {          struct cdap_rcvd * rcvd; -        invoke_id_t iid;          if (instance == NULL || opcode == NULL || name == NULL || data == NULL              || len == NULL || flags == NULL) @@ -706,8 +740,6 @@ cdap_key_t cdap_request_wait(struct cdap *      instance,          rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); -        list_del(&rcvd->next); -          pthread_cleanup_pop(true);          *opcode = rcvd->opcode; @@ -716,11 +748,9 @@ cdap_key_t cdap_request_wait(struct cdap *      instance,          *len    = rcvd->len;          *flags  = rcvd->flags; -        iid = rcvd->iid; +        rcvd->name = NULL; -        free(rcvd); - -        return invoke_id_to_key(iid); +        return invoke_id_to_key(rcvd->iid);  }  int cdap_reply_send(struct cdap * instance, @@ -729,10 +759,11 @@ int cdap_reply_send(struct cdap * instance,                      const void *  data,                      size_t        len)  { -        cdap_t msg = CDAP__INIT; -        invoke_id_t iid = key_to_invoke_id(key); -        struct cdap_req * req = cdap_sent_get_by_key(instance, key); -        if (req == NULL) +        int                fd; +        cdap_t             msg  = CDAP__INIT; +        invoke_id_t        iid  = key_to_invoke_id(key); +        struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid); +        if (rcvd == NULL)                  return -EINVAL;          if (instance == NULL) @@ -749,5 +780,11 @@ int cdap_reply_send(struct cdap * instance,                  msg.value.len = len;          } -        return write_msg(req->fd, &msg); +        fd = rcvd->fd; + +        assert(rcvd->name == NULL); + +        free(rcvd); + +        return write_msg(fd, &msg);  } | 
