diff options
Diffstat (limited to 'src/ipcpd/shim-udp')
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 456 |
1 files changed, 353 insertions, 103 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index f9a8c42b..917c343b 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -34,6 +34,7 @@ #include <ouroboros/bitmap.h> #include <ouroboros/flow.h> #include <ouroboros/dev.h> +#include <ouroboros/rw_lock.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -82,18 +83,25 @@ struct shim_ap_data { instance_name_t * api; struct shm_du_map * dum; struct bmp * fds; - struct shm_ap_rbuff * rb; + rw_lock_t data_lock; + struct flow flows[AP_MAX_FLOWS]; + rw_lock_t flows_lock; + + pthread_t mainloop; + pthread_t sduloop; + pthread_t handler; + pthread_t sdu_reader; + + rw_lock_t thread_lock; - pthread_t mainloop; - pthread_t sduloop; - pthread_t handler; - pthread_t sdu_reader; } * _ap_instance; static int shim_ap_init(char * ap_name) { + int i; + _ap_instance = malloc(sizeof(struct shim_ap_data)); if (_ap_instance == NULL) { return -1; @@ -131,11 +139,22 @@ static int shim_ap_init(char * ap_name) _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { instance_name_destroy(_ap_instance->api); + shm_du_map_close(_ap_instance->dum); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; } + for (i = 0; i < AP_MAX_FLOWS; i ++) { + _ap_instance->flows[i].rb = NULL; + _ap_instance->flows[i].port_id = -1; + _ap_instance->flows[i].state = FLOW_NULL; + } + + rw_lock_init(&_ap_instance->flows_lock); + rw_lock_init(&_ap_instance->thread_lock); + rw_lock_init(&_ap_instance->data_lock); + return 0; } @@ -145,6 +164,9 @@ void shim_ap_fini() if (_ap_instance == NULL) return; + + rw_lock_wrlock(&_ap_instance->data_lock); + if (_ap_instance->api != NULL) instance_name_destroy(_ap_instance->api); if (_ap_instance->fds != NULL) @@ -153,41 +175,76 @@ void shim_ap_fini() shm_du_map_close(_ap_instance->dum); if (_ap_instance->rb != NULL) shm_ap_rbuff_destroy(_ap_instance->rb); + + rw_lock_wrlock(&_ap_instance->flows_lock); + for (i = 0; i < AP_MAX_FLOWS; i ++) if (_ap_instance->flows[i].rb != NULL) shm_ap_rbuff_close(_ap_instance->flows[i].rb); + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + free(_ap_instance); } static int port_id_to_fd(int port_id) { int i; - for (i = 0; i < AP_MAX_FLOWS; ++i) + + rw_lock_rdlock(&_ap_instance->flows_lock); + + for (i = 0; i < AP_MAX_FLOWS; ++i) { if (_ap_instance->flows[i].port_id == port_id - && _ap_instance->flows[i].state != FLOW_NULL) + && _ap_instance->flows[i].state != FLOW_NULL) { + + rw_lock_unlock(&_ap_instance->flows_lock); + return i; + } + } + + rw_lock_unlock(&_ap_instance->flows_lock); + return -1; } static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) { /* the AP chooses the amount of headspace and tailspace */ - size_t index = shm_create_du_buff(_ap_instance->dum, - count, - 0, - buf, - count); - struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; - - if (index == -1) + size_t index; + struct rb_entry e; + + rw_lock_rdlock(&_ap_instance->data_lock); + + index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count); + + if (index == -1) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } + + e.index = index; + + rw_lock_rdlock(&_ap_instance->flows_lock); + + e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + shm_release_du_buff(_ap_instance->dum, index); + + rw_lock_unlock(&_ap_instance->data_lock); + return -EPIPE; } + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + return 0; } @@ -207,8 +264,7 @@ struct ipcp_udp_data { int s_fd; fd_set flow_fd_s; - - pthread_mutex_t lock; + rw_lock_t fd_lock; }; struct ipcp_udp_data * ipcp_udp_data_create() @@ -230,6 +286,8 @@ struct ipcp_udp_data * ipcp_udp_data_create() return NULL; } + rw_lock_init(&udp_data->fd_lock); + FD_ZERO(&udp_data->flow_fd_s); return udp_data; @@ -237,21 +295,49 @@ struct ipcp_udp_data * ipcp_udp_data_create() void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + bool clean_threads = false; + switch(sig) { case SIGINT: case SIGTERM: case SIGHUP: if (info->si_pid == irmd_pid || info->si_pid == 0) { + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); + + rw_lock_wrlock(&_ipcp->state_lock); + + if (_ipcp->state == IPCP_ENROLLED) { + clean_threads = true; + } + + _ipcp->state = IPCP_SHUTDOWN; + + rw_lock_unlock(&_ipcp->state_lock); + + if (clean_threads) { + rw_lock_wrlock(&_ap_instance->thread_lock); + + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader); + pthread_cancel(_ap_instance->sduloop); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader, NULL); + + rw_lock_unlock(&_ap_instance->thread_lock); + } + pthread_cancel(_ap_instance->mainloop); - pthread_cancel(_ap_instance->handler); - pthread_cancel(_ap_instance->sdu_reader); - pthread_cancel(_ap_instance->sduloop); - /* FIXME: should be called after join */ - shim_ap_fini(); - exit(0); + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + } default: return; @@ -269,6 +355,7 @@ static void * ipcp_udp_listener() while (true) { int fd; + int port_id; memset(&buf, 0, SHIM_UDP_BUF_SIZE); n = sizeof c_saddr; n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, @@ -316,21 +403,27 @@ static void * ipcp_udp_listener() /* reply to IRM */ - _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), - buf, - UNKNOWN_AP, - UNKNOWN_AE); - if (_ap_instance->flows[fd].port_id < 0) { + port_id = ipcp_flow_req_arr(getpid(), + buf, + UNKNOWN_AP, + UNKNOWN_AE); + + if (port_id < 0) { LOG_ERR("Could not get port id from IRMd"); close(fd); continue; } - _ap_instance->flows[fd].rb = NULL; - _ap_instance->flows[fd].state = FLOW_PENDING; + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; + + rw_lock_unlock(&_ap_instance->flows_lock); LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", - _ap_instance->flows[fd].port_id, fd); + port_id, fd); } return 0; @@ -341,12 +434,26 @@ static void * ipcp_udp_sdu_reader() int n; int fd; char buf[SHIM_UDP_MAX_SDU_SIZE]; - struct timeval tv = {0, 10}; + struct timeval tv = {0, 1000}; struct sockaddr_in r_saddr; fd_set read_fds; while (true) { + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return (void *) 0; + } + + rw_lock_unlock(&_ipcp->state_lock); + + rw_lock_rdlock(&shim_data(_ipcp)->fd_lock); + read_fds = shim_data(_ipcp)->flow_fd_s; + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) continue; @@ -370,22 +477,97 @@ static void * ipcp_udp_sdu_reader() return (void *) 0; } +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +static void * ipcp_udp_sdu_loop(void * o) +{ + while (true) { + struct rb_entry * e; + int fd; + int len = 0; + char * buf; + + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return (void *) 0; + } + + rw_lock_unlock(&_ipcp->state_lock); + + rw_lock_rdlock(&_ap_instance->data_lock); + + e = shm_ap_rbuff_read(_ap_instance->rb); + + if (e == NULL) { + rw_lock_unlock(&_ap_instance->data_lock); + continue; + } + + len = shm_du_map_read_sdu((uint8_t **) &buf, + _ap_instance->dum, + e->index); + if (len == -1) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + fd = port_id_to_fd(e->port_id); + + if (fd == -1) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + if (len == 0) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + rw_lock_unlock(&_ap_instance->data_lock); + + if (send(fd, buf, len, 0) < 0) + LOG_ERR("Failed to send SDU."); + + rw_lock_rdlock(&_ap_instance->data_lock); + + shm_release_du_buff(_ap_instance->dum, e->index); + + rw_lock_unlock(&_ap_instance->data_lock); + + free(e); + } + + return (void *) 1; +} + static int ipcp_udp_bootstrap(struct dif_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - int enable = 1; + int enable = 1; + int fd = -1; if (conf->type != THIS_TYPE) { LOG_ERR("Config doesn't match IPCP type."); return -1; } + rw_lock_wrlock(&_ipcp->state_lock); + if (_ipcp->state != IPCP_INIT) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("IPCP in wrong state."); return -1; } + _ipcp->state = IPCP_BOOTSTRAPPING; + + rw_lock_unlock(&_ipcp->state_lock); + if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, @@ -409,37 +591,36 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) strcpy(dnsstr, "not set"); } - shim_data(_ipcp)->ip_addr = conf->ip_addr; - shim_data(_ipcp)->dns_addr = conf->dns_addr; - /* UDP listen server */ - - if ((shim_data(_ipcp)->s_fd = - socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - LOG_DBGF("Can't create socket."); + if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { + LOG_ERR("Can't create socket."); return -1; } - if (setsockopt(shim_data(_ipcp)->s_fd, + if (setsockopt(fd, SOL_SOCKET, - SO_REUSEADDR, - &enable, + SO_REUSEADDR, + &enable, sizeof(int)) < 0) { - LOG_DBGF("Setsockopt(SO_REUSEADDR) failed."); + LOG_WARN("Setsockopt(SO_REUSEADDR) failed."); } + shim_data(_ipcp)->s_fd = fd; + shim_data(_ipcp)->ip_addr = conf->ip_addr; + shim_data(_ipcp)->dns_addr = conf->dns_addr; + shim_data(_ipcp)->s_saddr.sin_family = AF_INET; shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr; shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT; - if (bind(shim_data(_ipcp)->s_fd, + if (bind(fd, (struct sockaddr *) &shim_data(_ipcp)->s_saddr, sizeof shim_data(_ipcp)->s_saddr ) < 0) { LOG_ERR("Couldn't bind to %s.", ipstr); return -1; } - FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_wrlock(&_ap_instance->thread_lock); pthread_create(&_ap_instance->handler, NULL, @@ -450,8 +631,25 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) ipcp_udp_sdu_reader, NULL); + pthread_create(&_ap_instance->sduloop, + NULL, + ipcp_udp_sdu_loop, + NULL); + + rw_lock_unlock(&_ap_instance->thread_lock); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + + rw_lock_wrlock(&_ipcp->state_lock); + _ipcp->state = IPCP_ENROLLED; + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", getpid()); @@ -596,16 +794,21 @@ static int ipcp_udp_name_reg(char * name) uint32_t ip_addr; #endif - if (_ipcp->state != IPCP_ENROLLED) { - LOG_DBGF("Won't register with non-enrolled IPCP."); - return -1; - } - if (strlen(name) > 24) { LOG_ERR("DNS names cannot be longer than 24 chars."); return -1; } + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Won't register with non-enrolled IPCP."); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(_ipcp->data, name)) { LOG_ERR("Failed to add %s to local registry.", name); return -1; @@ -660,6 +863,16 @@ static int ipcp_udp_name_unreg(char * name) #ifdef CONFIG_OUROBOROS_ENABLE_DNS /* unregister application with DNS server */ + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("IPCP is not enrolled"); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + dns_addr = shim_data(_ipcp)->dns_addr; if (dns_addr != 0) { if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) @@ -698,10 +911,21 @@ static int ipcp_udp_flow_alloc(int port_id, #ifdef CONFIG_OUROBOROS_ENABLE_DNS uint32_t dns_addr = 0; #endif + struct shm_ap_rbuff * rb; if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { @@ -727,6 +951,7 @@ static int ipcp_udp_flow_alloc(int port_id, #ifdef CONFIG_OUROBOROS_ENABLE_DNS dns_addr = shim_data(_ipcp)->dns_addr; + if (dns_addr != 0) { ip_addr = ddns_resolve(dst_name, dns_addr); if (ip_addr == 0) { @@ -790,35 +1015,45 @@ static int ipcp_udp_flow_alloc(int port_id, free(recv_buf); - _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); - if (_ap_instance->flows[fd].rb == NULL) { + rb = shm_ap_rbuff_open(n_pid); + if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); close(fd); - return -1; + return -1; /* -ENORBUFF */ } + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = rb; + + rw_lock_unlock(&_ap_instance->flows_lock); /* tell IRMd that flow allocation "worked" */ if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { + shm_ap_rbuff_close(rb); LOG_ERR("Failed to notify IRMd about flow allocation reply"); close(fd); - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); return -1; } + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd); return fd; } static int ipcp_udp_flow_alloc_resp(int port_id, - pid_t n_pid, - int response) + pid_t n_pid, + int response) { + struct shm_ap_rbuff * rb; int fd = port_id_to_fd(port_id); if (fd < 0) { LOG_DBGF("Could not find flow with port_id %d.", port_id); @@ -830,22 +1065,44 @@ static int ipcp_udp_flow_alloc_resp(int port_id, /* awaken pending flow */ + rw_lock_rdlock(&_ap_instance->flows_lock); + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + rw_lock_unlock(&_ap_instance->flows_lock); + LOG_DBGF("Flow was not pending."); return -1; } - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); - if (_ap_instance->flows[fd].rb == NULL) { + rw_lock_unlock(&_ap_instance->flows_lock); + + rb = shm_ap_rbuff_open(n_pid); + if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); + + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].state = FLOW_NULL; _ap_instance->flows[fd].port_id = 0; + + rw_lock_unlock(&_ap_instance->flows_lock); + return 0; } + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = rb; + + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); return 0; @@ -854,18 +1111,33 @@ static int ipcp_udp_flow_alloc_resp(int port_id, static int ipcp_udp_flow_dealloc(int port_id) { int fd = port_id_to_fd(port_id); + struct shm_ap_rbuff * rb; + if (fd < 0) { LOG_DBGF("Could not find flow with port_id %d.", port_id); return 0; } + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].state = FLOW_NULL; _ap_instance->flows[fd].port_id = 0; - if (_ap_instance->flows[fd].rb != NULL) - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + rb = _ap_instance->flows[fd].rb; + _ap_instance->flows[fd].rb = NULL; + + rw_lock_unlock(&_ap_instance->flows_lock); + + if (rb != NULL) + shm_ap_rbuff_close(rb); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + close(fd); + return 0; } @@ -878,7 +1150,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name) if (shim_ap_init(ap_name) < 0) return NULL; - i = malloc(sizeof *i); + i = ipcp_instance_create(); if (i == NULL) return NULL; @@ -915,45 +1187,17 @@ static struct ipcp * ipcp_udp_create(char * ap_name) #ifndef MAKE_CHECK -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ -/* FIXME: stop eating the CPU */ -static void * ipcp_udp_sdu_loop(void * o) -{ - while (true) { - struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); - int fd; - int len = 0; - char * buf; - if (e == NULL) - continue; - - len = shm_du_map_read_sdu((uint8_t **) &buf, - _ap_instance->dum, - e->index); - if (len == -1) - continue; - - fd = port_id_to_fd(e->port_id); - - if (fd == -1) - continue; - - if (len == 0) - continue; - - send(fd, buf, len, 0); - - shm_release_du_buff(_ap_instance->dum, e->index); - } - - return (void *) 1; -} - int main (int argc, char * argv[]) { /* argument 1: pid of irmd ? */ /* argument 2: ap name */ struct sigaction sig_act; + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + sigaddset(&sigset, SIGQUIT); + sigaddset(&sigset, SIGHUP); + sigaddset(&sigset, SIGPIPE); if (ipcp_arg_check(argc, argv)) { LOG_ERR("Wrong arguments."); @@ -981,13 +1225,19 @@ int main (int argc, char * argv[]) exit(1); } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); - pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); - pthread_join(_ap_instance->sduloop, NULL); + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + pthread_join(_ap_instance->mainloop, NULL); - pthread_join(_ap_instance->handler, NULL); - pthread_join(_ap_instance->sdu_reader, NULL); + + shim_ap_fini(); + + free(_ipcp->data); + free(_ipcp->ops); + free(_ipcp); exit(0); } |