summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/cdap.c153
-rw-r--r--src/lib/shm_ap_rbuff.c2
2 files changed, 92 insertions, 63 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 4c70b2e4..5dc050a4 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -43,6 +43,12 @@ struct cdap {
struct cdap_ops * ops;
};
+struct cdap_info {
+ pthread_t thread;
+ struct cdap * instance;
+ cdap_t * msg;
+};
+
static int next_invoke_id(struct cdap * instance)
{
int ret;
@@ -66,12 +72,84 @@ static int release_invoke_id(struct cdap * instance,
return ret;
}
+static void * handle_cdap_msg(void * o)
+{
+ struct cdap_info * info = (struct cdap_info *) o;
+ struct cdap * instance = info->instance;
+ cdap_t * msg = info->msg;
+
+ switch (msg->opcode) {
+ case OPCODE__READ:
+ if (msg->name != NULL)
+ instance->ops->cdap_read(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__WRITE:
+ if (msg->name != NULL &&
+ msg->has_value)
+ instance->ops->cdap_write(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len,
+ msg->flags);
+ break;
+ case OPCODE__CREATE:
+ if (msg->name != NULL &&
+ msg->has_value)
+ instance->ops->cdap_create(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len);
+ break;
+ case OPCODE__DELETE:
+ if (msg->name != NULL &&
+ msg->has_value)
+ instance->ops->cdap_create(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len);
+ break;
+ case OPCODE__START:
+ if (msg->name != NULL)
+ instance->ops->cdap_start(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__STOP:
+ if (msg->name != NULL)
+ instance->ops->cdap_stop(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__REPLY:
+ instance->ops->cdap_reply(instance,
+ msg->invoke_id,
+ msg->result,
+ msg->value.data,
+ msg->value.len);
+ release_invoke_id(instance, msg->invoke_id);
+ break;
+ default:
+ break;
+ }
+
+ free(info);
+ cdap__free_unpacked(msg, NULL);
+
+ return (void *) 0;
+}
+
static void * sdu_reader(void * o)
{
struct cdap * instance = (struct cdap *) o;
cdap_t * msg;
uint8_t buf[BUF_SIZE];
ssize_t len;
+ struct cdap_info * cdap_info;
while (true) {
len = flow_read(instance->fd, buf, BUF_SIZE);
@@ -82,69 +160,22 @@ static void * sdu_reader(void * o)
if (msg == NULL)
continue;
- switch (msg->opcode) {
- case OPCODE__READ:
- if (msg->name != NULL)
- instance->ops->cdap_read(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__WRITE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_write(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len,
- msg->flags);
- }
- break;
- case OPCODE__CREATE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_create(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len);
- }
- break;
- case OPCODE__DELETE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_create(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len);
- }
- break;
- case OPCODE__START:
- if (msg->name != NULL)
- instance->ops->cdap_start(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__STOP:
- if (msg->name != NULL)
- instance->ops->cdap_stop(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__REPLY:
- instance->ops->cdap_reply(instance,
- msg->invoke_id,
- msg->result,
- msg->value.data,
- msg->value.len);
- release_invoke_id(instance, msg->invoke_id);
- break;
- default:
- break;
+ cdap_info = malloc(sizeof(*cdap_info));
+ if (cdap_info == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
}
- cdap__free_unpacked(msg, NULL);
+ cdap_info->instance = instance;
+ cdap_info->msg = msg;
+
+ pthread_create(&cdap_info->thread,
+ NULL,
+ handle_cdap_msg,
+ (void *) cdap_info);
+
+ pthread_detach(cdap_info->thread);
+
}
return (void *) 0;
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 1c7fd600..4ca29636 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -40,8 +40,6 @@
#include <signal.h>
#include <sys/stat.h>
-#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
-
#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
+ 2 * sizeof (pthread_cond_t))