From 037fec33cda726d0078e23798f462ad273153dd5 Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@intec.ugent.be>
Date: Sat, 14 May 2016 16:39:27 +0200
Subject: ipcpd: shim-udp: complete locking

Added necessary locks for the shim-udp.  This PR also improves thread
management, the main thread now starts a mainloop thread, which spawns
sdu handler threads when it the IPCP is enrolled. If the IPCP exits
the enrolled state, the sdu loop is cancelled.
---
 src/ipcpd/ipcp.c          |  34 +++-
 src/ipcpd/ipcp.h          |   6 +-
 src/ipcpd/shim-udp/main.c | 455 +++++++++++++++++++++++++++++++++++-----------
 src/lib/shm_ap_rbuff.c    |   9 +-
 4 files changed, 390 insertions(+), 114 deletions(-)

diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 13632a80..060178bf 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -29,6 +29,22 @@
 #define OUROBOROS_PREFIX "ipcpd/ipcp"
 #include <ouroboros/logs.h>
 
+struct ipcp * ipcp_instance_create()
+{
+        struct ipcp * i = malloc(sizeof *i);
+        if (i == NULL)
+                return NULL;
+
+        i->data    = NULL;
+        i->ops     = NULL;
+        i->irmd_fd = -1;
+        i->state   = IPCP_INIT;
+
+        rw_lock_init(&i->state_lock);
+
+        return i;
+}
+
 int ipcp_arg_check(int argc, char * argv[])
 {
         if (argc != 3)
@@ -52,25 +68,33 @@ void * ipcp_main_loop(void * o)
         uint8_t buf[IPCP_MSG_BUF_SIZE];
         struct ipcp * _ipcp = (struct ipcp *) o;
 
-        ipcp_msg_t *    msg;
-        ssize_t         count;
-        buffer_t        buffer;
-        ipcp_msg_t      ret_msg = IPCP_MSG__INIT;
+        ipcp_msg_t * msg;
+        ssize_t      count;
+        buffer_t     buffer;
+        ipcp_msg_t   ret_msg = IPCP_MSG__INIT;
 
         dif_config_msg_t * conf_msg;
         struct dif_config  conf;
 
+        char * sock_path;
+
         if (_ipcp == NULL) {
                 LOG_ERR("Invalid ipcp struct.");
                 return (void *) 1;
         }
 
-        sockfd = server_socket_open(ipcp_sock_path(getpid()));
+        sock_path = ipcp_sock_path(getpid());
+        if (sock_path == NULL)
+                return (void *) 1;
+
+        sockfd = server_socket_open(sock_path);
         if (sockfd < 0) {
                 LOG_ERR("Could not open server socket.");
                 return (void *) 1;
         }
 
+        free(sock_path);
+
         while (true) {
                 ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
 
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 393af994..c9002d4d 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -23,6 +23,8 @@
 #ifndef IPCPD_IPCP_H
 #define IPCPD_IPCP_H
 
+#include <ouroboros/rw_lock.h>
+
 #include "ipcp-ops.h"
 #include "ipcp-data.h"
 
@@ -38,11 +40,13 @@ enum ipcp_state {
 struct ipcp {
         struct ipcp_data * data;
         struct ipcp_ops *  ops;
+        int                irmd_fd;
 
         enum ipcp_state    state;
-        int                irmd_fd;
+        rw_lock_t          state_lock;
 };
 
+struct ipcp * ipcp_instance_create();
 void * ipcp_main_loop(void * o);
 void * ipcp_sdu_loop(void * o);
 int ipcp_arg_check(int argc, char * argv[]);
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 14a698ee..3296540e 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -33,6 +33,7 @@
 #include <ouroboros/sockets.h>
 #include <ouroboros/bitmap.h>
 #include <ouroboros/dev.h>
+#include <ouroboros/rw_lock.h>
 
 #define OUROBOROS_PREFIX "ipcpd/shim-udp"
 
@@ -81,18 +82,25 @@ struct shim_ap_data {
         instance_name_t *     api;
         struct shm_du_map *   dum;
         struct bmp *          fds;
-
         struct shm_ap_rbuff * rb;
+        rw_lock_t             data_lock;
+
         struct flow           flows[AP_MAX_FLOWS];
+        rw_lock_t             flows_lock;
+
+        pthread_t             mainloop;
+        pthread_t             sduloop;
+        pthread_t             handler;
+        pthread_t             sdu_reader;
+
+        rw_lock_t             thread_lock;
 
-        pthread_t mainloop;
-        pthread_t sduloop;
-        pthread_t handler;
-        pthread_t sdu_reader;
 } * _ap_instance;
 
 static int shim_ap_init(char * ap_name)
 {
+        int i;
+
         _ap_instance = malloc(sizeof(struct shim_ap_data));
         if (_ap_instance == NULL) {
                 return -1;
@@ -130,11 +138,22 @@ static int shim_ap_init(char * ap_name)
         _ap_instance->rb = shm_ap_rbuff_create();
         if (_ap_instance->rb == NULL) {
                 instance_name_destroy(_ap_instance->api);
+                shm_du_map_close(_ap_instance->dum);
                 bmp_destroy(_ap_instance->fds);
                 free(_ap_instance);
                 return -1;
         }
 
+        for (i = 0; i < AP_MAX_FLOWS; i ++) {
+                _ap_instance->flows[i].rb = NULL;
+                _ap_instance->flows[i].port_id = -1;
+                _ap_instance->flows[i].state = FLOW_NULL;
+        }
+
+        rw_lock_init(&_ap_instance->flows_lock);
+        rw_lock_init(&_ap_instance->thread_lock);
+        rw_lock_init(&_ap_instance->data_lock);
+
         return 0;
 }
 
@@ -144,6 +163,9 @@ void shim_ap_fini()
 
         if (_ap_instance == NULL)
                 return;
+
+        rw_lock_wrlock(&_ap_instance->data_lock);
+
         if (_ap_instance->api != NULL)
                 instance_name_destroy(_ap_instance->api);
         if (_ap_instance->fds != NULL)
@@ -152,41 +174,76 @@ void shim_ap_fini()
                 shm_du_map_close(_ap_instance->dum);
         if (_ap_instance->rb != NULL)
                 shm_ap_rbuff_destroy(_ap_instance->rb);
+
+        rw_lock_wrlock(&_ap_instance->flows_lock);
+
         for (i = 0; i < AP_MAX_FLOWS; i ++)
                 if (_ap_instance->flows[i].rb != NULL)
                         shm_ap_rbuff_close(_ap_instance->flows[i].rb);
 
+        rw_lock_unlock(&_ap_instance->flows_lock);
+
+        rw_lock_unlock(&_ap_instance->data_lock);
+
         free(_ap_instance);
 }
 
 static int port_id_to_fd(int port_id)
 {
         int i;
-        for (i = 0; i < AP_MAX_FLOWS; ++i)
+
+        rw_lock_rdlock(&_ap_instance->flows_lock);
+
+        for (i = 0; i < AP_MAX_FLOWS; ++i) {
                 if (_ap_instance->flows[i].port_id == port_id
-                        && _ap_instance->flows[i].state != FLOW_NULL)
+                    && _ap_instance->flows[i].state != FLOW_NULL) {
+
+                        rw_lock_unlock(&_ap_instance->flows_lock);
+
                         return i;
+                }
+        }
+
+        rw_lock_unlock(&_ap_instance->flows_lock);
+
         return -1;
 }
 
 static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
 {
         /* the AP chooses the amount of headspace and tailspace */
-        size_t index = shm_create_du_buff(_ap_instance->dum,
-                                          count,
-                                          0,
-                                          buf,
-                                          count);
-        struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
-
-        if (index == -1)
+        size_t index;
+        struct rb_entry e;
+
+        rw_lock_rdlock(&_ap_instance->data_lock);
+
+        index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count);
+
+        if (index == -1) {
+                rw_lock_unlock(&_ap_instance->data_lock);
                 return -1;
+        }
+
+        e.index = index;
+
+        rw_lock_rdlock(&_ap_instance->flows_lock);
+
+        e.port_id = _ap_instance->flows[fd].port_id;
 
         if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+                rw_lock_unlock(&_ap_instance->flows_lock);
+
                 shm_release_du_buff(_ap_instance->dum, index);
+
+                rw_lock_unlock(&_ap_instance->data_lock);
+
                 return -EPIPE;
         }
 
+        rw_lock_unlock(&_ap_instance->flows_lock);
+
+        rw_lock_unlock(&_ap_instance->data_lock);
+
         return 0;
 }
 
