From 4384cd203a958373cf0ab959afb688f9eeba05fc Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 4 Jan 2017 15:25:15 +0100 Subject: ipcpd: Add boot and shutdown operations These operations separe the starting and joining of the main ipcp threads into ipcp_boot() and ipcp_shutdown() operations. This allows the proper cleanup of user data and user threads after the IPCP is requested to shut down. --- src/ipcpd/ipcp.c | 34 +++++++--- src/ipcpd/ipcp.h | 4 ++ src/ipcpd/local/main.c | 16 +++-- src/ipcpd/normal/main.c | 143 +++++++++++++++++++++++++----------------- src/ipcpd/shim-eth-llc/main.c | 18 ++++-- src/ipcpd/shim-udp/main.c | 15 ++++- 6 files changed, 152 insertions(+), 78 deletions(-) (limited to 'src') diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index c47f1181..e44fafe2 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -279,13 +279,12 @@ static void * ipcp_main_loop(void * o) int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops) { pthread_condattr_t cattr; - int t; struct timeval tv = {(IPCP_ACCEPT_TIMEOUT / 1000), (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; ipcpi.irmd_fd = -1; - ipcpi.state = IPCP_INIT; + ipcpi.state = IPCP_NULL; ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCPD_THREADPOOL_SIZE); if (ipcpi.threadpool == NULL) { @@ -329,20 +328,40 @@ int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops) #endif pthread_cond_init(&ipcpi.state_cond, &cattr); - for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) - pthread_create(&ipcpi.threadpool[t], NULL, - ipcp_main_loop, NULL); - return 0; } -void ipcp_fini() +int ipcp_boot() { int t; + for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) { + if (pthread_create(&ipcpi.threadpool[t], NULL, + ipcp_main_loop, NULL)) { + int i; + LOG_ERR("Failed to create main thread."); + ipcp_set_state(IPCP_NULL); + for (i = 0; i < t; ++i) + pthread_join(ipcpi.threadpool[i], NULL); + return -1; + } + } + + ipcpi.state = IPCP_INIT; + return 0; +} + +void ipcp_shutdown() +{ + int t; for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) pthread_join(ipcpi.threadpool[t], NULL); + LOG_DBG("IPCP %d shutting down. Bye.", getpid()); +} + +void ipcp_fini() +{ close(ipcpi.sockfd); if (unlink(ipcpi.sock_path)) LOG_DBG("Could not unlink %s.", ipcpi.sock_path); @@ -353,6 +372,7 @@ void ipcp_fini() ipcp_data_destroy(ipcpi.data); pthread_cond_destroy(&ipcpi.state_cond); + pthread_mutex_destroy(&ipcpi.state_mtx); pthread_rwlock_destroy(&ipcpi.state_lock); } diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 6910115e..ae5a56da 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -55,6 +55,10 @@ struct ipcp { int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops); +int ipcp_boot(void); + +void ipcp_shutdown(void); + void ipcp_fini(void); void ipcp_set_state(enum ipcp_state state); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index de8c72c2..01e58b91 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -128,9 +128,6 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) case SIGHUP: case SIGQUIT: if (info->si_pid == irmd_api) { - LOG_DBG("IPCP %d terminating by order of %d. Bye.", - getpid(), info->si_pid); - pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_INIT) @@ -367,9 +364,16 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); + if (ipcp_init(THIS_TYPE, &local_ops) < 0) { + LOG_ERR("Failed to init IPCP."); + close_logfile(); + exit(EXIT_FAILURE); + } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (ipcp_init(THIS_TYPE, &local_ops) < 0) { + if (ipcp_boot() < 0) { + LOG_ERR("Failed to boot IPCP."); close_logfile(); exit(EXIT_FAILURE); } @@ -382,13 +386,15 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - ipcp_fini(); + ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { pthread_cancel(local_data.sduloop); pthread_join(local_data.sduloop, NULL); } + ipcp_fini(); + local_data_fini(); ap_fini(); diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index f6a88f29..8d319894 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -45,6 +45,8 @@ /* global for trapping signal */ int irmd_api; +pthread_t acceptor; + void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { (void) c; @@ -54,9 +56,6 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) case SIGTERM: case SIGHUP: if (info->si_pid == irmd_api) { - LOG_DBG("IPCP %d terminating by order of %d. Bye.", - getpid(), info->si_pid); - pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_INIT) @@ -72,6 +71,51 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) } } +static void * flow_acceptor(void * o) +{ + int fd; + char * ae_name; + qosspec_t qs; + + (void) o; + + while (true) { + pthread_rwlock_rdlock(&ipcpi.state_lock); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_INFO("Shutting down flow acceptor."); + return 0; + } + + pthread_rwlock_unlock(&ipcpi.state_lock); + + fd = flow_accept(&ae_name, &qs); + if (fd < 0) { + LOG_WARN("Flow accept failed."); + continue; + } + + LOG_DBG("New flow allocation request for AE %s.", ae_name); + + if (strcmp(ae_name, MGMT_AE) == 0) { + ribmgr_add_nm1_flow(fd); + } else if (strcmp(ae_name, DT_AE) == 0) { + fmgr_nm1_add_flow(fd); + } else { + LOG_DBG("Flow allocation request for unknown AE %s.", + ae_name); + if (flow_alloc_resp(fd, -1)) + LOG_WARN("Failed to reply to flow allocation."); + flow_dealloc(fd); + } + + free(ae_name); + } + + return (void *) 0; +} + static int normal_ipcp_enroll(char * dst_name) { int ret; @@ -136,6 +180,18 @@ static int normal_ipcp_enroll(char * dst_name) ipcp_set_state(IPCP_OPERATIONAL); + if (pthread_create(&acceptor, NULL, flow_acceptor, NULL)) { + if (frct_fini()) + LOG_WARN("Failed to finalize frct."); + if (fmgr_fini()) + LOG_WARN("Failed to finalize flow manager."); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to create acceptor thread."); + return -1; + } + pthread_rwlock_unlock(&ipcpi.state_lock); /* FIXME: Remove once we obtain neighbors during enrollment */ @@ -144,6 +200,8 @@ static int normal_ipcp_enroll(char * dst_name) return -1; } + LOG_DBG("Enrolled with %s.", dst_name); + return 0; } @@ -204,6 +262,18 @@ static int normal_ipcp_bootstrap(struct dif_config * conf) ipcp_set_state(IPCP_OPERATIONAL); + if (pthread_create(&acceptor, NULL, flow_acceptor, NULL)) { + if (frct_fini()) + LOG_WARN("Failed to finalize frct."); + if (fmgr_fini()) + LOG_WARN("Failed to finalize flow manager."); + if (ribmgr_fini()) + LOG_WARN("Failed to finalize RIB manager."); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Failed to create acceptor thread."); + return -1; + } + ipcpi.data->dif_name = conf->dif_name; pthread_rwlock_unlock(&ipcpi.state_lock); @@ -224,56 +294,10 @@ static struct ipcp_ops normal_ops = { .ipcp_flow_dealloc = fmgr_np1_dealloc }; -static void * flow_acceptor(void * o) -{ - int fd; - char * ae_name; - qosspec_t qs; - - (void) o; - - while (true) { - pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_INFO("Shutting down flow acceptor."); - return 0; - } - - pthread_rwlock_unlock(&ipcpi.state_lock); - - fd = flow_accept(&ae_name, &qs); - if (fd < 0) { - LOG_WARN("Flow accept failed."); - continue; - } - - LOG_DBG("New flow allocation request for AE %s.", ae_name); - - if (strcmp(ae_name, MGMT_AE) == 0) { - ribmgr_add_nm1_flow(fd); - } else if (strcmp(ae_name, DT_AE) == 0) { - fmgr_nm1_add_flow(fd); - } else { - LOG_DBG("Flow allocation request for unknown AE %s.", - ae_name); - if (flow_alloc_resp(fd, -1)) - LOG_WARN("Failed to reply to flow allocation."); - flow_dealloc(fd); - } - - free(ae_name); - } - - return (void *) 0; -} - int main(int argc, char * argv[]) { struct sigaction sig_act; sigset_t sigset; - pthread_t acceptor; if (ap_init(argv[0])) { LOG_ERR("Failed to init AP"); @@ -306,14 +330,20 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (ipcp_init(THIS_TYPE, &normal_ops) < 0) { LOG_ERR("Failed to create instance."); close_logfile(); exit(EXIT_FAILURE); } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + if (ipcp_boot() < 0) { + LOG_ERR("Failed to boot IPCP."); + close_logfile(); + exit(EXIT_FAILURE); + } + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); if (ipcp_create_r(getpid())) { @@ -323,14 +353,7 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - ipcp_wait_state(IPCP_OPERATIONAL, NULL); - - if (pthread_create(&acceptor, NULL, flow_acceptor, NULL)) { - LOG_ERR("Failed to create acceptor thread."); - ipcp_set_state(IPCP_SHUTDOWN); - } - - ipcp_fini(); + ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { pthread_cancel(acceptor); @@ -344,6 +367,8 @@ int main(int argc, char * argv[]) LOG_WARN("Failed to finalize RIB manager."); } + ipcp_fini(); + close_logfile(); ap_fini(); diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index ab25ffb1..623f2071 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -676,9 +676,6 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) case SIGTERM: case SIGHUP: if (info->si_pid == irmd_api) { - LOG_DBG("IPCP %d terminating by order of %d. Bye.", - getpid(), info->si_pid); - pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() == IPCP_INIT) @@ -1115,11 +1112,13 @@ int main(int argc, char * argv[]) } if (ap_init(NULL) < 0) { + LOG_ERR("Failed to init application."); close_logfile(); exit(EXIT_FAILURE); } if (eth_llc_data_init() < 0) { + LOG_ERR("Failed to init shim-eth-llc data."); close_logfile(); exit(EXIT_FAILURE); } @@ -1139,9 +1138,16 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); + if (ipcp_init(THIS_TYPE, ð_llc_ops) < 0) { + LOG_ERR("Failed to init IPCP."); + close_logfile(); + exit(EXIT_FAILURE); + } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (ipcp_init(THIS_TYPE, ð_llc_ops) < 0) { + if (ipcp_boot() < 0) { + LOG_ERR("Failed to boot IPCP."); close_logfile(); exit(EXIT_FAILURE); } @@ -1154,7 +1160,7 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - ipcp_fini(); + ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { pthread_cancel(eth_llc_data.sdu_reader); @@ -1163,6 +1169,8 @@ int main(int argc, char * argv[]) pthread_join(eth_llc_data.sdu_reader, NULL); } + ipcp_fini(); + eth_llc_data_fini(); ap_fini(); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 9bc8f287..8c0c0aac 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -1180,11 +1180,13 @@ int main(int argc, char * argv[]) } if (ap_init(NULL) < 0) { + LOG_ERR("Failed to init application."); close_logfile(); exit(EXIT_FAILURE); } if (udp_data_init() < 0) { + LOG_ERR("Failed to init shim-udp data."); close_logfile(); exit(EXIT_FAILURE); } @@ -1204,9 +1206,16 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); + if (ipcp_init(THIS_TYPE, &udp_ops) < 0) { + LOG_ERR("Failed to init IPCP."); + close_logfile(); + exit(EXIT_FAILURE); + } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (ipcp_init(THIS_TYPE, &udp_ops) < 0) { + if (ipcp_boot() < 0) { + LOG_ERR("Failed to boot IPCP."); close_logfile(); exit(EXIT_FAILURE); } @@ -1219,7 +1228,7 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - ipcp_fini(); + ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { pthread_cancel(udp_data.handler); @@ -1230,6 +1239,8 @@ int main(int argc, char * argv[]) pthread_join(udp_data.sdu_reader, NULL); } + ipcp_fini(); + udp_data_fini(); ap_fini(); -- cgit v1.2.3