From 514791e5c6ded690aaf6dc43709dd02bc6a2ff6a Mon Sep 17 00:00:00 2001
From: Sander Vrijders <sander.vrijders@ugent.be>
Date: Tue, 22 Aug 2017 14:58:12 +0200
Subject: lib: Make timerwheel a passive component

This turns the timerwheel into a passive component since it is used by
application using the library. The user of the timerwheel now has to
call timerwheel_move to advance the timerwheel.
---
 src/lib/dev.c                   |  21 ++++--
 src/lib/tests/timerwheel_test.c |   6 +-
 src/lib/timerwheel.c            | 158 ++--------------------------------------
 3 files changed, 23 insertions(+), 162 deletions(-)

(limited to 'src/lib')

diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1018f556..52a56097 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -83,11 +83,7 @@ struct frcti {
         uint64_t      rcv_lwe;
         uint64_t      rcv_rwe;
 
-        bool          resource_control;
-        bool          reliable;
-        bool          error_check;
-        bool          ordered;
-        bool          partial;
+        uint8_t       conf_flags;
 };
 
 struct port {
@@ -121,7 +117,6 @@ struct {
         struct shm_flow_set * fqset;
 
         struct timerwheel *   tw;
-        int                   tw_users;
 
         struct bmp *          fds;
         struct bmp *          fqueues;
@@ -317,6 +312,12 @@ static int frcti_write(int                  fd,
 
         frcti = &(ai.frcti[fd]);
 
+        pthread_rwlock_unlock(&ai.lock);
+
+        timerwheel_move(ai.tw);
+
+        pthread_rwlock_rdlock(&ai.lock);
+
         /*
          * Set the DRF in the first packet of a new run of SDUs,
          * otherwise simply recharge the timer.
@@ -337,7 +338,7 @@ static int frcti_write(int                  fd,
         pci.seqno = frcti->snd_lwe++;
         pci.type |= PDU_TYPE_DATA;
 
-        if (frct_pci_ser(sdb, &pci, frcti->error_check))
+        if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK))
                 return -1;
 
         if (finalize_write(fd, shm_du_buff_get_idx(sdb)))
@@ -368,6 +369,8 @@ static ssize_t frcti_read(int fd)
         struct frct_pci      pci;
         struct shm_du_buff * sdb;
 
+        timerwheel_move(ai.tw);
+
         pthread_rwlock_rdlock(&ai.lock);
 
         if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
@@ -399,7 +402,7 @@ static ssize_t frcti_read(int fd)
         sdb = shm_rdrbuff_get(ai.rdrb, idx);
 
         /* SDU may be corrupted. */
-        if (frct_pci_des(sdb, &pci, frcti->error_check)) {
+        if (frct_pci_des(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) {
                 pthread_rwlock_unlock(&ai.lock);
                 shm_rdrbuff_remove(ai.rdrb, idx);
                 return -1;
@@ -460,6 +463,8 @@ static int frcti_event_wait(struct flow_set *       set,
         assert(fq);
         assert(timeout);
 
+        timerwheel_move(ai.tw);
+
         /*
          * FIXME: Return the fq only if a data SDU
          * for the application is available.
diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c
index d9ca164e..d7478487 100644
--- a/src/lib/tests/timerwheel_test.c
+++ b/src/lib/tests/timerwheel_test.c
@@ -91,14 +91,12 @@ int timerwheel_test(int argc, char ** argv)
 
         nanosleep(&wait, NULL);
 
-        /* On some systems and VMs, the scheduler may be too slow. */
-        if (total != check_total)
-                nanosleep(&wait, NULL);
+        timerwheel_move(tw);
 
         timerwheel_destroy(tw);
 
         if (total != check_total) {
-                printf("Totals do not match.\n");
+                printf("Totals do not match: %d and %d.\n", total, check_total);
                 return -1;
         }
 
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 76f0ab32..2952c5d3 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -39,12 +39,6 @@
 #define tw_free(tw) (tw_used(tw) + 1 < tw->elements)
 #define tw_empty(tw) (tw->head == tw->tail)
 
-enum tw_state {
-        TW_NULL = 0,
-        TW_RUNNING,
-        TW_DESTROY
-};
-
 struct tw_f {
         struct list_head next;
         void (* func)(void *);
@@ -63,19 +57,10 @@ struct timerwheel {
 
         size_t           pos;
 
-        struct list_head wq;
-
-        pthread_cond_t   work;
         pthread_mutex_t  lock;
 
-        int              resolution;
+        time_t           resolution;
         unsigned int     elements;
-
-        enum tw_state    state;
-        pthread_mutex_t  s_lock;
-
-        pthread_t        ticker;
-        pthread_t        worker;
 };
 
 static void tw_el_fini(struct tw_el * e)
@@ -89,72 +74,8 @@ static void tw_el_fini(struct tw_el * e)
         }
 }
 
-static enum tw_state tw_get_state(struct timerwheel * tw)
-{
-        enum tw_state state;
-
-        assert(tw);
-
-        pthread_mutex_lock(&tw->s_lock);
-
-        state = tw->state;
-
-        pthread_mutex_unlock(&tw->s_lock);
-
-        return state;
-}
-
-static void tw_set_state(struct timerwheel * tw, enum tw_state state)
-{
-        assert(tw);
-        assert(state != TW_NULL);
-
-        pthread_mutex_lock(&tw->s_lock);
-
-        tw->state = state;
-
-        pthread_mutex_unlock(&tw->s_lock);
-}
-
-static void * worker(void * o)
-{
-        struct list_head * p;
-        struct list_head * h;
-
-        struct timerwheel * tw = (struct timerwheel *) o;
-        struct timespec dl;
-        struct timespec now;
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        ts_add(&now, &tw->intv, &dl);
-
-        pthread_mutex_lock(&tw->lock);
-
-        while (tw_get_state(tw) == TW_RUNNING) {
-                if (pthread_cond_timedwait(&tw->work, &tw->lock, &dl)
-                    == ETIMEDOUT)
-                        ts_add(&dl, &tw->intv, &dl);
-
-                list_for_each_safe(p, h, &tw->wq) {
-                        struct tw_f * f = list_entry(p, struct tw_f, next);
-                        list_del(&f->next);
-                        pthread_mutex_unlock(&tw->lock);
-                        f->func(f->arg);
-                        free(f);
-
-                        pthread_mutex_lock(&tw->lock);
-                }
-        }
-
-        pthread_mutex_unlock(&tw->lock);
-
-        return (void *) o;
-}
-
-static void * movement(void * o)
+void timerwheel_move(struct timerwheel * tw)
 {
-        struct timerwheel * tw = (struct timerwheel *) o;
         struct timespec now = {0, 0};
         long ms = tw->resolution * tw->elements;
         struct timespec total = {ms / 1000,
@@ -162,21 +83,16 @@ static void * movement(void * o)
         struct list_head * p;
         struct list_head * h;
 
-        while (tw_get_state(tw) == TW_RUNNING) {
-                clock_gettime(CLOCK_MONOTONIC, &now);
-
-                pthread_mutex_lock(&tw->lock);
+        clock_gettime(CLOCK_MONOTONIC, &now);
 
-                if (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) < 0) {
-                        pthread_mutex_unlock(&tw->lock);
-                        nanosleep(&tw->intv, NULL);
-                        continue;
-                }
+        pthread_mutex_lock(&tw->lock);
 
+        while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) {
                 list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) {
                         struct tw_f * f = list_entry(p, struct tw_f, next);
                         list_del(&f->next);
-                        list_add(&f->next, &tw->wq);
+                        f->func(f->arg);
+                        free(f);
                 }
 
                 ts_add(&tw->wheel[tw->pos].expiry,
@@ -184,13 +100,9 @@ static void * movement(void * o)
                        &tw->wheel[tw->pos].expiry);
 
                 tw->pos = (tw->pos + 1) & (tw->elements - 1);
-
-                pthread_cond_signal(&tw->work);
-
-                pthread_mutex_unlock(&tw->lock);
         }
 
-        return (void *) 0;
+        pthread_mutex_unlock(&tw->lock);
 }
 
 struct timerwheel * timerwheel_create(time_t resolution,
@@ -203,8 +115,6 @@ struct timerwheel * timerwheel_create(time_t resolution,
 
         struct timerwheel * tw;
 
-        pthread_condattr_t cattr;
-
         assert(resolution != 0);
 
         tw = malloc(sizeof(*tw));
@@ -228,25 +138,10 @@ struct timerwheel * timerwheel_create(time_t resolution,
         tw->intv.tv_sec = (tw->resolution / FRAC) / 1000;
         tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION;
 
-        list_head_init(&tw->wq);
-
         if (pthread_mutex_init(&tw->lock, NULL))
                 goto fail_lock_init;
 
-        if (pthread_mutex_init(&tw->s_lock, NULL))
-                goto fail_s_lock_init;
-
-        if (pthread_condattr_init(&cattr))
-                goto fail_cond_init;
-
-#ifndef __APPLE__
-        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
-        if (pthread_cond_init(&tw->work, &cattr))
-                goto fail_cond_init;
-
         tw->pos = 0;
-        tw->state = TW_RUNNING;
 
         clock_gettime(CLOCK_MONOTONIC, &now);
         now.tv_nsec -= (now.tv_nsec % MILLION);
@@ -257,24 +152,8 @@ struct timerwheel * timerwheel_create(time_t resolution,
                 ts_add(&now, &res_ts, &now);
         }
 
-        if (pthread_create(&tw->worker, NULL, worker, (void *) tw))
-                goto fail_worker_create;
-
-        if (pthread_create(&tw->ticker, NULL, movement, (void *) tw)) {
-                tw_set_state(tw, TW_DESTROY);
-                goto fail_ticker_create;
-        }
-
         return tw;
 
- fail_ticker_create:
-         pthread_join(tw->worker, NULL);
- fail_worker_create:
-         pthread_cond_destroy(&tw->work);
- fail_cond_init:
-         pthread_mutex_destroy(&tw->s_lock);
- fail_s_lock_init:
-         pthread_mutex_destroy(&tw->lock);
  fail_lock_init:
          free(tw->wheel);
  fail_wheel_malloc:
@@ -286,31 +165,10 @@ void timerwheel_destroy(struct timerwheel * tw)
 {
         unsigned long i;
 
-        struct list_head * p;
-        struct list_head * h;
-
-        tw_set_state(tw, TW_DESTROY);
-
-        pthread_join(tw->ticker, NULL);
-        pthread_join(tw->worker, NULL);
-
         for (i = 0; i < tw->elements; ++i)
                 tw_el_fini(&tw->wheel[i]);
 
-        pthread_mutex_lock(&tw->lock);
-
-        list_for_each_safe(p, h, &tw->wq) {
-                struct tw_f * f = list_entry(p, struct tw_f, next);
-                list_del(&f->next);
-                free(f);
-        }
-
-        pthread_mutex_unlock(&tw->lock);
-
-        pthread_cond_destroy(&tw->work);
         pthread_mutex_destroy(&tw->lock);
-        pthread_mutex_destroy(&tw->s_lock);
-
         free(tw->wheel);
         free(tw);
 }
-- 
cgit v1.2.3