Diffstat (limited to 'src/lib/dev.c')
| -rw-r--r-- | src/lib/dev.c | 1861 |
1 file changed, 1248 insertions, 613 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index edcf56ed..92310b9e 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1,10 +1,10 @@
 /*
- * Ouroboros - Copyright (C) 2016 - 2018
+ * Ouroboros - Copyright (C) 2016 - 2024
  *
  * API for applications
  *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public License
@@ -20,31 +20,44 @@
  * Foundation, Inc., http://www.fsf.org/about/contact/.
  */
 
-#include <ouroboros/endian.h>
-
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
 #define _POSIX_C_SOURCE 200809L
+#endif
 
 #include "config.h"
 
-#include <ouroboros/hash.h>
-#include <ouroboros/errno.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/cep.h>
+#include <ouroboros/crypt.h>
 #include <ouroboros/dev.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
+#include <ouroboros/flow.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/hash.h>
+#include <ouroboros/ipcp.h>
 #include <ouroboros/ipcp-dev.h>
+#include <ouroboros/list.h>
 #include <ouroboros/local-dev.h>
-#include <ouroboros/sockets.h>
-#include <ouroboros/fccntl.h>
-#include <ouroboros/bitmap.h>
+#include <ouroboros/np1_flow.h>
+#include <ouroboros/pthread.h>
 #include <ouroboros/random.h>
+#include <ouroboros/serdes-irm.h>
 #include <ouroboros/shm_flow_set.h>
 #include <ouroboros/shm_rdrbuff.h>
 #include <ouroboros/shm_rbuff.h>
+#include <ouroboros/sockets.h>
 #include <ouroboros/utils.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/qoscube.h>
-#include <ouroboros/timerwheel.h>
-
-#include "rq.h"
+#ifdef PROC_FLOW_STATS
+#include <ouroboros/rib.h>
+#endif
+#ifdef HAVE_LIBGCRYPT
+#include <gcrypt.h>
+#endif
 
+#include <assert.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
@@ -57,46 +70,39 @@
 #endif
 
 /* Partial read information. */
-#define NO_PART -1
-#define DONE_PART -2
+#define NO_PART   -1
+#define DONE_PART -2
 
-struct flow_set {
-        size_t idx;
-};
+#define CRCLEN   (sizeof(uint32_t))
+#define SECMEMSZ 16384
+#define MSGBUFSZ 2048
 
-struct fqueue {
-        int    fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
-        size_t fqsize;
-        size_t next;
+/* map flow_ids to flow descriptors; track state of the flow */
+struct fmap {
+        int fd;
+        /* TODO: use actual flow state */
+        enum flow_state state;
 };
 
-enum port_state {
-        PORT_NULL = 0,
-        PORT_INIT,
-        PORT_ID_PENDING,
-        PORT_ID_ASSIGNED,
-        PORT_DESTROY
-};
+#define frcti_to_flow(frcti) \
+        ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti)))
 
-struct port {
-        int fd;
+struct flow {
+        struct list_head      next;
 
-        enum port_state state;
-        pthread_mutex_t state_lock;
-        pthread_cond_t  state_cond;
-};
+        struct flow_info      info;
 
-struct flow {
         struct shm_rbuff *    rx_rb;
         struct shm_rbuff *    tx_rb;
         struct shm_flow_set * set;
-        int                   port_id;
-        int                   oflags;
-        qoscube_t             cube;
-        qosspec_t             spec;
+
+        uint16_t              oflags;
         ssize_t               part_idx;
-        pid_t                 pid;
+
+        struct crypt_info     crypt;
+
+        struct timespec       snd_act;
+        struct timespec       rcv_act;
 
         bool                  snd_timesout;
         bool                  rcv_timesout;
@@ -106,166 +112,342 @@ struct flow {
         struct frcti *        frcti;
 };
 
-struct {
-        char * prog;
-        pid_t  pid;
+struct flow_set {
+        size_t           idx;
+        pthread_rwlock_t lock;
+};
+
+struct fqueue {
+        struct flowevent fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
+        size_t           fqsize;
+        size_t           next;
+};
 
+struct {
         struct shm_rdrbuff *  rdrb;
         struct shm_flow_set * fqset;
-        struct timerwheel *   tw;
-
         struct bmp *          fds;
         struct bmp *          fqueues;
         struct flow *         flows;
-        struct port *         ports;
+        struct fmap *         id_to_fd;
+        struct list_head      flow_list;
+
+        pthread_mutex_t       mtx;
+        pthread_cond_t        cond;
+
+        pthread_t             tx;
+        pthread_t             rx;
+        size_t                n_frcti;
+        fset_t *              frct_set;
 
         pthread_rwlock_t      lock;
 } ai;
 
-#include "frct.c"
-
-static void port_destroy(struct port * p)
+static void flow_destroy(struct fmap * p)
 {
-        pthread_mutex_lock(&p->state_lock);
+        pthread_mutex_lock(&ai.mtx);
 
-        if (p->state == PORT_DESTROY) {
-                pthread_mutex_unlock(&p->state_lock);
+        if (p->state == FLOW_DESTROY) {
+                pthread_mutex_unlock(&ai.mtx);
                 return;
         }
 
-        if (p->state == PORT_ID_PENDING)
-                p->state = PORT_DESTROY;
+        if (p->state == FLOW_ALLOC_PENDING)
+                p->state = FLOW_DESTROY;
         else
-                p->state = PORT_NULL;
+                p->state = FLOW_NULL;
+
+        pthread_cond_signal(&ai.cond);
 
-        pthread_cond_signal(&p->state_cond);
+        pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
 
-        while (p->state != PORT_NULL)
-                pthread_cond_wait(&p->state_cond, &p->state_lock);
+        while (p->state != FLOW_NULL)
+                pthread_cond_wait(&ai.cond, &ai.mtx);
 
-        p->fd = -1;
-        p->state = PORT_INIT;
+        p->fd    = -1;
+        p->state = FLOW_INIT;
 
-        pthread_mutex_unlock(&p->state_lock);
+        pthread_cleanup_pop(true);
 }
 
-static void port_set_state(struct port *   p,
-                           enum port_state state)
+static void flow_set_state(struct fmap *   p,
+                           enum flow_state state)
 {
-        pthread_mutex_lock(&p->state_lock);
+        pthread_mutex_lock(&ai.mtx);
 
-        if (p->state == PORT_DESTROY) {
-                pthread_mutex_unlock(&p->state_lock);
+        if (p->state == FLOW_DESTROY) {
+                pthread_mutex_unlock(&ai.mtx);
                 return;
         }
 
         p->state = state;
 
-        pthread_cond_broadcast(&p->state_cond);
+        pthread_cond_broadcast(&ai.cond);
 
-        pthread_mutex_unlock(&p->state_lock);
+        pthread_mutex_unlock(&ai.mtx);
 }
 
-static enum port_state port_wait_assign(int port_id)
+static enum flow_state flow_wait_assign(int flow_id)
 {
-        enum port_state state;
-        struct port *   p;
+        enum flow_state state;
+        struct fmap *   p;
 
-        p = &ai.ports[port_id];
+        p = &ai.id_to_fd[flow_id];
 
-        pthread_mutex_lock(&p->state_lock);
+        pthread_mutex_lock(&ai.mtx);
 
-        if (p->state == PORT_ID_ASSIGNED) {
-                pthread_mutex_unlock(&p->state_lock);
-                return PORT_ID_ASSIGNED;
+        if (p->state == FLOW_ALLOCATED) {
+                pthread_mutex_unlock(&ai.mtx);
+                return FLOW_ALLOCATED;
         }
 
-        if (p->state == PORT_INIT)
-                p->state = PORT_ID_PENDING;
+        if (p->state == FLOW_INIT)
+                p->state = FLOW_ALLOC_PENDING;
+
+        pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
 
-        while (p->state == PORT_ID_PENDING)
-                pthread_cond_wait(&p->state_cond, &p->state_lock);
+        while (p->state == FLOW_ALLOC_PENDING)
+                pthread_cond_wait(&ai.cond, &ai.mtx);
 
-        if (p->state == PORT_DESTROY) {
-                p->state = PORT_NULL;
-                pthread_cond_broadcast(&p->state_cond);
+        if (p->state == FLOW_DESTROY) {
+                p->state = FLOW_NULL;
+                pthread_cond_broadcast(&ai.cond);
         }
 
         state = p->state;
 
-        assert(state != PORT_INIT);
+        pthread_cleanup_pop(true);
 
-        pthread_mutex_unlock(&p->state_lock);
+        assert(state != FLOW_INIT);
 
         return state;
 }
 
-static int proc_announce(char * prog)
+static int proc_announce(const char * prog)
+{
+        uint8_t  buf[SOCK_BUF_SIZE];
+        buffer_t msg = {buf, SOCK_BUF_SIZE};
+        int      err;
+
+        if (proc_announce__irm_req_ser(&msg, prog) < 0)
+                return -ENOMEM;
+
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
+
+        return irm__irm_result_des(&msg);
+}
+
+/* IRMd will clean up the mess if this fails */
+static void proc_exit(void)
 {
-        irm_msg_t   msg      = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
-        int         ret      = -1;
-
-        msg.code    = IRM_MSG_CODE__IRM_PROC_ANNOUNCE;
-        msg.has_pid = true;
-        msg.pid     = ai.pid;
-        msg.prog    = prog;
-
-        recv_msg = send_recv_irm_msg(&msg);
-        if (recv_msg == NULL) {
-                return -EIRMD;
+        uint8_t  buf[SOCK_BUF_SIZE];
+        buffer_t msg = {buf, SOCK_BUF_SIZE};
+
+        if (proc_exit__irm_req_ser(&msg) < 0)
+                return;
+
+        send_recv_msg(&msg);
+}
+
+#include "frct.c"
+
+void * flow_tx(void * o)
+{
+        struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+
+        (void) o;
+
+        while (true) {
+                timerwheel_move();
+
+                nanosleep(&tic, NULL);
         }
 
-        if (!recv_msg->has_result || (ret = recv_msg->result)) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return ret;
+        return (void *) 0;
+}
+
+static void flow_send_keepalive(struct flow *   flow,
+                                struct timespec now)
+{
+        struct shm_du_buff * sdb;
+        ssize_t              idx;
+        uint8_t *            ptr;
+
+        idx = shm_rdrbuff_alloc(ai.rdrb, 0, &ptr, &sdb);
+        if (idx < 0)
+                return;
+
+        pthread_rwlock_wrlock(&ai.lock);
+
+        flow->snd_act = now;
+
+        if (shm_rbuff_write(flow->tx_rb, idx))
+                shm_rdrbuff_remove(ai.rdrb, idx);
+        else
+                shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+
+        pthread_rwlock_unlock(&ai.lock);
+}
+
+/* Needs rdlock on ai. */
+static void _flow_keepalive(struct flow * flow)
+{
+        struct timespec now;
+        struct timespec s_act;
+        struct timespec r_act;
+        int             flow_id;
+        time_t          timeo;
+        uint32_t        acl;
+
+        s_act = flow->snd_act;
+        r_act = flow->rcv_act;
+
+        flow_id = flow->info.id;
+        timeo   = flow->info.qs.timeout;
+
+        acl = shm_rbuff_get_acl(flow->rx_rb);
+        if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
+                return;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        if (ts_diff_ns(&r_act, &now) > (int64_t) timeo * MILLION) {
+                shm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER);
+                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+                return;
         }
 
-        irm_msg__free_unpacked(recv_msg, NULL);
+        if (ts_diff_ns(&s_act, &now) > (int64_t) timeo * (MILLION >> 2)) {
+                pthread_rwlock_unlock(&ai.lock);
 
-        return ret;
+                flow_send_keepalive(flow, now);
+
+                pthread_rwlock_rdlock(&ai.lock);
+        }
+}
+
+static void handle_keepalives(void)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        list_for_each_safe(p, h, &ai.flow_list) {
+                struct flow * flow;
+                flow = list_entry(p, struct flow, next);
+                _flow_keepalive(flow);
+        }
+
+        pthread_rwlock_unlock(&ai.lock);
+}
+
+static void __cleanup_fqueue_destroy(void * fq)
+{
+        fqueue_destroy((fqueue_t *) fq);
+}
+
+void * flow_rx(void * o)
+{
+        struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+        int             ret;
+        struct fqueue * fq;
+
+        (void) o;
+
+        fq = fqueue_create();
+
+        pthread_cleanup_push(__cleanup_fqueue_destroy, fq);
+
+        /* fevent will filter all FRCT packets for us */
+        while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) {
+                if (ret == -ETIMEDOUT) {
+                        handle_keepalives();
+                        continue;
+                }
+
+                while (fqueue_next(fq) >= 0)
+                        ; /* no need to act */
+        }
+
+        pthread_cleanup_pop(true);
+
+        return (void *) 0;
 }
 
 static void flow_clear(int fd)
 {
         memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
 
-        ai.flows[fd].port_id = -1;
-        ai.flows[fd].pid     = -1;
-        ai.flows[fd].cube    = QOS_CUBE_BE;
+        ai.flows[fd].info.id = -1;
 }
 
-static void flow_fini(int fd)
+static void __flow_fini(int fd)
 {
-        assert(!(fd < 0));
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+
+        if (ai.flows[fd].frcti != NULL) {
+                ai.n_frcti--;
+                if (ai.n_frcti == 0) {
+                        pthread_cancel(ai.tx);
+                        pthread_join(ai.tx, NULL);
+                }
 
-        if (ai.flows[fd].port_id != -1) {
-                port_destroy(&ai.ports[ai.flows[fd].port_id]);
+                shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
+
+                frcti_destroy(ai.flows[fd].frcti);
+        }
+
+        if (ai.flows[fd].info.id != -1) {
+                flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]);
                 bmp_release(ai.fds, fd);
         }
 
-        if (ai.flows[fd].rx_rb != NULL)
+        if (ai.flows[fd].rx_rb != NULL) {
+                shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
                 shm_rbuff_close(ai.flows[fd].rx_rb);
+        }
 
-        if (ai.flows[fd].tx_rb != NULL)
+        if (ai.flows[fd].tx_rb != NULL) {
+                shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
                 shm_rbuff_close(ai.flows[fd].tx_rb);
+        }
 
-        if (ai.flows[fd].set != NULL)
+        if (ai.flows[fd].set != NULL) {
+                shm_flow_set_notify(ai.flows[fd].set,
+                                    ai.flows[fd].info.id,
+                                    FLOW_DEALLOC);
                 shm_flow_set_close(ai.flows[fd].set);
+        }
 
-        if (ai.flows[fd].frcti != NULL)
-                frcti_destroy(ai.flows[fd].frcti);
+        crypt_fini(&ai.flows[fd].crypt);
+
+        list_del(&ai.flows[fd].next);
 
         flow_clear(fd);
 }
 
-static int flow_init(int       port_id,
-                     pid_t     pid,
-                     qoscube_t qc)
+static void flow_fini(int fd)
 {
-        int fd;
-        int err = -ENOMEM;
+        pthread_rwlock_wrlock(&ai.lock);
+
+        __flow_fini(fd);
+
+        pthread_rwlock_unlock(&ai.lock);
+}
+
+static int flow_init(struct flow_info * info,
+                     buffer_t *         sk)
+{
+        struct timespec now;
+        struct flow *   flow;
+        int             fd;
+        int             err = -ENOMEM;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
 
         pthread_rwlock_wrlock(&ai.lock);
@@ -275,35 +457,77 @@ static int flow_init(int       port_id,
                 goto fail_fds;
         }
 
-        ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, port_id);
-        if (ai.flows[fd].rx_rb == NULL)
-                goto fail;
+        flow = &ai.flows[fd];
 
-        ai.flows[fd].tx_rb = shm_rbuff_open(pid, port_id);
-        if (ai.flows[fd].tx_rb == NULL)
-                goto fail;
+        flow->info = *info;
 
-        ai.flows[fd].set = shm_flow_set_open(pid);
-        if (ai.flows[fd].set == NULL)
-                goto fail;
+        flow->rx_rb = shm_rbuff_open(info->n_pid, info->id);
+        if (flow->rx_rb == NULL)
+                goto fail_rx_rb;
+
+        flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id);
+        if (flow->tx_rb == NULL)
+                goto fail_tx_rb;
+
+        flow->set = shm_flow_set_open(info->n_1_pid);
+        if (flow->set == NULL)
+                goto fail_set;
+
+        flow->oflags   = FLOWFDEFAULT;
+        flow->part_idx = NO_PART;
+        flow->snd_act  = now;
+        flow->rcv_act  = now;
 
-        ai.flows[fd].port_id  = port_id;
-        ai.flows[fd].oflags   = FLOWFDEFAULT;
-        ai.flows[fd].pid      = pid;
-        ai.flows[fd].cube     = qc;
-        ai.flows[fd].spec     = qos_cube_to_spec(qc);
-        ai.flows[fd].part_idx = NO_PART;
+        flow->crypt.flags = info->qs.cypher_s; /* TODO: move cypher_s */
 
-        ai.ports[port_id].fd = fd;
+        memset(flow->crypt.key, 0, SYMMKEYSZ);
+
+        if (flow->crypt.flags > 0 && sk!= NULL && sk->data != NULL)
+                memcpy(flow->crypt.key, sk->data , sk->len);
+
+        if (crypt_init(&flow->crypt) < 0)
+                goto fail_crypt;
+
+        assert(flow->frcti == NULL);
+
+        if (info->qs.in_order != 0) {
+                flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl);
+                if (flow->frcti == NULL)
+                        goto fail_frcti;
 
-        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+                if (shm_flow_set_add(ai.fqset, 0, info->id))
+                        goto fail_flow_set_add;
+
+                ++ai.n_frcti;
+                if (ai.n_frcti == 1 &&
+                    pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0)
+                        goto fail_tx_thread;
+        }
+
+        list_add_tail(&flow->next, &ai.flow_list);
+
+        ai.id_to_fd[info->id].fd = fd;
+
+        flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED);
 
         pthread_rwlock_unlock(&ai.lock);
 
         return fd;
 
- fail:
-        flow_fini(fd);
+ fail_tx_thread:
+        shm_flow_set_del(ai.fqset, 0, info->id);
+ fail_flow_set_add:
+        frcti_destroy(flow->frcti);
+ fail_frcti:
+        crypt_fini(&flow->crypt);
+ fail_crypt:
+        shm_flow_set_close(flow->set);
+ fail_set:
+        shm_rbuff_close(flow->tx_rb);
+ fail_tx_rb:
+        shm_rbuff_close(flow->rx_rb);
+ fail_rx_rb:
+        bmp_release(ai.fds, fd);
 fail_fds:
         pthread_rwlock_unlock(&ai.lock);
         return err;
@@ -323,153 +547,205 @@ static void init(int     argc,
                  char ** argv,
                  char ** envp)
 {
-        const char * prog = argv[0];
-        int          i;
-
+        char * prog = argv[0];
+        int    i;
+#ifdef PROC_FLOW_STATS
+        char   procstr[32];
+#endif
         (void) argc;
         (void) envp;
 
-        assert(ai.prog == NULL);
-
         if (check_python(argv[0]))
                 prog = argv[1];
 
-        ai.pid = getpid();
+        prog = path_strip(prog);
+        if (prog == NULL) {
+                fprintf(stderr, "FATAL: Could not determine program name.\n");
+                goto fail_prog;
+        }
 
+        if (proc_announce(prog)) {
+                fprintf(stderr, "FATAL: Could not announce to IRMd.\n");
+                goto fail_prog;
+        }
+
+#ifdef HAVE_LIBGCRYPT
+        if (!gcry_control(GCRYCTL_INITIALIZATION_FINISHED_P)) {
+                if (!gcry_check_version(GCRYPT_VERSION)) {
+                        fprintf(stderr, "FATAL: Could not get gcry version.\n");
+                        goto fail_prog;
+                }
+                gcry_control(GCRYCTL_DISABLE_SECMEM, 0);
+                gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);
+        }
+#endif
         ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);
-        if (ai.fds == NULL)
+        if (ai.fds == NULL) {
+                fprintf(stderr, "FATAL: Could not create fd bitmap.\n");
                 goto fail_fds;
+        }
 
         ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
-        if (ai.fqueues == NULL)
+        if (ai.fqueues == NULL) {
+                fprintf(stderr, "FATAL: Could not create fqueue bitmap.\n");
                 goto fail_fqueues;
-
-        ai.fqset = shm_flow_set_create();
-        if (ai.fqset == NULL)
-                goto fail_fqset;
+        }
 
         ai.rdrb = shm_rdrbuff_open();
-        if (ai.rdrb == NULL)
+        if (ai.rdrb == NULL) {
+                fprintf(stderr, "FATAL: Could not open packet buffer.\n");
                 goto fail_rdrb;
+        }
 
         ai.flows = malloc(sizeof(*ai.flows) * PROG_MAX_FLOWS);
-        if (ai.flows == NULL)
+        if (ai.flows == NULL) {
+                fprintf(stderr, "FATAL: Could not malloc flows.\n");
                 goto fail_flows;
+        }
 
         for (i = 0; i < PROG_MAX_FLOWS; ++i)
                 flow_clear(i);
 
-        ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);
-        if (ai.ports == NULL)
-                goto fail_ports;
+        ai.id_to_fd = malloc(sizeof(*ai.id_to_fd) * SYS_MAX_FLOWS);
+        if (ai.id_to_fd == NULL) {
+                fprintf(stderr, "FATAL: Could not malloc id_to_fd.\n");
+                goto fail_id_to_fd;
+        }
 
-        if (prog != NULL) {
-                ai.prog = strdup(path_strip((char *) prog));
-                if (ai.prog == NULL)
-                        goto fail_prog;
+        for (i = 0; i < SYS_MAX_FLOWS; ++i)
+                ai.id_to_fd[i].state = FLOW_INIT;
 
-                if (proc_announce((char *) ai.prog))
-                        goto fail_announce;
+        if (pthread_mutex_init(&ai.mtx, NULL)) {
+                fprintf(stderr, "FATAL: Could not init mutex.\n");
+                goto fail_mtx;
         }
 
-        for (i = 0; i < SYS_MAX_FLOWS; ++i) {
-                ai.ports[i].state = PORT_INIT;
-                if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) {
-                        int j;
-                        for (j = 0; j < i; ++j)
-                                pthread_mutex_destroy(&ai.ports[j].state_lock);
-                        goto fail_announce;
-                }
-                if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) {
-                        int j;
-                        for (j = 0; j < i; ++j)
-                                pthread_cond_destroy(&ai.ports[j].state_cond);
-                        goto fail_state_cond;
-                }
+        if (pthread_cond_init(&ai.cond, NULL) < 0) {
+                fprintf(stderr, "FATAL: Could not init condvar.\n");
+                goto fail_cond;
         }
 
-        if (pthread_rwlock_init(&ai.lock, NULL))
-                goto fail_lock;
+        if (pthread_rwlock_init(&ai.lock, NULL) < 0) {
+                fprintf(stderr, "FATAL: Could not initialize flow lock.\n");
+                goto fail_flow_lock;
+        }
+
+        ai.fqset = shm_flow_set_open(getpid());
+        if (ai.fqset == NULL) {
+                fprintf(stderr, "FATAL: Could not open flow set.\n");
+                goto fail_fqset;
+        }
+
+        ai.frct_set = fset_create();
+        if (ai.frct_set == NULL || ai.frct_set->idx != 0) {
+                fprintf(stderr, "FATAL: Could not create FRCT set.\n");
+                goto fail_frct_set;
+        }
+
+        if (timerwheel_init() < 0) {
+                fprintf(stderr, "FATAL: Could not initialize timerwheel.\n");
+                goto fail_timerwheel;
+        }
+
+#if defined PROC_FLOW_STATS
+        if (strstr(argv[0], "ipcpd") == NULL) {
+                sprintf(procstr, "proc.%d", getpid());
+                if (rib_init(procstr) < 0) {
+                        fprintf(stderr, "FATAL: Could not initialize RIB.\n");
+                        goto fail_rib_init;
+                }
+        }
+#endif
+        if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0) {
+                fprintf(stderr, "FATAL: Could not start monitor thread.\n");
+                goto fail_monitor;
+        }
 
-        if (frct_init())
-                goto fail_frct;
+        list_head_init(&ai.flow_list);
 
         return;
 
- fail_frct:
+ fail_monitor:
+#if defined PROC_FLOW_STATS
+        rib_fini();
+ fail_rib_init:
+#endif
+        timerwheel_fini();
+ fail_timerwheel:
+        fset_destroy(ai.frct_set);
+ fail_frct_set:
+        shm_flow_set_close(ai.fqset);
+ fail_fqset:
         pthread_rwlock_destroy(&ai.lock);
- fail_lock:
-        for (i = 0; i < SYS_MAX_FLOWS; ++i)
-                pthread_cond_destroy(&ai.ports[i].state_cond);
- fail_state_cond:
-        for (i = 0; i < SYS_MAX_FLOWS; ++i)
-                pthread_mutex_destroy(&ai.ports[i].state_lock);
- fail_announce:
-        free(ai.prog);
- fail_prog:
-        free(ai.ports);
- fail_ports:
+ fail_flow_lock:
+        pthread_cond_destroy(&ai.cond);
+ fail_cond:
+        pthread_mutex_destroy(&ai.mtx);
+ fail_mtx:
+        free(ai.id_to_fd);
+ fail_id_to_fd:
         free(ai.flows);
 fail_flows:
         shm_rdrbuff_close(ai.rdrb);
 fail_rdrb:
-        shm_flow_set_destroy(ai.fqset);
- fail_fqset:
         bmp_destroy(ai.fqueues);
 fail_fqueues:
         bmp_destroy(ai.fds);
 fail_fds:
-        fprintf(stderr, "FATAL: ouroboros-dev init failed. "
-                "Make sure an IRMd is running.\n\n");
         memset(&ai, 0, sizeof(ai));
+ fail_prog:
         exit(EXIT_FAILURE);
 }
 
 static void fini(void)
 {
-        int i = 0;
+        int i;
 
         if (ai.fds == NULL)
                 return;
 
-        frct_fini();
-
-        shm_flow_set_destroy(ai.fqset);
-
-        if (ai.prog != NULL)
-                free(ai.prog);
+        pthread_cancel(ai.rx);
+        pthread_join(ai.rx, NULL);
 
         pthread_rwlock_wrlock(&ai.lock);
 
         for (i = 0; i < PROG_MAX_FLOWS; ++i) {
-                if (ai.flows[i].port_id != -1) {
+                if (ai.flows[i].info.id != -1) {
                         ssize_t idx;
                         shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN);
                         while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
                                 shm_rdrbuff_remove(ai.rdrb, idx);
-                        flow_fini(i);
+                        __flow_fini(i);
                 }
         }
 
-        for (i = 0; i < SYS_MAX_FLOWS; ++i) {
-                pthread_mutex_destroy(&ai.ports[i].state_lock);
-                pthread_cond_destroy(&ai.ports[i].state_cond);
-        }
+        pthread_cond_destroy(&ai.cond);
+        pthread_mutex_destroy(&ai.mtx);
 
-        shm_rdrbuff_close(ai.rdrb);
+        pthread_rwlock_unlock(&ai.lock);
 
-        if (ai.tw != NULL)
-                timerwheel_destroy(ai.tw);
+#ifdef PROC_FLOW_STATS
+        rib_fini();
+#endif
+        timerwheel_fini();
+
+        fset_destroy(ai.frct_set);
+
+        shm_flow_set_close(ai.fqset);
+
+        pthread_rwlock_destroy(&ai.lock);
 
         free(ai.flows);
-        free(ai.ports);
+        free(ai.id_to_fd);
+
+        shm_rdrbuff_close(ai.rdrb);
 
         bmp_destroy(ai.fds);
         bmp_destroy(ai.fqueues);
 
-        pthread_rwlock_unlock(&ai.lock);
+        proc_exit();
 
-        pthread_rwlock_destroy(&ai.lock);
+        memset(&ai, 0, sizeof(ai));
 }
 
 #if defined(__MACH__) && defined(__APPLE__)
@@ -486,189 +762,250 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini;
 
 int flow_accept(qosspec_t *             qs,
                 const struct timespec * timeo)
 {
-        irm_msg_t   msg      = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
-        int         fd       = -1;
-
-        msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
-        msg.has_pid = true;
-        msg.pid     = ai.pid;
+        struct flow_info flow;
+        uint8_t          buf[SOCK_BUF_SIZE];
+        buffer_t         msg = {buf, SOCK_BUF_SIZE};
+        buffer_t         sk;
+        int              fd;
+        int              err;
+
+#ifdef QOS_DISABLE_CRC
+        if (qs != NULL)
+                qs->ber = 1;
+#endif
+        memset(&flow, 0, sizeof(flow));
 
-        if (timeo != NULL) {
-                msg.has_timeo_sec  = true;
-                msg.has_timeo_nsec = true;
-                msg.timeo_sec      = timeo->tv_sec;
-                msg.timeo_nsec     = timeo->tv_nsec;
-        }
+        flow.n_pid = getpid();
+        flow.qs    = qs == NULL ? qos_raw : *qs;
 
-        recv_msg = send_recv_irm_msg(&msg);
-        if (recv_msg == NULL)
-                return -EIRMD;
+        if (flow_accept__irm_req_ser(&msg, &flow, timeo))
+                return -ENOMEM;
 
-        if (!recv_msg->has_result) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -EIRMD;
-        }
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
 
-        if (recv_msg->result != 0) {
-                int res = recv_msg->result;
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return res;
-        }
+        err = flow__irm_result_des(&msg, &flow, &sk);
+        if (err < 0)
+                return err;
 
-        if (!recv_msg->has_pid || !recv_msg->has_port_id ||
-            !recv_msg->has_qoscube) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -EIRMD;
-        }
+        fd = flow_init(&flow, &sk);
 
-        fd = flow_init(recv_msg->port_id, recv_msg->pid, recv_msg->qoscube);
+        freebuf(sk);
 
-        irm_msg__free_unpacked(recv_msg, NULL);
+        if (qs != NULL)
+                *qs = flow.qs;
 
-        if (fd < 0)
-                return fd;
+        return fd;
+}
 
-        pthread_rwlock_wrlock(&ai.lock);
+int flow_alloc(const char *            dst,
+               qosspec_t *             qs,
+               const struct timespec * timeo)
+{
+        struct flow_info flow;
+        uint8_t          buf[SOCK_BUF_SIZE];
+        buffer_t         msg = {buf, SOCK_BUF_SIZE};
+        buffer_t         sk; /* symmetric key */
+        int              fd;
+        int              err;
+
+#ifdef QOS_DISABLE_CRC
+        if (qs != NULL)
+                qs->ber = 1;
+#endif
 
-        /* FIXME: check if FRCT is needed based on qc? */
+        memset(&flow, 0, sizeof(flow));
 
-        assert(ai.flows[fd].frcti == NULL);
+        flow.n_pid = getpid();
+        flow.qs    = qs == NULL ? qos_raw : *qs;
 
-        ai.flows[fd].frcti = frcti_create(fd);
-        if (ai.flows[fd].frcti == NULL) {
-                flow_fini(fd);
-                pthread_rwlock_unlock(&ai.lock);
+        if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
                 return -ENOMEM;
-        }
 
-        if (qs != NULL)
-                *qs = ai.flows[fd].spec;
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
 
-        pthread_rwlock_unlock(&ai.lock);
+        err = flow__irm_result_des(&msg, &flow, &sk);
+        if (err < 0)
+                return err;
+
+        fd = flow_init(&flow, &sk);
+
+        freebuf(sk);
+
+        if (qs != NULL)
+                *qs = flow.qs;
 
         return fd;
 }
 
-int flow_alloc(const char *            dst,
-               qosspec_t *             qs,
-               const struct timespec * timeo)
+int flow_join(const char *            dst,
+              qosspec_t *             qs,
+              const struct timespec * timeo)
 {
-        irm_msg_t   msg      = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
-        qoscube_t   qc       = QOS_CUBE_BE;
-        int         fd;
+        struct flow_info flow;
+        uint8_t          buf[SOCK_BUF_SIZE];
+        buffer_t         msg = {buf, SOCK_BUF_SIZE};
+        int              fd;
+        int              err;
+
+#ifdef QOS_DISABLE_CRC
+        if (qs != NULL)
+                qs->ber = 1;
+#endif
+        if (qs != NULL && qs->cypher_s > 0)
+                return -ENOTSUP; /* TODO: Encrypted broadcast */
+
+        memset(&flow, 0, sizeof(flow));
+
+        flow.n_pid = getpid();
+        flow.qs    = qs == NULL ? qos_raw : *qs;
 
-        msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;
-        msg.dst         = (char *) dst;
-        msg.has_pid     = true;
-        msg.has_qoscube = true;
-        msg.pid         = ai.pid;
+        if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+                return -ENOMEM;
+
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
+
+        err = flow__irm_result_des(&msg, &flow, NULL);
+        if (err < 0)
+                return err;
+
+        fd = flow_init(&flow, NULL);
 
         if (qs != NULL)
-                qc = qos_spec_to_cube(*qs);
+                *qs = flow.qs;
 
-        msg.qoscube = qc;
+        return fd;
+}
 
-        if (timeo != NULL) {
-                msg.has_timeo_sec  = true;
-                msg.has_timeo_nsec = true;
-                msg.timeo_sec      = timeo->tv_sec;
-                msg.timeo_nsec     = timeo->tv_nsec;
-        }
+#define PKT_BUF_LEN 2048
+int flow_dealloc(int fd)
+{
+        struct flow_info info;
+        uint8_t          pkt[PKT_BUF_LEN];
+        uint8_t          buf[SOCK_BUF_SIZE];
+        buffer_t         msg   = {buf, SOCK_BUF_SIZE};
+        struct timespec  tic   = TIMESPEC_INIT_NS(TICTIME);
+        struct timespec  timeo = TIMESPEC_INIT_S(0);
+        struct flow *    flow;
+        int              err;
+
+        if (fd < 0 || fd >= SYS_MAX_FLOWS )
+                return -EINVAL;
 
-        recv_msg = send_recv_irm_msg(&msg);
-        if (recv_msg == NULL)
-                return -EIRMD;
+        memset(&info, 0, sizeof(flow));
 
-        if (!recv_msg->has_result) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -EIRMD;
-        }
+        flow = &ai.flows[fd];
 
-        if (recv_msg->result != 0) {
-                int res = recv_msg->result;
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return res;
-        }
+        pthread_rwlock_rdlock(&ai.lock);
 
-        if (!recv_msg->has_pid || !recv_msg->has_port_id) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -EIRMD;
+        if (flow->info.id < 0) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -ENOTALLOC;
         }
 
-        fd = flow_init(recv_msg->port_id, recv_msg->pid, qc);
+        flow->oflags = FLOWFDEFAULT | FLOWFRNOPART;
 
-        irm_msg__free_unpacked(recv_msg, NULL);
+        flow->rcv_timesout = true;
+        flow->rcv_timeo    = tic;
 
-        if (fd < 0)
-                return fd;
+        pthread_rwlock_unlock(&ai.lock);
 
-        pthread_rwlock_wrlock(&ai.lock);
+        flow_read(fd, buf, SOCK_BUF_SIZE);
+
+        pthread_rwlock_rdlock(&ai.lock);
 
-        /* FIXME: check if FRCT is needed based on qc? */
-        assert(ai.flows[fd].frcti == NULL);
+        timeo.tv_sec = frcti_dealloc(flow->frcti);
+        while (timeo.tv_sec < 0) { /* keep the flow active for rtx */
+                ssize_t ret;
 
-        ai.flows[fd].frcti = frcti_create(fd);
-        if (ai.flows[fd].frcti == NULL) {
-                flow_fini(fd);
                 pthread_rwlock_unlock(&ai.lock);
-                return -ENOMEM;
+
+                ret = flow_read(fd, pkt, PKT_BUF_LEN);
+
+                pthread_rwlock_rdlock(&ai.lock);
+
+                timeo.tv_sec = frcti_dealloc(flow->frcti);
+
+                if (ret == -EFLOWDOWN && timeo.tv_sec < 0)
+                        timeo.tv_sec = -timeo.tv_sec;
         }
 
-        pthread_rwlock_unlock(&ai.lock);
+        pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
 
-        return fd;
+        shm_rbuff_fini(flow->tx_rb);
+
+        pthread_cleanup_pop(true);
+
+        info.id    = flow->info.id;
+        info.n_pid = getpid();
+
+        if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0)
+                return -ENOMEM;
+
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
+
+        err = irm__irm_result_des(&msg);
+
+        flow_fini(fd);
+
+        return err;
 }
 
-int flow_dealloc(int fd)
+int ipcp_flow_dealloc(int fd)
 {
-        irm_msg_t   msg      = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
+        struct flow_info info;
+        uint8_t          buf[SOCK_BUF_SIZE];
+        buffer_t         msg = {buf, SOCK_BUF_SIZE};
+        struct flow *    flow;
+        int              err;
 
-        if (fd < 0)
+        if (fd < 0 || fd >= SYS_MAX_FLOWS )
                 return -EINVAL;
 
-        msg.code        = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
-        msg.has_port_id = true;
-        msg.has_pid     = true;
-        msg.pid         = ai.pid;
+        flow = &ai.flows[fd];
+
+        memset(&info, 0, sizeof(flow));
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        assert(ai.flows[fd].port_id >= 0);
+        if (flow->info.id < 0) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -ENOTALLOC;
+        }
 
-        msg.port_id = ai.flows[fd].port_id;
+        info.id      = flow->info.id;
+        info.n_1_pid = flow->info.n_1_pid;
 
         pthread_rwlock_unlock(&ai.lock);
 
-        recv_msg = send_recv_irm_msg(&msg);
-        if (recv_msg == NULL)
-                return -EIRMD;
-
-        if (!recv_msg->has_result) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -EIRMD;
-        }
+        if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0)
                return -ENOMEM;
 
-        irm_msg__free_unpacked(recv_msg, NULL);
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
 
-        pthread_rwlock_wrlock(&ai.lock);
+        err = irm__irm_result_des(&msg);
 
         flow_fini(fd);
 
-        pthread_rwlock_unlock(&ai.lock);
-
-        return 0;
+        return err;
 }
 
 int fccntl(int fd,
            int cmd,
           ...)
 {
-        uint16_t          sflags;
         uint32_t *        fflags;
         uint16_t *        cflags;
+        uint16_t          csflags;
         va_list           l;
         struct timespec * timeo;
         qosspec_t *       qs;
@@ -677,7 +1014,7 @@ int fccntl(int fd,
         size_t *          qlen;
         struct flow *     flow;
 
-        if (fd < 0 || fd >= PROG_MAX_FLOWS)
+        if (fd < 0 || fd >= SYS_MAX_FLOWS)
                 return -EBADF;
 
         flow = &ai.flows[fd];
@@ -686,7 +1023,7 @@ int fccntl(int fd,
 
         pthread_rwlock_wrlock(&ai.lock);
 
-        if (flow->port_id < 0) {
+        if (flow->info.id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
                 va_end(l);
                 return -ENOTALLOC;
@@ -725,13 +1062,13 @@ int fccntl(int fd,
                         goto einval;
                 if (!flow->rcv_timesout)
                         goto eperm;
-                *timeo = flow->snd_timeo;
+                *timeo = flow->rcv_timeo;
                 break;
         case FLOWGQOSSPEC:
                 qs = va_arg(l, qosspec_t *);
                 if (qs == NULL)
                         goto einval;
-                *qs = flow->spec;
+                *qs = flow->info.qs;
                 break;
         case FLOWGRXQLEN:
                 qlen = va_arg(l, size_t *);
@@ -739,7 +1076,7 @@ int fccntl(int fd,
                 break;
         case FLOWGTXQLEN:
                 qlen = va_arg(l, size_t *);
-                *qlen = shm_rbuff_queued(flow->rx_rb);
+                *qlen = shm_rbuff_queued(flow->tx_rb);
                 break;
         case FLOWSFLAGS:
                 flow->oflags = va_arg(l, uint32_t);
@@ -757,9 +1094,15 @@ int fccntl(int fd,
                 if (flow->oflags & FLOWFDOWN) {
                         rx_acl |= ACL_FLOWDOWN;
                         tx_acl |= ACL_FLOWDOWN;
+                        shm_flow_set_notify(flow->set,
+                                            flow->info.id,
+                                            FLOW_DOWN);
                 } else {
                         rx_acl &= ~ACL_FLOWDOWN;
                         tx_acl &= ~ACL_FLOWDOWN;
+                        shm_flow_set_notify(flow->set,
+                                            flow->info.id,
+                                            FLOW_UP);
                 }
 
                 shm_rbuff_set_acl(flow->rx_rb, rx_acl);
@@ -773,17 +1116,18 @@ int fccntl(int fd,
                 *fflags = flow->oflags;
                 break;
         case FRCTSFLAGS:
-                sflags = (uint16_t) va_arg(l, int);
-                if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags))
+                csflags = (uint16_t) va_arg(l, uint32_t);
+                if (flow->frcti == NULL)
                         goto eperm;
+                frcti_setflags(flow->frcti, csflags);
                 break;
         case FRCTGFLAGS:
-                cflags = (uint16_t *) va_arg(l, int *);
+                cflags = (uint16_t *) va_arg(l, uint32_t *);
                 if (cflags == NULL)
                         goto einval;
                 if (flow->frcti == NULL)
                         goto eperm;
-                *cflags = frcti_getconf(flow->frcti);
+                *cflags = frcti_getflags(flow->frcti);
                 break;
         default:
                 pthread_rwlock_unlock(&ai.lock);
@@ -808,35 +1152,114 @@ int fccntl(int fd,
         return -EPERM;
 }
 
+static int chk_crc(struct shm_du_buff * sdb)
+{
+        uint32_t  crc;
+        uint8_t * head = shm_du_buff_head(sdb);
+        uint8_t * tail = shm_du_buff_tail_release(sdb, CRCLEN);
+
+        mem_hash(HASH_CRC32, &crc, head, tail - head);
+
+        return !(crc == *((uint32_t *) tail));
+}
+
+static int add_crc(struct shm_du_buff * sdb)
+{
+        uint8_t * head = shm_du_buff_head(sdb);
+        uint8_t * tail = shm_du_buff_tail_alloc(sdb, CRCLEN);
+        if (tail == NULL)
+                return -1;
+
+        mem_hash(HASH_CRC32, tail, head, tail - head);
+
+        return 0;
+}
+
+static int flow_tx_sdb(struct flow *        flow,
+                       struct shm_du_buff * sdb,
+                       bool                 block,
+                       struct timespec *    abstime)
+{
+        struct timespec now;
+        ssize_t         idx;
+        int             ret;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        pthread_rwlock_wrlock(&ai.lock);
+
+        flow->snd_act = now;
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        idx = shm_du_buff_get_idx(sdb);
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        if (shm_du_buff_len(sdb) > 0) {
+                if (frcti_snd(flow->frcti, sdb) < 0)
+                        goto enomem;
+
+                if (crypt_encrypt(&flow->crypt, sdb) < 0)
+                        goto enomem;
+
+                if (flow->info.qs.ber == 0 && add_crc(sdb) != 0)
+                        goto enomem;
+        }
+
+        pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
+
+        if (!block)
+                ret = shm_rbuff_write(flow->tx_rb, idx);
+        else
+                ret = shm_rbuff_write_b(flow->tx_rb, idx, abstime);
+
+        if (ret < 0)
+                shm_rdrbuff_remove(ai.rdrb, idx);
+        else
+                shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+
+        pthread_cleanup_pop(true);
+
+        return 0;
+
+enomem:
+        pthread_rwlock_unlock(&ai.lock);
+        shm_rdrbuff_remove(ai.rdrb, idx);
+        return -ENOMEM;
+}
+
 ssize_t flow_write(int          fd,
                    const void * buf,
                    size_t       count)
 {
-        struct flow *     flow;
-        ssize_t           idx;
-        int               ret;
-        int               flags;
-        struct timespec   abs;
-        struct timespec * abstime = NULL;
+        struct flow *        flow;
+        ssize_t              idx;
+        int                  ret;
+        int                  flags;
+        struct timespec      abs;
+        struct timespec *    abstime = NULL;
+        struct shm_du_buff * sdb;
+        uint8_t *            ptr;
 
-        if (buf == NULL)
-                return 0;
+        if (buf == NULL && count != 0)
+                return -EINVAL;
 
-        if (fd < 0 || fd > PROG_MAX_FLOWS)
+        if (fd < 0 || fd >= PROG_MAX_FLOWS)
                 return -EBADF;
 
         flow = &ai.flows[fd];
 
         clock_gettime(PTHREAD_COND_CLOCK, &abs);
 
-        pthread_rwlock_rdlock(&ai.lock);
+        pthread_rwlock_wrlock(&ai.lock);
 
-        if (flow->port_id < 0) {
+        if (flow->info.id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
                 return -ENOTALLOC;
         }
 
-        if (ai.flows[fd].snd_timesout) {
+        if (flow->snd_timesout) {
                 ts_add(&abs, &flow->snd_timeo, &abs);
                 abstime = &abs;
         }
@@ -848,40 +1271,71 @@ ssize_t flow_write(int          fd,
         if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
                 return -EPERM;
 
-        if (flags & FLOWFWNOBLOCK)
-                idx = shm_rdrbuff_write(ai.rdrb,
-                                        DU_BUFF_HEADSPACE,
-                                        DU_BUFF_TAILSPACE,
-                                        buf,
-                                        count);
-        else /* Blocking. */
-                idx = shm_rdrbuff_write_b(ai.rdrb,
-                                          DU_BUFF_HEADSPACE,
-                                          DU_BUFF_TAILSPACE,
-                                          buf,
-                                          count,
-                                          abstime);
+        if (flags & FLOWFWNOBLOCK) {
+                if (!frcti_is_window_open(flow->frcti))
+                        return -EAGAIN;
+                idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
+        } else {
+                ret = frcti_window_wait(flow->frcti, abstime);
+                if (ret < 0)
+                        return ret;
+                idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
+        }
+
         if (idx < 0)
                 return idx;
 
-        if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) {
-                shm_rdrbuff_remove(ai.rdrb, idx);
-                return -ENOMEM;
-        }
+        if (count > 0)
+                memcpy(ptr, buf, count);
 
-        pthread_rwlock_rdlock(&ai.lock);
+        ret = flow_tx_sdb(flow, sdb, !(flags & FLOWFWNOBLOCK), abstime);
 
-        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
-        if (ret < 0)
-                shm_rdrbuff_remove(ai.rdrb, idx);
-        else
-                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+        return ret < 0 ? (ssize_t) ret : (ssize_t) count;
+}
+
+static bool invalid_pkt(struct flow *        flow,
+                        struct shm_du_buff * sdb)
+{
+        if (shm_du_buff_len(sdb) == 0)
+                return true;
+
+        if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0)
+                return true;
+
+        if (crypt_decrypt(&flow->crypt, sdb) < 0)
+                return true;
+
+        return false;
+}
+
+static ssize_t flow_rx_sdb(struct flow *         flow,
+                           struct shm_du_buff ** sdb,
+                           bool                  block,
+                           struct timespec *     abstime)
+{
+        ssize_t         idx;
+        struct timespec now;
+
+        idx = block ? shm_rbuff_read_b(flow->rx_rb, abstime) :
+                shm_rbuff_read(flow->rx_rb);
+        if (idx < 0)
+                return idx;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        pthread_rwlock_wrlock(&ai.lock);
+
+        flow->rcv_act = now;
 
         pthread_rwlock_unlock(&ai.lock);
 
-        assert(ret <= 0);
+        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+        if (invalid_pkt(flow, *sdb)) {
+                shm_rdrbuff_remove(ai.rdrb, idx);
+                return -EAGAIN;
+        }
 
-        return ret;
+        return idx;
 }
 
 ssize_t flow_read(int    fd,
                   void * buf,
                   size_t count)
@@ -890,78 +1344,101 @@ ssize_t flow_read(int    fd,
 {
         ssize_t              idx;
         ssize_t              n;
-        uint8_t *            sdu;
-        struct shm_rbuff *   rb;
+        uint8_t *            packet;
         struct shm_du_buff * sdb;
         struct timespec      abs;
+        struct timespec      now;
         struct timespec *    abstime = NULL;
         struct flow *        flow;
-        bool                 noblock;
+        bool                 block;
         bool                 partrd;
 
-        if (fd < 0 || fd > PROG_MAX_FLOWS)
+        if (fd < 0 || fd >= PROG_MAX_FLOWS)
                 return -EBADF;
 
         flow = &ai.flows[fd];
 
-        if (flow->part_idx == DONE_PART) {
-                flow->part_idx = NO_PART;
-                return 0;
-        }
-
-        clock_gettime(PTHREAD_COND_CLOCK, &abs);
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        if (flow->port_id < 0) {
+        if (flow->info.id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
                 return -ENOTALLOC;
         }
 
-        rb      = flow->rx_rb;
-        noblock = flow->oflags & FLOWFRNOBLOCK;
+        if (flow->part_idx == DONE_PART) {
+                pthread_rwlock_unlock(&ai.lock);
+                flow->part_idx = NO_PART;
+                return 0;
+        }
+
+        block  = !(flow->oflags & FLOWFRNOBLOCK);
         partrd = !(flow->oflags & FLOWFRNOPART);
 
-        if (ai.flows[fd].rcv_timesout) {
-                ts_add(&abs, &flow->rcv_timeo, &abs);
+        if (flow->rcv_timesout) {
+                ts_add(&now, &flow->rcv_timeo, &abs);
                 abstime = &abs;
         }
 
-        pthread_rwlock_unlock(&ai.lock);
-
         idx = flow->part_idx;
         if (idx < 0) {
-                idx = frcti_queued_pdu(flow->frcti);
-                if (idx < 0) {
-                        do {
-                                idx = noblock ? shm_rbuff_read(rb) :
-                                        shm_rbuff_read_b(rb, abstime);
-                                if (idx < 0)
+                while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+                        pthread_rwlock_unlock(&ai.lock);
+
+                        idx = flow_rx_sdb(flow, &sdb, block, abstime);
+                        if (idx < 0) {
+                                if (block && idx != -EAGAIN)
                                         return idx;
-                                sdb = shm_rdrbuff_get(ai.rdrb, idx);
-                        } while (frcti_rcv(flow->frcti, sdb) != 0);
+                                if (!block)
+                                        return idx;
+
+                                pthread_rwlock_rdlock(&ai.lock);
+                                continue;
+                        }
+
+                        pthread_rwlock_rdlock(&ai.lock);
+
+                        frcti_rcv(flow->frcti, sdb);
                 }
         }
 
-        n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
+        sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        packet = shm_du_buff_head(sdb);
+
+        n = shm_du_buff_len(sdb);
 
         assert(n >= 0);
 
         if (n <= (ssize_t) count) {
-                memcpy(buf, sdu, n);
-                shm_rdrbuff_remove(ai.rdrb, idx);
+                memcpy(buf, packet, n);
+                ipcp_sdb_release(sdb);
+
+                pthread_rwlock_wrlock(&ai.lock);
+
                 flow->part_idx = (partrd && n == (ssize_t) count) ?
                         DONE_PART : NO_PART;
+
+                flow->rcv_act = now;
+
+                pthread_rwlock_unlock(&ai.lock);
+
                 return n;
         } else {
                 if (partrd) {
-                        memcpy(buf, sdu, count);
-                        sdb = shm_rdrbuff_get(ai.rdrb, idx);
+                        memcpy(buf, packet, count);
                         shm_du_buff_head_release(sdb, n);
+                        pthread_rwlock_wrlock(&ai.lock);
                         flow->part_idx = idx;
+
+                        flow->rcv_act = now;
+
+                        pthread_rwlock_unlock(&ai.lock);
                         return count;
                 } else {
-                        shm_rdrbuff_remove(ai.rdrb, idx);
+                        ipcp_sdb_release(sdb);
                         return -EMSGSIZE;
                 }
         }
@@ -969,26 +1446,31 @@ ssize_t flow_read(int    fd,
 
 /* fqueue functions. */
 
-struct flow_set * fset_create()
+struct flow_set * fset_create(void)
 {
-        struct flow_set * set = malloc(sizeof(*set));
+        struct flow_set * set;
+
+        set = malloc(sizeof(*set));
         if (set == NULL)
-                return NULL;
+                goto fail_malloc;
 
         assert(ai.fqueues);
 
         pthread_rwlock_wrlock(&ai.lock);
 
         set->idx = bmp_allocate(ai.fqueues);
-        if (!bmp_is_id_valid(ai.fqueues, set->idx)) {
-                pthread_rwlock_unlock(&ai.lock);
-                free(set);
-                return NULL;
-        }
+        if (!bmp_is_id_valid(ai.fqueues, set->idx))
                goto fail_bmp_alloc;
 
         pthread_rwlock_unlock(&ai.lock);
 
         return set;
+
+ fail_bmp_alloc:
+        pthread_rwlock_unlock(&ai.lock);
+        free(set);
+ fail_malloc:
+        return NULL;
 }
 
 void fset_destroy(struct flow_set * set)
@@ -1007,13 +1489,13 @@ void fset_destroy(struct flow_set * set)
         free(set);
 }
 
-struct fqueue * fqueue_create()
+struct fqueue * fqueue_create(void)
 {
         struct fqueue * fq = malloc(sizeof(*fq));
         if (fq == NULL)
                 return NULL;
 
-        memset(fq->fqueue, -1, (SHM_BUFFER_SIZE) * sizeof(*fq->fqueue));
+        memset(fq->fqueue, -1, SHM_BUFFER_SIZE * sizeof(*fq->fqueue));
         fq->fqsize = 0;
         fq->next   = 0;
 
@@ -1022,9 +1504,6 @@ struct fqueue * fqueue_create()
 
 void fqueue_destroy(struct fqueue * fq)
 {
-        if (fq == NULL)
-                return;
-
         free(fq);
 }
 
@@ -1039,36 +1518,57 @@ void fset_zero(struct flow_set * set)
 int fset_add(struct flow_set * set,
              int               fd)
 {
-        int    ret;
-        size_t sdus;
-        size_t i;
+        struct flow * flow;
+        int           ret;
 
-        if (set == NULL || fd < 0 || fd > PROG_MAX_FLOWS)
+        if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
                 return -EINVAL;
 
-        pthread_rwlock_wrlock(&ai.lock);
+        flow = &ai.flows[fd];
 
-        ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id);
+        pthread_rwlock_rdlock(&ai.lock);
 
-        sdus = shm_rbuff_queued(ai.flows[fd].rx_rb);
-        for (i = 0; i < sdus; i++)
-                shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
+        if (flow->info.id < 0) {
+                ret = -EINVAL;
+                goto fail;
+        }
+
+        if (flow->frcti != NULL)
+                shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
+
+        ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id);
+        if (ret < 0)
+                goto fail;
+
+        if (shm_rbuff_queued(ai.flows[fd].rx_rb))
+                shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT);
 
         pthread_rwlock_unlock(&ai.lock);
 
         return ret;
+
+ fail:
+        pthread_rwlock_unlock(&ai.lock);
+        return ret;
 }
 
 void fset_del(struct flow_set * set,
               int               fd)
 {
-        if (set == NULL || fd < 0 || fd > PROG_MAX_FLOWS)
+        struct flow * flow;
+
+        if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
                 return;
 
-        pthread_rwlock_wrlock(&ai.lock);
+        flow = &ai.flows[fd];
 
-        if (ai.flows[fd].port_id >= 0)
-                shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id);
+        pthread_rwlock_rdlock(&ai.lock);
+
+        if (flow->info.id >= 0)
+                shm_flow_set_del(ai.fqset, set->idx, flow->info.id);
+
+        if (flow->frcti != NULL)
+                shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id);
 
         pthread_rwlock_unlock(&ai.lock);
 }
@@ -1076,262 +1576,324 @@ void fset_del(struct flow_set * set,
 bool fset_has(const struct flow_set * set,
               int                     fd)
 {
-        bool ret = false;
+        bool ret;
 
-        if (set == NULL || fd < 0)
+        if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
                 return false;
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        if (ai.flows[fd].port_id < 0) {
+        if (ai.flows[fd].info.id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
                 return false;
         }
 
-        ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1);
+        ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1);
 
         pthread_rwlock_unlock(&ai.lock);
 
         return ret;
 }
 
+/* Filter fqueue events for non-data packets */
+static int fqueue_filter(struct fqueue * fq)
+{
+        struct shm_du_buff * sdb;
+        int                  fd;
+        ssize_t              idx;
+        struct frcti *       frcti;
+
+        while (fq->next < fq->fqsize) {
+                if (fq->fqueue[fq->next].event != FLOW_PKT)
+                        return 1;
+
+                pthread_rwlock_rdlock(&ai.lock);
+
+                fd = ai.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
+                if (fd < 0) {
+                        ++fq->next;
+                        pthread_rwlock_unlock(&ai.lock);
+                        continue;
+                }
+
+                frcti = ai.flows[fd].frcti;
+                if (frcti == NULL) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        return 1;
+                }
+
+                if (__frcti_pdu_ready(frcti) >= 0) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        return 1;
+                }
+
+                pthread_rwlock_unlock(&ai.lock);
+
+                idx = flow_rx_sdb(&ai.flows[fd], &sdb, false, NULL);
+                if (idx < 0)
+                        return 0;
+
+                pthread_rwlock_rdlock(&ai.lock);
+
+                sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+                __frcti_rcv(frcti, sdb);
+
+                if (__frcti_pdu_ready(frcti) >= 0) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        return 1;
+                }
+
+                pthread_rwlock_unlock(&ai.lock);
+
+                ++fq->next;
+        }
+
+        return 0;
+}
+
 int fqueue_next(struct fqueue * fq)
 {
-        int fd;
+        int                fd;
+        struct flowevent * e;
 
         if (fq == NULL)
                 return -EINVAL;
 
-        if (fq->fqsize == 0)
+        if (fq->fqsize == 0 || fq->next == fq->fqsize)
+                return -EPERM;
+
+        if (fq->next != 0 && fqueue_filter(fq) == 0)
                 return -EPERM;
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        fd = ai.ports[fq->fqueue[fq->next++]].fd;
+        e = fq->fqueue + fq->next;
 
-        pthread_rwlock_unlock(&ai.lock);
+        fd = ai.id_to_fd[e->flow_id].fd;
 
-        if (fq->next == fq->fqsize) {
-                fq->fqsize = 0;
-                fq->next   = 0;
-        }
+        ++fq->next;
+
+        pthread_rwlock_unlock(&ai.lock);
 
         return fd;
 }
 
-int fevent(struct flow_set *       set,
-           struct fqueue *         fq,
-           const struct timespec * timeo)
+enum fqtype fqueue_type(struct fqueue * fq)
+{
+        if (fq == NULL)
+                return -EINVAL;
+
+        if (fq->fqsize == 0 || fq->next == 0)
+                return -EPERM;
+
+        return fq->fqueue[(fq->next - 1)].event;
+}
+
+ssize_t fevent(struct flow_set *       set,
+               struct fqueue *         fq,
+               const struct timespec * timeo)
 {
-        ssize_t           ret;
-        struct timespec   abstime;
+        ssize_t           ret = 0;
+        struct timespec   abs;
         struct timespec * t = NULL;
 
         if (set == NULL || fq == NULL)
                 return -EINVAL;
 
-        if (fq->fqsize > 0)
-                return fq->fqsize;
+        if (fq->fqsize > 0 && fq->next != fq->fqsize)
+                return 1;
 
-        assert(!fq->next);
+        clock_gettime(PTHREAD_COND_CLOCK, &abs);
 
         if (timeo != NULL) {
-                clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-                ts_add(&abstime, timeo, &abstime);
-                t = &abstime;
+                ts_add(&abs, timeo, &abs);
+                t = &abs;
         }
 
-        ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
-        if (ret == -ETIMEDOUT) {
-                fq->fqsize = 0;
-                return -ETIMEDOUT;
-        }
+        while (ret == 0) {
+                ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
+                if (ret == -ETIMEDOUT)
+                        return -ETIMEDOUT;
 
-        fq->fqsize = ret;
+                fq->fqsize = ret;
+                fq->next   = 0;
 
-        assert(ret);
+                ret = fqueue_filter(fq);
+        }
 
-        return ret;
+        assert(ret != 0);
+
+        return 1;
 }
 
 /* ipcp-dev functions. */
 
-int np1_flow_alloc(pid_t     n_pid,
                   int       port_id,
                   qoscube_t qc)
+int np1_flow_alloc(pid_t n_pid,
+                   int   flow_id)
 {
-        return flow_init(port_id, n_pid, qc);
+        struct flow_info flow;
+
+        memset(&flow, 0, sizeof(flow));
+
+        flow.id      = flow_id;
+        flow.n_pid   = getpid();
+        flow.qs      = qos_np1;
+        flow.mpl     = 0;
+        flow.n_1_pid = n_pid; /* This "flow" is upside-down! */
+
+        return flow_init(&flow, NULL);
 }
 
-int np1_flow_dealloc(int port_id)
+int np1_flow_dealloc(int    flow_id,
+                     time_t timeo)
 {
         int fd;
 
+        /*
+         * TODO: Don't pass timeo to the IPCP but wait in IRMd.
+         * This will need async ops, waiting until we bootstrap
+         * the IRMd over ouroboros.
+         */
+
+        sleep(timeo);
+
         pthread_rwlock_rdlock(&ai.lock);
 
-        fd = ai.ports[port_id].fd;
+        fd = ai.id_to_fd[flow_id].fd;
 
         pthread_rwlock_unlock(&ai.lock);
 
         return fd;
 }
 
-int np1_flow_resp(int port_id)
+int np1_flow_resp(int flow_id)
 {
         int fd;
 
-        if (port_wait_assign(port_id) != PORT_ID_ASSIGNED)
+        if (flow_wait_assign(flow_id) != FLOW_ALLOCATED)
                 return -1;
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        fd = ai.ports[port_id].fd;
+        fd = ai.id_to_fd[flow_id].fd;
 
         pthread_rwlock_unlock(&ai.lock);
 
         return fd;
 }
 
-int ipcp_create_r(pid_t pid,
-                  int   result)
+int ipcp_create_r(const struct ipcp_info * info)
 {
-        irm_msg_t   msg      = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
-        int         ret      = -1;
-
-        msg.code       = IRM_MSG_CODE__IPCP_CREATE_R;
-        msg.has_pid    = true;
-        msg.pid        = pid;
-        msg.has_result = true;
-        msg.result     = result;
-
-        recv_msg = send_recv_irm_msg(&msg);
-        if (recv_msg == NULL)
-                return -EIRMD;
-
-        if (!recv_msg->has_result) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
+        uint8_t  buf[SOCK_BUF_SIZE];
+        buffer_t msg = {buf, SOCK_BUF_SIZE};
+        int      err;
 
-        ret = recv_msg->result;
-        irm_msg__free_unpacked(recv_msg, NULL);
+        if (ipcp_create_r__irm_req_ser(&msg,info) < 0)
+                return -ENOMEM;
 
-        return ret;
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
+
+        return irm__irm_result_des(&msg);
 }
 
-int ipcp_flow_req_arr(pid_t           pid,
-                      const uint8_t * dst,
-                      size_t          len,
-                      qoscube_t       qc)
+int ipcp_flow_req_arr(const buffer_t * dst,
+                      qosspec_t        qs,
+                      time_t           mpl,
+                      const buffer_t * data)
 {
-        irm_msg_t   msg      = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
-        int         fd       = -1;
+        struct flow_info flow;
+        uint8_t          buf[SOCK_BUF_SIZE];
+        buffer_t         msg = {buf, SOCK_BUF_SIZE};
+        int              err;
 
-        if (dst == NULL)
-                return -EINVAL;
-
-        msg.code        = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
-        msg.has_pid     = true;
-        msg.pid         = pid;
-        msg.has_hash    = true;
-        msg.hash.len    = len;
-        msg.hash.data   = (uint8_t *) dst;
-        msg.has_qoscube = true;
-        msg.qoscube     = qc;
+        memset(&flow, 0, sizeof(flow));
 
-        recv_msg = send_recv_irm_msg(&msg);
+        assert(dst != NULL && dst->len != 0 && dst->data != NULL);
 
-        if (recv_msg == NULL)
-                return -EIRMD;
+        flow.n_1_pid = getpid();
+        flow.qs      = qs;
+        flow.mpl     = mpl;
 
-        if (!recv_msg->has_port_id || !recv_msg->has_pid) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
+        if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0)
+                return -ENOMEM;
 
-        if (recv_msg->has_result && recv_msg->result) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
 
-        fd = flow_init(recv_msg->port_id, recv_msg->pid, qc);
+        err = flow__irm_result_des(&msg, &flow, NULL);
+        if (err < 0)
+                return err;
 
-        irm_msg__free_unpacked(recv_msg, NULL);
+        /* inverted for np1_flow */
+        flow.n_1_pid = flow.n_pid;
+        flow.n_pid   = getpid();
+        flow.mpl     = 0;
 
-        return fd;
+        return flow_init(&flow, NULL);
 }
 
-int ipcp_flow_alloc_reply(int fd,
                          int response)
+int ipcp_flow_alloc_reply(int              fd,
+                          int              response,
+                          time_t           mpl,
+                          const buffer_t * data)
 {
-        irm_msg_t   msg      = IRM_MSG__INIT;
-        irm_msg_t * recv_msg = NULL;
-        int         ret      = -1;
+        struct flow_info flow;
+        uint8_t          buf[SOCK_BUF_SIZE];
+        buffer_t         msg = {buf, SOCK_BUF_SIZE};
+        int              err;
 
-        msg.code        = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
-        msg.has_port_id = true;
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        msg.port_id = ai.flows[fd].port_id;
+        flow.id = ai.flows[fd].info.id;
 
         pthread_rwlock_unlock(&ai.lock);
 
-        msg.has_response = true;
-        msg.response     = response;
-
-        recv_msg = send_recv_irm_msg(&msg);
-        if (recv_msg == NULL)
-                return -EIRMD;
-
-        if (!recv_msg->has_result) {
-                irm_msg__free_unpacked(recv_msg, NULL);
-                return -1;
-        }
+        flow.mpl = mpl;
 
-        ret = recv_msg->result;
+        if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0)
+                return -ENOMEM;
 
-        irm_msg__free_unpacked(recv_msg, NULL);
+        err = send_recv_msg(&msg);
+        if (err < 0)
+                return err;
 
-        return ret;
+        return irm__irm_result_des(&msg);
 }
 
 int ipcp_flow_read(int                   fd,
                    struct shm_du_buff ** sdb)
 {
-        struct flow *      flow;
-        struct shm_rbuff * rb;
-        ssize_t            idx;
+        struct flow * flow;
+        ssize_t       idx = -1;
 
-        assert(fd >= 0);
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
         assert(sdb);
 
         flow = &ai.flows[fd];
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        assert(flow->port_id >= 0);
+        assert(flow->info.id >= 0);
 
-        rb = flow->rx_rb;
+        while (frcti_queued_pdu(flow->frcti) < 0) {
+                pthread_rwlock_unlock(&ai.lock);
 
-        pthread_rwlock_unlock(&ai.lock);
+                idx = flow_rx_sdb(flow, sdb, false, NULL);
+                if (idx < 0)
+                        return idx;
 
-        if (flow->frcti != NULL) {
-                idx = frcti_queued_pdu(flow->frcti);
-                if (idx >= 0) {
-                        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-                        return 0;
-                }
+                pthread_rwlock_rdlock(&ai.lock);
+
+                frcti_rcv(flow->frcti, *sdb);
         }
 
-        do {
-                idx = shm_rbuff_read(rb);
-                if (idx < 0)
-                        return idx;
-                *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-        } while (frcti_rcv(flow->frcti, *sdb) != 0);
+        pthread_rwlock_unlock(&ai.lock);
 
         return 0;
 }
@@ -1339,61 +1901,102 @@ int ipcp_flow_read(int                   fd,
 int ipcp_flow_write(int                  fd,
                     struct shm_du_buff * sdb)
 {
-        struct flow * flow;
-        int           ret;
-        ssize_t       idx;
+        struct flow * flow;
+        int           ret;
 
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
         assert(sdb);
 
         flow = &ai.flows[fd];
 
-        pthread_rwlock_rdlock(&ai.lock);
+        pthread_rwlock_wrlock(&ai.lock);
 
-        assert(flow->port_id >= 0);
+        if (flow->info.id < 0) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -ENOTALLOC;
+        }
 
         if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
                 pthread_rwlock_unlock(&ai.lock);
                 return -EPERM;
         }
 
-        assert(flow->tx_rb);
+        pthread_rwlock_unlock(&ai.lock);
 
-        idx = shm_du_buff_get_idx(sdb);
+        ret = flow_tx_sdb(flow, sdb, true, NULL);
+
+        return ret;
+}
 
-        if (frcti_snd(flow->frcti, sdb) < 0) {
+int np1_flow_read(int                   fd,
+                  struct shm_du_buff ** sdb)
+{
+        struct flow * flow;
+        ssize_t       idx = -1;
+
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+        assert(sdb);
+
+        flow = &ai.flows[fd];
+
+        assert(flow->info.id >= 0);
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        idx = shm_rbuff_read(flow->rx_rb);;
        if (idx < 0) {
                 pthread_rwlock_unlock(&ai.lock);
-                return -ENOMEM;
+                return idx;
         }
 
-        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
-        if (ret == 0)
-                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
-
         pthread_rwlock_unlock(&ai.lock);
 
-        assert(ret <= 0);
+        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
 
-        return ret;
+        return 0;
 }
 
-int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
                     size_t                len)
+int np1_flow_write(int                  fd,
                   struct shm_du_buff * sdb)
 {
-        ssize_t idx;
+        struct flow * flow;
+        int           ret;
+        ssize_t       idx;
 
-        idx = shm_rdrbuff_write_b(ai.rdrb,
-                                  DU_BUFF_HEADSPACE,
-                                  DU_BUFF_TAILSPACE,
-                                  NULL,
-                                  len,
-                                  NULL);
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+        assert(sdb);
 
-        if (idx < 0)
-                return -1;
+        flow = &ai.flows[fd];
 
-        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+        pthread_rwlock_rdlock(&ai.lock);
 
-        return 0;
+        if (flow->info.id < 0) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -ENOTALLOC;
+        }
+
+        if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -EPERM;
+        }
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        idx = shm_du_buff_get_idx(sdb);
+
+        ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
+        if (ret < 0)
+                shm_rdrbuff_remove(ai.rdrb, idx);
+        else
+                shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+
+        return ret;
+}
+
+int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
+                     size_t                len)
+{
+        return shm_rdrbuff_alloc_b(ai.rdrb, len, NULL, sdb, NULL) < 0 ? -1 : 0;
 }
 
 void ipcp_sdb_release(struct shm_du_buff * sdb)
@@ -1401,41 +2004,68 @@ void ipcp_sdb_release(struct shm_du_buff * sdb)
         shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
 }
 
-void ipcp_flow_fini(int fd)
+int ipcp_flow_fini(int fd)
 {
         struct shm_rbuff * rx_rb;
 
-        assert(fd >= 0);
-
-        fccntl(fd, FLOWSFLAGS, FLOWFWRONLY);
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
 
         pthread_rwlock_rdlock(&ai.lock);
 
+        if (ai.flows[fd].info.id < 0) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -1;
+        }
+
+        shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
+        shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
+
+        shm_flow_set_notify(ai.flows[fd].set,
+                            ai.flows[fd].info.id,
+                            FLOW_DEALLOC);
+
         rx_rb = ai.flows[fd].rx_rb;
 
         pthread_rwlock_unlock(&ai.lock);
 
         if (rx_rb != NULL)
                 shm_rbuff_fini(rx_rb);
+
+        return 0;
 }
 
 int ipcp_flow_get_qoscube(int         fd,
                           qoscube_t * cube)
 {
-        assert(fd >= 0);
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
         assert(cube);
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        assert(ai.flows[fd].port_id >= 0);
+        assert(ai.flows[fd].info.id >= 0);
 
-        *cube = ai.flows[fd].cube;
+        *cube = qos_spec_to_cube(ai.flows[fd].info.qs);
 
         pthread_rwlock_unlock(&ai.lock);
 
         return 0;
 }
 
+size_t ipcp_flow_queued(int fd)
+{
+        size_t q;
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        assert(ai.flows[fd].info.id >= 0);
+
+        q = shm_rbuff_queued(ai.flows[fd].tx_rb);
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        return q;
+}
+
 ssize_t local_flow_read(int fd)
 {
         ssize_t ret;
@@ -1454,20 +2084,25 @@ ssize_t local_flow_read(int fd)
 int local_flow_write(int    fd,
                      size_t idx)
 {
-        int ret;
+        struct flow * flow;
+        int           ret;
 
         assert(fd >= 0);
 
+        flow = &ai.flows[fd];
+
         pthread_rwlock_rdlock(&ai.lock);
 
-        if (ai.flows[fd].port_id < 0) {
+        if (flow->info.id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
                 return -ENOTALLOC;
         }
 
-        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+        ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
         if (ret == 0)
-                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+                shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+        else
+                shm_rdrbuff_remove(ai.rdrb, idx);
 
         pthread_rwlock_unlock(&ai.lock);
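
For orientation, the sketch below shows the application-side call sequence against the reworked API in this diff: flow_alloc() now takes a qosspec_t instead of a QoS cube, flow_dealloc() drains the FRCT retransmission window before telling the IRMd, and timeouts are set through fccntl(). This is an editorial illustration, not part of the commit; the peer name is made up, and FLOWSRCVTIMEO is assumed to be the receive-timeout command from <ouroboros/fccntl.h>.

/* Illustrative usage sketch -- not part of the commit above. */
#include <ouroboros/dev.h>
#include <ouroboros/fccntl.h>

#include <stdio.h>
#include <time.h>

int main(void)
{
        struct timespec timeo = {4, 0};  /* 4 s for allocation and reads */
        char            buf[64];
        ssize_t         n;
        int             fd;

        /* A NULL qosspec selects the default (raw) QoS, as in the diff. */
        fd = flow_alloc("example.peer", NULL, &timeo);
        if (fd < 0)
                return -1;

        /* Per-flow receive timeout, set via fccntl() (assumed command). */
        fccntl(fd, FLOWSRCVTIMEO, &timeo);

        if (flow_write(fd, "hello", 5) < 0) {
                flow_dealloc(fd);
                return -1;
        }

        n = flow_read(fd, buf, sizeof(buf));
        if (n >= 0)
                printf("read %zd bytes\n", n);

        /* Blocks until the FRCT retransmission window is drained. */
        return flow_dealloc(fd);
}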
