summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c52
1 files changed, 31 insertions, 21 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index edcf56ed..3d854c2a 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -65,7 +65,7 @@ struct flow_set {
};
struct fqueue {
- int fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
+ int fqueue[2 * SHM_BUFFER_SIZE]; /* Safe copy from shm. */
size_t fqsize;
size_t next;
};
@@ -875,7 +875,7 @@ ssize_t flow_write(int fd,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1039,7 +1039,7 @@ void fset_zero(struct flow_set * set)
int fset_add(struct flow_set * set,
int fd)
{
- int ret;
+ int ret;
size_t sdus;
size_t i;
@@ -1052,7 +1052,7 @@ int fset_add(struct flow_set * set,
sdus = shm_rbuff_queued(ai.flows[fd].rx_rb);
for (i = 0; i < sdus; i++)
- shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
+ shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1102,23 +1102,31 @@ int fqueue_next(struct fqueue * fq)
if (fq == NULL)
return -EINVAL;
- if (fq->fqsize == 0)
+ if (fq->fqsize == 0 || fq->next == fq->fqsize)
return -EPERM;
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[fq->fqueue[fq->next++]].fd;
+ fd = ai.ports[fq->fqueue[fq->next]].fd;
- pthread_rwlock_unlock(&ai.lock);
+ fq->next += 2;
- if (fq->next == fq->fqsize) {
- fq->fqsize = 0;
- fq->next = 0;
- }
+ pthread_rwlock_unlock(&ai.lock);
return fd;
}
+enum fqtype fqueue_type(struct fqueue * fq)
+{
+ if (fq == NULL)
+ return -EINVAL;
+
+ if (fq->fqsize == 0 || fq->next == 0)
+ return -EPERM;
+
+ return fq->fqueue[fq->next - 1];
+}
+
int fevent(struct flow_set * set,
struct fqueue * fq,
const struct timespec * timeo)
@@ -1130,11 +1138,9 @@ int fevent(struct flow_set * set,
if (set == NULL || fq == NULL)
return -EINVAL;
- if (fq->fqsize > 0)
+ if (fq->fqsize > 0 && fq->next != fq->fqsize)
return fq->fqsize;
- assert(!fq->next);
-
if (timeo != NULL) {
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
ts_add(&abstime, timeo, &abstime);
@@ -1147,7 +1153,8 @@ int fevent(struct flow_set * set,
return -ETIMEDOUT;
}
- fq->fqsize = ret;
+ fq->fqsize = ret << 1;
+ fq->next = 0;
assert(ret);
@@ -1365,9 +1372,9 @@ int ipcp_flow_write(int fd,
return -ENOMEM;
}
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ ret = shm_rbuff_write(flow->tx_rb, idx);
if (ret == 0)
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1454,20 +1461,23 @@ ssize_t local_flow_read(int fd)
int local_flow_write(int fd,
size_t idx)
{
- int ret;
+ struct flow * flow;
+ int ret;
assert(fd >= 0);
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ ret = shm_rbuff_write(flow->tx_rb, idx);
if (ret == 0)
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);