diff options
Diffstat (limited to 'src/lib/cdap.c')
-rw-r--r-- | src/lib/cdap.c | 531 |
1 files changed, 394 insertions, 137 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c index df79be54..dee8f88c 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -25,39 +25,65 @@ #include <ouroboros/bitmap.h> #include <ouroboros/dev.h> #include <ouroboros/fcntl.h> +#include <ouroboros/errno.h> + +#include "cdap_req.h" #include <stdlib.h> #include <pthread.h> +#include <string.h> +#include <assert.h> #include "cdap.pb-c.h" typedef Cdap cdap_t; typedef Opcode opcode_t; +typedef int32_t invoke_id_t; + +#define INVALID_INVOKE_ID -1 #define IDS_SIZE 256 #define BUF_SIZE 2048 struct cdap { - int fd; - struct bmp * ids; - pthread_mutex_t ids_lock; - pthread_t reader; - struct cdap_ops * ops; + int fd; + + struct bmp * ids; + pthread_mutex_t ids_lock; + + pthread_t reader; + + struct list_head sent; + pthread_rwlock_t sent_lock; + + struct list_head rcvd; + pthread_cond_t rcvd_cond; + pthread_mutex_t rcvd_lock; }; -struct cdap_info { - pthread_t thread; - struct cdap * instance; - cdap_t * msg; +struct cdap_rcvd { + struct list_head next; + + invoke_id_t iid; + + enum cdap_opcode opcode; + char * name; + uint8_t * data; + size_t len; + uint32_t flags; }; static int next_invoke_id(struct cdap * instance) { int ret; + assert(instance); + pthread_mutex_lock(&instance->ids_lock); + ret = bmp_allocate(instance->ids); if (!bmp_is_id_valid(instance->ids, ret)) - ret = -1; /* INVALID_INVOKE_ID */ + ret = INVALID_INVOKE_ID; + pthread_mutex_unlock(&instance->ids_lock); return ret; @@ -67,140 +93,225 @@ static int release_invoke_id(struct cdap * instance, int id) { int ret; + assert(instance); + pthread_mutex_lock(&instance->ids_lock); + ret = bmp_release(instance->ids, id); + pthread_mutex_unlock(&instance->ids_lock); return ret; } -static void * handle_cdap_msg(void * o) +#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL) + +struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, cdap_key_t key) { - struct cdap_info * info = (struct cdap_info *) o; - struct cdap * instance = info->instance; - cdap_t * msg = info->msg; - - switch (msg->opcode) { - case OPCODE__READ: - if (msg->name != NULL) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_READ, - msg->name, - NULL, 0, 0); - break; - case OPCODE__WRITE: - if (msg->name != NULL && - msg->has_value) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_WRITE, - msg->name, - msg->value.data, - msg->value.len, - msg->flags); - break; - case OPCODE__CREATE: - if (msg->name != NULL && - msg->has_value) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_CREATE, - msg->name, - msg->value.data, - msg->value.len, 0); - break; - case OPCODE__DELETE: - if (msg->name != NULL && - msg->has_value) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_DELETE, - msg->name, - msg->value.data, - msg->value.len, 0); - break; - case OPCODE__START: - if (msg->name != NULL) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_START, - msg->name, - NULL, 0, 0); - break; - case OPCODE__STOP: - if (msg->name != NULL) - instance->ops->cdap_request(instance, - msg->invoke_id, - CDAP_STOP, - msg->name, - NULL, 0, 0); - break; - case OPCODE__REPLY: - instance->ops->cdap_reply(instance, - msg->invoke_id, - msg->result, - msg->value.data, - msg->value.len); - release_invoke_id(instance, msg->invoke_id); - break; - default: - break; + struct list_head * p = NULL; + struct cdap_req * req = NULL; + + assert(instance); + assert(key >= 0); + + pthread_rwlock_rdlock(&instance->sent_lock); + + list_for_each(p, &instance->sent) { + req = list_entry(p, struct cdap_req, next); + if (req->key == key) { + pthread_rwlock_unlock(&instance->sent_lock); + return req; + } } - free(info); - cdap__free_unpacked(msg, NULL); + pthread_rwlock_unlock(&instance->sent_lock); - return (void *) 0; + return NULL; +} + +static int cdap_sent_add(struct cdap * instance, struct cdap_req * req) +{ + assert (instance); + assert (req); + + if (cdap_sent_has_key(instance, req->key)) + return -EPERM; + + pthread_rwlock_wrlock(&instance->sent_lock); + + list_add(&req->next, &instance->sent); + + pthread_rwlock_unlock(&instance->sent_lock); + + return 0; +} + +static void cdap_sent_del(struct cdap * instance, struct cdap_req * req) +{ + assert(instance); + assert(req); + + assert(cdap_sent_has_key(instance, req->key)); + + pthread_rwlock_wrlock(&instance->sent_lock); + + list_del(&req->next); + + pthread_rwlock_unlock(&instance->sent_lock); +} + +static void cdap_sent_destroy(struct cdap * instance) +{ + struct list_head * p = NULL; + struct list_head * h = NULL; + + assert(instance); + + pthread_rwlock_wrlock(&instance->sent_lock); + + list_for_each_safe(p, h, &instance->sent) { + struct cdap_req * req = list_entry(p, struct cdap_req, next); + list_del(&req->next); + cdap_req_destroy(req); + } + + pthread_rwlock_unlock(&instance->sent_lock); +} + +static void cdap_rcvd_destroy(struct cdap * instance) +{ + struct list_head * p = NULL; + struct list_head * h = NULL; + + assert(instance); + + pthread_mutex_lock(&instance->rcvd_lock); + + list_for_each_safe(p, h, &instance->sent) { + struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next); + list_del(&r->next); + if (r->data != NULL) + free(r->data); + if (r->name != NULL) + free(r->name); + free(r); + } + + pthread_mutex_unlock(&instance->rcvd_lock); } static void * sdu_reader(void * o) { struct cdap * instance = (struct cdap *) o; + struct cdap_req * req; + struct cdap_rcvd * rcvd; cdap_t * msg; uint8_t buf[BUF_SIZE]; ssize_t len; - struct cdap_info * cdap_info; + buffer_t data; while (true) { len = flow_read(instance->fd, buf, BUF_SIZE); if (len < 0) - return (void *) -1; + continue; msg = cdap__unpack(NULL, len, buf); if (msg == NULL) continue; - cdap_info = malloc(sizeof(*cdap_info)); - if (cdap_info == NULL) { - cdap__free_unpacked(msg, NULL); - continue; + if (msg->opcode != OPCODE__REPLY) { + rcvd = malloc(sizeof(*rcvd)); + if (rcvd == NULL) { + cdap__free_unpacked(msg, NULL); + continue; + } + + switch (msg->opcode) { + case OPCODE__START: + rcvd->opcode = CDAP_START; + break; + case OPCODE__STOP: + rcvd->opcode = CDAP_STOP; + break; + case OPCODE__READ: + rcvd->opcode = CDAP_READ; + break; + case OPCODE__WRITE: + rcvd->opcode = CDAP_WRITE; + break; + case OPCODE__CREATE: + rcvd->opcode = CDAP_CREATE; + break; + case OPCODE__DELETE: + rcvd->opcode = CDAP_DELETE; + break; + default: + cdap__free_unpacked(msg, NULL); + free(rcvd); + continue; + } + rcvd->iid = msg->invoke_id; + rcvd->flags = msg->flags; + rcvd->name = strdup(msg->name); + if (rcvd->name == NULL) { + cdap__free_unpacked(msg, NULL); + free(rcvd); + continue; + } + + if (msg->has_value) { + rcvd->len = msg->value.len; + rcvd->data = malloc(rcvd->len); + if (rcvd->data == NULL) { + cdap__free_unpacked(msg, NULL); + free(rcvd); + continue; + } + memcpy(rcvd->data, msg->value.data, rcvd->len); + } else { + rcvd->len = 0; + rcvd->data = NULL; + } + + pthread_mutex_lock(&instance->rcvd_lock); + + list_add(&rcvd->next, &instance->rcvd); + + pthread_cond_signal(&instance->rcvd_cond); + pthread_mutex_unlock(&instance->rcvd_lock); + } else { + req = cdap_sent_get_by_key(instance, msg->invoke_id); + if (req == NULL) + continue; + + if (msg->has_value) { + data.len = msg->value.len; + data.data = malloc(data.len); + if (data.data == NULL) { + cdap__free_unpacked(msg, NULL); + continue; + } + memcpy(data.data, msg->value.data, data.len); + } else { + data.len = 0; + data.data = NULL; + } + + cdap_req_respond(req, msg->result, data); } - cdap_info->instance = instance; - cdap_info->msg = msg; - - pthread_create(&cdap_info->thread, - NULL, - handle_cdap_msg, - (void *) cdap_info); - - pthread_detach(cdap_info->thread); - + cdap__free_unpacked(msg, NULL); } return (void *) 0; } -struct cdap * cdap_create(struct cdap_ops * ops, - int fd) +struct cdap * cdap_create(int fd) { struct cdap * instance = NULL; int flags; - if (ops == NULL || fd < 0 || - ops->cdap_reply == NULL || - ops->cdap_request == NULL) + if (fd < 0) return NULL; flags = flow_get_flags(fd); @@ -216,19 +327,43 @@ struct cdap * cdap_create(struct cdap_ops * ops, return NULL; } - instance->ops = ops; - instance->fd = fd; + if (pthread_mutex_init(&instance->rcvd_lock, NULL)) { + pthread_mutex_destroy(&instance->ids_lock); + free(instance); + return NULL; + } + + if (pthread_rwlock_init(&instance->sent_lock, NULL)) { + pthread_mutex_destroy(&instance->rcvd_lock); + pthread_mutex_destroy(&instance->ids_lock); + free(instance); + return NULL; + } + + if (pthread_cond_init(&instance->rcvd_cond, NULL)) { + pthread_rwlock_destroy(&instance->sent_lock); + pthread_mutex_destroy(&instance->rcvd_lock); + pthread_mutex_destroy(&instance->ids_lock); + free(instance); + return NULL; + } instance->ids = bmp_create(IDS_SIZE, 0); if (instance->ids == NULL) { + pthread_cond_destroy(&instance->rcvd_cond); + pthread_rwlock_destroy(&instance->sent_lock); + pthread_mutex_destroy(&instance->rcvd_lock); + pthread_mutex_destroy(&instance->ids_lock); free(instance); return NULL; } - pthread_create(&instance->reader, - NULL, - sdu_reader, - (void *) instance); + INIT_LIST_HEAD(&instance->sent); + INIT_LIST_HEAD(&instance->rcvd); + + instance->fd = fd; + + pthread_create(&instance->reader, NULL, sdu_reader, instance); return instance; } @@ -247,27 +382,37 @@ int cdap_destroy(struct cdap * instance) pthread_mutex_unlock(&instance->ids_lock); - flow_dealloc(instance->fd); + pthread_mutex_destroy(&instance->ids_lock); + + cdap_sent_destroy(instance); + + pthread_rwlock_destroy(&instance->sent_lock); + + cdap_rcvd_destroy(instance); + + pthread_mutex_destroy(&instance->rcvd_lock); free(instance); return 0; } -static int write_msg(struct cdap * instance, - cdap_t * msg) +static int write_msg(struct cdap * instance, cdap_t * msg) { int ret; uint8_t * data; size_t len; + assert(instance); + assert(msg); + len = cdap__get_packed_size(msg); if (len == 0) return -1; - data = malloc(BUF_SIZE); + data = malloc(len); if (data == NULL) - return -1; + return -ENOMEM; cdap__pack(msg, data); @@ -278,22 +423,41 @@ static int write_msg(struct cdap * instance, return ret; } -int cdap_send_request(struct cdap * instance, - enum cdap_opcode code, - char * name, - uint8_t * data, - size_t len, - uint32_t flags) +static cdap_key_t invoke_id_to_key(invoke_id_t iid) +{ + if (iid == INVALID_INVOKE_ID) + return INVALID_CDAP_KEY; + + return (cdap_key_t) iid; +} + +static invoke_id_t key_to_invoke_id(cdap_key_t key) +{ + if (key == INVALID_CDAP_KEY) + return INVALID_INVOKE_ID; + + return (invoke_id_t) key; +} + +cdap_key_t cdap_request_send(struct cdap * instance, + enum cdap_opcode code, + char * name, + uint8_t * data, + size_t len, + uint32_t flags) { - int id; cdap_t msg = CDAP__INIT; + struct cdap_req * req; + invoke_id_t iid; + cdap_key_t key; if (instance == NULL || name == NULL) - return -1; + return -EINVAL; - id = next_invoke_id(instance); - if (!bmp_is_id_valid(instance->ids, id)) - return -1; + + iid = next_invoke_id(instance); + if (iid == INVALID_INVOKE_ID) + return INVALID_CDAP_KEY; switch (code) { case CDAP_READ: @@ -315,39 +479,132 @@ int cdap_send_request(struct cdap * instance, msg.opcode = OPCODE__STOP; break; default: - release_invoke_id(instance, id); - return -1; + release_invoke_id(instance, iid); + return -EINVAL; } msg.name = name; msg.has_flags = true; msg.flags = flags; - msg.invoke_id = id; + msg.invoke_id = iid; if (data != NULL) { msg.has_value = true; msg.value.data = data; msg.value.len = len; } - if (write_msg(instance, &msg)) - return -1; + key = invoke_id_to_key(iid); + + req = cdap_req_create(key); + if (req == NULL) + return INVALID_CDAP_KEY; + + if (cdap_sent_add(instance, req)) { + cdap_req_destroy(req); + return INVALID_CDAP_KEY; + } + + if (write_msg(instance, &msg)) { + cdap_sent_del(instance, req); + cdap_req_destroy(req); + return INVALID_CDAP_KEY; + } + + return key; +} + +int cdap_reply_wait(struct cdap * instance, + cdap_key_t key, + uint8_t ** data, + size_t * len) +{ + int ret; + struct cdap_req * r; + invoke_id_t iid = key_to_invoke_id(key); + + if (instance == NULL || iid == INVALID_INVOKE_ID) + return -EINVAL; + + r = cdap_sent_get_by_key(instance, key); + if (r == NULL) + return -EINVAL; + + ret = cdap_req_wait(r); + if (ret < 0) + return ret; + + if (r->response) + return r->response; + + assert(ret == 0); + + if (data != NULL) { + *data = r->data.data; + *len = r->data.len; + } + + cdap_sent_del(instance, r); - return id; + release_invoke_id(instance, iid); + + return 0; } -int cdap_send_reply(struct cdap * instance, - int invoke_id, +cdap_key_t cdap_request_wait(struct cdap * instance, + enum cdap_opcode * opcode, + char ** name, + uint8_t ** data, + size_t * len, + uint32_t * flags) +{ + struct cdap_rcvd * rcvd; + invoke_id_t iid; + + if (instance == NULL || opcode == NULL || name == NULL || data == NULL + || len == NULL || flags == NULL) + return -EINVAL; + + pthread_mutex_lock(&instance->rcvd_lock); + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) &instance->rcvd_lock); + + while (list_empty(&instance->rcvd)) + pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock); + + rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); + + list_del(&rcvd->next); + + pthread_cleanup_pop(true); + + *opcode = rcvd->opcode; + *name = rcvd->name; + *data = rcvd->data; + *len = rcvd->len; + *flags = rcvd->flags; + + iid = rcvd->iid; + + free(rcvd); + + return invoke_id_to_key(iid); +} + +int cdap_reply_send(struct cdap * instance, + cdap_key_t key, int result, uint8_t * data, size_t len) { cdap_t msg = CDAP__INIT; + invoke_id_t iid = key_to_invoke_id(key); if (instance == NULL) - return -1; + return -EINVAL; msg.opcode = OPCODE__REPLY; - msg.invoke_id = invoke_id; + msg.invoke_id = iid; msg.has_result = true; msg.result = result; |