diff options
Diffstat (limited to 'src/irmd/reg/flow.c')
-rw-r--r-- | src/irmd/reg/flow.c | 274 |
1 files changed, 132 insertions, 142 deletions
diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c index 66bd25a3..4d091b23 100644 --- a/src/irmd/reg/flow.c +++ b/src/irmd/reg/flow.c @@ -20,199 +20,189 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#define _POSIX_C_SOURCE 200112L +#define _POSIX_C_SOURCE 200809L -#include "config.h" +#define OUROBOROS_PREFIX "reg/flow" -#define OUROBOROS_PREFIX "reg-flow" - -#include <ouroboros/errno.h> -#include <ouroboros/flow.h> #include <ouroboros/logs.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/pthread.h> #include "flow.h" #include <assert.h> +#include <errno.h> #include <stdbool.h> #include <stdlib.h> -#include <string.h> -struct reg_flow * reg_flow_create(pid_t n_pid, - pid_t n_1_pid, - int flow_id, - qosspec_t qs) +struct reg_flow * reg_flow_create(const struct flow_info * info) { - pthread_condattr_t cattr; - struct reg_flow * f; - - f = malloc(sizeof(*f)); - if (f == NULL) + struct reg_flow * flow; + + assert(info != NULL); + assert(info->id > 0); + assert(info->n_pid != 0); + assert(info->n_1_pid == 0); + assert(info->mpl == 0); + assert(info->state == FLOW_INIT); + + flow = malloc(sizeof(*flow)); + if (flow == NULL) { + log_err("Failed to malloc flow."); goto fail_malloc; - - memset(f, 0, sizeof(*f)); - - if (pthread_condattr_init(&cattr)) - goto fail_cattr; - -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - if (pthread_cond_init(&f->cond, &cattr)) - goto fail_cond; - - if (pthread_mutex_init(&f->mtx, NULL)) - goto fail_mutex; - - f->n_rb = shm_rbuff_create(n_pid, flow_id); - if (f->n_rb == NULL) { - log_err("Could not create N ringbuffer flow %d, pid %d.", - flow_id, n_pid); - goto fail_n_rbuff; } - f->n_1_rb = shm_rbuff_create(n_1_pid, flow_id); - if (f->n_1_rb == NULL) { - log_err("Could not create N - 1 ringbuffer flow %d, pid %d.", - flow_id, n_1_pid); - goto fail_n_1_rbuff; - } - - if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) - log_warn("Failed to set timestamp."); + memset(flow, 0, sizeof(*flow)); - pthread_condattr_destroy(&cattr); + clock_gettime(PTHREAD_COND_CLOCK, &flow->t0); + list_head_init(&flow->next); - f->n_pid = n_pid; - f->n_1_pid = n_1_pid; - f->flow_id = flow_id; - f->qs = qs; + flow->info = *info; - f->state = FLOW_ALLOC_PENDING; + return flow; - return f; - - fail_n_1_rbuff: - shm_rbuff_destroy(f->n_rb); - fail_n_rbuff: - pthread_mutex_destroy(&f->mtx); - fail_mutex: - pthread_cond_destroy(&f->cond); - fail_cond: - pthread_condattr_destroy(&cattr); - fail_cattr: - free(f); fail_malloc: return NULL; } -static void cancel_irm_destroy(void * o) +static void destroy_rbuffs(struct reg_flow * flow) { - struct reg_flow * f = (struct reg_flow *) o; - - pthread_mutex_unlock(&f->mtx); + if (flow->n_rb != NULL) + shm_rbuff_destroy(flow->n_rb); + flow->n_rb = NULL; - pthread_cond_destroy(&f->cond); - pthread_mutex_destroy(&f->mtx); - - shm_rbuff_destroy(f->n_rb); - shm_rbuff_destroy(f->n_1_rb); - - free(f); + if (flow->n_1_rb != NULL) + shm_rbuff_destroy(flow->n_1_rb); + flow->n_1_rb = NULL; } -void reg_flow_destroy(struct reg_flow * f) +void reg_flow_destroy(struct reg_flow * flow) { - assert(f); - - pthread_mutex_lock(&f->mtx); - - assert(f->data.len == 0); - - if (f->state == FLOW_DESTROY) { - pthread_mutex_unlock(&f->mtx); - return; + assert(flow != NULL); + + switch(flow->info.state) { + case FLOW_ACCEPT_PENDING: + clrbuf(flow->data); + /* FALLTHRU */ + default: + destroy_rbuffs(flow); + break; } - if (f->state == FLOW_ALLOC_PENDING) - f->state = FLOW_DESTROY; - else - f->state = FLOW_NULL; - - pthread_cond_broadcast(&f->cond); - - pthread_cleanup_push(cancel_irm_destroy, f); + assert(flow->n_rb == NULL); + assert(flow->n_1_rb == NULL); + assert(flow->data.data == NULL); + assert(flow->data.len == 0); - while (f->state != FLOW_NULL) - pthread_cond_wait(&f->cond, &f->mtx); + assert(list_is_empty(&flow->next)); - pthread_cleanup_pop(true); + free(flow); } -enum flow_state reg_flow_get_state(struct reg_flow * f) +static int create_rbuffs(struct reg_flow * flow, + struct flow_info * info) { - enum flow_state state; + assert(flow != NULL); + assert(info != NULL); - assert(f); + flow->n_rb = shm_rbuff_create(info->n_pid, info->id); + if (flow->n_rb == NULL) + goto fail_n_rb; - pthread_mutex_lock(&f->mtx); + assert(flow->info.n_1_pid == 0); + assert(flow->n_1_rb == NULL); - state = f->state; + flow->info.n_1_pid = info->n_1_pid; + flow->n_1_rb = shm_rbuff_create(info->n_1_pid, info->id); + if (flow->n_1_rb == NULL) + goto fail_n_1_rb; - pthread_mutex_unlock(&f->mtx); + return 0; - return state; + fail_n_1_rb: + shm_rbuff_destroy(flow->n_rb); + fail_n_rb: + return -ENOMEM; } -void reg_flow_set_state(struct reg_flow * f, - enum flow_state state) +int reg_flow_update(struct reg_flow * flow, + struct flow_info * info) { - assert(f); - assert(state != FLOW_DESTROY); + assert(flow != NULL); + assert(info != NULL); + + assert(flow->info.id == info->id); + + switch(info->state) { + case FLOW_ACCEPT_PENDING: + assert(flow->info.state == FLOW_INIT); + flow->info.n_pid = info->n_pid; + break; + case FLOW_ALLOC_PENDING: + assert(flow->info.state == FLOW_INIT); + assert(info->n_1_pid != 0); + + if (create_rbuffs(flow, info) < 0) + goto fail; + + break; + case FLOW_ALLOCATED: + assert(info->n_1_pid != 0); + assert(flow->info.state > FLOW_INIT); + assert(flow->info.state < FLOW_ALLOCATED); + assert(flow->info.n_pid != 0); + assert(info->mpl != 0); + + flow->info.mpl = info->mpl; + + if (flow->info.state == FLOW_ALLOC_PENDING) + break; + + flow->info.qs = info->qs; + + if (create_rbuffs(flow, info) < 0) + goto fail; + break; + case FLOW_DEALLOCATED: + destroy_rbuffs(flow); + break; + case FLOW_DEALLOC_PENDING: + break; + default: + assert(false); + return -EPERM; + } - pthread_mutex_lock(&f->mtx); + flow->info.state = info->state; - f->state = state; - pthread_cond_broadcast(&f->cond); + *info = flow->info; - pthread_mutex_unlock(&f->mtx); + return 0; + fail: + return -ENOMEM; } -int reg_flow_wait_state(struct reg_flow * f, - enum flow_state state, - struct timespec * dl) +void reg_flow_set_data(struct reg_flow * flow, + const buffer_t * buf) { - int ret = 0; - int s; - - assert(f); - assert(state != FLOW_NULL); - assert(state != FLOW_DESTROY); - assert(state != FLOW_DEALLOC_PENDING); - - pthread_mutex_lock(&f->mtx); + assert(flow != NULL); + assert(buf != NULL); + assert(flow->data.data == NULL); + assert(flow->data.len == 0); - assert(f->state != FLOW_NULL); - - pthread_cleanup_push(__cleanup_mutex_unlock, &f->mtx); - - while (!(f->state == state || - f->state == FLOW_DESTROY || - f->state == FLOW_DEALLOC_PENDING) && - ret != -ETIMEDOUT) - ret = -__timedwait(&f->cond, &f->mtx, dl); + flow->data = *buf; +} - if (f->state == FLOW_DESTROY || - f->state == FLOW_DEALLOC_PENDING || - ret == -ETIMEDOUT) { - f->state = FLOW_NULL; - pthread_cond_broadcast(&f->cond); - } +void reg_flow_get_data(struct reg_flow * flow, + buffer_t * buf) +{ + assert(flow != NULL); + assert(buf != NULL); - s = f->state; + *buf = flow->data; - pthread_cleanup_pop(true); + clrbuf(flow->data); +} - return ret ? ret : s; +void reg_flow_free_data(struct reg_flow * flow) +{ + freebuf(flow->data); } |