diff options
Diffstat (limited to 'src/ipcpd/ipcp.c')
-rw-r--r-- | src/ipcpd/ipcp.c | 859 |
1 files changed, 486 insertions, 373 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index e3e4221a..966c4920 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC process main loop * @@ -35,18 +35,21 @@ #define OUROBOROS_PREFIX "ipcpd/ipcp" #define IPCP_INFO "info" +#define ALLOC_TIMEOUT 50 /* ms */ +#include <ouroboros/bitmap.h> +#include <ouroboros/dev.h> +#include <ouroboros/errno.h> #include <ouroboros/hash.h> +#include <ouroboros/ipcp-dev.h> #include <ouroboros/logs.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/utils.h> -#include <ouroboros/sockets.h> -#include <ouroboros/errno.h> -#include <ouroboros/dev.h> -#include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> -#include <ouroboros/rib.h> +#include <ouroboros/protobuf.h> #include <ouroboros/pthread.h> +#include <ouroboros/rib.h> +#include <ouroboros/sockets.h> +#include <ouroboros/time.h> +#include <ouroboros/utils.h> #include "ipcp.h" @@ -173,15 +176,15 @@ static int ipcp_rib_readdir(char *** buf) while (info[i] != NULL) { (*buf)[i] = strdup(info[i]); - if (*buf == NULL) + if ((*buf)[i] == NULL) goto fail_dup; i++; } return i; fail_dup: - while (--i > 0) - free((*buf)[i]); + while (i > 0) + free((*buf)[--i]); fail: free(*buf); @@ -191,9 +194,13 @@ static int ipcp_rib_readdir(char *** buf) static int ipcp_rib_getattr(const char * path, struct rib_attr * attr) { - (void) path; + char buf[LAYER_NAME_SIZE + 2]; + struct timespec now; - attr->size = LAYER_NAME_SIZE; + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + attr->size = ipcp_rib_read(path, buf, LAYER_NAME_SIZE + 2); + attr->mtime = now.tv_sec; return 0; } @@ -206,9 +213,7 @@ static struct rib_ops r_ops = { static void * acceptloop(void * o) { - int csockfd; - struct timeval tv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; + int csockfd; (void) o; @@ -220,10 +225,6 @@ static void * acceptloop(void * o) if (csockfd < 0) continue; - if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO, - (void *) &tv, sizeof(tv))) - log_warn("Failed to set timeout on socket."); - cmd = malloc(sizeof(*cmd)); if (cmd == NULL) { log_err("Out of memory"); @@ -260,28 +261,395 @@ static void * acceptloop(void * o) return (void *) 0; } +int ipcp_wait_flow_req_arr(const uint8_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data) +{ + struct timespec ts = TIMESPEC_INIT_MS(ALLOC_TIMEOUT); + struct timespec abstime; + int fd; + buffer_t hash; + + hash.data = (uint8_t *) dst; + hash.len = ipcp_dir_hash_len(); + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { + ts_add(&abstime, &ts, &abstime); + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &abstime); + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_err("Won't allocate over non-operational IPCP."); + return -EIPCPSTATE; + } + + assert(ipcpi.alloc_id == -1); + + fd = ipcp_flow_req_arr(&hash, qs, mpl, data); + if (fd < 0) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_err("Failed to get fd for flow."); + return fd; + } + + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + return fd; + +} + +int ipcp_wait_flow_resp(const int fd) +{ + struct timespec ts = TIMESPEC_INIT_MS(ALLOC_TIMEOUT); + struct timespec abstime; + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { + ts_add(&abstime, &ts, &abstime); + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &abstime); + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + assert(ipcpi.alloc_id == fd); + + ipcpi.alloc_id = -1; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + return 0; +} + static void free_msg(void * o) { ipcp_msg__free_unpacked((ipcp_msg_t *) o, NULL); } + +static void do_bootstrap(ipcp_config_msg_t * conf_msg, + ipcp_msg_t * ret_msg) +{ + struct ipcp_config conf; + + log_info("Bootstrapping..."); + + if (ipcpi.ops->ipcp_bootstrap == NULL) { + log_err("Bootstrap unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_INIT) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + conf = ipcp_config_msg_to_s(conf_msg); + ret_msg->result = ipcpi.ops->ipcp_bootstrap(&conf); + if (ret_msg->result == 0) { + ret_msg->layer_info = layer_info_s_to_msg(&conf.layer_info); + ipcp_set_state(IPCP_OPERATIONAL); + } + finish: + log_info("Finished bootstrapping: %d.", ret_msg->result); +} + +static void do_enroll(const char * dst, + ipcp_msg_t * ret_msg) +{ + struct layer_info info; + + log_info("Enrolling with %s...", dst); + + if (ipcpi.ops->ipcp_enroll == NULL) { + log_err("Enroll unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_INIT) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_enroll(dst, &info); + if (ret_msg->result == 0) { + ret_msg->layer_info = layer_info_s_to_msg(&info); + ipcp_set_state(IPCP_OPERATIONAL); + } + finish: + log_info("Finished enrolling with %s: %d.", dst, ret_msg->result); +} + +static void do_connect(const char * dst, + const char * comp, + qosspec_t qs, + ipcp_msg_t * ret_msg) +{ + log_info("Connecting %s to %s...", comp, dst); + + if (ipcpi.ops->ipcp_connect == NULL) { + log_err("Connect unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_connect(dst, comp, qs); + finish: + log_info("Finished connecting: %d.", ret_msg->result); +} + +static void do_disconnect(const char * dst, + const char * comp, + ipcp_msg_t * ret_msg) +{ + log_info("Disconnecting %s from %s...", comp, dst); + + if (ipcpi.ops->ipcp_disconnect == NULL) { + log_err("Disconnect unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_disconnect(dst, comp); + + finish: + log_info("Finished disconnecting %s from %s: %d.", + comp, dst, ret_msg->result); +} + +static void do_reg(const uint8_t * hash, + ipcp_msg_t * ret_msg) +{ + + log_info("Registering " HASH_FMT32 "...", HASH_VAL32(hash)); + + if (ipcpi.ops->ipcp_reg == NULL) { + log_err("Registration unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_reg(hash); + finish: + log_info("Finished registering " HASH_FMT32 " : %d.", + HASH_VAL32(hash), ret_msg->result); +} + +static void do_unreg(const uint8_t * hash, + ipcp_msg_t * ret_msg) +{ + log_info("Unregistering " HASH_FMT32 "...", HASH_VAL32(hash)); + + if (ipcpi.ops->ipcp_unreg == NULL) { + log_err("Unregistration unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_unreg(hash); + finish: + log_info("Finished unregistering " HASH_FMT32 ": %d.", + HASH_VAL32(hash), ret_msg->result); +} + +static void do_query(const uint8_t * hash, + ipcp_msg_t * ret_msg) +{ + /* TODO: Log this operation when IRMd has internal caches. */ + + if (ipcpi.ops->ipcp_query == NULL) { + log_err("Directory query unsupported."); + ret_msg->result = -ENOTSUP; + return; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + return; + } + + ret_msg->result = ipcpi.ops->ipcp_query(hash); +} + +static void do_flow_alloc(pid_t pid, + int flow_id, + uint8_t * dst, + qosspec_t qs, + const buffer_t * data, + ipcp_msg_t * ret_msg) +{ + int fd; + + log_info("Allocating flow %d for %d to " HASH_FMT32 ".", + flow_id, pid, HASH_VAL32(dst)); + + if (ipcpi.ops->ipcp_flow_alloc == NULL) { + log_err("Flow allocation unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + fd = np1_flow_alloc(pid, flow_id); + if (fd < 0) { + log_err("Failed allocating n + 1 fd on flow_id %d: %d", + flow_id, fd); + ret_msg->result = -EFLOWDOWN; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_flow_alloc(fd, dst, qs, data); + finish: + log_info("Finished allocating flow %d to " HASH_FMT32 ": %d.", + flow_id, HASH_VAL32(dst), ret_msg->result); +} + + +static void do_flow_join(pid_t pid, + int flow_id, + const uint8_t * dst, + qosspec_t qs, + ipcp_msg_t * ret_msg) +{ + int fd; + + log_info("Joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); + + if (ipcpi.ops->ipcp_flow_join == NULL) { + log_err("Broadcast unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + fd = np1_flow_alloc(pid, flow_id); + if (fd < 0) { + log_err("Failed allocating n + 1 fd on flow_id %d.", flow_id); + ret_msg->result = -1; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_flow_join(fd, dst, qs); + finish: + log_info("Finished joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); +} + +static void do_flow_alloc_resp(int resp, + int flow_id, + const buffer_t * data, + ipcp_msg_t * ret_msg) +{ + int fd = -1; + + log_info("Responding %d to alloc on flow_id %d.", resp, flow_id); + + if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) { + log_err("Flow_alloc_resp unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + if (resp == 0) { + fd = np1_flow_resp(flow_id); + if (fd < 0) { + log_warn("Flow_id %d is not known.", flow_id); + ret_msg->result = -1; + goto finish; + } + } + + ret_msg->result = ipcpi.ops->ipcp_flow_alloc_resp(fd, resp, data); + finish: + log_info("Finished responding to allocation request: %d", + ret_msg->result); +} + +static void do_flow_dealloc(int flow_id, + int timeo_sec, + ipcp_msg_t * ret_msg) +{ + int fd; + + log_info("Deallocating flow %d.", flow_id); + + if (ipcpi.ops->ipcp_flow_dealloc == NULL) { + log_err("Flow deallocation unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + fd = np1_flow_dealloc(flow_id, timeo_sec); + if (fd < 0) { + log_warn("Could not deallocate flow_id %d.", flow_id); + ret_msg->result = -1; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_flow_dealloc(fd); + finish: + log_info("Finished deallocating flow %d: %d.", + flow_id, ret_msg->result); +} + static void * mainloop(void * o) { int sfd; buffer_t buffer; - struct ipcp_config conf; - struct layer_info info; - ipcp_config_msg_t * conf_msg; ipcp_msg_t * msg; (void) o; while (true) { - ipcp_msg_t ret_msg = IPCP_MSG__INIT; - layer_info_msg_t layer_info = LAYER_INFO_MSG__INIT; - int fd = -1; - struct cmd * cmd; - qosspec_t qs; + ipcp_msg_t ret_msg = IPCP_MSG__INIT; + qosspec_t qs; + struct cmd * cmd; + buffer_t data; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -312,329 +680,68 @@ static void * mainloop(void * o) pthread_cleanup_push(__cleanup_close_ptr, &sfd); pthread_cleanup_push(free_msg, msg); + ret_msg.has_result = true; + switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_bootstrap == NULL) { - log_err("Bootstrap unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - if (ipcp_get_state() != IPCP_INIT) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - conf_msg = msg->conf; - conf.type = conf_msg->ipcp_type; - strcpy(conf.layer_info.layer_name, - conf_msg->layer_info->layer_name); - - switch(conf_msg->ipcp_type) { - case IPCP_LOCAL: - break; - case IPCP_UNICAST: - conf.addr_size = conf_msg->addr_size; - conf.eid_size = conf_msg->eid_size; - conf.max_ttl = conf_msg->max_ttl; - conf.addr_auth_type = conf_msg->addr_auth_type; - conf.routing_type = conf_msg->routing_type; - conf.cong_avoid = conf_msg->cong_avoid; - break; - case IPCP_ETH_DIX: - conf.ethertype = conf_msg->ethertype; - /* FALLTHRU */ - case IPCP_ETH_LLC: - conf.dev = conf_msg->dev; - break; - case IPCP_UDP: - conf.ip_addr = conf_msg->ip_addr; - conf.dns_addr = conf_msg->dns_addr; - conf.port = conf_msg->port; - conf.layer_info.dir_hash_algo = HASH_MD5; - layer_info.dir_hash_algo = HASH_MD5; - break; - case IPCP_BROADCAST: - conf.layer_info.dir_hash_algo = HASH_SHA3_256; - layer_info.dir_hash_algo = HASH_SHA3_256; - break; - default: - log_err("Unknown IPCP type: %d.", - conf_msg->ipcp_type); - ret_msg.result = -EIPCP; - goto exit; /* break from outer switch/case */ - } - - /* UDP and broadcast use fixed hash algorithm. */ - if (conf_msg->ipcp_type != IPCP_UDP && - conf_msg->ipcp_type != IPCP_BROADCAST) { - switch(conf_msg->layer_info->dir_hash_algo) { - case DIR_HASH_SHA3_224: - conf.layer_info.dir_hash_algo = - HASH_SHA3_224; - break; - case DIR_HASH_SHA3_256: - conf.layer_info.dir_hash_algo = - HASH_SHA3_256; - break; - case DIR_HASH_SHA3_384: - conf.layer_info.dir_hash_algo = - HASH_SHA3_384; - break; - case DIR_HASH_SHA3_512: - conf.layer_info.dir_hash_algo = - HASH_SHA3_512; - break; - default: - assert(false); - } - - layer_info.dir_hash_algo = - conf.layer_info.dir_hash_algo; - } - - ret_msg.result = ipcpi.ops->ipcp_bootstrap(&conf); - if (ret_msg.result == 0) { - ret_msg.layer_info = &layer_info; - layer_info.layer_name = - conf.layer_info.layer_name; - } + do_bootstrap(msg->conf, &ret_msg); break; case IPCP_MSG_CODE__IPCP_ENROLL: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_enroll == NULL) { - log_err("Enroll unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - if (ipcp_get_state() != IPCP_INIT) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dst, - &info); - if (ret_msg.result == 0) { - ret_msg.layer_info = &layer_info; - layer_info.dir_hash_algo = info.dir_hash_algo; - layer_info.layer_name = info.layer_name; - } + do_enroll(msg->dst, &ret_msg); break; case IPCP_MSG_CODE__IPCP_CONNECT: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_connect == NULL) { - log_err("Connect unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - qs = msg_to_spec(msg->qosspec); - ret_msg.result = ipcpi.ops->ipcp_connect(msg->dst, - msg->comp, - qs); + qs = qos_spec_msg_to_s(msg->qosspec); + do_connect(msg->dst, msg->comp, qs, &ret_msg); break; case IPCP_MSG_CODE__IPCP_DISCONNECT: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_disconnect == NULL) { - log_err("Disconnect unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - ret_msg.result = ipcpi.ops->ipcp_disconnect(msg->dst, - msg->comp); + do_disconnect(msg->dst, msg->comp, &ret_msg); break; case IPCP_MSG_CODE__IPCP_REG: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_reg == NULL) { - log_err("Registration unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); - - ret_msg.result = - ipcpi.ops->ipcp_reg(msg->hash.data); + do_reg(msg->hash.data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_UNREG: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_unreg == NULL) { - log_err("Unregistration unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); - - ret_msg.result = - ipcpi.ops->ipcp_unreg(msg->hash.data); + do_unreg(msg->hash.data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_QUERY: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_query == NULL) { - log_err("Directory query unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - ret_msg.result = - ipcpi.ops->ipcp_query(msg->hash.data); + do_query(msg->hash.data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_flow_alloc == NULL) { - log_err("Flow allocation unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); assert(msg->pk.len > 0 ? msg->pk.data != NULL : msg->pk.data == NULL); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - qs = msg_to_spec(msg->qosspec); - fd = np1_flow_alloc(msg->pid, - msg->flow_id, - qs); - if (fd < 0) { - log_err("Failed allocating fd on flow_id %d.", - msg->flow_id); - ret_msg.result = -1; - break; - } - - ret_msg.result = - ipcpi.ops->ipcp_flow_alloc(fd, - msg->hash.data, - qs, - msg->pk.data, - msg->pk.len); + data.len = msg->pk.len; + data.data = msg->pk.data; + qs = qos_spec_msg_to_s(msg->qosspec); + do_flow_alloc(msg->pid, msg->flow_id, + msg->hash.data, qs, + &data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_JOIN: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_flow_join == NULL) { - log_err("Broadcast unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - qs = msg_to_spec(msg->qosspec); - fd = np1_flow_alloc(msg->pid, - msg->flow_id, - qs); - if (fd < 0) { - log_err("Failed allocating fd on flow_id %d.", - msg->flow_id); - ret_msg.result = -1; - break; - } - - ret_msg.result = - ipcpi.ops->ipcp_flow_join(fd, - msg->hash.data, - qs); + qs = qos_spec_msg_to_s(msg->qosspec); + do_flow_join(msg->pid, msg->flow_id, + msg->hash.data, qs, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: - ret_msg.has_result = true; - if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) { - log_err("Flow_alloc_resp unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - if (!msg->response) { - fd = np1_flow_resp(msg->flow_id); - if (fd < 0) { - log_warn("Port_id %d is not known.", - msg->flow_id); - ret_msg.result = -1; - break; - } - } - assert(msg->pk.len > 0 ? msg->pk.data != NULL - : msg->pk.data == NULL); - - ret_msg.result = - ipcpi.ops->ipcp_flow_alloc_resp(fd, - msg->response, - msg->pk.data, - msg->pk.len); + : msg->pk.data == NULL); + data.len = msg->pk.len; + data.data = msg->pk.data; + do_flow_alloc_resp(msg->response, msg->flow_id, + &data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: - ret_msg.has_result = true; - if (ipcpi.ops->ipcp_flow_dealloc == NULL) { - log_err("Flow deallocation unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - fd = np1_flow_dealloc(msg->flow_id); - if (fd < 0) { - log_warn("Could not deallocate flow_id %d.", - msg->flow_id); - ret_msg.result = -1; - break; - } - - ret_msg.result = - ipcpi.ops->ipcp_flow_dealloc(fd); + do_flow_dealloc(msg->flow_id, msg->timeo_sec, &ret_msg); break; default: - ret_msg.has_result = true; - ret_msg.result = -1; - log_err("Don't know that message code"); + ret_msg.result = -1; + log_err("Unknown message code: %d.", msg->code); break; } - exit: + pthread_cleanup_pop(true); pthread_cleanup_pop(false); @@ -656,12 +763,16 @@ static void * mainloop(void * o) ipcp_msg__pack(&ret_msg, buffer.data); + if (ret_msg.layer_info != NULL) + layer_info_msg__free_unpacked(ret_msg.layer_info, NULL); + pthread_cleanup_push(__cleanup_close_ptr, &sfd); + pthread_cleanup_push(free, buffer.data) if (write(sfd, buffer.data, buffer.len) == -1) log_warn("Failed to send reply message"); - free(buffer.data); + pthread_cleanup_pop(true); pthread_cleanup_pop(true); tpm_inc(ipcpi.tpm); @@ -771,14 +882,32 @@ int ipcp_init(int argc, goto fail_rib_init; } + if (rib_reg(IPCP_INFO, &r_ops)) { + log_err("Failed to register rib."); + goto fail_rib_reg; + } + + ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, + mainloop, NULL); + if (ipcpi.tpm == NULL) { + log_err("Failed to create threadpool manager."); + goto fail_tpm_create; + } + list_head_init(&ipcpi.cmds); ipcpi.alloc_id = -1; pthread_condattr_destroy(&cattr); + ipcp_set_state(IPCP_INIT); + return 0; + fail_tpm_create: + rib_unreg(IPCP_INFO); + fail_rib_reg: + rib_fini(); fail_rib_init: pthread_cond_destroy(&ipcpi.cmd_cond); fail_cmd_cond: @@ -801,50 +930,55 @@ int ipcp_init(int argc, return ret; } -int ipcp_boot() +int ipcp_start(void) { - sigset_t sigset; + sigset_t sigset; + struct ipcp_info info; + sigemptyset(&sigset); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGQUIT); sigaddset(&sigset, SIGHUP); sigaddset(&sigset, SIGPIPE); - ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, - mainloop, NULL); - if (ipcpi.tpm == NULL) - goto fail_tpm_create; - pthread_sigmask(SIG_BLOCK, &sigset, NULL); + info.pid = getpid(); + info.type = ipcpi.type; + strcpy(info.name, ipcpi.name); + info.state = IPCP_OPERATIONAL; + if (tpm_start(ipcpi.tpm)) goto fail_tpm_start; - ipcp_set_state(IPCP_INIT); - - if (rib_reg(IPCP_INFO, &r_ops)) - goto fail_rib_reg; - if (pthread_create(&ipcpi.acceptor, NULL, acceptloop, NULL)) { log_err("Failed to create acceptor thread."); - ipcp_set_state(IPCP_NULL); goto fail_acceptor; } - return 0; + info.state = IPCP_OPERATIONAL; + if (ipcp_create_r(&info)) { + log_err("Failed to notify IRMd we are initialized."); + goto fail_create_r; + } + return 0; + + fail_create_r: + pthread_cancel(ipcpi.acceptor); + pthread_join(ipcpi.acceptor, NULL); fail_acceptor: - rib_unreg(IPCP_INFO); - fail_rib_reg: tpm_stop(ipcpi.tpm); fail_tpm_start: tpm_destroy(ipcpi.tpm); - fail_tpm_create: + ipcp_set_state(IPCP_NULL); + info.state = IPCP_NULL; + ipcp_create_r(&info); return -1; } -void ipcp_shutdown() +void ipcp_sigwait(void) { siginfo_t info; @@ -877,8 +1011,11 @@ void ipcp_shutdown() #endif switch(info.si_signo) { case SIGINT: + /* FALLTHRU */ case SIGTERM: + /* FALLTHRU */ case SIGHUP: + /* FALLTHRU */ case SIGQUIT: if (info.si_pid == ipcpi.irmd_pid) { if (ipcp_get_state() == IPCP_INIT) @@ -890,23 +1027,30 @@ void ipcp_shutdown() break; case SIGPIPE: log_dbg("Ignored SIGPIPE."); + continue; default: continue; } } +} - pthread_cancel(ipcpi.acceptor); +void ipcp_stop(void) +{ + log_info("IPCP %d shutting down.", getpid()); + pthread_cancel(ipcpi.acceptor); pthread_join(ipcpi.acceptor, NULL); - tpm_stop(ipcpi.tpm); - tpm_destroy(ipcpi.tpm); - log_info("IPCP %d shutting down.", getpid()); + tpm_stop(ipcpi.tpm); } -void ipcp_fini() +void ipcp_fini(void) { + tpm_destroy(ipcpi.tpm); + + rib_unreg(IPCP_INFO); + rib_fini(); close(ipcpi.sockfd); @@ -937,7 +1081,7 @@ void ipcp_set_state(enum ipcp_state state) pthread_mutex_unlock(&ipcpi.state_mtx); } -enum ipcp_state ipcp_get_state() +enum ipcp_state ipcp_get_state(void) { enum ipcp_state state; @@ -950,37 +1094,6 @@ enum ipcp_state ipcp_get_state() return state; } -int ipcp_wait_state(enum ipcp_state state, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret = 0; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - - pthread_mutex_lock(&ipcpi.state_mtx); - - pthread_cleanup_push(__cleanup_mutex_unlock, &ipcpi.state_mtx); - - while (ipcpi.state != state - && ipcpi.state != IPCP_SHUTDOWN - && ipcpi.state != IPCP_NULL - && ret != -ETIMEDOUT) { - if (timeout == NULL) - ret = -pthread_cond_wait(&ipcpi.state_cond, - &ipcpi.state_mtx); - else - ret = -pthread_cond_timedwait(&ipcpi.state_cond, - &ipcpi.state_mtx, - &abstime); - } - - pthread_cleanup_pop(true); - - return ret; -} - void ipcp_lock_to_core(void) { #if defined(__linux__) && !defined(DISABLE_CORE_LOCK) |