summaryrefslogtreecommitdiff
path: root/src/irmd
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-03-08 13:29:21 +0100
committerSander Vrijders <sander@ouroboros.rocks>2020-03-15 14:20:38 +0100
commitc80c93f11dbfb1b0c07f9a6f8b8d91024e5db507 (patch)
treec6ac06aa8841bcb4a403507deda4401594d5cdbe /src/irmd
parent8796a612f0600fc973aa908b84ded837f3470512 (diff)
downloadouroboros-c80c93f11dbfb1b0c07f9a6f8b8d91024e5db507.tar.gz
ouroboros-c80c93f11dbfb1b0c07f9a6f8b8d91024e5db507.zip
irm: Revise naming API
This revises the naming API to treat names (or reg_name in the source) as first-class citizens of the architecture. This is more in line with the way they are described in the article. Operations have been added to create/destroy names independently of registering. This was previously done only as part of register, and there was no way to delete a name from the IRMd. The create call now allows specifying a policy for load-balancing incoming flows for a name. The default is the new round-robin load-balancer, the previous behaviour is still available as a spillover load-balancer. The register calls will still create a name if it doesn't exist, with the default round-robin load-balancer. The tools now have a "name" section, so the format is now irm name <operation> <name> ... Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/irmd')
-rw-r--r--src/irmd/main.c181
-rw-r--r--src/irmd/registry.c23
-rw-r--r--src/irmd/registry.h11
3 files changed, 169 insertions, 46 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 65354382..940432f1 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -108,6 +108,7 @@ struct cmd {
struct {
struct list_head registry; /* registered names known */
+ size_t n_names; /* number of names */
struct list_head ipcps; /* list of ipcps in system */
size_t n_ipcps; /* number of ipcps */
@@ -918,7 +919,7 @@ static ssize_t list_ipcps(ipcp_info_msg_t *** ipcps,
if (*ipcps == NULL) {
pthread_rwlock_unlock(&irmd.reg_lock);
*n_ipcps = 0;
- return -1;
+ return -ENOMEM;
}
list_for_each(p, &irmd.ipcps) {
@@ -959,53 +960,140 @@ static ssize_t list_ipcps(ipcp_info_msg_t *** ipcps,
return -ENOMEM;
}
-static int irm_update_name(const char * name)
+static int name_create(const char * name,
+ enum pol_balance pol)
{
+ struct reg_entry * re;
struct list_head * p;
+ assert(name);
+
pthread_rwlock_wrlock(&irmd.reg_lock);
- if (!registry_has_name(&irmd.registry, name)) {
- struct reg_entry * re = registry_add_name(&irmd.registry, name);
- if (re == NULL) {
- log_err("Failed creating registry entry for %s.", name);
- pthread_rwlock_unlock(&irmd.reg_lock);
- return -1;
- }
+ if (registry_has_name(&irmd.registry, name)) {
+ pthread_rwlock_unlock(&irmd.reg_lock);
+ log_err("Registry entry for %s already exists.", name);
+ return -ENAME;
+ }
- /* check the tables for client programs */
- list_for_each(p, &irmd.proc_table) {
- struct list_head * q;
- struct proc_entry * e;
- e = list_entry(p, struct proc_entry, next);
- list_for_each(q, &e->names) {
- struct str_el * s;
- s = list_entry(q, struct str_el, next);
- if (!strcmp(s->str, name))
- reg_entry_add_pid(re, e->pid);
- }
+ re = registry_add_name(&irmd.registry, name);
+ if (re == NULL) {
+ pthread_rwlock_unlock(&irmd.reg_lock);
+ log_err("Failed creating registry entry for %s.", name);
+ return -ENOMEM;
+ }
+ ++irmd.n_names;
+ reg_entry_set_policy(re, pol);
+
+ /* check the tables for existing bindings */
+ list_for_each(p, &irmd.proc_table) {
+ struct list_head * q;
+ struct proc_entry * e;
+ e = list_entry(p, struct proc_entry, next);
+ list_for_each(q, &e->names) {
+ struct str_el * s;
+ s = list_entry(q, struct str_el, next);
+ if (!strcmp(s->str, name))
+ reg_entry_add_pid(re, e->pid);
}
+ }
- list_for_each(p, &irmd.prog_table) {
- struct list_head * q;
- struct prog_entry * e;
- e = list_entry(p, struct prog_entry, next);
- list_for_each(q, &e->names) {
- struct str_el * s;
- s = list_entry(q, struct str_el, next);
- if (!strcmp(s->str, name))
- reg_entry_add_prog(re, e);
- }
+ list_for_each(p, &irmd.prog_table) {
+ struct list_head * q;
+ struct prog_entry * e;
+ e = list_entry(p, struct prog_entry, next);
+ list_for_each(q, &e->names) {
+ struct str_el * s;
+ s = list_entry(q, struct str_el, next);
+ if (!strcmp(s->str, name))
+ reg_entry_add_prog(re, e);
}
}
pthread_rwlock_unlock(&irmd.reg_lock);
+ log_info("Created new name: %s.", name);
+
+ return 0;
+}
+
+static int name_destroy(const char * name)
+{
+ assert(name);
+
+ pthread_rwlock_wrlock(&irmd.reg_lock);
+
+ if (!registry_has_name(&irmd.registry, name)) {
+ pthread_rwlock_unlock(&irmd.reg_lock);
+ log_warn("Registry entry for %s does not exist.", name);
+ return -ENAME;
+ }
+
+ registry_del_name(&irmd.registry, name);
+ --irmd.n_names;
+
+ pthread_rwlock_unlock(&irmd.reg_lock);
+
+ log_info("Destroyed name: %s.", name);
+
return 0;
}
-static int name_reg(pid_t pid,
- const char * name)
+static ssize_t list_names(name_info_msg_t *** names,
+ size_t * n_names)
+{
+ struct list_head * p;
+ int i = 0;
+
+ pthread_rwlock_rdlock(&irmd.reg_lock);
+
+ *n_names = irmd.n_names;
+ if (*n_names == 0) {
+ pthread_rwlock_unlock(&irmd.reg_lock);
+ return 0;
+ }
+
+ *names = malloc(irmd.n_names * sizeof(**names));
+ if (*names == NULL) {
+ *n_names = 0;
+ pthread_rwlock_unlock(&irmd.reg_lock);
+ return -ENOMEM;
+ }
+
+ list_for_each(p, &irmd.registry) {
+ struct reg_entry * e = list_entry(p, struct reg_entry, next);
+
+ (*names)[i] = malloc(sizeof(***names));
+ if ((*names)[i] == NULL) {
+ --i;
+ goto fail;
+ }
+
+ name_info_msg__init((*names)[i]);
+ (*names)[i]->name = strdup(e->name);
+ if ((*names)[i]->name == NULL)
+ goto fail;
+
+ (*names)[i++]->pol_lb = e->pol_lb;
+ }
+
+ pthread_rwlock_unlock(&irmd.reg_lock);
+
+ return 0;
+
+ fail:
+ pthread_rwlock_unlock(&irmd.reg_lock);
+ while (i >= 0) {
+ free((*names)[i]->name);
+ free(*names[i--]);
+ }
+ free(*names);
+ *n_names = 0;
+ return -ENOMEM;
+}
+
+static int name_reg(const char * name,
+ pid_t pid)
{
size_t len;
struct ipcp_entry * ipcp;
@@ -1016,6 +1104,11 @@ static int name_reg(pid_t pid,
pthread_rwlock_wrlock(&irmd.reg_lock);
+ if (!registry_has_name(&irmd.registry, name)) {
+ err = -ENAME;
+ goto fail;
+ }
+
ipcp = get_ipcp_entry_by_pid(pid);
if (ipcp == NULL) {
err = -EIPCP;
@@ -1045,8 +1138,6 @@ static int name_reg(pid_t pid,
return -1;
}
- irm_update_name(name);
-
log_info("Registered %s with IPCP %d as " HASH_FMT ".",
name, pid, HASH_VAL(hash));
@@ -1059,8 +1150,8 @@ fail:
return err;
}
-static int name_unreg(pid_t pid,
- const char * name)
+static int name_unreg(const char * name,
+ pid_t pid)
{
struct ipcp_entry * ipcp;
int err;
@@ -2021,11 +2112,21 @@ static void * mainloop(void * o)
case IRM_MSG_CODE__IRM_LIST_IPCPS:
result = list_ipcps(&ret_msg->ipcps, &ret_msg->n_ipcps);
break;
- case IRM_MSG_CODE__IRM_REG:
- result = name_reg(msg->pid, msg->name);
+ case IRM_MSG_CODE__IRM_CREATE_NAME:
+ result = name_create(msg->names[0]->name,
+ msg->names[0]->pol_lb);
+ break;
+ case IRM_MSG_CODE__IRM_DESTROY_NAME:
+ result = name_destroy(msg->name);
+ break;
+ case IRM_MSG_CODE__IRM_LIST_NAMES:
+ result = list_names(&ret_msg->names, &ret_msg->n_names);
+ break;
+ case IRM_MSG_CODE__IRM_REG_NAME:
+ result = name_reg(msg->name, msg->pid);
break;
- case IRM_MSG_CODE__IRM_UNREG:
- result = name_unreg(msg->pid, msg->name);
+ case IRM_MSG_CODE__IRM_UNREG_NAME:
+ result = name_unreg(msg->name, msg->pid);
break;
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
assert(msg->pk.len > 0 ? msg->pk.data != NULL
diff --git a/src/irmd/registry.c b/src/irmd/registry.c
index ef0bc8cc..66cc5af3 100644
--- a/src/irmd/registry.c
+++ b/src/irmd/registry.c
@@ -32,7 +32,6 @@
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
-#include <ouroboros/irm.h>
#include <ouroboros/time_utils.h>
#include "registry.h"
@@ -72,7 +71,8 @@ static int reg_entry_init(struct reg_entry * e,
list_head_init(&e->reg_progs);
list_head_init(&e->reg_pids);
- e->name = name;
+ e->name = name;
+ e->pol_lb = 0;
if (pthread_condattr_init(&cattr))
goto fail_cattr;
@@ -286,7 +286,17 @@ int reg_entry_add_pid(struct reg_entry * e,
i->pid = pid;
- list_add(&i->next, &e->reg_pids);
+ /* load balancing policy assigns queue order for this process. */
+ switch(e->pol_lb) {
+ case LB_RR: /* Round robin policy. */
+ list_add_tail(&i->next, &e->reg_pids);
+ break;
+ case LB_SPILL: /* Keep accepting flows on the current process */
+ list_add(&i->next, &e->reg_pids);
+ break;
+ default:
+ assert(false);
+ };
if (e->state == REG_NAME_IDLE ||
e->state == REG_NAME_AUTO_ACCEPT ||
@@ -300,6 +310,13 @@ int reg_entry_add_pid(struct reg_entry * e,
return 0;
}
+void reg_entry_set_policy(struct reg_entry * e,
+ enum pol_balance p)
+{
+ e->pol_lb = p;
+}
+
+
static void reg_entry_check_state(struct reg_entry * e)
{
assert(e);
diff --git a/src/irmd/registry.h b/src/irmd/registry.h
index 31c1b668..bc9aa23b 100644
--- a/src/irmd/registry.h
+++ b/src/irmd/registry.h
@@ -26,6 +26,7 @@
#include <ouroboros/hash.h>
#include <ouroboros/ipcp.h>
#include <ouroboros/list.h>
+#include <ouroboros/irm.h>
#include "proc_table.h"
#include "prog_table.h"
@@ -54,9 +55,11 @@ struct reg_entry {
struct list_head next;
char * name;
- /* Programs that can be instantiated by the irmd */
+ /* Policies for this name. */
+ enum pol_balance pol_lb; /* Load balance incoming flows. */
+ /* Programs that can be instantiated by the irmd. */
struct list_head reg_progs;
- /* Processes that are listening for this name */
+ /* Processes that are listening for this name. */
struct list_head reg_pids;
enum reg_name_state state;
@@ -72,7 +75,6 @@ void reg_entry_del_prog(struct reg_entry * e,
char * reg_entry_get_prog(struct reg_entry * e);
-
int reg_entry_add_pid(struct reg_entry * e,
pid_t pid);
@@ -84,6 +86,9 @@ void reg_entry_del_pid_el(struct reg_entry * e,
pid_t reg_entry_get_pid(struct reg_entry * e);
+void reg_entry_set_policy(struct reg_entry * e,
+ enum pol_balance p);
+
enum reg_name_state reg_entry_get_state(struct reg_entry * e);
int reg_entry_set_state(struct reg_entry * e,