From 2c8e29ca7a997c5aa9d34e3fa956b120a0bbf20c Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 18 Aug 2016 14:22:06 +0200 Subject: ipcpd: normal: Handle enrollment replies This adds a condition variable with a timeout to the CDAP request so that we can respond correctly to the answer from the remote. It also adds a timeout to the condition variable waiting on completion of enrollment. Furthermore, for every CDAP callback a new thread is now spawned, to avoid deadlocking in case a callback is stuck. --- include/ouroboros/config.h.in | 12 ++-- src/ipcpd/ipcp.c | 6 +- src/ipcpd/normal/main.c | 18 ++++- src/ipcpd/normal/ribmgr.c | 50 +++++++++++--- src/lib/cdap.c | 153 +++++++++++++++++++++++++----------------- src/lib/shm_ap_rbuff.c | 2 - 6 files changed, 163 insertions(+), 78 deletions(-) diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index d5af0c71..84d56e85 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -41,15 +41,19 @@ #define SHM_DU_MAP_FILENAME "/ouroboros.shm" #define LOCKFILE_NAME "/ouroboros.lockfile" #define SHM_BUFFER_SIZE (1 << 14) -#define SHM_DU_TIMEOUT_MICROS 15000 #define DU_BUFF_HEADSPACE 128 #define DU_BUFF_TAILSPACE 0 #define SHM_AP_RBUFF_PREFIX "/ouroboros.rbuff." #define IRMD_MAX_FLOWS 4096 #define IRMD_THREADPOOL_SIZE 5 -#define IRMD_ACCEPT_TIMEOUT 100 /* ms */ -#define IRMD_FLOW_TIMEOUT 5000 /* ms */ #define LOG_DIR "/@LOG_DIR@/" -#define SOCKET_TIMEOUT 2000 /* ms */ +#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC +/* Timeout values */ +#define SHM_DU_TIMEOUT_MICROS 15000 +#define IRMD_ACCEPT_TIMEOUT 100 +#define IRMD_FLOW_TIMEOUT 5000 +#define SOCKET_TIMEOUT 4000 +#define CDAP_REPLY_TIMEOUT 1000 +#define ENROLL_TIMEOUT 2000 #endif diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 544b10df..0263d7b5 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -33,6 +33,8 @@ struct ipcp * ipcp_instance_create() { + pthread_condattr_t cattr; + struct ipcp * i = malloc(sizeof *i); if (i == NULL) return NULL; @@ -43,7 +45,9 @@ struct ipcp * ipcp_instance_create() i->state = IPCP_INIT; pthread_mutex_init(&i->state_lock, NULL); - pthread_cond_init(&i->state_cond, NULL); + pthread_condattr_init(&cattr); + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); + pthread_cond_init(&i->state_cond, &cattr); return i; } diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index cf6ac728..4173246d 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -28,12 +28,14 @@ #include #include #include +#include #include #include #include #include #include +#include #include "fmgr.h" #include "ribmgr.h" @@ -131,6 +133,13 @@ static int normal_ipcp_name_unreg(char * name) static int normal_ipcp_enroll(char * dif_name) { + struct timespec timeout = {(ENROLL_TIMEOUT / 1000), + (ENROLL_TIMEOUT % 1000) * MILLION}; + struct timespec abstime; + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, &timeout, &abstime); + pthread_mutex_lock(&_ipcp->state_lock); if (_ipcp->state != IPCP_INIT) { @@ -147,10 +156,15 @@ static int normal_ipcp_enroll(char * dif_name) return -1; } - /* FIXME: Change into timedwait, see solution in irmd first */ pthread_mutex_lock(&_ipcp->state_lock); while (_ipcp->state != IPCP_ENROLLED) - pthread_cond_wait(&_ipcp->state_cond, &_ipcp->state_lock); + if (pthread_cond_timedwait(&_ipcp->state_cond, + &_ipcp->state_lock, + &abstime) == ETIMEDOUT) { + pthread_mutex_unlock(&_ipcp->state_lock); + LOG_ERR("Enrollment didn't complete in time."); + return -1; + } pthread_mutex_unlock(&_ipcp->state_lock); return 0; diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index bbc29b64..c8d517b5 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -26,10 +26,12 @@ #include #include #include +#include #include #include #include +#include #include "ribmgr.h" #include "dt_const.h" @@ -58,6 +60,9 @@ struct cdap_request { char * name; int invoke_id; struct cdap * instance; + int result; + bool replied; + pthread_cond_t cond; struct list_head next; }; @@ -81,12 +86,19 @@ struct rib { } * rib = NULL; /* Call while holding cdap_reqs_lock */ -int cdap_request_add(struct cdap * instance, +int cdap_result_wait(struct cdap * instance, enum cdap_opcode code, char * name, int invoke_id) { struct cdap_request * req; + pthread_condattr_t cattr; + struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000), + (CDAP_REPLY_TIMEOUT % 1000) * MILLION}; + struct timespec abstime; + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, &timeout, &abstime); req = malloc(sizeof(*req)); if (req == NULL) @@ -95,6 +107,12 @@ int cdap_request_add(struct cdap * instance, req->code = code; req->invoke_id = invoke_id; req->instance = instance; + req->result = -1; + req->replied = false; + + pthread_condattr_init(&cattr); + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); + pthread_cond_init(&req->cond, &cattr); req->name = strdup(name); if (req->name == NULL) { @@ -106,7 +124,15 @@ int cdap_request_add(struct cdap * instance, list_add(&req->next, &rib->cdap_reqs); - return 0; + while (req->replied == false) + if (pthread_cond_timedwait(&req->cond, + &rib->cdap_reqs_lock, + &abstime) == ETIMEDOUT) { + LOG_ERR("Didn't receive a CDAP reply in time."); + return -1; + } + + return req->result; } int ribmgr_init() @@ -190,6 +216,14 @@ int ribmgr_cdap_reply(struct cdap * instance, /* FIXME: In case of a read, update values here */ + req->replied = true; + req->result = result; + pthread_cond_broadcast(&req->cond); + pthread_mutex_unlock(&rib->cdap_reqs_lock); + + sched_yield(); + + pthread_mutex_lock(&rib->cdap_reqs_lock); free(req->name); list_del(&req->next); free(req); @@ -350,11 +384,11 @@ int ribmgr_cdap_start(struct cdap * instance, return -1; } - if (cdap_request_add(instance, WRITE, STATIC_INFO, iid)) { + if (cdap_result_wait(instance, WRITE, STATIC_INFO, iid)) { pthread_mutex_unlock(&rib->cdap_reqs_lock); pthread_mutex_unlock(&_ipcp->state_lock); free(data); - LOG_ERR("Failed to add CDAP request to list."); + LOG_ERR("Remote did not receive static information."); return -1; } pthread_mutex_unlock(&rib->cdap_reqs_lock); @@ -374,11 +408,11 @@ int ribmgr_cdap_start(struct cdap * instance, return -1; } - if (cdap_request_add(instance, STOP, ENROLLMENT, iid)) { + if (cdap_result_wait(instance, STOP, ENROLLMENT, iid)) { pthread_mutex_unlock(&rib->cdap_reqs_lock); pthread_mutex_unlock(&_ipcp->state_lock); free(data); - LOG_ERR("Failed to add CDAP request to list."); + LOG_ERR("Remote failed to complete enrollment."); return -1; } pthread_mutex_unlock(&rib->cdap_reqs_lock); @@ -470,10 +504,10 @@ int ribmgr_add_flow(int fd) return -1; } - if (cdap_request_add(instance, START, ENROLLMENT, iid)) { + if (cdap_result_wait(instance, START, ENROLLMENT, iid)) { pthread_mutex_unlock(&rib->cdap_reqs_lock); pthread_rwlock_unlock(&rib->flows_lock); - LOG_ERR("Failed to add CDAP request to list."); + LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); return -1; diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 4c70b2e4..5dc050a4 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -43,6 +43,12 @@ struct cdap { struct cdap_ops * ops; }; +struct cdap_info { + pthread_t thread; + struct cdap * instance; + cdap_t * msg; +}; + static int next_invoke_id(struct cdap * instance) { int ret; @@ -66,12 +72,84 @@ static int release_invoke_id(struct cdap * instance, return ret; } +static void * handle_cdap_msg(void * o) +{ + struct cdap_info * info = (struct cdap_info *) o; + struct cdap * instance = info->instance; + cdap_t * msg = info->msg; + + switch (msg->opcode) { + case OPCODE__READ: + if (msg->name != NULL) + instance->ops->cdap_read(instance, + msg->invoke_id, + msg->name); + break; + case OPCODE__WRITE: + if (msg->name != NULL && + msg->has_value) + instance->ops->cdap_write(instance, + msg->invoke_id, + msg->name, + msg->value.data, + msg->value.len, + msg->flags); + break; + case OPCODE__CREATE: + if (msg->name != NULL && + msg->has_value) + instance->ops->cdap_create(instance, + msg->invoke_id, + msg->name, + msg->value.data, + msg->value.len); + break; + case OPCODE__DELETE: + if (msg->name != NULL && + msg->has_value) + instance->ops->cdap_create(instance, + msg->invoke_id, + msg->name, + msg->value.data, + msg->value.len); + break; + case OPCODE__START: + if (msg->name != NULL) + instance->ops->cdap_start(instance, + msg->invoke_id, + msg->name); + break; + case OPCODE__STOP: + if (msg->name != NULL) + instance->ops->cdap_stop(instance, + msg->invoke_id, + msg->name); + break; + case OPCODE__REPLY: + instance->ops->cdap_reply(instance, + msg->invoke_id, + msg->result, + msg->value.data, + msg->value.len); + release_invoke_id(instance, msg->invoke_id); + break; + default: + break; + } + + free(info); + cdap__free_unpacked(msg, NULL); + + return (void *) 0; +} + static void * sdu_reader(void * o) { struct cdap * instance = (struct cdap *) o; cdap_t * msg; uint8_t buf[BUF_SIZE]; ssize_t len; + struct cdap_info * cdap_info; while (true) { len = flow_read(instance->fd, buf, BUF_SIZE); @@ -82,69 +160,22 @@ static void * sdu_reader(void * o) if (msg == NULL) continue; - switch (msg->opcode) { - case OPCODE__READ: - if (msg->name != NULL) - instance->ops->cdap_read(instance, - msg->invoke_id, - msg->name); - break; - case OPCODE__WRITE: - if (msg->name != NULL && - msg->has_value) { - instance->ops->cdap_write(instance, - msg->invoke_id, - msg->name, - msg->value.data, - msg->value.len, - msg->flags); - } - break; - case OPCODE__CREATE: - if (msg->name != NULL && - msg->has_value) { - instance->ops->cdap_create(instance, - msg->invoke_id, - msg->name, - msg->value.data, - msg->value.len); - } - break; - case OPCODE__DELETE: - if (msg->name != NULL && - msg->has_value) { - instance->ops->cdap_create(instance, - msg->invoke_id, - msg->name, - msg->value.data, - msg->value.len); - } - break; - case OPCODE__START: - if (msg->name != NULL) - instance->ops->cdap_start(instance, - msg->invoke_id, - msg->name); - break; - case OPCODE__STOP: - if (msg->name != NULL) - instance->ops->cdap_stop(instance, - msg->invoke_id, - msg->name); - break; - case OPCODE__REPLY: - instance->ops->cdap_reply(instance, - msg->invoke_id, - msg->result, - msg->value.data, - msg->value.len); - release_invoke_id(instance, msg->invoke_id); - break; - default: - break; + cdap_info = malloc(sizeof(*cdap_info)); + if (cdap_info == NULL) { + cdap__free_unpacked(msg, NULL); + continue; } - cdap__free_unpacked(msg, NULL); + cdap_info->instance = instance; + cdap_info->msg = msg; + + pthread_create(&cdap_info->thread, + NULL, + handle_cdap_msg, + (void *) cdap_info); + + pthread_detach(cdap_info->thread); + } return (void *) 0; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 1c7fd600..4ca29636 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -40,8 +40,6 @@ #include #include -#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC - #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) -- cgit v1.2.3