From 483692f03a7b7b6a85b136a50723366b1c1af1c8 Mon Sep 17 00:00:00 2001
From: Sander Vrijders <sander.vrijders@ugent.be>
Date: Tue, 4 Apr 2017 15:47:45 +0200
Subject: ipcpd: shim-eth-llc: Remove pthread_cancel

This removes cancellation calls from the shim Ethernet. The different
threads now check if the IPCP is still operational or not.
---
 src/ipcpd/shim-eth-llc/main.c | 195 +++++++++++++++++++++++++-----------------
 1 file changed, 117 insertions(+), 78 deletions(-)

(limited to 'src')

diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 997d22a1..388bddd1 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -33,6 +33,7 @@
 #include <ouroboros/ipcp-dev.h>
 #include <ouroboros/fqueue.h>
 #include <ouroboros/logs.h>
+#include <ouroboros/time_utils.h>
 
 #include "ipcp.h"
 #include "shim_eth_llc_messages.pb-c.h"
@@ -77,6 +78,7 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t;
 
 #define EVENT_WAIT_TIMEOUT 100 /* us */
 #define NAME_QUERY_TIMEOUT 100000000 /* ns */
+#define MGMT_TIMEOUT 100 /* ms */
 
 struct eth_llc_frame {
         uint8_t dst_hwaddr[MAC_SIZE];
@@ -113,13 +115,13 @@ struct {
         pthread_t          sdu_reader;
 
         /* Handle mgmt frames in a different thread */
-        pthread_t          mgmt_frame_handler;
-        pthread_mutex_t    mgmt_frame_lock;
-        pthread_cond_t     mgmt_frame_cond;
-        uint8_t            mgmt_frame_r_addr[MAC_SIZE];
-        uint8_t            mgmt_frame_buf[ETH_FRAME_SIZE];
-        size_t             mgmt_frame_len;
-        bool               mgmt_frame_arrived;
+        pthread_t          mgmt_handler;
+        pthread_mutex_t    mgmt_lock;
+        pthread_cond_t     mgmt_cond;
+        uint8_t            mgmt_r_addr[MAC_SIZE];
+        uint8_t            mgmt_buf[ETH_FRAME_SIZE];
+        size_t             mgmt_len;
+        bool               mgmt_arrived;
 } eth_llc_data;
 
 static int eth_llc_data_init(void)
@@ -167,16 +169,16 @@ static int eth_llc_data_init(void)
         if (pthread_rwlock_init(&eth_llc_data.flows_lock, NULL))
                 goto fqueue_destroy;
 
-        if (pthread_mutex_init(&eth_llc_data.mgmt_frame_lock, NULL))
+        if (pthread_mutex_init(&eth_llc_data.mgmt_lock, NULL))
                 goto flow_lock_destroy;
 
-        if (pthread_cond_init(&eth_llc_data.mgmt_frame_cond, NULL))
-                goto mgmt_frame_lock_destroy;
+        if (pthread_cond_init(&eth_llc_data.mgmt_cond, NULL))
+                goto mgmt_lock_destroy;
 
         return 0;
 
- mgmt_frame_lock_destroy:
-        pthread_mutex_destroy(&eth_llc_data.mgmt_frame_lock);
+ mgmt_lock_destroy:
+        pthread_mutex_destroy(&eth_llc_data.mgmt_lock);
  flow_lock_destroy:
         pthread_rwlock_destroy(&eth_llc_data.flows_lock);
  fqueue_destroy:
@@ -195,8 +197,8 @@ static int eth_llc_data_init(void)
 
 void eth_llc_data_fini(void)
 {
-        pthread_cond_destroy(&eth_llc_data.mgmt_frame_cond);
-        pthread_mutex_destroy(&eth_llc_data.mgmt_frame_lock);
+        pthread_cond_destroy(&eth_llc_data.mgmt_cond);
+        pthread_mutex_destroy(&eth_llc_data.mgmt_lock);
         pthread_rwlock_destroy(&eth_llc_data.flows_lock);
         fqueue_destroy(eth_llc_data.fq);
         flow_set_destroy(eth_llc_data.np1_flows);
@@ -220,10 +222,10 @@ static int eth_llc_ipcp_send_frame(uint8_t * dst_addr,
                                    uint8_t * payload,
                                    size_t    len)
 {
-        uint32_t frame_len = 0;
-        uint8_t cf = 0x03;
-        uint16_t length;
-        uint8_t frame[SHIM_ETH_LLC_MAX_SDU_SIZE];
+        uint32_t               frame_len = 0;
+        uint8_t                cf = 0x03;
+        uint16_t               length;
+        uint8_t                frame[SHIM_ETH_LLC_MAX_SDU_SIZE];
         struct eth_llc_frame * llc_frame;
 
         if (payload == NULL) {
@@ -414,7 +416,7 @@ static int eth_llc_ipcp_name_query_req(char *    name,
 static int eth_llc_ipcp_name_query_reply(char *    name,
                                          uint8_t * r_addr)
 {
-        uint64_t address = 0;
+        uint64_t           address = 0;
         struct list_head * pos;
 
         memcpy(&address, r_addr, MAC_SIZE);
@@ -438,7 +440,9 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf,
                                    size_t    len,
                                    uint8_t * r_addr)
 {
-        shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf);
+        shim_eth_llc_msg_t * msg;
+
+        msg = shim_eth_llc_msg__unpack(NULL, len, buf);
         if (msg == NULL) {
                 log_err("Failed to unpack.");
                 return -1;
@@ -475,25 +479,43 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf,
         return 0;
 }
 
-static void * eth_llc_ipcp_mgmt_frame_handler(void * o)
+static void * eth_llc_ipcp_mgmt_handler(void * o)
 {
+        int             ret;
+        struct timespec timeout = {(MGMT_TIMEOUT / 1000),
+                                   (MGMT_TIMEOUT % 1000) * MILLION};
+
         (void) o;
 
-         while (true) {
-                pthread_mutex_lock(&eth_llc_data.mgmt_frame_lock);
+        while (true) {
+                ret = 0;
+
+                pthread_rwlock_rdlock(&ipcpi.state_lock);
+                if (ipcp_get_state() != IPCP_OPERATIONAL) {
+                        pthread_rwlock_unlock(&ipcpi.state_lock);
+                        return (void *) 0;
+                }
+
+                pthread_mutex_lock(&eth_llc_data.mgmt_lock);
 
-                pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
-                                     (void *) &eth_llc_data.mgmt_frame_lock);
+                while (eth_llc_data.mgmt_arrived == false &&
+                       ret != -ETIMEDOUT)
+                        ret = -pthread_cond_timedwait(&eth_llc_data.mgmt_cond,
+                                                      &eth_llc_data.mgmt_lock,
+                                                      &timeout);
 
-                while (eth_llc_data.mgmt_frame_arrived == false)
-                        pthread_cond_wait(&eth_llc_data.mgmt_frame_cond,
-                                          &eth_llc_data.mgmt_frame_lock);
+                if (ret == -ETIMEDOUT) {
+                        pthread_mutex_unlock(&eth_llc_data.mgmt_lock);
+                        pthread_rwlock_unlock(&ipcpi.state_lock);
+                        continue;
+                }
 
-                eth_llc_ipcp_mgmt_frame(eth_llc_data.mgmt_frame_buf,
-                                        eth_llc_data.mgmt_frame_len,
-                                        eth_llc_data.mgmt_frame_r_addr);
-                eth_llc_data.mgmt_frame_arrived = false;
-                pthread_cleanup_pop(true);
+                eth_llc_ipcp_mgmt_frame(eth_llc_data.mgmt_buf,
+                                        eth_llc_data.mgmt_len,
+                                        eth_llc_data.mgmt_r_addr);
+                eth_llc_data.mgmt_arrived = false;
+                pthread_mutex_unlock(&eth_llc_data.mgmt_lock);
+                pthread_rwlock_unlock(&ipcpi.state_lock);
         }
 }
 
@@ -515,9 +537,13 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
         while (true) {
                 frame_len = recv(eth_llc_data.s_fd, buf,
                                  SHIM_ETH_LLC_MAX_SDU_SIZE, 0);
-                if (frame_len < 0) {
-                        log_err("Failed to receive frame.");
+                if (frame_len < 0)
                         continue;
+
+                pthread_rwlock_rdlock(&ipcpi.state_lock);
+                if (ipcp_get_state() != IPCP_OPERATIONAL) {
+                        pthread_rwlock_unlock(&ipcpi.state_lock);
+                        return (void *) 0;
                 }
 
                 llc_frame = (struct eth_llc_frame *) buf;
@@ -530,14 +556,17 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
                            llc_frame->dst_hwaddr,
                            MAC_SIZE) &&
                            memcmp(br_addr, llc_frame->dst_hwaddr, MAC_SIZE)) {
+                        pthread_rwlock_unlock(&ipcpi.state_lock);
                         continue;
                 }
 
                 memcpy(&length, &llc_frame->length, sizeof(length));
                 length = ntohs(length);
 
-                if (length > 0x05FF)  /* DIX */
+                if (length > 0x05FF) {  /* DIX */
+                        pthread_rwlock_unlock(&ipcpi.state_lock);
                         continue;
+                }
 
                 length -= LLC_HEADER_SIZE;
 
@@ -545,23 +574,24 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
                 ssap = reverse_bits(llc_frame->ssap);
 
                 if (ssap == MGMT_SAP && dsap == MGMT_SAP) {
-                        pthread_mutex_lock(&eth_llc_data.mgmt_frame_lock);
-                        memcpy(eth_llc_data.mgmt_frame_buf,
+                        pthread_mutex_lock(&eth_llc_data.mgmt_lock);
+                        memcpy(eth_llc_data.mgmt_buf,
                                &llc_frame->payload,
                                length);
-                        memcpy(eth_llc_data.mgmt_frame_r_addr,
+                        memcpy(eth_llc_data.mgmt_r_addr,
                                llc_frame->src_hwaddr,
                                MAC_SIZE);
-                        eth_llc_data.mgmt_frame_len = length;
-                        eth_llc_data.mgmt_frame_arrived = true;
-                        pthread_cond_signal(&eth_llc_data.mgmt_frame_cond);
-                        pthread_mutex_unlock(&eth_llc_data.mgmt_frame_lock);
+                        eth_llc_data.mgmt_len = length;
+                        eth_llc_data.mgmt_arrived = true;
+                        pthread_cond_signal(&eth_llc_data.mgmt_cond);
+                        pthread_mutex_unlock(&eth_llc_data.mgmt_lock);
                 } else {
                         pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
 
                         fd = eth_llc_data.ef_to_fd[dsap];
                         if (fd < 0) {
                                 pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+                                pthread_rwlock_unlock(&ipcpi.state_lock);
                                 continue;
                         }
 
@@ -569,6 +599,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
                             || memcmp(eth_llc_data.fd_to_ef[fd].r_addr,
                                       llc_frame->src_hwaddr, MAC_SIZE)) {
                                 pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+                                pthread_rwlock_unlock(&ipcpi.state_lock);
                                 continue;
                         }
 
@@ -576,6 +607,8 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
 
                         flow_write(fd, &llc_frame->payload, length);
                 }
+
+                pthread_rwlock_unlock(&ipcpi.state_lock);
         }
 
         return (void *) 0;
@@ -583,49 +616,47 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
 
 static void * eth_llc_ipcp_sdu_writer(void * o)
 {
-        int fd;
+        int                  fd;
         struct shm_du_buff * sdb;
-        uint8_t ssap;
-        uint8_t dsap;
-        uint8_t r_addr[MAC_SIZE];
-        struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
+        uint8_t              ssap;
+        uint8_t              dsap;
+        uint8_t              r_addr[MAC_SIZE];
+        struct timespec      timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
 
         (void) o;
 
         while (flow_event_wait(eth_llc_data.np1_flows,
                                eth_llc_data.fq,
                                &timeout)) {
+
+                pthread_rwlock_rdlock(&ipcpi.state_lock);
+
+                if (ipcp_get_state() != IPCP_OPERATIONAL) {
+                        pthread_rwlock_unlock(&ipcpi.state_lock);
+                        return (void *) 0;
+                }
+
+                pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
                 while ((fd = fqueue_next(eth_llc_data.fq)) >= 0) {
                         if (ipcp_flow_read(fd, &sdb)) {
                                 log_err("Bad read from fd %d.", fd);
                                 continue;
                         }
 
-                        pthread_rwlock_rdlock(&ipcpi.state_lock);
-
-                        if (ipcp_get_state() != IPCP_OPERATIONAL) {
-                                pthread_rwlock_unlock(&ipcpi.state_lock);
-                                ipcp_flow_del(sdb);
-                                return (void *) -1; /* -ENOTENROLLED */
-                        }
-
-                        pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
-
                         ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap);
                         dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap);
                         memcpy(r_addr,
                                eth_llc_data.fd_to_ef[fd].r_addr,
                                MAC_SIZE);
 
-                        pthread_rwlock_unlock(&eth_llc_data.flows_lock);
-                        pthread_rwlock_unlock(&ipcpi.state_lock);
-
                         eth_llc_ipcp_send_frame(r_addr, dsap, ssap,
                                                 shm_du_buff_head(sdb),
                                                 shm_du_buff_tail(sdb)
                                                 - shm_du_buff_head(sdb));
                         ipcp_flow_del(sdb);
                 }
+                pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+                pthread_rwlock_unlock(&ipcpi.state_lock);
         }
 
         return (void *) 1;
@@ -659,16 +690,17 @@ void ipcp_sig_handler(int         sig,
 
 static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
 {
-        int skfd = -1;
-        struct ifreq ifr;
-        int idx;
+        int                skfd = -1;
+        struct ifreq       ifr;
+        int                idx;
 #ifdef __FreeBSD__
-        struct ifaddrs * ifaddr;
-        struct ifaddrs * ifa;
+        struct ifaddrs *   ifaddr;
+        struct ifaddrs *   ifa;
         struct sockaddr_dl device;
 #else
         struct sockaddr_ll device;
 #endif
+        struct timeval tv = {0, EVENT_WAIT_TIMEOUT * 1000};
 
         assert(conf);
         assert(conf->type == THIS_TYPE);
@@ -753,11 +785,19 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
                 return -1;
         }
 
+        if (setsockopt(skfd, SOL_SOCKET, SO_RCVTIMEO,
+                       (void *) &tv, sizeof(tv))) {
+                log_err("Failed to set socket timeout");
+                close(skfd);
+                return -1;
+        }
+
         pthread_rwlock_wrlock(&ipcpi.state_lock);
 
         if (ipcp_get_state() != IPCP_INIT) {
                 pthread_rwlock_unlock(&ipcpi.state_lock);
                 log_err("IPCP in wrong state.");
+                close(skfd);
                 return -1;
         }
 
@@ -766,9 +806,9 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
 
         ipcp_set_state(IPCP_OPERATIONAL);
 
-        pthread_create(&eth_llc_data.mgmt_frame_handler,
+        pthread_create(&eth_llc_data.mgmt_handler,
                        NULL,
-                       eth_llc_ipcp_mgmt_frame_handler,
+                       eth_llc_ipcp_mgmt_handler,
                        NULL);
 
         pthread_create(&eth_llc_data.sdu_reader,
@@ -791,7 +831,9 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
 
 static int eth_llc_ipcp_name_reg(char * name)
 {
-        char * name_dup = strdup(name);
+        char * name_dup;
+
+        name_dup = strdup(name);
         if (name_dup == NULL) {
                 log_err("Failed to duplicate name.");
                 return -ENOMEM;
@@ -826,11 +868,11 @@ static int eth_llc_ipcp_name_unreg(char * name)
 
 static int eth_llc_ipcp_name_query(char * name)
 {
-        uint8_t r_addr[MAC_SIZE];
-        struct timespec timeout = {0, NAME_QUERY_TIMEOUT};
+        uint8_t            r_addr[MAC_SIZE];
+        struct timespec    timeout = {0, NAME_QUERY_TIMEOUT};
         shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT;
         struct dir_query * query;
-        int ret;
+        int                ret;
 
         if (shim_data_dir_has(ipcpi.shim_data, name))
                 return 0;
@@ -864,8 +906,8 @@ static int eth_llc_ipcp_flow_alloc(int       fd,
                                    char *    dst_name,
                                    qoscube_t cube)
 {
-        uint8_t ssap = 0;
-        uint8_t r_addr[MAC_SIZE];
+        uint8_t  ssap = 0;
+        uint8_t  r_addr[MAC_SIZE];
         uint64_t addr = 0;
 
         log_dbg("Allocating flow to %s.", dst_name);
@@ -1079,12 +1121,9 @@ int main(int    argc,
         ipcp_shutdown();
 
         if (ipcp_get_state() == IPCP_SHUTDOWN) {
-                pthread_cancel(eth_llc_data.sdu_reader);
-                pthread_cancel(eth_llc_data.sdu_writer);
-                pthread_cancel(eth_llc_data.mgmt_frame_handler);
                 pthread_join(eth_llc_data.sdu_writer, NULL);
                 pthread_join(eth_llc_data.sdu_reader, NULL);
-                pthread_join(eth_llc_data.mgmt_frame_handler, NULL);
+                pthread_join(eth_llc_data.mgmt_handler, NULL);
         }
 
         eth_llc_data_fini();
-- 
cgit v1.2.3