From 2c8e29ca7a997c5aa9d34e3fa956b120a0bbf20c Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 18 Aug 2016 14:22:06 +0200 Subject: ipcpd: normal: Handle enrollment replies This adds a condition variable with a timeout to the CDAP request so that we can respond correctly to the answer from the remote. It also adds a timeout to the condition variable waiting on completion of enrollment. Furthermore, for every CDAP callback a new thread is now spawned, to avoid deadlocking in case a callback is stuck. --- src/lib/cdap.c | 153 +++++++++++++++++++++++++++++-------------------- src/lib/shm_ap_rbuff.c | 2 - 2 files changed, 92 insertions(+), 63 deletions(-) (limited to 'src/lib') diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 4c70b2e4..5dc050a4 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -43,6 +43,12 @@ struct cdap { struct cdap_ops * ops; }; +struct cdap_info { + pthread_t thread; + struct cdap * instance; + cdap_t * msg; +}; + static int next_invoke_id(struct cdap * instance) { int ret; @@ -66,12 +72,84 @@ static int release_invoke_id(struct cdap * instance, return ret; } +static void * handle_cdap_msg(void * o) +{ + struct cdap_info * info = (struct cdap_info *) o; + struct cdap * instance = info->instance; + cdap_t * msg = info->msg; + + switch (msg->opcode) { + case OPCODE__READ: + if (msg->name != NULL) + instance->ops->cdap_read(instance, + msg->invoke_id, + msg->name); + break; + case OPCODE__WRITE: + if (msg->name != NULL && + msg->has_value) + instance->ops->cdap_write(instance, + msg->invoke_id, + msg->name, + msg->value.data, + msg->value.len, + msg->flags); + break; + case OPCODE__CREATE: + if (msg->name != NULL && + msg->has_value) + instance->ops->cdap_create(instance, + msg->invoke_id, + msg->name, + msg->value.data, + msg->value.len); + break; + case OPCODE__DELETE: + if (msg->name != NULL && + msg->has_value) + instance->ops->cdap_create(instance, + msg->invoke_id, + msg->name, + msg->value.data, + msg->value.len); + break; + case OPCODE__START: + if (msg->name != NULL) + instance->ops->cdap_start(instance, + msg->invoke_id, + msg->name); + break; + case OPCODE__STOP: + if (msg->name != NULL) + instance->ops->cdap_stop(instance, + msg->invoke_id, + msg->name); + break; + case OPCODE__REPLY: + instance->ops->cdap_reply(instance, + msg->invoke_id, + msg->result, + msg->value.data, + msg->value.len); + release_invoke_id(instance, msg->invoke_id); + break; + default: + break; + } + + free(info); + cdap__free_unpacked(msg, NULL); + + return (void *) 0; +} + static void * sdu_reader(void * o) { struct cdap * instance = (struct cdap *) o; cdap_t * msg; uint8_t buf[BUF_SIZE]; ssize_t len; + struct cdap_info * cdap_info; while (true) { len = flow_read(instance->fd, buf, BUF_SIZE); @@ -82,69 +160,22 @@ static void * sdu_reader(void * o) if (msg == NULL) continue; - switch (msg->opcode) { - case OPCODE__READ: - if (msg->name != NULL) - instance->ops->cdap_read(instance, - msg->invoke_id, - msg->name); - break; - case OPCODE__WRITE: - if (msg->name != NULL && - msg->has_value) { - instance->ops->cdap_write(instance, - msg->invoke_id, - msg->name, - msg->value.data, - msg->value.len, - msg->flags); - } - break; - case OPCODE__CREATE: - if (msg->name != NULL && - msg->has_value) { - instance->ops->cdap_create(instance, - msg->invoke_id, - msg->name, - msg->value.data, - msg->value.len); - } - break; - case OPCODE__DELETE: - if (msg->name != NULL && - msg->has_value) { - instance->ops->cdap_create(instance, - msg->invoke_id, - msg->name, - msg->value.data, - msg->value.len); - } - break; - case OPCODE__START: - if (msg->name != NULL) - instance->ops->cdap_start(instance, - msg->invoke_id, - msg->name); - break; - case OPCODE__STOP: - if (msg->name != NULL) - instance->ops->cdap_stop(instance, - msg->invoke_id, - msg->name); - break; - case OPCODE__REPLY: - instance->ops->cdap_reply(instance, - msg->invoke_id, - msg->result, - msg->value.data, - msg->value.len); - release_invoke_id(instance, msg->invoke_id); - break; - default: - break; + cdap_info = malloc(sizeof(*cdap_info)); + if (cdap_info == NULL) { + cdap__free_unpacked(msg, NULL); + continue; } - cdap__free_unpacked(msg, NULL); + cdap_info->instance = instance; + cdap_info->msg = msg; + + pthread_create(&cdap_info->thread, + NULL, + handle_cdap_msg, + (void *) cdap_info); + + pthread_detach(cdap_info->thread); + } return (void *) 0; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 1c7fd600..4ca29636 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -40,8 +40,6 @@ #include #include -#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC - #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) -- cgit v1.2.3