summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c689
1 files changed, 383 insertions, 306 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index fb06c496..454dd027 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -127,7 +127,7 @@ struct fqueue {
};
struct {
- struct ssm_pool * gspp;
+ struct ssm_pool * pool;
struct ssm_flow_set * fqset;
struct bmp * fds;
@@ -146,14 +146,14 @@ struct {
fset_t * frct_set;
pthread_rwlock_t lock;
-} ai;
+} proc;
static void flow_destroy(struct fmap * p)
{
- pthread_mutex_lock(&ai.mtx);
+ pthread_mutex_lock(&proc.mtx);
if (p->state == FLOW_DESTROY) {
- pthread_mutex_unlock(&ai.mtx);
+ pthread_mutex_unlock(&proc.mtx);
return;
}
@@ -162,12 +162,12 @@ static void flow_destroy(struct fmap * p)
else
p->state = FLOW_NULL;
- pthread_cond_signal(&ai.cond);
+ pthread_cond_signal(&proc.cond);
- pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &proc.mtx);
while (p->state != FLOW_NULL)
- pthread_cond_wait(&ai.cond, &ai.mtx);
+ pthread_cond_wait(&proc.cond, &proc.mtx);
p->fd = -1;
p->state = FLOW_INIT;
@@ -178,17 +178,17 @@ static void flow_destroy(struct fmap * p)
static void flow_set_state(struct fmap * p,
enum flow_state state)
{
- pthread_mutex_lock(&ai.mtx);
+ pthread_mutex_lock(&proc.mtx);
if (p->state == FLOW_DESTROY) {
- pthread_mutex_unlock(&ai.mtx);
+ pthread_mutex_unlock(&proc.mtx);
return;
}
p->state = state;
- pthread_cond_broadcast(&ai.cond);
+ pthread_cond_broadcast(&proc.cond);
- pthread_mutex_unlock(&ai.mtx);
+ pthread_mutex_unlock(&proc.mtx);
}
static enum flow_state flow_wait_assign(int flow_id)
@@ -196,26 +196,26 @@ static enum flow_state flow_wait_assign(int flow_id)
enum flow_state state;
struct fmap * p;
- p = &ai.id_to_fd[flow_id];
+ p = &proc.id_to_fd[flow_id];
- pthread_mutex_lock(&ai.mtx);
+ pthread_mutex_lock(&proc.mtx);
if (p->state == FLOW_ALLOCATED) {
- pthread_mutex_unlock(&ai.mtx);
+ pthread_mutex_unlock(&proc.mtx);
return FLOW_ALLOCATED;
}
if (p->state == FLOW_INIT)
p->state = FLOW_ALLOC_PENDING;
- pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &proc.mtx);
while (p->state == FLOW_ALLOC_PENDING)
- pthread_cond_wait(&ai.cond, &ai.mtx);
+ pthread_cond_wait(&proc.cond, &proc.mtx);
if (p->state == FLOW_DESTROY) {
p->state = FLOW_NULL;
- pthread_cond_broadcast(&ai.cond);
+ pthread_cond_broadcast(&proc.cond);
}
state = p->state;
@@ -227,13 +227,13 @@ static enum flow_state flow_wait_assign(int flow_id)
return state;
}
-static int proc_announce(const char * prog)
+static int proc_announce(const struct proc_info * proc)
{
uint8_t buf[SOCK_BUF_SIZE];
buffer_t msg = {SOCK_BUF_SIZE, buf};
int err;
- if (proc_announce__irm_req_ser(&msg, prog) < 0)
+ if (proc_announce__irm_req_ser(&msg, proc) < 0)
return -ENOMEM;
err = send_recv_msg(&msg);
@@ -342,23 +342,23 @@ static void flow_send_keepalive(struct flow * flow,
ssize_t idx;
uint8_t * ptr;
- idx = ssm_pool_alloc(ai.gspp, 0, &ptr, &spb);
+ idx = ssm_pool_alloc(proc.pool, 0, &ptr, &spb);
if (idx < 0)
return;
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
flow->snd_act = now;
if (ssm_rbuff_write(flow->tx_rb, idx))
- ssm_pool_remove(ai.gspp, idx);
+ ssm_pool_remove(proc.pool, idx);
else
ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
}
-/* Needs rdlock on ai. */
+/* Needs rdlock on proc. */
static void _flow_keepalive(struct flow * flow)
{
struct timespec now;
@@ -382,16 +382,16 @@ static void _flow_keepalive(struct flow * flow)
if (ts_diff_ns(&now, &r_act) > (int64_t) timeo * MILLION) {
ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER);
- ssm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+ ssm_flow_set_notify(proc.fqset, flow_id, FLOW_PEER);
return;
}
if (ts_diff_ns(&now, &s_act) > (int64_t) timeo * (MILLION >> 2)) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
flow_send_keepalive(flow, now);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
}
}
@@ -400,15 +400,15 @@ static void handle_keepalives(void)
struct list_head * p;
struct list_head * h;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- list_for_each_safe(p, h, &ai.flow_list) {
+ list_for_each_safe(p, h, &proc.flow_list) {
struct flow * flow;
flow = list_entry(p, struct flow, next);
_flow_keepalive(flow);
}
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
}
static void __cleanup_fqueue_destroy(void * fq)
@@ -429,7 +429,7 @@ void * flow_rx(void * o)
pthread_cleanup_push(__cleanup_fqueue_destroy, fq);
/* fevent will filter all FRCT packets for us */
- while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) {
+ while ((ret = fevent(proc.frct_set, fq, &tic)) != 0) {
if (ret == -ETIMEDOUT) {
handle_keepalives();
continue;
@@ -446,63 +446,63 @@ void * flow_rx(void * o)
static void flow_clear(int fd)
{
- memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
+ memset(&proc.flows[fd], 0, sizeof(proc.flows[fd]));
- ai.flows[fd].info.id = -1;
+ proc.flows[fd].info.id = -1;
}
static void __flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
- if (ai.flows[fd].frcti != NULL) {
- ai.n_frcti--;
- if (ai.n_frcti == 0) {
- pthread_cancel(ai.tx);
- pthread_join(ai.tx, NULL);
+ if (proc.flows[fd].frcti != NULL) {
+ proc.n_frcti--;
+ if (proc.n_frcti == 0) {
+ pthread_cancel(proc.tx);
+ pthread_join(proc.tx, NULL);
}
- ssm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
+ ssm_flow_set_del(proc.fqset, 0, proc.flows[fd].info.id);
- frcti_destroy(ai.flows[fd].frcti);
+ frcti_destroy(proc.flows[fd].frcti);
}
- if (ai.flows[fd].info.id != -1) {
- flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]);
- bmp_release(ai.fds, fd);
+ if (proc.flows[fd].info.id != -1) {
+ flow_destroy(&proc.id_to_fd[proc.flows[fd].info.id]);
+ bmp_release(proc.fds, fd);
}
- if (ai.flows[fd].rx_rb != NULL) {
- ssm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
- ssm_rbuff_close(ai.flows[fd].rx_rb);
+ if (proc.flows[fd].rx_rb != NULL) {
+ ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_close(proc.flows[fd].rx_rb);
}
- if (ai.flows[fd].tx_rb != NULL) {
- ssm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
- ssm_rbuff_close(ai.flows[fd].tx_rb);
+ if (proc.flows[fd].tx_rb != NULL) {
+ ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_close(proc.flows[fd].tx_rb);
}
- if (ai.flows[fd].set != NULL) {
- ssm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].info.id,
+ if (proc.flows[fd].set != NULL) {
+ ssm_flow_set_notify(proc.flows[fd].set,
+ proc.flows[fd].info.id,
FLOW_DEALLOC);
- ssm_flow_set_close(ai.flows[fd].set);
+ ssm_flow_set_close(proc.flows[fd].set);
}
- crypt_destroy_ctx(ai.flows[fd].crypt);
+ crypt_destroy_ctx(proc.flows[fd].crypt);
- list_del(&ai.flows[fd].next);
+ list_del(&proc.flows[fd].next);
flow_clear(fd);
}
static void flow_fini(int fd)
{
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
__flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
}
#define IS_ENCRYPTED(crypt) ((crypt)->nid != NID_undef)
@@ -517,15 +517,15 @@ static int flow_init(struct flow_info * info,
clock_gettime(PTHREAD_COND_CLOCK, &now);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
- fd = bmp_allocate(ai.fds);
- if (!bmp_is_id_valid(ai.fds, fd)) {
+ fd = bmp_allocate(proc.fds);
+ if (!bmp_is_id_valid(proc.fds, fd)) {
err = -EBADF;
goto fail_fds;
}
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
flow->info = *info;
@@ -566,27 +566,27 @@ static int flow_init(struct flow_info * info,
if (flow->frcti == NULL)
goto fail_frcti;
- if (ssm_flow_set_add(ai.fqset, 0, info->id))
+ if (ssm_flow_set_add(proc.fqset, 0, info->id))
goto fail_flow_set_add;
- ++ai.n_frcti;
- if (ai.n_frcti == 1 &&
- pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0)
+ ++proc.n_frcti;
+ if (proc.n_frcti == 1 &&
+ pthread_create(&proc.tx, NULL, flow_tx, NULL) < 0)
goto fail_tx_thread;
}
- list_add_tail(&flow->next, &ai.flow_list);
+ list_add_tail(&flow->next, &proc.flow_list);
- ai.id_to_fd[info->id].fd = fd;
+ proc.id_to_fd[info->id].fd = fd;
- flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED);
+ flow_set_state(&proc.id_to_fd[info->id], FLOW_ALLOCATED);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return fd;
fail_tx_thread:
- ssm_flow_set_del(ai.fqset, 0, info->id);
+ ssm_flow_set_del(proc.fqset, 0, info->id);
fail_flow_set_add:
frcti_destroy(flow->frcti);
fail_frcti:
@@ -598,9 +598,9 @@ static int flow_init(struct flow_info * info,
fail_tx_rb:
ssm_rbuff_close(flow->rx_rb);
fail_rx_rb:
- bmp_release(ai.fds, fd);
+ bmp_release(proc.fds, fd);
fail_fds:
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return err;
}
@@ -618,6 +618,7 @@ static void init(int argc,
char ** argv,
char ** envp)
{
+ struct proc_info info;
char * prog = argv[0];
int i;
#ifdef PROC_FLOW_STATS
@@ -635,7 +636,11 @@ static void init(int argc,
goto fail_prog;
}
- if (proc_announce(prog)) {
+ memset(&info, 0, sizeof(info));
+ info.pid = getpid();
+ strncpy(info.prog, prog, PROG_NAME_SIZE);
+
+ if (proc_announce(&info)) {
fprintf(stderr, "FATAL: Could not announce to IRMd.\n");
goto fail_prog;
}
@@ -650,26 +655,30 @@ static void init(int argc,
gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);
}
#endif
- ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);
- if (ai.fds == NULL) {
+ proc.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);
+ if (proc.fds == NULL) {
fprintf(stderr, "FATAL: Could not create fd bitmap.\n");
goto fail_fds;
}
- ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
- if (ai.fqueues == NULL) {
+ proc.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
+ if (proc.fqueues == NULL) {
fprintf(stderr, "FATAL: Could not create fqueue bitmap.\n");
goto fail_fqueues;
}
- ai.gspp = ssm_pool_open();
- if (ai.gspp == NULL) {
+ if (is_ouroboros_member_uid(getuid()))
+ proc.pool = ssm_pool_open(0);
+ else
+ proc.pool = ssm_pool_open(getuid());
+
+ if (proc.pool == NULL) {
fprintf(stderr, "FATAL: Could not open packet buffer.\n");
goto fail_rdrb;
}
- ai.flows = malloc(sizeof(*ai.flows) * PROG_MAX_FLOWS);
- if (ai.flows == NULL) {
+ proc.flows = malloc(sizeof(*proc.flows) * PROG_MAX_FLOWS);
+ if (proc.flows == NULL) {
fprintf(stderr, "FATAL: Could not malloc flows.\n");
goto fail_flows;
}
@@ -677,38 +686,38 @@ static void init(int argc,
for (i = 0; i < PROG_MAX_FLOWS; ++i)
flow_clear(i);
- ai.id_to_fd = malloc(sizeof(*ai.id_to_fd) * SYS_MAX_FLOWS);
- if (ai.id_to_fd == NULL) {
+ proc.id_to_fd = malloc(sizeof(*proc.id_to_fd) * SYS_MAX_FLOWS);
+ if (proc.id_to_fd == NULL) {
fprintf(stderr, "FATAL: Could not malloc id_to_fd.\n");
goto fail_id_to_fd;
}
for (i = 0; i < SYS_MAX_FLOWS; ++i)
- ai.id_to_fd[i].state = FLOW_INIT;
+ proc.id_to_fd[i].state = FLOW_INIT;
- if (pthread_mutex_init(&ai.mtx, NULL)) {
+ if (pthread_mutex_init(&proc.mtx, NULL)) {
fprintf(stderr, "FATAL: Could not init mutex.\n");
goto fail_mtx;
}
- if (pthread_cond_init(&ai.cond, NULL) < 0) {
+ if (pthread_cond_init(&proc.cond, NULL) < 0) {
fprintf(stderr, "FATAL: Could not init condvar.\n");
goto fail_cond;
}
- if (pthread_rwlock_init(&ai.lock, NULL) < 0) {
+ if (pthread_rwlock_init(&proc.lock, NULL) < 0) {
fprintf(stderr, "FATAL: Could not initialize flow lock.\n");
goto fail_flow_lock;
}
- ai.fqset = ssm_flow_set_open(getpid());
- if (ai.fqset == NULL) {
+ proc.fqset = ssm_flow_set_open(getpid());
+ if (proc.fqset == NULL) {
fprintf(stderr, "FATAL: Could not open flow set.\n");
goto fail_fqset;
}
- ai.frct_set = fset_create();
- if (ai.frct_set == NULL || ai.frct_set->idx != 0) {
+ proc.frct_set = fset_create();
+ if (proc.frct_set == NULL || proc.frct_set->idx != 0) {
fprintf(stderr, "FATAL: Could not create FRCT set.\n");
goto fail_frct_set;
}
@@ -732,12 +741,12 @@ static void init(int argc,
}
}
#endif
- if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0) {
+ if (pthread_create(&proc.rx, NULL, flow_rx, NULL) < 0) {
fprintf(stderr, "FATAL: Could not start monitor thread.\n");
goto fail_monitor;
}
- list_head_init(&ai.flow_list);
+ list_head_init(&proc.flow_list);
return;
@@ -748,27 +757,27 @@ static void init(int argc,
#endif
timerwheel_fini();
fail_timerwheel:
- fset_destroy(ai.frct_set);
+ fset_destroy(proc.frct_set);
fail_frct_set:
- ssm_flow_set_close(ai.fqset);
+ ssm_flow_set_close(proc.fqset);
fail_fqset:
- pthread_rwlock_destroy(&ai.lock);
+ pthread_rwlock_destroy(&proc.lock);
fail_flow_lock:
- pthread_cond_destroy(&ai.cond);
+ pthread_cond_destroy(&proc.cond);
fail_cond:
- pthread_mutex_destroy(&ai.mtx);
+ pthread_mutex_destroy(&proc.mtx);
fail_mtx:
- free(ai.id_to_fd);
+ free(proc.id_to_fd);
fail_id_to_fd:
- free(ai.flows);
+ free(proc.flows);
fail_flows:
- ssm_pool_close(ai.gspp);
+ ssm_pool_close(proc.pool);
fail_rdrb:
- bmp_destroy(ai.fqueues);
+ bmp_destroy(proc.fqueues);
fail_fqueues:
- bmp_destroy(ai.fds);
+ bmp_destroy(proc.fds);
fail_fds:
- memset(&ai, 0, sizeof(ai));
+ memset(&proc, 0, sizeof(proc));
fail_prog:
exit(EXIT_FAILURE);
}
@@ -777,51 +786,52 @@ static void fini(void)
{
int i;
- if (ai.fds == NULL)
+ if (proc.fds == NULL)
return;
- pthread_cancel(ai.rx);
- pthread_join(ai.rx, NULL);
+ pthread_cancel(proc.rx);
+ pthread_join(proc.rx, NULL);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
- if (ai.flows[i].info.id != -1) {
+ struct flow * flow = &proc.flows[i];
+ if (flow->info.id != -1) {
ssize_t idx;
- ssm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN);
- while ((idx = ssm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
- ssm_pool_remove(ai.gspp, idx);
+ ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWDOWN);
+ while ((idx = ssm_rbuff_read(flow->rx_rb)) >= 0)
+ ssm_pool_remove(proc.pool, idx);
__flow_fini(i);
}
}
- pthread_cond_destroy(&ai.cond);
- pthread_mutex_destroy(&ai.mtx);
+ pthread_cond_destroy(&proc.cond);
+ pthread_mutex_destroy(&proc.mtx);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
#ifdef PROC_FLOW_STATS
rib_fini();
#endif
timerwheel_fini();
- fset_destroy(ai.frct_set);
+ fset_destroy(proc.frct_set);
- ssm_flow_set_close(ai.fqset);
+ ssm_flow_set_close(proc.fqset);
- pthread_rwlock_destroy(&ai.lock);
+ pthread_rwlock_destroy(&proc.lock);
- free(ai.flows);
- free(ai.id_to_fd);
+ free(proc.flows);
+ free(proc.id_to_fd);
- ssm_pool_close(ai.gspp);
+ ssm_pool_close(proc.pool);
- bmp_destroy(ai.fds);
- bmp_destroy(ai.fqueues);
+ bmp_destroy(proc.fds);
+ bmp_destroy(proc.fqueues);
proc_exit();
- memset(&ai, 0, sizeof(ai));
+ memset(&proc, 0, sizeof(proc));
}
#if defined(__MACH__) && defined(__APPLE__)
@@ -978,12 +988,12 @@ int flow_dealloc(int fd)
memset(&info, 0, sizeof(flow));
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return -ENOTALLOC;
}
@@ -992,21 +1002,21 @@ int flow_dealloc(int fd)
flow->rcv_timesout = true;
flow->rcv_timeo = tic;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
flow_read(fd, buf, SOCK_BUF_SIZE);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
timeo.tv_sec = frcti_dealloc(flow->frcti);
while (timeo.tv_sec < 0) { /* keep the flow active for rtx */
ssize_t ret;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
ret = flow_read(fd, pkt, PKT_BUF_LEN);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
timeo.tv_sec = frcti_dealloc(flow->frcti);
@@ -1014,7 +1024,7 @@ int flow_dealloc(int fd)
timeo.tv_sec = -timeo.tv_sec;
}
- pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
+ pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock);
ssm_rbuff_fini(flow->tx_rb);
@@ -1048,21 +1058,21 @@ int ipcp_flow_dealloc(int fd)
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
memset(&info, 0, sizeof(flow));
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return -ENOTALLOC;
}
info.id = flow->info.id;
info.n_1_pid = flow->info.n_1_pid;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0)
return -ENOMEM;
@@ -1096,14 +1106,14 @@ int fccntl(int fd,
if (fd < 0 || fd >= SYS_MAX_FLOWS)
return -EBADF;
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
va_start(l, cmd);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
if (flow->info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
va_end(l);
return -ENOTALLOC;
}
@@ -1209,24 +1219,24 @@ int fccntl(int fd,
*cflags = frcti_getflags(flow->frcti);
break;
default:
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
va_end(l);
return -ENOTSUP;
};
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
va_end(l);
return 0;
einval:
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
va_end(l);
return -EINVAL;
eperm:
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
va_end(l);
return -EPERM;
}
@@ -1268,15 +1278,15 @@ static int flow_tx_spb(struct flow * flow,
clock_gettime(PTHREAD_COND_CLOCK, &now);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
flow->snd_act = now;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
idx = ssm_pk_buff_get_idx(spb);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (ssm_pk_buff_len(spb) > 0) {
if (frcti_snd(flow->frcti, spb) < 0)
@@ -1289,7 +1299,7 @@ static int flow_tx_spb(struct flow * flow,
goto enomem;
}
- pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
+ pthread_cleanup_push(__cleanup_rwlock_unlock, &proc.lock);
if (!block)
ret = ssm_rbuff_write(flow->tx_rb, idx);
@@ -1297,7 +1307,7 @@ static int flow_tx_spb(struct flow * flow,
ret = ssm_rbuff_write_b(flow->tx_rb, idx, abstime);
if (ret < 0)
- ssm_pool_remove(ai.gspp, idx);
+ ssm_pool_remove(proc.pool, idx);
else
ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
@@ -1306,8 +1316,8 @@ static int flow_tx_spb(struct flow * flow,
return 0;
enomem:
- pthread_rwlock_unlock(&ai.lock);
- ssm_pool_remove(ai.gspp, idx);
+ pthread_rwlock_unlock(&proc.lock);
+ ssm_pool_remove(proc.pool, idx);
return -ENOMEM;
}
@@ -1330,14 +1340,14 @@ ssize_t flow_write(int fd,
if (fd < 0 || fd >= PROG_MAX_FLOWS)
return -EBADF;
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
clock_gettime(PTHREAD_COND_CLOCK, &abs);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
if (flow->info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return -ENOTALLOC;
}
@@ -1348,7 +1358,7 @@ ssize_t flow_write(int fd,
flags = flow->oflags;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
return -EPERM;
@@ -1356,12 +1366,12 @@ ssize_t flow_write(int fd,
if (flags & FLOWFWNOBLOCK) {
if (!frcti_is_window_open(flow->frcti))
return -EAGAIN;
- idx = ssm_pool_alloc(ai.gspp, count, &ptr, &spb);
+ idx = ssm_pool_alloc(proc.pool, count, &ptr, &spb);
} else {
ret = frcti_window_wait(flow->frcti, abstime);
if (ret < 0)
return ret;
- idx = ssm_pool_alloc_b(ai.gspp, count, &ptr, &spb, abstime);
+ idx = ssm_pool_alloc_b(proc.pool, count, &ptr, &spb, abstime);
}
if (idx < 0)
@@ -1405,16 +1415,16 @@ static ssize_t flow_rx_spb(struct flow * flow,
clock_gettime(PTHREAD_COND_CLOCK, &now);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
flow->rcv_act = now;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
- *spb = ssm_pool_get(ai.gspp, idx);
+ *spb = ssm_pool_get(proc.pool, idx);
if (invalid_pkt(flow, *spb)) {
- ssm_pool_remove(ai.gspp, idx);
+ ssm_pool_remove(proc.pool, idx);
return -EAGAIN;
}
@@ -1439,19 +1449,19 @@ ssize_t flow_read(int fd,
if (fd < 0 || fd >= PROG_MAX_FLOWS)
return -EBADF;
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
clock_gettime(PTHREAD_COND_CLOCK, &now);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return -ENOTALLOC;
}
if (flow->part_idx == DONE_PART) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
flow->part_idx = NO_PART;
return 0;
}
@@ -1467,7 +1477,7 @@ ssize_t flow_read(int fd,
idx = flow->part_idx;
if (idx < 0) {
while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
idx = flow_rx_spb(flow, &spb, block, abstime);
if (idx < 0) {
@@ -1476,19 +1486,19 @@ ssize_t flow_read(int fd,
if (!block)
return idx;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
continue;
}
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
frcti_rcv(flow->frcti, spb);
}
}
- spb = ssm_pool_get(ai.gspp, idx);
+ spb = ssm_pool_get(proc.pool, idx);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
packet = ssm_pk_buff_head(spb);
@@ -1500,25 +1510,25 @@ ssize_t flow_read(int fd,
memcpy(buf, packet, n);
ipcp_spb_release(spb);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
flow->part_idx = (partrd && n == (ssize_t) count) ?
DONE_PART : NO_PART;
flow->rcv_act = now;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return n;
} else {
if (partrd) {
memcpy(buf, packet, count);
ssm_pk_buff_head_release(spb, n);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
flow->part_idx = idx;
flow->rcv_act = now;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return count;
} else {
ipcp_spb_release(spb);
@@ -1537,20 +1547,20 @@ struct flow_set * fset_create(void)
if (set == NULL)
goto fail_malloc;
- assert(ai.fqueues);
+ assert(proc.fqueues);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
- set->idx = bmp_allocate(ai.fqueues);
- if (!bmp_is_id_valid(ai.fqueues, set->idx))
+ set->idx = bmp_allocate(proc.fqueues);
+ if (!bmp_is_id_valid(proc.fqueues, set->idx))
goto fail_bmp_alloc;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return set;
fail_bmp_alloc:
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
free(set);
fail_malloc:
return NULL;
@@ -1563,11 +1573,11 @@ void fset_destroy(struct flow_set * set)
fset_zero(set);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
- bmp_release(ai.fqueues, set->idx);
+ bmp_release(proc.fqueues, set->idx);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
free(set);
}
@@ -1595,7 +1605,7 @@ void fset_zero(struct flow_set * set)
if (set == NULL)
return;
- ssm_flow_set_zero(ai.fqset, set->idx);
+ ssm_flow_set_zero(proc.fqset, set->idx);
}
int fset_add(struct flow_set * set,
@@ -1607,9 +1617,9 @@ int fset_add(struct flow_set * set,
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return -EINVAL;
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0) {
ret = -EINVAL;
@@ -1617,21 +1627,21 @@ int fset_add(struct flow_set * set,
}
if (flow->frcti != NULL)
- ssm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
+ ssm_flow_set_del(proc.fqset, 0, flow->info.id);
- ret = ssm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id);
+ ret = ssm_flow_set_add(proc.fqset, set->idx, flow->info.id);
if (ret < 0)
goto fail;
- if (ssm_rbuff_queued(ai.flows[fd].rx_rb))
- ssm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT);
+ if (ssm_rbuff_queued(flow->rx_rb))
+ ssm_flow_set_notify(proc.fqset, flow->info.id, FLOW_PKT);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return ret;
fail:
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return ret;
}
@@ -1643,37 +1653,40 @@ void fset_del(struct flow_set * set,
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return;
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id >= 0)
- ssm_flow_set_del(ai.fqset, set->idx, flow->info.id);
+ ssm_flow_set_del(proc.fqset, set->idx, flow->info.id);
if (flow->frcti != NULL)
- ssm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id);
+ ssm_flow_set_add(proc.fqset, 0, proc.flows[fd].info.id);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
}
bool fset_has(const struct flow_set * set,
int fd)
{
- bool ret;
+ struct flow * flow;
+ bool ret;
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return false;
- pthread_rwlock_rdlock(&ai.lock);
+ flow = &proc.flows[fd];
- if (ai.flows[fd].info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
+
+ if (flow->info.id < 0) {
+ pthread_rwlock_unlock(&proc.lock);
return false;
}
- ret = (ssm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1);
+ ret = (ssm_flow_set_has(proc.fqset, set->idx, flow->info.id) == 1);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return ret;
}
@@ -1690,44 +1703,44 @@ static int fqueue_filter(struct fqueue * fq)
if (fq->fqueue[fq->next].event != FLOW_PKT)
return 1;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- fd = ai.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
+ fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
if (fd < 0) {
++fq->next;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
continue;
}
- frcti = ai.flows[fd].frcti;
+ frcti = proc.flows[fd].frcti;
if (frcti == NULL) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return 1;
}
if (__frcti_pdu_ready(frcti) >= 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return 1;
}
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
- idx = flow_rx_spb(&ai.flows[fd], &spb, false, NULL);
+ idx = flow_rx_spb(&proc.flows[fd], &spb, false, NULL);
if (idx < 0)
return 0;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- spb = ssm_pool_get(ai.gspp, idx);
+ spb = ssm_pool_get(proc.pool, idx);
__frcti_rcv(frcti, spb);
if (__frcti_pdu_ready(frcti) >= 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return 1;
}
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
++fq->next;
}
@@ -1749,15 +1762,15 @@ int fqueue_next(struct fqueue * fq)
if (fq->next != 0 && fqueue_filter(fq) == 0)
return -EPERM;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
e = fq->fqueue + fq->next;
- fd = ai.id_to_fd[e->flow_id].fd;
+ fd = proc.id_to_fd[e->flow_id].fd;
++fq->next;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return fd;
}
@@ -1795,7 +1808,7 @@ ssize_t fevent(struct flow_set * set,
}
while (ret == 0) {
- ret = ssm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
+ ret = ssm_flow_set_wait(proc.fqset, set->idx, fq->fqueue, t);
if (ret == -ETIMEDOUT)
return -ETIMEDOUT;
@@ -1842,11 +1855,11 @@ int np1_flow_dealloc(int flow_id,
sleep(timeo);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- fd = ai.id_to_fd[flow_id].fd;
+ fd = proc.id_to_fd[flow_id].fd;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return fd;
}
@@ -1859,11 +1872,11 @@ int np1_flow_resp(int flow_id,
if (resp == 0 && flow_wait_assign(flow_id) != FLOW_ALLOCATED)
return -1;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- fd = ai.id_to_fd[flow_id].fd;
+ fd = proc.id_to_fd[flow_id].fd;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return fd;
}
@@ -1942,11 +1955,11 @@ int ipcp_flow_alloc_reply(int fd,
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- flow.id = ai.flows[fd].info.id;
+ flow.id = proc.flows[fd].info.id;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
flow.mpl = mpl;
@@ -1969,25 +1982,25 @@ int ipcp_flow_read(int fd,
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(spb);
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
assert(flow->info.id >= 0);
while (frcti_queued_pdu(flow->frcti) < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
idx = flow_rx_spb(flow, spb, false, NULL);
if (idx < 0)
return idx;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
frcti_rcv(flow->frcti, *spb);
}
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return 0;
}
@@ -2001,88 +2014,132 @@ int ipcp_flow_write(int fd,
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(spb);
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&proc.lock);
if (flow->info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return -ENOTALLOC;
}
if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return -EPERM;
}
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
ret = flow_tx_spb(flow, spb, true, NULL);
return ret;
}
+static int pool_copy_spb(struct ssm_pool * src_pool,
+ ssize_t src_idx,
+ struct ssm_pool * dst_pool,
+ struct ssm_pk_buff ** dst_spb)
+{
+ struct ssm_pk_buff * src;
+ uint8_t * ptr;
+ size_t len;
+
+ src = ssm_pool_get(src_pool, src_idx);
+ len = ssm_pk_buff_len(src);
+
+ if (ssm_pool_alloc(dst_pool, len, &ptr, dst_spb) < 0) {
+ ssm_pool_remove(src_pool, src_idx);
+ return -ENOMEM;
+ }
+
+ memcpy(ptr, ssm_pk_buff_head(src), len);
+ ssm_pool_remove(src_pool, src_idx);
+
+ return 0;
+}
+
int np1_flow_read(int fd,
- struct ssm_pk_buff ** spb)
+ struct ssm_pk_buff ** spb,
+ struct ssm_pool * pool)
{
- struct flow * flow;
- ssize_t idx = -1;
+ struct flow * flow;
+ ssize_t idx = -1;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(spb);
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
assert(flow->info.id >= 0);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
idx = ssm_rbuff_read(flow->rx_rb);
if (idx < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return idx;
}
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
- *spb = ssm_pool_get(ai.gspp, idx);
+ if (pool == NULL) {
+ *spb = ssm_pool_get(proc.pool, idx);
+ } else {
+ /* Cross-pool copy: PUP -> GSPP */
+ if (pool_copy_spb(pool, idx, proc.pool, spb) < 0)
+ return -ENOMEM;
+ }
return 0;
}
int np1_flow_write(int fd,
- struct ssm_pk_buff * spb)
+ struct ssm_pk_buff * spb,
+ struct ssm_pool * pool)
{
- struct flow * flow;
- int ret;
- ssize_t idx;
+ struct flow * flow;
+ struct ssm_pk_buff * dst;
+ int ret;
+ ssize_t idx;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(spb);
- flow = &ai.flows[fd];
+ flow = &proc.flows[fd];
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
if (flow->info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return -ENOTALLOC;
}
if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return -EPERM;
}
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
idx = ssm_pk_buff_get_idx(spb);
- ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL);
- if (ret < 0)
- ssm_pool_remove(ai.gspp, idx);
- else
- ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+ if (pool == NULL) {
+ ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL);
+ if (ret < 0)
+ ssm_pool_remove(proc.pool, idx);
+ else
+ ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+ } else {
+ /* Cross-pool copy: GSPP -> PUP */
+ if (pool_copy_spb(proc.pool, idx, pool, &dst) < 0)
+ return -ENOMEM;
+ idx = ssm_pk_buff_get_idx(dst);
+ ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL);
+ if (ret < 0)
+ ssm_pool_remove(pool, idx);
+ else
+ ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
+ }
return ret;
}
@@ -2090,12 +2147,12 @@ int np1_flow_write(int fd,
int ipcp_spb_reserve(struct ssm_pk_buff ** spb,
size_t len)
{
- return ssm_pool_alloc_b(ai.gspp, len, NULL, spb, NULL) < 0 ? -1 : 0;
+ return ssm_pool_alloc_b(proc.pool, len, NULL, spb, NULL) < 0 ? -1 : 0;
}
/* Return a previously reserved packet buffer to the process-global pool. */
void ipcp_spb_release(struct ssm_pk_buff * spb)
{
        ssize_t idx;

        idx = ssm_pk_buff_get_idx(spb);

        ssm_pool_remove(proc.pool, idx);
}
int ipcp_flow_fini(int fd)
@@ -2104,23 +2161,23 @@ int ipcp_flow_fini(int fd)
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- if (ai.flows[fd].info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ if (proc.flows[fd].info.id < 0) {
+ pthread_rwlock_unlock(&proc.lock);
return -1;
}
- ssm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
- ssm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN);
- ssm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].info.id,
+ ssm_flow_set_notify(proc.flows[fd].set,
+ proc.flows[fd].info.id,
FLOW_DEALLOC);
- rx_rb = ai.flows[fd].rx_rb;
+ rx_rb = proc.flows[fd].rx_rb;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
if (rx_rb != NULL)
ssm_rbuff_fini(rx_rb);
@@ -2134,13 +2191,13 @@ int ipcp_flow_get_qoscube(int fd,
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(cube);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- assert(ai.flows[fd].info.id >= 0);
+ assert(proc.flows[fd].info.id >= 0);
- *cube = qos_spec_to_cube(ai.flows[fd].info.qs);
+ *cube = qos_spec_to_cube(proc.flows[fd].info.qs);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return 0;
}
@@ -2149,56 +2206,76 @@ size_t ipcp_flow_queued(int fd)
{
size_t q;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_rdlock(&proc.lock);
- assert(ai.flows[fd].info.id >= 0);
+ assert(proc.flows[fd].info.id >= 0);
- q = ssm_rbuff_queued(ai.flows[fd].tx_rb);
+ q = ssm_rbuff_queued(proc.flows[fd].tx_rb);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&proc.lock);
return q;
}
-ssize_t local_flow_read(int fd)
+int local_flow_transfer(int src_fd,
+ int dst_fd,
+ struct ssm_pool * src_pool,
+ struct ssm_pool * dst_pool)
{
- ssize_t ret;
-
- assert(fd >= 0);
+ struct flow * src_flow;
+ struct flow * dst_flow;
+ struct ssm_pk_buff * dst_spb;
+ struct ssm_pool * sp;
+ struct ssm_pool * dp;
+ ssize_t idx;
+ int ret;
- pthread_rwlock_rdlock(&ai.lock);
+ assert(src_fd >= 0);
+ assert(dst_fd >= 0);
- ret = ssm_rbuff_read(ai.flows[fd].rx_rb);
+ src_flow = &proc.flows[src_fd];
+ dst_flow = &proc.flows[dst_fd];
- pthread_rwlock_unlock(&ai.lock);
+ sp = src_pool == NULL ? proc.pool : src_pool;
+ dp = dst_pool == NULL ? proc.pool : dst_pool;
- return ret;
-}
-
-int local_flow_write(int fd,
- size_t idx)
-{
- struct flow * flow;
- int ret;
+ pthread_rwlock_rdlock(&proc.lock);
- assert(fd >= 0);
-
- flow = &ai.flows[fd];
-
- pthread_rwlock_rdlock(&ai.lock);
+ idx = ssm_rbuff_read(src_flow->rx_rb);
+ if (idx < 0) {
+ pthread_rwlock_unlock(&proc.lock);
+ return idx;
+ }
- if (flow->info.id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ if (dst_flow->info.id < 0) {
+ pthread_rwlock_unlock(&proc.lock);
+ ssm_pool_remove(sp, idx);
return -ENOTALLOC;
}
- ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL);
- if (ret == 0)
- ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
- else
- ssm_pool_remove(ai.gspp, idx);
+ pthread_rwlock_unlock(&proc.lock);
+
+ if (sp == dp) {
+ /* Same pool: zero-copy */
+ ret = ssm_rbuff_write_b(dst_flow->tx_rb, idx, NULL);
+ if (ret < 0)
+ ssm_pool_remove(sp, idx);
+ else
+ ssm_flow_set_notify(dst_flow->set,
+ dst_flow->info.id, FLOW_PKT);
+ } else {
+ /* Different pools: single copy */
+ if (pool_copy_spb(sp, idx, dp, &dst_spb) < 0)
+ return -ENOMEM;
- pthread_rwlock_unlock(&ai.lock);
+ idx = ssm_pk_buff_get_idx(dst_spb);
+ ret = ssm_rbuff_write_b(dst_flow->tx_rb, idx, NULL);
+ if (ret < 0)
+ ssm_pool_remove(dp, idx);
+ else
+ ssm_flow_set_notify(dst_flow->set,
+ dst_flow->info.id, FLOW_PKT);
+ }
return ret;
}