diff options
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/flow.c | 38 | ||||
| -rw-r--r-- | src/ipcpd/flow.h | 12 | ||||
| -rw-r--r-- | src/ipcpd/ipcp-data.c | 104 | ||||
| -rw-r--r-- | src/ipcpd/ipcp-data.h | 16 | ||||
| -rw-r--r-- | src/ipcpd/ipcp-ops.h | 9 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.c | 20 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.h | 3 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 524 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/tests/shim_udp_test.c | 12 | 
9 files changed, 408 insertions, 330 deletions
| diff --git a/src/ipcpd/flow.c b/src/ipcpd/flow.c index c436733b..ae8f848c 100644 --- a/src/ipcpd/flow.c +++ b/src/ipcpd/flow.c @@ -27,7 +27,7 @@  #include <ouroboros/logs.h> -flow_t * flow_create(int32_t port_id) +flow_t * flow_create(uint32_t port_id)  {          flow_t * flow = malloc(sizeof *flow);          if (flow == NULL) { @@ -38,8 +38,7 @@ flow_t * flow_create(int32_t port_id)          INIT_LIST_HEAD(&flow->list);          flow->port_id = port_id; -        flow->oflags = FLOW_O_DEFAULT; -        flow->state = FLOW_NULL; +        flow->state   = FLOW_NULL;          pthread_mutex_init(&flow->lock, NULL); @@ -52,36 +51,3 @@ void flow_destroy(flow_t * flow)                  return;          free(flow);  } - -int flow_set_opts(flow_t * flow, uint16_t opts) -{ -        if (flow == NULL) { -                LOG_DBGF("Non-existing flow."); -                return -1; -        } - -        pthread_mutex_lock(&flow->lock); - -        if ((opts & FLOW_O_ACCMODE) == FLOW_O_ACCMODE) { -                flow->oflags = FLOW_O_DEFAULT; -                pthread_mutex_unlock(&flow->lock); -                LOG_WARN("Invalid flow options. Setting default."); -                return -1; -        } - -        flow->oflags = opts; - -        pthread_mutex_unlock(&flow->lock); - -        return 0; -} - -uint16_t flow_get_opts(const flow_t * flow) -{ -        if (flow == NULL) { -                LOG_DBGF("Non-existing flow."); -                return FLOW_O_INVALID; -        } - -        return flow->oflags; -} diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h index 000de5ad..0a3e90d1 100644 --- a/src/ipcpd/flow.h +++ b/src/ipcpd/flow.h @@ -25,6 +25,7 @@  #include <ouroboros/common.h>  #include <ouroboros/list.h> +#include <ouroboros/shm_ap_rbuff.h>  #include <pthread.h>  /* same values as fcntl.h */ @@ -47,17 +48,14 @@ enum flow_state {  typedef struct flow {          struct list_head list; -        int32_t          port_id; -        uint16_t         oflags; -        enum flow_state  state; +        uint32_t              port_id; +        struct shm_ap_rbuff * rb; +        enum flow_state       state;          pthread_mutex_t  lock;  } flow_t; -flow_t * flow_create(int32_t   port_id); +flow_t * flow_create(uint32_t port_id);  void     flow_destroy(flow_t * flow); -int      flow_set_opts(flow_t * flow, uint16_t opts); -uint16_t flow_get_opts(const flow_t * flow); -  #endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c index 72407a53..76fc4bcd 100644 --- a/src/ipcpd/ipcp-data.c +++ b/src/ipcpd/ipcp-data.c @@ -96,46 +96,26 @@ struct ipcp_data * ipcp_data_create()          if (data == NULL)                  return NULL; -        data->iname = NULL;          data->type  = 0; -        data->dum   = NULL;          return data;  }  struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, -                                  const char *       ipcp_name,                                    enum ipcp_type     ipcp_type)  {          if (dst == NULL)                  return NULL; -        dst->iname = instance_name_create(); -        if (dst->iname == NULL) -                return NULL; - -        if(instance_name_init_from(dst->iname, ipcp_name, getpid()) == NULL) { -                instance_name_destroy(dst->iname); -                return NULL; -        } -          dst->type  = ipcp_type; -        dst->dum = shm_du_map_open(); -        if (dst->dum == NULL) { -                instance_name_destroy(dst->iname); -                return NULL; -        } -          /* init the lists */          INIT_LIST_HEAD(&dst->registry); -        INIT_LIST_HEAD(&dst->flows);          INIT_LIST_HEAD(&dst->directory);          /* init the mutexes */          pthread_mutex_init(&dst->reg_lock, NULL);          pthread_mutex_init(&dst->dir_lock, NULL); -        pthread_mutex_init(&dst->flow_lock, NULL);          return dst;  } @@ -156,42 +136,22 @@ static void clear_directory(struct ipcp_data * data)                  dir_entry_destroy(list_entry(h, struct dir_entry, list));  } -static void clear_flows(struct ipcp_data * data) -{ -        struct list_head * h; -        struct list_head * t; -        list_for_each_safe(h, t, &data->flows) -                flow_destroy(list_entry(h, flow_t, list)); - -} -  void ipcp_data_destroy(struct ipcp_data * data)  {          if (data == NULL)                  return; -        /* FIXME: finish all pending operations here */ - -        if (data->iname != NULL) -                instance_name_destroy(data->iname); -        data->iname = NULL; - -        if (data->dum != NULL) -                shm_du_map_close(data->dum); -        data->dum = NULL; +        /* FIXME: finish all pending operations here and cancel all threads */          pthread_mutex_lock(&data->reg_lock);          pthread_mutex_lock(&data->dir_lock); -        pthread_mutex_lock(&data->flow_lock);          /* clear the lists */          clear_registry(data);          clear_directory(data); -        clear_flows(data);          /*           * no need to unlock, just free the entire thing -         * pthread_mutex_unlock(&data->flow_lock);           * pthread_mutex_unlock(&data->dir_lock);           * pthread_mutex_unlock(&data->reg_lock);           */ @@ -380,65 +340,3 @@ uint64_t ipcp_data_get_addr(struct ipcp_data * data,          return addr;  } - -flow_t * ipcp_data_find_flow(struct ipcp_data * data, -                             uint32_t           port_id) -{ -        struct list_head * h; -        list_for_each(h, &data->flows) { -                flow_t * f = list_entry(h, flow_t, list); -                if (f->port_id == port_id) -                        return f; -        } - -        return NULL; -} - -bool ipcp_data_has_flow(struct ipcp_data * data, -                        uint32_t           port_id) -{ -        return ipcp_data_find_flow(data, port_id) != NULL; -} - -int ipcp_data_add_flow(struct ipcp_data * data, -                       flow_t *           flow) -{ -        if (data == NULL || flow == NULL) -                return -1; - -        pthread_mutex_lock(&data->flow_lock); - -        if (ipcp_data_has_flow(data, flow->port_id)) { -                pthread_mutex_unlock(&data->flow_lock); -                return -2; -        } - -        list_add(&flow->list,&data->flows); - -        pthread_mutex_unlock(&data->flow_lock); - -        return 0; -} - -int ipcp_data_del_flow(struct ipcp_data * data, -                       uint32_t           port_id) -{ -        flow_t * f; - -        if (data == NULL) -                return -1; - -        pthread_mutex_lock(&data->flow_lock); - -        f = ipcp_data_find_flow(data, port_id); -        if (f == NULL) -                return -1; - -        list_del(&f->list); - -        free(f); - -        pthread_mutex_unlock(&data->flow_lock); - -        return 0; -} diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index 1dea8c3c..2e86ba11 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -34,17 +34,11 @@  #include "flow.h"  struct ipcp_data { -        instance_name_t   * iname;          enum ipcp_type      type; -        struct shm_du_map * dum; -          struct list_head    registry;          pthread_mutex_t     reg_lock; -        struct list_head    flows; -        pthread_mutex_t     flow_lock; -          struct list_head    directory;          pthread_mutex_t     dir_lock; @@ -53,7 +47,6 @@ struct ipcp_data {  struct ipcp_data * ipcp_data_create();  struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, -                                  const char *       ipcp_name,                                    enum ipcp_type     ipcp_type);  void               ipcp_data_destroy(struct ipcp_data * data); @@ -73,13 +66,4 @@ bool         ipcp_data_is_in_directory(struct ipcp_data * data,                                         const char *       ap_name);  uint64_t     ipcp_data_get_addr(struct ipcp_data * data,                                  const char *       ap_name); -bool         ipcp_data_has_flow(struct ipcp_data * data, -                                uint32_t           port_id); -flow_t *     ipcp_data_find_flow(struct ipcp_data * data, -                                 uint32_t           port_id); -int          ipcp_data_add_flow(struct ipcp_data * data, -                                flow_t *           flow); -int          ipcp_data_del_flow(struct ipcp_data * data, -                                uint32_t           port_id); -  #endif /* IPCPD_IPCP_DATA_H */ diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index 2ccb2e59..91b6cac9 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -39,20 +39,15 @@ struct ipcp_ops {          int   (* ipcp_name_reg)(char *   name);          int   (* ipcp_name_unreg)(char * name);          int   (* ipcp_flow_alloc)(uint32_t port_id, +                                  pid_t    n_pid,                                    char *   dst_ap_name,                                    char *   src_ap_name,                                    char *   src_ae_name,                                    struct qos_spec * qos);          int   (* ipcp_flow_alloc_resp)(uint32_t port_id, +                                       pid_t    n_pid,                                         int      response);          int   (* ipcp_flow_dealloc)(uint32_t port_id); - -        /* FIXME: let's see how this will work with the shm_du_map */ -        int   (* ipcp_du_write)(uint32_t port_id, -                                size_t map_index); - -        int   (* ipcp_du_read)(uint32_t port_id, -                               size_t map_index);  };  #endif /* IPCPD_IPCP_OPS_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index d6f373cd..13632a80 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -45,11 +45,12 @@ int ipcp_arg_check(int argc, char * argv[])          return 0;  } -int ipcp_main_loop(struct ipcp * _ipcp) +void * ipcp_main_loop(void * o)  {          int     lsockfd;          int     sockfd;          uint8_t buf[IPCP_MSG_BUF_SIZE]; +        struct ipcp * _ipcp = (struct ipcp *) o;          ipcp_msg_t *    msg;          ssize_t         count; @@ -61,13 +62,13 @@ int ipcp_main_loop(struct ipcp * _ipcp)          if (_ipcp == NULL) {                  LOG_ERR("Invalid ipcp struct."); -                return 1; +                return (void *) 1;          }          sockfd = server_socket_open(ipcp_sock_path(getpid()));          if (sockfd < 0) {                  LOG_ERR("Could not open server socket."); -                return 1; +                return (void *) 1;          }          while (true) { @@ -113,7 +114,7 @@ int ipcp_main_loop(struct ipcp * _ipcp)                                  conf.max_pdu_size    = conf_msg->max_pdu_size;                          }                          if (conf_msg->ipcp_type == IPCP_SHIM_UDP) { -                                conf.ip_addr = conf_msg->ip_addr; +                                conf.ip_addr  = conf_msg->ip_addr;                                  conf.dns_addr = conf_msg->dns_addr;                          } @@ -149,7 +150,8 @@ int ipcp_main_loop(struct ipcp * _ipcp)                          }                          ret_msg.has_result = true;                          ret_msg.result = -                                _ipcp->ops->ipcp_unreg(msg->dif_names, msg->len); +                                _ipcp->ops->ipcp_unreg(msg->dif_names, +                                                       msg->len);                          break;                  case IPCP_MSG_CODE__IPCP_NAME_REG:                          if (_ipcp->ops->ipcp_name_reg == NULL) { @@ -172,9 +174,10 @@ int ipcp_main_loop(struct ipcp * _ipcp)                                  LOG_ERR("Flow_alloc unsupported.");                                  break;                          } -                        ret_msg.has_fd = true; -                        ret_msg.fd = +                        ret_msg.has_result = true; +                        ret_msg.result =                                  _ipcp->ops->ipcp_flow_alloc(msg->port_id, +                                                            msg->pid,                                                              msg->dst_name,                                                              msg->src_ap_name,                                                              msg->src_ae_name, @@ -188,6 +191,7 @@ int ipcp_main_loop(struct ipcp * _ipcp)                          ret_msg.has_result = true;                          ret_msg.result =                                  _ipcp->ops->ipcp_flow_alloc_resp(msg->port_id, +                                                                 msg->pid,                                                                   msg->result);                          break;                  case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: @@ -231,5 +235,5 @@ int ipcp_main_loop(struct ipcp * _ipcp)                  close(lsockfd);          } -        return 0; +        return NULL;  } diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9decac8b..393af994 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -43,7 +43,8 @@ struct ipcp {          int                irmd_fd;  }; -int ipcp_main_loop(); +void * ipcp_main_loop(void * o); +void * ipcp_sdu_loop(void * o);  int ipcp_arg_check(int argc, char * argv[]);  #endif diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 460fe9e3..1f7bb12f 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -24,12 +24,13 @@  #include "ipcp.h"  #include "flow.h"  #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h>  #include <ouroboros/ipcp.h>  #include <ouroboros/dif_config.h>  #include <ouroboros/sockets.h> -#include <ouroboros/dev.h> +#include <ouroboros/bitmap.h>  #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -67,6 +68,144 @@ extern struct ipcp * _ipcp; /* defined in test */  struct ipcp * _ipcp;  #endif +/* + * copied from ouroboros/dev. The shim needs access to the internals + * because it doesn't follow all steps necessary steps to get + * the info + */ + +#define UNKNOWN_AP "__UNKNOWN_AP__" +#define UNKNOWN_AE "__UNKNOWN_AE__" + +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE +  #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE +  #define DU_BUFF_TAILSPACE 0 +#endif + +/* the shim needs access to these internals */ +struct shim_ap_data { +        instance_name_t *     api; +        struct shm_du_map *   dum; +        struct bmp *          fds; + +        struct shm_ap_rbuff * rb; +        struct flow           flows[AP_MAX_FLOWS]; + +        pthread_t mainloop; +        pthread_t sduloop; +        pthread_t handler; +        pthread_t sdu_reader[2]; +        int       ping_pong; +} * _ap_instance; + +int shim_ap_init(char * ap_name) +{ +        _ap_instance = malloc(sizeof(struct shim_ap_data)); +        if (_ap_instance == NULL) { +                return -1; +        } + +        _ap_instance->api = instance_name_create(); +        if (_ap_instance->api == NULL) { +                free(_ap_instance); +                return -1; +        } + +        if (instance_name_init_from(_ap_instance->api, +                                    ap_name, +                                    getpid()) == NULL) { +                instance_name_destroy(_ap_instance->api); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); +        if (_ap_instance->fds == NULL) { +                instance_name_destroy(_ap_instance->api); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->dum = shm_du_map_open(); +        if (_ap_instance->dum == NULL) { +                instance_name_destroy(_ap_instance->api); +                bmp_destroy(_ap_instance->fds); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->rb = shm_ap_rbuff_create(); +        if (_ap_instance->rb == NULL) { +                instance_name_destroy(_ap_instance->api); +                bmp_destroy(_ap_instance->fds); +                free(_ap_instance); +                return -1; +        } + +        return 0; +} + +void shim_ap_fini() +{ +        int i = 0; + +        if (_ap_instance == NULL) +                return; +        if (_ap_instance->api != NULL) +                instance_name_destroy(_ap_instance->api); +        if (_ap_instance->fds != NULL) +                bmp_destroy(_ap_instance->fds); +        if (_ap_instance->dum != NULL) +                shm_du_map_close(_ap_instance->dum); +        if (_ap_instance->rb != NULL) +                shm_ap_rbuff_destroy(_ap_instance->rb); +        for (i = 0; i < AP_MAX_FLOWS; i ++) +                if (_ap_instance->flows[i].rb != NULL) +                        shm_ap_rbuff_close(_ap_instance->flows[i].rb); + +        free(_ap_instance); +} + +static int port_id_to_fd(uint32_t port_id) +{ +        int i; +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                if (_ap_instance->flows[i].port_id == port_id +                        && _ap_instance->flows[i].state != FLOW_NULL) +                        return i; +        return -1; +} + +static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) +{ +        /* the AP chooses the amount of headspace and tailspace */ +        size_t index = shm_create_du_buff(_ap_instance->dum, +                                          count, +                                          0, +                                          buf, +                                          count); +        struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + +        if (index == -1) +                return -1; + +        if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { +                shm_release_du_buff(_ap_instance->dum, index); +                return -EPIPE; +        } + +        return 0; +} + +/* + * end copy from dev.c + */ +  struct ipcp_udp_data {          /* keep ipcp_data first for polymorphism */          struct ipcp_data ipcp_data; @@ -79,39 +218,15 @@ struct ipcp_udp_data {          int                s_fd;          fd_set flow_fd_s; -        flow_t * fd_to_flow_ptr[FD_SETSIZE]; -        pthread_mutex_t   lock; +        pthread_mutex_t lock;  }; -struct udp_flow { -        /* keep flow first for polymorphism */ -        flow_t flow; -        int    fd; -}; - -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) -{ -        switch(sig) { -        case SIGINT: -        case SIGTERM: -        case SIGHUP: -                LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); -                if (info->si_pid == irmd_pid) { -                        /* shm_du_map_close(_ipcp->data->dum); */ -                        exit(0); -                } -        default: -                return; -        } -} - -struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) +struct ipcp_udp_data * ipcp_udp_data_create()  {          struct ipcp_udp_data * udp_data;          struct ipcp_data *     data;          enum ipcp_type         ipcp_type; -        int                    n;          udp_data = malloc(sizeof *udp_data);          if (udp_data == NULL) { @@ -121,18 +236,52 @@ struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name)          ipcp_type = THIS_TYPE;          data = (struct ipcp_data *) udp_data; -        if (ipcp_data_init(data, ap_name, ipcp_type) == NULL) { +        if (ipcp_data_init(data, ipcp_type) == NULL) {                  free(udp_data);                  return NULL;          }          FD_ZERO(&udp_data->flow_fd_s); -        for (n = 0; n < FD_SETSIZE; ++n) -                udp_data->fd_to_flow_ptr[n] = NULL;          return udp_data;  } +void ipcp_udp_data_destroy(struct ipcp_udp_data * data) +{ +        if (data == NULL) +                return; + +        ipcp_data_destroy((struct ipcp_data *) data); +} + +void ipcp_udp_destroy(struct ipcp * ipcp) +{ +        ipcp_udp_data_destroy((struct ipcp_udp_data *) ipcp->data); +        shim_ap_fini(); +        free(ipcp); +} + +void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +{ +        switch(sig) { +        case SIGINT: +        case SIGTERM: +        case SIGHUP: +                if (info->si_pid == irmd_pid || info->si_pid == 0) { +                        LOG_DBG("Terminating by order of %d. Bye.", +                                info->si_pid); +                        pthread_cancel(_ap_instance->mainloop); +                        pthread_cancel(_ap_instance->handler); +                        pthread_cancel(_ap_instance->sdu_reader[0]); +                        pthread_cancel(_ap_instance->sdu_reader[1]); +                        pthread_cancel(_ap_instance->sduloop); +                        exit(0); +                } +        default: +                return; +        } +} +  static void * ipcp_udp_listener()  {          char buf[SHIM_UDP_BUF_SIZE]; @@ -141,10 +290,10 @@ static void * ipcp_udp_listener()          struct sockaddr_in f_saddr;          struct sockaddr_in c_saddr;          struct hostent  *  hostp; -        struct udp_flow *  flow;          int                sfd = shim_data(_ipcp)->s_fd;          while (true) { +                int fd;                  n = sizeof c_saddr;                  n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,                               (struct sockaddr *) &c_saddr, (unsigned *) &n); @@ -157,16 +306,7 @@ static void * ipcp_udp_listener()                  if (hostp == NULL)                          continue; -                /* create a new socket for the server */ -                flow = malloc(sizeof *flow); -                if (flow == NULL) -                        continue; - -                flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); -                if (flow->fd == -1) { -                        free(flow); -                        continue; -                } +                fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);                  memset((char *) &f_saddr, 0, sizeof f_saddr);                  f_saddr.sin_family      = AF_INET; @@ -185,36 +325,33 @@ static void * ipcp_udp_listener()                   * the flow structure                   */ -                if (connect(flow->fd, +                if (connect(fd,                              (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { -                        close(flow->fd); -                        free(flow); +                        close(fd);                          continue;                  } +                /* echo back the packet */ +                while(send(fd, buf, strlen(buf), 0) < 0) +                        ; +                  /* reply to IRM */ -                flow->flow.port_id = ipcp_flow_req_arr(getpid(), buf, -                                                       UNKNOWN_AP, ""); -                if (flow->flow.port_id < 0) { +                _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), +                                                                    buf, +                                                                    UNKNOWN_AP, +                                                                    UNKNOWN_AE); +                if (_ap_instance->flows[fd].port_id < 0) {                          LOG_ERR("Could not get port id from IRMd"); -                        close(flow->fd); -                        free(flow); +                        close(fd);                          continue;                  } -                flow->flow.oflags = FLOW_O_DEFAULT; -                flow->flow.state  = FLOW_PENDING; - -                if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { -                        LOG_DBGF("Could not add flow."); -                        close(flow->fd); -                        free(flow); -                        continue; -                } +                _ap_instance->flows[fd].rb     = NULL; +                _ap_instance->flows[fd].state  = FLOW_PENDING; -                FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); -                shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; +                LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", +                         _ap_instance->flows[fd].port_id, fd);          }          return 0; @@ -229,8 +366,6 @@ static void * ipcp_udp_sdu_reader()          struct sockaddr_in r_saddr;          while (true) { -                flow_t * flow; -                  if (select(FD_SETSIZE,                             &shim_data(_ipcp)->flow_fd_s,                             NULL, NULL, NULL) @@ -249,18 +384,8 @@ static void * ipcp_udp_sdu_reader()                                       (struct sockaddr *) &r_saddr,                                       (unsigned *) &n); -                        flow = shim_data(_ipcp)->fd_to_flow_ptr[fd]; -                        if (flow->state == FLOW_PENDING) { -                                if (connect(fd, -                                            (struct sockaddr *) &r_saddr, -                                            sizeof r_saddr) -                                    < 0) -                                        continue; -                                flow->state = FLOW_ALLOCATED; -                        } -                          /* send the sdu to the correct port_id */ -                        LOG_MISSING; +                        ipcp_udp_flow_write(fd, buf, n);                  }          } @@ -271,8 +396,6 @@ int ipcp_udp_bootstrap(struct dif_config * conf)  {          char ipstr[INET_ADDRSTRLEN];          char dnsstr[INET_ADDRSTRLEN]; -        pthread_t handler; -        pthread_t sdu_reader;          int enable = 1;          if (conf->type != THIS_TYPE) { @@ -296,7 +419,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf)                            dnsstr,                            INET_ADDRSTRLEN);          else -                strcpy(dnsstr, "not set.\n"); +                strcpy(dnsstr, "not set");          shim_data(_ipcp)->ip_addr  = conf->ip_addr;          shim_data(_ipcp)->dns_addr = conf->dns_addr; @@ -304,7 +427,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf)          /* UDP listen server */          if ((shim_data(_ipcp)->s_fd = -             socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { +             socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {                  LOG_DBGF("Can't create socket.");                  return -1;          } @@ -328,13 +451,28 @@ int ipcp_udp_bootstrap(struct dif_config * conf)                  return -1;          } -        pthread_create(&handler, NULL, ipcp_udp_listener, NULL); -        pthread_create(&sdu_reader, NULL, ipcp_udp_sdu_reader, NULL); +        FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + +        pthread_create(&_ap_instance->handler, +                       NULL, +                       ipcp_udp_listener, +                       NULL); +        pthread_create(&_ap_instance->sdu_reader[0], +                       NULL, +                       ipcp_udp_sdu_reader, +                       NULL); + +        pthread_create(&_ap_instance->sdu_reader[1], +                       NULL, +                       ipcp_udp_sdu_reader, +                       NULL); + +        _ap_instance->ping_pong = 0;          _ipcp->state = IPCP_ENROLLED; -        LOG_DBG("Bootstrapped shim IPCP over UDP %s-%d.", -                _ipcp->data->iname->name, _ipcp->data->iname->id); +        LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", +                getpid());          LOG_DBG("Bound to IP address %s.", ipstr);          LOG_DBG("DNS server address is %s.", dnsstr); @@ -464,23 +602,25 @@ int ipcp_udp_name_unreg(char * name)  }  int ipcp_udp_flow_alloc(uint32_t          port_id, +                        pid_t             n_pid,                          char *            dst_name,                          char *            src_ap_name,                          char *            src_ae_name,                          struct qos_spec * qos)  { -        struct udp_flow *  flow = NULL;          struct sockaddr_in l_saddr;          struct sockaddr_in r_saddr; +        struct sockaddr_in rf_saddr; +        int                fd; +        int n; + +        char * recv_buf = NULL;          struct hostent * h;          if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)                  return -1; -        LOG_DBG("Received flow allocation request from %s to %s.", -                src_ap_name, dst_name); -          if (strlen(dst_name) > 255              || strlen(src_ap_name) > 255              || strlen(src_ae_name) > 255) { @@ -491,15 +631,7 @@ int ipcp_udp_flow_alloc(uint32_t          port_id,          if (qos != NULL)                  LOG_DBGF("QoS requested. UDP/IP can't do that."); -        flow = malloc(sizeof *flow); -        if (flow == NULL) -                return -1; - -        flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); -        if (flow->fd == -1) { -                free(flow); -                return -1; -        } +        fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);          /* this socket is for the flow */          memset((char *) &l_saddr, 0, sizeof l_saddr); @@ -507,108 +639,161 @@ int ipcp_udp_flow_alloc(uint32_t          port_id,          l_saddr.sin_addr.s_addr = local_ip;          l_saddr.sin_port        = 0; -        if (bind(flow->fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { -                char ipstr[INET_ADDRSTRLEN]; -                inet_ntop(AF_INET, -                          &l_saddr.sin_addr.s_addr, -                          ipstr, -                          INET_ADDRSTRLEN); -                close(flow->fd); -                free(flow); +        if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { +                close(fd);                  return -1;          }          h = gethostbyname(dst_name);          if (h == NULL) {                  LOG_DBGF("Could not resolve %s.", dst_name); -                close(flow->fd); -                free(flow); +                close(fd);                  return -1;          } -          memset((char *) &r_saddr, 0, sizeof r_saddr);          r_saddr.sin_family      = AF_INET; -        r_saddr.sin_addr.s_addr = (uint32_t) *(h->h_addr_list[0]); +        r_saddr.sin_addr.s_addr = *((uint32_t *) (h->h_addr_list[0]));          r_saddr.sin_port        = LISTEN_PORT; +          /* at least try to get the packet on the wire */ -        while (sendto(flow->fd, dst_name, strlen(dst_name), 0, +        while (sendto(fd, dst_name, strlen(dst_name), 0,                        (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) {          } -        flow->flow.port_id = port_id; -        flow->flow.oflags  = FLOW_O_DEFAULT; -        flow->flow.state   = FLOW_PENDING; - -        /* add flow to the list */ +        /* wait for the other shim IPCP to respond */ -        pthread_mutex_lock(&_ipcp->data->flow_lock); +        recv_buf = malloc(strlen(dst_name) + 1); +        n = sizeof(rf_saddr); +        n = recvfrom(fd, +                     recv_buf, +                     strlen(dst_name) + 1, +                     0, +                     (struct sockaddr *) &rf_saddr, +                     (unsigned *) &n); -        if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { -                LOG_DBGF("Could not add flow."); -                pthread_mutex_unlock(&_ipcp->data->flow_lock); -                close(flow->fd); -                free(flow); +        if (connect(fd, +                    (struct sockaddr *) &rf_saddr, +                    sizeof rf_saddr) +            < 0) { +                free(recv_buf);                  return -1;          } -        pthread_mutex_unlock(&_ipcp->data->flow_lock); +        if (!strcmp(recv_buf, dst_name)) +                LOG_WARN("Incorrect echo from server"); + +        free(recv_buf); + +        _ap_instance->flows[fd].port_id = port_id; +        _ap_instance->flows[fd].state   = FLOW_ALLOCATED; +        _ap_instance->flows[fd].rb      = shm_ap_rbuff_open(n_pid); +        if (_ap_instance->flows[fd].rb == NULL) { +                LOG_ERR("Could not open N + 1 ringbuffer."); +                close(fd); +        }          /* tell IRMd that flow allocation "worked" */ -        if (ipcp_flow_alloc_reply(getpid(), flow->flow.port_id, 0)) { +        if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {                  LOG_ERR("Failed to notify IRMd about flow allocation reply"); -                close(flow->fd); -                ipcp_data_del_flow(_ipcp->data, flow->flow.port_id); +                close(fd); +                shm_ap_rbuff_close(_ap_instance->flows[fd].rb);                  return -1;          } -        FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); -        shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; +        FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); -        return 0; +        pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); +        pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], +                       NULL, +                       ipcp_udp_sdu_reader, +                       NULL); +        _ap_instance->ping_pong = !_ap_instance->ping_pong; + +        LOG_DBG("Allocated flow with port_id %u on UDP fd %d.", port_id, fd); + +        return fd;  }  int ipcp_udp_flow_alloc_resp(uint32_t port_id, +                             pid_t    n_pid,                               int      response)  { -        struct udp_flow * flow = -                (struct udp_flow *) ipcp_data_find_flow(_ipcp->data, port_id); -        if (flow == NULL) { -                return -1; +        int fd = port_id_to_fd(port_id); +        if (fd < 0) { +                LOG_DBGF("Could not find flow with port_id %u.", port_id); +                return 0;          } -        if (response) { -                ipcp_data_del_flow(_ipcp->data, port_id); +        if (response)                  return 0; -        }          /* awaken pending flow */ -        if (flow->flow.state != FLOW_PENDING) +        if (_ap_instance->flows[fd].state != FLOW_PENDING) { +                LOG_DBGF("Flow was not pending.");                  return -1; +        } + +        _ap_instance->flows[fd].state = FLOW_ALLOCATED; +        _ap_instance->flows[fd].rb    = shm_ap_rbuff_open(n_pid); +        if (_ap_instance->flows[fd].rb == NULL) { +                LOG_ERR("Could not open N + 1 ringbuffer."); +                _ap_instance->flows[fd].state   = FLOW_NULL; +                _ap_instance->flows[fd].port_id = 0; +                return 0; +        } -        flow->flow.state = FLOW_ALLOCATED; +        FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + +        pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); +        pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], +                       NULL, +                       ipcp_udp_sdu_reader, +                       NULL); +        _ap_instance->ping_pong = !_ap_instance->ping_pong; + +        LOG_DBG("Accepted flow, port_id %u on UDP fd %d.", port_id, fd);          return 0;  }  int ipcp_udp_flow_dealloc(uint32_t port_id)  { -        return 0; -} +        int fd = port_id_to_fd(port_id); +        if (fd < 0) { +                LOG_DBGF("Could not find flow with port_id %u.", port_id); +                return 0; +        } -int ipcp_udp_du_write(uint32_t port_id, -                      size_t map_index) -{ +        _ap_instance->flows[fd].state   = FLOW_NULL; +        _ap_instance->flows[fd].port_id = 0; +        if (_ap_instance->flows[fd].rb != NULL) +                shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + +        FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);          return 0;  } -int ipcp_udp_du_read(uint32_t port_id, -                     size_t map_index) +/* FIXME: may be crap, didn't think this one through */ +int ipcp_udp_flow_dealloc_arr(uint32_t port_id)  { -        return 0; +        int fd = port_id_to_fd(port_id); +        if (fd < 0) { +                LOG_DBGF("Could not find flow with port_id %u.", port_id); +                return 0; +        } + +        _ap_instance->flows[fd].state   = FLOW_NULL; +        _ap_instance->flows[fd].port_id = 0; +        if (_ap_instance->flows[fd].rb != NULL) +                shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + +        FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + +        return ipcp_flow_dealloc(0, port_id);  }  struct ipcp * ipcp_udp_create(char * ap_name) @@ -617,11 +802,14 @@ struct ipcp * ipcp_udp_create(char * ap_name)          struct ipcp_udp_data * data;          struct ipcp_ops *      ops; +        if (shim_ap_init(ap_name) < 0) +                return NULL; +          i = malloc(sizeof *i);          if (i == NULL)                  return NULL; -        data = ipcp_udp_data_create(ap_name); +        data = ipcp_udp_data_create();          if (data == NULL) {                  free(i);                  return NULL; @@ -643,8 +831,6 @@ struct ipcp * ipcp_udp_create(char * ap_name)          ops->ipcp_flow_alloc      = ipcp_udp_flow_alloc;          ops->ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp;          ops->ipcp_flow_dealloc    = ipcp_udp_flow_dealloc; -        ops->ipcp_du_read         = ipcp_udp_du_read; -        ops->ipcp_du_write        = ipcp_udp_du_write;          i->data = (struct ipcp_data *) data;          i->ops  = ops; @@ -656,6 +842,40 @@ struct ipcp * ipcp_udp_create(char * ap_name)  #ifndef MAKE_CHECK +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +/* FIXME: stop eating the CPU */ +void * ipcp_udp_sdu_loop(void * o) +{ +        while (true) { +                struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); +                int fd; +                int len = 0; +                char * buf; +                if (e == NULL) +                        continue; + +                len = shm_du_map_read_sdu((uint8_t **) &buf, +                                          _ap_instance->dum, +                                          e->index); +                if (len == -1) +                        continue; + +                fd = port_id_to_fd(e->port_id); + +                if (fd == -1) +                        continue; + +                if (len == 0) +                        continue; + +                send(fd, buf, len, 0); + +                shm_release_du_buff(_ap_instance->dum, e->index); +        } + +        return (void *) 1; +} +  int main (int argc, char * argv[])  {          /* argument 1: pid of irmd ? */ @@ -680,6 +900,7 @@ int main (int argc, char * argv[])          sigaction(SIGINT,  &sig_act, NULL);          sigaction(SIGTERM, &sig_act, NULL);          sigaction(SIGHUP,  &sig_act, NULL); +        sigaction(SIGPIPE, &sig_act, NULL);          _ipcp = ipcp_udp_create(argv[2]);          if (_ipcp == NULL) { @@ -687,7 +908,18 @@ int main (int argc, char * argv[])                  exit(1);          } -        ipcp_main_loop(_ipcp); +        pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); +        pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); + +        pthread_join(_ap_instance->sduloop, NULL); +        pthread_join(_ap_instance->mainloop, NULL); +        pthread_join(_ap_instance->handler, NULL); +        pthread_join(_ap_instance->sdu_reader[0], NULL); +        pthread_join(_ap_instance->sdu_reader[1], NULL); + +        ipcp_udp_destroy(_ipcp); + +        shim_ap_fini();          exit(0);  } diff --git a/src/ipcpd/shim-udp/tests/shim_udp_test.c b/src/ipcpd/shim-udp/tests/shim_udp_test.c index 036f5877..e5e8b32d 100644 --- a/src/ipcpd/shim-udp/tests/shim_udp_test.c +++ b/src/ipcpd/shim-udp/tests/shim_udp_test.c @@ -59,7 +59,7 @@ int shim_udp_test(int argc, char ** argv)          _ipcp = ipcp_udp_create(ipcp_name);          if (_ipcp == NULL) {                  LOG_ERR("Could not instantiate shim IPCP."); -                shm_du_map_close(dum); +                shm_du_map_destroy(dum);                  exit(1);          } @@ -69,13 +69,13 @@ int shim_udp_test(int argc, char ** argv)          if (ipcp_udp_name_reg("bogus name")) {                  LOG_ERR("Failed to register application."); -                shm_du_map_close(dum); +                shm_du_map_destroy(dum);                  exit(1);          }          if (ipcp_udp_name_unreg("bogus name")) {                  LOG_ERR("Failed to unregister application."); -                shm_du_map_close(dum); +                shm_du_map_destroy(dum);                  exit(1);          } @@ -83,7 +83,7 @@ int shim_udp_test(int argc, char ** argv)                  sprintf(bogus, "bogus name %4d", i);                  if (ipcp_udp_name_reg(bogus)) {                           LOG_ERR("Failed to register application %s.", bogus); -                         shm_du_map_close(dum); +                         shm_du_map_destroy(dum);                           exit(1);                  }          } @@ -92,12 +92,12 @@ int shim_udp_test(int argc, char ** argv)                  sprintf(bogus, "bogus name %4d", i);                  if(ipcp_udp_name_unreg(bogus)) {                           LOG_ERR("Failed to unregister application %s.", bogus); -                         shm_du_map_close(dum); +                         shm_du_map_destroy(dum);                           exit(1);                  }          } -        shm_du_map_close(dum); +        shm_du_map_destroy(dum);          exit(0);  } | 
