diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2017-02-12 16:15:46 +0100 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2017-02-12 22:19:50 +0100 |
commit | 98a15feabb6a14e52a54a09dfed58d55e0f99884 (patch) | |
tree | 0a67e3eb076558b6408d5744d74808e756e0639d | |
parent | 2ee140ec27335ca50e813080ee0e85e4ab86af37 (diff) | |
download | ouroboros-98a15feabb6a14e52a54a09dfed58d55e0f99884.tar.gz ouroboros-98a15feabb6a14e52a54a09dfed58d55e0f99884.zip |
irmd: Allow time for AP to call flow_accept()
When there is a burst of successive flow allocations for a certain
name, each such request will block a thread in the IRMD for
IRMD_REQ_ARR_TIMEOUT ms to allow the application some time to respond.
This refactors some parts of the IRMd.
-rw-r--r-- | include/ouroboros/config.h.in | 1 | ||||
-rw-r--r-- | src/irmd/api_table.c | 15 | ||||
-rw-r--r-- | src/irmd/main.c | 45 | ||||
-rw-r--r-- | src/irmd/registry.c | 129 | ||||
-rw-r--r-- | src/irmd/registry.h | 11 |
5 files changed, 154 insertions, 47 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index c1b6bd02..10d78cd0 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -54,6 +54,7 @@ #define PFT_SIZE 1 << 12 /* Timeout values */ #define IRMD_ACCEPT_TIMEOUT 100 +#define IRMD_REQ_ARR_TIMEOUT 200 #define IRMD_FLOW_TIMEOUT 5000 #define IPCP_ACCEPT_TIMEOUT 100 #define SOCKET_TIMEOUT 4000 diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c index 7619fcf6..df300cea 100644 --- a/src/irmd/api_table.c +++ b/src/irmd/api_table.c @@ -159,16 +159,15 @@ int api_entry_sleep(struct api_entry * e) e->state = API_SLEEP; - while (e->state == API_SLEEP) { - if ((ret = -pthread_cond_timedwait(&e->state_cond, - &e->state_lock, - &dl)) == -ETIMEDOUT) { - break; - } - } + while (e->state == API_SLEEP && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&e->state_cond, + &e->state_lock, + &dl); - if (e->state == API_DESTROY) + if (e->state == API_DESTROY) { + reg_entry_del_api(e->re, e->api); ret = -1; + } e->state = API_INIT; pthread_cond_broadcast(&e->state_cond); diff --git a/src/irmd/main.c b/src/irmd/main.c index 2454a9ca..aa4614c1 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -970,7 +970,6 @@ static struct irm_flow * flow_accept(pid_t api, log_err("Unknown instance %d calling accept.", api); return NULL; } - log_dbg("New instance (%d) of %s added.", api, e->apn); log_dbg("This instance accepts flows for:"); list_for_each(p, &e->names) { @@ -996,6 +995,7 @@ static struct irm_flow * flow_accept(pid_t api, pthread_rwlock_rdlock(&irmd->state_lock); if (irmd->state != IRMD_RUNNING) { + reg_entry_set_state(re, REG_NAME_NULL); pthread_rwlock_unlock(&irmd->state_lock); return NULL; } @@ -1331,11 +1331,14 @@ static struct irm_flow * flow_req_arr(pid_t api, pid_t h_api = -1; int port_id = -1; + struct timespec wt = {IRMD_REQ_ARR_TIMEOUT % 1000, + (IRMD_REQ_ARR_TIMEOUT % 1000) * MILLION}; + log_dbg("Flow req arrived from IPCP %d for %s on AE %s.", api, dst_name, ae_name); pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->reg_lock); + pthread_rwlock_rdlock(&irmd->reg_lock); re = registry_get_entry(&irmd->registry, dst_name); if (re == NULL) { @@ -1345,6 +1348,18 @@ static struct irm_flow * flow_req_arr(pid_t api, return NULL; } + pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->state_lock); + + /* Give the AP a bit of slop time to call accept */ + if (reg_entry_leave_state(re, REG_NAME_IDLE, &wt) == -1) { + log_err("No APs for %s.", dst_name); + return NULL; + } + + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_wrlock(&irmd->reg_lock); + switch (reg_entry_get_state(re)) { case REG_NAME_IDLE: pthread_rwlock_unlock(&irmd->reg_lock); @@ -1378,17 +1393,12 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - reg_entry_leave_state(re, REG_NAME_AUTO_EXEC); - - pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_rdlock(&irmd->reg_lock); - - if (reg_entry_get_state(re) == REG_NAME_DESTROY) { - reg_entry_set_state(re, REG_NAME_NULL); + if (reg_entry_leave_state(re, REG_NAME_AUTO_EXEC, NULL)) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); return NULL; } + case REG_NAME_FLOW_ACCEPT: h_api = reg_entry_get_api(re); if (h_api == -1) { @@ -1453,7 +1463,7 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - reg_entry_leave_state(re, REG_NAME_FLOW_ARRIVED); + reg_entry_leave_state(re, REG_NAME_FLOW_ARRIVED, NULL); return f; } @@ -1518,10 +1528,16 @@ static void irm_destroy(void) list_del(&e->next); ipcp_destroy(e->api); clear_spawned_api(e->api); + registry_del_api(&irmd->registry, e->api); ipcp_entry_destroy(e); } - registry_destroy(&irmd->registry); + list_for_each_safe(p, h, &irmd->api_table) { + struct api_entry * e = list_entry(p, struct api_entry, next); + list_del(&e->next); + registry_del_api(&irmd->registry, e->api); + api_entry_destroy(e); + } list_for_each_safe(p, h, &irmd->spawned_apis) { struct pid_el * e = list_entry(p, struct pid_el, next); @@ -1531,6 +1547,7 @@ static void irm_destroy(void) else if (waitpid(e->pid, &status, 0) < 0) log_dbg("Error waiting for %d to exit.", e->pid); list_del(&e->next); + registry_del_api(&irmd->registry, e->pid); free(e); } @@ -1540,11 +1557,7 @@ static void irm_destroy(void) apn_entry_destroy(e); } - list_for_each_safe(p, h, &irmd->api_table) { - struct api_entry * e = list_entry(p, struct api_entry, next); - list_del(&e->next); - api_entry_destroy(e); - } + registry_destroy(&irmd->registry); pthread_rwlock_unlock(&irmd->reg_lock); diff --git a/src/irmd/registry.c b/src/irmd/registry.c index d22c1be3..985ecda0 100644 --- a/src/irmd/registry.c +++ b/src/irmd/registry.c @@ -25,6 +25,7 @@ #include <ouroboros/errno.h> #include <ouroboros/logs.h> #include <ouroboros/irm_config.h> +#include <ouroboros/time_utils.h> #include "registry.h" #include "utils.h" @@ -60,6 +61,8 @@ static struct reg_entry * reg_entry_create(void) static struct reg_entry * reg_entry_init(struct reg_entry * e, char * name) { + pthread_condattr_t cattr; + if (e == NULL || name == NULL) return NULL; @@ -70,7 +73,13 @@ static struct reg_entry * reg_entry_init(struct reg_entry * e, e->name = name; - if (pthread_cond_init(&e->state_cond, NULL)) + if (pthread_condattr_init(&cattr)) + return NULL; + +#ifdef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&e->state_cond, &cattr)) return NULL; if (pthread_mutex_init(&e->state_lock, NULL)) @@ -91,9 +100,21 @@ static void reg_entry_destroy(struct reg_entry * e) pthread_mutex_lock(&e->state_lock); - e->state = REG_NAME_DESTROY; + if (e->state == REG_NAME_DESTROY) { + pthread_mutex_unlock(&e->state_lock); + return; + } + + if (e->state != REG_NAME_FLOW_ACCEPT) + e->state = REG_NAME_NULL; + else + e->state = REG_NAME_DESTROY; pthread_cond_broadcast(&e->state_cond); + + while (e->state != REG_NAME_NULL) + pthread_cond_wait(&e->state_cond, &e->state_lock); + pthread_mutex_unlock(&e->state_lock); pthread_cond_destroy(&e->state_cond); @@ -102,12 +123,6 @@ static void reg_entry_destroy(struct reg_entry * e) if (e->name != NULL) free(e->name); - list_for_each_safe(p, h, &e->reg_apis) { - struct pid_el * i = list_entry(p, struct pid_el, next); - list_del(&i->next); - free(i); - } - list_for_each_safe(p, h, &e->reg_apns) { struct str_el * a = list_entry(p, struct str_el, next); list_del(&a->next); @@ -171,7 +186,8 @@ static void reg_entry_del_local_from_dif(struct reg_entry * e, } } -static bool reg_entry_has_apn(struct reg_entry * e, char * apn) +static bool reg_entry_has_apn(struct reg_entry * e, + char * apn) { struct list_head * p; @@ -184,7 +200,8 @@ static bool reg_entry_has_apn(struct reg_entry * e, char * apn) return false; } -int reg_entry_add_apn(struct reg_entry * e, struct apn_entry * a) +int reg_entry_add_apn(struct reg_entry * e, + struct apn_entry * a) { struct str_el * n; @@ -215,7 +232,8 @@ int reg_entry_add_apn(struct reg_entry * e, struct apn_entry * a) return 0; } -void reg_entry_del_apn(struct reg_entry * e, char * apn) +void reg_entry_del_apn(struct reg_entry * e, + char * apn) { struct list_head * p = NULL; struct list_head * h = NULL; @@ -244,7 +262,8 @@ char * reg_entry_get_apn(struct reg_entry * e) return list_first_entry(&e->reg_apns, struct str_el, next)->str; } -static bool reg_entry_has_api(struct reg_entry * e, pid_t api) +static bool reg_entry_has_api(struct reg_entry * e, + pid_t api) { struct list_head * p; @@ -257,7 +276,8 @@ static bool reg_entry_has_api(struct reg_entry * e, pid_t api) return false; } -int reg_entry_add_api(struct reg_entry * e, pid_t api) +int reg_entry_add_api(struct reg_entry * e, + pid_t api) { struct pid_el * i; @@ -288,7 +308,7 @@ int reg_entry_add_api(struct reg_entry * e, pid_t api) e->state == REG_NAME_AUTO_ACCEPT || e->state == REG_NAME_AUTO_EXEC) { e->state = REG_NAME_FLOW_ACCEPT; - pthread_cond_signal(&e->state_cond); + pthread_cond_broadcast(&e->state_cond); } pthread_mutex_unlock(&e->state_lock); @@ -298,6 +318,12 @@ int reg_entry_add_api(struct reg_entry * e, pid_t api) static void reg_entry_check_state(struct reg_entry * e) { + if (e->state == REG_NAME_DESTROY) { + e->state = REG_NAME_NULL; + pthread_cond_broadcast(&e->state_cond); + return; + } + if (list_is_empty(&e->reg_apis)) { if (!list_is_empty(&e->reg_apns)) e->state = REG_NAME_AUTO_ACCEPT; @@ -319,7 +345,8 @@ void reg_entry_del_pid_el(struct reg_entry * e, reg_entry_check_state(e); } -void reg_entry_del_api(struct reg_entry * e, pid_t api) +void reg_entry_del_api(struct reg_entry * e, + pid_t api) { struct list_head * p; struct list_head * h; @@ -365,7 +392,8 @@ enum reg_name_state reg_entry_get_state(struct reg_entry * e) return state; } -int reg_entry_set_state(struct reg_entry * e, enum reg_name_state state) +int reg_entry_set_state(struct reg_entry * e, + enum reg_name_state state) { if (state == REG_NAME_DESTROY) return -EPERM; @@ -380,19 +408,80 @@ int reg_entry_set_state(struct reg_entry * e, enum reg_name_state state) return 0; } -int reg_entry_leave_state(struct reg_entry * e, enum reg_name_state state) +int reg_entry_leave_state(struct reg_entry * e, + enum reg_name_state state, + struct timespec * timeout) { + struct timespec abstime; + int ret = 0; + if (e == NULL || state == REG_NAME_DESTROY) return -EINVAL; + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + pthread_mutex_lock(&e->state_lock); - while (e->state == state) - pthread_cond_wait(&e->state_cond, &e->state_lock); + while (e->state == state && ret != -ETIMEDOUT) + if (timeout) + ret = -pthread_cond_timedwait(&e->state_cond, + &e->state_lock, + timeout); + else + ret = -pthread_cond_wait(&e->state_cond, + &e->state_lock); + + if (e->state == REG_NAME_DESTROY) { + ret = -1; + e->state = REG_NAME_NULL; + pthread_cond_broadcast(&e->state_cond); + } pthread_mutex_unlock(&e->state_lock); - return 0; + return ret; +} + +int reg_entry_wait_state(struct reg_entry * e, + enum reg_name_state state, + struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + + if (e == NULL || state == REG_NAME_DESTROY) + return -EINVAL; + + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_mutex_lock(&e->state_lock); + + while (e->state != state && + e->state != REG_NAME_DESTROY && + ret != -ETIMEDOUT) + if (timeout) + ret = -pthread_cond_timedwait(&e->state_cond, + &e->state_lock, + timeout); + else + ret = -pthread_cond_wait(&e->state_cond, + &e->state_lock); + + if (e->state == REG_NAME_DESTROY) { + ret = -1; + e->state = REG_NAME_NULL; + pthread_cond_broadcast(&e->state_cond); + } + + pthread_mutex_unlock(&e->state_lock); + + return ret; } struct reg_entry * registry_get_entry(struct list_head * registry, diff --git a/src/irmd/registry.h b/src/irmd/registry.h index 7713e278..67e4da40 100644 --- a/src/irmd/registry.h +++ b/src/irmd/registry.h @@ -91,11 +91,16 @@ pid_t reg_entry_get_api(struct reg_entry * e); enum reg_name_state reg_entry_get_state(struct reg_entry * e); -int reg_entry_set_state(struct reg_entry * e, +int reg_entry_set_state(struct reg_entry * e, enum reg_name_state state); -int reg_entry_leave_state(struct reg_entry * e, - enum reg_name_state state); +int reg_entry_leave_state(struct reg_entry * e, + enum reg_name_state state, + struct timespec * timeout); + +int reg_entry_wait_state(struct reg_entry * e, + enum reg_name_state state, + struct timespec * timeout); struct reg_entry * registry_add_name(struct list_head * registry, char * name); |