diff options
Diffstat (limited to 'src/irmd/reg')
-rw-r--r-- | src/irmd/reg/flow.c | 222 | ||||
-rw-r--r-- | src/irmd/reg/flow.h | 83 | ||||
-rw-r--r-- | src/irmd/reg/ipcp.c | 152 | ||||
-rw-r--r-- | src/irmd/reg/ipcp.h | 58 | ||||
-rw-r--r-- | src/irmd/reg/name.c | 451 | ||||
-rw-r--r-- | src/irmd/reg/name.h | 103 | ||||
-rw-r--r-- | src/irmd/reg/proc.c | 265 | ||||
-rw-r--r-- | src/irmd/reg/proc.h | 75 | ||||
-rw-r--r-- | src/irmd/reg/prog.c | 174 | ||||
-rw-r--r-- | src/irmd/reg/prog.h | 52 |
10 files changed, 1635 insertions, 0 deletions
diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c new file mode 100644 index 00000000..30b9c504 --- /dev/null +++ b/src/irmd/reg/flow.c @@ -0,0 +1,222 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Flows + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#define OUROBOROS_PREFIX "reg-flow" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/pthread.h> + +#include "flow.h" + +#include <assert.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +struct reg_flow * reg_flow_create(pid_t n_pid, + pid_t n_1_pid, + int flow_id, + qosspec_t qs) +{ + pthread_condattr_t cattr; + struct reg_flow * f; + + f = malloc(sizeof(*f)); + if (f == NULL) + goto fail_malloc; + + memset(f, 0, sizeof(*f)); + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&f->cond, &cattr)) + goto fail_cond; + + if (pthread_mutex_init(&f->mtx, NULL)) + goto fail_mutex; + + f->n_rb = shm_rbuff_create(n_pid, flow_id); + if (f->n_rb == NULL) { + log_err("Could not create ringbuffer for process %d.", n_pid); + goto fail_n_rbuff; + } + + f->n_1_rb = shm_rbuff_create(n_1_pid, flow_id); + if (f->n_1_rb == NULL) { + log_err("Could not create ringbuffer for process %d.", n_1_pid); + goto fail_n_1_rbuff; + } + + if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) + log_warn("Failed to set timestamp."); + + pthread_condattr_destroy(&cattr); + + f->n_pid = n_pid; + f->n_1_pid = n_1_pid; + f->flow_id = flow_id; + f->qs = qs; + + f->state = FLOW_ALLOC_PENDING; + + return f; + + fail_n_1_rbuff: + shm_rbuff_destroy(f->n_rb); + fail_n_rbuff: + pthread_mutex_destroy(&f->mtx); + fail_mutex: + pthread_cond_destroy(&f->cond); + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + free(f); + fail_malloc: + return NULL; +} + +static void cancel_irm_destroy(void * o) +{ + struct reg_flow * f = (struct reg_flow *) o; + + pthread_mutex_unlock(&f->mtx); + + pthread_cond_destroy(&f->cond); + pthread_mutex_destroy(&f->mtx); + + shm_rbuff_destroy(f->n_rb); + shm_rbuff_destroy(f->n_1_rb); + + free(f); +} + +void reg_flow_destroy(struct reg_flow * f) +{ + assert(f); + + pthread_mutex_lock(&f->mtx); + + assert(f->data.len == 0); + + if (f->state == FLOW_DESTROY) { + pthread_mutex_unlock(&f->mtx); + return; + } + + if (f->state == FLOW_ALLOC_PENDING) + f->state = FLOW_DESTROY; + else + f->state = FLOW_NULL; + + pthread_cond_broadcast(&f->cond); + + pthread_cleanup_push(cancel_irm_destroy, f); + + while (f->state != FLOW_NULL) + pthread_cond_wait(&f->cond, &f->mtx); + + pthread_cleanup_pop(true); +} + +enum flow_state reg_flow_get_state(struct reg_flow * f) +{ + enum flow_state state; + + assert(f); + + pthread_mutex_lock(&f->mtx); + + state = f->state; + + pthread_mutex_unlock(&f->mtx); + + return state; +} + +void reg_flow_set_state(struct reg_flow * f, + enum flow_state state) +{ + assert(f); + assert(state != FLOW_DESTROY); + + pthread_mutex_lock(&f->mtx); + + f->state = state; + pthread_cond_broadcast(&f->cond); + + pthread_mutex_unlock(&f->mtx); +} + +int reg_flow_wait_state(struct reg_flow * f, + enum flow_state state, + struct timespec * dl) +{ + int ret = 0; + int s; + + assert(f); + assert(state != FLOW_NULL); + assert(state != FLOW_DESTROY); + assert(state != FLOW_DEALLOC_PENDING); + + pthread_mutex_lock(&f->mtx); + + assert(f->state != FLOW_NULL); + + pthread_cleanup_push(__cleanup_mutex_unlock, &f->mtx); + + while (!(f->state == state || + f->state == FLOW_DESTROY || + f->state == FLOW_DEALLOC_PENDING) && + ret != -ETIMEDOUT) { + if (dl != NULL) + ret = -pthread_cond_timedwait(&f->cond, + &f->mtx, + dl); + else + ret = -pthread_cond_wait(&f->cond, + &f->mtx); + } + + if (f->state == FLOW_DESTROY || + f->state == FLOW_DEALLOC_PENDING || + ret == -ETIMEDOUT) { + f->state = FLOW_NULL; + pthread_cond_broadcast(&f->cond); + } + + s = f->state; + + pthread_cleanup_pop(true); + + return ret ? ret : s; +} diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h new file mode 100644 index 00000000..9af15032 --- /dev/null +++ b/src/irmd/reg/flow.h @@ -0,0 +1,83 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Flows + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_FLOW_H +#define OUROBOROS_IRMD_REG_FLOW_H + +#include <ouroboros/list.h> +#include <ouroboros/qos.h> +#include <ouroboros/shm_rbuff.h> +#include <ouroboros/utils.h> + +#include <sys/types.h> +#include <pthread.h> +#include <time.h> + +enum flow_state { + FLOW_NULL = 0, + FLOW_ALLOC_PENDING, + FLOW_ALLOC_REQ_PENDING, + FLOW_ALLOCATED, + FLOW_DEALLOC_PENDING, + FLOW_DESTROY +}; + +struct reg_flow { + struct list_head next; + + int flow_id; + + pid_t n_pid; + pid_t n_1_pid; + + qosspec_t qs; + time_t mpl; + buffer_t data; + + struct shm_rbuff * n_rb; + struct shm_rbuff * n_1_rb; + + struct timespec t0; + + enum flow_state state; + pthread_cond_t cond; + pthread_mutex_t mtx; +}; + +struct reg_flow * reg_flow_create(pid_t n_pid, + pid_t n_1_pid, + int flow_id, + qosspec_t qs); + +void reg_flow_destroy(struct reg_flow * f); + +enum flow_state reg_flow_get_state(struct reg_flow * f); + + +void reg_flow_set_state(struct reg_flow * f, + enum flow_state state); + +int reg_flow_wait_state(struct reg_flow * f, + enum flow_state state, + struct timespec * timeo); + +#endif /* OUROBOROS_IRMD_REG_FLOW_H */ diff --git a/src/irmd/reg/ipcp.c b/src/irmd/reg/ipcp.c new file mode 100644 index 00000000..62505871 --- /dev/null +++ b/src/irmd/reg/ipcp.c @@ -0,0 +1,152 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - IPCPs + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> +#include <ouroboros/pthread.h> +#include <ouroboros/time_utils.h> + +#include "ipcp.h" + +#include <assert.h> +#include <signal.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +struct reg_ipcp * reg_ipcp_create(const char * name, + enum ipcp_type type) +{ + struct reg_ipcp * ipcp; + pthread_condattr_t cattr; + + ipcp = malloc(sizeof(*ipcp)); + if (ipcp == NULL) + goto fail_malloc; + + if (pthread_mutex_init(&ipcp->mtx, NULL)) + goto fail_mutex; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&ipcp->cond, &cattr)) + goto fail_cond; + + ipcp->name = strdup(name); + if (ipcp->name == NULL) + goto fail_name; + + pthread_condattr_destroy(&cattr); + + ipcp->layer = NULL; + ipcp->type = type; + ipcp->state = IPCP_BOOT; + + list_head_init(&ipcp->next); + + return ipcp; + + fail_name: + pthread_cond_destroy(&ipcp->cond); + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&ipcp->mtx); + fail_mutex: + free(ipcp); + fail_malloc: + return NULL; +} + +void reg_ipcp_destroy(struct reg_ipcp * ipcp) +{ + assert(ipcp); + + pthread_mutex_lock(&ipcp->mtx); + + while (ipcp->state == IPCP_BOOT) + pthread_cond_wait(&ipcp->cond, &ipcp->mtx); + + free(ipcp->layer); + free(ipcp->name); + + pthread_mutex_unlock(&ipcp->mtx); + + pthread_cond_destroy(&ipcp->cond); + pthread_mutex_destroy(&ipcp->mtx); + + free(ipcp); +} + +void reg_ipcp_set_state(struct reg_ipcp * ipcp, + enum ipcp_state state) +{ + pthread_mutex_lock(&ipcp->mtx); + + ipcp->state = state; + pthread_cond_broadcast(&ipcp->cond); + + pthread_mutex_unlock(&ipcp->mtx); +} + +int reg_ipcp_wait_boot(struct reg_ipcp * ipcp) +{ + int ret = 0; + struct timespec dl; + struct timespec to = {SOCKET_TIMEOUT / 1000, + (SOCKET_TIMEOUT % 1000) * MILLION}; + + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_mutex_lock(&ipcp->mtx); + + while (ipcp->state == IPCP_BOOT && ret != ETIMEDOUT) + ret = pthread_cond_timedwait(&ipcp->cond, &ipcp->mtx, &dl); + + if (ret == ETIMEDOUT) { + kill(ipcp->pid, SIGTERM); + ipcp->state = IPCP_NULL; + pthread_cond_signal(&ipcp->cond); + } + + if (ipcp->state != IPCP_LIVE) { + pthread_mutex_unlock(&ipcp->mtx); + return -1; + } + + pthread_mutex_unlock(&ipcp->mtx); + + return 0; +} diff --git a/src/irmd/reg/ipcp.h b/src/irmd/reg/ipcp.h new file mode 100644 index 00000000..8ad334cf --- /dev/null +++ b/src/irmd/reg/ipcp.h @@ -0,0 +1,58 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - IPCPs + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_IPCP_H +#define OUROBOROS_IRMD_REG_IPCP_H + +#include <ouroboros/list.h> + +enum ipcp_state { + IPCP_NULL = 0, + IPCP_BOOT, + IPCP_LIVE +}; + +struct reg_ipcp { + struct list_head next; + + char * name; + pid_t pid; + enum ipcp_type type; + enum hash_algo dir_hash_algo; + char * layer; + + enum ipcp_state state; + pthread_cond_t cond; + pthread_mutex_t mtx; +}; + +struct reg_ipcp * reg_ipcp_create(const char * name, + enum ipcp_type type); + +void reg_ipcp_destroy(struct reg_ipcp * i); + +void reg_ipcp_set_state(struct reg_ipcp * i, + enum ipcp_state state); + +int reg_ipcp_wait_boot(struct reg_ipcp * i); + +#endif /* OUROBOROS_IRMD_REG_IPCP_H */
\ No newline at end of file diff --git a/src/irmd/reg/name.c b/src/irmd/reg/name.c new file mode 100644 index 00000000..7e13e888 --- /dev/null +++ b/src/irmd/reg/name.c @@ -0,0 +1,451 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Names + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#define OUROBOROS_PREFIX "reg_name" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/pthread.h> + +#include "name.h" +#include "utils.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <string.h> +#include <signal.h> +#include <unistd.h> +#include <limits.h> +#include <assert.h> + +struct reg_name * reg_name_create(const char * name, + enum pol_balance lb) +{ + pthread_condattr_t cattr; + struct reg_name * n; + + assert(name != NULL); + + n = malloc(sizeof(*n)); + if (n == NULL) + goto fail_malloc; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&n->cond, &cattr)) + goto fail_cond; + + if (pthread_mutex_init(&n->mtx, NULL)) + goto fail_mutex; + + n->name = strdup(name); + if (n->name == NULL) + goto fail_name; + + pthread_condattr_destroy(&cattr); + + list_head_init(&n->next); + list_head_init(&n->reg_progs); + list_head_init(&n->reg_pids); + + n->pol_lb = lb; + n->state = NAME_IDLE; + + return n; + + fail_name: + pthread_mutex_destroy(&n->mtx); + fail_mutex: + pthread_cond_destroy(&n->cond); + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + free(n); + fail_malloc: + return NULL; +} + +static void cancel_reg_name_destroy(void * o) +{ + struct reg_name * name; + struct list_head * p; + struct list_head * h; + + name = (struct reg_name *) o; + + pthread_mutex_unlock(&name->mtx); + + pthread_cond_destroy(&name->cond); + pthread_mutex_destroy(&name->mtx); + + if (name->name != NULL) + free(name->name); + + list_for_each_safe(p, h, &name->reg_pids) { + struct pid_el * pe = list_entry(p, struct pid_el, next); + list_del(&pe->next); + free(pe); + } + + list_for_each_safe(p, h, &name->reg_progs) { + struct str_el * se = list_entry(p, struct str_el, next); + list_del(&se->next); + free(se->str); + free(se); + } + + free(name); +} + +void reg_name_destroy(struct reg_name * name) +{ + if (name == NULL) + return; + + pthread_mutex_lock(&name->mtx); + + if (name->state == NAME_DESTROY) { + pthread_mutex_unlock(&name->mtx); + return; + } + + if (name->state != NAME_FLOW_ACCEPT) + name->state = NAME_NULL; + else + name->state = NAME_DESTROY; + + pthread_cond_broadcast(&name->cond); + + pthread_cleanup_push(cancel_reg_name_destroy, name); + + while (name->state != NAME_NULL) + pthread_cond_wait(&name->cond, &name->mtx); + + pthread_cleanup_pop(true); +} + +static bool reg_name_has_prog(struct reg_name * name, + const char * prog) +{ + struct list_head * p; + + list_for_each(p, &name->reg_progs) { + struct str_el * name = list_entry(p, struct str_el, next); + if (!strcmp(name->str, prog)) + return true; + } + + return false; +} + +int reg_name_add_prog(struct reg_name * name, + struct reg_prog * a) +{ + struct str_el * n; + + if (reg_name_has_prog(name, a->prog)) { + log_warn("Program %s already accepting flows for %s.", + a->prog, name->name); + return 0; + } + + if (!(a->flags & BIND_AUTO)) { + log_dbg("Program %s cannot be auto-instantiated.", a->prog); + return 0; + } + + n = malloc(sizeof(*n)); + if (n == NULL) + return -ENOMEM; + + n->str = strdup(a->prog); + if (n->str == NULL) { + free(n); + return -ENOMEM; + } + + list_add(&n->next, &name->reg_progs); + + pthread_mutex_lock(&name->mtx); + + if (name->state == NAME_IDLE) + name->state = NAME_AUTO_ACCEPT; + + pthread_mutex_unlock(&name->mtx); + + return 0; +} + +void reg_name_del_prog(struct reg_name * name, + const char * prog) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &name->reg_progs) { + struct str_el * se = list_entry(p, struct str_el, next); + if (strcmp(prog, se->str) == 0) { + list_del(&se->next); + free(se->str); + free(se); + } + } + + pthread_mutex_lock(&name->mtx); + + if (name->state == NAME_AUTO_ACCEPT && list_is_empty(&name->reg_progs)) { + name->state = NAME_IDLE; + pthread_cond_broadcast(&name->cond); + } + + pthread_mutex_unlock(&name->mtx); +} + +char * reg_name_get_prog(struct reg_name * name) +{ + if (!list_is_empty(&name->reg_pids) || list_is_empty(&name->reg_progs)) + return NULL; + + return list_first_entry(&name->reg_progs, struct str_el, next)->str; +} + +static bool reg_name_has_pid(struct reg_name * name, + pid_t pid) +{ + struct list_head * p; + + list_for_each(p, &name->reg_progs) { + struct pid_el * name = list_entry(p, struct pid_el, next); + if (name->pid == pid) + return true; + } + + return false; +} + +int reg_name_add_pid(struct reg_name * name, + pid_t pid) +{ + struct pid_el * i; + + assert(name); + + if (reg_name_has_pid(name, pid)) { + log_dbg("Process already registered with this name."); + return -EPERM; + } + + pthread_mutex_lock(&name->mtx); + + if (name->state == NAME_NULL) { + pthread_mutex_unlock(&name->mtx); + log_dbg("Tried to add instance in NULL state."); + return -EPERM; + } + + i = malloc(sizeof(*i)); + if (i == NULL) { + pthread_mutex_unlock(&name->mtx); + return -ENOMEM; + } + + i->pid = pid; + + /* load balancing policy assigns queue order for this process. */ + switch(name->pol_lb) { + case LB_RR: /* Round robin policy. */ + list_add_tail(&i->next, &name->reg_pids); + break; + case LB_SPILL: /* Keep accepting flows on the current process */ + list_add(&i->next, &name->reg_pids); + break; + default: + free(i); + assert(false); + }; + + if (name->state == NAME_IDLE || + name->state == NAME_AUTO_ACCEPT || + name->state == NAME_AUTO_EXEC) { + name->state = NAME_FLOW_ACCEPT; + pthread_cond_broadcast(&name->cond); + } + + pthread_mutex_unlock(&name->mtx); + + return 0; +} + +void reg_name_set_policy(struct reg_name * name, + enum pol_balance lb) +{ + name->pol_lb = lb; +} + +static void reg_name_check_state(struct reg_name * name) +{ + assert(name); + + if (name->state == NAME_DESTROY) { + name->state = NAME_NULL; + pthread_cond_broadcast(&name->cond); + return; + } + + if (list_is_empty(&name->reg_pids)) { + if (!list_is_empty(&name->reg_progs)) + name->state = NAME_AUTO_ACCEPT; + else + name->state = NAME_IDLE; + } else { + name->state = NAME_FLOW_ACCEPT; + } + + pthread_cond_broadcast(&name->cond); +} + +void reg_name_del_pid_el(struct reg_name * name, + struct pid_el * p) +{ + assert(name); + assert(p); + + list_del(&p->next); + free(p); + + reg_name_check_state(name); +} + +void reg_name_del_pid(struct reg_name * name, + pid_t pid) +{ + struct list_head * p; + struct list_head * h; + + assert(name); + + if (name == NULL) + return; + + list_for_each_safe(p, h, &name->reg_pids) { + struct pid_el * a = list_entry(p, struct pid_el, next); + if (a->pid == pid) { + list_del(&a->next); + free(a); + } + } + + reg_name_check_state(name); +} + +pid_t reg_name_get_pid(struct reg_name * name) +{ + if (name == NULL) + return -1; + + if (list_is_empty(&name->reg_pids)) + return -1; + + return list_first_entry(&name->reg_pids, struct pid_el, next)->pid; +} + +enum name_state reg_name_get_state(struct reg_name * name) +{ + enum name_state state; + + assert(name); + + pthread_mutex_lock(&name->mtx); + + state = name->state; + + pthread_mutex_unlock(&name->mtx); + + return state; +} + +int reg_name_set_state(struct reg_name * name, + enum name_state state) +{ + assert(state != NAME_DESTROY); + + pthread_mutex_lock(&name->mtx); + + name->state = state; + pthread_cond_broadcast(&name->cond); + + pthread_mutex_unlock(&name->mtx); + + return 0; +} + +int reg_name_leave_state(struct reg_name * name, + enum name_state state, + struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + + assert(name); + assert(state != NAME_DESTROY); + + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_mutex_lock(&name->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, &name->mtx); + + while (name->state == state && ret != -ETIMEDOUT) + if (timeout) + ret = -pthread_cond_timedwait(&name->cond, + &name->mtx, + &abstime); + else + ret = -pthread_cond_wait(&name->cond, + &name->mtx); + + if (name->state == NAME_DESTROY) { + ret = -1; + name->state = NAME_NULL; + pthread_cond_broadcast(&name->cond); + } + + pthread_cleanup_pop(true); + + return ret; +} diff --git a/src/irmd/reg/name.h b/src/irmd/reg/name.h new file mode 100644 index 00000000..0731782c --- /dev/null +++ b/src/irmd/reg/name.h @@ -0,0 +1,103 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Names + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_NAME_H +#define OUROBOROS_IRMD_REG_NAME_H + +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> +#include <ouroboros/list.h> +#include <ouroboros/irm.h> + +#include "proc.h" +#include "prog.h" + +#include <stdint.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <sys/types.h> + +enum name_state { + NAME_NULL = 0, + NAME_IDLE, + NAME_AUTO_ACCEPT, + NAME_AUTO_EXEC, + NAME_FLOW_ACCEPT, + NAME_FLOW_ARRIVED, + NAME_DESTROY +}; + +/* An entry in the registry */ +struct reg_name { + struct list_head next; + char * name; + + /* Policies for this name. */ + enum pol_balance pol_lb; /* Load balance incoming flows. */ + /* Programs that can be instantiated by the irmd. */ + struct list_head reg_progs; + /* Processes that are listening for this name. */ + struct list_head reg_pids; + + enum name_state state; + pthread_cond_t cond; + pthread_mutex_t mtx; +}; + +struct reg_name * reg_name_create(const char * name, + enum pol_balance lb); + +void reg_name_destroy(struct reg_name * n); + +int reg_name_add_prog(struct reg_name * n, + struct reg_prog * p); + +void reg_name_del_prog(struct reg_name * n, + const char * prog); + +char * reg_name_get_prog(struct reg_name * n); + +int reg_name_add_pid(struct reg_name * n, + pid_t pid); + +void reg_name_del_pid(struct reg_name * n, + pid_t pid); + +void reg_name_del_pid_el(struct reg_name * n, + struct pid_el * p); + +pid_t reg_name_get_pid(struct reg_name * n); + +void reg_name_set_policy(struct reg_name * n, + enum pol_balance lb); + +enum name_state reg_name_get_state(struct reg_name * n); + +int reg_name_set_state(struct reg_name * n, + enum name_state state); + +int reg_name_leave_state(struct reg_name * n, + enum name_state state, + struct timespec * timeout); + +#endif /* OUROBOROS_IRMD_REG_NAME_H */ diff --git a/src/irmd/reg/proc.c b/src/irmd/reg/proc.c new file mode 100644 index 00000000..1aae789d --- /dev/null +++ b/src/irmd/reg/proc.c @@ -0,0 +1,265 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Processes + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include <ouroboros/list.h> +#include <ouroboros/errno.h> +#include <ouroboros/time_utils.h> + +#include "proc.h" +#include "name.h" + +#include <stdlib.h> +#include <unistd.h> +#include <limits.h> +#include <assert.h> + +struct reg_proc * reg_proc_create(pid_t pid, + const char * prog) +{ + struct reg_proc * proc; + pthread_condattr_t cattr; + + assert(prog); + + proc = malloc(sizeof(*proc)); + if (proc == NULL) + goto fail_malloc; + + if (pthread_condattr_init(&cattr)) + goto fail_condattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_mutex_init(&proc->lock, NULL)) + goto fail_mutex; + + if (pthread_cond_init(&proc->cond, &cattr)) + goto fail_cond; + + proc->set = shm_flow_set_create(pid); + if (proc->set == NULL) + goto fail_set; + + proc->prog = strdup(prog); + if(proc->prog == NULL) + goto fail_prog; + + list_head_init(&proc->next); + list_head_init(&proc->names); + + proc->pid = pid; + proc->name = NULL; + proc->state = PROC_INIT; + + return proc; + + fail_prog: + shm_flow_set_destroy(proc->set); + fail_set: + pthread_cond_destroy(&proc->cond);; + fail_cond: + pthread_mutex_destroy(&proc->lock); + fail_mutex: + pthread_condattr_destroy(&cattr); + fail_condattr: + free(proc); + fail_malloc: + return NULL; +} + +static void cancel_reg_proc(void * o) +{ + struct reg_proc * proc = (struct reg_proc *) o; + + proc->state = PROC_NULL; + + pthread_mutex_unlock(&proc->lock); +} + +void reg_proc_destroy(struct reg_proc * proc) +{ + struct list_head * p; + struct list_head * h; + + assert(proc); + + pthread_mutex_lock(&proc->lock); + + if (proc->state == PROC_DESTROY) { + pthread_mutex_unlock(&proc->lock); + return; + } + + if (proc->state == PROC_SLEEP) + proc->state = PROC_DESTROY; + + pthread_cond_signal(&proc->cond); + + pthread_cleanup_push(cancel_reg_proc, proc); + + while (proc->state != PROC_INIT) + pthread_cond_wait(&proc->cond, &proc->lock); + + pthread_cleanup_pop(false); + + pthread_mutex_unlock(&proc->lock); + + shm_flow_set_destroy(proc->set); + + pthread_cond_destroy(&proc->cond); + pthread_mutex_destroy(&proc->lock); + + list_for_each_safe(p, h, &proc->names) { + struct str_el * n = list_entry(p, struct str_el, next); + list_del(&n->next); + if (n->str != NULL) + free(n->str); + free(n); + } + + free(proc->prog); + free(proc); +} + +int reg_proc_add_name(struct reg_proc * proc, + const char * name) +{ + struct str_el * s; + + assert(proc); + assert(name); + + s = malloc(sizeof(*s)); + if (s == NULL) + goto fail_malloc; + + s->str = strdup(name); + if (s->str == NULL) + goto fail_name; + + list_add(&s->next, &proc->names); + + return 0; + + fail_name: + free(s); + fail_malloc: + return -ENOMEM; +} + +void reg_proc_del_name(struct reg_proc * proc, + const char * name) +{ + struct list_head * p = NULL; + struct list_head * h = NULL; + + assert(proc); + assert(name); + + list_for_each_safe(p, h, &proc->names) { + struct str_el * s = list_entry(p, struct str_el, next); + if (!strcmp(name, s->str)) { + list_del(&s->next); + free(s->str); + free(s); + } + } +} + +int reg_proc_sleep(struct reg_proc * proc, + struct timespec * dl) +{ + + int ret = 0; + + assert(proc); + + pthread_mutex_lock(&proc->lock); + + if (proc->state != PROC_WAKE && proc->state != PROC_DESTROY) + proc->state = PROC_SLEEP; + + pthread_cleanup_push(cancel_reg_proc, proc); + + while (proc->state == PROC_SLEEP && ret != -ETIMEDOUT) + if (dl != NULL) + ret = -pthread_cond_timedwait(&proc->cond, + &proc->lock, dl); + else + ret = -pthread_cond_wait(&proc->cond, &proc->lock); + + pthread_cleanup_pop(false); + + if (proc->state == PROC_DESTROY) { + if (proc->name != NULL) + reg_name_del_pid(proc->name, proc->pid); + ret = -1; + } + + proc->state = PROC_INIT; + + pthread_cond_broadcast(&proc->cond); + pthread_mutex_unlock(&proc->lock); + + return ret; +} + +void reg_proc_wake(struct reg_proc * proc, + struct reg_name * name) +{ + assert(proc); + assert(name); + + pthread_mutex_lock(&proc->lock); + + if (proc->state != PROC_SLEEP) { + pthread_mutex_unlock(&proc->lock); + return; + } + + proc->state = PROC_WAKE; + proc->name = name; + + pthread_cond_broadcast(&proc->cond); + + pthread_cleanup_push(cancel_reg_proc, proc); + + while (proc->state == PROC_WAKE) + pthread_cond_wait(&proc->cond, &proc->lock); + + pthread_cleanup_pop(false); + + if (proc->state == PROC_DESTROY) + proc->state = PROC_INIT; + + pthread_mutex_unlock(&proc->lock); +} diff --git a/src/irmd/reg/proc.h b/src/irmd/reg/proc.h new file mode 100644 index 00000000..fb11f34f --- /dev/null +++ b/src/irmd/reg/proc.h @@ -0,0 +1,75 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Processes + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_PROC_H +#define OUROBOROS_IRMD_REG_PROC_H + +#include <ouroboros/shm_flow_set.h> + +#include "utils.h" + +#include <unistd.h> +#include <pthread.h> + +enum proc_state { + PROC_NULL = 0, + PROC_INIT, + PROC_SLEEP, + PROC_WAKE, + PROC_DESTROY +}; + +struct reg_proc { + struct list_head next; + pid_t pid; + char * prog; /* program instantiated */ + struct list_head names; /* names for which process accepts flows */ + struct shm_flow_set * set; + + struct reg_name * name; /* name for which a flow arrived */ + + /* The process will block on this */ + enum proc_state state; + pthread_cond_t cond; + pthread_mutex_t lock; +}; + +struct reg_proc * reg_proc_create(pid_t proc, + const char * prog); + +void reg_proc_destroy(struct reg_proc * proc); + +int reg_proc_sleep(struct reg_proc * proc, + struct timespec * timeo); + +void reg_proc_wake(struct reg_proc * proc, + struct reg_name * name); + +void reg_proc_cancel(struct reg_proc * proc); + +int reg_proc_add_name(struct reg_proc * proc, + const char * name); + +void reg_proc_del_name(struct reg_proc * proc, + const char * name); + +#endif /* OUROBOROS_IRMD_REG_PROC_H */ diff --git a/src/irmd/reg/prog.c b/src/irmd/reg/prog.c new file mode 100644 index 00000000..e3fd6105 --- /dev/null +++ b/src/irmd/reg/prog.c @@ -0,0 +1,174 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Programs + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include <ouroboros/errno.h> +#include <ouroboros/irm.h> +#include <ouroboros/utils.h> + +#include "prog.h" +#include "utils.h" + +#include <assert.h> +#include <stdlib.h> +#include <string.h> + + +static char ** create_argv(const char * prog, + size_t argc, + char ** argv) +{ + char ** argv2; + size_t i; + + argv2 = malloc((argc + 2) * sizeof(*argv2)); /* prog + args + NULL */ + if (argv2 == 0) + goto fail_malloc; + + argv2[0] = strdup(prog); + if (argv2[0] == NULL) + goto fail_prog; + + for (i = 1; i <= argc; ++i) { + argv2[i] = strdup(argv[i - 1]); + if (argv2[i] == NULL) + goto fail_arg; + } + + argv2[argc + 1] = NULL; + + return argv2; + + fail_arg: + argvfree(argv2); + fail_prog: + free(argv2); + fail_malloc: + return NULL; +} + +struct reg_prog * reg_prog_create(const char * prog, + uint32_t flags, + int argc, + char ** argv) +{ + struct reg_prog * p; + + assert(prog); + + p = malloc(sizeof(*p)); + if (p == NULL) + goto fail_malloc; + + memset(p, 0, sizeof(*p)); + + p->prog = strdup(path_strip(prog)); + if (p->prog == NULL) + goto fail_prog; + + if (argc > 0 && flags & BIND_AUTO) { + p->argv = create_argv(prog, argc, argv); + if (p->argv == NULL) + goto fail_argv; + } + + list_head_init(&p->next); + list_head_init(&p->names); + + p->flags = flags; + + return p; + + fail_argv: + free(p->prog); + fail_prog: + free(p); + fail_malloc: + return NULL; +} + +void reg_prog_destroy(struct reg_prog * prog) +{ + struct list_head * p; + struct list_head * h; + + if (prog == NULL) + return; + + list_for_each_safe(p, h, &prog->names) { + struct str_el * s = list_entry(p, struct str_el, next); + list_del(&s->next); + free(s->str); + free(s); + } + + argvfree(prog->argv); + free(prog->prog); + free(prog); +} + +int reg_prog_add_name(struct reg_prog * prog, + const char * name) +{ + struct str_el * s; + + if (prog == NULL || name == NULL) + return -EINVAL; + + s = malloc(sizeof(*s)); + if (s == NULL) + goto fail_malloc; + + s->str = strdup(name); + if(s->str == NULL) + goto fail_name; + + list_add(&s->next, &prog->names); + + return 0; + + fail_name: + free(s); + fail_malloc: + return -ENOMEM; +} + +void reg_prog_del_name(struct reg_prog * prog, + const char * name) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &prog->names) { + struct str_el * s = list_entry(p, struct str_el, next); + if (!strcmp(name, s->str)) { + list_del(&s->next); + free(s->str); + free(s); + } + } +} diff --git a/src/irmd/reg/prog.h b/src/irmd/reg/prog.h new file mode 100644 index 00000000..c45f2d8c --- /dev/null +++ b/src/irmd/reg/prog.h @@ -0,0 +1,52 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Programs + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_PROG_H +#define OUROBOROS_IRMD_REG_PROG_H + +#include <ouroboros/list.h> + +#include <unistd.h> +#include <stdint.h> + +struct reg_prog { + struct list_head next; + char * prog; /* name of binary */ + uint32_t flags; + char ** argv; + struct list_head names; /* names that all instances will listen for */ +}; + +struct reg_prog * reg_prog_create(const char * prog, + uint32_t flags, + int argc, + char ** argv); + +void reg_prog_destroy(struct reg_prog * prog); + +int reg_prog_add_name(struct reg_prog * prog, + const char * name); + +void reg_prog_del_name(struct reg_prog * prog, + const char * name); + +#endif /* OUROBOROS_IRMD_REG_PROG_H */ |