diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-05-15 22:42:47 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-05-15 22:42:47 +0200 |
commit | 3f5b31d49139968a84c42d5d3067d06edefa3aae (patch) | |
tree | f860fa136bf8fcf4ee89881ac41bd39f6c7cd311 /src/ipcpd/shim-udp | |
parent | 8f79d80e7fe7f52f310edddc73589f4f71457747 (diff) | |
download | ouroboros-3f5b31d49139968a84c42d5d3067d06edefa3aae.tar.gz ouroboros-3f5b31d49139968a84c42d5d3067d06edefa3aae.zip |
ipcpd: shim-udp: Revised locking
Simplified locking to take only two locks: the first lock guards the
state of the ipcp. This lock must be held for writing on bootstrap and
closing, and held for reading during all other operations. The second
lock guards operations on flows, and must be held for writing during
allocation and deallocation, and held for reading when sending sdu's.
After adding a fd to FD_SET, the shim will wait for 1 ms to ensure
that the FD is added to the select call.
Diffstat (limited to 'src/ipcpd/shim-udp')
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 191 |
1 files changed, 97 insertions, 94 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 74fa0d2b..300a5748 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -50,6 +50,7 @@ #include <stdlib.h> #include <pthread.h> #include <sys/wait.h> +#include <fcntl.h> #define THIS_TYPE IPCP_SHIM_UDP #define LISTEN_PORT htons(0x0D1F) @@ -152,8 +153,6 @@ static int shim_ap_init(char * ap_name) } rw_lock_init(&_ap_instance->flows_lock); - rw_lock_init(&_ap_instance->thread_lock); - rw_lock_init(&_ap_instance->data_lock); return 0; } @@ -165,7 +164,10 @@ void shim_ap_fini() if (_ap_instance == NULL) return; - rw_lock_wrlock(&_ap_instance->data_lock); + rw_lock_wrlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_SHUTDOWN) + LOG_WARN("Cleaning up AP while not in shutdown."); if (_ap_instance->api != NULL) instance_name_destroy(_ap_instance->api); @@ -184,9 +186,9 @@ void shim_ap_fini() rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ap_instance->data_lock); - free(_ap_instance); + + rw_lock_unlock(&_ipcp->state_lock); } /* only call this under flows_lock */ @@ -209,12 +211,12 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) size_t index; struct rb_entry e; - rw_lock_rdlock(&_ap_instance->data_lock); + rw_lock_rdlock(&_ipcp->state_lock); index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count); if (index == -1) { - rw_lock_unlock(&_ap_instance->data_lock); + rw_lock_unlock(&_ipcp->state_lock); return -1; } @@ -226,17 +228,13 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { rw_lock_unlock(&_ap_instance->flows_lock); - shm_release_du_buff(_ap_instance->dum, index); - - rw_lock_unlock(&_ap_instance->data_lock); - + rw_lock_unlock(&_ipcp->state_lock); return -EPIPE; } rw_lock_unlock(&_ap_instance->flows_lock); - - rw_lock_unlock(&_ap_instance->data_lock); + rw_lock_unlock(&_ipcp->state_lock); return 0; } @@ -256,8 +254,8 @@ struct ipcp_udp_data { struct sockaddr_in s_saddr; int s_fd; + /* only modify under _ap_instance->flows_lock */ fd_set flow_fd_s; - rw_lock_t fd_lock; }; struct ipcp_udp_data * ipcp_udp_data_create() @@ -279,8 +277,6 @@ struct ipcp_udp_data * ipcp_udp_data_create() return NULL; } - rw_lock_init(&udp_data->fd_lock); - FD_ZERO(&udp_data->flow_fd_s); return udp_data; @@ -309,10 +305,6 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) clean_threads = true; } - _ipcp->state = IPCP_SHUTDOWN; - - rw_lock_unlock(&_ipcp->state_lock); - if (clean_threads) { rw_lock_wrlock(&_ap_instance->thread_lock); @@ -329,6 +321,10 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) pthread_cancel(_ap_instance->mainloop); + _ipcp->state = IPCP_SHUTDOWN; + + rw_lock_unlock(&_ipcp->state_lock); + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); } @@ -349,18 +345,25 @@ static void * ipcp_udp_listener() while (true) { int fd; int port_id; + + rw_lock_rdlock(&_ipcp->state_lock); + memset(&buf, 0, SHIM_UDP_BUF_SIZE); n = sizeof c_saddr; n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, (struct sockaddr *) &c_saddr, (unsigned *) &n); - if (n < 0) + if (n < 0) { + rw_lock_unlock(&_ipcp->state_lock); continue; + } /* flow alloc request from other host */ if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr, sizeof(c_saddr.sin_addr.s_addr), AF_INET) - == NULL) + == NULL) { + rw_lock_unlock(&_ipcp->state_lock); continue; + } fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); @@ -383,12 +386,14 @@ static void * ipcp_udp_listener() if (connect(fd, (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { + rw_lock_unlock(&_ipcp->state_lock); close(fd); continue; } /* echo back the packet */ if (send(fd, buf, strlen(buf), 0) < 0) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to echo back the packet."); close(fd); continue; @@ -403,9 +408,10 @@ static void * ipcp_udp_listener() UNKNOWN_AE); if (port_id < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Could not get port id from IRMd"); close(fd); - rw_lock_unlock(&_ap_instance->flows_lock); continue; } @@ -414,6 +420,7 @@ static void * ipcp_udp_listener() _ap_instance->flows[fd].state = FLOW_PENDING; rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", port_id, fd); @@ -427,9 +434,10 @@ static void * ipcp_udp_sdu_reader() int n; int fd; char buf[SHIM_UDP_MAX_SDU_SIZE]; - struct timeval tv = {0, 1000}; + struct timeval tv = {0, 750}; struct sockaddr_in r_saddr; fd_set read_fds; + int flags; while (true) { rw_lock_rdlock(&_ipcp->state_lock); @@ -439,20 +447,21 @@ static void * ipcp_udp_sdu_reader() return (void *) 0; } - rw_lock_unlock(&_ipcp->state_lock); - - rw_lock_rdlock(&shim_data(_ipcp)->fd_lock); + rw_lock_rdlock(&_ap_instance->flows_lock); read_fds = shim_data(_ipcp)->flow_fd_s; - rw_lock_unlock(&shim_data(_ipcp)->fd_lock); - - if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) + if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); continue; + } for (fd = 0; fd < FD_SETSIZE; ++fd) { if (!FD_ISSET(fd, &read_fds)) continue; + flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); n = sizeof r_saddr; if ((n = recvfrom(fd, @@ -467,6 +476,9 @@ static void * ipcp_udp_sdu_reader() if (ipcp_udp_flow_write(fd, buf, n) < 0) LOG_ERR("Failed to write SDU."); } + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); } return (void *) 0; @@ -488,14 +500,10 @@ static void * ipcp_udp_sdu_loop(void * o) return (void *) 0; } - rw_lock_unlock(&_ipcp->state_lock); - - rw_lock_rdlock(&_ap_instance->data_lock); - e = shm_ap_rbuff_read(_ap_instance->rb); if (e == NULL) { - rw_lock_unlock(&_ap_instance->data_lock); + rw_lock_rdlock(&_ipcp->state_lock); continue; } @@ -503,7 +511,7 @@ static void * ipcp_udp_sdu_loop(void * o) _ap_instance->dum, e->index); if (len == -1) { - rw_lock_unlock(&_ap_instance->data_lock); + rw_lock_rdlock(&_ipcp->state_lock); free(e); continue; } @@ -512,30 +520,27 @@ static void * ipcp_udp_sdu_loop(void * o) fd = port_id_to_fd(e->port_id); - rw_lock_unlock(&_ap_instance->flows_lock); - if (fd == -1) { - rw_lock_unlock(&_ap_instance->data_lock); + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); free(e); continue; } if (len == 0) { - rw_lock_unlock(&_ap_instance->data_lock); + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); free(e); continue; } - rw_lock_unlock(&_ap_instance->data_lock); - if (send(fd, buf, len, 0) < 0) LOG_ERR("Failed to send SDU."); - rw_lock_rdlock(&_ap_instance->data_lock); - shm_release_du_buff(_ap_instance->dum, e->index); - rw_lock_unlock(&_ap_instance->data_lock); + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); free(e); } @@ -563,14 +568,11 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) return -1; } - _ipcp->state = IPCP_BOOTSTRAPPING; - - rw_lock_unlock(&_ipcp->state_lock); - if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, INET_ADDRSTRLEN) == NULL) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to convert IP address"); return -1; } @@ -580,6 +582,7 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) &conf->dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to convert DNS address"); return -1; } @@ -592,6 +595,7 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) /* UDP listen server */ if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Can't create socket."); return -1; } @@ -615,11 +619,12 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) if (bind(fd, (struct sockaddr *) &shim_data(_ipcp)->s_saddr, sizeof shim_data(_ipcp)->s_saddr ) < 0) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Couldn't bind to %s.", ipstr); return -1; } - rw_lock_wrlock(&_ap_instance->thread_lock); + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); pthread_create(&_ap_instance->handler, NULL, @@ -635,16 +640,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) ipcp_udp_sdu_loop, NULL); - rw_lock_unlock(&_ap_instance->thread_lock); - - rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); - - FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); - - rw_lock_unlock(&shim_data(_ipcp)->fd_lock); - - rw_lock_wrlock(&_ipcp->state_lock); - _ipcp->state = IPCP_ENROLLED; rw_lock_unlock(&_ipcp->state_lock); @@ -806,9 +801,8 @@ static int ipcp_udp_name_reg(char * name) return -1; /* -ENOTENROLLED */ } - rw_lock_unlock(&_ipcp->state_lock); - if (ipcp_data_add_reg_entry(_ipcp->data, name)) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to add %s to local registry.", name); return -1; } @@ -822,11 +816,13 @@ static int ipcp_udp_name_reg(char * name) if (inet_ntop(AF_INET, &ip_addr, ipstr, INET_ADDRSTRLEN) == NULL) { + rw_lock_unlock(&_ipcp->state_lock); return -1; } if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { + rw_lock_unlock(&_ipcp->state_lock); return -1; } @@ -835,11 +831,14 @@ static int ipcp_udp_name_reg(char * name) if (ddns_send(cmd)) { ipcp_data_del_reg_entry(_ipcp->data, name); + rw_lock_unlock(&_ipcp->state_lock); return -1; } } #endif + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBG("Registered %s.", name); return 0; @@ -870,12 +869,11 @@ static int ipcp_udp_name_unreg(char * name) return -1; /* -ENOTENROLLED */ } - rw_lock_unlock(&_ipcp->state_lock); - dns_addr = shim_data(_ipcp)->dns_addr; if (dns_addr != 0) { if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { + rw_lock_unlock(&_ipcp->state_lock); return -1; } sprintf(cmd, "server %s\nupdate delete %s A\nsend\nquit\n", @@ -887,6 +885,8 @@ static int ipcp_udp_name_unreg(char * name) ipcp_data_del_reg_entry(_ipcp->data, name); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBG("Unregistered %s.", name); return 0; @@ -911,6 +911,7 @@ static int ipcp_udp_flow_alloc(int port_id, uint32_t dns_addr = 0; #endif struct shm_ap_rbuff * rb; + struct timespec wait = {0, 1000000}; if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; @@ -923,11 +924,10 @@ static int ipcp_udp_flow_alloc(int port_id, return -1; /* -ENOTENROLLED */ } - rw_lock_unlock(&_ipcp->state_lock); - if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Name too long for this shim."); return -1; } @@ -944,6 +944,7 @@ static int ipcp_udp_flow_alloc(int port_id, l_saddr.sin_port = 0; if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { + rw_lock_unlock(&_ipcp->state_lock); close(fd); return -1; } @@ -954,6 +955,7 @@ static int ipcp_udp_flow_alloc(int port_id, if (dns_addr != 0) { ip_addr = ddns_resolve(dst_name, dns_addr); if (ip_addr == 0) { + rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Could not resolve %s.", dst_name); close(fd); return -1; @@ -962,6 +964,7 @@ static int ipcp_udp_flow_alloc(int port_id, #endif h = gethostbyname(dst_name); if (h == NULL) { + rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Could not resolve %s.", dst_name); close(fd); return -1; @@ -979,6 +982,7 @@ static int ipcp_udp_flow_alloc(int port_id, if (sendto(fd, dst_name, strlen(dst_name), 0, (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to send packet"); close(fd); return -1; @@ -988,6 +992,7 @@ static int ipcp_udp_flow_alloc(int port_id, recv_buf = malloc(strlen(dst_name) + 1); if (recv_buf == NULL) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to malloc recv_buff."); close(fd); return -1; @@ -1004,6 +1009,7 @@ static int ipcp_udp_flow_alloc(int port_id, (struct sockaddr *) &rf_saddr, sizeof rf_saddr) < 0) { + rw_lock_unlock(&_ipcp->state_lock); close(fd); free(recv_buf); return -1; @@ -1016,6 +1022,7 @@ static int ipcp_udp_flow_alloc(int port_id, rb = shm_ap_rbuff_open(n_pid); if (rb == NULL) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Could not open N + 1 ringbuffer."); close(fd); return -1; /* -ENORBUFF */ @@ -1026,22 +1033,23 @@ static int ipcp_udp_flow_alloc(int port_id, _ap_instance->flows[fd].state = FLOW_ALLOCATED; _ap_instance->flows[fd].rb = rb; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + + nanosleep(&wait, NULL); + rw_lock_unlock(&_ap_instance->flows_lock); /* tell IRMd that flow allocation "worked" */ if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { + rw_lock_unlock(&_ipcp->state_lock); shm_ap_rbuff_close(rb); LOG_ERR("Failed to notify IRMd about flow allocation reply"); close(fd); return -1; } - rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); - - FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - - rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + rw_lock_unlock(&_ipcp->state_lock); LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd); @@ -1053,58 +1061,52 @@ static int ipcp_udp_flow_alloc_resp(int port_id, int response) { struct shm_ap_rbuff * rb; + struct timespec wait = {0, 1000000}; int fd = -1; if (response) return 0; + rw_lock_unlock(&_ipcp->state_lock); + /* awaken pending flow */ - rw_lock_rdlock(&_ap_instance->flows_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); fd = port_id_to_fd(port_id); if (fd < 0) { rw_lock_unlock(&_ap_instance->flows_lock); - + rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Could not find flow with port_id %d.", port_id); return 0; } if (_ap_instance->flows[fd].state != FLOW_PENDING) { rw_lock_unlock(&_ap_instance->flows_lock); - + rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Flow was not pending."); return -1; } - rw_lock_unlock(&_ap_instance->flows_lock); - rb = shm_ap_rbuff_open(n_pid); if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); - - rw_lock_wrlock(&_ap_instance->flows_lock); - _ap_instance->flows[fd].state = FLOW_NULL; _ap_instance->flows[fd].port_id = 0; - rw_lock_unlock(&_ap_instance->flows_lock); - + rw_lock_unlock(&_ipcp->state_lock); return 0; } - rw_lock_wrlock(&_ap_instance->flows_lock); - _ap_instance->flows[fd].state = FLOW_ALLOCATED; _ap_instance->flows[fd].rb = rb; - rw_lock_unlock(&_ap_instance->flows_lock); - - rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); - FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + nanosleep(&wait, NULL); + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); @@ -1115,13 +1117,15 @@ static int ipcp_udp_flow_dealloc(int port_id) { int fd = -1; struct shm_ap_rbuff * rb; + struct timespec wait = {0, 1000000}; + rw_lock_rdlock(&_ipcp->state_lock); rw_lock_wrlock(&_ap_instance->flows_lock); fd = port_id_to_fd(port_id); if (fd < 0) { rw_lock_unlock(&_ap_instance->flows_lock); - + rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Could not find flow with port_id %d.", port_id); return 0; } @@ -1131,19 +1135,18 @@ static int ipcp_udp_flow_dealloc(int port_id) rb = _ap_instance->flows[fd].rb; _ap_instance->flows[fd].rb = NULL; - rw_lock_unlock(&_ap_instance->flows_lock); - if (rb != NULL) shm_ap_rbuff_close(rb); - rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); - FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); - rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + nanosleep(&wait, NULL); close(fd); + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + return 0; } |