diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/flow.c | 38 | ||||
| -rw-r--r-- | src/ipcpd/flow.h | 12 | ||||
| -rw-r--r-- | src/ipcpd/ipcp-data.c | 104 | ||||
| -rw-r--r-- | src/ipcpd/ipcp-data.h | 16 | ||||
| -rw-r--r-- | src/ipcpd/ipcp-ops.h | 9 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.c | 20 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.h | 3 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 524 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/tests/shim_udp_test.c | 12 | ||||
| -rw-r--r-- | src/irmd/main.c | 764 | ||||
| -rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/lib/bitmap.c | 19 | ||||
| -rw-r--r-- | src/lib/dev.c | 344 | ||||
| -rw-r--r-- | src/lib/ipcp.c | 65 | ||||
| -rw-r--r-- | src/lib/ipcpd_messages.proto | 6 | ||||
| -rw-r--r-- | src/lib/irmd_messages.proto | 26 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 268 | ||||
| -rw-r--r-- | src/lib/shm_du_map.c | 143 | ||||
| -rw-r--r-- | src/lib/tests/shm_du_map_test.c | 53 | ||||
| -rw-r--r-- | src/tools/echo/echo_client.c | 17 | ||||
| -rw-r--r-- | src/tools/echo/echo_server.c | 47 | 
21 files changed, 1732 insertions, 759 deletions
| diff --git a/src/ipcpd/flow.c b/src/ipcpd/flow.c index c436733b..ae8f848c 100644 --- a/src/ipcpd/flow.c +++ b/src/ipcpd/flow.c @@ -27,7 +27,7 @@  #include <ouroboros/logs.h> -flow_t * flow_create(int32_t port_id) +flow_t * flow_create(uint32_t port_id)  {          flow_t * flow = malloc(sizeof *flow);          if (flow == NULL) { @@ -38,8 +38,7 @@ flow_t * flow_create(int32_t port_id)          INIT_LIST_HEAD(&flow->list);          flow->port_id = port_id; -        flow->oflags = FLOW_O_DEFAULT; -        flow->state = FLOW_NULL; +        flow->state   = FLOW_NULL;          pthread_mutex_init(&flow->lock, NULL); @@ -52,36 +51,3 @@ void flow_destroy(flow_t * flow)                  return;          free(flow);  } - -int flow_set_opts(flow_t * flow, uint16_t opts) -{ -        if (flow == NULL) { -                LOG_DBGF("Non-existing flow."); -                return -1; -        } - -        pthread_mutex_lock(&flow->lock); - -        if ((opts & FLOW_O_ACCMODE) == FLOW_O_ACCMODE) { -                flow->oflags = FLOW_O_DEFAULT; -                pthread_mutex_unlock(&flow->lock); -                LOG_WARN("Invalid flow options. Setting default."); -                return -1; -        } - -        flow->oflags = opts; - -        pthread_mutex_unlock(&flow->lock); - -        return 0; -} - -uint16_t flow_get_opts(const flow_t * flow) -{ -        if (flow == NULL) { -                LOG_DBGF("Non-existing flow."); -                return FLOW_O_INVALID; -        } - -        return flow->oflags; -} diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h index 000de5ad..0a3e90d1 100644 --- a/src/ipcpd/flow.h +++ b/src/ipcpd/flow.h @@ -25,6 +25,7 @@  #include <ouroboros/common.h>  #include <ouroboros/list.h> +#include <ouroboros/shm_ap_rbuff.h>  #include <pthread.h>  /* same values as fcntl.h */ @@ -47,17 +48,14 @@ enum flow_state {  typedef struct flow {          struct list_head list; -        int32_t          port_id; -        uint16_t         oflags; -        enum flow_state  state; +        uint32_t              port_id; +        struct shm_ap_rbuff * rb; +        enum flow_state       state;          pthread_mutex_t  lock;  } flow_t; -flow_t * flow_create(int32_t   port_id); +flow_t * flow_create(uint32_t port_id);  void     flow_destroy(flow_t * flow); -int      flow_set_opts(flow_t * flow, uint16_t opts); -uint16_t flow_get_opts(const flow_t * flow); -  #endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c index 72407a53..76fc4bcd 100644 --- a/src/ipcpd/ipcp-data.c +++ b/src/ipcpd/ipcp-data.c @@ -96,46 +96,26 @@ struct ipcp_data * ipcp_data_create()          if (data == NULL)                  return NULL; -        data->iname = NULL;          data->type  = 0; -        data->dum   = NULL;          return data;  }  struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, -                                  const char *       ipcp_name,                                    enum ipcp_type     ipcp_type)  {          if (dst == NULL)                  return NULL; -        dst->iname = instance_name_create(); -        if (dst->iname == NULL) -                return NULL; - -        if(instance_name_init_from(dst->iname, ipcp_name, getpid()) == NULL) { -                instance_name_destroy(dst->iname); -                return NULL; -        } -          dst->type  = ipcp_type; -        dst->dum = shm_du_map_open(); -        if (dst->dum == NULL) { -                instance_name_destroy(dst->iname); -                return NULL; -        } -          /* init the lists */          INIT_LIST_HEAD(&dst->registry); -        INIT_LIST_HEAD(&dst->flows);          INIT_LIST_HEAD(&dst->directory);          /* init the mutexes */          pthread_mutex_init(&dst->reg_lock, NULL);          pthread_mutex_init(&dst->dir_lock, NULL); -        pthread_mutex_init(&dst->flow_lock, NULL);          return dst;  } @@ -156,42 +136,22 @@ static void clear_directory(struct ipcp_data * data)                  dir_entry_destroy(list_entry(h, struct dir_entry, list));  } -static void clear_flows(struct ipcp_data * data) -{ -        struct list_head * h; -        struct list_head * t; -        list_for_each_safe(h, t, &data->flows) -                flow_destroy(list_entry(h, flow_t, list)); - -} -  void ipcp_data_destroy(struct ipcp_data * data)  {          if (data == NULL)                  return; -        /* FIXME: finish all pending operations here */ - -        if (data->iname != NULL) -                instance_name_destroy(data->iname); -        data->iname = NULL; - -        if (data->dum != NULL) -                shm_du_map_close(data->dum); -        data->dum = NULL; +        /* FIXME: finish all pending operations here and cancel all threads */          pthread_mutex_lock(&data->reg_lock);          pthread_mutex_lock(&data->dir_lock); -        pthread_mutex_lock(&data->flow_lock);          /* clear the lists */          clear_registry(data);          clear_directory(data); -        clear_flows(data);          /*           * no need to unlock, just free the entire thing -         * pthread_mutex_unlock(&data->flow_lock);           * pthread_mutex_unlock(&data->dir_lock);           * pthread_mutex_unlock(&data->reg_lock);           */ @@ -380,65 +340,3 @@ uint64_t ipcp_data_get_addr(struct ipcp_data * data,          return addr;  } - -flow_t * ipcp_data_find_flow(struct ipcp_data * data, -                             uint32_t           port_id) -{ -        struct list_head * h; -        list_for_each(h, &data->flows) { -                flow_t * f = list_entry(h, flow_t, list); -                if (f->port_id == port_id) -                        return f; -        } - -        return NULL; -} - -bool ipcp_data_has_flow(struct ipcp_data * data, -                        uint32_t           port_id) -{ -        return ipcp_data_find_flow(data, port_id) != NULL; -} - -int ipcp_data_add_flow(struct ipcp_data * data, -                       flow_t *           flow) -{ -        if (data == NULL || flow == NULL) -                return -1; - -        pthread_mutex_lock(&data->flow_lock); - -        if (ipcp_data_has_flow(data, flow->port_id)) { -                pthread_mutex_unlock(&data->flow_lock); -                return -2; -        } - -        list_add(&flow->list,&data->flows); - -        pthread_mutex_unlock(&data->flow_lock); - -        return 0; -} - -int ipcp_data_del_flow(struct ipcp_data * data, -                       uint32_t           port_id) -{ -        flow_t * f; - -        if (data == NULL) -                return -1; - -        pthread_mutex_lock(&data->flow_lock); - -        f = ipcp_data_find_flow(data, port_id); -        if (f == NULL) -                return -1; - -        list_del(&f->list); - -        free(f); - -        pthread_mutex_unlock(&data->flow_lock); - -        return 0; -} diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index 1dea8c3c..2e86ba11 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -34,17 +34,11 @@  #include "flow.h"  struct ipcp_data { -        instance_name_t   * iname;          enum ipcp_type      type; -        struct shm_du_map * dum; -          struct list_head    registry;          pthread_mutex_t     reg_lock; -        struct list_head    flows; -        pthread_mutex_t     flow_lock; -          struct list_head    directory;          pthread_mutex_t     dir_lock; @@ -53,7 +47,6 @@ struct ipcp_data {  struct ipcp_data * ipcp_data_create();  struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, -                                  const char *       ipcp_name,                                    enum ipcp_type     ipcp_type);  void               ipcp_data_destroy(struct ipcp_data * data); @@ -73,13 +66,4 @@ bool         ipcp_data_is_in_directory(struct ipcp_data * data,                                         const char *       ap_name);  uint64_t     ipcp_data_get_addr(struct ipcp_data * data,                                  const char *       ap_name); -bool         ipcp_data_has_flow(struct ipcp_data * data, -                                uint32_t           port_id); -flow_t *     ipcp_data_find_flow(struct ipcp_data * data, -                                 uint32_t           port_id); -int          ipcp_data_add_flow(struct ipcp_data * data, -                                flow_t *           flow); -int          ipcp_data_del_flow(struct ipcp_data * data, -                                uint32_t           port_id); -  #endif /* IPCPD_IPCP_DATA_H */ diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index 2ccb2e59..91b6cac9 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -39,20 +39,15 @@ struct ipcp_ops {          int   (* ipcp_name_reg)(char *   name);          int   (* ipcp_name_unreg)(char * name);          int   (* ipcp_flow_alloc)(uint32_t port_id, +                                  pid_t    n_pid,                                    char *   dst_ap_name,                                    char *   src_ap_name,                                    char *   src_ae_name,                                    struct qos_spec * qos);          int   (* ipcp_flow_alloc_resp)(uint32_t port_id, +                                       pid_t    n_pid,                                         int      response);          int   (* ipcp_flow_dealloc)(uint32_t port_id); - -        /* FIXME: let's see how this will work with the shm_du_map */ -        int   (* ipcp_du_write)(uint32_t port_id, -                                size_t map_index); - -        int   (* ipcp_du_read)(uint32_t port_id, -                               size_t map_index);  };  #endif /* IPCPD_IPCP_OPS_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index d6f373cd..13632a80 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -45,11 +45,12 @@ int ipcp_arg_check(int argc, char * argv[])          return 0;  } -int ipcp_main_loop(struct ipcp * _ipcp) +void * ipcp_main_loop(void * o)  {          int     lsockfd;          int     sockfd;          uint8_t buf[IPCP_MSG_BUF_SIZE]; +        struct ipcp * _ipcp = (struct ipcp *) o;          ipcp_msg_t *    msg;          ssize_t         count; @@ -61,13 +62,13 @@ int ipcp_main_loop(struct ipcp * _ipcp)          if (_ipcp == NULL) {                  LOG_ERR("Invalid ipcp struct."); -                return 1; +                return (void *) 1;          }          sockfd = server_socket_open(ipcp_sock_path(getpid()));          if (sockfd < 0) {                  LOG_ERR("Could not open server socket."); -                return 1; +                return (void *) 1;          }          while (true) { @@ -113,7 +114,7 @@ int ipcp_main_loop(struct ipcp * _ipcp)                                  conf.max_pdu_size    = conf_msg->max_pdu_size;                          }                          if (conf_msg->ipcp_type == IPCP_SHIM_UDP) { -                                conf.ip_addr = conf_msg->ip_addr; +                                conf.ip_addr  = conf_msg->ip_addr;                                  conf.dns_addr = conf_msg->dns_addr;                          } @@ -149,7 +150,8 @@ int ipcp_main_loop(struct ipcp * _ipcp)                          }                          ret_msg.has_result = true;                          ret_msg.result = -                                _ipcp->ops->ipcp_unreg(msg->dif_names, msg->len); +                                _ipcp->ops->ipcp_unreg(msg->dif_names, +                                                       msg->len);                          break;                  case IPCP_MSG_CODE__IPCP_NAME_REG:                          if (_ipcp->ops->ipcp_name_reg == NULL) { @@ -172,9 +174,10 @@ int ipcp_main_loop(struct ipcp * _ipcp)                                  LOG_ERR("Flow_alloc unsupported.");                                  break;                          } -                        ret_msg.has_fd = true; -                        ret_msg.fd = +                        ret_msg.has_result = true; +                        ret_msg.result =                                  _ipcp->ops->ipcp_flow_alloc(msg->port_id, +                                                            msg->pid,                                                              msg->dst_name,                                                              msg->src_ap_name,                                                              msg->src_ae_name, @@ -188,6 +191,7 @@ int ipcp_main_loop(struct ipcp * _ipcp)                          ret_msg.has_result = true;                          ret_msg.result =                                  _ipcp->ops->ipcp_flow_alloc_resp(msg->port_id, +                                                                 msg->pid,                                                                   msg->result);                          break;                  case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: @@ -231,5 +235,5 @@ int ipcp_main_loop(struct ipcp * _ipcp)                  close(lsockfd);          } -        return 0; +        return NULL;  } diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9decac8b..393af994 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -43,7 +43,8 @@ struct ipcp {          int                irmd_fd;  }; -int ipcp_main_loop(); +void * ipcp_main_loop(void * o); +void * ipcp_sdu_loop(void * o);  int ipcp_arg_check(int argc, char * argv[]);  #endif diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 460fe9e3..1f7bb12f 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -24,12 +24,13 @@  #include "ipcp.h"  #include "flow.h"  #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h>  #include <ouroboros/ipcp.h>  #include <ouroboros/dif_config.h>  #include <ouroboros/sockets.h> -#include <ouroboros/dev.h> +#include <ouroboros/bitmap.h>  #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -67,6 +68,144 @@ extern struct ipcp * _ipcp; /* defined in test */  struct ipcp * _ipcp;  #endif +/* + * copied from ouroboros/dev. The shim needs access to the internals + * because it doesn't follow all steps necessary steps to get + * the info + */ + +#define UNKNOWN_AP "__UNKNOWN_AP__" +#define UNKNOWN_AE "__UNKNOWN_AE__" + +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE +  #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE +  #define DU_BUFF_TAILSPACE 0 +#endif + +/* the shim needs access to these internals */ +struct shim_ap_data { +        instance_name_t *     api; +        struct shm_du_map *   dum; +        struct bmp *          fds; + +        struct shm_ap_rbuff * rb; +        struct flow           flows[AP_MAX_FLOWS]; + +        pthread_t mainloop; +        pthread_t sduloop; +        pthread_t handler; +        pthread_t sdu_reader[2]; +        int       ping_pong; +} * _ap_instance; + +int shim_ap_init(char * ap_name) +{ +        _ap_instance = malloc(sizeof(struct shim_ap_data)); +        if (_ap_instance == NULL) { +                return -1; +        } + +        _ap_instance->api = instance_name_create(); +        if (_ap_instance->api == NULL) { +                free(_ap_instance); +                return -1; +        } + +        if (instance_name_init_from(_ap_instance->api, +                                    ap_name, +                                    getpid()) == NULL) { +                instance_name_destroy(_ap_instance->api); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); +        if (_ap_instance->fds == NULL) { +                instance_name_destroy(_ap_instance->api); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->dum = shm_du_map_open(); +        if (_ap_instance->dum == NULL) { +                instance_name_destroy(_ap_instance->api); +                bmp_destroy(_ap_instance->fds); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->rb = shm_ap_rbuff_create(); +        if (_ap_instance->rb == NULL) { +                instance_name_destroy(_ap_instance->api); +                bmp_destroy(_ap_instance->fds); +                free(_ap_instance); +                return -1; +        } + +        return 0; +} + +void shim_ap_fini() +{ +        int i = 0; + +        if (_ap_instance == NULL) +                return; +        if (_ap_instance->api != NULL) +                instance_name_destroy(_ap_instance->api); +        if (_ap_instance->fds != NULL) +                bmp_destroy(_ap_instance->fds); +        if (_ap_instance->dum != NULL) +                shm_du_map_close(_ap_instance->dum); +        if (_ap_instance->rb != NULL) +                shm_ap_rbuff_destroy(_ap_instance->rb); +        for (i = 0; i < AP_MAX_FLOWS; i ++) +                if (_ap_instance->flows[i].rb != NULL) +                        shm_ap_rbuff_close(_ap_instance->flows[i].rb); + +        free(_ap_instance); +} + +static int port_id_to_fd(uint32_t port_id) +{ +        int i; +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                if (_ap_instance->flows[i].port_id == port_id +                        && _ap_instance->flows[i].state != FLOW_NULL) +                        return i; +        return -1; +} + +static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) +{ +        /* the AP chooses the amount of headspace and tailspace */ +        size_t index = shm_create_du_buff(_ap_instance->dum, +                                          count, +                                          0, +                                          buf, +                                          count); +        struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + +        if (index == -1) +                return -1; + +        if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { +                shm_release_du_buff(_ap_instance->dum, index); +                return -EPIPE; +        } + +        return 0; +} + +/* + * end copy from dev.c + */ +  struct ipcp_udp_data {          /* keep ipcp_data first for polymorphism */          struct ipcp_data ipcp_data; @@ -79,39 +218,15 @@ struct ipcp_udp_data {          int                s_fd;          fd_set flow_fd_s; -        flow_t * fd_to_flow_ptr[FD_SETSIZE]; -        pthread_mutex_t   lock; +        pthread_mutex_t lock;  }; -struct udp_flow { -        /* keep flow first for polymorphism */ -        flow_t flow; -        int    fd; -}; - -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) -{ -        switch(sig) { -        case SIGINT: -        case SIGTERM: -        case SIGHUP: -                LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); -                if (info->si_pid == irmd_pid) { -                        /* shm_du_map_close(_ipcp->data->dum); */ -                        exit(0); -                } -        default: -                return; -        } -} - -struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) +struct ipcp_udp_data * ipcp_udp_data_create()  {          struct ipcp_udp_data * udp_data;          struct ipcp_data *     data;          enum ipcp_type         ipcp_type; -        int                    n;          udp_data = malloc(sizeof *udp_data);          if (udp_data == NULL) { @@ -121,18 +236,52 @@ struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name)          ipcp_type = THIS_TYPE;          data = (struct ipcp_data *) udp_data; -        if (ipcp_data_init(data, ap_name, ipcp_type) == NULL) { +        if (ipcp_data_init(data, ipcp_type) == NULL) {                  free(udp_data);                  return NULL;          }          FD_ZERO(&udp_data->flow_fd_s); -        for (n = 0; n < FD_SETSIZE; ++n) -                udp_data->fd_to_flow_ptr[n] = NULL;          return udp_data;  } +void ipcp_udp_data_destroy(struct ipcp_udp_data * data) +{ +        if (data == NULL) +                return; + +        ipcp_data_destroy((struct ipcp_data *) data); +} + +void ipcp_udp_destroy(struct ipcp * ipcp) +{ +        ipcp_udp_data_destroy((struct ipcp_udp_data *) ipcp->data); +        shim_ap_fini(); +        free(ipcp); +} + +void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +{ +        switch(sig) { +        case SIGINT: +        case SIGTERM: +        case SIGHUP: +                if (info->si_pid == irmd_pid || info->si_pid == 0) { +                        LOG_DBG("Terminating by order of %d. Bye.", +                                info->si_pid); +                        pthread_cancel(_ap_instance->mainloop); +                        pthread_cancel(_ap_instance->handler); +                        pthread_cancel(_ap_instance->sdu_reader[0]); +                        pthread_cancel(_ap_instance->sdu_reader[1]); +                        pthread_cancel(_ap_instance->sduloop); +                        exit(0); +                } +        default: +                return; +        } +} +  static void * ipcp_udp_listener()  {          char buf[SHIM_UDP_BUF_SIZE]; @@ -141,10 +290,10 @@ static void * ipcp_udp_listener()          struct sockaddr_in f_saddr;          struct sockaddr_in c_saddr;          struct hostent  *  hostp; -        struct udp_flow *  flow;          int                sfd = shim_data(_ipcp)->s_fd;          while (true) { +                int fd;                  n = sizeof c_saddr;                  n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,                               (struct sockaddr *) &c_saddr, (unsigned *) &n); @@ -157,16 +306,7 @@ static void * ipcp_udp_listener()                  if (hostp == NULL)                          continue; -                /* create a new socket for the server */ -                flow = malloc(sizeof *flow); -                if (flow == NULL) -                        continue; - -                flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); -                if (flow->fd == -1) { -                        free(flow); -                        continue; -                } +                fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);                  memset((char *) &f_saddr, 0, sizeof f_saddr);                  f_saddr.sin_family      = AF_INET; @@ -185,36 +325,33 @@ static void * ipcp_udp_listener()                   * the flow structure                   */ -                if (connect(flow->fd, +                if (connect(fd,                              (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { -                        close(flow->fd); -                        free(flow); +                        close(fd);                          continue;                  } +                /* echo back the packet */ +                while(send(fd, buf, strlen(buf), 0) < 0) +                        ; +                  /* reply to IRM */ -                flow->flow.port_id = ipcp_flow_req_arr(getpid(), buf, -                                                       UNKNOWN_AP, ""); -                if (flow->flow.port_id < 0) { +                _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), +                                                                    buf, +                                                                    UNKNOWN_AP, +                                                                    UNKNOWN_AE); +                if (_ap_instance->flows[fd].port_id < 0) {                          LOG_ERR("Could not get port id from IRMd"); -                        close(flow->fd); -                        free(flow); +                        close(fd);                          continue;                  } -                flow->flow.oflags = FLOW_O_DEFAULT; -                flow->flow.state  = FLOW_PENDING; - -                if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { -                        LOG_DBGF("Could not add flow."); -                        close(flow->fd); -                        free(flow); -                        continue; -                } +                _ap_instance->flows[fd].rb     = NULL; +                _ap_instance->flows[fd].state  = FLOW_PENDING; -                FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); -                shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; +                LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", +                         _ap_instance->flows[fd].port_id, fd);          }          return 0; @@ -229,8 +366,6 @@ static void * ipcp_udp_sdu_reader()          struct sockaddr_in r_saddr;          while (true) { -                flow_t * flow; -                  if (select(FD_SETSIZE,                             &shim_data(_ipcp)->flow_fd_s,                             NULL, NULL, NULL) @@ -249,18 +384,8 @@ static void * ipcp_udp_sdu_reader()                                       (struct sockaddr *) &r_saddr,                                       (unsigned *) &n); -                        flow = shim_data(_ipcp)->fd_to_flow_ptr[fd]; -                        if (flow->state == FLOW_PENDING) { -                                if (connect(fd, -                                            (struct sockaddr *) &r_saddr, -                                            sizeof r_saddr) -                                    < 0) -                                        continue; -                                flow->state = FLOW_ALLOCATED; -                        } -                          /* send the sdu to the correct port_id */ -                        LOG_MISSING; +                        ipcp_udp_flow_write(fd, buf, n);                  }          } @@ -271,8 +396,6 @@ int ipcp_udp_bootstrap(struct dif_config * conf)  {          char ipstr[INET_ADDRSTRLEN];          char dnsstr[INET_ADDRSTRLEN]; -        pthread_t handler; -        pthread_t sdu_reader;          int enable = 1;          if (conf->type != THIS_TYPE) { @@ -296,7 +419,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf)                            dnsstr,                            INET_ADDRSTRLEN);          else -                strcpy(dnsstr, "not set.\n"); +                strcpy(dnsstr, "not set");          shim_data(_ipcp)->ip_addr  = conf->ip_addr;          shim_data(_ipcp)->dns_addr = conf->dns_addr; @@ -304,7 +427,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf)          /* UDP listen server */          if ((shim_data(_ipcp)->s_fd = -             socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { +             socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {                  LOG_DBGF("Can't create socket.");                  return -1;          } @@ -328,13 +451,28 @@ int ipcp_udp_bootstrap(struct dif_config * conf)                  return -1;          } -        pthread_create(&handler, NULL, ipcp_udp_listener, NULL); -        pthread_create(&sdu_reader, NULL, ipcp_udp_sdu_reader, NULL); +        FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + +        pthread_create(&_ap_instance->handler, +                       NULL, +                       ipcp_udp_listener, +                       NULL); +        pthread_create(&_ap_instance->sdu_reader[0], +                       NULL, +                       ipcp_udp_sdu_reader, +                       NULL); + +        pthread_create(&_ap_instance->sdu_reader[1], +                       NULL, +                       ipcp_udp_sdu_reader, +                       NULL); + +        _ap_instance->ping_pong = 0;          _ipcp->state = IPCP_ENROLLED; -        LOG_DBG("Bootstrapped shim IPCP over UDP %s-%d.", -                _ipcp->data->iname->name, _ipcp->data->iname->id); +        LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", +                getpid());          LOG_DBG("Bound to IP address %s.", ipstr);          LOG_DBG("DNS server address is %s.", dnsstr); @@ -464,23 +602,25 @@ int ipcp_udp_name_unreg(char * name)  }  int ipcp_udp_flow_alloc(uint32_t          port_id, +                        pid_t             n_pid,                          char *            dst_name,                          char *            src_ap_name,                          char *            src_ae_name,                          struct qos_spec * qos)  { -        struct udp_flow *  flow = NULL;          struct sockaddr_in l_saddr;          struct sockaddr_in r_saddr; +        struct sockaddr_in rf_saddr; +        int                fd; +        int n; + +        char * recv_buf = NULL;          struct hostent * h;          if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)                  return -1; -        LOG_DBG("Received flow allocation request from %s to %s.", -                src_ap_name, dst_name); -          if (strlen(dst_name) > 255              || strlen(src_ap_name) > 255              || strlen(src_ae_name) > 255) { @@ -491,15 +631,7 @@ int ipcp_udp_flow_alloc(uint32_t          port_id,          if (qos != NULL)                  LOG_DBGF("QoS requested. UDP/IP can't do that."); -        flow = malloc(sizeof *flow); -        if (flow == NULL) -                return -1; - -        flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); -        if (flow->fd == -1) { -                free(flow); -                return -1; -        } +        fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);          /* this socket is for the flow */          memset((char *) &l_saddr, 0, sizeof l_saddr); @@ -507,108 +639,161 @@ int ipcp_udp_flow_alloc(uint32_t          port_id,          l_saddr.sin_addr.s_addr = local_ip;          l_saddr.sin_port        = 0; -        if (bind(flow->fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { -                char ipstr[INET_ADDRSTRLEN]; -                inet_ntop(AF_INET, -                          &l_saddr.sin_addr.s_addr, -                          ipstr, -                          INET_ADDRSTRLEN); -                close(flow->fd); -                free(flow); +        if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { +                close(fd);                  return -1;          }          h = gethostbyname(dst_name);          if (h == NULL) {                  LOG_DBGF("Could not resolve %s.", dst_name); -                close(flow->fd); -                free(flow); +                close(fd);                  return -1;          } -          memset((char *) &r_saddr, 0, sizeof r_saddr);          r_saddr.sin_family      = AF_INET; -        r_saddr.sin_addr.s_addr = (uint32_t) *(h->h_addr_list[0]); +        r_saddr.sin_addr.s_addr = *((uint32_t *) (h->h_addr_list[0]));          r_saddr.sin_port        = LISTEN_PORT; +          /* at least try to get the packet on the wire */ -        while (sendto(flow->fd, dst_name, strlen(dst_name), 0, +        while (sendto(fd, dst_name, strlen(dst_name), 0,                        (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) {          } -        flow->flow.port_id = port_id; -        flow->flow.oflags  = FLOW_O_DEFAULT; -        flow->flow.state   = FLOW_PENDING; - -        /* add flow to the list */ +        /* wait for the other shim IPCP to respond */ -        pthread_mutex_lock(&_ipcp->data->flow_lock); +        recv_buf = malloc(strlen(dst_name) + 1); +        n = sizeof(rf_saddr); +        n = recvfrom(fd, +                     recv_buf, +                     strlen(dst_name) + 1, +                     0, +                     (struct sockaddr *) &rf_saddr, +                     (unsigned *) &n); -        if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { -                LOG_DBGF("Could not add flow."); -                pthread_mutex_unlock(&_ipcp->data->flow_lock); -                close(flow->fd); -                free(flow); +        if (connect(fd, +                    (struct sockaddr *) &rf_saddr, +                    sizeof rf_saddr) +            < 0) { +                free(recv_buf);                  return -1;          } -        pthread_mutex_unlock(&_ipcp->data->flow_lock); +        if (!strcmp(recv_buf, dst_name)) +                LOG_WARN("Incorrect echo from server"); + +        free(recv_buf); + +        _ap_instance->flows[fd].port_id = port_id; +        _ap_instance->flows[fd].state   = FLOW_ALLOCATED; +        _ap_instance->flows[fd].rb      = shm_ap_rbuff_open(n_pid); +        if (_ap_instance->flows[fd].rb == NULL) { +                LOG_ERR("Could not open N + 1 ringbuffer."); +                close(fd); +        }          /* tell IRMd that flow allocation "worked" */ -        if (ipcp_flow_alloc_reply(getpid(), flow->flow.port_id, 0)) { +        if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {                  LOG_ERR("Failed to notify IRMd about flow allocation reply"); -                close(flow->fd); -                ipcp_data_del_flow(_ipcp->data, flow->flow.port_id); +                close(fd); +                shm_ap_rbuff_close(_ap_instance->flows[fd].rb);                  return -1;          } -        FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); -        shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; +        FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); -        return 0; +        pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); +        pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], +                       NULL, +                       ipcp_udp_sdu_reader, +                       NULL); +        _ap_instance->ping_pong = !_ap_instance->ping_pong; + +        LOG_DBG("Allocated flow with port_id %u on UDP fd %d.", port_id, fd); + +        return fd;  }  int ipcp_udp_flow_alloc_resp(uint32_t port_id, +                             pid_t    n_pid,                               int      response)  { -        struct udp_flow * flow = -                (struct udp_flow *) ipcp_data_find_flow(_ipcp->data, port_id); -        if (flow == NULL) { -                return -1; +        int fd = port_id_to_fd(port_id); +        if (fd < 0) { +                LOG_DBGF("Could not find flow with port_id %u.", port_id); +                return 0;          } -        if (response) { -                ipcp_data_del_flow(_ipcp->data, port_id); +        if (response)                  return 0; -        }          /* awaken pending flow */ -        if (flow->flow.state != FLOW_PENDING) +        if (_ap_instance->flows[fd].state != FLOW_PENDING) { +                LOG_DBGF("Flow was not pending.");                  return -1; +        } + +        _ap_instance->flows[fd].state = FLOW_ALLOCATED; +        _ap_instance->flows[fd].rb    = shm_ap_rbuff_open(n_pid); +        if (_ap_instance->flows[fd].rb == NULL) { +                LOG_ERR("Could not open N + 1 ringbuffer."); +                _ap_instance->flows[fd].state   = FLOW_NULL; +                _ap_instance->flows[fd].port_id = 0; +                return 0; +        } -        flow->flow.state = FLOW_ALLOCATED; +        FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + +        pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); +        pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], +                       NULL, +                       ipcp_udp_sdu_reader, +                       NULL); +        _ap_instance->ping_pong = !_ap_instance->ping_pong; + +        LOG_DBG("Accepted flow, port_id %u on UDP fd %d.", port_id, fd);          return 0;  }  int ipcp_udp_flow_dealloc(uint32_t port_id)  { -        return 0; -} +        int fd = port_id_to_fd(port_id); +        if (fd < 0) { +                LOG_DBGF("Could not find flow with port_id %u.", port_id); +                return 0; +        } -int ipcp_udp_du_write(uint32_t port_id, -                      size_t map_index) -{ +        _ap_instance->flows[fd].state   = FLOW_NULL; +        _ap_instance->flows[fd].port_id = 0; +        if (_ap_instance->flows[fd].rb != NULL) +                shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + +        FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);          return 0;  } -int ipcp_udp_du_read(uint32_t port_id, -                     size_t map_index) +/* FIXME: may be crap, didn't think this one through */ +int ipcp_udp_flow_dealloc_arr(uint32_t port_id)  { -        return 0; +        int fd = port_id_to_fd(port_id); +        if (fd < 0) { +                LOG_DBGF("Could not find flow with port_id %u.", port_id); +                return 0; +        } + +        _ap_instance->flows[fd].state   = FLOW_NULL; +        _ap_instance->flows[fd].port_id = 0; +        if (_ap_instance->flows[fd].rb != NULL) +                shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + +        FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + +        return ipcp_flow_dealloc(0, port_id);  }  struct ipcp * ipcp_udp_create(char * ap_name) @@ -617,11 +802,14 @@ struct ipcp * ipcp_udp_create(char * ap_name)          struct ipcp_udp_data * data;          struct ipcp_ops *      ops; +        if (shim_ap_init(ap_name) < 0) +                return NULL; +          i = malloc(sizeof *i);          if (i == NULL)                  return NULL; -        data = ipcp_udp_data_create(ap_name); +        data = ipcp_udp_data_create();          if (data == NULL) {                  free(i);                  return NULL; @@ -643,8 +831,6 @@ struct ipcp * ipcp_udp_create(char * ap_name)          ops->ipcp_flow_alloc      = ipcp_udp_flow_alloc;          ops->ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp;          ops->ipcp_flow_dealloc    = ipcp_udp_flow_dealloc; -        ops->ipcp_du_read         = ipcp_udp_du_read; -        ops->ipcp_du_write        = ipcp_udp_du_write;          i->data = (struct ipcp_data *) data;          i->ops  = ops; @@ -656,6 +842,40 @@ struct ipcp * ipcp_udp_create(char * ap_name)  #ifndef MAKE_CHECK +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +/* FIXME: stop eating the CPU */ +void * ipcp_udp_sdu_loop(void * o) +{ +        while (true) { +                struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); +                int fd; +                int len = 0; +                char * buf; +                if (e == NULL) +                        continue; + +                len = shm_du_map_read_sdu((uint8_t **) &buf, +                                          _ap_instance->dum, +                                          e->index); +                if (len == -1) +                        continue; + +                fd = port_id_to_fd(e->port_id); + +                if (fd == -1) +                        continue; + +                if (len == 0) +                        continue; + +                send(fd, buf, len, 0); + +                shm_release_du_buff(_ap_instance->dum, e->index); +        } + +        return (void *) 1; +} +  int main (int argc, char * argv[])  {          /* argument 1: pid of irmd ? */ @@ -680,6 +900,7 @@ int main (int argc, char * argv[])          sigaction(SIGINT,  &sig_act, NULL);          sigaction(SIGTERM, &sig_act, NULL);          sigaction(SIGHUP,  &sig_act, NULL); +        sigaction(SIGPIPE, &sig_act, NULL);          _ipcp = ipcp_udp_create(argv[2]);          if (_ipcp == NULL) { @@ -687,7 +908,18 @@ int main (int argc, char * argv[])                  exit(1);          } -        ipcp_main_loop(_ipcp); +        pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); +        pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); + +        pthread_join(_ap_instance->sduloop, NULL); +        pthread_join(_ap_instance->mainloop, NULL); +        pthread_join(_ap_instance->handler, NULL); +        pthread_join(_ap_instance->sdu_reader[0], NULL); +        pthread_join(_ap_instance->sdu_reader[1], NULL); + +        ipcp_udp_destroy(_ipcp); + +        shim_ap_fini();          exit(0);  } diff --git a/src/ipcpd/shim-udp/tests/shim_udp_test.c b/src/ipcpd/shim-udp/tests/shim_udp_test.c index 036f5877..e5e8b32d 100644 --- a/src/ipcpd/shim-udp/tests/shim_udp_test.c +++ b/src/ipcpd/shim-udp/tests/shim_udp_test.c @@ -59,7 +59,7 @@ int shim_udp_test(int argc, char ** argv)          _ipcp = ipcp_udp_create(ipcp_name);          if (_ipcp == NULL) {                  LOG_ERR("Could not instantiate shim IPCP."); -                shm_du_map_close(dum); +                shm_du_map_destroy(dum);                  exit(1);          } @@ -69,13 +69,13 @@ int shim_udp_test(int argc, char ** argv)          if (ipcp_udp_name_reg("bogus name")) {                  LOG_ERR("Failed to register application."); -                shm_du_map_close(dum); +                shm_du_map_destroy(dum);                  exit(1);          }          if (ipcp_udp_name_unreg("bogus name")) {                  LOG_ERR("Failed to unregister application."); -                shm_du_map_close(dum); +                shm_du_map_destroy(dum);                  exit(1);          } @@ -83,7 +83,7 @@ int shim_udp_test(int argc, char ** argv)                  sprintf(bogus, "bogus name %4d", i);                  if (ipcp_udp_name_reg(bogus)) {                           LOG_ERR("Failed to register application %s.", bogus); -                         shm_du_map_close(dum); +                         shm_du_map_destroy(dum);                           exit(1);                  }          } @@ -92,12 +92,12 @@ int shim_udp_test(int argc, char ** argv)                  sprintf(bogus, "bogus name %4d", i);                  if(ipcp_udp_name_unreg(bogus)) {                           LOG_ERR("Failed to unregister application %s.", bogus); -                         shm_du_map_close(dum); +                         shm_du_map_destroy(dum);                           exit(1);                  }          } -        shm_du_map_close(dum); +        shm_du_map_destroy(dum);          exit(0);  } diff --git a/src/irmd/main.c b/src/irmd/main.c index 67254feb..a6403612 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -34,6 +34,7 @@  #include <ouroboros/utils.h>  #include <ouroboros/dif_config.h>  #include <ouroboros/shm_du_map.h> +#include <ouroboros/bitmap.h>  #include <sys/socket.h>  #include <sys/un.h> @@ -42,41 +43,146 @@  #include <errno.h>  #include <string.h>  #include <limits.h> +#include <pthread.h>  /* FIXME: this smells like part of namespace management */  #define ALL_DIFS "*" +#ifndef IRMD_MAX_FLOWS +  #define IRMD_MAX_FLOWS 4096 +#endif + +#ifndef IRMD_THREADPOOL_SIZE +  #define IRMD_THREADPOOL_SIZE 3 +#endif + + + +enum flow_state { +        FLOW_NULL = 0, +        FLOW_PENDING, +        FLOW_ALLOCATED +}; +  struct ipcp_entry {          struct list_head  next;          instance_name_t * api;          char *            dif_name; + +        pthread_mutex_t   lock;  }; -/* currently supports only registering whatevercast groups of a single AP */ +/* currently supports only registering whatevercast groups of a single AP-I */  struct reg_name_entry {          struct list_head next;          /* generic whatevercast name */          char *             name; -        /* FIXME: resolve name instead */ +        /* FIXME: make a list resolve to AP-I instead */          instance_name_t  * api; -        uint32_t           reg_ap_id; + +        bool   accept; +        char * req_ap_name; +        char * req_ae_name; +        bool   flow_arrived; + +        pthread_mutex_t fa_lock; +}; + +/* keeps track of port_id's between N and N - 1 */ +struct port_map_entry { +        struct list_head next; + +        uint32_t port_id; + +        pid_t    n_pid; +        pid_t    n_1_pid; + +        enum flow_state state;  };  struct irm { -        /* FIXME: list of ipcps can be merged with registered names */ +        /* FIXME: list of ipcps could be merged with registered names */          struct list_head ipcps;          struct list_head reg_names; +        int sockfd; + +        /* keep track of all flows in this processing system */ +        struct bmp * port_ids; + +        /* maps port_ids to pid pair */ +        struct list_head port_map; +          struct shm_du_map * dum; -}; -struct irm * instance = NULL; +        pthread_t * threadpool; + +        pthread_mutex_t lock; +} * instance = NULL; -static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api) +static struct port_map_entry * get_port_map_entry(uint32_t port_id) +{ +        struct list_head * pos = NULL; + +        list_for_each(pos, &instance->port_map) { +                struct port_map_entry * e = +                        list_entry(pos, struct port_map_entry, next); + +                if (e->port_id == port_id) +                        return e; +        } + +        return NULL; +} + +static struct port_map_entry * get_port_map_entry_n(pid_t n_pid) +{ +        struct list_head * pos = NULL; + +        list_for_each(pos, &instance->port_map) { +                struct port_map_entry * e = +                        list_entry(pos, struct port_map_entry, next); + +                if (e->n_pid == n_pid) +                        return e; +        } + +        return NULL; +} + +static struct ipcp_entry * ipcp_entry_create() +{ +        struct ipcp_entry * e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; + +        e->api = NULL; +        e->dif_name = NULL; + +        INIT_LIST_HEAD(&e->next); +        pthread_mutex_init(&e->lock, NULL); + +        return e; +} + +static void ipcp_entry_destroy(struct ipcp_entry * e) +{ +        if (e == NULL) +                return; + +        if (e->api != NULL) +                instance_name_destroy(e->api); + +        if (e->dif_name != NULL) +                free(e->dif_name); + +        free(e); +} + +static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api)  { -        struct ipcp_entry * tmp = NULL;          struct list_head * pos = NULL;          list_for_each(pos, &instance->ipcps) { @@ -87,7 +193,7 @@ static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api)                          return tmp;          } -        return tmp; +        return NULL;  }  static instance_name_t * get_ipcp_by_name(char * ap_name) @@ -143,9 +249,14 @@ static struct reg_name_entry * reg_name_entry_create()          if (e == NULL)                  return NULL; -        e->reg_ap_id = rand() % INT_MAX; -        e->name  = NULL; +        e->name         = NULL; +        e->api          = NULL; +        e->accept       = false; +        e->req_ap_name  = NULL; +        e->req_ae_name  = NULL; +        e->flow_arrived = false; +        pthread_mutex_init(&e->fa_lock, NULL);          INIT_LIST_HEAD(&e->next);          return e; @@ -153,7 +264,7 @@ static struct reg_name_entry * reg_name_entry_create()  static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e,                                                     char *                  name, -                                                   instance_name_t       * api) +                                                   instance_name_t *       api)  {          if (e == NULL || name == NULL || api == NULL)                  return NULL; @@ -171,10 +282,18 @@ static int reg_name_entry_destroy(struct reg_name_entry * e)          free(e->name);          instance_name_destroy(e->api); + +        if (e->req_ap_name != NULL) +                free(e->req_ap_name); +        if (e->req_ae_name != NULL) +                free(e->req_ae_name); + +        free(e); +          return 0;  } -static struct reg_name_entry * find_reg_name_entry_by_name(char * name) +static struct reg_name_entry * get_reg_name_entry_by_name(char * name)  {          struct list_head * pos = NULL; @@ -189,7 +308,7 @@ static struct reg_name_entry * find_reg_name_entry_by_name(char * name)          return NULL;  } -static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) +static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid)  {          struct list_head * pos = NULL; @@ -197,7 +316,7 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)                  struct reg_name_entry * e =                          list_entry(pos, struct reg_name_entry, next); -                if (reg_ap_id == e->reg_ap_id) +                if (e->api->id == pid)                          return e;          } @@ -207,10 +326,17 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)  /* FIXME: add only name when we have NSM solved */  static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)  { -        struct reg_name_entry * e = find_reg_name_entry_by_name(name); +        struct reg_name_entry * e = get_reg_name_entry_by_name(name);          if (e == NULL) {                  e = reg_name_entry_create(); -                e = reg_name_entry_init(e, name, api); +                if (e == NULL) +                        return -1; + +                if (reg_name_entry_init(e, name, api) == NULL) { +                        reg_name_entry_destroy(e); +                        return -1; +                } +                  list_add(&e->next, &instance->reg_names);                  return 0;          } @@ -221,7 +347,7 @@ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)  static int reg_name_entry_del_name(char * name)  { -        struct reg_name_entry * e = find_reg_name_entry_by_name(name); +        struct reg_name_entry * e = get_reg_name_entry_by_name(name);          if (e == NULL)                  return 0; @@ -240,34 +366,38 @@ static pid_t create_ipcp(char *         ap_name,          pid = ipcp_create(ap_name, ipcp_type);          if (pid == -1) { -                LOG_ERR("Failed to create IPCP"); +                LOG_ERR("Failed to create IPCP.");                  return -1;          } -        tmp = malloc(sizeof(*tmp)); -        if (tmp == NULL) { +        tmp = ipcp_entry_create(); +        if (tmp == NULL)                  return -1; -        }          INIT_LIST_HEAD(&tmp->next);          tmp->api = instance_name_create();          if (tmp->api == NULL) { -                free(tmp); +                ipcp_entry_destroy(tmp);                  return -1;          }          if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) {                  instance_name_destroy(tmp->api); -                free(tmp); +                ipcp_entry_destroy(tmp);                  return -1;          }          tmp->dif_name = NULL; -        LOG_DBG("Created IPC process with pid %d", pid); +        pthread_mutex_lock(&instance->lock);          list_add(&tmp->next, &instance->ipcps); + +        pthread_mutex_unlock(&instance->lock); + +        LOG_INFO("Created IPCP %s-%d ", ap_name, pid); +          return pid;  } @@ -276,18 +406,19 @@ static int destroy_ipcp(instance_name_t * api)          struct list_head * pos = NULL;          struct list_head * n = NULL; +        if (api == NULL) +                return 0; +          if (api->id == 0)                  api = get_ipcp_by_name(api->name);          if (api == NULL) {                  LOG_ERR("No such IPCP in the system."); -                return -1; +                return 0;          } -        LOG_DBG("Destroying ipcp %s-%d", api->name, api->id); -          if (ipcp_destroy(api->id)) -                LOG_ERR("Could not destroy IPCP"); +                LOG_ERR("Could not destroy IPCP.");          list_for_each_safe(pos, n, &(instance->ipcps)) {                  struct ipcp_entry * tmp = @@ -295,8 +426,12 @@ static int destroy_ipcp(instance_name_t * api)                  if (instance_name_cmp(api, tmp->api) == 0)                          list_del(&tmp->next); + +                ipcp_entry_destroy(tmp);          } +        LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id); +          return 0;  } @@ -313,25 +448,28 @@ static int bootstrap_ipcp(instance_name_t *  api,                  return -1;          } -        entry = find_ipcp_entry_by_name(api); +        entry = get_ipcp_entry_by_name(api);          if (entry == NULL) { -                LOG_ERR("No such IPCP"); +                LOG_ERR("No such IPCP.");                  return -1;          }          entry->dif_name = strdup(conf->dif_name);          if (entry->dif_name == NULL) { -                LOG_ERR("Failed to strdup"); +                LOG_ERR("Failed to strdup.");                  return -1;          }          if (ipcp_bootstrap(entry->api->id, conf)) { -                LOG_ERR("Could not bootstrap IPCP"); +                LOG_ERR("Could not bootstrap IPCP.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1;          } +        LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.", +                 api->name, api->id, conf->dif_name); +          return 0;  } @@ -343,21 +481,21 @@ static int enroll_ipcp(instance_name_t  * api,          ssize_t n_1_difs_size = 0;          struct ipcp_entry * entry = NULL; -        entry = find_ipcp_entry_by_name(api); +        entry = get_ipcp_entry_by_name(api);          if (entry == NULL) { -                LOG_ERR("No such IPCP"); +                LOG_ERR("No such IPCP.");                  return -1;          }          entry->dif_name = strdup(dif_name);          if (entry->dif_name == NULL) { -                LOG_ERR("Failed to strdup"); +                LOG_ERR("Failed to strdup.");                  return -1;          }          member = da_resolve_daf(dif_name);          if (member == NULL) { -                LOG_ERR("Could not find a member of that DIF"); +                LOG_ERR("Could not find a member of that DIF.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1; @@ -365,19 +503,22 @@ static int enroll_ipcp(instance_name_t  * api,          n_1_difs_size = da_resolve_dap(member, n_1_difs);          if (n_1_difs_size < 1) { -                LOG_ERR("Could not find N-1 DIFs"); +                LOG_ERR("Could not find N-1 DIFs.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1;          }          if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) { -                LOG_ERR("Could not enroll IPCP"); +                LOG_ERR("Could not enroll IPCP.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1;          } +        LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", +                 api->name, api->id, dif_name); +          return 0;  } @@ -386,7 +527,7 @@ static int reg_ipcp(instance_name_t * api,                      size_t            difs_size)  {          if (ipcp_reg(api->id, difs, difs_size)) { -                LOG_ERR("Could not register IPCP to N-1 DIF(s)"); +                LOG_ERR("Could not register IPCP to N-1 DIF(s).");                  return -1;          } @@ -399,24 +540,23 @@ static int unreg_ipcp(instance_name_t  * api,  {          if (ipcp_unreg(api->id, difs, difs_size)) { -                LOG_ERR("Could not unregister IPCP from N-1 DIF(s)"); +                LOG_ERR("Could not unregister IPCP from N-1 DIF(s).");                  return -1;          }          return 0;  } -static int ap_unreg_id(uint32_t reg_ap_id, -                       pid_t    pid, +static int ap_unreg_id(pid_t    pid,                         char **  difs,                         size_t   len)  {          int i;          int ret = 0; -        struct reg_name_entry * rne    = NULL; -        struct list_head      * pos  = NULL; +        struct reg_name_entry * rne = NULL; +        struct list_head      * pos = NULL; -        rne = find_reg_name_entry_by_id(reg_ap_id); +        rne = get_reg_name_entry_by_id(pid);          if (rne == NULL)                  return 0; /* no such id */ @@ -458,7 +598,6 @@ static int ap_reg(char *  ap_name,  {          int i;          int ret = 0; -        int reg_ap_id = 0;          struct list_head * pos = NULL;          struct reg_name_entry * rne = NULL; @@ -466,18 +605,18 @@ static int ap_reg(char *  ap_name,          instance_name_t * ipcpi = NULL;          if (instance->ipcps.next == NULL) -                LOG_ERR("No IPCPs in this system."); +                return -1;          /* check if this ap_name is already registered */ -        rne = find_reg_name_entry_by_name(ap_name); +        rne = get_reg_name_entry_by_name(ap_name);          if (rne != NULL)                  return -1; /* can only register one instance for now */ -        rne = reg_name_entry_create(); -        if (rne == NULL) +        api = instance_name_create(); +        if (api == NULL) {                  return -1; +        } -        api = instance_name_create();          if (instance_name_init_from(api, ap_name, ap_id) == NULL) {                  instance_name_destroy(api);                  return -1; @@ -488,12 +627,6 @@ static int ap_reg(char *  ap_name,           * contains a single instance only           */ -        if (reg_name_entry_init(rne, strdup(ap_name), api) == NULL) { -                reg_name_entry_destroy(rne); -                instance_name_destroy(api); -                return -1; -        } -          if (strcmp(difs[0], ALL_DIFS) == 0) {                  list_for_each(pos, &instance->ipcps) {                          struct ipcp_entry * e = @@ -528,11 +661,10 @@ static int ap_reg(char *  ap_name,                  return -1;          }          /* for now, we register single instances */ -        reg_name_entry_add_name_instance(strdup(ap_name), -                                         instance_name_dup(api)); -        instance_name_destroy(api); +        ret = reg_name_entry_add_name_instance(strdup(ap_name), +                                               api); -        return reg_ap_id; +        return ret;  }  static int ap_unreg(char *  ap_name, @@ -542,149 +674,304 @@ static int ap_unreg(char *  ap_name,  {          struct reg_name_entry * tmp = NULL; -        instance_name_t * api = instance_name_create(); -        if (api == NULL) -                return -1; - -        if (instance_name_init_from(api, ap_name, ap_id) == NULL) { -                instance_name_destroy(api); -                return -1; -        } -          /* check if ap_name is registered */ -        tmp = find_reg_name_entry_by_name(api->name); -        if (tmp == NULL) { -                instance_name_destroy(api); +        tmp = get_reg_name_entry_by_id(ap_id); +        if (tmp == NULL)                  return 0; -        } else { -                return ap_unreg_id(tmp->reg_ap_id, api->id, difs, len); -        } -} +        if (strcmp(ap_name, tmp->api->name)) +                return 0; -static int flow_accept(int fd, -                       pid_t pid, -                       char * ap_name, -                       char * ae_name) -{ -        return -1; +        return ap_unreg_id(ap_id, difs, len);  } -static int flow_alloc_resp(int fd, -                           int result) +static struct port_map_entry * flow_accept(pid_t    pid, +                                           char **  ap_name, +                                           char **  ae_name)  { -        return -1; +        bool arrived = false; + +        struct timespec         ts = {0, 100000}; + +        struct port_map_entry * pme; +        struct reg_name_entry * rne = get_reg_name_entry_by_id(pid); +        if (rne == NULL) { +                LOG_DBGF("Unregistered AP calling accept()."); +                return NULL; +        } + +        if (rne->accept) { +                LOG_DBGF("This AP still has a pending accept()."); +                return NULL; +        } + +        rne->accept = true; + +        /* FIXME: wait for a thread that runs select() on flow_arrived */ +        while (!arrived) { +                /* FIXME: this needs locking */ +                rne = get_reg_name_entry_by_id(pid); +                if (rne == NULL) +                        return NULL; +                arrived = rne->flow_arrived; +                nanosleep(&ts, NULL); +        } + +        pme = get_port_map_entry_n(pid); +        if (pme == NULL) { +                LOG_ERR("Port_id was not created yet."); +                return NULL; +        } + +        pthread_mutex_lock(&rne->fa_lock); +        *ap_name = rne->req_ap_name; +        if (ae_name != NULL) +                *ae_name = rne->req_ae_name; +        pthread_mutex_unlock(&rne->fa_lock); + +        return pme;  } -static int flow_alloc(char * dst_name, -                      char * src_ap_name, -                      char * src_ae_name, -                      struct qos_spec * qos, -                      int oflags) +static int flow_alloc_resp(pid_t     n_pid, +                           uint32_t  port_id, +                           int       response)  { -        int   port_id = 0; -        pid_t pid     = get_ipcp_by_dst_name(dst_name)->id; +        struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid); +        struct port_map_entry * pme = get_port_map_entry(port_id); -        LOG_DBG("flow alloc received from %s-%s to %s.", -                 src_ap_name, src_ae_name, dst_name); +        if (rne == NULL || pme == NULL) +                return -1; -        return ipcp_flow_alloc(pid, -                               port_id, -                               dst_name, -                               src_ap_name, -                               src_ae_name, -                               qos); +        /* FIXME: check all instances associated with the name */ +        if (!rne->accept) { +                LOG_ERR("No process listening for this name."); +                return -1; +        } + +        /* +         * consider the flow as handled +         * once we can handle a list of AP-I's, remove it from the list +         */ + +        rne->flow_arrived = false; +        rne->accept       = false; + +        if (!response) +                pme->state = FLOW_ALLOCATED; + +        return ipcp_flow_alloc_resp(pme->n_1_pid, +                                    port_id, +                                    pme->n_pid, +                                    response);  } -static int flow_alloc_res(int fd) +static struct port_map_entry * flow_alloc(pid_t  pid, +                                          char * dst_name, +                                          char * src_ap_name, +                                          char * src_ae_name, +                                          struct qos_spec * qos)  { +        struct port_map_entry * e = malloc(sizeof(*e)); +        if (e == NULL) { +                LOG_ERR("Failed malloc of port_map_entry."); +                return NULL; +        } -        return -1; +        e->port_id = bmp_allocate(instance->port_ids); +        e->n_pid   = pid; +        e->state   = FLOW_PENDING; +        e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + +        list_add(&e->next, &instance->port_map); + +        if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id, +                            e->port_id, +                            e->n_pid, +                            dst_name, +                            src_ap_name, +                            src_ae_name, +                            qos) < 0) { +                list_del(&e->next); +                bmp_release(instance->port_ids, e->port_id); +                free(e); +                return NULL; +        } + +        return e;  } -static int flow_dealloc(int fd) +static int flow_alloc_res(uint32_t port_id)  { -        return -1; +        bool allocated = false; +        struct port_map_entry * e; +        struct timespec ts = {0,100000}; + +        while (!allocated) { +                /* FIXME: this needs locking */ +                e = get_port_map_entry(port_id); +                if (e == NULL) { +                        LOG_DBGF("Could not locate port_id %u", port_id); +                        return -1; +                } +                if (e->state == FLOW_ALLOCATED) +                        allocated = true; +                nanosleep(&ts, NULL); +        } + +        return 0;  } -static int flow_cntl(int fd, -                     int oflags) +static int flow_dealloc(uint32_t port_id)  { -        return -1; +        pid_t    n_1_pid; + +        struct port_map_entry * e = get_port_map_entry(port_id); +        if (e == NULL) +                return 0; + +        n_1_pid = e->n_1_pid; + +        list_del(&e->next); +        free(e); + +        return ipcp_flow_dealloc(n_1_pid, port_id);  } -static int flow_req_arr(char * dst_name, -                        char * ap_name, -                        char * ae_name) +static struct port_map_entry * flow_req_arr(pid_t  pid, +                                            char * dst_name, +                                            char * ap_name, +                                            char * ae_name)  { -        return -1; +        struct reg_name_entry * rne; +        struct port_map_entry * pme; + +        rne = get_reg_name_entry_by_name(dst_name); +        if (rne == NULL) { +                LOG_DBGF("Destination name %s unknown.", dst_name); +                return NULL; +        } + +        pme = malloc(sizeof(*pme)); +        if (pme == NULL) { +                LOG_ERR("Failed malloc of port_map_entry."); +                return NULL; +        } + +        pme->port_id = bmp_allocate(instance->port_ids); +        pme->n_pid   = rne->api->id; +        pme->state   = FLOW_PENDING; +        pme->n_1_pid = pid; + +        list_add(&pme->next, &instance->port_map); + +        pthread_mutex_lock(&rne->fa_lock); + +        rne->req_ap_name = strdup(ap_name); +        rne->req_ae_name = strdup(ae_name); + +        rne->flow_arrived = true; + +        pthread_mutex_unlock(&rne->fa_lock); + +        return pme;  }  static int flow_alloc_reply(uint32_t port_id, -                            int      result) +                            int      response)  { -        return -1; +        struct port_map_entry * e; + +        /* FIXME: do this under lock */ +        if (!response) { +                e = get_port_map_entry(port_id); +                if (e == NULL) +                        return -1; +                e->state = FLOW_ALLOCATED; +        } + +        /* FIXME: does this need to be propagated to the IPCP? */ + +        return 0;  }  static int flow_dealloc_ipcp(uint32_t port_id)  { -        return -1; +        struct port_map_entry * e = get_port_map_entry(port_id); +        if (e == NULL) +                return 0; + +        list_del(&e->next); +        free(e); + +        return 0; +} + +static void irm_destroy(struct irm * irm) +{ +        struct list_head * h; +        struct list_head * t; + +        if (irm == NULL) +                return; + +        if (irm->threadpool != NULL) +                free(irm->threadpool); + +        if (irm->port_ids != NULL) +                bmp_destroy(irm->port_ids); +        /* clear the lists */ +        list_for_each_safe(h, t, &irm->ipcps) { +                struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); +                destroy_ipcp(e->api); +        } + +        list_for_each_safe(h, t, &irm->reg_names) { +                struct reg_name_entry * e = list_entry(h, +                                                       struct reg_name_entry, +                                                       next); +                char * difs [1] = {ALL_DIFS}; +                ap_unreg_id(e->api->id, difs, 1); +        } + +        list_for_each_safe(h, t, &irm->port_map) { +                struct port_map_entry * e = list_entry(h, +                                                       struct port_map_entry, +                                                       next); +                list_del(&e->next); +                free(e); +        } + +        if (irm->dum != NULL) +                shm_du_map_destroy(irm->dum); + +        close(irm->sockfd); +        free(irm);  }  void irmd_sig_handler(int sig, siginfo_t * info, void * c)  { +        int i; +          switch(sig) {          case SIGINT:          case SIGTERM:          case SIGHUP: -                shm_du_map_close(instance->dum); -                free(instance); -                exit(0); +                if (instance->threadpool != NULL) { +                        for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) +                                pthread_cancel(instance->threadpool[i]); +                } + +        case SIGPIPE: +                LOG_DBG("Ignoring SIGPIPE.");          default:                  return;          }  } -int main() +void * mainloop()  { -        int     sockfd;          uint8_t buf[IRM_MSG_BUF_SIZE]; -        struct sigaction sig_act; - -        /* init sig_act */ -        memset(&sig_act, 0, sizeof sig_act); - -        /* install signal traps */ -        sig_act.sa_sigaction = &irmd_sig_handler; -        sig_act.sa_flags     = SA_SIGINFO; - -        sigaction(SIGINT,  &sig_act, NULL); -        sigaction(SIGTERM, &sig_act, NULL); -        sigaction(SIGHUP,  &sig_act, NULL); - -        if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) -                unlink("/dev/shm/" SHM_DU_MAP_FILENAME); - -        instance = malloc(sizeof(*instance)); -        if (instance == NULL) -                return -1; - -        if ((instance->dum = shm_du_map_create()) == NULL) { -                free(instance); -                return -1; -        } - -        INIT_LIST_HEAD(&instance->ipcps); -        INIT_LIST_HEAD(&instance->reg_names); - -        sockfd = server_socket_open(IRM_SOCK_PATH); -        if (sockfd < 0) { -                shm_du_map_close(instance->dum); -                free(instance); -                return -1; -        } -          while (true) {                  int cli_sockfd;                  irm_msg_t * msg; @@ -692,18 +979,19 @@ int main()                  instance_name_t api;                  buffer_t buffer;                  irm_msg_t ret_msg = IRM_MSG__INIT; +                struct port_map_entry * e = NULL;                  ret_msg.code = IRM_MSG_CODE__IRM_REPLY; -                cli_sockfd = accept(sockfd, 0, 0); +                cli_sockfd = accept(instance->sockfd, 0, 0);                  if (cli_sockfd < 0) { -                        LOG_ERR("Cannot accept new connection"); +                        LOG_ERR("Cannot accept new connection.");                          continue;                  }                  count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE);                  if (count <= 0) { -                        LOG_ERR("Failed to read from socket"); +                        LOG_ERR("Failed to read from socket.");                          close(cli_sockfd);                          continue;                  } @@ -750,11 +1038,11 @@ int main()                                                      msg->n_dif_name);                          break;                  case IRM_MSG_CODE__IRM_AP_REG: -                        ret_msg.has_fd = true; -                        ret_msg.fd = ap_reg(msg->ap_name, -                                            msg->pid, -                                            msg->dif_name, -                                            msg->n_dif_name); +                        ret_msg.has_result = true; +                        ret_msg.result = ap_reg(msg->ap_name, +                                                msg->pid, +                                                msg->dif_name, +                                                msg->n_dif_name);                          break;                  case IRM_MSG_CODE__IRM_AP_UNREG:                          ret_msg.has_result = true; @@ -764,43 +1052,57 @@ int main()                                                    msg->n_dif_name);                          break;                  case IRM_MSG_CODE__IRM_FLOW_ACCEPT: -                        ret_msg.has_fd = true; -                        ret_msg.fd = flow_accept(msg->fd, -                                                 msg->pid, -                                                 ret_msg.ap_name, -                                                 ret_msg.ae_name); +                        e = flow_accept(msg->pid, +                                        &ret_msg.ap_name, +                                        &ret_msg.ae_name); +                        if (e == NULL) +                                break; + +                        ret_msg.has_port_id = true; +                        ret_msg.port_id     = e->port_id; +                        ret_msg.has_pid     = true; +                        ret_msg.pid         = e->n_1_pid;                          break;                  case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP:                          ret_msg.has_result = true; -                        ret_msg.result = flow_alloc_resp(msg->fd, -                                                         msg->result); +                        ret_msg.result = flow_alloc_resp(msg->pid, +                                                         msg->port_id, +                                                         msg->response);                          break;                  case IRM_MSG_CODE__IRM_FLOW_ALLOC: -                        ret_msg.has_fd = true; -                        ret_msg.fd = flow_alloc(msg->dst_name, -                                                msg->ap_name, -                                                msg->ae_name, -                                                NULL, -                                                msg->oflags); +                        e = flow_alloc(msg->pid, +                                       msg->dst_name, +                                       msg->ap_name, +                                       msg->ae_name, +                                       NULL); +                        if (e == NULL) +                                break; + +                        ret_msg.has_port_id = true; +                        ret_msg.port_id     = e->port_id; +                        ret_msg.has_pid     = true; +                        ret_msg.pid         = e->n_1_pid;                          break;                  case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: -                        ret_msg.has_response = true; -                        ret_msg.response = flow_alloc_res(msg->fd); -                        break; -                case IRM_MSG_CODE__IRM_FLOW_DEALLOC:                          ret_msg.has_result = true; -                        ret_msg.result = flow_dealloc(msg->fd); +                        ret_msg.result = flow_alloc_res(msg->port_id);                          break; -                case IRM_MSG_CODE__IRM_FLOW_CONTROL: +                case IRM_MSG_CODE__IRM_FLOW_DEALLOC:                          ret_msg.has_result = true; -                        ret_msg.result = flow_cntl(msg->fd, -                                                   msg->oflags); +                        ret_msg.result = flow_dealloc(msg->port_id);                          break;                  case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: +                        e = flow_req_arr(msg->pid, +                                         msg->dst_name, +                                         msg->ap_name, +                                         msg->ae_name); +                        if (e == NULL) +                                break; +                          ret_msg.has_port_id = true; -                        ret_msg.port_id = flow_req_arr(msg->dst_name, -                                                       msg->ap_name, -                                                       msg->ae_name); +                        ret_msg.port_id     = e->port_id; +                        ret_msg.has_pid     = true; +                        ret_msg.pid         = e->n_pid;                          break;                  case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:                          ret_msg.has_result = true; @@ -812,7 +1114,7 @@ int main()                          ret_msg.result = flow_dealloc_ipcp(msg->port_id);                          break;                  default: -                        LOG_ERR("Don't know that message code"); +                        LOG_ERR("Don't know that message code.");                          break;                  } @@ -820,7 +1122,7 @@ int main()                  buffer.size = irm_msg__get_packed_size(&ret_msg);                  if (buffer.size == 0) { -                        LOG_ERR("Failed to send reply message"); +                        LOG_ERR("Failed to send reply message.");                          close(cli_sockfd);                          continue;                  } @@ -842,6 +1144,88 @@ int main()                  free(buffer.data);                  close(cli_sockfd);          } +} + +static struct irm * irm_create() +{ +        struct irm * i = malloc(sizeof(*i)); +        if (i == NULL) +                return NULL; + +        if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) +                unlink("/dev/shm/" SHM_DU_MAP_FILENAME); + +        i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); +        if (i->threadpool == NULL) { +                irm_destroy(i); +                return NULL; +        } + +        if ((i->dum = shm_du_map_create()) == NULL) { +                irm_destroy(i); +                return NULL; +        } + +        INIT_LIST_HEAD(&i->ipcps); +        INIT_LIST_HEAD(&i->reg_names); +        INIT_LIST_HEAD(&i->port_map); + +        i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); +        if (i->port_ids == NULL) { +                irm_destroy(i); +                return NULL; +        } + +        i->sockfd = server_socket_open(IRM_SOCK_PATH); +        if (i->sockfd < 0) { +                irm_destroy(i); +                return NULL; +        } + +        pthread_mutex_init(&i->lock, NULL); + +        return i; +} + +int main() +{ +        struct sigaction sig_act; + +        int t = 0; + +        /* init sig_act */ +        memset(&sig_act, 0, sizeof sig_act); + +        /* install signal traps */ +        sig_act.sa_sigaction = &irmd_sig_handler; +        sig_act.sa_flags     = SA_SIGINFO; + +        if (sigaction(SIGINT,  &sig_act, NULL) < 0) +                exit(1); +        if (sigaction(SIGTERM, &sig_act, NULL) < 0) +                exit(1); +        if (sigaction(SIGHUP,  &sig_act, NULL) < 0) +                exit(1); +        if (sigaction(SIGPIPE, &sig_act, NULL) < 0) +                exit(1); + +        instance = irm_create(); +        if (instance == NULL) +                return 1; + +        /* +         * FIXME: we need a main loop that delegates messages to subthreads in a +         * way that avoids all possible deadlocks for local apps +         */ + +        for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) +                pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); + +        /* wait for (all of them) to return */ +        for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) +                pthread_join(instance->threadpool[t], NULL); + +        irm_destroy(instance);          return 0;  } diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4922e07c..53a7b354 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -32,6 +32,7 @@ set(SOURCE_FILES    ipcp.c    irm.c    list.c +  shm_ap_rbuff.c    shm_du_map.c    sockets.c    utils.c diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 8aabb4f4..e84145b2 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -108,12 +108,14 @@ struct bmp * bmp_create(size_t bits, ssize_t offset)                  return NULL;          tmp = malloc(sizeof(*tmp)); -        if (!tmp) +        if (tmp == NULL)                  return NULL; -        tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(*(tmp->bitmap))); -        if (!tmp->bitmap) +        tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(unsigned long)); +        if (tmp->bitmap == NULL) { +                free(tmp);                  return NULL; +        }          tmp->size = bits;          tmp->offset = offset; @@ -140,7 +142,8 @@ int bmp_destroy(struct bmp * b)  static ssize_t bad_id(struct bmp * b)  { -        assert(b); +        if (b == NULL) +                return -1;          return b->offset - 1;  } @@ -149,8 +152,8 @@ ssize_t bmp_allocate(struct bmp * b)  {          ssize_t id; -        if (!b) -                return bad_id(b); +        if (b == NULL) +                return -1;          id = (ssize_t) find_next_zero_bit(b->bitmap,                                            b->size); @@ -177,7 +180,7 @@ static bool is_id_valid(struct bmp * b,  bool bmp_is_id_valid(struct bmp * b,                       ssize_t id)  { -        if (!b) +        if (b == NULL)                  return false;          return is_id_valid(b, id); @@ -188,7 +191,7 @@ int bmp_release(struct bmp * b,  {          ssize_t rid; -        if (!b) +        if (b == NULL)                  return -1;          if (!is_id_valid(b, id)) diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d8411c5..c99e8cdb 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,73 +25,190 @@  #include <ouroboros/logs.h>  #include <ouroboros/dev.h>  #include <ouroboros/sockets.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/instance_name.h> +#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/utils.h>  #include <stdlib.h> +#include <string.h> -int ap_reg(char * ap_name, -           char ** difs, -           size_t difs_size) +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE +  #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE +  #define DU_BUFF_TAILSPACE 0 +#endif + +struct flow { +        struct shm_ap_rbuff * rb; +        uint32_t              port_id; +        uint32_t              oflags; + +        /* don't think this needs locking */ +}; + +struct ap_data { +        instance_name_t *     api; +        struct shm_du_map *   dum; +        struct bmp *          fds; + +        struct shm_ap_rbuff * rb; +        struct flow           flows[AP_MAX_FLOWS]; +} * _ap_instance; + + +int ap_init(char * ap_name)  { -        irm_msg_t msg = IRM_MSG__INIT; +        _ap_instance = malloc(sizeof(struct ap_data)); +        if (_ap_instance == NULL) { +                return -1; +        } + +        _ap_instance->api = instance_name_create(); +        if (_ap_instance->api == NULL) { +                free(_ap_instance); +                return -1; +        } + +        if (instance_name_init_from(_ap_instance->api, +                                    ap_name, +                                    getpid()) == NULL) { +                instance_name_destroy(_ap_instance->api); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); +        if (_ap_instance->fds == NULL) { +                instance_name_destroy(_ap_instance->api); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->dum = shm_du_map_open(); +        if (_ap_instance->dum == NULL) { +                instance_name_destroy(_ap_instance->api); +                bmp_destroy(_ap_instance->fds); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->rb = shm_ap_rbuff_create(); +        if (_ap_instance->rb == NULL) { +                instance_name_destroy(_ap_instance->api); +                bmp_destroy(_ap_instance->fds); +                free(_ap_instance); +                return -1; +        } + +        return 0; +} + +void ap_fini() +{ +        int i = 0; + +        if (_ap_instance == NULL) +                return; +        if (_ap_instance->api != NULL) +                instance_name_destroy(_ap_instance->api); +        if (_ap_instance->fds != NULL) +                bmp_destroy(_ap_instance->fds); +        if (_ap_instance->dum != NULL) +                shm_du_map_close(_ap_instance->dum); +        if (_ap_instance->rb != NULL) +                shm_ap_rbuff_destroy(_ap_instance->rb); +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                if (_ap_instance->flows[i].rb != NULL) +                        shm_ap_rbuff_close(_ap_instance->flows[i].rb); + +        free(_ap_instance); +} + +#if 0 +static int port_id_to_fd(uint32_t port_id) +{ +        int i; +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                if (_ap_instance->flows[i].port_id == port_id +                        && _ap_instance->flows[i].state != FLOW_NULL) +                        return i; +        return -1; +} +#endif + +int ap_reg(char ** difs, +           size_t  len) +{ +        irm_msg_t msg        = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int fd = 0; +        int fd = bmp_allocate(_ap_instance->fds); -        if (ap_name == NULL || -            difs == NULL || -            difs_size == 0 || +        if (difs == NULL || +            len == 0 ||              difs[0] == NULL) {                  return -EINVAL;          } +        if (_ap_instance == NULL) { +                LOG_DBG("ap_init was not called"); +                return -1; +        } +          msg.code       = IRM_MSG_CODE__IRM_AP_REG;          msg.has_pid    = true; -        msg.pid        = getpid(); -        msg.ap_name    = ap_name; +        msg.pid        = _ap_instance->api->id; +        msg.ap_name    = _ap_instance->api->name;          msg.dif_name   = difs; -        msg.n_dif_name = difs_size; +        msg.n_dif_name = len;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_fd == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        fd = recv_msg->fd; +        if (recv_msg->result < 0) +                fd = -1; +          irm_msg__free_unpacked(recv_msg, NULL);          return fd;  } -int ap_unreg(char * ap_name, -             char ** difs, -             size_t difs_size) +int ap_unreg(char ** difs, +             size_t  len)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL;          int ret = -1; -        if (ap_name == NULL || -            difs == NULL || -            difs_size == 0 || +        if (difs == NULL || +            len == 0 ||              difs[0] == NULL) {                  return -EINVAL;          }          msg.code       = IRM_MSG_CODE__IRM_AP_UNREG;          msg.has_pid    = true; -        msg.pid        = getpid(); -        msg.ap_name    = ap_name; +        msg.pid        = _ap_instance->api->id; +        msg.ap_name    = _ap_instance->api->name;          msg.dif_name   = difs; -        msg.n_dif_name = difs_size; +        msg.n_dif_name = len;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -102,38 +219,62 @@ int ap_unreg(char * ap_name,          return ret;  } -int flow_accept(int fd, -                char * ap_name, -                char * ae_name) +int flow_accept(int     fd, +                char ** ap_name, +                char ** ae_name)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int cli_fd = 0; - -        if (ap_name == NULL) { -                return -EINVAL; -        } +        int cfd = -1;          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_pid = true; -        msg.pid     = getpid(); -        msg.has_fd  = true; -        msg.fd      = fd; +        msg.pid     = _ap_instance->api->id;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_fd == false) { +        if (!recv_msg->has_pid || !recv_msg->has_port_id) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        cli_fd  = recv_msg->fd; -        ap_name = recv_msg->ap_name; -        ae_name = recv_msg->ae_name; + +        cfd = bmp_allocate(_ap_instance->fds); + +        _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); +        if (_ap_instance->flows[cfd].rb == NULL) { +                bmp_release(_ap_instance->fds, cfd); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        *ap_name = strdup(recv_msg->ap_name); +        if (*ap_name == NULL) { +                bmp_release(_ap_instance->fds, cfd); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        if (ae_name != NULL) { +                *ae_name = strdup(recv_msg->ae_name); +                if (*ae_name == NULL) { +                        bmp_release(_ap_instance->fds, cfd); +                        irm_msg__free_unpacked(recv_msg, NULL); +                        return -1; +                } +        } + +        _ap_instance->flows[cfd].port_id = recv_msg->port_id; +        _ap_instance->flows[cfd].oflags  = FLOW_O_DEFAULT; + +          irm_msg__free_unpacked(recv_msg, NULL); -        return cli_fd; + +        bmp_release(_ap_instance->fds, fd); + +        return cfd;  }  int flow_alloc_resp(int fd, @@ -145,9 +286,9 @@ int flow_alloc_resp(int fd,          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;          msg.has_pid      = true; -        msg.pid          = getpid(); -        msg.has_fd       = true; -        msg.fd = fd; +        msg.pid          = _ap_instance->api->id; +        msg.has_port_id  = true; +        msg.port_id      = _ap_instance->flows[fd].port_id;          msg.has_response = true;          msg.response     = response; @@ -155,7 +296,7 @@ int flow_alloc_resp(int fd,          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -167,41 +308,49 @@ int flow_alloc_resp(int fd,  }  int flow_alloc(char * dst_name, -               char * src_ap_name,                 char * src_ae_name, -               struct qos_spec * qos, -               int oflags) +               struct qos_spec * qos)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int fd = 0; +        int fd = -1; -        if (dst_name == NULL || -            src_ap_name == NULL) { +        if (dst_name == NULL)                  return -EINVAL; -        }          if (src_ae_name == NULL)                  src_ae_name  = UNKNOWN_AE;          msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;          msg.dst_name    = dst_name; -        msg.ap_name     = src_ap_name; +        msg.ap_name     = _ap_instance->api->name; +        msg.has_pid     = true; +        msg.pid         = _ap_instance->api->id;          msg.ae_name     = src_ae_name; -        msg.has_oflags  = true; -        msg.oflags      = oflags;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_fd == false) { +        if (!recv_msg->has_pid || !recv_msg->has_port_id) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        fd = recv_msg->fd; +        fd = bmp_allocate(_ap_instance->fds); + +        _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); +        if (_ap_instance->flows[fd].rb == NULL) { +                bmp_release(_ap_instance->fds, fd); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        _ap_instance->flows[fd].port_id = recv_msg->port_id; +        _ap_instance->flows[fd].oflags  = FLOW_O_DEFAULT; +          irm_msg__free_unpacked(recv_msg, NULL); +          return fd;  } @@ -211,17 +360,15 @@ int flow_alloc_res(int fd)          irm_msg_t * recv_msg = NULL;          int result = 0; -        msg.code    = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; -        msg.has_pid = true; -        msg.pid     = getpid(); -        msg.has_fd  = true; -        msg.fd      = fd; +        msg.code          = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; +        msg.has_port_id  = true; +        msg.port_id      = _ap_instance->flows[fd].port_id;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -238,17 +385,15 @@ int flow_dealloc(int fd)          irm_msg_t * recv_msg = NULL;          int ret = -1; -        msg.code    = IRM_MSG_CODE__IRM_FLOW_DEALLOC; -        msg.has_pid = true; -        msg.pid     = getpid(); -        msg.has_fd  = true; -        msg.fd      = fd; +        msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC; +        msg.has_port_id  = true; +        msg.port_id      = _ap_instance->flows[fd].port_id;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -259,47 +404,50 @@ int flow_dealloc(int fd)          return ret;  } -int flow_cntl(int fd, int oflags) +int flow_cntl(int fd, int cmd, int oflags)  { -        irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int ret = -1; - -        msg.has_pid = true; -        msg.pid     = getpid(); -        msg.has_fd  = true; -        msg.fd      = fd; -        msg.oflags  = oflags; +        return -1; +} -        recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) +ssize_t flow_write(int fd, void * buf, size_t count) +{ +        /* the AP chooses the amount of headspace and tailspace */ +        size_t index = shm_create_du_buff(_ap_instance->dum, +                                          count + DU_BUFF_HEADSPACE + +                                          DU_BUFF_TAILSPACE, +                                          DU_BUFF_HEADSPACE, +                                          (uint8_t *) buf, +                                          count); +        struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; +        if (index == -1)                  return -1; -        if (recv_msg->has_result == false) { -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; +        if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { +                shm_release_du_buff(_ap_instance->dum, index); +                return -EPIPE;          } -        ret = recv_msg->result; -        irm_msg__free_unpacked(recv_msg, NULL); - -        return ret; +        return 0;  } -ssize_t flow_write(int fd, -                   void * buf, -                   size_t count) +ssize_t flow_read(int fd, void * buf, size_t count)  { -        LOG_MISSING; +        struct rb_entry * e = NULL; +        int n; +        uint8_t * sdu; +        /* FIXME: move this to a thread  */ +        while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id) +                e = shm_ap_rbuff_read(_ap_instance->rb); + +        n = shm_du_map_read_sdu(&sdu, +                                _ap_instance->dum, +                                e->index); +        if (n < 0) +                return -1; -        return -1; -} +        memcpy(buf, sdu, MIN(n, count)); -ssize_t flow_read(int fd, -                  void * buf, -                  size_t count) -{ -        LOG_MISSING; +        shm_release_du_buff(_ap_instance->dum, e->index); -        return -1; +        return n;  } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 387572b3..75676915 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -121,6 +121,8 @@ pid_t ipcp_create(char *         ipcp_name,                  return pid;          } +        /* clear fd table */ +          if (ipcp_type == IPCP_NORMAL)                  exec_name = IPCP_NORMAL_EXEC;          else if (ipcp_type == IPCP_SHIM_UDP) @@ -286,13 +288,8 @@ int ipcp_enroll(pid_t pid,                  return -EINVAL;          msg.code        = IPCP_MSG_CODE__IPCP_ENROLL; -        msg.member_name = malloc(sizeof(*(msg.member_name))); -        if (msg.member_name == NULL) { -                LOG_ERR("Failed to malloc."); -                return -1; -        } -        msg.n_1_dif     = n_1_dif;          msg.member_name = member_name; +        msg.n_1_dif     = n_1_dif;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL) { @@ -323,8 +320,8 @@ int ipcp_name_reg(pid_t    pid,          if (name == NULL)                  return -1; -        msg.code          = IPCP_MSG_CODE__IPCP_NAME_REG; -        msg.name          = name; +        msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; +        msg.name = name;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL) @@ -368,6 +365,7 @@ int ipcp_name_unreg(pid_t  pid,  int ipcp_flow_alloc(pid_t             pid,                      uint32_t          port_id, +                    pid_t             n_pid,                      char *            dst_name,                      char *            src_ap_name,                      char *            src_ae_name, @@ -381,17 +379,19 @@ int ipcp_flow_alloc(pid_t             pid,                  return -EINVAL;          msg.code        = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; +        msg.has_port_id = true; +        msg.port_id     = port_id; +        msg.has_pid     = true; +        msg.pid         = n_pid;          msg.src_ap_name = src_ap_name;          msg.src_ae_name = src_ae_name;          msg.dst_name    = dst_name; -        msg.port_id     = port_id; -        msg.has_port_id = true;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  ipcp_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -404,17 +404,20 @@ int ipcp_flow_alloc(pid_t             pid,  int ipcp_flow_alloc_resp(pid_t    pid,                           uint32_t port_id, -                         int      result) +                         pid_t    n_pid, +                         int      response)  {          ipcp_msg_t msg = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL;          int ret = -1; -        msg.code        = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; -        msg.has_port_id = true; -        msg.port_id     = port_id; -        msg.has_result  = true; -        msg.result      = result; +        msg.code         = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; +        msg.has_port_id  = true; +        msg.port_id      = port_id; +        msg.has_pid      = true; +        msg.pid          = n_pid; +        msg.has_response = true; +        msg.response     = response;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL) @@ -431,38 +434,38 @@ int ipcp_flow_alloc_resp(pid_t    pid,          return ret;  } -int ipcp_flow_req_arr(pid_t    pid, -                      char *   dst_name, -                      char *   src_ap_name, -                      char *   src_ae_name) +int ipcp_flow_req_arr(pid_t  pid, +                      char * dst_name, +                      char * src_ap_name, +                      char * src_ae_name)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int fd = -1; +        int port_id = -1;          if (src_ap_name == NULL || src_ae_name == NULL)                  return -EINVAL;          msg.code          = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; +        msg.has_pid       = true; +        msg.pid           = pid;          msg.dst_name      = dst_name;          msg.ap_name       = src_ap_name;          msg.ae_name       = src_ae_name; -        msg.pid           = pid; -        msg.has_pid       = true;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_fd == false) { +        if (!recv_msg->has_port_id) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        fd = recv_msg->fd; +        port_id = recv_msg->port_id;          irm_msg__free_unpacked(recv_msg, NULL); -        return fd; +        return port_id;  }  int ipcp_flow_alloc_reply(pid_t    pid, @@ -509,11 +512,11 @@ int ipcp_flow_dealloc(pid_t    pid,                  recv_msg = send_recv_ipcp_msg(pid, &msg);                  if (recv_msg == NULL) -                        return -1; +                        return 0;                  if (recv_msg->has_result == false) {                          ipcp_msg__free_unpacked(recv_msg, NULL); -                        return -1; +                        return 0;                  }                  ret = recv_msg->result; @@ -531,11 +534,11 @@ int ipcp_flow_dealloc(pid_t    pid,                  recv_msg = send_recv_irm_msg(&msg);                  if (recv_msg == NULL) -                        return -1; +                        return 0;                  if (recv_msg->has_result == false) {                          irm_msg__free_unpacked(recv_msg, NULL); -                        return -1; +                        return 0;                  }                  ret = recv_msg->result; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index da4bb469..daca011d 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -25,6 +25,8 @@ message ipcp_msg {          optional string src_ap_name  =  9;          optional string src_ae_name  = 10;          optional dif_config_msg conf = 11; -        optional int32 result        = 12; -        optional int32 fd            = 13; +        optional int32 fd            = 12; +        optional int32 pid           = 13; +        optional int32 response      = 14; +        optional int32 result        = 15;  }; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 89e2c882..c336614e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -36,13 +36,10 @@ enum irm_msg_code {          IRM_FLOW_ALLOC        = 11;          IRM_FLOW_ALLOC_RES    = 12;          IRM_FLOW_DEALLOC      = 13; -        IRM_FLOW_CONTROL      = 14; -        IRM_FLOW_WRITE        = 15; -        IRM_FLOW_READ         = 16; -        IPCP_FLOW_REQ_ARR     = 17; -        IPCP_FLOW_ALLOC_REPLY = 18; -        IPCP_FLOW_DEALLOC     = 19; -        IRM_REPLY             = 20; +        IPCP_FLOW_REQ_ARR     = 14; +        IPCP_FLOW_ALLOC_REPLY = 15; +        IPCP_FLOW_DEALLOC     = 16; +        IRM_REPLY             = 17;  };  message irm_msg { @@ -52,12 +49,11 @@ message irm_msg {          optional uint32 api_id       =  3;          optional uint32 ipcp_type    =  5;          repeated string dif_name     =  6; -        optional int32 fd            =  7; -        optional int32 response      =  8; -        optional int32 oflags        =  9; -        optional string dst_name     = 10; -        optional uint32 port_id      = 11; -        optional int32 pid           = 12; -        optional dif_config_msg conf = 13; -        optional int32 result        = 14; +        optional int32 response      =  7; +        optional string dst_name     =  8; +        optional uint32 port_id      =  9; +        optional int32 pid           = 10; +        optional dif_config_msg conf = 11; +        optional int32 cfd           = 12; +        optional int32 result        = 13;  }; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c new file mode 100644 index 00000000..0a41dfb3 --- /dev/null +++ b/src/lib/shm_ap_rbuff.c @@ -0,0 +1,268 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for application processes + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/shm_ap_rbuff.h> +#define OUROBOROS_PREFIX "shm_ap_rbuff" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <stdbool.h> +#include <errno.h> + +#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry)          \ +                             + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)) + +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)\ +                          & (SHM_RBUFF_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) + +struct shm_ap_rbuff { +        struct rb_entry * shm_base;    /* start of entry */ +        size_t *          ptr_head;    /* start of ringbuffer head */ +        size_t *          ptr_tail;    /* start of ringbuffer tail */ +        pthread_mutex_t * shm_mutex;   /* lock all free space in shm */ +        pid_t             pid;         /* pid to which this rb belongs */ +        int               fd; +}; + +struct shm_ap_rbuff * shm_ap_rbuff_create() +{ +        struct shm_ap_rbuff * rb; +        int                   shm_fd; +        struct rb_entry *     shm_base; +        pthread_mutexattr_t   attr; +        char                  fn[25]; + +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); + +        rb = malloc(sizeof(*rb)); +        if (rb == NULL) { +                LOG_DBGF("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBGF("Failed creating ring buffer."); +                free(rb); +                return NULL; +        } + +        if (lseek(shm_fd, SHM_RBUFF_FILE_SIZE - 1, SEEK_SET) < 0) { +                LOG_DBGF("Failed to extend ringbuffer."); +                free(rb); +                return NULL; +        } + +        if (write(shm_fd, "", 1) != 1) { +                LOG_DBGF("Failed to finalise extension of ringbuffer."); +                free(rb); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_RBUFF_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        if (shm_base == MAP_FAILED) { +                LOG_DBGF("Failed to map shared memory."); +                if (close(shm_fd) == -1) +                        LOG_DBGF("Failed to close invalid shm."); + +                if (shm_unlink(fn) == -1) +                        LOG_DBGF("Failed to remove invalid shm."); + +                free(rb); +                return NULL; +        } + +        rb->shm_base = shm_base; +        rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); +        rb->ptr_tail = (size_t *) +                ((uint8_t *) rb->ptr_head + sizeof(size_t)); +        rb->shm_mutex = (pthread_mutex_t *) +                ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + +        pthread_mutexattr_init(&attr); +        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); +        pthread_mutex_init(rb->shm_mutex, &attr); + +        *rb->ptr_head = 0; +        *rb->ptr_tail = 0; + +        rb->fd  = shm_fd; +        rb->pid = getpid(); + +        return rb; +} + +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid) +{ +        struct shm_ap_rbuff * rb; +        int                   shm_fd; +        struct rb_entry *     shm_base; +        char                  fn[25]; + +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", pid); + +        rb = malloc(sizeof(*rb)); +        if (rb == NULL) { +                LOG_DBGF("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(fn, O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBGF("Failed opening shared memory %s.", fn); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_RBUFF_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        if (shm_base == MAP_FAILED) { +                LOG_DBGF("Failed to map shared memory."); +                if (close(shm_fd) == -1) +                        LOG_DBGF("Failed to close invalid shm."); + +                if (shm_unlink(fn) == -1) +                        LOG_DBGF("Failed to remove invalid shm."); + +                free(rb); +                return NULL; +        } + +        rb->shm_base = shm_base; +        rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); +        rb->ptr_tail = (size_t *) +                ((uint8_t *) rb->ptr_head + sizeof(size_t)); +        rb->shm_mutex = (pthread_mutex_t *) +                ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + +        rb->fd = shm_fd; +        rb->pid = pid; + +        return rb; +} +void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) +{ +        char fn[25]; + +        if (rb == NULL) { +                LOG_DBGF("Bogus input. Bugging out."); +                return; +        } + +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->pid); + +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBGF("Couldn't unmap shared memory."); + +        free(rb); +} + +void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) +{ +        char fn[25]; + + +        if (rb == NULL) { +                LOG_DBGF("Bogus input. Bugging out."); +                return; +        } + +        if (rb->pid != getpid()) { +                LOG_ERR("Tried to destroy other AP's rbuff."); +                return; +        } + +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->pid); + +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBGF("Couldn't unmap shared memory."); + +        if (shm_unlink(fn) == -1) +                LOG_DBGF("Failed to unlink shm."); + +        free(rb); +} + +int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) +{ +        struct rb_entry * pos; + +        if (rb == NULL || e == NULL) +                return -1; + +        pthread_mutex_lock(rb->shm_mutex); + +        if (!shm_rbuff_free(rb)) { +                pthread_mutex_unlock(rb->shm_mutex); +                return -1; +        } + +        pos = rb->shm_base + *rb->ptr_head; +        *pos = *e; +        *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); + +        pthread_mutex_unlock(rb->shm_mutex); + +        return 0; +} +struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) +{ +        struct rb_entry * e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; + +        if (rb == NULL) +                return NULL; + +        pthread_mutex_lock(rb->shm_mutex); + +        if (shm_rbuff_used(rb) == 0) { +                pthread_mutex_unlock(rb->shm_mutex); +                return NULL; +        } + +        *e = *(rb->shm_base + *rb->ptr_tail); + +        *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + +        pthread_mutex_unlock(rb->shm_mutex); + +        return e; +} diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index dfccca6a..56062c9d 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -45,6 +45,9 @@  ((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail *                      \                                           SHM_DU_BUFF_BLOCK_SIZE))) +#define idx_to_du_buff_ptr(dum, idx)                                           \ +        ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) +  #define block_ptr_to_idx(dum, sdb)                                             \          (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) @@ -52,27 +55,31 @@                            & (SHM_BLOCKS_IN_MAP - 1))  #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail -            \ +                            idx_to_du_buff_ptr(dum, idx)->du_head) +  #define MIN(a,b)(a < b ? a : b)  struct shm_du_buff { -        size_t            size; -        size_t            du_head; -        size_t            du_tail; +        size_t size; +        size_t du_head; +        size_t du_tail; +        size_t garbage;  };  struct shm_du_map { -        uint8_t            * shm_base;    /* start of blocks */ -        size_t             * ptr_head;    /* start of ringbuffer head */ -        size_t             * ptr_tail;    /* start of ringbuffer tail */ -        pthread_mutex_t    * shm_mutex;   /* lock all free space in shm */ -        int                  fd; +        uint8_t *         shm_base;    /* start of blocks */ +        size_t *          ptr_head;    /* start of ringbuffer head */ +        size_t *          ptr_tail;    /* start of ringbuffer tail */ +        pthread_mutex_t * shm_mutex;   /* lock all free space in shm */ +        int               fd;  };  struct shm_du_map * shm_du_map_create()  {          struct shm_du_map * dum;          int                 shm_fd; -        uint8_t           * shm_base; +        uint8_t *           shm_base;          pthread_mutexattr_t attr;          dum = malloc(sizeof *dum); @@ -141,7 +148,13 @@ struct shm_du_map * shm_du_map_open()  {          struct shm_du_map * dum;          int                 shm_fd; -        uint8_t           * shm_base; +        uint8_t *           shm_base; + +        dum = malloc(sizeof *dum); +        if (dum == NULL) { +                LOG_DBGF("Could not allocate struct."); +                return NULL; +        }          shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666);          if (shm_fd == -1) { @@ -166,12 +179,6 @@ struct shm_du_map * shm_du_map_open()                  return NULL;          } -        dum = malloc(sizeof *dum); -        if (dum == NULL) { -                LOG_DBGF("Could not allocate struct."); -                return NULL; -        } -          dum->shm_base = shm_base;          dum->ptr_head = (size_t *)                  ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); @@ -195,38 +202,52 @@ void shm_du_map_close(struct shm_du_map * dum)          if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)                  LOG_DBGF("Couldn't unmap shared memory."); +        free(dum); +} + +void shm_du_map_destroy(struct shm_du_map * dum) +{ +        if (dum == NULL) { +                LOG_DBGF("Bogus input. Bugging out."); +                return; +        } + +        if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) +                LOG_DBGF("Couldn't unmap shared memory."); +          if (shm_unlink(SHM_DU_MAP_FILENAME) == -1)                  LOG_DBGF("Failed to unlink shm.");          free(dum);  } -struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, -                                        size_t              size, -                                        size_t              headspace, -                                        uint8_t           * data, -                                        size_t              len) +int shm_create_du_buff(struct shm_du_map * dum, +                       size_t              size, +                       size_t              headspace, +                       uint8_t *           data, +                       size_t              len)  {          struct shm_du_buff * sdb;          long                 blocks = 0;          int                  sz = size + sizeof *sdb;          int                  sz2 = headspace + len + sizeof *sdb; -        uint8_t            * write_pos; +        uint8_t *            write_pos;          size_t               copy_len; +        size_t               index;          if (dum == NULL || data == NULL) {                  LOG_DBGF("Bogus input, bugging out."); -                return NULL; +                return -1;          }          if (headspace >= size) {                  LOG_DBGF("Index out of bounds."); -                return NULL; +                return -1;          }          if (headspace + len > size) {                  LOG_DBGF("Buffer too small for data."); -                return NULL; +                return -1;          }          pthread_mutex_lock(dum->shm_mutex); @@ -237,20 +258,20 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,                  if (sz2 < 0 && sz > 0) {                          pthread_mutex_unlock(dum->shm_mutex);                          LOG_DBG("Can't handle this packet now"); -                        return NULL; +                        return -1;                  }                  ++blocks;          }          if (!shm_map_free(dum, blocks)) {                  pthread_mutex_unlock(dum->shm_mutex); -                LOG_DBGF("Allocation failed, Out of Memory."); -                return NULL; +                return -1;          }          sdb = get_head_ptr(dum);          sdb->size = size; +        sdb->garbage = 0;          sdb->du_head = headspace;          sdb->du_tail = sdb->du_head + len; @@ -267,32 +288,76 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,                  --blocks;          } +        index = *dum->ptr_head - 1; +          pthread_mutex_unlock(dum->shm_mutex); -        return sdb; +        return index;  } -int shm_release_du_buff(struct shm_du_map * dum) +/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ +int shm_du_map_read_sdu(uint8_t **          dst, +                        struct shm_du_map * dum, +                        size_t              idx) +{ +        size_t    len = 0; + +        if (idx > SHM_BLOCKS_IN_MAP) +                return -1; + +        pthread_mutex_lock(dum->shm_mutex); + +        if (*dum->ptr_head == *dum->ptr_tail) { +                pthread_mutex_unlock(dum->shm_mutex); +                return -1; +        } + +        *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) + +                sizeof(struct shm_du_buff) + +                idx_to_du_buff_ptr(dum, idx)->du_head; +        len = sdu_size(dum, idx); + +        pthread_mutex_unlock(dum->shm_mutex); + +        return len; +} + +int shm_release_du_buff(struct shm_du_map * dum, size_t idx)  {          long sz;          long blocks = 0; + +        /* FIXME: this is crap for the test */ +        if (idx > SHM_BLOCKS_IN_MAP) +                idx = *dum->ptr_tail; +          pthread_mutex_lock(dum->shm_mutex);          if (*dum->ptr_head == *dum->ptr_tail) { -                LOG_DBGF("Attempt to free empty ringbuffer. Nothing to do.");                  pthread_mutex_unlock(dum->shm_mutex);                  return -1;          } -        sz = get_tail_ptr(dum)->size; +        idx_to_du_buff_ptr(dum, idx)->garbage = 1; -        while (sz + (long) sizeof (struct shm_du_buff) > 0) { -                sz -= SHM_DU_BUFF_BLOCK_SIZE; -                ++blocks; +        if (idx != *dum->ptr_tail) { +                pthread_mutex_unlock(dum->shm_mutex); +                return 0; +        } + +        while (get_tail_ptr(dum)->garbage == 1) { +                sz = get_tail_ptr(dum)->size; + +                while (sz + (long) sizeof (struct shm_du_buff) > 0) { +                        sz -= SHM_DU_BUFF_BLOCK_SIZE; +                        ++blocks; +                } + +                *(dum->ptr_tail) = +                        (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);          } -        *(dum->ptr_tail) = (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);          pthread_mutex_unlock(dum->shm_mutex);          return 0; @@ -317,7 +382,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,  }  uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, -                                 size_t size) +                                 size_t               size)  {          if (sdb == NULL) {                  LOG_DBGF("Bogus input, bugging out."); @@ -335,7 +400,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb,  }  int shm_du_buff_head_release(struct shm_du_buff * sdb, -                             size_t size) +                             size_t               size)  {          if (sdb == NULL) {                  LOG_DBGF("Bogus input, bugging out."); @@ -353,7 +418,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb,  }  int shm_du_buff_tail_release(struct shm_du_buff * sdb, -                             size_t size) +                             size_t               size)  {          if (sdb == NULL) {                  LOG_DBGF("Bogus input, bugging out."); diff --git a/src/lib/tests/shm_du_map_test.c b/src/lib/tests/shm_du_map_test.c index 85a82e4d..55938a62 100644 --- a/src/lib/tests/shm_du_map_test.c +++ b/src/lib/tests/shm_du_map_test.c @@ -32,7 +32,7 @@  #include <ouroboros/logs.h> -#define SIZE_OF_DU_BUFF 24 +#define SIZE_OF_DU_BUFF 32  #define TEST_BUFF_SIZE (SHM_DU_BUFF_BLOCK_SIZE - SIZE_OF_DU_BUFF)  #define MAX(a,b) (a > b ? a : b) @@ -44,7 +44,7 @@ void * produce()  {          struct shm_du_map * dum;          long                test_buf_size = 0; -        uint8_t           * test_values; +        uint8_t *           test_values;          int                 headspace;          int                 tailspace;          long                i; @@ -66,9 +66,8 @@ void * produce()                  test_values[i] = 170;          clock_gettime(CLOCK_MONOTONIC, &starttime); -        for (i = 1; i < SHM_BLOCKS_IN_MAP; i++) { -                struct shm_du_buff * sdb; -                size_t               len; +        for (i = 1; i < 16 * SHM_BLOCKS_IN_MAP; i++) { +                size_t len;                  test_buf_size = TEST_BUFF_SIZE; @@ -77,21 +76,19 @@ void * produce()                  len = test_buf_size - (headspace + tailspace); -                sdb = shm_create_du_buff(dum, -                                         test_buf_size, -                                         headspace, -                                         test_values, -                                         len); - -                if (sdb != NULL) { -                        bytes_written += len; -                } -                else { -                        sync = -2; -                        break; +                if (shm_create_du_buff(dum, +                                       test_buf_size, +                                       headspace, +                                       test_values, +                                       len) < 0) { +                        continue;                  } + +                bytes_written += len;          } +        sync = -2; +          clock_gettime(CLOCK_MONOTONIC, &stoptime);          elapsed =(stoptime.tv_sec + stoptime.tv_nsec / 1000000000.0) -                  (starttime.tv_sec + starttime.tv_nsec / 1000000000.0); @@ -104,13 +101,14 @@ void * produce()          sync = -1; +        shm_du_map_close(dum); +          return 0;  }  void * consume()  {          struct shm_du_map * dum; -          struct timespec     ts;          ts.tv_sec = 0; @@ -123,10 +121,15 @@ void * consume()                  return (void *)-1;          } -        while (!sync) { -                while (!shm_release_du_buff(dum)); -                nanosleep(&ts, NULL); +        while (true) { +                shm_release_du_buff(dum, 1823429173941); +                if (sync) +                        break;          } +        nanosleep(&ts, NULL); + + +        shm_du_map_close(dum);          return 0;  } @@ -149,7 +152,7 @@ int shm_du_map_test(int argc, char ** argv)                  return -1;          } -        shm_du_map_close(dum); +        shm_du_map_destroy(dum);          LOG_INFO("done."); @@ -165,7 +168,7 @@ int shm_du_map_test(int argc, char ** argv)          pthread_create(&consumer, NULL, consume, NULL);          pthread_join(consumer, NULL); -        shm_du_map_close(dum); +        shm_du_map_destroy(dum);          LOG_INFO("done."); @@ -173,6 +176,8 @@ int shm_du_map_test(int argc, char ** argv)          LOG_INFO("starting concurrency test."); +        sync = 0; +          dum = shm_du_map_create();          res1 = (int) pthread_create(&producer, NULL, produce, NULL); @@ -181,7 +186,7 @@ int shm_du_map_test(int argc, char ** argv)          pthread_join(producer, NULL);          pthread_join(consumer, NULL); -        shm_du_map_close(dum); +        shm_du_map_destroy(dum);          LOG_INFO("done."); diff --git a/src/tools/echo/echo_client.c b/src/tools/echo/echo_client.c index 8d3fc322..9cf56cee 100644 --- a/src/tools/echo/echo_client.c +++ b/src/tools/echo/echo_client.c @@ -23,19 +23,25 @@  #define CLIENT_AP_NAME "echo-client"  #include <ouroboros/dev.h> +#include <stdlib.h>  int client_main()  {          int fd = 0;          int result = 0; -        uint8_t buf[BUF_SIZE]; +        char buf[BUF_SIZE];          char * message  = "Client says hi!";          ssize_t count = 0; -        fd = flow_alloc(SERVER_AP_NAME, CLIENT_AP_NAME, -                        NULL, NULL, 0); +        if(ap_init(CLIENT_AP_NAME)) { +                printf("Failed to init AP."); +                return -1; +        } + +        fd = flow_alloc(SERVER_AP_NAME, NULL, NULL);          if (fd < 0) {                  printf("Failed to allocate flow\n"); +                ap_fini();                  return -1;          } @@ -43,12 +49,14 @@ int client_main()          if (result < 0) {                  printf("Flow allocation refused\n");                  flow_dealloc(fd); +                ap_fini();                  return -1;          }          if (flow_write(fd, message, strlen(message) + 1) == -1) {                  printf("Failed to write SDU\n");                  flow_dealloc(fd); +                ap_fini();                  return -1;          } @@ -56,6 +64,7 @@ int client_main()          if (count < 0) {                  printf("Failed to read SDU\n");                  flow_dealloc(fd); +                ap_fini();                  return -1;          } @@ -63,5 +72,7 @@ int client_main()          flow_dealloc(fd); +        ap_fini(); +          return 0;  } diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c index e457e22b..d9af1c1f 100644 --- a/src/tools/echo/echo_server.c +++ b/src/tools/echo/echo_server.c @@ -33,65 +33,72 @@ void shutdown_server(int signo)  {          char * dif = DIF_NAME; -        if (ap_unreg(SERVER_AP_NAME, &dif, 1)) { -                printf("Failed to unregister application\n"); +        if (ap_unreg(&dif, 1)) { +                printf("Failed to unregister application.\n"); +                ap_fini();                  exit(EXIT_FAILURE);          } +        ap_fini();          exit(EXIT_SUCCESS);  }  int server_main()  { -        int server_fd = 0; -        int client_fd = 0; +        int    server_fd = 0; +        int    client_fd = 0;          char * dif = DIF_NAME;          char * client_name = NULL; -        uint8_t buf[BUF_SIZE]; +        char   buf[BUF_SIZE];          ssize_t count = 0; -        printf("Starting the server\n"); +        printf("Starting the server.\n");          /* Manual cleanup is required for now */          if (signal(SIGINT, shutdown_server) == SIG_ERR) { -                printf("Can't install signal handler\n"); +                printf("Can't install signal handler.\n");                  return -1;          } -        server_fd = ap_reg(SERVER_AP_NAME, &dif, 1); -        if (server_fd < 0) { -                printf("Failed to register application\n"); +        if(ap_init(SERVER_AP_NAME)) { +                printf("Failed to init AP.");                  return -1;          } -        printf("Echo server started...\n"); +        server_fd = ap_reg(&dif, 1); +        if (server_fd < 0) { +                printf("Failed to register application.\n"); +                ap_fini(); +                return -1; +        }          while (true) {                  client_fd = flow_accept(server_fd, -                                        client_name, NULL); +                                        &client_name, NULL);                  if (client_fd < 0) { -                        continue; +                        printf("Failed to accept flow.\n"); +                        break;                  } -                printf("New flow from %s\n", client_name); +                printf("New flow from %s.\n", client_name);                  if (flow_alloc_resp(client_fd, 0)) { -                        printf("Failed to give an allocate response\n"); +                        printf("Failed to give an allocate response.\n");                          flow_dealloc(client_fd);                          continue;                  } -                count = flow_read(client_fd, buf, BUF_SIZE); +                count = flow_read(client_fd, (void **) &buf, BUF_SIZE);                  if (count < 0) { -                        printf("Failed to read SDU\n"); +                        printf("Failed to read SDU.\n");                          flow_dealloc(client_fd);                          continue;                  } -                printf("Message from client is %.*s\n", (int) count, buf); +                printf("Message from client is %.*s.\n", (int) count, buf);                  if (flow_write(client_fd, buf, count) == -1) { -                        printf("Failed to write SDU\n"); +                        printf("Failed to write SDU.\n");                          flow_dealloc(client_fd);                          continue;                  } @@ -99,5 +106,7 @@ int server_main()                  flow_dealloc(client_fd);          } +        ap_fini(); +          return 0;  } | 
