summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ouroboros/config.h.in2
-rw-r--r--src/ipcpd/normal/sdu_sched.c68
2 files changed, 44 insertions, 26 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 736ba5b3..e8341ee2 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -56,7 +56,7 @@
/* IPCP dynamic threadpooling */
#define IPCP_MIN_THREADS 4
#define IPCP_ADD_THREADS 16
-
+#define IPCP_SCHED_THREADS 8
#define IPCPD_MAX_CONNS IRMD_MAX_FLOWS
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
#define PFT_SIZE 1 << 12
diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c
index 63259430..a4b9e074 100644
--- a/src/ipcpd/normal/sdu_sched.c
+++ b/src/ipcpd/normal/sdu_sched.c
@@ -36,11 +36,19 @@
struct sdu_sched {
flow_set_t * set[QOS_CUBE_MAX];
- fqueue_t * fqs[QOS_CUBE_MAX];
next_sdu_t callback;
- pthread_t sdu_reader;
+ pthread_t sdu_readers[IPCP_SCHED_THREADS];
};
+static void cleanup_reader(void * o)
+{
+ int i;
+ fqueue_t ** fqs = (fqueue_t **) o;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ fqueue_destroy(fqs[i]);
+}
+
static void * sdu_reader(void * o)
{
struct sdu_sched * sched;
@@ -49,14 +57,27 @@ static void * sdu_reader(void * o)
int fd;
int i = 0;
int ret;
+ fqueue_t * fqs[QOS_CUBE_MAX];
sched = (struct sdu_sched *) o;
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fqs[i] = fqueue_create();
+ if (fqs[i] == NULL) {
+ int j;
+ for (j = 0; j < i; ++j)
+ fqueue_destroy(fqs[j]);
+ return (void *) -1;
+ }
+ }
+
+ pthread_cleanup_push(cleanup_reader, fqs);
+
while (true) {
/* FIXME: replace with scheduling policy call */
i = (i + 1) % QOS_CUBE_MAX;
- ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout);
+ ret = flow_event_wait(sched->set[i], fqs[i], &timeout);
if (ret == -ETIMEDOUT)
continue;
@@ -65,7 +86,7 @@ static void * sdu_reader(void * o)
continue;
}
- while ((fd = fqueue_next(sched->fqs[i])) >= 0) {
+ while ((fd = fqueue_next(fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
log_warn("Failed to read SDU from fd %d.", fd);
continue;
@@ -78,6 +99,8 @@ static void * sdu_reader(void * o)
}
}
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
@@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)
sdu_sched = malloc(sizeof(*sdu_sched));
if (sdu_sched == NULL)
- return NULL;
+ goto fail_malloc;
sdu_sched->callback = callback;
@@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)
if (sdu_sched->set[i] == NULL) {
for (j = 0; j < i; ++j)
flow_set_destroy(sdu_sched->set[j]);
- goto fail_sdu_sched;
+ goto fail_flow_set;
}
}
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- sdu_sched->fqs[i] = fqueue_create();
- if (sdu_sched->fqs[i] == NULL) {
- for (j = 0; j < i; ++j)
- fqueue_destroy(sdu_sched->fqs[j]);
+ for (i = 0; i < IPCP_SCHED_THREADS; ++i) {
+ if (pthread_create(&sdu_sched->sdu_readers[i], NULL,
+ sdu_reader, sdu_sched)) {
+ int j;
+ for (j = 0; j < i; ++j) {
+ pthread_cancel(sdu_sched->sdu_readers[j]);
+ pthread_join(sdu_sched->sdu_readers[j], NULL);
+ }
goto fail_flow_set;
}
}
- pthread_create(&sdu_sched->sdu_reader,
- NULL,
- sdu_reader,
- (void *) sdu_sched);
-
return sdu_sched;
fail_flow_set:
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- flow_set_destroy(sdu_sched->set[i]);
- fail_sdu_sched:
free(sdu_sched);
+ fail_malloc:
return NULL;
}
@@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)
assert(sdu_sched);
- pthread_cancel(sdu_sched->sdu_reader);
-
- pthread_join(sdu_sched->sdu_reader, NULL);
+ for (i = 0; i < IPCP_SCHED_THREADS; ++i) {
+ pthread_cancel(sdu_sched->sdu_readers[i]);
+ pthread_join(sdu_sched->sdu_readers[i], NULL);
+ }
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fqueue_destroy(sdu_sched->fqs[i]);
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
flow_set_destroy(sdu_sched->set[i]);
- }
free(sdu_sched);
}