summaryrefslogtreecommitdiff
path: root/src/lib/cdap.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/cdap.c')
-rw-r--r--src/lib/cdap.c531
1 files changed, 394 insertions, 137 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index df79be54..dee8f88c 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -25,39 +25,65 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
#include <ouroboros/fcntl.h>
+#include <ouroboros/errno.h>
+
+#include "cdap_req.h"
#include <stdlib.h>
#include <pthread.h>
+#include <string.h>
+#include <assert.h>
#include "cdap.pb-c.h"
typedef Cdap cdap_t;
typedef Opcode opcode_t;
+typedef int32_t invoke_id_t;
+
+#define INVALID_INVOKE_ID -1
#define IDS_SIZE 256
#define BUF_SIZE 2048
struct cdap {
- int fd;
- struct bmp * ids;
- pthread_mutex_t ids_lock;
- pthread_t reader;
- struct cdap_ops * ops;
+ int fd;
+
+ struct bmp * ids;
+ pthread_mutex_t ids_lock;
+
+ pthread_t reader;
+
+ struct list_head sent;
+ pthread_rwlock_t sent_lock;
+
+ struct list_head rcvd;
+ pthread_cond_t rcvd_cond;
+ pthread_mutex_t rcvd_lock;
};
-struct cdap_info {
- pthread_t thread;
- struct cdap * instance;
- cdap_t * msg;
+struct cdap_rcvd {
+ struct list_head next;
+
+ invoke_id_t iid;
+
+ enum cdap_opcode opcode;
+ char * name;
+ uint8_t * data;
+ size_t len;
+ uint32_t flags;
};
static int next_invoke_id(struct cdap * instance)
{
int ret;
+ assert(instance);
+
pthread_mutex_lock(&instance->ids_lock);
+
ret = bmp_allocate(instance->ids);
if (!bmp_is_id_valid(instance->ids, ret))
- ret = -1; /* INVALID_INVOKE_ID */
+ ret = INVALID_INVOKE_ID;
+
pthread_mutex_unlock(&instance->ids_lock);
return ret;
@@ -67,140 +93,225 @@ static int release_invoke_id(struct cdap * instance, int id)
{
int ret;
+ assert(instance);
+
pthread_mutex_lock(&instance->ids_lock);
+
ret = bmp_release(instance->ids, id);
+
pthread_mutex_unlock(&instance->ids_lock);
return ret;
}
-static void * handle_cdap_msg(void * o)
+#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL)
+
+struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, cdap_key_t key)
{
- struct cdap_info * info = (struct cdap_info *) o;
- struct cdap * instance = info->instance;
- cdap_t * msg = info->msg;
-
- switch (msg->opcode) {
- case OPCODE__READ:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_READ,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__WRITE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_WRITE,
- msg->name,
- msg->value.data,
- msg->value.len,
- msg->flags);
- break;
- case OPCODE__CREATE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_CREATE,
- msg->name,
- msg->value.data,
- msg->value.len, 0);
- break;
- case OPCODE__DELETE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_DELETE,
- msg->name,
- msg->value.data,
- msg->value.len, 0);
- break;
- case OPCODE__START:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_START,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__STOP:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_STOP,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__REPLY:
- instance->ops->cdap_reply(instance,
- msg->invoke_id,
- msg->result,
- msg->value.data,
- msg->value.len);
- release_invoke_id(instance, msg->invoke_id);
- break;
- default:
- break;
+ struct list_head * p = NULL;
+ struct cdap_req * req = NULL;
+
+ assert(instance);
+ assert(key >= 0);
+
+ pthread_rwlock_rdlock(&instance->sent_lock);
+
+ list_for_each(p, &instance->sent) {
+ req = list_entry(p, struct cdap_req, next);
+ if (req->key == key) {
+ pthread_rwlock_unlock(&instance->sent_lock);
+ return req;
+ }
}
- free(info);
- cdap__free_unpacked(msg, NULL);
+ pthread_rwlock_unlock(&instance->sent_lock);
- return (void *) 0;
+ return NULL;
+}
+
+static int cdap_sent_add(struct cdap * instance, struct cdap_req * req)
+{
+ assert (instance);
+ assert (req);
+
+ if (cdap_sent_has_key(instance, req->key))
+ return -EPERM;
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_add(&req->next, &instance->sent);
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+
+ return 0;
+}
+
+static void cdap_sent_del(struct cdap * instance, struct cdap_req * req)
+{
+ assert(instance);
+ assert(req);
+
+ assert(cdap_sent_has_key(instance, req->key));
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_del(&req->next);
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+}
+
+static void cdap_sent_destroy(struct cdap * instance)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+
+ assert(instance);
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_for_each_safe(p, h, &instance->sent) {
+ struct cdap_req * req = list_entry(p, struct cdap_req, next);
+ list_del(&req->next);
+ cdap_req_destroy(req);
+ }
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+}
+
+static void cdap_rcvd_destroy(struct cdap * instance)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+
+ assert(instance);
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ list_for_each_safe(p, h, &instance->sent) {
+ struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next);
+ list_del(&r->next);
+ if (r->data != NULL)
+ free(r->data);
+ if (r->name != NULL)
+ free(r->name);
+ free(r);
+ }
+
+ pthread_mutex_unlock(&instance->rcvd_lock);
}
static void * sdu_reader(void * o)
{
struct cdap * instance = (struct cdap *) o;
+ struct cdap_req * req;
+ struct cdap_rcvd * rcvd;
cdap_t * msg;
uint8_t buf[BUF_SIZE];
ssize_t len;
- struct cdap_info * cdap_info;
+ buffer_t data;
while (true) {
len = flow_read(instance->fd, buf, BUF_SIZE);
if (len < 0)
- return (void *) -1;
+ continue;
msg = cdap__unpack(NULL, len, buf);
if (msg == NULL)
continue;
- cdap_info = malloc(sizeof(*cdap_info));
- if (cdap_info == NULL) {
- cdap__free_unpacked(msg, NULL);
- continue;
+ if (msg->opcode != OPCODE__REPLY) {
+ rcvd = malloc(sizeof(*rcvd));
+ if (rcvd == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
+ }
+
+ switch (msg->opcode) {
+ case OPCODE__START:
+ rcvd->opcode = CDAP_START;
+ break;
+ case OPCODE__STOP:
+ rcvd->opcode = CDAP_STOP;
+ break;
+ case OPCODE__READ:
+ rcvd->opcode = CDAP_READ;
+ break;
+ case OPCODE__WRITE:
+ rcvd->opcode = CDAP_WRITE;
+ break;
+ case OPCODE__CREATE:
+ rcvd->opcode = CDAP_CREATE;
+ break;
+ case OPCODE__DELETE:
+ rcvd->opcode = CDAP_DELETE;
+ break;
+ default:
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+ rcvd->iid = msg->invoke_id;
+ rcvd->flags = msg->flags;
+ rcvd->name = strdup(msg->name);
+ if (rcvd->name == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+
+ if (msg->has_value) {
+ rcvd->len = msg->value.len;
+ rcvd->data = malloc(rcvd->len);
+ if (rcvd->data == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+ memcpy(rcvd->data, msg->value.data, rcvd->len);
+ } else {
+ rcvd->len = 0;
+ rcvd->data = NULL;
+ }
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ list_add(&rcvd->next, &instance->rcvd);
+
+ pthread_cond_signal(&instance->rcvd_cond);
+ pthread_mutex_unlock(&instance->rcvd_lock);
+ } else {
+ req = cdap_sent_get_by_key(instance, msg->invoke_id);
+ if (req == NULL)
+ continue;
+
+ if (msg->has_value) {
+ data.len = msg->value.len;
+ data.data = malloc(data.len);
+ if (data.data == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
+ }
+ memcpy(data.data, msg->value.data, data.len);
+ } else {
+ data.len = 0;
+ data.data = NULL;
+ }
+
+ cdap_req_respond(req, msg->result, data);
}
- cdap_info->instance = instance;
- cdap_info->msg = msg;
-
- pthread_create(&cdap_info->thread,
- NULL,
- handle_cdap_msg,
- (void *) cdap_info);
-
- pthread_detach(cdap_info->thread);
-
+ cdap__free_unpacked(msg, NULL);
}
return (void *) 0;
}
-struct cdap * cdap_create(struct cdap_ops * ops,
- int fd)
+struct cdap * cdap_create(int fd)
{
struct cdap * instance = NULL;
int flags;
- if (ops == NULL || fd < 0 ||
- ops->cdap_reply == NULL ||
- ops->cdap_request == NULL)
+ if (fd < 0)
return NULL;
flags = flow_get_flags(fd);
@@ -216,19 +327,43 @@ struct cdap * cdap_create(struct cdap_ops * ops,
return NULL;
}
- instance->ops = ops;
- instance->fd = fd;
+ if (pthread_mutex_init(&instance->rcvd_lock, NULL)) {
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
+
+ if (pthread_rwlock_init(&instance->sent_lock, NULL)) {
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
+
+ if (pthread_cond_init(&instance->rcvd_cond, NULL)) {
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
instance->ids = bmp_create(IDS_SIZE, 0);
if (instance->ids == NULL) {
+ pthread_cond_destroy(&instance->rcvd_cond);
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
free(instance);
return NULL;
}
- pthread_create(&instance->reader,
- NULL,
- sdu_reader,
- (void *) instance);
+ INIT_LIST_HEAD(&instance->sent);
+ INIT_LIST_HEAD(&instance->rcvd);
+
+ instance->fd = fd;
+
+ pthread_create(&instance->reader, NULL, sdu_reader, instance);
return instance;
}
@@ -247,27 +382,37 @@ int cdap_destroy(struct cdap * instance)
pthread_mutex_unlock(&instance->ids_lock);
- flow_dealloc(instance->fd);
+ pthread_mutex_destroy(&instance->ids_lock);
+
+ cdap_sent_destroy(instance);
+
+ pthread_rwlock_destroy(&instance->sent_lock);
+
+ cdap_rcvd_destroy(instance);
+
+ pthread_mutex_destroy(&instance->rcvd_lock);
free(instance);
return 0;
}
-static int write_msg(struct cdap * instance,
- cdap_t * msg)
+static int write_msg(struct cdap * instance, cdap_t * msg)
{
int ret;
uint8_t * data;
size_t len;
+ assert(instance);
+ assert(msg);
+
len = cdap__get_packed_size(msg);
if (len == 0)
return -1;
- data = malloc(BUF_SIZE);
+ data = malloc(len);
if (data == NULL)
- return -1;
+ return -ENOMEM;
cdap__pack(msg, data);
@@ -278,22 +423,41 @@ static int write_msg(struct cdap * instance,
return ret;
}
-int cdap_send_request(struct cdap * instance,
- enum cdap_opcode code,
- char * name,
- uint8_t * data,
- size_t len,
- uint32_t flags)
+static cdap_key_t invoke_id_to_key(invoke_id_t iid)
+{
+ if (iid == INVALID_INVOKE_ID)
+ return INVALID_CDAP_KEY;
+
+ return (cdap_key_t) iid;
+}
+
+static invoke_id_t key_to_invoke_id(cdap_key_t key)
+{
+ if (key == INVALID_CDAP_KEY)
+ return INVALID_INVOKE_ID;
+
+ return (invoke_id_t) key;
+}
+
+cdap_key_t cdap_request_send(struct cdap * instance,
+ enum cdap_opcode code,
+ char * name,
+ uint8_t * data,
+ size_t len,
+ uint32_t flags)
{
- int id;
cdap_t msg = CDAP__INIT;
+ struct cdap_req * req;
+ invoke_id_t iid;
+ cdap_key_t key;
if (instance == NULL || name == NULL)
- return -1;
+ return -EINVAL;
- id = next_invoke_id(instance);
- if (!bmp_is_id_valid(instance->ids, id))
- return -1;
+
+ iid = next_invoke_id(instance);
+ if (iid == INVALID_INVOKE_ID)
+ return INVALID_CDAP_KEY;
switch (code) {
case CDAP_READ:
@@ -315,39 +479,132 @@ int cdap_send_request(struct cdap * instance,
msg.opcode = OPCODE__STOP;
break;
default:
- release_invoke_id(instance, id);
- return -1;
+ release_invoke_id(instance, iid);
+ return -EINVAL;
}
msg.name = name;
msg.has_flags = true;
msg.flags = flags;
- msg.invoke_id = id;
+ msg.invoke_id = iid;
if (data != NULL) {
msg.has_value = true;
msg.value.data = data;
msg.value.len = len;
}
- if (write_msg(instance, &msg))
- return -1;
+ key = invoke_id_to_key(iid);
+
+ req = cdap_req_create(key);
+ if (req == NULL)
+ return INVALID_CDAP_KEY;
+
+ if (cdap_sent_add(instance, req)) {
+ cdap_req_destroy(req);
+ return INVALID_CDAP_KEY;
+ }
+
+ if (write_msg(instance, &msg)) {
+ cdap_sent_del(instance, req);
+ cdap_req_destroy(req);
+ return INVALID_CDAP_KEY;
+ }
+
+ return key;
+}
+
+int cdap_reply_wait(struct cdap * instance,
+ cdap_key_t key,
+ uint8_t ** data,
+ size_t * len)
+{
+ int ret;
+ struct cdap_req * r;
+ invoke_id_t iid = key_to_invoke_id(key);
+
+ if (instance == NULL || iid == INVALID_INVOKE_ID)
+ return -EINVAL;
+
+ r = cdap_sent_get_by_key(instance, key);
+ if (r == NULL)
+ return -EINVAL;
+
+ ret = cdap_req_wait(r);
+ if (ret < 0)
+ return ret;
+
+ if (r->response)
+ return r->response;
+
+ assert(ret == 0);
+
+ if (data != NULL) {
+ *data = r->data.data;
+ *len = r->data.len;
+ }
+
+ cdap_sent_del(instance, r);
- return id;
+ release_invoke_id(instance, iid);
+
+ return 0;
}
-int cdap_send_reply(struct cdap * instance,
- int invoke_id,
+cdap_key_t cdap_request_wait(struct cdap * instance,
+ enum cdap_opcode * opcode,
+ char ** name,
+ uint8_t ** data,
+ size_t * len,
+ uint32_t * flags)
+{
+ struct cdap_rcvd * rcvd;
+ invoke_id_t iid;
+
+ if (instance == NULL || opcode == NULL || name == NULL || data == NULL
+ || len == NULL || flags == NULL)
+ return -EINVAL;
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) &instance->rcvd_lock);
+
+ while (list_empty(&instance->rcvd))
+ pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock);
+
+ rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
+
+ list_del(&rcvd->next);
+
+ pthread_cleanup_pop(true);
+
+ *opcode = rcvd->opcode;
+ *name = rcvd->name;
+ *data = rcvd->data;
+ *len = rcvd->len;
+ *flags = rcvd->flags;
+
+ iid = rcvd->iid;
+
+ free(rcvd);
+
+ return invoke_id_to_key(iid);
+}
+
+int cdap_reply_send(struct cdap * instance,
+ cdap_key_t key,
int result,
uint8_t * data,
size_t len)
{
cdap_t msg = CDAP__INIT;
+ invoke_id_t iid = key_to_invoke_id(key);
if (instance == NULL)
- return -1;
+ return -EINVAL;
msg.opcode = OPCODE__REPLY;
- msg.invoke_id = invoke_id;
+ msg.invoke_id = iid;
msg.has_result = true;
msg.result = result;