@@ -206,8 +263,7 @@ struct ipcp_udp_data {
         int                s_fd;
 
         fd_set flow_fd_s;
-
-        pthread_mutex_t lock;
+        rw_lock_t fd_lock;
 };
 
 struct ipcp_udp_data * ipcp_udp_data_create()
@@ -229,6 +285,8 @@ struct ipcp_udp_data * ipcp_udp_data_create()
                 return NULL;
         }
 
+        rw_lock_init(&udp_data->fd_lock);
+
         FD_ZERO(&udp_data->flow_fd_s);
 
         return udp_data;
@@ -236,21 +294,49 @@ struct ipcp_udp_data * ipcp_udp_data_create()
 
 void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
 {
+        sigset_t  sigset;
+        sigemptyset(&sigset);
+        sigaddset(&sigset, SIGINT);
+        bool clean_threads = false;
+
         switch(sig) {
         case SIGINT:
         case SIGTERM:
         case SIGHUP:
                 if (info->si_pid == irmd_pid || info->si_pid == 0) {
+                        pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
                         LOG_DBG("Terminating by order of %d. Bye.",
                                 info->si_pid);
+
+                        rw_lock_wrlock(&_ipcp->state_lock);
+
+                        if (_ipcp->state == IPCP_ENROLLED) {
+                                clean_threads = true;
+                        }
+
+                        _ipcp->state = IPCP_SHUTDOWN;
+
+                        rw_lock_unlock(&_ipcp->state_lock);
+
+                        if (clean_threads) {
+                                rw_lock_wrlock(&_ap_instance->thread_lock);
+
+                                pthread_cancel(_ap_instance->handler);
+                                pthread_cancel(_ap_instance->sdu_reader);
+                                pthread_cancel(_ap_instance->sduloop);
+
+                                pthread_join(_ap_instance->sduloop, NULL);
+                                pthread_join(_ap_instance->handler, NULL);
+                                pthread_join(_ap_instance->sdu_reader, NULL);
+
+                                rw_lock_unlock(&_ap_instance->thread_lock);
+                        }
+
                         pthread_cancel(_ap_instance->mainloop);
-                        pthread_cancel(_ap_instance->handler);
-                        pthread_cancel(_ap_instance->sdu_reader);
-                        pthread_cancel(_ap_instance->sduloop);
 
-                        /* FIXME: should be called after join */
-                        shim_ap_fini();
-                        exit(0);
+                        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+
                 }
         default:
                 return;
@@ -268,6 +354,7 @@ static void * ipcp_udp_listener()
 
         while (true) {
                 int fd;
+                int port_id;
                 memset(&buf, 0, SHIM_UDP_BUF_SIZE);
                 n = sizeof c_saddr;
                 n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,
@@ -315,21 +402,27 @@ static void * ipcp_udp_listener()
 
                 /* reply to IRM */
 
-                _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(),
-                                                                    buf,
-                                                                    UNKNOWN_AP,
-                                                                    UNKNOWN_AE);
-                if (_ap_instance->flows[fd].port_id < 0) {
+                port_id = ipcp_flow_req_arr(getpid(),
+                                            buf,
+                                            UNKNOWN_AP,
+                                            UNKNOWN_AE);
+
+                if (port_id < 0) {
                         LOG_ERR("Could not get port id from IRMd");
                         close(fd);
                         continue;
                 }
 
-                _ap_instance->flows[fd].rb     = NULL;
-                _ap_instance->flows[fd].state  = FLOW_PENDING;
+                rw_lock_wrlock(&_ap_instance->flows_lock);
+
+                _ap_instance->flows[fd].port_id = port_id;
+                _ap_instance->flows[fd].rb      = NULL;
+                _ap_instance->flows[fd].state   = FLOW_PENDING;
+
+                rw_lock_unlock(&_ap_instance->flows_lock);
 
                 LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.",
-                         _ap_instance->flows[fd].port_id, fd);
+                         port_id, fd);
         }
 
         return 0;
