diff options
-rw-r--r-- | src/ipcpd/ipcp.c | 34 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 6 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 456 | ||||
-rw-r--r-- | src/lib/dev.c | 212 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 4 |
5 files changed, 566 insertions, 146 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 1e122b73..76d3620b 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -29,6 +29,22 @@ #define OUROBOROS_PREFIX "ipcpd/ipcp" #include <ouroboros/logs.h> +struct ipcp * ipcp_instance_create() +{ + struct ipcp * i = malloc(sizeof *i); + if (i == NULL) + return NULL; + + i->data = NULL; + i->ops = NULL; + i->irmd_fd = -1; + i->state = IPCP_INIT; + + rw_lock_init(&i->state_lock); + + return i; +} + int ipcp_arg_check(int argc, char * argv[]) { if (argc != 3) @@ -52,25 +68,33 @@ void * ipcp_main_loop(void * o) uint8_t buf[IPCP_MSG_BUF_SIZE]; struct ipcp * _ipcp = (struct ipcp *) o; - ipcp_msg_t * msg; - ssize_t count; - buffer_t buffer; - ipcp_msg_t ret_msg = IPCP_MSG__INIT; + ipcp_msg_t * msg; + ssize_t count; + buffer_t buffer; + ipcp_msg_t ret_msg = IPCP_MSG__INIT; dif_config_msg_t * conf_msg; struct dif_config conf; + char * sock_path; + if (_ipcp == NULL) { LOG_ERR("Invalid ipcp struct."); return (void *) 1; } - sockfd = server_socket_open(ipcp_sock_path(getpid())); + sock_path = ipcp_sock_path(getpid()); + if (sock_path == NULL) + return (void *) 1; + + sockfd = server_socket_open(sock_path); if (sockfd < 0) { LOG_ERR("Could not open server socket."); return (void *) 1; } + free(sock_path); + while (true) { ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 393af994..c9002d4d 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -23,6 +23,8 @@ #ifndef IPCPD_IPCP_H #define IPCPD_IPCP_H +#include <ouroboros/rw_lock.h> + #include "ipcp-ops.h" #include "ipcp-data.h" @@ -38,11 +40,13 @@ enum ipcp_state { struct ipcp { struct ipcp_data * data; struct ipcp_ops * ops; + int irmd_fd; enum ipcp_state state; - int irmd_fd; + rw_lock_t state_lock; }; +struct ipcp * ipcp_instance_create(); void * ipcp_main_loop(void * o); void * ipcp_sdu_loop(void * o); int ipcp_arg_check(int argc, char * argv[]); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index f9a8c42b..917c343b 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -34,6 +34,7 @@ #include <ouroboros/bitmap.h> #include <ouroboros/flow.h> #include <ouroboros/dev.h> +#include <ouroboros/rw_lock.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -82,18 +83,25 @@ struct shim_ap_data { instance_name_t * api; struct shm_du_map * dum; struct bmp * fds; - struct shm_ap_rbuff * rb; + rw_lock_t data_lock; + struct flow flows[AP_MAX_FLOWS]; + rw_lock_t flows_lock; + + pthread_t mainloop; + pthread_t sduloop; + pthread_t handler; + pthread_t sdu_reader; + + rw_lock_t thread_lock; - pthread_t mainloop; - pthread_t sduloop; - pthread_t handler; - pthread_t sdu_reader; } * _ap_instance; static int shim_ap_init(char * ap_name) { + int i; + _ap_instance = malloc(sizeof(struct shim_ap_data)); if (_ap_instance == NULL) { return -1; @@ -131,11 +139,22 @@ static int shim_ap_init(char * ap_name) _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { instance_name_destroy(_ap_instance->api); + shm_du_map_close(_ap_instance->dum); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; } + for (i = 0; i < AP_MAX_FLOWS; i ++) { + _ap_instance->flows[i].rb = NULL; + _ap_instance->flows[i].port_id = -1; + _ap_instance->flows[i].state = FLOW_NULL; + } + + rw_lock_init(&_ap_instance->flows_lock); + rw_lock_init(&_ap_instance->thread_lock); + rw_lock_init(&_ap_instance->data_lock); + return 0; } @@ -145,6 +164,9 @@ void shim_ap_fini() if (_ap_instance == NULL) return; + + rw_lock_wrlock(&_ap_instance->data_lock); + if (_ap_instance->api != NULL) instance_name_destroy(_ap_instance->api); if (_ap_instance->fds != NULL) @@ -153,41 +175,76 @@ void shim_ap_fini() shm_du_map_close(_ap_instance->dum); if (_ap_instance->rb != NULL) shm_ap_rbuff_destroy(_ap_instance->rb); + + rw_lock_wrlock(&_ap_instance->flows_lock); + for (i = 0; i < AP_MAX_FLOWS; i ++) if (_ap_instance->flows[i].rb != NULL) shm_ap_rbuff_close(_ap_instance->flows[i].rb); + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + free(_ap_instance); } static int port_id_to_fd(int port_id) { int i; - for (i = 0; i < AP_MAX_FLOWS; ++i) + + rw_lock_rdlock(&_ap_instance->flows_lock); + + for (i = 0; i < AP_MAX_FLOWS; ++i) { if (_ap_instance->flows[i].port_id == port_id - && _ap_instance->flows[i].state != FLOW_NULL) + && _ap_instance->flows[i].state != FLOW_NULL) { + + rw_lock_unlock(&_ap_instance->flows_lock); + return i; + } + } + + rw_lock_unlock(&_ap_instance->flows_lock); + return -1; } static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) { /* the AP chooses the amount of headspace and tailspace */ - size_t index = shm_create_du_buff(_ap_instance->dum, - count, - 0, - buf, - count); - struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; - - if (index == -1) + size_t index; + struct rb_entry e; + + rw_lock_rdlock(&_ap_instance->data_lock); + + index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count); + + if (index == -1) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } + + e.index = index; + + rw_lock_rdlock(&_ap_instance->flows_lock); + + e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + shm_release_du_buff(_ap_instance->dum, index); + + rw_lock_unlock(&_ap_instance->data_lock); + return -EPIPE; } + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + return 0; } @@ -207,8 +264,7 @@ struct ipcp_udp_data { int s_fd; fd_set flow_fd_s; - - pthread_mutex_t lock; + rw_lock_t fd_lock; }; struct ipcp_udp_data * ipcp_udp_data_create() @@ -230,6 +286,8 @@ struct ipcp_udp_data * ipcp_udp_data_create() return NULL; } + rw_lock_init(&udp_data->fd_lock); + FD_ZERO(&udp_data->flow_fd_s); return udp_data; @@ -237,21 +295,49 @@ struct ipcp_udp_data * ipcp_udp_data_create() void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + bool clean_threads = false; + switch(sig) { case SIGINT: case SIGTERM: case SIGHUP: if (info->si_pid == irmd_pid || info->si_pid == 0) { + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); + + rw_lock_wrlock(&_ipcp->state_lock); + + if (_ipcp->state == IPCP_ENROLLED) { + clean_threads = true; + } + + _ipcp->state = IPCP_SHUTDOWN; + + rw_lock_unlock(&_ipcp->state_lock); + + if (clean_threads) { + rw_lock_wrlock(&_ap_instance->thread_lock); + + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader); + pthread_cancel(_ap_instance->sduloop); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader, NULL); + + rw_lock_unlock(&_ap_instance->thread_lock); + } + pthread_cancel(_ap_instance->mainloop); - pthread_cancel(_ap_instance->handler); - pthread_cancel(_ap_instance->sdu_reader); - pthread_cancel(_ap_instance->sduloop); - /* FIXME: should be called after join */ - shim_ap_fini(); - exit(0); + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + } default: return; @@ -269,6 +355,7 @@ static void * ipcp_udp_listener() while (true) { int fd; + int port_id; memset(&buf, 0, SHIM_UDP_BUF_SIZE); n = sizeof c_saddr; n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, @@ -316,21 +403,27 @@ static void * ipcp_udp_listener() /* reply to IRM */ - _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), - buf, - UNKNOWN_AP, - UNKNOWN_AE); - if (_ap_instance->flows[fd].port_id < 0) { + port_id = ipcp_flow_req_arr(getpid(), + buf, + UNKNOWN_AP, + UNKNOWN_AE); + + if (port_id < 0) { LOG_ERR("Could not get port id from IRMd"); close(fd); continue; } - _ap_instance->flows[fd].rb = NULL; - _ap_instance->flows[fd].state = FLOW_PENDING; + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; + + rw_lock_unlock(&_ap_instance->flows_lock); LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", - _ap_instance->flows[fd].port_id, fd); + port_id, fd); } return 0; @@ -341,12 +434,26 @@ static void * ipcp_udp_sdu_reader() int n; int fd; char buf[SHIM_UDP_MAX_SDU_SIZE]; - struct timeval tv = {0, 10}; + struct timeval tv = {0, 1000}; struct sockaddr_in r_saddr; fd_set read_fds; while (true) { + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return (void *) 0; + } + + rw_lock_unlock(&_ipcp->state_lock); + + rw_lock_rdlock(&shim_data(_ipcp)->fd_lock); + read_fds = shim_data(_ipcp)->flow_fd_s; + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) continue; @@ -370,22 +477,97 @@ static void * ipcp_udp_sdu_reader() return (void *) 0; } +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +static void * ipcp_udp_sdu_loop(void * o) +{ + while (true) { + struct rb_entry * e; + int fd; + int len = 0; + char * buf; + + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return (void *) 0; + } + + rw_lock_unlock(&_ipcp->state_lock); + + rw_lock_rdlock(&_ap_instance->data_lock); + + e = shm_ap_rbuff_read(_ap_instance->rb); + + if (e == NULL) { + rw_lock_unlock(&_ap_instance->data_lock); + continue; + } + + len = shm_du_map_read_sdu((uint8_t **) &buf, + _ap_instance->dum, + e->index); + if (len == -1) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + fd = port_id_to_fd(e->port_id); + + if (fd == -1) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + if (len == 0) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + rw_lock_unlock(&_ap_instance->data_lock); + + if (send(fd, buf, len, 0) < 0) + LOG_ERR("Failed to send SDU."); + + rw_lock_rdlock(&_ap_instance->data_lock); + + shm_release_du_buff(_ap_instance->dum, e->index); + + rw_lock_unlock(&_ap_instance->data_lock); + + free(e); + } + + return (void *) 1; +} + static int ipcp_udp_bootstrap(struct dif_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - int enable = 1; + int enable = 1; + int fd = -1; if (conf->type != THIS_TYPE) { LOG_ERR("Config doesn't match IPCP type."); return -1; } + rw_lock_wrlock(&_ipcp->state_lock); + if (_ipcp->state != IPCP_INIT) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("IPCP in wrong state."); return -1; } + _ipcp->state = IPCP_BOOTSTRAPPING; + + rw_lock_unlock(&_ipcp->state_lock); + if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, @@ -409,37 +591,36 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) strcpy(dnsstr, "not set"); } - shim_data(_ipcp)->ip_addr = conf->ip_addr; - shim_data(_ipcp)->dns_addr = conf->dns_addr; - /* UDP listen server */ - - if ((shim_data(_ipcp)->s_fd = - socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - LOG_DBGF("Can't create socket."); + if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { + LOG_ERR("Can't create socket."); return -1; } - if (setsockopt(shim_data(_ipcp)->s_fd, + if (setsockopt(fd, SOL_SOCKET, - SO_REUSEADDR, - &enable, + SO_REUSEADDR, + &enable, sizeof(int)) < 0) { - LOG_DBGF("Setsockopt(SO_REUSEADDR) failed."); + LOG_WARN("Setsockopt(SO_REUSEADDR) failed."); } + shim_data(_ipcp)->s_fd = fd; + shim_data(_ipcp)->ip_addr = conf->ip_addr; + shim_data(_ipcp)->dns_addr = conf->dns_addr; + shim_data(_ipcp)->s_saddr.sin_family = AF_INET; shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr; shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT; - if (bind(shim_data(_ipcp)->s_fd, + if (bind(fd, (struct sockaddr *) &shim_data(_ipcp)->s_saddr, sizeof shim_data(_ipcp)->s_saddr ) < 0) { LOG_ERR("Couldn't bind to %s.", ipstr); return -1; } - FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_wrlock(&_ap_instance->thread_lock); pthread_create(&_ap_instance->handler, NULL, @@ -450,8 +631,25 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) ipcp_udp_sdu_reader, NULL); + pthread_create(&_ap_instance->sduloop, + NULL, + ipcp_udp_sdu_loop, + NULL); + + rw_lock_unlock(&_ap_instance->thread_lock); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + + rw_lock_wrlock(&_ipcp->state_lock); + _ipcp->state = IPCP_ENROLLED; + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", getpid()); @@ -596,16 +794,21 @@ static int ipcp_udp_name_reg(char * name) uint32_t ip_addr; #endif - if (_ipcp->state != IPCP_ENROLLED) { - LOG_DBGF("Won't register with non-enrolled IPCP."); - return -1; - } - if (strlen(name) > 24) { LOG_ERR("DNS names cannot be longer than 24 chars."); return -1; } + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Won't register with non-enrolled IPCP."); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(_ipcp->data, name)) { LOG_ERR("Failed to add %s to local registry.", name); return -1; @@ -660,6 +863,16 @@ static int ipcp_udp_name_unreg(char * name) #ifdef CONFIG_OUROBOROS_ENABLE_DNS /* unregister application with DNS server */ + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("IPCP is not enrolled"); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + dns_addr = shim_data(_ipcp)->dns_addr; if (dns_addr != 0) { if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) @@ -698,10 +911,21 @@ static int ipcp_udp_flow_alloc(int port_id, #ifdef CONFIG_OUROBOROS_ENABLE_DNS uint32_t dns_addr = 0; #endif + struct shm_ap_rbuff * rb; if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { @@ -727,6 +951,7 @@ static int ipcp_udp_flow_alloc(int port_id, #ifdef CONFIG_OUROBOROS_ENABLE_DNS dns_addr = shim_data(_ipcp)->dns_addr; + if (dns_addr != 0) { ip_addr = ddns_resolve(dst_name, dns_addr); if (ip_addr == 0) { @@ -790,35 +1015,45 @@ static int ipcp_udp_flow_alloc(int port_id, free(recv_buf); - _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); - if (_ap_instance->flows[fd].rb == NULL) { + rb = shm_ap_rbuff_open(n_pid); + if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); close(fd); - return -1; + return -1; /* -ENORBUFF */ } + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = rb; + + rw_lock_unlock(&_ap_instance->flows_lock); /* tell IRMd that flow allocation "worked" */ if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { + shm_ap_rbuff_close(rb); LOG_ERR("Failed to notify IRMd about flow allocation reply"); close(fd); - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); return -1; } + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd); return fd; } static int ipcp_udp_flow_alloc_resp(int port_id, - pid_t n_pid, - int response) + pid_t n_pid, + int response) { + struct shm_ap_rbuff * rb; int fd = port_id_to_fd(port_id); if (fd < 0) { LOG_DBGF("Could not find flow with port_id %d.", port_id); @@ -830,22 +1065,44 @@ static int ipcp_udp_flow_alloc_resp(int port_id, /* awaken pending flow */ + rw_lock_rdlock(&_ap_instance->flows_lock); + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + rw_lock_unlock(&_ap_instance->flows_lock); + LOG_DBGF("Flow was not pending."); return -1; } - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); - if (_ap_instance->flows[fd].rb == NULL) { + rw_lock_unlock(&_ap_instance->flows_lock); + + rb = shm_ap_rbuff_open(n_pid); + if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); + + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].state = FLOW_NULL; _ap_instance->flows[fd].port_id = 0; + + rw_lock_unlock(&_ap_instance->flows_lock); + return 0; } + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = rb; + + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); return 0; @@ -854,18 +1111,33 @@ static int ipcp_udp_flow_alloc_resp(int port_id, static int ipcp_udp_flow_dealloc(int port_id) { int fd = port_id_to_fd(port_id); + struct shm_ap_rbuff * rb; + if (fd < 0) { LOG_DBGF("Could not find flow with port_id %d.", port_id); return 0; } + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].state = FLOW_NULL; _ap_instance->flows[fd].port_id = 0; - if (_ap_instance->flows[fd].rb != NULL) - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + rb = _ap_instance->flows[fd].rb; + _ap_instance->flows[fd].rb = NULL; + + rw_lock_unlock(&_ap_instance->flows_lock); + + if (rb != NULL) + shm_ap_rbuff_close(rb); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + close(fd); + return 0; } @@ -878,7 +1150,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name) if (shim_ap_init(ap_name) < 0) return NULL; - i = malloc(sizeof *i); + i = ipcp_instance_create(); if (i == NULL) return NULL; @@ -915,45 +1187,17 @@ static struct ipcp * ipcp_udp_create(char * ap_name) #ifndef MAKE_CHECK -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ -/* FIXME: stop eating the CPU */ -static void * ipcp_udp_sdu_loop(void * o) -{ - while (true) { - struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); - int fd; - int len = 0; - char * buf; - if (e == NULL) - continue; - - len = shm_du_map_read_sdu((uint8_t **) &buf, - _ap_instance->dum, - e->index); - if (len == -1) - continue; - - fd = port_id_to_fd(e->port_id); - - if (fd == -1) - continue; - - if (len == 0) - continue; - - send(fd, buf, len, 0); - - shm_release_du_buff(_ap_instance->dum, e->index); - } - - return (void *) 1; -} - int main (int argc, char * argv[]) { /* argument 1: pid of irmd ? */ /* argument 2: ap name */ struct sigaction sig_act; + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + sigaddset(&sigset, SIGQUIT); + sigaddset(&sigset, SIGHUP); + sigaddset(&sigset, SIGPIPE); if (ipcp_arg_check(argc, argv)) { LOG_ERR("Wrong arguments."); @@ -981,13 +1225,19 @@ int main (int argc, char * argv[]) exit(1); } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); - pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); - pthread_join(_ap_instance->sduloop, NULL); + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + pthread_join(_ap_instance->mainloop, NULL); - pthread_join(_ap_instance->handler, NULL); - pthread_join(_ap_instance->sdu_reader, NULL); + + shim_ap_fini(); + + free(_ipcp->data); + free(_ipcp->ops); + free(_ipcp); exit(0); } diff --git a/src/lib/dev.c b/src/lib/dev.c index ae27a05f..440f40f9 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -31,6 +31,7 @@ #include <ouroboros/shm_du_map.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/utils.h> +#include <ouroboros/rw_lock.h> #include <stdlib.h> #include <string.h> @@ -47,9 +48,11 @@ struct ap_data { instance_name_t * api; struct shm_du_map * dum; struct bmp * fds; - struct shm_ap_rbuff * rb; + rw_lock_t data_lock; + struct flow flows[AP_MAX_FLOWS]; + rw_lock_t flows_lock; } * _ap_instance; int ap_init(char * ap_name) @@ -92,14 +95,19 @@ int ap_init(char * ap_name) _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { instance_name_destroy(_ap_instance->api); + shm_du_map_close(_ap_instance->dum); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; } - for (i = 0; i < AP_MAX_FLOWS; ++i) + for (i = 0; i < AP_MAX_FLOWS; ++i) { _ap_instance->flows[i].rb = NULL; + _ap_instance->flows[i].port_id = -1; + } + rw_lock_init(&_ap_instance->flows_lock); + rw_lock_init(&_ap_instance->data_lock); return 0; } @@ -110,6 +118,9 @@ void ap_fini(void) if (_ap_instance == NULL) return; + + rw_lock_wrlock(&_ap_instance->data_lock); + if (_ap_instance->api != NULL) instance_name_destroy(_ap_instance->api); if (_ap_instance->fds != NULL) @@ -122,6 +133,8 @@ void ap_fini(void) if (_ap_instance->flows[i].rb != NULL) shm_ap_rbuff_close(_ap_instance->flows[i].rb); + rw_lock_unlock(&_ap_instance->data_lock); + free(_ap_instance); } @@ -142,7 +155,7 @@ int ap_reg(char ** difs, { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = bmp_allocate(_ap_instance->fds); + int fd = -1; if (difs == NULL || len == 0 || @@ -157,11 +170,16 @@ int ap_reg(char ** difs, msg.code = IRM_MSG_CODE__IRM_AP_REG; msg.has_pid = true; - msg.pid = _ap_instance->api->id; - msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; msg.n_dif_name = len; + rw_lock_rdlock(&_ap_instance->data_lock); + + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; + + rw_lock_unlock(&_ap_instance->data_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -176,6 +194,12 @@ int ap_reg(char ** difs, irm_msg__free_unpacked(recv_msg, NULL); + rw_lock_wrlock(&_ap_instance->data_lock); + + fd = bmp_allocate(_ap_instance->fds); + + rw_lock_unlock(&_ap_instance->data_lock); + return fd; } @@ -194,11 +218,16 @@ int ap_unreg(char ** difs, msg.code = IRM_MSG_CODE__IRM_AP_UNREG; msg.has_pid = true; - msg.pid = _ap_instance->api->id; - msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; msg.n_dif_name = len; + rw_lock_rdlock(&_ap_instance->data_lock); + + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; + + rw_lock_unlock(&_ap_instance->data_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -224,8 +253,13 @@ int flow_accept(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; + + rw_lock_rdlock(&_ap_instance->data_lock); + msg.pid = _ap_instance->api->id; + rw_lock_unlock(&_ap_instance->data_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -235,18 +269,8 @@ int flow_accept(int fd, return -1; } - cfd = bmp_allocate(_ap_instance->fds); - - _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); - if (_ap_instance->flows[cfd].rb == NULL) { - bmp_release(_ap_instance->fds, cfd); - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - *ap_name = strdup(recv_msg->ap_name); if (*ap_name == NULL) { - bmp_release(_ap_instance->fds, cfd); irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -254,21 +278,46 @@ int flow_accept(int fd, if (ae_name != NULL) { *ae_name = strdup(recv_msg->ae_name); if (*ae_name == NULL) { - bmp_release(_ap_instance->fds, cfd); irm_msg__free_unpacked(recv_msg, NULL); return -1; } } + rw_lock_wrlock(&_ap_instance->data_lock); + + cfd = bmp_allocate(_ap_instance->fds); + + rw_lock_unlock(&_ap_instance->data_lock); + + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[cfd].rb == NULL) { + rw_lock_wrlock(&_ap_instance->data_lock); + + bmp_release(_ap_instance->fds, cfd); + + rw_lock_unlock(&_ap_instance->data_lock); + + irm_msg__free_unpacked(recv_msg, NULL); + + rw_lock_unlock(&_ap_instance->flows_lock); + return -1; + } + _ap_instance->flows[cfd].port_id = recv_msg->port_id; _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT; - + rw_lock_unlock(&_ap_instance->flows_lock); irm_msg__free_unpacked(recv_msg, NULL); + rw_lock_wrlock(&_ap_instance->data_lock); + bmp_release(_ap_instance->fds, fd); + rw_lock_unlock(&_ap_instance->data_lock); + return cfd; } @@ -281,9 +330,21 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_pid = true; + + rw_lock_rdlock(&_ap_instance->data_lock); + msg.pid = _ap_instance->api->id; + + rw_lock_unlock(&_ap_instance->data_lock); + msg.has_port_id = true; + + rw_lock_rdlock(&_ap_instance->flows_lock); + msg.port_id = _ap_instance->flows[fd].port_id; + + rw_lock_unlock(&_ap_instance->flows_lock); + msg.has_response = true; msg.response = response; @@ -318,10 +379,15 @@ int flow_alloc(char * dst_name, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = dst_name; - msg.ap_name = _ap_instance->api->name; + msg.ae_name = src_ae_name; msg.has_pid = true; + + rw_lock_rdlock(&_ap_instance->data_lock); + msg.pid = _ap_instance->api->id; - msg.ae_name = src_ae_name; + msg.ap_name = _ap_instance->api->name; + + rw_lock_unlock(&_ap_instance->data_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) @@ -332,11 +398,23 @@ int flow_alloc(char * dst_name, return -1; } + rw_lock_wrlock(&_ap_instance->data_lock); + fd = bmp_allocate(_ap_instance->fds); + rw_lock_unlock(&_ap_instance->data_lock); + + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); if (_ap_instance->flows[fd].rb == NULL) { + rw_lock_wrlock(&_ap_instance->data_lock); + bmp_release(_ap_instance->fds, fd); + + rw_lock_unlock(&_ap_instance->data_lock); + + rw_lock_unlock(&_ap_instance->flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -344,6 +422,8 @@ int flow_alloc(char * dst_name, _ap_instance->flows[fd].port_id = recv_msg->port_id; _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + rw_lock_unlock(&_ap_instance->flows_lock); + irm_msg__free_unpacked(recv_msg, NULL); return fd; @@ -357,8 +437,13 @@ int flow_alloc_res(int fd) msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; msg.has_port_id = true; + + rw_lock_rdlock(&_ap_instance->flows_lock); + msg.port_id = _ap_instance->flows[fd].port_id; + rw_lock_unlock(&_ap_instance->flows_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -382,8 +467,14 @@ int flow_dealloc(int fd) msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; msg.has_port_id = true; + + + rw_lock_rdlock(&_ap_instance->data_lock); + msg.port_id = _ap_instance->flows[fd].port_id; + rw_lock_unlock(&_ap_instance->data_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -401,42 +492,77 @@ int flow_dealloc(int fd) int flow_cntl(int fd, int cmd, int oflags) { - int old = _ap_instance->flows[fd].oflags; + int old; + + rw_lock_wrlock(&_ap_instance->flows_lock); + + old = _ap_instance->flows[fd].oflags; + switch (cmd) { case FLOW_F_GETFL: /* GET FLOW FLAGS */ - return _ap_instance->flows[fd].oflags; + rw_lock_unlock(&_ap_instance->flows_lock); + return old; case FLOW_F_SETFL: /* SET FLOW FLAGS */ _ap_instance->flows[fd].oflags = oflags; + rw_lock_unlock(&_ap_instance->flows_lock); return old; default: + rw_lock_unlock(&_ap_instance->flows_lock); return FLOW_O_INVALID; /* unknown command */ } } ssize_t flow_write(int fd, void * buf, size_t count) { - size_t index = shm_create_du_buff(_ap_instance->dum, - count + DU_BUFF_HEADSPACE + - DU_BUFF_TAILSPACE, - DU_BUFF_HEADSPACE, - (uint8_t *) buf, - count); - struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; - if (index == -1) + size_t index; + struct rb_entry e; + + if (buf == NULL) + return 0; + + rw_lock_rdlock(&_ap_instance->data_lock); + + index = shm_create_du_buff(_ap_instance->dum, + count + DU_BUFF_HEADSPACE + + DU_BUFF_TAILSPACE, + DU_BUFF_HEADSPACE, + (uint8_t *) buf, + count); + if (index == -1) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } + + rw_lock_rdlock(&_ap_instance->flows_lock); + + e.index = index; + e.port_id = _ap_instance->flows[fd].port_id; if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { shm_release_du_buff(_ap_instance->dum, index); + + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + return -EPIPE; } + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + return 0; } else { while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) - ; + LOG_DBGF("Couldn't write to rbuff."); } + rw_lock_unlock(&_ap_instance->data_lock); + + rw_lock_unlock(&_ap_instance->flows_lock); + return 0; } @@ -446,27 +572,41 @@ ssize_t flow_read(int fd, void * buf, size_t count) int n; uint8_t * sdu; + rw_lock_rdlock(&_ap_instance->data_lock); + + rw_lock_rdlock(&_ap_instance->flows_lock); + if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { e = shm_ap_rbuff_read(_ap_instance->rb); } else { - /* FIXME: move this to a thread */ + + /* FIXME: this will throw away packets for other fd's */ while (e == NULL || - e->port_id != _ap_instance->flows[fd].port_id) + e->port_id != _ap_instance->flows[fd].port_id) { e = shm_ap_rbuff_read(_ap_instance->rb); + } } - if (e == NULL) + rw_lock_unlock(&_ap_instance->flows_lock); + + if (e == NULL) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } n = shm_du_map_read_sdu(&sdu, _ap_instance->dum, e->index); - if (n < 0) + if (n < 0) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } memcpy(buf, sdu, MIN(n, count)); shm_release_du_buff(_ap_instance->dum, e->index); + rw_lock_unlock(&_ap_instance->data_lock); + return n; } diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6c04ccc5..da6f0e33 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -253,8 +253,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) } e = malloc(sizeof(*e)); - if (e == NULL) + if (e == NULL) { + pthread_mutex_unlock(rb->shm_mutex); return NULL; + } *e = *(rb->shm_base + *rb->ptr_tail); |