diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 98 | 
1 files changed, 49 insertions, 49 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 135324ea..1478d0bb 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -90,6 +90,8 @@ struct port {          ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti)))  struct flow { +        struct list_head      next; +          struct shm_rbuff *    rx_rb;          struct shm_rbuff *    tx_rb;          struct shm_flow_set * set; @@ -136,11 +138,9 @@ struct {          struct flow *         flows;          struct port *         ports; +        struct list_head      flow_list;          pthread_t             mon; -        int                   min_timeo; -        int                   min_fd; -        int                   max_fd;          pthread_t             tx;          size_t                n_frcti; @@ -267,55 +267,64 @@ void * frct_tx(void * o)          return (void *) 0;  } -static void flow_send_keepalive(int fd) +static void flow_send_keepalive(struct flow * flow, +                                struct timespec now)  { -        flow_write(fd, NULL, 0); +        struct shm_du_buff * sdb; +        ssize_t              idx; +        uint8_t *            ptr; + +        idx = shm_rdrbuff_alloc(ai.rdrb, 0, &ptr, &sdb); +        if (idx < 0) +                return; + +        pthread_rwlock_wrlock(&ai.lock); + +        flow->snd_act = now; + +        if (shm_rbuff_write(flow->tx_rb, idx)) +                shm_rdrbuff_remove(ai.rdrb, idx); +        else +                shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + +        pthread_rwlock_unlock(&ai.lock);  } -static void flow_keepalive(int fd) +/* Needs rdlock on ai. */ +static void _flow_keepalive(struct flow * flow)  {          struct timespec    now;          struct timespec    s_act;          struct timespec    r_act; -        struct flow *      flow;          int                flow_id;          uint32_t           timeo; -        struct shm_rbuff * rb;          uint32_t           acl; -        flow = &ai.flows[fd]; - -        pthread_rwlock_rdlock(&ai.lock); - -        if (flow->flow_id < 0) { -                pthread_rwlock_unlock(&ai.lock); -                return; -        } -          s_act = flow->snd_act;          r_act = flow->rcv_act;          flow_id = flow->flow_id;          timeo   = flow->qs.timeout; -        rb = flow->rx_rb; - -        pthread_rwlock_unlock(&ai.lock); - -        acl = shm_rbuff_get_acl(rb); +        acl = shm_rbuff_get_acl(flow->rx_rb);          if (timeo == 0 ||  acl & (ACL_FLOWPEER | ACL_FLOWDOWN))                  return;          clock_gettime(PTHREAD_COND_CLOCK, &now);          if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { -                shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWPEER); +                shm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER);                  shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);                  return;          } -        if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) -                flow_send_keepalive(fd); +        if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) { +                pthread_rwlock_unlock(&ai.lock); + +                flow_send_keepalive(flow, now); + +                pthread_rwlock_rdlock(&ai.lock); +        }  }  void * monitor(void * o) @@ -325,17 +334,17 @@ void * monitor(void * o)          (void) o;          while (true) { -                int i; -                int min; -                int max; +                struct list_head * p; +                struct list_head * h;                  pthread_rwlock_rdlock(&ai.lock); -                min = ai.min_fd; -                max = ai.max_fd; -                pthread_rwlock_unlock(&ai.lock); -                for (i = min; i <= max; ++i) -                        flow_keepalive(i); +                list_for_each_safe(p, h, &ai.flow_list) { +                        struct flow * flow  = list_entry(p, struct flow, next); +                        _flow_keepalive(flow); +                } + +                pthread_rwlock_unlock(&ai.lock);                  nanosleep(&tic, NULL);          } @@ -390,13 +399,9 @@ static void flow_fini(int fd)          if (ai.flows[fd].ctx != NULL)                  crypt_fini(ai.flows[fd].ctx); -        flow_clear(fd); - -        while (ai.flows[ai.max_fd].flow_id == -1 && ai.max_fd > ai.min_fd) -                --ai.max_fd; +        list_del(&ai.flows[fd].next); -        while (ai.flows[ai.min_fd].flow_id == -1 && ai.min_fd < ai.max_fd) -                ++ai.min_fd; +        flow_clear(fd);  }  static int flow_init(int       flow_id, @@ -420,12 +425,6 @@ static int flow_init(int       flow_id,                  goto fail_fds;          } -        if (fd > ai.max_fd) -                ai.max_fd = fd; - -        if (fd < ai.min_fd) -                ai.min_fd = fd; -          flow = &ai.flows[fd];          flow->rx_rb = shm_rbuff_open(getpid(), flow_id); @@ -469,6 +468,8 @@ static int flow_init(int       flow_id,                          goto fail_tx_thread;          } +        list_add_tail(&flow->next, &ai.flow_list); +          ai.ports[flow_id].fd = fd;          port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED); @@ -527,13 +528,12 @@ static void init(int     argc,                  gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);          }  #endif +        list_head_init(&ai.flow_list); +          ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);          if (ai.fds == NULL)                  goto fail_fds; -        ai.min_fd = PROG_RES_FDS; -        ai.max_fd = PROG_RES_FDS; -          ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);          if (ai.fqueues == NULL)                  goto fail_fqueues; @@ -1219,7 +1219,7 @@ ssize_t flow_write(int          fd,          uint8_t *            ptr;          if (buf == NULL && count != 0) -                return 0; +                return -EINVAL;          if (fd < 0 || fd >= PROG_MAX_FLOWS)                  return -EBADF;  | 
