summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-09-02 13:48:36 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-09-02 14:59:42 +0200
commit4f44c91c68a3706e04334066f28471d56cc71849 (patch)
treec31bfb75b7abfea47a1773fcbe2782d4832d5c49
parent4493d36a4769c6625e3025a0c484bf0ec65708bd (diff)
downloadouroboros-4f44c91c68a3706e04334066f28471d56cc71849.tar.gz
ouroboros-4f44c91c68a3706e04334066f28471d56cc71849.zip
lib: Add northbound ringbuffers
Fast path is split in north and southbound paths.
-rw-r--r--include/ouroboros/shm_ap_rbuff.h30
-rw-r--r--src/ipcpd/local/main.c6
-rw-r--r--src/ipcpd/normal/fmgr.c4
-rw-r--r--src/ipcpd/normal/main.c2
-rw-r--r--src/ipcpd/shim-eth-llc/main.c6
-rw-r--r--src/ipcpd/shim-udp/main.c6
-rw-r--r--src/irmd/main.c14
-rw-r--r--src/lib/dev.c6
-rw-r--r--src/lib/shm_ap_rbuff.c51
-rw-r--r--src/lib/shm_rdrbuff.c4
10 files changed, 94 insertions, 35 deletions
diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h
index 9dad0863..594c9260 100644
--- a/include/ouroboros/shm_ap_rbuff.h
+++ b/include/ouroboros/shm_ap_rbuff.h
@@ -35,20 +35,38 @@ struct rb_entry {
int port_id;
};
-struct shm_ap_rbuff * shm_ap_rbuff_create();
-struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api);
+/* recv SDUs from N + 1 */
+struct shm_ap_rbuff * shm_ap_rbuff_create_n();
+
+/* recv SDUs from N - 1 */
+struct shm_ap_rbuff * shm_ap_rbuff_create_s();
+
+/* write SDUs to N - 1 */
+struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api);
+
+/* write SDUs to N + 1 */
+struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api);
+
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb);
+
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);
+
int shm_ap_rbuff_write(struct shm_ap_rbuff * rb,
- struct rb_entry * e);
+ struct rb_entry * e);
+
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb);
+
int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb);
+
int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
const struct timespec * timeout);
+
ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb,
- int port_id);
-ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
- int port_id,
+ int port_id);
+
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
const struct timespec * timeout);
+
void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb);
#endif /* OUROBOROS_SHM_AP_RBUFF_H */
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index f1b6dd9e..c0809429 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -105,7 +105,7 @@ static int shim_ap_init()
return -1;
}
- _ap_instance->rb = shm_ap_rbuff_create();
+ _ap_instance->rb = shm_ap_rbuff_create_n();
if (_ap_instance->rb == NULL) {
shm_rdrbuff_close(_ap_instance->rdrb);
bmp_destroy(_ap_instance->fds);
@@ -331,7 +331,7 @@ static int ipcp_local_flow_alloc(pid_t n_api,
return -1; /* -ENOTENROLLED */
}
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL) {
pthread_rwlock_unlock(&_ipcp->state_lock);
return -1; /* -ENORBUFF */
@@ -421,7 +421,7 @@ static int ipcp_local_flow_alloc_resp(pid_t n_api,
return -1;
}
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
_ap_instance->flows[in_fd].state = FLOW_NULL;
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 3056b46d..d74ad0c8 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -367,7 +367,7 @@ int fmgr_flow_alloc(pid_t n_api,
free(buf.data);
- flow->flow.rb = shm_ap_rbuff_open(n_api);
+ flow->flow.rb = shm_ap_rbuff_open_s(n_api);
if (flow->flow.rb == NULL) {
pthread_mutex_unlock(&fmgr->n_flows_lock);
free(flow);
@@ -478,7 +478,7 @@ int fmgr_flow_alloc_resp(pid_t n_api,
flow->flow.state = FLOW_ALLOCATED;
flow->flow.api = n_api;
- flow->flow.rb = shm_ap_rbuff_open(n_api);
+ flow->flow.rb = shm_ap_rbuff_open_s(n_api);
if (flow->flow.rb == NULL) {
n_flow_dealloc(port_id);
pthread_mutex_unlock(&fmgr->n_flows_lock);
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index cf4ae3f1..082973f4 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -212,7 +212,7 @@ struct normal_ipcp_data * normal_ipcp_data_create()
return NULL;
}
- normal_data->rb = shm_ap_rbuff_open(getpid());
+ normal_data->rb = shm_ap_rbuff_create_n();
if (normal_data->rb == NULL) {
shm_rdrbuff_close(normal_data->rdrb);
free(normal_data);
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index d1100001..028d249f 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -161,7 +161,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()
return NULL;
}
- eth_llc_data->rb = shm_ap_rbuff_create();
+ eth_llc_data->rb = shm_ap_rbuff_create_n();
if (eth_llc_data->rb == NULL) {
shm_rdrbuff_close(eth_llc_data->rdrb);
free(eth_llc_data);
@@ -1084,7 +1084,7 @@ static int eth_llc_ipcp_flow_alloc(pid_t n_api,
if (qos != QOS_CUBE_BE)
LOG_DBGF("QoS requested. Ethernet LLC can't do that. For now.");
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL)
return -1; /* -ENORBUFF */
@@ -1169,7 +1169,7 @@ static int eth_llc_ipcp_flow_alloc_resp(pid_t n_api,
return -1;
}
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
ipcp_flow(index)->state = FLOW_NULL;
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 451a2a4c..85de1eec 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -128,7 +128,7 @@ static int shim_ap_init()
return -1;
}
- _ap_instance->rb = shm_ap_rbuff_create();
+ _ap_instance->rb = shm_ap_rbuff_create_n();
if (_ap_instance->rb == NULL) {
shm_rdrbuff_close(_ap_instance->rdrb);
bmp_destroy(_ap_instance->fds);
@@ -1179,7 +1179,7 @@ static int ipcp_udp_flow_alloc(pid_t n_api,
if (qos != QOS_CUBE_BE)
LOG_DBG("QoS requested. UDP/IP can't do that.");
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL)
return -1; /* -ENORBUFF */
@@ -1333,7 +1333,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_api,
return -1;
}
- rb = shm_ap_rbuff_open(n_api);
+ rb = shm_ap_rbuff_open_s(n_api);
if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
_ap_instance->flows[fd].state = FLOW_NULL;
diff --git a/src/irmd/main.c b/src/irmd/main.c
index a69dd526..a4962c3b 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1742,7 +1742,7 @@ void * irm_sanitize()
if (kill(f->n_api, 0) < 0) {
struct shm_ap_rbuff * n_rb =
- shm_ap_rbuff_open(f->n_api);
+ shm_ap_rbuff_open_s(f->n_api);
bmp_release(irmd->port_ids, f->port_id);
list_del(&f->next);
@@ -1755,13 +1755,17 @@ void * irm_sanitize()
continue;
}
if (kill(f->n_1_api, 0) < 0) {
- struct shm_ap_rbuff * n_1_rb =
- shm_ap_rbuff_open(f->n_1_api);
+ struct shm_ap_rbuff * n_1_rb_s =
+ shm_ap_rbuff_open_s(f->n_1_api);
+ struct shm_ap_rbuff * n_1_rb_n =
+ shm_ap_rbuff_open_n(f->n_1_api);
list_del(&f->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
f->n_1_api, f->port_id);
- if (n_1_rb != NULL)
- shm_ap_rbuff_destroy(n_1_rb);
+ if (n_1_rb_n != NULL)
+ shm_ap_rbuff_destroy(n_1_rb_n);
+ if (n_1_rb_s != NULL)
+ shm_ap_rbuff_destroy(n_1_rb_s);
irm_flow_destroy(f);
}
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 17c473ed..25c3fcd4 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -112,7 +112,7 @@ int ap_init(char * ap_name)
return -1;
}
- _ap_instance->rb = shm_ap_rbuff_create();
+ _ap_instance->rb = shm_ap_rbuff_create_s();
if (_ap_instance->rb == NULL) {
shm_rdrbuff_close(_ap_instance->rdrb);
bmp_destroy(_ap_instance->fds);
@@ -216,7 +216,7 @@ int flow_accept(char ** ae_name)
return -1;
}
- _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->api);
+ _ap_instance->flows[cfd].rb = shm_ap_rbuff_open_n(recv_msg->api);
if (_ap_instance->flows[cfd].rb == NULL) {
bmp_release(_ap_instance->fds, cfd);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
@@ -347,7 +347,7 @@ int flow_alloc(char * dst_name,
return -1;
}
- _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
if (_ap_instance->flows[fd].rb == NULL) {
bmp_release(_ap_instance->fds, fd);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 77e288a8..6cc9590e 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -40,6 +40,10 @@
#include <signal.h>
#include <sys/stat.h>
+#define FN_MAX_CHARS 255
+#define NORTH false
+#define SOUTH true
+
#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
+ 2 * sizeof (pthread_cond_t))
@@ -59,19 +63,23 @@ struct shm_ap_rbuff {
pthread_cond_t * add; /* SDU arrived */
pthread_cond_t * del; /* SDU removed */
pid_t api; /* api to which this rb belongs */
+ bool dir; /* direction, false = N */
int fd;
};
-struct shm_ap_rbuff * shm_ap_rbuff_create()
+static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
{
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
pthread_mutexattr_t mattr;
pthread_condattr_t cattr;
- char fn[25];
+ char fn[FN_MAX_CHARS];
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
+ if (dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid());
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid());
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -150,18 +158,22 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
rb->fd = shm_fd;
rb->api = getpid();
+ rb->dir = dir;
return rb;
}
-struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
+static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)
{
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
- char fn[25];
+ char fn[FN_MAX_CHARS];
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api);
+ if (dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api);
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api);
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -204,9 +216,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
rb->fd = shm_fd;
rb->api = api;
+ rb->dir = dir;
return rb;
}
+
+struct shm_ap_rbuff * shm_ap_rbuff_create_n()
+{
+ return shm_ap_rbuff_create(NORTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_create_s()
+{
+ return shm_ap_rbuff_create(SOUTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api)
+{
+ return shm_ap_rbuff_open(api, NORTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api)
+{
+ return shm_ap_rbuff_open(api, SOUTH);
+}
+
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
{
if (rb == NULL) {
@@ -252,7 +286,10 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
if (close(rb->fd) < 0)
LOG_DBG("Couldn't close shared memory.");
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api);
+ if (rb->dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api);
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api);
if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
LOG_DBG("Couldn't unmap shared memory.");
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 93a889ce..07574f1a 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -139,13 +139,13 @@ static char * rdrb_filename(enum qos_cube qos)
++chars;
} while (qm > 0);
- str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2);
+ str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1);
if (str == NULL) {
LOG_ERR("Failed to create shm_rdrbuff: Out of Memory.");
return NULL;
}
- sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos);
+ sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos);
return str;
}