diff options
Diffstat (limited to 'src/irmd/reg')
| -rw-r--r-- | src/irmd/reg/flow.c | 222 | ||||
| -rw-r--r-- | src/irmd/reg/flow.h | 83 | ||||
| -rw-r--r-- | src/irmd/reg/ipcp.c | 152 | ||||
| -rw-r--r-- | src/irmd/reg/ipcp.h | 58 | ||||
| -rw-r--r-- | src/irmd/reg/name.c | 451 | ||||
| -rw-r--r-- | src/irmd/reg/name.h | 103 | ||||
| -rw-r--r-- | src/irmd/reg/proc.c | 265 | ||||
| -rw-r--r-- | src/irmd/reg/proc.h | 75 | ||||
| -rw-r--r-- | src/irmd/reg/prog.c | 174 | ||||
| -rw-r--r-- | src/irmd/reg/prog.h | 52 | 
10 files changed, 1635 insertions, 0 deletions
| diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c new file mode 100644 index 00000000..30b9c504 --- /dev/null +++ b/src/irmd/reg/flow.c @@ -0,0 +1,222 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Flows + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#define OUROBOROS_PREFIX "reg-flow" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/pthread.h> + +#include "flow.h" + +#include <assert.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +struct reg_flow * reg_flow_create(pid_t     n_pid, +                                  pid_t     n_1_pid, +                                  int       flow_id, +                                  qosspec_t qs) +{ +        pthread_condattr_t cattr; +        struct reg_flow *  f; + +        f = malloc(sizeof(*f)); +        if (f == NULL) +                goto fail_malloc; + +        memset(f, 0, sizeof(*f)); + +        if (pthread_condattr_init(&cattr)) +                goto fail_cattr; + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&f->cond, &cattr)) +                goto fail_cond; + +        if (pthread_mutex_init(&f->mtx, NULL)) +                goto fail_mutex; + +        f->n_rb = shm_rbuff_create(n_pid, flow_id); +        if (f->n_rb == NULL) { +                log_err("Could not create ringbuffer for process %d.", n_pid); +                goto fail_n_rbuff; +        } + +        f->n_1_rb = shm_rbuff_create(n_1_pid, flow_id); +        if (f->n_1_rb == NULL) { +                log_err("Could not create ringbuffer for process %d.", n_1_pid); +                goto fail_n_1_rbuff; +        } + +        if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) +                log_warn("Failed to set timestamp."); + +        pthread_condattr_destroy(&cattr); + +        f->n_pid   = n_pid; +        f->n_1_pid = n_1_pid; +        f->flow_id = flow_id; +        f->qs      = qs; + +        f->state = FLOW_ALLOC_PENDING; + +        return f; + + fail_n_1_rbuff: +        shm_rbuff_destroy(f->n_rb); + fail_n_rbuff: +        pthread_mutex_destroy(&f->mtx); + fail_mutex: +        pthread_cond_destroy(&f->cond); + fail_cond: +        pthread_condattr_destroy(&cattr); + fail_cattr: +        free(f); + fail_malloc: +        return NULL; +} + +static void cancel_irm_destroy(void * o) +{ +        struct reg_flow * f = (struct reg_flow *) o; + +        pthread_mutex_unlock(&f->mtx); + +        pthread_cond_destroy(&f->cond); +        pthread_mutex_destroy(&f->mtx); + +        shm_rbuff_destroy(f->n_rb); +        shm_rbuff_destroy(f->n_1_rb); + +        free(f); +} + +void reg_flow_destroy(struct reg_flow * f) +{ +        assert(f); + +        pthread_mutex_lock(&f->mtx); + +        assert(f->data.len == 0); + +        if (f->state == FLOW_DESTROY) { +                pthread_mutex_unlock(&f->mtx); +                return; +        } + +        if (f->state == FLOW_ALLOC_PENDING) +                f->state = FLOW_DESTROY; +        else +                f->state = FLOW_NULL; + +        pthread_cond_broadcast(&f->cond); + +        pthread_cleanup_push(cancel_irm_destroy, f); + +        while (f->state != FLOW_NULL) +                pthread_cond_wait(&f->cond, &f->mtx); + +        pthread_cleanup_pop(true); +} + +enum flow_state reg_flow_get_state(struct reg_flow * f) +{ +        enum flow_state state; + +        assert(f); + +        pthread_mutex_lock(&f->mtx); + +        state = f->state; + +        pthread_mutex_unlock(&f->mtx); + +        return state; +} + +void reg_flow_set_state(struct reg_flow * f, +                        enum flow_state   state) +{ +        assert(f); +        assert(state != FLOW_DESTROY); + +        pthread_mutex_lock(&f->mtx); + +        f->state = state; +        pthread_cond_broadcast(&f->cond); + +        pthread_mutex_unlock(&f->mtx); +} + +int reg_flow_wait_state(struct reg_flow * f, +                        enum flow_state   state, +                        struct timespec * dl) +{ +        int ret = 0; +        int s; + +        assert(f); +        assert(state != FLOW_NULL); +        assert(state != FLOW_DESTROY); +        assert(state != FLOW_DEALLOC_PENDING); + +        pthread_mutex_lock(&f->mtx); + +        assert(f->state != FLOW_NULL); + +        pthread_cleanup_push(__cleanup_mutex_unlock, &f->mtx); + +        while (!(f->state == state || +                 f->state == FLOW_DESTROY || +                 f->state == FLOW_DEALLOC_PENDING) && +               ret != -ETIMEDOUT) { +                if (dl != NULL) +                        ret = -pthread_cond_timedwait(&f->cond, +                                                      &f->mtx, +                                                      dl); +                else +                        ret = -pthread_cond_wait(&f->cond, +                                                 &f->mtx); +        } + +        if (f->state == FLOW_DESTROY || +            f->state == FLOW_DEALLOC_PENDING || +            ret == -ETIMEDOUT) { +                f->state = FLOW_NULL; +                pthread_cond_broadcast(&f->cond); +        } + +        s = f->state; + +        pthread_cleanup_pop(true); + +        return ret ? ret : s; +} diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h new file mode 100644 index 00000000..9af15032 --- /dev/null +++ b/src/irmd/reg/flow.h @@ -0,0 +1,83 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Flows + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_FLOW_H +#define OUROBOROS_IRMD_REG_FLOW_H + +#include <ouroboros/list.h> +#include <ouroboros/qos.h> +#include <ouroboros/shm_rbuff.h> +#include <ouroboros/utils.h> + +#include <sys/types.h> +#include <pthread.h> +#include <time.h> + +enum flow_state { +        FLOW_NULL = 0, +        FLOW_ALLOC_PENDING, +        FLOW_ALLOC_REQ_PENDING, +        FLOW_ALLOCATED, +        FLOW_DEALLOC_PENDING, +        FLOW_DESTROY +}; + +struct reg_flow { +        struct list_head   next; + +        int                flow_id; + +        pid_t              n_pid; +        pid_t              n_1_pid; + +        qosspec_t          qs; +        time_t             mpl; +        buffer_t           data; + +        struct shm_rbuff * n_rb; +        struct shm_rbuff * n_1_rb; + +        struct timespec    t0; + +        enum flow_state    state; +        pthread_cond_t     cond; +        pthread_mutex_t    mtx; +}; + +struct reg_flow * reg_flow_create(pid_t     n_pid, +                                  pid_t     n_1_pid, +                                  int       flow_id, +                                  qosspec_t qs); + +void              reg_flow_destroy(struct reg_flow * f); + +enum flow_state   reg_flow_get_state(struct reg_flow * f); + + +void              reg_flow_set_state(struct reg_flow * f, +                                     enum flow_state   state); + +int               reg_flow_wait_state(struct reg_flow * f, +                                      enum flow_state   state, +                                      struct timespec * timeo); + +#endif /* OUROBOROS_IRMD_REG_FLOW_H */ diff --git a/src/irmd/reg/ipcp.c b/src/irmd/reg/ipcp.c new file mode 100644 index 00000000..62505871 --- /dev/null +++ b/src/irmd/reg/ipcp.c @@ -0,0 +1,152 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - IPCPs + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> +#include <ouroboros/pthread.h> +#include <ouroboros/time_utils.h> + +#include "ipcp.h" + +#include <assert.h> +#include <signal.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +struct reg_ipcp * reg_ipcp_create(const char *   name, +                                  enum ipcp_type type) +{ +        struct reg_ipcp *  ipcp; +        pthread_condattr_t cattr; + +        ipcp = malloc(sizeof(*ipcp)); +        if (ipcp == NULL) +                goto fail_malloc; + +        if (pthread_mutex_init(&ipcp->mtx, NULL)) +                goto fail_mutex; + +        if (pthread_condattr_init(&cattr)) +                goto fail_cattr; +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&ipcp->cond, &cattr)) +                goto fail_cond; + +        ipcp->name = strdup(name); +        if (ipcp->name == NULL) +                goto fail_name; + +        pthread_condattr_destroy(&cattr); + +        ipcp->layer = NULL; +        ipcp->type  = type; +        ipcp->state = IPCP_BOOT; + +        list_head_init(&ipcp->next); + +        return ipcp; + + fail_name: +        pthread_cond_destroy(&ipcp->cond); + fail_cond: +        pthread_condattr_destroy(&cattr); + fail_cattr: +        pthread_mutex_destroy(&ipcp->mtx); + fail_mutex: +        free(ipcp); + fail_malloc: +        return NULL; +} + +void reg_ipcp_destroy(struct reg_ipcp * ipcp) +{ +        assert(ipcp); + +        pthread_mutex_lock(&ipcp->mtx); + +        while (ipcp->state == IPCP_BOOT) +                pthread_cond_wait(&ipcp->cond, &ipcp->mtx); + +        free(ipcp->layer); +        free(ipcp->name); + +        pthread_mutex_unlock(&ipcp->mtx); + +        pthread_cond_destroy(&ipcp->cond); +        pthread_mutex_destroy(&ipcp->mtx); + +        free(ipcp); +} + +void reg_ipcp_set_state(struct reg_ipcp * ipcp, +                        enum ipcp_state   state) +{ +        pthread_mutex_lock(&ipcp->mtx); + +        ipcp->state = state; +        pthread_cond_broadcast(&ipcp->cond); + +        pthread_mutex_unlock(&ipcp->mtx); +} + +int reg_ipcp_wait_boot(struct reg_ipcp * ipcp) +{ +        int             ret = 0; +        struct timespec dl; +        struct timespec to = {SOCKET_TIMEOUT / 1000, +                              (SOCKET_TIMEOUT % 1000) * MILLION}; + +        clock_gettime(PTHREAD_COND_CLOCK, &dl); +        ts_add(&dl, &to, &dl); + +        pthread_mutex_lock(&ipcp->mtx); + +        while (ipcp->state == IPCP_BOOT && ret != ETIMEDOUT) +                ret = pthread_cond_timedwait(&ipcp->cond, &ipcp->mtx, &dl); + +        if (ret == ETIMEDOUT) { +                kill(ipcp->pid, SIGTERM); +                ipcp->state = IPCP_NULL; +                pthread_cond_signal(&ipcp->cond); +        } + +        if (ipcp->state != IPCP_LIVE) { +                pthread_mutex_unlock(&ipcp->mtx); +                return -1; +        } + +        pthread_mutex_unlock(&ipcp->mtx); + +        return 0; +} diff --git a/src/irmd/reg/ipcp.h b/src/irmd/reg/ipcp.h new file mode 100644 index 00000000..8ad334cf --- /dev/null +++ b/src/irmd/reg/ipcp.h @@ -0,0 +1,58 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - IPCPs + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_IPCP_H +#define OUROBOROS_IRMD_REG_IPCP_H + +#include <ouroboros/list.h> + +enum ipcp_state { +        IPCP_NULL = 0, +        IPCP_BOOT, +        IPCP_LIVE +}; + +struct reg_ipcp { +        struct list_head next; + +        char *           name; +        pid_t            pid; +        enum ipcp_type   type; +        enum hash_algo   dir_hash_algo; +        char *           layer; + +        enum ipcp_state  state; +        pthread_cond_t   cond; +        pthread_mutex_t  mtx; +}; + +struct reg_ipcp * reg_ipcp_create(const char *   name, +                                  enum ipcp_type type); + +void              reg_ipcp_destroy(struct reg_ipcp * i); + +void              reg_ipcp_set_state(struct reg_ipcp * i, +                                     enum ipcp_state   state); + +int               reg_ipcp_wait_boot(struct reg_ipcp * i); + +#endif /* OUROBOROS_IRMD_REG_IPCP_H */
\ No newline at end of file diff --git a/src/irmd/reg/name.c b/src/irmd/reg/name.c new file mode 100644 index 00000000..7e13e888 --- /dev/null +++ b/src/irmd/reg/name.c @@ -0,0 +1,451 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Names + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#define OUROBOROS_PREFIX "reg_name" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/pthread.h> + +#include "name.h" +#include "utils.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <string.h> +#include <signal.h> +#include <unistd.h> +#include <limits.h> +#include <assert.h> + +struct reg_name * reg_name_create(const char *     name, +                                  enum pol_balance lb) +{ +        pthread_condattr_t cattr; +        struct reg_name *  n; + +        assert(name != NULL); + +        n = malloc(sizeof(*n)); +        if (n == NULL) +                goto fail_malloc; + +        if (pthread_condattr_init(&cattr)) +                goto fail_cattr; + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&n->cond, &cattr)) +                goto fail_cond; + +        if (pthread_mutex_init(&n->mtx, NULL)) +                goto fail_mutex; + +        n->name = strdup(name); +        if (n->name == NULL) +                goto fail_name; + +        pthread_condattr_destroy(&cattr); + +        list_head_init(&n->next); +        list_head_init(&n->reg_progs); +        list_head_init(&n->reg_pids); + +        n->pol_lb = lb; +        n->state = NAME_IDLE; + +        return n; + + fail_name: +        pthread_mutex_destroy(&n->mtx); + fail_mutex: +        pthread_cond_destroy(&n->cond); + fail_cond: +        pthread_condattr_destroy(&cattr); + fail_cattr: +        free(n); + fail_malloc: +        return NULL; +} + +static void cancel_reg_name_destroy(void * o) +{ +        struct reg_name * name; +        struct list_head * p; +        struct list_head * h; + +        name = (struct reg_name *) o; + +        pthread_mutex_unlock(&name->mtx); + +        pthread_cond_destroy(&name->cond); +        pthread_mutex_destroy(&name->mtx); + +        if (name->name != NULL) +                free(name->name); + +        list_for_each_safe(p, h, &name->reg_pids) { +                struct pid_el * pe = list_entry(p, struct pid_el, next); +                list_del(&pe->next); +                free(pe); +        } + +        list_for_each_safe(p, h, &name->reg_progs) { +                struct str_el * se = list_entry(p, struct str_el, next); +                list_del(&se->next); +                free(se->str); +                free(se); +        } + +        free(name); +} + +void reg_name_destroy(struct reg_name * name) +{ +        if (name == NULL) +                return; + +        pthread_mutex_lock(&name->mtx); + +        if (name->state == NAME_DESTROY) { +                pthread_mutex_unlock(&name->mtx); +                return; +        } + +        if (name->state != NAME_FLOW_ACCEPT) +                name->state = NAME_NULL; +        else +                name->state = NAME_DESTROY; + +        pthread_cond_broadcast(&name->cond); + +        pthread_cleanup_push(cancel_reg_name_destroy, name); + +        while (name->state != NAME_NULL) +                pthread_cond_wait(&name->cond, &name->mtx); + +        pthread_cleanup_pop(true); +} + +static bool reg_name_has_prog(struct reg_name * name, +                              const char *      prog) +{ +        struct list_head * p; + +        list_for_each(p, &name->reg_progs) { +                struct str_el * name = list_entry(p, struct str_el, next); +                if (!strcmp(name->str, prog)) +                        return true; +        } + +        return false; +} + +int reg_name_add_prog(struct reg_name * name, +                      struct reg_prog * a) +{ +        struct str_el * n; + +        if (reg_name_has_prog(name, a->prog)) { +                log_warn("Program %s already accepting flows for %s.", +                         a->prog, name->name); +                return 0; +        } + +        if (!(a->flags & BIND_AUTO)) { +                log_dbg("Program %s cannot be auto-instantiated.", a->prog); +                return 0; +        } + +        n = malloc(sizeof(*n)); +        if (n == NULL) +                return -ENOMEM; + +        n->str = strdup(a->prog); +        if (n->str == NULL) { +                free(n); +                return -ENOMEM; +        } + +        list_add(&n->next, &name->reg_progs); + +        pthread_mutex_lock(&name->mtx); + +        if (name->state == NAME_IDLE) +                name->state = NAME_AUTO_ACCEPT; + +        pthread_mutex_unlock(&name->mtx); + +        return 0; +} + +void reg_name_del_prog(struct reg_name * name, +                       const char *      prog) +{ +        struct list_head * p; +        struct list_head * h; + +        list_for_each_safe(p, h, &name->reg_progs) { +                struct str_el * se = list_entry(p, struct str_el, next); +                if (strcmp(prog, se->str) == 0) { +                        list_del(&se->next); +                        free(se->str); +                        free(se); +                } +        } + +        pthread_mutex_lock(&name->mtx); + +        if (name->state == NAME_AUTO_ACCEPT && list_is_empty(&name->reg_progs)) { +                name->state = NAME_IDLE; +                pthread_cond_broadcast(&name->cond); +        } + +        pthread_mutex_unlock(&name->mtx); +} + +char * reg_name_get_prog(struct reg_name * name) +{ +        if (!list_is_empty(&name->reg_pids) || list_is_empty(&name->reg_progs)) +                return NULL; + +        return list_first_entry(&name->reg_progs, struct str_el, next)->str; +} + +static bool reg_name_has_pid(struct reg_name * name, +                             pid_t             pid) +{ +        struct list_head * p; + +        list_for_each(p, &name->reg_progs) { +                struct pid_el * name = list_entry(p, struct pid_el, next); +                if (name->pid == pid) +                        return true; +        } + +        return false; +} + +int reg_name_add_pid(struct reg_name * name, +                     pid_t             pid) +{ +        struct pid_el * i; + +        assert(name); + +        if (reg_name_has_pid(name, pid)) { +                log_dbg("Process already registered with this name."); +                return -EPERM; +        } + +        pthread_mutex_lock(&name->mtx); + +        if (name->state == NAME_NULL) { +                pthread_mutex_unlock(&name->mtx); +                log_dbg("Tried to add instance in NULL state."); +                return -EPERM; +        } + +        i = malloc(sizeof(*i)); +        if (i == NULL) { +                pthread_mutex_unlock(&name->mtx); +                return -ENOMEM; +        } + +        i->pid = pid; + +        /* load balancing policy assigns queue order for this process. */ +        switch(name->pol_lb) { +        case LB_RR:    /* Round robin policy. */ +                list_add_tail(&i->next, &name->reg_pids); +                break; +        case LB_SPILL: /* Keep accepting flows on the current process */ +                list_add(&i->next, &name->reg_pids); +                break; +        default: +                free(i); +                assert(false); +        }; + +        if (name->state == NAME_IDLE || +            name->state == NAME_AUTO_ACCEPT || +            name->state == NAME_AUTO_EXEC) { +                name->state = NAME_FLOW_ACCEPT; +                pthread_cond_broadcast(&name->cond); +        } + +        pthread_mutex_unlock(&name->mtx); + +        return 0; +} + +void reg_name_set_policy(struct reg_name * name, +                         enum pol_balance  lb) +{ +        name->pol_lb = lb; +} + +static void reg_name_check_state(struct reg_name * name) +{ +        assert(name); + +        if (name->state == NAME_DESTROY) { +                name->state = NAME_NULL; +                pthread_cond_broadcast(&name->cond); +                return; +        } + +        if (list_is_empty(&name->reg_pids)) { +                if (!list_is_empty(&name->reg_progs)) +                        name->state = NAME_AUTO_ACCEPT; +                else +                        name->state = NAME_IDLE; +        } else { +                name->state = NAME_FLOW_ACCEPT; +        } + +        pthread_cond_broadcast(&name->cond); +} + +void reg_name_del_pid_el(struct reg_name * name, +                         struct pid_el *   p) +{ +        assert(name); +        assert(p); + +        list_del(&p->next); +        free(p); + +        reg_name_check_state(name); +} + +void reg_name_del_pid(struct reg_name * name, +                      pid_t             pid) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(name); + +        if (name == NULL) +                return; + +        list_for_each_safe(p, h, &name->reg_pids) { +                struct pid_el * a = list_entry(p, struct pid_el, next); +                if (a->pid == pid) { +                        list_del(&a->next); +                        free(a); +                } +        } + +        reg_name_check_state(name); +} + +pid_t reg_name_get_pid(struct reg_name * name) +{ +        if (name == NULL) +                return -1; + +        if (list_is_empty(&name->reg_pids)) +                return -1; + +        return list_first_entry(&name->reg_pids, struct pid_el, next)->pid; +} + +enum name_state reg_name_get_state(struct reg_name * name) +{ +        enum name_state state; + +        assert(name); + +        pthread_mutex_lock(&name->mtx); + +        state = name->state; + +        pthread_mutex_unlock(&name->mtx); + +        return state; +} + +int reg_name_set_state(struct reg_name * name, +                       enum name_state   state) +{ +        assert(state != NAME_DESTROY); + +        pthread_mutex_lock(&name->mtx); + +        name->state = state; +        pthread_cond_broadcast(&name->cond); + +        pthread_mutex_unlock(&name->mtx); + +        return 0; +} + +int reg_name_leave_state(struct reg_name * name, +                         enum name_state   state, +                         struct timespec * timeout) +{ +        struct timespec abstime; +        int ret = 0; + +        assert(name); +        assert(state != NAME_DESTROY); + +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_mutex_lock(&name->mtx); + +        pthread_cleanup_push(__cleanup_mutex_unlock, &name->mtx); + +        while (name->state == state && ret != -ETIMEDOUT) +                if (timeout) +                        ret = -pthread_cond_timedwait(&name->cond, +                                                      &name->mtx, +                                                      &abstime); +                else +                        ret = -pthread_cond_wait(&name->cond, +                                                 &name->mtx); + +        if (name->state == NAME_DESTROY) { +                ret = -1; +                name->state = NAME_NULL; +                pthread_cond_broadcast(&name->cond); +        } + +        pthread_cleanup_pop(true); + +        return ret; +} diff --git a/src/irmd/reg/name.h b/src/irmd/reg/name.h new file mode 100644 index 00000000..0731782c --- /dev/null +++ b/src/irmd/reg/name.h @@ -0,0 +1,103 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Names + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_NAME_H +#define OUROBOROS_IRMD_REG_NAME_H + +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> +#include <ouroboros/list.h> +#include <ouroboros/irm.h> + +#include "proc.h" +#include "prog.h" + +#include <stdint.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <sys/types.h> + +enum name_state { +        NAME_NULL = 0, +        NAME_IDLE, +        NAME_AUTO_ACCEPT, +        NAME_AUTO_EXEC, +        NAME_FLOW_ACCEPT, +        NAME_FLOW_ARRIVED, +        NAME_DESTROY +}; + +/* An entry in the registry */ +struct reg_name { +        struct list_head    next; +        char *              name; + +        /* Policies for this name. */ +        enum pol_balance    pol_lb;  /* Load balance incoming flows. */ +        /* Programs that can be instantiated by the irmd. */ +        struct list_head    reg_progs; +        /* Processes that are listening for this name. */ +        struct list_head    reg_pids; + +        enum name_state     state; +        pthread_cond_t      cond; +        pthread_mutex_t     mtx; +}; + +struct reg_name * reg_name_create(const char *     name, +                                  enum pol_balance lb); + +void              reg_name_destroy(struct reg_name * n); + +int               reg_name_add_prog(struct reg_name * n, +                                    struct reg_prog * p); + +void              reg_name_del_prog(struct reg_name * n, +                                    const char *      prog); + +char *            reg_name_get_prog(struct reg_name * n); + +int               reg_name_add_pid(struct reg_name * n, +                                   pid_t             pid); + +void              reg_name_del_pid(struct reg_name * n, +                                   pid_t             pid); + +void              reg_name_del_pid_el(struct reg_name * n, +                                      struct pid_el *   p); + +pid_t             reg_name_get_pid(struct reg_name * n); + +void              reg_name_set_policy(struct reg_name * n, +                                      enum pol_balance  lb); + +enum name_state   reg_name_get_state(struct reg_name * n); + +int               reg_name_set_state(struct reg_name * n, +                                     enum name_state   state); + +int               reg_name_leave_state(struct reg_name * n, +                                       enum name_state   state, +                                       struct timespec * timeout); + +#endif /* OUROBOROS_IRMD_REG_NAME_H */ diff --git a/src/irmd/reg/proc.c b/src/irmd/reg/proc.c new file mode 100644 index 00000000..1aae789d --- /dev/null +++ b/src/irmd/reg/proc.c @@ -0,0 +1,265 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Processes + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include <ouroboros/list.h> +#include <ouroboros/errno.h> +#include <ouroboros/time_utils.h> + +#include "proc.h" +#include "name.h" + +#include <stdlib.h> +#include <unistd.h> +#include <limits.h> +#include <assert.h> + +struct reg_proc * reg_proc_create(pid_t        pid, +                                  const char * prog) +{ +        struct reg_proc *  proc; +        pthread_condattr_t cattr; + +        assert(prog); + +        proc = malloc(sizeof(*proc)); +        if (proc == NULL) +                goto fail_malloc; + +        if (pthread_condattr_init(&cattr)) +                goto fail_condattr; + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + +        if (pthread_mutex_init(&proc->lock, NULL)) +                goto fail_mutex; + +        if (pthread_cond_init(&proc->cond, &cattr)) +                goto fail_cond; + +        proc->set = shm_flow_set_create(pid); +        if (proc->set == NULL) +                goto fail_set; + +        proc->prog = strdup(prog); +        if(proc->prog == NULL) +                goto fail_prog; + +        list_head_init(&proc->next); +        list_head_init(&proc->names); + +        proc->pid      = pid; +        proc->name     = NULL; +        proc->state    = PROC_INIT; + +        return proc; + + fail_prog: +        shm_flow_set_destroy(proc->set); + fail_set: +        pthread_cond_destroy(&proc->cond);; + fail_cond: +        pthread_mutex_destroy(&proc->lock); + fail_mutex: +        pthread_condattr_destroy(&cattr); + fail_condattr: +        free(proc); + fail_malloc: +        return NULL; +} + +static void cancel_reg_proc(void * o) +{ +        struct reg_proc * proc = (struct reg_proc *) o; + +        proc->state = PROC_NULL; + +        pthread_mutex_unlock(&proc->lock); +} + +void reg_proc_destroy(struct reg_proc * proc) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(proc); + +        pthread_mutex_lock(&proc->lock); + +        if (proc->state == PROC_DESTROY) { +                pthread_mutex_unlock(&proc->lock); +                return; +        } + +        if (proc->state == PROC_SLEEP) +                proc->state = PROC_DESTROY; + +        pthread_cond_signal(&proc->cond); + +        pthread_cleanup_push(cancel_reg_proc, proc); + +        while (proc->state != PROC_INIT) +                pthread_cond_wait(&proc->cond, &proc->lock); + +        pthread_cleanup_pop(false); + +        pthread_mutex_unlock(&proc->lock); + +        shm_flow_set_destroy(proc->set); + +        pthread_cond_destroy(&proc->cond); +        pthread_mutex_destroy(&proc->lock); + +        list_for_each_safe(p, h, &proc->names) { +                struct str_el * n = list_entry(p, struct str_el, next); +                list_del(&n->next); +                if (n->str != NULL) +                        free(n->str); +                free(n); +        } + +        free(proc->prog); +        free(proc); +} + +int reg_proc_add_name(struct reg_proc * proc, +                      const char *      name) +{ +        struct str_el * s; + +        assert(proc); +        assert(name); + +        s = malloc(sizeof(*s)); +        if (s == NULL) +                goto fail_malloc; + +        s->str = strdup(name); +        if (s->str == NULL) +                goto fail_name; + +        list_add(&s->next, &proc->names); + +        return 0; + + fail_name: +        free(s); + fail_malloc: +        return -ENOMEM; +} + +void reg_proc_del_name(struct reg_proc * proc, +                       const char *      name) +{ +        struct list_head * p = NULL; +        struct list_head * h = NULL; + +        assert(proc); +        assert(name); + +        list_for_each_safe(p, h, &proc->names) { +                struct str_el * s = list_entry(p, struct str_el, next); +                if (!strcmp(name, s->str)) { +                        list_del(&s->next); +                        free(s->str); +                        free(s); +                } +        } +} + +int reg_proc_sleep(struct reg_proc * proc, +                   struct timespec * dl) +{ + +        int ret = 0; + +        assert(proc); + +        pthread_mutex_lock(&proc->lock); + +        if (proc->state != PROC_WAKE && proc->state != PROC_DESTROY) +                proc->state = PROC_SLEEP; + +        pthread_cleanup_push(cancel_reg_proc, proc); + +        while (proc->state == PROC_SLEEP && ret != -ETIMEDOUT) +                if (dl != NULL) +                        ret = -pthread_cond_timedwait(&proc->cond, +                                                      &proc->lock, dl); +                else +                        ret = -pthread_cond_wait(&proc->cond, &proc->lock); + +        pthread_cleanup_pop(false); + +        if (proc->state == PROC_DESTROY) { +                if (proc->name != NULL) +                        reg_name_del_pid(proc->name, proc->pid); +                ret = -1; +        } + +        proc->state = PROC_INIT; + +        pthread_cond_broadcast(&proc->cond); +        pthread_mutex_unlock(&proc->lock); + +        return ret; +} + +void reg_proc_wake(struct reg_proc * proc, +                   struct reg_name * name) +{ +        assert(proc); +        assert(name); + +        pthread_mutex_lock(&proc->lock); + +        if (proc->state != PROC_SLEEP) { +                pthread_mutex_unlock(&proc->lock); +                return; +        } + +        proc->state = PROC_WAKE; +        proc->name  = name; + +        pthread_cond_broadcast(&proc->cond); + +        pthread_cleanup_push(cancel_reg_proc, proc); + +        while (proc->state == PROC_WAKE) +                pthread_cond_wait(&proc->cond, &proc->lock); + +        pthread_cleanup_pop(false); + +        if (proc->state == PROC_DESTROY) +                proc->state = PROC_INIT; + +        pthread_mutex_unlock(&proc->lock); +} diff --git a/src/irmd/reg/proc.h b/src/irmd/reg/proc.h new file mode 100644 index 00000000..fb11f34f --- /dev/null +++ b/src/irmd/reg/proc.h @@ -0,0 +1,75 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Processes + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_PROC_H +#define OUROBOROS_IRMD_REG_PROC_H + +#include <ouroboros/shm_flow_set.h> + +#include "utils.h" + +#include <unistd.h> +#include <pthread.h> + +enum proc_state { +        PROC_NULL = 0, +        PROC_INIT, +        PROC_SLEEP, +        PROC_WAKE, +        PROC_DESTROY +}; + +struct reg_proc { +        struct list_head      next; +        pid_t                 pid; +        char *                prog;  /* program instantiated */ +        struct list_head      names; /* names for which process accepts flows */ +        struct shm_flow_set * set; + +        struct reg_name *     name;  /* name for which a flow arrived */ + +        /* The process will block on this */ +        enum proc_state       state; +        pthread_cond_t        cond; +        pthread_mutex_t       lock; +}; + +struct reg_proc * reg_proc_create(pid_t        proc, +                                  const char * prog); + +void              reg_proc_destroy(struct reg_proc * proc); + +int               reg_proc_sleep(struct reg_proc * proc, +                                 struct timespec * timeo); + +void              reg_proc_wake(struct reg_proc * proc, +                                struct reg_name * name); + +void              reg_proc_cancel(struct reg_proc * proc); + +int               reg_proc_add_name(struct reg_proc * proc, +                                    const char *      name); + +void              reg_proc_del_name(struct reg_proc * proc, +                                    const char *      name); + +#endif /* OUROBOROS_IRMD_REG_PROC_H */ diff --git a/src/irmd/reg/prog.c b/src/irmd/reg/prog.c new file mode 100644 index 00000000..e3fd6105 --- /dev/null +++ b/src/irmd/reg/prog.c @@ -0,0 +1,174 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Programs + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include <ouroboros/errno.h> +#include <ouroboros/irm.h> +#include <ouroboros/utils.h> + +#include "prog.h" +#include "utils.h" + +#include <assert.h> +#include <stdlib.h> +#include <string.h> + + +static char ** create_argv(const char *  prog, +                           size_t        argc, +                           char **       argv) +{ +        char ** argv2; +        size_t  i; + +        argv2 = malloc((argc + 2) * sizeof(*argv2)); /* prog + args + NULL */ +        if (argv2 == 0) +                goto fail_malloc; + +        argv2[0] = strdup(prog); +        if (argv2[0] == NULL) +                goto fail_prog; + +        for (i = 1; i <= argc; ++i) { +                argv2[i] = strdup(argv[i - 1]); +                if (argv2[i] == NULL) +                        goto fail_arg; +        } + +        argv2[argc + 1] = NULL; + +        return argv2; + + fail_arg: +        argvfree(argv2); + fail_prog: +        free(argv2); + fail_malloc: +        return NULL; +} + +struct reg_prog * reg_prog_create(const char * prog, +                                  uint32_t     flags, +                                  int          argc, +                                  char **      argv) +{ +        struct reg_prog * p; + +        assert(prog); + +        p = malloc(sizeof(*p)); +        if (p == NULL) +                goto fail_malloc; + +        memset(p, 0, sizeof(*p)); + +        p->prog  = strdup(path_strip(prog)); +        if (p->prog == NULL) +                goto fail_prog; + +        if (argc > 0 && flags & BIND_AUTO) { +                p->argv = create_argv(prog, argc, argv); +                if (p->argv == NULL) +                        goto fail_argv; +        } + +        list_head_init(&p->next); +        list_head_init(&p->names); + +        p->flags = flags; + +        return p; + + fail_argv: +        free(p->prog); + fail_prog: +        free(p); + fail_malloc: +        return NULL; +} + +void reg_prog_destroy(struct reg_prog * prog) +{ +        struct list_head * p; +        struct list_head * h; + +        if (prog == NULL) +                return; + +        list_for_each_safe(p, h, &prog->names) { +                struct str_el * s = list_entry(p, struct str_el, next); +                list_del(&s->next); +                free(s->str); +                free(s); +        } + +        argvfree(prog->argv); +        free(prog->prog); +        free(prog); +} + +int reg_prog_add_name(struct reg_prog * prog, +                      const char *      name) +{ +        struct str_el * s; + +        if (prog == NULL || name == NULL) +                return -EINVAL; + +        s = malloc(sizeof(*s)); +        if (s == NULL) +                goto fail_malloc; + +        s->str = strdup(name); +        if(s->str == NULL) +                goto fail_name; + +        list_add(&s->next, &prog->names); + +        return 0; + + fail_name: +        free(s); + fail_malloc: +        return -ENOMEM; +} + +void reg_prog_del_name(struct reg_prog * prog, +                       const char *      name) +{ +        struct list_head * p; +        struct list_head * h; + +        list_for_each_safe(p, h, &prog->names) { +                struct str_el * s = list_entry(p, struct str_el, next); +                if (!strcmp(name, s->str)) { +                        list_del(&s->next); +                        free(s->str); +                        free(s); +                } +        } +} diff --git a/src/irmd/reg/prog.h b/src/irmd/reg/prog.h new file mode 100644 index 00000000..c45f2d8c --- /dev/null +++ b/src/irmd/reg/prog.h @@ -0,0 +1,52 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Programs + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_PROG_H +#define OUROBOROS_IRMD_REG_PROG_H + +#include <ouroboros/list.h> + +#include <unistd.h> +#include <stdint.h> + +struct reg_prog { +        struct list_head next; +        char *           prog;    /* name of binary */ +        uint32_t         flags; +        char **          argv; +        struct list_head names; /* names that all instances will listen for */ +}; + +struct reg_prog * reg_prog_create(const char *  prog, +                                  uint32_t      flags, +                                  int           argc, +                                  char **       argv); + +void              reg_prog_destroy(struct reg_prog * prog); + +int               reg_prog_add_name(struct reg_prog * prog, +                                    const char *      name); + +void              reg_prog_del_name(struct reg_prog * prog, +                                    const char *      name); + +#endif /* OUROBOROS_IRMD_REG_PROG_H */ | 
