summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-06-23 15:45:57 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-06-24 16:39:57 +0200
commit668a32c56b7b419f2415d5b59ad97dcf71e4783c (patch)
treed612ce20ee628b78a5dc49f777fa683369900577 /src/irmd/main.c
parentaa6255a605cac034089c78562c0d000aacd0af1e (diff)
downloadouroboros-668a32c56b7b419f2415d5b59ad97dcf71e4783c.tar.gz
ouroboros-668a32c56b7b419f2415d5b59ad97dcf71e4783c.zip
irmd: name space management
irmd registry rebuilt from the ground up. Now supports * multiple APs to accept flows for the same registered name * multiple instances per registered name. * irm can select a specific process to wake up
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c1071
1 files changed, 817 insertions, 254 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 50055c4d..34894e73 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -50,46 +50,170 @@
#include <pthread.h>
#include <sys/stat.h>
-#ifndef IRMD_MAX_FLOWS
#define IRMD_MAX_FLOWS 4096
-#endif
-#ifndef IRMD_THREADPOOL_SIZE
#define IRMD_THREADPOOL_SIZE 3
-#endif
-#ifndef IRMD_FLOW_TIMEOUT
#define IRMD_FLOW_TIMEOUT 5000 /* ms */
-#endif
#define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */
+#define REG_AP_AUTO 0x0001
+/* FIXME: add support for unique */
+#define REG_AP_UNIQUE 0x0002
+
+#define reg_entry_has_api(e, id) (reg_entry_get_reg_instance(e, id) != NULL)
+#define reg_entry_has_ap_name(e, name) (reg_entry_get_ap_name(e, name) != NULL)
+#define reg_entry_has_ap_auto(e, name) (reg_entry_get_reg_auto(e, name) != NULL)
+
struct ipcp_entry {
struct list_head next;
instance_name_t * api;
char * dif_name;
};
-/* currently supports only registering whatevercast groups of a single AP-I */
-struct reg_name_entry {
+enum irm_state {
+ IRMD_NULL = 0,
+ IRMD_RUNNING,
+ IRMD_SHUTDOWN
+};
+
+enum reg_name_state {
+ REG_NAME_NULL = 0,
+ REG_NAME_IDLE,
+ REG_NAME_AUTO_ACCEPT,
+ REG_NAME_AUTO_EXEC,
+ REG_NAME_FLOW_ACCEPT,
+ REG_NAME_FLOW_ARRIVED
+};
+
+enum reg_i_state {
+ REG_I_NULL = 0,
+ REG_I_SLEEP,
+ REG_I_WAKE
+};
+
+struct reg_instance {
+ struct list_head next;
+ pid_t pid;
+
+ /* the pid will block on this */
+ enum reg_i_state state;
+ pthread_cond_t wakeup;
+ pthread_mutex_t mutex;
+};
+
+static struct reg_instance * reg_instance_create(pid_t pid)
+{
+ struct reg_instance * i;
+ i = malloc(sizeof(*i));
+ if (i == NULL)
+ return NULL;
+
+ i->pid = pid;
+ i->state = REG_I_WAKE;
+
+ pthread_mutex_init(&i->mutex, NULL);
+ pthread_cond_init(&i->wakeup, NULL);
+
+ INIT_LIST_HEAD(&i->next);
+
+ return i;
+}
+
+static void reg_instance_sleep(struct reg_instance * i)
+{
+ pthread_mutex_lock(&i->mutex);
+ if (i->state != REG_I_WAKE) {
+ pthread_mutex_unlock(&i->mutex);
+ return;
+ }
+
+ i->state = REG_I_SLEEP;
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) &i->mutex);
+
+ while (i->state == REG_I_SLEEP)
+ pthread_cond_wait(&i->wakeup, &i->mutex);
+
+ pthread_cleanup_pop(true);
+}
+
+static void reg_instance_wake(struct reg_instance * i)
+{
+ pthread_mutex_lock(&i->mutex);
+
+ if (i->state == REG_I_NULL) {
+ pthread_mutex_unlock(&i->mutex);
+ return;
+ }
+
+ i->state = REG_I_WAKE;
+
+ pthread_mutex_unlock(&i->mutex);
+ pthread_cond_signal(&i->wakeup);
+}
+
+static void reg_instance_destroy(struct reg_instance * i)
+{
+ bool wait = true;
+ pthread_mutex_lock(&i->mutex);
+ i->state = REG_I_NULL;
+
+ pthread_cond_broadcast(&i->wakeup);
+ pthread_mutex_unlock(&i->mutex);
+
+ while (wait) {
+ pthread_mutex_lock(&i->mutex);
+ if (pthread_cond_destroy(&i->wakeup) < 0)
+ pthread_cond_broadcast(&i->wakeup);
+ else
+ wait = false;
+ pthread_mutex_unlock(&i->mutex);
+ }
+
+ pthread_mutex_destroy(&i->mutex);
+
+ free(i);
+}
+
+struct reg_auto {
+ struct list_head next;
+ char * ap_name;
+ char ** argv;
+};
+
+struct reg_ap_name {
+ struct list_head next;
+ char * ap_name;
+};
+
+/* an entry in the registry */
+struct reg_entry {
struct list_head next;
- /* generic whatevercast name */
- char * name;
+ /* generic name */
+ char * name;
+
+ /* names of the aps that can listen to this name */
+ struct list_head ap_names;
+
+ enum reg_name_state state;
- /* FIXME: make a list resolve to AP-I instead */
- instance_name_t * api;
- char ** argv;
- bool autoexec;
+ uint32_t flags;
+
+ /* auto execution info */
+ struct list_head auto_ap_info;
+
+ /* known instances */
+ struct list_head ap_instances;
- bool accept;
char * req_ae_name;
int response;
- int flow_arrived;
pthread_cond_t acc_signal;
- pthread_cond_t acc_arr_signal;
- pthread_mutex_t acc_lock;
+ pthread_mutex_t state_lock;
};
/* keeps track of port_id's between N and N - 1 */
@@ -105,13 +229,15 @@ struct port_map_entry {
pthread_mutex_t res_lock;
enum flow_state state;
+
struct timespec t0;
};
struct irm {
- /* FIXME: list of ipcps could be merged with registered names */
+ /* FIXME: list of ipcps could be merged into the registry */
struct list_head ipcps;
- struct list_head reg_names;
+
+ struct list_head registry;
pthread_rwlock_t reg_lock;
/* keep track of all flows in this processing system */
@@ -120,13 +246,14 @@ struct irm {
struct list_head port_map;
pthread_rwlock_t flows_lock;
+ enum irm_state state;
struct shm_du_map * dum;
pthread_t * threadpool;
int sockfd;
pthread_rwlock_t state_lock;
- pthread_t cleanup_flows;
- pthread_t shm_sanitize;
+ pthread_t cleanup_flows;
+ pthread_t shm_sanitize;
} * instance = NULL;
static struct port_map_entry * port_map_entry_create()
@@ -156,6 +283,29 @@ static struct port_map_entry * port_map_entry_create()
return e;
}
+static void port_map_entry_destroy(struct port_map_entry * e)
+{
+ bool wait = true;
+ pthread_mutex_lock(&e->res_lock);
+ e->state = FLOW_NULL;
+
+ pthread_cond_broadcast(&e->res_signal);
+ pthread_mutex_unlock(&e->res_lock);
+
+ while (wait) {
+ pthread_mutex_lock(&e->res_lock);
+ if (pthread_cond_destroy(&e->res_signal) < 0)
+ pthread_cond_broadcast(&e->res_signal);
+ else
+ wait = false;
+ pthread_mutex_unlock(&e->res_lock);
+ }
+
+ pthread_mutex_destroy(&e->res_lock);
+
+ free(e);
+}
+
static struct port_map_entry * get_port_map_entry(int port_id)
{
struct list_head * pos = NULL;
@@ -272,87 +422,181 @@ static instance_name_t * get_ipcp_by_dst_name(char * dst_name,
return NULL;
}
-static struct reg_name_entry * reg_name_entry_create()
+static struct reg_entry * reg_entry_create()
{
- struct reg_name_entry * e = malloc(sizeof(*e));
+ struct reg_entry * e = malloc(sizeof(*e));
if (e == NULL)
return NULL;
e->name = NULL;
- e->api = NULL;
- e->argv = NULL;
- e->autoexec = false;
- e->accept = false;
+ e->state = REG_NAME_NULL;
+ e->flags = 0;
+
e->req_ae_name = NULL;
- e->flow_arrived = -1;
+ e->response = -1;
- if (pthread_cond_init(&e->acc_arr_signal, NULL)) {
- free(e);
+ return e;
+}
+
+static struct reg_entry * reg_entry_init(struct reg_entry * e,
+ char * name,
+ char * ap_name,
+ uint32_t flags)
+{
+ if (e == NULL || name == NULL || ap_name == NULL)
return NULL;
- }
+
+ struct reg_ap_name * n = malloc(sizeof(*n));
+ if (n == NULL)
+ return NULL;
+
+ INIT_LIST_HEAD(&e->next);
+ INIT_LIST_HEAD(&e->ap_names);
+ INIT_LIST_HEAD(&e->auto_ap_info);
+ INIT_LIST_HEAD(&e->ap_instances);
+
+ e->name = name;
+ e->flags = flags;
+ n->ap_name = ap_name;
+
+ list_add(&n->next, &e->ap_names);
if (pthread_cond_init(&e->acc_signal, NULL)) {
free(e);
return NULL;
}
- if (pthread_mutex_init(&e->acc_lock, NULL)) {
+ if (pthread_mutex_init(&e->state_lock, NULL)) {
free(e);
return NULL;
}
- INIT_LIST_HEAD(&e->next);
+ e->state = REG_NAME_IDLE;
return e;
}
-static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e,
- char * name,
- instance_name_t * api,
- char ** argv,
- bool ae)
+static void reg_entry_destroy(struct reg_entry * e)
{
- if (e == NULL || name == NULL || api == NULL)
- return NULL;
+ struct list_head * pos = NULL;
+ struct list_head * n = NULL;
- e->name = name;
- e->api = api;
- e->argv = argv;
- e->autoexec = ae;
+ bool wait = true;
- return e;
+ if (e == NULL)
+ return;
+
+ pthread_mutex_lock(&e->state_lock);
+
+ e->state = REG_NAME_NULL;
+
+ pthread_cond_broadcast(&e->acc_signal);
+ pthread_mutex_unlock(&e->state_lock);
+
+ while (wait) {
+ pthread_mutex_lock(&e->state_lock);
+ if (pthread_cond_destroy(&e->acc_signal) < 0)
+ pthread_cond_broadcast(&e->acc_signal);
+ else
+ wait = false;
+ pthread_mutex_unlock(&e->state_lock);
+ }
+
+ pthread_mutex_destroy(&e->state_lock);
+
+ if (e->name != NULL)
+ free(e->name);
+
+ if (e->req_ae_name != NULL)
+ free(e->req_ae_name);
+
+ list_for_each_safe(pos, n, &e->ap_instances) {
+ struct reg_instance * i =
+ list_entry(pos, struct reg_instance, next);
+ reg_instance_destroy(i);
+ }
+
+ list_for_each_safe(pos, n, &e->auto_ap_info) {
+ struct reg_auto * a =
+ list_entry(pos, struct reg_auto, next);
+
+ if (a->argv != NULL) {
+ char ** t = a->argv;
+ while (*a->argv != NULL)
+ free(*(a->argv++));
+ free(t);
+ }
+
+ free(a->ap_name);
+ free(a);
+ }
+
+ list_for_each_safe(pos, n, &e->ap_names) {
+ struct reg_ap_name * n =
+ list_entry(pos, struct reg_ap_name, next);
+
+ free(n->ap_name);
+ free(n);
+ }
+
+ free(e);
}
-static int reg_name_entry_destroy(struct reg_name_entry * e)
+static struct reg_ap_name * reg_entry_get_ap_name(struct reg_entry * e,
+ char * ap_name)
{
- if (e == NULL)
- return 0;
+ struct list_head * pos = NULL;
- if (e->accept) {
- pthread_mutex_lock(&e->acc_lock);
- e->flow_arrived = -2;
- pthread_mutex_unlock(&e->acc_lock);
- pthread_cond_broadcast(&e->acc_arr_signal);
- sched_yield();
+ list_for_each(pos, &e->ap_names) {
+ struct reg_ap_name * n =
+ list_entry(pos, struct reg_ap_name, next);
+
+ if (strcmp(ap_name, n->ap_name) == 0)
+ return n;
}
- free(e->name);
- instance_name_destroy(e->api);
+ return NULL;
+}
- free(e);
+static struct reg_instance * reg_entry_get_reg_instance(struct reg_entry * e,
+ pid_t pid)
+{
+ struct list_head * pos = NULL;
- e = NULL;
+ list_for_each(pos, &e->ap_instances) {
+ struct reg_instance * r =
+ list_entry(pos, struct reg_instance, next);
- return 0;
+ if (r->pid == pid)
+ return r;
+ }
+
+ return NULL;
+}
+
+static struct reg_auto * reg_entry_get_reg_auto(struct reg_entry * e,
+ char * ap_name)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &e->auto_ap_info) {
+ struct reg_auto * a =
+ list_entry(pos, struct reg_auto, next);
+
+ if (strcmp(ap_name, a->ap_name) == 0)
+ return a;
+ }
+
+ return NULL;
}
-static struct reg_name_entry * get_reg_name_entry_by_name(char * name)
+static struct reg_entry * get_reg_entry_by_name(char * name)
{
struct list_head * pos = NULL;
- list_for_each(pos, &instance->reg_names) {
- struct reg_name_entry * e =
- list_entry(pos, struct reg_name_entry, next);
+ list_for_each(pos, &instance->registry) {
+ struct reg_entry * e =
+ list_entry(pos, struct reg_entry, next);
if (strcmp(name, e->name) == 0)
return e;
@@ -361,72 +605,305 @@ static struct reg_name_entry * get_reg_name_entry_by_name(char * name)
return NULL;
}
-static struct reg_name_entry * get_reg_name_entry_by_ap_name(char * ap_name)
+static struct reg_entry * get_reg_entry_by_ap_name(char * ap_name)
{
struct list_head * pos = NULL;
- list_for_each(pos, &instance->reg_names) {
- struct reg_name_entry * e =
- list_entry(pos, struct reg_name_entry, next);
+ list_for_each(pos, &instance->registry) {
+ struct list_head * p = NULL;
+ struct reg_entry * e =
+ list_entry(pos, struct reg_entry, next);
- if (strcmp(ap_name, e->api->name) == 0)
- return e;
+ list_for_each(p, &e->ap_names) {
+ struct reg_ap_name * n =
+ list_entry(p, struct reg_ap_name, next);
+
+ if (strcmp(n->ap_name, ap_name) == 0)
+ return e;
+ }
}
return NULL;
}
-static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid)
+static struct reg_entry * get_reg_entry_by_ap_id(pid_t pid)
{
struct list_head * pos = NULL;
- list_for_each(pos, &instance->reg_names) {
- struct reg_name_entry * e =
- list_entry(pos, struct reg_name_entry, next);
+ list_for_each(pos, &instance->registry) {
+ struct list_head * p = NULL;
+ struct reg_entry * e =
+ list_entry(pos, struct reg_entry, next);
- if (e->api->id == pid)
- return e;
+ list_for_each(p, &e->ap_instances) {
+ struct reg_instance * r =
+ list_entry(p, struct reg_instance, next);
+
+ if (r->pid == pid)
+ return e;
+ }
}
return NULL;
}
-/* FIXME: add only name when we have NSM solved */
-static int reg_name_entry_add_name_instance(char * name,
- instance_name_t * api,
- char ** argv,
- bool autoexec)
+static int registry_add_entry(char * name, char * ap_name, uint32_t flags)
{
- struct reg_name_entry * e = get_reg_name_entry_by_name(name);
+ struct reg_entry * e = NULL;
+
+ if (name == NULL || ap_name == NULL)
+ return -EINVAL;
+
+ e = get_reg_entry_by_name(name);
+ if (e != NULL) {
+ LOG_INFO("Name %s already registered.", name);
+ return -1;
+ }
+
+ e = reg_entry_create();
if (e == NULL) {
- e = reg_name_entry_create();
- if (e == NULL)
- return -1;
+ LOG_ERR("Could not create registry entry.");
+ return -1;
+ }
- if (reg_name_entry_init(e, name, api, argv, autoexec)
- == NULL) {
- reg_name_entry_destroy(e);
+ e = reg_entry_init(e, name, ap_name, flags);
+ if (e == NULL) {
+ LOG_ERR("Could not initialize registry entry.");
+ reg_entry_destroy(e);
+ return -1;
+ }
+
+ list_add(&e->next, &instance->registry);
+
+ return 0;
+}
+
+static int registry_add_ap_auto(char * name,
+ char * ap_name,
+ char ** argv)
+{
+ struct reg_entry * e;
+ struct reg_auto * a;
+
+ if (name == NULL || ap_name == NULL)
+ return -EINVAL;
+
+ e = get_reg_entry_by_name(name);
+ if (e == NULL) {
+ LOG_DBGF("Name %s not found in registry.", name);
+ return -1;
+ }
+
+ if (!(e->flags & REG_AP_AUTO)) {
+ LOG_DBGF("%s does not allow auto-instantiation.", name);
+ return -1;
+ }
+
+ if (!reg_entry_has_ap_name(e, ap_name)) {
+ LOG_DBGF("AP name %s not associated with %s.", ap_name, name);
+ return -1;
+ }
+
+ if (e->state == REG_NAME_NULL) {
+ LOG_DBGF("Tried to add instantiation info in NULL state.");
+ return -1;
+ }
+
+ a = reg_entry_get_reg_auto(e, ap_name);
+ if (a != NULL) {
+ LOG_DBGF("Updating auto-instantiation info for %s.", ap_name);
+ list_del(&a->next);
+ free(a->ap_name);
+ if (a->argv != NULL) {
+ while (*a->argv != NULL)
+ free(*a->argv++);
+ }
+
+ } else {
+ a = malloc(sizeof(*a));
+ if (a == NULL) {
return -1;
}
+ }
- list_add(&e->next, &instance->reg_names);
- return 0;
+ a->ap_name = ap_name;
+ a->argv = argv;
+
+ switch(e->state) {
+ case REG_NAME_IDLE:
+ e->state = REG_NAME_AUTO_ACCEPT;
+ break;
+ default:
+ break;
+ }
+
+ list_add(&a->next, &e->auto_ap_info);
+
+ return 0;
+}
+
+#if 0
+static int registry_remove_ap_auto(char * name,
+ char * ap_name)
+{
+ struct reg_entry * e;
+ struct reg_auto * a;
+
+ if (name == NULL || ap_name == NULL)
+ return -EINVAL;
+
+ e = get_reg_entry_by_name(name);
+ if (e == NULL) {
+ LOG_DBGF("Name %s not found in registry.", name);
+ return -1;
+ }
+
+ a = reg_entry_get_reg_auto(e, ap_name);
+ if (a == NULL) {
+ LOG_DBGF("Quto-instantiation info for %s not found.", ap_name);
+ return -1;
+ }
+
+ list_del(&a->next);
+
+ switch(e->state) {
+ case REG_NAME_AUTO_ACCEPT:
+ if (list_empty(&e->auto_ap_info))
+ e->state = REG_NAME_IDLE;
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+#endif
+
+static struct reg_instance * registry_add_ap_instance(char * name,
+ pid_t pid)
+{
+ struct reg_entry * e = NULL;
+ struct reg_instance * i = NULL;
+
+ if (name == NULL || pid == 0)
+ return NULL;
+
+ e = get_reg_entry_by_name(name);
+ if (e == NULL) {
+ LOG_DBGF("Name %s not found in registry.", name);
+ return NULL;
+ }
+
+ if (pid == 0) {
+ LOG_DBGF("Invalid pid.");
+ return NULL;
+ }
+
+ if (reg_entry_has_api(e, pid)) {
+ LOG_DBGF("Instance already registered with this name.");
+ return NULL;
+ }
+
+ if (e->state == REG_NAME_NULL) {
+ LOG_DBGF("Tried to add instance in NULL state.");
+ return NULL;
+ }
+
+ i = reg_instance_create(pid);
+ if (i == NULL) {
+ LOG_DBGF("Failed to create reg_instance");
+ return NULL;
+ }
+
+ switch(e->state) {
+ case REG_NAME_IDLE:
+ case REG_NAME_AUTO_EXEC:
+ e->state = REG_NAME_FLOW_ACCEPT;
+ pthread_cond_signal(&e->acc_signal);
+ break;
+ default:
+ break;
+ }
+
+ list_add(&i->next, &e->ap_instances);
+
+ return i;
+}
+
+static int registry_remove_ap_instance(char * name, pid_t pid)
+{
+ struct reg_entry * e = NULL;
+ struct reg_instance * i = NULL;
+
+ if (name == NULL || pid == 0)
+ return -1;
+
+ e = get_reg_entry_by_name(name);
+ if (e == NULL) {
+ LOG_DBGF("Name %s is not registered.", name);
+ return -1;
}
- /* already exists, we don't have NSM yet */
- return -1;
+ i = reg_entry_get_reg_instance(e, pid);
+ if (i == NULL) {
+ LOG_DBGF("Instance %d is not accepting flows for %s.",
+ pid, name);
+ return -1;
+ }
+
+ list_del(&i->next);
+
+ if (list_empty(&e->ap_instances)) {
+ if ((e->flags & REG_AP_AUTO) &&
+ !list_empty(&e->auto_ap_info))
+ e->state = REG_NAME_AUTO_ACCEPT;
+ else
+ e->state = REG_NAME_IDLE;
+ } else {
+ e->state = REG_NAME_FLOW_ACCEPT;
+ }
+
+ return 0;
}
-static int reg_name_entry_del_name(char * name)
+static pid_t registry_resolve_api(struct reg_entry * e)
{
- struct reg_name_entry * e = get_reg_name_entry_by_name(name);
+ struct list_head * pos = NULL;
+
+ /* FIXME: now just returns the first accepting instance */
+ list_for_each(pos, &e->ap_instances) {
+ struct reg_instance * r =
+ list_entry(pos, struct reg_instance, next);
+ return r->pid;
+ }
+
+ return 0;
+}
+
+static char ** registry_resolve_auto(struct reg_entry * e)
+{
+ struct list_head * pos = NULL;
+
+ /* FIXME: now just returns the first accepting instance */
+ list_for_each(pos, &e->auto_ap_info) {
+ struct reg_auto * r =
+ list_entry(pos, struct reg_auto, next);
+ return r->argv;
+ }
+
+ return NULL;
+}
+
+static void registry_del_name(char * name)
+{
+ struct reg_entry * e = get_reg_entry_by_name(name);
if (e == NULL)
- return 0;
+ return;
list_del(&e->next);
- reg_name_entry_destroy(e);
+ reg_entry_destroy(e);
- return 0;
+ return;
}
static pid_t create_ipcp(char * ap_name,
@@ -437,6 +914,11 @@ static pid_t create_ipcp(char * ap_name,
pthread_rwlock_rdlock(&instance->state_lock);
+ if (instance->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&instance->state_lock);
+ return 0;
+ }
+
pid = ipcp_create(ap_name, ipcp_type);
if (pid == -1) {
pthread_rwlock_unlock(&instance->state_lock);
@@ -483,16 +965,21 @@ static pid_t create_ipcp(char * ap_name,
static int destroy_ipcp(instance_name_t * api)
{
struct list_head * pos = NULL;
- struct list_head * n = NULL;
+ struct list_head * n = NULL;
pid_t pid = 0;
if (api == NULL)
return 0;
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_wrlock(&instance->reg_lock);
+
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_ERR("No such IPCP in the system.");
return 0;
}
@@ -511,6 +998,8 @@ static int destroy_ipcp(instance_name_t * api)
}
}
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
LOG_INFO("Destroyed IPCP %d.", pid);
@@ -523,6 +1012,12 @@ static int bootstrap_ipcp(instance_name_t * api,
struct ipcp_entry * entry = NULL;
pthread_rwlock_rdlock(&instance->state_lock);
+
+ if (instance->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&instance->state_lock);
+ return -1;
+ }
+
pthread_rwlock_wrlock(&instance->reg_lock);
if (api->id == 0)
@@ -578,6 +1073,12 @@ static int enroll_ipcp(instance_name_t * api,
struct ipcp_entry * entry = NULL;
pthread_rwlock_rdlock(&instance->state_lock);
+
+ if (instance->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&instance->state_lock);
+ return -1;
+ }
+
pthread_rwlock_rdlock(&instance->reg_lock);
entry = get_ipcp_entry_by_name(api);
@@ -633,6 +1134,7 @@ static int enroll_ipcp(instance_name_t * api,
return 0;
}
+/* FIXME: distinction between registering names and associating instances */
static int ap_reg(char * name,
char * ap_name,
pid_t ap_id,
@@ -644,42 +1146,36 @@ static int ap_reg(char * name,
{
int i;
int ret = 0;
+
struct list_head * pos = NULL;
- struct reg_name_entry * rne = NULL;
+ char ** argv_dup = NULL;
+ char * apn = path_strip(ap_name);
- instance_name_t * api = NULL;
- char ** argv_dup = NULL;
+ uint32_t flags = 0;
pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_wrlock(&instance->reg_lock);
- if (instance->ipcps.next == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
+ if (instance->state != IRMD_RUNNING) {
pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
- api = instance_name_create();
- if (api == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
- return -1;
- }
+ pthread_rwlock_wrlock(&instance->reg_lock);
- if (instance_name_init_from(api, path_strip(ap_name), ap_id) == NULL) {
+ if (list_empty(&instance->ipcps)) {
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
- instance_name_destroy(api);
return -1;
}
- /* check if this name is already registered */
- rne = get_reg_name_entry_by_name(name);
- if (rne != NULL) {
+ if (autoexec)
+ flags |= REG_AP_AUTO;
+
+ if (registry_add_entry(strdup(name), strdup(apn), flags) < 0) {
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
- instance_name_destroy(api);
- return -1; /* can only register one instance for now */
+ LOG_ERR("Failed to register %s.", name);
+ return -1;
}
list_for_each(pos, &instance->ipcps) {
@@ -694,39 +1190,36 @@ static int ap_reg(char * name,
if (ipcp_name_reg(e->api->id, name)) {
LOG_ERR("Could not register "
"%s in DIF %s as %s.",
- api->name, e->dif_name, name);
+ apn, e->dif_name, name);
} else {
LOG_INFO("Registered %s as %s in %s",
- api->name, name, e->dif_name);
+ apn, name, e->dif_name);
++ret;
}
}
}
}
- if (ret == 0) {
+
+ if (ret == 0) {
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
- instance_name_destroy(api);
return -1;
}
- /* we need to duplicate argv */
- if (argc != 0) {
- argv_dup = malloc((argc + 2) * sizeof(*argv_dup));
- argv_dup[0] = strdup(api->name);
- for (i = 1; i <= argc; ++i)
- argv_dup[i] = strdup(argv[i - 1]);
- argv_dup[argc + 1] = NULL;
- }
-
+ if (autoexec) {
+ /* we need to duplicate argv */
+ if (argc != 0) {
+ argv_dup = malloc((argc + 2) * sizeof(*argv_dup));
+ argv_dup[0] = strdup(ap_name);
+ for (i = 1; i <= argc; ++i)
+ argv_dup[i] = strdup(argv[i - 1]);
+ argv_dup[argc + 1] = NULL;
+ }
- /* for now, we register single instances */
- if ((ret = reg_name_entry_add_name_instance(strdup(name),
- api,
- argv_dup,
- autoexec))
- < 0)
- LOG_DBGF("Failed to add application %s.", api->name);
+ registry_add_ap_auto(name, strdup(apn), argv_dup);
+ } else {
+ registry_add_ap_instance(name, ap_id);
+ }
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
@@ -743,13 +1236,19 @@ static int ap_unreg(char * name,
{
int i;
int ret = 0;
- struct reg_name_entry * rne = NULL;
- struct list_head * pos = NULL;
+ struct reg_entry * rne = NULL;
+ struct list_head * pos = NULL;
if (name == NULL || len == 0 || difs == NULL || difs[0] == NULL)
return -1;
pthread_rwlock_rdlock(&instance->state_lock);
+
+ if (instance->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&instance->state_lock);
+ return -1;
+ }
+
pthread_rwlock_wrlock(&instance->reg_lock);
if (!hard && strcmp(difs[0], "*") != 0) {
@@ -778,7 +1277,7 @@ static int ap_unreg(char * name,
}
}
- reg_name_entry_del_name(rne->name);
+ registry_del_name(rne->name);
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
@@ -786,69 +1285,62 @@ static int ap_unreg(char * name,
return ret;
}
-static struct port_map_entry * flow_accept(pid_t pid,
- char * srv_ap_name,
- char ** dst_ae_name)
+static struct port_map_entry * flow_accept(pid_t pid,
+ char * srv_ap_name,
+ char ** dst_ae_name)
{
- struct port_map_entry * pme;
- struct reg_name_entry * rne = NULL;
+ struct port_map_entry * pme = NULL;
+ struct reg_entry * rne = NULL;
+ struct reg_instance * rgi = NULL;
pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_wrlock(&instance->reg_lock);
- rne = get_reg_name_entry_by_ap_name(srv_ap_name);
- if (rne == NULL) {
- pthread_rwlock_unlock(&instance->reg_lock);
+ if (instance->state != IRMD_RUNNING) {
pthread_rwlock_unlock(&instance->state_lock);
- LOG_DBGF("AP %s is unknown.", srv_ap_name);
return NULL;
}
- if (rne->api->id == 0) {
- rne->api->id = pid;
- } else if (rne->api->id != pid) {
- pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_unlock(&instance->state_lock);
- LOG_DBGF("Can only register one instance.");
- LOG_MISSING;
- return NULL;
- }
+ pthread_rwlock_wrlock(&instance->reg_lock);
- if (rne->accept) {
+ rne = get_reg_entry_by_ap_name(srv_ap_name);
+ if (rne == NULL) {
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
- LOG_DBGF("This AP still has a pending accept().");
+ LOG_DBGF("AP %s is unknown.", srv_ap_name);
return NULL;
}
- rne->accept = true;
- rne->flow_arrived = -1;
-
- pthread_cond_broadcast(&rne->acc_signal);
+ if (!reg_entry_has_api(rne, pid)) {
+ rgi = registry_add_ap_instance(rne->name, pid);
+ if (rgi == NULL) {
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
+ LOG_ERR("Failed to register instance %d with %s.",
+ pid,srv_ap_name);
+ return NULL;
+ }
+ LOG_DBGF("New instance (%d) of %s added.", pid, srv_ap_name);
+ }
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
- pthread_mutex_lock(&rne->acc_lock);
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) &rne->acc_lock);
-
- while (rne->flow_arrived == -1)
- pthread_cond_wait(&rne->acc_arr_signal, &rne->acc_lock);
+ reg_instance_sleep(rgi);
- pthread_cleanup_pop(true);
-
- pthread_mutex_lock(&rne->acc_lock);
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->reg_lock);
+ pthread_mutex_lock(&rne->state_lock);
- /* ap with pending accept being unregistered */
- if (rne->flow_arrived == -2 ) {
- pthread_mutex_unlock(&rne->acc_lock);
+ if (rne->state != REG_NAME_FLOW_ARRIVED) {
+ pthread_mutex_unlock(&rne->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
return NULL;
}
- pthread_mutex_unlock(&rne->acc_lock);
+ pthread_mutex_unlock(&rne->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
- pthread_rwlock_rdlock(&instance->state_lock);
pthread_rwlock_rdlock(&instance->flows_lock);
pme = get_port_map_entry_n(pid);
@@ -859,6 +1351,8 @@ static struct port_map_entry * flow_accept(pid_t pid,
return NULL;
}
+ rne->req_ae_name = NULL;
+
if (dst_ae_name != NULL)
*dst_ae_name = rne->req_ae_name;
@@ -873,38 +1367,39 @@ static int flow_alloc_resp(pid_t n_pid,
int response)
{
struct port_map_entry * pme = NULL;
- struct reg_name_entry * rne = NULL;
+ struct reg_entry * rne = NULL;
int ret = -1;
+ LOG_DBGF("Instance %d response for flow %d", n_pid, port_id);
+
pthread_rwlock_rdlock(&instance->state_lock);
- pthread_rwlock_rdlock(&instance->reg_lock);
- rne = get_reg_name_entry_by_id(n_pid);
+ if (instance->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&instance->state_lock);
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&instance->reg_lock);
+
+ rne = get_reg_entry_by_ap_id(n_pid);
if (rne == NULL) {
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
return -1;
}
- /* FIXME: check all instances associated with the name */
- if (!rne->accept) {
+ if (rne->state != REG_NAME_FLOW_ARRIVED) {
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
- LOG_ERR("No process listening for this name.");
+ LOG_ERR("Process not listening for this name.");
return -1;
}
- /*
- * consider the flow as handled
- * once we can handle a list of AP-I's, remove it from the list
- */
+ pthread_mutex_lock(&rne->state_lock);
- pthread_mutex_lock(&rne->acc_lock);
+ registry_remove_ap_instance(rne->name, n_pid);
- rne->accept = false;
- rne->flow_arrived = -1;
-
- pthread_mutex_unlock(&rne->acc_lock);
+ pthread_mutex_unlock(&rne->state_lock);
pthread_rwlock_unlock(&instance->reg_lock);
@@ -919,14 +1414,12 @@ static int flow_alloc_resp(pid_t n_pid,
}
pme->state = FLOW_ALLOCATED;
-
pthread_rwlock_unlock(&instance->flows_lock);
ret = ipcp_flow_alloc_resp(pme->n_1_pid,
port_id,
pme->n_pid,
response);
-
}
pthread_rwlock_unlock(&instance->state_lock);
@@ -945,18 +1438,25 @@ static struct port_map_entry * flow_alloc(pid_t pid,
/* FIXME: Map qos_spec to qos_cube */
+ pthread_rwlock_rdlock(&instance->state_lock);
+
+ if (instance->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&instance->state_lock);
+ return NULL;
+ }
+
pme = port_map_entry_create();
if (pme == NULL) {
- LOG_ERR("Failed malloc of port_map_entry.");
+ pthread_rwlock_unlock(&instance->state_lock);
+ LOG_ERR("Failed to create port_map_entry.");
return NULL;
}
- pme->n_pid = pid;
- pme->state = FLOW_PENDING;
+ pme->n_pid = pid;
+ pme->state = FLOW_PENDING;
if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)
LOG_WARN("Failed to set timestamp.");
- pthread_rwlock_rdlock(&instance->state_lock);
pthread_rwlock_rdlock(&instance->reg_lock);
if (qos != NULL)
@@ -1005,6 +1505,11 @@ static int flow_alloc_res(int port_id)
struct port_map_entry * e;
pthread_rwlock_rdlock(&instance->state_lock);
+
+ if (instance->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&instance->state_lock);
+ return -1;
+ }
pthread_rwlock_rdlock(&instance->flows_lock);
e = get_port_map_entry(port_id);
@@ -1102,10 +1607,10 @@ static int flow_dealloc(int port_id)
return ret;
}
-static int auto_execute(char * ap, char ** argv)
+static int auto_execute(char ** argv)
{
pid_t pid;
- LOG_INFO("Executing %s.", ap);
+ LOG_INFO("Executing %s.", argv[0]);
pid = fork();
if (pid == -1) {
LOG_ERR("Failed to fork");
@@ -1116,90 +1621,114 @@ static int auto_execute(char * ap, char ** argv)
return pid;
}
- execv(ap, argv);
+ execv(argv[0], argv);
LOG_ERR("Failed to execute.");
exit(EXIT_FAILURE);
- return 0;
}
static struct port_map_entry * flow_req_arr(pid_t pid,
char * dst_name,
char * ae_name)
{
- struct reg_name_entry * rne;
- struct port_map_entry * pme;
- bool acc_wait = true;
+ struct reg_entry * rne = NULL;
+ struct port_map_entry * pme = NULL;
+
+ bool acc_wait = true;
- pme = malloc(sizeof(*pme));
+ pme = port_map_entry_create();
if (pme == NULL) {
- LOG_ERR("Failed malloc of port_map_entry.");
+ LOG_ERR("Failed to create port_map_entry.");
return NULL;
}
pme->state = FLOW_PENDING;
pme->n_1_pid = pid;
+ if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)
+ LOG_WARN("Failed to set timestamp.");
pthread_rwlock_rdlock(&instance->state_lock);
pthread_rwlock_rdlock(&instance->reg_lock);
- pthread_rwlock_wrlock(&instance->flows_lock);
- pme->port_id = bmp_allocate(instance->port_ids);
-
- rne = get_reg_name_entry_by_name(dst_name);
+ rne = get_reg_entry_by_name(dst_name);
if (rne == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
- LOG_DBGF("Destination name %s unknown.", dst_name);
+ LOG_DBGF("Unknown name: %s.", dst_name);
free(pme);
return NULL;
}
- pme->n_pid = rne->api->id;
+ pthread_mutex_lock(&rne->state_lock);
- list_add(&pme->next, &instance->port_map);
+ switch (rne->state) {
+ case REG_NAME_IDLE:
+ pthread_mutex_unlock(&rne->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
+ LOG_DBGF("No AP's for %s.", dst_name);
+ free(pme);
+ return NULL;
+ case REG_NAME_AUTO_ACCEPT:
+ rne->state = REG_NAME_AUTO_EXEC;
+ pthread_mutex_unlock(&rne->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ pthread_rwlock_unlock(&instance->state_lock);
- pthread_rwlock_unlock(&instance->flows_lock);
+ if (auto_execute(registry_resolve_auto(rne)) < 0) {
+ free(pme);
+ return NULL;
+ }
- pthread_mutex_lock(&rne->acc_lock);
+ pthread_mutex_lock(&rne->state_lock);
+ pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
+ (void *) &rne->state_lock);
- pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
- (void *) &rne->acc_lock);
+ while (rne->state == REG_NAME_AUTO_EXEC)
+ pthread_cond_wait(&rne->acc_signal,
+ &rne->state_lock);
- rne->req_ae_name = ae_name;
+ pthread_cleanup_pop(true);
- if (rne->accept == false) {
- if (rne->autoexec) {
- pthread_rwlock_wrlock(&instance->flows_lock);
- pme->n_pid = auto_execute(rne->api->name, rne->argv);
- pthread_rwlock_unlock(&instance->flows_lock);
- while (rne->accept == false)
- pthread_cond_wait(&rne->acc_signal,
- &rne->acc_lock);
- } else {
- pthread_mutex_unlock(&rne->acc_lock);
- LOG_WARN("%s is not accepting flow allocations.",
- rne->name);
- return NULL;
+ pthread_rwlock_rdlock(&instance->state_lock);
+ pthread_rwlock_rdlock(&instance->reg_lock);
+ pthread_mutex_lock(&rne->state_lock);
+ case REG_NAME_FLOW_ACCEPT:
+ pme->n_pid = registry_resolve_api(rne);
+ if(pme->n_pid == 0) {
+ LOG_DBGF("Invalid pid returned.");
+ exit(EXIT_FAILURE);
}
+ pthread_mutex_unlock(&rne->state_lock);
+ pthread_rwlock_unlock(&instance->reg_lock);
+ break;
+ default:
+ LOG_DBGF("IRMs in wrong state.");
+ break;
}
- rne->flow_arrived = 0;
+ pthread_rwlock_wrlock(&instance->flows_lock);
+ pme->port_id = bmp_allocate(instance->port_ids);
- if (pthread_cond_signal(&rne->acc_arr_signal))
- LOG_ERR("Failed to send signal.");
+ list_add(&pme->next, &instance->port_map);
- pthread_cleanup_pop(true);
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ rne->req_ae_name = ae_name;
+
+ rne->state = REG_NAME_FLOW_ARRIVED;
+
+ reg_instance_wake(reg_entry_get_reg_instance(rne, pme->n_pid));
+
+ pthread_mutex_unlock(&rne->state_lock);
while (acc_wait) {
- pthread_mutex_lock(&rne->acc_lock);
- acc_wait = (rne->flow_arrived != -1);
- pthread_mutex_unlock(&rne->acc_lock);
+ pthread_mutex_lock(&rne->state_lock);
+ acc_wait = (rne->state == REG_NAME_FLOW_ARRIVED);
+ pthread_mutex_unlock(&rne->state_lock);
}
- pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
return pme;
@@ -1268,26 +1797,30 @@ static void irm_destroy()
struct list_head * h;
struct list_head * t;
- pthread_rwlock_wrlock(&instance->state_lock);
- pthread_rwlock_wrlock(&instance->reg_lock);
+
+ pthread_rwlock_rdlock(&instance->state_lock);
+
+ if (instance->state != IRMD_NULL)
+ LOG_DBGF("Unsafe destroy.");
if (instance->threadpool != NULL)
free(instance->threadpool);
+ pthread_rwlock_wrlock(&instance->reg_lock);
+
if (instance->port_ids != NULL)
bmp_destroy(instance->port_ids);
/* clear the lists */
list_for_each_safe(h, t, &instance->ipcps) {
struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next);
- destroy_ipcp(e->api);
+ list_del(&e->next);
+ ipcp_entry_destroy(e);
}
- list_for_each_safe(h, t, &instance->reg_names) {
- struct reg_name_entry * e = list_entry(h,
- struct reg_name_entry,
- next);
+ list_for_each_safe(h, t, &instance->registry) {
+ struct reg_entry * e = list_entry(h, struct reg_entry, next);
list_del(&e->next);
- reg_name_entry_destroy(e);
+ reg_entry_destroy(e);
}
pthread_rwlock_unlock(&instance->reg_lock);
@@ -1300,7 +1833,7 @@ static void irm_destroy()
next);
list_del(&e->next);
- free(e);
+ port_map_entry_destroy(e);
}
pthread_rwlock_unlock(&instance->flows_lock);
@@ -1326,7 +1859,7 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)
case SIGHUP:
pthread_rwlock_wrlock(&instance->state_lock);
-
+ instance->state = IRMD_NULL;
pthread_rwlock_unlock(&instance->state_lock);
@@ -1353,6 +1886,10 @@ void * irm_flow_cleaner()
struct timespec now;
struct list_head * pos = NULL;
struct list_head * n = NULL;
+
+ struct list_head * pos2 = NULL;
+ struct list_head * n2 = NULL;
+
struct timespec timeout = {IRMD_CLEANUP_TIMER / BILLION,
IRMD_CLEANUP_TIMER % BILLION};
@@ -1367,6 +1904,7 @@ void * irm_flow_cleaner()
list_for_each_safe(pos, n, &(instance->port_map)) {
struct port_map_entry * e =
list_entry(pos, struct port_map_entry, next);
+
pthread_mutex_lock(&e->res_lock);
if (e->state == FLOW_PENDING &&
@@ -1399,6 +1937,29 @@ void * irm_flow_cleaner()
}
pthread_rwlock_unlock(&instance->flows_lock);
+
+ pthread_rwlock_wrlock(&instance->reg_lock);
+
+ list_for_each_safe(pos, n, &(instance->registry)) {
+ struct reg_entry * e =
+ list_entry(pos, struct reg_entry, next);
+
+ list_for_each_safe(pos2, n2, &e->ap_instances) {
+ struct reg_instance * r =
+ list_entry(pos2,
+ struct reg_instance,
+ next);
+ if (kill(r->pid, 0) < 0) {
+ LOG_DBGF("Process %d gone, "
+ "instance deleted.",
+ r->pid);
+ registry_remove_ap_instance(e->name,
+ r->pid);
+ }
+ }
+ }
+
+ pthread_rwlock_unlock(&instance->reg_lock);
pthread_rwlock_unlock(&instance->state_lock);
nanosleep(&timeout, NULL);
@@ -1641,7 +2202,7 @@ static struct irm * irm_create()
}
INIT_LIST_HEAD(&instance->ipcps);
- INIT_LIST_HEAD(&instance->reg_names);
+ INIT_LIST_HEAD(&instance->registry);
INIT_LIST_HEAD(&instance->port_map);
instance->port_ids = bmp_create(IRMD_MAX_FLOWS, 0);
@@ -1670,6 +2231,8 @@ static struct irm * irm_create()
return NULL;
}
+ instance->state = IRMD_RUNNING;
+
return instance;
}