summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/gam.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal/gam.c')
-rw-r--r--src/ipcpd/normal/gam.c288
1 files changed, 18 insertions, 270 deletions
diff --git a/src/ipcpd/normal/gam.c b/src/ipcpd/normal/gam.c
index 791cf34e..643d83b0 100644
--- a/src/ipcpd/normal/gam.c
+++ b/src/ipcpd/normal/gam.c
@@ -1,10 +1,10 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Data transfer graph adjacency manager
*
- * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -20,7 +20,7 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "graph-adjacency-manager"
+#define OUROBOROS_PREFIX "dt-gam"
#include <ouroboros/config.h>
#include <ouroboros/cdap.h>
@@ -40,295 +40,43 @@
#include <pthread.h>
#include <string.h>
-struct ga {
- struct list_head next;
-
- qosspec_t qs;
- int fd;
- struct cacep_info * info;
-};
-
struct gam {
- struct list_head gas;
- pthread_mutex_t gas_lock;
- pthread_cond_t gas_cond;
-
- char * ae_name;
-
struct pol_gam_ops * ops;
void * ops_o;
};
struct gam * gam_create(enum pol_gam gam_type,
- const char * ae_name)
+ struct nbs * nbs,
+ struct ae * ae)
{
- struct gam * tmp;
+ struct gam * gam;
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL)
+ gam = malloc(sizeof(*gam));
+ if (gam == NULL)
return NULL;
switch (gam_type) {
case COMPLETE:
- tmp->ops = &complete_ops;
+ gam->ops = &complete_ops;
break;
default:
log_err("Unknown gam policy: %d.", gam_type);
- free(tmp);
return NULL;
}
- list_head_init(&tmp->gas);
-
- tmp->ae_name = strdup(ae_name);
- if (tmp->ae_name == NULL) {
- free(tmp);
+ gam->ops_o = gam->ops->create(nbs, ae);
+ if (gam->ops_o == NULL) {
+ free(gam);
return NULL;
}
- if (pthread_mutex_init(&tmp->gas_lock, NULL)) {
- free(tmp->ae_name);
- free(tmp);
- return NULL;
- }
-
- if (pthread_cond_init(&tmp->gas_cond, NULL)) {
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp->ae_name);
- free(tmp);
- return NULL;
- }
-
- tmp->ops_o = tmp->ops->create(tmp);
- if (tmp->ops_o == NULL) {
- pthread_cond_destroy(&tmp->gas_cond);
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp->ae_name);
- free(tmp);
- return NULL;
- }
-
- if (tmp->ops->start(tmp->ops_o)) {
- pthread_cond_destroy(&tmp->gas_cond);
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp->ae_name);
- free(tmp);
- return NULL;
- }
-
- return tmp;
-}
-
-void gam_destroy(struct gam * instance)
-{
- struct list_head * p = NULL;
- struct list_head * n = NULL;
-
- assert(instance);
-
- instance->ops->stop(instance->ops_o);
-
- pthread_mutex_lock(&instance->gas_lock);
-
- list_for_each_safe(p, n, &instance->gas) {
- struct ga * e = list_entry(p, struct ga, next);
- list_del(&e->next);
- free(e->info->name);
- free(e->info);
- free(e);
- }
-
- pthread_mutex_unlock(&instance->gas_lock);
-
- pthread_mutex_destroy(&instance->gas_lock);
- pthread_cond_destroy(&instance->gas_cond);
-
- free(instance->ae_name);
- instance->ops->destroy(instance->ops_o);
- free(instance);
-}
-
-static int add_ga(struct gam * instance,
- int fd,
- qosspec_t qs,
- struct cacep_info * info)
-{
- struct ga * ga;
-
- ga = malloc(sizeof(*ga));
- if (ga == NULL)
- return -ENOMEM;
-
- ga->fd = fd;
- ga->info = info;
- ga->qs = qs;
-
- list_head_init(&ga->next);
-
- pthread_mutex_lock(&instance->gas_lock);
- list_add(&ga->next, &instance->gas);
- pthread_cond_signal(&instance->gas_cond);
- pthread_mutex_unlock(&instance->gas_lock);
-
- log_info("Added %s flow to %s.", instance->ae_name, info->name);
-
- return 0;
-}
-
-int gam_flow_arr(struct gam * instance,
- int fd,
- qosspec_t qs)
-{
- struct cacep_info * rcv_info;
- struct cacep_info snd_info;
-
- if (flow_alloc_resp(fd, instance->ops->accept_new_flow(instance->ops_o))
- < 0) {
- log_err("Could not respond to new flow.");
- return -1;
- }
-
- cacep_info_init(&snd_info);
- snd_info.proto.protocol = strdup(CDAP_PROTO);
- if (snd_info.proto.protocol == NULL) {
- cacep_info_fini(&snd_info);
- return -ENOMEM;
- }
-
- snd_info.proto.pref_version = 1;
- snd_info.proto.pref_syntax = PROTO_GPB;
- snd_info.addr = ipcpi.address;
- snd_info.name = strdup(ipcpi.name);
- if (snd_info.name == NULL) {
- cacep_info_fini(&snd_info);
- return -ENOMEM;
- }
-
- rcv_info = cacep_auth_wait(fd, SIMPLE_AUTH, &snd_info);
- if (rcv_info == NULL) {
- log_err("Other side failed to authenticate.");
- cacep_info_fini(&snd_info);
- return -1;
- }
-
- cacep_info_fini(&snd_info);
-
- if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) {
- flow_dealloc(fd);
- cacep_info_fini(rcv_info);
- free(rcv_info);
- return 0;
- }
-
- if (add_ga(instance, fd, qs, rcv_info)) {
- log_err("Failed to add ga to graph adjacency manager list.");
- flow_dealloc(fd);
- cacep_info_fini(rcv_info);
- free(rcv_info);
- return -1;
- }
-
- return 0;
-}
-
-int gam_flow_alloc(struct gam * instance,
- char * dst_name,
- qosspec_t qs)
-{
- struct cacep_info * rcv_info;
- struct cacep_info snd_info;
- int fd;
-
- log_dbg("Allocating flow to %s.", dst_name);
-
- fd = flow_alloc(dst_name, instance->ae_name, NULL);
- if (fd < 0) {
- log_err("Failed to allocate flow to %s.", dst_name);
- return -1;
- }
-
- if (flow_alloc_res(fd)) {
- log_err("Flow allocation to %s failed.", dst_name);
- flow_dealloc(fd);
- return -1;
- }
-
- cacep_info_init(&snd_info);
- snd_info.proto.protocol = strdup(CDAP_PROTO);
- if (snd_info.proto.protocol == NULL) {
- cacep_info_fini(&snd_info);
- return -ENOMEM;
- }
-
- snd_info.proto.pref_version = 1;
- snd_info.proto.pref_syntax = PROTO_GPB;
- snd_info.addr = ipcpi.address;
- snd_info.name = strdup(ipcpi.name);
- if (snd_info.name == NULL) {
- cacep_info_fini(&snd_info);
- return -ENOMEM;
- }
-
- rcv_info = cacep_auth(fd, SIMPLE_AUTH, &snd_info);
- if (rcv_info == NULL) {
- log_err("Other side failed to authenticate.");
- cacep_info_fini(&snd_info);
- return -1;
- }
-
- cacep_info_fini(&snd_info);
-
- if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) {
- flow_dealloc(fd);
- cacep_info_fini(rcv_info);
- free(rcv_info);
- return 0;
- }
-
- if (add_ga(instance, fd, qs, rcv_info)) {
- log_err("Failed to add GA to graph adjacency manager list.");
- flow_dealloc(fd);
- cacep_info_fini(rcv_info);
- free(rcv_info);
- return -1;
- }
-
- return 0;
+ return gam;
}
-int gam_flow_wait(struct gam * instance,
- int * fd,
- struct cacep_info ** info,
- qosspec_t * qs)
+void gam_destroy(struct gam * gam)
{
- struct ga * ga;
-
- assert(fd);
- assert(info);
- assert(qs);
-
- pthread_mutex_lock(&instance->gas_lock);
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) &instance->gas_lock);
-
- while (list_is_empty(&instance->gas))
- pthread_cond_wait(&instance->gas_cond, &instance->gas_lock);
-
- ga = list_first_entry((&instance->gas), struct ga, next);
- if (ga == NULL) {
- pthread_mutex_unlock(&instance->gas_lock);
- return -1;
- }
-
- *fd = ga->fd;
- *info = ga->info;
- *qs = ga->qs;
-
- list_del(&ga->next);
- free(ga);
-
- pthread_cleanup_pop(true);
+ assert(gam);
- return 0;
+ gam->ops->destroy(gam->ops_o);
+ free(gam);
}