summaryrefslogtreecommitdiff
path: root/src/irmd/reg/flow.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/irmd/reg/flow.c')
-rw-r--r--src/irmd/reg/flow.c274
1 files changed, 132 insertions, 142 deletions
diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c
index 66bd25a3..4d091b23 100644
--- a/src/irmd/reg/flow.c
+++ b/src/irmd/reg/flow.c
@@ -20,199 +20,189 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#define _POSIX_C_SOURCE 200112L
+#define _POSIX_C_SOURCE 200809L
-#include "config.h"
+#define OUROBOROS_PREFIX "reg/flow"
-#define OUROBOROS_PREFIX "reg-flow"
-
-#include <ouroboros/errno.h>
-#include <ouroboros/flow.h>
#include <ouroboros/logs.h>
-#include <ouroboros/time_utils.h>
-#include <ouroboros/pthread.h>
#include "flow.h"
#include <assert.h>
+#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
-#include <string.h>
-struct reg_flow * reg_flow_create(pid_t n_pid,
- pid_t n_1_pid,
- int flow_id,
- qosspec_t qs)
+struct reg_flow * reg_flow_create(const struct flow_info * info)
{
- pthread_condattr_t cattr;
- struct reg_flow * f;
-
- f = malloc(sizeof(*f));
- if (f == NULL)
+ struct reg_flow * flow;
+
+ assert(info != NULL);
+ assert(info->id > 0);
+ assert(info->n_pid != 0);
+ assert(info->n_1_pid == 0);
+ assert(info->mpl == 0);
+ assert(info->state == FLOW_INIT);
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL) {
+ log_err("Failed to malloc flow.");
goto fail_malloc;
-
- memset(f, 0, sizeof(*f));
-
- if (pthread_condattr_init(&cattr))
- goto fail_cattr;
-
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- if (pthread_cond_init(&f->cond, &cattr))
- goto fail_cond;
-
- if (pthread_mutex_init(&f->mtx, NULL))
- goto fail_mutex;
-
- f->n_rb = shm_rbuff_create(n_pid, flow_id);
- if (f->n_rb == NULL) {
- log_err("Could not create N ringbuffer flow %d, pid %d.",
- flow_id, n_pid);
- goto fail_n_rbuff;
}
- f->n_1_rb = shm_rbuff_create(n_1_pid, flow_id);
- if (f->n_1_rb == NULL) {
- log_err("Could not create N - 1 ringbuffer flow %d, pid %d.",
- flow_id, n_1_pid);
- goto fail_n_1_rbuff;
- }
-
- if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
- log_warn("Failed to set timestamp.");
+ memset(flow, 0, sizeof(*flow));
- pthread_condattr_destroy(&cattr);
+ clock_gettime(PTHREAD_COND_CLOCK, &flow->t0);
+ list_head_init(&flow->next);
- f->n_pid = n_pid;
- f->n_1_pid = n_1_pid;
- f->flow_id = flow_id;
- f->qs = qs;
+ flow->info = *info;
- f->state = FLOW_ALLOC_PENDING;
+ return flow;
- return f;
-
- fail_n_1_rbuff:
- shm_rbuff_destroy(f->n_rb);
- fail_n_rbuff:
- pthread_mutex_destroy(&f->mtx);
- fail_mutex:
- pthread_cond_destroy(&f->cond);
- fail_cond:
- pthread_condattr_destroy(&cattr);
- fail_cattr:
- free(f);
fail_malloc:
return NULL;
}
-static void cancel_irm_destroy(void * o)
+static void destroy_rbuffs(struct reg_flow * flow)
{
- struct reg_flow * f = (struct reg_flow *) o;
-
- pthread_mutex_unlock(&f->mtx);
+ if (flow->n_rb != NULL)
+ shm_rbuff_destroy(flow->n_rb);
+ flow->n_rb = NULL;
- pthread_cond_destroy(&f->cond);
- pthread_mutex_destroy(&f->mtx);
-
- shm_rbuff_destroy(f->n_rb);
- shm_rbuff_destroy(f->n_1_rb);
-
- free(f);
+ if (flow->n_1_rb != NULL)
+ shm_rbuff_destroy(flow->n_1_rb);
+ flow->n_1_rb = NULL;
}
-void reg_flow_destroy(struct reg_flow * f)
+void reg_flow_destroy(struct reg_flow * flow)
{
- assert(f);
-
- pthread_mutex_lock(&f->mtx);
-
- assert(f->data.len == 0);
-
- if (f->state == FLOW_DESTROY) {
- pthread_mutex_unlock(&f->mtx);
- return;
+ assert(flow != NULL);
+
+ switch(flow->info.state) {
+ case FLOW_ACCEPT_PENDING:
+ clrbuf(flow->data);
+ /* FALLTHRU */
+ default:
+ destroy_rbuffs(flow);
+ break;
}
- if (f->state == FLOW_ALLOC_PENDING)
- f->state = FLOW_DESTROY;
- else
- f->state = FLOW_NULL;
-
- pthread_cond_broadcast(&f->cond);
-
- pthread_cleanup_push(cancel_irm_destroy, f);
+ assert(flow->n_rb == NULL);
+ assert(flow->n_1_rb == NULL);
+ assert(flow->data.data == NULL);
+ assert(flow->data.len == 0);
- while (f->state != FLOW_NULL)
- pthread_cond_wait(&f->cond, &f->mtx);
+ assert(list_is_empty(&flow->next));
- pthread_cleanup_pop(true);
+ free(flow);
}
-enum flow_state reg_flow_get_state(struct reg_flow * f)
+static int create_rbuffs(struct reg_flow * flow,
+ struct flow_info * info)
{
- enum flow_state state;
+ assert(flow != NULL);
+ assert(info != NULL);
- assert(f);
+ flow->n_rb = shm_rbuff_create(info->n_pid, info->id);
+ if (flow->n_rb == NULL)
+ goto fail_n_rb;
- pthread_mutex_lock(&f->mtx);
+ assert(flow->info.n_1_pid == 0);
+ assert(flow->n_1_rb == NULL);
- state = f->state;
+ flow->info.n_1_pid = info->n_1_pid;
+ flow->n_1_rb = shm_rbuff_create(info->n_1_pid, info->id);
+ if (flow->n_1_rb == NULL)
+ goto fail_n_1_rb;
- pthread_mutex_unlock(&f->mtx);
+ return 0;
- return state;
+ fail_n_1_rb:
+ shm_rbuff_destroy(flow->n_rb);
+ fail_n_rb:
+ return -ENOMEM;
}
-void reg_flow_set_state(struct reg_flow * f,
- enum flow_state state)
+int reg_flow_update(struct reg_flow * flow,
+ struct flow_info * info)
{
- assert(f);
- assert(state != FLOW_DESTROY);
+ assert(flow != NULL);
+ assert(info != NULL);
+
+ assert(flow->info.id == info->id);
+
+ switch(info->state) {
+ case FLOW_ACCEPT_PENDING:
+ assert(flow->info.state == FLOW_INIT);
+ flow->info.n_pid = info->n_pid;
+ break;
+ case FLOW_ALLOC_PENDING:
+ assert(flow->info.state == FLOW_INIT);
+ assert(info->n_1_pid != 0);
+
+ if (create_rbuffs(flow, info) < 0)
+ goto fail;
+
+ break;
+ case FLOW_ALLOCATED:
+ assert(info->n_1_pid != 0);
+ assert(flow->info.state > FLOW_INIT);
+ assert(flow->info.state < FLOW_ALLOCATED);
+ assert(flow->info.n_pid != 0);
+ assert(info->mpl != 0);
+
+ flow->info.mpl = info->mpl;
+
+ if (flow->info.state == FLOW_ALLOC_PENDING)
+ break;
+
+ flow->info.qs = info->qs;
+
+ if (create_rbuffs(flow, info) < 0)
+ goto fail;
+ break;
+ case FLOW_DEALLOCATED:
+ destroy_rbuffs(flow);
+ break;
+ case FLOW_DEALLOC_PENDING:
+ break;
+ default:
+ assert(false);
+ return -EPERM;
+ }
- pthread_mutex_lock(&f->mtx);
+ flow->info.state = info->state;
- f->state = state;
- pthread_cond_broadcast(&f->cond);
+ *info = flow->info;
- pthread_mutex_unlock(&f->mtx);
+ return 0;
+ fail:
+ return -ENOMEM;
}
-int reg_flow_wait_state(struct reg_flow * f,
- enum flow_state state,
- struct timespec * dl)
+void reg_flow_set_data(struct reg_flow * flow,
+ const buffer_t * buf)
{
- int ret = 0;
- int s;
-
- assert(f);
- assert(state != FLOW_NULL);
- assert(state != FLOW_DESTROY);
- assert(state != FLOW_DEALLOC_PENDING);
-
- pthread_mutex_lock(&f->mtx);
+ assert(flow != NULL);
+ assert(buf != NULL);
+ assert(flow->data.data == NULL);
+ assert(flow->data.len == 0);
- assert(f->state != FLOW_NULL);
-
- pthread_cleanup_push(__cleanup_mutex_unlock, &f->mtx);
-
- while (!(f->state == state ||
- f->state == FLOW_DESTROY ||
- f->state == FLOW_DEALLOC_PENDING) &&
- ret != -ETIMEDOUT)
- ret = -__timedwait(&f->cond, &f->mtx, dl);
+ flow->data = *buf;
+}
- if (f->state == FLOW_DESTROY ||
- f->state == FLOW_DEALLOC_PENDING ||
- ret == -ETIMEDOUT) {
- f->state = FLOW_NULL;
- pthread_cond_broadcast(&f->cond);
- }
+void reg_flow_get_data(struct reg_flow * flow,
+ buffer_t * buf)
+{
+ assert(flow != NULL);
+ assert(buf != NULL);
- s = f->state;
+ *buf = flow->data;
- pthread_cleanup_pop(true);
+ clrbuf(flow->data);
+}
- return ret ? ret : s;
+void reg_flow_free_data(struct reg_flow * flow)
+{
+ freebuf(flow->data);
}