Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--  src/lib/dev.c  1040
1 file changed, 500 insertions(+), 540 deletions(-)
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 8a723b63..92310b9e 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2022
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* API for applications
*
@@ -28,28 +28,36 @@
#include "config.h"
-#include <ouroboros/hash.h>
-#include <ouroboros/cacep.h>
-#include <ouroboros/errno.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/cep.h>
+#include <ouroboros/crypt.h>
#include <ouroboros/dev.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
+#include <ouroboros/flow.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/hash.h>
+#include <ouroboros/ipcp.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/list.h>
#include <ouroboros/local-dev.h>
-#include <ouroboros/sockets.h>
-#include <ouroboros/fccntl.h>
-#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
#include <ouroboros/pthread.h>
#include <ouroboros/random.h>
+#include <ouroboros/serdes-irm.h>
#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_rbuff.h>
+#include <ouroboros/sockets.h>
#include <ouroboros/utils.h>
-#include <ouroboros/fqueue.h>
#ifdef PROC_FLOW_STATS
#include <ouroboros/rib.h>
#endif
+#ifdef HAVE_LIBGCRYPT
+#include <gcrypt.h>
+#endif
+#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
@@ -67,23 +75,13 @@
#define CRCLEN (sizeof(uint32_t))
#define SECMEMSZ 16384
-#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
-enum port_state {
- PORT_NULL = 0,
- PORT_INIT,
- PORT_ID_PENDING,
- PORT_ID_ASSIGNED,
- PORT_DESTROY
-};
-
-struct port {
+/* map flow_ids to flow descriptors; track state of the flow */
+struct fmap {
int fd;
-
- enum port_state state;
- pthread_mutex_t state_lock;
- pthread_cond_t state_cond;
+ /* TODO: use actual flow state */
+ enum flow_state state;
};
#define frcti_to_flow(frcti) \
@@ -92,18 +90,16 @@ struct port {
struct flow {
struct list_head next;
+ struct flow_info info;
+
struct shm_rbuff * rx_rb;
struct shm_rbuff * tx_rb;
struct shm_flow_set * set;
- int flow_id;
+
uint16_t oflags;
- qosspec_t qs;
ssize_t part_idx;
- void * ctx;
- uint8_t key[SYMMKEYSZ];
-
- pid_t pid;
+ struct crypt_info crypt;
struct timespec snd_act;
struct timespec rcv_act;
@@ -122,7 +118,7 @@ struct flow_set {
};
struct fqueue {
- struct portevent fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
+ struct flowevent fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
size_t fqsize;
size_t next;
};
@@ -135,9 +131,12 @@ struct {
struct bmp * fqueues;
struct flow * flows;
- struct port * ports;
+ struct fmap * id_to_fd;
struct list_head flow_list;
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+
pthread_t tx;
pthread_t rx;
size_t n_frcti;
@@ -146,113 +145,118 @@ struct {
pthread_rwlock_t lock;
} ai;
-static void port_destroy(struct port * p)
+static void flow_destroy(struct fmap * p)
{
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_DESTROY) {
- pthread_mutex_unlock(&p->state_lock);
+ if (p->state == FLOW_DESTROY) {
+ pthread_mutex_unlock(&ai.mtx);
return;
}
- if (p->state == PORT_ID_PENDING)
- p->state = PORT_DESTROY;
+ if (p->state == FLOW_ALLOC_PENDING)
+ p->state = FLOW_DESTROY;
else
- p->state = PORT_NULL;
+ p->state = FLOW_NULL;
- pthread_cond_signal(&p->state_cond);
+ pthread_cond_signal(&ai.cond);
- while (p->state != PORT_NULL)
- pthread_cond_wait(&p->state_cond, &p->state_lock);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
+
+ while (p->state != FLOW_NULL)
+ pthread_cond_wait(&ai.cond, &ai.mtx);
p->fd = -1;
- p->state = PORT_INIT;
+ p->state = FLOW_INIT;
- pthread_mutex_unlock(&p->state_lock);
+ pthread_cleanup_pop(true);
}
-static void port_set_state(struct port * p,
- enum port_state state)
+static void flow_set_state(struct fmap * p,
+ enum flow_state state)
{
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_DESTROY) {
- pthread_mutex_unlock(&p->state_lock);
+ if (p->state == FLOW_DESTROY) {
+ pthread_mutex_unlock(&ai.mtx);
return;
}
p->state = state;
- pthread_cond_broadcast(&p->state_cond);
+ pthread_cond_broadcast(&ai.cond);
- pthread_mutex_unlock(&p->state_lock);
+ pthread_mutex_unlock(&ai.mtx);
}
-static enum port_state port_wait_assign(int flow_id)
+static enum flow_state flow_wait_assign(int flow_id)
{
- enum port_state state;
- struct port * p;
+ enum flow_state state;
+ struct fmap * p;
- p = &ai.ports[flow_id];
+ p = &ai.id_to_fd[flow_id];
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_ID_ASSIGNED) {
- pthread_mutex_unlock(&p->state_lock);
- return PORT_ID_ASSIGNED;
+ if (p->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&ai.mtx);
+ return FLOW_ALLOCATED;
}
- if (p->state == PORT_INIT)
- p->state = PORT_ID_PENDING;
+ if (p->state == FLOW_INIT)
+ p->state = FLOW_ALLOC_PENDING;
- while (p->state == PORT_ID_PENDING)
- pthread_cond_wait(&p->state_cond, &p->state_lock);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
- if (p->state == PORT_DESTROY) {
- p->state = PORT_NULL;
- pthread_cond_broadcast(&p->state_cond);
+ while (p->state == FLOW_ALLOC_PENDING)
+ pthread_cond_wait(&ai.cond, &ai.mtx);
+
+ if (p->state == FLOW_DESTROY) {
+ p->state = FLOW_NULL;
+ pthread_cond_broadcast(&ai.cond);
}
state = p->state;
- assert(state != PORT_INIT);
+ pthread_cleanup_pop(true);
- pthread_mutex_unlock(&p->state_lock);
+ assert(state != FLOW_INIT);
return state;
}
-static int proc_announce(char * prog)
+static int proc_announce(const char * prog)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.prog = prog;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL) {
- return -EIRMD;
- }
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- if (!recv_msg->has_result || (ret = recv_msg->result)) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return ret;
- }
+ if (proc_announce__irm_req_ser(&msg, prog) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- return ret;
+ return irm__irm_result_des(&msg);
+}
+
+/* IRMd will clean up the mess if this fails */
+static void proc_exit(void)
+{
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+
+ if (proc_exit__irm_req_ser(&msg) < 0)
+ return;
+
+ send_recv_msg(&msg);
}
-#include "crypt.c"
#include "frct.c"
void * flow_tx(void * o)
{
- struct timespec tic = {0, TICTIME};
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
(void) o;
@@ -283,7 +287,7 @@ static void flow_send_keepalive(struct flow * flow,
if (shm_rbuff_write(flow->tx_rb, idx))
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
}
@@ -295,14 +299,14 @@ static void _flow_keepalive(struct flow * flow)
struct timespec s_act;
struct timespec r_act;
int flow_id;
- uint32_t timeo;
+ time_t timeo;
uint32_t acl;
s_act = flow->snd_act;
r_act = flow->rcv_act;
- flow_id = flow->flow_id;
- timeo = flow->qs.timeout;
+ flow_id = flow->info.id;
+ timeo = flow->info.qs.timeout;
acl = shm_rbuff_get_acl(flow->rx_rb);
if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
@@ -310,13 +314,13 @@ static void _flow_keepalive(struct flow * flow)
clock_gettime(PTHREAD_COND_CLOCK, &now);
- if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
+ if (ts_diff_ns(&r_act, &now) > (int64_t) timeo * MILLION) {
shm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER);
shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
return;
}
- if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) {
+ if (ts_diff_ns(&s_act, &now) > (int64_t) timeo * (MILLION >> 2)) {
pthread_rwlock_unlock(&ai.lock);
flow_send_keepalive(flow, now);
@@ -348,7 +352,7 @@ static void __cleanup_fqueue_destroy(void * fq)
void * flow_rx(void * o)
{
- struct timespec tic = {0, TICTIME};
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
int ret;
struct fqueue * fq;
@@ -378,11 +382,10 @@ static void flow_clear(int fd)
{
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
- ai.flows[fd].flow_id = -1;
- ai.flows[fd].pid = -1;
+ ai.flows[fd].info.id = -1;
}
-static void flow_fini(int fd)
+static void __flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
@@ -393,13 +396,13 @@ static void flow_fini(int fd)
pthread_join(ai.tx, NULL);
}
- shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
frcti_destroy(ai.flows[fd].frcti);
}
- if (ai.flows[fd].flow_id != -1) {
- port_destroy(&ai.ports[ai.flows[fd].flow_id]);
+ if (ai.flows[fd].info.id != -1) {
+ flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]);
bmp_release(ai.fds, fd);
}
@@ -415,24 +418,29 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL) {
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].flow_id,
+ ai.flows[fd].info.id,
FLOW_DEALLOC);
shm_flow_set_close(ai.flows[fd].set);
}
- if (ai.flows[fd].ctx != NULL)
- crypt_fini(ai.flows[fd].ctx);
+ crypt_fini(&ai.flows[fd].crypt);
list_del(&ai.flows[fd].next);
flow_clear(fd);
}
-static int flow_init(int flow_id,
- pid_t pid,
- qosspec_t qs,
- uint8_t * s,
- time_t mpl)
+static void flow_fini(int fd)
+{
+ pthread_rwlock_wrlock(&ai.lock);
+
+ __flow_fini(fd);
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+static int flow_init(struct flow_info * info,
+ buffer_t * sk)
{
struct timespec now;
struct flow * flow;
@@ -451,42 +459,43 @@ static int flow_init(int flow_id,
flow = &ai.flows[fd];
- flow->rx_rb = shm_rbuff_open(getpid(), flow_id);
+ flow->info = *info;
+
+ flow->rx_rb = shm_rbuff_open(info->n_pid, info->id);
if (flow->rx_rb == NULL)
goto fail_rx_rb;
- flow->tx_rb = shm_rbuff_open(pid, flow_id);
+ flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id);
if (flow->tx_rb == NULL)
goto fail_tx_rb;
- flow->set = shm_flow_set_open(pid);
+ flow->set = shm_flow_set_open(info->n_1_pid);
if (flow->set == NULL)
goto fail_set;
- flow->flow_id = flow_id;
flow->oflags = FLOWFDEFAULT;
- flow->pid = pid;
flow->part_idx = NO_PART;
- flow->qs = qs;
flow->snd_act = now;
flow->rcv_act = now;
- if (qs.cypher_s > 0) {
- assert(s != NULL);
- if (crypt_init(&flow->ctx) < 0)
- goto fail_ctx;
+ flow->crypt.flags = info->qs.cypher_s; /* TODO: move cypher_s */
- memcpy(flow->key, s, SYMMKEYSZ);
- }
+ memset(flow->crypt.key, 0, SYMMKEYSZ);
+
+ if (flow->crypt.flags > 0 && sk != NULL && sk->data != NULL)
+ memcpy(flow->crypt.key, sk->data, sk->len);
+
+ if (crypt_init(&flow->crypt) < 0)
+ goto fail_crypt;
assert(flow->frcti == NULL);
- if (flow->qs.in_order != 0) {
- flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl);
+ if (info->qs.in_order != 0) {
+ flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl);
if (flow->frcti == NULL)
goto fail_frcti;
- if (shm_flow_set_add(ai.fqset, 0, flow_id))
+ if (shm_flow_set_add(ai.fqset, 0, info->id))
goto fail_flow_set_add;
++ai.n_frcti;
@@ -497,21 +506,21 @@ static int flow_init(int flow_id,
list_add_tail(&flow->next, &ai.flow_list);
- ai.ports[flow_id].fd = fd;
+ ai.id_to_fd[info->id].fd = fd;
- port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED);
+ flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED);
pthread_rwlock_unlock(&ai.lock);
return fd;
fail_tx_thread:
- shm_flow_set_del(ai.fqset, 0, flow_id);
+ shm_flow_set_del(ai.fqset, 0, info->id);
fail_flow_set_add:
frcti_destroy(flow->frcti);
fail_frcti:
- crypt_fini(flow->ctx);
- fail_ctx:
+ crypt_fini(&flow->crypt);
+ fail_crypt:
shm_flow_set_close(flow->set);
fail_set:
shm_rbuff_close(flow->tx_rb);
@@ -549,85 +558,110 @@ static void init(int argc,
if (check_python(argv[0]))
prog = argv[1];
+ prog = path_strip(prog);
+ if (prog == NULL) {
+ fprintf(stderr, "FATAL: Could not determine program name.\n");
+ goto fail_prog;
+ }
+
+ if (proc_announce(prog)) {
+ fprintf(stderr, "FATAL: Could not announce to IRMd.\n");
+ goto fail_prog;
+ }
+
#ifdef HAVE_LIBGCRYPT
if (!gcry_control(GCRYCTL_INITIALIZATION_FINISHED_P)) {
- if (!gcry_check_version(GCRYPT_VERSION))
- goto fail_fds;
+ if (!gcry_check_version(GCRYPT_VERSION)) {
+ fprintf(stderr, "FATAL: Could not get gcry version.\n");
+ goto fail_prog;
+ }
gcry_control(GCRYCTL_DISABLE_SECMEM, 0);
gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);
}
#endif
- list_head_init(&ai.flow_list);
-
ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);
- if (ai.fds == NULL)
+ if (ai.fds == NULL) {
+ fprintf(stderr, "FATAL: Could not create fd bitmap.\n");
goto fail_fds;
+ }
ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
- if (ai.fqueues == NULL)
+ if (ai.fqueues == NULL) {
+ fprintf(stderr, "FATAL: Could not create fqueue bitmap.\n");
goto fail_fqueues;
+ }
ai.rdrb = shm_rdrbuff_open();
- if (ai.rdrb == NULL)
+ if (ai.rdrb == NULL) {
+ fprintf(stderr, "FATAL: Could not open packet buffer.\n");
goto fail_rdrb;
+ }
ai.flows = malloc(sizeof(*ai.flows) * PROG_MAX_FLOWS);
- if (ai.flows == NULL)
+ if (ai.flows == NULL) {
+ fprintf(stderr, "FATAL: Could not malloc flows.\n");
goto fail_flows;
+ }
for (i = 0; i < PROG_MAX_FLOWS; ++i)
flow_clear(i);
- ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);
- if (ai.ports == NULL)
- goto fail_ports;
+ ai.id_to_fd = malloc(sizeof(*ai.id_to_fd) * SYS_MAX_FLOWS);
+ if (ai.id_to_fd == NULL) {
+ fprintf(stderr, "FATAL: Could not malloc id_to_fd.\n");
+ goto fail_id_to_fd;
+ }
- if (prog != NULL) {
- prog = path_strip(prog);
- if (proc_announce(prog))
- goto fail_announce;
+ for (i = 0; i < SYS_MAX_FLOWS; ++i)
+ ai.id_to_fd[i].state = FLOW_INIT;
+
+ if (pthread_mutex_init(&ai.mtx, NULL)) {
+ fprintf(stderr, "FATAL: Could not init mutex.\n");
+ goto fail_mtx;
}
- for (i = 0; i < SYS_MAX_FLOWS; ++i) {
- ai.ports[i].state = PORT_INIT;
- if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) {
- int j;
- for (j = 0; j < i; ++j)
- pthread_mutex_destroy(&ai.ports[j].state_lock);
- goto fail_announce;
- }
- if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) {
- int j;
- for (j = 0; j < i; ++j)
- pthread_cond_destroy(&ai.ports[j].state_cond);
- goto fail_state_cond;
- }
+ if (pthread_cond_init(&ai.cond, NULL) < 0) {
+ fprintf(stderr, "FATAL: Could not init condvar.\n");
+ goto fail_cond;
}
- if (pthread_rwlock_init(&ai.lock, NULL))
- goto fail_lock;
+ if (pthread_rwlock_init(&ai.lock, NULL) < 0) {
+ fprintf(stderr, "FATAL: Could not initialize flow lock.\n");
+ goto fail_flow_lock;
+ }
ai.fqset = shm_flow_set_open(getpid());
- if (ai.fqset == NULL)
+ if (ai.fqset == NULL) {
+ fprintf(stderr, "FATAL: Could not open flow set.\n");
goto fail_fqset;
+ }
ai.frct_set = fset_create();
- if (ai.frct_set == NULL || ai.frct_set->idx != 0)
+ if (ai.frct_set == NULL || ai.frct_set->idx != 0) {
+ fprintf(stderr, "FATAL: Could not create FRCT set.\n");
goto fail_frct_set;
+ }
- if (timerwheel_init() < 0)
+ if (timerwheel_init() < 0) {
+ fprintf(stderr, "FATAL: Could not initialize timerwheel.\n");
goto fail_timerwheel;
+ }
#if defined PROC_FLOW_STATS
if (strstr(argv[0], "ipcpd") == NULL) {
sprintf(procstr, "proc.%d", getpid());
- /* Don't bail on fail, it just won't show metrics */
- if (rib_init(procstr) < 0)
+ if (rib_init(procstr) < 0) {
+ fprintf(stderr, "FATAL: Could not initialize RIB.\n");
goto fail_rib_init;
+ }
}
#endif
- if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0)
+ if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0) {
+ fprintf(stderr, "FATAL: Could not start monitor thread.\n");
goto fail_monitor;
+ }
+
+ list_head_init(&ai.flow_list);
return;
@@ -643,15 +677,13 @@ static void init(int argc,
shm_flow_set_close(ai.fqset);
fail_fqset:
pthread_rwlock_destroy(&ai.lock);
- fail_lock:
- for (i = 0; i < SYS_MAX_FLOWS; ++i)
- pthread_cond_destroy(&ai.ports[i].state_cond);
- fail_state_cond:
- for (i = 0; i < SYS_MAX_FLOWS; ++i)
- pthread_mutex_destroy(&ai.ports[i].state_lock);
- fail_announce:
- free(ai.ports);
- fail_ports:
+ fail_flow_lock:
+ pthread_cond_destroy(&ai.cond);
+ fail_cond:
+ pthread_mutex_destroy(&ai.mtx);
+ fail_mtx:
+ free(ai.id_to_fd);
+ fail_id_to_fd:
free(ai.flows);
fail_flows:
shm_rdrbuff_close(ai.rdrb);
@@ -660,58 +692,60 @@ static void init(int argc,
fail_fqueues:
bmp_destroy(ai.fds);
fail_fds:
- fprintf(stderr, "FATAL: ouroboros-dev init failed. "
- "Make sure an IRMd is running.\n\n");
memset(&ai, 0, sizeof(ai));
+ fail_prog:
exit(EXIT_FAILURE);
}
static void fini(void)
{
- int i = 0;
-#ifdef PROC_FLOW_STATS
- rib_fini();
-#endif
+ int i;
+
if (ai.fds == NULL)
return;
pthread_cancel(ai.rx);
pthread_join(ai.rx, NULL);
- fset_destroy(ai.frct_set);
-
pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
- if (ai.flows[i].flow_id != -1) {
+ if (ai.flows[i].info.id != -1) {
ssize_t idx;
shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN);
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- flow_fini(i);
+ __flow_fini(i);
}
}
- shm_flow_set_close(ai.fqset);
+ pthread_cond_destroy(&ai.cond);
+ pthread_mutex_destroy(&ai.mtx);
- for (i = 0; i < SYS_MAX_FLOWS; ++i) {
- pthread_mutex_destroy(&ai.ports[i].state_lock);
- pthread_cond_destroy(&ai.ports[i].state_cond);
- }
+ pthread_rwlock_unlock(&ai.lock);
+#ifdef PROC_FLOW_STATS
+ rib_fini();
+#endif
timerwheel_fini();
- shm_rdrbuff_close(ai.rdrb);
+ fset_destroy(ai.frct_set);
+
+ shm_flow_set_close(ai.fqset);
+
+ pthread_rwlock_destroy(&ai.lock);
free(ai.flows);
- free(ai.ports);
+ free(ai.id_to_fd);
+
+ shm_rdrbuff_close(ai.rdrb);
bmp_destroy(ai.fds);
bmp_destroy(ai.fqueues);
- pthread_rwlock_unlock(&ai.lock);
+ proc_exit();
- pthread_rwlock_destroy(&ai.lock);
+ memset(&ai, 0, sizeof(ai));
}
#if defined(__MACH__) && defined(__APPLE__)
@@ -728,286 +762,241 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini;
int flow_accept(qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
- void * pkp; /* public key pair */
- uint8_t s[SYMMKEYSZ]; /* secret key for flow */
- uint8_t buf[MSGBUFSZ];
- int err = -EIRMD;
- ssize_t key_len;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk;
+ int fd;
+ int err;
- memset(s, 0, SYMMKEYSZ);
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
+ memset(&flow, 0, sizeof(flow));
- msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
- msg.has_pid = true;
- msg.pid = getpid();
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
+ if (flow_accept__irm_req_ser(&msg, &flow, timeo))
+ return -ENOMEM;
- key_len = crypt_dh_pkp_create(&pkp, buf);
- if (key_len < 0) {
- err = -ECRYPT;
- goto fail_crypt_pkp;
- }
- if (key_len > 0) {
- msg.has_pk = true;
- msg.pk.data = buf;
- msg.pk.len = (uint32_t) key_len;
- }
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_cleanup_push(crypt_dh_pkp_destroy, pkp);
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
- recv_msg = send_recv_irm_msg(&msg);
+ fd = flow_init(&flow, &sk);
- pthread_cleanup_pop(false);
+ freebuf(sk);
- if (recv_msg == NULL)
- goto fail_recv;
+ if (qs != NULL)
+ *qs = flow.qs;
- if (!recv_msg->has_result)
- goto fail_msg;
+ return fd;
+}
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_msg;
- }
+int flow_alloc(const char * dst,
+ qosspec_t * qs,
+ const struct timespec * timeo)
+{
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk; /* symmetric key */
+ int fd;
+ int err;
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl || recv_msg->qosspec == NULL)
- goto fail_msg;
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
- if (recv_msg->pk.len != 0 &&
- crypt_dh_derive(pkp, recv_msg->pk.data,
- recv_msg->pk.len, s) < 0) {
- err = -ECRYPT;
- goto fail_msg;
- }
+ memset(&flow, 0, sizeof(flow));
- crypt_dh_pkp_destroy(pkp);
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- msg_to_spec(recv_msg->qosspec), s,
- recv_msg->mpl);
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if (fd < 0)
- return fd;
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
+ fd = flow_init(&flow, &sk);
- pthread_rwlock_rdlock(&ai.lock);
+ freebuf(sk);
if (qs != NULL)
- *qs = ai.flows[fd].qs;
-
- pthread_rwlock_unlock(&ai.lock);
+ *qs = flow.qs;
return fd;
-
- fail_msg:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_recv:
- crypt_dh_pkp_destroy(pkp);
- fail_crypt_pkp:
- return err;
}
-static int __flow_alloc(const char * dst,
- qosspec_t * qs,
- const struct timespec * timeo,
- bool join)
+int flow_join(const char * dst,
+ qosspec_t * qs,
+ const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
- void * pkp = NULL; /* public key pair */
- uint8_t s[SYMMKEYSZ]; /* secret key for flow */
- uint8_t buf[MSGBUFSZ];
- int err = -EIRMD;
-
- memset(s, 0, SYMMKEYSZ);
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int fd;
+ int err;
#ifdef QOS_DISABLE_CRC
if (qs != NULL)
qs->ber = 1;
#endif
- msg.code = join ? IRM_MSG_CODE__IRM_FLOW_JOIN
- : IRM_MSG_CODE__IRM_FLOW_ALLOC;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.pid = getpid();
- qs_msg = spec_to_msg(qs);
- msg.qosspec = &qs_msg;
-
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
-
- if (!join && qs != NULL && qs->cypher_s != 0) {
- ssize_t key_len;
-
- key_len = crypt_dh_pkp_create(&pkp, buf);
- if (key_len < 0) {
- err = -ECRYPT;
- goto fail_crypt_pkp;
- }
+ if (qs != NULL && qs->cypher_s > 0)
+ return -ENOTSUP; /* TODO: Encrypted broadcast */
- msg.has_pk = true;
- msg.pk.data = buf;
- msg.pk.len = (uint32_t) key_len;
- }
+ memset(&flow, 0, sizeof(flow));
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- goto fail_send;
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- if (!recv_msg->has_result)
- goto fail_result;
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_result;
- }
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl)
- goto fail_result;
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
- if (!join && qs != NULL && qs->cypher_s != 0) {
- if (!recv_msg->has_pk || recv_msg->pk.len == 0) {
- err = -ECRYPT;
- goto fail_result;
- }
+ fd = flow_init(&flow, NULL);
- if (crypt_dh_derive(pkp, recv_msg->pk.data,
- recv_msg->pk.len, s) < 0) {
- err = -ECRYPT;
- goto fail_result;
- }
-
- crypt_dh_pkp_destroy(pkp);
- }
-
-
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qs == NULL ? qos_raw : *qs, s,
- recv_msg->mpl);
-
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (qs != NULL)
+ *qs = flow.qs;
return fd;
-
- fail_result:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_send:
- crypt_dh_pkp_destroy(pkp);
- fail_crypt_pkp:
- return err;
-}
-
-int flow_alloc(const char * dst,
- qosspec_t * qs,
- const struct timespec * timeo)
-{
- return __flow_alloc(dst, qs, timeo, false);
-}
-
-int flow_join(const char * dst,
- qosspec_t * qs,
- const struct timespec * timeo)
-{
- if (qs != NULL && qs->cypher_s != 0)
- return -ECRYPT;
-
- return __flow_alloc(dst, qs, timeo, true);
}
+#define PKT_BUF_LEN 2048
int flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- struct flow * f;
- time_t timeo;
+ struct flow_info info;
+ uint8_t pkt[PKT_BUF_LEN];
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+ struct timespec timeo = TIMESPEC_INIT_S(0);
+ struct flow * flow;
+ int err;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_flow_id = true;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_nsec = 0;
+ memset(&info, 0, sizeof(info));
- f = &ai.flows[fd];
+ flow = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (f->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = f->flow_id;
+ flow->oflags = FLOWFDEFAULT | FLOWFRNOPART;
- timeo = frcti_dealloc(f->frcti);
- while (timeo < 0) { /* keep the flow active for rtx */
- ssize_t ret;
- uint8_t buf[128];
- struct timespec tic = {0, TICTIME};
+ flow->rcv_timesout = true;
+ flow->rcv_timeo = tic;
+
+ pthread_rwlock_unlock(&ai.lock);
- f->oflags = FLOWFDEFAULT | FLOWFRNOPART;
+ flow_read(fd, buf, SOCK_BUF_SIZE);
- f->rcv_timesout = true;
- f->rcv_timeo = tic;
+ pthread_rwlock_rdlock(&ai.lock);
+
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
+ while (timeo.tv_sec < 0) { /* keep the flow active for rtx */
+ ssize_t ret;
pthread_rwlock_unlock(&ai.lock);
- ret = flow_read(fd, buf, 128);
+ ret = flow_read(fd, pkt, PKT_BUF_LEN);
pthread_rwlock_rdlock(&ai.lock);
- timeo = frcti_dealloc(f->frcti);
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
- if (ret == -EFLOWDOWN && timeo < 0)
- timeo = -timeo;
+ if (ret == -EFLOWDOWN && timeo.tv_sec < 0)
+ timeo.tv_sec = -timeo.tv_sec;
}
- msg.timeo_sec = timeo;
+ pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
- shm_rbuff_fini(ai.flows[fd].tx_rb);
+ shm_rbuff_fini(flow->tx_rb);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_cleanup_pop(true);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
+ info.id = flow->info.id;
+ info.n_pid = getpid();
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_wrlock(&ai.lock);
+ err = irm__irm_result_des(&msg);
flow_fini(fd);
+ return err;
+}
+
+int ipcp_flow_dealloc(int fd)
+{
+ struct flow_info info;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct flow * flow;
+ int err;
+
+ if (fd < 0 || fd >= SYS_MAX_FLOWS )
+ return -EINVAL;
+
+ flow = &ai.flows[fd];
+
+ memset(&info, 0, sizeof(info));
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (flow->info.id < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOTALLOC;
+ }
+
+ info.id = flow->info.id;
+ info.n_1_pid = flow->info.n_1_pid;
+
pthread_rwlock_unlock(&ai.lock);
- return 0;
+ if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0)
+ return -ENOMEM;
+
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ err = irm__irm_result_des(&msg);
+
+ flow_fini(fd);
+
+ return err;
}
int fccntl(int fd,
@@ -1034,7 +1023,7 @@ int fccntl(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -1073,13 +1062,13 @@ int fccntl(int fd,
goto einval;
if (!flow->rcv_timesout)
goto eperm;
- *timeo = flow->snd_timeo;
+ *timeo = flow->rcv_timeo;
break;
case FLOWGQOSSPEC:
qs = va_arg(l, qosspec_t *);
if (qs == NULL)
goto einval;
- *qs = flow->qs;
+ *qs = flow->info.qs;
break;
case FLOWGRXQLEN:
qlen = va_arg(l, size_t *);
@@ -1106,13 +1095,13 @@ int fccntl(int fd,
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->flow_id,
+ flow->info.id,
FLOW_DOWN);
} else {
rx_acl &= ~ACL_FLOWDOWN;
tx_acl &= ~ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->flow_id,
+ flow->info.id,
FLOW_UP);
}
@@ -1211,10 +1200,10 @@ static int flow_tx_sdb(struct flow * flow,
if (frcti_snd(flow->frcti, sdb) < 0)
goto enomem;
- if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
+ if (crypt_encrypt(&flow->crypt, sdb) < 0)
goto enomem;
- if (flow->qs.ber == 0 && add_crc(sdb) != 0)
+ if (flow->info.qs.ber == 0 && add_crc(sdb) != 0)
goto enomem;
}
@@ -1228,7 +1217,7 @@ static int flow_tx_sdb(struct flow * flow,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
pthread_cleanup_pop(true);
@@ -1265,7 +1254,7 @@ ssize_t flow_write(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1287,10 +1276,9 @@ ssize_t flow_write(int fd,
return -EAGAIN;
idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
} else {
- while ((ret = frcti_window_wait(flow->frcti, abstime)) < 0) {
- if (ret < 0)
- return ret;
- }
+ ret = frcti_window_wait(flow->frcti, abstime);
+ if (ret < 0)
+ return ret;
idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
}
@@ -1311,10 +1299,10 @@ static bool invalid_pkt(struct flow * flow,
if (shm_du_buff_len(sdb) == 0)
return true;
- if (flow->qs.ber == 0 && chk_crc(sdb) != 0)
+ if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0)
return true;
- if (flow->qs.cypher_s > 0 && crypt_decrypt(flow, sdb) < 0)
+ if (crypt_decrypt(&flow->crypt, sdb) < 0)
return true;
return false;
@@ -1374,7 +1362,7 @@ ssize_t flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1458,7 +1446,7 @@ ssize_t flow_read(int fd,
/* fqueue functions. */
-struct flow_set * fset_create()
+struct flow_set * fset_create(void)
{
struct flow_set * set;
@@ -1501,7 +1489,7 @@ void fset_destroy(struct flow_set * set)
free(set);
}
-struct fqueue * fqueue_create()
+struct fqueue * fqueue_create(void)
{
struct fqueue * fq = malloc(sizeof(*fq));
if (fq == NULL)
@@ -1540,20 +1528,20 @@ int fset_add(struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
ret = -EINVAL;
goto fail;
}
if (flow->frcti != NULL)
- shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
- ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);
+ ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id);
if (ret < 0)
goto fail;
if (shm_rbuff_queued(ai.flows[fd].rx_rb))
- shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT);
+ shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1576,11 +1564,11 @@ void fset_del(struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
+ if (flow->info.id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, flow->info.id);
if (flow->frcti != NULL)
- shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id);
pthread_rwlock_unlock(&ai.lock);
}
@@ -1595,12 +1583,12 @@ bool fset_has(const struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return false;
}
- ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1);
+ ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1);
pthread_rwlock_unlock(&ai.lock);
@@ -1621,7 +1609,7 @@ static int fqueue_filter(struct fqueue * fq)
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[fq->fqueue[fq->next].flow_id].fd;
+ fd = ai.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
if (fd < 0) {
++fq->next;
pthread_rwlock_unlock(&ai.lock);
@@ -1667,7 +1655,7 @@ static int fqueue_filter(struct fqueue * fq)
int fqueue_next(struct fqueue * fq)
{
int fd;
- struct portevent * e;
+ struct flowevent * e;
if (fq == NULL)
return -EINVAL;
@@ -1682,7 +1670,7 @@ int fqueue_next(struct fqueue * fq)
e = fq->fqueue + fq->next;
- fd = ai.ports[e->flow_id].fd;
+ fd = ai.id_to_fd[e->flow_id].fd;
++fq->next;
@@ -1741,10 +1729,20 @@ ssize_t fevent(struct flow_set * set,
/* ipcp-dev functions. */
-int np1_flow_alloc(pid_t n_pid,
- int flow_id)
+int np1_flow_alloc(pid_t n_pid,
+ int flow_id)
{
- return flow_init(flow_id, n_pid, qos_np1, NULL, 0);
+ struct flow_info flow;
+
+ memset(&flow, 0, sizeof(flow));
+
+ flow.id = flow_id;
+ flow.n_pid = getpid();
+ flow.qs = qos_np1;
+ flow.mpl = 0;
+ flow.n_1_pid = n_pid; /* This "flow" is upside-down! */
+
+ return flow_init(&flow, NULL);
}
int np1_flow_dealloc(int flow_id,
@@ -1762,7 +1760,7 @@ int np1_flow_dealloc(int flow_id,
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[flow_id].fd;
+ fd = ai.id_to_fd[flow_id].fd;
pthread_rwlock_unlock(&ai.lock);
@@ -1773,137 +1771,99 @@ int np1_flow_resp(int flow_id)
{
int fd;
- if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED)
+ if (flow_wait_assign(flow_id) != FLOW_ALLOCATED)
return -1;
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[flow_id].fd;
+ fd = ai.id_to_fd[flow_id].fd;
pthread_rwlock_unlock(&ai.lock);
return fd;
}
-int ipcp_create_r(int result)
+int ipcp_create_r(const struct ipcp_info * info)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret;
-
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_result = true;
- msg.result = result;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (ipcp_create_r__irm_req_ser(&msg, info) < 0)
+ return -ENOMEM;
- return ret;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ return irm__irm_result_des(&msg);
}
-int ipcp_flow_req_arr(const uint8_t * dst,
- size_t len,
- qosspec_t qs,
- time_t mpl,
- const void * data,
- size_t dlen)
+int ipcp_flow_req_arr(const buffer_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const buffer_t * data)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- qosspec_msg_t qs_msg;
- int fd;
-
- assert(dst != NULL);
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_hash = true;
- msg.hash.len = len;
- msg.hash.data = (uint8_t *) dst;
- qs_msg = spec_to_msg(&qs);
- msg.qosspec = &qs_msg;
- msg.has_mpl = true;
- msg.mpl = mpl;
- msg.has_pk = true;
- msg.pk.data = (uint8_t *) data;
- msg.pk.len = dlen;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_flow_id || !recv_msg->has_pid) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- if (recv_msg->has_result && recv_msg->result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ memset(&flow, 0, sizeof(flow));
- fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0);
+ assert(dst != NULL && dst->len != 0 && dst->data != NULL);
- irm_msg__free_unpacked(recv_msg, NULL);
+ flow.n_1_pid = getpid();
+ flow.qs = qs;
+ flow.mpl = mpl;
- return fd;
+ if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0)
+ return -ENOMEM;
+
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
+
+ /* inverted for np1_flow */
+ flow.n_1_pid = flow.n_pid;
+ flow.n_pid = getpid();
+ flow.mpl = 0;
+
+ return flow_init(&flow, NULL);
}
-int ipcp_flow_alloc_reply(int fd,
- int response,
- time_t mpl,
- const void * data,
- size_t len)
+int ipcp_flow_alloc_reply(int fd,
+ int response,
+ time_t mpl,
+ const buffer_t * data)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
- msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg.has_flow_id = true;
- msg.has_pk = true;
- msg.pk.data = (uint8_t *) data;
- msg.pk.len = (uint32_t) len;
- msg.has_mpl = true;
- msg.mpl = mpl;
-
pthread_rwlock_rdlock(&ai.lock);
- msg.flow_id = ai.flows[fd].flow_id;
+ flow.id = ai.flows[fd].info.id;
pthread_rwlock_unlock(&ai.lock);
- msg.has_response = true;
- msg.response = response;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ flow.mpl = mpl;
- ret = recv_msg->result;
+ if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- return ret;
+ return irm__irm_result_des(&msg);
}
int ipcp_flow_read(int fd,
@@ -1919,9 +1879,9 @@ int ipcp_flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(flow->flow_id >= 0);
+ assert(flow->info.id >= 0);
- while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ while (frcti_queued_pdu(flow->frcti) < 0) {
pthread_rwlock_unlock(&ai.lock);
idx = flow_rx_sdb(flow, sdb, false, NULL);
@@ -1951,7 +1911,7 @@ int ipcp_flow_write(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1979,7 +1939,7 @@ int np1_flow_read(int fd,
flow = &ai.flows[fd];
- assert(flow->flow_id >= 0);
+ assert(flow->info.id >= 0);
pthread_rwlock_rdlock(&ai.lock);
@@ -2010,7 +1970,7 @@ int np1_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -2028,7 +1988,7 @@ int np1_flow_write(int fd,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
return ret;
}
@@ -2052,7 +2012,7 @@ int ipcp_flow_fini(int fd)
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -1;
}
@@ -2061,7 +2021,7 @@ int ipcp_flow_fini(int fd)
shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].flow_id,
+ ai.flows[fd].info.id,
FLOW_DEALLOC);
rx_rb = ai.flows[fd].rx_rb;
@@ -2082,9 +2042,9 @@ int ipcp_flow_get_qoscube(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].flow_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
- *cube = qos_spec_to_cube(ai.flows[fd].qs);
+ *cube = qos_spec_to_cube(ai.flows[fd].info.qs);
pthread_rwlock_unlock(&ai.lock);
@@ -2097,7 +2057,7 @@ size_t ipcp_flow_queued(int fd)
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].flow_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
q = shm_rbuff_queued(ai.flows[fd].tx_rb);
@@ -2133,14 +2093,14 @@ int local_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
if (ret == 0)
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
else
shm_rdrbuff_remove(ai.rdrb, idx);