diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib/dev.c | 52 | ||||
-rw-r--r-- | src/lib/shm_flow_set.c | 38 |
2 files changed, 53 insertions, 37 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index edcf56ed..3d854c2a 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -65,7 +65,7 @@ struct flow_set { }; struct fqueue { - int fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */ + int fqueue[2 * SHM_BUFFER_SIZE]; /* Safe copy from shm. */ size_t fqsize; size_t next; }; @@ -875,7 +875,7 @@ ssize_t flow_write(int fd, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1039,7 +1039,7 @@ void fset_zero(struct flow_set * set) int fset_add(struct flow_set * set, int fd) { - int ret; + int ret; size_t sdus; size_t i; @@ -1052,7 +1052,7 @@ int fset_add(struct flow_set * set, sdus = shm_rbuff_queued(ai.flows[fd].rx_rb); for (i = 0; i < sdus; i++) - shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); + shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1102,23 +1102,31 @@ int fqueue_next(struct fqueue * fq) if (fq == NULL) return -EINVAL; - if (fq->fqsize == 0) + if (fq->fqsize == 0 || fq->next == fq->fqsize) return -EPERM; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[fq->fqueue[fq->next++]].fd; + fd = ai.ports[fq->fqueue[fq->next]].fd; - pthread_rwlock_unlock(&ai.lock); + fq->next += 2; - if (fq->next == fq->fqsize) { - fq->fqsize = 0; - fq->next = 0; - } + pthread_rwlock_unlock(&ai.lock); return fd; } +enum fqtype fqueue_type(struct fqueue * fq) +{ + if (fq == NULL) + return -EINVAL; + + if (fq->fqsize == 0 || fq->next == 0) + return -EPERM; + + return fq->fqueue[fq->next - 1]; +} + int fevent(struct flow_set * set, struct fqueue * fq, const struct timespec * timeo) @@ -1130,11 +1138,9 @@ int fevent(struct flow_set * set, if (set == NULL || fq == NULL) return -EINVAL; - if (fq->fqsize > 0) + if (fq->fqsize > 0 && fq->next != fq->fqsize) return fq->fqsize; - assert(!fq->next); - if (timeo != NULL) { clock_gettime(PTHREAD_COND_CLOCK, &abstime); ts_add(&abstime, timeo, &abstime); @@ -1147,7 +1153,8 @@ int fevent(struct flow_set * set, return -ETIMEDOUT; } - fq->fqsize = ret; + fq->fqsize = ret << 1; + fq->next = 0; assert(ret); @@ -1365,9 +1372,9 @@ int ipcp_flow_write(int fd, return -ENOMEM; } - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1454,20 +1461,23 @@ ssize_t local_flow_read(int fd) int local_flow_write(int fd, size_t idx) { - int ret; + struct flow * flow; + int ret; assert(fd >= 0); + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index d2107fc3..bb9e3caa 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -27,7 +27,6 @@ #include <ouroboros/lockfile.h> #include <ouroboros/time_utils.h> #include <ouroboros/shm_flow_set.h> -#include <ouroboros/fqueue.h> #include <ouroboros/errno.h> #include <pthread.h> @@ -54,24 +53,29 @@ #define FN_MAX_CHARS 255 -#define FQUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(int)) +#define QUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(struct portevent)) #define SHM_FLOW_SET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \ + PROG_MAX_FQUEUES * sizeof(size_t) \ + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \ - + PROG_MAX_FQUEUES * FQUEUESIZE \ + + PROG_MAX_FQUEUES * QUEUESIZE \ + sizeof(pthread_mutex_t)) #define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx) +struct portevent { + int port_id; + int event; +}; + struct shm_flow_set { - ssize_t * mtable; - size_t * heads; - pthread_cond_t * conds; - int * fqueues; - pthread_mutex_t * lock; + ssize_t * mtable; + size_t * heads; + pthread_cond_t * conds; + struct portevent * fqueues; + pthread_mutex_t * lock; - pid_t pid; + pid_t pid; }; struct shm_flow_set * shm_flow_set_create() @@ -125,7 +129,7 @@ struct shm_flow_set * shm_flow_set_create() set->mtable = shm_base; set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS); set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); - set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES); + set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES); set->lock = (pthread_mutex_t *) (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); @@ -191,10 +195,9 @@ struct shm_flow_set * shm_flow_set_open(pid_t pid) set->mtable = shm_base; set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS); set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); - set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES); + set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES); set->lock = (pthread_mutex_t *) (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); - set->pid = pid; return set; @@ -316,7 +319,8 @@ int shm_flow_set_has(struct shm_flow_set * set, } void shm_flow_set_notify(struct shm_flow_set * set, - int port_id) + int port_id, + int event) { assert(set); assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); @@ -328,8 +332,10 @@ void shm_flow_set_notify(struct shm_flow_set * set, return; } - *(fqueue_ptr(set, set->mtable[port_id]) + - (set->heads[set->mtable[port_id]])++) = port_id; + (fqueue_ptr(set, set->mtable[port_id]) + + (set->heads[set->mtable[port_id]]))->port_id = port_id; + (fqueue_ptr(set, set->mtable[port_id]) + + (set->heads[set->mtable[port_id]])++)->event = event; pthread_cond_signal(&set->conds[set->mtable[port_id]]); @@ -380,7 +386,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set, if (ret != -ETIMEDOUT) { memcpy(fqueue, fqueue_ptr(set, idx), - set->heads[idx] * sizeof(int)); + set->heads[idx] * sizeof(struct portevent)); ret = set->heads[idx]; set->heads[idx] = 0; } |