diff options
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 52 |
1 files changed, 31 insertions, 21 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index edcf56ed..3d854c2a 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -65,7 +65,7 @@ struct flow_set { }; struct fqueue { - int fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */ + int fqueue[2 * SHM_BUFFER_SIZE]; /* Safe copy from shm. */ size_t fqsize; size_t next; }; @@ -875,7 +875,7 @@ ssize_t flow_write(int fd, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1039,7 +1039,7 @@ void fset_zero(struct flow_set * set) int fset_add(struct flow_set * set, int fd) { - int ret; + int ret; size_t sdus; size_t i; @@ -1052,7 +1052,7 @@ int fset_add(struct flow_set * set, sdus = shm_rbuff_queued(ai.flows[fd].rx_rb); for (i = 0; i < sdus; i++) - shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); + shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1102,23 +1102,31 @@ int fqueue_next(struct fqueue * fq) if (fq == NULL) return -EINVAL; - if (fq->fqsize == 0) + if (fq->fqsize == 0 || fq->next == fq->fqsize) return -EPERM; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[fq->fqueue[fq->next++]].fd; + fd = ai.ports[fq->fqueue[fq->next]].fd; - pthread_rwlock_unlock(&ai.lock); + fq->next += 2; - if (fq->next == fq->fqsize) { - fq->fqsize = 0; - fq->next = 0; - } + pthread_rwlock_unlock(&ai.lock); return fd; } +enum fqtype fqueue_type(struct fqueue * fq) +{ + if (fq == NULL) + return -EINVAL; + + if (fq->fqsize == 0 || fq->next == 0) + return -EPERM; + + return fq->fqueue[fq->next - 1]; +} + int fevent(struct flow_set * set, struct fqueue * fq, const struct timespec * timeo) @@ -1130,11 +1138,9 @@ int fevent(struct flow_set * set, if (set == NULL || fq == NULL) return -EINVAL; - if (fq->fqsize > 0) + if (fq->fqsize > 0 && fq->next != fq->fqsize) return fq->fqsize; - assert(!fq->next); - if (timeo != NULL) { clock_gettime(PTHREAD_COND_CLOCK, &abstime); ts_add(&abstime, timeo, &abstime); @@ -1147,7 +1153,8 @@ int fevent(struct flow_set * set, return -ETIMEDOUT; } - fq->fqsize = ret; + fq->fqsize = ret << 1; + fq->next = 0; assert(ret); @@ -1365,9 +1372,9 @@ int ipcp_flow_write(int fd, return -ENOMEM; } - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1454,20 +1461,23 @@ ssize_t local_flow_read(int fd) int local_flow_write(int fd, size_t idx) { - int ret; + struct flow * flow; + int ret; assert(fd >= 0); + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); |