diff options
| -rw-r--r-- | src/ipcpd/ipcp.c | 34 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.h | 6 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 455 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 9 | 
4 files changed, 390 insertions, 114 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 13632a80..060178bf 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -29,6 +29,22 @@  #define OUROBOROS_PREFIX "ipcpd/ipcp"  #include <ouroboros/logs.h> +struct ipcp * ipcp_instance_create() +{ +        struct ipcp * i = malloc(sizeof *i); +        if (i == NULL) +                return NULL; + +        i->data    = NULL; +        i->ops     = NULL; +        i->irmd_fd = -1; +        i->state   = IPCP_INIT; + +        rw_lock_init(&i->state_lock); + +        return i; +} +  int ipcp_arg_check(int argc, char * argv[])  {          if (argc != 3) @@ -52,25 +68,33 @@ void * ipcp_main_loop(void * o)          uint8_t buf[IPCP_MSG_BUF_SIZE];          struct ipcp * _ipcp = (struct ipcp *) o; -        ipcp_msg_t *    msg; -        ssize_t         count; -        buffer_t        buffer; -        ipcp_msg_t      ret_msg = IPCP_MSG__INIT; +        ipcp_msg_t * msg; +        ssize_t      count; +        buffer_t     buffer; +        ipcp_msg_t   ret_msg = IPCP_MSG__INIT;          dif_config_msg_t * conf_msg;          struct dif_config  conf; +        char * sock_path; +          if (_ipcp == NULL) {                  LOG_ERR("Invalid ipcp struct.");                  return (void *) 1;          } -        sockfd = server_socket_open(ipcp_sock_path(getpid())); +        sock_path = ipcp_sock_path(getpid()); +        if (sock_path == NULL) +                return (void *) 1; + +        sockfd = server_socket_open(sock_path);          if (sockfd < 0) {                  LOG_ERR("Could not open server socket.");                  return (void *) 1;          } +        free(sock_path); +          while (true) {                  ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 393af994..c9002d4d 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -23,6 +23,8 @@  #ifndef IPCPD_IPCP_H  #define IPCPD_IPCP_H +#include <ouroboros/rw_lock.h> +  #include "ipcp-ops.h"  #include "ipcp-data.h" @@ -38,11 +40,13 @@ enum ipcp_state {  struct ipcp {          struct ipcp_data * data;          struct ipcp_ops *  ops; +        int                irmd_fd;          enum ipcp_state    state; -        int                irmd_fd; +        rw_lock_t          state_lock;  }; +struct ipcp * ipcp_instance_create();  void * ipcp_main_loop(void * o);  void * ipcp_sdu_loop(void * o);  int ipcp_arg_check(int argc, char * argv[]); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 14a698ee..3296540e 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -33,6 +33,7 @@  #include <ouroboros/sockets.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/dev.h> +#include <ouroboros/rw_lock.h>  #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -81,18 +82,25 @@ struct shim_ap_data {          instance_name_t *     api;          struct shm_du_map *   dum;          struct bmp *          fds; -          struct shm_ap_rbuff * rb; +        rw_lock_t             data_lock; +          struct flow           flows[AP_MAX_FLOWS]; +        rw_lock_t             flows_lock; + +        pthread_t             mainloop; +        pthread_t             sduloop; +        pthread_t             handler; +        pthread_t             sdu_reader; + +        rw_lock_t             thread_lock; -        pthread_t mainloop; -        pthread_t sduloop; -        pthread_t handler; -        pthread_t sdu_reader;  } * _ap_instance;  static int shim_ap_init(char * ap_name)  { +        int i; +          _ap_instance = malloc(sizeof(struct shim_ap_data));          if (_ap_instance == NULL) {                  return -1; @@ -130,11 +138,22 @@ static int shim_ap_init(char * ap_name)          _ap_instance->rb = shm_ap_rbuff_create();          if (_ap_instance->rb == NULL) {                  instance_name_destroy(_ap_instance->api); +                shm_du_map_close(_ap_instance->dum);                  bmp_destroy(_ap_instance->fds);                  free(_ap_instance);                  return -1;          } +        for (i = 0; i < AP_MAX_FLOWS; i ++) { +                _ap_instance->flows[i].rb = NULL; +                _ap_instance->flows[i].port_id = -1; +                _ap_instance->flows[i].state = FLOW_NULL; +        } + +        rw_lock_init(&_ap_instance->flows_lock); +        rw_lock_init(&_ap_instance->thread_lock); +        rw_lock_init(&_ap_instance->data_lock); +          return 0;  } @@ -144,6 +163,9 @@ void shim_ap_fini()          if (_ap_instance == NULL)                  return; + +        rw_lock_wrlock(&_ap_instance->data_lock); +          if (_ap_instance->api != NULL)                  instance_name_destroy(_ap_instance->api);          if (_ap_instance->fds != NULL) @@ -152,41 +174,76 @@ void shim_ap_fini()                  shm_du_map_close(_ap_instance->dum);          if (_ap_instance->rb != NULL)                  shm_ap_rbuff_destroy(_ap_instance->rb); + +        rw_lock_wrlock(&_ap_instance->flows_lock); +          for (i = 0; i < AP_MAX_FLOWS; i ++)                  if (_ap_instance->flows[i].rb != NULL)                          shm_ap_rbuff_close(_ap_instance->flows[i].rb); +        rw_lock_unlock(&_ap_instance->flows_lock); + +        rw_lock_unlock(&_ap_instance->data_lock); +          free(_ap_instance);  }  static int port_id_to_fd(int port_id)  {          int i; -        for (i = 0; i < AP_MAX_FLOWS; ++i) + +        rw_lock_rdlock(&_ap_instance->flows_lock); + +        for (i = 0; i < AP_MAX_FLOWS; ++i) {                  if (_ap_instance->flows[i].port_id == port_id -                        && _ap_instance->flows[i].state != FLOW_NULL) +                    && _ap_instance->flows[i].state != FLOW_NULL) { + +                        rw_lock_unlock(&_ap_instance->flows_lock); +                          return i; +                } +        } + +        rw_lock_unlock(&_ap_instance->flows_lock); +          return -1;  }  static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)  {          /* the AP chooses the amount of headspace and tailspace */ -        size_t index = shm_create_du_buff(_ap_instance->dum, -                                          count, -                                          0, -                                          buf, -                                          count); -        struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; - -        if (index == -1) +        size_t index; +        struct rb_entry e; + +        rw_lock_rdlock(&_ap_instance->data_lock); + +        index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count); + +        if (index == -1) { +                rw_lock_unlock(&_ap_instance->data_lock);                  return -1; +        } + +        e.index = index; + +        rw_lock_rdlock(&_ap_instance->flows_lock); + +        e.port_id = _ap_instance->flows[fd].port_id;          if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { +                rw_lock_unlock(&_ap_instance->flows_lock); +                  shm_release_du_buff(_ap_instance->dum, index); + +                rw_lock_unlock(&_ap_instance->data_lock); +                  return -EPIPE;          } +        rw_lock_unlock(&_ap_instance->flows_lock); + +        rw_lock_unlock(&_ap_instance->data_lock); +          return 0;  } @@ -206,8 +263,7 @@ struct ipcp_udp_data {          int                s_fd;          fd_set flow_fd_s; - -        pthread_mutex_t lock; +        rw_lock_t fd_lock;  };  struct ipcp_udp_data * ipcp_udp_data_create() @@ -229,6 +285,8 @@ struct ipcp_udp_data * ipcp_udp_data_create()                  return NULL;          } +        rw_lock_init(&udp_data->fd_lock); +          FD_ZERO(&udp_data->flow_fd_s);          return udp_data; @@ -236,21 +294,49 @@ struct ipcp_udp_data * ipcp_udp_data_create()  void ipcp_sig_handler(int sig, siginfo_t * info, void * c)  { +        sigset_t  sigset; +        sigemptyset(&sigset); +        sigaddset(&sigset, SIGINT); +        bool clean_threads = false; +          switch(sig) {          case SIGINT:          case SIGTERM:          case SIGHUP:                  if (info->si_pid == irmd_pid || info->si_pid == 0) { +                        pthread_sigmask(SIG_BLOCK, &sigset, NULL); +                          LOG_DBG("Terminating by order of %d. Bye.",                                  info->si_pid); + +                        rw_lock_wrlock(&_ipcp->state_lock); + +                        if (_ipcp->state == IPCP_ENROLLED) { +                                clean_threads = true; +                        } + +                        _ipcp->state = IPCP_SHUTDOWN; + +                        rw_lock_unlock(&_ipcp->state_lock); + +                        if (clean_threads) { +                                rw_lock_wrlock(&_ap_instance->thread_lock); + +                                pthread_cancel(_ap_instance->handler); +                                pthread_cancel(_ap_instance->sdu_reader); +                                pthread_cancel(_ap_instance->sduloop); + +                                pthread_join(_ap_instance->sduloop, NULL); +                                pthread_join(_ap_instance->handler, NULL); +                                pthread_join(_ap_instance->sdu_reader, NULL); + +                                rw_lock_unlock(&_ap_instance->thread_lock); +                        } +                          pthread_cancel(_ap_instance->mainloop); -                        pthread_cancel(_ap_instance->handler); -                        pthread_cancel(_ap_instance->sdu_reader); -                        pthread_cancel(_ap_instance->sduloop); -                        /* FIXME: should be called after join */ -                        shim_ap_fini(); -                        exit(0); +                        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); +                  }          default:                  return; @@ -268,6 +354,7 @@ static void * ipcp_udp_listener()          while (true) {                  int fd; +                int port_id;                  memset(&buf, 0, SHIM_UDP_BUF_SIZE);                  n = sizeof c_saddr;                  n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, @@ -315,21 +402,27 @@ static void * ipcp_udp_listener()                  /* reply to IRM */ -                _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), -                                                                    buf, -                                                                    UNKNOWN_AP, -                                                                    UNKNOWN_AE); -                if (_ap_instance->flows[fd].port_id < 0) { +                port_id = ipcp_flow_req_arr(getpid(), +                                            buf, +                                            UNKNOWN_AP, +                                            UNKNOWN_AE); + +                if (port_id < 0) {                          LOG_ERR("Could not get port id from IRMd");                          close(fd);                          continue;                  } -                _ap_instance->flows[fd].rb     = NULL; -                _ap_instance->flows[fd].state  = FLOW_PENDING; +                rw_lock_wrlock(&_ap_instance->flows_lock); + +                _ap_instance->flows[fd].port_id = port_id; +                _ap_instance->flows[fd].rb      = NULL; +                _ap_instance->flows[fd].state   = FLOW_PENDING; + +                rw_lock_unlock(&_ap_instance->flows_lock);                  LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", -                         _ap_instance->flows[fd].port_id, fd); +                         port_id, fd);          }          return 0; @@ -340,12 +433,26 @@ static void * ipcp_udp_sdu_reader()          int n;          int fd;          char buf[SHIM_UDP_MAX_SDU_SIZE]; -        struct timeval tv = {0, 10}; +        struct timeval tv = {0, 1000};          struct sockaddr_in r_saddr;          fd_set read_fds;          while (true) { +                rw_lock_rdlock(&_ipcp->state_lock); + +                if (_ipcp->state != IPCP_ENROLLED) { +                        rw_lock_unlock(&_ipcp->state_lock); +                        return (void *) 0; +                } + +                rw_lock_unlock(&_ipcp->state_lock); + +                rw_lock_rdlock(&shim_data(_ipcp)->fd_lock); +                  read_fds = shim_data(_ipcp)->flow_fd_s; + +                rw_lock_unlock(&shim_data(_ipcp)->fd_lock); +                  if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)                          continue; @@ -369,22 +476,96 @@ static void * ipcp_udp_sdu_reader()          return (void *) 0;  } +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +static void * ipcp_udp_sdu_loop(void * o) +{ +        while (true) { +                struct rb_entry * e; +                int fd; +                int len = 0; +                char * buf; + +                rw_lock_rdlock(&_ipcp->state_lock); + +                if (_ipcp->state != IPCP_ENROLLED) { +                        rw_lock_unlock(&_ipcp->state_lock); +                        return (void *) 0; +                } + +                rw_lock_unlock(&_ipcp->state_lock); + +                rw_lock_rdlock(&_ap_instance->data_lock); + +                e = shm_ap_rbuff_read(_ap_instance->rb); + +                if (e == NULL) { +                        rw_lock_unlock(&_ap_instance->data_lock); +                        continue; +                } + +                len = shm_du_map_read_sdu((uint8_t **) &buf, +                                          _ap_instance->dum, +                                          e->index); +                if (len == -1) { +                        rw_lock_unlock(&_ap_instance->data_lock); +                        free(e); +                        continue; +                } + +                fd = port_id_to_fd(e->port_id); + +                if (fd == -1) { +                        rw_lock_unlock(&_ap_instance->data_lock); +                        free(e); +                        continue; +                } + +                if (len == 0) { +                        rw_lock_unlock(&_ap_instance->data_lock); +                        free(e); +                        continue; +                } + +                rw_lock_unlock(&_ap_instance->data_lock); + +                send(fd, buf, len, 0); + +                rw_lock_rdlock(&_ap_instance->data_lock); + +                shm_release_du_buff(_ap_instance->dum, e->index); + +                rw_lock_unlock(&_ap_instance->data_lock); + +                free(e); +        } + +        return (void *) 1; +} +  static int ipcp_udp_bootstrap(struct dif_config * conf)  {          char ipstr[INET_ADDRSTRLEN];          char dnsstr[INET_ADDRSTRLEN]; -        int enable = 1; +        int  enable = 1; +        int  fd = -1;          if (conf->type != THIS_TYPE) {                  LOG_ERR("Config doesn't match IPCP type.");                  return -1;          } +        rw_lock_wrlock(&_ipcp->state_lock); +          if (_ipcp->state != IPCP_INIT) { +                rw_lock_unlock(&_ipcp->state_lock);                  LOG_ERR("IPCP in wrong state.");                  return -1;          } +        _ipcp->state = IPCP_BOOTSTRAPPING; + +        rw_lock_unlock(&_ipcp->state_lock); +          if (inet_ntop(AF_INET,                        &conf->ip_addr,                        ipstr, @@ -408,37 +589,36 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)                  strcpy(dnsstr, "not set");          } -        shim_data(_ipcp)->ip_addr  = conf->ip_addr; -        shim_data(_ipcp)->dns_addr = conf->dns_addr; -          /* UDP listen server */ - -        if ((shim_data(_ipcp)->s_fd = -             socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { -                LOG_DBGF("Can't create socket."); +        if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { +                LOG_ERR("Can't create socket.");                  return -1;          } -        if (setsockopt(shim_data(_ipcp)->s_fd, +        if (setsockopt(fd,                         SOL_SOCKET, -                        SO_REUSEADDR, -                        &enable, +                       SO_REUSEADDR, +                       &enable,                         sizeof(int)) < 0) { -                LOG_DBGF("Setsockopt(SO_REUSEADDR) failed."); +                LOG_WARN("Setsockopt(SO_REUSEADDR) failed.");          } +        shim_data(_ipcp)->s_fd     = fd; +        shim_data(_ipcp)->ip_addr  = conf->ip_addr; +        shim_data(_ipcp)->dns_addr = conf->dns_addr; +          shim_data(_ipcp)->s_saddr.sin_family      = AF_INET;          shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr;          shim_data(_ipcp)->s_saddr.sin_port        = LISTEN_PORT; -        if (bind(shim_data(_ipcp)->s_fd, +        if (bind(fd,                   (struct sockaddr *) &shim_data(_ipcp)->s_saddr,                   sizeof shim_data(_ipcp)->s_saddr ) < 0) {                  LOG_ERR("Couldn't bind to %s.", ipstr);                  return -1;          } -        FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); +        rw_lock_wrlock(&_ap_instance->thread_lock);          pthread_create(&_ap_instance->handler,                         NULL, @@ -449,8 +629,25 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)                         ipcp_udp_sdu_reader,                         NULL); +        pthread_create(&_ap_instance->sduloop, +                       NULL, +                       ipcp_udp_sdu_loop, +                       NULL); + +        rw_lock_unlock(&_ap_instance->thread_lock); + +        rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + +        FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + +        rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + +        rw_lock_wrlock(&_ipcp->state_lock); +          _ipcp->state = IPCP_ENROLLED; +        rw_lock_unlock(&_ipcp->state_lock); +          LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.",                  getpid()); @@ -595,16 +792,21 @@ static int ipcp_udp_name_reg(char * name)          uint32_t ip_addr;  #endif -        if (_ipcp->state != IPCP_ENROLLED) { -                LOG_DBGF("Won't register with non-enrolled IPCP."); -                return -1; -        } -          if (strlen(name) > 24) {                  LOG_ERR("DNS names cannot be longer than 24 chars.");                  return -1;          } +        rw_lock_rdlock(&_ipcp->state_lock); + +        if (_ipcp->state != IPCP_ENROLLED) { +                rw_lock_unlock(&_ipcp->state_lock); +                LOG_DBGF("Won't register with non-enrolled IPCP."); +                return -1; /* -ENOTENROLLED */ +        } + +        rw_lock_unlock(&_ipcp->state_lock); +          if (ipcp_data_add_reg_entry(_ipcp->data, name)) {                  LOG_ERR("Failed to add %s to local registry.", name);                  return -1; @@ -659,6 +861,16 @@ static int ipcp_udp_name_unreg(char * name)  #ifdef CONFIG_OUROBOROS_ENABLE_DNS          /* unregister application with DNS server */ +        rw_lock_rdlock(&_ipcp->state_lock); + +        if (_ipcp->state != IPCP_ENROLLED) { +                rw_lock_unlock(&_ipcp->state_lock); +                LOG_DBGF("IPCP is not enrolled"); +                return -1; /* -ENOTENROLLED */ +        } + +        rw_lock_unlock(&_ipcp->state_lock); +          dns_addr = shim_data(_ipcp)->dns_addr;          if (dns_addr != 0) {                  if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) @@ -697,10 +909,21 @@ static int ipcp_udp_flow_alloc(int               port_id,  #ifdef CONFIG_OUROBOROS_ENABLE_DNS          uint32_t           dns_addr = 0;  #endif +        struct shm_ap_rbuff * rb;          if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)                  return -1; +        rw_lock_rdlock(&_ipcp->state_lock); + +        if (_ipcp->state != IPCP_ENROLLED) { +                rw_lock_unlock(&_ipcp->state_lock); +                LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); +                return -1; /* -ENOTENROLLED */ +        } + +        rw_lock_unlock(&_ipcp->state_lock); +          if (strlen(dst_name) > 255              || strlen(src_ap_name) > 255              || strlen(src_ae_name) > 255) { @@ -726,6 +949,7 @@ static int ipcp_udp_flow_alloc(int               port_id,  #ifdef CONFIG_OUROBOROS_ENABLE_DNS          dns_addr = shim_data(_ipcp)->dns_addr; +          if (dns_addr != 0) {                  ip_addr = ddns_resolve(dst_name, dns_addr);                  if (ip_addr == 0) { @@ -789,35 +1013,45 @@ static int ipcp_udp_flow_alloc(int               port_id,          free(recv_buf); -        _ap_instance->flows[fd].port_id = port_id; -        _ap_instance->flows[fd].state   = FLOW_ALLOCATED; -        _ap_instance->flows[fd].rb      = shm_ap_rbuff_open(n_pid); -        if (_ap_instance->flows[fd].rb == NULL) { +        rb = shm_ap_rbuff_open(n_pid); +        if (rb == NULL) {                  LOG_ERR("Could not open N + 1 ringbuffer.");                  close(fd); -                return -1; +                return -1; /* -ENORBUFF */          } +        rw_lock_wrlock(&_ap_instance->flows_lock); + +        _ap_instance->flows[fd].port_id = port_id; +        _ap_instance->flows[fd].state   = FLOW_ALLOCATED; +        _ap_instance->flows[fd].rb      = rb; + +        rw_lock_unlock(&_ap_instance->flows_lock);          /* tell IRMd that flow allocation "worked" */          if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { +                shm_ap_rbuff_close(rb);                  LOG_ERR("Failed to notify IRMd about flow allocation reply");                  close(fd); -                shm_ap_rbuff_close(_ap_instance->flows[fd].rb);                  return -1;          } +        rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); +          FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); +        rw_lock_unlock(&shim_data(_ipcp)->fd_lock); +          LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd);          return fd;  }  static int ipcp_udp_flow_alloc_resp(int   port_id, -                             pid_t n_pid, -                             int   response) +                                    pid_t n_pid, +                                    int   response)  { +        struct shm_ap_rbuff * rb;          int fd = port_id_to_fd(port_id);          if (fd < 0) {                  LOG_DBGF("Could not find flow with port_id %d.", port_id); @@ -829,22 +1063,44 @@ static int ipcp_udp_flow_alloc_resp(int   port_id,          /* awaken pending flow */ +        rw_lock_rdlock(&_ap_instance->flows_lock); +          if (_ap_instance->flows[fd].state != FLOW_PENDING) { +                rw_lock_unlock(&_ap_instance->flows_lock); +                  LOG_DBGF("Flow was not pending.");                  return -1;          } -        _ap_instance->flows[fd].state = FLOW_ALLOCATED; -        _ap_instance->flows[fd].rb    = shm_ap_rbuff_open(n_pid); -        if (_ap_instance->flows[fd].rb == NULL) { +        rw_lock_unlock(&_ap_instance->flows_lock); + +        rb = shm_ap_rbuff_open(n_pid); +        if (rb == NULL) {                  LOG_ERR("Could not open N + 1 ringbuffer."); + +                rw_lock_wrlock(&_ap_instance->flows_lock); +                  _ap_instance->flows[fd].state   = FLOW_NULL;                  _ap_instance->flows[fd].port_id = 0; + +                rw_lock_unlock(&_ap_instance->flows_lock); +                  return 0;          } +        rw_lock_wrlock(&_ap_instance->flows_lock); + +        _ap_instance->flows[fd].state = FLOW_ALLOCATED; +        _ap_instance->flows[fd].rb    = rb; + +        rw_lock_unlock(&_ap_instance->flows_lock); + +        rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); +          FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); +        rw_lock_unlock(&shim_data(_ipcp)->fd_lock); +          LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd);          return 0; @@ -853,18 +1109,33 @@ static int ipcp_udp_flow_alloc_resp(int   port_id,  static int ipcp_udp_flow_dealloc(int port_id)  {          int fd = port_id_to_fd(port_id); +        struct shm_ap_rbuff * rb; +          if (fd < 0) {                  LOG_DBGF("Could not find flow with port_id %d.", port_id);                  return 0;          } +        rw_lock_wrlock(&_ap_instance->flows_lock); +          _ap_instance->flows[fd].state   = FLOW_NULL;          _ap_instance->flows[fd].port_id = 0; -        if (_ap_instance->flows[fd].rb != NULL) -                shm_ap_rbuff_close(_ap_instance->flows[fd].rb); +        rb = _ap_instance->flows[fd].rb; +        _ap_instance->flows[fd].rb      = NULL; + +        rw_lock_unlock(&_ap_instance->flows_lock); + +        if (rb != NULL) +                shm_ap_rbuff_close(rb); + +        rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);          FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + +        rw_lock_unlock(&shim_data(_ipcp)->fd_lock); +          close(fd); +          return 0;  } @@ -877,7 +1148,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name)          if (shim_ap_init(ap_name) < 0)                  return NULL; -        i = malloc(sizeof *i); +        i = ipcp_instance_create();          if (i == NULL)                  return NULL; @@ -914,45 +1185,17 @@ static struct ipcp * ipcp_udp_create(char * ap_name)  #ifndef MAKE_CHECK -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ -/* FIXME: stop eating the CPU */ -static void * ipcp_udp_sdu_loop(void * o) -{ -        while (true) { -                struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); -                int fd; -                int len = 0; -                char * buf; -                if (e == NULL) -                        continue; - -                len = shm_du_map_read_sdu((uint8_t **) &buf, -                                          _ap_instance->dum, -                                          e->index); -                if (len == -1) -                        continue; - -                fd = port_id_to_fd(e->port_id); - -                if (fd == -1) -                        continue; - -                if (len == 0) -                        continue; - -                send(fd, buf, len, 0); - -                shm_release_du_buff(_ap_instance->dum, e->index); -        } - -        return (void *) 1; -} -  int main (int argc, char * argv[])  {          /* argument 1: pid of irmd ? */          /* argument 2: ap name */          struct sigaction sig_act; +        sigset_t  sigset; +        sigemptyset(&sigset); +        sigaddset(&sigset, SIGINT); +        sigaddset(&sigset, SIGQUIT); +        sigaddset(&sigset, SIGHUP); +        sigaddset(&sigset, SIGPIPE);          if (ipcp_arg_check(argc, argv)) {                  LOG_ERR("Wrong arguments."); @@ -980,13 +1223,19 @@ int main (int argc, char * argv[])                  exit(1);          } +        pthread_sigmask(SIG_BLOCK, &sigset, NULL); +          pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); -        pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); -        pthread_join(_ap_instance->sduloop, NULL); +        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); +          pthread_join(_ap_instance->mainloop, NULL); -        pthread_join(_ap_instance->handler, NULL); -        pthread_join(_ap_instance->sdu_reader, NULL); + +        shim_ap_fini(); + +        free(_ipcp->data); +        free(_ipcp->ops); +        free(_ipcp);          exit(0);  } diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6c977cbb..6c04ccc5 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -245,18 +245,17 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)          if (rb == NULL)                  return NULL; -        e = malloc(sizeof(*e)); -        if (e == NULL) -                return NULL; -          pthread_mutex_lock(rb->shm_mutex);          if (shm_rbuff_used(rb) == 0) {                  pthread_mutex_unlock(rb->shm_mutex); -                free(e);                  return NULL;          } +        e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; +          *e = *(rb->shm_base + *rb->ptr_tail);          *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); | 
