diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/irmd/main.c | 227 | 
1 files changed, 196 insertions, 31 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index 39f44c44..966be500 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -75,31 +75,37 @@ enum irm_state {  };  struct irm { -        struct list_head     registry; +        struct list_head     registry;     /* registered names known     */ -        struct list_head     ipcps; +        struct list_head     ipcps;        /* list of ipcps in system    */ -        struct list_head     api_table; -        struct list_head     apn_table; -        struct list_head     spawned_apis; -        pthread_rwlock_t     reg_lock; +        struct list_head     api_table;    /* ap instances               */ +        struct list_head     apn_table;    /* ap names known             */ +        struct list_head     spawned_apis; /* child ap instances         */ +        pthread_rwlock_t     reg_lock;     /* lock for registration info */ -        /* keep track of all flows in this processing system */ -        struct bmp *         port_ids; -        /* maps port_ids to api pair */ -        struct list_head     irm_flows; -        pthread_rwlock_t     flows_lock; +        struct bmp *         port_ids;     /* port_ids for flows         */ +        struct list_head     irm_flows;    /* flow information           */ +        pthread_rwlock_t     flows_lock;   /* lock for flows             */ -        struct lockfile *    lf; -        struct shm_rdrbuff * rdrb; -        pthread_t *          threadpool; -        int                  sockfd; +        struct lockfile *    lf;           /* single irmd per system     */ +        struct shm_rdrbuff * rdrb;         /* rdrbuff for SDUs           */ +        int                  sockfd;       /* UNIX socket                */ -        enum irm_state       state; -        pthread_rwlock_t     state_lock; +        pthread_t *          threadpool;   /* pool of mainloop threads   */ -        pthread_t            irm_sanitize; -        pthread_t            shm_sanitize; +        struct bmp *         thread_ids;   /* ids for mainloop threads   */ +        size_t               max_threads;  /* max threads set by tpm     */ +        size_t               threads;      /* available mainloop threads */ +        pthread_cond_t       threads_cond; /* signal thread entry/exit   */ +        pthread_mutex_t      threads_lock; /* mutex for threads/condvar  */ + +        enum irm_state       state;        /* state of the irmd          */ +        pthread_rwlock_t     state_lock;   /* lock for the entire irmd   */ + +        pthread_t            tpm;          /* threadpool manager         */ +        pthread_t            irm_sanitize; /* clean up irmd resources    */ +        pthread_t            shm_sanitize; /* keep track of rdrbuff use  */  } * irmd;  static void clear_irm_flow(struct irm_flow * f) { @@ -1449,6 +1455,13 @@ static void irm_destroy(void)          if (irmd->state != IRMD_NULL)                  log_warn("Unsafe destroy."); +        pthread_mutex_lock(&irmd->threads_lock); + +        if (irmd->thread_ids != NULL) +                bmp_destroy(irmd->thread_ids); + +        pthread_mutex_unlock(&irmd->threads_lock); +          if (irmd->threadpool != NULL)                  free(irmd->threadpool); @@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o)          }  } +static void thread_inc(void) +{ +        pthread_mutex_lock(&irmd->threads_lock); + +        ++irmd->threads; +        pthread_cond_signal(&irmd->threads_cond); + +        pthread_mutex_unlock(&irmd->threads_lock); +} + +static void thread_dec(void) +{ +        pthread_mutex_lock(&irmd->threads_lock); + +        --irmd->threads; +        pthread_cond_signal(&irmd->threads_cond); + +        pthread_mutex_unlock(&irmd->threads_lock); +} + +static bool thread_check(void) +{ +        int ret; + +        pthread_mutex_lock(&irmd->threads_lock); + +        ret = irmd->threads > irmd->max_threads; + +        pthread_mutex_unlock(&irmd->threads_lock); + +        return ret; +} + +static void thread_exit(ssize_t id) +{ +        pthread_mutex_lock(&irmd->threads_lock); +        bmp_release(irmd->thread_ids, id); + +        --irmd->threads; +        pthread_cond_signal(&irmd->threads_cond); + +        pthread_mutex_unlock(&irmd->threads_lock); +} +  void * mainloop(void * o)  {          uint8_t buf[IRM_MSG_BUF_SIZE]; -        (void) o; +        ssize_t id = (ssize_t) o;          while (true) {  #ifdef __FreeBSD__ @@ -1747,10 +1804,13 @@ void * mainloop(void * o)                                       (SOCKET_TIMEOUT % 1000) * 1000};                  pthread_rwlock_rdlock(&irmd->state_lock); -                if (irmd->state != IRMD_RUNNING) { + +                if (irmd->state != IRMD_RUNNING || thread_check()) { +                        thread_exit(id);                          pthread_rwlock_unlock(&irmd->state_lock);                          break;                  } +                  pthread_rwlock_unlock(&irmd->state_lock);                  ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1760,6 +1820,7 @@ void * mainloop(void * o)                  if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0)                          continue;  #endif +                  cli_sockfd = accept(irmd->sockfd, 0, 0);                  if (cli_sockfd < 0)                          continue; @@ -1781,6 +1842,8 @@ void * mainloop(void * o)                          continue;                  } +                thread_dec(); +                  switch (msg->code) {                  case IRM_MSG_CODE__IRM_CREATE_IPCP:                          ret_msg.has_result = true; @@ -1909,6 +1972,7 @@ void * mainloop(void * o)                          if (apis != NULL)                                  free(apis);                          close(cli_sockfd); +                        thread_inc();                          continue;                  } @@ -1917,6 +1981,7 @@ void * mainloop(void * o)                          if (apis != NULL)                                  free(apis);                          close(cli_sockfd); +                        thread_inc();                          continue;                  } @@ -1930,6 +1995,82 @@ void * mainloop(void * o)                  free(buffer.data);                  close(cli_sockfd); + +                thread_inc(); +        } + +        return (void *) 0; +} + +static bool is_thread_alive(ssize_t id) +{ +        bool ret; +        pthread_mutex_lock(&irmd->threads_lock); + +        ret = bmp_is_id_used(irmd->thread_ids, id); + +        pthread_mutex_unlock(&irmd->threads_lock); + +        return ret; +} + +void * threadpoolmgr(void * o) +{ +        struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), +                              (IRMD_TPM_TIMEOUT % 1000) * MILLION}; +        struct timespec dl; +        size_t t; + +        (void) o; + +        while (true) { +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); + +                pthread_rwlock_rdlock(&irmd->state_lock); +                if (irmd->state != IRMD_RUNNING) { +                        pthread_rwlock_unlock(&irmd->state_lock); +                        log_dbg("Threadpool manager exiting."); +                        for (t = 0; t < IRMD_MAX_THREADS; ++t) +                                if (is_thread_alive(t)) { +                                        log_dbg("Waiting for thread %zd.", t); +                                        pthread_join(irmd->threadpool[t], NULL); +                                } + +                        log_dbg("Threadpool manager done."); +                        break; +                } + +                pthread_rwlock_unlock(&irmd->state_lock); + +                pthread_mutex_lock(&irmd->threads_lock); + +                if (irmd->threads < IRMD_MIN_AV_THREADS) { +                        log_dbg("Increasing threadpool."); +                        irmd->max_threads = IRMD_MAX_AV_THREADS; + +                        while (irmd->threads < irmd->max_threads) { +                                ssize_t id = bmp_allocate(irmd->thread_ids); +                                if (!bmp_is_id_valid(irmd->thread_ids, id)) { +                                        log_warn("IRMd threadpool exhausted."); +                                        break; +                                } + +                                if (pthread_create(&irmd->threadpool[id], +                                                   NULL, mainloop, (void *) id)) +                                        log_warn("Failed to start new thread."); +                                else +                                        ++irmd->threads; +                        } +                } + +                if (pthread_cond_timedwait(&irmd->threads_cond, +                                           &irmd->threads_lock, +                                           &dl) == ETIMEDOUT) +                        if (irmd->threads > IRMD_MIN_AV_THREADS) +                                --irmd->max_threads; + +                pthread_mutex_unlock(&irmd->threads_lock);          }          return (void *) 0; @@ -1938,6 +2079,7 @@ void * mainloop(void * o)  static int irm_create(void)  {          struct stat st; +        pthread_condattr_t cattr;          struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),                                    (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -1967,6 +2109,27 @@ static int irm_create(void)                  return -1;          } +        if (pthread_mutex_init(&irmd->threads_lock, NULL)) { +                log_err("Failed to initialize mutex."); +                free(irmd); +                return -1; +        } + +        if (pthread_condattr_init(&cattr)) { +                log_err("Failed to initialize condattr."); +                free(irmd); +                return -1; +        } + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&irmd->threads_cond, &cattr)) { +                log_err("Failed to initialize cond."); +                free(irmd); +                return -1; +        } +          list_head_init(&irmd->ipcps);          list_head_init(&irmd->api_table);          list_head_init(&irmd->apn_table); @@ -1980,7 +2143,13 @@ static int irm_create(void)                  return -ENOMEM;          } -        irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); +        irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0); +        if (irmd->thread_ids == NULL) { +                irm_destroy(); +                return -ENOMEM; +        } + +        irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS);          if (irmd->threadpool == NULL) {                  irm_destroy();                  return -ENOMEM; @@ -2045,7 +2214,9 @@ static int irm_create(void)                  return -1;          } -        irmd->state = IRMD_RUNNING; +        irmd->threads     = 0; +        irmd->max_threads = IRMD_MIN_AV_THREADS; +        irmd->state       = IRMD_RUNNING;          log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2063,8 +2234,6 @@ int main(int     argc,  {          struct sigaction sig_act; -        int t = 0; -          bool use_stdout = false;          if (geteuid() != 0) { @@ -2108,16 +2277,12 @@ int main(int     argc,                  exit(EXIT_FAILURE);          } -        for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) -                pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL); +        pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL); +        pthread_join(irmd->tpm, NULL);          pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL);          pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb); -        /* Wait for (all of them) to return. */ -        for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) -                pthread_join(irmd->threadpool[t], NULL); -          pthread_join(irmd->irm_sanitize, NULL);          pthread_cancel(irmd->shm_sanitize); | 
