diff options
Diffstat (limited to 'src/ipcpd/local/main.c')
-rw-r--r-- | src/ipcpd/local/main.c | 217 |
1 files changed, 87 insertions, 130 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 9c62c3cc..160e07e0 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024 * * Local IPC process * @@ -48,8 +48,7 @@ #include <sys/wait.h> #include <assert.h> -#define THIS_TYPE IPCP_LOCAL -#define ALLOC_TIMEOUT 10 /* ms */ +#define THIS_TYPE IPCP_LOCAL struct ipcp ipcpi; @@ -72,34 +71,39 @@ static int local_data_init(void) local_data.flows = fset_create(); if (local_data.flows == NULL) - return -ENFILE; + goto fail_fset; local_data.fq = fqueue_create(); - if (local_data.fq == NULL) { - fset_destroy(local_data.flows); - return -ENOMEM; - } + if (local_data.fq == NULL) + goto fail_fqueue; local_data.shim_data = shim_data_create(); - if (local_data.shim_data == NULL) { - fqueue_destroy(local_data.fq); - fset_destroy(local_data.flows); - return -ENOMEM; - } + if (local_data.shim_data == NULL) + goto fail_shim_data; - pthread_rwlock_init(&local_data.lock, NULL); + if (pthread_rwlock_init(&local_data.lock, NULL) < 0) + goto fail_rwlock_init; return 0; + + fail_rwlock_init: + shim_data_destroy(local_data.shim_data); + fail_shim_data: + fqueue_destroy(local_data.fq); + fail_fqueue: + fset_destroy(local_data.flows); + fail_fset: + return -ENOMEM; } static void local_data_fini(void){ + pthread_rwlock_destroy(&local_data.lock); shim_data_destroy(local_data.shim_data); - fset_destroy(local_data.flows); fqueue_destroy(local_data.fq); - pthread_rwlock_destroy(&local_data.lock); + fset_destroy(local_data.flows); } -static void * ipcp_local_packet_loop(void * o) +static void * local_ipcp_packet_loop(void * o) { (void) o; @@ -135,54 +139,45 @@ static void * ipcp_local_packet_loop(void * o) return (void *) 0; } -static int ipcp_local_bootstrap(const struct ipcp_config * conf) +static int local_ipcp_bootstrap(const struct ipcp_config * conf) { assert(conf); assert(conf->type == THIS_TYPE); - ipcpi.dir_hash_algo = conf->layer_info.dir_hash_algo; - ipcpi.layer_name = strdup(conf->layer_info.layer_name); - if (ipcpi.layer_name == NULL) { - log_err("Failed to set layer name"); - return -ENOMEM; - } - - ipcp_set_state(IPCP_OPERATIONAL); + ipcpi.dir_hash_algo = (enum hash_algo) conf->layer_info.dir_hash_algo; + strcpy(ipcpi.layer_name,conf->layer_info.name); if (pthread_create(&local_data.packet_loop, NULL, - ipcp_local_packet_loop, NULL)) { + local_ipcp_packet_loop, NULL)) { + log_err("Failed to create pthread: %s", strerror(errno)); ipcp_set_state(IPCP_INIT); return -1; } - log_info("Bootstrapped local IPCP with pid %d.", getpid()); - return 0; } -static int ipcp_local_reg(const uint8_t * hash) +static int local_ipcp_reg(const uint8_t * hash) { if (shim_data_reg_add_entry(local_data.shim_data, hash)) { - log_dbg("Failed to add " HASH_FMT " to local registry.", - HASH_VAL(hash)); + log_err("Failed to add " HASH_FMT32 " to local registry.", + HASH_VAL32(hash)); return -1; } - log_info("Registered " HASH_FMT ".", HASH_VAL(hash)); - return 0; } -static int ipcp_local_unreg(const uint8_t * hash) +static int local_ipcp_unreg(const uint8_t * hash) { shim_data_reg_del_entry(local_data.shim_data, hash); - log_info("Unregistered " HASH_FMT ".", HASH_VAL(hash)); + log_info("Unregistered " HASH_FMT32 ".", HASH_VAL32(hash)); return 0; } -static int ipcp_local_query(const uint8_t * hash) +static int local_ipcp_query(const uint8_t * hash) { int ret; @@ -191,41 +186,19 @@ static int ipcp_local_query(const uint8_t * hash) return ret; } -static int ipcp_local_flow_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) +static int local_ipcp_flow_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { - struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; - struct timespec abstime; - int out_fd = -1; + int out_fd = -1; - log_dbg("Allocating flow to " HASH_FMT " on fd %d.", HASH_VAL(dst), fd); + log_dbg("Allocating flow to " HASH_FMT32 " on fd %d.", + HASH_VAL32(dst), fd); assert(dst); - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - assert(ipcpi.alloc_id == -1); - - out_fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len); + out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data); if (out_fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); log_dbg("Flow allocation failed: %d", out_fd); return -1; } @@ -237,11 +210,6 @@ static int ipcp_local_flow_alloc(int fd, pthread_rwlock_unlock(&local_data.lock); - ipcpi.alloc_id = out_fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - fset_add(local_data.flows, fd); log_info("Pending local allocation request on fd %d.", fd); @@ -249,39 +217,21 @@ static int ipcp_local_flow_alloc(int fd, return 0; } -static int ipcp_local_flow_alloc_resp(int fd, - int response, - const void * data, - size_t len) +static int local_ipcp_flow_alloc_resp(int fd, + int response, + const buffer_t * data) { - struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; - struct timespec abstime; - int out_fd = -1; + struct timespec wait = TIMESPEC_INIT_MS(1); + time_t mpl = IPCP_LOCAL_MPL; + int out_fd; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed waiting for IRMd response."); return -1; } - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - pthread_rwlock_wrlock(&local_data.lock); - - if (response) { + if (response < 0) { + pthread_rwlock_wrlock(&local_data.lock); if (local_data.in_out[fd] != -1) local_data.in_out[local_data.in_out[fd]] = fd; local_data.in_out[fd] = -1; @@ -289,25 +239,38 @@ static int ipcp_local_flow_alloc_resp(int fd, return 0; } + pthread_rwlock_rdlock(&local_data.lock); + out_fd = local_data.in_out[fd]; if (out_fd == -1) { pthread_rwlock_unlock(&local_data.lock); - return -1; + log_dbg("Potential race detected"); + nanosleep(&wait, NULL); + pthread_rwlock_rdlock(&local_data.lock); + out_fd = local_data.in_out[fd]; } pthread_rwlock_unlock(&local_data.lock); + if (out_fd == -1) { + log_err("Invalid out_fd."); + return -1; + } + fset_add(local_data.flows, fd); - if (ipcp_flow_alloc_reply(out_fd, response, data, len) < 0) + if (ipcp_flow_alloc_reply(out_fd, response, mpl, data) < 0) { + log_err("Failed to reply to allocation"); + fset_del(local_data.flows, fd); return -1; + } log_info("Flow allocation completed, fds (%d, %d).", out_fd, fd); return 0; } -static int ipcp_local_flow_dealloc(int fd) +static int local_ipcp_flow_dealloc(int fd) { assert(!(fd < 0)); @@ -321,7 +284,7 @@ static int ipcp_local_flow_dealloc(int fd) pthread_rwlock_unlock(&local_data.lock); - flow_dealloc(fd); + ipcp_flow_dealloc(fd); log_info("Flow with fd %d deallocated.", fd); @@ -329,60 +292,54 @@ static int ipcp_local_flow_dealloc(int fd) } static struct ipcp_ops local_ops = { - .ipcp_bootstrap = ipcp_local_bootstrap, + .ipcp_bootstrap = local_ipcp_bootstrap, .ipcp_enroll = NULL, .ipcp_connect = NULL, .ipcp_disconnect = NULL, - .ipcp_reg = ipcp_local_reg, - .ipcp_unreg = ipcp_local_unreg, - .ipcp_query = ipcp_local_query, - .ipcp_flow_alloc = ipcp_local_flow_alloc, + .ipcp_reg = local_ipcp_reg, + .ipcp_unreg = local_ipcp_unreg, + .ipcp_query = local_ipcp_query, + .ipcp_flow_alloc = local_ipcp_flow_alloc, .ipcp_flow_join = NULL, - .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp, - .ipcp_flow_dealloc = ipcp_local_flow_dealloc + .ipcp_flow_alloc_resp = local_ipcp_flow_alloc_resp, + .ipcp_flow_dealloc = local_ipcp_flow_dealloc }; int main(int argc, char * argv[]) { - if (ipcp_init(argc, argv, &local_ops, THIS_TYPE) < 0) - goto fail_init; - if (local_data_init() < 0) { log_err("Failed to init local data."); goto fail_data_init; } - if (ipcp_boot() < 0) { - log_err("Failed to boot IPCP."); - goto fail_boot; - } + if (ipcp_init(argc, argv, &local_ops, THIS_TYPE) < 0) + goto fail_init; - if (ipcp_create_r(0)) { - log_err("Failed to notify IRMd we are initialized."); - goto fail_create_r; + if (ipcp_start() < 0) { + log_err("Failed to start IPCP."); + goto fail_start; } - ipcp_shutdown(); + ipcp_sigwait(); if (ipcp_get_state() == IPCP_SHUTDOWN) { pthread_cancel(local_data.packet_loop); pthread_join(local_data.packet_loop, NULL); } - local_data_fini(); + ipcp_stop(); ipcp_fini(); - exit(EXIT_SUCCESS); - fail_create_r: - ipcp_set_state(IPCP_NULL); - ipcp_shutdown(); - fail_boot: local_data_fini(); - fail_data_init: + + exit(EXIT_SUCCESS); + + fail_start: ipcp_fini(); fail_init: - ipcp_create_r(-1); + local_data_fini(); + fail_data_init: exit(EXIT_FAILURE); } |