diff options
Diffstat (limited to 'src/lib/dev.c')
| -rw-r--r-- | src/lib/dev.c | 1795 |
1 files changed, 963 insertions, 832 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 8a723b63..9cfc24ee 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2026 * * API for applications * @@ -27,29 +27,38 @@ #endif #include "config.h" +#include "ssm.h" -#include <ouroboros/hash.h> -#include <ouroboros/cacep.h> -#include <ouroboros/errno.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/cep.h> +#include <ouroboros/crypt.h> #include <ouroboros/dev.h> +#include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> +#include <ouroboros/flow.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/list.h> #include <ouroboros/local-dev.h> -#include <ouroboros/sockets.h> -#include <ouroboros/fccntl.h> -#include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> #include <ouroboros/pthread.h> #include <ouroboros/random.h> -#include <ouroboros/shm_flow_set.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_rbuff.h> +#include <ouroboros/serdes-irm.h> +#include <ouroboros/ssm_flow_set.h> +#include <ouroboros/ssm_pool.h> +#include <ouroboros/ssm_rbuff.h> +#include <ouroboros/sockets.h> #include <ouroboros/utils.h> -#include <ouroboros/fqueue.h> #ifdef PROC_FLOW_STATS #include <ouroboros/rib.h> #endif +#ifdef HAVE_LIBGCRYPT +#include <gcrypt.h> +#endif +#include <assert.h> #include <stdlib.h> #include <string.h> #include <stdio.h> @@ -67,23 +76,13 @@ #define CRCLEN (sizeof(uint32_t)) #define SECMEMSZ 16384 -#define SYMMKEYSZ 32 #define MSGBUFSZ 2048 -enum port_state { - PORT_NULL = 0, - PORT_INIT, - PORT_ID_PENDING, - PORT_ID_ASSIGNED, - PORT_DESTROY -}; - -struct port { +/* map flow_ids to flow descriptors; track state of the flow */ +struct fmap { int fd; - - enum port_state state; - pthread_mutex_t state_lock; - pthread_cond_t state_cond; + /* TODO: use actual flow state */ + enum flow_state state; }; #define frcti_to_flow(frcti) \ @@ -92,18 +91,18 @@ struct port { struct flow { struct list_head next; - struct shm_rbuff * rx_rb; - struct shm_rbuff * tx_rb; - struct shm_flow_set * set; - int flow_id; + struct flow_info info; + + struct ssm_rbuff * rx_rb; + struct ssm_rbuff * tx_rb; + struct ssm_flow_set * set; + uint16_t oflags; - qosspec_t qs; ssize_t part_idx; - void * ctx; - uint8_t key[SYMMKEYSZ]; - - pid_t pid; + struct crypt_ctx * crypt; + int headsz; /* IV */ + int tailsz; /* Tag + CRC */ struct timespec snd_act; struct timespec rcv_act; @@ -122,137 +121,208 @@ struct flow_set { }; struct fqueue { - struct portevent fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */ + struct flowevent fqueue[SSM_RBUFF_SIZE]; /* Safe copy from shm. */ size_t fqsize; size_t next; }; struct { - struct shm_rdrbuff * rdrb; - struct shm_flow_set * fqset; + struct ssm_pool * pool; + struct ssm_flow_set * fqset; struct bmp * fds; struct bmp * fqueues; struct flow * flows; - struct port * ports; + struct fmap * id_to_fd; struct list_head flow_list; + pthread_mutex_t mtx; + pthread_cond_t cond; + pthread_t tx; pthread_t rx; size_t n_frcti; fset_t * frct_set; pthread_rwlock_t lock; -} ai; +} proc; -static void port_destroy(struct port * p) +static void flow_destroy(struct fmap * p) { - pthread_mutex_lock(&p->state_lock); + pthread_mutex_lock(&proc.mtx); - if (p->state == PORT_DESTROY) { - pthread_mutex_unlock(&p->state_lock); + if (p->state == FLOW_DESTROY) { + pthread_mutex_unlock(&proc.mtx); return; } - if (p->state == PORT_ID_PENDING) - p->state = PORT_DESTROY; + if (p->state == FLOW_ALLOC_PENDING) + p->state = FLOW_DESTROY; else - p->state = PORT_NULL; + p->state = FLOW_NULL; - pthread_cond_signal(&p->state_cond); + pthread_cond_signal(&proc.cond); - while (p->state != PORT_NULL) - pthread_cond_wait(&p->state_cond, &p->state_lock); + pthread_cleanup_push(__cleanup_mutex_unlock, &proc.mtx); + + while (p->state != FLOW_NULL) + pthread_cond_wait(&proc.cond, &proc.mtx); p->fd = -1; - p->state = PORT_INIT; + p->state = FLOW_INIT; - pthread_mutex_unlock(&p->state_lock); + pthread_cleanup_pop(true); } -static void port_set_state(struct port * p, - enum port_state state) +static void flow_set_state(struct fmap * p, + enum flow_state state) { - pthread_mutex_lock(&p->state_lock); + pthread_mutex_lock(&proc.mtx); - if (p->state == PORT_DESTROY) { - pthread_mutex_unlock(&p->state_lock); + if (p->state == FLOW_DESTROY) { + pthread_mutex_unlock(&proc.mtx); return; } p->state = state; - pthread_cond_broadcast(&p->state_cond); + pthread_cond_broadcast(&proc.cond); - pthread_mutex_unlock(&p->state_lock); + pthread_mutex_unlock(&proc.mtx); } -static enum port_state port_wait_assign(int flow_id) +static enum flow_state flow_wait_assign(int flow_id) { - enum port_state state; - struct port * p; + enum flow_state state; + struct fmap * p; - p = &ai.ports[flow_id]; + p = &proc.id_to_fd[flow_id]; - pthread_mutex_lock(&p->state_lock); + pthread_mutex_lock(&proc.mtx); - if (p->state == PORT_ID_ASSIGNED) { - pthread_mutex_unlock(&p->state_lock); - return PORT_ID_ASSIGNED; + if (p->state == FLOW_ALLOCATED) { + pthread_mutex_unlock(&proc.mtx); + return FLOW_ALLOCATED; } - if (p->state == PORT_INIT) - p->state = PORT_ID_PENDING; + if (p->state == FLOW_INIT) + p->state = FLOW_ALLOC_PENDING; - while (p->state == PORT_ID_PENDING) - pthread_cond_wait(&p->state_cond, &p->state_lock); + pthread_cleanup_push(__cleanup_mutex_unlock, &proc.mtx); - if (p->state == PORT_DESTROY) { - p->state = PORT_NULL; - pthread_cond_broadcast(&p->state_cond); + while (p->state == FLOW_ALLOC_PENDING) + pthread_cond_wait(&proc.cond, &proc.mtx); + + if (p->state == FLOW_DESTROY) { + p->state = FLOW_NULL; + pthread_cond_broadcast(&proc.cond); } state = p->state; - assert(state != PORT_INIT); + pthread_cleanup_pop(true); - pthread_mutex_unlock(&p->state_lock); + assert(state != FLOW_INIT); return state; } -static int proc_announce(char * prog) +static int proc_announce(const struct proc_info * proc) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret = -1; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + int err; - msg.code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE; - msg.has_pid = true; - msg.pid = getpid(); - msg.prog = prog; + if (proc_announce__irm_req_ser(&msg, proc) < 0) + return -ENOMEM; - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) { - return -EIRMD; - } + err = send_recv_msg(&msg); + if (err < 0) + return err; - if (!recv_msg->has_result || (ret = recv_msg->result)) { - irm_msg__free_unpacked(recv_msg, NULL); - return ret; - } + return irm__irm_result_des(&msg); +} + +/* IRMd will clean up the mess if this fails */ +static void proc_exit(void) +{ + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; - irm_msg__free_unpacked(recv_msg, NULL); + if (proc_exit__irm_req_ser(&msg) < 0) + return; - return ret; + send_recv_msg(&msg); +} + +static int spb_encrypt(struct flow * flow, + struct ssm_pk_buff * spb) +{ + buffer_t in; + buffer_t out; + uint8_t * head; + uint8_t * tail; + + if (flow->crypt == NULL) + return 0; /* No encryption */ + + in.data = ssm_pk_buff_head(spb); + in.len = ssm_pk_buff_len(spb); + + if (crypt_encrypt(flow->crypt, in, &out) < 0) + goto fail_encrypt; + + head = ssm_pk_buff_head_alloc(spb, flow->headsz); + if (head == NULL) + goto fail_alloc; + + tail = ssm_pk_buff_tail_alloc(spb, flow->tailsz); + if (tail == NULL) + goto fail_alloc; + + memcpy(head, out.data, out.len); + + freebuf(out); + + return 0; + fail_alloc: + freebuf(out); + fail_encrypt: + return -ECRYPT; +} + +static int spb_decrypt(struct flow * flow, + struct ssm_pk_buff * spb) +{ + buffer_t in; + buffer_t out; + uint8_t * head; + + if (flow->crypt == NULL) + return 0; /* No decryption */ + + in.data = ssm_pk_buff_head(spb); + in.len = ssm_pk_buff_len(spb); + + if (crypt_decrypt(flow->crypt, in, &out) < 0) + return -ENOMEM; + + + head = ssm_pk_buff_head_release(spb, flow->headsz) + flow->headsz; + ssm_pk_buff_tail_release(spb, flow->tailsz); + + memcpy(head, out.data, out.len); + + freebuf(out); + + return 0; } -#include "crypt.c" #include "frct.c" void * flow_tx(void * o) { - struct timespec tic = {0, TICTIME}; + struct timespec tic = TIMESPEC_INIT_NS(TICTIME); (void) o; @@ -268,60 +338,60 @@ void * flow_tx(void * o) static void flow_send_keepalive(struct flow * flow, struct timespec now) { - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; ssize_t idx; uint8_t * ptr; - idx = shm_rdrbuff_alloc(ai.rdrb, 0, &ptr, &sdb); + idx = ssm_pool_alloc(proc.pool, 0, &ptr, &spb); if (idx < 0) return; - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); flow->snd_act = now; - if (shm_rbuff_write(flow->tx_rb, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); + if (ssm_rbuff_write(flow->tx_rb, idx)) + ssm_pool_remove(proc.pool, idx); else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); } -/* Needs rdlock on ai. */ +/* Needs rdlock on proc. */ static void _flow_keepalive(struct flow * flow) { struct timespec now; struct timespec s_act; struct timespec r_act; int flow_id; - uint32_t timeo; + time_t timeo; uint32_t acl; s_act = flow->snd_act; r_act = flow->rcv_act; - flow_id = flow->flow_id; - timeo = flow->qs.timeout; + flow_id = flow->info.id; + timeo = flow->info.qs.timeout; - acl = shm_rbuff_get_acl(flow->rx_rb); - if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) + acl = ssm_rbuff_get_acl(flow->rx_rb); + if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) return; clock_gettime(PTHREAD_COND_CLOCK, &now); - if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { - shm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER); - shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); + if (ts_diff_ns(&now, &r_act) > (int64_t) timeo * MILLION) { + ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER); + ssm_flow_set_notify(proc.fqset, flow_id, FLOW_PEER); return; } - if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) { - pthread_rwlock_unlock(&ai.lock); + if (ts_diff_ns(&now, &s_act) > (int64_t) timeo * (MILLION >> 2)) { + pthread_rwlock_unlock(&proc.lock); flow_send_keepalive(flow, now); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); } } @@ -330,15 +400,15 @@ static void handle_keepalives(void) struct list_head * p; struct list_head * h; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - list_for_each_safe(p, h, &ai.flow_list) { + list_for_each_safe(p, h, &proc.flow_list) { struct flow * flow; flow = list_entry(p, struct flow, next); _flow_keepalive(flow); } - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); } static void __cleanup_fqueue_destroy(void * fq) @@ -348,7 +418,7 @@ static void __cleanup_fqueue_destroy(void * fq) void * flow_rx(void * o) { - struct timespec tic = {0, TICTIME}; + struct timespec tic = TIMESPEC_INIT_NS(TICTIME); int ret; struct fqueue * fq; @@ -359,7 +429,7 @@ void * flow_rx(void * o) pthread_cleanup_push(__cleanup_fqueue_destroy, fq); /* fevent will filter all FRCT packets for us */ - while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) { + while ((ret = fevent(proc.frct_set, fq, &tic)) != 0) { if (ret == -ETIMEDOUT) { handle_keepalives(); continue; @@ -376,63 +446,69 @@ void * flow_rx(void * o) static void flow_clear(int fd) { - memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); + memset(&proc.flows[fd], 0, sizeof(proc.flows[fd])); - ai.flows[fd].flow_id = -1; - ai.flows[fd].pid = -1; + proc.flows[fd].info.id = -1; } -static void flow_fini(int fd) +static void __flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); - if (ai.flows[fd].frcti != NULL) { - ai.n_frcti--; - if (ai.n_frcti == 0) { - pthread_cancel(ai.tx); - pthread_join(ai.tx, NULL); + if (proc.flows[fd].frcti != NULL) { + proc.n_frcti--; + if (proc.n_frcti == 0) { + pthread_cancel(proc.tx); + pthread_join(proc.tx, NULL); } - shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); + ssm_flow_set_del(proc.fqset, 0, proc.flows[fd].info.id); - frcti_destroy(ai.flows[fd].frcti); + frcti_destroy(proc.flows[fd].frcti); } - if (ai.flows[fd].flow_id != -1) { - port_destroy(&ai.ports[ai.flows[fd].flow_id]); - bmp_release(ai.fds, fd); + if (proc.flows[fd].info.id != -1) { + flow_destroy(&proc.id_to_fd[proc.flows[fd].info.id]); + bmp_release(proc.fds, fd); } - if (ai.flows[fd].rx_rb != NULL) { - shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); - shm_rbuff_close(ai.flows[fd].rx_rb); + if (proc.flows[fd].rx_rb != NULL) { + ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN); + ssm_rbuff_close(proc.flows[fd].rx_rb); } - if (ai.flows[fd].tx_rb != NULL) { - shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); - shm_rbuff_close(ai.flows[fd].tx_rb); + if (proc.flows[fd].tx_rb != NULL) { + ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN); + ssm_rbuff_close(proc.flows[fd].tx_rb); } - if (ai.flows[fd].set != NULL) { - shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].flow_id, + if (proc.flows[fd].set != NULL) { + ssm_flow_set_notify(proc.flows[fd].set, + proc.flows[fd].info.id, FLOW_DEALLOC); - shm_flow_set_close(ai.flows[fd].set); + ssm_flow_set_close(proc.flows[fd].set); } - if (ai.flows[fd].ctx != NULL) - crypt_fini(ai.flows[fd].ctx); + crypt_destroy_ctx(proc.flows[fd].crypt); - list_del(&ai.flows[fd].next); + list_del(&proc.flows[fd].next); flow_clear(fd); } -static int flow_init(int flow_id, - pid_t pid, - qosspec_t qs, - uint8_t * s, - time_t mpl) +static void flow_fini(int fd) +{ + pthread_rwlock_wrlock(&proc.lock); + + __flow_fini(fd); + + pthread_rwlock_unlock(&proc.lock); +} + +#define IS_ENCRYPTED(crypt) ((crypt)->nid != NID_undef) +#define IS_ORDERED(flow) (flow.qs.in_order != 0) +static int flow_init(struct flow_info * info, + struct crypt_sk * sk) { struct timespec now; struct flow * flow; @@ -441,86 +517,90 @@ static int flow_init(int flow_id, clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); - fd = bmp_allocate(ai.fds); - if (!bmp_is_id_valid(ai.fds, fd)) { + fd = bmp_allocate(proc.fds); + if (!bmp_is_id_valid(proc.fds, fd)) { err = -EBADF; goto fail_fds; } - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; - flow->rx_rb = shm_rbuff_open(getpid(), flow_id); + flow->info = *info; + + flow->rx_rb = ssm_rbuff_open(info->n_pid, info->id); if (flow->rx_rb == NULL) goto fail_rx_rb; - flow->tx_rb = shm_rbuff_open(pid, flow_id); + flow->tx_rb = ssm_rbuff_open(info->n_1_pid, info->id); if (flow->tx_rb == NULL) goto fail_tx_rb; - flow->set = shm_flow_set_open(pid); + flow->set = ssm_flow_set_open(info->n_1_pid); if (flow->set == NULL) goto fail_set; - flow->flow_id = flow_id; flow->oflags = FLOWFDEFAULT; - flow->pid = pid; flow->part_idx = NO_PART; - flow->qs = qs; flow->snd_act = now; flow->rcv_act = now; + flow->crypt = NULL; + flow->headsz = 0; + flow->tailsz = 0; - if (qs.cypher_s > 0) { - assert(s != NULL); - if (crypt_init(&flow->ctx) < 0) - goto fail_ctx; - - memcpy(flow->key, s, SYMMKEYSZ); + if (IS_ENCRYPTED(sk)) { + /* Set to lower value in tests, should we make configurable? */ + sk->rot_bit = KEY_ROTATION_BIT; + flow->crypt = crypt_create_ctx(sk); + if (flow->crypt == NULL) + goto fail_crypt; + flow->headsz = crypt_get_ivsz(flow->crypt); + flow->tailsz = crypt_get_tagsz(flow->crypt); } assert(flow->frcti == NULL); - if (flow->qs.in_order != 0) { - flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl); + if (IS_ORDERED(flow->info)) { + flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl); if (flow->frcti == NULL) goto fail_frcti; - if (shm_flow_set_add(ai.fqset, 0, flow_id)) + if (ssm_flow_set_add(proc.fqset, 0, info->id)) goto fail_flow_set_add; - ++ai.n_frcti; - if (ai.n_frcti == 1 && - pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0) + ++proc.n_frcti; + if (proc.n_frcti == 1 && + pthread_create(&proc.tx, NULL, flow_tx, NULL) < 0) goto fail_tx_thread; } - list_add_tail(&flow->next, &ai.flow_list); + list_add_tail(&flow->next, &proc.flow_list); - ai.ports[flow_id].fd = fd; + proc.id_to_fd[info->id].fd = fd; - port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED); + flow_set_state(&proc.id_to_fd[info->id], FLOW_ALLOCATED); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return fd; fail_tx_thread: - shm_flow_set_del(ai.fqset, 0, flow_id); + ssm_flow_set_del(proc.fqset, 0, info->id); fail_flow_set_add: frcti_destroy(flow->frcti); fail_frcti: - crypt_fini(flow->ctx); - fail_ctx: - shm_flow_set_close(flow->set); + crypt_destroy_ctx(flow->crypt); + fail_crypt: + ssm_flow_set_close(flow->set); fail_set: - shm_rbuff_close(flow->tx_rb); + ssm_rbuff_close(flow->tx_rb); fail_tx_rb: - shm_rbuff_close(flow->rx_rb); + ssm_rbuff_close(flow->rx_rb); fail_rx_rb: - bmp_release(ai.fds, fd); + bmp_release(proc.fds, fd); fail_fds: - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return err; } @@ -538,6 +618,7 @@ static void init(int argc, char ** argv, char ** envp) { + struct proc_info info; char * prog = argv[0]; int i; #ifdef PROC_FLOW_STATS @@ -549,85 +630,123 @@ static void init(int argc, if (check_python(argv[0])) prog = argv[1]; + prog = path_strip(prog); + if (prog == NULL) { + fprintf(stderr, "FATAL: Could not determine program name.\n"); + goto fail_prog; + } + + memset(&info, 0, sizeof(info)); + info.pid = getpid(); + strncpy(info.prog, prog, PROG_NAME_SIZE); + + if (proc_announce(&info)) { + fprintf(stderr, "FATAL: Could not announce to IRMd.\n"); + goto fail_prog; + } + #ifdef HAVE_LIBGCRYPT if (!gcry_control(GCRYCTL_INITIALIZATION_FINISHED_P)) { - if (!gcry_check_version(GCRYPT_VERSION)) - goto fail_fds; + if (!gcry_check_version(GCRYPT_VERSION)) { + fprintf(stderr, "FATAL: Could not get gcry version.\n"); + goto fail_prog; + } gcry_control(GCRYCTL_DISABLE_SECMEM, 0); gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0); } #endif - list_head_init(&ai.flow_list); - - ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS); - if (ai.fds == NULL) + proc.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS); + if (proc.fds == NULL) { + fprintf(stderr, "FATAL: Could not create fd bitmap.\n"); goto fail_fds; + } - ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0); - if (ai.fqueues == NULL) + proc.fqueues = bmp_create(PROG_MAX_FQUEUES, 0); + if (proc.fqueues == NULL) { + fprintf(stderr, "FATAL: Could not create fqueue bitmap.\n"); goto fail_fqueues; + } - ai.rdrb = shm_rdrbuff_open(); - if (ai.rdrb == NULL) + if (is_ouroboros_member_uid(getuid())) + proc.pool = ssm_pool_open(0); + else + proc.pool = ssm_pool_open(getuid()); + + if (proc.pool == NULL) { + fprintf(stderr, "FATAL: Could not open packet buffer.\n"); goto fail_rdrb; + } - ai.flows = malloc(sizeof(*ai.flows) * PROG_MAX_FLOWS); - if (ai.flows == NULL) + proc.flows = malloc(sizeof(*proc.flows) * PROG_MAX_FLOWS); + if (proc.flows == NULL) { + fprintf(stderr, "FATAL: Could not malloc flows.\n"); goto fail_flows; + } for (i = 0; i < PROG_MAX_FLOWS; ++i) flow_clear(i); - ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS); - if (ai.ports == NULL) - goto fail_ports; + proc.id_to_fd = malloc(sizeof(*proc.id_to_fd) * SYS_MAX_FLOWS); + if (proc.id_to_fd == NULL) { + fprintf(stderr, "FATAL: Could not malloc id_to_fd.\n"); + goto fail_id_to_fd; + } + + for (i = 0; i < SYS_MAX_FLOWS; ++i) + proc.id_to_fd[i].state = FLOW_INIT; - if (prog != NULL) { - prog = path_strip(prog); - if (proc_announce(prog)) - goto fail_announce; + if (pthread_mutex_init(&proc.mtx, NULL)) { + fprintf(stderr, "FATAL: Could not init mutex.\n"); + goto fail_mtx; } - for (i = 0; i < SYS_MAX_FLOWS; ++i) { - ai.ports[i].state = PORT_INIT; - if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) { - int j; - for (j = 0; j < i; ++j) - pthread_mutex_destroy(&ai.ports[j].state_lock); - goto fail_announce; - } - if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) { - int j; - for (j = 0; j < i; ++j) - pthread_cond_destroy(&ai.ports[j].state_cond); - goto fail_state_cond; - } + if (pthread_cond_init(&proc.cond, NULL) < 0) { + fprintf(stderr, "FATAL: Could not init condvar.\n"); + goto fail_cond; } - if (pthread_rwlock_init(&ai.lock, NULL)) - goto fail_lock; + if (pthread_rwlock_init(&proc.lock, NULL) < 0) { + fprintf(stderr, "FATAL: Could not initialize flow lock.\n"); + goto fail_flow_lock; + } - ai.fqset = shm_flow_set_open(getpid()); - if (ai.fqset == NULL) + proc.fqset = ssm_flow_set_open(getpid()); + if (proc.fqset == NULL) { + fprintf(stderr, "FATAL: Could not open flow set.\n"); goto fail_fqset; + } - ai.frct_set = fset_create(); - if (ai.frct_set == NULL || ai.frct_set->idx != 0) + proc.frct_set = fset_create(); + if (proc.frct_set == NULL || proc.frct_set->idx != 0) { + fprintf(stderr, "FATAL: Could not create FRCT set.\n"); goto fail_frct_set; + } - if (timerwheel_init() < 0) + if (timerwheel_init() < 0) { + fprintf(stderr, "FATAL: Could not initialize timerwheel.\n"); goto fail_timerwheel; + } + + if (crypt_secure_malloc_init(PROC_SECMEM_MAX) < 0) { + fprintf(stderr, "FATAL: Could not init secure malloc.\n"); + goto fail_timerwheel; + } #if defined PROC_FLOW_STATS if (strstr(argv[0], "ipcpd") == NULL) { sprintf(procstr, "proc.%d", getpid()); - /* Don't bail on fail, it just won't show metrics */ - if (rib_init(procstr) < 0) + if (rib_init(procstr) < 0) { + fprintf(stderr, "FATAL: Could not initialize RIB.\n"); goto fail_rib_init; + } } #endif - if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0) + if (pthread_create(&proc.rx, NULL, flow_rx, NULL) < 0) { + fprintf(stderr, "FATAL: Could not start monitor thread.\n"); goto fail_monitor; + } + + list_head_init(&proc.flow_list); return; @@ -638,80 +757,81 @@ static void init(int argc, #endif timerwheel_fini(); fail_timerwheel: - fset_destroy(ai.frct_set); + fset_destroy(proc.frct_set); fail_frct_set: - shm_flow_set_close(ai.fqset); + ssm_flow_set_close(proc.fqset); fail_fqset: - pthread_rwlock_destroy(&ai.lock); - fail_lock: - for (i = 0; i < SYS_MAX_FLOWS; ++i) - pthread_cond_destroy(&ai.ports[i].state_cond); - fail_state_cond: - for (i = 0; i < SYS_MAX_FLOWS; ++i) - pthread_mutex_destroy(&ai.ports[i].state_lock); - fail_announce: - free(ai.ports); - fail_ports: - free(ai.flows); + pthread_rwlock_destroy(&proc.lock); + fail_flow_lock: + pthread_cond_destroy(&proc.cond); + fail_cond: + pthread_mutex_destroy(&proc.mtx); + fail_mtx: + free(proc.id_to_fd); + fail_id_to_fd: + free(proc.flows); fail_flows: - shm_rdrbuff_close(ai.rdrb); + ssm_pool_close(proc.pool); fail_rdrb: - bmp_destroy(ai.fqueues); + bmp_destroy(proc.fqueues); fail_fqueues: - bmp_destroy(ai.fds); + bmp_destroy(proc.fds); fail_fds: - fprintf(stderr, "FATAL: ouroboros-dev init failed. " - "Make sure an IRMd is running.\n\n"); - memset(&ai, 0, sizeof(ai)); + memset(&proc, 0, sizeof(proc)); + fail_prog: exit(EXIT_FAILURE); } static void fini(void) { - int i = 0; -#ifdef PROC_FLOW_STATS - rib_fini(); -#endif - if (ai.fds == NULL) - return; + int i; - pthread_cancel(ai.rx); - pthread_join(ai.rx, NULL); + if (proc.fds == NULL) + return; - fset_destroy(ai.frct_set); + pthread_cancel(proc.rx); + pthread_join(proc.rx, NULL); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); for (i = 0; i < PROG_MAX_FLOWS; ++i) { - if (ai.flows[i].flow_id != -1) { + struct flow * flow = &proc.flows[i]; + if (flow->info.id != -1) { ssize_t idx; - shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN); - while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) - shm_rdrbuff_remove(ai.rdrb, idx); - flow_fini(i); + ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWDOWN); + while ((idx = ssm_rbuff_read(flow->rx_rb)) >= 0) + ssm_pool_remove(proc.pool, idx); + __flow_fini(i); } } - shm_flow_set_close(ai.fqset); + pthread_cond_destroy(&proc.cond); + pthread_mutex_destroy(&proc.mtx); - for (i = 0; i < SYS_MAX_FLOWS; ++i) { - pthread_mutex_destroy(&ai.ports[i].state_lock); - pthread_cond_destroy(&ai.ports[i].state_cond); - } + pthread_rwlock_unlock(&proc.lock); +#ifdef PROC_FLOW_STATS + rib_fini(); +#endif timerwheel_fini(); - shm_rdrbuff_close(ai.rdrb); + fset_destroy(proc.frct_set); + + ssm_flow_set_close(proc.fqset); + + pthread_rwlock_destroy(&proc.lock); - free(ai.flows); - free(ai.ports); + free(proc.flows); + free(proc.id_to_fd); - bmp_destroy(ai.fds); - bmp_destroy(ai.fqueues); + ssm_pool_close(proc.pool); - pthread_rwlock_unlock(&ai.lock); + bmp_destroy(proc.fds); + bmp_destroy(proc.fqueues); - pthread_rwlock_destroy(&ai.lock); + proc_exit(); + + memset(&proc, 0, sizeof(proc)); } #if defined(__MACH__) && defined(__APPLE__) @@ -728,286 +848,244 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini; int flow_accept(qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; - void * pkp; /* public key pair */ - uint8_t s[SYMMKEYSZ]; /* secret key for flow */ - uint8_t buf[MSGBUFSZ]; - int err = -EIRMD; - ssize_t key_len; - - memset(s, 0, SYMMKEYSZ); - - msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; - msg.has_pid = true; - msg.pid = getpid(); - - if (timeo != NULL) { - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_sec = timeo->tv_sec; - msg.timeo_nsec = timeo->tv_nsec; - } - - key_len = crypt_dh_pkp_create(&pkp, buf); - if (key_len < 0) { - err = -ECRYPT; - goto fail_crypt_pkp; - } - if (key_len > 0) { - msg.has_pk = true; - msg.pk.data = buf; - msg.pk.len = (uint32_t) key_len; - } - - pthread_cleanup_push(crypt_dh_pkp_destroy, pkp); - - recv_msg = send_recv_irm_msg(&msg); - - pthread_cleanup_pop(false); - - if (recv_msg == NULL) - goto fail_recv; - - if (!recv_msg->has_result) - goto fail_msg; - - if (recv_msg->result != 0) { - err = recv_msg->result; - goto fail_msg; - } + struct flow_info flow; + struct crypt_sk crypt; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + uint8_t key[SYMMKEYSZ]; + int fd; + int err; - if (!recv_msg->has_pid || !recv_msg->has_flow_id || - !recv_msg->has_mpl || recv_msg->qosspec == NULL) - goto fail_msg; +#ifdef QOS_DISABLE_CRC + if (qs != NULL) + qs->ber = 1; +#endif + memset(&flow, 0, sizeof(flow)); - if (recv_msg->pk.len != 0 && - crypt_dh_derive(pkp, recv_msg->pk.data, - recv_msg->pk.len, s) < 0) { - err = -ECRYPT; - goto fail_msg; - } + flow.n_pid = getpid(); + flow.qs = qs == NULL ? qos_raw : *qs; - crypt_dh_pkp_destroy(pkp); + if (flow_accept__irm_req_ser(&msg, &flow, timeo)) + return -ENOMEM; - fd = flow_init(recv_msg->flow_id, recv_msg->pid, - msg_to_spec(recv_msg->qosspec), s, - recv_msg->mpl); + err = send_recv_msg(&msg); + if (err < 0) + return err; - irm_msg__free_unpacked(recv_msg, NULL); + crypt.key = key; - if (fd < 0) - return fd; + err = flow__irm_result_des(&msg, &flow, &crypt); + if (err < 0) + return err; + fd = flow_init(&flow, &crypt); - pthread_rwlock_rdlock(&ai.lock); + crypt_secure_clear(key, SYMMKEYSZ); if (qs != NULL) - *qs = ai.flows[fd].qs; - - pthread_rwlock_unlock(&ai.lock); + *qs = flow.qs; return fd; - - fail_msg: - irm_msg__free_unpacked(recv_msg, NULL); - fail_recv: - crypt_dh_pkp_destroy(pkp); - fail_crypt_pkp: - return err; } -static int __flow_alloc(const char * dst, - qosspec_t * qs, - const struct timespec * timeo, - bool join) +int flow_alloc(const char * dst, + qosspec_t * qs, + const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT; - irm_msg_t * recv_msg; - int fd; - void * pkp = NULL; /* public key pair */ - uint8_t s[SYMMKEYSZ]; /* secret key for flow */ - uint8_t buf[MSGBUFSZ]; - int err = -EIRMD; - - memset(s, 0, SYMMKEYSZ); + struct flow_info flow; + struct crypt_sk crypt; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + uint8_t key[SYMMKEYSZ]; + int fd; + int err; #ifdef QOS_DISABLE_CRC if (qs != NULL) qs->ber = 1; #endif - msg.code = join ? IRM_MSG_CODE__IRM_FLOW_JOIN - : IRM_MSG_CODE__IRM_FLOW_ALLOC; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.pid = getpid(); - qs_msg = spec_to_msg(qs); - msg.qosspec = &qs_msg; - if (timeo != NULL) { - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_sec = timeo->tv_sec; - msg.timeo_nsec = timeo->tv_nsec; - } + memset(&flow, 0, sizeof(flow)); - if (!join && qs != NULL && qs->cypher_s != 0) { - ssize_t key_len; + flow.n_pid = getpid(); + flow.qs = qs == NULL ? qos_raw : *qs; - key_len = crypt_dh_pkp_create(&pkp, buf); - if (key_len < 0) { - err = -ECRYPT; - goto fail_crypt_pkp; - } + if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo)) + return -ENOMEM; - msg.has_pk = true; - msg.pk.data = buf; - msg.pk.len = (uint32_t) key_len; + err = send_recv_msg(&msg); + if (err < 0) { + printf("send_recv_msg error %d\n", err); + return err; } - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - goto fail_send; + crypt.key = key; - if (!recv_msg->has_result) - goto fail_result; + err = flow__irm_result_des(&msg, &flow, &crypt); + if (err < 0) + return err; - if (recv_msg->result != 0) { - err = recv_msg->result; - goto fail_result; - } + fd = flow_init(&flow, &crypt); - if (!recv_msg->has_pid || !recv_msg->has_flow_id || - !recv_msg->has_mpl) - goto fail_result; + crypt_secure_clear(key, SYMMKEYSZ); - if (!join && qs != NULL && qs->cypher_s != 0) { - if (!recv_msg->has_pk || recv_msg->pk.len == 0) { - err = -ECRYPT; - goto fail_result; - } + if (qs != NULL) + *qs = flow.qs; - if (crypt_dh_derive(pkp, recv_msg->pk.data, - recv_msg->pk.len, s) < 0) { - err = -ECRYPT; - goto fail_result; - } + return fd; +} - crypt_dh_pkp_destroy(pkp); - } +int flow_join(const char * dst, + const struct timespec * timeo) +{ + struct flow_info flow; + struct crypt_sk crypt; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + uint8_t key[SYMMKEYSZ]; + int fd; + int err; + memset(&flow, 0, sizeof(flow)); - fd = flow_init(recv_msg->flow_id, recv_msg->pid, - qs == NULL ? qos_raw : *qs, s, - recv_msg->mpl); + flow.n_pid = getpid(); + flow.qs = qos_np1; - irm_msg__free_unpacked(recv_msg, NULL); + if (flow_join__irm_req_ser(&msg, &flow, dst, timeo)) + return -ENOMEM; - return fd; + err = send_recv_msg(&msg); + if (err < 0) + return err; - fail_result: - irm_msg__free_unpacked(recv_msg, NULL); - fail_send: - crypt_dh_pkp_destroy(pkp); - fail_crypt_pkp: - return err; -} + crypt.key = key; -int flow_alloc(const char * dst, - qosspec_t * qs, - const struct timespec * timeo) -{ - return __flow_alloc(dst, qs, timeo, false); -} + err = flow__irm_result_des(&msg, &flow, &crypt); + if (err < 0) + return err; -int flow_join(const char * dst, - qosspec_t * qs, - const struct timespec * timeo) -{ - if (qs != NULL && qs->cypher_s != 0) - return -ECRYPT; + fd = flow_init(&flow, &crypt); + + crypt_secure_clear(key, SYMMKEYSZ); - return __flow_alloc(dst, qs, timeo, true); + return fd; } +#define PKT_BUF_LEN 2048 int flow_dealloc(int fd) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - struct flow * f; - time_t timeo; + struct flow_info info; + uint8_t pkt[PKT_BUF_LEN]; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + struct timespec tic = TIMESPEC_INIT_NS(TICTIME); + struct timespec timeo = TIMESPEC_INIT_S(0); + struct flow * flow; + int err; if (fd < 0 || fd >= SYS_MAX_FLOWS ) return -EINVAL; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_flow_id = true; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_nsec = 0; + memset(&info, 0, sizeof(flow)); - f = &ai.flows[fd]; + flow = &proc.flows[fd]; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - if (f->flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + if (flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); return -ENOTALLOC; } - msg.flow_id = f->flow_id; + flow->oflags = FLOWFDEFAULT | FLOWFRNOPART; - timeo = frcti_dealloc(f->frcti); - while (timeo < 0) { /* keep the flow active for rtx */ - ssize_t ret; - uint8_t buf[128]; - struct timespec tic = {0, TICTIME}; + flow->rcv_timesout = true; + flow->rcv_timeo = tic; + + pthread_rwlock_unlock(&proc.lock); - f->oflags = FLOWFDEFAULT | FLOWFRNOPART; + flow_read(fd, buf, SOCK_BUF_SIZE); - f->rcv_timesout = true; - f->rcv_timeo = tic; + pthread_rwlock_rdlock(&proc.lock); - pthread_rwlock_unlock(&ai.lock); + timeo.tv_sec = frcti_dealloc(flow->frcti); + while (timeo.tv_sec < 0) { /* keep the flow active for rtx */ + ssize_t ret; + + pthread_rwlock_unlock(&proc.lock); - ret = flow_read(fd, buf, 128); + ret = flow_read(fd, pkt, PKT_BUF_LEN); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - timeo = frcti_dealloc(f->frcti); + timeo.tv_sec = frcti_dealloc(flow->frcti); - if (ret == -EFLOWDOWN && timeo < 0) - timeo = -timeo; + if (ret == -EFLOWDOWN && timeo.tv_sec < 0) + timeo.tv_sec = -timeo.tv_sec; } - msg.timeo_sec = timeo; + pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); - shm_rbuff_fini(ai.flows[fd].tx_rb); + ssm_rbuff_fini(flow->tx_rb); - pthread_rwlock_unlock(&ai.lock); + pthread_cleanup_pop(true); - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; + info.id = flow->info.id; + info.n_pid = getpid(); - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -EIRMD; - } + if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) + return -ENOMEM; - irm_msg__free_unpacked(recv_msg, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - pthread_rwlock_wrlock(&ai.lock); + err = irm__irm_result_des(&msg); flow_fini(fd); - pthread_rwlock_unlock(&ai.lock); + return err; +} - return 0; +int ipcp_flow_dealloc(int fd) +{ + struct flow_info info; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + struct flow * flow; + int err; + + if (fd < 0 || fd >= SYS_MAX_FLOWS ) + return -EINVAL; + + flow = &proc.flows[fd]; + + memset(&info, 0, sizeof(flow)); + + pthread_rwlock_rdlock(&proc.lock); + + if (flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); + return -ENOTALLOC; + } + + info.id = flow->info.id; + info.n_1_pid = flow->info.n_1_pid; + + pthread_rwlock_unlock(&proc.lock); + + if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) + return -ENOMEM; + + err = send_recv_msg(&msg); + if (err < 0) + return err; + + err = irm__irm_result_des(&msg); + + flow_fini(fd); + + return err; } int fccntl(int fd, @@ -1028,14 +1106,14 @@ int fccntl(int fd, if (fd < 0 || fd >= SYS_MAX_FLOWS) return -EBADF; - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; va_start(l, cmd); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); - if (flow->flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + if (flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); va_end(l); return -ENOTALLOC; } @@ -1073,26 +1151,26 @@ int fccntl(int fd, goto einval; if (!flow->rcv_timesout) goto eperm; - *timeo = flow->snd_timeo; + *timeo = flow->rcv_timeo; break; case FLOWGQOSSPEC: qs = va_arg(l, qosspec_t *); if (qs == NULL) goto einval; - *qs = flow->qs; + *qs = flow->info.qs; break; case FLOWGRXQLEN: qlen = va_arg(l, size_t *); - *qlen = shm_rbuff_queued(flow->rx_rb); + *qlen = ssm_rbuff_queued(flow->rx_rb); break; case FLOWGTXQLEN: qlen = va_arg(l, size_t *); - *qlen = shm_rbuff_queued(flow->tx_rb); + *qlen = ssm_rbuff_queued(flow->tx_rb); break; case FLOWSFLAGS: flow->oflags = va_arg(l, uint32_t); - rx_acl = shm_rbuff_get_acl(flow->rx_rb); - tx_acl = shm_rbuff_get_acl(flow->rx_rb); + rx_acl = ssm_rbuff_get_acl(flow->rx_rb); + tx_acl = ssm_rbuff_get_acl(flow->rx_rb); /* * Making our own flow write only means making the * the other side of the flow read only. @@ -1105,19 +1183,19 @@ int fccntl(int fd, if (flow->oflags & FLOWFDOWN) { rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; - shm_flow_set_notify(flow->set, - flow->flow_id, + ssm_flow_set_notify(flow->set, + flow->info.id, FLOW_DOWN); } else { rx_acl &= ~ACL_FLOWDOWN; tx_acl &= ~ACL_FLOWDOWN; - shm_flow_set_notify(flow->set, - flow->flow_id, + ssm_flow_set_notify(flow->set, + flow->info.id, FLOW_UP); } - shm_rbuff_set_acl(flow->rx_rb, rx_acl); - shm_rbuff_set_acl(flow->tx_rb, tx_acl); + ssm_rbuff_set_acl(flow->rx_rb, rx_acl); + ssm_rbuff_set_acl(flow->tx_rb, tx_acl); break; case FLOWGFLAGS: @@ -1141,53 +1219,56 @@ int fccntl(int fd, *cflags = frcti_getflags(flow->frcti); break; default: - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); va_end(l); return -ENOTSUP; }; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); va_end(l); return 0; einval: - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); va_end(l); return -EINVAL; eperm: - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); va_end(l); return -EPERM; } -static int chk_crc(struct shm_du_buff * sdb) +static int chk_crc(struct ssm_pk_buff * spb) { uint32_t crc; - uint8_t * head = shm_du_buff_head(sdb); - uint8_t * tail = shm_du_buff_tail_release(sdb, CRCLEN); + uint8_t * head = ssm_pk_buff_head(spb); + uint8_t * tail = ssm_pk_buff_tail_release(spb, CRCLEN); mem_hash(HASH_CRC32, &crc, head, tail - head); return !(crc == *((uint32_t *) tail)); } -static int add_crc(struct shm_du_buff * sdb) +static int add_crc(struct ssm_pk_buff * spb) { - uint8_t * head = shm_du_buff_head(sdb); - uint8_t * tail = shm_du_buff_tail_alloc(sdb, CRCLEN); + uint8_t * head; + uint8_t * tail; + + tail = ssm_pk_buff_tail_alloc(spb, CRCLEN); if (tail == NULL) - return -1; + return -ENOMEM; + head = ssm_pk_buff_head(spb); mem_hash(HASH_CRC32, tail, head, tail - head); return 0; } -static int flow_tx_sdb(struct flow * flow, - struct shm_du_buff * sdb, +static int flow_tx_spb(struct flow * flow, + struct ssm_pk_buff * spb, bool block, struct timespec * abstime) { @@ -1197,46 +1278,46 @@ static int flow_tx_sdb(struct flow * flow, clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); flow->snd_act = now; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); - idx = shm_du_buff_get_idx(sdb); + idx = ssm_pk_buff_get_idx(spb); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - if (shm_du_buff_len(sdb) > 0) { - if (frcti_snd(flow->frcti, sdb) < 0) + if (ssm_pk_buff_len(spb) > 0) { + if (frcti_snd(flow->frcti, spb) < 0) goto enomem; - if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) + if (spb_encrypt(flow, spb) < 0) goto enomem; - if (flow->qs.ber == 0 && add_crc(sdb) != 0) + if (flow->info.qs.ber == 0 && add_crc(spb) != 0) goto enomem; } - pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); + pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock); if (!block) - ret = shm_rbuff_write(flow->tx_rb, idx); + ret = ssm_rbuff_write(flow->tx_rb, idx); else - ret = shm_rbuff_write_b(flow->tx_rb, idx, abstime); + ret = ssm_rbuff_write_b(flow->tx_rb, idx, abstime); if (ret < 0) - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_pool_remove(proc.pool, idx); else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); pthread_cleanup_pop(true); return 0; enomem: - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); + pthread_rwlock_unlock(&proc.lock); + ssm_pool_remove(proc.pool, idx); return -ENOMEM; } @@ -1250,7 +1331,7 @@ ssize_t flow_write(int fd, int flags; struct timespec abs; struct timespec * abstime = NULL; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; uint8_t * ptr; if (buf == NULL && count != 0) @@ -1259,14 +1340,14 @@ ssize_t flow_write(int fd, if (fd < 0 || fd >= PROG_MAX_FLOWS) return -EBADF; - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; clock_gettime(PTHREAD_COND_CLOCK, &abs); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); - if (flow->flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + if (flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); return -ENOTALLOC; } @@ -1277,7 +1358,7 @@ ssize_t flow_write(int fd, flags = flow->oflags; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); if ((flags & FLOWFACCMODE) == FLOWFRDONLY) return -EPERM; @@ -1285,13 +1366,12 @@ ssize_t flow_write(int fd, if (flags & FLOWFWNOBLOCK) { if (!frcti_is_window_open(flow->frcti)) return -EAGAIN; - idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb); + idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb); } else { - while ((ret = frcti_window_wait(flow->frcti, abstime)) < 0) { - if (ret < 0) - return ret; - } - idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime); + ret = frcti_window_wait(flow->frcti, abstime); + if (ret < 0) + return ret; + idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, abstime); } if (idx < 0) @@ -1300,50 +1380,51 @@ ssize_t flow_write(int fd, if (count > 0) memcpy(ptr, buf, count); - ret = flow_tx_sdb(flow, sdb, !(flags & FLOWFWNOBLOCK), abstime); + ret = flow_tx_spb(flow, spb, !(flags & FLOWFWNOBLOCK), abstime); return ret < 0 ? (ssize_t) ret : (ssize_t) count; } static bool invalid_pkt(struct flow * flow, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { - if (shm_du_buff_len(sdb) == 0) + if (spb == NULL || ssm_pk_buff_len(spb) == 0) return true; - if (flow->qs.ber == 0 && chk_crc(sdb) != 0) + if (flow->info.qs.ber == 0 && chk_crc(spb) != 0) return true; - if (flow->qs.cypher_s > 0 && crypt_decrypt(flow, sdb) < 0) + if (spb_decrypt(flow, spb) < 0) return true; return false; } -static ssize_t flow_rx_sdb(struct flow * flow, - struct shm_du_buff ** sdb, +static ssize_t flow_rx_spb(struct flow * flow, + struct ssm_pk_buff ** spb, bool block, struct timespec * abstime) { ssize_t idx; struct timespec now; - idx = block ? shm_rbuff_read_b(flow->rx_rb, abstime) : - shm_rbuff_read(flow->rx_rb); + idx = block ? ssm_rbuff_read_b(flow->rx_rb, abstime) : + ssm_rbuff_read(flow->rx_rb); if (idx < 0) return idx; clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); flow->rcv_act = now; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); - *sdb = shm_rdrbuff_get(ai.rdrb, idx); - if (invalid_pkt(flow, *sdb)) { - shm_rdrbuff_remove(ai.rdrb, idx); + *spb = ssm_pool_get(proc.pool, idx); + + if (invalid_pkt(flow, *spb)) { + ssm_pool_remove(proc.pool, idx); return -EAGAIN; } @@ -1357,7 +1438,7 @@ ssize_t flow_read(int fd, ssize_t idx; ssize_t n; uint8_t * packet; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct timespec abs; struct timespec now; struct timespec * abstime = NULL; @@ -1368,19 +1449,19 @@ ssize_t flow_read(int fd, if (fd < 0 || fd >= PROG_MAX_FLOWS) return -EBADF; - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - if (flow->flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + if (flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); return -ENOTALLOC; } if (flow->part_idx == DONE_PART) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); flow->part_idx = NO_PART; return 0; } @@ -1396,61 +1477,61 @@ ssize_t flow_read(int fd, idx = flow->part_idx; if (idx < 0) { while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); - idx = flow_rx_sdb(flow, &sdb, block, abstime); + idx = flow_rx_spb(flow, &spb, block, abstime); if (idx < 0) { if (block && idx != -EAGAIN) return idx; if (!block) return idx; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); continue; } - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - frcti_rcv(flow->frcti, sdb); + frcti_rcv(flow->frcti, spb); } } - sdb = shm_rdrbuff_get(ai.rdrb, idx); + spb = ssm_pool_get(proc.pool, idx); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); - packet = shm_du_buff_head(sdb); + packet = ssm_pk_buff_head(spb); - n = shm_du_buff_len(sdb); + n = ssm_pk_buff_len(spb); assert(n >= 0); if (n <= (ssize_t) count) { memcpy(buf, packet, n); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); flow->part_idx = (partrd && n == (ssize_t) count) ? DONE_PART : NO_PART; flow->rcv_act = now; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return n; } else { if (partrd) { memcpy(buf, packet, count); - shm_du_buff_head_release(sdb, n); - pthread_rwlock_wrlock(&ai.lock); + ssm_pk_buff_head_release(spb, n); + pthread_rwlock_wrlock(&proc.lock); flow->part_idx = idx; flow->rcv_act = now; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return count; } else { - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return -EMSGSIZE; } } @@ -1458,7 +1539,7 @@ ssize_t flow_read(int fd, /* fqueue functions. */ -struct flow_set * fset_create() +struct flow_set * fset_create(void) { struct flow_set * set; @@ -1466,20 +1547,20 @@ struct flow_set * fset_create() if (set == NULL) goto fail_malloc; - assert(ai.fqueues); + assert(proc.fqueues); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); - set->idx = bmp_allocate(ai.fqueues); - if (!bmp_is_id_valid(ai.fqueues, set->idx)) + set->idx = bmp_allocate(proc.fqueues); + if (!bmp_is_id_valid(proc.fqueues, set->idx)) goto fail_bmp_alloc; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return set; fail_bmp_alloc: - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); free(set); fail_malloc: return NULL; @@ -1492,22 +1573,22 @@ void fset_destroy(struct flow_set * set) fset_zero(set); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); - bmp_release(ai.fqueues, set->idx); + bmp_release(proc.fqueues, set->idx); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); free(set); } -struct fqueue * fqueue_create() +struct fqueue * fqueue_create(void) { struct fqueue * fq = malloc(sizeof(*fq)); if (fq == NULL) return NULL; - memset(fq->fqueue, -1, SHM_BUFFER_SIZE * sizeof(*fq->fqueue)); + memset(fq->fqueue, -1, SSM_RBUFF_SIZE * sizeof(*fq->fqueue)); fq->fqsize = 0; fq->next = 0; @@ -1524,7 +1605,7 @@ void fset_zero(struct flow_set * set) if (set == NULL) return; - shm_flow_set_zero(ai.fqset, set->idx); + ssm_flow_set_zero(proc.fqset, set->idx); } int fset_add(struct flow_set * set, @@ -1536,31 +1617,31 @@ int fset_add(struct flow_set * set, if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return -EINVAL; - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { ret = -EINVAL; goto fail; } if (flow->frcti != NULL) - shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); + ssm_flow_set_del(proc.fqset, 0, flow->info.id); - ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id); + ret = ssm_flow_set_add(proc.fqset, set->idx, flow->info.id); if (ret < 0) goto fail; - if (shm_rbuff_queued(ai.flows[fd].rx_rb)) - shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT); + if (ssm_rbuff_queued(flow->rx_rb)) + ssm_flow_set_notify(proc.fqset, flow->info.id, FLOW_PKT); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return ret; fail: - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return ret; } @@ -1572,37 +1653,40 @@ void fset_del(struct flow_set * set, if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return; - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - if (flow->flow_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); + if (flow->info.id >= 0) + ssm_flow_set_del(proc.fqset, set->idx, flow->info.id); if (flow->frcti != NULL) - shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id); + ssm_flow_set_add(proc.fqset, 0, proc.flows[fd].info.id); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); } bool fset_has(const struct flow_set * set, int fd) { - bool ret; + struct flow * flow; + bool ret; if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return false; - pthread_rwlock_rdlock(&ai.lock); + flow = &proc.flows[fd]; - if (ai.flows[fd].flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); + + if (flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); return false; } - ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1); + ret = (ssm_flow_set_has(proc.fqset, set->idx, flow->info.id) == 1); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return ret; } @@ -1610,7 +1694,7 @@ bool fset_has(const struct flow_set * set, /* Filter fqueue events for non-data packets */ static int fqueue_filter(struct fqueue * fq) { - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; int fd; ssize_t idx; struct frcti * frcti; @@ -1619,44 +1703,44 @@ static int fqueue_filter(struct fqueue * fq) if (fq->fqueue[fq->next].event != FLOW_PKT) return 1; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - fd = ai.ports[fq->fqueue[fq->next].flow_id].fd; + fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd; if (fd < 0) { ++fq->next; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); continue; } - frcti = ai.flows[fd].frcti; + frcti = proc.flows[fd].frcti; if (frcti == NULL) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return 1; } if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return 1; } - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); - idx = flow_rx_sdb(&ai.flows[fd], &sdb, false, NULL); + idx = flow_rx_spb(&proc.flows[fd], &spb, false, NULL); if (idx < 0) return 0; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - sdb = shm_rdrbuff_get(ai.rdrb, idx); + spb = ssm_pool_get(proc.pool, idx); - __frcti_rcv(frcti, sdb); + __frcti_rcv(frcti, spb); if (__frcti_pdu_ready(frcti) >= 0) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return 1; } - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); ++fq->next; } @@ -1667,7 +1751,7 @@ static int fqueue_filter(struct fqueue * fq) int fqueue_next(struct fqueue * fq) { int fd; - struct portevent * e; + struct flowevent * e; if (fq == NULL) return -EINVAL; @@ -1678,15 +1762,15 @@ int fqueue_next(struct fqueue * fq) if (fq->next != 0 && fqueue_filter(fq) == 0) return -EPERM; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); e = fq->fqueue + fq->next; - fd = ai.ports[e->flow_id].fd; + fd = proc.id_to_fd[e->flow_id].fd; ++fq->next; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return fd; } @@ -1724,7 +1808,7 @@ ssize_t fevent(struct flow_set * set, } while (ret == 0) { - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); + ret = ssm_flow_set_wait(proc.fqset, set->idx, fq->fqueue, t); if (ret == -ETIMEDOUT) return -ETIMEDOUT; @@ -1741,10 +1825,21 @@ ssize_t fevent(struct flow_set * set, /* ipcp-dev functions. */ -int np1_flow_alloc(pid_t n_pid, - int flow_id) +int np1_flow_alloc(pid_t n_pid, + int flow_id) { - return flow_init(flow_id, n_pid, qos_np1, NULL, 0); + struct flow_info flow; + struct crypt_sk crypt = { .nid = NID_undef, .key = NULL }; + + memset(&flow, 0, sizeof(flow)); + + flow.id = flow_id; + flow.n_pid = getpid(); + flow.qs = qos_np1; + flow.mpl = 0; + flow.n_1_pid = n_pid; /* This "flow" is upside-down! */ + + return flow_init(&flow, &crypt); } int np1_flow_dealloc(int flow_id, @@ -1760,316 +1855,332 @@ int np1_flow_dealloc(int flow_id, sleep(timeo); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - fd = ai.ports[flow_id].fd; + fd = proc.id_to_fd[flow_id].fd; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return fd; } -int np1_flow_resp(int flow_id) +int np1_flow_resp(int flow_id, + int resp) { int fd; - if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED) + if (resp == 0 && flow_wait_assign(flow_id) != FLOW_ALLOCATED) return -1; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - fd = ai.ports[flow_id].fd; + fd = proc.id_to_fd[flow_id].fd; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return fd; } -int ipcp_create_r(int result) +int ipcp_create_r(const struct ipcp_info * info) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret; - - msg.code = IRM_MSG_CODE__IPCP_CREATE_R; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_result = true; - msg.result = result; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + int err; - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); + if (ipcp_create_r__irm_req_ser(&msg,info) < 0) + return -ENOMEM; - return ret; + err = send_recv_msg(&msg); + if (err < 0) + return err; + + return irm__irm_result_des(&msg); } -int ipcp_flow_req_arr(const uint8_t * dst, - size_t len, - qosspec_t qs, - time_t mpl, - const void * data, - size_t dlen) +int ipcp_flow_req_arr(const buffer_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - qosspec_msg_t qs_msg; - int fd; - - assert(dst != NULL); - - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_hash = true; - msg.hash.len = len; - msg.hash.data = (uint8_t *) dst; - qs_msg = spec_to_msg(&qs); - msg.qosspec = &qs_msg; - msg.has_mpl = true; - msg.mpl = mpl; - msg.has_pk = true; - msg.pk.data = (uint8_t *) data; - msg.pk.len = dlen; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_flow_id || !recv_msg->has_pid) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + struct crypt_sk crypt; + uint8_t key[SYMMKEYSZ]; + int err; - if (recv_msg->has_result && recv_msg->result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + memset(&flow, 0, sizeof(flow)); - fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0); + assert(dst != NULL && dst->len != 0 && dst->data != NULL); - irm_msg__free_unpacked(recv_msg, NULL); + flow.n_1_pid = getpid(); + flow.qs = qs; + flow.mpl = mpl; - return fd; -} + if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0) + return -ENOMEM; -int ipcp_flow_alloc_reply(int fd, - int response, - time_t mpl, - const void * data, - size_t len) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret; + err = send_recv_msg(&msg); + if (err < 0) + return err; - assert(fd >= 0 && fd < SYS_MAX_FLOWS); + crypt.key = key; - msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg.has_flow_id = true; - msg.has_pk = true; - msg.pk.data = (uint8_t *) data; - msg.pk.len = (uint32_t) len; - msg.has_mpl = true; - msg.mpl = mpl; + err = flow__irm_result_des(&msg, &flow, &crypt); + if (err < 0) + return err; - pthread_rwlock_rdlock(&ai.lock); + assert(crypt.nid == NID_undef); /* np1 flows are not encrypted */ - msg.flow_id = ai.flows[fd].flow_id; + /* inverted for np1_flow */ + flow.n_1_pid = flow.n_pid; + flow.n_pid = getpid(); + flow.mpl = 0; + flow.qs = qos_np1; - pthread_rwlock_unlock(&ai.lock); + crypt.nid = NID_undef; - msg.has_response = true; - msg.response = response; + return flow_init(&flow, &crypt); +} - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; +int ipcp_flow_alloc_reply(int fd, + int response, + time_t mpl, + const buffer_t * data) +{ + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + int err; - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + assert(fd >= 0 && fd < SYS_MAX_FLOWS); - ret = recv_msg->result; + pthread_rwlock_rdlock(&proc.lock); - irm_msg__free_unpacked(recv_msg, NULL); + flow.id = proc.flows[fd].info.id; - return ret; + pthread_rwlock_unlock(&proc.lock); + + flow.mpl = mpl; + + if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0) + return -ENOMEM; + + err = send_recv_msg(&msg); + if (err < 0) + return err; + + return irm__irm_result_des(&msg); } int ipcp_flow_read(int fd, - struct shm_du_buff ** sdb) + struct ssm_pk_buff ** spb) { struct flow * flow; ssize_t idx = -1; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - assert(sdb); + assert(spb); - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - assert(flow->flow_id >= 0); + assert(flow->info.id >= 0); - while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { - pthread_rwlock_unlock(&ai.lock); + while (frcti_queued_pdu(flow->frcti) < 0) { + pthread_rwlock_unlock(&proc.lock); - idx = flow_rx_sdb(flow, sdb, false, NULL); + idx = flow_rx_spb(flow, spb, false, NULL); if (idx < 0) return idx; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - frcti_rcv(flow->frcti, *sdb); + frcti_rcv(flow->frcti, *spb); } - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return 0; } int ipcp_flow_write(int fd, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct flow * flow; int ret; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - assert(sdb); + assert(spb); - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_wrlock(&proc.lock); - if (flow->flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + if (flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); return -ENOTALLOC; } if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return -EPERM; } - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); - ret = flow_tx_sdb(flow, sdb, true, NULL); + ret = flow_tx_spb(flow, spb, true, NULL); return ret; } +static int pool_copy_spb(struct ssm_pool * src_pool, + ssize_t src_idx, + struct ssm_pool * dst_pool, + struct ssm_pk_buff ** dst_spb) +{ + struct ssm_pk_buff * src; + uint8_t * ptr; + size_t len; + + src = ssm_pool_get(src_pool, src_idx); + len = ssm_pk_buff_len(src); + + if (ssm_pool_alloc(dst_pool, len, &ptr, dst_spb) < 0) { + ssm_pool_remove(src_pool, src_idx); + return -ENOMEM; + } + + memcpy(ptr, ssm_pk_buff_head(src), len); + ssm_pool_remove(src_pool, src_idx); + + return 0; +} + int np1_flow_read(int fd, - struct shm_du_buff ** sdb) + struct ssm_pk_buff ** spb, + struct ssm_pool * pool) { - struct flow * flow; - ssize_t idx = -1; + struct flow * flow; + ssize_t idx = -1; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - assert(sdb); + assert(spb); - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; - assert(flow->flow_id >= 0); + assert(flow->info.id >= 0); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - idx = shm_rbuff_read(flow->rx_rb);; + idx = ssm_rbuff_read(flow->rx_rb); if (idx < 0) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return idx; } - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); - *sdb = shm_rdrbuff_get(ai.rdrb, idx); + if (pool == NULL) { + *spb = ssm_pool_get(proc.pool, idx); + } else { + /* Cross-pool copy: PUP -> GSPP */ + if (pool_copy_spb(pool, idx, proc.pool, spb) < 0) + return -ENOMEM; + } return 0; } int np1_flow_write(int fd, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb, + struct ssm_pool * pool) { - struct flow * flow; - int ret; - ssize_t idx; + struct flow * flow; + struct ssm_pk_buff * dst; + int ret; + ssize_t idx; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - assert(sdb); + assert(spb); - flow = &ai.flows[fd]; + flow = &proc.flows[fd]; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - if (flow->flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + if (flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); return -ENOTALLOC; } if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return -EPERM; } - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); - idx = shm_du_buff_get_idx(sdb); + idx = ssm_pk_buff_get_idx(spb); - ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); - if (ret < 0) - shm_rdrbuff_remove(ai.rdrb, idx); - else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + if (pool == NULL) { + ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL); + if (ret < 0) + ssm_pool_remove(proc.pool, idx); + else + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + } else { + /* Cross-pool copy: GSPP -> PUP */ + if (pool_copy_spb(proc.pool, idx, pool, &dst) < 0) + return -ENOMEM; + idx = ssm_pk_buff_get_idx(dst); + ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL); + if (ret < 0) + ssm_pool_remove(pool, idx); + else + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + } return ret; } -int ipcp_sdb_reserve(struct shm_du_buff ** sdb, +int ipcp_spb_reserve(struct ssm_pk_buff ** spb, size_t len) { - return shm_rdrbuff_alloc_b(ai.rdrb, len, NULL, sdb, NULL) < 0 ? -1 : 0; + return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 ? -1 : 0; } -void ipcp_sdb_release(struct shm_du_buff * sdb) +void ipcp_spb_release(struct ssm_pk_buff * spb) { - shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); + ssm_pool_remove(proc.pool, ssm_pk_buff_get_idx(spb)); } int ipcp_flow_fini(int fd) { - struct shm_rbuff * rx_rb; + struct ssm_rbuff * rx_rb; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - if (ai.flows[fd].flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + if (proc.flows[fd].info.id < 0) { + pthread_rwlock_unlock(&proc.lock); return -1; } - shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN); - shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].flow_id, + ssm_flow_set_notify(proc.flows[fd].set, + proc.flows[fd].info.id, FLOW_DEALLOC); - rx_rb = ai.flows[fd].rx_rb; + rx_rb = proc.flows[fd].rx_rb; - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); if (rx_rb != NULL) - shm_rbuff_fini(rx_rb); + ssm_rbuff_fini(rx_rb); return 0; } @@ -2080,13 +2191,13 @@ int ipcp_flow_get_qoscube(int fd, assert(fd >= 0 && fd < SYS_MAX_FLOWS); assert(cube); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - assert(ai.flows[fd].flow_id >= 0); + assert(proc.flows[fd].info.id >= 0); - *cube = qos_spec_to_cube(ai.flows[fd].qs); + *cube = qos_spec_to_cube(proc.flows[fd].info.qs); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return 0; } @@ -2095,56 +2206,76 @@ size_t ipcp_flow_queued(int fd) { size_t q; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_rdlock(&proc.lock); - assert(ai.flows[fd].flow_id >= 0); + assert(proc.flows[fd].info.id >= 0); - q = shm_rbuff_queued(ai.flows[fd].tx_rb); + q = ssm_rbuff_queued(proc.flows[fd].tx_rb); - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&proc.lock); return q; } -ssize_t local_flow_read(int fd) +int local_flow_transfer(int src_fd, + int dst_fd, + struct ssm_pool * src_pool, + struct ssm_pool * dst_pool) { - ssize_t ret; - - assert(fd >= 0); - - pthread_rwlock_rdlock(&ai.lock); - - ret = shm_rbuff_read(ai.flows[fd].rx_rb); - - pthread_rwlock_unlock(&ai.lock); + struct flow * src_flow; + struct flow * dst_flow; + struct ssm_pk_buff * dst_spb; + struct ssm_pool * sp; + struct ssm_pool * dp; + ssize_t idx; + int ret; - return ret; -} + assert(src_fd >= 0); + assert(dst_fd >= 0); -int local_flow_write(int fd, - size_t idx) -{ - struct flow * flow; - int ret; + src_flow = &proc.flows[src_fd]; + dst_flow = &proc.flows[dst_fd]; - assert(fd >= 0); + sp = src_pool == NULL ? proc.pool : src_pool; + dp = dst_pool == NULL ? proc.pool : dst_pool; - flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&proc.lock); - pthread_rwlock_rdlock(&ai.lock); + idx = ssm_rbuff_read(src_flow->rx_rb); + if (idx < 0) { + pthread_rwlock_unlock(&proc.lock); + return idx; + } - if (flow->flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); + if (dst_flow->info.id < 0) { + pthread_rwlock_unlock(&proc.lock); + ssm_pool_remove(sp, idx); return -ENOTALLOC; } - ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); - if (ret == 0) - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); - else - shm_rdrbuff_remove(ai.rdrb, idx); + pthread_rwlock_unlock(&proc.lock); - pthread_rwlock_unlock(&ai.lock); + if (sp == dp) { + /* Same pool: zero-copy */ + ret = ssm_rbuff_write_b(dst_flow->tx_rb, idx, NULL); + if (ret < 0) + ssm_pool_remove(sp, idx); + else + ssm_flow_set_notify(dst_flow->set, + dst_flow->info.id, FLOW_PKT); + } else { + /* Different pools: single copy */ + if (pool_copy_spb(sp, idx, dp, &dst_spb) < 0) + return -ENOMEM; + + idx = ssm_pk_buff_get_idx(dst_spb); + ret = ssm_rbuff_write_b(dst_flow->tx_rb, idx, NULL); + if (ret < 0) + ssm_pool_remove(dp, idx); + else + ssm_flow_set_notify(dst_flow->set, + dst_flow->info.id, FLOW_PKT); + } return ret; } |
