summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Sander Vrijders <sander.vrijders@intec.ugent.be> 2016-08-18 14:22:06 +0200
committer Sander Vrijders <sander.vrijders@intec.ugent.be> 2016-08-19 13:24:39 +0200
commit 2c8e29ca7a997c5aa9d34e3fa956b120a0bbf20c (patch)
tree 6807a23a6def167a2b9ab26937fe25bbcc2a8064
parent 0192488015770b4855165db8502214dad1941dd2 (diff)
download ouroboros-2c8e29ca7a997c5aa9d34e3fa956b120a0bbf20c.tar.gz
ouroboros-2c8e29ca7a997c5aa9d34e3fa956b120a0bbf20c.zip
ipcpd: normal: Handle enrollment replies
This adds a condition variable with a timeout to the CDAP request so that we can respond correctly to the answer from the remote. It also adds a timeout to the condition variable waiting on completion of enrollment. Furthermore, for every CDAP callback a new thread is now spawned, to avoid deadlocking in case a callback is stuck.
-rw-r--r-- include/ouroboros/config.h.in 12
-rw-r--r-- src/ipcpd/ipcp.c 6
-rw-r--r-- src/ipcpd/normal/main.c 18
-rw-r--r-- src/ipcpd/normal/ribmgr.c 50
-rw-r--r-- src/lib/cdap.c 153
-rw-r--r-- src/lib/shm_ap_rbuff.c 2
6 files changed, 163 insertions, 78 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index d5af0c71..84d56e85 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -41,15 +41,19 @@
#define SHM_DU_MAP_FILENAME "/ouroboros.shm"
#define LOCKFILE_NAME "/ouroboros.lockfile"
#define SHM_BUFFER_SIZE (1 << 14)
-#define SHM_DU_TIMEOUT_MICROS 15000
#define DU_BUFF_HEADSPACE 128
#define DU_BUFF_TAILSPACE 0
#define SHM_AP_RBUFF_PREFIX "/ouroboros.rbuff."
#define IRMD_MAX_FLOWS 4096
#define IRMD_THREADPOOL_SIZE 5
-#define IRMD_ACCEPT_TIMEOUT 100 /* ms */
-#define IRMD_FLOW_TIMEOUT 5000 /* ms */
#define LOG_DIR "/@LOG_DIR@/"
-#define SOCKET_TIMEOUT 2000 /* ms */
+#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
+/* Timeout values */
+#define SHM_DU_TIMEOUT_MICROS 15000
+#define IRMD_ACCEPT_TIMEOUT 100
+#define IRMD_FLOW_TIMEOUT 5000
+#define SOCKET_TIMEOUT 4000
+#define CDAP_REPLY_TIMEOUT 1000
+#define ENROLL_TIMEOUT 2000
#endif
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 544b10df..0263d7b5 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -33,6 +33,8 @@
struct ipcp * ipcp_instance_create()
{
+ pthread_condattr_t cattr;
+
struct ipcp * i = malloc(sizeof *i);
if (i == NULL)
return NULL;
@@ -43,7 +45,9 @@ struct ipcp * ipcp_instance_create()
i->state = IPCP_INIT;
pthread_mutex_init(&i->state_lock, NULL);
- pthread_cond_init(&i->state_cond, NULL);
+ pthread_condattr_init(&cattr);
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+ pthread_cond_init(&i->state_cond, &cattr);
return i;
}
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index cf6ac728..4173246d 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -28,12 +28,14 @@
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp.h>
+#include <ouroboros/time_utils.h>
#include <stdbool.h>
#include <signal.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
+#include <errno.h>
#include "fmgr.h"
#include "ribmgr.h"
@@ -131,6 +133,13 @@ static int normal_ipcp_name_unreg(char * name)
static int normal_ipcp_enroll(char * dif_name)
{
+ struct timespec timeout = {(ENROLL_TIMEOUT / 1000),
+ (ENROLL_TIMEOUT % 1000) * MILLION};
+ struct timespec abstime;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, &timeout, &abstime);
+
pthread_mutex_lock(&_ipcp->state_lock);
if (_ipcp->state != IPCP_INIT) {
@@ -147,10 +156,15 @@ static int normal_ipcp_enroll(char * dif_name)
return -1;
}
- /* FIXME: Change into timedwait, see solution in irmd first */
pthread_mutex_lock(&_ipcp->state_lock);
while (_ipcp->state != IPCP_ENROLLED)
- pthread_cond_wait(&_ipcp->state_cond, &_ipcp->state_lock);
+ if (pthread_cond_timedwait(&_ipcp->state_cond,
+ &_ipcp->state_lock,
+ &abstime) == ETIMEDOUT) {
+ pthread_mutex_unlock(&_ipcp->state_lock);
+ LOG_ERR("Enrollment didn't complete in time.");
+ return -1;
+ }
pthread_mutex_unlock(&_ipcp->state_lock);
return 0;
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index bbc29b64..c8d517b5 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -26,10 +26,12 @@
#include <ouroboros/logs.h>
#include <ouroboros/cdap.h>
#include <ouroboros/list.h>
+#include <ouroboros/time_utils.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
+#include <errno.h>
#include "ribmgr.h"
#include "dt_const.h"
@@ -58,6 +60,9 @@ struct cdap_request {
char * name;
int invoke_id;
struct cdap * instance;
+ int result;
+ bool replied;
+ pthread_cond_t cond;
struct list_head next;
};
@@ -81,12 +86,19 @@ struct rib {
} * rib = NULL;
/* Call while holding cdap_reqs_lock */
-int cdap_request_add(struct cdap * instance,
+int cdap_result_wait(struct cdap * instance,
enum cdap_opcode code,
char * name,
int invoke_id)
{
struct cdap_request * req;
+ pthread_condattr_t cattr;
+ struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000),
+ (CDAP_REPLY_TIMEOUT % 1000) * MILLION};
+ struct timespec abstime;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, &timeout, &abstime);
req = malloc(sizeof(*req));
if (req == NULL)
@@ -95,6 +107,12 @@ int cdap_request_add(struct cdap * instance,
req->code = code;
req->invoke_id = invoke_id;
req->instance = instance;
+ req->result = -1;
+ req->replied = false;
+
+ pthread_condattr_init(&cattr);
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+ pthread_cond_init(&req->cond, &cattr);
req->name = strdup(name);
if (req->name == NULL) {
@@ -106,7 +124,15 @@ int cdap_request_add(struct cdap * instance,
list_add(&req->next, &rib->cdap_reqs);
- return 0;
+ while (req->replied == false)
+ if (pthread_cond_timedwait(&req->cond,
+ &rib->cdap_reqs_lock,
+ &abstime) == ETIMEDOUT) {
+ LOG_ERR("Didn't receive a CDAP reply in time.");
+ return -1;
+ }
+
+ return req->result;
}
int ribmgr_init()
@@ -190,6 +216,14 @@ int ribmgr_cdap_reply(struct cdap * instance,
/* FIXME: In case of a read, update values here */
+ req->replied = true;
+ req->result = result;
+ pthread_cond_broadcast(&req->cond);
+ pthread_mutex_unlock(&rib->cdap_reqs_lock);
+
+ sched_yield();
+
+ pthread_mutex_lock(&rib->cdap_reqs_lock);
free(req->name);
list_del(&req->next);
free(req);
@@ -350,11 +384,11 @@ int ribmgr_cdap_start(struct cdap * instance,
return -1;
}
- if (cdap_request_add(instance, WRITE, STATIC_INFO, iid)) {
+ if (cdap_result_wait(instance, WRITE, STATIC_INFO, iid)) {
pthread_mutex_unlock(&rib->cdap_reqs_lock);
pthread_mutex_unlock(&_ipcp->state_lock);
free(data);
- LOG_ERR("Failed to add CDAP request to list.");
+ LOG_ERR("Remote did not receive static information.");
return -1;
}
pthread_mutex_unlock(&rib->cdap_reqs_lock);
@@ -374,11 +408,11 @@ int ribmgr_cdap_start(struct cdap * instance,
return -1;
}
- if (cdap_request_add(instance, STOP, ENROLLMENT, iid)) {
+ if (cdap_result_wait(instance, STOP, ENROLLMENT, iid)) {
pthread_mutex_unlock(&rib->cdap_reqs_lock);
pthread_mutex_unlock(&_ipcp->state_lock);
free(data);
- LOG_ERR("Failed to add CDAP request to list.");
+ LOG_ERR("Remote failed to complete enrollment.");
return -1;
}
pthread_mutex_unlock(&rib->cdap_reqs_lock);
@@ -470,10 +504,10 @@ int ribmgr_add_flow(int fd)
return -1;
}
- if (cdap_request_add(instance, START, ENROLLMENT, iid)) {
+ if (cdap_result_wait(instance, START, ENROLLMENT, iid)) {
pthread_mutex_unlock(&rib->cdap_reqs_lock);
pthread_rwlock_unlock(&rib->flows_lock);
- LOG_ERR("Failed to add CDAP request to list.");
+ LOG_ERR("Failed to start enrollment.");
cdap_destroy(instance);
free(flow);
return -1;
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 4c70b2e4..5dc050a4 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -43,6 +43,12 @@ struct cdap {
struct cdap_ops * ops;
};
+struct cdap_info { /* Context for one received CDAP message, passed as the argument of a handler thread. */
+ pthread_t thread; /* Thread id written by pthread_create() and used for pthread_detach() in sdu_reader(). */
+ struct cdap * instance; /* CDAP instance whose ops callbacks handle_cdap_msg() invokes. */
+ cdap_t * msg; /* Unpacked message; released by handle_cdap_msg() via cdap__free_unpacked(). */
+};
+
static int next_invoke_id(struct cdap * instance)
{
int ret;
@@ -66,12 +72,84 @@ static int release_invoke_id(struct cdap * instance,
return ret;
}
+static void * handle_cdap_msg(void * o) /* Thread start routine: dispatch one unpacked CDAP message to the matching instance->ops callback, then free the message and its cdap_info wrapper. Callback return values are ignored. */
+{
+ struct cdap_info * info = (struct cdap_info *) o; /* o is the struct cdap_info supplied as the thread argument. */
+ struct cdap * instance = info->instance;
+ cdap_t * msg = info->msg;
+
+ switch (msg->opcode) {
+ case OPCODE__READ:
+ if (msg->name != NULL) /* A message without a name is dropped without invoking the callback. */
+ instance->ops->cdap_read(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__WRITE:
+ if (msg->name != NULL &&
+ msg->has_value) /* WRITE needs both a name and a value, otherwise it is dropped. */
+ instance->ops->cdap_write(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len,
+ msg->flags);
+ break;
+ case OPCODE__CREATE:
+ if (msg->name != NULL &&
+ msg->has_value)
+ instance->ops->cdap_create(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len);
+ break;
+ case OPCODE__DELETE: /* NOTE(review): this case invokes cdap_create, exactly like OPCODE__CREATE; looks like a copy-paste slip, confirm which handler is intended. */
+ if (msg->name != NULL &&
+ msg->has_value)
+ instance->ops->cdap_create(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len);
+ break;
+ case OPCODE__START:
+ if (msg->name != NULL)
+ instance->ops->cdap_start(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__STOP:
+ if (msg->name != NULL)
+ instance->ops->cdap_stop(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__REPLY: /* Replies are forwarded without any name or has_value check. */
+ instance->ops->cdap_reply(instance,
+ msg->invoke_id,
+ msg->result,
+ msg->value.data,
+ msg->value.len);
+ release_invoke_id(instance, msg->invoke_id); /* The invoke id is released only after the reply callback has returned. */
+ break;
+ default: /* Any other opcode is ignored. */
+ break;
+ }
+
+ free(info); /* NOTE(review): sdu_reader() reads cdap_info->thread for pthread_detach() after pthread_create(); if this free runs first that read touches freed memory. Confirm and consider detaching before the handover. */
+ cdap__free_unpacked(msg, NULL); /* msg was copied out of info above, so it is still valid here. */
+
+ return (void *) 0; /* The exit value is always a null pointer. */
+}
+
static void * sdu_reader(void * o)
{
struct cdap * instance = (struct cdap *) o;
cdap_t * msg;
uint8_t buf[BUF_SIZE];
ssize_t len;
+ struct cdap_info * cdap_info;
while (true) {
len = flow_read(instance->fd, buf, BUF_SIZE);
@@ -82,69 +160,22 @@ static void * sdu_reader(void * o)
if (msg == NULL)
continue;
- switch (msg->opcode) {
- case OPCODE__READ:
- if (msg->name != NULL)
- instance->ops->cdap_read(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__WRITE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_write(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len,
- msg->flags);
- }
- break;
- case OPCODE__CREATE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_create(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len);
- }
- break;
- case OPCODE__DELETE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_create(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len);
- }
- break;
- case OPCODE__START:
- if (msg->name != NULL)
- instance->ops->cdap_start(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__STOP:
- if (msg->name != NULL)
- instance->ops->cdap_stop(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__REPLY:
- instance->ops->cdap_reply(instance,
- msg->invoke_id,
- msg->result,
- msg->value.data,
- msg->value.len);
- release_invoke_id(instance, msg->invoke_id);
- break;
- default:
- break;
+ cdap_info = malloc(sizeof(*cdap_info));
+ if (cdap_info == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
}
- cdap__free_unpacked(msg, NULL);
+ cdap_info->instance = instance;
+ cdap_info->msg = msg;
+
+ pthread_create(&cdap_info->thread,
+ NULL,
+ handle_cdap_msg,
+ (void *) cdap_info);
+
+ pthread_detach(cdap_info->thread);
+
}
return (void *) 0;
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 1c7fd600..4ca29636 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -40,8 +40,6 @@
#include <signal.h>
#include <sys/stat.h>
-#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
-
#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
+ 2 * sizeof (pthread_cond_t))