From 38cfdc212c623a46038f005b0c1604c3fdaf3762 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Mon, 14 May 2018 09:20:44 +0200 Subject: lib: Add event types to fqueue The event type of the current event in the fqueue can now be requested using the fqueue_type() command. Currently events for packets (FLOW_PKT), flows (FLOW_UP, FLOW_DOWN) and allocation (FLOW_ALLOC, FLOW_DEALLOC) are specified. The implementation only tracks FLOW_PKT at this point. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- doc/man/fqueue.3 | 31 +++++++++++++++++++++--- include/ouroboros/fqueue.h | 40 +++++++++++++++++++------------ include/ouroboros/shm_flow_set.h | 3 ++- src/lib/dev.c | 52 ++++++++++++++++++++++++---------------- src/lib/shm_flow_set.c | 38 ++++++++++++++++------------- 5 files changed, 108 insertions(+), 56 deletions(-) diff --git a/doc/man/fqueue.3 b/doc/man/fqueue.3 index 00c28d4c..abd21cfe 100644 --- a/doc/man/fqueue.3 +++ b/doc/man/fqueue.3 @@ -19,6 +19,8 @@ on flows \fBint fqueue_next(fqueue_t * \fIfq\fB); +\fBint fqueue_type(fqueue_t * \fIfq\fB); + \fBint fevent(fset_t * \fIset\fB, fqueue_t * \fIfq\fB, const struct timespec * \fItimeo\fB); @@ -36,6 +38,22 @@ an \fBfqueue_t\fR \fIfq\fR. The \fBfqueue_next\fR() function retrieves the next event (a \fIflow descriptor\fR) that is ready within the event queue \fIfq\fR. +The \fBfqueue_type\fR() function retrieves the type for the current +event on the fd that was returned by \fBfqueue_next\fR(). Event types +are: +.RS 4 +FLOW_PKT: A new packet arrived on this flow and is ready for reading. + +FLOW_UP: The flow is now marked UP and ready for read/write. + +FLOW_DOWN: The flow is now marked DOWN and cannot be written to. + +FLOW_ALLOC: A pending flow is now allocated. + +FLOW_DEALLOC: The flow is deallocated by the other (N+1 or N-1) +process. +.RE + The \fBfevent\fR() function retrieves all events that occured on any \fIflow descriptor\fR within \fIset\fR and returns them in the event queue \fBfq\fR. If a \fBstruct timespec *\fI timeo\fR can be provided, @@ -50,7 +68,14 @@ On success, \fBfqueue_create\fR() returns a pointer to an \fBfqueue_destroy\fR() has no return value. -On success, \fBfevent\fR() returns the number of events that occured in \fIset\fR. +On success, \fBfevent\fR() returns the number of events that occured +in \fIset\fR. + +On success, \fBfqueue_next\fR() returns the next file descriptor for +which an event occurred. + +On success, \fBfqueue_type\fR() returns the event type for the last +event returned by \fBfqueue_next\fR(). .SH ERRORS @@ -62,10 +87,10 @@ were available to create the \fBfqueue_t\fR. .B -EINVAL An invalid argument was passed (\fIfq\fR or \fIset\fR was \fINULL\fR). -In addition, \fBfqueue_next\fR() can return +In addition, \fBfqueue_next\fR() or \fBqueue_type\fR() can return .B -EPERM -No more fds available in \fIfq\fR. +No more fds available or no current event in \fIfq\fR. and \fBfevent\fR() can return diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h index 1b102669..8a5dc988 100644 --- a/include/ouroboros/fqueue.h +++ b/include/ouroboros/fqueue.h @@ -28,6 +28,14 @@ #include #include +enum fqtype { + FLOW_PKT = 0, + FLOW_DOWN, + FLOW_UP, + FLOW_ALLOC, + FLOW_DEALLOC +}; + struct flow_set; struct fqueue; @@ -37,30 +45,32 @@ typedef struct fqueue fqueue_t; __BEGIN_DECLS -fset_t * fset_create(void); +fset_t * fset_create(void); + +void fset_destroy(fset_t * set); -void fset_destroy(fset_t * set); +fqueue_t * fqueue_create(void); -fqueue_t * fqueue_create(void); +void fqueue_destroy(struct fqueue * fq); -void fqueue_destroy(struct fqueue * fq); +void fset_zero(fset_t * set); -void fset_zero(fset_t * set); +int fset_add(fset_t * set, + int fd); -int fset_add(fset_t * set, - int fd); +bool fset_has(const fset_t * set, + int fd); -bool fset_has(const fset_t * set, - int fd); +void fset_del(fset_t * set, + int fd); -void fset_del(fset_t * set, - int fd); +int fqueue_next(fqueue_t * fq); -int fqueue_next(fqueue_t * fq); +enum fqtype fqueue_type(fqueue_t * fq); -int fevent(fset_t * set, - fqueue_t * fq, - const struct timespec * timeo); +int fevent(fset_t * set, + fqueue_t * fq, + const struct timespec * timeo); __END_DECLS diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h index 76849137..ebf63af5 100644 --- a/include/ouroboros/shm_flow_set.h +++ b/include/ouroboros/shm_flow_set.h @@ -53,7 +53,8 @@ void shm_flow_set_del(struct shm_flow_set * shm_set, int port_id); void shm_flow_set_notify(struct shm_flow_set * set, - int port_id); + int port_id, + int event); ssize_t shm_flow_set_wait(const struct shm_flow_set * shm_set, size_t idx, diff --git a/src/lib/dev.c b/src/lib/dev.c index edcf56ed..3d854c2a 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -65,7 +65,7 @@ struct flow_set { }; struct fqueue { - int fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */ + int fqueue[2 * SHM_BUFFER_SIZE]; /* Safe copy from shm. */ size_t fqsize; size_t next; }; @@ -875,7 +875,7 @@ ssize_t flow_write(int fd, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1039,7 +1039,7 @@ void fset_zero(struct flow_set * set) int fset_add(struct flow_set * set, int fd) { - int ret; + int ret; size_t sdus; size_t i; @@ -1052,7 +1052,7 @@ int fset_add(struct flow_set * set, sdus = shm_rbuff_queued(ai.flows[fd].rx_rb); for (i = 0; i < sdus; i++) - shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); + shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1102,23 +1102,31 @@ int fqueue_next(struct fqueue * fq) if (fq == NULL) return -EINVAL; - if (fq->fqsize == 0) + if (fq->fqsize == 0 || fq->next == fq->fqsize) return -EPERM; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[fq->fqueue[fq->next++]].fd; + fd = ai.ports[fq->fqueue[fq->next]].fd; - pthread_rwlock_unlock(&ai.lock); + fq->next += 2; - if (fq->next == fq->fqsize) { - fq->fqsize = 0; - fq->next = 0; - } + pthread_rwlock_unlock(&ai.lock); return fd; } +enum fqtype fqueue_type(struct fqueue * fq) +{ + if (fq == NULL) + return -EINVAL; + + if (fq->fqsize == 0 || fq->next == 0) + return -EPERM; + + return fq->fqueue[fq->next - 1]; +} + int fevent(struct flow_set * set, struct fqueue * fq, const struct timespec * timeo) @@ -1130,11 +1138,9 @@ int fevent(struct flow_set * set, if (set == NULL || fq == NULL) return -EINVAL; - if (fq->fqsize > 0) + if (fq->fqsize > 0 && fq->next != fq->fqsize) return fq->fqsize; - assert(!fq->next); - if (timeo != NULL) { clock_gettime(PTHREAD_COND_CLOCK, &abstime); ts_add(&abstime, timeo, &abstime); @@ -1147,7 +1153,8 @@ int fevent(struct flow_set * set, return -ETIMEDOUT; } - fq->fqsize = ret; + fq->fqsize = ret << 1; + fq->next = 0; assert(ret); @@ -1365,9 +1372,9 @@ int ipcp_flow_write(int fd, return -ENOMEM; } - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1454,20 +1461,23 @@ ssize_t local_flow_read(int fd) int local_flow_write(int fd, size_t idx) { - int ret; + struct flow * flow; + int ret; assert(fd >= 0); + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index d2107fc3..bb9e3caa 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -27,7 +27,6 @@ #include #include #include -#include #include #include @@ -54,24 +53,29 @@ #define FN_MAX_CHARS 255 -#define FQUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(int)) +#define QUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(struct portevent)) #define SHM_FLOW_SET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \ + PROG_MAX_FQUEUES * sizeof(size_t) \ + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \ - + PROG_MAX_FQUEUES * FQUEUESIZE \ + + PROG_MAX_FQUEUES * QUEUESIZE \ + sizeof(pthread_mutex_t)) #define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx) +struct portevent { + int port_id; + int event; +}; + struct shm_flow_set { - ssize_t * mtable; - size_t * heads; - pthread_cond_t * conds; - int * fqueues; - pthread_mutex_t * lock; + ssize_t * mtable; + size_t * heads; + pthread_cond_t * conds; + struct portevent * fqueues; + pthread_mutex_t * lock; - pid_t pid; + pid_t pid; }; struct shm_flow_set * shm_flow_set_create() @@ -125,7 +129,7 @@ struct shm_flow_set * shm_flow_set_create() set->mtable = shm_base; set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS); set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); - set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES); + set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES); set->lock = (pthread_mutex_t *) (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); @@ -191,10 +195,9 @@ struct shm_flow_set * shm_flow_set_open(pid_t pid) set->mtable = shm_base; set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS); set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); - set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES); + set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES); set->lock = (pthread_mutex_t *) (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); - set->pid = pid; return set; @@ -316,7 +319,8 @@ int shm_flow_set_has(struct shm_flow_set * set, } void shm_flow_set_notify(struct shm_flow_set * set, - int port_id) + int port_id, + int event) { assert(set); assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); @@ -328,8 +332,10 @@ void shm_flow_set_notify(struct shm_flow_set * set, return; } - *(fqueue_ptr(set, set->mtable[port_id]) + - (set->heads[set->mtable[port_id]])++) = port_id; + (fqueue_ptr(set, set->mtable[port_id]) + + (set->heads[set->mtable[port_id]]))->port_id = port_id; + (fqueue_ptr(set, set->mtable[port_id]) + + (set->heads[set->mtable[port_id]])++)->event = event; pthread_cond_signal(&set->conds[set->mtable[port_id]]); @@ -380,7 +386,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set, if (ret != -ETIMEDOUT) { memcpy(fqueue, fqueue_ptr(set, idx), - set->heads[idx] * sizeof(int)); + set->heads[idx] * sizeof(struct portevent)); ret = set->heads[idx]; set->heads[idx] = 0; } -- cgit v1.2.3