@@ -340,12 +433,26 @@ static void * ipcp_udp_sdu_reader()
         int n;
         int fd;
         char buf[SHIM_UDP_MAX_SDU_SIZE];
-        struct timeval tv = {0, 10};
+        struct timeval tv = {0, 1000};
         struct sockaddr_in r_saddr;
         fd_set read_fds;
 
         while (true) {
+                rw_lock_rdlock(&_ipcp->state_lock);
+
+                if (_ipcp->state != IPCP_ENROLLED) {
+                        rw_lock_unlock(&_ipcp->state_lock);
+                        return (void *) 0;
+                }
+
+                rw_lock_unlock(&_ipcp->state_lock);
+
+                rw_lock_rdlock(&shim_data(_ipcp)->fd_lock);
+
                 read_fds = shim_data(_ipcp)->flow_fd_s;
+
+                rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
                 if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)
                         continue;
 
@@ -369,22 +476,96 @@ static void * ipcp_udp_sdu_reader()
         return (void *) 0;
 }
 
+/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
+static void * ipcp_udp_sdu_loop(void * o)
+{
+        while (true) {
+                struct rb_entry * e;
+                int fd;
+                int len = 0;
+                char * buf;
+
+                rw_lock_rdlock(&_ipcp->state_lock);
+
+                if (_ipcp->state != IPCP_ENROLLED) {
+                        rw_lock_unlock(&_ipcp->state_lock);
+                        return (void *) 0;
+                }
+
+                rw_lock_unlock(&_ipcp->state_lock);
+
+                rw_lock_rdlock(&_ap_instance->data_lock);
+
+                e = shm_ap_rbuff_read(_ap_instance->rb);
+
+                if (e == NULL) {
+                        rw_lock_unlock(&_ap_instance->data_lock);
+                        continue;
+                }
+
+                len = shm_du_map_read_sdu((uint8_t **) &buf,
+                                          _ap_instance->dum,
+                                          e->index);
+                if (len == -1) {
+                        rw_lock_unlock(&_ap_instance->data_lock);
+                        free(e);
+                        continue;
+                }
+
+                fd = port_id_to_fd(e->port_id);
+
+                if (fd == -1) {
+                        rw_lock_unlock(&_ap_instance->data_lock);
+                        free(e);
+                        continue;
+                }
+
+                if (len == 0) {
+                        rw_lock_unlock(&_ap_instance->data_lock);
+                        free(e);
+                        continue;
+                }
+
+                rw_lock_unlock(&_ap_instance->data_lock);
+
+                send(fd, buf, len, 0);
+
+                rw_lock_rdlock(&_ap_instance->data_lock);
+
+                shm_release_du_buff(_ap_instance->dum, e->index);
+
+                rw_lock_unlock(&_ap_instance->data_lock);
+
+                free(e);
+        }
+
+        return (void *) 1;
+}
+
 static int ipcp_udp_bootstrap(struct dif_config * conf)
 {
         char ipstr[INET_ADDRSTRLEN];
         char dnsstr[INET_ADDRSTRLEN];
-        int enable = 1;
+        int  enable = 1;
+        int  fd = -1;
 
         if (conf->type != THIS_TYPE) {
                 LOG_ERR("Config doesn't match IPCP type.");
                 return -1;
         }
 
+        rw_lock_wrlock(&_ipcp->state_lock);
+
         if (_ipcp->state != IPCP_INIT) {
+                rw_lock_unlock(&_ipcp->state_lock);
                 LOG_ERR("IPCP in wrong state.");
                 return -1;
         }
 
+        _ipcp->state = IPCP_BOOTSTRAPPING;
+
+        rw_lock_unlock(&_ipcp->state_lock);
+
         if (inet_ntop(AF_INET,
                       &conf->ip_addr,
                       ipstr,
@@ -408,37 +589,36 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
                 strcpy(dnsstr, "not set");
         }
 
-        shim_data(_ipcp)->ip_addr  = conf->ip_addr;
-        shim_data(_ipcp)->dns_addr = conf->dns_addr;
-
         /* UDP listen server */
-
-        if ((shim_data(_ipcp)->s_fd =
-             socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
-                LOG_DBGF("Can't create socket.");
+        if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
+                LOG_ERR("Can't create socket.");
                 return -1;
         }
 
-        if (setsockopt(shim_data(_ipcp)->s_fd,
+        if (setsockopt(fd,
                        SOL_SOCKET,
-                        SO_REUSEADDR,
-                        &enable,
+                       SO_REUSEADDR,
+                       &enable,
                        sizeof(int)) < 0) {
-                LOG_DBGF("Setsockopt(SO_REUSEADDR) failed.");
+                LOG_WARN("Setsockopt(SO_REUSEADDR) failed.");
         }
 
+        shim_data(_ipcp)->s_fd     = fd;
+        shim_data(_ipcp)->ip_addr  = conf->ip_addr;
+        shim_data(_ipcp)->dns_addr = conf->dns_addr;
+
         shim_data(_ipcp)->s_saddr.sin_family      = AF_INET;
         shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr;
         shim_data(_ipcp)->s_saddr.sin_port        = LISTEN_PORT;
 
-        if (bind(shim_data(_ipcp)->s_fd,
+        if (bind(fd,
                  (struct sockaddr *) &shim_data(_ipcp)->s_saddr,
                  sizeof shim_data(_ipcp)->s_saddr ) < 0) {
                 LOG_ERR("Couldn't bind to %s.", ipstr);
                 return -1;
         }
 
-        FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+        rw_lock_wrlock(&_ap_instance->thread_lock);
 
         pthread_create(&_ap_instance->handler,
                        NULL,
@@ -449,8 +629,25 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
                        ipcp_udp_sdu_reader,
                        NULL);
 
+        pthread_create(&_ap_instance->sduloop,
+                       NULL,
+                       ipcp_udp_sdu_loop,
+                       NULL);
+
+        rw_lock_unlock(&_ap_instance->thread_lock);
+
+        rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
+        FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+
+        rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
+        rw_lock_wrlock(&_ipcp->state_lock);
+
         _ipcp->state = IPCP_ENROLLED;
 
+        rw_lock_unlock(&_ipcp->state_lock);
+
         LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.",
                 getpid());
 
@@ -595,16 +792,21 @@ static int ipcp_udp_name_reg(char * name)
         uint32_t ip_addr;
 #endif
 
-        if (_ipcp->state != IPCP_ENROLLED) {
-                LOG_DBGF("Won't register with non-enrolled IPCP.");
-                return -1;
-        }
-
         if (strlen(name) > 24) {
                 LOG_ERR("DNS names cannot be longer than 24 chars.");
                 return -1;
         }
 
+        rw_lock_rdlock(&_ipcp->state_lock);
+
+        if (_ipcp->state != IPCP_ENROLLED) {
+                rw_lock_unlock(&_ipcp->state_lock);
+                LOG_DBGF("Won't register with non-enrolled IPCP.");
+                return -1; /* -ENOTENROLLED */
+        }
+
+        rw_lock_unlock(&_ipcp->state_lock);
+
         if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
                 LOG_ERR("Failed to add %s to local registry.", name);
                 return -1;
@@ -659,6 +861,16 @@ static int ipcp_udp_name_unreg(char * name)
 #ifdef CONFIG_OUROBOROS_ENABLE_DNS
         /* unregister application with DNS server */
 
+        rw_lock_rdlock(&_ipcp->state_lock);
+
+        if (_ipcp->state != IPCP_ENROLLED) {
+                rw_lock_unlock(&_ipcp->state_lock);
+                LOG_DBGF("IPCP is not enrolled");
+                return -1; /* -ENOTENROLLED */
+        }
+
+        rw_lock_unlock(&_ipcp->state_lock);
+
         dns_addr = shim_data(_ipcp)->dns_addr;
         if (dns_addr != 0) {
                 if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN)
@@ -697,10 +909,21 @@ static int ipcp_udp_flow_alloc(int               port_id,
 #ifdef CONFIG_OUROBOROS_ENABLE_DNS
         uint32_t           dns_addr = 0;
 #endif
+        struct shm_ap_rbuff * rb;
 
         if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)
                 return -1;
 
+        rw_lock_rdlock(&_ipcp->state_lock);
+
+        if (_ipcp->state != IPCP_ENROLLED) {
+                rw_lock_unlock(&_ipcp->state_lock);
+                LOG_DBGF("Won't allocate flow with non-enrolled IPCP.");
+                return -1; /* -ENOTENROLLED */
+        }
+
+        rw_lock_unlock(&_ipcp->state_lock);
+
         if (strlen(dst_name) > 255
             || strlen(src_ap_name) > 255
             || strlen(src_ae_name) > 255) {
@@ -726,6 +949,7 @@ static int ipcp_udp_flow_alloc(int               port_id,
 
 #ifdef CONFIG_OUROBOROS_ENABLE_DNS
         dns_addr = shim_data(_ipcp)->dns_addr;
+
         if (dns_addr != 0) {
                 ip_addr = ddns_resolve(dst_name, dns_addr);
                 if (ip_addr == 0) {
@@ -789,35 +1013,45 @@ static int ipcp_udp_flow_alloc(int               port_id,
 
         free(recv_buf);
 
-        _ap_instance->flows[fd].port_id = port_id;
-        _ap_instance->flows[fd].state   = FLOW_ALLOCATED;
-        _ap_instance->flows[fd].rb      = shm_ap_rbuff_open(n_pid);
-        if (_ap_instance->flows[fd].rb == NULL) {
+        rb = shm_ap_rbuff_open(n_pid);
+        if (rb == NULL) {
                 LOG_ERR("Could not open N + 1 ringbuffer.");
                 close(fd);
-                return -1;
+                return -1; /* -ENORBUFF */
         }
+        rw_lock_wrlock(&_ap_instance->flows_lock);
+
+        _ap_instance->flows[fd].port_id = port_id;
+        _ap_instance->flows[fd].state   = FLOW_ALLOCATED;
+        _ap_instance->flows[fd].rb      = rb;
+
+        rw_lock_unlock(&_ap_instance->flows_lock);
 
         /* tell IRMd that flow allocation "worked" */
 
         if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {
+                shm_ap_rbuff_close(rb);
                 LOG_ERR("Failed to notify IRMd about flow allocation reply");
                 close(fd);
-                shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
                 return -1;
         }
 
+        rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
         FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
 
+        rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
         LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd);
 
         return fd;
 }
 
 static int ipcp_udp_flow_alloc_resp(int   port_id,
-                             pid_t n_pid,
-                             int   response)
+                                    pid_t n_pid,
+                                    int   response)
 {
+        struct shm_ap_rbuff * rb;
         int fd = port_id_to_fd(port_id);
         if (fd < 0) {
                 LOG_DBGF("Could not find flow with port_id %d.", port_id);
@@ -829,22 +1063,44 @@ static int ipcp_udp_flow_alloc_resp(int   port_id,
 
         /* awaken pending flow */
 
+        rw_lock_rdlock(&_ap_instance->flows_lock);
+
         if (_ap_instance->flows[fd].state != FLOW_PENDING) {
+                rw_lock_unlock(&_ap_instance->flows_lock);
+
                 LOG_DBGF("Flow was not pending.");
                 return -1;
         }
 
-        _ap_instance->flows[fd].state = FLOW_ALLOCATED;
-        _ap_instance->flows[fd].rb    = shm_ap_rbuff_open(n_pid);
-        if (_ap_instance->flows[fd].rb == NULL) {
+        rw_lock_unlock(&_ap_instance->flows_lock);
+
+        rb = shm_ap_rbuff_open(n_pid);
+        if (rb == NULL) {
                 LOG_ERR("Could not open N + 1 ringbuffer.");
+
+                rw_lock_wrlock(&_ap_instance->flows_lock);
+
                 _ap_instance->flows[fd].state   = FLOW_NULL;
                 _ap_instance->flows[fd].port_id = 0;
+
+                rw_lock_unlock(&_ap_instance->flows_lock);
+
                 return 0;
         }
 
+        rw_lock_wrlock(&_ap_instance->flows_lock);
+
+        _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+        _ap_instance->flows[fd].rb    = rb;
+
+        rw_lock_unlock(&_ap_instance->flows_lock);
+
+        rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
         FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
 
+        rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
         LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd);
 
         return 0;
@@ -853,18 +1109,33 @@ static int ipcp_udp_flow_alloc_resp(int   port_id,
 static int ipcp_udp_flow_dealloc(int port_id)
 {
         int fd = port_id_to_fd(port_id);
+        struct shm_ap_rbuff * rb;
+
         if (fd < 0) {
                 LOG_DBGF("Could not find flow with port_id %d.", port_id);
                 return 0;
         }
 
+        rw_lock_wrlock(&_ap_instance->flows_lock);
+
         _ap_instance->flows[fd].state   = FLOW_NULL;
         _ap_instance->flows[fd].port_id = 0;
-        if (_ap_instance->flows[fd].rb != NULL)
-                shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
+        rb = _ap_instance->flows[fd].rb;
+        _ap_instance->flows[fd].rb      = NULL;
+
+        rw_lock_unlock(&_ap_instance->flows_lock);
+
+        if (rb != NULL)
+                shm_ap_rbuff_close(rb);
+
+        rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
 
         FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+
+        rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
         close(fd);
+
         return 0;
 }
 
@@ -877,7 +1148,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name)
         if (shim_ap_init(ap_name) < 0)
                 return NULL;
 
-        i = malloc(sizeof *i);
+        i = ipcp_instance_create();
         if (i == NULL)
                 return NULL;
 
@@ -914,45 +1185,17 @@ static struct ipcp * ipcp_udp_create(char * ap_name)
 
 #ifndef MAKE_CHECK
 
-/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
-/* FIXME: stop eating the CPU */
-static void * ipcp_udp_sdu_loop(void * o)
-{
-        while (true) {
-                struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb);
-                int fd;
-                int len = 0;
-                char * buf;
-                if (e == NULL)
-                        continue;
-
-                len = shm_du_map_read_sdu((uint8_t **) &buf,
-                                          _ap_instance->dum,
-                                          e->index);
-                if (len == -1)
-                        continue;
-
-                fd = port_id_to_fd(e->port_id);
-
-                if (fd == -1)
-                        continue;
-
-                if (len == 0)
-                        continue;
-
-                send(fd, buf, len, 0);
-
-                shm_release_du_buff(_ap_instance->dum, e->index);
-        }
-
-        return (void *) 1;
-}
-
 int main (int argc, char * argv[])
 {
         /* argument 1: pid of irmd ? */
         /* argument 2: ap name */
         struct sigaction sig_act;
+        sigset_t  sigset;
+        sigemptyset(&sigset);
+        sigaddset(&sigset, SIGINT);
+        sigaddset(&sigset, SIGQUIT);
+        sigaddset(&sigset, SIGHUP);
+        sigaddset(&sigset, SIGPIPE);
 
         if (ipcp_arg_check(argc, argv)) {
                 LOG_ERR("Wrong arguments.");
@@ -980,13 +1223,19 @@ int main (int argc, char * argv[])
                 exit(1);
         }
 
+        pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
         pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);
-        pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL);
 
-        pthread_join(_ap_instance->sduloop, NULL);
+        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+
         pthread_join(_ap_instance->mainloop, NULL);
-        pthread_join(_ap_instance->handler, NULL);
-        pthread_join(_ap_instance->sdu_reader, NULL);
+
+        shim_ap_fini();
+
+        free(_ipcp->data);
+        free(_ipcp->ops);
+        free(_ipcp);
 
         exit(0);
 }
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 6c977cbb..6c04ccc5 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -245,18 +245,17 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
         if (rb == NULL)
                 return NULL;
 
-        e = malloc(sizeof(*e));
-        if (e == NULL)
-                return NULL;
-
         pthread_mutex_lock(rb->shm_mutex);
 
         if (shm_rbuff_used(rb) == 0) {
                 pthread_mutex_unlock(rb->shm_mutex);
-                free(e);
                 return NULL;
         }
 
+        e = malloc(sizeof(*e));
+        if (e == NULL)
+                return NULL;
+
         *e = *(rb->shm_base + *rb->ptr_tail);
 
         *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
-- 
cgit v1.2.3


From fcfe96e834a10350cebad094922ad1f2f8a05ccb Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@intec.ugent.be>
Date: Sat, 14 May 2016 21:39:18 +0200
Subject: ipcpd: shim-udp: log error when failed sending SDU

---
 src/ipcpd/shim-udp/main.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 3296540e..4da27655 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -528,7 +528,8 @@ static void * ipcp_udp_sdu_loop(void * o)
 
                 rw_lock_unlock(&_ap_instance->data_lock);
 
-                send(fd, buf, len, 0);
+                if (send(fd, buf, len, 0) < 0)
+                        LOG_ERR("Failed to send SDU.");
 
                 rw_lock_rdlock(&_ap_instance->data_lock);
 
-- 
cgit v1.2.3