diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/cdap.c | 341 | ||||
| -rw-r--r-- | src/lib/cdap.proto | 12 | ||||
| -rw-r--r-- | src/lib/cdap_req.c | 9 | ||||
| -rw-r--r-- | src/lib/cdap_req.h | 5 | 
4 files changed, 242 insertions, 125 deletions
| diff --git a/src/lib/cdap.c b/src/lib/cdap.c index ba4a2a21..86554dd2 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -25,6 +25,7 @@  #include <ouroboros/cdap.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/dev.h> +#include <ouroboros/fqueue.h>  #include <ouroboros/fcntl.h>  #include <ouroboros/errno.h> @@ -37,28 +38,39 @@  #include "cdap.pb-c.h"  typedef Cdap cdap_t; -typedef Opcode opcode_t;  typedef int32_t invoke_id_t; +#define CDAP_REPLY (CDAP_DELETE + 1) +  #define INVALID_INVOKE_ID -1  #define IDS_SIZE 256  #define BUF_SIZE 2048 -struct cdap { +struct fd_el { +        struct list_head next; +          int              fd; +}; + +struct cdap { +        flow_set_t *     set; + +        size_t           n_flows; +        struct list_head flows; +        pthread_rwlock_t flows_lock;          struct bmp *     ids;          pthread_mutex_t  ids_lock; -        pthread_t        reader; -          struct list_head sent;          pthread_rwlock_t sent_lock;          struct list_head rcvd;          pthread_cond_t   rcvd_cond;          pthread_mutex_t  rcvd_lock; + +        pthread_t        reader;  };  struct cdap_rcvd { @@ -133,6 +145,7 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,  }  static struct cdap_req * cdap_sent_add(struct cdap * instance, +                                       int           fd,                                         cdap_key_t    key)  {          struct cdap_req * req; @@ -141,7 +154,7 @@ static struct cdap_req * cdap_sent_add(struct cdap * instance,          assert(key >= 0);          assert(!cdap_sent_has_key(instance, key)); -        req = cdap_req_create(key); +        req = cdap_req_create(fd, key);          if (req == NULL)                  return NULL; @@ -220,9 +233,14 @@ static void * sdu_reader(void * o)          uint8_t buf[BUF_SIZE];          ssize_t len;          buffer_t data; +        fqueue_t * fq; -        while (true) { -                len = flow_read(instance->fd, buf, BUF_SIZE); +        fq = fqueue_create(); +        if (fq == NULL) +                return (void *) -1; + +        while (flow_event_wait(instance->set, fq, NULL)) { +                len = flow_read(fqueue_next(fq), buf, BUF_SIZE);                  if (len < 0)                          continue; @@ -230,41 +248,17 @@ static void * sdu_reader(void * o)                  if (msg == NULL)                          continue; -                if (msg->opcode != OPCODE__REPLY) { +                if (msg->opcode != CDAP_REPLY) {                          rcvd = malloc(sizeof(*rcvd));                          if (rcvd == NULL) {                                  cdap__free_unpacked(msg, NULL);                                  continue;                          } -                        switch (msg->opcode) { -                        case OPCODE__START: -                                rcvd->opcode = CDAP_START; -                                break; -                        case OPCODE__STOP: -                                rcvd->opcode = CDAP_STOP; -                                break; -                        case OPCODE__READ: -                                rcvd->opcode = CDAP_READ; -                                break; -                        case OPCODE__WRITE: -                                rcvd->opcode = CDAP_WRITE; -                                break; -                        case OPCODE__CREATE: -                                rcvd->opcode = CDAP_CREATE; -                                break; -                        case OPCODE__DELETE: -                                rcvd->opcode = CDAP_DELETE; -                                break; -                        default: -                                cdap__free_unpacked(msg, NULL); -                                free(rcvd); -                                continue; -                        } - -                        rcvd->iid   = msg->invoke_id; -                        rcvd->flags = msg->flags; -                        rcvd->name  = strdup(msg->name); +                        rcvd->opcode = msg->opcode; +                        rcvd->iid    = msg->invoke_id; +                        rcvd->flags  = msg->flags; +                        rcvd->name   = strdup(msg->name);                          if (rcvd->name == NULL) {                                  cdap__free_unpacked(msg, NULL);                                  free(rcvd); @@ -303,7 +297,7 @@ static void * sdu_reader(void * o)                                          cdap__free_unpacked(msg, NULL);                                          continue;                                  } -                                memcpy(data.data, msg->value.data, data.len); +                                memcpy(data.data, msg->value.data Iata.len);                          } else {                                  data.len = 0;                                  data.data = NULL; @@ -311,36 +305,32 @@ static void * sdu_reader(void * o)                          cdap_req_respond(req, msg->result, data);                  } - -                cdap__free_unpacked(msg, NULL);          } -          return (void *) 0;  } -struct cdap * cdap_create(int fd) +struct cdap * cdap_create()  {          struct cdap * instance = NULL; -        int flags; - -        if (fd < 0) -                return NULL; - -        flags = flow_get_flags(fd); -        if (flags & FLOW_O_NONBLOCK) -                return NULL;          instance = malloc(sizeof(*instance));          if (instance == NULL)                  return NULL; +        if (pthread_rwlock_init(&instance->flows_lock, NULL)) { +                free(instance); +                return NULL; +        } +          if (pthread_mutex_init(&instance->ids_lock, NULL)) { +                pthread_rwlock_destroy(&instance->flows_lock);                  free(instance);                  return NULL;          }          if (pthread_mutex_init(&instance->rcvd_lock, NULL)) {                  pthread_mutex_destroy(&instance->ids_lock); +                pthread_rwlock_destroy(&instance->flows_lock);                  free(instance);                  return NULL;          } @@ -348,6 +338,7 @@ struct cdap * cdap_create(int fd)          if (pthread_rwlock_init(&instance->sent_lock, NULL)) {                  pthread_mutex_destroy(&instance->rcvd_lock);                  pthread_mutex_destroy(&instance->ids_lock); +                pthread_rwlock_destroy(&instance->flows_lock);                  free(instance);                  return NULL;          } @@ -356,6 +347,7 @@ struct cdap * cdap_create(int fd)                  pthread_rwlock_destroy(&instance->sent_lock);                  pthread_mutex_destroy(&instance->rcvd_lock);                  pthread_mutex_destroy(&instance->ids_lock); +                pthread_rwlock_destroy(&instance->flows_lock);                  free(instance);                  return NULL;          } @@ -366,15 +358,29 @@ struct cdap * cdap_create(int fd)                  pthread_rwlock_destroy(&instance->sent_lock);                  pthread_mutex_destroy(&instance->rcvd_lock);                  pthread_mutex_destroy(&instance->ids_lock); +                pthread_rwlock_destroy(&instance->flows_lock);                  free(instance);                  return NULL;          } +        instance->set = flow_set_create(); +        if (instance->set == NULL) { +                bmp_destroy(instance->ids); +                pthread_cond_destroy(&instance->rcvd_cond); +                pthread_rwlock_destroy(&instance->sent_lock); +                pthread_mutex_destroy(&instance->rcvd_lock); +                pthread_mutex_destroy(&instance->ids_lock); +                pthread_rwlock_destroy(&instance->flows_lock); +                free(instance); +                return NULL; +        } + +        instance->n_flows = 0; + +        list_head_init(&instance->flows);          list_head_init(&instance->sent);          list_head_init(&instance->rcvd); -        instance->fd = fd; -          pthread_create(&instance->reader, NULL, sdu_reader, instance);          return instance; @@ -382,12 +388,29 @@ struct cdap * cdap_create(int fd)  int cdap_destroy(struct cdap * instance)  { +        struct list_head * p; +        struct list_head * h; +          if (instance == NULL)                  return 0;          pthread_cancel(instance->reader);          pthread_join(instance->reader, NULL); +        flow_set_destroy(instance->set); + +        pthread_rwlock_wrlock(&instance->flows_lock); + +        list_for_each_safe(p,h, &instance->flows) { +                struct fd_el * e = list_entry(p, struct fd_el, next); +                list_del(&e->next); +                free(e); +        } + +        pthread_rwlock_unlock(&instance->flows_lock); + +        pthread_rwlock_destroy(&instance->flows_lock); +          pthread_mutex_lock(&instance->ids_lock);          bmp_destroy(instance->ids); @@ -409,14 +432,71 @@ int cdap_destroy(struct cdap * instance)          return 0;  } -static int write_msg(struct cdap * instance, +int cdap_add_flow(struct cdap * instance, +                  int           fd) +{ +        struct fd_el * e; + +        if (fd < 0) +                return -EINVAL; + +        e = malloc(sizeof(*e)); +        if (e == NULL) +                return -ENOMEM; + +        e->fd = fd; + +        pthread_rwlock_wrlock(&instance->flows_lock); + +        if (flow_set_add(instance->set, fd)) { +                pthread_rwlock_unlock(&instance->flows_lock); +                return -1; +        } + +        list_add(&e->next, &instance->flows); + +        ++instance->n_flows; + +        pthread_rwlock_unlock(&instance->flows_lock); + +        return 0; +} + +int cdap_del_flow(struct cdap * instance, +                  int           fd) +{ +        struct list_head * p; +        struct list_head * h; + +        if (fd < 0) +                return -EINVAL; + +        pthread_rwlock_wrlock(&instance->flows_lock); + +        flow_set_del(instance->set, fd); + +        list_for_each_safe(p, h, &instance->flows) { +                struct fd_el * e = list_entry(p, struct fd_el, next); +                if (e->fd == fd) { +                        list_del(&e->next); +                        free(e); +                        break; +                } +        } + +        --instance->n_flows; + +        pthread_rwlock_unlock(&instance->flows_lock); + +        return 0; +} + +static int write_msg(int           fd,                       cdap_t *      msg)  { -        int ret;          uint8_t * data;          size_t len; -        assert(instance);          assert(msg);          len = cdap__get_packed_size(msg); @@ -429,11 +509,14 @@ static int write_msg(struct cdap * instance,          cdap__pack(msg, data); -        ret = flow_write(instance->fd, data, len); +        if (flow_write(fd, data, len)) { +                free(data); +                return -1; +        }          free(data); -        return ret; +        return 0;  }  static cdap_key_t invoke_id_to_key(invoke_id_t iid) @@ -452,75 +535,114 @@ static invoke_id_t key_to_invoke_id(cdap_key_t key)          return (invoke_id_t) key;  } -cdap_key_t cdap_request_send(struct cdap *    instance, -                             enum cdap_opcode code, -                             const char *     name, -                             const void *     data, -                             size_t           len, -                             uint32_t         flags) +cdap_key_t * cdap_request_send(struct cdap *    instance, +                               enum cdap_opcode code, +                               const char *     name, +                               const void *     data, +                               size_t           len, +                               uint32_t         flags)  { -        cdap_t msg = CDAP__INIT; -        struct cdap_req * req; -        invoke_id_t iid; -        cdap_key_t key; +        cdap_key_t *       keys; +        cdap_key_t *       key; +        cdap_t             msg = CDAP__INIT; +        struct list_head * p; +        int                ret; -        if (instance == NULL || name == NULL) -                return -EINVAL; +        if (instance == NULL || name == NULL || code > CDAP_DELETE) +                return NULL; +        pthread_rwlock_rdlock(&instance->flows_lock); -        iid = next_invoke_id(instance); -        if (iid == INVALID_INVOKE_ID) -                return INVALID_CDAP_KEY; +        keys = malloc(sizeof(*keys) * (instance->n_flows + 1)); +        if (keys == NULL) +                return NULL; -        switch (code) { -        case CDAP_READ: -                msg.opcode = OPCODE__READ; -                break; -        case CDAP_WRITE: -                msg.opcode = OPCODE__WRITE; -                break; -        case CDAP_CREATE: -                msg.opcode = OPCODE__CREATE; -                break; -        case CDAP_DELETE: -                msg.opcode = OPCODE__DELETE; -                break; -        case CDAP_START: -                msg.opcode = OPCODE__START; -                break; -        case CDAP_STOP: -                msg.opcode = OPCODE__STOP; -                break; -        default: -                release_invoke_id(instance, iid); -                return -EINVAL; -        } +        memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1)); +        key = keys; + +        msg.opcode = code;          msg.name = (char *) name;          msg.has_flags = true;          msg.flags = flags; -        msg.invoke_id = iid; +          if (data != NULL) {                  msg.has_value = true;                  msg.value.data = (uint8_t *) data;                  msg.value.len = len;          } -        key = invoke_id_to_key(iid); +        list_for_each(p, &instance->flows) { +                struct cdap_req * req; +                invoke_id_t iid; +                struct fd_el * e; +                cdap__init(&msg); + +                iid = next_invoke_id(instance); +                if (iid == INVALID_INVOKE_ID) { +                        pthread_rwlock_unlock(&instance->flows_lock); +                        while(key > keys) { +                                struct cdap_req * r = +                                        cdap_sent_get_by_key(instance, +                                                             *(--key)); +                                cdap_sent_del(instance, r); +                                cdap_req_destroy(r); +                        } -        req = cdap_sent_add(instance, key); -        if (req == NULL) { -                release_invoke_id(instance, iid); -                return INVALID_CDAP_KEY; -        } +                        free(keys); +                        return NULL; +                } -        if (write_msg(instance, &msg)) { -                cdap_sent_del(instance, req); -                release_invoke_id(instance, iid); -                return INVALID_CDAP_KEY; +                msg.invoke_id = iid; + +                *key = invoke_id_to_key(iid); + +                e = list_entry(p, struct fd_el, next); + +                req = cdap_sent_add(instance, e->fd, *key); +                if (req == NULL) { +                        pthread_rwlock_unlock(&instance->flows_lock); +                        while(key > keys) { +                                struct cdap_req * r = +                                        cdap_sent_get_by_key(instance, +                                                             *(--key)); +                                release_invoke_id(instance, iid); +                                cdap_sent_del(instance, r); +                                release_invoke_id(instance, +                                                  key_to_invoke_id(r->key)); +                                cdap_req_destroy(r); +                        } +                        free(keys); +                        return NULL; +                } + +                ret = write_msg(e->fd, &msg); +                if (ret == -ENOMEM) { +                        pthread_rwlock_unlock(&instance->flows_lock); +                        while(key >= keys) { +                                struct cdap_req * r = +                                        cdap_sent_get_by_key(instance, *key); +                                cdap_sent_del(instance, r); +                                release_invoke_id(instance, +                                                  key_to_invoke_id(r->key)); +                                cdap_req_destroy(r); +                        } + +                        free(keys); +                        return NULL; +                } + +                if (ret < 0) { +                        release_invoke_id(instance, iid); +                        cdap_sent_del(instance, req); +                } + +                ++key;          } -        return key; +        pthread_rwlock_unlock(&instance->flows_lock); + +        return keys;  }  int cdap_reply_wait(struct cdap * instance, @@ -609,11 +731,14 @@ int cdap_reply_send(struct cdap * instance,  {          cdap_t msg = CDAP__INIT;          invoke_id_t iid = key_to_invoke_id(key); +        struct cdap_req * req = cdap_sent_get_by_key(instance, key); +        if (req == NULL) +                return -EINVAL;          if (instance == NULL)                  return -EINVAL; -        msg.opcode = OPCODE__REPLY; +        msg.opcode = CDAP_REPLY;          msg.invoke_id = iid;          msg.has_result = true;          msg.result = result; @@ -624,5 +749,5 @@ int cdap_reply_send(struct cdap * instance,                  msg.value.len = len;          } -        return write_msg(instance, &msg); +        return write_msg(req->fd, &msg);  } diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto index 5fde1658..120b2c97 100644 --- a/src/lib/cdap.proto +++ b/src/lib/cdap.proto @@ -23,18 +23,8 @@  syntax = "proto2"; -enum opcode { -        CREATE = 1; -        DELETE = 2; -        READ   = 3; -        WRITE  = 4; -        START  = 5; -        STOP   = 6; -        REPLY  = 7; -} -  message cdap { -        required opcode opcode    = 1; +        required uint32 opcode    = 1;          required uint32 invoke_id = 2;          optional uint32 flags     = 3;          optional string name      = 4; diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c index a0348a14..b60e73ad 100644 --- a/src/lib/cdap_req.c +++ b/src/lib/cdap_req.c @@ -30,7 +30,8 @@  #include <stdlib.h>  #include <assert.h> -struct cdap_req * cdap_req_create(cdap_key_t key) +struct cdap_req * cdap_req_create(int        fd, +                                  cdap_key_t key)  {          struct cdap_req * creq = malloc(sizeof(*creq));          pthread_condattr_t cattr; @@ -38,10 +39,10 @@ struct cdap_req * cdap_req_create(cdap_key_t key)          if (creq == NULL)                  return NULL; -        creq->key = key; +        creq->fd        = fd; +        creq->key       = key;          creq->state     = REQ_INIT; - -        creq->response = -1; +        creq->response  = -1;          creq->data.data = NULL;          creq->data.len  = 0; diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h index 9023357d..fe8e3613 100644 --- a/src/lib/cdap_req.h +++ b/src/lib/cdap_req.h @@ -43,8 +43,8 @@ enum creq_state {  struct cdap_req {          struct list_head next; +        int              fd;          struct timespec  birth; -          cdap_key_t       key;          int              response; @@ -55,7 +55,8 @@ struct cdap_req {          pthread_mutex_t  lock;  }; -struct cdap_req * cdap_req_create(cdap_key_t key); +struct cdap_req * cdap_req_create(int        fd, +                                  cdap_key_t key);  void              cdap_req_destroy(struct cdap_req * creq); | 
