diff options
Diffstat (limited to 'src/ipcpd/udp')
| -rw-r--r-- | src/ipcpd/udp/CMakeLists.txt | 9 | ||||
| -rw-r--r-- | src/ipcpd/udp/main.c | 787 | 
2 files changed, 374 insertions, 422 deletions
| diff --git a/src/ipcpd/udp/CMakeLists.txt b/src/ipcpd/udp/CMakeLists.txt index b21afe75..f1a29ef6 100644 --- a/src/ipcpd/udp/CMakeLists.txt +++ b/src/ipcpd/udp/CMakeLists.txt @@ -16,9 +16,11 @@ set(IPCP_UDP_TARGET ipcpd-udp CACHE INTERNAL "")  set(UDP_SOURCES    # Add source files here -  ${CMAKE_CURRENT_SOURCE_DIR}/main.c) +  ${CMAKE_CURRENT_SOURCE_DIR}/main.c +  )  add_executable(ipcpd-udp ${UDP_SOURCES} ${IPCP_SOURCES}) +  target_link_libraries(ipcpd-udp LINK_PUBLIC ouroboros-dev)  # Find the nsupdate executable @@ -52,6 +54,11 @@ else ()    endif ()  endif () +set(IPCP_UDP_RD_THR 3 CACHE STRING +  "Number of reader threads in UDP IPCP") +set(IPCP_UDP_WR_THR 3 CACHE STRING +  "Number of writer threads in UDP IPCP") +  include(AddCompileFlags)  if (CMAKE_BUILD_TYPE MATCHES "Debug*")    add_compile_flags(ipcpd-udp -DCONFIG_OUROBOROS_DEBUG) diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index f45a18cb..559be55a 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -30,6 +30,7 @@  #define OUROBOROS_PREFIX "ipcpd/udp" +#include <ouroboros/bitmap.h>  #include <ouroboros/hash.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h> @@ -58,34 +59,50 @@  #define FLOW_REPLY               2  #define THIS_TYPE                IPCP_UDP -#define LISTEN_PORT              htons(0x0D1F) -#define SHIM_UDP_BUF_SIZE        256 -#define SHIM_UDP_MSG_SIZE        256 -#define SHIM_UDP_MAX_PACKET_SIZE 8980 +#define IPCP_UDP_MAX_PACKET_SIZE 8980 +#define OUR_HEADER_LEN           sizeof(uint32_t) /* adds eid */ + +#define IPCP_UDP_BUF_SIZE        256 +#define IPCP_UDP_MSG_SIZE        256  #define DNS_TTL                  86400  #define FD_UPDATE_TIMEOUT        100 /* microseconds */ -#define local_ip                 (udp_data.s_saddr.sin_addr.s_addr) +#define SERV_PORT                udp_data.s_saddr.sin_port; +#define SERV_SADDR               ((struct sockaddr *) &udp_data.s_saddr) +#define CLNT_SADDR               ((struct sockaddr *) &udp_data.c_saddr) +#define SERV_SADDR_SIZE          (sizeof(udp_data.s_saddr)) +#define LOCAL_IP                 (udp_data.s_saddr.sin_addr.s_addr) -#define UDP_MAX_PORTS            0xFFFF +#define MGMT_EID                 0 +#define MGMT_FRAME_SIZE          512 +/* Keep order for alignment. */  struct mgmt_msg { -        uint16_t src_udp_port; -        uint16_t dst_udp_port; +        uint32_t eid; +        uint32_t s_eid; +        uint32_t d_eid;          uint8_t  code; -        uint8_t  response; -        /* QoS parameters from spec, aligned */ +        int8_t   response; +        /* QoS parameters from spec */          uint8_t  availability;          uint8_t  in_order; -        uint32_t delay;          uint64_t bandwidth; +        uint32_t delay;          uint32_t loss;          uint32_t ber;          uint32_t max_gap;  } __attribute__((packed)); +struct mgmt_frame { +        struct list_head   next; +        struct sockaddr_in r_saddr; +        uint8_t            buf[MGMT_FRAME_SIZE]; +}; + +/* UDP flow */  struct uf { -        int udp; +        int d_eid; +        /* IP details are stored through connect(). */          int skfd;  }; @@ -94,25 +111,25 @@ struct {          uint32_t           ip_addr;          uint32_t           dns_addr; -        /* listen server */ +        /* server socket */          struct sockaddr_in s_saddr;          int                s_fd; +        /* client port */ +        int                clt_port;          fset_t *           np1_flows;          fqueue_t *         fq; -        fd_set             flow_fd_s; -        /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ -        int                uf_to_fd[FD_SETSIZE];          struct uf          fd_to_uf[SYS_MAX_FLOWS];          pthread_rwlock_t   flows_lock; -        pthread_t          packet_loop; -        pthread_t          handler; -        pthread_t          packet_reader; +        pthread_t          packet_writer[IPCP_UDP_WR_THR]; +        pthread_t          packet_reader[IPCP_UDP_RD_THR]; -        bool               fd_set_mod; -        pthread_cond_t     fd_set_cond; -        pthread_mutex_t    fd_set_lock; +        /* Handle mgmt frames in a different thread */ +        pthread_t          mgmt_handler; +        pthread_mutex_t    mgmt_lock; +        pthread_cond_t     mgmt_cond; +        struct list_head   mgmt_frames;  } udp_data;  static int udp_data_init(void) @@ -120,24 +137,19 @@ static int udp_data_init(void)          int i;          if (pthread_rwlock_init(&udp_data.flows_lock, NULL)) -                return -1; - -        if (pthread_cond_init(&udp_data.fd_set_cond, NULL)) -                goto fail_set_cond; +                goto fail_rwlock_init; -        if (pthread_mutex_init(&udp_data.fd_set_lock, NULL)) -                goto fail_set_lock; +        if (pthread_cond_init(&udp_data.mgmt_cond, NULL)) +                goto fail_mgmt_cond; -        for (i = 0; i < FD_SETSIZE; ++i) -                udp_data.uf_to_fd[i] = -1; +        if (pthread_mutex_init(&udp_data.mgmt_lock, NULL)) +                goto fail_mgmt_lock;          for (i = 0; i < SYS_MAX_FLOWS; ++i) { -                udp_data.fd_to_uf[i].skfd = -1; -                udp_data.fd_to_uf[i].udp = -1; +                udp_data.fd_to_uf[i].skfd  = -1; +                udp_data.fd_to_uf[i].d_eid = -1;          } -        FD_ZERO(&udp_data.flow_fd_s); -          udp_data.np1_flows = fset_create();          if (udp_data.np1_flows == NULL)                  goto fail_fset; @@ -150,88 +162,43 @@ static int udp_data_init(void)          if (udp_data.shim_data == NULL)                  goto fail_data; +        list_head_init(&udp_data.mgmt_frames); +          return 0;   fail_data:          fqueue_destroy(udp_data.fq);   fail_fqueue:          fset_destroy(udp_data.np1_flows);   fail_fset: -        pthread_mutex_destroy(&udp_data.fd_set_lock); - fail_set_lock: -        pthread_cond_destroy(&udp_data.fd_set_cond); - fail_set_cond: +        pthread_mutex_destroy(&udp_data.mgmt_lock); + fail_mgmt_lock: +        pthread_cond_destroy(&udp_data.mgmt_cond); + fail_mgmt_cond:          pthread_rwlock_destroy(&udp_data.flows_lock); + fail_rwlock_init:          return -1;  }  static void udp_data_fini(void)  { -        fset_destroy(udp_data.np1_flows); -        fqueue_destroy(udp_data.fq); -          shim_data_destroy(udp_data.shim_data); -        pthread_rwlock_destroy(&udp_data.flows_lock); -        pthread_mutex_destroy(&udp_data.fd_set_lock); -        pthread_cond_destroy(&udp_data.fd_set_cond); -} - -static void set_fd(int fd) -{ -        pthread_mutex_lock(&udp_data.fd_set_lock); - -        udp_data.fd_set_mod = true; -        FD_SET(fd, &udp_data.flow_fd_s); - -        while (udp_data.fd_set_mod) -                pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); - -        pthread_mutex_unlock(&udp_data.fd_set_lock); -} - -static void clr_fd(int fd) -{ -        pthread_mutex_lock(&udp_data.fd_set_lock); - -        udp_data.fd_set_mod = true; -        FD_CLR(fd, &udp_data.flow_fd_s); - -        while (udp_data.fd_set_mod) -                pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); - -        pthread_mutex_unlock(&udp_data.fd_set_lock); -} - -static int send_shim_udp_msg(uint8_t * buf, -                             size_t    len, -                             uint32_t  dst_ip_addr) -{ -       struct sockaddr_in r_saddr; - -       memset((char *)&r_saddr, 0, sizeof(r_saddr)); -       r_saddr.sin_family      = AF_INET; -       r_saddr.sin_addr.s_addr = dst_ip_addr; -       r_saddr.sin_port        = LISTEN_PORT; - -       if (sendto(udp_data.s_fd, buf, len, 0, -                  (struct sockaddr *) &r_saddr, -                  sizeof(r_saddr)) == -1) { -               log_err("Failed to send message."); -               return -1; -       } +        fqueue_destroy(udp_data.fq); +        fset_destroy(udp_data.np1_flows); -       return 0; +        pthread_rwlock_destroy(&udp_data.flows_lock); +        pthread_cond_destroy(&udp_data.mgmt_cond); +        pthread_mutex_destroy(&udp_data.mgmt_lock);  } -static int ipcp_udp_port_alloc(uint32_t        dst_ip_addr, -                               uint16_t        src_udp_port, +static int ipcp_udp_port_alloc(int             skfd, +                               uint32_t        s_eid,                                 const uint8_t * dst,                                 qosspec_t       qs)  {          uint8_t *         buf;          struct mgmt_msg * msg;          size_t            len; -        int               ret;          len = sizeof(*msg) + ipcp_dir_hash_len(); @@ -240,8 +207,9 @@ static int ipcp_udp_port_alloc(uint32_t        dst_ip_addr,                  return -1;          msg               = (struct mgmt_msg *) buf; +        msg->eid          = hton32(MGMT_EID);          msg->code         = FLOW_REQ; -        msg->src_udp_port = src_udp_port; +        msg->s_eid        = hton32(s_eid);          msg->delay        = hton32(qs.delay);          msg->bandwidth    = hton64(qs.bandwidth);          msg->availability = qs.availability; @@ -252,73 +220,63 @@ static int ipcp_udp_port_alloc(uint32_t        dst_ip_addr,          memcpy(msg + 1, dst, ipcp_dir_hash_len()); -        ret = send_shim_udp_msg(buf, len, dst_ip_addr); +        if (write(skfd, msg, len) < 0) { +                free(buf); +                return -1; +        }          free(buf); -        return ret; +        return 0;  } -static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr, -                                    uint16_t src_udp_port, -                                    uint16_t dst_udp_port, -                                    int      response) +static int ipcp_udp_port_alloc_resp(int      skfd, +                                    uint32_t s_eid, +                                    uint32_t d_eid, +                                    int8_t   response)  { -        struct mgmt_msg * msg; -        int               ret; +        struct mgmt_msg *  msg;          msg = malloc(sizeof(*msg));          if (msg == NULL)                  return -1; -        msg->code         = FLOW_REPLY; -        msg->src_udp_port = src_udp_port; -        msg->dst_udp_port = dst_udp_port; -        msg->response     = response; +        msg->eid      = hton32(MGMT_EID); +        msg->code     = FLOW_REPLY; +        msg->s_eid    = hton32(s_eid); +        msg->d_eid    = hton32(d_eid); +        msg->response = response; -        ret = send_shim_udp_msg((uint8_t *) msg, sizeof(*msg), dst_ip_addr); +        if (write(skfd, msg, sizeof(*msg)) < 0) { +                free(msg); +                return -1; +        }          free(msg); -        return ret; +        return 0;  }  static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, +                             int                  d_eid,                               const uint8_t *      dst,                               qosspec_t            qs)  { -        struct timespec    ts          = {0, FD_UPDATE_TIMEOUT * 1000}; -        struct timespec    abstime; -        struct sockaddr_in f_saddr; -        socklen_t          f_saddr_len = sizeof(f_saddr); -        int                skfd; -        int                fd; +        struct timespec ts        = {0, FD_UPDATE_TIMEOUT * 1000}; +        struct timespec abstime; +        int             skfd; +        int             fd; -        log_dbg("Port request arrived from UDP port %d", -                 ntohs(c_saddr->sin_port)); - -        if ((skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { +        skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +        if (skfd < 0) {                  log_err("Could not create UDP socket.");                  return -1;          } -        memset((char *) &f_saddr, 0, sizeof(f_saddr)); -        f_saddr.sin_family      = AF_INET; -        f_saddr.sin_addr.s_addr = local_ip; -        f_saddr.sin_port        = 0; - -        if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { -                log_err("Could not bind to socket."); -                close(skfd); -                return -1; -        } +        /* Remote listens on server port. Mod of c_saddr allowed. */ +        c_saddr->sin_port = udp_data.s_saddr.sin_port; -        if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { -                log_err("Could not get address from fd."); -                return -1; -        } - -        /* connect stores the remote address in the file descriptor */ +        /* Connect stores the remote address in the file descriptor. */          if (connect(skfd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) {                  log_err("Could not connect to remote UDP client.");                  close(skfd); @@ -331,8 +289,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,          while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {                  ts_add(&abstime, &ts, &abstime); -                pthread_cond_timedwait(&ipcpi.alloc_cond, -                                       &ipcpi.alloc_lock, +                pthread_cond_timedwait(&ipcpi.alloc_cond, &ipcpi.alloc_lock,                                         &abstime);          } @@ -354,9 +311,8 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,          pthread_rwlock_wrlock(&udp_data.flows_lock); -        udp_data.uf_to_fd[skfd]    = fd; -        udp_data.fd_to_uf[fd].skfd = skfd; -        udp_data.fd_to_uf[fd].udp  = f_saddr.sin_port; +        udp_data.fd_to_uf[fd].skfd  = skfd; +        udp_data.fd_to_uf[fd].d_eid = d_eid;          pthread_rwlock_unlock(&udp_data.flows_lock); @@ -365,224 +321,228 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,          pthread_mutex_unlock(&ipcpi.alloc_lock); -        log_dbg("Pending allocation request, fd %d, UDP port (%d, %d).", -                fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); +        log_dbg("Pending allocation request, fd %d, remote eid %d.", +                fd, d_eid);          return 0;  } -/* returns the n flow descriptor */ -static int udp_port_to_fd(int udp_port) +static int ipcp_udp_port_alloc_reply(uint32_t s_eid, +                                     uint32_t d_eid, +                                     int8_t   response)  { -        int i; - -        for (i = 0; i < SYS_MAX_FLOWS; ++i) -                if (udp_data.fd_to_uf[i].udp == udp_port) -                        return i; - -        return -1; -} - -static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port, -                                     uint16_t dst_udp_port, -                                     int      response) -{ -        int fd   = -1; -        int ret  =  0; -        int skfd = -1; -          struct sockaddr_in t_saddr; -        socklen_t          t_saddr_len = sizeof(t_saddr); +        socklen_t          t_saddr_len; +        int                ret         = 0; +        int                skfd        = -1; -        log_dbg("Received reply for flow on udp port %d.", -                ntohs(dst_udp_port)); +        t_saddr_len = sizeof(t_saddr); -        pthread_rwlock_rdlock(&udp_data.flows_lock); +        pthread_rwlock_wrlock(&udp_data.flows_lock); -        fd = udp_port_to_fd(dst_udp_port); -        if (fd < 0) { +        skfd = udp_data.fd_to_uf[s_eid].skfd; +        if (skfd < 0) {                  pthread_rwlock_unlock(&udp_data.flows_lock); +                log_err("Got reply for unknown UDP eid: %u.", s_eid);                  return -1;          } -        skfd = udp_data.fd_to_uf[fd].skfd; +        udp_data.fd_to_uf[s_eid].d_eid = d_eid;          pthread_rwlock_unlock(&udp_data.flows_lock); -        /* get the original address with the LISTEN PORT */          if (getpeername(skfd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { -                log_dbg("Flow with fd %d has no peer.", fd); +                log_dbg("Flow with fd %d has no peer.", s_eid); +                close(skfd);                  return -1;          } -        /* connect to the flow udp port */ -        t_saddr.sin_port = src_udp_port; -          if (connect(skfd, (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { +                log_dbg("Could not connect flow to remote.");                  close(skfd);                  return -1;          } -        pthread_rwlock_rdlock(&udp_data.flows_lock); - -        set_fd(skfd); - -        pthread_rwlock_unlock(&udp_data.flows_lock); - -        if (ipcp_flow_alloc_reply(fd, response) < 0) +        if (ipcp_flow_alloc_reply(s_eid, response) < 0) { +                log_dbg("Failed to reply to flow allocation.");                  return -1; +        } -        log_dbg("Flow allocation completed, UDP ports: (%d, %d).", -                 ntohs(dst_udp_port), ntohs(src_udp_port)); +        log_dbg("Flow allocation completed on eids (%d, %d).", +                 s_eid, d_eid);          return ret;  } -static void * ipcp_udp_listener(void * o) +static int ipcp_udp_mgmt_frame(const uint8_t *    buf, +                               struct sockaddr_in c_saddr)  { -        uint8_t            buf[SHIM_UDP_MSG_SIZE]; -        ssize_t            n   = 0; -        struct sockaddr_in c_saddr; -        int                sfd = udp_data.s_fd; +        struct mgmt_msg * msg; +        qosspec_t         qs; + +        msg = (struct mgmt_msg *) buf; + +        switch (msg->code) { +        case FLOW_REQ: +                qs.delay        = ntoh32(msg->delay); +                qs.bandwidth    = ntoh64(msg->bandwidth); +                qs.availability = msg->availability; +                qs.loss         = ntoh32(msg->loss); +                qs.ber          = ntoh32(msg->ber); +                qs.in_order     = msg->in_order; +                qs.max_gap      = ntoh32(msg->max_gap); +                return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid), +                                         (uint8_t *) (msg + 1), qs); +        case FLOW_REPLY: +                return ipcp_udp_port_alloc_reply(ntoh32(msg->s_eid), +                                                 ntoh32(msg->d_eid), +                                                 msg->response); +        default: +                log_err("Unknown message received %d.", msg->code); +                return -1; +        } +} +static void * ipcp_udp_mgmt_handler(void * o) +{          (void) o; +        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, +                             (void *) &udp_data.mgmt_lock); +          while (true) { -                struct mgmt_msg * msg = NULL; -                qosspec_t         qs; -                memset(&buf, 0, SHIM_UDP_MSG_SIZE); -                n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, -                             (struct sockaddr *) &c_saddr, -                             (socklen_t *) sizeof(c_saddr)); -                if (n < 0) -                        continue; +                struct mgmt_frame * frame; -                /* flow alloc request from other host */ -                if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr, -                                  sizeof(c_saddr.sin_addr.s_addr), AF_INET) -                    == NULL) -                        continue; +                pthread_mutex_lock(&udp_data.mgmt_lock); -                msg = (struct mgmt_msg *) buf; - -                switch (msg->code) { -                case FLOW_REQ: -                        c_saddr.sin_port = msg->src_udp_port; -                        qs.delay = ntoh32(msg->delay); -                        qs.bandwidth = ntoh64(msg->bandwidth); -                        qs.availability = msg->availability; -                        qs.loss = ntoh32(msg->loss); -                        qs.ber = ntoh32(msg->ber); -                        qs.in_order = msg->in_order; -                        qs.max_gap = ntoh32(msg->max_gap); -                        ipcp_udp_port_req(&c_saddr, -                                          (uint8_t *) (msg + 1), -                                          qs); -                        break; -                case FLOW_REPLY: -                        ipcp_udp_port_alloc_reply(msg->src_udp_port, -                                                  msg->dst_udp_port, -                                                  msg->response); -                        break; -                default: -                        log_err("Unknown message received %d.", msg->code); -                        continue; -                } +                while (list_is_empty(&udp_data.mgmt_frames)) +                        pthread_cond_wait(&udp_data.mgmt_cond, +                                          &udp_data.mgmt_lock); + +                frame = list_first_entry((&udp_data.mgmt_frames), +                                         struct mgmt_frame, next); +                assert(frame != NULL); +                list_del(&frame->next); + +                pthread_mutex_unlock(&udp_data.mgmt_lock); -                c_saddr.sin_port = LISTEN_PORT; +                ipcp_udp_mgmt_frame(frame->buf, frame->r_saddr); + +                free(frame);          } -        return 0; +        pthread_cleanup_pop(false); + +        return (void *) 0;  }  static void * ipcp_udp_packet_reader(void * o)  { -        ssize_t            n; -        int                skfd; -        int                fd; -        /* FIXME: avoid this copy */ -        char               buf[SHIM_UDP_MAX_PACKET_SIZE]; -        struct sockaddr_in r_saddr; -        struct timeval     tv = {0, FD_UPDATE_TIMEOUT}; -        fd_set             read_fds; -        int                flags; +        uint8_t   buf[IPCP_UDP_MAX_PACKET_SIZE]; +        uint8_t * data; +        ssize_t   n; +        uint32_t  eid;          (void) o; -        ipcp_lock_to_core(); +        data = buf + sizeof(uint32_t);          while (true) { -                pthread_rwlock_rdlock(&udp_data.flows_lock); -                pthread_mutex_lock(&udp_data.fd_set_lock); +                struct mgmt_frame * frame; +                struct sockaddr_in  r_saddr; +                socklen_t           len; -                read_fds = udp_data.flow_fd_s; -                udp_data.fd_set_mod = false; -                pthread_cond_broadcast(&udp_data.fd_set_cond); +                len = sizeof(r_saddr); -                pthread_mutex_unlock(&udp_data.fd_set_lock); -                pthread_rwlock_unlock(&udp_data.flows_lock); +                n = recvfrom(udp_data.s_fd, buf, IPCP_UDP_MAX_PACKET_SIZE, 0, +                             (struct sockaddr *) &r_saddr, &len); +                if (n < 0) +                        continue; -                if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) +                if (n == 0) +                        log_dbg("Got a 0 frame."); + +                if ((size_t) n < sizeof(eid)) { +                        log_dbg("Dropped bad frame.");                          continue; +                } -                for (skfd = 0; skfd < FD_SETSIZE; ++skfd) { -                        if (!FD_ISSET(skfd, &read_fds)) -                                continue; -                        flags = fcntl(skfd, F_GETFL, 0); -                        fcntl(skfd, F_SETFL, flags | O_NONBLOCK); -                        n = sizeof(r_saddr); -                        if ((n = recvfrom(skfd, -                                          &buf, -                                          SHIM_UDP_MAX_PACKET_SIZE, -                                          0, -                                          (struct sockaddr *) &r_saddr, -                                          (unsigned *) &n)) <= 0) -                                continue; +                eid = ntoh32(*((uint32_t *) buf)); -                        pthread_rwlock_rdlock(&udp_data.flows_lock); +                /* pass onto mgmt queue */ +                if (eid == MGMT_EID) { +                        if (n > IPCP_UDP_MSG_SIZE) { +                                log_warn("Dropped oversize management frame."); +                                continue; +                        } -                        fd = udp_data.uf_to_fd[skfd]; +                        frame = malloc(sizeof(*frame)); +                        if (frame == NULL) +                                continue; -                        pthread_rwlock_unlock(&udp_data.flows_lock); +                        memcpy(frame->buf, buf, n); +                        memcpy(&frame->r_saddr, &r_saddr, sizeof(r_saddr)); -                        flow_write(fd, buf, n); +                        pthread_mutex_lock(&udp_data.mgmt_lock); +                        list_add(&frame->next, &udp_data.mgmt_frames); +                        pthread_cond_signal(&udp_data.mgmt_cond); +                        pthread_mutex_unlock(&udp_data.mgmt_lock); +                        continue;                  } + +                flow_write(eid, data, n - sizeof(eid));          } -        return (void *) 0; +        return 0;  } -static void * ipcp_udp_packet_loop(void * o) +static void * ipcp_udp_packet_writer(void * o)  { -        int fd; -        struct shm_du_buff * sdb; -          (void) o;          ipcp_lock_to_core();          while (true) { +                int fd; +                int eid;                  fevent(udp_data.np1_flows, udp_data.fq, NULL);                  while ((fd = fqueue_next(udp_data.fq)) >= 0) { +                        struct shm_du_buff * sdb; +                        uint8_t *            buf; +                        uint16_t             len;                          if (ipcp_flow_read(fd, &sdb)) { -                                log_err("Bad read from fd %d.", fd); +                                log_dbg("Bad read from fd %d.", fd); +                                continue; +                        } + +                        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +                        if (len > IPCP_UDP_MAX_PACKET_SIZE) { +                                log_dbg("Packet length exceeds MTU."); +                                ipcp_sdb_release(sdb); +                                continue; +                        } + +                        buf = shm_du_buff_head_alloc(sdb, OUR_HEADER_LEN); +                        if (buf == NULL) { +                                log_dbg("Failed to allocate header."); +                                ipcp_sdb_release(sdb);                                  continue;                          }                          pthread_rwlock_rdlock(&udp_data.flows_lock); +                        eid = hton32(udp_data.fd_to_uf[fd].d_eid);                          fd = udp_data.fd_to_uf[fd].skfd;                          pthread_rwlock_unlock(&udp_data.flows_lock); +                        memcpy(buf, &eid, sizeof(eid)); +                          pthread_cleanup_push((void (*)(void *)) -                                             ipcp_sdb_release, -                                             (void *) sdb); +                                             ipcp_sdb_release, (void *) sdb); -                        if (send(fd, shm_du_buff_head(sdb), -                                 shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), -                                 0) < 0) -                                log_err("Failed to send PACKET."); +                        if (write(fd, buf, len + OUR_HEADER_LEN) < 0) +                                log_err("Failed to send packet.");                          pthread_cleanup_pop(true);                  } @@ -593,28 +553,23 @@ static void * ipcp_udp_packet_loop(void * o)  static int ipcp_udp_bootstrap(const struct ipcp_config * conf)  { -        struct sockaddr_in s_saddr;          char ipstr[INET_ADDRSTRLEN];          char dnsstr[INET_ADDRSTRLEN]; -        int  enable = 1; -        int  fd = -1; +        char portstr[128]; /* port is max 64535 = 5 chars */ +        int  i = 1;          assert(conf);          assert(conf->type == THIS_TYPE); -        if (inet_ntop(AF_INET, -                      &conf->ip_addr, -                      ipstr, -                      INET_ADDRSTRLEN) == NULL) { +        if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, INET_ADDRSTRLEN) +            == NULL) {                  log_err("Failed to convert IP address");                  return -1;          }          if (conf->dns_addr != 0) { -                if (inet_ntop(AF_INET, -                              &conf->dns_addr, -                              dnsstr, -                              INET_ADDRSTRLEN) == NULL) { +                if (inet_ntop(AF_INET, &conf->dns_addr, dnsstr, INET_ADDRSTRLEN) +                    == NULL) {                          log_err("Failed to convert DNS address");                          return -1;                  } @@ -626,76 +581,79 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf)          }          /* UDP listen server */ -        if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { -                log_err("Can't create socket."); +        udp_data.s_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +        if (udp_data.s_fd < 0) { +                log_err("Can't create socket: %s", strerror(errno));                  goto fail_socket;          } -        if (setsockopt(fd, -                       SOL_SOCKET, -                       SO_REUSEADDR, -                       &enable, -                       sizeof(int)) < 0) +        if (setsockopt(udp_data.s_fd, SOL_SOCKET, SO_REUSEADDR, +                       &i, sizeof(i)) < 0)                  log_warn("Failed to set SO_REUSEADDR."); -        memset((char *) &s_saddr, 0, sizeof(s_saddr)); +        memset((char *) &udp_data.s_saddr, 0, sizeof(udp_data.s_saddr));          udp_data.s_saddr.sin_family      = AF_INET;          udp_data.s_saddr.sin_addr.s_addr = conf->ip_addr; -        udp_data.s_saddr.sin_port        = LISTEN_PORT; +        udp_data.s_saddr.sin_port        = htons(conf->srv_port); -        if (bind(fd, -                 (struct sockaddr *) &udp_data.s_saddr, -                 sizeof(udp_data.s_saddr)) < 0) { +        if (bind(udp_data.s_fd, SERV_SADDR, SERV_SADDR_SIZE) < 0) {                  log_err("Couldn't bind to %s.", ipstr);                  goto fail_bind;          } -        udp_data.s_fd     = fd;          udp_data.ip_addr  = conf->ip_addr;          udp_data.dns_addr = conf->dns_addr; - -        FD_CLR(udp_data.s_fd, &udp_data.flow_fd_s); +        udp_data.clt_port = htons(conf->clt_port);          ipcp_set_state(IPCP_OPERATIONAL); -        if (pthread_create(&udp_data.handler, -                           NULL, -                           ipcp_udp_listener, -                           NULL)) { +        if (pthread_create(&udp_data.mgmt_handler, NULL, +                           ipcp_udp_mgmt_handler, NULL)) {                  ipcp_set_state(IPCP_INIT);                  goto fail_bind;          } -        if (pthread_create(&udp_data.packet_reader, -                           NULL, -                           ipcp_udp_packet_reader, -                           NULL)) { -                ipcp_set_state(IPCP_INIT); -                goto fail_packet_reader; +        for (i = 0; i < IPCP_UDP_RD_THR; ++i) { +                if (pthread_create(&udp_data.packet_reader[i], NULL, +                                   ipcp_udp_packet_reader, NULL)) { +                        ipcp_set_state(IPCP_INIT); +                        goto fail_packet_reader; +                }          } -        if (pthread_create(&udp_data.packet_loop, -                           NULL, -                           ipcp_udp_packet_loop, -                           NULL)) { -                ipcp_set_state(IPCP_INIT); -                goto fail_packet_loop; +        for (i = 0; i < IPCP_UDP_WR_THR; ++i) { +                if (pthread_create(&udp_data.packet_writer[i], NULL, +                        ipcp_udp_packet_writer, NULL)) { +                        ipcp_set_state(IPCP_INIT); +                        goto fail_packet_writer; +                }          } +        sprintf(portstr, "%d", conf->clt_port); +          log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid());          log_dbg("Bound to IP address %s.", ipstr); +        log_dbg("Client port is %s.", conf->clt_port == 0 ? "random" : portstr); +        log_dbg("Server port is %u.", conf->srv_port);          log_dbg("DNS server address is %s.", dnsstr);          return 0; - fail_packet_loop: -        pthread_cancel(udp_data.packet_reader); -        pthread_join(udp_data.packet_reader, NULL); + fail_packet_writer: +        while (i > 0) { +                pthread_cancel(udp_data.packet_writer[--i]); +                pthread_join(udp_data.packet_writer[i], NULL); +        } +        i = IPCP_UDP_RD_THR;   fail_packet_reader: -        pthread_cancel(udp_data.handler); -        pthread_join(udp_data.handler, NULL); +        while (i > 0) { +                pthread_cancel(udp_data.packet_reader[--i]); +                pthread_join(udp_data.packet_reader[i], NULL); +        } +        pthread_cancel(udp_data.mgmt_handler); +        pthread_join(udp_data.mgmt_handler, NULL);   fail_bind: -        close(fd); +        close(udp_data.s_fd);   fail_socket:          return -1;  } @@ -705,9 +663,9 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf)  /* NOTE: Disgusted with this crap */  static int ddns_send(char * cmd)  { -        pid_t pid = -1; -        int wstatus; -        int pipe_fd[2]; +        pid_t pid     = -1; +        int   wstatus; +        int   pipe_fd[2];          char * argv[] = {NSUPDATE_EXEC, 0};          char * envp[] = {0}; @@ -743,22 +701,23 @@ static int ddns_send(char * cmd)                  log_err("Failed to register with DNS server.");          close(pipe_fd[1]); +          return 0;  }  static uint32_t ddns_resolve(char *   name,                               uint32_t dns_addr)  { -        pid_t pid = -1; -        int wstatus; -        int pipe_fd[2]; -        char dnsstr[INET_ADDRSTRLEN]; -        char buf[SHIM_UDP_BUF_SIZE]; -        ssize_t count = 0; -        char * substr = NULL; -        char * substr2 = NULL; -        char * addr_str = "Address:"; -        uint32_t ip_addr = 0; +        pid_t    pid      = -1; +        int      wstatus; +        int      pipe_fd[2]; +        char     dnsstr[INET_ADDRSTRLEN]; +        char     buf[IPCP_UDP_BUF_SIZE]; +        ssize_t  count    = 0; +        char *   substr   = NULL; +        char *   substr2  = NULL; +        char *   addr_str = "Address:"; +        uint32_t ip_addr  = 0;          if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL)                  return 0; @@ -785,7 +744,7 @@ static uint32_t ddns_resolve(char *   name,          close(pipe_fd[1]); -        count = read(pipe_fd[0], buf, SHIM_UDP_BUF_SIZE); +        count = read(pipe_fd[0], buf, IPCP_UDP_BUF_SIZE);          if (count <= 0) {                  log_err("Failed to communicate with nslookup.");                  close(pipe_fd[0]); @@ -796,7 +755,7 @@ static uint32_t ddns_resolve(char *   name,          waitpid(pid, &wstatus, 0);          if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0 && -            count != SHIM_UDP_BUF_SIZE) +            count != IPCP_UDP_BUF_SIZE)                  log_dbg("Succesfully communicated with nslookup.");          else                  log_err("Failed to resolve DNS address."); @@ -825,13 +784,13 @@ static uint32_t ddns_resolve(char *   name,  static int ipcp_udp_reg(const uint8_t * hash)  {  #ifdef HAVE_DDNS -        char ipstr[INET_ADDRSTRLEN]; -        char dnsstr[INET_ADDRSTRLEN]; -        char cmd[1000]; +        char     ipstr[INET_ADDRSTRLEN]; +        char     dnsstr[INET_ADDRSTRLEN]; +        char     cmd[1000];          uint32_t dns_addr;          uint32_t ip_addr;  #endif -        char * hashstr; +        char *   hashstr;          hashstr = malloc(ipcp_dir_hash_strlen() + 1);          if (hashstr == NULL) @@ -888,12 +847,12 @@ static int ipcp_udp_reg(const uint8_t * hash)  static int ipcp_udp_unreg(const uint8_t * hash)  {  #ifdef HAVE_DDNS -        char dnsstr[INET_ADDRSTRLEN]; +        char     dnsstr[INET_ADDRSTRLEN];          /* max DNS name length + max IP length + max command length */ -        char cmd[100]; +        char     cmd[100];          uint32_t dns_addr;  #endif -        char * hashstr; +        char *   hashstr;          assert(hash); @@ -932,13 +891,12 @@ static int ipcp_udp_unreg(const uint8_t * hash)  static int ipcp_udp_query(const uint8_t * hash)  { -        uint32_t           ip_addr = 0; -        char *             hashstr; -        struct hostent *   h; +        uint32_t         ip_addr  = 0; +        char *           hashstr; +        struct hostent * h;  #ifdef HAVE_DDNS -        uint32_t           dns_addr = 0; +        uint32_t         dns_addr = 0;  #endif -          assert(hash);          hashstr = malloc(ipcp_dir_hash_strlen() + 1); @@ -991,11 +949,14 @@ static int ipcp_udp_flow_alloc(int             fd,                                 const uint8_t * dst,                                 qosspec_t       qs)  { -        struct sockaddr_in r_saddr; /* server address */ -        struct sockaddr_in f_saddr; /* flow */ -        socklen_t          f_saddr_len = sizeof(f_saddr); +        struct sockaddr_in r_saddr; /* Server address */ +        struct sockaddr_in c_saddr; /* Client address */ +        socklen_t          c_saddr_len;          int                skfd;          uint32_t           ip_addr = 0; +        char               ip_str[INET_ADDRSTRLEN]; + +        c_saddr_len = sizeof(c_saddr);          log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst)); @@ -1004,21 +965,24 @@ static int ipcp_udp_flow_alloc(int             fd,          assert(dst);          skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); -        if (skfd < 0) +        if (skfd < 0) { +                log_err("Could not create socket.");                  return -1; +        } -        /* this socket is for the flow */ -        memset((char *) &f_saddr, 0, sizeof(f_saddr)); -        f_saddr.sin_family      = AF_INET; -        f_saddr.sin_addr.s_addr = local_ip; -        f_saddr.sin_port        = 0; +        /* This socket is for the flow. */ +        memset((char *) &c_saddr, 0, sizeof(c_saddr)); +        c_saddr.sin_family      = AF_INET; +        c_saddr.sin_addr.s_addr = LOCAL_IP; +        c_saddr.sin_port        = udp_data.clt_port; -        if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { +        if (bind(skfd, (struct sockaddr *) &c_saddr, sizeof(c_saddr)) < 0) { +                log_dbg("Could not bind socket to client address.");                  close(skfd);                  return -1;          } -        if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { +        if (getsockname(skfd, (struct sockaddr *) &c_saddr, &c_saddr_len) < 0) {                  log_err("Could not get address from fd.");                  close(skfd);                  return -1; @@ -1029,43 +993,41 @@ static int ipcp_udp_flow_alloc(int             fd,                  close(skfd);                  return -1;          } +          ip_addr = (uint32_t) shim_data_dir_get_addr(udp_data.shim_data, dst); -        /* connect to server (store the remote IP address in the fd) */ +        inet_ntop(AF_INET, &ip_addr, ip_str, INET_ADDRSTRLEN); +        log_dbg("Destination UDP ipcp resolved at %s.", ip_str); + +        /* Connect to server and store the remote IP address in the skfd. */          memset((char *) &r_saddr, 0, sizeof(r_saddr));          r_saddr.sin_family      = AF_INET;          r_saddr.sin_addr.s_addr = ip_addr; -        r_saddr.sin_port        = LISTEN_PORT; +        r_saddr.sin_port        = udp_data.s_saddr.sin_port;          if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { +                log_dbg("Could not connect socket to remote."); +                close(skfd); +                return -1; +        } + +        if (ipcp_udp_port_alloc(skfd, fd, dst, qs) < 0) { +                log_err("Could not allocate port.");                  close(skfd);                  return -1;          }          pthread_rwlock_wrlock(&udp_data.flows_lock); -        udp_data.fd_to_uf[fd].udp  = f_saddr.sin_port; -        udp_data.fd_to_uf[fd].skfd = skfd; -        udp_data.uf_to_fd[skfd]    = fd; +        udp_data.fd_to_uf[fd].d_eid = -1; +        udp_data.fd_to_uf[fd].skfd  = skfd;          fset_add(udp_data.np1_flows, fd);          pthread_rwlock_unlock(&udp_data.flows_lock); -        if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, qs) < 0) { -                pthread_rwlock_wrlock(&udp_data.flows_lock); - -                udp_data.fd_to_uf[fd].udp  = -1; -                udp_data.fd_to_uf[fd].skfd = -1; -                udp_data.uf_to_fd[skfd]    = -1; - -                pthread_rwlock_unlock(&udp_data.flows_lock); -                close(skfd); -                return -1; -        } - -        log_dbg("Flow pending on fd %d, UDP port %d.", -                fd, ntohs(f_saddr.sin_port)); +        log_dbg("Flow pending on fd %d, UDP src port %d, dst port %d.", +                fd, ntohs(c_saddr.sin_port), ntohs(r_saddr.sin_port));          return 0;  } @@ -1073,12 +1035,10 @@ static int ipcp_udp_flow_alloc(int             fd,  static int ipcp_udp_flow_alloc_resp(int fd,                                      int response)  { -        struct timespec    ts   = {0, FD_UPDATE_TIMEOUT * 1000}; -        struct timespec    abstime; -        int                skfd = -1; -        struct sockaddr_in f_saddr; -        struct sockaddr_in r_saddr; -        socklen_t          len  = sizeof(r_saddr); +        struct timespec ts  = {0, FD_UPDATE_TIMEOUT * 1000}; +        struct timespec abstime; +        int             skfd; +        int             d_eid;          if (response)                  return 0; @@ -1106,38 +1066,23 @@ static int ipcp_udp_flow_alloc_resp(int fd,          pthread_rwlock_rdlock(&udp_data.flows_lock); -        skfd = udp_data.fd_to_uf[fd].skfd; - -        pthread_rwlock_unlock(&udp_data.flows_lock); - -        if (getsockname(skfd, (struct sockaddr *) &f_saddr, &len) < 0) { -                log_dbg("Socket with fd %d has no address.", skfd); -                return -1; -        } - -        if (getpeername(skfd, (struct sockaddr *) &r_saddr, &len) < 0) { -                log_dbg("Socket with fd %d has no peer.", skfd); -                return -1; -        } - -        pthread_rwlock_rdlock(&udp_data.flows_lock); - -        set_fd(skfd); +        skfd  = udp_data.fd_to_uf[fd].skfd; +        d_eid = udp_data.fd_to_uf[fd].d_eid;          fset_add(udp_data.np1_flows, fd);          pthread_rwlock_unlock(&udp_data.flows_lock); -        if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr, f_saddr.sin_port, -                                     r_saddr.sin_port, response) < 0) { +        if (ipcp_udp_port_alloc_resp(skfd, d_eid, fd, response) < 0) {                  pthread_rwlock_rdlock(&udp_data.flows_lock); -                clr_fd(skfd); +                fset_del(udp_data.np1_flows, fd);                  pthread_rwlock_unlock(&udp_data.flows_lock); +                log_err("Failed to respond to flow request.");                  return -1;          } -        log_dbg("Accepted flow, fd %d on UDP port %d.", -                fd, ntohs(f_saddr.sin_port)); +        log_dbg("Accepted flow, fd %d on eid %d.", +                fd, d_eid);          return 0;  } @@ -1154,18 +1099,12 @@ static int ipcp_udp_flow_dealloc(int fd)          skfd = udp_data.fd_to_uf[fd].skfd; -        udp_data.uf_to_fd[skfd]    = -1; -        udp_data.fd_to_uf[fd].udp  = -1; -        udp_data.fd_to_uf[fd].skfd = -1; +        udp_data.fd_to_uf[fd].d_eid = -1; +        udp_data.fd_to_uf[fd].skfd  = -1;          close(skfd);          pthread_rwlock_unlock(&udp_data.flows_lock); -        pthread_rwlock_rdlock(&udp_data.flows_lock); - -        clr_fd(skfd); - -        pthread_rwlock_unlock(&udp_data.flows_lock);          flow_dealloc(fd); @@ -1191,6 +1130,8 @@ static struct ipcp_ops udp_ops = {  int main(int    argc,           char * argv[])  { +        int i; +          if (ipcp_init(argc, argv, &udp_ops) < 0)                  goto fail_init; @@ -1212,13 +1153,17 @@ int main(int    argc,          ipcp_shutdown();          if (ipcp_get_state() == IPCP_SHUTDOWN) { -                pthread_cancel(udp_data.packet_loop); -                pthread_cancel(udp_data.handler); -                pthread_cancel(udp_data.packet_reader); - -                pthread_join(udp_data.packet_loop, NULL); -                pthread_join(udp_data.handler, NULL); -                pthread_join(udp_data.packet_reader, NULL); +                for (i = 0; i < IPCP_UDP_RD_THR; ++i) +                        pthread_cancel(udp_data.packet_reader[i]); +                for (i = 0; i < IPCP_UDP_WR_THR; ++i) +                        pthread_cancel(udp_data.packet_writer[i]); +                pthread_cancel(udp_data.mgmt_handler); + +                for (i = 0; i < IPCP_UDP_RD_THR; ++i) +                        pthread_join(udp_data.packet_reader[i], NULL); +                for (i = 0; i < IPCP_UDP_WR_THR; ++i) +                        pthread_join(udp_data.packet_writer[i], NULL); +                pthread_join(udp_data.mgmt_handler, NULL);          }          udp_data_fini(); | 
