From ff3e622ce3f686c38848dcc3aedcbf380c631910 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 31 Mar 2017 20:35:11 +0200 Subject: ipcpd: Fix CDAP return checks at enrollment --- src/ipcpd/normal/enroll.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c index e2762993..3e6a0197 100644 --- a/src/ipcpd/normal/enroll.c +++ b/src/ipcpd/normal/enroll.c @@ -200,7 +200,7 @@ int enroll_boot(char * dst_name) clock_gettime(CLOCK_REALTIME, &t0); key = cdap_request_send(cdap, CDAP_READ, TIME_PATH, NULL, 0, 0); - if (key == NULL) { + if (key == NULL || key[0] == INVALID_CDAP_KEY) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); @@ -232,7 +232,7 @@ int enroll_boot(char * dst_name) free(data); key = cdap_request_send(cdap, CDAP_READ, boot_ro, NULL, 0, 0); - if (key == NULL) { + if (key == NULL || key[0] == INVALID_CDAP_KEY) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); @@ -263,7 +263,7 @@ int enroll_boot(char * dst_name) log_dbg("Packed information inserted into RIB."); key = cdap_request_send(cdap, CDAP_READ, members_ro, NULL, 0, 0); - if (key == NULL) { + if (key == NULL || key[0] == INVALID_CDAP_KEY) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); @@ -294,7 +294,7 @@ int enroll_boot(char * dst_name) log_dbg("Packed information inserted into RIB."); key = cdap_request_send(cdap, CDAP_READ, dif_ro, NULL, 0, 0); - if (key == NULL) { + if (key == NULL || key[0] == INVALID_CDAP_KEY) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); -- cgit v1.2.3 From 014daf9684e85566cfcf44ec107c8cb792df3b14 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 31 Mar 2017 20:36:45 +0200 Subject: lib: Some fixes in CDAP --- src/lib/cdap.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 824f2c5d..f0db2419 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -183,8 +183,8 @@ static struct cdap_rcvd * cdap_rcvd_get_by_key(struct cdap * instance, list_for_each_safe(p, h, &instance->rcvd) { rcvd = list_entry(p, struct cdap_rcvd, next); if (rcvd->key == key) { - pthread_mutex_unlock(&instance->rcvd_lock); list_del(&rcvd->next); + pthread_mutex_unlock(&instance->rcvd_lock); return rcvd; } } @@ -669,6 +669,7 @@ cdap_key_t * cdap_request_send(struct cdap * instance, pthread_rwlock_unlock(&instance->flows_lock); release_id(instance, *key); release_id(instance, iid); + *key = INVALID_CDAP_KEY; return keys; } @@ -678,13 +679,17 @@ cdap_key_t * cdap_request_send(struct cdap * instance, cdap_sent_del(instance, req); release_id(instance, *key); release_id(instance, iid); + *key = INVALID_CDAP_KEY; return keys; } if (ret < 0) { + pthread_rwlock_unlock(&instance->flows_lock); cdap_sent_del(instance, req); release_id(instance, *key); release_id(instance, iid); + *key = INVALID_CDAP_KEY; + return keys; } ++key; @@ -717,6 +722,7 @@ int cdap_reply_wait(struct cdap * instance, if (ret < 0) { cdap_sent_del(instance, r); release_id(instance, iid); + release_id(instance, key); return ret; } @@ -731,6 +737,7 @@ int cdap_reply_wait(struct cdap * instance, cdap_sent_del(instance, r); release_id(instance, iid); + release_id(instance, key); return ret; } @@ -766,6 +773,8 @@ cdap_key_t cdap_request_wait(struct cdap * instance, } } + assert(rcv->proc == false); + rcv->proc = true; list_del(&rcv->next); list_add_tail(&rcv->next, &instance->rcvd); -- cgit v1.2.3 From 8913ac0a36c068c012cd0be9591cfad63a1af44e Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 31 Mar 2017 20:38:15 +0200 Subject: lib: Fix missing assignment in flow_alloc --- src/lib/dev.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/lib/dev.c b/src/lib/dev.c index e19083c3..5acbada2 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -555,6 +555,7 @@ int flow_alloc(const char * dst_name, ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; + ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; pthread_rwlock_unlock(&ai.flows_lock); @@ -921,7 +922,7 @@ struct fqueue * fqueue_create() if (fq == NULL) return NULL; - memset(fq->fqueue, -1, SHM_BUFFER_SIZE); + memset(fq->fqueue, -1, (SHM_BUFFER_SIZE) * sizeof(*fq->fqueue)); fq->fqsize = 0; fq->next = 0; @@ -1021,11 +1022,8 @@ int fqueue_next(struct fqueue * fq) if (fq == NULL) return -EINVAL; - if (fq->next == fq->fqsize) { - fq->fqsize = 0; - fq->next = 0; + if (fq->fqsize == 0) return -EPERM; - } pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); @@ -1035,6 +1033,11 @@ int fqueue_next(struct fqueue * fq) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + if (fq->next == fq->fqsize) { + fq->fqsize = 0; + fq->next = 0; + } + return fd; } @@ -1319,6 +1322,9 @@ int ipcp_flow_read(int fd, int port_id = -1; struct shm_rbuff * rb; + assert(fd >=0); + assert(sdb); + pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); @@ -1427,6 +1433,8 @@ ssize_t local_flow_read(int fd) { ssize_t ret; + assert(fd >= 0); + pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); -- cgit v1.2.3 From 304bf4f90f58f28d6941d3e3b14bb04d48f52392 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 31 Mar 2017 22:35:51 +0200 Subject: lib: Fix use-after-free when destroying cdap_req --- src/lib/cdap_req.c | 6 +++++- src/lib/cdap_req.h | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c index df748058..4eab6fa6 100644 --- a/src/lib/cdap_req.c +++ b/src/lib/cdap_req.c @@ -76,6 +76,7 @@ void cdap_req_destroy(struct cdap_req * creq) creq->state = REQ_NULL; pthread_cond_broadcast(&creq->cond); break; + case REQ_INIT_PENDING: case REQ_PENDING: case REQ_RESPONSE: creq->state = REQ_DESTROY; @@ -151,7 +152,10 @@ void cdap_req_respond(struct cdap_req * creq, pthread_mutex_lock(&creq->lock); - while (creq->state == REQ_INIT) + if (creq->state == REQ_INIT) + creq->state = REQ_INIT_PENDING; + + while (creq->state == REQ_INIT_PENDING) pthread_cond_wait(&creq->cond, &creq->lock); if (creq->state != REQ_PENDING) { diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h index 648ebc75..b21467f3 100644 --- a/src/lib/cdap_req.h +++ b/src/lib/cdap_req.h @@ -36,6 +36,7 @@ typedef cdap_key_t invoke_id_t; enum creq_state { REQ_NULL = 0, REQ_INIT, + REQ_INIT_PENDING, REQ_PENDING, REQ_RESPONSE, REQ_DONE, -- cgit v1.2.3 From 67fcb9107ae73fd1a4ccb30e4922f0dee0bd29a5 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 31 Mar 2017 22:41:43 +0200 Subject: lib: Fix data race in rdrbuff The blocks should be accessed inside the lock (or later with CAS). --- src/lib/shm_rdrbuff.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index b8d73650..9dffdf74 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -380,6 +380,8 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb->flags = SDB_VALID; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK + sdb->blocks = blocks; + *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); #else *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); @@ -389,9 +391,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb->size = size; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; -#ifdef SHM_RDRB_MULTI_BLOCK - sdb->blocks = blocks; -#endif + memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); return sdb->idx; @@ -461,6 +461,8 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb->flags = SDB_VALID; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK + sdb->blocks = blocks; + *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); #else *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); @@ -470,12 +472,8 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb->size = size; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; -#ifdef SHM_RDRB_MULTI_BLOCK - sdb->blocks = blocks; -#endif memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); - return sdb->idx; } -- cgit v1.2.3 From 47b6ff3333fb3fcc3f5f76459c356c79e4bb111c Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 1 Apr 2017 13:44:41 +0200 Subject: lib: Add a check if a bitmap ID is in use --- include/ouroboros/bitmap.h | 5 +++- src/lib/bitmap.c | 61 +++++++++++++++++++++++++++++---------------- src/lib/tests/bitmap_test.c | 35 +++++++++++++++++++------- 3 files changed, 69 insertions(+), 32 deletions(-) (limited to 'src') diff --git a/include/ouroboros/bitmap.h b/include/ouroboros/bitmap.h index cb62312a..d6bb250b 100644 --- a/include/ouroboros/bitmap.h +++ b/include/ouroboros/bitmap.h @@ -33,7 +33,7 @@ struct bmp; struct bmp * bmp_create(size_t bits, ssize_t offset); -int bmp_destroy(struct bmp * b); +void bmp_destroy(struct bmp * b); ssize_t bmp_allocate(struct bmp * instance); @@ -43,4 +43,7 @@ int bmp_release(struct bmp * instance, bool bmp_is_id_valid(struct bmp * b, ssize_t id); +bool bmp_is_id_used(struct bmp * b, + ssize_t id); + #endif /* OUROBOROS_BITMAP_H */ diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 93ffda77..bf9bb99d 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -38,7 +38,8 @@ #define BITS_TO_LONGS(nr) \ DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(size_t)) -static size_t find_next_zero_bit(const size_t * addr, size_t nbits) +static size_t find_next_zero_bit(const size_t * addr, + size_t nbits) { size_t tmp; size_t start = 0; @@ -65,13 +66,15 @@ static size_t find_next_zero_bit(const size_t * addr, size_t nbits) return (start * BITS_PER_LONG) + pos; } -static void bitmap_zero(size_t * dst, size_t nbits) +static void bitmap_zero(size_t * dst, + size_t nbits) { size_t len = BITS_TO_LONGS(nbits) * sizeof(size_t); memset(dst, 0, len); } -static void bitmap_clear(size_t * map, size_t start) +static void bitmap_clear(size_t * map, + size_t start) { size_t * p = map + BIT_WORD(start); size_t mask = ~(1UL << (start % (BITS_PER_LONG))); @@ -79,7 +82,8 @@ static void bitmap_clear(size_t * map, size_t start) *p &= mask; } -static void bitmap_set(size_t * map, size_t start) +static void bitmap_set(size_t * map, + size_t start) { size_t * p = map + BIT_WORD(start); size_t mask = 1UL << (start % (BITS_PER_LONG)); @@ -94,7 +98,8 @@ struct bmp { size_t * bitmap; }; -struct bmp * bmp_create(size_t bits, ssize_t offset) +struct bmp * bmp_create(size_t bits, + ssize_t offset) { struct bmp * tmp; @@ -118,20 +123,15 @@ struct bmp * bmp_create(size_t bits, ssize_t offset) return tmp; } -int bmp_destroy(struct bmp * b) +void bmp_destroy(struct bmp * b) { if (b == NULL) - return -1; + return; - if (b->bitmap == NULL) { - free(b); - return -1; - } + if (b->bitmap != NULL) + free(b->bitmap); - free(b->bitmap); free(b); - - return 0; } static ssize_t bad_id(struct bmp * b) @@ -158,7 +158,8 @@ ssize_t bmp_allocate(struct bmp * b) return id + b->offset; } -static bool is_id_valid(struct bmp * b, ssize_t id) +static bool is_id_valid(struct bmp * b, + ssize_t id) { assert(b); @@ -168,7 +169,17 @@ static bool is_id_valid(struct bmp * b, ssize_t id) return true; } -bool bmp_is_id_valid(struct bmp * b, ssize_t id) +static bool is_id_used(size_t * map, + size_t start) +{ + size_t * p = map + BIT_WORD(start); + size_t mask = 1UL << (start % (BITS_PER_LONG)); + + return (*p & mask) != 0; +} + +bool bmp_is_id_valid(struct bmp * b, + ssize_t id) { if (b == NULL) return false; @@ -176,19 +187,25 @@ bool bmp_is_id_valid(struct bmp * b, ssize_t id) return is_id_valid(b, id); } -int bmp_release(struct bmp * b, ssize_t id) +int bmp_release(struct bmp * b, + ssize_t id) { - size_t rid; - if (b == NULL) return -1; if (!is_id_valid(b, id)) return -1; - rid = id - b->offset; - - bitmap_clear(b->bitmap, rid); + bitmap_clear(b->bitmap, id - b->offset); return 0; } + +bool bmp_is_id_used(struct bmp * b, + ssize_t id) +{ + if (b == NULL) + return false; + + return is_id_used(b->bitmap, id - b->offset); +} diff --git a/src/lib/tests/bitmap_test.c b/src/lib/tests/bitmap_test.c index 7480600e..e438f217 100644 --- a/src/lib/tests/bitmap_test.c +++ b/src/lib/tests/bitmap_test.c @@ -23,6 +23,7 @@ #include "bitmap.c" #include #include +#include #define BITMAP_SIZE 200 @@ -41,40 +42,56 @@ int bitmap_test(int argc, char ** argv) srand(time(NULL)); bmp = bmp_create(bits, offset); - if (bmp == NULL) + if (bmp == NULL) { + printf("Failed to create bmp.\n"); return -1; + } - if (bmp_destroy(bmp)) - return -1; + bmp_destroy(bmp); bmp = bmp_create(bits, offset); - if (bmp == NULL) + if (bmp == NULL) { + printf("Failed to re-create bmp.\n"); return -1; + } for (i = offset; i < BITMAP_SIZE + 5 + offset; i++) { id = bmp_allocate(bmp); if (!bmp_is_id_valid(bmp, id)) continue; - if (id != i) + if (!bmp_is_id_used(bmp, id)) { + printf("ID not marked in use.\n"); + bmp_destroy(bmp); return -1; + } + + if (id != i) { + printf("Wrong ID returned.\n"); + bmp_destroy(bmp); + return -1; + } } for (i = 0; i < BITMAP_SIZE + 5; i++) { r = (ssize_t) (rand() % BITMAP_SIZE) + offset; - if (bmp_release(bmp, r)) + if (bmp_release(bmp, r)) { + printf("Failed to release ID.\n"); return -1; + } id = bmp_allocate(bmp); if (!bmp_is_id_valid(bmp, id)) continue; - if (id != r) + if (id != r) { + printf("Wrong prev ID returned.\n"); + bmp_destroy(bmp); return -1; + } } - if (bmp_destroy(bmp)) - return -1; + bmp_destroy(bmp); return 0; } -- cgit v1.2.3 From c72634b5d921bc06d8e06afb2a60a05a1acb7ee2 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 1 Apr 2017 13:45:51 +0200 Subject: irmd: Add dynamic threadpool This makes the IRMd add/remove worker threads dynamically. IRMD_TPM_TIMEOUT sets a timer in the threadpool manager for checking idle threads. Each time this timer expires, it will reduce the threadpool by one. IRMD_MIN_AV_THREADS is the minimum number of available worker threads. If the number of active threads goes under this threshold, the threadpool manager will create threads to get the number of threads to IRMD_MAX_AV_THREADS, unless IRMD_MAX_THREADS is reached. --- include/ouroboros/config.h.in | 11 +- src/irmd/main.c | 227 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 204 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 7616961c..067a2f85 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -35,7 +35,7 @@ #define IPCP_SHIM_ETH_LLC_EXEC "@IPCP_SHIM_ETH_LLC_TARGET@" #define IPCP_NORMAL_EXEC "@IPCP_NORMAL_TARGET@" #define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@" -#define AP_MAX_FLOWS 256 +#define AP_MAX_FLOWS 2048 #define AP_MAX_FQUEUES 64 #define SHM_RDRB_BLOCK_SIZE sysconf(_SC_PAGESIZE) #define SHM_RDRB_MULTI_BLOCK @@ -47,14 +47,19 @@ #define SHM_RBUFF_PREFIX "/ouroboros.rbuff." #define SHM_FLOW_SET_PREFIX "/ouroboros.sets." #define IRMD_MAX_FLOWS 4096 -#define IRMD_THREADPOOL_SIZE 16 +/* IRMD dynamic threadpooling */ +#define IRMD_MIN_AV_THREADS 16 +#define IRMD_MAX_AV_THREADS 64 +#define IRMD_MAX_THREADS 256 + #define IPCPD_THREADPOOL_SIZE 16 #define IPCPD_MAX_CONNS IRMD_MAX_FLOWS #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC #define PFT_SIZE 1 << 12 /* Timeout values */ +#define IRMD_TPM_TIMEOUT 1000 #define IRMD_ACCEPT_TIMEOUT 100 -#define IRMD_REQ_ARR_TIMEOUT 200 +#define IRMD_REQ_ARR_TIMEOUT 500 #define IRMD_FLOW_TIMEOUT 5000 #define IPCP_ACCEPT_TIMEOUT 100 #define SOCKET_TIMEOUT 4000 diff --git a/src/irmd/main.c b/src/irmd/main.c index 39f44c44..966be500 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -75,31 +75,37 @@ enum irm_state { }; struct irm { - struct list_head registry; + struct list_head registry; /* registered names known */ - struct list_head ipcps; + struct list_head ipcps; /* list of ipcps in system */ - struct list_head api_table; - struct list_head apn_table; - struct list_head spawned_apis; - pthread_rwlock_t reg_lock; + struct list_head api_table; /* ap instances */ + struct list_head apn_table; /* ap names known */ + struct list_head spawned_apis; /* child ap instances */ + pthread_rwlock_t reg_lock; /* lock for registration info */ - /* keep track of all flows in this processing system */ - struct bmp * port_ids; - /* maps port_ids to api pair */ - struct list_head irm_flows; - pthread_rwlock_t flows_lock; + struct bmp * port_ids; /* port_ids for flows */ + struct list_head irm_flows; /* flow information */ + pthread_rwlock_t flows_lock; /* lock for flows */ - struct lockfile * lf; - struct shm_rdrbuff * rdrb; - pthread_t * threadpool; - int sockfd; + struct lockfile * lf; /* single irmd per system */ + struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + int sockfd; /* UNIX socket */ - enum irm_state state; - pthread_rwlock_t state_lock; + pthread_t * threadpool; /* pool of mainloop threads */ - pthread_t irm_sanitize; - pthread_t shm_sanitize; + struct bmp * thread_ids; /* ids for mainloop threads */ + size_t max_threads; /* max threads set by tpm */ + size_t threads; /* available mainloop threads */ + pthread_cond_t threads_cond; /* signal thread entry/exit */ + pthread_mutex_t threads_lock; /* mutex for threads/condvar */ + + enum irm_state state; /* state of the irmd */ + pthread_rwlock_t state_lock; /* lock for the entire irmd */ + + pthread_t tpm; /* threadpool manager */ + pthread_t irm_sanitize; /* clean up irmd resources */ + pthread_t shm_sanitize; /* keep track of rdrbuff use */ } * irmd; static void clear_irm_flow(struct irm_flow * f) { @@ -1449,6 +1455,13 @@ static void irm_destroy(void) if (irmd->state != IRMD_NULL) log_warn("Unsafe destroy."); + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->thread_ids != NULL) + bmp_destroy(irmd->thread_ids); + + pthread_mutex_unlock(&irmd->threads_lock); + if (irmd->threadpool != NULL) free(irmd->threadpool); @@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o) } } +static void thread_inc(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + ++irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static void thread_dec(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static bool thread_check(void) +{ + int ret; + + pthread_mutex_lock(&irmd->threads_lock); + + ret = irmd->threads > irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +static void thread_exit(ssize_t id) +{ + pthread_mutex_lock(&irmd->threads_lock); + bmp_release(irmd->thread_ids, id); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + void * mainloop(void * o) { uint8_t buf[IRM_MSG_BUF_SIZE]; - (void) o; + ssize_t id = (ssize_t) o; while (true) { #ifdef __FreeBSD__ @@ -1747,10 +1804,13 @@ void * mainloop(void * o) (SOCKET_TIMEOUT % 1000) * 1000}; pthread_rwlock_rdlock(&irmd->state_lock); - if (irmd->state != IRMD_RUNNING) { + + if (irmd->state != IRMD_RUNNING || thread_check()) { + thread_exit(id); pthread_rwlock_unlock(&irmd->state_lock); break; } + pthread_rwlock_unlock(&irmd->state_lock); ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1760,6 +1820,7 @@ void * mainloop(void * o) if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0) continue; #endif + cli_sockfd = accept(irmd->sockfd, 0, 0); if (cli_sockfd < 0) continue; @@ -1781,6 +1842,8 @@ void * mainloop(void * o) continue; } + thread_dec(); + switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; @@ -1909,6 +1972,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1917,6 +1981,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1930,6 +1995,82 @@ void * mainloop(void * o) free(buffer.data); close(cli_sockfd); + + thread_inc(); + } + + return (void *) 0; +} + +static bool is_thread_alive(ssize_t id) +{ + bool ret; + pthread_mutex_lock(&irmd->threads_lock); + + ret = bmp_is_id_used(irmd->thread_ids, id); + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +void * threadpoolmgr(void * o) +{ + struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), + (IRMD_TPM_TIMEOUT % 1000) * MILLION}; + struct timespec dl; + size_t t; + + (void) o; + + while (true) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_rwlock_rdlock(&irmd->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); + log_dbg("Threadpool manager exiting."); + for (t = 0; t < IRMD_MAX_THREADS; ++t) + if (is_thread_alive(t)) { + log_dbg("Waiting for thread %zd.", t); + pthread_join(irmd->threadpool[t], NULL); + } + + log_dbg("Threadpool manager done."); + break; + } + + pthread_rwlock_unlock(&irmd->state_lock); + + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->threads < IRMD_MIN_AV_THREADS) { + log_dbg("Increasing threadpool."); + irmd->max_threads = IRMD_MAX_AV_THREADS; + + while (irmd->threads < irmd->max_threads) { + ssize_t id = bmp_allocate(irmd->thread_ids); + if (!bmp_is_id_valid(irmd->thread_ids, id)) { + log_warn("IRMd threadpool exhausted."); + break; + } + + if (pthread_create(&irmd->threadpool[id], + NULL, mainloop, (void *) id)) + log_warn("Failed to start new thread."); + else + ++irmd->threads; + } + } + + if (pthread_cond_timedwait(&irmd->threads_cond, + &irmd->threads_lock, + &dl) == ETIMEDOUT) + if (irmd->threads > IRMD_MIN_AV_THREADS) + --irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); } return (void *) 0; @@ -1938,6 +2079,7 @@ void * mainloop(void * o) static int irm_create(void) { struct stat st; + pthread_condattr_t cattr; struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -1967,6 +2109,27 @@ static int irm_create(void) return -1; } + if (pthread_mutex_init(&irmd->threads_lock, NULL)) { + log_err("Failed to initialize mutex."); + free(irmd); + return -1; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize condattr."); + free(irmd); + return -1; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&irmd->threads_cond, &cattr)) { + log_err("Failed to initialize cond."); + free(irmd); + return -1; + } + list_head_init(&irmd->ipcps); list_head_init(&irmd->api_table); list_head_init(&irmd->apn_table); @@ -1980,7 +2143,13 @@ static int irm_create(void) return -ENOMEM; } - irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0); + if (irmd->thread_ids == NULL) { + irm_destroy(); + return -ENOMEM; + } + + irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS); if (irmd->threadpool == NULL) { irm_destroy(); return -ENOMEM; @@ -2045,7 +2214,9 @@ static int irm_create(void) return -1; } - irmd->state = IRMD_RUNNING; + irmd->threads = 0; + irmd->max_threads = IRMD_MIN_AV_THREADS; + irmd->state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2063,8 +2234,6 @@ int main(int argc, { struct sigaction sig_act; - int t = 0; - bool use_stdout = false; if (geteuid() != 0) { @@ -2108,16 +2277,12 @@ int main(int argc, exit(EXIT_FAILURE); } - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL); + pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL); + pthread_join(irmd->tpm, NULL); pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL); pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb); - /* Wait for (all of them) to return. */ - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_join(irmd->threadpool[t], NULL); - pthread_join(irmd->irm_sanitize, NULL); pthread_cancel(irmd->shm_sanitize); -- cgit v1.2.3