diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 304 | 
1 files changed, 130 insertions, 174 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index b3e9c69e..5a57aa08 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -123,9 +123,6 @@ struct flow_set_entry {  struct flow_set {          size_t           idx; -        struct timespec  chk;   /* Last keepalive check.          */ -        uint32_t         min;   /* Minimum keepalive time in set. */ -          struct list_head flows;          pthread_rwlock_t lock;  }; @@ -148,6 +145,11 @@ struct {          struct flow *         flows;          struct port *         ports; +        pthread_t             mon; +        int                   min_timeo; +        int                   min_fd; +        int                   max_fd; +          pthread_t             tx;          size_t                n_frcti; @@ -255,14 +257,6 @@ static int proc_announce(char * prog)          return ret;  } -static void flow_clear(int fd) -{ -        memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); - -        ai.flows[fd].flow_id  = -1; -        ai.flows[fd].pid      = -1; -} -  #include "crypt.c"  #include "frct.c" @@ -281,6 +275,90 @@ void * frct_tx(void * o)          return (void *) 0;  } +static void flow_send_keepalive(int fd) +{ +        flow_write(fd, NULL, 0); +} + +static void flow_keepalive(int fd) +{ +        struct timespec    now; +        struct timespec    s_act; +        struct timespec    r_act; +        struct flow *      flow; +        int                flow_id; +        uint32_t           timeo; +        struct shm_rbuff * rb; +        uint32_t           acl; + +        flow = &ai.flows[fd]; + +        pthread_rwlock_rdlock(&ai.lock); + +        if (flow->flow_id < 0) { +                pthread_rwlock_unlock(&ai.lock); +                return; +        } + +        s_act = flow->snd_act; +        r_act = flow->rcv_act; + +        flow_id = flow->flow_id; +        timeo   = flow->qs.timeout; + +        rb = flow->rx_rb; + +        pthread_rwlock_unlock(&ai.lock); + +        acl = shm_rbuff_get_acl(rb); +        if (timeo == 0 ||  acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) +                return; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { +                shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWPEER); +                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); +                return; +        } + +        if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) +                flow_send_keepalive(fd); +} + +void * monitor(void * o) +{ +        struct timespec tic = {0, TICTIME}; + +        (void) o; + +        while (true) { +                int i; +                int min; +                int max; + +                pthread_rwlock_rdlock(&ai.lock); +                min = ai.min_fd; +                max = ai.max_fd; +                pthread_rwlock_unlock(&ai.lock); + +                for (i = min; i <= max; ++i) +                        flow_keepalive(i); + +                nanosleep(&tic, NULL); +        } + +        return (void *) 0; +} + +static void flow_clear(int fd) +{ +        memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); + +        ai.flows[fd].flow_id  = -1; +        ai.flows[fd].pid      = -1; +} +  static void flow_fini(int fd)  {          assert(fd >= 0 && fd < SYS_MAX_FLOWS); @@ -299,7 +377,6 @@ static void flow_fini(int fd)                  bmp_release(ai.fds, fd);          } -          if (ai.flows[fd].rx_rb != NULL) {                  shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);                  shm_rbuff_close(ai.flows[fd].rx_rb); @@ -321,6 +398,12 @@ static void flow_fini(int fd)                  crypt_fini(ai.flows[fd].ctx);          flow_clear(fd); + +        while (ai.flows[ai.max_fd].flow_id == -1 && ai.max_fd > ai.min_fd) +                --ai.max_fd; + +        while (ai.flows[ai.min_fd].flow_id == -1 && ai.min_fd < ai.max_fd) +                ++ai.min_fd;  }  static int flow_init(int       flow_id, @@ -344,6 +427,12 @@ static int flow_init(int       flow_id,                  goto fail_fds;          } +        if (fd > ai.max_fd) +                ai.max_fd = fd; + +        if (fd < ai.min_fd) +                ai.min_fd = fd; +          flow = &ai.flows[fd];          flow->rx_rb = shm_rbuff_open(getpid(), flow_id); @@ -449,6 +538,9 @@ static void init(int     argc,          if (ai.fds == NULL)                  goto fail_fds; +        ai.min_fd = PROG_RES_FDS; +        ai.max_fd = PROG_RES_FDS; +          ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);          if (ai.fqueues == NULL)                  goto fail_fqueues; @@ -508,12 +600,17 @@ static void init(int     argc,                          goto fail_rib_init;          }  #endif +        if (pthread_create(&ai.mon, NULL, monitor, NULL) < 0) +                goto fail_monitor; +          return; + fail_monitor:  #if defined PROC_FLOW_STATS +        rib_fini();   fail_rib_init: -        timerwheel_fini();  #endif +        timerwheel_fini();   fail_timerwheel:          shm_flow_set_close(ai.fqset);   fail_fqset: @@ -550,6 +647,9 @@ static void fini(void)          if (ai.fds == NULL)                  return; +        pthread_cancel(ai.mon); +        pthread_join(ai.mon, NULL); +          pthread_rwlock_wrlock(&ai.lock);          for (i = 0; i < PROG_MAX_FLOWS; ++i) { @@ -671,7 +771,7 @@ int flow_accept(qosspec_t *             qs,          if (fd < 0)                  return fd; -        pthread_rwlock_wrlock(&ai.lock); +        pthread_rwlock_rdlock(&ai.lock);          if (qs != NULL)                  *qs = ai.flows[fd].qs; @@ -1058,48 +1158,6 @@ static int add_crc(struct shm_du_buff * sdb)          return 0;  } -static void flow_send_keepalive(int fd) -{ -        flow_write(fd, NULL, 0); -} - -static int flow_keepalive(int fd) -{ -        struct timespec    now; -        struct timespec    s_act; -        struct timespec    r_act; -        struct flow *      flow; -        int                flow_id; -        uint32_t           timeo; - -        flow = &ai.flows[fd]; - -        clock_gettime(PTHREAD_COND_CLOCK, &now); - -        pthread_rwlock_rdlock(&ai.lock); - -        s_act = flow->snd_act; -        r_act = flow->rcv_act; - -        flow_id = flow->flow_id; -        timeo   = flow->qs.timeout; - -        pthread_rwlock_unlock(&ai.lock); - -        if (timeo == 0) -                return 0; - -        if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { -                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); -                return -EFLOWPEER; -        } - -        if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) -                flow_send_keepalive(fd); - -        return 0; -} -  static int flow_tx_sdb(struct flow *        flow,                         struct shm_du_buff * sdb,                         bool                 block, @@ -1164,8 +1222,6 @@ ssize_t flow_write(int          fd,          int                  flags;          struct timespec      abs;          struct timespec *    abstime = NULL; -        struct timespec      tic = {0, TICTIME}; -        struct timespec      tictime;          struct shm_du_buff * sdb;          uint8_t *            ptr; @@ -1186,9 +1242,7 @@ ssize_t flow_write(int          fd,                  return -ENOTALLOC;          } -        ts_add(&tic, &abs, &tictime); - -        if (ai.flows[fd].snd_timesout) { +        if (flow->snd_timesout) {                  ts_add(&abs, &flow->snd_timeo, &abs);                  abstime = &abs;          } @@ -1205,17 +1259,9 @@ ssize_t flow_write(int          fd,                          return -EAGAIN;                  idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);          } else { -                while ((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) { -                        if (ret != -ETIMEDOUT) +                while ((ret = frcti_window_wait(flow->frcti, abstime)) < 0) { +                        if (ret < 0)                                  return ret; - -                        if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0) -                                return -ETIMEDOUT; - -                        if (flow_keepalive(fd)) -                                return -EFLOWPEER; - -                        ts_add(&tictime, &tic, &tictime);                  }                  idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);          } @@ -1226,7 +1272,7 @@ ssize_t flow_write(int          fd,          if (count > 0)                  memcpy(ptr, buf, count); -        ret = flow_tx_sdb(flow, sdb, flags & FLOWFWNOBLOCK, abstime); +        ret = flow_tx_sdb(flow, sdb, !(flags & FLOWFWNOBLOCK), abstime);          return ret < 0 ? (ssize_t) ret : (ssize_t) count;  } @@ -1259,8 +1305,6 @@ static ssize_t flow_rx_sdb(struct flow *         flow,          if (idx < 0)                  return idx; -        *sdb = shm_rdrbuff_get(ai.rdrb, idx); -          clock_gettime(PTHREAD_COND_CLOCK, &now);          pthread_rwlock_wrlock(&ai.lock); @@ -1269,6 +1313,7 @@ static ssize_t flow_rx_sdb(struct flow *         flow,          pthread_rwlock_unlock(&ai.lock); +        *sdb = shm_rdrbuff_get(ai.rdrb, idx);          if (invalid_pkt(flow, *sdb)) {                  shm_rdrbuff_remove(ai.rdrb, idx);                  return -EAGAIN; @@ -1287,8 +1332,6 @@ ssize_t flow_read(int    fd,          struct shm_du_buff * sdb;          struct timespec      abs;          struct timespec      now; -        struct timespec      tic = {0, TICTIME}; -        struct timespec      tictime;          struct timespec *    abstime = NULL;          struct flow *        flow;          bool                 block; @@ -1317,8 +1360,6 @@ ssize_t flow_read(int    fd,          block  = !(flow->oflags & FLOWFRNOBLOCK);          partrd = !(flow->oflags & FLOWFRNOPART); -        ts_add(&now, &tic, &tictime); -          if (flow->rcv_timesout) {                  ts_add(&now, &flow->rcv_timeo, &abs);                  abstime = &abs; @@ -1329,19 +1370,12 @@ ssize_t flow_read(int    fd,                  while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {                          pthread_rwlock_unlock(&ai.lock); -                        idx = flow_rx_sdb(flow, &sdb, block, &tictime); +                        idx = flow_rx_sdb(flow, &sdb, block, abstime);                          if (idx < 0) { -                                if (idx != -ETIMEDOUT && idx != -EAGAIN) +                                if (block && idx != -EAGAIN) +                                        return idx; +                                if (!block)                                          return idx; - -                                if (abstime != NULL -                                    && ts_diff_ns(&tictime, &abs) <= 0) -                                        return -ETIMEDOUT; - -                                if (flow_keepalive(fd) < 0) -                                        return -EFLOWPEER; - -                                ts_add(&tictime, &tic, &tictime);                                  pthread_rwlock_rdlock(&ai.lock);                                  continue; @@ -1399,9 +1433,6 @@ ssize_t flow_read(int    fd,  struct flow_set * fset_create()  {          struct flow_set * set; -        struct timespec   now; - -        clock_gettime(PTHREAD_COND_CLOCK, &now);          set = malloc(sizeof(*set));          if (set == NULL) @@ -1418,9 +1449,6 @@ struct flow_set * fset_create()          if (!bmp_is_id_valid(ai.fqueues, set->idx))                  goto fail_bmp_alloc; -        set->chk = now; -        set->min = UINT32_MAX; -          pthread_rwlock_unlock(&ai.lock);          list_head_init(&set->flows); @@ -1525,9 +1553,6 @@ int fset_add(struct flow_set * set,          pthread_rwlock_wrlock(&set->lock); -        if (flow->qs.timeout != 0 && flow->qs.timeout < set->min) -                set->min = flow->qs.timeout; -          list_add_tail(&fse->next, &set->flows);          pthread_rwlock_unlock(&set->lock); @@ -1551,15 +1576,12 @@ void fset_del(struct flow_set * set,          struct list_head * p;          struct list_head * h;          struct flow *      flow; -        uint32_t           min;          if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)                  return;          flow = &ai.flows[fd]; -        min = UINT32_MAX; -          pthread_rwlock_rdlock(&ai.lock);          if (flow->flow_id >= 0) @@ -1573,14 +1595,10 @@ void fset_del(struct flow_set * set,                  if (e->fd == fd) {                          list_del(&e->next);                          free(e); -                } else { -                        if (flow->qs.timeout != 0 && flow->qs.timeout < min) -                                min = flow->qs.timeout; +                        break;                  }          } -        set->min = min; -          pthread_rwlock_unlock(&set->lock);          pthread_rwlock_unlock(&ai.lock); @@ -1608,48 +1626,6 @@ bool fset_has(const struct flow_set * set,          return ret;  } -static void fset_keepalive(struct flow_set * set) -{ -        struct timespec    now; -        struct list_head * p; -        struct list_head * h; -        struct list_head   copy; - -        clock_gettime(PTHREAD_COND_CLOCK, &now); - -        pthread_rwlock_wrlock(&set->lock); - -        if (ts_diff_ns(&now, &set->chk) < set->min >> 2) { -                pthread_rwlock_unlock(&set->lock); -                return; -        } - -        set->chk = now; - -        list_head_init(©); - -        list_for_each(p, &set->flows) { -                struct flow_set_entry * c; -                struct flow_set_entry * e; -                e = list_entry(p, struct flow_set_entry, next); -                c = malloc(sizeof(*c)); -                if (c == NULL) -                        continue; -                c->fd = e->fd; -                list_add_tail(&c->next, ©); -        } - -        pthread_rwlock_unlock(&set->lock); - -        list_for_each_safe(p, h, ©) { -                struct flow_set_entry * e; -                e = list_entry(p, struct flow_set_entry, next); -                flow_send_keepalive(e->fd); -                list_del(&e->next); -                free(e); -        } -} -  int fqueue_next(struct fqueue * fq)  {          int fd; @@ -1692,8 +1668,6 @@ ssize_t fevent(struct flow_set *       set,                 const struct timespec * timeo)  {          ssize_t           ret = 0; -        struct timespec   tic = {0, TICTIME}; -        struct timespec   tictime;          struct timespec   abs;          struct timespec * t = NULL; @@ -1705,27 +1679,15 @@ ssize_t fevent(struct flow_set *       set,          clock_gettime(PTHREAD_COND_CLOCK, &abs); -        ts_add(&tic, &abs, &tictime); -        t = &tictime; - -        if (timeo != NULL) +        if (timeo != NULL) {                  ts_add(&abs, timeo, &abs); +                t = &abs; +        }          while (ret == 0) {                  ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); -                if (ret == -ETIMEDOUT) { -                        if (timeo != NULL && ts_diff_ns(t, &abs) < 0) { -                                fq->fqsize = 0; -                                return -ETIMEDOUT; -                        } -                        ret = 0; -                        ts_add(t, &tic, t); -                        pthread_rwlock_rdlock(&ai.lock); -                        timerwheel_move(); -                        fset_keepalive(set); -                        pthread_rwlock_unlock(&ai.lock); -                        continue; -                } +                if (ret == -ETIMEDOUT) +                        return -ETIMEDOUT;                  fq->fqsize = ret << 1;                  fq->next   = 0; @@ -1924,14 +1886,8 @@ int ipcp_flow_read(int                   fd,                  pthread_rwlock_unlock(&ai.lock);                  idx = flow_rx_sdb(flow, sdb, false, NULL); -                if (idx < 0) { -                        if (idx == -EAGAIN) { -                                pthread_rwlock_rdlock(&ai.lock); -                                continue; -                        } - +                if (idx < 0)                          return idx; -                }                  pthread_rwlock_rdlock(&ai.lock);  | 
