summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ouroboros/CMakeLists.txt3
-rw-r--r--include/ouroboros/dev.h19
-rw-r--r--include/ouroboros/select.h50
-rw-r--r--include/ouroboros/shm_ap_rbuff.h35
-rw-r--r--include/ouroboros/wrap/ouroboros.i2
-rw-r--r--src/ipcpd/local/main.c6
-rw-r--r--src/ipcpd/normal/fmgr.c4
-rw-r--r--src/ipcpd/normal/main.c2
-rw-r--r--src/ipcpd/shim-eth-llc/main.c7
-rw-r--r--src/ipcpd/shim-udp/main.c6
-rw-r--r--src/irmd/main.c18
-rw-r--r--src/lib/dev.c489
-rw-r--r--src/lib/lockfile.c9
-rw-r--r--src/lib/shm_ap_rbuff.c148
-rw-r--r--src/lib/shm_rdrbuff.c18
-rw-r--r--src/tools/cbr/cbr.c8
-rw-r--r--src/tools/oping/oping.c9
-rw-r--r--src/tools/oping/oping_client.c4
-rw-r--r--src/tools/oping/oping_server.c26
19 files changed, 581 insertions, 282 deletions
diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt
index ae922b89..78a7bb9c 100644
--- a/include/ouroboros/CMakeLists.txt
+++ b/include/ouroboros/CMakeLists.txt
@@ -10,7 +10,8 @@ set(HEADER_FILES
irm.h
irm_config.h
nsm.h
- qos.h)
+ qos.h
+ select.h)
install(FILES ${HEADER_FILES} DESTINATION usr/include/ouroboros)
diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h
index d5fb744b..fe5ff4b5 100644
--- a/include/ouroboros/dev.h
+++ b/include/ouroboros/dev.h
@@ -22,7 +22,6 @@
#include <unistd.h>
#include <stdint.h>
-#include <time.h>
#include <ouroboros/qos.h>
#include <ouroboros/flow.h>
@@ -34,10 +33,12 @@
/* These calls should be removed once we write the ouroboros OS. */
int ap_init(char * ap_name);
+
void ap_fini(void);
/* Returns file descriptor (> 0) and client AE name. */
int flow_accept(char ** ae_name);
+
int flow_alloc_resp(int fd, int result);
/*
@@ -47,13 +48,21 @@ int flow_alloc_resp(int fd, int result);
int flow_alloc(char * dst_name,
char * src_ae_name,
struct qos_spec * qos);
+
int flow_alloc_res(int fd);
int flow_dealloc(int fd);
-int flow_cntl(int fd, int cmd, int oflags);
-ssize_t flow_write(int fd, void * buf, size_t count);
-ssize_t flow_read(int fd, void * buf, size_t count);
-int flow_select(const struct timespec * timeout);
+int flow_cntl(int fd,
+ int cmd,
+ int oflags);
+
+ssize_t flow_write(int fd,
+ void * buf,
+ size_t count);
+
+ssize_t flow_read(int fd,
+ void * buf,
+ size_t count);
#endif
diff --git a/include/ouroboros/select.h b/include/ouroboros/select.h
new file mode 100644
index 00000000..9e0b8fec
--- /dev/null
+++ b/include/ouroboros/select.h
@@ -0,0 +1,50 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * A select call for flows
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_SELECT_H
+#define OUROBOROS_SELECT_H
+
+#include <stdbool.h>
+#include <time.h>
+
+struct flow_set;
+
+struct flow_set * flow_set_create();
+
+void flow_set_destroy(struct flow_set * set);
+
+void flow_set_zero(struct flow_set * set);
+
+void flow_set_add(struct flow_set * set,
+ int fd);
+
+void flow_set_del(struct flow_set * set,
+ int fd);
+
+bool flow_set_has(struct flow_set * set,
+ int fd);
+
+int flow_select(struct flow_set * set,
+ const struct timespec * timeout);
+
+#endif /* OUROBOROS_SELECT_H */
diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h
index 9dad0863..6b11fd2d 100644
--- a/include/ouroboros/shm_ap_rbuff.h
+++ b/include/ouroboros/shm_ap_rbuff.h
@@ -24,6 +24,7 @@
#ifndef OUROBOROS_SHM_AP_RBUFF_H
#define OUROBOROS_SHM_AP_RBUFF_H
+#include <ouroboros/select.h>
#include <sys/types.h>
#include <sys/time.h>
#include <stdbool.h>
@@ -35,20 +36,40 @@ struct rb_entry {
int port_id;
};
-struct shm_ap_rbuff * shm_ap_rbuff_create();
-struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api);
+/* recv SDUs from N + 1 */
+struct shm_ap_rbuff * shm_ap_rbuff_create_n();
+
+/* recv SDUs from N - 1 */
+struct shm_ap_rbuff * shm_ap_rbuff_create_s();
+
+/* write SDUs to N - 1 */
+struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api);
+
+/* write SDUs to N + 1 */
+struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api);
+
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb);
+
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);
+
int shm_ap_rbuff_write(struct shm_ap_rbuff * rb,
- struct rb_entry * e);
+ struct rb_entry * e);
+
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb);
+
int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb);
-int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
+
+int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
+ bool * set,
const struct timespec * timeout);
+
ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb,
- int port_id);
-ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
- int port_id,
+ int port_id);
+
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
const struct timespec * timeout);
+
void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb);
+
#endif /* OUROBOROS_SHM_AP_RBUFF_H */
diff --git a/include/ouroboros/wrap/ouroboros.i b/include/ouroboros/wrap/ouroboros.i
index 386c21cc..2f66aa16 100644
--- a/include/ouroboros/wrap/ouroboros.i
+++ b/include/ouroboros/wrap/ouroboros.i
@@ -30,6 +30,7 @@
#include "ouroboros/irm_config.h"
#include "ouroboros/nsm.h"
#include "ouroboros/qos.h"
+#include "ouroboros/select.h"
%}
typedef int pid_t;
@@ -42,3 +43,4 @@ typedef int pid_t;
%include "ouroboros/irm_config.h"
%include "ouroboros/nsm.h"
%include "ouroboros/qos.h"
+%include "ouroboros/select.h"
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index f1b6dd9e..c0809429 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -105,7 +105,7 @@ static int shim_ap_init()
return -1;
}
- _ap_instance->rb = shm_ap_rbuff_create();
+ _ap_instance->rb = shm_ap_rbuff_create_n();
if (_ap_instance->rb == NULL) {
shm_rdrbuff_close(_ap_instance->rdrb);
bmp_destroy(_ap_instance->fds);
@@ -331,7 +331,7 @@ static int ipcp_local_flow_alloc(pid_t n_api,
return -1; /* -ENOTENROLLED */
}
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL) {
pthread_rwlock_unlock(&_ipcp->state_lock);
return -1; /* -ENORBUFF */
@@ -421,7 +421,7 @@ static int ipcp_local_flow_alloc_resp(pid_t n_api,
return -1;
}
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
_ap_instance->flows[in_fd].state = FLOW_NULL;
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 5d54842e..79b1bb4b 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -367,7 +367,7 @@ int fmgr_flow_alloc(pid_t n_api,
free(buf.data);
- flow->flow.rb = shm_ap_rbuff_open(n_api);
+ flow->flow.rb = shm_ap_rbuff_open_s(n_api);
if (flow->flow.rb == NULL) {
pthread_mutex_unlock(&fmgr->n_flows_lock);
free(flow);
@@ -478,7 +478,7 @@ int fmgr_flow_alloc_resp(pid_t n_api,
flow->flow.state = FLOW_ALLOCATED;
flow->flow.api = n_api;
- flow->flow.rb = shm_ap_rbuff_open(n_api);
+ flow->flow.rb = shm_ap_rbuff_open_s(n_api);
if (flow->flow.rb == NULL) {
n_flow_dealloc(port_id);
pthread_mutex_unlock(&fmgr->n_flows_lock);
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index cf4ae3f1..082973f4 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -212,7 +212,7 @@ struct normal_ipcp_data * normal_ipcp_data_create()
return NULL;
}
- normal_data->rb = shm_ap_rbuff_open(getpid());
+ normal_data->rb = shm_ap_rbuff_create_n();
if (normal_data->rb == NULL) {
shm_rdrbuff_close(normal_data->rdrb);
free(normal_data);
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index a1ded117..d74984cc 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -62,6 +62,7 @@
#ifdef __FreeBSD__
#include <net/if_dl.h>
#include <netinet/if_ether.h>
+#include <ifaddrs.h>
#endif
#include <poll.h>
@@ -161,7 +162,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()
return NULL;
}
- eth_llc_data->rb = shm_ap_rbuff_create();
+ eth_llc_data->rb = shm_ap_rbuff_create_n();
if (eth_llc_data->rb == NULL) {
shm_rdrbuff_close(eth_llc_data->rdrb);
free(eth_llc_data);
@@ -1084,7 +1085,7 @@ static int eth_llc_ipcp_flow_alloc(pid_t n_api,
if (qos != QOS_CUBE_BE)
LOG_DBGF("QoS requested. Ethernet LLC can't do that. For now.");
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL)
return -1; /* -ENORBUFF */
@@ -1169,7 +1170,7 @@ static int eth_llc_ipcp_flow_alloc_resp(pid_t n_api,
return -1;
}
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
ipcp_flow(index)->state = FLOW_NULL;
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 34af71a7..c35bd244 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -128,7 +128,7 @@ static int shim_ap_init()
return -1;
}
- _ap_instance->rb = shm_ap_rbuff_create();
+ _ap_instance->rb = shm_ap_rbuff_create_n();
if (_ap_instance->rb == NULL) {
shm_rdrbuff_close(_ap_instance->rdrb);
bmp_destroy(_ap_instance->fds);
@@ -1179,7 +1179,7 @@ static int ipcp_udp_flow_alloc(pid_t n_api,
if (qos != QOS_CUBE_BE)
LOG_DBG("QoS requested. UDP/IP can't do that.");
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL)
return -1; /* -ENORBUFF */
@@ -1333,7 +1333,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_api,
return -1;
}
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
_ap_instance->flows[fd].state = FLOW_NULL;
diff --git a/src/irmd/main.c b/src/irmd/main.c
index a69dd526..cc9160bf 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -212,6 +212,9 @@ static pid_t get_ipcp_by_dst_name(char * dst_name)
list_for_each(p, &irmd->ipcps) {
struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next);
+ if (e->dif_name == NULL)
+ continue;
+
if (strcmp(e->dif_name, dif_name) == 0)
return e->api;
}
@@ -1742,7 +1745,7 @@ void * irm_sanitize()
if (kill(f->n_api, 0) < 0) {
struct shm_ap_rbuff * n_rb =
- shm_ap_rbuff_open(f->n_api);
+ shm_ap_rbuff_open_s(f->n_api);
bmp_release(irmd->port_ids, f->port_id);
list_del(&f->next);
@@ -1755,13 +1758,17 @@ void * irm_sanitize()
continue;
}
if (kill(f->n_1_api, 0) < 0) {
- struct shm_ap_rbuff * n_1_rb =
- shm_ap_rbuff_open(f->n_1_api);
+ struct shm_ap_rbuff * n_1_rb_s =
+ shm_ap_rbuff_open_s(f->n_1_api);
+ struct shm_ap_rbuff * n_1_rb_n =
+ shm_ap_rbuff_open_n(f->n_1_api);
list_del(&f->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
f->n_1_api, f->port_id);
- if (n_1_rb != NULL)
- shm_ap_rbuff_destroy(n_1_rb);
+ if (n_1_rb_n != NULL)
+ shm_ap_rbuff_destroy(n_1_rb_n);
+ if (n_1_rb_s != NULL)
+ shm_ap_rbuff_destroy(n_1_rb_s);
irm_flow_destroy(f);
}
}
@@ -2152,7 +2159,6 @@ int main(int argc, char ** argv)
}
}
-
if (!use_stdout &&
(log_dir = opendir(INSTALL_PREFIX LOG_DIR)) != NULL) {
while ((ent = readdir(log_dir)) != NULL) {
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 17c473ed..391563da 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -28,9 +28,18 @@
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/utils.h>
+#include <ouroboros/select.h>
#include <stdlib.h>
#include <string.h>
+#include <stdio.h>
+
+struct flow_set {
+ bool dirty;
+ bool b[IRMD_MAX_FLOWS]; /* working copy */
+ bool s[IRMD_MAX_FLOWS]; /* safe copy */
+ pthread_rwlock_t lock;
+};
struct flow {
struct shm_ap_rbuff * rb;
@@ -42,17 +51,21 @@ struct flow {
struct timespec * timeout;
};
-struct ap_data {
+struct ap_instance {
char * ap_name;
+ char * daf_name;
pid_t api;
+
struct shm_rdrbuff * rdrb;
struct bmp * fds;
struct shm_ap_rbuff * rb;
pthread_rwlock_t data_lock;
struct flow flows[AP_MAX_FLOWS];
+ int ports[AP_MAX_FLOWS];
+
pthread_rwlock_t flows_lock;
-} * _ap_instance;
+} * ai;
static int api_announce(char * ap_name)
{
@@ -63,12 +76,12 @@ static int api_announce(char * ap_name)
msg.code = IRM_MSG_CODE__IRM_API_ANNOUNCE;
msg.has_api = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
- msg.api = _ap_instance->api;
+ msg.api = ai->api;
msg.ap_name = ap_name;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
@@ -91,45 +104,47 @@ int ap_init(char * ap_name)
ap_name = path_strip(ap_name);
- _ap_instance = malloc(sizeof(struct ap_data));
- if (_ap_instance == NULL) {
+ ai = malloc(sizeof(*ai));
+ if (ai == NULL) {
return -ENOMEM;
}
- _ap_instance->api = getpid();
- _ap_instance->ap_name = ap_name;
+ ai->api = getpid();
+ ai->ap_name = ap_name;
+ ai->daf_name = NULL;
- _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
- if (_ap_instance->fds == NULL) {
- free(_ap_instance);
+ ai->fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (ai->fds == NULL) {
+ free(ai);
return -ENOMEM;
}
- _ap_instance->rdrb = shm_rdrbuff_open();
- if (_ap_instance->rdrb == NULL) {
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
+ ai->rdrb = shm_rdrbuff_open();
+ if (ai->rdrb == NULL) {
+ bmp_destroy(ai->fds);
+ free(ai);
return -1;
}
- _ap_instance->rb = shm_ap_rbuff_create();
- if (_ap_instance->rb == NULL) {
- shm_rdrbuff_close(_ap_instance->rdrb);
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
+ ai->rb = shm_ap_rbuff_create_s();
+ if (ai->rb == NULL) {
+ shm_rdrbuff_close(ai->rdrb);
+ bmp_destroy(ai->fds);
+ free(ai);
return -1;
}
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- _ap_instance->flows[i].rb = NULL;
- _ap_instance->flows[i].port_id = -1;
- _ap_instance->flows[i].oflags = 0;
- _ap_instance->flows[i].api = -1;
- _ap_instance->flows[i].timeout = NULL;
+ ai->flows[i].rb = NULL;
+ ai->flows[i].port_id = -1;
+ ai->flows[i].oflags = 0;
+ ai->flows[i].api = -1;
+ ai->flows[i].timeout = NULL;
+ ai->ports[i] = -1;
}
- pthread_rwlock_init(&_ap_instance->flows_lock, NULL);
- pthread_rwlock_init(&_ap_instance->data_lock, NULL);
+ pthread_rwlock_init(&ai->flows_lock, NULL);
+ pthread_rwlock_init(&ai->data_lock, NULL);
if (ap_name != NULL)
return api_announce(ap_name);
@@ -141,60 +156,56 @@ void ap_fini(void)
{
int i = 0;
- if (_ap_instance == NULL)
+ if (ai == NULL)
return;
- pthread_rwlock_wrlock(&_ap_instance->data_lock);
+ pthread_rwlock_wrlock(&ai->data_lock);
/* remove all remaining sdus */
- while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0)
- shm_rdrbuff_remove(_ap_instance->rdrb, i);
+ while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0)
+ shm_rdrbuff_remove(ai->rdrb, i);
- if (_ap_instance->fds != NULL)
- bmp_destroy(_ap_instance->fds);
- if (_ap_instance->rb != NULL)
- shm_ap_rbuff_destroy(_ap_instance->rb);
- if (_ap_instance->rdrb != NULL)
- shm_rdrbuff_close(_ap_instance->rdrb);
+ if (ai->fds != NULL)
+ bmp_destroy(ai->fds);
+ if (ai->rb != NULL)
+ shm_ap_rbuff_destroy(ai->rb);
+ if (ai->rdrb != NULL)
+ shm_rdrbuff_close(ai->rdrb);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ if (ai->daf_name != NULL)
+ free(ai->daf_name);
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- if (_ap_instance->flows[i].rb != NULL)
- shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
+ if (ai->flows[i].rb != NULL)
+ shm_ap_rbuff_close(ai->flows[i].rb);
+ ai->ports[ai->flows[i].port_id] = -1;
+ }
- pthread_rwlock_destroy(&_ap_instance->flows_lock);
- pthread_rwlock_destroy(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
- free(_ap_instance);
-}
+ pthread_rwlock_destroy(&ai->flows_lock);
+ pthread_rwlock_destroy(&ai->data_lock);
-static int port_id_to_fd(int port_id)
-{
- int i;
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- if (_ap_instance->flows[i].port_id == port_id)
- return i;
- return -1;
+ free(ai);
}
int flow_accept(char ** ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int cfd = -1;
+ int fd = -1;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_api = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
- msg.api = _ap_instance->api;
+ msg.api = ai->api;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
if (recv_msg == NULL)
@@ -205,22 +216,22 @@ int flow_accept(char ** ae_name)
return -1;
}
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai->flows_lock);
- cfd = bmp_allocate(_ap_instance->fds);
- if (!bmp_is_id_valid(_ap_instance->fds, cfd)) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ fd = bmp_allocate(ai->fds);
+ if (!bmp_is_id_valid(ai->fds, fd)) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->api);
- if (_ap_instance->flows[cfd].rb == NULL) {
- bmp_release(_ap_instance->fds, cfd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
+ if (ai->flows[fd].rb == NULL) {
+ bmp_release(ai->fds, fd);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -228,25 +239,27 @@ int flow_accept(char ** ae_name)
if (ae_name != NULL) {
*ae_name = strdup(recv_msg->ae_name);
if (*ae_name == NULL) {
- shm_ap_rbuff_close(_ap_instance->flows[cfd].rb);
- bmp_release(_ap_instance->fds, cfd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ shm_ap_rbuff_close(ai->flows[fd].rb);
+ bmp_release(ai->fds, fd);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -ENOMEM;
}
}
- _ap_instance->flows[cfd].port_id = recv_msg->port_id;
- _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT;
- _ap_instance->flows[cfd].api = recv_msg->api;
+ ai->flows[fd].port_id = recv_msg->port_id;
+ ai->flows[fd].oflags = FLOW_O_DEFAULT;
+ ai->flows[fd].api = recv_msg->api;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->ports[recv_msg->port_id] = fd;
+
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
- return cfd;
+ return fd;
}
int flow_alloc_resp(int fd,
@@ -261,40 +274,40 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_api = true;
- msg.api = _ap_instance->api;
+ msg.api = ai->api;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = ai->flows[fd].port_id;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
msg.has_response = true;
msg.response = response;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -1;
}
if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ret = recv_msg->result;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -320,11 +333,11 @@ int flow_alloc(char * dst_name,
msg.ae_name = src_ae_name;
msg.has_api = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
- msg.api = _ap_instance->api;
+ msg.api = ai->api;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
@@ -336,32 +349,34 @@ int flow_alloc(char * dst_name,
return -1;
}
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai->flows_lock);
- fd = bmp_allocate(_ap_instance->fds);
- if (!bmp_is_id_valid(_ap_instance->fds, fd)) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ fd = bmp_allocate(ai->fds);
+ if (!bmp_is_id_valid(ai->fds, fd)) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
- if (_ap_instance->flows[fd].rb == NULL) {
- bmp_release(_ap_instance->fds, fd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
+ if (ai->flows[fd].rb == NULL) {
+ bmp_release(ai->fds, fd);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- _ap_instance->flows[fd].port_id = recv_msg->port_id;
- _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
- _ap_instance->flows[fd].api = recv_msg->api;
+ ai->flows[fd].port_id = recv_msg->port_id;
+ ai->flows[fd].oflags = FLOW_O_DEFAULT;
+ ai->flows[fd].api = recv_msg->api;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->ports[recv_msg->port_id] = fd;
+
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -380,19 +395,19 @@ int flow_alloc_res(int fd)
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = ai->flows[fd].port_id;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
if (recv_msg == NULL) {
@@ -422,41 +437,43 @@ int flow_dealloc(int fd)
msg.has_api = true;
msg.api = getpid();
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- msg.port_id = _ap_instance->flows[fd].port_id;
+ msg.port_id = ai->flows[fd].port_id;
+
+ ai->ports[msg.port_id] = -1;
- _ap_instance->flows[fd].port_id = -1;
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
- _ap_instance->flows[fd].rb = NULL;
- _ap_instance->flows[fd].api = -1;
+ ai->flows[fd].port_id = -1;
+ shm_ap_rbuff_close(ai->flows[fd].rb);
+ ai->flows[fd].rb = NULL;
+ ai->flows[fd].api = -1;
- bmp_release(_ap_instance->fds, fd);
+ bmp_release(ai->fds, fd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -1;
}
if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ret = recv_msg->result;
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -470,30 +487,30 @@ int flow_cntl(int fd, int cmd, int oflags)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- old = _ap_instance->flows[fd].oflags;
+ old = ai->flows[fd].oflags;
switch (cmd) {
case FLOW_F_GETFL: /* GET FLOW FLAGS */
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return old;
case FLOW_F_SETFL: /* SET FLOW FLAGS */
- _ap_instance->flows[fd].oflags = oflags;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ ai->flows[fd].oflags = oflags;
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return old;
default:
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return FLOW_O_INVALID; /* unknown command */
}
}
@@ -509,42 +526,42 @@ ssize_t flow_write(int fd, void * buf, size_t count)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_rdrbuff_write(_ap_instance->rdrb,
- _ap_instance->flows[fd].api,
+ if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_rdrbuff_write(ai->rdrb,
+ ai->flows[fd].api,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
(uint8_t *) buf,
count);
if (idx == -1) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -EAGAIN;
}
e.index = idx;
- e.port_id = _ap_instance->flows[fd].port_id;
+ e.port_id = ai->flows[fd].port_id;
- if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
- shm_rdrbuff_remove(_ap_instance->rdrb, idx);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) {
+ shm_rdrbuff_remove(ai->rdrb, idx);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -1;
}
} else { /* blocking */
- struct shm_rdrbuff * rdrb = _ap_instance->rdrb;
- pid_t api = _ap_instance->flows[fd].api;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ struct shm_rdrbuff * rdrb = ai->rdrb;
+ pid_t api = ai->flows[fd].api;
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
idx = shm_rdrbuff_write_b(rdrb,
api,
@@ -553,30 +570,22 @@ ssize_t flow_write(int fd, void * buf, size_t count)
(uint8_t *) buf,
count);
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
e.index = idx;
- e.port_id = _ap_instance->flows[fd].port_id;
+ e.port_id = ai->flows[fd].port_id;
- while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0)
+ while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0)
;
}
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return 0;
}
-int flow_select(const struct timespec * timeout)
-{
- int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout);
- if (port_id < 0)
- return port_id;
- return port_id_to_fd(port_id);
-}
-
ssize_t flow_read(int fd, void * buf, size_t count)
{
int idx = -1;
@@ -586,47 +595,129 @@ ssize_t flow_read(int fd, void * buf, size_t count)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai->flows_lock);
- if (_ap_instance->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ if (ai->flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -ENOTALLOC;
}
- if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_ap_rbuff_read_port(_ap_instance->rb,
- _ap_instance->flows[fd].port_id);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
+ if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_ap_rbuff_read_port(ai->rb,
+ ai->flows[fd].port_id);
+ pthread_rwlock_unlock(&ai->flows_lock);
} else {
- struct shm_ap_rbuff * rb = _ap_instance->rb;
- int port_id = _ap_instance->flows[fd].port_id;
- struct timespec * timeout = _ap_instance->flows[fd].timeout;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ struct shm_ap_rbuff * rb = ai->rb;
+ int port_id = ai->flows[fd].port_id;
+ struct timespec * timeout = ai->flows[fd].timeout;
+ pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout);
- pthread_rwlock_rdlock(&_ap_instance->data_lock);
+ pthread_rwlock_rdlock(&ai->data_lock);
}
if (idx < 0) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -EAGAIN;
}
- n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx);
+ n = shm_rdrbuff_read(&sdu, ai->rdrb, idx);
if (n < 0) {
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return -1;
}
memcpy(buf, sdu, MIN(n, count));
- shm_rdrbuff_remove(_ap_instance->rdrb, idx);
+ shm_rdrbuff_remove(ai->rdrb, idx);
- pthread_rwlock_unlock(&_ap_instance->data_lock);
+ pthread_rwlock_unlock(&ai->data_lock);
return n;
}
+
+/* select functions */
+
+struct flow_set * flow_set_create()
+{
+ struct flow_set * set = malloc(sizeof(*set));
+ if (set == NULL)
+ return NULL;
+
+ if (pthread_rwlock_init(&set->lock, NULL)) {
+ free(set);
+ return NULL;
+ }
+
+ memset(&set->b, 0, sizeof(set->b));
+
+ set->dirty = true;
+
+ return set;
+}
+
+void flow_set_zero(struct flow_set * set)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ memset(&set->b, 0, sizeof(set->b));
+ set->dirty = true;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+void flow_set_add(struct flow_set * set, int fd)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ set->b[ai->flows[fd].port_id] = true;
+ set->dirty = true;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+void flow_set_del(struct flow_set * set, int fd)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ set->b[ai->flows[fd].port_id] = false;
+ set->dirty = true;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+bool flow_set_has(struct flow_set * set, int fd)
+{
+ bool ret;
+ pthread_rwlock_rdlock(&set->lock);
+ ret = set->b[ai->flows[fd].port_id];
+ pthread_rwlock_unlock(&set->lock);
+ return ret;
+}
+
+void flow_set_destroy(struct flow_set * set)
+{
+ pthread_rwlock_destroy(&set->lock);
+ free(set);
+}
+
+static void flow_set_cpy(struct flow_set * set)
+{
+ pthread_rwlock_rdlock(&set->lock);
+ if (set->dirty)
+ memcpy(set->s, set->b, IRMD_MAX_FLOWS);
+ set->dirty = false;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+int flow_select(struct flow_set * set, const struct timespec * timeout)
+{
+ int port_id;
+ if (set == NULL) {
+ port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout);
+ } else {
+ flow_set_cpy(set);
+ port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout);
+ }
+ if (port_id < 0)
+ return port_id;
+ return ai->ports[port_id];
+}
diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c
index 75ee2085..81bed687 100644
--- a/src/lib/lockfile.c
+++ b/src/lib/lockfile.c
@@ -43,10 +43,13 @@ struct lockfile {
};
struct lockfile * lockfile_create() {
+ mode_t mask;
struct lockfile * lf = malloc(sizeof(*lf));
if (lf == NULL)
return NULL;
+ mask = umask(0);
+
lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666);
if (lf->fd == -1) {
LOG_DBGF("Could not create lock file.");
@@ -54,11 +57,7 @@ struct lockfile * lockfile_create() {
return NULL;
}
- if (fchmod(lf->fd, 0666)) {
- LOG_DBGF("Failed to chmod lockfile.");
- free(lf);
- return NULL;
- }
+ umask(mask);
if (ftruncate(lf->fd, LF_SIZE - 1) < 0) {
LOG_DBGF("Failed to extend lockfile.");
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 77e288a8..473894d5 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -40,6 +40,10 @@
#include <signal.h>
#include <sys/stat.h>
+#define FN_MAX_CHARS 255
+#define NORTH false
+#define SOUTH true
+
#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
+ 2 * sizeof (pthread_cond_t))
@@ -59,19 +63,24 @@ struct shm_ap_rbuff {
pthread_cond_t * add; /* SDU arrived */
pthread_cond_t * del; /* SDU removed */
pid_t api; /* api to which this rb belongs */
+ bool dir; /* direction, false = N */
int fd;
};
-struct shm_ap_rbuff * shm_ap_rbuff_create()
+static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
{
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
pthread_mutexattr_t mattr;
pthread_condattr_t cattr;
- char fn[25];
+ char fn[FN_MAX_CHARS];
+ mode_t mask;
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
+ if (dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid());
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid());
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -79,6 +88,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
return NULL;
}
+ mask = umask(0);
+
shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
if (shm_fd == -1) {
LOG_DBG("Failed creating ring buffer.");
@@ -86,11 +97,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
return NULL;
}
- if (fchmod(shm_fd, 0666)) {
- LOG_DBG("Failed to chmod shared memory.");
- free(rb);
- return NULL;
- }
+ umask(mask);
if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) {
LOG_DBG("Failed to extend ringbuffer.");
@@ -150,18 +157,22 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
rb->fd = shm_fd;
rb->api = getpid();
+ rb->dir = dir;
return rb;
}
-struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
+static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)
{
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
- char fn[25];
+ char fn[FN_MAX_CHARS];
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api);
+ if (dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api);
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api);
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -204,9 +215,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
rb->fd = shm_fd;
rb->api = api;
+ rb->dir = dir;
return rb;
}
+
+struct shm_ap_rbuff * shm_ap_rbuff_create_n()
+{
+ return shm_ap_rbuff_create(NORTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_create_s()
+{
+ return shm_ap_rbuff_create(SOUTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api)
+{
+ return shm_ap_rbuff_open(api, NORTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api)
+{
+ return shm_ap_rbuff_open(api, SOUTH);
+}
+
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
{
if (rb == NULL) {
@@ -252,7 +285,10 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
if (close(rb->fd) < 0)
LOG_DBG("Couldn't close shared memory.");
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api);
+ if (rb->dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api);
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api);
if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
LOG_DBG("Couldn't unmap shared memory.");
@@ -311,15 +347,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)
return -1;
}
- ret = (rb->shm_base + *rb->ptr_tail)->index;
+ ret = tail_el_ptr(rb)->index;
pthread_mutex_unlock(rb->lock);
return ret;
}
-int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
- const struct timespec * timeout)
+static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb,
+ const struct timespec * timeout)
{
struct timespec abstime;
int ret = 0;
@@ -360,7 +396,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
}
if (ret != ETIMEDOUT)
- ret = (rb->shm_base + *rb->ptr_tail)->port_id;
+ ret = tail_el_ptr(rb)->port_id;
+ else
+ ret = -ETIMEDOUT;
+
+ pthread_cleanup_pop(true);
+
+ return ret;
+}
+
+int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
+ bool * set,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret;
+
+ if (set == NULL)
+ return shm_ap_rbuff_peek_b_all(rb, timeout);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id])
+ && (ret != ETIMEDOUT)) {
+ while (shm_rbuff_empty(rb)) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->add,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->add, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+
+ while (!set[tail_el_ptr(rb)->port_id]) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->del,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->del, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+ }
+
+ if (ret != ETIMEDOUT)
+ ret = tail_el_ptr(rb)->port_id;
else
ret = -ETIMEDOUT;
@@ -369,6 +480,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
return ret;
}
+
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
{
struct rb_entry * e = NULL;
@@ -434,8 +546,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
return idx;
}
-ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
- int port_id,
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
const struct timespec * timeout)
{
struct timespec abstime;
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index b0d295d9..7c4927fc 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -139,13 +139,13 @@ static char * rdrb_filename(enum qos_cube qos)
++chars;
} while (qm > 0);
- str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2);
+ str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1);
if (str == NULL) {
LOG_ERR("Failed to create shm_rdrbuff: Out of Memory.");
return NULL;
}
- sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos);
+ sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos);
return str;
}
@@ -154,6 +154,7 @@ static char * rdrb_filename(enum qos_cube qos)
struct shm_rdrbuff * shm_rdrbuff_create()
{
struct shm_rdrbuff * rdrb;
+ mode_t mask;
int shm_fd;
uint8_t * shm_base;
pthread_mutexattr_t mattr;
@@ -171,6 +172,8 @@ struct shm_rdrbuff * shm_rdrbuff_create()
return NULL;
}
+ mask = umask(0);
+
shm_fd = shm_open(shm_rdrb_fn, O_CREAT | O_EXCL | O_RDWR, 0666);
if (shm_fd == -1) {
LOG_DBGF("Failed creating shared memory map.");
@@ -179,12 +182,7 @@ struct shm_rdrbuff * shm_rdrbuff_create()
return NULL;
}
- if (fchmod(shm_fd, 0666)) {
- LOG_DBGF("Failed to chmod shared memory map.");
- free(shm_rdrb_fn);
- free(rdrb);
- return NULL;
- }
+ umask(mask);
if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) {
LOG_DBGF("Failed to extend shared memory map.");
@@ -469,7 +467,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
#ifndef SHM_RDRB_MULTI_BLOCK
if (sz > SHM_RDRB_BLOCK_SIZE) {
- LOG_DBGF("Multi-block SDU's disabled. Dropping.");
+ LOG_DBGF("Multi-block SDUs disabled. Dropping.");
return -1;
}
#endif
@@ -558,7 +556,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
#ifndef SHM_RDRB_MULTI_BLOCK
if (sz > SHM_RDRB_BLOCK_SIZE) {
- LOG_DBGF("Multi-block SDU's disabled. Dropping.");
+ LOG_DBGF("Multi-block SDUs disabled. Dropping.");
return -1;
}
#endif
diff --git a/src/tools/cbr/cbr.c b/src/tools/cbr/cbr.c
index e42492df..27c51586 100644
--- a/src/tools/cbr/cbr.c
+++ b/src/tools/cbr/cbr.c
@@ -44,7 +44,7 @@ struct s {
static void usage(void)
{
printf("Usage: cbr [OPTION]...\n"
- "Sends SDU's from client to server at a constant bit rate.\n\n"
+ "Sends SDUs from client to server at a constant bit rate.\n\n"
" -l, --listen Run in server mode\n"
"\n"
"Server options:\n"
@@ -54,10 +54,10 @@ static void usage(void)
"Client options:\n"
" -n, --server_apn Specify the name of the server.\n"
" -d, --duration Duration for sending (s)\n"
- " -f, --flood Send SDU's as fast as possible\n"
+ " -f, --flood Send SDUs as fast as possible\n"
" -s, --size SDU size (B)\n"
" -r, --rate Rate (b/s)\n"
- " --sleep Sleep in between sending sdu's\n"
+ " --sleep Sleep in between sending SDUs\n"
"\n\n"
" --help Display this help text and exit\n");
}
@@ -65,7 +65,7 @@ static void usage(void)
int main(int argc, char ** argv)
{
int duration = 60; /* One minute test */
- int size = 1000; /* 1000 byte SDU's */
+ int size = 1000; /* 1000 byte SDUs */
long rate = 1000000; /* 1 Mb/s */
bool flood = false;
bool sleep = false;
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c
index 2871e79e..7d2edf33 100644
--- a/src/tools/oping/oping.c
+++ b/src/tools/oping/oping.c
@@ -23,6 +23,9 @@
#define _POSIX_C_SOURCE 199506L
+#include <ouroboros/select.h>
+#include <ouroboros/dev.h>
+
#include <stdio.h>
#include <string.h>
#include <pthread.h>
@@ -59,9 +62,9 @@ struct c {
} client;
struct s {
- struct timespec times[OPING_MAX_FLOWS];
- bool flows[OPING_MAX_FLOWS];
- pthread_mutex_t lock;
+ struct timespec times[OPING_MAX_FLOWS];
+ struct flow_set * flows;
+ pthread_mutex_t lock;
pthread_t cleaner_pt;
pthread_t accept_pt;
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index 0d4a10af..3a254984 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -65,7 +65,7 @@ void * reader(void * o)
/* FIXME: use flow timeout option once we have it */
while(client.rcvd != client.count &&
- (fd = flow_select(&timeout)) != -ETIMEDOUT) {
+ (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) {
flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK);
while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {
if (msg_len < 0)
@@ -216,7 +216,7 @@ int client_main()
printf("\n");
printf("--- %s ping statistics ---\n", client.s_apn);
- printf("%d SDU's transmitted, ", client.sent);
+ printf("%d SDUs transmitted, ", client.sent);
printf("%d received, ", client.rcvd);
printf("%d%% packet loss, ", client.sent == 0 ? 0 :
100 - ((100 * client.rcvd) / client.sent));
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index 7761110d..9c7b1be7 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -21,8 +21,6 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#include <ouroboros/dev.h>
-
#ifdef __FreeBSD__
#define __XSI_VISIBLE 500
#endif
@@ -53,9 +51,9 @@ void * cleaner_thread(void * o)
clock_gettime(CLOCK_REALTIME, &now);
pthread_mutex_lock(&server.lock);
for (i = 0; i < OPING_MAX_FLOWS; ++i)
- if (server.flows[i] &&
+ if (flow_set_has(server.flows, i) &&
ts_diff_ms(&server.times[i], &now) > deadline_ms) {
- server.flows[i] = false;
+ flow_set_del(server.flows, i);
flow_dealloc(i);
}
@@ -70,10 +68,16 @@ void * server_thread(void *o)
int msg_len = 0;
struct oping_msg * msg = (struct oping_msg *) buf;
struct timespec now = {0, 0};
+ struct timespec timeout = {0, 100 * MILLION};
while (true) {
-
- int fd = flow_select(NULL);
+ int fd = flow_select(server.flows, &timeout);
+ if (fd == -ETIMEDOUT)
+ continue;
+ if (fd < 0) {
+ printf("Failed to get active fd.\n");
+ continue;
+ }
while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {
if (msg_len < 0)
continue;
@@ -126,7 +130,7 @@ void * accept_thread(void * o)
clock_gettime(CLOCK_REALTIME, &now);
pthread_mutex_lock(&server.lock);
- server.flows[fd] = true;
+ flow_set_add(server.flows, fd);
server.times[fd] = now;
pthread_mutex_unlock(&server.lock);
@@ -139,7 +143,6 @@ void * accept_thread(void * o)
int server_main()
{
struct sigaction sig_act;
- int i = 0;
memset(&sig_act, 0, sizeof sig_act);
sig_act.sa_sigaction = &shutdown_server;
@@ -153,8 +156,11 @@ int server_main()
return -1;
}
- for (i = 0; i < OPING_MAX_FLOWS; ++i)
- server.flows[i] = false;
+ server.flows = flow_set_create();
+ if (server.flows == NULL)
+ return 0;
+
+ flow_set_zero(server.flows);
pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);
pthread_create(&server.accept_pt, NULL, accept_thread, NULL);