diff options
-rw-r--r-- | include/ouroboros/config.h.in | 2 | ||||
-rw-r--r-- | src/ipcpd/normal/sdu_sched.c | 68 |
2 files changed, 44 insertions, 26 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 736ba5b3..e8341ee2 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -56,7 +56,7 @@ /* IPCP dynamic threadpooling */ #define IPCP_MIN_THREADS 4 #define IPCP_ADD_THREADS 16 - +#define IPCP_SCHED_THREADS 8 #define IPCPD_MAX_CONNS IRMD_MAX_FLOWS #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC #define PFT_SIZE 1 << 12 diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 63259430..a4b9e074 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -36,11 +36,19 @@ struct sdu_sched { flow_set_t * set[QOS_CUBE_MAX]; - fqueue_t * fqs[QOS_CUBE_MAX]; next_sdu_t callback; - pthread_t sdu_reader; + pthread_t sdu_readers[IPCP_SCHED_THREADS]; }; +static void cleanup_reader(void * o) +{ + int i; + fqueue_t ** fqs = (fqueue_t **) o; + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fqueue_destroy(fqs[i]); +} + static void * sdu_reader(void * o) { struct sdu_sched * sched; @@ -49,14 +57,27 @@ static void * sdu_reader(void * o) int fd; int i = 0; int ret; + fqueue_t * fqs[QOS_CUBE_MAX]; sched = (struct sdu_sched *) o; + for (i = 0; i < QOS_CUBE_MAX; ++i) { + fqs[i] = fqueue_create(); + if (fqs[i] == NULL) { + int j; + for (j = 0; j < i; ++j) + fqueue_destroy(fqs[j]); + return (void *) -1; + } + } + + pthread_cleanup_push(cleanup_reader, fqs); + while (true) { /* FIXME: replace with scheduling policy call */ i = (i + 1) % QOS_CUBE_MAX; - ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout); + ret = flow_event_wait(sched->set[i], fqs[i], &timeout); if (ret == -ETIMEDOUT) continue; @@ -65,7 +86,7 @@ static void * sdu_reader(void * o) continue; } - while ((fd = fqueue_next(sched->fqs[i])) >= 0) { + while ((fd = fqueue_next(fqs[i])) >= 0) { if (ipcp_flow_read(fd, &sdb)) { log_warn("Failed to read SDU from fd %d.", fd); continue; @@ -78,6 +99,8 @@ static void * sdu_reader(void * o) } } + pthread_cleanup_pop(true); + return (void *) 0; } @@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback) sdu_sched = malloc(sizeof(*sdu_sched)); if (sdu_sched == NULL) - return NULL; + goto fail_malloc; sdu_sched->callback = callback; @@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback) if (sdu_sched->set[i] == NULL) { for (j = 0; j < i; ++j) flow_set_destroy(sdu_sched->set[j]); - goto fail_sdu_sched; + goto fail_flow_set; } } - for (i = 0; i < QOS_CUBE_MAX; ++i) { - sdu_sched->fqs[i] = fqueue_create(); - if (sdu_sched->fqs[i] == NULL) { - for (j = 0; j < i; ++j) - fqueue_destroy(sdu_sched->fqs[j]); + for (i = 0; i < IPCP_SCHED_THREADS; ++i) { + if (pthread_create(&sdu_sched->sdu_readers[i], NULL, + sdu_reader, sdu_sched)) { + int j; + for (j = 0; j < i; ++j) { + pthread_cancel(sdu_sched->sdu_readers[j]); + pthread_join(sdu_sched->sdu_readers[j], NULL); + } goto fail_flow_set; } } - pthread_create(&sdu_sched->sdu_reader, - NULL, - sdu_reader, - (void *) sdu_sched); - return sdu_sched; fail_flow_set: - for (i = 0; i < QOS_CUBE_MAX; ++i) - flow_set_destroy(sdu_sched->set[i]); - fail_sdu_sched: free(sdu_sched); + fail_malloc: return NULL; } @@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched) assert(sdu_sched); - pthread_cancel(sdu_sched->sdu_reader); - - pthread_join(sdu_sched->sdu_reader, NULL); + for (i = 0; i < IPCP_SCHED_THREADS; ++i) { + pthread_cancel(sdu_sched->sdu_readers[i]); + pthread_join(sdu_sched->sdu_readers[i], NULL); + } - for (i = 0; i < QOS_CUBE_MAX; ++i) { - fqueue_destroy(sdu_sched->fqs[i]); + for (i = 0; i < QOS_CUBE_MAX; ++i) flow_set_destroy(sdu_sched->set[i]); - } free(sdu_sched); } |