diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 21 | ||||
| -rw-r--r-- | src/lib/tests/timerwheel_test.c | 6 | ||||
| -rw-r--r-- | src/lib/timerwheel.c | 158 | 
3 files changed, 23 insertions, 162 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index 1018f556..52a56097 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -83,11 +83,7 @@ struct frcti {          uint64_t      rcv_lwe;          uint64_t      rcv_rwe; -        bool          resource_control; -        bool          reliable; -        bool          error_check; -        bool          ordered; -        bool          partial; +        uint8_t       conf_flags;  };  struct port { @@ -121,7 +117,6 @@ struct {          struct shm_flow_set * fqset;          struct timerwheel *   tw; -        int                   tw_users;          struct bmp *          fds;          struct bmp *          fqueues; @@ -317,6 +312,12 @@ static int frcti_write(int                  fd,          frcti = &(ai.frcti[fd]); +        pthread_rwlock_unlock(&ai.lock); + +        timerwheel_move(ai.tw); + +        pthread_rwlock_rdlock(&ai.lock); +          /*           * Set the DRF in the first packet of a new run of SDUs,           * otherwise simply recharge the timer. @@ -337,7 +338,7 @@ static int frcti_write(int                  fd,          pci.seqno = frcti->snd_lwe++;          pci.type |= PDU_TYPE_DATA; -        if (frct_pci_ser(sdb, &pci, frcti->error_check)) +        if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK))                  return -1;          if (finalize_write(fd, shm_du_buff_get_idx(sdb))) @@ -368,6 +369,8 @@ static ssize_t frcti_read(int fd)          struct frct_pci      pci;          struct shm_du_buff * sdb; +        timerwheel_move(ai.tw); +          pthread_rwlock_rdlock(&ai.lock);          if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { @@ -399,7 +402,7 @@ static ssize_t frcti_read(int fd)          sdb = shm_rdrbuff_get(ai.rdrb, idx);          /* SDU may be corrupted. */ -        if (frct_pci_des(sdb, &pci, frcti->error_check)) { +        if (frct_pci_des(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) {                  pthread_rwlock_unlock(&ai.lock);                  shm_rdrbuff_remove(ai.rdrb, idx);                  return -1; @@ -460,6 +463,8 @@ static int frcti_event_wait(struct flow_set *       set,          assert(fq);          assert(timeout); +        timerwheel_move(ai.tw); +          /*           * FIXME: Return the fq only if a data SDU           * for the application is available. diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c index d9ca164e..d7478487 100644 --- a/src/lib/tests/timerwheel_test.c +++ b/src/lib/tests/timerwheel_test.c @@ -91,14 +91,12 @@ int timerwheel_test(int argc, char ** argv)          nanosleep(&wait, NULL); -        /* On some systems and VMs, the scheduler may be too slow. */ -        if (total != check_total) -                nanosleep(&wait, NULL); +        timerwheel_move(tw);          timerwheel_destroy(tw);          if (total != check_total) { -                printf("Totals do not match.\n"); +                printf("Totals do not match: %d and %d.\n", total, check_total);                  return -1;          } diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 76f0ab32..2952c5d3 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -39,12 +39,6 @@  #define tw_free(tw) (tw_used(tw) + 1 < tw->elements)  #define tw_empty(tw) (tw->head == tw->tail) -enum tw_state { -        TW_NULL = 0, -        TW_RUNNING, -        TW_DESTROY -}; -  struct tw_f {          struct list_head next;          void (* func)(void *); @@ -63,19 +57,10 @@ struct timerwheel {          size_t           pos; -        struct list_head wq; - -        pthread_cond_t   work;          pthread_mutex_t  lock; -        int              resolution; +        time_t           resolution;          unsigned int     elements; - -        enum tw_state    state; -        pthread_mutex_t  s_lock; - -        pthread_t        ticker; -        pthread_t        worker;  };  static void tw_el_fini(struct tw_el * e) @@ -89,72 +74,8 @@ static void tw_el_fini(struct tw_el * e)          }  } -static enum tw_state tw_get_state(struct timerwheel * tw) -{ -        enum tw_state state; - -        assert(tw); - -        pthread_mutex_lock(&tw->s_lock); - -        state = tw->state; - -        pthread_mutex_unlock(&tw->s_lock); - -        return state; -} - -static void tw_set_state(struct timerwheel * tw, enum tw_state state) -{ -        assert(tw); -        assert(state != TW_NULL); - -        pthread_mutex_lock(&tw->s_lock); - -        tw->state = state; - -        pthread_mutex_unlock(&tw->s_lock); -} - -static void * worker(void * o) -{ -        struct list_head * p; -        struct list_head * h; - -        struct timerwheel * tw = (struct timerwheel *) o; -        struct timespec dl; -        struct timespec now; - -        clock_gettime(PTHREAD_COND_CLOCK, &now); - -        ts_add(&now, &tw->intv, &dl); - -        pthread_mutex_lock(&tw->lock); - -        while (tw_get_state(tw) == TW_RUNNING) { -                if (pthread_cond_timedwait(&tw->work, &tw->lock, &dl) -                    == ETIMEDOUT) -                        ts_add(&dl, &tw->intv, &dl); - -                list_for_each_safe(p, h, &tw->wq) { -                        struct tw_f * f = list_entry(p, struct tw_f, next); -                        list_del(&f->next); -                        pthread_mutex_unlock(&tw->lock); -                        f->func(f->arg); -                        free(f); - -                        pthread_mutex_lock(&tw->lock); -                } -        } - -        pthread_mutex_unlock(&tw->lock); - -        return (void *) o; -} - -static void * movement(void * o) +void timerwheel_move(struct timerwheel * tw)  { -        struct timerwheel * tw = (struct timerwheel *) o;          struct timespec now = {0, 0};          long ms = tw->resolution * tw->elements;          struct timespec total = {ms / 1000, @@ -162,21 +83,16 @@ static void * movement(void * o)          struct list_head * p;          struct list_head * h; -        while (tw_get_state(tw) == TW_RUNNING) { -                clock_gettime(CLOCK_MONOTONIC, &now); - -                pthread_mutex_lock(&tw->lock); +        clock_gettime(CLOCK_MONOTONIC, &now); -                if (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) < 0) { -                        pthread_mutex_unlock(&tw->lock); -                        nanosleep(&tw->intv, NULL); -                        continue; -                } +        pthread_mutex_lock(&tw->lock); +        while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) {                  list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) {                          struct tw_f * f = list_entry(p, struct tw_f, next);                          list_del(&f->next); -                        list_add(&f->next, &tw->wq); +                        f->func(f->arg); +                        free(f);                  }                  ts_add(&tw->wheel[tw->pos].expiry, @@ -184,13 +100,9 @@ static void * movement(void * o)                         &tw->wheel[tw->pos].expiry);                  tw->pos = (tw->pos + 1) & (tw->elements - 1); - -                pthread_cond_signal(&tw->work); - -                pthread_mutex_unlock(&tw->lock);          } -        return (void *) 0; +        pthread_mutex_unlock(&tw->lock);  }  struct timerwheel * timerwheel_create(time_t resolution, @@ -203,8 +115,6 @@ struct timerwheel * timerwheel_create(time_t resolution,          struct timerwheel * tw; -        pthread_condattr_t cattr; -          assert(resolution != 0);          tw = malloc(sizeof(*tw)); @@ -228,25 +138,10 @@ struct timerwheel * timerwheel_create(time_t resolution,          tw->intv.tv_sec = (tw->resolution / FRAC) / 1000;          tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION; -        list_head_init(&tw->wq); -          if (pthread_mutex_init(&tw->lock, NULL))                  goto fail_lock_init; -        if (pthread_mutex_init(&tw->s_lock, NULL)) -                goto fail_s_lock_init; - -        if (pthread_condattr_init(&cattr)) -                goto fail_cond_init; - -#ifndef __APPLE__ -        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif -        if (pthread_cond_init(&tw->work, &cattr)) -                goto fail_cond_init; -          tw->pos = 0; -        tw->state = TW_RUNNING;          clock_gettime(CLOCK_MONOTONIC, &now);          now.tv_nsec -= (now.tv_nsec % MILLION); @@ -257,24 +152,8 @@ struct timerwheel * timerwheel_create(time_t resolution,                  ts_add(&now, &res_ts, &now);          } -        if (pthread_create(&tw->worker, NULL, worker, (void *) tw)) -                goto fail_worker_create; - -        if (pthread_create(&tw->ticker, NULL, movement, (void *) tw)) { -                tw_set_state(tw, TW_DESTROY); -                goto fail_ticker_create; -        } -          return tw; - fail_ticker_create: -         pthread_join(tw->worker, NULL); - fail_worker_create: -         pthread_cond_destroy(&tw->work); - fail_cond_init: -         pthread_mutex_destroy(&tw->s_lock); - fail_s_lock_init: -         pthread_mutex_destroy(&tw->lock);   fail_lock_init:           free(tw->wheel);   fail_wheel_malloc: @@ -286,31 +165,10 @@ void timerwheel_destroy(struct timerwheel * tw)  {          unsigned long i; -        struct list_head * p; -        struct list_head * h; - -        tw_set_state(tw, TW_DESTROY); - -        pthread_join(tw->ticker, NULL); -        pthread_join(tw->worker, NULL); -          for (i = 0; i < tw->elements; ++i)                  tw_el_fini(&tw->wheel[i]); -        pthread_mutex_lock(&tw->lock); - -        list_for_each_safe(p, h, &tw->wq) { -                struct tw_f * f = list_entry(p, struct tw_f, next); -                list_del(&f->next); -                free(f); -        } - -        pthread_mutex_unlock(&tw->lock); - -        pthread_cond_destroy(&tw->work);          pthread_mutex_destroy(&tw->lock); -        pthread_mutex_destroy(&tw->s_lock); -          free(tw->wheel);          free(tw);  } | 
