diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 370 | 
1 files changed, 188 insertions, 182 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index 25c3fcd4..ee1e7656 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -42,9 +42,11 @@ struct flow {          struct timespec *     timeout;  }; -struct ap_data { +struct ap_instance {          char *                ap_name; +        char *                daf_name;          pid_t                 api; +          struct shm_rdrbuff *  rdrb;          struct bmp *          fds;          struct shm_ap_rbuff * rb; @@ -52,7 +54,7 @@ struct ap_data {          struct flow           flows[AP_MAX_FLOWS];          pthread_rwlock_t      flows_lock; -} * _ap_instance; +} * ai;  static int api_announce(char * ap_name)  { @@ -63,12 +65,12 @@ static int api_announce(char * ap_name)          msg.code    = IRM_MSG_CODE__IRM_API_ANNOUNCE;          msg.has_api = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); +        pthread_rwlock_rdlock(&ai->data_lock); -        msg.api = _ap_instance->api; +        msg.api = ai->api;          msg.ap_name = ap_name; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { @@ -91,45 +93,46 @@ int ap_init(char * ap_name)          ap_name = path_strip(ap_name); -        _ap_instance = malloc(sizeof(struct ap_data)); -        if (_ap_instance == NULL) { +        ai = malloc(sizeof(*ai)); +        if (ai == NULL) {                  return -ENOMEM;          } -        _ap_instance->api = getpid(); -        _ap_instance->ap_name = ap_name; +        ai->api = getpid(); +        ai->ap_name = ap_name; +        ai->daf_name = NULL; -        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); -        if (_ap_instance->fds == NULL) { -                free(_ap_instance); +        ai->fds = bmp_create(AP_MAX_FLOWS, 0); +        if (ai->fds == NULL) { +                free(ai);                  return -ENOMEM;          } -        _ap_instance->rdrb = shm_rdrbuff_open(); -        if (_ap_instance->rdrb == NULL) { -                bmp_destroy(_ap_instance->fds); -                free(_ap_instance); +        ai->rdrb = shm_rdrbuff_open(); +        if (ai->rdrb == NULL) { +                bmp_destroy(ai->fds); +                free(ai);                  return -1;          } -        _ap_instance->rb = shm_ap_rbuff_create_s(); -        if (_ap_instance->rb == NULL) { -                shm_rdrbuff_close(_ap_instance->rdrb); -                bmp_destroy(_ap_instance->fds); -                free(_ap_instance); +        ai->rb = shm_ap_rbuff_create_s(); +        if (ai->rb == NULL) { +                shm_rdrbuff_close(ai->rdrb); +                bmp_destroy(ai->fds); +                free(ai);                  return -1;          }          for (i = 0; i < AP_MAX_FLOWS; ++i) { -                _ap_instance->flows[i].rb = NULL; -                _ap_instance->flows[i].port_id = -1; -                _ap_instance->flows[i].oflags = 0; -                _ap_instance->flows[i].api = -1; -                _ap_instance->flows[i].timeout = NULL; +                ai->flows[i].rb = NULL; +                ai->flows[i].port_id = -1; +                ai->flows[i].oflags = 0; +                ai->flows[i].api = -1; +                ai->flows[i].timeout = NULL;          } -        pthread_rwlock_init(&_ap_instance->flows_lock, NULL); -        pthread_rwlock_init(&_ap_instance->data_lock, NULL); +        pthread_rwlock_init(&ai->flows_lock, NULL); +        pthread_rwlock_init(&ai->data_lock, NULL);          if (ap_name != NULL)                  return api_announce(ap_name); @@ -141,42 +144,45 @@ void ap_fini(void)  {          int i = 0; -        if (_ap_instance == NULL) +        if (ai == NULL)                  return; -        pthread_rwlock_wrlock(&_ap_instance->data_lock); +        pthread_rwlock_wrlock(&ai->data_lock);          /* remove all remaining sdus */ -        while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) -                shm_rdrbuff_remove(_ap_instance->rdrb, i); +        while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0) +                shm_rdrbuff_remove(ai->rdrb, i); + +        if (ai->fds != NULL) +                bmp_destroy(ai->fds); +        if (ai->rb != NULL) +                shm_ap_rbuff_destroy(ai->rb); +        if (ai->rdrb != NULL) +                shm_rdrbuff_close(ai->rdrb); -        if (_ap_instance->fds != NULL) -                bmp_destroy(_ap_instance->fds); -        if (_ap_instance->rb != NULL) -                shm_ap_rbuff_destroy(_ap_instance->rb); -        if (_ap_instance->rdrb != NULL) -                shm_rdrbuff_close(_ap_instance->rdrb); +        if (ai->daf_name != NULL) +                free(ai->daf_name); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->flows_lock);          for (i = 0; i < AP_MAX_FLOWS; ++i) -                if (_ap_instance->flows[i].rb != NULL) -                        shm_ap_rbuff_close(_ap_instance->flows[i].rb); +                if (ai->flows[i].rb != NULL) +                        shm_ap_rbuff_close(ai->flows[i].rb); -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock); -        pthread_rwlock_destroy(&_ap_instance->flows_lock); -        pthread_rwlock_destroy(&_ap_instance->data_lock); +        pthread_rwlock_destroy(&ai->flows_lock); +        pthread_rwlock_destroy(&ai->data_lock); -        free(_ap_instance); +        free(ai);  }  static int port_id_to_fd(int port_id)  {          int i;          for (i = 0; i < AP_MAX_FLOWS; ++i) -                if (_ap_instance->flows[i].port_id == port_id) +                if (ai->flows[i].port_id == port_id)                          return i;          return -1;  } @@ -190,11 +196,11 @@ int flow_accept(char ** ae_name)          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_api = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); +        pthread_rwlock_rdlock(&ai->data_lock); -        msg.api     = _ap_instance->api; +        msg.api     = ai->api; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          recv_msg = send_recv_irm_msg_b(&msg);          if (recv_msg == NULL) @@ -205,22 +211,22 @@ int flow_accept(char ** ae_name)                  return -1;          } -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai->flows_lock); -        cfd = bmp_allocate(_ap_instance->fds); -        if (!bmp_is_id_valid(_ap_instance->fds, cfd)) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        cfd = bmp_allocate(ai->fds); +        if (!bmp_is_id_valid(ai->fds, cfd)) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        _ap_instance->flows[cfd].rb = shm_ap_rbuff_open_n(recv_msg->api); -        if (_ap_instance->flows[cfd].rb == NULL) { -                bmp_release(_ap_instance->fds, cfd); -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        ai->flows[cfd].rb = shm_ap_rbuff_open_n(recv_msg->api); +        if (ai->flows[cfd].rb == NULL) { +                bmp_release(ai->fds, cfd); +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -228,21 +234,21 @@ int flow_accept(char ** ae_name)          if (ae_name != NULL) {                  *ae_name = strdup(recv_msg->ae_name);                  if (*ae_name == NULL) { -                        shm_ap_rbuff_close(_ap_instance->flows[cfd].rb); -                        bmp_release(_ap_instance->fds, cfd); -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ap_instance->data_lock); +                        shm_ap_rbuff_close(ai->flows[cfd].rb); +                        bmp_release(ai->fds, cfd); +                        pthread_rwlock_unlock(&ai->flows_lock); +                        pthread_rwlock_unlock(&ai->data_lock);                          irm_msg__free_unpacked(recv_msg, NULL);                          return -ENOMEM;                  }          } -        _ap_instance->flows[cfd].port_id = recv_msg->port_id; -        _ap_instance->flows[cfd].oflags  = FLOW_O_DEFAULT; -        _ap_instance->flows[cfd].api     = recv_msg->api; +        ai->flows[cfd].port_id = recv_msg->port_id; +        ai->flows[cfd].oflags  = FLOW_O_DEFAULT; +        ai->flows[cfd].api     = recv_msg->api; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -261,40 +267,40 @@ int flow_alloc_resp(int fd,          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;          msg.has_api      = true; -        msg.api          = _ap_instance->api; +        msg.api          = ai->api;          msg.has_port_id  = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        msg.port_id      = _ap_instance->flows[fd].port_id; +        msg.port_id      = ai->flows[fd].port_id; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); +        pthread_rwlock_unlock(&ai->flows_lock);          msg.has_response = true;          msg.response     = response;          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -1;          }          if (!recv_msg->has_result) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ret = recv_msg->result; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -320,11 +326,11 @@ int flow_alloc(char * dst_name,          msg.ae_name     = src_ae_name;          msg.has_api     = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); +        pthread_rwlock_rdlock(&ai->data_lock); -        msg.api         = _ap_instance->api; +        msg.api         = ai->api; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { @@ -336,32 +342,32 @@ int flow_alloc(char * dst_name,                  return -1;          } -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai->flows_lock); -        fd = bmp_allocate(_ap_instance->fds); -        if (!bmp_is_id_valid(_ap_instance->fds, fd)) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        fd = bmp_allocate(ai->fds); +        if (!bmp_is_id_valid(ai->fds, fd)) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        _ap_instance->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); -        if (_ap_instance->flows[fd].rb == NULL) { -                bmp_release(_ap_instance->fds, fd); -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); +        if (ai->flows[fd].rb == NULL) { +                bmp_release(ai->fds, fd); +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        _ap_instance->flows[fd].port_id = recv_msg->port_id; -        _ap_instance->flows[fd].oflags  = FLOW_O_DEFAULT; -        _ap_instance->flows[fd].api     = recv_msg->api; +        ai->flows[fd].port_id = recv_msg->port_id; +        ai->flows[fd].oflags  = FLOW_O_DEFAULT; +        ai->flows[fd].api     = recv_msg->api; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -380,19 +386,19 @@ int flow_alloc_res(int fd)          msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;          msg.has_port_id  = true; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        msg.port_id = _ap_instance->flows[fd].port_id; +        msg.port_id = ai->flows[fd].port_id; -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock);          recv_msg = send_recv_irm_msg_b(&msg);          if (recv_msg == NULL) { @@ -422,41 +428,41 @@ int flow_dealloc(int fd)          msg.has_api      = true;          msg.api          = getpid(); -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        msg.port_id = _ap_instance->flows[fd].port_id; +        msg.port_id = ai->flows[fd].port_id; -        _ap_instance->flows[fd].port_id = -1; -        shm_ap_rbuff_close(_ap_instance->flows[fd].rb); -        _ap_instance->flows[fd].rb = NULL; -        _ap_instance->flows[fd].api = -1; +        ai->flows[fd].port_id = -1; +        shm_ap_rbuff_close(ai->flows[fd].rb); +        ai->flows[fd].rb = NULL; +        ai->flows[fd].api = -1; -        bmp_release(_ap_instance->fds, fd); +        bmp_release(ai->fds, fd); -        pthread_rwlock_unlock(&_ap_instance->flows_lock); +        pthread_rwlock_unlock(&ai->flows_lock);          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -1;          }          if (!recv_msg->has_result) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ret = recv_msg->result; -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          irm_msg__free_unpacked(recv_msg, NULL); @@ -470,30 +476,30 @@ int flow_cntl(int fd, int cmd, int oflags)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_wrlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_wrlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        old = _ap_instance->flows[fd].oflags; +        old = ai->flows[fd].oflags;          switch (cmd) {          case FLOW_F_GETFL: /* GET FLOW FLAGS */ -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return old;          case FLOW_F_SETFL: /* SET FLOW FLAGS */ -                _ap_instance->flows[fd].oflags = oflags; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                ai->flows[fd].oflags = oflags; +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return old;          default: -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return FLOW_O_INVALID; /* unknown command */          }  } @@ -509,42 +515,42 @@ ssize_t flow_write(int fd, void * buf, size_t count)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_rdrbuff_write(_ap_instance->rdrb, -                                       _ap_instance->flows[fd].api, +        if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { +                idx = shm_rdrbuff_write(ai->rdrb, +                                       ai->flows[fd].api,                                         DU_BUFF_HEADSPACE,                                         DU_BUFF_TAILSPACE,                                         (uint8_t *) buf,                                         count);                  if (idx == -1) { -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ap_instance->data_lock); +                        pthread_rwlock_unlock(&ai->flows_lock); +                        pthread_rwlock_unlock(&ai->data_lock);                          return -EAGAIN;                  }                  e.index   = idx; -                e.port_id = _ap_instance->flows[fd].port_id; +                e.port_id = ai->flows[fd].port_id; -                if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { -                        shm_rdrbuff_remove(_ap_instance->rdrb, idx); -                        pthread_rwlock_unlock(&_ap_instance->flows_lock); -                        pthread_rwlock_unlock(&_ap_instance->data_lock); +                if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) { +                        shm_rdrbuff_remove(ai->rdrb, idx); +                        pthread_rwlock_unlock(&ai->flows_lock); +                        pthread_rwlock_unlock(&ai->data_lock);                          return -1;                  }          } else { /* blocking */ -                struct shm_rdrbuff * rdrb = _ap_instance->rdrb; -                pid_t                api = _ap_instance->flows[fd].api; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                struct shm_rdrbuff * rdrb = ai->rdrb; +                pid_t                api = ai->flows[fd].api; +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  idx = shm_rdrbuff_write_b(rdrb,                                           api, @@ -553,25 +559,25 @@ ssize_t flow_write(int fd, void * buf, size_t count)                                           (uint8_t *) buf,                                           count); -                pthread_rwlock_rdlock(&_ap_instance->data_lock); -                pthread_rwlock_rdlock(&_ap_instance->flows_lock); +                pthread_rwlock_rdlock(&ai->data_lock); +                pthread_rwlock_rdlock(&ai->flows_lock);                  e.index   = idx; -                e.port_id = _ap_instance->flows[fd].port_id; +                e.port_id = ai->flows[fd].port_id; -                while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) +                while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0)                          ;          } -        pthread_rwlock_unlock(&_ap_instance->flows_lock); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->flows_lock); +        pthread_rwlock_unlock(&ai->data_lock);          return 0;  }  int flow_select(const struct timespec * timeout)  { -        int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout); +        int port_id = shm_ap_rbuff_peek_b(ai->rb, timeout);          if (port_id < 0)                  return port_id;          return port_id_to_fd(port_id); @@ -586,47 +592,47 @@ ssize_t flow_read(int fd, void * buf, size_t count)          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; -        pthread_rwlock_rdlock(&_ap_instance->data_lock); -        pthread_rwlock_rdlock(&_ap_instance->flows_lock); +        pthread_rwlock_rdlock(&ai->data_lock); +        pthread_rwlock_rdlock(&ai->flows_lock); -        if (_ap_instance->flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +        if (ai->flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -ENOTALLOC;          } -        if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_ap_rbuff_read_port(_ap_instance->rb, -                                             _ap_instance->flows[fd].port_id); -                pthread_rwlock_unlock(&_ap_instance->flows_lock); +        if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { +                idx = shm_ap_rbuff_read_port(ai->rb, +                                             ai->flows[fd].port_id); +                pthread_rwlock_unlock(&ai->flows_lock);          } else { -                struct shm_ap_rbuff * rb      = _ap_instance->rb; -                int                   port_id = _ap_instance->flows[fd].port_id; -                struct timespec *     timeout = _ap_instance->flows[fd].timeout; -                pthread_rwlock_unlock(&_ap_instance->flows_lock); -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                struct shm_ap_rbuff * rb      = ai->rb; +                int                   port_id = ai->flows[fd].port_id; +                struct timespec *     timeout = ai->flows[fd].timeout; +                pthread_rwlock_unlock(&ai->flows_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); -                pthread_rwlock_rdlock(&_ap_instance->data_lock); +                pthread_rwlock_rdlock(&ai->data_lock);          }          if (idx < 0) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -EAGAIN;          } -        n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx); +        n = shm_rdrbuff_read(&sdu, ai->rdrb, idx);          if (n < 0) { -                pthread_rwlock_unlock(&_ap_instance->data_lock); +                pthread_rwlock_unlock(&ai->data_lock);                  return -1;          }          memcpy(buf, sdu, MIN(n, count)); -        shm_rdrbuff_remove(_ap_instance->rdrb, idx); +        shm_rdrbuff_remove(ai->rdrb, idx); -        pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_unlock(&ai->data_lock);          return n;  } | 
