path: root/src/lib/dev.c
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--  src/lib/dev.c  1861
1 file changed, 1248 insertions(+), 613 deletions(-)
diff --git a/src/lib/dev.c b/src/lib/dev.c
index edcf56ed..92310b9e 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2018
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* API for applications
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -20,31 +20,44 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#include <ouroboros/endian.h>
-
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
#define _POSIX_C_SOURCE 200809L
+#endif
#include "config.h"
-#include <ouroboros/hash.h>
-#include <ouroboros/errno.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/cep.h>
+#include <ouroboros/crypt.h>
#include <ouroboros/dev.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
+#include <ouroboros/flow.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/hash.h>
+#include <ouroboros/ipcp.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/list.h>
#include <ouroboros/local-dev.h>
-#include <ouroboros/sockets.h>
-#include <ouroboros/fccntl.h>
-#include <ouroboros/bitmap.h>
+#include <ouroboros/np1_flow.h>
+#include <ouroboros/pthread.h>
#include <ouroboros/random.h>
+#include <ouroboros/serdes-irm.h>
#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_rbuff.h>
+#include <ouroboros/sockets.h>
#include <ouroboros/utils.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/qoscube.h>
-#include <ouroboros/timerwheel.h>
-
-#include "rq.h"
+#ifdef PROC_FLOW_STATS
+#include <ouroboros/rib.h>
+#endif
+#ifdef HAVE_LIBGCRYPT
+#include <gcrypt.h>
+#endif
+#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
@@ -57,46 +70,39 @@
#endif
/* Partial read information. */
-#define NO_PART -1
-#define DONE_PART -2
+#define NO_PART -1
+#define DONE_PART -2
-struct flow_set {
- size_t idx;
-};
+#define CRCLEN (sizeof(uint32_t))
+#define SECMEMSZ 16384
+#define MSGBUFSZ 2048
-struct fqueue {
- int fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
- size_t fqsize;
- size_t next;
+/* map flow_ids to flow descriptors; track state of the flow */
+struct fmap {
+ int fd;
+ /* TODO: use actual flow state */
+ enum flow_state state;
};
-enum port_state {
- PORT_NULL = 0,
- PORT_INIT,
- PORT_ID_PENDING,
- PORT_ID_ASSIGNED,
- PORT_DESTROY
-};
+#define frcti_to_flow(frcti) \
+ ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti)))
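+
+/*
+ * frcti_to_flow() is the container_of idiom: it recovers a pointer to
+ * the enclosing struct flow from a pointer to its embedded frcti
+ * member. A minimal standalone sketch of the pattern, using
+ * hypothetical types (not part of this file):
+ *
+ *     #include <assert.h>
+ *     #include <stddef.h>
+ *     #include <stdint.h>
+ *
+ *     struct inner { int x; };
+ *
+ *     struct outer {
+ *             char         tag;
+ *             struct inner member;
+ *     };
+ *
+ *     #define to_outer(p) \
+ *             ((struct outer *)((uint8_t *)(p) - offsetof(struct outer, member)))
+ *
+ *     static void container_of_demo(void)
+ *     {
+ *             struct outer   o;
+ *             struct inner * ip = &o.member;
+ *
+ *             assert(to_outer(ip) == &o); // back to the enclosing struct
+ *     }
+ */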
-struct port {
- int fd;
+struct flow {
+ struct list_head next;
- enum port_state state;
- pthread_mutex_t state_lock;
- pthread_cond_t state_cond;
-};
+ struct flow_info info;
-struct flow {
struct shm_rbuff * rx_rb;
struct shm_rbuff * tx_rb;
struct shm_flow_set * set;
- int port_id;
- int oflags;
- qoscube_t cube;
- qosspec_t spec;
+
+ uint16_t oflags;
ssize_t part_idx;
- pid_t pid;
+ struct crypt_info crypt;
+
+ struct timespec snd_act;
+ struct timespec rcv_act;
bool snd_timesout;
bool rcv_timesout;
@@ -106,166 +112,342 @@ struct flow {
struct frcti * frcti;
};
-struct {
- char * prog;
- pid_t pid;
+struct flow_set {
+ size_t idx;
+ pthread_rwlock_t lock;
+};
+
+struct fqueue {
+ struct flowevent fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
+ size_t fqsize;
+ size_t next;
+};
+struct {
struct shm_rdrbuff * rdrb;
struct shm_flow_set * fqset;
- struct timerwheel * tw;
-
struct bmp * fds;
struct bmp * fqueues;
struct flow * flows;
- struct port * ports;
+ struct fmap * id_to_fd;
+ struct list_head flow_list;
+
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+
+ pthread_t tx;
+ pthread_t rx;
+ size_t n_frcti;
+ fset_t * frct_set;
pthread_rwlock_t lock;
} ai;
-#include "frct.c"
-
-static void port_destroy(struct port * p)
+static void flow_destroy(struct fmap * p)
{
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_DESTROY) {
- pthread_mutex_unlock(&p->state_lock);
+ if (p->state == FLOW_DESTROY) {
+ pthread_mutex_unlock(&ai.mtx);
return;
}
- if (p->state == PORT_ID_PENDING)
- p->state = PORT_DESTROY;
+ if (p->state == FLOW_ALLOC_PENDING)
+ p->state = FLOW_DESTROY;
else
- p->state = PORT_NULL;
+ p->state = FLOW_NULL;
+
+ pthread_cond_signal(&ai.cond);
- pthread_cond_signal(&p->state_cond);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
- while (p->state != PORT_NULL)
- pthread_cond_wait(&p->state_cond, &p->state_lock);
+ while (p->state != FLOW_NULL)
+ pthread_cond_wait(&ai.cond, &ai.mtx);
- p->fd = -1;
- p->state = PORT_INIT;
+ p->fd = -1;
+ p->state = FLOW_INIT;
- pthread_mutex_unlock(&p->state_lock);
+ pthread_cleanup_pop(true);
}
-static void port_set_state(struct port * p,
- enum port_state state)
+static void flow_set_state(struct fmap * p,
+ enum flow_state state)
{
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_DESTROY) {
- pthread_mutex_unlock(&p->state_lock);
+ if (p->state == FLOW_DESTROY) {
+ pthread_mutex_unlock(&ai.mtx);
return;
}
p->state = state;
- pthread_cond_broadcast(&p->state_cond);
+ pthread_cond_broadcast(&ai.cond);
- pthread_mutex_unlock(&p->state_lock);
+ pthread_mutex_unlock(&ai.mtx);
}
-static enum port_state port_wait_assign(int port_id)
+static enum flow_state flow_wait_assign(int flow_id)
{
- enum port_state state;
- struct port * p;
+ enum flow_state state;
+ struct fmap * p;
- p = &ai.ports[port_id];
+ p = &ai.id_to_fd[flow_id];
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_ID_ASSIGNED) {
- pthread_mutex_unlock(&p->state_lock);
- return PORT_ID_ASSIGNED;
+ if (p->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&ai.mtx);
+ return FLOW_ALLOCATED;
}
- if (p->state == PORT_INIT)
- p->state = PORT_ID_PENDING;
+ if (p->state == FLOW_INIT)
+ p->state = FLOW_ALLOC_PENDING;
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
- while (p->state == PORT_ID_PENDING)
- pthread_cond_wait(&p->state_cond, &p->state_lock);
+ while (p->state == FLOW_ALLOC_PENDING)
+ pthread_cond_wait(&ai.cond, &ai.mtx);
- if (p->state == PORT_DESTROY) {
- p->state = PORT_NULL;
- pthread_cond_broadcast(&p->state_cond);
+ if (p->state == FLOW_DESTROY) {
+ p->state = FLOW_NULL;
+ pthread_cond_broadcast(&ai.cond);
}
state = p->state;
- assert(state != PORT_INIT);
+ pthread_cleanup_pop(true);
- pthread_mutex_unlock(&p->state_lock);
+ assert(state != FLOW_INIT);
return state;
}
-static int proc_announce(char * prog)
+static int proc_announce(const char * prog)
+{
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
+
+ if (proc_announce__irm_req_ser(&msg, prog) < 0)
+ return -ENOMEM;
+
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ return irm__irm_result_des(&msg);
+}
+
+/* IRMd will clean up the mess if this fails */
+static void proc_exit(void)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE;
- msg.has_pid = true;
- msg.pid = ai.pid;
- msg.prog = prog;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL) {
- return -EIRMD;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+
+ if (proc_exit__irm_req_ser(&msg) < 0)
+ return;
+
+ send_recv_msg(&msg);
+}
+
+#include "frct.c"
+
+void * flow_tx(void * o)
+{
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+
+ (void) o;
+
+ while (true) {
+ timerwheel_move();
+
+ nanosleep(&tic, NULL);
}
- if (!recv_msg->has_result || (ret = recv_msg->result)) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return ret;
+ return (void *) 0;
+}
+
+static void flow_send_keepalive(struct flow * flow,
+ struct timespec now)
+{
+ struct shm_du_buff * sdb;
+ ssize_t idx;
+ uint8_t * ptr;
+
+ idx = shm_rdrbuff_alloc(ai.rdrb, 0, &ptr, &sdb);
+ if (idx < 0)
+ return;
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ flow->snd_act = now;
+
+ if (shm_rbuff_write(flow->tx_rb, idx))
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ else
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+/* Needs rdlock on ai. */
+static void _flow_keepalive(struct flow * flow)
+{
+ struct timespec now;
+ struct timespec s_act;
+ struct timespec r_act;
+ int flow_id;
+ time_t timeo;
+ uint32_t acl;
+
+ s_act = flow->snd_act;
+ r_act = flow->rcv_act;
+
+ flow_id = flow->info.id;
+ timeo = flow->info.qs.timeout;
+
+ acl = shm_rbuff_get_acl(flow->rx_rb);
+ if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
+ return;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ if (ts_diff_ns(&r_act, &now) > (int64_t) timeo * MILLION) {
+ shm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER);
+ shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+ return;
}
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (ts_diff_ns(&s_act, &now) > (int64_t) timeo * (MILLION >> 2)) {
+ pthread_rwlock_unlock(&ai.lock);
- return ret;
+ flow_send_keepalive(flow, now);
+
+ pthread_rwlock_rdlock(&ai.lock);
+ }
+}
+
+static void handle_keepalives(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ list_for_each_safe(p, h, &ai.flow_list) {
+ struct flow * flow;
+ flow = list_entry(p, struct flow, next);
+ _flow_keepalive(flow);
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+}
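+
+/*
+ * Worked example of the two thresholds above, assuming MILLION is
+ * 10^6 (ns per ms, as in the Ouroboros time utilities) and a QoS
+ * timeout of 4000 ms: a keepalive is sent after a quarter of the
+ * timeout with no transmissions, and the peer is flagged down
+ * (ACL_FLOWPEER / FLOW_PEER) after a full timeout with no receptions.
+ */
+static void keepalive_thresholds(void)
+{
+        time_t  timeo_ms = 4000;                            /* qs.timeout */
+        int64_t snd_ns   = (int64_t) timeo_ms * (MILLION >> 2);
+        int64_t rcv_ns   = (int64_t) timeo_ms * MILLION;
+
+        assert(snd_ns == 1000000000LL); /* keepalive after 1 s idle    */
+        assert(rcv_ns == 4000000000LL); /* peer down after 4 s silence */
+}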
+
+static void __cleanup_fqueue_destroy(void * fq)
+{
+ fqueue_destroy((fqueue_t *) fq);
+}
+
+void * flow_rx(void * o)
+{
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+ int ret;
+ struct fqueue * fq;
+
+ (void) o;
+
+ fq = fqueue_create();
+
+ pthread_cleanup_push(__cleanup_fqueue_destroy, fq);
+
+ /* fevent will filter all FRCT packets for us */
+ while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) {
+ if (ret == -ETIMEDOUT) {
+ handle_keepalives();
+ continue;
+ }
+
+ while (fqueue_next(fq) >= 0)
+ ; /* no need to act */
+ }
+
+ pthread_cleanup_pop(true);
+
+ return (void *) 0;
}
static void flow_clear(int fd)
{
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
- ai.flows[fd].port_id = -1;
- ai.flows[fd].pid = -1;
- ai.flows[fd].cube = QOS_CUBE_BE;
+ ai.flows[fd].info.id = -1;
}
-static void flow_fini(int fd)
+static void __flow_fini(int fd)
{
- assert(!(fd < 0));
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+
+ if (ai.flows[fd].frcti != NULL) {
+ ai.n_frcti--;
+ if (ai.n_frcti == 0) {
+ pthread_cancel(ai.tx);
+ pthread_join(ai.tx, NULL);
+ }
- if (ai.flows[fd].port_id != -1) {
- port_destroy(&ai.ports[ai.flows[fd].port_id]);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
+
+ frcti_destroy(ai.flows[fd].frcti);
+ }
+
+ if (ai.flows[fd].info.id != -1) {
+ flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]);
bmp_release(ai.fds, fd);
}
- if (ai.flows[fd].rx_rb != NULL)
+ if (ai.flows[fd].rx_rb != NULL) {
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
shm_rbuff_close(ai.flows[fd].rx_rb);
+ }
- if (ai.flows[fd].tx_rb != NULL)
+ if (ai.flows[fd].tx_rb != NULL) {
+ shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
shm_rbuff_close(ai.flows[fd].tx_rb);
+ }
- if (ai.flows[fd].set != NULL)
+ if (ai.flows[fd].set != NULL) {
+ shm_flow_set_notify(ai.flows[fd].set,
+ ai.flows[fd].info.id,
+ FLOW_DEALLOC);
shm_flow_set_close(ai.flows[fd].set);
+ }
- if (ai.flows[fd].frcti != NULL)
- frcti_destroy(ai.flows[fd].frcti);
+ crypt_fini(&ai.flows[fd].crypt);
+
+ list_del(&ai.flows[fd].next);
flow_clear(fd);
}
-static int flow_init(int port_id,
- pid_t pid,
- qoscube_t qc)
+static void flow_fini(int fd)
{
- int fd;
- int err = -ENOMEM;
+ pthread_rwlock_wrlock(&ai.lock);
+
+ __flow_fini(fd);
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+static int flow_init(struct flow_info * info,
+ buffer_t * sk)
+{
+ struct timespec now;
+ struct flow * flow;
+ int fd;
+ int err = -ENOMEM;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_wrlock(&ai.lock);
@@ -275,35 +457,77 @@ static int flow_init(int port_id,
goto fail_fds;
}
- ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, port_id);
- if (ai.flows[fd].rx_rb == NULL)
- goto fail;
+ flow = &ai.flows[fd];
- ai.flows[fd].tx_rb = shm_rbuff_open(pid, port_id);
- if (ai.flows[fd].tx_rb == NULL)
- goto fail;
+ flow->info = *info;
- ai.flows[fd].set = shm_flow_set_open(pid);
- if (ai.flows[fd].set == NULL)
- goto fail;
+ flow->rx_rb = shm_rbuff_open(info->n_pid, info->id);
+ if (flow->rx_rb == NULL)
+ goto fail_rx_rb;
+
+ flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id);
+ if (flow->tx_rb == NULL)
+ goto fail_tx_rb;
+
+ flow->set = shm_flow_set_open(info->n_1_pid);
+ if (flow->set == NULL)
+ goto fail_set;
+
+ flow->oflags = FLOWFDEFAULT;
+ flow->part_idx = NO_PART;
+ flow->snd_act = now;
+ flow->rcv_act = now;
- ai.flows[fd].port_id = port_id;
- ai.flows[fd].oflags = FLOWFDEFAULT;
- ai.flows[fd].pid = pid;
- ai.flows[fd].cube = qc;
- ai.flows[fd].spec = qos_cube_to_spec(qc);
- ai.flows[fd].part_idx = NO_PART;
+ flow->crypt.flags = info->qs.cypher_s; /* TODO: move cypher_s */
- ai.ports[port_id].fd = fd;
+ memset(flow->crypt.key, 0, SYMMKEYSZ);
+
+ if (flow->crypt.flags > 0 && sk != NULL && sk->data != NULL)
+ memcpy(flow->crypt.key, sk->data, sk->len);
+
+ if (crypt_init(&flow->crypt) < 0)
+ goto fail_crypt;
+
+ assert(flow->frcti == NULL);
+
+ if (info->qs.in_order != 0) {
+ flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl);
+ if (flow->frcti == NULL)
+ goto fail_frcti;
- port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+ if (shm_flow_set_add(ai.fqset, 0, info->id))
+ goto fail_flow_set_add;
+
+ ++ai.n_frcti;
+ if (ai.n_frcti == 1 &&
+ pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0)
+ goto fail_tx_thread;
+ }
+
+ list_add_tail(&flow->next, &ai.flow_list);
+
+ ai.id_to_fd[info->id].fd = fd;
+
+ flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED);
pthread_rwlock_unlock(&ai.lock);
return fd;
- fail:
- flow_fini(fd);
+ fail_tx_thread:
+ shm_flow_set_del(ai.fqset, 0, info->id);
+ fail_flow_set_add:
+ frcti_destroy(flow->frcti);
+ fail_frcti:
+ crypt_fini(&flow->crypt);
+ fail_crypt:
+ shm_flow_set_close(flow->set);
+ fail_set:
+ shm_rbuff_close(flow->tx_rb);
+ fail_tx_rb:
+ shm_rbuff_close(flow->rx_rb);
+ fail_rx_rb:
+ bmp_release(ai.fds, fd);
fail_fds:
pthread_rwlock_unlock(&ai.lock);
return err;
@@ -323,153 +547,205 @@ static void init(int argc,
char ** argv,
char ** envp)
{
- const char * prog = argv[0];
- int i;
-
+ char * prog = argv[0];
+ int i;
+#ifdef PROC_FLOW_STATS
+ char procstr[32];
+#endif
(void) argc;
(void) envp;
- assert(ai.prog == NULL);
-
if (check_python(argv[0]))
prog = argv[1];
- ai.pid = getpid();
+ prog = path_strip(prog);
+ if (prog == NULL) {
+ fprintf(stderr, "FATAL: Could not determine program name.\n");
+ goto fail_prog;
+ }
+ if (proc_announce(prog)) {
+ fprintf(stderr, "FATAL: Could not announce to IRMd.\n");
+ goto fail_prog;
+ }
+
+#ifdef HAVE_LIBGCRYPT
+ if (!gcry_control(GCRYCTL_INITIALIZATION_FINISHED_P)) {
+ if (!gcry_check_version(GCRYPT_VERSION)) {
+ fprintf(stderr, "FATAL: Could not get gcry version.\n");
+ goto fail_prog;
+ }
+ gcry_control(GCRYCTL_DISABLE_SECMEM, 0);
+ gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);
+ }
+#endif
ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);
- if (ai.fds == NULL)
+ if (ai.fds == NULL) {
+ fprintf(stderr, "FATAL: Could not create fd bitmap.\n");
goto fail_fds;
+ }
ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
- if (ai.fqueues == NULL)
+ if (ai.fqueues == NULL) {
+ fprintf(stderr, "FATAL: Could not create fqueue bitmap.\n");
goto fail_fqueues;
-
- ai.fqset = shm_flow_set_create();
- if (ai.fqset == NULL)
- goto fail_fqset;
+ }
ai.rdrb = shm_rdrbuff_open();
- if (ai.rdrb == NULL)
+ if (ai.rdrb == NULL) {
+ fprintf(stderr, "FATAL: Could not open packet buffer.\n");
goto fail_rdrb;
+ }
ai.flows = malloc(sizeof(*ai.flows) * PROG_MAX_FLOWS);
- if (ai.flows == NULL)
+ if (ai.flows == NULL) {
+ fprintf(stderr, "FATAL: Could not malloc flows.\n");
goto fail_flows;
+ }
for (i = 0; i < PROG_MAX_FLOWS; ++i)
flow_clear(i);
- ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);
- if (ai.ports == NULL)
- goto fail_ports;
+ ai.id_to_fd = malloc(sizeof(*ai.id_to_fd) * SYS_MAX_FLOWS);
+ if (ai.id_to_fd == NULL) {
+ fprintf(stderr, "FATAL: Could not malloc id_to_fd.\n");
+ goto fail_id_to_fd;
+ }
- if (prog != NULL) {
- ai.prog = strdup(path_strip((char *) prog));
- if (ai.prog == NULL)
- goto fail_prog;
+ for (i = 0; i < SYS_MAX_FLOWS; ++i)
+ ai.id_to_fd[i].state = FLOW_INIT;
- if (proc_announce((char *) ai.prog))
- goto fail_announce;
+ if (pthread_mutex_init(&ai.mtx, NULL)) {
+ fprintf(stderr, "FATAL: Could not init mutex.\n");
+ goto fail_mtx;
}
- for (i = 0; i < SYS_MAX_FLOWS; ++i) {
- ai.ports[i].state = PORT_INIT;
- if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) {
- int j;
- for (j = 0; j < i; ++j)
- pthread_mutex_destroy(&ai.ports[j].state_lock);
- goto fail_announce;
- }
- if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) {
- int j;
- for (j = 0; j < i; ++j)
- pthread_cond_destroy(&ai.ports[j].state_cond);
- goto fail_state_cond;
- }
+ if (pthread_cond_init(&ai.cond, NULL) < 0) {
+ fprintf(stderr, "FATAL: Could not init condvar.\n");
+ goto fail_cond;
}
- if (pthread_rwlock_init(&ai.lock, NULL))
- goto fail_lock;
+ if (pthread_rwlock_init(&ai.lock, NULL) < 0) {
+ fprintf(stderr, "FATAL: Could not initialize flow lock.\n");
+ goto fail_flow_lock;
+ }
+
+ ai.fqset = shm_flow_set_open(getpid());
+ if (ai.fqset == NULL) {
+ fprintf(stderr, "FATAL: Could not open flow set.\n");
+ goto fail_fqset;
+ }
+
+ ai.frct_set = fset_create();
+ if (ai.frct_set == NULL || ai.frct_set->idx != 0) {
+ fprintf(stderr, "FATAL: Could not create FRCT set.\n");
+ goto fail_frct_set;
+ }
+
+ if (timerwheel_init() < 0) {
+ fprintf(stderr, "FATAL: Could not initialize timerwheel.\n");
+ goto fail_timerwheel;
+ }
+
+#if defined PROC_FLOW_STATS
+ if (strstr(argv[0], "ipcpd") == NULL) {
+ sprintf(procstr, "proc.%d", getpid());
+ if (rib_init(procstr) < 0) {
+ fprintf(stderr, "FATAL: Could not initialize RIB.\n");
+ goto fail_rib_init;
+ }
+ }
+#endif
+ if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0) {
+ fprintf(stderr, "FATAL: Could not start monitor thread.\n");
+ goto fail_monitor;
+ }
- if (frct_init())
- goto fail_frct;
+ list_head_init(&ai.flow_list);
return;
- fail_frct:
+ fail_monitor:
+#if defined PROC_FLOW_STATS
+ rib_fini();
+ fail_rib_init:
+#endif
+ timerwheel_fini();
+ fail_timerwheel:
+ fset_destroy(ai.frct_set);
+ fail_frct_set:
+ shm_flow_set_close(ai.fqset);
+ fail_fqset:
pthread_rwlock_destroy(&ai.lock);
- fail_lock:
- for (i = 0; i < SYS_MAX_FLOWS; ++i)
- pthread_cond_destroy(&ai.ports[i].state_cond);
- fail_state_cond:
- for (i = 0; i < SYS_MAX_FLOWS; ++i)
- pthread_mutex_destroy(&ai.ports[i].state_lock);
- fail_announce:
- free(ai.prog);
- fail_prog:
- free(ai.ports);
- fail_ports:
+ fail_flow_lock:
+ pthread_cond_destroy(&ai.cond);
+ fail_cond:
+ pthread_mutex_destroy(&ai.mtx);
+ fail_mtx:
+ free(ai.id_to_fd);
+ fail_id_to_fd:
free(ai.flows);
fail_flows:
shm_rdrbuff_close(ai.rdrb);
fail_rdrb:
- shm_flow_set_destroy(ai.fqset);
- fail_fqset:
bmp_destroy(ai.fqueues);
fail_fqueues:
bmp_destroy(ai.fds);
fail_fds:
- fprintf(stderr, "FATAL: ouroboros-dev init failed. "
- "Make sure an IRMd is running.\n\n");
memset(&ai, 0, sizeof(ai));
+ fail_prog:
exit(EXIT_FAILURE);
}
static void fini(void)
{
- int i = 0;
+ int i;
if (ai.fds == NULL)
return;
- frct_fini();
-
- shm_flow_set_destroy(ai.fqset);
-
- if (ai.prog != NULL)
- free(ai.prog);
+ pthread_cancel(ai.rx);
+ pthread_join(ai.rx, NULL);
pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
- if (ai.flows[i].port_id != -1) {
+ if (ai.flows[i].info.id != -1) {
ssize_t idx;
shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN);
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- flow_fini(i);
+ __flow_fini(i);
}
}
- for (i = 0; i < SYS_MAX_FLOWS; ++i) {
- pthread_mutex_destroy(&ai.ports[i].state_lock);
- pthread_cond_destroy(&ai.ports[i].state_cond);
- }
+ pthread_cond_destroy(&ai.cond);
+ pthread_mutex_destroy(&ai.mtx);
- shm_rdrbuff_close(ai.rdrb);
+ pthread_rwlock_unlock(&ai.lock);
- if (ai.tw != NULL)
- timerwheel_destroy(ai.tw);
+#ifdef PROC_FLOW_STATS
+ rib_fini();
+#endif
+ timerwheel_fini();
+
+ fset_destroy(ai.frct_set);
+
+ shm_flow_set_close(ai.fqset);
+
+ pthread_rwlock_destroy(&ai.lock);
free(ai.flows);
- free(ai.ports);
+ free(ai.id_to_fd);
+
+ shm_rdrbuff_close(ai.rdrb);
bmp_destroy(ai.fds);
bmp_destroy(ai.fqueues);
- pthread_rwlock_unlock(&ai.lock);
+ proc_exit();
- pthread_rwlock_destroy(&ai.lock);
+ memset(&ai, 0, sizeof(ai));
}
#if defined(__MACH__) && defined(__APPLE__)
@@ -486,189 +762,250 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini;
int flow_accept(qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int fd = -1;
-
- msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
- msg.has_pid = true;
- msg.pid = ai.pid;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk;
+ int fd;
+ int err;
+
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
+ memset(&flow, 0, sizeof(flow));
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
+ if (flow_accept__irm_req_ser(&msg, &flow, timeo))
+ return -ENOMEM;
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if (recv_msg->result != 0) {
- int res = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
- return res;
- }
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
- if (!recv_msg->has_pid || !recv_msg->has_port_id ||
- !recv_msg->has_qoscube) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ fd = flow_init(&flow, &sk);
- fd = flow_init(recv_msg->port_id, recv_msg->pid, recv_msg->qoscube);
+ freebuf(sk);
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (qs != NULL)
+ *qs = flow.qs;
- if (fd < 0)
- return fd;
+ return fd;
+}
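+
+/*
+ * Minimal server sketch for the reworked flow_accept() path: block
+ * for an incoming flow, echo one message, deallocate. Assumes an
+ * IRMd is running and a name is already bound to this process.
+ */
+static int serve_one(void)
+{
+        char    buf[128];
+        ssize_t len;
+        int     fd;
+
+        fd = flow_accept(NULL, NULL);     /* default QoS, no timeout */
+        if (fd < 0)
+                return fd;
+
+        len = flow_read(fd, buf, sizeof(buf));
+        if (len >= 0)
+                flow_write(fd, buf, (size_t) len);
+
+        return flow_dealloc(fd);
+}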
- pthread_rwlock_wrlock(&ai.lock);
+int flow_alloc(const char * dst,
+ qosspec_t * qs,
+ const struct timespec * timeo)
+{
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk; /* symmetric key */
+ int fd;
+ int err;
+
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
- /* FIXME: check if FRCT is needed based on qc? */
+ memset(&flow, 0, sizeof(flow));
- assert(ai.flows[fd].frcti == NULL);
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- ai.flows[fd].frcti = frcti_create(fd);
- if (ai.flows[fd].frcti == NULL) {
- flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
return -ENOMEM;
- }
- if (qs != NULL)
- *qs = ai.flows[fd].spec;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_unlock(&ai.lock);
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
+
+ fd = flow_init(&flow, &sk);
+
+ freebuf(sk);
+
+ if (qs != NULL)
+ *qs = flow.qs;
return fd;
}
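
/*
 * The matching client sketch: allocate a flow to a destination name
 * with a 5 s allocation timeout, send a message, await the echo.
 * "echo" is a placeholder name.
 */
static int ping_once(void)
{
        struct timespec timeo = TIMESPEC_INIT_S(5);
        qosspec_t       qs    = qos_raw;
        char            buf[128];
        int             fd;

        fd = flow_alloc("echo", &qs, &timeo);
        if (fd < 0)
                return fd;

        if (flow_write(fd, "hi", 2) < 0 ||
            flow_read(fd, buf, sizeof(buf)) < 0) {
                flow_dealloc(fd);
                return -1;
        }

        return flow_dealloc(fd);
}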
-int flow_alloc(const char * dst,
- qosspec_t * qs,
- const struct timespec * timeo)
+int flow_join(const char * dst,
+ qosspec_t * qs,
+ const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- qoscube_t qc = QOS_CUBE_BE;
- int fd;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int fd;
+ int err;
+
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
+ if (qs != NULL && qs->cypher_s > 0)
+ return -ENOTSUP; /* TODO: Encrypted broadcast */
+
+ memset(&flow, 0, sizeof(flow));
+
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.has_qoscube = true;
- msg.pid = ai.pid;
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
+
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
+
+ fd = flow_init(&flow, NULL);
if (qs != NULL)
- qc = qos_spec_to_cube(*qs);
+ *qs = flow.qs;
- msg.qoscube = qc;
+ return fd;
+}
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
+#define PKT_BUF_LEN 2048
+int flow_dealloc(int fd)
+{
+ struct flow_info info;
+ uint8_t pkt[PKT_BUF_LEN];
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+ struct timespec timeo = TIMESPEC_INIT_S(0);
+ struct flow * flow;
+ int err;
+
+ if (fd < 0 || fd >= SYS_MAX_FLOWS)
+ return -EINVAL;
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
+ memset(&info, 0, sizeof(info));
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ flow = &ai.flows[fd];
- if (recv_msg->result != 0) {
- int res = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
- return res;
- }
+ pthread_rwlock_rdlock(&ai.lock);
- if (!recv_msg->has_pid || !recv_msg->has_port_id) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
+ if (flow->info.id < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOTALLOC;
}
- fd = flow_init(recv_msg->port_id, recv_msg->pid, qc);
+ flow->oflags = FLOWFDEFAULT | FLOWFRNOPART;
- irm_msg__free_unpacked(recv_msg, NULL);
+ flow->rcv_timesout = true;
+ flow->rcv_timeo = tic;
- if (fd < 0)
- return fd;
+ pthread_rwlock_unlock(&ai.lock);
- pthread_rwlock_wrlock(&ai.lock);
+ flow_read(fd, buf, SOCK_BUF_SIZE);
+
+ pthread_rwlock_rdlock(&ai.lock);
- /* FIXME: check if FRCT is needed based on qc? */
- assert(ai.flows[fd].frcti == NULL);
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
+ while (timeo.tv_sec < 0) { /* keep the flow active for rtx */
+ ssize_t ret;
- ai.flows[fd].frcti = frcti_create(fd);
- if (ai.flows[fd].frcti == NULL) {
- flow_fini(fd);
pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
+
+ ret = flow_read(fd, pkt, PKT_BUF_LEN);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
+
+ if (ret == -EFLOWDOWN && timeo.tv_sec < 0)
+ timeo.tv_sec = -timeo.tv_sec;
}
- pthread_rwlock_unlock(&ai.lock);
+ pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
- return fd;
+ shm_rbuff_fini(flow->tx_rb);
+
+ pthread_cleanup_pop(true);
+
+ info.id = flow->info.id;
+ info.n_pid = getpid();
+
+ if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0)
+ return -ENOMEM;
+
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ err = irm__irm_result_des(&msg);
+
+ flow_fini(fd);
+
+ return err;
}
-int flow_dealloc(int fd)
+int ipcp_flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
+ struct flow_info info;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct flow * flow;
+ int err;
- if (fd < 0)
+ if (fd < 0 || fd >= SYS_MAX_FLOWS)
return -EINVAL;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_port_id = true;
- msg.has_pid = true;
- msg.pid = ai.pid;
+ flow = &ai.flows[fd];
+
+ memset(&info, 0, sizeof(info));
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].port_id >= 0);
+ if (flow->info.id < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOTALLOC;
+ }
- msg.port_id = ai.flows[fd].port_id;
+ info.id = flow->info.id;
+ info.n_1_pid = flow->info.n_1_pid;
pthread_rwlock_unlock(&ai.lock);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_wrlock(&ai.lock);
+ err = irm__irm_result_des(&msg);
flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
-
- return 0;
+ return err;
}
int fccntl(int fd,
int cmd,
...)
{
- uint16_t sflags;
uint32_t * fflags;
uint16_t * cflags;
+ uint16_t csflags;
va_list l;
struct timespec * timeo;
qosspec_t * qs;
@@ -677,7 +1014,7 @@ int fccntl(int fd,
size_t * qlen;
struct flow * flow;
- if (fd < 0 || fd >= PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= SYS_MAX_FLOWS)
return -EBADF;
flow = &ai.flows[fd];
@@ -686,7 +1023,7 @@ int fccntl(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->port_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -725,13 +1062,13 @@ int fccntl(int fd,
goto einval;
if (!flow->rcv_timesout)
goto eperm;
- *timeo = flow->snd_timeo;
+ *timeo = flow->rcv_timeo;
break;
case FLOWGQOSSPEC:
qs = va_arg(l, qosspec_t *);
if (qs == NULL)
goto einval;
- *qs = flow->spec;
+ *qs = flow->info.qs;
break;
case FLOWGRXQLEN:
qlen = va_arg(l, size_t *);
@@ -739,7 +1076,7 @@ int fccntl(int fd,
break;
case FLOWGTXQLEN:
qlen = va_arg(l, size_t *);
- *qlen = shm_rbuff_queued(flow->rx_rb);
+ *qlen = shm_rbuff_queued(flow->tx_rb);
break;
case FLOWSFLAGS:
flow->oflags = va_arg(l, uint32_t);
@@ -757,9 +1094,15 @@ int fccntl(int fd,
if (flow->oflags & FLOWFDOWN) {
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
+ shm_flow_set_notify(flow->set,
+ flow->info.id,
+ FLOW_DOWN);
} else {
rx_acl &= ~ACL_FLOWDOWN;
tx_acl &= ~ACL_FLOWDOWN;
+ shm_flow_set_notify(flow->set,
+ flow->info.id,
+ FLOW_UP);
}
shm_rbuff_set_acl(flow->rx_rb, rx_acl);
@@ -773,17 +1116,18 @@ int fccntl(int fd,
*fflags = flow->oflags;
break;
case FRCTSFLAGS:
- sflags = (uint16_t) va_arg(l, int);
- if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags))
+ csflags = (uint16_t) va_arg(l, uint32_t);
+ if (flow->frcti == NULL)
goto eperm;
+ frcti_setflags(flow->frcti, csflags);
break;
case FRCTGFLAGS:
- cflags = (uint16_t *) va_arg(l, int *);
+ cflags = (uint16_t *) va_arg(l, uint32_t *);
if (cflags == NULL)
goto einval;
if (flow->frcti == NULL)
goto eperm;
- *cflags = frcti_getconf(flow->frcti);
+ *cflags = frcti_getflags(flow->frcti);
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -808,35 +1152,114 @@ int fccntl(int fd,
return -EPERM;
}
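
/*
 * fccntl() usage sketch against the commands handled above: switch a
 * flow to fully non-blocking I/O and read back its QoS spec.
 * FLOWFRNOBLOCK/FLOWFWNOBLOCK are the read/write non-blocking flags
 * from <ouroboros/fccntl.h>.
 */
static int make_nonblocking(int fd)
{
        qosspec_t qs;

        if (fccntl(fd, FLOWSFLAGS,
                   FLOWFDEFAULT | FLOWFRNOBLOCK | FLOWFWNOBLOCK) < 0)
                return -1;

        return fccntl(fd, FLOWGQOSSPEC, &qs);
}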
+static int chk_crc(struct shm_du_buff * sdb)
+{
+ uint32_t crc;
+ uint8_t * head = shm_du_buff_head(sdb);
+ uint8_t * tail = shm_du_buff_tail_release(sdb, CRCLEN);
+
+ mem_hash(HASH_CRC32, &crc, head, tail - head);
+
+ return !(crc == *((uint32_t *) tail));
+}
+
+static int add_crc(struct shm_du_buff * sdb)
+{
+ uint8_t * head = shm_du_buff_head(sdb);
+ uint8_t * tail = shm_du_buff_tail_alloc(sdb, CRCLEN);
+ if (tail == NULL)
+ return -1;
+
+ mem_hash(HASH_CRC32, tail, head, tail - head);
+
+ return 0;
+}
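+
+/*
+ * Trailer layout used by add_crc()/chk_crc(): a CRC32 over the packet
+ * contents occupies the CRCLEN (4) bytes after the payload; it is
+ * verified and stripped (tail_release) on receipt. A round-trip
+ * sketch on a plain buffer with room for the trailer, using mem_hash()
+ * from <ouroboros/hash.h> as above:
+ */
+static int crc_roundtrip(uint8_t * pkt, size_t len)
+{
+        uint32_t crc;
+        uint32_t chk;
+
+        mem_hash(HASH_CRC32, &crc, pkt, len);  /* as in add_crc() */
+        memcpy(pkt + len, &crc, CRCLEN);       /* append trailer  */
+
+        mem_hash(HASH_CRC32, &chk, pkt, len);  /* as in chk_crc() */
+
+        return memcmp(pkt + len, &chk, CRCLEN) == 0 ? 0 : -1;
+}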
+
+static int flow_tx_sdb(struct flow * flow,
+ struct shm_du_buff * sdb,
+ bool block,
+ struct timespec * abstime)
+{
+ struct timespec now;
+ ssize_t idx;
+ int ret;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ flow->snd_act = now;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ idx = shm_du_buff_get_idx(sdb);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (shm_du_buff_len(sdb) > 0) {
+ if (frcti_snd(flow->frcti, sdb) < 0)
+ goto enomem;
+
+ if (crypt_encrypt(&flow->crypt, sdb) < 0)
+ goto enomem;
+
+ if (flow->info.qs.ber == 0 && add_crc(sdb) != 0)
+ goto enomem;
+ }
+
+ pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
+
+ if (!block)
+ ret = shm_rbuff_write(flow->tx_rb, idx);
+ else
+ ret = shm_rbuff_write_b(flow->tx_rb, idx, abstime);
+
+ if (ret < 0)
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ else
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+
+ pthread_cleanup_pop(true);
+
+ return 0;
+
+enomem:
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -ENOMEM;
+}
+
ssize_t flow_write(int fd,
const void * buf,
size_t count)
{
- struct flow * flow;
- ssize_t idx;
- int ret;
- int flags;
- struct timespec abs;
- struct timespec * abstime = NULL;
+ struct flow * flow;
+ ssize_t idx;
+ int ret;
+ int flags;
+ struct timespec abs;
+ struct timespec * abstime = NULL;
+ struct shm_du_buff * sdb;
+ uint8_t * ptr;
- if (buf == NULL)
- return 0;
+ if (buf == NULL && count != 0)
+ return -EINVAL;
- if (fd < 0 || fd > PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROG_MAX_FLOWS)
return -EBADF;
flow = &ai.flows[fd];
clock_gettime(PTHREAD_COND_CLOCK, &abs);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
- if (flow->port_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- if (ai.flows[fd].snd_timesout) {
+ if (flow->snd_timesout) {
ts_add(&abs, &flow->snd_timeo, &abs);
abstime = &abs;
}
@@ -848,40 +1271,71 @@ ssize_t flow_write(int fd,
if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
return -EPERM;
- if (flags & FLOWFWNOBLOCK)
- idx = shm_rdrbuff_write(ai.rdrb,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- buf,
- count);
- else /* Blocking. */
- idx = shm_rdrbuff_write_b(ai.rdrb,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- buf,
- count,
- abstime);
+ if (flags & FLOWFWNOBLOCK) {
+ if (!frcti_is_window_open(flow->frcti))
+ return -EAGAIN;
+ idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
+ } else {
+ ret = frcti_window_wait(flow->frcti, abstime);
+ if (ret < 0)
+ return ret;
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
+ }
+
if (idx < 0)
return idx;
- if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) {
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -ENOMEM;
- }
+ if (count > 0)
+ memcpy(ptr, buf, count);
- pthread_rwlock_rdlock(&ai.lock);
+ ret = flow_tx_sdb(flow, sdb, !(flags & FLOWFWNOBLOCK), abstime);
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
- if (ret < 0)
- shm_rdrbuff_remove(ai.rdrb, idx);
- else
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ return ret < 0 ? (ssize_t) ret : (ssize_t) count;
+}
+
+static bool invalid_pkt(struct flow * flow,
+ struct shm_du_buff * sdb)
+{
+ if (shm_du_buff_len(sdb) == 0)
+ return true;
+
+ if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0)
+ return true;
+
+ if (crypt_decrypt(&flow->crypt, sdb) < 0)
+ return true;
+
+ return false;
+}
+
+static ssize_t flow_rx_sdb(struct flow * flow,
+ struct shm_du_buff ** sdb,
+ bool block,
+ struct timespec * abstime)
+{
+ ssize_t idx;
+ struct timespec now;
+
+ idx = block ? shm_rbuff_read_b(flow->rx_rb, abstime) :
+ shm_rbuff_read(flow->rx_rb);
+ if (idx < 0)
+ return idx;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ flow->rcv_act = now;
pthread_rwlock_unlock(&ai.lock);
- assert(ret <= 0);
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ if (invalid_pkt(flow, *sdb)) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -EAGAIN;
+ }
- return ret;
+ return idx;
}
ssize_t flow_read(int fd,
@@ -890,78 +1344,101 @@ ssize_t flow_read(int fd,
{
ssize_t idx;
ssize_t n;
- uint8_t * sdu;
- struct shm_rbuff * rb;
+ uint8_t * packet;
struct shm_du_buff * sdb;
struct timespec abs;
+ struct timespec now;
struct timespec * abstime = NULL;
struct flow * flow;
- bool noblock;
+ bool block;
bool partrd;
- if (fd < 0 || fd > PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROG_MAX_FLOWS)
return -EBADF;
flow = &ai.flows[fd];
- if (flow->part_idx == DONE_PART) {
- flow->part_idx = NO_PART;
- return 0;
- }
-
- clock_gettime(PTHREAD_COND_CLOCK, &abs);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_rdlock(&ai.lock);
- if (flow->port_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- rb = flow->rx_rb;
- noblock = flow->oflags & FLOWFRNOBLOCK;
+ if (flow->part_idx == DONE_PART) {
+ pthread_rwlock_unlock(&ai.lock);
+ flow->part_idx = NO_PART;
+ return 0;
+ }
+
+ block = !(flow->oflags & FLOWFRNOBLOCK);
partrd = !(flow->oflags & FLOWFRNOPART);
- if (ai.flows[fd].rcv_timesout) {
- ts_add(&abs, &flow->rcv_timeo, &abs);
+ if (flow->rcv_timesout) {
+ ts_add(&now, &flow->rcv_timeo, &abs);
abstime = &abs;
}
- pthread_rwlock_unlock(&ai.lock);
-
idx = flow->part_idx;
if (idx < 0) {
- idx = frcti_queued_pdu(flow->frcti);
- if (idx < 0) {
- do {
- idx = noblock ? shm_rbuff_read(rb) :
- shm_rbuff_read_b(rb, abstime);
- if (idx < 0)
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+
+ idx = flow_rx_sdb(flow, &sdb, block, abstime);
+ if (idx < 0) {
+ if (block && idx != -EAGAIN)
return idx;
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
- } while (frcti_rcv(flow->frcti, sdb) != 0);
+ if (!block)
+ return idx;
+
+ pthread_rwlock_rdlock(&ai.lock);
+ continue;
+ }
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ frcti_rcv(flow->frcti, sdb);
}
}
- n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ packet = shm_du_buff_head(sdb);
+
+ n = shm_du_buff_len(sdb);
assert(n >= 0);
if (n <= (ssize_t) count) {
- memcpy(buf, sdu, n);
- shm_rdrbuff_remove(ai.rdrb, idx);
+ memcpy(buf, packet, n);
+ ipcp_sdb_release(sdb);
+
+ pthread_rwlock_wrlock(&ai.lock);
+
flow->part_idx = (partrd && n == (ssize_t) count) ?
DONE_PART : NO_PART;
+
+ flow->rcv_act = now;
+
+ pthread_rwlock_unlock(&ai.lock);
return n;
} else {
if (partrd) {
- memcpy(buf, sdu, count);
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ memcpy(buf, packet, count);
shm_du_buff_head_release(sdb, n);
+ pthread_rwlock_wrlock(&ai.lock);
flow->part_idx = idx;
+
+ flow->rcv_act = now;
+
+ pthread_rwlock_unlock(&ai.lock);
return count;
} else {
- shm_rdrbuff_remove(ai.rdrb, idx);
+ ipcp_sdb_release(sdb);
return -EMSGSIZE;
}
}
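
/*
 * Partial-read semantics sketch for the NO_PART/DONE_PART logic above:
 * with FLOWFRNOPART cleared (the default), a packet larger than the
 * buffer is consumed in slices, and an exact-fit read is followed by a
 * 0-length read marking the packet boundary. A slice shorter than the
 * request therefore always ends the packet.
 */
static ssize_t drain_packet(int fd)
{
        char    slice[64];
        size_t  total = 0;
        ssize_t n;

        for (;;) {
                n = flow_read(fd, slice, sizeof(slice));
                if (n < 0)
                        return n;       /* e.g. -EAGAIN, -EFLOWDOWN */
                total += (size_t) n;
                if ((size_t) n < sizeof(slice))
                        break;          /* short or 0 read: packet done */
        }

        return (ssize_t) total;
}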
@@ -969,26 +1446,31 @@ ssize_t flow_read(int fd,
/* fqueue functions. */
-struct flow_set * fset_create()
+struct flow_set * fset_create(void)
{
- struct flow_set * set = malloc(sizeof(*set));
+ struct flow_set * set;
+
+ set = malloc(sizeof(*set));
if (set == NULL)
- return NULL;
+ goto fail_malloc;
assert(ai.fqueues);
pthread_rwlock_wrlock(&ai.lock);
set->idx = bmp_allocate(ai.fqueues);
- if (!bmp_is_id_valid(ai.fqueues, set->idx)) {
- pthread_rwlock_unlock(&ai.lock);
- free(set);
- return NULL;
- }
+ if (!bmp_is_id_valid(ai.fqueues, set->idx))
+ goto fail_bmp_alloc;
pthread_rwlock_unlock(&ai.lock);
return set;
+
+ fail_bmp_alloc:
+ pthread_rwlock_unlock(&ai.lock);
+ free(set);
+ fail_malloc:
+ return NULL;
}
void fset_destroy(struct flow_set * set)
@@ -1007,13 +1489,13 @@ void fset_destroy(struct flow_set * set)
free(set);
}
-struct fqueue * fqueue_create()
+struct fqueue * fqueue_create(void)
{
struct fqueue * fq = malloc(sizeof(*fq));
if (fq == NULL)
return NULL;
- memset(fq->fqueue, -1, (SHM_BUFFER_SIZE) * sizeof(*fq->fqueue));
+ memset(fq->fqueue, -1, SHM_BUFFER_SIZE * sizeof(*fq->fqueue));
fq->fqsize = 0;
fq->next = 0;
@@ -1022,9 +1504,6 @@ struct fqueue * fqueue_create()
void fqueue_destroy(struct fqueue * fq)
{
- if (fq == NULL)
- return;
-
free(fq);
}
@@ -1039,36 +1518,57 @@ void fset_zero(struct flow_set * set)
int fset_add(struct flow_set * set,
int fd)
{
- int ret;
- size_t sdus;
- size_t i;
+ struct flow * flow;
+ int ret;
- if (set == NULL || fd < 0 || fd > PROG_MAX_FLOWS)
+ if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return -EINVAL;
- pthread_rwlock_wrlock(&ai.lock);
+ flow = &ai.flows[fd];
- ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id);
+ pthread_rwlock_rdlock(&ai.lock);
- sdus = shm_rbuff_queued(ai.flows[fd].rx_rb);
- for (i = 0; i < sdus; i++)
- shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
+ if (flow->info.id < 0) {
+ ret = -EINVAL;
+ goto fail;
+ }
+
+ if (flow->frcti != NULL)
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
+
+ ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id);
+ if (ret < 0)
+ goto fail;
+
+ if (shm_rbuff_queued(ai.flows[fd].rx_rb))
+ shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
return ret;
+
+ fail:
+ pthread_rwlock_unlock(&ai.lock);
+ return ret;
}
void fset_del(struct flow_set * set,
int fd)
{
- if (set == NULL || fd < 0 || fd > PROG_MAX_FLOWS)
+ struct flow * flow;
+
+ if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return;
- pthread_rwlock_wrlock(&ai.lock);
+ flow = &ai.flows[fd];
- if (ai.flows[fd].port_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id);
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (flow->info.id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, flow->info.id);
+
+ if (flow->frcti != NULL)
+ shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id);
pthread_rwlock_unlock(&ai.lock);
}
@@ -1076,262 +1576,324 @@ void fset_del(struct flow_set * set,
bool fset_has(const struct flow_set * set,
int fd)
{
- bool ret = false;
+ bool ret;
- if (set == NULL || fd < 0)
+ if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return false;
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return false;
}
- ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1);
+ ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1);
pthread_rwlock_unlock(&ai.lock);
return ret;
}
+/* Filter fqueue events for non-data packets */
+static int fqueue_filter(struct fqueue * fq)
+{
+ struct shm_du_buff * sdb;
+ int fd;
+ ssize_t idx;
+ struct frcti * frcti;
+
+ while (fq->next < fq->fqsize) {
+ if (fq->fqueue[fq->next].event != FLOW_PKT)
+ return 1;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ fd = ai.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
+ if (fd < 0) {
+ ++fq->next;
+ pthread_rwlock_unlock(&ai.lock);
+ continue;
+ }
+
+ frcti = ai.flows[fd].frcti;
+ if (frcti == NULL) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ if (__frcti_pdu_ready(frcti) >= 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ idx = flow_rx_sdb(&ai.flows[fd], &sdb, false, NULL);
+ if (idx < 0)
+ return 0;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+ __frcti_rcv(frcti, sdb);
+
+ if (__frcti_pdu_ready(frcti) >= 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ ++fq->next;
+ }
+
+ return 0;
+}
+
int fqueue_next(struct fqueue * fq)
{
- int fd;
+ int fd;
+ struct flowevent * e;
if (fq == NULL)
return -EINVAL;
- if (fq->fqsize == 0)
+ if (fq->fqsize == 0 || fq->next == fq->fqsize)
+ return -EPERM;
+
+ if (fq->next != 0 && fqueue_filter(fq) == 0)
return -EPERM;
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[fq->fqueue[fq->next++]].fd;
+ e = fq->fqueue + fq->next;
- pthread_rwlock_unlock(&ai.lock);
+ fd = ai.id_to_fd[e->flow_id].fd;
- if (fq->next == fq->fqsize) {
- fq->fqsize = 0;
- fq->next = 0;
- }
+ ++fq->next;
+
+ pthread_rwlock_unlock(&ai.lock);
return fd;
}
-int fevent(struct flow_set * set,
- struct fqueue * fq,
- const struct timespec * timeo)
+enum fqtype fqueue_type(struct fqueue * fq)
+{
+ if (fq == NULL)
+ return -EINVAL;
+
+ if (fq->fqsize == 0 || fq->next == 0)
+ return -EPERM;
+
+ return fq->fqueue[(fq->next - 1)].event;
+}
+
+ssize_t fevent(struct flow_set * set,
+ struct fqueue * fq,
+ const struct timespec * timeo)
{
- ssize_t ret;
- struct timespec abstime;
+ ssize_t ret = 0;
+ struct timespec abs;
struct timespec * t = NULL;
if (set == NULL || fq == NULL)
return -EINVAL;
- if (fq->fqsize > 0)
- return fq->fqsize;
+ if (fq->fqsize > 0 && fq->next != fq->fqsize)
+ return 1;
- assert(!fq->next);
+ clock_gettime(PTHREAD_COND_CLOCK, &abs);
if (timeo != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeo, &abstime);
- t = &abstime;
+ ts_add(&abs, timeo, &abs);
+ t = &abs;
}
- ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
- if (ret == -ETIMEDOUT) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
- }
+ while (ret == 0) {
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
+ if (ret == -ETIMEDOUT)
+ return -ETIMEDOUT;
- fq->fqsize = ret;
+ fq->fqsize = ret;
+ fq->next = 0;
- assert(ret);
+ ret = fqueue_filter(fq);
+ }
- return ret;
+ assert(ret != 0);
+
+ return 1;
}
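
/*
 * Event-loop sketch over the fset/fqueue API above: register flows,
 * block in fevent(), then service readable flows. fqueue_type() tells
 * FLOW_PKT apart from the FLOW_UP/FLOW_DOWN/FLOW_PEER notifications.
 */
static int poll_flows(int * fds, size_t n)
{
        struct flow_set * set = fset_create();
        struct fqueue *   fq  = fqueue_create();
        char              buf[128];
        size_t            i;
        int               fd;

        if (set == NULL || fq == NULL)
                goto fail;

        for (i = 0; i < n; ++i)
                if (fset_add(set, fds[i]) < 0)
                        goto fail;

        while (fevent(set, fq, NULL) > 0) {       /* block, no timeout */
                while ((fd = fqueue_next(fq)) >= 0) {
                        if (fqueue_type(fq) != FLOW_PKT)
                                continue;         /* e.g. peer went down */
                        flow_read(fd, buf, sizeof(buf));
                }
        }

 fail:
        if (fq != NULL)
                fqueue_destroy(fq);
        if (set != NULL)
                fset_destroy(set);
        return -1;
}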
/* ipcp-dev functions. */
-int np1_flow_alloc(pid_t n_pid,
- int port_id,
- qoscube_t qc)
+int np1_flow_alloc(pid_t n_pid,
+ int flow_id)
{
- return flow_init(port_id, n_pid, qc);
+ struct flow_info flow;
+
+ memset(&flow, 0, sizeof(flow));
+
+ flow.id = flow_id;
+ flow.n_pid = getpid();
+ flow.qs = qos_np1;
+ flow.mpl = 0;
+ flow.n_1_pid = n_pid; /* This "flow" is upside-down! */
+
+ return flow_init(&flow, NULL);
}
-int np1_flow_dealloc(int port_id)
+int np1_flow_dealloc(int flow_id,
+ time_t timeo)
{
int fd;
+ /*
+ * TODO: Don't pass timeo to the IPCP but wait in IRMd.
+ * This will need async ops, waiting until we bootstrap
+ * the IRMd over ouroboros.
+ */
+
+ sleep(timeo);
+
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[port_id].fd;
+ fd = ai.id_to_fd[flow_id].fd;
pthread_rwlock_unlock(&ai.lock);
return fd;
}
-int np1_flow_resp(int port_id)
+int np1_flow_resp(int flow_id)
{
int fd;
- if (port_wait_assign(port_id) != PORT_ID_ASSIGNED)
+ if (flow_wait_assign(flow_id) != FLOW_ALLOCATED)
return -1;
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[port_id].fd;
+ fd = ai.id_to_fd[flow_id].fd;
pthread_rwlock_unlock(&ai.lock);
return fd;
}
-int ipcp_create_r(pid_t pid,
- int result)
+int ipcp_create_r(const struct ipcp_info * info)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.has_pid = true;
- msg.pid = pid;
- msg.has_result = true;
- msg.result = result;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (ipcp_create_r__irm_req_ser(&msg, info) < 0)
+ return -ENOMEM;
- return ret;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ return irm__irm_result_des(&msg);
}
-int ipcp_flow_req_arr(pid_t pid,
- const uint8_t * dst,
- size_t len,
- qoscube_t qc)
+int ipcp_flow_req_arr(const buffer_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const buffer_t * data)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int fd = -1;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- if (dst == NULL)
- return -EINVAL;
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_pid = true;
- msg.pid = pid;
- msg.has_hash = true;
- msg.hash.len = len;
- msg.hash.data = (uint8_t *) dst;
- msg.has_qoscube = true;
- msg.qoscube = qc;
+ memset(&flow, 0, sizeof(flow));
- recv_msg = send_recv_irm_msg(&msg);
+ assert(dst != NULL && dst->len != 0 && dst->data != NULL);
- if (recv_msg == NULL)
- return -EIRMD;
+ flow.n_1_pid = getpid();
+ flow.qs = qs;
+ flow.mpl = mpl;
- if (!recv_msg->has_port_id || !recv_msg->has_pid) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0)
+ return -ENOMEM;
- if (recv_msg->has_result && recv_msg->result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- fd = flow_init(recv_msg->port_id, recv_msg->pid, qc);
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ /* inverted for np1_flow */
+ flow.n_1_pid = flow.n_pid;
+ flow.n_pid = getpid();
+ flow.mpl = 0;
- return fd;
+ return flow_init(&flow, NULL);
}
-int ipcp_flow_alloc_reply(int fd,
- int response)
+int ipcp_flow_alloc_reply(int fd,
+ int response,
+ time_t mpl,
+ const buffer_t * data)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg.has_port_id = true;
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
pthread_rwlock_rdlock(&ai.lock);
- msg.port_id = ai.flows[fd].port_id;
+ flow.id = ai.flows[fd].info.id;
pthread_rwlock_unlock(&ai.lock);
- msg.has_response = true;
- msg.response = response;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ flow.mpl = mpl;
- ret = recv_msg->result;
+ if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- return ret;
+ return irm__irm_result_des(&msg);
}
int ipcp_flow_read(int fd,
struct shm_du_buff ** sdb)
{
- struct flow * flow;
- struct shm_rbuff * rb;
- ssize_t idx;
+ struct flow * flow;
+ ssize_t idx = -1;
- assert(fd >= 0);
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(sdb);
flow = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- assert(flow->port_id >= 0);
+ assert(flow->info.id >= 0);
- rb = flow->rx_rb;
+ while (frcti_queued_pdu(flow->frcti) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
- pthread_rwlock_unlock(&ai.lock);
+ idx = flow_rx_sdb(flow, sdb, false, NULL);
+ if (idx < 0)
+ return idx;
- if (flow->frcti != NULL) {
- idx = frcti_queued_pdu(flow->frcti);
- if (idx >= 0) {
- *sdb = shm_rdrbuff_get(ai.rdrb, idx);
- return 0;
- }
+ pthread_rwlock_rdlock(&ai.lock);
+
+ frcti_rcv(flow->frcti, *sdb);
}
- do {
- idx = shm_rbuff_read(rb);
- if (idx < 0)
- return idx;
- *sdb = shm_rdrbuff_get(ai.rdrb, idx);
- } while (frcti_rcv(flow->frcti, *sdb) != 0);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}
@@ -1339,61 +1901,102 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
- struct flow * flow;
- int ret;
- ssize_t idx;
+ struct flow * flow;
+ int ret;
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(sdb);
flow = &ai.flows[fd];
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
- assert(flow->port_id >= 0);
+ if (flow->info.id < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOTALLOC;
+ }
if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
pthread_rwlock_unlock(&ai.lock);
return -EPERM;
}
- assert(flow->tx_rb);
+ pthread_rwlock_unlock(&ai.lock);
- idx = shm_du_buff_get_idx(sdb);
+ ret = flow_tx_sdb(flow, sdb, true, NULL);
+
+ return ret;
+}
- if (frcti_snd(flow->frcti, sdb) < 0) {
+int np1_flow_read(int fd,
+ struct shm_du_buff ** sdb)
+{
+ struct flow * flow;
+ ssize_t idx = -1;
+
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(sdb);
+
+ flow = &ai.flows[fd];
+
+ assert(flow->info.id >= 0);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ idx = shm_rbuff_read(flow->rx_rb);
+ if (idx < 0) {
pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
+ return idx;
}
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
- if (ret == 0)
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
-
pthread_rwlock_unlock(&ai.lock);
- assert(ret <= 0);
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
- return ret;
+ return 0;
}
-int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
- size_t len)
+int np1_flow_write(int fd,
+ struct shm_du_buff * sdb)
{
- ssize_t idx;
+ struct flow * flow;
+ int ret;
+ ssize_t idx;
- idx = shm_rdrbuff_write_b(ai.rdrb,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- NULL,
- len,
- NULL);
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(sdb);
- if (idx < 0)
- return -1;
+ flow = &ai.flows[fd];
- *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ pthread_rwlock_rdlock(&ai.lock);
- return 0;
+ if (flow->info.id < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOTALLOC;
+ }
+
+ if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -EPERM;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ idx = shm_du_buff_get_idx(sdb);
+
+ ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
+ if (ret < 0)
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ else
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+
+ return ret;
+}
+
+int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
+ size_t len)
+{
+ return shm_rdrbuff_alloc_b(ai.rdrb, len, NULL, sdb, NULL) < 0 ? -1 : 0;
}
void ipcp_sdb_release(struct shm_du_buff * sdb)
@@ -1401,41 +2004,68 @@ void ipcp_sdb_release(struct shm_du_buff * sdb)
shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
}
-void ipcp_flow_fini(int fd)
+int ipcp_flow_fini(int fd)
{
struct shm_rbuff * rx_rb;
- assert(fd >= 0);
-
- fccntl(fd, FLOWSFLAGS, FLOWFWRONLY);
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
pthread_rwlock_rdlock(&ai.lock);
+ if (ai.flows[fd].info.id < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -1;
+ }
+
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
+
+ shm_flow_set_notify(ai.flows[fd].set,
+ ai.flows[fd].info.id,
+ FLOW_DEALLOC);
+
rx_rb = ai.flows[fd].rx_rb;
pthread_rwlock_unlock(&ai.lock);
if (rx_rb != NULL)
shm_rbuff_fini(rx_rb);
+
+ return 0;
}
int ipcp_flow_get_qoscube(int fd,
qoscube_t * cube)
{
- assert(fd >= 0);
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(cube);
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].port_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
- *cube = ai.flows[fd].cube;
+ *cube = qos_spec_to_cube(ai.flows[fd].info.qs);
pthread_rwlock_unlock(&ai.lock);
return 0;
}
+size_t ipcp_flow_queued(int fd)
+{
+ size_t q;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ assert(ai.flows[fd].info.id >= 0);
+
+ q = shm_rbuff_queued(ai.flows[fd].tx_rb);
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ return q;
+}
+
ssize_t local_flow_read(int fd)
{
ssize_t ret;
@@ -1454,20 +2084,25 @@ ssize_t local_flow_read(int fd)
int local_flow_write(int fd,
size_t idx)
{
- int ret;
+ struct flow * flow;
+ int ret;
assert(fd >= 0);
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
if (ret == 0)
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+ else
+ shm_rdrbuff_remove(ai.rdrb, idx);
pthread_rwlock_unlock(&ai.lock);