diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2023-03-18 21:02:10 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2023-03-21 18:12:28 +0100 | 
| commit | 54156a3d9a2a7f87591e5efd37a8fe6f708b933f (patch) | |
| tree | 8df5ecafc1fd199adb05ea8f4573103c38c63848 /src/irmd/reg | |
| parent | 9ce5f6b0b1ae281bfa82df31a92026d946366286 (diff) | |
| download | ouroboros-54156a3d9a2a7f87591e5efd37a8fe6f708b933f.tar.gz ouroboros-54156a3d9a2a7f87591e5efd37a8fe6f708b933f.zip | |
irmd: Move registry objects to their own sources
Rename internal data structures so it's clear that they are the IRMd
representation of these objects for management purposes.
Split functionality for these objects off and and move them to their
own source files.
Rename internal functions of the IRMd to reflect this, with some small
refactoring.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/irmd/reg')
| -rw-r--r-- | src/irmd/reg/flow.c | 222 | ||||
| -rw-r--r-- | src/irmd/reg/flow.h | 83 | ||||
| -rw-r--r-- | src/irmd/reg/ipcp.c | 152 | ||||
| -rw-r--r-- | src/irmd/reg/ipcp.h | 58 | ||||
| -rw-r--r-- | src/irmd/reg/name.c | 451 | ||||
| -rw-r--r-- | src/irmd/reg/name.h | 103 | ||||
| -rw-r--r-- | src/irmd/reg/proc.c | 265 | ||||
| -rw-r--r-- | src/irmd/reg/proc.h | 75 | ||||
| -rw-r--r-- | src/irmd/reg/prog.c | 174 | ||||
| -rw-r--r-- | src/irmd/reg/prog.h | 52 | 
10 files changed, 1635 insertions, 0 deletions
| diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c new file mode 100644 index 00000000..30b9c504 --- /dev/null +++ b/src/irmd/reg/flow.c @@ -0,0 +1,222 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Flows + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#define OUROBOROS_PREFIX "reg-flow" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/pthread.h> + +#include "flow.h" + +#include <assert.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +struct reg_flow * reg_flow_create(pid_t     n_pid, +                                  pid_t     n_1_pid, +                                  int       flow_id, +                                  qosspec_t qs) +{ +        pthread_condattr_t cattr; +        struct reg_flow *  f; + +        f = malloc(sizeof(*f)); +        if (f == NULL) +                goto fail_malloc; + +        memset(f, 0, sizeof(*f)); + +        if (pthread_condattr_init(&cattr)) +                goto fail_cattr; + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&f->cond, &cattr)) +                goto fail_cond; + +        if (pthread_mutex_init(&f->mtx, NULL)) +                goto fail_mutex; + +        f->n_rb = shm_rbuff_create(n_pid, flow_id); +        if (f->n_rb == NULL) { +                log_err("Could not create ringbuffer for process %d.", n_pid); +                goto fail_n_rbuff; +        } + +        f->n_1_rb = shm_rbuff_create(n_1_pid, flow_id); +        if (f->n_1_rb == NULL) { +                log_err("Could not create ringbuffer for process %d.", n_1_pid); +                goto fail_n_1_rbuff; +        } + +        if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) +                log_warn("Failed to set timestamp."); + +        pthread_condattr_destroy(&cattr); + +        f->n_pid   = n_pid; +        f->n_1_pid = n_1_pid; +        f->flow_id = flow_id; +        f->qs      = qs; + +        f->state = FLOW_ALLOC_PENDING; + +        return f; + + fail_n_1_rbuff: +        shm_rbuff_destroy(f->n_rb); + fail_n_rbuff: +        pthread_mutex_destroy(&f->mtx); + fail_mutex: +        pthread_cond_destroy(&f->cond); + fail_cond: +        pthread_condattr_destroy(&cattr); + fail_cattr: +        free(f); + fail_malloc: +        return NULL; +} + +static void cancel_irm_destroy(void * o) +{ +        struct reg_flow * f = (struct reg_flow *) o; + +        pthread_mutex_unlock(&f->mtx); + +        pthread_cond_destroy(&f->cond); +        pthread_mutex_destroy(&f->mtx); + +        shm_rbuff_destroy(f->n_rb); +        shm_rbuff_destroy(f->n_1_rb); + +        free(f); +} + +void reg_flow_destroy(struct reg_flow * f) +{ +        assert(f); + +        pthread_mutex_lock(&f->mtx); + +        assert(f->data.len == 0); + +        if (f->state == FLOW_DESTROY) { +                pthread_mutex_unlock(&f->mtx); +                return; +        } + +        if (f->state == FLOW_ALLOC_PENDING) +                f->state = FLOW_DESTROY; +        else +                f->state = FLOW_NULL; + +        pthread_cond_broadcast(&f->cond); + +        pthread_cleanup_push(cancel_irm_destroy, f); + +        while (f->state != FLOW_NULL) +                pthread_cond_wait(&f->cond, &f->mtx); + +        pthread_cleanup_pop(true); +} + +enum flow_state reg_flow_get_state(struct reg_flow * f) +{ +        enum flow_state state; + +        assert(f); + +        pthread_mutex_lock(&f->mtx); + +        state = f->state; + +        pthread_mutex_unlock(&f->mtx); + +        return state; +} + +void reg_flow_set_state(struct reg_flow * f, +                        enum flow_state   state) +{ +        assert(f); +        assert(state != FLOW_DESTROY); + +        pthread_mutex_lock(&f->mtx); + +        f->state = state; +        pthread_cond_broadcast(&f->cond); + +        pthread_mutex_unlock(&f->mtx); +} + +int reg_flow_wait_state(struct reg_flow * f, +                        enum flow_state   state, +                        struct timespec * dl) +{ +        int ret = 0; +        int s; + +        assert(f); +        assert(state != FLOW_NULL); +        assert(state != FLOW_DESTROY); +        assert(state != FLOW_DEALLOC_PENDING); + +        pthread_mutex_lock(&f->mtx); + +        assert(f->state != FLOW_NULL); + +        pthread_cleanup_push(__cleanup_mutex_unlock, &f->mtx); + +        while (!(f->state == state || +                 f->state == FLOW_DESTROY || +                 f->state == FLOW_DEALLOC_PENDING) && +               ret != -ETIMEDOUT) { +                if (dl != NULL) +                        ret = -pthread_cond_timedwait(&f->cond, +                                                      &f->mtx, +                                                      dl); +                else +                        ret = -pthread_cond_wait(&f->cond, +                                                 &f->mtx); +        } + +        if (f->state == FLOW_DESTROY || +            f->state == FLOW_DEALLOC_PENDING || +            ret == -ETIMEDOUT) { +                f->state = FLOW_NULL; +                pthread_cond_broadcast(&f->cond); +        } + +        s = f->state; + +        pthread_cleanup_pop(true); + +        return ret ? ret : s; +} diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h new file mode 100644 index 00000000..9af15032 --- /dev/null +++ b/src/irmd/reg/flow.h @@ -0,0 +1,83 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Flows + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_FLOW_H +#define OUROBOROS_IRMD_REG_FLOW_H + +#include <ouroboros/list.h> +#include <ouroboros/qos.h> +#include <ouroboros/shm_rbuff.h> +#include <ouroboros/utils.h> + +#include <sys/types.h> +#include <pthread.h> +#include <time.h> + +enum flow_state { +        FLOW_NULL = 0, +        FLOW_ALLOC_PENDING, +        FLOW_ALLOC_REQ_PENDING, +        FLOW_ALLOCATED, +        FLOW_DEALLOC_PENDING, +        FLOW_DESTROY +}; + +struct reg_flow { +        struct list_head   next; + +        int                flow_id; + +        pid_t              n_pid; +        pid_t              n_1_pid; + +        qosspec_t          qs; +        time_t             mpl; +        buffer_t           data; + +        struct shm_rbuff * n_rb; +        struct shm_rbuff * n_1_rb; + +        struct timespec    t0; + +        enum flow_state    state; +        pthread_cond_t     cond; +        pthread_mutex_t    mtx; +}; + +struct reg_flow * reg_flow_create(pid_t     n_pid, +                                  pid_t     n_1_pid, +                                  int       flow_id, +                                  qosspec_t qs); + +void              reg_flow_destroy(struct reg_flow * f); + +enum flow_state   reg_flow_get_state(struct reg_flow * f); + + +void              reg_flow_set_state(struct reg_flow * f, +                                     enum flow_state   state); + +int               reg_flow_wait_state(struct reg_flow * f, +                                      enum flow_state   state, +                                      struct timespec * timeo); + +#endif /* OUROBOROS_IRMD_REG_FLOW_H */ diff --git a/src/irmd/reg/ipcp.c b/src/irmd/reg/ipcp.c new file mode 100644 index 00000000..62505871 --- /dev/null +++ b/src/irmd/reg/ipcp.c @@ -0,0 +1,152 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - IPCPs + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> +#include <ouroboros/pthread.h> +#include <ouroboros/time_utils.h> + +#include "ipcp.h" + +#include <assert.h> +#include <signal.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +struct reg_ipcp * reg_ipcp_create(const char *   name, +                                  enum ipcp_type type) +{ +        struct reg_ipcp *  ipcp; +        pthread_condattr_t cattr; + +        ipcp = malloc(sizeof(*ipcp)); +        if (ipcp == NULL) +                goto fail_malloc; + +        if (pthread_mutex_init(&ipcp->mtx, NULL)) +                goto fail_mutex; + +        if (pthread_condattr_init(&cattr)) +                goto fail_cattr; +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&ipcp->cond, &cattr)) +                goto fail_cond; + +        ipcp->name = strdup(name); +        if (ipcp->name == NULL) +                goto fail_name; + +        pthread_condattr_destroy(&cattr); + +        ipcp->layer = NULL; +        ipcp->type  = type; +        ipcp->state = IPCP_BOOT; + +        list_head_init(&ipcp->next); + +        return ipcp; + + fail_name: +        pthread_cond_destroy(&ipcp->cond); + fail_cond: +        pthread_condattr_destroy(&cattr); + fail_cattr: +        pthread_mutex_destroy(&ipcp->mtx); + fail_mutex: +        free(ipcp); + fail_malloc: +        return NULL; +} + +void reg_ipcp_destroy(struct reg_ipcp * ipcp) +{ +        assert(ipcp); + +        pthread_mutex_lock(&ipcp->mtx); + +        while (ipcp->state == IPCP_BOOT) +                pthread_cond_wait(&ipcp->cond, &ipcp->mtx); + +        free(ipcp->layer); +        free(ipcp->name); + +        pthread_mutex_unlock(&ipcp->mtx); + +        pthread_cond_destroy(&ipcp->cond); +        pthread_mutex_destroy(&ipcp->mtx); + +        free(ipcp); +} + +void reg_ipcp_set_state(struct reg_ipcp * ipcp, +                        enum ipcp_state   state) +{ +        pthread_mutex_lock(&ipcp->mtx); + +        ipcp->state = state; +        pthread_cond_broadcast(&ipcp->cond); + +        pthread_mutex_unlock(&ipcp->mtx); +} + +int reg_ipcp_wait_boot(struct reg_ipcp * ipcp) +{ +        int             ret = 0; +        struct timespec dl; +        struct timespec to = {SOCKET_TIMEOUT / 1000, +                              (SOCKET_TIMEOUT % 1000) * MILLION}; + +        clock_gettime(PTHREAD_COND_CLOCK, &dl); +        ts_add(&dl, &to, &dl); + +        pthread_mutex_lock(&ipcp->mtx); + +        while (ipcp->state == IPCP_BOOT && ret != ETIMEDOUT) +                ret = pthread_cond_timedwait(&ipcp->cond, &ipcp->mtx, &dl); + +        if (ret == ETIMEDOUT) { +                kill(ipcp->pid, SIGTERM); +                ipcp->state = IPCP_NULL; +                pthread_cond_signal(&ipcp->cond); +        } + +        if (ipcp->state != IPCP_LIVE) { +                pthread_mutex_unlock(&ipcp->mtx); +                return -1; +        } + +        pthread_mutex_unlock(&ipcp->mtx); + +        return 0; +} diff --git a/src/irmd/reg/ipcp.h b/src/irmd/reg/ipcp.h new file mode 100644 index 00000000..8ad334cf --- /dev/null +++ b/src/irmd/reg/ipcp.h @@ -0,0 +1,58 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - IPCPs + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_IPCP_H +#define OUROBOROS_IRMD_REG_IPCP_H + +#include <ouroboros/list.h> + +enum ipcp_state { +        IPCP_NULL = 0, +        IPCP_BOOT, +        IPCP_LIVE +}; + +struct reg_ipcp { +        struct list_head next; + +        char *           name; +        pid_t            pid; +        enum ipcp_type   type; +        enum hash_algo   dir_hash_algo; +        char *           layer; + +        enum ipcp_state  state; +        pthread_cond_t   cond; +        pthread_mutex_t  mtx; +}; + +struct reg_ipcp * reg_ipcp_create(const char *   name, +                                  enum ipcp_type type); + +void              reg_ipcp_destroy(struct reg_ipcp * i); + +void              reg_ipcp_set_state(struct reg_ipcp * i, +                                     enum ipcp_state   state); + +int               reg_ipcp_wait_boot(struct reg_ipcp * i); + +#endif /* OUROBOROS_IRMD_REG_IPCP_H */
\ No newline at end of file diff --git a/src/irmd/reg/name.c b/src/irmd/reg/name.c new file mode 100644 index 00000000..7e13e888 --- /dev/null +++ b/src/irmd/reg/name.c @@ -0,0 +1,451 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Names + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#define OUROBOROS_PREFIX "reg_name" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/pthread.h> + +#include "name.h" +#include "utils.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <string.h> +#include <signal.h> +#include <unistd.h> +#include <limits.h> +#include <assert.h> + +struct reg_name * reg_name_create(const char *     name, +                                  enum pol_balance lb) +{ +        pthread_condattr_t cattr; +        struct reg_name *  n; + +        assert(name != NULL); + +        n = malloc(sizeof(*n)); +        if (n == NULL) +                goto fail_malloc; + +        if (pthread_condattr_init(&cattr)) +                goto fail_cattr; + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&n->cond, &cattr)) +                goto fail_cond; + +        if (pthread_mutex_init(&n->mtx, NULL)) +                goto fail_mutex; + +        n->name = strdup(name); +        if (n->name == NULL) +                goto fail_name; + +        pthread_condattr_destroy(&cattr); + +        list_head_init(&n->next); +        list_head_init(&n->reg_progs); +        list_head_init(&n->reg_pids); + +        n->pol_lb = lb; +        n->state = NAME_IDLE; + +        return n; + + fail_name: +        pthread_mutex_destroy(&n->mtx); + fail_mutex: +        pthread_cond_destroy(&n->cond); + fail_cond: +        pthread_condattr_destroy(&cattr); + fail_cattr: +        free(n); + fail_malloc: +        return NULL; +} + +static void cancel_reg_name_destroy(void * o) +{ +        struct reg_name * name; +        struct list_head * p; +        struct list_head * h; + +        name = (struct reg_name *) o; + +        pthread_mutex_unlock(&name->mtx); + +        pthread_cond_destroy(&name->cond); +        pthread_mutex_destroy(&name->mtx); + +        if (name->name != NULL) +                free(name->name); + +        list_for_each_safe(p, h, &name->reg_pids) { +                struct pid_el * pe = list_entry(p, struct pid_el, next); +                list_del(&pe->next); +                free(pe); +        } + +        list_for_each_safe(p, h, &name->reg_progs) { +                struct str_el * se = list_entry(p, struct str_el, next); +                list_del(&se->next); +                free(se->str); +                free(se); +        } + +        free(name); +} + +void reg_name_destroy(struct reg_name * name) +{ +        if (name == NULL) +                return; + +        pthread_mutex_lock(&name->mtx); + +        if (name->state == NAME_DESTROY) { +                pthread_mutex_unlock(&name->mtx); +                return; +        } + +        if (name->state != NAME_FLOW_ACCEPT) +                name->state = NAME_NULL; +        else +                name->state = NAME_DESTROY; + +        pthread_cond_broadcast(&name->cond); + +        pthread_cleanup_push(cancel_reg_name_destroy, name); + +        while (name->state != NAME_NULL) +                pthread_cond_wait(&name->cond, &name->mtx); + +        pthread_cleanup_pop(true); +} + +static bool reg_name_has_prog(struct reg_name * name, +                              const char *      prog) +{ +        struct list_head * p; + +        list_for_each(p, &name->reg_progs) { +                struct str_el * name = list_entry(p, struct str_el, next); +                if (!strcmp(name->str, prog)) +                        return true; +        } + +        return false; +} + +int reg_name_add_prog(struct reg_name * name, +                      struct reg_prog * a) +{ +        struct str_el * n; + +        if (reg_name_has_prog(name, a->prog)) { +                log_warn("Program %s already accepting flows for %s.", +                         a->prog, name->name); +                return 0; +        } + +        if (!(a->flags & BIND_AUTO)) { +                log_dbg("Program %s cannot be auto-instantiated.", a->prog); +                return 0; +        } + +        n = malloc(sizeof(*n)); +        if (n == NULL) +                return -ENOMEM; + +        n->str = strdup(a->prog); +        if (n->str == NULL) { +                free(n); +                return -ENOMEM; +        } + +        list_add(&n->next, &name->reg_progs); + +        pthread_mutex_lock(&name->mtx); + +        if (name->state == NAME_IDLE) +                name->state = NAME_AUTO_ACCEPT; + +        pthread_mutex_unlock(&name->mtx); + +        return 0; +} + +void reg_name_del_prog(struct reg_name * name, +                       const char *      prog) +{ +        struct list_head * p; +        struct list_head * h; + +        list_for_each_safe(p, h, &name->reg_progs) { +                struct str_el * se = list_entry(p, struct str_el, next); +                if (strcmp(prog, se->str) == 0) { +                        list_del(&se->next); +                        free(se->str); +                        free(se); +                } +        } + +        pthread_mutex_lock(&name->mtx); + +        if (name->state == NAME_AUTO_ACCEPT && list_is_empty(&name->reg_progs)) { +                name->state = NAME_IDLE; +                pthread_cond_broadcast(&name->cond); +        } + +        pthread_mutex_unlock(&name->mtx); +} + +char * reg_name_get_prog(struct reg_name * name) +{ +        if (!list_is_empty(&name->reg_pids) || list_is_empty(&name->reg_progs)) +                return NULL; + +        return list_first_entry(&name->reg_progs, struct str_el, next)->str; +} + +static bool reg_name_has_pid(struct reg_name * name, +                             pid_t             pid) +{ +        struct list_head * p; + +        list_for_each(p, &name->reg_progs) { +                struct pid_el * name = list_entry(p, struct pid_el, next); +                if (name->pid == pid) +                        return true; +        } + +        return false; +} + +int reg_name_add_pid(struct reg_name * name, +                     pid_t             pid) +{ +        struct pid_el * i; + +        assert(name); + +        if (reg_name_has_pid(name, pid)) { +                log_dbg("Process already registered with this name."); +                return -EPERM; +        } + +        pthread_mutex_lock(&name->mtx); + +        if (name->state == NAME_NULL) { +                pthread_mutex_unlock(&name->mtx); +                log_dbg("Tried to add instance in NULL state."); +                return -EPERM; +        } + +        i = malloc(sizeof(*i)); +        if (i == NULL) { +                pthread_mutex_unlock(&name->mtx); +                return -ENOMEM; +        } + +        i->pid = pid; + +        /* load balancing policy assigns queue order for this process. */ +        switch(name->pol_lb) { +        case LB_RR:    /* Round robin policy. */ +                list_add_tail(&i->next, &name->reg_pids); +                break; +        case LB_SPILL: /* Keep accepting flows on the current process */ +                list_add(&i->next, &name->reg_pids); +                break; +        default: +                free(i); +                assert(false); +        }; + +        if (name->state == NAME_IDLE || +            name->state == NAME_AUTO_ACCEPT || +            name->state == NAME_AUTO_EXEC) { +                name->state = NAME_FLOW_ACCEPT; +                pthread_cond_broadcast(&name->cond); +        } + +        pthread_mutex_unlock(&name->mtx); + +        return 0; +} + +void reg_name_set_policy(struct reg_name * name, +                         enum pol_balance  lb) +{ +        name->pol_lb = lb; +} + +static void reg_name_check_state(struct reg_name * name) +{ +        assert(name); + +        if (name->state == NAME_DESTROY) { +                name->state = NAME_NULL; +                pthread_cond_broadcast(&name->cond); +                return; +        } + +        if (list_is_empty(&name->reg_pids)) { +                if (!list_is_empty(&name->reg_progs)) +                        name->state = NAME_AUTO_ACCEPT; +                else +                        name->state = NAME_IDLE; +        } else { +                name->state = NAME_FLOW_ACCEPT; +        } + +        pthread_cond_broadcast(&name->cond); +} + +void reg_name_del_pid_el(struct reg_name * name, +                         struct pid_el *   p) +{ +        assert(name); +        assert(p); + +        list_del(&p->next); +        free(p); + +        reg_name_check_state(name); +} + +void reg_name_del_pid(struct reg_name * name, +                      pid_t             pid) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(name); + +        if (name == NULL) +                return; + +        list_for_each_safe(p, h, &name->reg_pids) { +                struct pid_el * a = list_entry(p, struct pid_el, next); +                if (a->pid == pid) { +                        list_del(&a->next); +                        free(a); +                } +        } + +        reg_name_check_state(name); +} + +pid_t reg_name_get_pid(struct reg_name * name) +{ +        if (name == NULL) +                return -1; + +        if (list_is_empty(&name->reg_pids)) +                return -1; + +        return list_first_entry(&name->reg_pids, struct pid_el, next)->pid; +} + +enum name_state reg_name_get_state(struct reg_name * name) +{ +        enum name_state state; + +        assert(name); + +        pthread_mutex_lock(&name->mtx); + +        state = name->state; + +        pthread_mutex_unlock(&name->mtx); + +        return state; +} + +int reg_name_set_state(struct reg_name * name, +                       enum name_state   state) +{ +        assert(state != NAME_DESTROY); + +        pthread_mutex_lock(&name->mtx); + +        name->state = state; +        pthread_cond_broadcast(&name->cond); + +        pthread_mutex_unlock(&name->mtx); + +        return 0; +} + +int reg_name_leave_state(struct reg_name * name, +                         enum name_state   state, +                         struct timespec * timeout) +{ +        struct timespec abstime; +        int ret = 0; + +        assert(name); +        assert(state != NAME_DESTROY); + +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_mutex_lock(&name->mtx); + +        pthread_cleanup_push(__cleanup_mutex_unlock, &name->mtx); + +        while (name->state == state && ret != -ETIMEDOUT) +                if (timeout) +                        ret = -pthread_cond_timedwait(&name->cond, +                                                      &name->mtx, +                                                      &abstime); +                else +                        ret = -pthread_cond_wait(&name->cond, +                                                 &name->mtx); + +        if (name->state == NAME_DESTROY) { +                ret = -1; +                name->state = NAME_NULL; +                pthread_cond_broadcast(&name->cond); +        } + +        pthread_cleanup_pop(true); + +        return ret; +} diff --git a/src/irmd/reg/name.h b/src/irmd/reg/name.h new file mode 100644 index 00000000..0731782c --- /dev/null +++ b/src/irmd/reg/name.h @@ -0,0 +1,103 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Names + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_NAME_H +#define OUROBOROS_IRMD_REG_NAME_H + +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> +#include <ouroboros/list.h> +#include <ouroboros/irm.h> + +#include "proc.h" +#include "prog.h" + +#include <stdint.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <sys/types.h> + +enum name_state { +        NAME_NULL = 0, +        NAME_IDLE, +        NAME_AUTO_ACCEPT, +        NAME_AUTO_EXEC, +        NAME_FLOW_ACCEPT, +        NAME_FLOW_ARRIVED, +        NAME_DESTROY +}; + +/* An entry in the registry */ +struct reg_name { +        struct list_head    next; +        char *              name; + +        /* Policies for this name. */ +        enum pol_balance    pol_lb;  /* Load balance incoming flows. */ +        /* Programs that can be instantiated by the irmd. */ +        struct list_head    reg_progs; +        /* Processes that are listening for this name. */ +        struct list_head    reg_pids; + +        enum name_state     state; +        pthread_cond_t      cond; +        pthread_mutex_t     mtx; +}; + +struct reg_name * reg_name_create(const char *     name, +                                  enum pol_balance lb); + +void              reg_name_destroy(struct reg_name * n); + +int               reg_name_add_prog(struct reg_name * n, +                                    struct reg_prog * p); + +void              reg_name_del_prog(struct reg_name * n, +                                    const char *      prog); + +char *            reg_name_get_prog(struct reg_name * n); + +int               reg_name_add_pid(struct reg_name * n, +                                   pid_t             pid); + +void              reg_name_del_pid(struct reg_name * n, +                                   pid_t             pid); + +void              reg_name_del_pid_el(struct reg_name * n, +                                      struct pid_el *   p); + +pid_t             reg_name_get_pid(struct reg_name * n); + +void              reg_name_set_policy(struct reg_name * n, +                                      enum pol_balance  lb); + +enum name_state   reg_name_get_state(struct reg_name * n); + +int               reg_name_set_state(struct reg_name * n, +                                     enum name_state   state); + +int               reg_name_leave_state(struct reg_name * n, +                                       enum name_state   state, +                                       struct timespec * timeout); + +#endif /* OUROBOROS_IRMD_REG_NAME_H */ diff --git a/src/irmd/reg/proc.c b/src/irmd/reg/proc.c new file mode 100644 index 00000000..1aae789d --- /dev/null +++ b/src/irmd/reg/proc.c @@ -0,0 +1,265 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Processes + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include <ouroboros/list.h> +#include <ouroboros/errno.h> +#include <ouroboros/time_utils.h> + +#include "proc.h" +#include "name.h" + +#include <stdlib.h> +#include <unistd.h> +#include <limits.h> +#include <assert.h> + +struct reg_proc * reg_proc_create(pid_t        pid, +                                  const char * prog) +{ +        struct reg_proc *  proc; +        pthread_condattr_t cattr; + +        assert(prog); + +        proc = malloc(sizeof(*proc)); +        if (proc == NULL) +                goto fail_malloc; + +        if (pthread_condattr_init(&cattr)) +                goto fail_condattr; + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + +        if (pthread_mutex_init(&proc->lock, NULL)) +                goto fail_mutex; + +        if (pthread_cond_init(&proc->cond, &cattr)) +                goto fail_cond; + +        proc->set = shm_flow_set_create(pid); +        if (proc->set == NULL) +                goto fail_set; + +        proc->prog = strdup(prog); +        if(proc->prog == NULL) +                goto fail_prog; + +        list_head_init(&proc->next); +        list_head_init(&proc->names); + +        proc->pid      = pid; +        proc->name     = NULL; +        proc->state    = PROC_INIT; + +        return proc; + + fail_prog: +        shm_flow_set_destroy(proc->set); + fail_set: +        pthread_cond_destroy(&proc->cond);; + fail_cond: +        pthread_mutex_destroy(&proc->lock); + fail_mutex: +        pthread_condattr_destroy(&cattr); + fail_condattr: +        free(proc); + fail_malloc: +        return NULL; +} + +static void cancel_reg_proc(void * o) +{ +        struct reg_proc * proc = (struct reg_proc *) o; + +        proc->state = PROC_NULL; + +        pthread_mutex_unlock(&proc->lock); +} + +void reg_proc_destroy(struct reg_proc * proc) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(proc); + +        pthread_mutex_lock(&proc->lock); + +        if (proc->state == PROC_DESTROY) { +                pthread_mutex_unlock(&proc->lock); +                return; +        } + +        if (proc->state == PROC_SLEEP) +                proc->state = PROC_DESTROY; + +        pthread_cond_signal(&proc->cond); + +        pthread_cleanup_push(cancel_reg_proc, proc); + +        while (proc->state != PROC_INIT) +                pthread_cond_wait(&proc->cond, &proc->lock); + +        pthread_cleanup_pop(false); + +        pthread_mutex_unlock(&proc->lock); + +        shm_flow_set_destroy(proc->set); + +        pthread_cond_destroy(&proc->cond); +        pthread_mutex_destroy(&proc->lock); + +        list_for_each_safe(p, h, &proc->names) { +                struct str_el * n = list_entry(p, struct str_el, next); +                list_del(&n->next); +                if (n->str != NULL) +                        free(n->str); +                free(n); +        } + +        free(proc->prog); +        free(proc); +} + +int reg_proc_add_name(struct reg_proc * proc, +                      const char *      name) +{ +        struct str_el * s; + +        assert(proc); +        assert(name); + +        s = malloc(sizeof(*s)); +        if (s == NULL) +                goto fail_malloc; + +        s->str = strdup(name); +        if (s->str == NULL) +                goto fail_name; + +        list_add(&s->next, &proc->names); + +        return 0; + + fail_name: +        free(s); + fail_malloc: +        return -ENOMEM; +} + +void reg_proc_del_name(struct reg_proc * proc, +                       const char *      name) +{ +        struct list_head * p = NULL; +        struct list_head * h = NULL; + +        assert(proc); +        assert(name); + +        list_for_each_safe(p, h, &proc->names) { +                struct str_el * s = list_entry(p, struct str_el, next); +                if (!strcmp(name, s->str)) { +                        list_del(&s->next); +                        free(s->str); +                        free(s); +                } +        } +} + +int reg_proc_sleep(struct reg_proc * proc, +                   struct timespec * dl) +{ + +        int ret = 0; + +        assert(proc); + +        pthread_mutex_lock(&proc->lock); + +        if (proc->state != PROC_WAKE && proc->state != PROC_DESTROY) +                proc->state = PROC_SLEEP; + +        pthread_cleanup_push(cancel_reg_proc, proc); + +        while (proc->state == PROC_SLEEP && ret != -ETIMEDOUT) +                if (dl != NULL) +                        ret = -pthread_cond_timedwait(&proc->cond, +                                                      &proc->lock, dl); +                else +                        ret = -pthread_cond_wait(&proc->cond, &proc->lock); + +        pthread_cleanup_pop(false); + +        if (proc->state == PROC_DESTROY) { +                if (proc->name != NULL) +                        reg_name_del_pid(proc->name, proc->pid); +                ret = -1; +        } + +        proc->state = PROC_INIT; + +        pthread_cond_broadcast(&proc->cond); +        pthread_mutex_unlock(&proc->lock); + +        return ret; +} + +void reg_proc_wake(struct reg_proc * proc, +                   struct reg_name * name) +{ +        assert(proc); +        assert(name); + +        pthread_mutex_lock(&proc->lock); + +        if (proc->state != PROC_SLEEP) { +                pthread_mutex_unlock(&proc->lock); +                return; +        } + +        proc->state = PROC_WAKE; +        proc->name  = name; + +        pthread_cond_broadcast(&proc->cond); + +        pthread_cleanup_push(cancel_reg_proc, proc); + +        while (proc->state == PROC_WAKE) +                pthread_cond_wait(&proc->cond, &proc->lock); + +        pthread_cleanup_pop(false); + +        if (proc->state == PROC_DESTROY) +                proc->state = PROC_INIT; + +        pthread_mutex_unlock(&proc->lock); +} diff --git a/src/irmd/reg/proc.h b/src/irmd/reg/proc.h new file mode 100644 index 00000000..fb11f34f --- /dev/null +++ b/src/irmd/reg/proc.h @@ -0,0 +1,75 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Processes + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_PROC_H +#define OUROBOROS_IRMD_REG_PROC_H + +#include <ouroboros/shm_flow_set.h> + +#include "utils.h" + +#include <unistd.h> +#include <pthread.h> + +enum proc_state { +        PROC_NULL = 0, +        PROC_INIT, +        PROC_SLEEP, +        PROC_WAKE, +        PROC_DESTROY +}; + +struct reg_proc { +        struct list_head      next; +        pid_t                 pid; +        char *                prog;  /* program instantiated */ +        struct list_head      names; /* names for which process accepts flows */ +        struct shm_flow_set * set; + +        struct reg_name *     name;  /* name for which a flow arrived */ + +        /* The process will block on this */ +        enum proc_state       state; +        pthread_cond_t        cond; +        pthread_mutex_t       lock; +}; + +struct reg_proc * reg_proc_create(pid_t        proc, +                                  const char * prog); + +void              reg_proc_destroy(struct reg_proc * proc); + +int               reg_proc_sleep(struct reg_proc * proc, +                                 struct timespec * timeo); + +void              reg_proc_wake(struct reg_proc * proc, +                                struct reg_name * name); + +void              reg_proc_cancel(struct reg_proc * proc); + +int               reg_proc_add_name(struct reg_proc * proc, +                                    const char *      name); + +void              reg_proc_del_name(struct reg_proc * proc, +                                    const char *      name); + +#endif /* OUROBOROS_IRMD_REG_PROC_H */ diff --git a/src/irmd/reg/prog.c b/src/irmd/reg/prog.c new file mode 100644 index 00000000..e3fd6105 --- /dev/null +++ b/src/irmd/reg/prog.c @@ -0,0 +1,174 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Programs + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include <ouroboros/errno.h> +#include <ouroboros/irm.h> +#include <ouroboros/utils.h> + +#include "prog.h" +#include "utils.h" + +#include <assert.h> +#include <stdlib.h> +#include <string.h> + + +static char ** create_argv(const char *  prog, +                           size_t        argc, +                           char **       argv) +{ +        char ** argv2; +        size_t  i; + +        argv2 = malloc((argc + 2) * sizeof(*argv2)); /* prog + args + NULL */ +        if (argv2 == 0) +                goto fail_malloc; + +        argv2[0] = strdup(prog); +        if (argv2[0] == NULL) +                goto fail_prog; + +        for (i = 1; i <= argc; ++i) { +                argv2[i] = strdup(argv[i - 1]); +                if (argv2[i] == NULL) +                        goto fail_arg; +        } + +        argv2[argc + 1] = NULL; + +        return argv2; + + fail_arg: +        argvfree(argv2); + fail_prog: +        free(argv2); + fail_malloc: +        return NULL; +} + +struct reg_prog * reg_prog_create(const char * prog, +                                  uint32_t     flags, +                                  int          argc, +                                  char **      argv) +{ +        struct reg_prog * p; + +        assert(prog); + +        p = malloc(sizeof(*p)); +        if (p == NULL) +                goto fail_malloc; + +        memset(p, 0, sizeof(*p)); + +        p->prog  = strdup(path_strip(prog)); +        if (p->prog == NULL) +                goto fail_prog; + +        if (argc > 0 && flags & BIND_AUTO) { +                p->argv = create_argv(prog, argc, argv); +                if (p->argv == NULL) +                        goto fail_argv; +        } + +        list_head_init(&p->next); +        list_head_init(&p->names); + +        p->flags = flags; + +        return p; + + fail_argv: +        free(p->prog); + fail_prog: +        free(p); + fail_malloc: +        return NULL; +} + +void reg_prog_destroy(struct reg_prog * prog) +{ +        struct list_head * p; +        struct list_head * h; + +        if (prog == NULL) +                return; + +        list_for_each_safe(p, h, &prog->names) { +                struct str_el * s = list_entry(p, struct str_el, next); +                list_del(&s->next); +                free(s->str); +                free(s); +        } + +        argvfree(prog->argv); +        free(prog->prog); +        free(prog); +} + +int reg_prog_add_name(struct reg_prog * prog, +                      const char *      name) +{ +        struct str_el * s; + +        if (prog == NULL || name == NULL) +                return -EINVAL; + +        s = malloc(sizeof(*s)); +        if (s == NULL) +                goto fail_malloc; + +        s->str = strdup(name); +        if(s->str == NULL) +                goto fail_name; + +        list_add(&s->next, &prog->names); + +        return 0; + + fail_name: +        free(s); + fail_malloc: +        return -ENOMEM; +} + +void reg_prog_del_name(struct reg_prog * prog, +                       const char *      name) +{ +        struct list_head * p; +        struct list_head * h; + +        list_for_each_safe(p, h, &prog->names) { +                struct str_el * s = list_entry(p, struct str_el, next); +                if (!strcmp(name, s->str)) { +                        list_del(&s->next); +                        free(s->str); +                        free(s); +                } +        } +} diff --git a/src/irmd/reg/prog.h b/src/irmd/reg/prog.h new file mode 100644 index 00000000..c45f2d8c --- /dev/null +++ b/src/irmd/reg/prog.h @@ -0,0 +1,52 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Programs + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_PROG_H +#define OUROBOROS_IRMD_REG_PROG_H + +#include <ouroboros/list.h> + +#include <unistd.h> +#include <stdint.h> + +struct reg_prog { +        struct list_head next; +        char *           prog;    /* name of binary */ +        uint32_t         flags; +        char **          argv; +        struct list_head names; /* names that all instances will listen for */ +}; + +struct reg_prog * reg_prog_create(const char *  prog, +                                  uint32_t      flags, +                                  int           argc, +                                  char **       argv); + +void              reg_prog_destroy(struct reg_prog * prog); + +int               reg_prog_add_name(struct reg_prog * prog, +                                    const char *      name); + +void              reg_prog_del_name(struct reg_prog * prog, +                                    const char *      name); + +#endif /* OUROBOROS_IRMD_REG_PROG_H */ | 
