summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c489
1 files changed, 290 insertions, 199 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 17c473ed..391563da 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -28,9 +28,18 @@
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/utils.h>
+#include <ouroboros/select.h>
#include <stdlib.h>
#include <string.h>
+#include <stdio.h>
+
+struct flow_set {
+ bool dirty;
+ bool b[IRMD_MAX_FLOWS]; /* working copy */
+ bool s[IRMD_MAX_FLOWS]; /* safe copy */
+ pthread_rwlock_t lock;
+};
struct flow {
struct shm_ap_rbuff * rb;
@@ -42,17 +51,21 @@ struct flow {
struct timespec * timeout;
};
-struct ap_data {
+struct ap_instance {
char * ap_name;
+ char * daf_name;
pid_t api;
+
struct shm_rdrbuff * rdrb;
struct bmp * fds;
struct shm_ap_rbuff * rb;
pthread_rwlock_t data_lock;
struct flow flows[AP_MAX_FLOWS];
+ int ports[AP_MAX_FLOWS];
+
pthread_rwlock_t flows_lock;
-} * _ap_instance;
+} * ai;
static int api_announce(char * ap_name)
{
@@ -63,12 +76,12 @@ static int api_announce(char * ap_name)
msg.code = IRM_MSG_CODE__IRM_API_ANNOUNCE;
msg.has_api = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
- msg.api = _ap_instance->api;
+ msg.api = ai->api;
msg.ap_name = ap_name;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
@@ -91,45 +104,47 @@ int ap_init(char * ap_name)
ap_name = path_strip(ap_name);
- _ap_instance = malloc(sizeof(struct ap_data));
- if (_ap_instance == NULL) {
+ ai = malloc(sizeof(*ai));
+ if (ai == NULL) {
return -ENOMEM;
}
- _ap_instance->api = getpid();
- _ap_instance->ap_name = ap_name;
+ ai->api = getpid();
+ ai->ap_name = ap_name;
+ ai->daf_name = NULL;
- _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
- if (_ap_instance->fds == NULL) {
- free(_ap_instance);
+ ai->fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (ai->fds == NULL) {
+ free(ai);
return -ENOMEM;
}
- _ap_instance->rdrb = shm_rdrbuff_open();
- if (_ap_instance->rdrb == NULL) {
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
+ ai->rdrb = shm_rdrbuff_open();
+ if (ai->rdrb == NULL) {
+ bmp_destroy(ai->fds);
+ free(ai);
return -1;
}
- _ap_instance->rb = shm_ap_rbuff_create();
- if (_ap_instance->rb == NULL) {
- shm_rdrbuff_close(_ap_instance->rdrb);
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
+ ai->rb = shm_ap_rbuff_create_s();
+ if (ai->rb == NULL) {
+ shm_rdrbuff_close(ai->rdrb);
+ bmp_destroy(ai->fds);
+ free(ai);
return -1;
}
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- _ap_instance->flows[i].rb = NULL;
- _ap_instance->flows[i].port_id = -1;
- _ap_instance->flows[i].oflags = 0;
- _ap_instance->flows[i].api = -1;
- _ap_instance->flows[i].timeout = NULL;
+ ai->flows[i].rb = NULL;
+ ai->flows[i].port_id = -1;
+ ai->flows[i].oflags = 0;
+ ai->flows[i].api = -1;
+ ai->flows[i].timeout = NULL;
+ ai->ports[i] = -1;
}
- pthread_rwlock_init(&_ap_instance->flows_lock, NULL);
- pthread_rwlock_init(&_ap_instance->data_lock, NULL);
+ pthread_rwlock_init(&ai->flows_lock, NULL);
+ pthread_rwlock_init(&ai->data_lock, NULL);
if (ap_name != NULL)
return api_announce(ap_name);
@@ -141,60 +156,56 @@ void ap_fini(void)
{
int i = 0;
- if (_ap_instance == NULL)
+ if (ai == NULL)
return;
- pthread_rwlock_wrlock(&_ap_instance->data_lock);
+ pthread_rwlock_wrlock(&ai->data_lock);
/* remove all remaining sdus */
- while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0)
- shm_rdrbuff_remove(_ap_instance->rdrb, i);
+ while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0)
+ shm_rdrbuff_remove(ai->rdrb, i);
- if (_ap_instance->fds != NULL)
- bmp_destroy(_ap_instance->fds);
- if (_ap_instance->rb != NULL)
- shm_ap_rbuff_destroy(_ap_instance->rb);
- if (_ap_instance->rdrb != NULL)
- shm_rdrbuff_close(_ap_instance->rdrb);
+ if (ai->fds != NULL)
+ bmp_destroy(ai->fds);
+ if (ai->rb != NULL)
+ shm_ap_rbuff_destroy(ai->rb);
+ if (ai->rdrb != NULL)
+ shm_rdrbuff_close(ai->rdrb);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ if (ai->daf_name != NULL)
+ free(ai->daf_name);
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- if (_ap_instance->flows[i].rb != NULL)
- shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
+ if (ai->flows[i].rb != NULL)
+ shm_ap_rbuff_close(ai->flows[i].rb);
+ ai->ports[ai->flows[i].port_id] = -1;
+ }
- pthread_rwlock_destroy(&_ap_instance->flows_lock);
- pthread_rwlock_destroy(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
- free(_ap_instance);
-}
+ pthread_rwlock_destroy(&ai->flows_lock);
+ pthread_rwlock_destroy(&ai->data_lock);
-static int port_id_to_fd(int port_id)
-{
- int i;
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- if (_ap_instance->flows[i].port_id == port_id)
- return i;
- return -1;
+ free(ai);
}
int flow_accept(char ** ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int cfd = -1;
+ int fd = -1;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_api = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
- msg.api = _ap_instance->api;
+ msg.api = ai->api;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
if (recv_msg == NULL)
@@ -205,22 +216,22 @@ int flow_accept(char ** ae_name)
return -1;
}
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai->flows_lock);
- cfd = bmp_allocate(_ap_instance->fds);
- if (!bmp_is_id_valid(_ap_instance->fds, cfd)) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ fd = bmp_allocate(ai->fds);
+ if (!bmp_is_id_valid(ai->fds, fd)) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->api);
- if (_ap_instance->flows[cfd].rb == NULL) {
- bmp_release(_ap_instance->fds, cfd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
+ if (ai->flows[fd].rb == NULL) {
+ bmp_release(ai->fds, fd);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -228,25 +239,27 @@ int flow_accept(char ** ae_name)
if (ae_name != NULL) {
*ae_name = strdup(recv_msg->ae_name);
if (*ae_name == NULL) {
- shm_ap_rbuff_close(_ap_instance->flows[cfd].rb);
- bmp_release(_ap_instance->fds, cfd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ shm_ap_rbuff_close(ai->flows[fd].rb);
+ bmp_release(ai->fds, fd);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -ENOMEM;
}
}
- _ap_instance->flows[cfd].port_id = recv_msg->port_id;
- _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT;
- _ap_instance->flows[cfd].api = recv_msg->api;
+ ai->flows[fd].port_id = recv_msg->port_id;
+ ai->flows[fd].oflags = FLOW_O_DEFAULT;
+ ai->flows[fd].api = recv_msg->api;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->ports[recv_msg->port_id] = fd;
+
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
- return cfd;
+ return fd;
}
int flow_alloc_resp(int fd,
@@ -261,40 +274,40 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_api = true;
- msg.api = _ap_instance->api;
+ msg.api = ai->api;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = ai->flows[fd].port_id;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
msg.has_response = true;
msg.response = response;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -1;
}
if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ret = recv_msg->result;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -320,11 +333,11 @@ int flow_alloc(char * dst_name,
msg.ae_name = src_ae_name;
msg.has_api = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
- msg.api = _ap_instance->api;
+ msg.api = ai->api;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
@@ -336,32 +349,34 @@ int flow_alloc(char * dst_name,
return -1;
}
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai->flows_lock);
- fd = bmp_allocate(_ap_instance->fds);
- if (!bmp_is_id_valid(_ap_instance->fds, fd)) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ fd = bmp_allocate(ai->fds);
+ if (!bmp_is_id_valid(ai->fds, fd)) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
- if (_ap_instance->flows[fd].rb == NULL) {
- bmp_release(_ap_instance->fds, fd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
+ if (ai->flows[fd].rb == NULL) {
+ bmp_release(ai->fds, fd);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- _ap_instance->flows[fd].port_id = recv_msg->port_id;
- _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
- _ap_instance->flows[fd].api = recv_msg->api;
+ ai->flows[fd].port_id = recv_msg->port_id;
+ ai->flows[fd].oflags = FLOW_O_DEFAULT;
+ ai->flows[fd].api = recv_msg->api;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->ports[recv_msg->port_id] = fd;
+
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -380,19 +395,19 @@ int flow_alloc_res(int fd)
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = ai->flows[fd].port_id;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
if (recv_msg == NULL) {
@@ -422,41 +437,43 @@ int flow_dealloc(int fd)
msg.has_api = true;
msg.api = getpid();
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = ai->flows[fd].port_id;
+
+ ai->ports[msg.port_id] = -1;
- _ap_instance->flows[fd].port_id = -1;
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
- _ap_instance->flows[fd].rb = NULL;
- _ap_instance->flows[fd].api = -1;
+ ai->flows[fd].port_id = -1;
+ shm_ap_rbuff_close(ai->flows[fd].rb);
+ ai->flows[fd].rb = NULL;
+ ai->flows[fd].api = -1;
- bmp_release(_ap_instance->fds, fd);
+ bmp_release(ai->fds, fd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -1;
}
if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ret = recv_msg->result;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -470,30 +487,30 @@ int flow_cntl(int fd, int cmd, int oflags)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- old = _ap_instance->flows[fd].oflags;
+ old = ai->flows[fd].oflags;
switch (cmd) {
case FLOW_F_GETFL: /* GET FLOW FLAGS */
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return old;
case FLOW_F_SETFL: /* SET FLOW FLAGS */
- _ap_instance->flows[fd].oflags = oflags;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->flows[fd].oflags = oflags;
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return old;
default:
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return FLOW_O_INVALID; /* unknown command */
}
}
@@ -509,42 +526,42 @@ ssize_t flow_write(int fd, void * buf, size_t count)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_rdrbuff_write(_ap_instance->rdrb,
- _ap_instance->flows[fd].api,
+ if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_rdrbuff_write(ai->rdrb,
+ ai->flows[fd].api,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
(uint8_t *) buf,
count);
if (idx == -1) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -EAGAIN;
}
e.index = idx;
- e.port_id = _ap_instance->flows[fd].port_id;
+ e.port_id = ai->flows[fd].port_id;
- if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
- shm_rdrbuff_remove(_ap_instance->rdrb, idx);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) {
+ shm_rdrbuff_remove(ai->rdrb, idx);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -1;
}
} else { /* blocking */
- struct shm_rdrbuff * rdrb = _ap_instance->rdrb;
- pid_t api = _ap_instance->flows[fd].api;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ struct shm_rdrbuff * rdrb = ai->rdrb;
+ pid_t api = ai->flows[fd].api;
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
idx = shm_rdrbuff_write_b(rdrb,
api,
@@ -553,30 +570,22 @@ ssize_t flow_write(int fd, void * buf, size_t count)
(uint8_t *) buf,
count);
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
e.index = idx;
- e.port_id = _ap_instance->flows[fd].port_id;
+ e.port_id = ai->flows[fd].port_id;
- while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0)
+ while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0)
;
}
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return 0;
}
-int flow_select(const struct timespec * timeout)
-{
- int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout);
- if (port_id < 0)
- return port_id;
- return port_id_to_fd(port_id);
-}
-
ssize_t flow_read(int fd, void * buf, size_t count)
{
int idx = -1;
@@ -586,47 +595,129 @@ ssize_t flow_read(int fd, void * buf, size_t count)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_ap_rbuff_read_port(_ap_instance->rb,
- _ap_instance->flows[fd].port_id);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
+ if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_ap_rbuff_read_port(ai->rb,
+ ai->flows[fd].port_id);
+ pthread_rwlock_unlock(&ai->flows_lock);
} else {
- struct shm_ap_rbuff * rb = _ap_instance->rb;
- int port_id = _ap_instance->flows[fd].port_id;
- struct timespec * timeout = _ap_instance->flows[fd].timeout;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ struct shm_ap_rbuff * rb = ai->rb;
+ int port_id = ai->flows[fd].port_id;
+ struct timespec * timeout = ai->flows[fd].timeout;
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout);
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
}
if (idx < 0) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -EAGAIN;
}
- n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx);
+ n = shm_rdrbuff_read(&sdu, ai->rdrb, idx);
if (n < 0) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -1;
}
memcpy(buf, sdu, MIN(n, count));
- shm_rdrbuff_remove(_ap_instance->rdrb, idx);
+ shm_rdrbuff_remove(ai->rdrb, idx);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return n;
}
+
+/* select functions */
+
+struct flow_set * flow_set_create()
+{
+ struct flow_set * set = malloc(sizeof(*set));
+ if (set == NULL)
+ return NULL;
+
+ if (pthread_rwlock_init(&set->lock, NULL)) {
+ free(set);
+ return NULL;
+ }
+
+ memset(&set->b, 0, sizeof(set->b));
+
+ set->dirty = true;
+
+ return set;
+}
+
+void flow_set_zero(struct flow_set * set)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ memset(&set->b, 0, sizeof(set->b));
+ set->dirty = true;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+void flow_set_add(struct flow_set * set, int fd)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ set->b[ai->flows[fd].port_id] = true;
+ set->dirty = true;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+void flow_set_del(struct flow_set * set, int fd)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ set->b[ai->flows[fd].port_id] = false;
+ set->dirty = true;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+bool flow_set_has(struct flow_set * set, int fd)
+{
+ bool ret;
+ pthread_rwlock_rdlock(&set->lock);
+ ret = set->b[ai->flows[fd].port_id];
+ pthread_rwlock_unlock(&set->lock);
+ return ret;
+}
+
+void flow_set_destroy(struct flow_set * set)
+{
+ pthread_rwlock_destroy(&set->lock);
+ free(set);
+}
+
+static void flow_set_cpy(struct flow_set * set)
+{
+ pthread_rwlock_rdlock(&set->lock);
+ if (set->dirty)
+ memcpy(set->s, set->b, IRMD_MAX_FLOWS);
+ set->dirty = false;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+int flow_select(struct flow_set * set, const struct timespec * timeout)
+{
+ int port_id;
+ if (set == NULL) {
+ port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout);
+ } else {
+ flow_set_cpy(set);
+ port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout);
+ }
+ if (port_id < 0)
+ return port_id;
+ return ai->ports[port_id];
+}