summaryrefslogtreecommitdiff
path: root/src/lib/cdap.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2017-03-21 16:21:49 +0100
committerSander Vrijders <sander.vrijders@ugent.be>2017-03-21 16:21:49 +0100
commitfef50c3db0e02f0052f1759d508045c44fc4146e (patch)
treefc73859827a5dfebf5022fad37e826d98ba4046f /src/lib/cdap.c
parent4b257b249ea91d1ee7e2341c563bac561911e8a6 (diff)
parentd4e80d41197b75d2c351659c7e8d4546270e677d (diff)
downloadouroboros-fef50c3db0e02f0052f1759d508045c44fc4146e.tar.gz
ouroboros-fef50c3db0e02f0052f1759d508045c44fc4146e.zip
Merge branch 'be' of bitbucket.org:ouroboros-rina/ouroboros into be
Diffstat (limited to 'src/lib/cdap.c')
-rw-r--r--src/lib/cdap.c395
1 files changed, 279 insertions, 116 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 6c46775c..0d1568b2 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -3,7 +3,8 @@
*
* The Common Distributed Application Protocol
*
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -24,6 +25,7 @@
#include <ouroboros/cdap.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/fcntl.h>
#include <ouroboros/errno.h>
@@ -36,33 +38,46 @@
#include "cdap.pb-c.h"
typedef Cdap cdap_t;
-typedef Opcode opcode_t;
typedef int32_t invoke_id_t;
+#define CDAP_REPLY (CDAP_DELETE + 1)
+
#define INVALID_INVOKE_ID -1
#define IDS_SIZE 256
#define BUF_SIZE 2048
-struct cdap {
+struct fd_el {
+ struct list_head next;
+
int fd;
+};
+
+struct cdap {
+ flow_set_t * set;
+
+ size_t n_flows;
+ struct list_head flows;
+ pthread_rwlock_t flows_lock;
struct bmp * ids;
pthread_mutex_t ids_lock;
- pthread_t reader;
-
struct list_head sent;
pthread_rwlock_t sent_lock;
struct list_head rcvd;
pthread_cond_t rcvd_cond;
pthread_mutex_t rcvd_lock;
+
+ pthread_t reader;
};
struct cdap_rcvd {
struct list_head next;
+ int fd;
+
invoke_id_t iid;
enum cdap_opcode opcode;
@@ -131,7 +146,34 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,
return NULL;
}
+static struct cdap_rcvd * cdap_rcvd_get_by_iid(struct cdap * instance,
+ invoke_id_t iid)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+ struct cdap_rcvd * rcvd = NULL;
+
+ assert(instance);
+ assert(iid >= 0);
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ list_for_each_safe(p, h, &instance->rcvd) {
+ rcvd = list_entry(p, struct cdap_rcvd, next);
+ if (rcvd->iid == iid) {
+ pthread_mutex_unlock(&instance->rcvd_lock);
+ list_del(&rcvd->next);
+ return rcvd;
+ }
+ }
+
+ pthread_mutex_unlock(&instance->rcvd_lock);
+
+ return NULL;
+}
+
static struct cdap_req * cdap_sent_add(struct cdap * instance,
+ int fd,
cdap_key_t key)
{
struct cdap_req * req;
@@ -140,7 +182,7 @@ static struct cdap_req * cdap_sent_add(struct cdap * instance,
assert(key >= 0);
assert(!cdap_sent_has_key(instance, key));
- req = cdap_req_create(key);
+ req = cdap_req_create(fd, key);
if (req == NULL)
return NULL;
@@ -207,6 +249,8 @@ static void cdap_rcvd_destroy(struct cdap * instance)
free(r);
}
+ pthread_cond_broadcast(&instance->rcvd_cond);
+
pthread_mutex_unlock(&instance->rcvd_lock);
}
@@ -219,9 +263,15 @@ static void * sdu_reader(void * o)
uint8_t buf[BUF_SIZE];
ssize_t len;
buffer_t data;
+ fqueue_t * fq;
- while (true) {
- len = flow_read(instance->fd, buf, BUF_SIZE);
+ fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) -1;
+
+ while (flow_event_wait(instance->set, fq, NULL)) {
+ int fd = fqueue_next(fq);
+ len = flow_read(fd, buf, BUF_SIZE);
if (len < 0)
continue;
@@ -229,41 +279,20 @@ static void * sdu_reader(void * o)
if (msg == NULL)
continue;
- if (msg->opcode != OPCODE__REPLY) {
+ if (msg->opcode != CDAP_REPLY) {
rcvd = malloc(sizeof(*rcvd));
if (rcvd == NULL) {
cdap__free_unpacked(msg, NULL);
continue;
}
- switch (msg->opcode) {
- case OPCODE__START:
- rcvd->opcode = CDAP_START;
- break;
- case OPCODE__STOP:
- rcvd->opcode = CDAP_STOP;
- break;
- case OPCODE__READ:
- rcvd->opcode = CDAP_READ;
- break;
- case OPCODE__WRITE:
- rcvd->opcode = CDAP_WRITE;
- break;
- case OPCODE__CREATE:
- rcvd->opcode = CDAP_CREATE;
- break;
- case OPCODE__DELETE:
- rcvd->opcode = CDAP_DELETE;
- break;
- default:
- cdap__free_unpacked(msg, NULL);
- free(rcvd);
- continue;
- }
+ assert(msg->name);
- rcvd->iid = msg->invoke_id;
- rcvd->flags = msg->flags;
- rcvd->name = strdup(msg->name);
+ rcvd->opcode = msg->opcode;
+ rcvd->iid = msg->invoke_id;
+ rcvd->flags = msg->flags;
+ rcvd->fd = fd;
+ rcvd->name = strdup(msg->name);
if (rcvd->name == NULL) {
cdap__free_unpacked(msg, NULL);
free(rcvd);
@@ -310,36 +339,32 @@ static void * sdu_reader(void * o)
cdap_req_respond(req, msg->result, data);
}
-
- cdap__free_unpacked(msg, NULL);
}
-
return (void *) 0;
}
-struct cdap * cdap_create(int fd)
+struct cdap * cdap_create()
{
struct cdap * instance = NULL;
- int flags;
-
- if (fd < 0)
- return NULL;
-
- flags = flow_get_flags(fd);
- if (flags & FLOW_O_NONBLOCK)
- return NULL;
instance = malloc(sizeof(*instance));
if (instance == NULL)
return NULL;
+ if (pthread_rwlock_init(&instance->flows_lock, NULL)) {
+ free(instance);
+ return NULL;
+ }
+
if (pthread_mutex_init(&instance->ids_lock, NULL)) {
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
if (pthread_mutex_init(&instance->rcvd_lock, NULL)) {
pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
@@ -347,6 +372,7 @@ struct cdap * cdap_create(int fd)
if (pthread_rwlock_init(&instance->sent_lock, NULL)) {
pthread_mutex_destroy(&instance->rcvd_lock);
pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
@@ -355,6 +381,7 @@ struct cdap * cdap_create(int fd)
pthread_rwlock_destroy(&instance->sent_lock);
pthread_mutex_destroy(&instance->rcvd_lock);
pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
@@ -365,15 +392,29 @@ struct cdap * cdap_create(int fd)
pthread_rwlock_destroy(&instance->sent_lock);
pthread_mutex_destroy(&instance->rcvd_lock);
pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
+ free(instance);
+ return NULL;
+ }
+
+ instance->set = flow_set_create();
+ if (instance->set == NULL) {
+ bmp_destroy(instance->ids);
+ pthread_cond_destroy(&instance->rcvd_cond);
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
+ instance->n_flows = 0;
+
+ list_head_init(&instance->flows);
list_head_init(&instance->sent);
list_head_init(&instance->rcvd);
- instance->fd = fd;
-
pthread_create(&instance->reader, NULL, sdu_reader, instance);
return instance;
@@ -381,12 +422,29 @@ struct cdap * cdap_create(int fd)
int cdap_destroy(struct cdap * instance)
{
+ struct list_head * p;
+ struct list_head * h;
+
if (instance == NULL)
return 0;
pthread_cancel(instance->reader);
pthread_join(instance->reader, NULL);
+ flow_set_destroy(instance->set);
+
+ pthread_rwlock_wrlock(&instance->flows_lock);
+
+ list_for_each_safe(p,h, &instance->flows) {
+ struct fd_el * e = list_entry(p, struct fd_el, next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ pthread_rwlock_destroy(&instance->flows_lock);
+
pthread_mutex_lock(&instance->ids_lock);
bmp_destroy(instance->ids);
@@ -408,14 +466,71 @@ int cdap_destroy(struct cdap * instance)
return 0;
}
-static int write_msg(struct cdap * instance,
+int cdap_add_flow(struct cdap * instance,
+ int fd)
+{
+ struct fd_el * e;
+
+ if (fd < 0)
+ return -EINVAL;
+
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ return -ENOMEM;
+
+ e->fd = fd;
+
+ pthread_rwlock_wrlock(&instance->flows_lock);
+
+ if (flow_set_add(instance->set, fd)) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ return -1;
+ }
+
+ list_add(&e->next, &instance->flows);
+
+ ++instance->n_flows;
+
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ return 0;
+}
+
+int cdap_del_flow(struct cdap * instance,
+ int fd)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ if (fd < 0)
+ return -EINVAL;
+
+ pthread_rwlock_wrlock(&instance->flows_lock);
+
+ flow_set_del(instance->set, fd);
+
+ list_for_each_safe(p, h, &instance->flows) {
+ struct fd_el * e = list_entry(p, struct fd_el, next);
+ if (e->fd == fd) {
+ list_del(&e->next);
+ free(e);
+ break;
+ }
+ }
+
+ --instance->n_flows;
+
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ return 0;
+}
+
+static int write_msg(int fd,
cdap_t * msg)
{
- int ret;
uint8_t * data;
size_t len;
- assert(instance);
assert(msg);
len = cdap__get_packed_size(msg);
@@ -428,11 +543,14 @@ static int write_msg(struct cdap * instance,
cdap__pack(msg, data);
- ret = flow_write(instance->fd, data, len);
+ if (flow_write(fd, data, len)) {
+ free(data);
+ return -1;
+ }
free(data);
- return ret;
+ return 0;
}
static cdap_key_t invoke_id_to_key(invoke_id_t iid)
@@ -451,75 +569,115 @@ static invoke_id_t key_to_invoke_id(cdap_key_t key)
return (invoke_id_t) key;
}
-cdap_key_t cdap_request_send(struct cdap * instance,
- enum cdap_opcode code,
- const char * name,
- const void * data,
- size_t len,
- uint32_t flags)
+cdap_key_t * cdap_request_send(struct cdap * instance,
+ enum cdap_opcode code,
+ const char * name,
+ const void * data,
+ size_t len,
+ uint32_t flags)
{
- cdap_t msg = CDAP__INIT;
- struct cdap_req * req;
- invoke_id_t iid;
- cdap_key_t key;
+ cdap_key_t * keys;
+ cdap_key_t * key;
+ cdap_t msg = CDAP__INIT;
+ struct list_head * p;
+ int ret;
- if (instance == NULL || name == NULL)
- return -EINVAL;
+ if (instance == NULL || name == NULL || code > CDAP_DELETE)
+ return NULL;
+ pthread_rwlock_rdlock(&instance->flows_lock);
- iid = next_invoke_id(instance);
- if (iid == INVALID_INVOKE_ID)
- return INVALID_CDAP_KEY;
+ keys = malloc(sizeof(*keys) * (instance->n_flows + 1));
+ if (keys == NULL)
+ return NULL;
- switch (code) {
- case CDAP_READ:
- msg.opcode = OPCODE__READ;
- break;
- case CDAP_WRITE:
- msg.opcode = OPCODE__WRITE;
- break;
- case CDAP_CREATE:
- msg.opcode = OPCODE__CREATE;
- break;
- case CDAP_DELETE:
- msg.opcode = OPCODE__DELETE;
- break;
- case CDAP_START:
- msg.opcode = OPCODE__START;
- break;
- case CDAP_STOP:
- msg.opcode = OPCODE__STOP;
- break;
- default:
- release_invoke_id(instance, iid);
- return -EINVAL;
- }
+ memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1));
+
+ key = keys;
+ cdap__init(&msg);
+
+ msg.opcode = code;
msg.name = (char *) name;
msg.has_flags = true;
msg.flags = flags;
- msg.invoke_id = iid;
+
if (data != NULL) {
msg.has_value = true;
msg.value.data = (uint8_t *) data;
msg.value.len = len;
}
- key = invoke_id_to_key(iid);
+ list_for_each(p, &instance->flows) {
+ struct cdap_req * req;
+ invoke_id_t iid;
+ struct fd_el * e;
+
+ iid = next_invoke_id(instance);
+ if (iid == INVALID_INVOKE_ID) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ while(key > keys) {
+ struct cdap_req * r =
+ cdap_sent_get_by_key(instance,
+ *(--key));
+ cdap_sent_del(instance, r);
+ cdap_req_destroy(r);
+ }
+
+ free(keys);
+ return NULL;
+ }
- req = cdap_sent_add(instance, key);
- if (req == NULL) {
- release_invoke_id(instance, iid);
- return INVALID_CDAP_KEY;
- }
+ msg.invoke_id = iid;
- if (write_msg(instance, &msg)) {
- cdap_sent_del(instance, req);
- release_invoke_id(instance, iid);
- return INVALID_CDAP_KEY;
+ *key = invoke_id_to_key(iid);
+
+ e = list_entry(p, struct fd_el, next);
+
+ req = cdap_sent_add(instance, e->fd, *key);
+ if (req == NULL) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ while(key > keys) {
+ struct cdap_req * r =
+ cdap_sent_get_by_key(instance,
+ *(--key));
+ cdap_sent_del(instance, r);
+ release_invoke_id(instance,
+ key_to_invoke_id(r->key));
+ cdap_req_destroy(r);
+ }
+ release_invoke_id(instance, iid);
+ free(keys);
+ return NULL;
+ }
+
+ ret = write_msg(e->fd, &msg);
+ if (ret == -ENOMEM) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ while(key >= keys) {
+ struct cdap_req * r =
+ cdap_sent_get_by_key(instance, *key);
+ cdap_sent_del(instance, r);
+ release_invoke_id(instance,
+ key_to_invoke_id(r->key));
+ cdap_req_destroy(r);
+ }
+
+ free(keys);
+ return NULL;
+ }
+
+ if (ret < 0) {
+ release_invoke_id(instance, iid);
+ cdap_sent_del(instance, req);
+ }
+
+ ++key;
}
- return key;
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ return keys;
}
int cdap_reply_wait(struct cdap * instance,
@@ -567,7 +725,6 @@ cdap_key_t cdap_request_wait(struct cdap * instance,
uint32_t * flags)
{
struct cdap_rcvd * rcvd;
- invoke_id_t iid;
if (instance == NULL || opcode == NULL || name == NULL || data == NULL
|| len == NULL || flags == NULL)
@@ -583,8 +740,6 @@ cdap_key_t cdap_request_wait(struct cdap * instance,
rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
- list_del(&rcvd->next);
-
pthread_cleanup_pop(true);
*opcode = rcvd->opcode;
@@ -593,11 +748,9 @@ cdap_key_t cdap_request_wait(struct cdap * instance,
*len = rcvd->len;
*flags = rcvd->flags;
- iid = rcvd->iid;
+ rcvd->name = NULL;
- free(rcvd);
-
- return invoke_id_to_key(iid);
+ return invoke_id_to_key(rcvd->iid);
}
int cdap_reply_send(struct cdap * instance,
@@ -606,13 +759,17 @@ int cdap_reply_send(struct cdap * instance,
const void * data,
size_t len)
{
- cdap_t msg = CDAP__INIT;
- invoke_id_t iid = key_to_invoke_id(key);
+ int fd;
+ cdap_t msg = CDAP__INIT;
+ invoke_id_t iid = key_to_invoke_id(key);
+ struct cdap_rcvd * rcvd = cdap_rcvd_get_by_iid(instance, iid);
+ if (rcvd == NULL)
+ return -EINVAL;
if (instance == NULL)
return -EINVAL;
- msg.opcode = OPCODE__REPLY;
+ msg.opcode = CDAP_REPLY;
msg.invoke_id = iid;
msg.has_result = true;
msg.result = result;
@@ -623,5 +780,11 @@ int cdap_reply_send(struct cdap * instance,
msg.value.len = len;
}
- return write_msg(instance, &msg);
+ fd = rcvd->fd;
+
+ assert(rcvd->name == NULL);
+
+ free(rcvd);
+
+ return write_msg(fd, &msg);
}