diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-05-08 16:34:19 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-05-08 16:34:19 +0200 | 
| commit | 5812dfb832e513dc455a0d48624bcad62334d457 (patch) | |
| tree | 93a02e1b20f54bb869eadc856f201412c633315c /src/lib | |
| parent | de8f2015cbd015b1cced366cb12c054be62c23b1 (diff) | |
| parent | 021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff) | |
| download | ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip | |
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/lib/bitmap.c | 19 | ||||
| -rw-r--r-- | src/lib/dev.c | 344 | ||||
| -rw-r--r-- | src/lib/ipcp.c | 65 | ||||
| -rw-r--r-- | src/lib/ipcpd_messages.proto | 6 | ||||
| -rw-r--r-- | src/lib/irmd_messages.proto | 26 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 268 | ||||
| -rw-r--r-- | src/lib/shm_du_map.c | 143 | ||||
| -rw-r--r-- | src/lib/tests/shm_du_map_test.c | 53 | 
9 files changed, 708 insertions, 217 deletions
| diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4922e07c..53a7b354 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -32,6 +32,7 @@ set(SOURCE_FILES    ipcp.c    irm.c    list.c +  shm_ap_rbuff.c    shm_du_map.c    sockets.c    utils.c diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 8aabb4f4..e84145b2 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -108,12 +108,14 @@ struct bmp * bmp_create(size_t bits, ssize_t offset)                  return NULL;          tmp = malloc(sizeof(*tmp)); -        if (!tmp) +        if (tmp == NULL)                  return NULL; -        tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(*(tmp->bitmap))); -        if (!tmp->bitmap) +        tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(unsigned long)); +        if (tmp->bitmap == NULL) { +                free(tmp);                  return NULL; +        }          tmp->size = bits;          tmp->offset = offset; @@ -140,7 +142,8 @@ int bmp_destroy(struct bmp * b)  static ssize_t bad_id(struct bmp * b)  { -        assert(b); +        if (b == NULL) +                return -1;          return b->offset - 1;  } @@ -149,8 +152,8 @@ ssize_t bmp_allocate(struct bmp * b)  {          ssize_t id; -        if (!b) -                return bad_id(b); +        if (b == NULL) +                return -1;          id = (ssize_t) find_next_zero_bit(b->bitmap,                                            b->size); @@ -177,7 +180,7 @@ static bool is_id_valid(struct bmp * b,  bool bmp_is_id_valid(struct bmp * b,                       ssize_t id)  { -        if (!b) +        if (b == NULL)                  return false;          return is_id_valid(b, id); @@ -188,7 +191,7 @@ int bmp_release(struct bmp * b,  {          ssize_t rid; -        if (!b) +        if (b == NULL)                  return -1;          if (!is_id_valid(b, id)) diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d8411c5..c99e8cdb 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,73 +25,190 @@  #include <ouroboros/logs.h>  #include <ouroboros/dev.h>  #include <ouroboros/sockets.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/instance_name.h> +#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/utils.h>  #include <stdlib.h> +#include <string.h> -int ap_reg(char * ap_name, -           char ** difs, -           size_t difs_size) +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE +  #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE +  #define DU_BUFF_TAILSPACE 0 +#endif + +struct flow { +        struct shm_ap_rbuff * rb; +        uint32_t              port_id; +        uint32_t              oflags; + +        /* don't think this needs locking */ +}; + +struct ap_data { +        instance_name_t *     api; +        struct shm_du_map *   dum; +        struct bmp *          fds; + +        struct shm_ap_rbuff * rb; +        struct flow           flows[AP_MAX_FLOWS]; +} * _ap_instance; + + +int ap_init(char * ap_name)  { -        irm_msg_t msg = IRM_MSG__INIT; +        _ap_instance = malloc(sizeof(struct ap_data)); +        if (_ap_instance == NULL) { +                return -1; +        } + +        _ap_instance->api = instance_name_create(); +        if (_ap_instance->api == NULL) { +                free(_ap_instance); +                return -1; +        } + +        if (instance_name_init_from(_ap_instance->api, +                                    ap_name, +                                    getpid()) == NULL) { +                instance_name_destroy(_ap_instance->api); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); +        if (_ap_instance->fds == NULL) { +                instance_name_destroy(_ap_instance->api); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->dum = shm_du_map_open(); +        if (_ap_instance->dum == NULL) { +                instance_name_destroy(_ap_instance->api); +                bmp_destroy(_ap_instance->fds); +                free(_ap_instance); +                return -1; +        } + +        _ap_instance->rb = shm_ap_rbuff_create(); +        if (_ap_instance->rb == NULL) { +                instance_name_destroy(_ap_instance->api); +                bmp_destroy(_ap_instance->fds); +                free(_ap_instance); +                return -1; +        } + +        return 0; +} + +void ap_fini() +{ +        int i = 0; + +        if (_ap_instance == NULL) +                return; +        if (_ap_instance->api != NULL) +                instance_name_destroy(_ap_instance->api); +        if (_ap_instance->fds != NULL) +                bmp_destroy(_ap_instance->fds); +        if (_ap_instance->dum != NULL) +                shm_du_map_close(_ap_instance->dum); +        if (_ap_instance->rb != NULL) +                shm_ap_rbuff_destroy(_ap_instance->rb); +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                if (_ap_instance->flows[i].rb != NULL) +                        shm_ap_rbuff_close(_ap_instance->flows[i].rb); + +        free(_ap_instance); +} + +#if 0 +static int port_id_to_fd(uint32_t port_id) +{ +        int i; +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                if (_ap_instance->flows[i].port_id == port_id +                        && _ap_instance->flows[i].state != FLOW_NULL) +                        return i; +        return -1; +} +#endif + +int ap_reg(char ** difs, +           size_t  len) +{ +        irm_msg_t msg        = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int fd = 0; +        int fd = bmp_allocate(_ap_instance->fds); -        if (ap_name == NULL || -            difs == NULL || -            difs_size == 0 || +        if (difs == NULL || +            len == 0 ||              difs[0] == NULL) {                  return -EINVAL;          } +        if (_ap_instance == NULL) { +                LOG_DBG("ap_init was not called"); +                return -1; +        } +          msg.code       = IRM_MSG_CODE__IRM_AP_REG;          msg.has_pid    = true; -        msg.pid        = getpid(); -        msg.ap_name    = ap_name; +        msg.pid        = _ap_instance->api->id; +        msg.ap_name    = _ap_instance->api->name;          msg.dif_name   = difs; -        msg.n_dif_name = difs_size; +        msg.n_dif_name = len;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_fd == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        fd = recv_msg->fd; +        if (recv_msg->result < 0) +                fd = -1; +          irm_msg__free_unpacked(recv_msg, NULL);          return fd;  } -int ap_unreg(char * ap_name, -             char ** difs, -             size_t difs_size) +int ap_unreg(char ** difs, +             size_t  len)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL;          int ret = -1; -        if (ap_name == NULL || -            difs == NULL || -            difs_size == 0 || +        if (difs == NULL || +            len == 0 ||              difs[0] == NULL) {                  return -EINVAL;          }          msg.code       = IRM_MSG_CODE__IRM_AP_UNREG;          msg.has_pid    = true; -        msg.pid        = getpid(); -        msg.ap_name    = ap_name; +        msg.pid        = _ap_instance->api->id; +        msg.ap_name    = _ap_instance->api->name;          msg.dif_name   = difs; -        msg.n_dif_name = difs_size; +        msg.n_dif_name = len;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -102,38 +219,62 @@ int ap_unreg(char * ap_name,          return ret;  } -int flow_accept(int fd, -                char * ap_name, -                char * ae_name) +int flow_accept(int     fd, +                char ** ap_name, +                char ** ae_name)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int cli_fd = 0; - -        if (ap_name == NULL) { -                return -EINVAL; -        } +        int cfd = -1;          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_pid = true; -        msg.pid     = getpid(); -        msg.has_fd  = true; -        msg.fd      = fd; +        msg.pid     = _ap_instance->api->id;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_fd == false) { +        if (!recv_msg->has_pid || !recv_msg->has_port_id) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        cli_fd  = recv_msg->fd; -        ap_name = recv_msg->ap_name; -        ae_name = recv_msg->ae_name; + +        cfd = bmp_allocate(_ap_instance->fds); + +        _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); +        if (_ap_instance->flows[cfd].rb == NULL) { +                bmp_release(_ap_instance->fds, cfd); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        *ap_name = strdup(recv_msg->ap_name); +        if (*ap_name == NULL) { +                bmp_release(_ap_instance->fds, cfd); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        if (ae_name != NULL) { +                *ae_name = strdup(recv_msg->ae_name); +                if (*ae_name == NULL) { +                        bmp_release(_ap_instance->fds, cfd); +                        irm_msg__free_unpacked(recv_msg, NULL); +                        return -1; +                } +        } + +        _ap_instance->flows[cfd].port_id = recv_msg->port_id; +        _ap_instance->flows[cfd].oflags  = FLOW_O_DEFAULT; + +          irm_msg__free_unpacked(recv_msg, NULL); -        return cli_fd; + +        bmp_release(_ap_instance->fds, fd); + +        return cfd;  }  int flow_alloc_resp(int fd, @@ -145,9 +286,9 @@ int flow_alloc_resp(int fd,          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;          msg.has_pid      = true; -        msg.pid          = getpid(); -        msg.has_fd       = true; -        msg.fd = fd; +        msg.pid          = _ap_instance->api->id; +        msg.has_port_id  = true; +        msg.port_id      = _ap_instance->flows[fd].port_id;          msg.has_response = true;          msg.response     = response; @@ -155,7 +296,7 @@ int flow_alloc_resp(int fd,          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -167,41 +308,49 @@ int flow_alloc_resp(int fd,  }  int flow_alloc(char * dst_name, -               char * src_ap_name,                 char * src_ae_name, -               struct qos_spec * qos, -               int oflags) +               struct qos_spec * qos)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int fd = 0; +        int fd = -1; -        if (dst_name == NULL || -            src_ap_name == NULL) { +        if (dst_name == NULL)                  return -EINVAL; -        }          if (src_ae_name == NULL)                  src_ae_name  = UNKNOWN_AE;          msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;          msg.dst_name    = dst_name; -        msg.ap_name     = src_ap_name; +        msg.ap_name     = _ap_instance->api->name; +        msg.has_pid     = true; +        msg.pid         = _ap_instance->api->id;          msg.ae_name     = src_ae_name; -        msg.has_oflags  = true; -        msg.oflags      = oflags;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_fd == false) { +        if (!recv_msg->has_pid || !recv_msg->has_port_id) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        fd = recv_msg->fd; +        fd = bmp_allocate(_ap_instance->fds); + +        _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); +        if (_ap_instance->flows[fd].rb == NULL) { +                bmp_release(_ap_instance->fds, fd); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1; +        } + +        _ap_instance->flows[fd].port_id = recv_msg->port_id; +        _ap_instance->flows[fd].oflags  = FLOW_O_DEFAULT; +          irm_msg__free_unpacked(recv_msg, NULL); +          return fd;  } @@ -211,17 +360,15 @@ int flow_alloc_res(int fd)          irm_msg_t * recv_msg = NULL;          int result = 0; -        msg.code    = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; -        msg.has_pid = true; -        msg.pid     = getpid(); -        msg.has_fd  = true; -        msg.fd      = fd; +        msg.code          = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; +        msg.has_port_id  = true; +        msg.port_id      = _ap_instance->flows[fd].port_id;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -238,17 +385,15 @@ int flow_dealloc(int fd)          irm_msg_t * recv_msg = NULL;          int ret = -1; -        msg.code    = IRM_MSG_CODE__IRM_FLOW_DEALLOC; -        msg.has_pid = true; -        msg.pid     = getpid(); -        msg.has_fd  = true; -        msg.fd      = fd; +        msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC; +        msg.has_port_id  = true; +        msg.port_id      = _ap_instance->flows[fd].port_id;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -259,47 +404,50 @@ int flow_dealloc(int fd)          return ret;  } -int flow_cntl(int fd, int oflags) +int flow_cntl(int fd, int cmd, int oflags)  { -        irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int ret = -1; - -        msg.has_pid = true; -        msg.pid     = getpid(); -        msg.has_fd  = true; -        msg.fd      = fd; -        msg.oflags  = oflags; +        return -1; +} -        recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) +ssize_t flow_write(int fd, void * buf, size_t count) +{ +        /* the AP chooses the amount of headspace and tailspace */ +        size_t index = shm_create_du_buff(_ap_instance->dum, +                                          count + DU_BUFF_HEADSPACE + +                                          DU_BUFF_TAILSPACE, +                                          DU_BUFF_HEADSPACE, +                                          (uint8_t *) buf, +                                          count); +        struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; +        if (index == -1)                  return -1; -        if (recv_msg->has_result == false) { -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; +        if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { +                shm_release_du_buff(_ap_instance->dum, index); +                return -EPIPE;          } -        ret = recv_msg->result; -        irm_msg__free_unpacked(recv_msg, NULL); - -        return ret; +        return 0;  } -ssize_t flow_write(int fd, -                   void * buf, -                   size_t count) +ssize_t flow_read(int fd, void * buf, size_t count)  { -        LOG_MISSING; +        struct rb_entry * e = NULL; +        int n; +        uint8_t * sdu; +        /* FIXME: move this to a thread  */ +        while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id) +                e = shm_ap_rbuff_read(_ap_instance->rb); + +        n = shm_du_map_read_sdu(&sdu, +                                _ap_instance->dum, +                                e->index); +        if (n < 0) +                return -1; -        return -1; -} +        memcpy(buf, sdu, MIN(n, count)); -ssize_t flow_read(int fd, -                  void * buf, -                  size_t count) -{ -        LOG_MISSING; +        shm_release_du_buff(_ap_instance->dum, e->index); -        return -1; +        return n;  } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 387572b3..75676915 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -121,6 +121,8 @@ pid_t ipcp_create(char *         ipcp_name,                  return pid;          } +        /* clear fd table */ +          if (ipcp_type == IPCP_NORMAL)                  exec_name = IPCP_NORMAL_EXEC;          else if (ipcp_type == IPCP_SHIM_UDP) @@ -286,13 +288,8 @@ int ipcp_enroll(pid_t pid,                  return -EINVAL;          msg.code        = IPCP_MSG_CODE__IPCP_ENROLL; -        msg.member_name = malloc(sizeof(*(msg.member_name))); -        if (msg.member_name == NULL) { -                LOG_ERR("Failed to malloc."); -                return -1; -        } -        msg.n_1_dif     = n_1_dif;          msg.member_name = member_name; +        msg.n_1_dif     = n_1_dif;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL) { @@ -323,8 +320,8 @@ int ipcp_name_reg(pid_t    pid,          if (name == NULL)                  return -1; -        msg.code          = IPCP_MSG_CODE__IPCP_NAME_REG; -        msg.name          = name; +        msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; +        msg.name = name;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL) @@ -368,6 +365,7 @@ int ipcp_name_unreg(pid_t  pid,  int ipcp_flow_alloc(pid_t             pid,                      uint32_t          port_id, +                    pid_t             n_pid,                      char *            dst_name,                      char *            src_ap_name,                      char *            src_ae_name, @@ -381,17 +379,19 @@ int ipcp_flow_alloc(pid_t             pid,                  return -EINVAL;          msg.code        = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; +        msg.has_port_id = true; +        msg.port_id     = port_id; +        msg.has_pid     = true; +        msg.pid         = n_pid;          msg.src_ap_name = src_ap_name;          msg.src_ae_name = src_ae_name;          msg.dst_name    = dst_name; -        msg.port_id     = port_id; -        msg.has_port_id = true;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  ipcp_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -404,17 +404,20 @@ int ipcp_flow_alloc(pid_t             pid,  int ipcp_flow_alloc_resp(pid_t    pid,                           uint32_t port_id, -                         int      result) +                         pid_t    n_pid, +                         int      response)  {          ipcp_msg_t msg = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL;          int ret = -1; -        msg.code        = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; -        msg.has_port_id = true; -        msg.port_id     = port_id; -        msg.has_result  = true; -        msg.result      = result; +        msg.code         = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; +        msg.has_port_id  = true; +        msg.port_id      = port_id; +        msg.has_pid      = true; +        msg.pid          = n_pid; +        msg.has_response = true; +        msg.response     = response;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL) @@ -431,38 +434,38 @@ int ipcp_flow_alloc_resp(pid_t    pid,          return ret;  } -int ipcp_flow_req_arr(pid_t    pid, -                      char *   dst_name, -                      char *   src_ap_name, -                      char *   src_ae_name) +int ipcp_flow_req_arr(pid_t  pid, +                      char * dst_name, +                      char * src_ap_name, +                      char * src_ae_name)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int fd = -1; +        int port_id = -1;          if (src_ap_name == NULL || src_ae_name == NULL)                  return -EINVAL;          msg.code          = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; +        msg.has_pid       = true; +        msg.pid           = pid;          msg.dst_name      = dst_name;          msg.ap_name       = src_ap_name;          msg.ae_name       = src_ae_name; -        msg.pid           = pid; -        msg.has_pid       = true;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; -        if (recv_msg->has_fd == false) { +        if (!recv_msg->has_port_id) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        fd = recv_msg->fd; +        port_id = recv_msg->port_id;          irm_msg__free_unpacked(recv_msg, NULL); -        return fd; +        return port_id;  }  int ipcp_flow_alloc_reply(pid_t    pid, @@ -509,11 +512,11 @@ int ipcp_flow_dealloc(pid_t    pid,                  recv_msg = send_recv_ipcp_msg(pid, &msg);                  if (recv_msg == NULL) -                        return -1; +                        return 0;                  if (recv_msg->has_result == false) {                          ipcp_msg__free_unpacked(recv_msg, NULL); -                        return -1; +                        return 0;                  }                  ret = recv_msg->result; @@ -531,11 +534,11 @@ int ipcp_flow_dealloc(pid_t    pid,                  recv_msg = send_recv_irm_msg(&msg);                  if (recv_msg == NULL) -                        return -1; +                        return 0;                  if (recv_msg->has_result == false) {                          irm_msg__free_unpacked(recv_msg, NULL); -                        return -1; +                        return 0;                  }                  ret = recv_msg->result; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index da4bb469..daca011d 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -25,6 +25,8 @@ message ipcp_msg {          optional string src_ap_name  =  9;          optional string src_ae_name  = 10;          optional dif_config_msg conf = 11; -        optional int32 result        = 12; -        optional int32 fd            = 13; +        optional int32 fd            = 12; +        optional int32 pid           = 13; +        optional int32 response      = 14; +        optional int32 result        = 15;  }; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 89e2c882..c336614e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -36,13 +36,10 @@ enum irm_msg_code {          IRM_FLOW_ALLOC        = 11;          IRM_FLOW_ALLOC_RES    = 12;          IRM_FLOW_DEALLOC      = 13; -        IRM_FLOW_CONTROL      = 14; -        IRM_FLOW_WRITE        = 15; -        IRM_FLOW_READ         = 16; -        IPCP_FLOW_REQ_ARR     = 17; -        IPCP_FLOW_ALLOC_REPLY = 18; -        IPCP_FLOW_DEALLOC     = 19; -        IRM_REPLY             = 20; +        IPCP_FLOW_REQ_ARR     = 14; +        IPCP_FLOW_ALLOC_REPLY = 15; +        IPCP_FLOW_DEALLOC     = 16; +        IRM_REPLY             = 17;  };  message irm_msg { @@ -52,12 +49,11 @@ message irm_msg {          optional uint32 api_id       =  3;          optional uint32 ipcp_type    =  5;          repeated string dif_name     =  6; -        optional int32 fd            =  7; -        optional int32 response      =  8; -        optional int32 oflags        =  9; -        optional string dst_name     = 10; -        optional uint32 port_id      = 11; -        optional int32 pid           = 12; -        optional dif_config_msg conf = 13; -        optional int32 result        = 14; +        optional int32 response      =  7; +        optional string dst_name     =  8; +        optional uint32 port_id      =  9; +        optional int32 pid           = 10; +        optional dif_config_msg conf = 11; +        optional int32 cfd           = 12; +        optional int32 result        = 13;  }; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c new file mode 100644 index 00000000..0a41dfb3 --- /dev/null +++ b/src/lib/shm_ap_rbuff.c @@ -0,0 +1,268 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for application processes + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/shm_ap_rbuff.h> +#define OUROBOROS_PREFIX "shm_ap_rbuff" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <stdbool.h> +#include <errno.h> + +#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry)          \ +                             + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)) + +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)\ +                          & (SHM_RBUFF_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) + +struct shm_ap_rbuff { +        struct rb_entry * shm_base;    /* start of entry */ +        size_t *          ptr_head;    /* start of ringbuffer head */ +        size_t *          ptr_tail;    /* start of ringbuffer tail */ +        pthread_mutex_t * shm_mutex;   /* lock all free space in shm */ +        pid_t             pid;         /* pid to which this rb belongs */ +        int               fd; +}; + +struct shm_ap_rbuff * shm_ap_rbuff_create() +{ +        struct shm_ap_rbuff * rb; +        int                   shm_fd; +        struct rb_entry *     shm_base; +        pthread_mutexattr_t   attr; +        char                  fn[25]; + +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); + +        rb = malloc(sizeof(*rb)); +        if (rb == NULL) { +                LOG_DBGF("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBGF("Failed creating ring buffer."); +                free(rb); +                return NULL; +        } + +        if (lseek(shm_fd, SHM_RBUFF_FILE_SIZE - 1, SEEK_SET) < 0) { +                LOG_DBGF("Failed to extend ringbuffer."); +                free(rb); +                return NULL; +        } + +        if (write(shm_fd, "", 1) != 1) { +                LOG_DBGF("Failed to finalise extension of ringbuffer."); +                free(rb); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_RBUFF_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        if (shm_base == MAP_FAILED) { +                LOG_DBGF("Failed to map shared memory."); +                if (close(shm_fd) == -1) +                        LOG_DBGF("Failed to close invalid shm."); + +                if (shm_unlink(fn) == -1) +                        LOG_DBGF("Failed to remove invalid shm."); + +                free(rb); +                return NULL; +        } + +        rb->shm_base = shm_base; +        rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); +        rb->ptr_tail = (size_t *) +                ((uint8_t *) rb->ptr_head + sizeof(size_t)); +        rb->shm_mutex = (pthread_mutex_t *) +                ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + +        pthread_mutexattr_init(&attr); +        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); +        pthread_mutex_init(rb->shm_mutex, &attr); + +        *rb->ptr_head = 0; +        *rb->ptr_tail = 0; + +        rb->fd  = shm_fd; +        rb->pid = getpid(); + +        return rb; +} + +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid) +{ +        struct shm_ap_rbuff * rb; +        int                   shm_fd; +        struct rb_entry *     shm_base; +        char                  fn[25]; + +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", pid); + +        rb = malloc(sizeof(*rb)); +        if (rb == NULL) { +                LOG_DBGF("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(fn, O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBGF("Failed opening shared memory %s.", fn); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_RBUFF_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        if (shm_base == MAP_FAILED) { +                LOG_DBGF("Failed to map shared memory."); +                if (close(shm_fd) == -1) +                        LOG_DBGF("Failed to close invalid shm."); + +                if (shm_unlink(fn) == -1) +                        LOG_DBGF("Failed to remove invalid shm."); + +                free(rb); +                return NULL; +        } + +        rb->shm_base = shm_base; +        rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); +        rb->ptr_tail = (size_t *) +                ((uint8_t *) rb->ptr_head + sizeof(size_t)); +        rb->shm_mutex = (pthread_mutex_t *) +                ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + +        rb->fd = shm_fd; +        rb->pid = pid; + +        return rb; +} +void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) +{ +        char fn[25]; + +        if (rb == NULL) { +                LOG_DBGF("Bogus input. Bugging out."); +                return; +        } + +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->pid); + +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBGF("Couldn't unmap shared memory."); + +        free(rb); +} + +void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) +{ +        char fn[25]; + + +        if (rb == NULL) { +                LOG_DBGF("Bogus input. Bugging out."); +                return; +        } + +        if (rb->pid != getpid()) { +                LOG_ERR("Tried to destroy other AP's rbuff."); +                return; +        } + +        sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->pid); + +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBGF("Couldn't unmap shared memory."); + +        if (shm_unlink(fn) == -1) +                LOG_DBGF("Failed to unlink shm."); + +        free(rb); +} + +int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) +{ +        struct rb_entry * pos; + +        if (rb == NULL || e == NULL) +                return -1; + +        pthread_mutex_lock(rb->shm_mutex); + +        if (!shm_rbuff_free(rb)) { +                pthread_mutex_unlock(rb->shm_mutex); +                return -1; +        } + +        pos = rb->shm_base + *rb->ptr_head; +        *pos = *e; +        *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); + +        pthread_mutex_unlock(rb->shm_mutex); + +        return 0; +} +struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) +{ +        struct rb_entry * e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; + +        if (rb == NULL) +                return NULL; + +        pthread_mutex_lock(rb->shm_mutex); + +        if (shm_rbuff_used(rb) == 0) { +                pthread_mutex_unlock(rb->shm_mutex); +                return NULL; +        } + +        *e = *(rb->shm_base + *rb->ptr_tail); + +        *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + +        pthread_mutex_unlock(rb->shm_mutex); + +        return e; +} diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index dfccca6a..56062c9d 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -45,6 +45,9 @@  ((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail *                      \                                           SHM_DU_BUFF_BLOCK_SIZE))) +#define idx_to_du_buff_ptr(dum, idx)                                           \ +        ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) +  #define block_ptr_to_idx(dum, sdb)                                             \          (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) @@ -52,27 +55,31 @@                            & (SHM_BLOCKS_IN_MAP - 1))  #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail -            \ +                            idx_to_du_buff_ptr(dum, idx)->du_head) +  #define MIN(a,b)(a < b ? a : b)  struct shm_du_buff { -        size_t            size; -        size_t            du_head; -        size_t            du_tail; +        size_t size; +        size_t du_head; +        size_t du_tail; +        size_t garbage;  };  struct shm_du_map { -        uint8_t            * shm_base;    /* start of blocks */ -        size_t             * ptr_head;    /* start of ringbuffer head */ -        size_t             * ptr_tail;    /* start of ringbuffer tail */ -        pthread_mutex_t    * shm_mutex;   /* lock all free space in shm */ -        int                  fd; +        uint8_t *         shm_base;    /* start of blocks */ +        size_t *          ptr_head;    /* start of ringbuffer head */ +        size_t *          ptr_tail;    /* start of ringbuffer tail */ +        pthread_mutex_t * shm_mutex;   /* lock all free space in shm */ +        int               fd;  };  struct shm_du_map * shm_du_map_create()  {          struct shm_du_map * dum;          int                 shm_fd; -        uint8_t           * shm_base; +        uint8_t *           shm_base;          pthread_mutexattr_t attr;          dum = malloc(sizeof *dum); @@ -141,7 +148,13 @@ struct shm_du_map * shm_du_map_open()  {          struct shm_du_map * dum;          int                 shm_fd; -        uint8_t           * shm_base; +        uint8_t *           shm_base; + +        dum = malloc(sizeof *dum); +        if (dum == NULL) { +                LOG_DBGF("Could not allocate struct."); +                return NULL; +        }          shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666);          if (shm_fd == -1) { @@ -166,12 +179,6 @@ struct shm_du_map * shm_du_map_open()                  return NULL;          } -        dum = malloc(sizeof *dum); -        if (dum == NULL) { -                LOG_DBGF("Could not allocate struct."); -                return NULL; -        } -          dum->shm_base = shm_base;          dum->ptr_head = (size_t *)                  ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); @@ -195,38 +202,52 @@ void shm_du_map_close(struct shm_du_map * dum)          if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)                  LOG_DBGF("Couldn't unmap shared memory."); +        free(dum); +} + +void shm_du_map_destroy(struct shm_du_map * dum) +{ +        if (dum == NULL) { +                LOG_DBGF("Bogus input. Bugging out."); +                return; +        } + +        if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) +                LOG_DBGF("Couldn't unmap shared memory."); +          if (shm_unlink(SHM_DU_MAP_FILENAME) == -1)                  LOG_DBGF("Failed to unlink shm.");          free(dum);  } -struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, -                                        size_t              size, -                                        size_t              headspace, -                                        uint8_t           * data, -                                        size_t              len) +int shm_create_du_buff(struct shm_du_map * dum, +                       size_t              size, +                       size_t              headspace, +                       uint8_t *           data, +                       size_t              len)  {          struct shm_du_buff * sdb;          long                 blocks = 0;          int                  sz = size + sizeof *sdb;          int                  sz2 = headspace + len + sizeof *sdb; -        uint8_t            * write_pos; +        uint8_t *            write_pos;          size_t               copy_len; +        size_t               index;          if (dum == NULL || data == NULL) {                  LOG_DBGF("Bogus input, bugging out."); -                return NULL; +                return -1;          }          if (headspace >= size) {                  LOG_DBGF("Index out of bounds."); -                return NULL; +                return -1;          }          if (headspace + len > size) {                  LOG_DBGF("Buffer too small for data."); -                return NULL; +                return -1;          }          pthread_mutex_lock(dum->shm_mutex); @@ -237,20 +258,20 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,                  if (sz2 < 0 && sz > 0) {                          pthread_mutex_unlock(dum->shm_mutex);                          LOG_DBG("Can't handle this packet now"); -                        return NULL; +                        return -1;                  }                  ++blocks;          }          if (!shm_map_free(dum, blocks)) {                  pthread_mutex_unlock(dum->shm_mutex); -                LOG_DBGF("Allocation failed, Out of Memory."); -                return NULL; +                return -1;          }          sdb = get_head_ptr(dum);          sdb->size = size; +        sdb->garbage = 0;          sdb->du_head = headspace;          sdb->du_tail = sdb->du_head + len; @@ -267,32 +288,76 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,                  --blocks;          } +        index = *dum->ptr_head - 1; +          pthread_mutex_unlock(dum->shm_mutex); -        return sdb; +        return index;  } -int shm_release_du_buff(struct shm_du_map * dum) +/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ +int shm_du_map_read_sdu(uint8_t **          dst, +                        struct shm_du_map * dum, +                        size_t              idx) +{ +        size_t    len = 0; + +        if (idx > SHM_BLOCKS_IN_MAP) +                return -1; + +        pthread_mutex_lock(dum->shm_mutex); + +        if (*dum->ptr_head == *dum->ptr_tail) { +                pthread_mutex_unlock(dum->shm_mutex); +                return -1; +        } + +        *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) + +                sizeof(struct shm_du_buff) + +                idx_to_du_buff_ptr(dum, idx)->du_head; +        len = sdu_size(dum, idx); + +        pthread_mutex_unlock(dum->shm_mutex); + +        return len; +} + +int shm_release_du_buff(struct shm_du_map * dum, size_t idx)  {          long sz;          long blocks = 0; + +        /* FIXME: this is crap for the test */ +        if (idx > SHM_BLOCKS_IN_MAP) +                idx = *dum->ptr_tail; +          pthread_mutex_lock(dum->shm_mutex);          if (*dum->ptr_head == *dum->ptr_tail) { -                LOG_DBGF("Attempt to free empty ringbuffer. Nothing to do.");                  pthread_mutex_unlock(dum->shm_mutex);                  return -1;          } -        sz = get_tail_ptr(dum)->size; +        idx_to_du_buff_ptr(dum, idx)->garbage = 1; -        while (sz + (long) sizeof (struct shm_du_buff) > 0) { -                sz -= SHM_DU_BUFF_BLOCK_SIZE; -                ++blocks; +        if (idx != *dum->ptr_tail) { +                pthread_mutex_unlock(dum->shm_mutex); +                return 0; +        } + +        while (get_tail_ptr(dum)->garbage == 1) { +                sz = get_tail_ptr(dum)->size; + +                while (sz + (long) sizeof (struct shm_du_buff) > 0) { +                        sz -= SHM_DU_BUFF_BLOCK_SIZE; +                        ++blocks; +                } + +                *(dum->ptr_tail) = +                        (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);          } -        *(dum->ptr_tail) = (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);          pthread_mutex_unlock(dum->shm_mutex);          return 0; @@ -317,7 +382,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,  }  uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, -                                 size_t size) +                                 size_t               size)  {          if (sdb == NULL) {                  LOG_DBGF("Bogus input, bugging out."); @@ -335,7 +400,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb,  }  int shm_du_buff_head_release(struct shm_du_buff * sdb, -                             size_t size) +                             size_t               size)  {          if (sdb == NULL) {                  LOG_DBGF("Bogus input, bugging out."); @@ -353,7 +418,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb,  }  int shm_du_buff_tail_release(struct shm_du_buff * sdb, -                             size_t size) +                             size_t               size)  {          if (sdb == NULL) {                  LOG_DBGF("Bogus input, bugging out."); diff --git a/src/lib/tests/shm_du_map_test.c b/src/lib/tests/shm_du_map_test.c index 85a82e4d..55938a62 100644 --- a/src/lib/tests/shm_du_map_test.c +++ b/src/lib/tests/shm_du_map_test.c @@ -32,7 +32,7 @@  #include <ouroboros/logs.h> -#define SIZE_OF_DU_BUFF 24 +#define SIZE_OF_DU_BUFF 32  #define TEST_BUFF_SIZE (SHM_DU_BUFF_BLOCK_SIZE - SIZE_OF_DU_BUFF)  #define MAX(a,b) (a > b ? a : b) @@ -44,7 +44,7 @@ void * produce()  {          struct shm_du_map * dum;          long                test_buf_size = 0; -        uint8_t           * test_values; +        uint8_t *           test_values;          int                 headspace;          int                 tailspace;          long                i; @@ -66,9 +66,8 @@ void * produce()                  test_values[i] = 170;          clock_gettime(CLOCK_MONOTONIC, &starttime); -        for (i = 1; i < SHM_BLOCKS_IN_MAP; i++) { -                struct shm_du_buff * sdb; -                size_t               len; +        for (i = 1; i < 16 * SHM_BLOCKS_IN_MAP; i++) { +                size_t len;                  test_buf_size = TEST_BUFF_SIZE; @@ -77,21 +76,19 @@ void * produce()                  len = test_buf_size - (headspace + tailspace); -                sdb = shm_create_du_buff(dum, -                                         test_buf_size, -                                         headspace, -                                         test_values, -                                         len); - -                if (sdb != NULL) { -                        bytes_written += len; -                } -                else { -                        sync = -2; -                        break; +                if (shm_create_du_buff(dum, +                                       test_buf_size, +                                       headspace, +                                       test_values, +                                       len) < 0) { +                        continue;                  } + +                bytes_written += len;          } +        sync = -2; +          clock_gettime(CLOCK_MONOTONIC, &stoptime);          elapsed =(stoptime.tv_sec + stoptime.tv_nsec / 1000000000.0) -                  (starttime.tv_sec + starttime.tv_nsec / 1000000000.0); @@ -104,13 +101,14 @@ void * produce()          sync = -1; +        shm_du_map_close(dum); +          return 0;  }  void * consume()  {          struct shm_du_map * dum; -          struct timespec     ts;          ts.tv_sec = 0; @@ -123,10 +121,15 @@ void * consume()                  return (void *)-1;          } -        while (!sync) { -                while (!shm_release_du_buff(dum)); -                nanosleep(&ts, NULL); +        while (true) { +                shm_release_du_buff(dum, 1823429173941); +                if (sync) +                        break;          } +        nanosleep(&ts, NULL); + + +        shm_du_map_close(dum);          return 0;  } @@ -149,7 +152,7 @@ int shm_du_map_test(int argc, char ** argv)                  return -1;          } -        shm_du_map_close(dum); +        shm_du_map_destroy(dum);          LOG_INFO("done."); @@ -165,7 +168,7 @@ int shm_du_map_test(int argc, char ** argv)          pthread_create(&consumer, NULL, consume, NULL);          pthread_join(consumer, NULL); -        shm_du_map_close(dum); +        shm_du_map_destroy(dum);          LOG_INFO("done."); @@ -173,6 +176,8 @@ int shm_du_map_test(int argc, char ** argv)          LOG_INFO("starting concurrency test."); +        sync = 0; +          dum = shm_du_map_create();          res1 = (int) pthread_create(&producer, NULL, produce, NULL); @@ -181,7 +186,7 @@ int shm_du_map_test(int argc, char ** argv)          pthread_join(producer, NULL);          pthread_join(consumer, NULL); -        shm_du_map_close(dum); +        shm_du_map_destroy(dum);          LOG_INFO("done."); | 
