diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2020-03-08 13:29:21 +0100 |
---|---|---|
committer | Sander Vrijders <sander@ouroboros.rocks> | 2020-03-15 14:20:38 +0100 |
commit | c80c93f11dbfb1b0c07f9a6f8b8d91024e5db507 (patch) | |
tree | c6ac06aa8841bcb4a403507deda4401594d5cdbe /src/irmd | |
parent | 8796a612f0600fc973aa908b84ded837f3470512 (diff) | |
download | ouroboros-c80c93f11dbfb1b0c07f9a6f8b8d91024e5db507.tar.gz ouroboros-c80c93f11dbfb1b0c07f9a6f8b8d91024e5db507.zip |
irm: Revise naming API
This revises the naming API to treat names (or reg_name in the source)
as first-class citizens of the architecture. This is more in line with
the way they are described in the article.
Operations have been added to create/destroy names independently of
registering. This was previously done only as part of register, and
there was no way to delete a name from the IRMd. The create call now
allows specifying a policy for load-balancing incoming flows for a
name. The default is the new round-robin load-balancer, the previous
behaviour is still available as a spillover load-balancer.
The register calls will still create a name if it doesn't exist, with
the default round-robin load-balancer.
The tools now have a "name" section, so the format is now
irm name <operation> <name> ...
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/irmd')
-rw-r--r-- | src/irmd/main.c | 181 | ||||
-rw-r--r-- | src/irmd/registry.c | 23 | ||||
-rw-r--r-- | src/irmd/registry.h | 11 |
3 files changed, 169 insertions, 46 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 65354382..940432f1 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -108,6 +108,7 @@ struct cmd { struct { struct list_head registry; /* registered names known */ + size_t n_names; /* number of names */ struct list_head ipcps; /* list of ipcps in system */ size_t n_ipcps; /* number of ipcps */ @@ -918,7 +919,7 @@ static ssize_t list_ipcps(ipcp_info_msg_t *** ipcps, if (*ipcps == NULL) { pthread_rwlock_unlock(&irmd.reg_lock); *n_ipcps = 0; - return -1; + return -ENOMEM; } list_for_each(p, &irmd.ipcps) { @@ -959,53 +960,140 @@ static ssize_t list_ipcps(ipcp_info_msg_t *** ipcps, return -ENOMEM; } -static int irm_update_name(const char * name) +static int name_create(const char * name, + enum pol_balance pol) { + struct reg_entry * re; struct list_head * p; + assert(name); + pthread_rwlock_wrlock(&irmd.reg_lock); - if (!registry_has_name(&irmd.registry, name)) { - struct reg_entry * re = registry_add_name(&irmd.registry, name); - if (re == NULL) { - log_err("Failed creating registry entry for %s.", name); - pthread_rwlock_unlock(&irmd.reg_lock); - return -1; - } + if (registry_has_name(&irmd.registry, name)) { + pthread_rwlock_unlock(&irmd.reg_lock); + log_err("Registry entry for %s already exists.", name); + return -ENAME; + } - /* check the tables for client programs */ - list_for_each(p, &irmd.proc_table) { - struct list_head * q; - struct proc_entry * e; - e = list_entry(p, struct proc_entry, next); - list_for_each(q, &e->names) { - struct str_el * s; - s = list_entry(q, struct str_el, next); - if (!strcmp(s->str, name)) - reg_entry_add_pid(re, e->pid); - } + re = registry_add_name(&irmd.registry, name); + if (re == NULL) { + pthread_rwlock_unlock(&irmd.reg_lock); + log_err("Failed creating registry entry for %s.", name); + return -ENOMEM; + } + ++irmd.n_names; + reg_entry_set_policy(re, pol); + + /* check the tables for existing bindings */ + list_for_each(p, &irmd.proc_table) { + struct list_head * q; + struct proc_entry * e; + e = list_entry(p, struct proc_entry, next); + list_for_each(q, &e->names) { + struct str_el * s; + s = list_entry(q, struct str_el, next); + if (!strcmp(s->str, name)) + reg_entry_add_pid(re, e->pid); } + } - list_for_each(p, &irmd.prog_table) { - struct list_head * q; - struct prog_entry * e; - e = list_entry(p, struct prog_entry, next); - list_for_each(q, &e->names) { - struct str_el * s; - s = list_entry(q, struct str_el, next); - if (!strcmp(s->str, name)) - reg_entry_add_prog(re, e); - } + list_for_each(p, &irmd.prog_table) { + struct list_head * q; + struct prog_entry * e; + e = list_entry(p, struct prog_entry, next); + list_for_each(q, &e->names) { + struct str_el * s; + s = list_entry(q, struct str_el, next); + if (!strcmp(s->str, name)) + reg_entry_add_prog(re, e); } } pthread_rwlock_unlock(&irmd.reg_lock); + log_info("Created new name: %s.", name); + + return 0; +} + +static int name_destroy(const char * name) +{ + assert(name); + + pthread_rwlock_wrlock(&irmd.reg_lock); + + if (!registry_has_name(&irmd.registry, name)) { + pthread_rwlock_unlock(&irmd.reg_lock); + log_warn("Registry entry for %s does not exist.", name); + return -ENAME; + } + + registry_del_name(&irmd.registry, name); + --irmd.n_names; + + pthread_rwlock_unlock(&irmd.reg_lock); + + log_info("Destroyed name: %s.", name); + return 0; } -static int name_reg(pid_t pid, - const char * name) +static ssize_t list_names(name_info_msg_t *** names, + size_t * n_names) +{ + struct list_head * p; + int i = 0; + + pthread_rwlock_rdlock(&irmd.reg_lock); + + *n_names = irmd.n_names; + if (*n_names == 0) { + pthread_rwlock_unlock(&irmd.reg_lock); + return 0; + } + + *names = malloc(irmd.n_names * sizeof(**names)); + if (*names == NULL) { + *n_names = 0; + pthread_rwlock_unlock(&irmd.reg_lock); + return -ENOMEM; + } + + list_for_each(p, &irmd.registry) { + struct reg_entry * e = list_entry(p, struct reg_entry, next); + + (*names)[i] = malloc(sizeof(***names)); + if ((*names)[i] == NULL) { + --i; + goto fail; + } + + name_info_msg__init((*names)[i]); + (*names)[i]->name = strdup(e->name); + if ((*names)[i]->name == NULL) + goto fail; + + (*names)[i++]->pol_lb = e->pol_lb; + } + + pthread_rwlock_unlock(&irmd.reg_lock); + + return 0; + + fail: + pthread_rwlock_unlock(&irmd.reg_lock); + while (i >= 0) { + free((*names)[i]->name); + free(*names[i--]); + } + free(*names); + *n_names = 0; + return -ENOMEM; +} + +static int name_reg(const char * name, + pid_t pid) { size_t len; struct ipcp_entry * ipcp; @@ -1016,6 +1104,11 @@ static int name_reg(pid_t pid, pthread_rwlock_wrlock(&irmd.reg_lock); + if (!registry_has_name(&irmd.registry, name)) { + err = -ENAME; + goto fail; + } + ipcp = get_ipcp_entry_by_pid(pid); if (ipcp == NULL) { err = -EIPCP; @@ -1045,8 +1138,6 @@ static int name_reg(pid_t pid, return -1; } - irm_update_name(name); - log_info("Registered %s with IPCP %d as " HASH_FMT ".", name, pid, HASH_VAL(hash)); @@ -1059,8 +1150,8 @@ fail: return err; } -static int name_unreg(pid_t pid, - const char * name) +static int name_unreg(const char * name, + pid_t pid) { struct ipcp_entry * ipcp; int err; @@ -2021,11 +2112,21 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IRM_LIST_IPCPS: result = list_ipcps(&ret_msg->ipcps, &ret_msg->n_ipcps); break; - case IRM_MSG_CODE__IRM_REG: - result = name_reg(msg->pid, msg->name); + case IRM_MSG_CODE__IRM_CREATE_NAME: + result = name_create(msg->names[0]->name, + msg->names[0]->pol_lb); + break; + case IRM_MSG_CODE__IRM_DESTROY_NAME: + result = name_destroy(msg->name); + break; + case IRM_MSG_CODE__IRM_LIST_NAMES: + result = list_names(&ret_msg->names, &ret_msg->n_names); + break; + case IRM_MSG_CODE__IRM_REG_NAME: + result = name_reg(msg->name, msg->pid); break; - case IRM_MSG_CODE__IRM_UNREG: - result = name_unreg(msg->pid, msg->name); + case IRM_MSG_CODE__IRM_UNREG_NAME: + result = name_unreg(msg->name, msg->pid); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: assert(msg->pk.len > 0 ? msg->pk.data != NULL diff --git a/src/irmd/registry.c b/src/irmd/registry.c index ef0bc8cc..66cc5af3 100644 --- a/src/irmd/registry.c +++ b/src/irmd/registry.c @@ -32,7 +32,6 @@ #include <ouroboros/errno.h> #include <ouroboros/logs.h> -#include <ouroboros/irm.h> #include <ouroboros/time_utils.h> #include "registry.h" @@ -72,7 +71,8 @@ static int reg_entry_init(struct reg_entry * e, list_head_init(&e->reg_progs); list_head_init(&e->reg_pids); - e->name = name; + e->name = name; + e->pol_lb = 0; if (pthread_condattr_init(&cattr)) goto fail_cattr; @@ -286,7 +286,17 @@ int reg_entry_add_pid(struct reg_entry * e, i->pid = pid; - list_add(&i->next, &e->reg_pids); + /* load balancing policy assigns queue order for this process. */ + switch(e->pol_lb) { + case LB_RR: /* Round robin policy. */ + list_add_tail(&i->next, &e->reg_pids); + break; + case LB_SPILL: /* Keep accepting flows on the current process */ + list_add(&i->next, &e->reg_pids); + break; + default: + assert(false); + }; if (e->state == REG_NAME_IDLE || e->state == REG_NAME_AUTO_ACCEPT || @@ -300,6 +310,13 @@ int reg_entry_add_pid(struct reg_entry * e, return 0; } +void reg_entry_set_policy(struct reg_entry * e, + enum pol_balance p) +{ + e->pol_lb = p; +} + + static void reg_entry_check_state(struct reg_entry * e) { assert(e); diff --git a/src/irmd/registry.h b/src/irmd/registry.h index 31c1b668..bc9aa23b 100644 --- a/src/irmd/registry.h +++ b/src/irmd/registry.h @@ -26,6 +26,7 @@ #include <ouroboros/hash.h> #include <ouroboros/ipcp.h> #include <ouroboros/list.h> +#include <ouroboros/irm.h> #include "proc_table.h" #include "prog_table.h" @@ -54,9 +55,11 @@ struct reg_entry { struct list_head next; char * name; - /* Programs that can be instantiated by the irmd */ + /* Policies for this name. */ + enum pol_balance pol_lb; /* Load balance incoming flows. */ + /* Programs that can be instantiated by the irmd. */ struct list_head reg_progs; - /* Processes that are listening for this name */ + /* Processes that are listening for this name. */ struct list_head reg_pids; enum reg_name_state state; @@ -72,7 +75,6 @@ void reg_entry_del_prog(struct reg_entry * e, char * reg_entry_get_prog(struct reg_entry * e); - int reg_entry_add_pid(struct reg_entry * e, pid_t pid); @@ -84,6 +86,9 @@ void reg_entry_del_pid_el(struct reg_entry * e, pid_t reg_entry_get_pid(struct reg_entry * e); +void reg_entry_set_policy(struct reg_entry * e, + enum pol_balance p); + enum reg_name_state reg_entry_get_state(struct reg_entry * e); int reg_entry_set_state(struct reg_entry * e, |