diff options
| -rw-r--r-- | doc/man/fqueue.3 | 31 | ||||
| -rw-r--r-- | include/ouroboros/fqueue.h | 40 | ||||
| -rw-r--r-- | include/ouroboros/shm_flow_set.h | 3 | ||||
| -rw-r--r-- | src/lib/dev.c | 52 | ||||
| -rw-r--r-- | src/lib/shm_flow_set.c | 38 | 
5 files changed, 108 insertions, 56 deletions
diff --git a/doc/man/fqueue.3 b/doc/man/fqueue.3 index 00c28d4c..abd21cfe 100644 --- a/doc/man/fqueue.3 +++ b/doc/man/fqueue.3 @@ -19,6 +19,8 @@ on flows  \fBint fqueue_next(fqueue_t * \fIfq\fB); +\fBint fqueue_type(fqueue_t * \fIfq\fB); +  \fBint fevent(fset_t * \fIset\fB, fqueue_t * \fIfq\fB,  const struct timespec * \fItimeo\fB); @@ -36,6 +38,22 @@ an \fBfqueue_t\fR \fIfq\fR.  The \fBfqueue_next\fR() function retrieves the next event (a \fIflow  descriptor\fR) that is ready within the event queue \fIfq\fR. +The \fBfqueue_type\fR() function retrieves the type for the current +event on the fd that was returned by \fBfqueue_next\fR(). Event types +are: +.RS 4 +FLOW_PKT: A new packet arrived on this flow and is ready for reading. + +FLOW_UP: The flow is now marked UP and ready for read/write. + +FLOW_DOWN: The flow is now marked DOWN and cannot be written to. + +FLOW_ALLOC: A pending flow is now allocated. + +FLOW_DEALLOC: The flow is deallocated by the other (N+1 or N-1) +process. +.RE +  The \fBfevent\fR() function retrieves all events that occured on any  \fIflow descriptor\fR within \fIset\fR and returns them in the event  queue \fBfq\fR. If a \fBstruct timespec *\fI timeo\fR can be provided, @@ -50,7 +68,14 @@ On success, \fBfqueue_create\fR() returns a pointer to an  \fBfqueue_destroy\fR() has no return value. -On success, \fBfevent\fR() returns the number of events that occured in \fIset\fR. +On success, \fBfevent\fR() returns the number of events that occured +in \fIset\fR. + +On success, \fBfqueue_next\fR() returns the next file descriptor for +which an event occurred. + +On success, \fBfqueue_type\fR() returns the event type for the last +event returned by \fBfqueue_next\fR().  .SH ERRORS @@ -62,10 +87,10 @@ were available to create the \fBfqueue_t\fR.  .B -EINVAL  An invalid argument was passed (\fIfq\fR or \fIset\fR was \fINULL\fR). -In addition, \fBfqueue_next\fR() can return +In addition, \fBfqueue_next\fR() or \fBqueue_type\fR() can return  .B -EPERM -No more fds available in \fIfq\fR. +No more fds available or no current event in \fIfq\fR.  and \fBfevent\fR() can return diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h index 1b102669..8a5dc988 100644 --- a/include/ouroboros/fqueue.h +++ b/include/ouroboros/fqueue.h @@ -28,6 +28,14 @@  #include <stdbool.h>  #include <time.h> +enum fqtype { +        FLOW_PKT = 0, +        FLOW_DOWN, +        FLOW_UP, +        FLOW_ALLOC, +        FLOW_DEALLOC +}; +  struct flow_set;  struct fqueue; @@ -37,30 +45,32 @@ typedef struct fqueue fqueue_t;  __BEGIN_DECLS -fset_t *   fset_create(void); +fset_t *    fset_create(void); + +void        fset_destroy(fset_t * set); -void       fset_destroy(fset_t * set); +fqueue_t *  fqueue_create(void); -fqueue_t * fqueue_create(void); +void        fqueue_destroy(struct fqueue * fq); -void       fqueue_destroy(struct fqueue * fq); +void        fset_zero(fset_t * set); -void       fset_zero(fset_t * set); +int         fset_add(fset_t * set, +                     int      fd); -int        fset_add(fset_t * set, -                    int      fd); +bool        fset_has(const fset_t * set, +                     int            fd); -bool       fset_has(const fset_t * set, -                    int            fd); +void        fset_del(fset_t * set, +                     int      fd); -void       fset_del(fset_t * set, -                    int      fd); +int         fqueue_next(fqueue_t * fq); -int        fqueue_next(fqueue_t * fq); +enum fqtype fqueue_type(fqueue_t * fq); -int        fevent(fset_t *                set, -                  fqueue_t *              fq, -                  const struct timespec * timeo); +int         fevent(fset_t *                set, +                   fqueue_t *              fq, +                   const struct timespec * timeo);  __END_DECLS diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h index 76849137..ebf63af5 100644 --- a/include/ouroboros/shm_flow_set.h +++ b/include/ouroboros/shm_flow_set.h @@ -53,7 +53,8 @@ void                  shm_flow_set_del(struct shm_flow_set * shm_set,                                         int                   port_id);  void                  shm_flow_set_notify(struct shm_flow_set * set, -                                          int                   port_id); +                                          int                   port_id, +                                          int                   event);  ssize_t               shm_flow_set_wait(const struct shm_flow_set * shm_set,                                          size_t                      idx, diff --git a/src/lib/dev.c b/src/lib/dev.c index edcf56ed..3d854c2a 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -65,7 +65,7 @@ struct flow_set {  };  struct fqueue { -        int    fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */ +        int    fqueue[2 * SHM_BUFFER_SIZE]; /* Safe copy from shm. */          size_t fqsize;          size_t next;  }; @@ -875,7 +875,7 @@ ssize_t flow_write(int          fd,          if (ret < 0)                  shm_rdrbuff_remove(ai.rdrb, idx);          else -                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); +                shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);          pthread_rwlock_unlock(&ai.lock); @@ -1039,7 +1039,7 @@ void fset_zero(struct flow_set * set)  int fset_add(struct flow_set * set,               int               fd)  { -        int ret; +        int    ret;          size_t sdus;          size_t i; @@ -1052,7 +1052,7 @@ int fset_add(struct flow_set * set,          sdus = shm_rbuff_queued(ai.flows[fd].rx_rb);          for (i = 0; i < sdus; i++) -                shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); +                shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT);          pthread_rwlock_unlock(&ai.lock); @@ -1102,23 +1102,31 @@ int fqueue_next(struct fqueue * fq)          if (fq == NULL)                  return -EINVAL; -        if (fq->fqsize == 0) +        if (fq->fqsize == 0 || fq->next == fq->fqsize)                  return -EPERM;          pthread_rwlock_rdlock(&ai.lock); -        fd = ai.ports[fq->fqueue[fq->next++]].fd; +        fd = ai.ports[fq->fqueue[fq->next]].fd; -        pthread_rwlock_unlock(&ai.lock); +        fq->next += 2; -        if (fq->next == fq->fqsize) { -                fq->fqsize = 0; -                fq->next = 0; -        } +        pthread_rwlock_unlock(&ai.lock);          return fd;  } +enum fqtype fqueue_type(struct fqueue * fq) +{ +        if (fq == NULL) +                return -EINVAL; + +        if (fq->fqsize == 0 || fq->next == 0) +                return -EPERM; + +        return fq->fqueue[fq->next - 1]; +} +  int fevent(struct flow_set *       set,             struct fqueue *         fq,             const struct timespec * timeo) @@ -1130,11 +1138,9 @@ int fevent(struct flow_set *       set,          if (set == NULL || fq == NULL)                  return -EINVAL; -        if (fq->fqsize > 0) +        if (fq->fqsize > 0 && fq->next != fq->fqsize)                  return fq->fqsize; -        assert(!fq->next); -          if (timeo != NULL) {                  clock_gettime(PTHREAD_COND_CLOCK, &abstime);                  ts_add(&abstime, timeo, &abstime); @@ -1147,7 +1153,8 @@ int fevent(struct flow_set *       set,                  return -ETIMEDOUT;          } -        fq->fqsize = ret; +        fq->fqsize = ret << 1; +        fq->next   = 0;          assert(ret); @@ -1365,9 +1372,9 @@ int ipcp_flow_write(int                  fd,                  return -ENOMEM;          } -        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); +        ret = shm_rbuff_write(flow->tx_rb, idx);          if (ret == 0) -                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); +                shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);          pthread_rwlock_unlock(&ai.lock); @@ -1454,20 +1461,23 @@ ssize_t local_flow_read(int fd)  int local_flow_write(int    fd,                       size_t idx)  { -        int ret; +        struct flow * flow; +        int           ret;          assert(fd >= 0); +        flow = &ai.flows[fd]; +          pthread_rwlock_rdlock(&ai.lock); -        if (ai.flows[fd].port_id < 0) { +        if (flow->port_id < 0) {                  pthread_rwlock_unlock(&ai.lock);                  return -ENOTALLOC;          } -        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); +        ret = shm_rbuff_write(flow->tx_rb, idx);          if (ret == 0) -                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); +                shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);          pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index d2107fc3..bb9e3caa 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -27,7 +27,6 @@  #include <ouroboros/lockfile.h>  #include <ouroboros/time_utils.h>  #include <ouroboros/shm_flow_set.h> -#include <ouroboros/fqueue.h>  #include <ouroboros/errno.h>  #include <pthread.h> @@ -54,24 +53,29 @@  #define FN_MAX_CHARS 255 -#define FQUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(int)) +#define QUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(struct portevent))  #define SHM_FLOW_SET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t)             \                                  + PROG_MAX_FQUEUES * sizeof(size_t)         \                                  + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \ -                                + PROG_MAX_FQUEUES * FQUEUESIZE             \ +                                + PROG_MAX_FQUEUES * QUEUESIZE              \                                  + sizeof(pthread_mutex_t))  #define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx) +struct portevent { +        int port_id; +        int event; +}; +  struct shm_flow_set { -        ssize_t *         mtable; -        size_t *          heads; -        pthread_cond_t *  conds; -        int *             fqueues; -        pthread_mutex_t * lock; +        ssize_t *          mtable; +        size_t *           heads; +        pthread_cond_t *   conds; +        struct portevent * fqueues; +        pthread_mutex_t *  lock; -        pid_t             pid; +        pid_t pid;  };  struct shm_flow_set * shm_flow_set_create() @@ -125,7 +129,7 @@ struct shm_flow_set * shm_flow_set_create()          set->mtable  = shm_base;          set->heads   = (size_t *) (set->mtable + SYS_MAX_FLOWS);          set->conds   = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); -        set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES); +        set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES);          set->lock    = (pthread_mutex_t *)                  (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); @@ -191,10 +195,9 @@ struct shm_flow_set * shm_flow_set_open(pid_t pid)          set->mtable  = shm_base;          set->heads   = (size_t *) (set->mtable + SYS_MAX_FLOWS);          set->conds   = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); -        set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES); +        set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES);          set->lock    = (pthread_mutex_t *)                  (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); -          set->pid = pid;          return set; @@ -316,7 +319,8 @@ int shm_flow_set_has(struct shm_flow_set * set,  }  void shm_flow_set_notify(struct shm_flow_set * set, -                         int                   port_id) +                         int                   port_id, +                         int                   event)  {          assert(set);          assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); @@ -328,8 +332,10 @@ void shm_flow_set_notify(struct shm_flow_set * set,                  return;          } -        *(fqueue_ptr(set, set->mtable[port_id]) + -                     (set->heads[set->mtable[port_id]])++) = port_id; +        (fqueue_ptr(set, set->mtable[port_id]) + +         (set->heads[set->mtable[port_id]]))->port_id = port_id; +        (fqueue_ptr(set, set->mtable[port_id]) + +         (set->heads[set->mtable[port_id]])++)->event = event;          pthread_cond_signal(&set->conds[set->mtable[port_id]]); @@ -380,7 +386,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,          if (ret != -ETIMEDOUT) {                  memcpy(fqueue,                         fqueue_ptr(set, idx), -                       set->heads[idx] * sizeof(int)); +                       set->heads[idx] * sizeof(struct portevent));                  ret = set->heads[idx];                  set->heads[idx] = 0;          }  | 
