diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 212 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 4 | 
2 files changed, 179 insertions, 37 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index ae27a05f..440f40f9 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -31,6 +31,7 @@  #include <ouroboros/shm_du_map.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/utils.h> +#include <ouroboros/rw_lock.h>  #include <stdlib.h>  #include <string.h> @@ -47,9 +48,11 @@ struct ap_data {          instance_name_t *     api;          struct shm_du_map *   dum;          struct bmp *          fds; -          struct shm_ap_rbuff * rb; +        rw_lock_t             data_lock; +          struct flow           flows[AP_MAX_FLOWS]; +        rw_lock_t             flows_lock;  } * _ap_instance;  int ap_init(char * ap_name) @@ -92,14 +95,19 @@ int ap_init(char * ap_name)          _ap_instance->rb = shm_ap_rbuff_create();          if (_ap_instance->rb == NULL) {                  instance_name_destroy(_ap_instance->api); +                shm_du_map_close(_ap_instance->dum);                  bmp_destroy(_ap_instance->fds);                  free(_ap_instance);                  return -1;          } -        for (i = 0; i < AP_MAX_FLOWS; ++i) +        for (i = 0; i < AP_MAX_FLOWS; ++i) {                  _ap_instance->flows[i].rb = NULL; +                _ap_instance->flows[i].port_id = -1; +        } +        rw_lock_init(&_ap_instance->flows_lock); +        rw_lock_init(&_ap_instance->data_lock);          return 0;  } @@ -110,6 +118,9 @@ void ap_fini(void)          if (_ap_instance == NULL)                  return; + +        rw_lock_wrlock(&_ap_instance->data_lock); +          if (_ap_instance->api != NULL)                  instance_name_destroy(_ap_instance->api);          if (_ap_instance->fds != NULL) @@ -122,6 +133,8 @@ void ap_fini(void)                  if (_ap_instance->flows[i].rb != NULL)                          shm_ap_rbuff_close(_ap_instance->flows[i].rb); +        rw_lock_unlock(&_ap_instance->data_lock); +          free(_ap_instance);  } @@ -142,7 +155,7 @@ int ap_reg(char ** difs,  {          irm_msg_t msg        = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int fd = bmp_allocate(_ap_instance->fds); +        int fd = -1;          if (difs == NULL ||              len == 0 || @@ -157,11 +170,16 @@ int ap_reg(char ** difs,          msg.code       = IRM_MSG_CODE__IRM_AP_REG;          msg.has_pid    = true; -        msg.pid        = _ap_instance->api->id; -        msg.ap_name    = _ap_instance->api->name;          msg.dif_name   = difs;          msg.n_dif_name = len; +        rw_lock_rdlock(&_ap_instance->data_lock); + +        msg.pid        = _ap_instance->api->id; +        msg.ap_name    = _ap_instance->api->name; + +        rw_lock_unlock(&_ap_instance->data_lock); +          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; @@ -176,6 +194,12 @@ int ap_reg(char ** difs,          irm_msg__free_unpacked(recv_msg, NULL); +        rw_lock_wrlock(&_ap_instance->data_lock); + +        fd = bmp_allocate(_ap_instance->fds); + +        rw_lock_unlock(&_ap_instance->data_lock); +          return fd;  } @@ -194,11 +218,16 @@ int ap_unreg(char ** difs,          msg.code       = IRM_MSG_CODE__IRM_AP_UNREG;          msg.has_pid    = true; -        msg.pid        = _ap_instance->api->id; -        msg.ap_name    = _ap_instance->api->name;          msg.dif_name   = difs;          msg.n_dif_name = len; +        rw_lock_rdlock(&_ap_instance->data_lock); + +        msg.pid        = _ap_instance->api->id; +        msg.ap_name    = _ap_instance->api->name; + +        rw_lock_unlock(&_ap_instance->data_lock); +          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; @@ -224,8 +253,13 @@ int flow_accept(int     fd,          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_pid = true; + +        rw_lock_rdlock(&_ap_instance->data_lock); +          msg.pid     = _ap_instance->api->id; +        rw_lock_unlock(&_ap_instance->data_lock); +          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; @@ -235,18 +269,8 @@ int flow_accept(int     fd,                  return -1;          } -        cfd = bmp_allocate(_ap_instance->fds); - -        _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); -        if (_ap_instance->flows[cfd].rb == NULL) { -                bmp_release(_ap_instance->fds, cfd); -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } -          *ap_name = strdup(recv_msg->ap_name);          if (*ap_name == NULL) { -                bmp_release(_ap_instance->fds, cfd);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -254,21 +278,46 @@ int flow_accept(int     fd,          if (ae_name != NULL) {                  *ae_name = strdup(recv_msg->ae_name);                  if (*ae_name == NULL) { -                        bmp_release(_ap_instance->fds, cfd);                          irm_msg__free_unpacked(recv_msg, NULL);                          return -1;                  }          } +        rw_lock_wrlock(&_ap_instance->data_lock); + +        cfd = bmp_allocate(_ap_instance->fds); + +        rw_lock_unlock(&_ap_instance->data_lock); + +        rw_lock_wrlock(&_ap_instance->flows_lock); + +        _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); +        if (_ap_instance->flows[cfd].rb == NULL) { +                rw_lock_wrlock(&_ap_instance->data_lock); + +                bmp_release(_ap_instance->fds, cfd); + +                rw_lock_unlock(&_ap_instance->data_lock); + +                irm_msg__free_unpacked(recv_msg, NULL); + +                rw_lock_unlock(&_ap_instance->flows_lock); +                return -1; +        } +          _ap_instance->flows[cfd].port_id = recv_msg->port_id;          _ap_instance->flows[cfd].oflags  = FLOW_O_DEFAULT; - +        rw_lock_unlock(&_ap_instance->flows_lock);          irm_msg__free_unpacked(recv_msg, NULL); +        rw_lock_wrlock(&_ap_instance->data_lock); +          bmp_release(_ap_instance->fds, fd); +        rw_lock_unlock(&_ap_instance->data_lock); +          return cfd;  } @@ -281,9 +330,21 @@ int flow_alloc_resp(int fd,          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;          msg.has_pid      = true; + +        rw_lock_rdlock(&_ap_instance->data_lock); +          msg.pid          = _ap_instance->api->id; + +        rw_lock_unlock(&_ap_instance->data_lock); +          msg.has_port_id  = true; + +        rw_lock_rdlock(&_ap_instance->flows_lock); +          msg.port_id      = _ap_instance->flows[fd].port_id; + +        rw_lock_unlock(&_ap_instance->flows_lock); +          msg.has_response = true;          msg.response     = response; @@ -318,10 +379,15 @@ int flow_alloc(char * dst_name,          msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;          msg.dst_name    = dst_name; -        msg.ap_name     = _ap_instance->api->name; +        msg.ae_name     = src_ae_name;          msg.has_pid     = true; + +        rw_lock_rdlock(&_ap_instance->data_lock); +          msg.pid         = _ap_instance->api->id; -        msg.ae_name     = src_ae_name; +        msg.ap_name     = _ap_instance->api->name; + +        rw_lock_unlock(&_ap_instance->data_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) @@ -332,11 +398,23 @@ int flow_alloc(char * dst_name,                  return -1;          } +        rw_lock_wrlock(&_ap_instance->data_lock); +          fd = bmp_allocate(_ap_instance->fds); +        rw_lock_unlock(&_ap_instance->data_lock); + +        rw_lock_wrlock(&_ap_instance->flows_lock); +          _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid);          if (_ap_instance->flows[fd].rb == NULL) { +                rw_lock_wrlock(&_ap_instance->data_lock); +                  bmp_release(_ap_instance->fds, fd); + +                rw_lock_unlock(&_ap_instance->data_lock); + +                rw_lock_unlock(&_ap_instance->flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -344,6 +422,8 @@ int flow_alloc(char * dst_name,          _ap_instance->flows[fd].port_id = recv_msg->port_id;          _ap_instance->flows[fd].oflags  = FLOW_O_DEFAULT; +        rw_lock_unlock(&_ap_instance->flows_lock); +          irm_msg__free_unpacked(recv_msg, NULL);          return fd; @@ -357,8 +437,13 @@ int flow_alloc_res(int fd)          msg.code          = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;          msg.has_port_id  = true; + +        rw_lock_rdlock(&_ap_instance->flows_lock); +          msg.port_id      = _ap_instance->flows[fd].port_id; +        rw_lock_unlock(&_ap_instance->flows_lock); +          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; @@ -382,8 +467,14 @@ int flow_dealloc(int fd)          msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC;          msg.has_port_id  = true; + + +        rw_lock_rdlock(&_ap_instance->data_lock); +          msg.port_id      = _ap_instance->flows[fd].port_id; +        rw_lock_unlock(&_ap_instance->data_lock); +          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -1; @@ -401,42 +492,77 @@ int flow_dealloc(int fd)  int flow_cntl(int fd, int cmd, int oflags)  { -        int old = _ap_instance->flows[fd].oflags; +        int old; + +        rw_lock_wrlock(&_ap_instance->flows_lock); + +        old = _ap_instance->flows[fd].oflags; +          switch (cmd) {          case FLOW_F_GETFL: /* GET FLOW FLAGS */ -                return _ap_instance->flows[fd].oflags; +                rw_lock_unlock(&_ap_instance->flows_lock); +                return old;          case FLOW_F_SETFL: /* SET FLOW FLAGS */                  _ap_instance->flows[fd].oflags = oflags; +                rw_lock_unlock(&_ap_instance->flows_lock);                  return old;          default: +                rw_lock_unlock(&_ap_instance->flows_lock);                  return FLOW_O_INVALID; /* unknown command */          }  }  ssize_t flow_write(int fd, void * buf, size_t count)  { -        size_t index = shm_create_du_buff(_ap_instance->dum, -                                          count + DU_BUFF_HEADSPACE + -                                          DU_BUFF_TAILSPACE, -                                          DU_BUFF_HEADSPACE, -                                          (uint8_t *) buf, -                                          count); -        struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; -        if (index == -1) +        size_t index; +        struct rb_entry e; + +        if (buf == NULL) +                return 0; + +        rw_lock_rdlock(&_ap_instance->data_lock); + +        index = shm_create_du_buff(_ap_instance->dum, +                                   count + DU_BUFF_HEADSPACE + +                                   DU_BUFF_TAILSPACE, +                                   DU_BUFF_HEADSPACE, +                                   (uint8_t *) buf, +                                   count); +        if (index == -1) { +                rw_lock_unlock(&_ap_instance->data_lock);                  return -1; +        } + +        rw_lock_rdlock(&_ap_instance->flows_lock); + +        e.index   = index; +        e.port_id = _ap_instance->flows[fd].port_id;          if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {                  if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {                          shm_release_du_buff(_ap_instance->dum, index); + +                        rw_lock_unlock(&_ap_instance->flows_lock); + +                        rw_lock_unlock(&_ap_instance->data_lock); +                          return -EPIPE;                  } +                rw_lock_unlock(&_ap_instance->flows_lock); + +                rw_lock_unlock(&_ap_instance->data_lock); +                  return 0;          } else {                  while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) -                        ; +                        LOG_DBGF("Couldn't write to rbuff.");          } +        rw_lock_unlock(&_ap_instance->data_lock); + +        rw_lock_unlock(&_ap_instance->flows_lock); +          return 0;  } @@ -446,27 +572,41 @@ ssize_t flow_read(int fd, void * buf, size_t count)          int n;          uint8_t * sdu; +        rw_lock_rdlock(&_ap_instance->data_lock); + +        rw_lock_rdlock(&_ap_instance->flows_lock); +          if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {                  e = shm_ap_rbuff_read(_ap_instance->rb);          } else { -                /* FIXME: move this to a thread  */ + +                /* FIXME: this will throw away packets for other fd's */                  while (e == NULL || -                       e->port_id != _ap_instance->flows[fd].port_id) +                       e->port_id != _ap_instance->flows[fd].port_id) {                          e = shm_ap_rbuff_read(_ap_instance->rb); +                }          } -        if (e == NULL) +        rw_lock_unlock(&_ap_instance->flows_lock); + +        if (e == NULL) { +                rw_lock_unlock(&_ap_instance->data_lock);                  return -1; +        }          n = shm_du_map_read_sdu(&sdu,                                  _ap_instance->dum,                                  e->index); -        if (n < 0) +        if (n < 0) { +                rw_lock_unlock(&_ap_instance->data_lock);                  return -1; +        }          memcpy(buf, sdu, MIN(n, count));          shm_release_du_buff(_ap_instance->dum, e->index); +        rw_lock_unlock(&_ap_instance->data_lock); +          return n;  } diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6c04ccc5..da6f0e33 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -253,8 +253,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)          }          e = malloc(sizeof(*e)); -        if (e == NULL) +        if (e == NULL) { +                pthread_mutex_unlock(rb->shm_mutex);                  return NULL; +        }          *e = *(rb->shm_base + *rb->ptr_tail); | 
