diff options
Diffstat (limited to 'src')
32 files changed, 1975 insertions, 2763 deletions
| diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h deleted file mode 100644 index 01226c1e..00000000 --- a/src/ipcpd/flow.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Flows - * - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCP_FLOW_H -#define OUROBOROS_IPCP_FLOW_H - -#include <ouroboros/list.h> -#include <ouroboros/shm_ap_rbuff.h> - -#include <stdint.h> - -struct flow { -        int                   port_id; -        struct shm_ap_rbuff * rb; -        enum flow_state       state; - -        pid_t                 api; -}; - -#endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index 36245eea..4971dbb5 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -30,7 +30,6 @@  #include <pthread.h>  #include "ipcp-ops.h" -#include "flow.h"  struct ipcp_data {          enum ipcp_type      type; @@ -46,24 +45,32 @@ struct ipcp_data {  };  struct ipcp_data * ipcp_data_create(); +  struct ipcp_data * ipcp_data_init(struct ipcp_data * dst,                                    enum ipcp_type     ipcp_type); +  void               ipcp_data_destroy(struct ipcp_data * data); -int          ipcp_data_add_reg_entry(struct ipcp_data * data, -                                     char *             name); -int          ipcp_data_del_reg_entry(struct ipcp_data * data, -                                     const char *       name); -int          ipcp_data_add_dir_entry(struct ipcp_data * data, -                                     char *             ap_name, -                                     uint64_t           addr); -int          ipcp_data_del_dir_entry(struct ipcp_data * data, -                                     const char *       ap_name, -                                     uint64_t           addr); -bool         ipcp_data_is_in_registry(struct ipcp_data * data, -                                      const char *       name); -bool         ipcp_data_is_in_directory(struct ipcp_data * data, -                                       const char *       ap_name); -uint64_t     ipcp_data_get_addr(struct ipcp_data * data, -                                const char *       ap_name); +int                ipcp_data_add_reg_entry(struct ipcp_data * data, +                                           char *             name); + +int                ipcp_data_del_reg_entry(struct ipcp_data * data, +                                           const char *       name); + +int                ipcp_data_add_dir_entry(struct ipcp_data * data, +                                           char *             ap_name, +                                           uint64_t           addr); + +int                ipcp_data_del_dir_entry(struct ipcp_data * data, +                                           const char *       ap_name, +                                           uint64_t           addr); + +bool               ipcp_data_is_in_registry(struct ipcp_data * data, +                                            const char *       name); + +bool               ipcp_data_is_in_directory(struct ipcp_data * data, +                                             const char *       ap_name); + +uint64_t           ipcp_data_get_addr(struct ipcp_data * data, +                                      const char *       ap_name);  #endif /* IPCPD_IPCP_DATA_H */ diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index e43c2c38..815cda09 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -25,23 +25,26 @@  #define IPCPD_IPCP_OPS_H  #include <ouroboros/irm_config.h> -#include <ouroboros/common.h> -#include <sys/types.h> +#include <ouroboros/shared.h>  struct ipcp_ops {          int   (* ipcp_bootstrap)(struct dif_config * conf); +          int   (* ipcp_enroll)(char * dif_name); -        int   (* ipcp_name_reg)(char *   name); + +        int   (* ipcp_name_reg)(char * name); +          int   (* ipcp_name_unreg)(char * name); -        int   (* ipcp_flow_alloc)(pid_t         n_api, -                                  int           port_id, + +        int   (* ipcp_flow_alloc)(int           fd,                                    char *        dst_ap_name,                                    char *        src_ae_name,                                    enum qos_cube qos); -        int   (* ipcp_flow_alloc_resp)(pid_t n_api, -                                       int   port_id, -                                       int   response); -        int   (* ipcp_flow_dealloc)(int port_id); + +        int   (* ipcp_flow_alloc_resp)(int fd, +                                       int response); + +        int   (* ipcp_flow_dealloc)(int fd);  };  #endif /* IPCPD_IPCP_OPS_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index ec5ab927..db72b88d 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -21,8 +21,12 @@   */  #include <ouroboros/config.h> -#include <ouroboros/ipcp.h>  #include <ouroboros/time_utils.h> +#include <ouroboros/utils.h> +#include <ouroboros/sockets.h> +#include <ouroboros/errno.h> +#include <ouroboros/dev.h> +#include <ouroboros/np1_flow.h>  #define OUROBOROS_PREFIX "ipcpd/ipcp"  #include <ouroboros/logs.h> @@ -32,62 +36,68 @@  #include <stdlib.h>  #include "ipcp.h" -struct ipcp * ipcp_instance_create() +int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops)  {          pthread_condattr_t cattr; -        struct ipcp * i = malloc(sizeof *i); -        if (i == NULL) -                return NULL; +        ipcpi.irmd_fd = -1; +        ipcpi.state   = IPCP_INIT; -        i->data    = NULL; -        i->ops     = NULL; -        i->irmd_fd = -1; -        i->state   = IPCP_INIT; +        ipcpi.ops = ops; -        pthread_rwlock_init(&i->state_lock, NULL); -        pthread_mutex_init(&i->state_mtx, NULL); +        ipcpi.data = ipcp_data_create(); +        if (ipcpi.data == NULL) +                return -ENOMEM; + +        ipcp_data_init(ipcpi.data, type); + +        pthread_rwlock_init(&ipcpi.state_lock, NULL); +        pthread_mutex_init(&ipcpi.state_mtx, NULL);          pthread_condattr_init(&cattr);  #ifndef __APPLE__          pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);  #endif -        pthread_cond_init(&i->state_cond, &cattr); +        pthread_cond_init(&ipcpi.state_cond, &cattr); -        return i; +        pthread_create(&ipcpi.mainloop, NULL, ipcp_main_loop, NULL); + +        return 0;  } -void ipcp_set_state(struct ipcp * ipcp, -                    enum ipcp_state state) +void ipcp_fini()  { -        if (ipcp == NULL) -            return; +        pthread_join(ipcpi.mainloop, NULL); -        pthread_mutex_lock(&ipcp->state_mtx); +        ipcp_data_destroy(ipcpi.data); +        pthread_cond_destroy(&ipcpi.state_cond); +        pthread_mutex_destroy(&ipcpi.state_mtx); +        pthread_rwlock_destroy(&ipcpi.state_lock); +} + +void ipcp_set_state(enum ipcp_state state) +{ +        pthread_mutex_lock(&ipcpi.state_mtx); -        ipcp->state = state; +        ipcpi.state = state; -        pthread_cond_broadcast(&ipcp->state_cond); -        pthread_mutex_unlock(&ipcp->state_mtx); +        pthread_cond_broadcast(&ipcpi.state_cond); +        pthread_mutex_unlock(&ipcpi.state_mtx);  } -enum ipcp_state ipcp_get_state(struct ipcp * ipcp) +enum ipcp_state ipcp_get_state()  {          enum ipcp_state state; -        if (ipcp == NULL) -                return IPCP_NULL; +        pthread_mutex_lock(&ipcpi.state_mtx); -        pthread_mutex_lock(&ipcp->state_mtx); +        state = ipcpi.state; -        state = ipcp->state; - -        pthread_mutex_unlock(&ipcp->state_mtx); +        pthread_mutex_unlock(&ipcpi.state_mtx);          return state;  } -int ipcp_wait_state(struct ipcp *           ipcp, -                    enum ipcp_state         state, +int ipcp_wait_state(enum ipcp_state         state,                      const struct timespec * timeout)  {          struct timespec abstime; @@ -95,24 +105,24 @@ int ipcp_wait_state(struct ipcp *           ipcp,          clock_gettime(PTHREAD_COND_CLOCK, &abstime);          ts_add(&abstime, timeout, &abstime); -        pthread_mutex_lock(&ipcp->state_mtx); +        pthread_mutex_lock(&ipcpi.state_mtx); -        while (ipcp->state != state && ipcp->state != IPCP_SHUTDOWN) { +        while (ipcpi.state != state && ipcpi.state != IPCP_SHUTDOWN) {                  int ret;                  if (timeout == NULL) -                        ret = pthread_cond_wait(&ipcp->state_cond, -                                                &ipcp->state_mtx); +                        ret = pthread_cond_wait(&ipcpi.state_cond, +                                                &ipcpi.state_mtx);                  else -                        ret = pthread_cond_timedwait(&ipcp->state_cond, -                                                     &ipcp->state_mtx, +                        ret = pthread_cond_timedwait(&ipcpi.state_cond, +                                                     &ipcpi.state_mtx,                                                       &abstime);                  if (ret) { -                        pthread_mutex_unlock(&ipcp->state_mtx); +                        pthread_mutex_unlock(&ipcpi.state_mtx);                          return -ret;                  }          } -        pthread_mutex_unlock(&ipcp->state_mtx); +        pthread_mutex_unlock(&ipcpi.state_mtx);          return 0;  } @@ -161,7 +171,6 @@ void * ipcp_main_loop(void * o)          int     lsockfd;          int     sockfd;          uint8_t buf[IPCP_MSG_BUF_SIZE]; -        struct ipcp * _ipcp = (struct ipcp *) o;          ipcp_msg_t * msg;          ssize_t      count; @@ -180,12 +189,6 @@ void * ipcp_main_loop(void * o)          struct timeval ltv = {(SOCKET_TIMEOUT / 1000),                               (SOCKET_TIMEOUT % 1000) * 1000}; - -        if (_ipcp == NULL) { -                LOG_ERR("Invalid ipcp struct."); -                return (void *) 1; -        } -          sock_path = ipcp_sock_path(getpid());          if (sock_path == NULL)                  return (void *) 1; @@ -202,13 +205,15 @@ void * ipcp_main_loop(void * o)                  LOG_WARN("Failed to set timeout on socket.");          while (true) { -                pthread_rwlock_rdlock(&_ipcp->state_lock); -                if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                int fd = -1; + +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                if (ipcp_get_state() == IPCP_SHUTDOWN) { +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          break;                  } -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -235,7 +240,7 @@ void * ipcp_main_loop(void * o)                  switch (msg->code) {                  case IPCP_MSG_CODE__IPCP_BOOTSTRAP: -                        if (_ipcp->ops->ipcp_bootstrap == NULL) { +                        if (ipcpi.ops->ipcp_bootstrap == NULL) {                                  LOG_ERR("Bootstrap unsupported.");                                  break;                          } @@ -267,72 +272,102 @@ void * ipcp_main_loop(void * o)                                  conf.if_name = conf_msg->if_name;                          ret_msg.has_result = true; -                        ret_msg.result = _ipcp->ops->ipcp_bootstrap(&conf); +                        ret_msg.result = ipcpi.ops->ipcp_bootstrap(&conf);                          if (ret_msg.result < 0)                                  free(conf.dif_name);                          break;                  case IPCP_MSG_CODE__IPCP_ENROLL: -                        if (_ipcp->ops->ipcp_enroll == NULL) { +                        if (ipcpi.ops->ipcp_enroll == NULL) {                                  LOG_ERR("Enroll unsupported.");                                  break;                          }                          ret_msg.has_result = true; -                        ret_msg.result = _ipcp->ops->ipcp_enroll(msg->dif_name); +                        ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dif_name);                          break;                  case IPCP_MSG_CODE__IPCP_NAME_REG: -                        if (_ipcp->ops->ipcp_name_reg == NULL) { +                        if (ipcpi.ops->ipcp_name_reg == NULL) {                                  LOG_ERR("Ap_reg unsupported.");                                  break;                          }                          msg_name_dup = strdup(msg->name);                          ret_msg.has_result = true;                          ret_msg.result = -                                _ipcp->ops->ipcp_name_reg(msg_name_dup); +                                ipcpi.ops->ipcp_name_reg(msg_name_dup);                          if (ret_msg.result < 0)                                  free(msg_name_dup);                          break;                  case IPCP_MSG_CODE__IPCP_NAME_UNREG: -                        if (_ipcp->ops->ipcp_name_unreg == NULL) { +                        if (ipcpi.ops->ipcp_name_unreg == NULL) {                                  LOG_ERR("Ap_unreg unsupported.");                                  break;                          }                          ret_msg.has_result = true;                          ret_msg.result = -                                _ipcp->ops->ipcp_name_unreg(msg->name); +                                ipcpi.ops->ipcp_name_unreg(msg->name);                          break;                  case IPCP_MSG_CODE__IPCP_FLOW_ALLOC: -                        if (_ipcp->ops->ipcp_flow_alloc == NULL) { +                        if (ipcpi.ops->ipcp_flow_alloc == NULL) {                                  LOG_ERR("Flow_alloc unsupported.");                                  break;                          } +                        fd = np1_flow_alloc(msg->api, msg->port_id); +                        if (fd < 0) { +                                LOG_ERR("Could not get fd for flow."); +                                ret_msg.has_result = true; +                                ret_msg.result = -1; +                                break; +                        } +                          ret_msg.has_result = true;                          ret_msg.result = -                                _ipcp->ops->ipcp_flow_alloc(msg->api, -                                                            msg->port_id, +                                ipcpi.ops->ipcp_flow_alloc(fd,                                                              msg->dst_name,                                                              msg->src_ae_name,                                                              msg->qos_cube); +                        if (ret_msg.result < 0) { +                                LOG_DBG("Deallocating failed flow on port_id %d.", +                                        msg->port_id); +                                flow_dealloc(fd); +                        }                          break;                  case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: -                        if (_ipcp->ops->ipcp_flow_alloc_resp == NULL) { +                        if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) {                                  LOG_ERR("Flow_alloc_resp unsupported.");                                  break;                          } + +                        if (!msg->response) { +                                fd = np1_flow_resp(msg->api, msg->port_id); +                                if (fd < 0) { +                                        LOG_ERR("Could not get fd for flow."); +                                        ret_msg.has_result = true; +                                        ret_msg.result = -1; +                                        break; +                                } +                        }                          ret_msg.has_result = true;                          ret_msg.result = -                                _ipcp->ops->ipcp_flow_alloc_resp(msg->api, -                                                                 msg->port_id, -                                                                 msg->result); +                                ipcpi.ops->ipcp_flow_alloc_resp(fd, +                                                                msg->response);                          break;                  case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: -                        if (_ipcp->ops->ipcp_flow_dealloc == NULL) { +                        if (ipcpi.ops->ipcp_flow_dealloc == NULL) {                                  LOG_ERR("Flow_dealloc unsupported.");                                  break;                          } + +                        fd = np1_flow_dealloc(msg->port_id); +                        if (fd < 0) { +                                LOG_ERR("Could not get fd for flow."); +                                ret_msg.has_result = true; +                                ret_msg.result = -1; +                                break; +                        } +                          ret_msg.has_result = true;                          ret_msg.result = -                                _ipcp->ops->ipcp_flow_dealloc(msg->port_id); +                                ipcpi.ops->ipcp_flow_dealloc(fd);                          break;                  default:                          LOG_ERR("Don't know that message code"); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index edaea0fd..87c0c5d1 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -24,7 +24,6 @@  #define IPCPD_IPCP_H  #include <ouroboros/config.h> -#include <ouroboros/shared.h>  #include <pthread.h>  #include <time.h> @@ -50,23 +49,23 @@ struct ipcp {          pthread_rwlock_t   state_lock;          pthread_mutex_t    state_mtx;          pthread_cond_t     state_cond; -}; -struct ipcp *   ipcp_instance_create(); +        pthread_t          mainloop; +} ipcpi; + +int             ipcp_init(); -void            ipcp_set_state(struct ipcp *   ipcp, -                               enum ipcp_state state); +void            ipcp_fini(); -enum ipcp_state ipcp_get_state(struct ipcp * ipcp); +void            ipcp_set_state(enum ipcp_state state); -int             ipcp_wait_state(struct ipcp *           ipcp, -                                enum ipcp_state         state, +enum ipcp_state ipcp_get_state(); + +int             ipcp_wait_state(enum ipcp_state         state,                                  const struct timespec * timeout);  void *          ipcp_main_loop(void * o); -void *          ipcp_sdu_loop(void * o); -  int             ipcp_parse_arg(int argc,                                 char * argv[]); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index c0809429..1ccec0c0 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -22,17 +22,10 @@  #include <ouroboros/config.h>  #include "ipcp.h" -#include "flow.h"  #include <ouroboros/errno.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/list.h> -#include <ouroboros/utils.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/irm_config.h> -#include <ouroboros/bitmap.h> -#include <ouroboros/shared.h>  #include <ouroboros/dev.h> +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/local-dev.h>  #define OUROBOROS_PREFIX "ipcpd/local"  #include <ouroboros/logs.h> @@ -46,176 +39,51 @@  #define THIS_TYPE IPCP_LOCAL -#define shim_data(type) ((struct ipcp_local_data *) type->data) -  /* global for trapping signal */  int irmd_api; -/* this IPCP's data */ -#ifdef MAKE_CHECK -extern struct ipcp * _ipcp; /* defined in test */ -#else -struct ipcp * _ipcp; -#endif - -/* - * copied from ouroboros/dev. The shim needs access to the internals - * because it doesn't follow all steps necessary steps to get - * the info - */ - -/* the shim needs access to these internals */ -struct shim_ap_data { -        pid_t                 api; -        struct shm_rdrbuff *  rdrb; -        struct bmp *          fds; -        struct shm_ap_rbuff * rb; - -        int                   in_out[AP_MAX_FLOWS]; +struct { +        int                   in_out[IRMD_MAX_FLOWS]; -        struct flow           flows[AP_MAX_FLOWS]; -        pthread_rwlock_t      flows_lock; - -        pthread_t             mainloop; +        pthread_rwlock_t      lock;          pthread_t             sduloop; +} local_data; -} * _ap_instance; - -static int shim_ap_init() +void local_data_init()  {          int i; +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) +                local_data.in_out[i] = -1; -        _ap_instance = malloc(sizeof(struct shim_ap_data)); -        if (_ap_instance == NULL) { -                return -1; -        } - -        _ap_instance->api = getpid(); - -        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); -        if (_ap_instance->fds == NULL) { -                free(_ap_instance); -                return -1; -        } - -        _ap_instance->rdrb = shm_rdrbuff_open(); -        if (_ap_instance->rdrb == NULL) { -                bmp_destroy(_ap_instance->fds); -                free(_ap_instance); -                return -1; -        } - -        _ap_instance->rb = shm_ap_rbuff_create_n(); -        if (_ap_instance->rb == NULL) { -                shm_rdrbuff_close(_ap_instance->rdrb); -                bmp_destroy(_ap_instance->fds); -                free(_ap_instance); -                return -1; -        } - -        for (i = 0; i < AP_MAX_FLOWS; i ++) { -                _ap_instance->flows[i].rb = NULL; -                _ap_instance->flows[i].port_id = -1; -                _ap_instance->flows[i].state = FLOW_NULL; -                _ap_instance->in_out[i] = -1; -        } - -        pthread_rwlock_init(&_ap_instance->flows_lock, NULL); - -        return 0; -} - -void shim_ap_fini() -{ -        int i = 0; - -        if (_ap_instance == NULL) -                return; - -        pthread_rwlock_rdlock(&_ipcp->state_lock); - -        if (_ipcp->state != IPCP_SHUTDOWN) -                LOG_WARN("Cleaning up AP while not in shutdown."); - -        if (_ap_instance->fds != NULL) -                bmp_destroy(_ap_instance->fds); - -        /* remove all remaining sdus */ -        while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) -                shm_rdrbuff_remove(_ap_instance->rdrb, i); - -        if (_ap_instance->rdrb != NULL) -                shm_rdrbuff_close(_ap_instance->rdrb); -        if (_ap_instance->rb != NULL) -                shm_ap_rbuff_destroy(_ap_instance->rb); - -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); - -        for (i = 0; i < AP_MAX_FLOWS; i ++) -                if (_ap_instance->flows[i].rb != NULL) -                        shm_ap_rbuff_close(_ap_instance->flows[i].rb); - -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); - -        free(_ap_instance); +        pthread_rwlock_init(&local_data.lock, NULL);  } -/* only call this under flows_lock */ -static int port_id_to_fd(int port_id) +void local_data_fini()  { -        int i; - -        for (i = 0; i < AP_MAX_FLOWS; ++i) { -                if (_ap_instance->flows[i].port_id == port_id -                    && _ap_instance->flows[i].state != FLOW_NULL) -                        return i; -        } - -        return -1; +        pthread_rwlock_destroy(&local_data.lock);  } -/* - * end copy from dev.c - */ - -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */  static void * ipcp_local_sdu_loop(void * o)  {          while (true) { -                struct rb_entry * e; -                int fd; - -                e = shm_ap_rbuff_read(_ap_instance->rb); -                if (e == NULL) { -                        continue; -                } +                struct rb_entry e; +                int fd = local_flow_read(&e); -                pthread_rwlock_rdlock(&_ipcp->state_lock); +                pthread_rwlock_rdlock(&ipcpi.state_lock); -                if (_ipcp->state != IPCP_ENROLLED) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                if (ipcp_get_state() != IPCP_ENROLLED) { +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          return (void *) 1; /* -ENOTENROLLED */                  } -                pthread_rwlock_rdlock(&_ap_instance->flows_lock); -                fd = _ap_instance->in_out[port_id_to_fd(e->port_id)]; -                if (fd == -1) { -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); -                        free(e); -                        continue; -                } - -                e->port_id = _ap_instance->flows[fd].port_id; - -                while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0) -                        ; +                pthread_rwlock_rdlock(&local_data.lock); +                fd = local_data.in_out[fd]; +                pthread_rwlock_unlock(&local_data.lock); -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +                if (fd != -1) +                        local_flow_write(fd, &e); -                free(e); +                pthread_rwlock_unlock(&ipcpi.state_lock);          }          return (void *) 1; @@ -223,10 +91,6 @@ static void * ipcp_local_sdu_loop(void * o)  void ipcp_sig_handler(int sig, siginfo_t * info, void * c)  { -        sigset_t  sigset; -        sigemptyset(&sigset); -        sigaddset(&sigset, SIGINT); -          switch(sig) {          case SIGINT:          case SIGTERM: @@ -236,11 +100,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)                          LOG_DBG("IPCP %d terminating by order of %d. Bye.",                                  getpid(), info->si_pid); -                        pthread_rwlock_wrlock(&_ipcp->state_lock); +                        pthread_rwlock_wrlock(&ipcpi.state_lock); -                        ipcp_set_state(_ipcp, IPCP_SHUTDOWN); +                        ipcp_set_state(IPCP_SHUTDOWN); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                  }          default:                  return; @@ -254,307 +118,154 @@ static int ipcp_local_bootstrap(struct dif_config * conf)                  return -1;          } -        pthread_rwlock_wrlock(&_ipcp->state_lock); +        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_INIT) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_get_state() != IPCP_INIT) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("IPCP in wrong state.");                  return -1;          } -        ipcp_set_state(_ipcp, IPCP_ENROLLED); +        ipcp_set_state(IPCP_ENROLLED); -        pthread_create(&_ap_instance->sduloop, -                       NULL, -                       ipcp_local_sdu_loop, -                       NULL); +        pthread_create(&local_data.sduloop, NULL, ipcp_local_sdu_loop, NULL); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        LOG_DBG("Bootstrapped local IPCP with api %d.", -                getpid()); +        LOG_INFO("Bootstrapped local IPCP with api %d.", getpid());          return 0;  }  static int ipcp_local_name_reg(char * name)  { -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_data_add_reg_entry(_ipcp->data, name)) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_data_add_reg_entry(ipcpi.data, name)) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_DBGF("Failed to add %s to local registry.", name);                  return -1;          } -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        LOG_DBG("Registered %s.", name); +        LOG_INFO("Registered %s.", name);          return 0;  }  static int ipcp_local_name_unreg(char * name)  { -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        ipcp_data_del_reg_entry(_ipcp->data, name); +        ipcp_data_del_reg_entry(ipcpi.data, name); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); + +        LOG_INFO("Unregistered %s.", name);          return 0;  } -static int ipcp_local_flow_alloc(pid_t         n_api, -                                 int           port_id, +static int ipcp_local_flow_alloc(int           fd,                                   char *        dst_name,                                   char *        src_ae_name,                                   enum qos_cube qos)  { -        int in_fd = -1;          int out_fd = -1; -        struct shm_ap_rbuff * rb; - -        LOG_INFO("Allocating flow to %s.", dst_name); +        LOG_DBG("Allocating flow to %s on fd %d.", dst_name, fd);          if (dst_name == NULL || src_ae_name == NULL)                  return -1;          /* This ipcpd has all QoS */ -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_get_state() != IPCP_ENROLLED) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_DBGF("Won't register with non-enrolled IPCP.");                  return -1; /* -ENOTENROLLED */          } -        rb = shm_ap_rbuff_open_s(n_api); -        if (rb == NULL) { -                pthread_rwlock_unlock(&_ipcp->state_lock); -                return -1; /* -ENORBUFF */ -        } - -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_wrlock(&local_data.lock); -        in_fd = bmp_allocate(_ap_instance->fds); -        if (!bmp_is_id_valid(_ap_instance->fds, in_fd)) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                return -EMFILE; -        } +        out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); -        _ap_instance->flows[in_fd].port_id = port_id; -        _ap_instance->flows[in_fd].state   = FLOW_PENDING; -        _ap_instance->flows[in_fd].rb      = rb; +        local_data.in_out[fd]  = out_fd; +        local_data.in_out[out_fd] = fd; -        LOG_DBGF("Pending local flow with port_id %d.", port_id); +        pthread_rwlock_unlock(&local_data.lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        /* reply to IRM */ -        port_id = ipcp_flow_req_arr(getpid(), -                                    dst_name, -                                    src_ae_name); - -        if (port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_ERR("Could not get port id from IRMd"); -                /* shm_ap_rbuff_close(n_api); */ -                return -1; -        } - -        out_fd = bmp_allocate(_ap_instance->fds); -        if (!bmp_is_id_valid(_ap_instance->fds, out_fd)) { -                /* shm_ap_rbuff_close(n_api); */ -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                return -1; /* -ENOMOREFDS */ -        } - -        _ap_instance->flows[out_fd].port_id = port_id; -        _ap_instance->flows[out_fd].rb      = NULL; -        _ap_instance->flows[out_fd].state   = FLOW_PENDING; - -        _ap_instance->in_out[in_fd]  = out_fd; -        _ap_instance->in_out[out_fd] = in_fd; - -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); - -        LOG_DBGF("Pending local allocation request, port_id %d.", port_id); +        LOG_INFO("Pending local allocation request on fd %d.", fd);          return 0;  } -static int ipcp_local_flow_alloc_resp(pid_t n_api, -                                      int   port_id, -                                      int   response) +static int ipcp_local_flow_alloc_resp(int fd, int response)  { -        struct shm_ap_rbuff * rb; -        int in_fd = -1;          int out_fd = -1;          int ret = -1; +        LOG_DBG("Received response for fd %d: %d.", fd, response); +          if (response)                  return 0; -        pthread_rwlock_rdlock(&_ipcp->state_lock); - -        /* awaken pending flow */ - -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); - -        in_fd = port_id_to_fd(port_id); -        if (in_fd < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBGF("Could not find flow with port_id %d.", port_id); -                return -1; -        } - -        if (_ap_instance->flows[in_fd].state != FLOW_PENDING) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBGF("Flow was not pending."); -                return -1; -        } - -        rb = shm_ap_rbuff_open_s(n_api); -        if (rb == NULL) { -                LOG_ERR("Could not open N + 1 ringbuffer."); -                _ap_instance->flows[in_fd].state   = FLOW_NULL; -                _ap_instance->flows[in_fd].port_id = -1; -                _ap_instance->in_out[in_fd] = -1; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                return -1; -        } - -        _ap_instance->flows[in_fd].state = FLOW_ALLOCATED; -        _ap_instance->flows[in_fd].rb    = rb; - -        LOG_DBGF("Accepted flow, port_id %d on fd %d.", port_id, in_fd); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        out_fd = _ap_instance->in_out[in_fd]; +        out_fd = local_data.in_out[fd];          if (out_fd < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBGF("No pending local flow with port_id %d.", port_id); +                pthread_rwlock_unlock(&local_data.lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        if (_ap_instance->flows[out_fd].state != FLOW_PENDING) { -                 /* FIXME: clean up other end */ -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBGF("Flow was not pending."); -                return -1; -        } - -        _ap_instance->flows[out_fd].state = FLOW_ALLOCATED; - -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        if ((ret = ipcp_flow_alloc_reply(getpid(), -                                         _ap_instance->flows[out_fd].port_id, -                                         response)) < 0) { -                return -1; /* -EPIPE */ -        } +        if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) +                return -1; -        LOG_INFO("Flow allocation completed, port_ids (%d, %d).", -                 _ap_instance->flows[out_fd].port_id, -                 _ap_instance->flows[in_fd].port_id); +        LOG_INFO("Flow allocation completed, fds (%d, %d).", out_fd, fd);          return ret;  } -static int ipcp_local_flow_dealloc(int port_id) +static int ipcp_local_flow_dealloc(int fd)  { -        int fd = -1; -        struct shm_ap_rbuff * rb; - -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); - -        fd = port_id_to_fd(port_id); -        if (fd < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBGF("Could not find flow with port_id %d.", port_id); -                return 0; -        } - -        bmp_release(_ap_instance->fds, fd); - -        if (_ap_instance->in_out[fd] != -1) -                _ap_instance->in_out[_ap_instance->in_out[fd]] = -1; - -        _ap_instance->in_out[fd] = -1; - -        _ap_instance->flows[fd].state   = FLOW_NULL; -        _ap_instance->flows[fd].port_id = -1; -        rb = _ap_instance->flows[fd].rb; -        _ap_instance->flows[fd].rb      = NULL; - -        pthread_rwlock_unlock(&_ap_instance->flows_lock); - -        if (rb != NULL) -                shm_ap_rbuff_close(rb); - -        pthread_rwlock_unlock(&_ipcp->state_lock); - -        LOG_DBGF("Flow with port_id %d deallocated.", port_id); - -        return 0; -} - -static struct ipcp * ipcp_local_create() -{ -        struct ipcp * i; -        struct ipcp_ops *  ops; +        int out_fd = -1; -        i = ipcp_instance_create(); -        if (i == NULL) -                return NULL; +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(&local_data.lock); -        i->data = ipcp_data_create(); -        if (i->data == NULL) { -                free(i); -                return NULL; -        } +        out_fd = local_data.in_out[fd]; -        if (ipcp_data_init(i->data, THIS_TYPE) == NULL) { -                free(i->data); -                free(i); -                return NULL; +        if (out_fd != -1) { +                local_data.in_out[out_fd] = -1; +                flow_dealloc(out_fd);          } -        ops = malloc(sizeof(*ops)); -        if (ops == NULL) { -                free(i->data); -                free(i); -                return NULL; -        } +        local_data.in_out[fd] = -1; -        ops->ipcp_bootstrap       = ipcp_local_bootstrap; -        ops->ipcp_enroll          = NULL;                       /* shim */ -        ops->ipcp_name_reg        = ipcp_local_name_reg; -        ops->ipcp_name_unreg      = ipcp_local_name_unreg; -        ops->ipcp_flow_alloc      = ipcp_local_flow_alloc; -        ops->ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp; -        ops->ipcp_flow_dealloc    = ipcp_local_flow_dealloc; +        pthread_rwlock_unlock(&local_data.lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        i->ops = ops; +        LOG_INFO("Flow with fd %d deallocated.", fd); -        i->state = IPCP_INIT; - -        return i; +        return 0;  } -#ifndef MAKE_CHECK +static struct ipcp_ops local_ops = { +        .ipcp_bootstrap       = ipcp_local_bootstrap, +        .ipcp_enroll          = NULL,                       /* shim */ +        .ipcp_name_reg        = ipcp_local_name_reg, +        .ipcp_name_unreg      = ipcp_local_name_unreg, +        .ipcp_flow_alloc      = ipcp_local_flow_alloc, +        .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp, +        .ipcp_flow_dealloc    = ipcp_local_flow_dealloc +};  int main(int argc, char * argv[])  { @@ -571,7 +282,9 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        if (shim_ap_init() < 0) { +        local_data_init(); + +        if (ap_init(NULL) < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } @@ -591,17 +304,13 @@ int main(int argc, char * argv[])          sigaction(SIGHUP,  &sig_act, NULL);          sigaction(SIGPIPE, &sig_act, NULL); -        _ipcp = ipcp_local_create(); -        if (_ipcp == NULL) { -                LOG_ERR("Failed to create IPCP."); +        pthread_sigmask(SIG_BLOCK, &sigset, NULL); + +        if (ipcp_init(THIS_TYPE, &local_ops) < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } -        pthread_sigmask(SIG_BLOCK, &sigset, NULL); - -        pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); -          pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);          if (ipcp_create_r(getpid())) { @@ -610,21 +319,16 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        pthread_join(_ap_instance->mainloop, NULL); - -        pthread_cancel(_ap_instance->sduloop); -        pthread_join(_ap_instance->sduloop, NULL); +        ipcp_fini(); -        shim_ap_fini(); +        pthread_cancel(local_data.sduloop); +        pthread_join(local_data.sduloop, NULL); -        ipcp_data_destroy(_ipcp->data); +        ap_fini(); -        free(_ipcp->ops); -        free(_ipcp); +        local_data_fini();          close_logfile();          exit(EXIT_SUCCESS);  } - -#endif /* MAKE_CHECK */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 79b1bb4b..b6ec1984 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -26,7 +26,7 @@  #include <ouroboros/logs.h>  #include <ouroboros/dev.h>  #include <ouroboros/list.h> -#include <ouroboros/ipcp.h> +#include <ouroboros/ipcp-dev.h>  #include <stdlib.h>  #include <stdbool.h> @@ -41,10 +41,8 @@  #include "flow_alloc.pb-c.h"  typedef FlowAllocMsg flow_alloc_msg_t; -extern struct ipcp * _ipcp; -  struct n_flow { -        struct flow flow; +        int fd;          struct frct_i * frct_i;          enum qos_cube qos; @@ -57,7 +55,7 @@ struct n_1_flow {          struct list_head next;  }; -struct fmgr { +struct {          pthread_t listen_thread;          struct list_head n_1_flows; @@ -66,10 +64,9 @@ struct fmgr {          struct list_head n_flows;          /* FIXME: Make this a read/write lock */          pthread_mutex_t n_flows_lock; -} * fmgr = NULL; +} fmgr; -static int add_n_1_fd(int fd, -                      char * ae_name) +static int add_n_1_fd(int fd, char * ae_name)  {          struct n_1_flow * tmp; @@ -85,9 +82,9 @@ static int add_n_1_fd(int fd,          INIT_LIST_HEAD(&tmp->next); -        pthread_mutex_lock(&fmgr->n_1_flows_lock); -        list_add(&tmp->next, &fmgr->n_1_flows); -        pthread_mutex_unlock(&fmgr->n_1_flows_lock); +        pthread_mutex_lock(&fmgr.n_1_flows_lock); +        list_add(&tmp->next, &fmgr.n_1_flows); +        pthread_mutex_unlock(&fmgr.n_1_flows_lock);          return 0;  } @@ -98,16 +95,16 @@ static void * fmgr_listen(void * o)          char * ae_name;          while (true) { -                ipcp_wait_state(_ipcp, IPCP_ENROLLED, NULL); +                ipcp_wait_state(IPCP_ENROLLED, NULL); -                pthread_rwlock_rdlock(&_ipcp->state_lock); +                pthread_rwlock_rdlock(&ipcpi.state_lock); -                if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                if (ipcp_get_state() == IPCP_SHUTDOWN) { +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          return 0;                  } -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  fd = flow_accept(&ae_name);                  if (fd < 0) { @@ -161,17 +158,13 @@ static void * fmgr_listen(void * o)  int fmgr_init()  { -        fmgr = malloc(sizeof(*fmgr)); -        if (fmgr == NULL) -                return -1; +        INIT_LIST_HEAD(&fmgr.n_1_flows); +        INIT_LIST_HEAD(&fmgr.n_flows); -        INIT_LIST_HEAD(&fmgr->n_1_flows); -        INIT_LIST_HEAD(&fmgr->n_flows); +        pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); +        pthread_mutex_init(&fmgr.n_flows_lock, NULL); -        pthread_mutex_init(&fmgr->n_1_flows_lock, NULL); -        pthread_mutex_init(&fmgr->n_flows_lock, NULL); - -        pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL); +        pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL);          return 0;  } @@ -180,23 +173,20 @@ int fmgr_fini()  {          struct list_head * pos = NULL; -        pthread_cancel(fmgr->listen_thread); +        pthread_cancel(fmgr.listen_thread); -        pthread_join(fmgr->listen_thread, NULL); +        pthread_join(fmgr.listen_thread, NULL); -        list_for_each(pos, &fmgr->n_1_flows) { -                struct n_1_flow * e = -                        list_entry(pos, struct n_1_flow, next); +        list_for_each(pos, &fmgr.n_1_flows) { +                struct n_1_flow * e = list_entry(pos, struct n_1_flow, next);                  if (e->ae_name != NULL)                          free(e->ae_name);                  if (ribmgr_remove_flow(e->fd))                      LOG_ERR("Failed to remove management flow.");          } -        pthread_mutex_destroy(&fmgr->n_1_flows_lock); -        pthread_mutex_destroy(&fmgr->n_flows_lock); - -        free(fmgr); +        pthread_mutex_destroy(&fmgr.n_1_flows_lock); +        pthread_mutex_destroy(&fmgr.n_flows_lock);          return 0;  } @@ -243,8 +233,7 @@ int fmgr_mgmt_flow(char * dst_name)          return 0;  } -int fmgr_dt_flow(char * dst_name, -                 enum qos_cube qos) +int fmgr_dt_flow(char * dst_name, enum qos_cube qos)  {          int fd;          int result; @@ -288,14 +277,13 @@ int fmgr_dt_flow(char * dst_name,  }  /* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_port_id(int port_id) +static struct n_flow * get_n_flow_by_fd(int fd)  {          struct list_head * pos = NULL; -        list_for_each(pos, &fmgr->n_flows) { -                struct n_flow * e = -                        list_entry(pos, struct n_flow, next); -                if (e->flow.port_id == port_id) +        list_for_each(pos, &fmgr.n_flows) { +                struct n_flow * e = list_entry(pos, struct n_flow, next); +                if (e->fd == fd)                          return e;          } @@ -307,9 +295,8 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)  {          struct list_head * pos = NULL; -        list_for_each(pos, &fmgr->n_flows) { -                struct n_flow * e = -                        list_entry(pos, struct n_flow, next); +        list_for_each(pos, &fmgr.n_flows) { +                struct n_flow * e = list_entry(pos, struct n_flow, next);                  if (e->frct_i == frct_i)                          return e;          } @@ -317,8 +304,7 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)          return NULL;  } -int fmgr_flow_alloc(pid_t         n_api, -                    int           port_id, +int fmgr_flow_alloc(int           fd,                      char *        dst_ap_name,                      char *        src_ae_name,                      enum qos_cube qos) @@ -355,49 +341,40 @@ int fmgr_flow_alloc(pid_t         n_api,          flow_alloc_msg__pack(&msg, buf.data); -        pthread_mutex_lock(&fmgr->n_flows_lock); +        pthread_mutex_lock(&fmgr.n_flows_lock);          frct_i = frct_i_create(address, &buf, qos);          if (frct_i == NULL) {                  free(buf.data);                  free(flow); -                pthread_mutex_unlock(&fmgr->n_flows_lock); +                pthread_mutex_unlock(&fmgr.n_flows_lock);                  return -1;          }          free(buf.data); -        flow->flow.rb = shm_ap_rbuff_open_s(n_api); -        if (flow->flow.rb == NULL) { -                pthread_mutex_unlock(&fmgr->n_flows_lock); -                free(flow); -                return -1; -        } - -        flow->flow.api = n_api; -        flow->flow.port_id = port_id; -        flow->flow.state = FLOW_PENDING; +        flow->fd     = fd;          flow->frct_i = frct_i; -        flow->qos = qos; +        flow->qos    = qos;          INIT_LIST_HEAD(&flow->next); -        list_add(&flow->next, &fmgr->n_flows); +        list_add(&flow->next, &fmgr.n_flows); -        pthread_mutex_unlock(&fmgr->n_flows_lock); +        pthread_mutex_unlock(&fmgr.n_flows_lock);          return 0;  }  /* Call under n_flows lock */ -static int n_flow_dealloc(int port_id) +static int n_flow_dealloc(int fd)  {          struct n_flow * flow;          flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;          buffer_t buf;          int ret; -        flow = get_n_flow_by_port_id(port_id); +        flow = get_n_flow_by_fd(fd);          if (flow == NULL)                  return -1; @@ -414,8 +391,6 @@ static int n_flow_dealloc(int port_id)          flow_alloc_msg__pack(&msg, buf.data);          ret = frct_i_destroy(flow->frct_i, &buf); -        if (flow->flow.rb != NULL) -                shm_ap_rbuff_close(flow->flow.rb);          list_del(&flow->next);          free(flow); @@ -424,25 +399,17 @@ static int n_flow_dealloc(int port_id)          return ret;  } -int fmgr_flow_alloc_resp(pid_t n_api, -                         int   port_id, -                         int   response) +int fmgr_flow_alloc_resp(int fd, int response)  {          struct n_flow * flow;          flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;          buffer_t buf; -        pthread_mutex_lock(&fmgr->n_flows_lock); +        pthread_mutex_lock(&fmgr.n_flows_lock); -        flow = get_n_flow_by_port_id(port_id); +        flow = get_n_flow_by_fd(fd);          if (flow == NULL) { -                pthread_mutex_unlock(&fmgr->n_flows_lock); -                return -1; -        } - -        if (flow->flow.state != FLOW_PENDING) { -                pthread_mutex_unlock(&fmgr->n_flows_lock); -                LOG_ERR("Flow is not pending."); +                pthread_mutex_unlock(&fmgr.n_flows_lock);                  return -1;          } @@ -452,13 +419,13 @@ int fmgr_flow_alloc_resp(pid_t n_api,          buf.len = flow_alloc_msg__get_packed_size(&msg);          if (buf.len == 0) { -                pthread_mutex_unlock(&fmgr->n_flows_lock); +                pthread_mutex_unlock(&fmgr.n_flows_lock);                  return -1;          }          buf.data = malloc(buf.len);          if (buf.data == NULL) { -                pthread_mutex_unlock(&fmgr->n_flows_lock); +                pthread_mutex_unlock(&fmgr.n_flows_lock);                  return -1;          } @@ -469,106 +436,85 @@ int fmgr_flow_alloc_resp(pid_t n_api,                  free(buf.data);                  list_del(&flow->next);                  free(flow); -        } else { -                if (frct_i_accept(flow->frct_i, &buf)) { -                        pthread_mutex_unlock(&fmgr->n_flows_lock); -                        return -1; -                } - -                flow->flow.state = FLOW_ALLOCATED; -                flow->flow.api = n_api; - -                flow->flow.rb = shm_ap_rbuff_open_s(n_api); -                if (flow->flow.rb == NULL) { -                        n_flow_dealloc(port_id); -                        pthread_mutex_unlock(&fmgr->n_flows_lock); -                        return -1; -                } +        } else if (frct_i_accept(flow->frct_i, &buf)) { +                pthread_mutex_unlock(&fmgr.n_flows_lock); +                return -1;          } -        pthread_mutex_unlock(&fmgr->n_flows_lock); +        pthread_mutex_unlock(&fmgr.n_flows_lock);          return 0;  } -int fmgr_flow_dealloc(int port_id) +int fmgr_flow_dealloc(int fd)  {          int ret; -        pthread_mutex_lock(&fmgr->n_flows_lock); -        ret = n_flow_dealloc(port_id); -        pthread_mutex_unlock(&fmgr->n_flows_lock); +        pthread_mutex_lock(&fmgr.n_flows_lock); +        ret = n_flow_dealloc(fd); +        pthread_mutex_unlock(&fmgr.n_flows_lock);          return ret;  } -int fmgr_flow_alloc_msg(struct frct_i * frct_i, -                        buffer_t *      buf) +int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)  {          struct n_flow * flow;          int ret = 0; -        int port_id; +        int fd;          flow_alloc_msg_t * msg; -        pthread_mutex_lock(&fmgr->n_flows_lock); +        pthread_mutex_lock(&fmgr.n_flows_lock); -        /* Depending on what is in the message call the function in ipcp.h */ +        /* Depending on the message call the function in ipcp-dev.h */          msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);          if (msg == NULL) { -                pthread_mutex_unlock(&fmgr->n_flows_lock); +                pthread_mutex_unlock(&fmgr.n_flows_lock);                  LOG_ERR("Failed to unpack flow alloc message");                  return -1;          }          switch (msg->code) {          case FLOW_ALLOC_CODE__FLOW_REQ: -                  flow = malloc(sizeof(*flow));                  if (flow == NULL) { -                        pthread_mutex_unlock(&fmgr->n_flows_lock); +                        pthread_mutex_unlock(&fmgr.n_flows_lock);                          flow_alloc_msg__free_unpacked(msg, NULL);                          return -1;                  } -                flow->flow.state = FLOW_PENDING;                  flow->frct_i = frct_i;                  flow->qos = msg->qos_cube; -                flow->flow.rb = NULL; -                flow->flow.api = 0; - -                port_id = ipcp_flow_req_arr(getpid(), -                                            msg->dst_name, -                                            msg->src_ae_name); -                if (port_id < 0) { -                        pthread_mutex_unlock(&fmgr->n_flows_lock); + +                fd = ipcp_flow_req_arr(getpid(), +                                       msg->dst_name, +                                       msg->src_ae_name); +                if (fd < 0) { +                        pthread_mutex_unlock(&fmgr.n_flows_lock);                          free(flow);                          flow_alloc_msg__free_unpacked(msg, NULL); -                        LOG_ERR("Failed to get port-id from IRMd."); +                        LOG_ERR("Failed to get fd for flow.");                          return -1;                  } -                flow->flow.port_id = port_id; +                flow->fd = fd;                  INIT_LIST_HEAD(&flow->next); -                list_add(&flow->next, &fmgr->n_flows); +                list_add(&flow->next, &fmgr.n_flows);                  break;          case FLOW_ALLOC_CODE__FLOW_REPLY:                  flow = get_n_flow_by_frct_i(frct_i);                  if (flow == NULL) { -                        pthread_mutex_unlock(&fmgr->n_flows_lock); +                        pthread_mutex_unlock(&fmgr.n_flows_lock);                          flow_alloc_msg__free_unpacked(msg, NULL);                          LOG_ERR("No such flow in flow manager.");                          return -1;                  } -                ret = ipcp_flow_alloc_reply(getpid(), -                                            flow->flow.port_id, -                                            msg->response); - +                ret = ipcp_flow_alloc_reply(flow->fd, msg->response);                  if (msg->response < 0) { -                        shm_ap_rbuff_close(flow->flow.rb);                          list_del(&flow->next);                          free(flow);                  } @@ -577,13 +523,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i,          case FLOW_ALLOC_CODE__FLOW_DEALLOC:                  flow = get_n_flow_by_frct_i(frct_i);                  if (flow == NULL) { -                        pthread_mutex_unlock(&fmgr->n_flows_lock); +                        pthread_mutex_unlock(&fmgr.n_flows_lock);                          flow_alloc_msg__free_unpacked(msg, NULL);                          LOG_ERR("No such flow in flow manager.");                          return -1;                  } -                ret = irm_flow_dealloc(flow->flow.port_id); +                ret = flow_dealloc(flow->fd);                  break;          default:                  LOG_ERR("Got an unknown flow allocation message."); @@ -591,7 +537,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i,                  break;          } -        pthread_mutex_unlock(&fmgr->n_flows_lock); +        pthread_mutex_unlock(&fmgr.n_flows_lock);          flow_alloc_msg__free_unpacked(msg, NULL); diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 342410ca..7e3ef5f4 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -35,25 +35,25 @@  #define DT_AE "Data transfer"  int fmgr_init(); +  int fmgr_fini();  /* N-flow ops */  int fmgr_mgmt_flow(char * dst_name); +  int fmgr_dt_flow(char * dst_name,                   enum qos_cube qos);  /* N+1-flow ops, local */ -int fmgr_flow_alloc(pid_t         n_api, -                    int           port_id, +int fmgr_flow_alloc(int           fd,                      char *        dst_ap_name,                      char *        src_ae_name,                      enum qos_cube qos); -int fmgr_flow_alloc_resp(pid_t n_api, -                         int   port_id, -                         int   response); +int fmgr_flow_alloc_resp(int fd, +                         int response); -int fmgr_flow_dealloc(int port_id); +int fmgr_flow_dealloc(int fd);  /* N+1-flow ops, remote */  int fmgr_flow_alloc_msg(struct frct_i * frct_i, diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 09873445..0ee87004 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -24,7 +24,7 @@  #define OUROBOROS_IPCP_FRCT_H  #include <ouroboros/shared.h> -#include <ouroboros/common.h> +#include <ouroboros/utils.h>  #include "dt_const.h" diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 082973f4..4611408d 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -24,10 +24,8 @@  #include <ouroboros/config.h>  #include <ouroboros/logs.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/dev.h> -#include <ouroboros/ipcp.h> +#include <ouroboros/ipcp-dev.h>  #include <ouroboros/time_utils.h>  #include <stdbool.h> @@ -47,26 +45,8 @@  /* global for trapping signal */  int irmd_api; -struct ipcp * _ipcp; - -#define normal_data(type) ((struct normal_ipcp_data *) type->data) - -struct normal_ipcp_data { -        /* Keep ipcp_data first for polymorphism. */ -        struct ipcp_data      ipcp_data; - -        struct shm_rdrbuff *  rdrb; -        struct shm_ap_rbuff * rb; - -        pthread_t             mainloop; -}; -  void ipcp_sig_handler(int sig, siginfo_t * info, void * c)  { -        sigset_t  sigset; -        sigemptyset(&sigset); -        sigaddset(&sigset, SIGINT); -          switch(sig) {          case SIGINT:          case SIGTERM: @@ -75,11 +55,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)                          LOG_DBG("IPCP %d terminating by order of %d. Bye.",                                  getpid(), info->si_pid); -                        pthread_rwlock_wrlock(&_ipcp->state_lock); +                        pthread_rwlock_wrlock(&ipcpi.state_lock); -                        ipcp_set_state(_ipcp, IPCP_SHUTDOWN); +                        ipcp_set_state(IPCP_SHUTDOWN); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                  }          default:                  return; @@ -88,15 +68,15 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)  static int normal_ipcp_name_reg(char * name)  { -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_data_add_reg_entry(_ipcp->data, name)) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_data_add_reg_entry(ipcpi.data, name)) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to add %s to local registry.", name);                  return -1;          } -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          LOG_DBG("Registered %s.", name); @@ -105,11 +85,11 @@ static int normal_ipcp_name_reg(char * name)  static int normal_ipcp_name_unreg(char * name)  { -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        ipcp_data_del_reg_entry(_ipcp->data, name); +        ipcp_data_del_reg_entry(ipcpi.data, name); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } @@ -119,59 +99,59 @@ static int normal_ipcp_enroll(char * dif_name)          struct timespec timeout = {(ENROLL_TIMEOUT / 1000),                                     (ENROLL_TIMEOUT % 1000) * MILLION}; -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_INIT) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_get_state() != IPCP_INIT) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Won't enroll an IPCP that is not in INIT.");                  return -1; /* -ENOTINIT */          } -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          if (fmgr_mgmt_flow(dif_name)) {                  LOG_ERR("Failed to establish management flow.");                  return -1;          } -        if (ipcp_wait_state(_ipcp, IPCP_ENROLLED, &timeout) == -ETIMEDOUT) { +        if (ipcp_wait_state(IPCP_ENROLLED, &timeout) == -ETIMEDOUT) {                  LOG_ERR("Enrollment timed out.");                  return -1;          } -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_get_state() != IPCP_ENROLLED) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  }  static int normal_ipcp_bootstrap(struct dif_config * conf)  { -        pthread_rwlock_wrlock(&_ipcp->state_lock); +        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_INIT) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_get_state() != IPCP_INIT) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Won't bootstrap an IPCP that is not in INIT.");                  return -1; /* -ENOTINIT */          }          if (ribmgr_bootstrap(conf)) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to bootstrap RIB manager.");                  return -1;          } -        ipcp_set_state(_ipcp, IPCP_ENROLLED); +        ipcp_set_state(IPCP_ENROLLED); -        _ipcp->data->dif_name = conf->dif_name; +        ipcpi.data->dif_name = conf->dif_name; -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          LOG_DBG("Bootstrapped in DIF %s.", conf->dif_name); @@ -188,67 +168,6 @@ static struct ipcp_ops normal_ops = {          .ipcp_flow_dealloc    = fmgr_flow_dealloc  }; -struct normal_ipcp_data * normal_ipcp_data_create() -{ -        struct normal_ipcp_data * normal_data; -        enum ipcp_type            ipcp_type; - -        normal_data = malloc(sizeof(*normal_data)); -        if (normal_data == NULL) { -                LOG_ERR("Failed to allocate."); -                return NULL; -        } - -        ipcp_type = THIS_TYPE; -        if (ipcp_data_init((struct ipcp_data *) normal_data, -                           ipcp_type) == NULL) { -                free(normal_data); -                return NULL; -        } - -        normal_data->rdrb = shm_rdrbuff_open(); -        if (normal_data->rdrb == NULL) { -                free(normal_data); -                return NULL; -        } - -        normal_data->rb = shm_ap_rbuff_create_n(); -        if (normal_data->rb == NULL) { -                shm_rdrbuff_close(normal_data->rdrb); -                free(normal_data); -                return NULL; -        } - -        return normal_data; -} - - -void normal_ipcp_data_destroy() -{ -        int idx = 0; - -        if (_ipcp == NULL) -                return; - -        pthread_rwlock_rdlock(&_ipcp->state_lock); - -        if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN) -                LOG_WARN("Cleaning up while not in shutdown."); - -        /* remove all remaining sdus */ -        while ((idx = shm_ap_rbuff_peek_idx(normal_data(_ipcp)->rb)) >= 0) -                shm_rdrbuff_remove(normal_data(_ipcp)->rdrb, idx); - -        if (normal_data(_ipcp)->rdrb != NULL) -                shm_rdrbuff_close(normal_data(_ipcp)->rdrb); -        if (normal_data(_ipcp)->rb != NULL) -                shm_ap_rbuff_close(normal_data(_ipcp)->rb); - -        ipcp_data_destroy(_ipcp->data); - -        pthread_rwlock_unlock(&_ipcp->state_lock); -} -  int main(int argc, char * argv[])  {          struct sigaction sig_act; @@ -285,56 +204,38 @@ int main(int argc, char * argv[])          sigaction(SIGHUP,  &sig_act, NULL);          sigaction(SIGPIPE, &sig_act, NULL); -        _ipcp = ipcp_instance_create(); -        if (_ipcp == NULL) { -                LOG_ERR("Failed to create instance."); -                close_logfile(); -                exit(EXIT_FAILURE); -        } +        pthread_sigmask(SIG_BLOCK, &sigset, NULL); -        _ipcp->data = (struct ipcp_data *) normal_ipcp_data_create(); -        if (_ipcp->data == NULL) { -                LOG_ERR("Failed to create instance data."); -                free(_ipcp); +        if (ipcp_init(THIS_TYPE, &normal_ops) < 0) { +                LOG_ERR("Failed to create instance.");                  close_logfile();                  exit(EXIT_FAILURE);          } -        _ipcp->ops = &normal_ops; -        _ipcp->state = IPCP_INIT; +        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);          if (fmgr_init()) { -                normal_ipcp_data_destroy(); -                free(_ipcp); +                ipcp_fini();                  close_logfile();                  exit(EXIT_FAILURE);          }          if (ribmgr_init()) { -                normal_ipcp_data_destroy();                  fmgr_fini(); -                free(_ipcp); +                ipcp_fini();                  close_logfile();                  exit(EXIT_FAILURE);          } -        pthread_sigmask(SIG_BLOCK, &sigset, NULL); - -        pthread_create(&normal_data(_ipcp)->mainloop, NULL, -                       ipcp_main_loop, _ipcp); - -        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); -          if (ipcp_create_r(getpid())) {                  LOG_ERR("Failed to notify IRMd we are initialized."); -                normal_ipcp_data_destroy();                  fmgr_fini(); -                free(_ipcp); +                ipcp_fini();                  close_logfile();                  exit(EXIT_FAILURE);          } -        pthread_join(normal_data(_ipcp)->mainloop, NULL); +        ipcp_fini();          if (fmgr_fini())                  LOG_ERR("Failed to finalize flow manager."); @@ -345,10 +246,9 @@ int main(int argc, char * argv[])          if (frct_fini())                  LOG_ERR("Failed to finalize FRCT."); -        normal_ipcp_data_destroy(); -        free(_ipcp);          close_logfile();          ap_fini(); +          exit(EXIT_SUCCESS);  } diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 9733abc9..99d156f5 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -27,6 +27,7 @@  #include <ouroboros/cdap.h>  #include <ouroboros/list.h>  #include <ouroboros/time_utils.h> +#include <ouroboros/ipcp-dev.h>  #include <stdlib.h>  #include <pthread.h> @@ -45,15 +46,13 @@ typedef StaticInfoMsg static_info_msg_t;  #define ENROLLMENT  "enrollment"  #define STATIC_INFO "static DIF information" -extern struct ipcp * _ipcp; -  struct mgmt_flow {          struct cdap *    instance;          int              fd;          struct list_head next;  }; -struct rib { +struct {          struct dt_const  dtc;          uint32_t         address; @@ -63,7 +62,7 @@ struct rib {          struct list_head cdap_reqs;          pthread_mutex_t  cdap_reqs_lock; -} * rib = NULL; +} rib;  /* Call while holding cdap_reqs_lock */  /* FIXME: better not to call blocking functions under any lock */ @@ -84,13 +83,13 @@ int cdap_result_wait(struct cdap * instance,                  return -1;          } -        list_add(&req->next, &rib->cdap_reqs); +        list_add(&req->next, &rib.cdap_reqs); -        pthread_mutex_unlock(&rib->cdap_reqs_lock); +        pthread_mutex_unlock(&rib.cdap_reqs_lock);          ret = cdap_request_wait(req); -        pthread_mutex_lock(&rib->cdap_reqs_lock); +        pthread_mutex_lock(&rib.cdap_reqs_lock);          if (ret == -1)  /* should only be on ipcp shutdown */                  LOG_DBG("Waiting CDAP request destroyed."); @@ -112,22 +111,16 @@ int cdap_result_wait(struct cdap * instance,  int ribmgr_init()  { -        rib = malloc(sizeof(*rib)); -        if (rib == NULL) -                return -1; +        INIT_LIST_HEAD(&rib.flows); +        INIT_LIST_HEAD(&rib.cdap_reqs); -        INIT_LIST_HEAD(&rib->flows); -        INIT_LIST_HEAD(&rib->cdap_reqs); - -        if (pthread_rwlock_init(&rib->flows_lock, NULL)) { +        if (pthread_rwlock_init(&rib.flows_lock, NULL)) {                  LOG_ERR("Failed to initialize rwlock."); -                free(rib);                  return -1;          } -        if (pthread_mutex_init(&rib->cdap_reqs_lock, NULL)) { +        if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex."); -                free(rib);                  return -1;          } @@ -139,19 +132,18 @@ int ribmgr_fini()          struct list_head * pos = NULL;          struct list_head * n = NULL; -        pthread_mutex_lock(&rib->cdap_reqs_lock); -        list_for_each_safe(pos, n, &rib->cdap_reqs) { +        pthread_mutex_lock(&rib.cdap_reqs_lock); +        list_for_each_safe(pos, n, &rib.cdap_reqs) {                  struct cdap_request * req =                          list_entry(pos, struct cdap_request, next); -                  free(req->name);                  list_del(&req->next);                  free(req);          } -        pthread_mutex_unlock(&rib->cdap_reqs_lock); +        pthread_mutex_unlock(&rib.cdap_reqs_lock); -        pthread_rwlock_wrlock(&rib->flows_lock); -        list_for_each_safe(pos, n, &rib->flows) { +        pthread_rwlock_wrlock(&rib.flows_lock); +        list_for_each_safe(pos, n, &rib.flows) {                  struct mgmt_flow * flow =                          list_entry(pos, struct mgmt_flow, next);                  if (cdap_destroy(flow->instance)) @@ -159,9 +151,10 @@ int ribmgr_fini()                  list_del(&flow->next);                  free(flow);          } -        pthread_rwlock_unlock(&rib->flows_lock); +        pthread_rwlock_unlock(&rib.flows_lock); -        free(rib); +        pthread_mutex_destroy(&rib.cdap_reqs_lock); +        pthread_rwlock_destroy(&rib.flows_lock);          return 0;  } @@ -174,9 +167,9 @@ int ribmgr_cdap_reply(struct cdap * instance,  {          struct list_head * pos, * n = NULL; -        pthread_mutex_lock(&rib->cdap_reqs_lock); +        pthread_mutex_lock(&rib.cdap_reqs_lock); -        list_for_each_safe(pos, n, &rib->cdap_reqs) { +        list_for_each_safe(pos, n, &rib.cdap_reqs) {                  struct cdap_request * req =                          list_entry(pos, struct cdap_request, next);                  if (req->instance == instance && @@ -191,15 +184,15 @@ int ribmgr_cdap_reply(struct cdap * instance,                                          "executed succesfully",                                          req->code, req->name); -                        pthread_mutex_unlock(&rib->cdap_reqs_lock); +                        pthread_mutex_unlock(&rib.cdap_reqs_lock);                          /* FIXME: In case of a read, update values here */                          cdap_request_respond(req, result); -                        pthread_mutex_lock(&rib->cdap_reqs_lock); +                        pthread_mutex_lock(&rib.cdap_reqs_lock);                  }          } -        pthread_mutex_unlock(&rib->cdap_reqs_lock); +        pthread_mutex_unlock(&rib.cdap_reqs_lock);          return 0;  } @@ -223,34 +216,34 @@ int ribmgr_cdap_write(struct cdap * instance,          static_info_msg_t * msg;          int ret = 0; -        pthread_rwlock_wrlock(&_ipcp->state_lock); -        if (ipcp_get_state(_ipcp) == IPCP_PENDING_ENROLL && +        pthread_rwlock_wrlock(&ipcpi.state_lock); +        if (ipcp_get_state() == IPCP_PENDING_ENROLL &&              strcmp(name, STATIC_INFO) == 0) {                  LOG_DBG("Received static DIF information.");                  msg = static_info_msg__unpack(NULL, len, data);                  if (msg == NULL) { -                        ipcp_set_state(_ipcp, IPCP_INIT); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        ipcp_set_state(IPCP_INIT); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          cdap_send_reply(instance, invoke_id, -1, NULL, 0);                          LOG_ERR("Failed to unpack static info message.");                          return -1;                  } -                rib->dtc.addr_size = msg->addr_size; -                rib->dtc.cep_id_size = msg->cep_id_size; -                rib->dtc.pdu_length_size = msg->pdu_length_size; -                rib->dtc.seqno_size = msg->seqno_size; -                rib->dtc.has_ttl = msg->has_ttl; -                rib->dtc.has_chk = msg->has_chk; -                rib->dtc.min_pdu_size = msg->min_pdu_size; -                rib->dtc.max_pdu_size = msg->max_pdu_size; +                rib.dtc.addr_size = msg->addr_size; +                rib.dtc.cep_id_size = msg->cep_id_size; +                rib.dtc.pdu_length_size = msg->pdu_length_size; +                rib.dtc.seqno_size = msg->seqno_size; +                rib.dtc.has_ttl = msg->has_ttl; +                rib.dtc.has_chk = msg->has_chk; +                rib.dtc.min_pdu_size = msg->min_pdu_size; +                rib.dtc.max_pdu_size = msg->max_pdu_size; -                rib->address = msg->address; +                rib.address = msg->address; -                if (frct_init(&rib->dtc, rib->address)) { -                        ipcp_set_state(_ipcp, IPCP_INIT); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                if (frct_init(&rib.dtc, rib.address)) { +                        ipcp_set_state(IPCP_INIT); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          cdap_send_reply(instance, invoke_id, -1, NULL, 0);                          static_info_msg__free_unpacked(msg, NULL);                          LOG_ERR("Failed to init FRCT"); @@ -262,7 +255,7 @@ int ribmgr_cdap_write(struct cdap * instance,                  ret = -1;          } -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {                  LOG_ERR("Failed to send reply to write request."); @@ -303,39 +296,39 @@ int ribmgr_cdap_start(struct cdap * instance,          size_t len = 0;          int iid = 0; -        pthread_rwlock_wrlock(&_ipcp->state_lock); -        if (ipcp_get_state(_ipcp) == IPCP_ENROLLED && +        pthread_rwlock_wrlock(&ipcpi.state_lock); +        if (ipcp_get_state() == IPCP_ENROLLED &&              strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("New enrollment request.");                  if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send reply to enrollment request.");                          return -1;                  } -                stat_info.addr_size = rib->dtc.addr_size; -                stat_info.cep_id_size = rib->dtc.cep_id_size; -                stat_info.pdu_length_size = rib->dtc.pdu_length_size; -                stat_info.seqno_size = rib->dtc.seqno_size; -                stat_info.has_ttl = rib->dtc.has_ttl; -                stat_info.has_chk = rib->dtc.has_chk; -                stat_info.min_pdu_size  = rib->dtc.min_pdu_size; -                stat_info.max_pdu_size = rib->dtc.max_pdu_size; +                stat_info.addr_size = rib.dtc.addr_size; +                stat_info.cep_id_size = rib.dtc.cep_id_size; +                stat_info.pdu_length_size = rib.dtc.pdu_length_size; +                stat_info.seqno_size = rib.dtc.seqno_size; +                stat_info.has_ttl = rib.dtc.has_ttl; +                stat_info.has_chk = rib.dtc.has_chk; +                stat_info.min_pdu_size  = rib.dtc.min_pdu_size; +                stat_info.max_pdu_size = rib.dtc.max_pdu_size;                  /* FIXME: Hand out an address. */                  stat_info.address = 0;                  len = static_info_msg__get_packed_size(&stat_info);                  if (len == 0) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to get size of static information.");                          return -1;                  }                  data = malloc(len);                  if (data == NULL) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to allocate memory.");                          return -1;                  } @@ -344,59 +337,59 @@ int ribmgr_cdap_start(struct cdap * instance,                  LOG_DBGF("Sending static info..."); -                pthread_mutex_lock(&rib->cdap_reqs_lock); +                pthread_mutex_lock(&rib.cdap_reqs_lock);                  iid = cdap_send_write(instance, STATIC_INFO, data, len, 0);                  if (iid < 0) { -                        pthread_mutex_unlock(&rib->cdap_reqs_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          free(data);                          LOG_ERR("Failed to send static information.");                          return -1;                  }                  if (cdap_result_wait(instance, WRITE, STATIC_INFO, iid)) { -                        pthread_mutex_unlock(&rib->cdap_reqs_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          free(data);                          LOG_ERR("Remote did not receive static information.");                          return -1;                  } -                pthread_mutex_unlock(&rib->cdap_reqs_lock); +                pthread_mutex_unlock(&rib.cdap_reqs_lock);                  /* FIXME: Send neighbors here. */                  LOG_DBGF("Sending stop enrollment..."); -                pthread_mutex_lock(&rib->cdap_reqs_lock); +                pthread_mutex_lock(&rib.cdap_reqs_lock);                  iid = cdap_send_stop(instance, ENROLLMENT);                  if (iid < 0) { -                        pthread_mutex_unlock(&rib->cdap_reqs_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          free(data);                          LOG_ERR("Failed to send stop of enrollment.");                          return -1;                  }                  if (cdap_result_wait(instance, STOP, ENROLLMENT, iid)) { -                        pthread_mutex_unlock(&rib->cdap_reqs_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          free(data);                          LOG_ERR("Remote failed to complete enrollment.");                          return -1;                  } -                pthread_mutex_unlock(&rib->cdap_reqs_lock); +                pthread_mutex_unlock(&rib.cdap_reqs_lock);                  free(data);          } else {                  if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send reply to start request.");                          return -1;                  }          } -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } @@ -407,21 +400,21 @@ int ribmgr_cdap_stop(struct cdap * instance,  {          int ret = 0; -        pthread_rwlock_wrlock(&_ipcp->state_lock); -        if (ipcp_get_state(_ipcp) == IPCP_PENDING_ENROLL && +        pthread_rwlock_wrlock(&ipcpi.state_lock); +        if (ipcp_get_state() == IPCP_PENDING_ENROLL &&              strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("Stop enrollment received."); -                ipcp_set_state(_ipcp, IPCP_ENROLLED); +                ipcp_set_state(IPCP_ENROLLED);          } else                  ret = -1;          if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to send reply to stop request.");                  return -1;          } -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } @@ -457,19 +450,18 @@ int ribmgr_add_flow(int fd)          flow->instance = instance;          flow->fd = fd; -        pthread_rwlock_wrlock(&_ipcp->state_lock); -        pthread_rwlock_wrlock(&rib->flows_lock); -        if (list_empty(&rib->flows) && -            ipcp_get_state(_ipcp) == IPCP_INIT) { -                ipcp_set_state(_ipcp, IPCP_PENDING_ENROLL); +        pthread_rwlock_wrlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(&rib.flows_lock); +        if (list_empty(&rib.flows) && ipcp_get_state() == IPCP_INIT) { +                ipcp_set_state(IPCP_PENDING_ENROLL); -                pthread_mutex_lock(&rib->cdap_reqs_lock); +                pthread_mutex_lock(&rib.cdap_reqs_lock);                  iid = cdap_send_start(instance,                                        ENROLLMENT);                  if (iid < 0) { -                        pthread_mutex_unlock(&rib->cdap_reqs_lock); -                        pthread_rwlock_unlock(&rib->flows_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                        pthread_rwlock_unlock(&rib.flows_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to start enrollment.");                          cdap_destroy(instance);                          free(flow); @@ -477,20 +469,20 @@ int ribmgr_add_flow(int fd)                  }                  if (cdap_result_wait(instance, START, ENROLLMENT, iid)) { -                        pthread_mutex_unlock(&rib->cdap_reqs_lock); -                        pthread_rwlock_unlock(&rib->flows_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                        pthread_rwlock_unlock(&rib.flows_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to start enrollment.");                          cdap_destroy(instance);                          free(flow);                          return -1;                  } -                pthread_mutex_unlock(&rib->cdap_reqs_lock); +                pthread_mutex_unlock(&rib.cdap_reqs_lock);          } -        list_add(&flow->next, &rib->flows); -        pthread_rwlock_unlock(&rib->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        list_add(&flow->next, &rib.flows); +        pthread_rwlock_unlock(&rib.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } @@ -499,20 +491,20 @@ int ribmgr_remove_flow(int fd)  {          struct list_head * pos, * n = NULL; -        pthread_rwlock_wrlock(&rib->flows_lock); -        list_for_each_safe(pos, n, &rib->flows) { +        pthread_rwlock_wrlock(&rib.flows_lock); +        list_for_each_safe(pos, n, &rib.flows) {                  struct mgmt_flow * flow =                          list_entry(pos, struct mgmt_flow, next);                  if (flow->fd == fd) {                          if (cdap_destroy(flow->instance))                                  LOG_ERR("Failed to destroy CDAP instance.");                          list_del(&flow->next); -                        pthread_rwlock_unlock(&rib->flows_lock); +                        pthread_rwlock_unlock(&rib.flows_lock);                          free(flow);                          return 0;                  }          } -        pthread_rwlock_unlock(&rib->flows_lock); +        pthread_rwlock_unlock(&rib.flows_lock);          return -1;  } @@ -525,19 +517,19 @@ int ribmgr_bootstrap(struct dif_config * conf)                  return -1;          } -        rib->dtc.addr_size = conf->addr_size; -        rib->dtc.cep_id_size  = conf->cep_id_size; -        rib->dtc.pdu_length_size = conf->pdu_length_size; -        rib->dtc.seqno_size = conf->seqno_size; -        rib->dtc.has_ttl = conf->has_ttl; -        rib->dtc.has_chk = conf->has_chk; -        rib->dtc.min_pdu_size = conf->min_pdu_size; -        rib->dtc.max_pdu_size = conf->max_pdu_size; +        rib.dtc.addr_size = conf->addr_size; +        rib.dtc.cep_id_size  = conf->cep_id_size; +        rib.dtc.pdu_length_size = conf->pdu_length_size; +        rib.dtc.seqno_size = conf->seqno_size; +        rib.dtc.has_ttl = conf->has_ttl; +        rib.dtc.has_chk = conf->has_chk; +        rib.dtc.min_pdu_size = conf->min_pdu_size; +        rib.dtc.max_pdu_size = conf->max_pdu_size;          /* FIXME: Set correct address. */ -        rib->address = 0; +        rib.address = 0; -        if (frct_init(&rib->dtc, rib->address)) { +        if (frct_init(&rib.dtc, rib.address)) {                  LOG_ERR("Failed to initialize FRCT.");                  return -1;          } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index d74984cc..2cf46e51 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -24,24 +24,19 @@  #define _DEFAULT_SOURCE -#include "ipcp.h" -#include "flow.h"  #include <ouroboros/errno.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/irm_config.h> -#include <ouroboros/sockets.h>  #include <ouroboros/bitmap.h> -#include <ouroboros/flow.h>  #include <ouroboros/dev.h> +#include <ouroboros/ipcp-dev.h>  #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc"  #include <ouroboros/logs.h> +#include "ipcp.h" +  #include <net/if.h>  #include <signal.h>  #include <stdlib.h> @@ -79,18 +74,12 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t;  #define LLC_HEADER_SIZE 3  #define MAX_SAPS 64  #define ETH_HEADER_SIZE (2 * MAC_SIZE + 2) -#define ETH_FRAME_SIZE (SHIM_ETH_LLC_MAX_SDU_SIZE + ETH_HEADER_SIZE +       \ -                        LLC_HEADER_SIZE + 2) +#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \ +                        + SHIM_ETH_LLC_MAX_SDU_SIZE)  /* global for trapping signal */  int irmd_api; -struct ipcp * _ipcp; - -#define shim_data(type) ((struct eth_llc_ipcp_data *) type->data) - -#define ipcp_flow(index) ((struct flow *) &(shim_data(_ipcp)->flows[index])) -  struct eth_llc_frame {          uint8_t dst_hwaddr[MAC_SIZE];          uint8_t src_hwaddr[MAC_SIZE]; @@ -98,196 +87,78 @@ struct eth_llc_frame {          uint8_t dsap;          uint8_t ssap;          uint8_t cf; -        uint8_t size[2];          uint8_t payload;  }; -struct eth_llc_flow { -        struct flow flow; -        uint8_t     sap; -        uint8_t     r_sap; -        uint8_t     r_addr[MAC_SIZE]; +struct ef { +        int8_t  sap; +        int8_t  r_sap; +        uint8_t r_addr[MAC_SIZE];  }; -struct eth_llc_ipcp_data { -        /* Keep ipcp_data first for polymorphism. */ -        struct ipcp_data      ipcp_data; - +struct {  #ifdef __FreeBSD__ -        struct sockaddr_dl    device; +        struct sockaddr_dl device;  #else -        struct sockaddr_ll    device; +        struct sockaddr_ll device;  #endif -        int                   s_fd; - -        struct bmp *          indices; -        struct bmp *          saps; +        int                s_fd; -        struct shm_rdrbuff *  rdrb; -        struct shm_ap_rbuff * rb; - -        uint8_t *             rx_ring; -        uint8_t *             tx_ring; -        int                   tx_offset; - -        struct eth_llc_flow   flows[AP_MAX_FLOWS]; -        pthread_rwlock_t      flows_lock; +        struct bmp *       saps; +#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) +        uint8_t *          rx_ring; +        uint8_t *          tx_ring; +        int                tx_offset; +#endif +        int *              ef_to_fd; +        struct ef *        fd_to_ef; +        pthread_rwlock_t   flows_lock; -        pthread_t             mainloop; -        pthread_t             sdu_writer; -        pthread_t             sdu_reader; -}; +        pthread_t          sdu_writer; +        pthread_t          sdu_reader; +} eth_llc_data; -struct eth_llc_ipcp_data * eth_llc_ipcp_data_create() +static int eth_llc_data_init()  { -        struct eth_llc_ipcp_data * eth_llc_data; -        enum ipcp_type             ipcp_type; - -        eth_llc_data = malloc(sizeof(*eth_llc_data)); -        if (eth_llc_data == NULL) { -                LOG_ERR("Failed to allocate."); -                return NULL; -        } - -        ipcp_type = THIS_TYPE; -        if (ipcp_data_init((struct ipcp_data *) eth_llc_data, -                           ipcp_type) == NULL) { -                free(eth_llc_data); -                return NULL; -        } - -        eth_llc_data->rdrb = shm_rdrbuff_open(); -        if (eth_llc_data->rdrb == NULL) { -                free(eth_llc_data); -                return NULL; -        } - -        eth_llc_data->rb = shm_ap_rbuff_create_n(); -        if (eth_llc_data->rb == NULL) { -                shm_rdrbuff_close(eth_llc_data->rdrb); -                free(eth_llc_data); -                return NULL; -        } +        int i; -        eth_llc_data->indices = bmp_create(AP_MAX_FLOWS, 0); -        if (eth_llc_data->indices == NULL) { -                shm_ap_rbuff_destroy(eth_llc_data->rb); -                shm_rdrbuff_close(eth_llc_data->rdrb); -                free(eth_llc_data); -                return NULL; -        } +        eth_llc_data.fd_to_ef = malloc(sizeof(struct ef) * IRMD_MAX_FLOWS); +        if (eth_llc_data.fd_to_ef == NULL) +                return -ENOMEM; -        eth_llc_data->saps = bmp_create(MAX_SAPS, 2); -        if (eth_llc_data->indices == NULL) { -                bmp_destroy(eth_llc_data->indices); -                shm_ap_rbuff_destroy(eth_llc_data->rb); -                shm_rdrbuff_close(eth_llc_data->rdrb); -                free(eth_llc_data); -                return NULL; +        eth_llc_data.ef_to_fd = malloc(sizeof(struct ef) * MAX_SAPS); +        if (eth_llc_data.ef_to_fd == NULL) { +                free(eth_llc_data.fd_to_ef); +                return -ENOMEM;          } -        pthread_rwlock_init(ð_llc_data->flows_lock, NULL); - -        return eth_llc_data; -} - -void eth_llc_ipcp_data_destroy() -{ -        int i = 0; - -        if (_ipcp == NULL) -                return; - -        pthread_rwlock_rdlock(&_ipcp->state_lock); - -        if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN) -                LOG_WARN("Cleaning up while not in shutdown."); - -        /* remove all remaining sdus */ -        while ((i = shm_ap_rbuff_peek_idx(shim_data(_ipcp)->rb)) >= 0) -                shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, i); - -        if (shim_data(_ipcp)->rdrb != NULL) -                shm_rdrbuff_close(shim_data(_ipcp)->rdrb); -        if (shim_data(_ipcp)->rb != NULL) -                shm_ap_rbuff_destroy(shim_data(_ipcp)->rb); -        if (shim_data(_ipcp)->indices != NULL) -                bmp_destroy(shim_data(_ipcp)->indices); -        if (shim_data(_ipcp)->saps != NULL) -                bmp_destroy(shim_data(_ipcp)->saps); - -        pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); - -        for (i = 0; i < AP_MAX_FLOWS; i ++) -                if (ipcp_flow(i)->rb != NULL) -                        shm_ap_rbuff_close(ipcp_flow(i)->rb); - -        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); - -        ipcp_data_destroy(_ipcp->data); -} - -/* only call this under flows_lock */ -static int port_id_to_index(int port_id) -{ -        int i; - -        for (i = 0; i < AP_MAX_FLOWS; ++i) { -                if (ipcp_flow(i)->port_id == port_id -                    && ipcp_flow(i)->state != FLOW_NULL) -                        return i; +        eth_llc_data.saps = bmp_create(MAX_SAPS, 2); +        if (eth_llc_data.saps == NULL) { +                free(eth_llc_data.ef_to_fd); +                free(eth_llc_data.fd_to_ef); +                return -ENOMEM;          } -        return -1; -} +        for (i = 0; i < MAX_SAPS; ++i) +                eth_llc_data.ef_to_fd[i] = -1; -/* only call this under flows_lock */ -static int addr_and_saps_to_index(const uint8_t * r_addr, -                                  uint8_t   r_sap, -                                  uint8_t   sap) -{ -        int i = 0; - -        for (i = 0; i < AP_MAX_FLOWS; i++) { -                if (ipcp_flow(i)->state == FLOW_ALLOCATED && -                    shim_data(_ipcp)->flows[i].r_sap == r_sap && -                    shim_data(_ipcp)->flows[i].sap == sap && -                    !memcmp(shim_data(_ipcp)->flows[i].r_addr, -                            r_addr, -                            MAC_SIZE)) { -                        return i; -                } +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) { +                eth_llc_data.fd_to_ef[i].sap   = -1; +                eth_llc_data.fd_to_ef[i].r_sap = -1; +                memset(ð_llc_data.fd_to_ef[i].r_addr, 0, MAC_SIZE);          } -        return -1; -} +        pthread_rwlock_init(ð_llc_data.flows_lock, NULL); -/* only call this under flows_lock */ -static int sap_to_index(uint8_t sap) -{ -        int i = 0; - -        for (i = 0; i < AP_MAX_FLOWS; i++) { -                if (shim_data(_ipcp)->flows[i].sap == sap) { -                        return i; -                } -        } - -        return -1; +        return 0;  } -/* only call this under flows_lock */ -static void destroy_ipcp_flow(int index) +void eth_llc_data_fini()  { -        ipcp_flow(index)->port_id = -1; -        if (ipcp_flow(index)->rb != NULL) -                shm_ap_rbuff_close(ipcp_flow(index)->rb); -        ipcp_flow(index)->rb = NULL; -        ipcp_flow(index)->state = FLOW_NULL; -        bmp_release(shim_data(_ipcp)->indices, index); -        bmp_release(shim_data(_ipcp)->saps, -                    shim_data(_ipcp)->flows[index].sap); +        bmp_destroy(eth_llc_data.saps); +        free(eth_llc_data.fd_to_ef); +        free(eth_llc_data.ef_to_fd); +        pthread_rwlock_destroy(ð_llc_data.flows_lock);  }  static uint8_t reverse_bits(uint8_t b) @@ -299,7 +170,7 @@ static uint8_t reverse_bits(uint8_t b)          return b;  } -static int eth_llc_ipcp_send_frame(uint8_t   dst_addr[MAC_SIZE], +static int eth_llc_ipcp_send_frame(uint8_t * dst_addr,                                     uint8_t   dsap,                                     uint8_t   ssap,                                     uint8_t * payload, @@ -307,24 +178,16 @@ static int eth_llc_ipcp_send_frame(uint8_t   dst_addr[MAC_SIZE],  {          int frame_len = 0;          uint8_t cf = 0x03; -        int fd; - -        uint16_t size;          uint16_t length; -  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)          struct pollfd pfd;          struct tpacket_hdr * header;          uint8_t * frame;  #else          uint8_t frame[SHIM_ETH_LLC_MAX_SDU_SIZE]; -#ifdef __FreeBSD__ -        struct sockaddr_dl    device; -#else -        struct sockaddr_ll    device; -#endif  #endif          struct eth_llc_frame * llc_frame; +          if (payload == NULL) {                  LOG_ERR("Payload was NULL.");                  return -1; @@ -333,79 +196,75 @@ static int eth_llc_ipcp_send_frame(uint8_t   dst_addr[MAC_SIZE],          if (len > SHIM_ETH_LLC_MAX_SDU_SIZE)                  return -1; -        fd = (shim_data(_ipcp))->s_fd; -  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) -        header = (void *) shim_data(_ipcp)->tx_ring + -                (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE); +        header = (void *) (eth_llc_data.tx_ring + +                           eth_llc_data.tx_offset * SHM_RDRB_BLOCK_SIZE);          while (header->tp_status != TP_STATUS_AVAILABLE) { -                pfd.fd = fd; +                pfd.fd = eth_llc_data.s_fd;                  pfd.revents = 0;                  pfd.events = POLLIN | POLLRDNORM | POLLERR;                  if (poll(&pfd, 1, -1) <= 0) { -                        LOG_ERR("Failed to poll: %s.", strerror(errno)); +                        LOG_ERR("Failed to poll.");                          continue;                  } -                header = (void *) shim_data(_ipcp)->tx_ring + -                        (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE); +                header = (void *) (eth_llc_data.tx_ring +                                   + eth_llc_data.tx_offset +                                   * SHM_RDRB_BLOCK_SIZE);          } -        frame = (void *) header + TPACKET_HDRLEN - sizeof(struct sockaddr_ll); +        frame = (uint8_t *) header +                + TPACKET_HDRLEN - sizeof(struct sockaddr_ll);  #endif -          llc_frame = (struct eth_llc_frame *) frame; -        memcpy(&llc_frame->dst_hwaddr, dst_addr, MAC_SIZE); -        memcpy(&llc_frame->src_hwaddr, +        memcpy(llc_frame->dst_hwaddr, dst_addr, MAC_SIZE); +        memcpy(llc_frame->src_hwaddr,  #ifdef __FreeBSD__ -               LLADDR(&shim_data(_ipcp)->device), +               LLADDR(ð_llc_data.device),  #else -               shim_data(_ipcp)->device.sll_addr, +               eth_llc_data.device.sll_addr,  #endif                 MAC_SIZE); -        length = htons(LLC_HEADER_SIZE + sizeof(size) + len); +        length = htons(LLC_HEADER_SIZE + len);          memcpy(&llc_frame->length, &length, sizeof(length));          llc_frame->dsap = dsap;          llc_frame->ssap = ssap; -        llc_frame->cf = cf; -        /* write the payload length, can't trust the driver */ -        size = htons(len); -        memcpy(&llc_frame->size, &size, sizeof(size)); +        llc_frame->cf   = cf;          memcpy(&llc_frame->payload, payload, len); -        frame_len = ETH_HEADER_SIZE + LLC_HEADER_SIZE + sizeof(uint16_t) + len; +        frame_len = ETH_HEADER_SIZE + LLC_HEADER_SIZE + len;  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)          header->tp_len = frame_len;          header->tp_status = TP_STATUS_SEND_REQUEST; -        if (send(fd, NULL, 0, MSG_DONTWAIT) < 0) { +        if (send(eth_llc_data.s_fd, NULL, 0, MSG_DONTWAIT) < 0) {                  LOG_ERR("Failed to write frame into TX_RING.");                  return -1;          } -        shim_data(_ipcp)->tx_offset = -                (shim_data(_ipcp)->tx_offset + 1) -                & (SHM_BUFFER_SIZE -1); +        eth_llc_data.tx_offset = +                (eth_llc_data.tx_offset + 1) & (SHM_BUFFER_SIZE - 1);  #else -        device = (shim_data(_ipcp))->device; - -        if (sendto(fd, frame, frame_len, 0, -                   (struct sockaddr *) &device, sizeof(device)) <= 0) { +        if (sendto(eth_llc_data.s_fd, +                   frame, +                   frame_len, +                   0, +                   (struct sockaddr *) ð_llc_data.device, +                   sizeof(eth_llc_data.device)) <= 0) {                  LOG_ERR("Failed to send message.");                  return -1;          }  #endif -          return 0;  }  static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg, -                                        uint8_t              dst_addr[MAC_SIZE]) +                                        uint8_t *            dst_addr)  {          size_t    len;          uint8_t * buf; @@ -423,6 +282,7 @@ static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg,          if (eth_llc_ipcp_send_frame(dst_addr, reverse_bits(MGMT_SAP),                                      reverse_bits(MGMT_SAP), buf, len)) {                  LOG_ERR("Failed to send management frame."); +                free(buf);                  return -1;          } @@ -431,10 +291,10 @@ static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg,          return 0;  } -static int eth_llc_ipcp_port_alloc(uint8_t dst_addr[MAC_SIZE], -                                   uint8_t ssap, -                                   char *  dst_name, -                                   char *  src_ae_name) +static int eth_llc_ipcp_sap_alloc(uint8_t * dst_addr, +                                  uint8_t   ssap, +                                  char *    dst_name, +                                  char *    src_ae_name)  {          shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT; @@ -446,10 +306,10 @@ static int eth_llc_ipcp_port_alloc(uint8_t dst_addr[MAC_SIZE],          return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);  } -static int eth_llc_ipcp_port_alloc_resp(uint8_t dst_addr[MAC_SIZE], -                                        uint8_t ssap, -                                        uint8_t dsap, -                                        int     response) +static int eth_llc_ipcp_sap_alloc_resp(uint8_t * dst_addr, +                                       uint8_t   ssap, +                                       uint8_t   dsap, +                                       int       response)  {          shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT; @@ -463,8 +323,7 @@ static int eth_llc_ipcp_port_alloc_resp(uint8_t dst_addr[MAC_SIZE],          return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);  } -static int eth_llc_ipcp_port_dealloc(uint8_t dst_addr[MAC_SIZE], -                                     uint8_t ssap) +static int eth_llc_ipcp_sap_dealloc(uint8_t * dst_addr, uint8_t ssap)  {          shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT; @@ -474,142 +333,102 @@ static int eth_llc_ipcp_port_dealloc(uint8_t dst_addr[MAC_SIZE],          return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);  } -static int eth_llc_ipcp_port_req(uint8_t r_sap, -                                 uint8_t r_addr[MAC_SIZE], -                                 char *  dst_name, -                                 char *  src_ae_name) +static int eth_llc_ipcp_sap_req(uint8_t   r_sap, +                                uint8_t * r_addr, +                                char *    dst_name, +                                char *    src_ae_name)  { -        int port_id; -        ssize_t index = 0; -        int i; +        int fd; -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); - -        index = bmp_allocate(shim_data(_ipcp)->indices); -        if (index < 0) { -                pthread_rwlock_unlock(&_ipcp->state_lock); -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                LOG_ERR("Out of free indices."); -                return -1; -        } +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(ð_llc_data.flows_lock);          /* reply to IRM */ -        port_id = ipcp_flow_req_arr(getpid(), -                                    dst_name, -                                    src_ae_name); - -        if (port_id < 0) { -                bmp_release(shim_data(_ipcp)->indices, index); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                LOG_ERR("Could not get port id from IRMd."); +        fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); +        if (fd < 0) { +                pthread_rwlock_unlock(ð_llc_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("Could not get new flow from IRMd.");                  return -1;          } -        ipcp_flow(index)->port_id = port_id; -        ipcp_flow(index)->rb      = NULL; -        ipcp_flow(index)->state   = FLOW_PENDING; -        shim_data(_ipcp)->flows[index].r_sap = r_sap; -        for (i = 0; i < MAC_SIZE; i++) { -                shim_data(_ipcp)->flows[index].r_addr[i] = r_addr[i]; -        } +        eth_llc_data.fd_to_ef[fd].r_sap = r_sap; +        memcpy(eth_llc_data.fd_to_ef[fd].r_addr, r_addr, MAC_SIZE); -        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(ð_llc_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        LOG_DBG("New flow request, port_id %d, remote SAP %d.", port_id, r_sap); +        LOG_DBG("New flow request, fd %d, remote SAP %d.", fd, r_sap);          return 0;  } -static int eth_llc_ipcp_port_alloc_reply(uint8_t ssap, -                                         uint8_t r_addr[MAC_SIZE], -                                         int     dsap, -                                         int     response) +static int eth_llc_ipcp_sap_alloc_reply(uint8_t   ssap, +                                        uint8_t * r_addr, +                                        int       dsap, +                                        int       response)  { -        int index = -1;          int ret = 0; -        int port_id = -1; -        int i; +        int fd = -1; -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_rdlock(& eth_llc_data.flows_lock); -        index = sap_to_index(ssap); -        if (index < 0) { -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +        fd = eth_llc_data.ef_to_fd[dsap]; +        if (fd < 0) { +                pthread_rwlock_unlock(& eth_llc_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("No flow found with that SAP.");                  return -1; /* -EFLOWNOTFOUND */          } -        if (ipcp_flow(index)->state != FLOW_PENDING) { -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                return -1; /* -EFLOWNOTPENDING */ -        } - -        port_id = ipcp_flow(index)->port_id; -          if (response) { -                destroy_ipcp_flow(index); +                bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap);          } else { -                ipcp_flow(index)->state = FLOW_ALLOCATED; -                shim_data(_ipcp)->flows[index].r_sap = dsap; -                for (i = 0; i < MAC_SIZE; i++) { -                        shim_data(_ipcp)->flows[index].r_addr[i] = r_addr[i]; -                } +                eth_llc_data.fd_to_ef[fd].r_sap = ssap; +                memcpy(eth_llc_data.fd_to_ef[fd].r_addr, r_addr, MAC_SIZE);          } -        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(ð_llc_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        LOG_DBG("Flow reply, port_id %d, remote SAP %d.", port_id, dsap); +        LOG_DBG("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap); -        if ((ret = ipcp_flow_alloc_reply(getpid(), -                                         port_id, -                                         response)) < 0) { -                return -1; /* -EPIPE */ -        } +        if ((ret = ipcp_flow_alloc_reply(fd, response)) < 0) +                return -1;          return ret;  } -static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, -                                         uint8_t r_addr[MAC_SIZE]) +static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr)  { -        int port_id = -1; -        int i = 0; +        int fd = -1; -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(ð_llc_data.flows_lock); -        i = sap_to_index(ssap); -        if (i < 0) { -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +        fd = eth_llc_data.ef_to_fd[ssap]; +        if (fd < 0) { +                pthread_rwlock_unlock(ð_llc_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("No flow found for remote deallocation request.");                  return 0;          } -        port_id = ipcp_flow(i)->port_id; -        destroy_ipcp_flow(i); +        bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); -        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(ð_llc_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        irm_flow_dealloc(port_id); +        flow_dealloc(fd); -        LOG_DBG("Flow with port_id %d deallocated.", port_id); +        LOG_DBG("Flow with fd %d deallocated.", fd);          return 0;  } -static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, -                                   size_t    len, -                                   uint8_t   r_addr[MAC_SIZE]) +static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)  {          shim_eth_llc_msg_t * msg = NULL; @@ -621,27 +440,24 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf,          switch (msg->code) {          case SHIM_ETH_LLC_MSG_CODE__FLOW_REQ: -                if (ipcp_data_is_in_registry(_ipcp->data, -                                             msg->dst_name)) { -                        eth_llc_ipcp_port_req(msg->ssap, -                                              r_addr, -                                              msg->dst_name, -                                              msg->src_ae_name); +                if (ipcp_data_is_in_registry(ipcpi.data, msg->dst_name)) { +                        eth_llc_ipcp_sap_req(msg->ssap, +                                             r_addr, +                                             msg->dst_name, +                                             msg->src_ae_name);                  }                  break;          case SHIM_ETH_LLC_MSG_CODE__FLOW_REPLY: -                eth_llc_ipcp_port_alloc_reply(msg->ssap, -                                              r_addr, -                                              msg->dsap, -                                              msg->response); +                eth_llc_ipcp_sap_alloc_reply(msg->ssap, +                                             r_addr, +                                             msg->dsap, +                                             msg->response);                  break;          case SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC: -                eth_llc_ipcp_flow_dealloc_req(msg->ssap, -                                              r_addr); +                eth_llc_ipcp_flow_dealloc_req(msg->ssap, r_addr);                  break;          default: -                LOG_ERR("Unknown message received %d.", -                        msg->code); +                LOG_ERR("Unknown message received %d.", msg->code);                  shim_eth_llc_msg__free_unpacked(msg, NULL);                  return -1;          } @@ -652,15 +468,11 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf,  static void * eth_llc_ipcp_sdu_reader(void * o)  { -        ssize_t index; -        struct rb_entry e;          uint8_t br_addr[MAC_SIZE]; +        uint16_t length;          uint8_t dsap;          uint8_t ssap; -        int i = 0; -        struct eth_llc_frame * llc_frame; -        uint16_t size; - +        int fd;  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)          struct pollfd pfd;          int offset = 0; @@ -670,43 +482,43 @@ static void * eth_llc_ipcp_sdu_reader(void * o)          uint8_t buf[ETH_FRAME_SIZE];          int frame_len = 0;  #endif +        struct eth_llc_frame * llc_frame;          memset(br_addr, 0xff, MAC_SIZE * sizeof(uint8_t));          while (true) {  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) -                header = (void *) shim_data(_ipcp)->rx_ring + -                        (offset * SHM_RDRB_BLOCK_SIZE); +                header = (void *) (eth_llc_data.rx_ring + +                                   offset * SHM_RDRB_BLOCK_SIZE);                  while (!(header->tp_status & TP_STATUS_USER)) { -                        pfd.fd = shim_data(_ipcp)->s_fd; +                        pfd.fd = eth_llc_data.s_fd;                          pfd.revents = 0;                          pfd.events = POLLIN | POLLRDNORM | POLLERR;                          if (poll(&pfd, 1, -1) <= 0) { -                                LOG_ERR("Failed to poll: %s.", strerror(errno)); +                                LOG_ERR("Failed to poll.");                                  continue;                          } -                        header = (void *) shim_data(_ipcp)->rx_ring + -                                (offset * SHM_RDRB_BLOCK_SIZE); +                        header = (void *) (eth_llc_data.rx_ring + +                                           offset * SHM_RDRB_BLOCK_SIZE);                  } -                buf = (void * ) header + header->tp_mac; +                buf = (uint8_t * ) header + header->tp_mac;  #else -                frame_len = recv(shim_data(_ipcp)->s_fd, buf, +                frame_len = recv(eth_llc_data.s_fd, buf,                                   SHIM_ETH_LLC_MAX_SDU_SIZE, 0);                  if (frame_len < 0) { -                        LOG_ERR("Failed to recv frame."); +                        LOG_ERR("Failed to receive frame.");                          continue;                  }  #endif -                  llc_frame = (struct eth_llc_frame *) buf;  #ifdef __FreeBSD__ -                if (memcmp(LLADDR(&shim_data(_ipcp)->device), +                if (memcmp(LLADDR(ð_llc_data.device),  #else -                if (memcmp(shim_data(_ipcp)->device.sll_addr, +                if (memcmp(eth_llc_data.device.sll_addr,  #endif                             &llc_frame->dst_hwaddr,                             MAC_SIZE) && @@ -721,46 +533,29 @@ static void * eth_llc_ipcp_sdu_reader(void * o)                  dsap = reverse_bits(llc_frame->dsap);                  ssap = reverse_bits(llc_frame->ssap); -                memcpy(&size, &llc_frame->size, sizeof(size)); +                memcpy(&length, &llc_frame->length, sizeof(length)); +                length = ntohs(length) - LLC_HEADER_SIZE;                  if (ssap == MGMT_SAP && dsap == MGMT_SAP) {                          eth_llc_ipcp_mgmt_frame(&llc_frame->payload, -                                                ntohs(size), +                                                length,                                                  llc_frame->src_hwaddr);                  } else { -                        pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock); - -                        i = addr_and_saps_to_index(llc_frame->src_hwaddr, -                                                   ssap, -                                                   dsap); -                        if (i < 0) { -                                pthread_rwlock_unlock(&shim_data(_ipcp)-> -                                                      flows_lock); +                        pthread_rwlock_rdlock(ð_llc_data.flows_lock); + +                        fd = eth_llc_data.ef_to_fd[dsap]; +                        if (fd < 0) { +                                pthread_rwlock_unlock(ð_llc_data.flows_lock);  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) -                                offset = (offset + 1) -                                        & (SHM_BUFFER_SIZE - 1); +                                offset = (offset + 1) & (SHM_BUFFER_SIZE - 1);                                  header->tp_status = TP_STATUS_KERNEL;  #endif                                  continue;                          } -                        while ((index = -                                shm_rdrbuff_write(shim_data(_ipcp)->rdrb, -                                                 ipcp_flow(i)->api, -                                                 0, -                                                 0, -                                                 &llc_frame->payload, -                                                 ntohs(size))) -                                < 0) -                                ; - -                        e.index = index; -                        e.port_id = ipcp_flow(i)->port_id; - -                        while (shm_ap_rbuff_write(ipcp_flow(i)->rb, &e) < 0) -                                ; +                        pthread_rwlock_unlock(ð_llc_data.flows_lock); -                        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); +                        flow_write(fd, &llc_frame->payload, length);                  }  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)                  offset = (offset + 1) & (SHM_BUFFER_SIZE -1); @@ -774,51 +569,34 @@ static void * eth_llc_ipcp_sdu_reader(void * o)  static void * eth_llc_ipcp_sdu_writer(void * o)  {          while (true) { -                struct rb_entry * e; -                int i; -                int len = 0; -                uint8_t * buf; +                int fd; +                struct shm_du_buff * sdb;                  uint8_t ssap;                  uint8_t dsap; +                uint8_t r_addr[MAC_SIZE]; -                e = shm_ap_rbuff_read(shim_data(_ipcp)->rb); -                if (e == NULL) -                        continue; - -                pthread_rwlock_rdlock(&_ipcp->state_lock); - -                len = shm_rdrbuff_read((uint8_t **) &buf, -                                      shim_data(_ipcp)->rdrb, -                                      e->index); -                if (len <= 0) { -                        free(e); -                        LOG_ERR("Length of du map read was %d.", len); -                        continue; -                } +                pthread_rwlock_rdlock(&ipcpi.state_lock); -                pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock); - -                i = port_id_to_index(e->port_id); -                if (i < 0) { -                        free(e); -                        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); +                fd = ipcp_flow_read(&sdb); +                if (fd < 0) { +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          continue;                  } -                ssap = reverse_bits(shim_data(_ipcp)->flows[i].sap); -                dsap = reverse_bits(shim_data(_ipcp)->flows[i].r_sap); - -                eth_llc_ipcp_send_frame(shim_data(_ipcp)->flows[i].r_addr, -                                        dsap, ssap, buf, len); - -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); +                pthread_rwlock_rdlock(ð_llc_data.flows_lock); -                if (shim_data(_ipcp)->rdrb != NULL) -                        shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, e->index); +                ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); +                dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); +                memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(ð_llc_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock); -                free(e); +                eth_llc_ipcp_send_frame(r_addr, dsap, ssap, +                                        shm_du_buff_head(sdb), +                                        shm_du_buff_tail(sdb) +                                        - shm_du_buff_head(sdb)); +                ipcp_flow_del(sdb);          }          return (void *) 1; @@ -826,10 +604,6 @@ static void * eth_llc_ipcp_sdu_writer(void * o)  void ipcp_sig_handler(int sig, siginfo_t * info, void * c)  { -        sigset_t  sigset; -        sigemptyset(&sigset); -        sigaddset(&sigset, SIGINT); -          switch(sig) {          case SIGINT:          case SIGTERM: @@ -838,12 +612,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)                          LOG_DBG("IPCP %d terminating by order of %d. Bye.",                                  getpid(), info->si_pid); -                        pthread_rwlock_wrlock(&_ipcp->state_lock); - -                        ipcp_set_state(_ipcp, IPCP_SHUTDOWN); +                        pthread_rwlock_wrlock(&ipcpi.state_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        ipcp_set_state(IPCP_SHUTDOWN); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                  }          default:                  return; @@ -852,7 +625,7 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)  static int eth_llc_ipcp_bootstrap(struct dif_config * conf)  { -        int fd = -1; +        int skfd = -1;          struct ifreq ifr;          int idx;  #ifdef __FreeBSD__ @@ -882,17 +655,10 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          memset(&ifr, 0, sizeof(ifr)); -        fd = socket(AF_UNIX, SOCK_STREAM, 0); -        if (fd < 0) { -                LOG_ERR("Failed to open socket."); -                return -1; -        } -          memcpy(ifr.ifr_name, conf->if_name, strlen(conf->if_name));  #ifdef __FreeBSD__          if (getifaddrs(&ifaddr) < 0)  { -                close(fd);                  LOG_ERR("Could not get interfaces.");                  return -1;          } @@ -900,8 +666,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          for (ifa = ifaddr, idx = 0; ifa != NULL; ifa = ifa->ifa_next, ++idx) {                  if (strcmp(ifa->ifa_name, conf->if_name))                          continue; -                LOG_DBGF("Interface %s found.", conf->if_name); - +                LOG_DBG("Interface %s found.", conf->if_name);                  memcpy(&ifr.ifr_addr, ifa->ifa_addr, sizeof(*ifa->ifa_addr));                  break;          } @@ -913,30 +678,33 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          }          freeifaddrs(ifaddr); -  #else -        if (ioctl(fd, SIOCGIFHWADDR, &ifr)) { -                close(fd); -                LOG_ERR("Failed to ioctl: %s.", strerror(errno)); +        skfd = socket(AF_UNIX, SOCK_STREAM, 0); +        if (skfd < 0) { +                LOG_ERR("Failed to open socket."); +                return -1; +        } + +        if (ioctl(skfd, SIOCGIFHWADDR, &ifr)) { +                LOG_ERR("Failed to ioctl."); +                close(skfd);                  return -1;          } +        close(skfd); +          idx = if_nametoindex(conf->if_name);          if (idx == 0) {                  LOG_ERR("Failed to retrieve interface index."); +                close(skfd);                  return -1;          }  #endif - -        close(fd); -          memset(&(device), 0, sizeof(device));  #ifdef __FreeBSD__          device.sdl_index = idx;          device.sdl_family = AF_LINK; -        memcpy(LLADDR(&device), -               ifr.ifr_addr.sa_data, -               MAC_SIZE * sizeof (uint8_t)); +        memcpy(LLADDR(&device), ifr.ifr_addr.sa_data, MAC_SIZE);          device.sdl_alen = MAC_SIZE;          /* TODO: replace socket calls with bpf for BSD */          LOG_MISSING; @@ -944,23 +712,21 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)  #else          device.sll_ifindex = idx;          device.sll_family = AF_PACKET; -        memcpy(device.sll_addr, -               ifr.ifr_hwaddr.sa_data, -               MAC_SIZE * sizeof (uint8_t)); +        memcpy(device.sll_addr, ifr.ifr_hwaddr.sa_data, MAC_SIZE);          device.sll_halen = MAC_SIZE;          device.sll_protocol = htons(ETH_P_ALL); -        fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_802_2)); +        skfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_802_2));  #endif -        if (fd < 0) { -                LOG_ERR("Failed to create socket: %s.", strerror(errno)); +        if (skfd < 0) { +                LOG_ERR("Failed to create socket.");                  return -1;          }  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)          if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_RDRB_BLOCK_SIZE) {                  LOG_ERR("Max SDU size is bigger than DU map block size."); -                close(fd); +                close(skfd);                  return -1;          } @@ -969,68 +735,68 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          req.tp_block_nr = SHM_BUFFER_SIZE;          req.tp_frame_nr = SHM_BUFFER_SIZE; -        if (setsockopt(fd, SOL_PACKET, PACKET_RX_RING, +        if (setsockopt(skfd, SOL_PACKET, PACKET_RX_RING,                         (void *) &req, sizeof(req))) {                  LOG_ERR("Failed to set sockopt PACKET_RX_RING"); -                close(fd); +                close(skfd);                  return -1;          } -        if (setsockopt(fd, SOL_PACKET, PACKET_TX_RING, +        if (setsockopt(skfd, SOL_PACKET, PACKET_TX_RING,                         (void *) &req, sizeof(req))) {                  LOG_ERR("Failed to set sockopt PACKET_TX_RING"); -                close(fd); +                close(skfd);                  return -1;          }  #endif - -        if (bind(fd,(struct sockaddr *) &device, sizeof(device))) { +        if (bind(skfd, (struct sockaddr *) &device, sizeof(device))) {                  LOG_ERR("Failed to bind socket to interface"); -                close(fd); +                close(skfd);                  return -1;          }  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) -        shim_data(_ipcp)->rx_ring = mmap(NULL, -                                         2 * SHM_RDRB_BLOCK_SIZE -                                         * SHM_BUFFER_SIZE, -                                         PROT_READ | PROT_WRITE, MAP_SHARED, -                                         fd, 0); -        if (shim_data(_ipcp)->rx_ring == NULL) { +        eth_llc_data.rx_ring = mmap(NULL, 2 * SHM_RDRB_BLOCK_SIZE +                                    * SHM_BUFFER_SIZE, +                                    PROT_READ | PROT_WRITE, MAP_SHARED, +                                    skfd, 0); +        if (eth_llc_data.rx_ring == NULL) {                  LOG_ERR("Failed to mmap"); -                close(fd); +                close(skfd);                  return -1;          } -        shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring -                + (SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE); +        eth_llc_data.tx_ring = eth_llc_data.rx_ring +                + SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE;  #endif -        pthread_rwlock_wrlock(&_ipcp->state_lock); +        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_INIT) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_get_state() != IPCP_INIT) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("IPCP in wrong state."); -                close(fd); +                close(skfd);                  return -1;          } -        shim_data(_ipcp)->s_fd = fd; -        shim_data(_ipcp)->device = device; -        shim_data(_ipcp)->tx_offset = 0; +        eth_llc_data.s_fd = skfd; +        eth_llc_data.device = device; +#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) +        eth_llc_data.tx_offset = 0; +#endif -        ipcp_set_state(_ipcp, IPCP_ENROLLED); +        ipcp_set_state(IPCP_ENROLLED); -        pthread_create(&shim_data(_ipcp)->sdu_reader, +        pthread_create(ð_llc_data.sdu_reader,                         NULL,                         eth_llc_ipcp_sdu_reader,                         NULL); -        pthread_create(&shim_data(_ipcp)->sdu_writer, +        pthread_create(ð_llc_data.sdu_writer,                         NULL,                         eth_llc_ipcp_sdu_writer,                         NULL); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          LOG_DBG("Bootstrapped shim IPCP over Ethernet with LLC with api %d.",                  getpid()); @@ -1040,15 +806,15 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)  static int eth_llc_ipcp_name_reg(char * name)  { -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_data_add_reg_entry(_ipcp->data, name)) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_data_add_reg_entry(ipcpi.data, name)) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to add %s to local registry.", name);                  return -1;          } -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          LOG_DBG("Registered %s.", name); @@ -1057,25 +823,22 @@ static int eth_llc_ipcp_name_reg(char * name)  static int eth_llc_ipcp_name_unreg(char * name)  { -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        ipcp_data_del_reg_entry(_ipcp->data, name); +        ipcp_data_del_reg_entry(ipcpi.data, name); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } -static int eth_llc_ipcp_flow_alloc(pid_t         n_api, -                                   int           port_id, +static int eth_llc_ipcp_flow_alloc(int           fd,                                     char *        dst_name,                                     char *        src_ae_name,                                     enum qos_cube qos)  { -        struct shm_ap_rbuff * rb;          uint8_t ssap = 0;          uint8_t r_addr[MAC_SIZE]; -        int index = 0;          LOG_INFO("Allocating flow to %s.", dst_name); @@ -1083,182 +846,122 @@ static int eth_llc_ipcp_flow_alloc(pid_t         n_api,                  return -1;          if (qos != QOS_CUBE_BE) -                LOG_DBGF("QoS requested. Ethernet LLC can't do that. For now."); - -        rb = shm_ap_rbuff_open_s(n_api); -        if (rb == NULL) -                return -1; /* -ENORBUFF */ +                LOG_DBG("QoS requested. Ethernet LLC can't do that. For now."); -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { -                pthread_rwlock_unlock(&_ipcp->state_lock); -                shm_ap_rbuff_close(rb); -                LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); +        if (ipcp_get_state() != IPCP_ENROLLED) { +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_DBG("Won't allocate flow with non-enrolled IPCP.");                  return -1; /* -ENOTENROLLED */          } -        index = bmp_allocate(shim_data(_ipcp)->indices); -        if (index < 0) { -                pthread_rwlock_unlock(&_ipcp->state_lock); -                shm_ap_rbuff_close(rb); -                return -1; -        } - -        pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); +        pthread_rwlock_wrlock(ð_llc_data.flows_lock); -        ssap = bmp_allocate(shim_data(_ipcp)->saps); +        ssap = bmp_allocate(eth_llc_data.saps);          if (ssap < 0) { -                shm_ap_rbuff_close(rb); -                bmp_release(shim_data(_ipcp)->indices, index); -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(ð_llc_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        ipcp_flow(index)->port_id = port_id; -        ipcp_flow(index)->state = FLOW_PENDING; -        ipcp_flow(index)->rb = rb; -        shim_data(_ipcp)->flows[index].sap = ssap; - -        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); - -        memset(r_addr, 0xff, MAC_SIZE * sizeof(uint8_t)); - -        if (eth_llc_ipcp_port_alloc(r_addr, ssap, -                                    dst_name, -                                    src_ae_name) < 0) { -                LOG_DBGF("Port alloc returned -1."); -                pthread_rwlock_rdlock(&_ipcp->state_lock); -                pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); -                destroy_ipcp_flow(index); -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +        eth_llc_data.fd_to_ef[fd].sap = ssap; +        eth_llc_data.ef_to_fd[ssap]   = fd; + +        pthread_rwlock_unlock(ð_llc_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); + +        memset(r_addr, 0xff, MAC_SIZE); + +        if (eth_llc_ipcp_sap_alloc(r_addr, ssap, dst_name, src_ae_name) < 0) { +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                pthread_rwlock_wrlock(ð_llc_data.flows_lock); +                bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); +                eth_llc_data.fd_to_ef[fd].sap = -1; +                eth_llc_data.ef_to_fd[ssap]   = -1; +                pthread_rwlock_unlock(ð_llc_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        LOG_DBG("Pending flow with port_id %d on SAP %d.", -                port_id, ssap); +        LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap); -        return index; +        return 0;  } -static int eth_llc_ipcp_flow_alloc_resp(pid_t n_api, -                                        int   port_id, -                                        int   response) +static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)  { -        struct shm_ap_rbuff * rb; -        int index = -1;          uint8_t ssap = 0; +        uint8_t r_sap = 0; +        uint8_t r_addr[MAC_SIZE]; -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); - -        index = port_id_to_index(port_id); -        if (index < 0) { -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBGF("Could not find flow with port_id %d.", port_id); -                return -1; -        } - -        if (ipcp_flow(index)->state != FLOW_PENDING) { -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBGF("Flow was not pending."); -                return -1; -        } - -        rb = shm_ap_rbuff_open_s(n_api); -        if (rb == NULL) { -                LOG_ERR("Could not open N + 1 ringbuffer."); -                ipcp_flow(index)->state = FLOW_NULL; -                ipcp_flow(index)->port_id = -1; -                bmp_release(shim_data(_ipcp)->indices, index); -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                return -1; -        } +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(ð_llc_data.flows_lock); -        ssap = bmp_allocate(shim_data(_ipcp)->saps); +        ssap = bmp_allocate(eth_llc_data.saps);          if (ssap < 0) { -                ipcp_flow(index)->state = FLOW_NULL; -                ipcp_flow(index)->port_id = -1; -                shm_ap_rbuff_close(ipcp_flow(index)->rb); -                bmp_release(shim_data(_ipcp)->indices, index); -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(ð_llc_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        ipcp_flow(index)->state = FLOW_ALLOCATED; -        ipcp_flow(index)->rb = rb; -        shim_data(_ipcp)->flows[index].sap = ssap; - -        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        eth_llc_data.fd_to_ef[fd].sap = ssap; +        memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); +        r_sap = eth_llc_data.fd_to_ef[fd].r_sap; +        eth_llc_data.ef_to_fd[ssap] = fd; -        if (eth_llc_ipcp_port_alloc_resp(shim_data(_ipcp)->flows[index].r_addr, -                                         shim_data(_ipcp)->flows[index].r_sap, -                                         ssap, -                                         response) < 0) { -                pthread_rwlock_rdlock(&_ipcp->state_lock); -                pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); -                destroy_ipcp_flow(index); -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(ð_llc_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_DBGF("Could not send response."); +        if (eth_llc_ipcp_sap_alloc_resp(r_addr, ssap, r_sap, response) < 0) { +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                pthread_rwlock_wrlock(ð_llc_data.flows_lock); +                bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); +                pthread_rwlock_unlock(ð_llc_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        LOG_DBG("Accepted flow, port_id %d, SAP %d.", port_id, ssap); +        LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap);          return 0;  } -static int eth_llc_ipcp_flow_dealloc(int port_id) +static int eth_llc_ipcp_flow_dealloc(int fd)  { -        int index = -1;          uint8_t sap;          uint8_t addr[MAC_SIZE]; -        int i;          int ret; -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(ð_llc_data.flows_lock); -        index = port_id_to_index(port_id); -        if (index < 0) { -                pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                return 0; -        } +        sap = eth_llc_data.fd_to_ef[fd].r_sap; +        memcpy(addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); -        sap = shim_data(_ipcp)->flows[index].r_sap; -        for (i = 0; i < MAC_SIZE; i++) { -                addr[i] = shim_data(_ipcp)->flows[index].r_addr[i]; -        } +        bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); +        eth_llc_data.fd_to_ef[fd].sap = -1; +        eth_llc_data.fd_to_ef[fd].r_sap = -1; +        memset(ð_llc_data.fd_to_ef[fd].r_addr, 0, MAC_SIZE); -        destroy_ipcp_flow(index); +        eth_llc_data.ef_to_fd[sap] = -1; -        pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); +        pthread_rwlock_unlock(ð_llc_data.flows_lock); -        ret = eth_llc_ipcp_port_dealloc(addr, sap); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        ret = eth_llc_ipcp_sap_dealloc(addr, sap); +        pthread_rwlock_unlock(&ipcpi.state_lock);          if (ret < 0) -                LOG_DBGF("Could not notify remote."); +                LOG_DBG("Could not notify remote."); -        LOG_DBG("Flow with port_id %d deallocated.", port_id); +        LOG_DBG("Flow with fd %d deallocated.", fd);          return 0;  }  static struct ipcp_ops eth_llc_ops = {          .ipcp_bootstrap       = eth_llc_ipcp_bootstrap, -        .ipcp_enroll          = NULL,                       /* shim */ +        .ipcp_enroll          = NULL,          .ipcp_name_reg        = eth_llc_ipcp_name_reg,          .ipcp_name_unreg      = eth_llc_ipcp_name_unreg,          .ipcp_flow_alloc      = eth_llc_ipcp_flow_alloc, @@ -1270,7 +973,6 @@ int main(int argc, char * argv[])  {          struct sigaction sig_act;          sigset_t  sigset; -        int i = 0;          sigemptyset(&sigset);          sigaddset(&sigset, SIGINT); @@ -1283,6 +985,14 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } +        if (eth_llc_data_init() < 0) +                exit(EXIT_FAILURE); + +        if (ap_init(NULL) < 0) { +                close_logfile(); +                exit(EXIT_FAILURE); +        } +          /* store the process id of the irmd */          irmd_api = atoi(argv[1]); @@ -1298,35 +1008,13 @@ int main(int argc, char * argv[])          sigaction(SIGHUP,  &sig_act, NULL);          sigaction(SIGPIPE, &sig_act, NULL); -        _ipcp = ipcp_instance_create(); -        if (_ipcp == NULL) { -                LOG_ERR("Failed to create instance."); -                close_logfile(); -                exit(EXIT_FAILURE); -        } +        pthread_sigmask(SIG_BLOCK, &sigset, NULL); -        _ipcp->data = (struct ipcp_data *) eth_llc_ipcp_data_create(); -        if (_ipcp->data == NULL) { -                LOG_ERR("Failed to create instance data."); -                free(_ipcp); +        if (ipcp_init(THIS_TYPE, ð_llc_ops) < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } -        for (i = 0; i < AP_MAX_FLOWS; i++) { -                ipcp_flow(i)->rb = NULL; -                ipcp_flow(i)->port_id = -1; -                ipcp_flow(i)->state = FLOW_NULL; -        } - -        _ipcp->ops = ð_llc_ops; -        _ipcp->state = IPCP_INIT; - -        pthread_sigmask(SIG_BLOCK, &sigset, NULL); - -        pthread_create(&shim_data(_ipcp)->mainloop, NULL, -                       ipcp_main_loop, _ipcp); -          pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);          if (ipcp_create_r(getpid())) { @@ -1335,17 +1023,17 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        pthread_join(shim_data(_ipcp)->mainloop, NULL); +        ipcp_fini(); -        pthread_cancel(shim_data(_ipcp)->sdu_reader); -        pthread_cancel(shim_data(_ipcp)->sdu_writer); +        pthread_cancel(eth_llc_data.sdu_reader); +        pthread_cancel(eth_llc_data.sdu_writer); -        pthread_join(shim_data(_ipcp)->sdu_writer, NULL); -        pthread_join(shim_data(_ipcp)->sdu_reader, NULL); +        pthread_join(eth_llc_data.sdu_writer, NULL); +        pthread_join(eth_llc_data.sdu_reader, NULL); -        eth_llc_ipcp_data_destroy(); +        ap_fini(); -        free(_ipcp); +        eth_llc_data_fini();          close_logfile(); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index c35bd244..8c31e11a 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -22,18 +22,11 @@  #include <ouroboros/config.h>  #include "ipcp.h" -#include "flow.h"  #include "shim_udp_config.h" -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/irm_config.h> -#include <ouroboros/sockets.h> -#include <ouroboros/bitmap.h> -#include <ouroboros/flow.h>  #include <ouroboros/dev.h> +#include <ouroboros/ipcp-dev.h>  #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -63,268 +56,93 @@ typedef ShimUdpMsg shim_udp_msg_t;  #define DNS_TTL 86400  #define FD_UPDATE_TIMEOUT 100 /* microseconds */ -#define shim_data(type) ((struct ipcp_udp_data *) type->data) +#define local_ip (udp_data.s_saddr.sin_addr.s_addr) -#define local_ip (((struct ipcp_udp_data *)                     \ -                   _ipcp->data)->s_saddr.sin_addr.s_addr) +#define UDP_MAX_PORTS 0xFFFF  /* global for trapping signal */  int irmd_api; -/* this IPCP's data */ -#ifdef MAKE_CHECK -extern struct ipcp * _ipcp; /* defined in test */ -#else -struct ipcp * _ipcp; -#endif - -/* - * copied from ouroboros/dev. The shim needs access to the internals - * because it doesn't follow all steps necessary steps to get - * the info - */ - -/* the shim needs access to these internals */ -struct shim_ap_data { -        pid_t                 api; -        struct shm_rdrbuff *  rdrb; -        struct bmp *          fds; -        struct shm_ap_rbuff * rb; - -        struct flow           flows[AP_MAX_FLOWS]; -        pthread_rwlock_t      flows_lock; - -        pthread_t             mainloop; -        pthread_t             sduloop; -        pthread_t             handler; -        pthread_t             sdu_reader; - -        bool                  fd_set_mod; -        pthread_cond_t        fd_set_cond; -        pthread_mutex_t       fd_set_lock; -} * _ap_instance; - -static int shim_ap_init() -{ -        int i; - -        _ap_instance = malloc(sizeof(struct shim_ap_data)); -        if (_ap_instance == NULL) { -                return -1; -        } - -        _ap_instance->api = getpid(); - -        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); -        if (_ap_instance->fds == NULL) { -                free(_ap_instance); -                return -1; -        } - -        _ap_instance->rdrb = shm_rdrbuff_open(); -        if (_ap_instance->rdrb == NULL) { -                bmp_destroy(_ap_instance->fds); -                free(_ap_instance); -                return -1; -        } - -        _ap_instance->rb = shm_ap_rbuff_create_n(); -        if (_ap_instance->rb == NULL) { -                shm_rdrbuff_close(_ap_instance->rdrb); -                bmp_destroy(_ap_instance->fds); -                free(_ap_instance); -                return -1; -        } - -        for (i = 0; i < AP_MAX_FLOWS; i ++) { -                _ap_instance->flows[i].rb = NULL; -                _ap_instance->flows[i].port_id = -1; -                _ap_instance->flows[i].state = FLOW_NULL; -        } - -        pthread_rwlock_init(&_ap_instance->flows_lock, NULL); -        pthread_cond_init(&_ap_instance->fd_set_cond, NULL); -        pthread_mutex_init(&_ap_instance->fd_set_lock, NULL); - -        return 0; -} - -void shim_ap_fini() -{ -        int i = 0; - -        if (_ap_instance == NULL) -                return; - -        pthread_rwlock_rdlock(&_ipcp->state_lock); - -        if (_ipcp->state != IPCP_SHUTDOWN) -                LOG_WARN("Cleaning up AP while not in shutdown."); - -        if (_ap_instance->fds != NULL) -                bmp_destroy(_ap_instance->fds); - -        /* remove all remaining sdus */ -        while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) -                shm_rdrbuff_remove(_ap_instance->rdrb, i); - -        if (_ap_instance->rdrb != NULL) -                shm_rdrbuff_close(_ap_instance->rdrb); -        if (_ap_instance->rb != NULL) -                shm_ap_rbuff_destroy(_ap_instance->rb); +struct uf { +        int                udp; +        int                skfd; +}; -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +struct { +        uint32_t           ip_addr; +        uint32_t           dns_addr; +        /* listen server */ +        struct sockaddr_in s_saddr; +        int                s_fd; -        for (i = 0; i < AP_MAX_FLOWS; i ++) -                if (_ap_instance->flows[i].rb != NULL) -                        shm_ap_rbuff_close(_ap_instance->flows[i].rb); +        fd_set             flow_fd_s; +        /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ +        int                uf_to_fd[FD_SETSIZE]; +        struct uf          fd_to_uf[IRMD_MAX_FLOWS]; +        pthread_rwlock_t   flows_lock; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_t          sduloop; +        pthread_t          handler; +        pthread_t          sdu_reader; -        free(_ap_instance); -} +        bool               fd_set_mod; +        pthread_cond_t     fd_set_cond; +        pthread_mutex_t    fd_set_lock; +} udp_data; -/* only call this under flows_lock */ -static int port_id_to_fd(int port_id) +static void udp_data_init()  {          int i; -        for (i = 0; i < AP_MAX_FLOWS; ++i) { -                if (_ap_instance->flows[i].port_id == port_id -                    && _ap_instance->flows[i].state != FLOW_NULL) -                        return i; -        } +        for (i = 0; i < FD_SETSIZE; ++i) +                udp_data.uf_to_fd[i] = -1; -        return -1; -} - -static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) -{ -        ssize_t index; -        struct rb_entry e; - -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); - -        index = shm_rdrbuff_write_b(_ap_instance->rdrb, -                                   _ap_instance->flows[fd].api, -                                   0, -                                   0, -                                   (uint8_t *) buf, -                                   count); -        if (index < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                return -1; +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) { +                udp_data.fd_to_uf[i].skfd = -1; +                udp_data.fd_to_uf[i].udp = -1;          } -        e.index   = index; -        e.port_id = _ap_instance->flows[fd].port_id; - -        shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e); +        FD_ZERO(&udp_data.flow_fd_s); -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); - -        return 0; +        pthread_rwlock_init(&udp_data.flows_lock, NULL); +        pthread_cond_init(&udp_data.fd_set_cond, NULL); +        pthread_mutex_init(&udp_data.fd_set_lock, NULL);  } -/* - * end copy from dev.c - */ - -/* only call this under flows_lock */ -static int udp_port_to_fd(int udp_port) +static void udp_data_fini()  { -        int i; - -        struct sockaddr_in f_saddr; -        socklen_t len = sizeof(f_saddr); - -        for (i = 0; i < AP_MAX_FLOWS; ++i) { -                if (_ap_instance->flows[i].state == FLOW_NULL) -                        continue; - -                if (getsockname(i, (struct sockaddr *) &f_saddr, &len) < 0) -                        continue; - -                if (f_saddr.sin_port == udp_port) -                        return i; -        } - -        return -1; -} - -struct ipcp_udp_data { -        /* keep ipcp_data first for polymorphism */ -        struct ipcp_data ipcp_data; - -        uint32_t ip_addr; -        uint32_t dns_addr; -        /* listen server */ -        struct sockaddr_in s_saddr; -        int                s_fd; - -        /* only modify under _ap_instance->flows_lock */ -        fd_set flow_fd_s; -}; - -struct ipcp_udp_data * ipcp_udp_data_create() -{ -        struct ipcp_udp_data * udp_data; -        struct ipcp_data *     data; -        enum ipcp_type         ipcp_type; - -        udp_data = malloc(sizeof(*udp_data)); -        if (udp_data == NULL) { -                LOG_ERR("Failed to allocate."); -                return NULL; -        } - -        ipcp_type = THIS_TYPE; -        data = (struct ipcp_data *) udp_data; -        if (ipcp_data_init(data, ipcp_type) == NULL) { -                free(udp_data); -                return NULL; -        } - -        FD_ZERO(&udp_data->flow_fd_s); - -        return udp_data; +        pthread_rwlock_destroy(&udp_data.flows_lock); +        pthread_mutex_destroy(&udp_data.fd_set_lock); +        pthread_cond_destroy(&udp_data.fd_set_cond);  }  static void set_fd(int fd)  { -        pthread_mutex_lock(&_ap_instance->fd_set_lock); +        pthread_mutex_lock(&udp_data.fd_set_lock); -        _ap_instance->fd_set_mod = true; -        FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); +        udp_data.fd_set_mod = true; +        FD_SET(fd, &udp_data.flow_fd_s); -        while (_ap_instance->fd_set_mod) -                pthread_cond_wait(&_ap_instance->fd_set_cond, -                                  &_ap_instance->fd_set_lock); +        while (udp_data.fd_set_mod) +                pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); -        pthread_mutex_unlock(&_ap_instance->fd_set_lock); +        pthread_mutex_unlock(&udp_data.fd_set_lock);  }  static void clr_fd(int fd)  { -        pthread_mutex_lock(&_ap_instance->fd_set_lock); +        pthread_mutex_lock(&udp_data.fd_set_lock); -        _ap_instance->fd_set_mod = true; -        FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); +        udp_data.fd_set_mod = true; +        FD_CLR(fd, &udp_data.flow_fd_s); -        while (_ap_instance->fd_set_mod) -                pthread_cond_wait(&_ap_instance->fd_set_cond, -                                  &_ap_instance->fd_set_lock); +        while (udp_data.fd_set_mod) +                pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); -        pthread_mutex_unlock(&_ap_instance->fd_set_lock); +        pthread_mutex_unlock(&udp_data.fd_set_lock);  } - -static int send_shim_udp_msg(shim_udp_msg_t * msg, -                             uint32_t dst_ip_addr) +static int send_shim_udp_msg(shim_udp_msg_t * msg, uint32_t dst_ip_addr)  {         buffer_t           buf;         struct sockaddr_in r_saddr; @@ -340,13 +158,12 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg,         }         buf.data = malloc(SHIM_UDP_MSG_SIZE); -       if (buf.data == NULL) { +       if (buf.data == NULL)                 return -1; -       }         shim_udp_msg__pack(msg, buf.data); -       if (sendto(shim_data(_ipcp)->s_fd, +       if (sendto(udp_data.s_fd,                    buf.data,                    buf.len,                    0, @@ -409,8 +226,8 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,                               char * dst_name,                               char * src_ae_name)  { -        int  fd; -        int  port_id; +        int skfd; +        int fd;          struct sockaddr_in f_saddr;          socklen_t          f_saddr_len = sizeof(f_saddr); @@ -418,7 +235,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,          LOG_DBG("Port request arrived from UDP port %d",                   ntohs(c_saddr->sin_port)); -        if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { +        if ((skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {                  LOG_ERR("Could not create UDP socket.");                  return -1;          } @@ -426,73 +243,72 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,          memset((char *) &f_saddr, 0, sizeof(f_saddr));          f_saddr.sin_family      = AF_INET;          f_saddr.sin_addr.s_addr = local_ip; - -        /* -         * FIXME: we could have a port dedicated per registered AP -         * Not that critical for UDP, but will be for LLC -         */ -          f_saddr.sin_port        = 0; -        if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { +        if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) {                  LOG_ERR("Could not bind to socket."); -                close(fd); +                close(skfd);                  return -1;          } -        if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { +        if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) {                  LOG_ERR("Could not get address from fd.");                  return -1;          } -        /* -         * store the remote address in the file descriptor -         * this avoids having to store the sockaddr_in in -         * the flow structure -         */ - -        if (connect(fd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) { +        /* connect stores the remote address in the file descriptor */ +        if (connect(skfd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) {                  LOG_ERR("Could not connect to remote UDP client."); -                close(fd); +                close(skfd);                  return -1;          } -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock);          /* reply to IRM */ -        port_id = ipcp_flow_req_arr(getpid(), -                                    dst_name, -                                    src_ae_name); - -        if (port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_ERR("Could not get port id from IRMd"); -                close(fd); +        fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); +        if (fd < 0) { +                pthread_rwlock_unlock(&udp_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("Could not get new flow from IRMd."); +                close(skfd);                  return -1;          } -        _ap_instance->flows[fd].port_id = port_id; -        _ap_instance->flows[fd].rb      = NULL; -        _ap_instance->flows[fd].state   = FLOW_PENDING; +        pthread_rwlock_wrlock(&udp_data.flows_lock); + +        udp_data.uf_to_fd[skfd]    = fd; +        udp_data.fd_to_uf[fd].skfd = skfd; +        udp_data.fd_to_uf[fd].udp  = f_saddr.sin_port; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        LOG_DBG("Pending allocation request, port_id %d, UDP port (%d, %d).", -                port_id, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); +        LOG_DBG("Pending allocation request, fd %d, UDP port (%d, %d).", +                fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port));          return 0;  } +/* returns the n flow descriptor */ +static int udp_port_to_fd(int udp_port) +{ +        int i; + +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) +                if (udp_data.fd_to_uf[i].udp == udp_port) +                        return i; + +        return -1; +} +  static int ipcp_udp_port_alloc_reply(int src_udp_port,                                       int dst_udp_port,                                       int response)  { -        int  fd        = -1; -        int  ret       =  0; -        int  port_id   = -1; +        int fd   = -1; +        int ret  =  0; +        int skfd = -1;          struct sockaddr_in t_saddr;          socklen_t          t_saddr_len = sizeof(t_saddr); @@ -500,117 +316,82 @@ static int ipcp_udp_port_alloc_reply(int src_udp_port,          LOG_DBG("Received reply for flow on udp port %d.",                  ntohs(dst_udp_port)); -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_rdlock(&udp_data.flows_lock);          fd = udp_port_to_fd(dst_udp_port); -        if (fd == -1) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBG("Unknown flow on UDP port %d.", ntohs(dst_udp_port)); -                return -1; /* -EUNKNOWNFLOW */ -        } +        skfd = udp_data.fd_to_uf[fd].skfd; -        if (_ap_instance->flows[fd].state != FLOW_PENDING) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBG("Flow on UDP port %d not pending.", -                         ntohs(dst_udp_port)); -                return -1; /* -EFLOWNOTPENDING */ -        } - -        port_id = _ap_instance->flows[fd].port_id; - -        if (response) { -                _ap_instance->flows[fd].port_id = -1; -                _ap_instance->flows[fd].rb      = NULL; -                shm_ap_rbuff_close(_ap_instance->flows[fd].rb); -                _ap_instance->flows[fd].state   = FLOW_NULL; -        } else { -                /* get the original address with the LISTEN PORT */ -                if (getpeername(fd, -                                (struct sockaddr *) &t_saddr, -                                &t_saddr_len) < 0) { -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); -                        LOG_DBG("Flow with port_id %d has no peer.", port_id); -                        return -1; -                } +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -                /* connect to the flow udp port */ -                t_saddr.sin_port = src_udp_port; +        /* get the original address with the LISTEN PORT */ +        if (getpeername(skfd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { +                LOG_DBG("Flow with fd %d has no peer.", fd); +                return -1; +        } -                if (connect(fd, -                            (struct sockaddr *) &t_saddr, -                            sizeof(t_saddr)) < 0) { -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ipcp->state_lock); -                        close(fd); -                        return -1; -                } +        /* connect to the flow udp port */ +        t_saddr.sin_port = src_udp_port; -                _ap_instance->flows[fd].state   = FLOW_ALLOCATED; +        if (connect(skfd, (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { +                close(skfd); +                return -1;          } -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_rdlock(&udp_data.flows_lock); +        set_fd(skfd); -        if ((ret = ipcp_flow_alloc_reply(getpid(), -                                         port_id, -                                         response)) < 0) { -                return -1; /* -EPIPE */ -        } +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); + +        if (ipcp_flow_alloc_reply(fd, response) < 0) +                return -1; -        LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).", +        LOG_DBG("Flow allocation completed, UDP ports: (%d, %d).",                   ntohs(dst_udp_port), ntohs(src_udp_port));          return ret; -  }  static int ipcp_udp_flow_dealloc_req(int udp_port)  { -        int fd      = -1; -        int port_id = -1; - -        struct shm_ap_rbuff * rb; +        int skfd = -1; +        int fd   = -1; -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(&udp_data.flows_lock);          fd = udp_port_to_fd(udp_port);          if (fd < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(&udp_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_DBG("Could not find flow on UDP port %d.",                           ntohs(udp_port));                  return 0;          } -        clr_fd(fd); - -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        skfd = udp_data.fd_to_uf[fd].skfd; -        _ap_instance->flows[fd].state   = FLOW_NULL; -        port_id = _ap_instance->flows[fd].port_id; -        _ap_instance->flows[fd].port_id = -1; -        rb = _ap_instance->flows[fd].rb; -        _ap_instance->flows[fd].rb      = NULL; +        udp_data.uf_to_fd[skfd]    = -1; +        udp_data.fd_to_uf[fd].udp  = -1; +        udp_data.fd_to_uf[fd].skfd = -1; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_rdlock(&udp_data.flows_lock); -        if (rb != NULL) -                shm_ap_rbuff_close(rb); +        clr_fd(skfd); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        irm_flow_dealloc(port_id); +        flow_dealloc(fd); -        close(fd); +        close(skfd); -        LOG_DBG("Flow with port_id %d deallocated.", port_id); +        LOG_DBG("Flow with fd %d deallocated.", fd);          return 0;  } @@ -619,39 +400,28 @@ static void * ipcp_udp_listener()  {          uint8_t buf[SHIM_UDP_MSG_SIZE];          int  n = 0; -          struct sockaddr_in c_saddr; +        int sfd = udp_data.s_fd;          while (true) { -                int sfd = 0;                  shim_udp_msg_t * msg = NULL; -                pthread_rwlock_rdlock(&_ipcp->state_lock); - -                sfd = shim_data(_ipcp)->s_fd; - -                pthread_rwlock_unlock(&_ipcp->state_lock); -                  memset(&buf, 0, SHIM_UDP_MSG_SIZE);                  n = sizeof(c_saddr);                  n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0,                               (struct sockaddr *) &c_saddr, (unsigned *) &n); - -                if (n < 0) { +                if (n < 0)                          continue; -                }                  /* flow alloc request from other host */                  if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr,                                    sizeof(c_saddr.sin_addr.s_addr), AF_INET) -                    == NULL) { +                    == NULL)                          continue; -                }                  msg = shim_udp_msg__unpack(NULL, n, buf); -                if (msg == NULL) { +                if (msg == NULL)                          continue; -                }                  switch (msg->code) {                  case SHIM_UDP_MSG_CODE__FLOW_REQ: @@ -685,103 +455,80 @@ static void * ipcp_udp_listener()  static void * ipcp_udp_sdu_reader()  {          int n; +        int skfd;          int fd; +        /* FIXME: avoid this copy */          char buf[SHIM_UDP_MAX_SDU_SIZE];          struct sockaddr_in r_saddr;          fd_set read_fds;          int flags; +        struct timeval tv = {0, FD_UPDATE_TIMEOUT};          while (true) { -                struct timeval tv = {0, FD_UPDATE_TIMEOUT}; - -                pthread_rwlock_rdlock(&_ipcp->state_lock); -                pthread_rwlock_rdlock(&_ap_instance->flows_lock); -                pthread_mutex_lock(&_ap_instance->fd_set_lock); +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                pthread_rwlock_rdlock(&udp_data.flows_lock); +                pthread_mutex_lock(&udp_data.fd_set_lock); -                read_fds = shim_data(_ipcp)->flow_fd_s; -                _ap_instance->fd_set_mod = false; -                pthread_cond_broadcast(&_ap_instance->fd_set_cond); +                read_fds = udp_data.flow_fd_s; +                udp_data.fd_set_mod = false; +                pthread_cond_broadcast(&udp_data.fd_set_cond); -                pthread_mutex_unlock(&_ap_instance->fd_set_lock); -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_mutex_unlock(&udp_data.fd_set_lock); +                pthread_rwlock_unlock(&udp_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock); -                if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) { +                if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)                          continue; -                } -                for (fd = 0; fd < FD_SETSIZE; ++fd) { -                        if (!FD_ISSET(fd, &read_fds)) +                for (skfd = 0; skfd < FD_SETSIZE; ++skfd) { +                        if (!FD_ISSET(skfd, &read_fds))                                  continue; -                        flags = fcntl(fd, F_GETFL, 0); -                        fcntl(fd, F_SETFL, flags | O_NONBLOCK); - +                        flags = fcntl(skfd, F_GETFL, 0); +                        fcntl(skfd, F_SETFL, flags | O_NONBLOCK); +                        fd = udp_data.uf_to_fd[skfd];                          n = sizeof(r_saddr); -                        if ((n = recvfrom(fd, -                                          buf, +                        if ((n = recvfrom(skfd, +                                          &buf,                                            SHIM_UDP_MAX_SDU_SIZE,                                            0,                                            (struct sockaddr *) &r_saddr,                                            (unsigned *) &n)) <= 0)                                  continue; -                        /* send the sdu to the correct port_id */ -                        ipcp_udp_flow_write(fd, buf, n); +                        /* send the sdu to the correct fd */ +                        flow_write(fd, buf, n);                  }          }          return (void *) 0;  } -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */  static void * ipcp_udp_sdu_loop(void * o)  {          while (true) { -                struct rb_entry * e;                  int fd; -                int len = 0; -                char * buf; +                struct shm_du_buff * sdb; -                e = shm_ap_rbuff_read(_ap_instance->rb); -                if (e == NULL) { +                fd = ipcp_flow_read(&sdb); +                if (fd < 0)                          continue; -                } -                pthread_rwlock_rdlock(&_ipcp->state_lock); +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                pthread_rwlock_rdlock(&udp_data.flows_lock); -                len = shm_rdrbuff_read((uint8_t **) &buf, -                                      _ap_instance->rdrb, -                                      e->index); -                if (len <= 0) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); -                        free(e); -                        continue; -                } - -                pthread_rwlock_rdlock(&_ap_instance->flows_lock); +                fd = udp_data.fd_to_uf[fd].skfd; -                fd = port_id_to_fd(e->port_id); - -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); - -                if (fd == -1) { -                        free(e); -                        continue; -                } +                pthread_rwlock_unlock(&udp_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock); -                if (send(fd, buf, len, 0) < 0) +                if (send(fd, +                         shm_du_buff_head(sdb), +                         shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), +                         0) < 0)                          LOG_ERR("Failed to send SDU."); -                pthread_rwlock_rdlock(&_ipcp->state_lock); - -                if (_ap_instance->rdrb != NULL) -                        shm_rdrbuff_remove(_ap_instance->rdrb, e->index); - -                pthread_rwlock_unlock(&_ipcp->state_lock); - -                free(e); +                ipcp_flow_del(sdb);          }          return (void *) 1; @@ -789,23 +536,16 @@ static void * ipcp_udp_sdu_loop(void * o)  void ipcp_sig_handler(int sig, siginfo_t * info, void * c)  { -        sigset_t  sigset; -        sigemptyset(&sigset); -        sigaddset(&sigset, SIGINT); -          switch(sig) {          case SIGINT:          case SIGTERM:          case SIGHUP:                  if (info->si_pid == irmd_api) { -                        LOG_DBG("IPCP %d terminating by order of %d. Bye.", -                                getpid(), info->si_pid); - -                        pthread_rwlock_wrlock(&_ipcp->state_lock); +                        pthread_rwlock_wrlock(&ipcpi.state_lock); -                        ipcp_set_state(_ipcp, IPCP_SHUTDOWN); +                        ipcp_set_state(IPCP_SHUTDOWN); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                  }          default:                  return; @@ -865,54 +605,52 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)                  LOG_WARN("Failed to set SO_REUSEADDR.");          memset((char *) &s_saddr, 0, sizeof(s_saddr)); -        shim_data(_ipcp)->s_saddr.sin_family      = AF_INET; -        shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr; -        shim_data(_ipcp)->s_saddr.sin_port        = LISTEN_PORT; +        udp_data.s_saddr.sin_family      = AF_INET; +        udp_data.s_saddr.sin_addr.s_addr = conf->ip_addr; +        udp_data.s_saddr.sin_port        = LISTEN_PORT;          if (bind(fd, -                 (struct sockaddr *) &shim_data(_ipcp)->s_saddr, -                 sizeof(shim_data(_ipcp)->s_saddr)) < 0) { +                 (struct sockaddr *) &udp_data.s_saddr, +                 sizeof(udp_data.s_saddr)) < 0) {                  LOG_ERR("Couldn't bind to %s.", ipstr);                  close(fd);                  return -1;          } -        pthread_rwlock_wrlock(&_ipcp->state_lock); +        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_INIT) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_get_state() != IPCP_INIT) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("IPCP in wrong state.");                  close(fd);                  return -1;          } -        shim_data(_ipcp)->s_fd     = fd; -        shim_data(_ipcp)->ip_addr  = conf->ip_addr; -        shim_data(_ipcp)->dns_addr = conf->dns_addr; +        udp_data.s_fd     = fd; +        udp_data.ip_addr  = conf->ip_addr; +        udp_data.dns_addr = conf->dns_addr; -        FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); +        FD_CLR(udp_data.s_fd, &udp_data.flow_fd_s); -        ipcp_set_state(_ipcp, IPCP_ENROLLED); +        ipcp_set_state(IPCP_ENROLLED); -        pthread_create(&_ap_instance->handler, +        pthread_create(&udp_data.handler,                         NULL,                         ipcp_udp_listener,                         NULL); -        pthread_create(&_ap_instance->sdu_reader, +        pthread_create(&udp_data.sdu_reader,                         NULL,                         ipcp_udp_sdu_reader,                         NULL); -        pthread_create(&_ap_instance->sduloop, +        pthread_create(&udp_data.sduloop,                         NULL,                         ipcp_udp_sdu_loop,                         NULL); -        pthread_rwlock_unlock(&_ipcp->state_lock); - -        LOG_DBG("Bootstrapped shim IPCP over UDP with api %d.", -                getpid()); +        pthread_rwlock_unlock(&ipcpi.state_lock); +        LOG_DBG("Bootstrapped shim IPCP over UDP with api %d.", getpid());          LOG_DBG("Bound to IP address %s.", ipstr);          LOG_DBG("DNS server address is %s.", dnsstr); @@ -1059,10 +797,10 @@ static int ipcp_udp_name_reg(char * name)                  return -1;          } -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_data_add_reg_entry(_ipcp->data, name)) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_data_add_reg_entry(ipcpi.data, name)) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to add %s to local registry.", name);                  return -1;          } @@ -1070,12 +808,12 @@ static int ipcp_udp_name_reg(char * name)  #ifdef CONFIG_OUROBOROS_ENABLE_DNS          /* register application with DNS server */ -        dns_addr = shim_data(_ipcp)->dns_addr; +        dns_addr = udp_data.dns_addr; -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          if (dns_addr != 0) { -                ip_addr = shim_data(_ipcp)->ip_addr; +                ip_addr = udp_data.ip_addr;                  if (inet_ntop(AF_INET, &ip_addr,                                ipstr, INET_ADDRSTRLEN) == NULL) { @@ -1091,14 +829,14 @@ static int ipcp_udp_name_reg(char * name)                          dnsstr, name, DNS_TTL, ipstr);                  if (ddns_send(cmd)) { -                        pthread_rwlock_rdlock(&_ipcp->state_lock); -                        ipcp_data_del_reg_entry(_ipcp->data, name); -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                        pthread_rwlock_rdlock(&ipcpi.state_lock); +                        ipcp_data_del_reg_entry(ipcpi.data, name); +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          return -1;                  }          }  #else -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);  #endif          LOG_DBG("Registered %s.", name); @@ -1122,11 +860,11 @@ static int ipcp_udp_name_unreg(char * name)  #ifdef CONFIG_OUROBOROS_ENABLE_DNS          /* unregister application with DNS server */ -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        dns_addr = shim_data(_ipcp)->dns_addr; +        dns_addr = udp_data.dns_addr; -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          if (dns_addr != 0) {                  if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) @@ -1140,17 +878,16 @@ static int ipcp_udp_name_unreg(char * name)          }  #endif -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        ipcp_data_del_reg_entry(_ipcp->data, name); +        ipcp_data_del_reg_entry(ipcpi.data, name); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } -static int ipcp_udp_flow_alloc(pid_t         n_api, -                               int           port_id, +static int ipcp_udp_flow_alloc(int           fd,                                 char *        dst_name,                                 char *        src_ae_name,                                 enum qos_cube qos) @@ -1158,15 +895,13 @@ static int ipcp_udp_flow_alloc(pid_t         n_api,          struct sockaddr_in r_saddr; /* server address */          struct sockaddr_in f_saddr; /* flow */          socklen_t          f_saddr_len = sizeof(f_saddr); -        int                fd; +        int                skfd;          struct hostent *   h;          uint32_t           ip_addr = 0;  #ifdef CONFIG_OUROBOROS_ENABLE_DNS          uint32_t           dns_addr = 0;  #endif -        struct shm_ap_rbuff * rb; - -        LOG_INFO("Allocating flow to %s.", dst_name); +        LOG_DBG("Allocating flow to %s.", dst_name);          if (dst_name == NULL || src_ae_name == NULL)                  return -1; @@ -1179,11 +914,7 @@ static int ipcp_udp_flow_alloc(pid_t         n_api,          if (qos != QOS_CUBE_BE)                  LOG_DBG("QoS requested. UDP/IP can't do that."); -        rb = shm_ap_rbuff_open_s(n_api); -        if (rb == NULL) -                return -1; /* -ENORBUFF */ - -        fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +        skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);          /* this socket is for the flow */          memset((char *) &f_saddr, 0, sizeof(f_saddr)); @@ -1191,31 +922,31 @@ static int ipcp_udp_flow_alloc(pid_t         n_api,          f_saddr.sin_addr.s_addr = local_ip;          f_saddr.sin_port        = 0; -        if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { -                close(fd); +        if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { +                close(skfd);                  return -1;          } -        if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { +        if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) {                  LOG_ERR("Could not get address from fd."); -                close(fd); +                close(skfd);                  return -1;          } -        pthread_rwlock_rdlock(&_ipcp->state_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); -        if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (ipcp_get_state() != IPCP_ENROLLED) { +                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_DBG("Won't allocate flow with non-enrolled IPCP."); -                close(fd); +                close(skfd);                  return -1; /* -ENOTENROLLED */          }  #ifdef CONFIG_OUROBOROS_ENABLE_DNS -        dns_addr = shim_data(_ipcp)->dns_addr; +        dns_addr = udp_data.dns_addr;          if (dns_addr != 0) { -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  ip_addr = ddns_resolve(dst_name, dns_addr);                  if (ip_addr == 0) { @@ -1224,11 +955,11 @@ static int ipcp_udp_flow_alloc(pid_t         n_api,                          return -1;                  } -                pthread_rwlock_rdlock(&_ipcp->state_lock); -                if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { -                        pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                if (ipcp_get_state() != IPCP_ENROLLED) { +                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_DBG("Won't allocate flow with non-enrolled IPCP."); -                        close(fd); +                        close(skfd);                          return -1; /* -ENOTENROLLED */                  }          } else { @@ -1236,7 +967,7 @@ static int ipcp_udp_flow_alloc(pid_t         n_api,                  h = gethostbyname(dst_name);                  if (h == NULL) {                          LOG_DBG("Could not resolve %s.", dst_name); -                        close(fd); +                        close(skfd);                          return -1;                  } @@ -1251,60 +982,46 @@ static int ipcp_udp_flow_alloc(pid_t         n_api,          r_saddr.sin_addr.s_addr = ip_addr;          r_saddr.sin_port        = LISTEN_PORT; -        if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { -                close(fd); +        if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { +                close(skfd);                  return -1;          } -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); - -        _ap_instance->flows[fd].port_id = port_id; -        _ap_instance->flows[fd].state   = FLOW_PENDING; -        _ap_instance->flows[fd].rb      = rb; - -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_wrlock(&udp_data.flows_lock); -        set_fd(fd); +        udp_data.fd_to_uf[fd].udp  = f_saddr.sin_port; +        udp_data.fd_to_uf[fd].skfd = skfd; +        udp_data.uf_to_fd[skfd]    = fd; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); - -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          if (ipcp_udp_port_alloc(ip_addr,                                  f_saddr.sin_port,                                  dst_name,                                  src_ae_name) < 0) { -                pthread_rwlock_rdlock(&_ipcp->state_lock); -                pthread_rwlock_rdlock(&_ap_instance->flows_lock); - -                clr_fd(fd); - -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_wrlock(&_ap_instance->flows_lock); +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                pthread_rwlock_wrlock(&udp_data.flows_lock); -                _ap_instance->flows[fd].port_id = -1; -                _ap_instance->flows[fd].state   = FLOW_NULL; -                shm_ap_rbuff_close(_ap_instance->flows[fd].rb); -                 _ap_instance->flows[fd].rb     = NULL; +                udp_data.fd_to_uf[fd].udp  = -1; +                udp_data.fd_to_uf[fd].skfd = -1; +                udp_data.uf_to_fd[skfd]    = -1; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                close(fd); +                pthread_rwlock_unlock(&udp_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                close(skfd);                  return -1;          } -        LOG_DBG("Flow pending on port_id %d.", port_id); +        LOG_DBG("Flow pending on fd %d, UDP port %d.", +                fd, ntohs(f_saddr.sin_port));          return fd;  } -static int ipcp_udp_flow_alloc_resp(pid_t n_api, -                                    int   port_id, -                                    int   response) +static int ipcp_udp_flow_alloc_resp(int fd, int response)  { -        struct shm_ap_rbuff * rb; -        int fd = -1; +        int skfd = -1;          struct sockaddr_in f_saddr;          struct sockaddr_in r_saddr;          socklen_t len = sizeof(r_saddr); @@ -1312,148 +1029,95 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_api,          if (response)                  return 0; -        pthread_rwlock_rdlock(&_ipcp->state_lock); - -        /* awaken pending flow */ - -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(&udp_data.flows_lock); -        fd = port_id_to_fd(port_id); -        if (fd < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBG("Could not find flow with port_id %d.", port_id); -                return -1; -        } +        skfd = udp_data.fd_to_uf[fd].skfd; -        if (_ap_instance->flows[fd].state != FLOW_PENDING) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBG("Flow was not pending."); +        if (getsockname(skfd, (struct sockaddr *) &f_saddr, &len) < 0) { +                LOG_DBG("Socket with fd %d has no address.", skfd);                  return -1;          } -        rb = shm_ap_rbuff_open_s(n_api); -        if (rb == NULL) { -                LOG_ERR("Could not open N + 1 ringbuffer."); -                _ap_instance->flows[fd].state   = FLOW_NULL; -                _ap_instance->flows[fd].port_id = -1; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +        if (getpeername(skfd, (struct sockaddr *) &r_saddr, &len) < 0) { +                LOG_DBG("Socket with fd %d has no peer.", skfd);                  return -1;          } -        if (getsockname(fd, (struct sockaddr *) &f_saddr, &len) < 0) { -                LOG_DBG("Flow with port_id %d has no socket.", port_id); -                return -1; -        } +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_rdlock(&udp_data.flows_lock); -        if (getpeername(fd, (struct sockaddr *) &r_saddr, &len) < 0) { -                LOG_DBG("Flow with port_id %d has no peer.", port_id); -                return -1; -        } +        set_fd(skfd); -        _ap_instance->flows[fd].state = FLOW_ALLOCATED; -        _ap_instance->flows[fd].rb    = rb; - -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); - -        set_fd(fd); - -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ipcp->state_lock); +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock);          if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr,                                       f_saddr.sin_port,                                       r_saddr.sin_port,                                       response) < 0) { -                pthread_rwlock_rdlock(&_ipcp->state_lock); -                pthread_rwlock_rdlock(&_ap_instance->flows_lock); - -                clr_fd(fd); - -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_wrlock(&_ap_instance->flows_lock); - -                _ap_instance->flows[fd].state = FLOW_NULL; -                shm_ap_rbuff_close(_ap_instance->flows[fd].rb); -                _ap_instance->flows[fd].rb    = NULL; - -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                pthread_rwlock_rdlock(&udp_data.flows_lock); +                clr_fd(skfd); +                pthread_rwlock_unlock(&udp_data.flows_lock); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); +        LOG_DBG("Accepted flow, fd %d on UDP port %d.", +                fd, ntohs(f_saddr.sin_port));          return 0;  } -static int ipcp_udp_flow_dealloc(int port_id) +static int ipcp_udp_flow_dealloc(int fd)  { -        int fd = -1; +        int skfd = -1;          int remote_udp = -1; -        struct shm_ap_rbuff * rb;          struct sockaddr_in    r_saddr;          socklen_t             r_saddr_len = sizeof(r_saddr); -        pthread_rwlock_rdlock(&_ipcp->state_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(&udp_data.flows_lock); -        fd = port_id_to_fd(port_id); -        if (fd < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBG("Could not find flow with port_id %d.", port_id); -                return 0; -        } +        skfd = udp_data.fd_to_uf[fd].skfd; -        clr_fd(fd); +        udp_data.uf_to_fd[skfd]    = -1; +        udp_data.fd_to_uf[fd].udp  = -1; +        udp_data.fd_to_uf[fd].skfd = -1; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_rdlock(&udp_data.flows_lock); -        _ap_instance->flows[fd].state   = FLOW_NULL; -        _ap_instance->flows[fd].port_id = -1; -        rb = _ap_instance->flows[fd].rb; -        _ap_instance->flows[fd].rb      = NULL; +        clr_fd(skfd); -        pthread_rwlock_unlock(&_ap_instance->flows_lock); +        pthread_rwlock_unlock(&udp_data.flows_lock); +        pthread_rwlock_unlock(&ipcpi.state_lock); -        if (rb != NULL) -                shm_ap_rbuff_close(rb); - -        if (getpeername(fd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) { -                pthread_rwlock_unlock(&_ipcp->state_lock); -                LOG_DBG("Flow with port_id %d has no peer.", port_id); -                close(fd); +        if (getpeername(skfd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) { +                LOG_DBG("Socket with fd %d has no peer.", skfd); +                close(skfd);                  return 0;          }          remote_udp       = r_saddr.sin_port;          r_saddr.sin_port = LISTEN_PORT; -        if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { -                pthread_rwlock_unlock(&_ipcp->state_lock); -                close(fd); +        if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { +                close(skfd);                  return 0 ;          } -        if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr, -                                  remote_udp) < 0) { +        if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr, remote_udp) < 0) {                  LOG_DBG("Could not notify remote."); -                pthread_rwlock_unlock(&_ipcp->state_lock); -                close(fd); +                close(skfd);                  return 0;          } -        pthread_rwlock_unlock(&_ipcp->state_lock); - -        close(fd); +        close(skfd); -        LOG_DBG("Flow with port_id %d deallocated.", port_id); +        LOG_DBG("Flow with fd %d deallocated.", fd);          return 0;  } @@ -1468,31 +1132,6 @@ static struct ipcp_ops udp_ops = {          .ipcp_flow_dealloc    = ipcp_udp_flow_dealloc  }; -static struct ipcp * ipcp_udp_create() -{ -        struct ipcp * i; -        struct ipcp_udp_data * data; - -        i = ipcp_instance_create(); -        if (i == NULL) -                return NULL; - -        data = ipcp_udp_data_create(); -        if (data == NULL) { -                free(i); -                return NULL; -        } - -        i->data = (struct ipcp_data *) data; -        i->ops  = &udp_ops; - -        i->state = IPCP_INIT; - -        return i; -} - -#ifndef MAKE_CHECK -  int main(int argc, char * argv[])  {          struct sigaction sig_act; @@ -1508,7 +1147,9 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        if (shim_ap_init() < 0) { +        udp_data_init(); + +        if (ap_init(NULL) < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } @@ -1528,17 +1169,13 @@ int main(int argc, char * argv[])          sigaction(SIGHUP,  &sig_act, NULL);          sigaction(SIGPIPE, &sig_act, NULL); -        _ipcp = ipcp_udp_create(); -        if (_ipcp == NULL) { -                LOG_ERR("Failed to create IPCP."); +        pthread_sigmask(SIG_BLOCK, &sigset, NULL); + +        if (ipcp_init(THIS_TYPE, &udp_ops) < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } -        pthread_sigmask(SIG_BLOCK, &sigset, NULL); - -        pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); -          pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);          if (ipcp_create_r(getpid())) { @@ -1547,24 +1184,21 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        pthread_join(_ap_instance->mainloop, NULL); +        ipcp_fini(); -        pthread_cancel(_ap_instance->handler); -        pthread_cancel(_ap_instance->sdu_reader); -        pthread_cancel(_ap_instance->sduloop); +        pthread_cancel(udp_data.handler); +        pthread_cancel(udp_data.sdu_reader); +        pthread_cancel(udp_data.sduloop); -        pthread_join(_ap_instance->sduloop, NULL); -        pthread_join(_ap_instance->handler, NULL); -        pthread_join(_ap_instance->sdu_reader, NULL); +        pthread_join(udp_data.sduloop, NULL); +        pthread_join(udp_data.handler, NULL); +        pthread_join(udp_data.sdu_reader, NULL); -        shim_ap_fini(); +        ap_fini(); -        ipcp_data_destroy(_ipcp->data); -        free(_ipcp); +        udp_data_fini();          close_logfile();          exit(EXIT_SUCCESS);  } - -#endif /* MAKE_CHECK */ diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt index 05919326..16b53414 100644 --- a/src/irmd/CMakeLists.txt +++ b/src/irmd/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCE_FILES          # Add source files here          api_table.c          apn_table.c +        ipcp.c          irm_flow.c          main.c          registry.c diff --git a/src/lib/ipcp.c b/src/irmd/ipcp.c index 01741121..f79e6caf 100644 --- a/src/lib/ipcp.c +++ b/src/irmd/ipcp.c @@ -20,16 +20,17 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#define OUROBOROS_PREFIX "lib-ipcp" -  #include <ouroboros/config.h>  #include <ouroboros/errno.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/common.h> -#include <ouroboros/logs.h>  #include <ouroboros/utils.h>  #include <ouroboros/sockets.h> +#define OUROBOROS_PREFIX "irmd/ipcp" + +#include <ouroboros/logs.h> + +#include "ipcp.h" +  #include <stdlib.h>  #include <string.h>  #include <signal.h> @@ -42,11 +43,10 @@  static void close_ptr(void * o)  { -        close(*((int *) o)); +        close(*(int *) o);  } -static ipcp_msg_t * send_recv_ipcp_msg(pid_t api, -                                       ipcp_msg_t * msg) +ipcp_msg_t * send_recv_ipcp_msg(pid_t api, ipcp_msg_t * msg)  {         int sockfd = 0;         buffer_t buf; @@ -177,31 +177,6 @@ pid_t ipcp_create(enum ipcp_type ipcp_type)          exit(EXIT_FAILURE);  } -int ipcp_create_r(pid_t api) -{ -        irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int ret = -1; - -        msg.code         = IRM_MSG_CODE__IPCP_CREATE_R; -        msg.has_api      = true; -        msg.api          = api; - -        recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) -                return -1; - -        if (recv_msg->has_result == false) { -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        ret = recv_msg->result; -        irm_msg__free_unpacked(recv_msg, NULL); - -        return ret; -} -  int ipcp_destroy(pid_t api)  {          int status; @@ -399,68 +374,6 @@ int ipcp_flow_alloc_resp(pid_t api,          return ret;  } -int ipcp_flow_req_arr(pid_t  api, -                      char * dst_name, -                      char * src_ae_name) -{ -        irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int port_id = -1; - -        if (dst_name == NULL || src_ae_name == NULL) -                return -EINVAL; - -        msg.code          = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; -        msg.has_api       = true; -        msg.api           = api; -        msg.dst_name      = dst_name; -        msg.ae_name       = src_ae_name; - -        recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) -                return -1; - -        if (!recv_msg->has_port_id) { -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        port_id = recv_msg->port_id; -        irm_msg__free_unpacked(recv_msg, NULL); - -        return port_id; -} - -int ipcp_flow_alloc_reply(pid_t api, -                          int   port_id, -                          int   response) -{ -        irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int ret = -1; - -        msg.code         = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; -        msg.port_id      = port_id; -        msg.has_port_id  = true; -        msg.response     = response; -        msg.has_response = true; - -        recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) -                return -1; - -        if (recv_msg->has_result == false) { -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        ret = recv_msg->result; -        irm_msg__free_unpacked(recv_msg, NULL); - -        return ret; -} - -  int ipcp_flow_dealloc(pid_t api,                        int   port_id)  { @@ -487,28 +400,3 @@ int ipcp_flow_dealloc(pid_t api,          return ret;  } - -int irm_flow_dealloc(int port_id) -{ -        irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int ret = -1; - -        msg.code        = IRM_MSG_CODE__IPCP_FLOW_DEALLOC; -        msg.has_port_id = true; -        msg.port_id     = port_id; - -        recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) -                return 0; - -        if (recv_msg->has_result == false) { -                irm_msg__free_unpacked(recv_msg, NULL); -                return 0; -        } - -        ret = recv_msg->result; -        irm_msg__free_unpacked(recv_msg, NULL); - -        return ret; -} diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h new file mode 100644 index 00000000..930695fa --- /dev/null +++ b/src/irmd/ipcp.h @@ -0,0 +1,62 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * The API for the IRM to instruct IPCPs + * + *    Sander Vrijders <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/irm_config.h> +#include <ouroboros/sockets.h> +#include <ouroboros/shared.h> + +#include <sys/types.h> + +#ifndef OUROBOROS_IPCP_H +#define OUROBOROS_IPCP_H + +/* Returns the process id */ +pid_t ipcp_create(enum ipcp_type ipcp_type); + +int   ipcp_destroy(pid_t api); + +int   ipcp_enroll(pid_t  api, +                  char * dif_name); + +int   ipcp_bootstrap(pid_t              api, +                     dif_config_msg_t * conf); + +int   ipcp_name_reg(pid_t  api, +                    char * name); +int   ipcp_name_unreg(pid_t  api, +                      char * name); + +int   ipcp_flow_alloc(pid_t         api, +                      int           port_id, +                      pid_t         n_api, +                      char *        dst_name, +                      char *        src_ae_name, +                      enum qos_cube qos); +int   ipcp_flow_alloc_resp(pid_t api, +                           int   port_id, +                           pid_t n_api, +                           int   response); + +int   ipcp_flow_dealloc(pid_t api, +                        int   port_id); + +#endif /* OUROBOROS_IPCP_H */ diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index d9fe3fb3..b99c6f97 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -58,6 +58,11 @@ void irm_flow_destroy(struct irm_flow * f)  {          pthread_mutex_lock(&f->state_lock); +        if (f->state == FLOW_DESTROY) { +                pthread_mutex_unlock(&f->state_lock); +                return; +        } +          if (f->state == FLOW_PENDING)                  f->state = FLOW_DESTROY;          else @@ -75,3 +80,45 @@ void irm_flow_destroy(struct irm_flow * f)          free(f);  } + +enum flow_state irm_flow_get_state(struct irm_flow * f) +{ +        enum flow_state state; + +        pthread_mutex_lock(&f->state_lock); + +        state = f->state; + +        pthread_mutex_unlock(&f->state_lock); + +        return state; +} + +void irm_flow_set_state(struct irm_flow * f, enum flow_state state) +{ +        pthread_mutex_lock(&f->state_lock); + +        f->state = state; +        pthread_cond_broadcast(&f->state_cond); + +        pthread_mutex_unlock(&f->state_lock); +} + +enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state) +{ +        pthread_mutex_lock(&f->state_lock); + +        while (!(f->state == state || f->state == FLOW_DESTROY)) +                pthread_cond_wait(&f->state_cond, &f->state_lock); + +        if (state == FLOW_DESTROY) { +                f->state = FLOW_NULL; +                pthread_cond_broadcast(&f->state_cond); +        } + +        state = f->state; + +        pthread_mutex_unlock(&f->state_lock); + +        return state; +} diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index b7e5a1be..db6598bf 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -24,12 +24,18 @@  #define OUROBOROS_IRMD_IRM_FLOW_H  #include <ouroboros/list.h> -#include <ouroboros/shared.h>  #include <sys/types.h>  #include <pthread.h>  #include <time.h> +enum flow_state { +        FLOW_NULL = 0, +        FLOW_PENDING, +        FLOW_ALLOCATED, +        FLOW_DESTROY +}; +  struct irm_flow {          struct list_head next; @@ -46,6 +52,16 @@ struct irm_flow {  };  struct irm_flow * irm_flow_create(); +  void              irm_flow_destroy(struct irm_flow * f); +enum flow_state   irm_flow_get_state(struct irm_flow * f); + + +void              irm_flow_set_state(struct irm_flow * f, +                                     enum flow_state   state); + +enum flow_state   irm_flow_wait_state(struct irm_flow * f, +                                      enum flow_state   state); +  #endif /* OUROBOROS_IRMD_IRM_FLOW_H */ diff --git a/src/irmd/main.c b/src/irmd/main.c index cc9160bf..523741ef 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -21,14 +21,9 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#define OUROBOROS_PREFIX "irmd" -  #include <ouroboros/config.h>  #include <ouroboros/errno.h> -#include <ouroboros/logs.h>  #include <ouroboros/sockets.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/nsm.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h>  #include <ouroboros/irm_config.h> @@ -36,14 +31,19 @@  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/bitmap.h> -#include <ouroboros/flow.h>  #include <ouroboros/qos.h>  #include <ouroboros/time_utils.h> +#define OUROBOROS_PREFIX "irmd" + +#include <ouroboros/logs.h> + +  #include "utils.h"  #include "registry.h"  #include "irm_flow.h"  #include "api_table.h" +#include "ipcp.h"  #include <sys/socket.h>  #include <sys/un.h> @@ -60,10 +60,12 @@  struct ipcp_entry {          struct list_head next; +          char *           name;          pid_t            api;          enum ipcp_type   type;          char *           dif_name; +          pthread_cond_t   init_cond;          pthread_mutex_t  init_lock;          bool             init; @@ -100,7 +102,7 @@ struct irm {          pthread_t            irm_sanitize;          pthread_t            shm_sanitize; -} * irmd = NULL; +} * irmd;  static struct irm_flow * get_irm_flow(int port_id)  { @@ -108,7 +110,6 @@ static struct irm_flow * get_irm_flow(int port_id)          list_for_each(pos, &irmd->irm_flows) {                  struct irm_flow * e = list_entry(pos, struct irm_flow, next); -                  if (e->port_id == port_id)                          return e;          } @@ -122,7 +123,6 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api)          list_for_each(pos, &irmd->irm_flows) {                  struct irm_flow * e = list_entry(pos, struct irm_flow, next); -                  if (e->n_api == n_api)                          return e;          } @@ -965,8 +965,7 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name)                  return NULL;          } -        LOG_INFO("New instance (%d) of %s added.", api, e->apn); - +        LOG_DBG("New instance (%d) of %s added.", api, e->apn);          LOG_DBG("This instance accepts flows for:");          list_for_each(p, &e->names) {                  struct str_el * s = list_entry(p, struct str_el, next); @@ -1053,8 +1052,8 @@ static int flow_alloc_resp(pid_t n_api,          struct api_entry * e  = NULL;          int ret = -1; -        pid_t f_n_1_api; -        pid_t f_n_api; +        pid_t api_n1; +        pid_t api_n;          pthread_rwlock_rdlock(&irmd->state_lock); @@ -1107,21 +1106,17 @@ static int flow_alloc_resp(pid_t n_api,                  return -1;          } -        f_n_api   = f->n_api; -        f_n_1_api = f->n_1_api; - -        if (!response) { -                f->state = FLOW_ALLOCATED; -                pthread_cond_signal(&f->state_cond); -        } +        api_n  = f->n_api; +        api_n1 = f->n_1_api;          pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_unlock(&irmd->state_lock); -        ret = ipcp_flow_alloc_resp(f_n_1_api, -                                   port_id, -                                   f_n_api, -                                   response); +        ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response); + +        if (!(response || ret)) +                irm_flow_set_state(f, FLOW_ALLOCATED); +          return ret;  } @@ -1132,6 +1127,7 @@ static struct irm_flow * flow_alloc(pid_t  api,  {          struct irm_flow * f;          pid_t ipcp; +        int port_id;          /* FIXME: Map qos_spec to qos_cube */ @@ -1151,6 +1147,7 @@ static struct irm_flow * flow_alloc(pid_t  api,          f->n_api = api;          f->state = FLOW_PENDING; +          if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)                  LOG_WARN("Failed to set timestamp."); @@ -1167,7 +1164,7 @@ static struct irm_flow * flow_alloc(pid_t  api,          pthread_rwlock_unlock(&irmd->reg_lock);          pthread_rwlock_wrlock(&irmd->flows_lock); -        f->port_id = bmp_allocate(irmd->port_ids); +        port_id = f->port_id = bmp_allocate(irmd->port_ids);          f->n_1_api = ipcp;          list_add(&f->next, &irmd->irm_flows); @@ -1175,19 +1172,15 @@ static struct irm_flow * flow_alloc(pid_t  api,          pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_unlock(&irmd->state_lock); -        if (ipcp_flow_alloc(ipcp, -                            f->port_id, -                            f->n_api, -                            dst_name, -                            src_ae_name, -                            QOS_CUBE_BE) < 0) { +        if (ipcp_flow_alloc(ipcp, port_id, api, +                            dst_name, src_ae_name, QOS_CUBE_BE) < 0) {                  pthread_rwlock_rdlock(&irmd->state_lock);                  pthread_rwlock_wrlock(&irmd->flows_lock);                  list_del(&f->next);                  bmp_release(irmd->port_ids, f->port_id);                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock); -                free(f); +                irm_flow_destroy(f);                  return NULL;          } @@ -1208,20 +1201,20 @@ static int flow_alloc_res(int port_id)          f = get_irm_flow(port_id);          if (f == NULL) { -                LOG_ERR("Could not find port %d.", port_id);                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock); +                LOG_ERR("Could not find port %d.", port_id);                  return -1;          } -        if (f->state == FLOW_NULL) { -                LOG_INFO("Port %d is deprecated.", port_id); +        if (irm_flow_get_state(f) == FLOW_NULL) {                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock); +                LOG_INFO("Port %d is deprecated.", port_id);                  return -1;          } -        if (f->state == FLOW_ALLOCATED) { +        if (irm_flow_get_state(f) == FLOW_ALLOCATED) {                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  return 0; @@ -1230,35 +1223,13 @@ static int flow_alloc_res(int port_id)          pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_unlock(&irmd->state_lock); -        pthread_mutex_lock(&f->state_lock); - -        while (f->state == FLOW_PENDING) -                pthread_cond_wait(&f->state_cond, &f->state_lock); - -        pthread_mutex_unlock(&f->state_lock); - -        pthread_rwlock_rdlock(&irmd->state_lock); -        pthread_rwlock_wrlock(&irmd->flows_lock); -        pthread_mutex_lock(&f->state_lock); - -        if (f->state == FLOW_ALLOCATED) { -                pthread_cond_broadcast(&f->state_cond); -                pthread_mutex_unlock(&f->state_lock); -                pthread_rwlock_unlock(&irmd->flows_lock); -                pthread_rwlock_unlock(&irmd->state_lock); +        if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED)                  return 0; -        } - -        f->state = FLOW_NULL; -        pthread_cond_broadcast(&f->state_cond); -        pthread_mutex_unlock(&f->state_lock); -        pthread_rwlock_unlock(&irmd->flows_lock); -        pthread_rwlock_unlock(&irmd->state_lock);          return -1;  } -static int flow_dealloc(int port_id) +static int flow_dealloc(pid_t api, int port_id)  {          pid_t n_1_api;          int   ret = 0; @@ -1282,7 +1253,8 @@ static int flow_dealloc(int port_id)          pthread_rwlock_unlock(&irmd->flows_lock); -        ret = ipcp_flow_dealloc(n_1_api, port_id); +        if (api != n_1_api) +                ret = ipcp_flow_dealloc(n_1_api, port_id);          pthread_rwlock_unlock(&irmd->state_lock); @@ -1340,6 +1312,9 @@ static struct irm_flow * flow_req_arr(pid_t  api,          struct pid_el * c_api;          pid_t h_api = -1; +        LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.", +                 api, dst_name, ae_name); +          f = irm_flow_create();          if (f == NULL) {                  LOG_ERR("Failed to create irm_flow."); @@ -1490,8 +1465,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,          return f;  } -static int flow_alloc_reply(int port_id, -                            int response) +static int flow_alloc_reply(int port_id, int response)  {          struct irm_flow * f; @@ -1505,18 +1479,10 @@ static int flow_alloc_reply(int port_id,                  return -1;          } -        pthread_mutex_lock(&f->state_lock); -          if (!response) -                f->state = FLOW_ALLOCATED; - +                irm_flow_set_state(f, FLOW_ALLOCATED);          else -                f->state = FLOW_NULL; - -        if (pthread_cond_signal(&f->state_cond)) -                LOG_ERR("Failed to send signal."); - -        pthread_mutex_unlock(&f->state_lock); +                irm_flow_set_state(f, FLOW_NULL);          pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_unlock(&irmd->state_lock); @@ -1524,30 +1490,6 @@ static int flow_alloc_reply(int port_id,          return 0;  } -static int flow_dealloc_ipcp(int port_id) -{ -        struct irm_flow * f = NULL; - -        pthread_rwlock_rdlock(&irmd->state_lock); -        pthread_rwlock_wrlock(&irmd->flows_lock); - -        f = get_irm_flow(port_id); -        if (f == NULL) { -                pthread_rwlock_unlock(&irmd->flows_lock); -                pthread_rwlock_unlock(&irmd->state_lock); -                return 0; -        } - -        list_del(&f->next); - -        pthread_rwlock_unlock(&irmd->flows_lock); -        pthread_rwlock_unlock(&irmd->state_lock); - -        irm_flow_destroy(f); - -        return 0; -} -  static void irm_destroy()  {          struct list_head * p; @@ -1729,46 +1671,35 @@ void * irm_sanitize()                          struct irm_flow * f =                                  list_entry(p, struct irm_flow, next); -                        pthread_mutex_lock(&f->state_lock); - -                        if (f->state == FLOW_PENDING && -                            ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { +                        if (irm_flow_get_state(f) == FLOW_PENDING +                            && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {                                  LOG_INFO("Pending port_id %d timed out.",                                           f->port_id); -                                f->state = FLOW_NULL; -                                pthread_cond_signal(&f->state_cond); -                                pthread_mutex_unlock(&f->state_lock); +                                irm_flow_set_state(f, FLOW_NULL);                                  continue;                          } -                        pthread_mutex_unlock(&f->state_lock); -                          if (kill(f->n_api, 0) < 0) { -                                struct shm_ap_rbuff * n_rb = -                                        shm_ap_rbuff_open_s(f->n_api); +                                struct shm_ap_rbuff * rb = +                                        shm_ap_rbuff_open(f->n_api);                                  bmp_release(irmd->port_ids, f->port_id); -                                  list_del(&f->next);                                  LOG_INFO("AP-I %d gone, flow %d deallocated.",                                           f->n_api, f->port_id);                                  ipcp_flow_dealloc(f->n_1_api, f->port_id); -                                if (n_rb != NULL) -                                        shm_ap_rbuff_destroy(n_rb); +                                if (rb != NULL) +                                        shm_ap_rbuff_destroy(rb);                                  irm_flow_destroy(f);                                  continue;                          }                          if (kill(f->n_1_api, 0) < 0) { -                                struct shm_ap_rbuff * n_1_rb_s = -                                        shm_ap_rbuff_open_s(f->n_1_api); -                                struct shm_ap_rbuff * n_1_rb_n = -                                        shm_ap_rbuff_open_n(f->n_1_api); +                                struct shm_ap_rbuff * rb = +                                        shm_ap_rbuff_open(f->n_1_api);                                  list_del(&f->next);                                  LOG_ERR("IPCP %d gone, flow %d removed.",                                          f->n_1_api, f->port_id); -                                if (n_1_rb_n != NULL) -                                        shm_ap_rbuff_destroy(n_1_rb_n); -                                if (n_1_rb_s != NULL) -                                        shm_ap_rbuff_destroy(n_1_rb_s); +                                if (rb != NULL) +                                        shm_ap_rbuff_destroy(rb);                                  irm_flow_destroy(f);                          }                  } @@ -1939,7 +1870,7 @@ void * mainloop()                          break;                  case IRM_MSG_CODE__IRM_FLOW_DEALLOC:                          ret_msg.has_result = true; -                        ret_msg.result = flow_dealloc(msg->port_id); +                        ret_msg.result = flow_dealloc(msg->api, msg->port_id);                          break;                  case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:                          e = flow_req_arr(msg->api, @@ -1950,7 +1881,6 @@ void * mainloop()                                  ret_msg.result = -1;                                  break;                          } -                        /* FIXME: badly timed dealloc may give SEGV */                          ret_msg.has_port_id = true;                          ret_msg.port_id     = e->port_id;                          ret_msg.has_api     = true; @@ -1961,10 +1891,6 @@ void * mainloop()                          ret_msg.result = flow_alloc_reply(msg->port_id,                                                            msg->response);                          break; -                case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: -                        ret_msg.has_result = true; -                        ret_msg.result = flow_dealloc_ipcp(msg->port_id); -                        break;                  default:                          LOG_ERR("Don't know that message code.");                          break; diff --git a/src/irmd/utils.h b/src/irmd/utils.h index 37c745af..2fbc8ef2 100644 --- a/src/irmd/utils.h +++ b/src/irmd/utils.h @@ -40,7 +40,8 @@ struct pid_el {          pid_t            pid;  }; -int wildcard_match(const char * pattern, const char * string); +int     wildcard_match(const char * pattern, +                       const char * string);  /* functions for copying and destroying arguments list */  char ** argvdup(char ** argv); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 14e7051a..b94d0eea 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -30,7 +30,6 @@ set(SOURCE_FILES    bitmap.c    cdap.c    dev.c -  ipcp.c    irm.c    list.c    lockfile.c diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 8b1b3bc6..92a05221 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -24,6 +24,7 @@  #include <ouroboros/cdap.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/dev.h> +#include <ouroboros/fcntl.h>  #include <stdlib.h>  #include <pthread.h> diff --git a/src/lib/dev.c b/src/lib/dev.c index 391563da..178ee287 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -24,6 +24,7 @@  #include <ouroboros/errno.h>  #include <ouroboros/dev.h>  #include <ouroboros/sockets.h> +#include <ouroboros/fcntl.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h> @@ -41,6 +42,87 @@ struct flow_set {          pthread_rwlock_t lock;  }; +enum port_state { +        PORT_NULL = 0, +        PORT_ID_PENDING, +        PORT_ID_ASSIGNED, +        PORT_DESTROY +}; + +struct port { +        int             fd; + +        enum port_state state; +        pthread_mutex_t state_lock; +        pthread_cond_t  state_cond; +}; + +static void port_destroy(struct port * p) +{ +        pthread_mutex_lock(&p->state_lock); + +        if (p->state == PORT_DESTROY) { +                pthread_mutex_unlock(&p->state_lock); +                return; +        } + +        if (p->state == PORT_ID_PENDING) +                p->state = PORT_DESTROY; +        else +                p->state = PORT_NULL; + +        pthread_cond_signal(&p->state_cond); + +        while (p->state != PORT_NULL) +                pthread_cond_wait(&p->state_cond, &p->state_lock); + +        p->fd = -1; +        p->state = PORT_ID_PENDING; + +        pthread_mutex_unlock(&p->state_lock); +} + +static void port_set_state(struct port * p, enum port_state state) +{ +        pthread_mutex_lock(&p->state_lock); + +        if (p->state == PORT_DESTROY) { +                pthread_mutex_unlock(&p->state_lock); +                return; +        } + +        p->state = state; +        pthread_cond_broadcast(&p->state_cond); + +        pthread_mutex_unlock(&p->state_lock); +} + +enum port_state port_wait_assign(struct port * p) +{ +        enum port_state state; + +        pthread_mutex_lock(&p->state_lock); + +        if (p->state != PORT_ID_PENDING) { +                pthread_mutex_unlock(&p->state_lock); +                return -1; +        } + +        while (!(p->state == PORT_ID_ASSIGNED || p->state == PORT_DESTROY)) +                pthread_cond_wait(&p->state_cond, &p->state_lock); + +        if (p->state == PORT_DESTROY) { +                p->state = PORT_NULL; +                pthread_cond_broadcast(&p->state_cond); +        } + +        state = p->state; + +        pthread_mutex_unlock(&p->state_lock); + +        return state; +} +  struct flow {          struct shm_ap_rbuff * rb;          int                   port_id; @@ -48,24 +130,24 @@ struct flow {          pid_t                 api; -        struct timespec *     timeout; +        struct timespec       timeout;  }; -struct ap_instance { +struct {          char *                ap_name;          char *                daf_name;          pid_t                 api;          struct shm_rdrbuff *  rdrb; -        struct bmp *          fds;          struct shm_ap_rbuff * rb;          pthread_rwlock_t      data_lock; -        struct flow           flows[AP_MAX_FLOWS]; -        int                   ports[AP_MAX_FLOWS]; +        struct bmp *          fds; +        struct flow *         flows; +        struct port *         ports;          pthread_rwlock_t      flows_lock; -} * ai; +} ai;  static int api_announce(char * ap_name)  { @@ -76,12 +158,12 @@ static int api_announce(char * ap_name)          msg.code    = IRM_MSG_CODE__IRM_API_ANNOUNCE;          msg.has_api = true; -        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai.data_lock); -        msg.api = ai->api; +        msg.api = ai.api;          msg.ap_name = ap_name; -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.data_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { @@ -104,47 +186,61 @@ int ap_init(char * ap_name)          ap_name = path_strip(ap_name); -        ai = malloc(sizeof(*ai)); -        if (ai == NULL) { -                return -ENOMEM; -        } - -        ai->api = getpid(); -        ai->ap_name = ap_name; -        ai->daf_name = NULL; +        ai.api = getpid(); +        ai.ap_name = ap_name; +        ai.daf_name = NULL; -        ai->fds = bmp_create(AP_MAX_FLOWS, 0); -        if (ai->fds == NULL) { -                free(ai); +        ai.fds = bmp_create(AP_MAX_FLOWS, 0); +        if (ai.fds == NULL)                  return -ENOMEM; + +        ai.rdrb = shm_rdrbuff_open(); +        if (ai.rdrb == NULL) { +                bmp_destroy(ai.fds); +                return -1;          } -        ai->rdrb = shm_rdrbuff_open(); -        if (ai->rdrb == NULL) { -                bmp_destroy(ai->fds); -                free(ai); +        ai.rb = shm_ap_rbuff_create(); +        if (ai.rb == NULL) { +                shm_rdrbuff_close(ai.rdrb); +                bmp_destroy(ai.fds);                  return -1;          } -        ai->rb = shm_ap_rbuff_create_s(); -        if (ai->rb == NULL) { -                shm_rdrbuff_close(ai->rdrb); -                bmp_destroy(ai->fds); -                free(ai); +        ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS); +        if (ai.flows == NULL) { +                shm_ap_rbuff_destroy(ai.rb); +                shm_rdrbuff_close(ai.rdrb); +                bmp_destroy(ai.fds);                  return -1;          }          for (i = 0; i < AP_MAX_FLOWS; ++i) { -                ai->flows[i].rb = NULL; -                ai->flows[i].port_id = -1; -                ai->flows[i].oflags = 0; -                ai->flows[i].api = -1; -                ai->flows[i].timeout = NULL; -                ai->ports[i] = -1; +                ai.flows[i].rb = NULL; +                ai.flows[i].port_id = -1; +                ai.flows[i].oflags = 0; +                ai.flows[i].api = -1; +                ai.flows[i].timeout.tv_sec  = 0; +                ai.flows[i].timeout.tv_nsec = 0;          } -        pthread_rwlock_init(&ai->flows_lock, NULL); -        pthread_rwlock_init(&ai->data_lock, NULL); +        ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); +        if (ai.flows == NULL) { +                free(ai.flows); +                shm_ap_rbuff_destroy(ai.rb); +                shm_rdrbuff_close(ai.rdrb); +                bmp_destroy(ai.fds); +                return -1; +        } + +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) { +                ai.ports[i].state = PORT_ID_PENDING; +                pthread_mutex_init(&ai.ports[i].state_lock, NULL); +                pthread_cond_init(&ai.ports[i].state_cond, NULL); +        } + +        pthread_rwlock_init(&ai.flows_lock, NULL); +        pthread_rwlock_init(&ai.data_lock, NULL);          if (ap_name != NULL)                  return api_announce(ap_name); @@ -152,46 +248,49 @@ int ap_init(char * ap_name)          return 0;  } -void ap_fini(void) +void ap_fini()  {          int i = 0; -        if (ai == NULL) -                return; - -        pthread_rwlock_wrlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai.data_lock);          /* remove all remaining sdus */ -        while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0) -                shm_rdrbuff_remove(ai->rdrb, i); +        while ((i = shm_ap_rbuff_peek_idx(ai.rb)) >= 0) +                shm_rdrbuff_remove(ai.rdrb, i); -        if (ai->fds != NULL) -                bmp_destroy(ai->fds); -        if (ai->rb != NULL) -                shm_ap_rbuff_destroy(ai->rb); -        if (ai->rdrb != NULL) -                shm_rdrbuff_close(ai->rdrb); +        if (ai.fds != NULL) +                bmp_destroy(ai.fds); +        if (ai.rb != NULL) +                shm_ap_rbuff_destroy(ai.rb); +        if (ai.rdrb != NULL) +                shm_rdrbuff_close(ai.rdrb); -        if (ai->daf_name != NULL) -                free(ai->daf_name); +        if (ai.daf_name != NULL) +                free(ai.daf_name); -        pthread_rwlock_rdlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); -        for (i = 0; i < AP_MAX_FLOWS; ++i) { -                if (ai->flows[i].rb != NULL) -                        shm_ap_rbuff_close(ai->flows[i].rb); -                ai->ports[ai->flows[i].port_id] = -1; +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                if (ai.flows[i].rb != NULL) +                        shm_ap_rbuff_close(ai.flows[i].rb); + +        for (i = 0; i < IRMD_MAX_FLOWS; ++i) { +                ai.ports[i].state = PORT_NULL; +                pthread_mutex_destroy(&ai.ports[i].state_lock); +                pthread_cond_destroy(&ai.ports[i].state_cond);          } -        pthread_rwlock_unlock(&ai->flows_lock); -        pthread_rwlock_unlock(&ai->data_lock); +        free(ai.flows); +        free(ai.ports); -        pthread_rwlock_destroy(&ai->flows_lock); -        pthread_rwlock_destroy(&ai->data_lock); +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); -        free(ai); +        pthread_rwlock_destroy(&ai.flows_lock); +        pthread_rwlock_destroy(&ai.data_lock);  } +  int flow_accept(char ** ae_name)  {          irm_msg_t msg = IRM_MSG__INIT; @@ -201,11 +300,11 @@ int flow_accept(char ** ae_name)          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_api = true; -        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai.data_lock); -        msg.api     = ai->api; +        msg.api     = ai.api; -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.data_lock);          recv_msg = send_recv_irm_msg_b(&msg);          if (recv_msg == NULL) @@ -216,22 +315,22 @@ int flow_accept(char ** ae_name)                  return -1;          } -        pthread_rwlock_rdlock(&ai->data_lock); -        pthread_rwlock_wrlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); -        fd = bmp_allocate(ai->fds); -        if (!bmp_is_id_valid(ai->fds, fd)) { -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        fd = bmp_allocate(ai.fds); +        if (!bmp_is_id_valid(ai.fds, fd)) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); -        if (ai->flows[fd].rb == NULL) { -                bmp_release(ai->fds, fd); -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); +        if (ai.flows[fd].rb == NULL) { +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -239,31 +338,31 @@ int flow_accept(char ** ae_name)          if (ae_name != NULL) {                  *ae_name = strdup(recv_msg->ae_name);                  if (*ae_name == NULL) { -                        shm_ap_rbuff_close(ai->flows[fd].rb); -                        bmp_release(ai->fds, fd); -                        pthread_rwlock_unlock(&ai->flows_lock); -                        pthread_rwlock_unlock(&ai->data_lock); +                        shm_ap_rbuff_close(ai.flows[fd].rb); +                        bmp_release(ai.fds, fd); +                        pthread_rwlock_unlock(&ai.flows_lock); +                        pthread_rwlock_unlock(&ai.data_lock);                          irm_msg__free_unpacked(recv_msg, NULL);                          return -ENOMEM;                  }          } -        ai->flows[fd].port_id = recv_msg->port_id; -        ai->flows[fd].oflags  = FLOW_O_DEFAULT; -        ai->flows[fd].api     = recv_msg->api; +        ai.flows[fd].port_id = recv_msg->port_id; +        ai.flows[fd].oflags  = FLOW_O_DEFAULT; +        ai.flows[fd].api     = recv_msg->api; -        ai->ports[recv_msg->port_id] = fd; +        ai.ports[recv_msg->port_id].fd    = fd; +        ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; -        pthread_rwlock_unlock(&ai->flows_lock); -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock);          irm_msg__free_unpacked(recv_msg, NULL);          return fd;  } -int flow_alloc_resp(int fd, -                    int response) +int flow_alloc_resp(int fd, int response)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; @@ -274,49 +373,47 @@ int flow_alloc_resp(int fd,          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;          msg.has_api      = true; -        msg.api          = ai->api; +        msg.api          = ai.api;          msg.has_port_id  = true; -        pthread_rwlock_rdlock(&ai->data_lock); -        pthread_rwlock_rdlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -ENOTALLOC;          } -        msg.port_id      = ai->flows[fd].port_id; +        msg.port_id      = ai.flows[fd].port_id; -        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai.flows_lock);          msg.has_response = true;          msg.response     = response;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { -                pthread_rwlock_unlock(&ai->data_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -1;          }          if (!recv_msg->has_result) { -                pthread_rwlock_unlock(&ai->data_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ret = recv_msg->result; -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.data_lock);          irm_msg__free_unpacked(recv_msg, NULL);          return ret;  } -int flow_alloc(char * dst_name, -               char * src_ae_name, -               struct qos_spec * qos) +int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; @@ -333,11 +430,11 @@ int flow_alloc(char * dst_name,          msg.ae_name     = src_ae_name;          msg.has_api     = true; -        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai.data_lock); -        msg.api         = ai->api; +        msg.api         = ai.api; -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.data_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { @@ -349,34 +446,35 @@ int flow_alloc(char * dst_name,                  return -1;          } -        pthread_rwlock_rdlock(&ai->data_lock); -        pthread_rwlock_wrlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); -        fd = bmp_allocate(ai->fds); -        if (!bmp_is_id_valid(ai->fds, fd)) { -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        fd = bmp_allocate(ai.fds); +        if (!bmp_is_id_valid(ai.fds, fd)) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); -        if (ai->flows[fd].rb == NULL) { -                bmp_release(ai->fds, fd); -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); +        if (ai.flows[fd].rb == NULL) { +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        ai->flows[fd].port_id = recv_msg->port_id; -        ai->flows[fd].oflags  = FLOW_O_DEFAULT; -        ai->flows[fd].api     = recv_msg->api; +        ai.flows[fd].port_id = recv_msg->port_id; +        ai.flows[fd].oflags  = FLOW_O_DEFAULT; +        ai.flows[fd].api     = recv_msg->api; -        ai->ports[recv_msg->port_id] = fd; +        ai.ports[recv_msg->port_id].fd    = fd; +        ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; -        pthread_rwlock_unlock(&ai->flows_lock); -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -395,19 +493,19 @@ int flow_alloc_res(int fd)          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;          msg.has_port_id  = true; -        pthread_rwlock_rdlock(&ai->data_lock); -        pthread_rwlock_rdlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -ENOTALLOC;          } -        msg.port_id = ai->flows[fd].port_id; +        msg.port_id = ai.flows[fd].port_id; -        pthread_rwlock_unlock(&ai->flows_lock); -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock);          recv_msg = send_recv_irm_msg_b(&msg);          if (recv_msg == NULL) { @@ -437,43 +535,43 @@ int flow_dealloc(int fd)          msg.has_api      = true;          msg.api          = getpid(); -        pthread_rwlock_rdlock(&ai->data_lock); -        pthread_rwlock_wrlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); -        if (ai->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -ENOTALLOC;          } -        msg.port_id = ai->flows[fd].port_id; +        msg.port_id = ai.flows[fd].port_id; -        ai->ports[msg.port_id] = -1; +        port_destroy(&ai.ports[msg.port_id]); -        ai->flows[fd].port_id = -1; -        shm_ap_rbuff_close(ai->flows[fd].rb); -        ai->flows[fd].rb = NULL; -        ai->flows[fd].api = -1; +        ai.flows[fd].port_id = -1; +        shm_ap_rbuff_close(ai.flows[fd].rb); +        ai.flows[fd].rb = NULL; +        ai.flows[fd].api = -1; -        bmp_release(ai->fds, fd); +        bmp_release(ai.fds, fd); -        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai.flows_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { -                pthread_rwlock_unlock(&ai->data_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -1;          }          if (!recv_msg->has_result) { -                pthread_rwlock_unlock(&ai->data_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ret = recv_msg->result; -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -487,30 +585,30 @@ int flow_cntl(int fd, int cmd, int oflags)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&ai->data_lock); -        pthread_rwlock_wrlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); -        if (ai->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -ENOTALLOC;          } -        old = ai->flows[fd].oflags; +        old = ai.flows[fd].oflags;          switch (cmd) {          case FLOW_F_GETFL: /* GET FLOW FLAGS */ -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return old;          case FLOW_F_SETFL: /* SET FLOW FLAGS */ -                ai->flows[fd].oflags = oflags; -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +                ai.flows[fd].oflags = oflags; +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return old;          default: -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return FLOW_O_INVALID; /* unknown command */          }  } @@ -526,62 +624,62 @@ ssize_t flow_write(int fd, void * buf, size_t count)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&ai->data_lock); -        pthread_rwlock_rdlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -ENOTALLOC;          } -        if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_rdrbuff_write(ai->rdrb, -                                       ai->flows[fd].api, +        if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { +                idx = shm_rdrbuff_write(ai.rdrb, +                                       ai.flows[fd].api,                                         DU_BUFF_HEADSPACE,                                         DU_BUFF_TAILSPACE, -                                       (uint8_t *) buf, +                                       buf,                                         count);                  if (idx == -1) { -                        pthread_rwlock_unlock(&ai->flows_lock); -                        pthread_rwlock_unlock(&ai->data_lock); +                        pthread_rwlock_unlock(&ai.flows_lock); +                        pthread_rwlock_unlock(&ai.data_lock);                          return -EAGAIN;                  }                  e.index   = idx; -                e.port_id = ai->flows[fd].port_id; +                e.port_id = ai.flows[fd].port_id; -                if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) { -                        shm_rdrbuff_remove(ai->rdrb, idx); -                        pthread_rwlock_unlock(&ai->flows_lock); -                        pthread_rwlock_unlock(&ai->data_lock); +                if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { +                        shm_rdrbuff_remove(ai.rdrb, idx); +                        pthread_rwlock_unlock(&ai.flows_lock); +                        pthread_rwlock_unlock(&ai.data_lock);                          return -1;                  }          } else { /* blocking */ -                struct shm_rdrbuff * rdrb = ai->rdrb; -                pid_t                api = ai->flows[fd].api; -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +                struct shm_rdrbuff * rdrb = ai.rdrb; +                pid_t                api = ai.flows[fd].api; +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  idx = shm_rdrbuff_write_b(rdrb, -                                         api, -                                         DU_BUFF_HEADSPACE, -                                         DU_BUFF_TAILSPACE, -                                         (uint8_t *) buf, -                                         count); +                                          api, +                                          DU_BUFF_HEADSPACE, +                                          DU_BUFF_TAILSPACE, +                                          buf, +                                          count); -                pthread_rwlock_rdlock(&ai->data_lock); -                pthread_rwlock_rdlock(&ai->flows_lock); +                pthread_rwlock_rdlock(&ai.data_lock); +                pthread_rwlock_rdlock(&ai.flows_lock);                  e.index   = idx; -                e.port_id = ai->flows[fd].port_id; +                e.port_id = ai.flows[fd].port_id; -                while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) +                while (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0)                          ;          } -        pthread_rwlock_unlock(&ai->flows_lock); -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock);          return 0;  } @@ -595,47 +693,44 @@ ssize_t flow_read(int fd, void * buf, size_t count)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&ai->data_lock); -        pthread_rwlock_rdlock(&ai->flows_lock); +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -ENOTALLOC;          } -        if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_ap_rbuff_read_port(ai->rb, -                                             ai->flows[fd].port_id); -                pthread_rwlock_unlock(&ai->flows_lock); +        if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { +                idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id); +                pthread_rwlock_unlock(&ai.flows_lock);          } else { -                struct shm_ap_rbuff * rb      = ai->rb; -                int                   port_id = ai->flows[fd].port_id; -                struct timespec *     timeout = ai->flows[fd].timeout; -                pthread_rwlock_unlock(&ai->flows_lock); -                pthread_rwlock_unlock(&ai->data_lock); - -                idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); - -                pthread_rwlock_rdlock(&ai->data_lock); +                struct shm_ap_rbuff * rb      = ai.rb; +                int                   port_id = ai.flows[fd].port_id; +                struct timespec       timeout = ai.flows[fd].timeout; +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                idx = shm_ap_rbuff_read_port_b(rb, port_id, &timeout); +                pthread_rwlock_rdlock(&ai.data_lock);          }          if (idx < 0) { -                pthread_rwlock_unlock(&ai->data_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -EAGAIN;          } -        n = shm_rdrbuff_read(&sdu, ai->rdrb, idx); +        n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);          if (n < 0) { -                pthread_rwlock_unlock(&ai->data_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -1;          }          memcpy(buf, sdu, MIN(n, count)); -        shm_rdrbuff_remove(ai->rdrb, idx); +        shm_rdrbuff_remove(ai.rdrb, idx); -        pthread_rwlock_unlock(&ai->data_lock); +        pthread_rwlock_unlock(&ai.data_lock);          return n;  } @@ -671,7 +766,7 @@ void flow_set_zero(struct flow_set * set)  void flow_set_add(struct flow_set * set, int fd)  {          pthread_rwlock_wrlock(&set->lock); -        set->b[ai->flows[fd].port_id] = true; +        set->b[ai.flows[fd].port_id] = true;          set->dirty = true;          pthread_rwlock_unlock(&set->lock);  } @@ -679,7 +774,7 @@ void flow_set_add(struct flow_set * set, int fd)  void flow_set_del(struct flow_set * set, int fd)  {          pthread_rwlock_wrlock(&set->lock); -        set->b[ai->flows[fd].port_id] = false; +        set->b[ai.flows[fd].port_id] = false;          set->dirty = true;          pthread_rwlock_unlock(&set->lock);  } @@ -688,7 +783,7 @@ bool flow_set_has(struct flow_set * set, int fd)  {          bool ret;          pthread_rwlock_rdlock(&set->lock); -        ret = set->b[ai->flows[fd].port_id]; +        ret = set->b[ai.flows[fd].port_id];          pthread_rwlock_unlock(&set->lock);          return ret;  } @@ -712,12 +807,324 @@ int flow_select(struct flow_set * set, const struct timespec * timeout)  {          int port_id;          if (set == NULL) { -                port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout); +                port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout);          } else {                  flow_set_cpy(set); -                port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout); +                port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout);          }          if (port_id < 0)                  return port_id; -        return ai->ports[port_id]; +        return ai.ports[port_id].fd; +} + +/* ipcp-dev functions */ + +int np1_flow_alloc(pid_t n_api, int port_id) +{ +        int fd; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        fd = bmp_allocate(ai.fds); +        if (!bmp_is_id_valid(ai.fds, fd)) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].rb = shm_ap_rbuff_open(n_api); +        if (ai.flows[fd].rb == NULL) { +                bmp_release(ai.fds, fd); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].port_id = port_id; +        ai.flows[fd].oflags  = FLOW_O_DEFAULT; +        ai.flows[fd].api     = n_api; + +        ai.ports[port_id].fd = fd; +        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return fd; +} + +int np1_flow_dealloc(int port_id) +{ +        int fd; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        fd = ai.ports[port_id].fd; +        if (fd < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return fd; +        } + +        ai.flows[fd].port_id = -1; +        shm_ap_rbuff_close(ai.flows[fd].rb); +        ai.flows[fd].rb = NULL; +        ai.flows[fd].api = -1; + +        bmp_release(ai.fds, fd); + +        port_destroy(&ai.ports[port_id]); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return fd; +} + + +int np1_flow_resp(pid_t n_api, int port_id) +{ +        int fd; +        struct shm_ap_rbuff * rb; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        port_wait_assign(&ai.ports[port_id]); + +        fd = ai.ports[port_id].fd; +        if (fd < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return fd; +        } + +        rb = shm_ap_rbuff_open(n_api); +        if (rb == NULL) { +                ai.flows[fd].port_id = -1; +                port_destroy(&ai.ports[port_id]); +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; +        } + +        ai.flows[fd].rb = rb; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return fd; +} + +int ipcp_create_r(pid_t api) +{ +        irm_msg_t msg = IRM_MSG__INIT; +        irm_msg_t * recv_msg = NULL; +        int ret = -1; + +        msg.code         = IRM_MSG_CODE__IPCP_CREATE_R; +        msg.has_api      = true; +        msg.api          = api; + +        recv_msg = send_recv_irm_msg(&msg); +        if (recv_msg == NULL) +                return -1; + +        if (recv_msg->has_result == false) { +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        ret = recv_msg->result; +        irm_msg__free_unpacked(recv_msg, NULL); + +        return ret; +} + +int ipcp_flow_req_arr(pid_t  api, char * dst_name, char * src_ae_name) +{ +        irm_msg_t msg = IRM_MSG__INIT; +        irm_msg_t * recv_msg = NULL; +        int port_id = -1; +        int fd = -1; + +        if (dst_name == NULL || src_ae_name == NULL) +                return -EINVAL; + +        msg.code          = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; +        msg.has_api       = true; +        msg.api           = api; +        msg.dst_name      = dst_name; +        msg.ae_name       = src_ae_name; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        fd = bmp_allocate(ai.fds); +        if (!bmp_is_id_valid(ai.fds, fd)) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -1; /* -ENOMOREFDS */ +        } + +        ai.flows[fd].rb    = NULL; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        recv_msg = send_recv_irm_msg(&msg); +        if (recv_msg == NULL) +                return -1; + +        if (!recv_msg->has_port_id) { +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        port_id = recv_msg->port_id; +        irm_msg__free_unpacked(recv_msg, NULL); +        if (port_id < 0) +                return -1; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        ai.flows[fd].port_id = port_id; +        ai.flows[fd].rb      = NULL; + +        ai.ports[port_id].fd = fd; +        port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return fd; +} + +int ipcp_flow_alloc_reply(int fd, int response) +{ +        irm_msg_t msg = IRM_MSG__INIT; +        irm_msg_t * recv_msg = NULL; +        int ret = -1; + +        msg.code         = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; +        msg.has_port_id  = true; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); +        msg.port_id      = ai.flows[fd].port_id; +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        msg.has_response = true; +        msg.response     = response; + +        recv_msg = send_recv_irm_msg(&msg); +        if (recv_msg == NULL) +                return -1; + +        if (recv_msg->has_result == false) { +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        ret = recv_msg->result; +        irm_msg__free_unpacked(recv_msg, NULL); + +        return ret; +} + +int ipcp_flow_read(struct shm_du_buff ** sdb) +{ +        int fd; +        struct rb_entry * e; + +        e = shm_ap_rbuff_read(ai.rb); + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        fd = ai.ports[e->port_id].fd; + +        *sdb = shm_rdrbuff_get(ai.rdrb, e->index); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return fd; +} + +int ipcp_flow_write(int fd, struct shm_du_buff * sdb) +{ +        struct rb_entry e; + +        if (sdb == NULL) +                return -EINVAL; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        if (ai.flows[fd].rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -EPERM; +        } + +        e.index = shm_du_buff_get_idx(sdb); +        e.port_id = ai.flows[fd].port_id; + +        shm_ap_rbuff_write(ai.flows[fd].rb, &e); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return 0; +} + +int local_flow_read(struct rb_entry * e) +{ +        int fd; + +        *e = *(shm_ap_rbuff_read(ai.rb)); + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        fd = ai.ports[e->port_id].fd; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return fd; +} + +int local_flow_write(int fd, struct rb_entry * e) +{ +        if (e == NULL) +                return -EINVAL; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); + +        if (ai.flows[fd].rb == NULL) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -EPERM; +        } + +        e->port_id = ai.flows[fd].port_id; + +        shm_ap_rbuff_write(ai.flows[fd].rb, e); + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return 0; +} + +void ipcp_flow_del(struct shm_du_buff * sdb) +{ +        shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));  } diff --git a/src/lib/irm.c b/src/lib/irm.c index fce11ba5..c4c6395b 100644 --- a/src/lib/irm.c +++ b/src/lib/irm.c @@ -25,7 +25,7 @@  #include <ouroboros/config.h>  #include <ouroboros/errno.h>  #include <ouroboros/irm.h> -#include <ouroboros/common.h> +#include <ouroboros/utils.h>  #include <ouroboros/logs.h>  #include <ouroboros/sockets.h> diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 7a634201..61c27d01 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -43,8 +43,7 @@ enum irm_msg_code {          IRM_FLOW_DEALLOC      = 18;          IPCP_FLOW_REQ_ARR     = 19;          IPCP_FLOW_ALLOC_REPLY = 20; -        IPCP_FLOW_DEALLOC     = 21; -        IRM_REPLY             = 22; +        IRM_REPLY             = 21;  };  message irm_msg { diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index d9e332fe..184a1bf2 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -21,14 +21,14 @@   */  #include <ouroboros/config.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h>  #include <ouroboros/errno.h>  #define OUROBOROS_PREFIX "shm_ap_rbuff"  #include <ouroboros/logs.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/lockfile.h> -#include <ouroboros/time_utils.h>  #include <pthread.h>  #include <sys/mman.h> @@ -41,8 +41,6 @@  #include <sys/stat.h>  #define FN_MAX_CHARS 255 -#define NORTH false -#define SOUTH true  #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry)         \                               + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \ @@ -63,11 +61,10 @@ struct shm_ap_rbuff {          pthread_cond_t *  add;         /* SDU arrived */          pthread_cond_t *  del;         /* SDU removed */          pid_t             api;         /* api to which this rb belongs */ -        bool              dir;         /* direction, false = N */          int               fd;  }; -static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) +struct shm_ap_rbuff * shm_ap_rbuff_create()  {          struct shm_ap_rbuff * rb;          int                   shm_fd; @@ -77,10 +74,7 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)          char                  fn[FN_MAX_CHARS];          mode_t                mask; -        if (dir == SOUTH) -                sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid()); -        else -                sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid()); +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());          rb = malloc(sizeof(*rb));          if (rb == NULL) { @@ -157,22 +151,18 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)          rb->fd  = shm_fd;          rb->api = getpid(); -        rb->dir = dir;          return rb;  } -static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir) +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)  {          struct shm_ap_rbuff * rb;          int                   shm_fd;          struct rb_entry *     shm_base;          char                  fn[FN_MAX_CHARS]; -        if (dir == SOUTH) -                sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api); -        else -                sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api); +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api);          rb = malloc(sizeof(*rb));          if (rb == NULL) { @@ -215,31 +205,10 @@ static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)          rb->fd = shm_fd;          rb->api = api; -        rb->dir = dir;          return rb;  } -struct shm_ap_rbuff * shm_ap_rbuff_create_n() -{ -        return shm_ap_rbuff_create(NORTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_create_s() -{ -        return shm_ap_rbuff_create(SOUTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api) -{ -        return shm_ap_rbuff_open(api, NORTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api) -{ -        return shm_ap_rbuff_open(api, SOUTH); -} -  void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)  {          if (rb == NULL) { @@ -285,10 +254,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)          if (close(rb->fd) < 0)                  LOG_DBG("Couldn't close shared memory."); -        if (rb->dir == SOUTH) -                sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api); -        else -                sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api); +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api);          if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)                  LOG_DBG("Couldn't unmap shared memory."); diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index bf5c7f16..fb58a4d6 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -24,7 +24,6 @@  #include <ouroboros/config.h>  #include <ouroboros/errno.h>  #include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/time_utils.h>  #include <pthread.h> @@ -35,6 +34,7 @@  #include <string.h>  #include <signal.h>  #include <sys/stat.h> +#include <stdbool.h>  #define OUROBOROS_PREFIX "shm_rdrbuff" @@ -76,6 +76,7 @@ struct shm_du_buff {          size_t du_head;          size_t du_tail;          pid_t  dst_api; +        size_t idx;  };  struct shm_rdrbuff { @@ -458,7 +459,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,  #endif          int                  sz = size + sizeof *sdb;          uint8_t *            write_pos; -        ssize_t              idx = -1;          if (rdrb == NULL || data == NULL) {                  LOG_DBGF("Bogus input, bugging out."); @@ -505,6 +505,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,                  sdb->dst_api = -1;                  sdb->du_head = 0;                  sdb->du_tail = 0; +                sdb->idx     = *rdrb->ptr_head;                  *rdrb->ptr_head = 0;          } @@ -521,7 +522,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,          memcpy(write_pos, data, len); -        idx = *rdrb->ptr_head; +        sdb->idx = *rdrb->ptr_head;  #ifdef SHM_RDRB_MULTI_BLOCK          *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);  #else @@ -529,7 +530,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,  #endif          pthread_mutex_unlock(rdrb->lock); -        return idx; +        return sdb->idx;  }  ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, @@ -547,7 +548,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,  #endif          int                  sz = size + sizeof *sdb;          uint8_t *            write_pos; -        ssize_t              idx = -1;          if (rdrb == NULL || data == NULL) {                  LOG_DBGF("Bogus input, bugging out."); @@ -596,6 +596,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,                  sdb->dst_api = -1;                  sdb->du_head = 0;                  sdb->du_tail = 0; +                sdb->idx     = *rdrb->ptr_head;                  *rdrb->ptr_head = 0;          } @@ -612,7 +613,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,          memcpy(write_pos, data, len); -        idx = *rdrb->ptr_head; +        sdb->idx = *rdrb->ptr_head;  #ifdef SHM_RDRB_MULTI_BLOCK          *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);  #else @@ -620,7 +621,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,  #endif          pthread_cleanup_pop(true); -        return idx; +        return sdb->idx;  }  int shm_rdrbuff_read(uint8_t **           dst, @@ -654,6 +655,32 @@ int shm_rdrbuff_read(uint8_t **           dst,          return len;  } +struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, ssize_t idx) +{ +        struct shm_du_buff * sdb; + +        if (idx > SHM_BUFFER_SIZE) +                return NULL; +#ifdef __APPLE__ +        pthread_mutex_lock(rdrb->lock); +#else +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } +#endif +        if (shm_rdrb_empty(rdrb)) { +                pthread_mutex_unlock(rdrb->lock); +                return NULL; +        } + +        sdb = idx_to_du_buff_ptr(rdrb, idx); + +        pthread_mutex_unlock(rdrb->lock); + +        return sdb; +} +  int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx)  {          if (idx > SHM_BUFFER_SIZE) @@ -688,6 +715,11 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx)          return 0;  } +size_t shm_du_buff_get_idx(struct shm_du_buff * sdb) +{ +        return sdb->idx; +} +  uint8_t * shm_du_buff_head(struct shm_du_buff * sdb)  {          if (sdb == NULL) diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 751c61b2..408e79e7 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -25,7 +25,6 @@  #include <ouroboros/config.h>  #include <ouroboros/errno.h>  #include <ouroboros/logs.h> -#include <ouroboros/common.h>  #include <ouroboros/sockets.h>  #include <ouroboros/utils.h> @@ -102,13 +101,12 @@ int server_socket_open(char * file_name)          return sockfd;  } -void close_ptr(void * o) +static void close_ptr(void * o)  {          close(*(int *) o);  } -static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, -                                           bool timed) +static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed)  {          int sockfd;          buffer_t buf; diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c index 8eff4a4c..c5664d8b 100644 --- a/src/tools/cbr/cbr_server.c +++ b/src/tools/cbr/cbr_server.c @@ -21,6 +21,10 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ +#include <ouroboros/dev.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/fcntl.h> +  #include <stdbool.h>  #ifdef __FreeBSD__ @@ -32,9 +36,6 @@  #include <stdlib.h>  #include <pthread.h> -#include <ouroboros/dev.h> -#include <ouroboros/time_utils.h> -  #define THREADS_SIZE 10  pthread_t       listen_thread; diff --git a/src/tools/irm/irm.c b/src/tools/irm/irm.c index c260feb9..a674c7ba 100644 --- a/src/tools/irm/irm.c +++ b/src/tools/irm/irm.c @@ -20,7 +20,6 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#include <ouroboros/common.h>  #include <ouroboros/irm.h>  #include <stdio.h>  #include <string.h> diff --git a/src/tools/irm/irm_utils.c b/src/tools/irm/irm_utils.c index feb8ac98..41a1e811 100644 --- a/src/tools/irm/irm_utils.c +++ b/src/tools/irm/irm_utils.c @@ -23,7 +23,6 @@  #include <string.h>  #include <stdbool.h>  #include <stdlib.h> -#include <ouroboros/common.h>  #include "irm_utils.h" diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 3a254984..47b40118 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -22,7 +22,7 @@   */  #include <ouroboros/dev.h> -#include <ouroboros/errno.h> +#include <ouroboros/fcntl.h>  #include <ouroboros/time_utils.h>  #ifdef __FreeBSD__ @@ -34,6 +34,7 @@  #include <sys/time.h>  #include <arpa/inet.h>  #include <math.h> +#include <errno.h>  #include <float.h>  void shutdown_client(int signo, siginfo_t * info, void * c) | 
