summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/shim-eth-llc/main.c2
-rw-r--r--src/irmd/irm_flow.c38
-rw-r--r--src/irmd/irm_flow.h4
-rw-r--r--src/irmd/main.c136
-rw-r--r--src/lib/dev.c5
-rw-r--r--src/lib/shm_rbuff.c22
-rw-r--r--src/lib/shm_rdrbuff.c256
-rw-r--r--src/tools/cbr/cbr_client.c6
-rw-r--r--src/tools/operf/operf.c5
-rw-r--r--src/tools/operf/operf_client.c23
10 files changed, 213 insertions, 284 deletions
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 3f3c0e1e..fafe8651 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -67,13 +67,13 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t;
#define THIS_TYPE IPCP_SHIM_ETH_LLC
#define MGMT_SAP 0x01
-#define SHIM_ETH_LLC_MAX_SDU_SIZE 1500
#define MAC_SIZE 6
#define LLC_HEADER_SIZE 3
#define MAX_SAPS 64
#define ETH_HEADER_SIZE (2 * MAC_SIZE + 2)
#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \
+ SHIM_ETH_LLC_MAX_SDU_SIZE)
+#define SHIM_ETH_LLC_MAX_SDU_SIZE (1500 - LLC_HEADER_SIZE)
#define EVENT_WAIT_TIMEOUT 100 /* us */
#define NAME_QUERY_TIMEOUT 100000000 /* ns */
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index a228db06..2456f1e2 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -20,7 +20,10 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
+#define OUROBOROS_PREFIX "irm_flow"
+
#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
#include "irm_flow.h"
@@ -28,20 +31,12 @@
#include <stdbool.h>
#include <assert.h>
-struct irm_flow * irm_flow_create()
+struct irm_flow * irm_flow_create(pid_t n_api, pid_t n_1_api, int port_id)
{
struct irm_flow * f = malloc(sizeof(*f));
if (f == NULL)
return NULL;
- f->n_api = -1;
- f->n_1_api = -1;
- f->port_id = -1;
- f->n_rb = NULL;
- f->n_1_rb = NULL;
-
- f->state = FLOW_NULL;
-
if (pthread_cond_init(&f->state_cond, NULL)) {
free(f);
return NULL;
@@ -52,8 +47,29 @@ struct irm_flow * irm_flow_create()
return NULL;
}
- f->t0.tv_sec = 0;
- f->t0.tv_nsec = 0;
+
+ f->n_api = n_api;
+ f->n_1_api = n_1_api;
+ f->port_id = port_id;
+
+ f->n_rb = shm_rbuff_create(n_api, port_id);
+ if (f->n_rb == NULL) {
+ LOG_ERR("Could not create ringbuffer for AP-I %d.", n_api);
+ free(f);
+ return NULL;
+ }
+
+ f->n_1_rb = shm_rbuff_create(n_1_api, port_id);
+ if (f->n_1_rb == NULL) {
+ LOG_ERR("Could not create ringbuffer for AP-I %d.", n_1_api);
+ free(f);
+ return NULL;
+ }
+
+ f->state = FLOW_ALLOC_PENDING;
+
+ if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
+ LOG_WARN("Failed to set timestamp.");
return f;
}
diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h
index 40a6bb8d..9cbc657d 100644
--- a/src/irmd/irm_flow.h
+++ b/src/irmd/irm_flow.h
@@ -56,7 +56,9 @@ struct irm_flow {
pthread_mutex_t state_lock;
};
-struct irm_flow * irm_flow_create(void);
+struct irm_flow * irm_flow_create(pid_t n_api,
+ pid_t n_1_api,
+ int port_id);
void irm_flow_destroy(struct irm_flow * f);
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 6e3f952f..13bfa052 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -55,6 +55,7 @@
#include <sys/wait.h>
#define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */
+#define SHM_SAN_HOLDOFF 1000 /* ms */
struct ipcp_entry {
struct list_head next;
@@ -1141,49 +1142,21 @@ static struct irm_flow * flow_alloc(pid_t api,
}
pthread_rwlock_unlock(&irmd->reg_lock);
-
- f = irm_flow_create();
- if (f == NULL) {
- pthread_rwlock_unlock(&irmd->state_lock);
- LOG_ERR("Failed to create irm_flow.");
- return NULL;
- }
-
- f->n_api = api;
- f->state = FLOW_ALLOC_PENDING;
-
- if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
- LOG_WARN("Failed to set timestamp.");
-
pthread_rwlock_wrlock(&irmd->flows_lock);
-
- port_id = f->port_id = bmp_allocate(irmd->port_ids);
+ port_id = bmp_allocate(irmd->port_ids);
if (!bmp_is_id_valid(irmd->port_ids, port_id)) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Could not allocate port_id.");
- irm_flow_destroy(f);
return NULL;
}
- f->n_1_api = ipcp;
- f->n_rb = shm_rbuff_create(api, port_id);
- if (f->n_rb == NULL) {
- bmp_release(irmd->port_ids, port_id);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- LOG_ERR("Could not create ringbuffer for AP-I %d.", api);
- irm_flow_destroy(f);
- return NULL;
- }
-
- f->n_1_rb = shm_rbuff_create(ipcp, port_id);
- if (f->n_1_rb == NULL) {
+ f = irm_flow_create(api, ipcp, port_id);
+ if (f == NULL) {
bmp_release(irmd->port_ids, port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- LOG_ERR("Could not create ringbuffer for AP-I %d.", ipcp);
- irm_flow_destroy(f);
+ LOG_ERR("Could not allocate port_id.");
return NULL;
}
@@ -1268,7 +1241,7 @@ static int flow_dealloc(pid_t api, int port_id)
if (f == NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- LOG_DBG("Deallocate called for unknown port.");
+ LOG_DBG("Deallocate unknown port %d by %d.", port_id, api);
return 0;
}
@@ -1297,11 +1270,12 @@ static int flow_dealloc(pid_t api, int port_id)
}
pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
if (n_1_api != -1)
ret = ipcp_flow_dealloc(n_1_api, port_id);
+ pthread_rwlock_unlock(&irmd->state_lock);
+
return ret;
}
@@ -1351,21 +1325,11 @@ static struct irm_flow * flow_req_arr(pid_t api,
struct pid_el * c_api;
pid_t h_api = -1;
+ int port_id = -1;
LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.",
api, dst_name, ae_name);
- f = irm_flow_create();
- if (f == NULL) {
- LOG_ERR("Failed to create irm_flow.");
- return NULL;
- }
-
- f->state = FLOW_ALLOC_PENDING;
- f->n_1_api = api;
- if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
- LOG_WARN("Failed to set timestamp.");
-
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->reg_lock);
@@ -1374,7 +1338,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Unknown name: %s.", dst_name);
- irm_flow_destroy(f);
return NULL;
}
@@ -1387,14 +1350,12 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("No AP's for %s.", dst_name);
- irm_flow_destroy(f);
return NULL;
case REG_NAME_AUTO_ACCEPT:
c_api = malloc(sizeof(*c_api));
if (c_api == NULL) {
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- irm_flow_destroy(f);
return NULL;
}
@@ -1412,7 +1373,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Could not get start apn for reg_entry %s.",
re->name);
- irm_flow_destroy(f);
free(c_api);
return NULL;
}
@@ -1439,7 +1399,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_mutex_unlock(&re->state_lock);
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- irm_flow_destroy(f);
return NULL;
}
@@ -1447,13 +1406,12 @@ static struct irm_flow * flow_req_arr(pid_t api,
case REG_NAME_FLOW_ACCEPT:
pthread_mutex_lock(&re->state_lock);
- h_api = f->n_api = reg_entry_get_api(re);
+ h_api = reg_entry_get_api(re);
pthread_mutex_unlock(&re->state_lock);
- if (f->n_api == -1) {
+ if (h_api == -1) {
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Invalid api returned.");
- irm_flow_destroy(f);
return NULL;
}
@@ -1462,39 +1420,25 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("IRMd in wrong state.");
- irm_flow_destroy(f);
return NULL;
}
- pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- f->port_id = bmp_allocate(irmd->port_ids);
- if (!bmp_is_id_valid(irmd->port_ids, f->port_id)) {
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- LOG_ERR("Could not create ringbuffer for AP-I %d.", f->n_api);
- irm_flow_destroy(f);
- return NULL;
- }
-
- f->n_rb = shm_rbuff_create(f->n_api, f->port_id);
- if (f->n_rb == NULL) {
- bmp_release(irmd->port_ids, f->port_id);
+ port_id = bmp_allocate(irmd->port_ids);
+ if (!bmp_is_id_valid(irmd->port_ids, port_id)) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- LOG_ERR("Could not create ringbuffer for AP-I %d.", f->n_api);
- irm_flow_destroy(f);
return NULL;
}
- f->n_1_rb = shm_rbuff_create(f->n_1_api, f->port_id);
- if (f->n_1_rb == NULL) {
- bmp_release(irmd->port_ids, f->port_id);
+ f = irm_flow_create(h_api, api, port_id);
+ if (f == NULL) {
+ bmp_release(irmd->port_ids, port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- LOG_ERR("Could not create ringbuffer for AP-I %d.", f->n_1_api);
- irm_flow_destroy(f);
+ LOG_ERR("Could not allocate port_id.");
return NULL;
}
@@ -1515,6 +1459,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
bmp_release(irmd->port_ids, f->port_id);
+ list_del(&f->next);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Could not get api table entry for %d.", h_api);
@@ -1580,7 +1525,6 @@ static void irm_destroy(void)
list_for_each_safe(p, h, &irmd->irm_flows) {
struct irm_flow * f = list_entry(p, struct irm_flow, next);
list_del(&f->next);
- ipcp_flow_dealloc(f->n_1_api, f->port_id);
irm_flow_destroy(f);
}
@@ -1669,6 +1613,46 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)
}
}
+void * shm_sanitize(void * o)
+{
+ struct list_head * p = NULL;
+ struct timespec ts = {SHM_SAN_HOLDOFF / 1000,
+ (SHM_SAN_HOLDOFF % 1000) * MILLION};
+ ssize_t idx;
+
+ (void) o;
+
+ while (true) {
+ shm_rdrbuff_wait_full(irmd->rdrb);
+
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
+
+ list_for_each(p, &irmd->irm_flows) {
+ struct irm_flow * f =
+ list_entry(p, struct irm_flow, next);
+ if (kill(f->n_api, 0) < 0) {
+ while ((idx = shm_rbuff_read(f->n_rb)) >= 0)
+ shm_rdrbuff_remove(irmd->rdrb, idx);
+ continue;
+ }
+
+ if (kill(f->n_1_api, 0) < 0) {
+ while ((idx = shm_rbuff_read(f->n_1_rb)) >= 0)
+ shm_rdrbuff_remove(irmd->rdrb, idx);
+ continue;
+ }
+ }
+
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
+
+ nanosleep(&ts, NULL);
+ }
+
+ return (void *) 0;
+}
+
void * irm_sanitize(void * o)
{
struct timespec now;
@@ -2210,7 +2194,7 @@ int main(int argc, char ** argv)
pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL);
pthread_create(&irmd->shm_sanitize, NULL,
- shm_rdrbuff_sanitize, irmd->rdrb);
+ shm_sanitize, irmd->rdrb);
/* wait for (all of them) to return */
for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
diff --git a/src/lib/dev.c b/src/lib/dev.c
index fc8739a2..1c0d73a1 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -536,7 +536,7 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
return -1;
}
- ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);
+ ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);
if (ai.flows[fd].rx_rb == NULL) {
reset_flow(fd);
bmp_release(ai.fds, fd);
@@ -746,7 +746,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)
if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
idx = shm_rdrbuff_write(ai.rdrb,
- ai.flows[fd].api,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
buf,
@@ -766,7 +765,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)
} else { /* blocking */
struct shm_rdrbuff * rdrb = ai.rdrb;
struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
- pid_t api = ai.flows[fd].api;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -774,7 +772,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)
assert(tx_rb);
idx = shm_rdrbuff_write_b(rdrb,
- api,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
buf,
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
index 301669e7..c0901ab1 100644
--- a/src/lib/shm_rbuff.c
+++ b/src/lib/shm_rbuff.c
@@ -43,12 +43,12 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_CLOSED -1
#define RB_OPEN 0
+#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
- + 2 * sizeof(size_t) + sizeof(int8_t) \
- + sizeof(pthread_mutex_t) \
+ + 3 * sizeof(size_t) \
+ + sizeof(pthread_mutex_t) \
+ 2 * sizeof (pthread_cond_t))
#define shm_rbuff_used(rb) ((*rb->head + (SHM_BUFFER_SIZE) - *rb->tail) \
@@ -62,7 +62,7 @@ struct shm_rbuff {
ssize_t * shm_base; /* start of entry */
size_t * head; /* start of ringbuffer head */
size_t * tail; /* start of ringbuffer tail */
- int8_t * acl; /* access control */
+ size_t * acl; /* access control */
pthread_mutex_t * lock; /* lock all free space in shm */
pthread_cond_t * add; /* SDU arrived */
pthread_cond_t * del; /* SDU removed */
@@ -126,7 +126,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
rb->shm_base = shm_base;
rb->head = (size_t *) (rb->shm_base + (SHM_BUFFER_SIZE));
rb->tail = rb->head + 1;
- rb->acl = (int8_t *) (rb->tail + 1);
+ rb->acl = rb->tail + 1;
rb->lock = (pthread_mutex_t *) (rb->acl + 1);
rb->add = (pthread_cond_t *) (rb->lock + 1);
rb->del = rb->add + 1;
@@ -153,9 +153,6 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
rb->api = api;
rb->port_id = port_id;
- if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
- LOG_DBG("Couldn't unmap shared memory.");
-
return rb;
}
@@ -202,7 +199,7 @@ struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id)
rb->shm_base = shm_base;
rb->head = (size_t *) (rb->shm_base + (SHM_BUFFER_SIZE));
rb->tail = rb->head + 1;
- rb->acl = (int8_t *) (rb->tail + 1);
+ rb->acl = rb->tail + 1;
rb->lock = (pthread_mutex_t *) (rb->acl + 1);
rb->add = (pthread_cond_t *) (rb->lock + 1);
rb->del = rb->add + 1;
@@ -225,13 +222,16 @@ void shm_rbuff_close(struct shm_rbuff * rb)
void shm_rbuff_destroy(struct shm_rbuff * rb)
{
- char fn[25];
+ char fn[FN_MAX_CHARS];
if (rb == NULL)
return;
sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id);
+ if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
+ LOG_DBG("Couldn't unmap shared memory.");
+
if (shm_unlink(fn) == -1)
LOG_DBG("Failed to unlink shm %s.", fn);
@@ -251,7 +251,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
pthread_mutex_consistent(rb->lock);
}
#endif
- if (*rb->acl) {
+ if (*rb->acl == RB_CLOSED) {
pthread_mutex_unlock(rb->lock);
return -ENOTALLOC;
}
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index dc1feb10..a8245447 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -42,16 +42,26 @@
#include <ouroboros/logs.h>
#define SHM_BLOCKS_SIZE ((SHM_BUFFER_SIZE) * SHM_RDRB_BLOCK_SIZE)
-#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof(size_t) \
+#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof(size_t) \
+ sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
+#ifndef SHM_RDRB_MULTI_BLOCK
+#define WAIT_BLOCKS 1
+#else
+#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4)
+#if WAIT_BLOCKS == 0
+#undef WAIT_BLOCKS
+#define WAIT_BLOCKS 1
+#endif
+#endif
+
#define get_head_ptr(rdrb) \
- ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->ptr_head \
+ ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head \
* SHM_RDRB_BLOCK_SIZE)))
#define get_tail_ptr(rdrb) \
- ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->ptr_tail \
+ ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail \
* SHM_RDRB_BLOCK_SIZE)))
#define idx_to_du_buff_ptr(rdrb, idx) \
@@ -61,13 +71,19 @@
(((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE)
#define shm_rdrb_used(rdrb) \
- ((*rdrb->ptr_head + (SHM_BUFFER_SIZE) - *rdrb->ptr_tail) \
+ ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) \
& ((SHM_BUFFER_SIZE) - 1))
+
#define shm_rdrb_free(rdrb, i) \
(shm_rdrb_used(rdrb) + i < (SHM_BUFFER_SIZE))
#define shm_rdrb_empty(rdrb) \
- (*rdrb->ptr_tail == *rdrb->ptr_head)
+ (*rdrb->tail == *rdrb->head)
+
+enum shm_du_buff_flags {
+ SDB_VALID = 0,
+ SDB_NULL
+};
struct shm_du_buff {
size_t size;
@@ -76,20 +92,18 @@ struct shm_du_buff {
#endif
size_t du_head;
size_t du_tail;
- pid_t dst_api;
+ size_t flags;
size_t idx;
};
struct shm_rdrbuff {
- uint8_t * shm_base; /* start of blocks */
- size_t * ptr_head; /* start of ringbuffer head */
- size_t * ptr_tail; /* start of ringbuffer tail */
- pthread_mutex_t * lock; /* lock all free space in shm */
- size_t * choked; /* stale sdu detection */
- pthread_cond_t * healthy; /* du map is healthy */
- pthread_cond_t * full; /* run sanitizer when buffer full */
- pid_t * api; /* api of the irmd owner */
- enum qos_cube qos; /* qos id which this buffer serves */
+ uint8_t * shm_base; /* start of blocks */
+ size_t * head; /* start of ringbuffer head */
+ size_t * tail; /* start of ringbuffer tail */
+ pthread_mutex_t * lock; /* lock all free space in shm */
+ pthread_cond_t * full; /* flag when full */
+ pthread_cond_t * healthy; /* flag when SDU is read */
+ pid_t * api; /* api of the irmd owner */
};
static void garbage_collect(struct shm_rdrbuff * rdrb)
@@ -97,61 +111,31 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)
#ifdef SHM_RDRB_MULTI_BLOCK
struct shm_du_buff * sdb;
while (!shm_rdrb_empty(rdrb) &&
- (sdb = get_tail_ptr(rdrb))->dst_api == -1)
- *rdrb->ptr_tail = (*rdrb->ptr_tail + sdb->blocks)
+ (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL)
+ *rdrb->tail = (*rdrb->tail + sdb->blocks)
& ((SHM_BUFFER_SIZE) - 1);
#else
- while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->dst_api == -1)
- *rdrb->ptr_tail =
- (*rdrb->ptr_tail + 1) & ((SHM_BUFFER_SIZE) - 1);
-
+ while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL)
+ *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);
#endif
+ pthread_cond_broadcast(rdrb->healthy);
}
-static void clean_sdus(struct shm_rdrbuff * rdrb, pid_t api)
-{
- size_t idx = *rdrb->ptr_tail;
- struct shm_du_buff * buf;
-
- while (idx != *rdrb->ptr_head) {
- buf = idx_to_du_buff_ptr(rdrb, idx);
- if (buf->dst_api == api)
- buf->dst_api = -1;
-#ifdef SHM_RDRB_MULTI_BLOCK
- idx = (idx + buf->blocks) & ((SHM_BUFFER_SIZE) - 1);
-#else
- idx = (idx + 1) & ((SHM_BUFFER_SIZE) - 1);
-#endif
- }
-
- garbage_collect(rdrb);
-
- *rdrb->choked = 0;
-}
-
-static char * rdrb_filename(enum qos_cube qos)
+static char * rdrb_filename(void)
{
- size_t chars = 0;
char * str;
- int qm = QOS_MAX;
- do {
- qm /= 10;
- ++chars;
- } while (qm > 0);
-
- str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1);
+ str = malloc(strlen(SHM_RDRB_PREFIX) + 1);
if (str == NULL) {
LOG_ERR("Failed to create shm_rdrbuff: Out of Memory.");
return NULL;
}
- sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos);
+ sprintf(str, "%s", SHM_RDRB_PREFIX);
return str;
}
-/* FIXME: create a ringbuffer for each qos cube in the system */
struct shm_rdrbuff * shm_rdrbuff_create()
{
struct shm_rdrbuff * rdrb;
@@ -160,8 +144,7 @@ struct shm_rdrbuff * shm_rdrbuff_create()
uint8_t * shm_base;
pthread_mutexattr_t mattr;
pthread_condattr_t cattr;
- enum qos_cube qos = QOS_CUBE_BE;
- char * shm_rdrb_fn = rdrb_filename(qos);
+ char * shm_rdrb_fn = rdrb_filename();
if (shm_rdrb_fn == NULL) {
LOG_ERR("Could not create rdrbuff. Out of Memory");
return NULL;
@@ -212,14 +195,12 @@ struct shm_rdrbuff * shm_rdrbuff_create()
}
rdrb->shm_base = shm_base;
- rdrb->ptr_head = (size_t *)
- ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE);
- rdrb->ptr_tail = rdrb->ptr_head + 1;
- rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1);
- rdrb->choked = (size_t *) (rdrb->lock + 1);
- rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1);
- rdrb->full = rdrb->healthy + 1;
- rdrb->api = (pid_t *) (rdrb->full + 1);
+ rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE);
+ rdrb->tail = rdrb->head + 1;
+ rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1);
+ rdrb->full = (pthread_cond_t *) (rdrb->lock + 1);
+ rdrb->healthy = rdrb->full + 1;
+ rdrb->api = (pid_t *) (rdrb->healthy + 1);
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
@@ -236,29 +217,22 @@ struct shm_rdrbuff * shm_rdrbuff_create()
pthread_cond_init(rdrb->full, &cattr);
pthread_cond_init(rdrb->healthy, &cattr);
- *rdrb->ptr_head = 0;
- *rdrb->ptr_tail = 0;
-
- *rdrb->choked = 0;
+ *rdrb->head = 0;
+ *rdrb->tail = 0;
*rdrb->api = getpid();
- rdrb->qos = qos;
-
free(shm_rdrb_fn);
return rdrb;
}
-/* FIXME: open a ringbuffer for each qos cube in the system */
struct shm_rdrbuff * shm_rdrbuff_open()
{
struct shm_rdrbuff * rdrb;
int shm_fd;
uint8_t * shm_base;
-
- enum qos_cube qos = QOS_CUBE_BE;
- char * shm_rdrb_fn = rdrb_filename(qos);
+ char * shm_rdrb_fn = rdrb_filename();
if (shm_rdrb_fn == NULL) {
LOG_ERR("Could not create rdrbuff. Out of Memory");
return NULL;
@@ -297,32 +271,20 @@ struct shm_rdrbuff * shm_rdrbuff_open()
}
rdrb->shm_base = shm_base;
- rdrb->ptr_head = (size_t *)
- ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE);
- rdrb->ptr_tail = rdrb->ptr_head + 1;
- rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1);
- rdrb->choked = (size_t *) (rdrb->lock + 1);
- rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1);
- rdrb->full = rdrb->healthy + 1;
- rdrb->api = (pid_t *) (rdrb->full + 1);
-
- rdrb->qos = qos;
+ rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE);
+ rdrb->tail = rdrb->head + 1;
+ rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1);
+ rdrb->full = (pthread_cond_t *) (rdrb->lock + 1);
+ rdrb->healthy = rdrb->full + 1;
+ rdrb->api = (pid_t *) (rdrb->healthy + 1);
free(shm_rdrb_fn);
return rdrb;
}
-void * shm_rdrbuff_sanitize(void * o)
+void shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb)
{
- struct shm_rdrbuff * rdrb = (struct shm_rdrbuff *) o;
- struct timespec intv
- = {SHM_DU_TIMEOUT_MICROS / MILLION,
- (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000};
-
- pid_t api;
-
- assert(o);
#ifdef __APPLE__
pthread_mutex_lock(rdrb->lock);
@@ -332,14 +294,10 @@ void * shm_rdrbuff_sanitize(void * o)
pthread_mutex_consistent(rdrb->lock);
}
#endif
-
pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
(void *) rdrb->lock);
- while (true) {
- int ret = 0;
- struct timespec now;
- struct timespec dl;
+ while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) {
#ifdef __APPLE__
pthread_cond_wait(rdrb->full, rdrb->lock);
#else
@@ -348,49 +306,11 @@ void * shm_rdrbuff_sanitize(void * o)
pthread_mutex_consistent(rdrb->lock);
}
#endif
- *rdrb->choked = 1;
-
- garbage_collect(rdrb);
-
- if (shm_rdrb_empty(rdrb)) {
- pthread_cond_broadcast(rdrb->healthy);
- continue;
- }
-
- api = get_tail_ptr(rdrb)->dst_api;
-
- if (kill(api, 0)) {
- LOG_DBGF("Dead process %d left stale sdu.", api);
- clean_sdus(rdrb, api);
- pthread_cond_broadcast(rdrb->healthy);
- continue;
- }
-
- clock_gettime(CLOCK_REALTIME, &now);
- ts_add(&now, &intv, &dl);
- while (*rdrb->choked) {
- ret = pthread_cond_timedwait(rdrb->healthy,
- rdrb->lock,
- &dl);
- if (!ret)
- continue;
-#ifndef __APPLE__
- if (ret == EOWNERDEAD) {
- LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(rdrb->lock);
- }
-#endif
- if (ret == ETIMEDOUT) {
- LOG_DBGF("SDU timed out (dst: %d).", api);
- clean_sdus(rdrb, api);
- }
- }
- pthread_cond_broadcast(rdrb->healthy);
}
- pthread_cleanup_pop(true);
+ garbage_collect(rdrb);
- return (void *) 0;
+ pthread_cleanup_pop(true);
}
void shm_rdrbuff_close(struct shm_rdrbuff * rdrb)
@@ -417,7 +337,7 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)
if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)
LOG_DBG("Couldn't unmap shared memory.");
- shm_rdrb_fn = rdrb_filename(rdrb->qos);
+ shm_rdrb_fn = rdrb_filename();
if (shm_rdrb_fn == NULL) {
LOG_ERR("Could not create rdrbuff. Out of Memory");
return;
@@ -431,7 +351,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)
}
ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
- pid_t dst_api,
size_t headspace,
size_t tailspace,
uint8_t * data,
@@ -444,7 +363,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
size_t padblocks = 0;
#endif
ssize_t sz = size + sizeof(*sdb);
- uint8_t * write_pos;
assert(rdrb);
assert(data);
@@ -469,14 +387,15 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
++blocks;
}
- if (blocks + *rdrb->ptr_head > (SHM_BUFFER_SIZE))
- padblocks = (SHM_BUFFER_SIZE) - *rdrb->ptr_head;
+ if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
+ padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
if (!shm_rdrb_free(rdrb, blocks + padblocks)) {
#else
if (!shm_rdrb_free(rdrb, 1)) {
#endif
- pthread_cond_signal(rdrb->full);
+ LOG_DBG("buffer full, idx = %ld.", *rdrb->tail);
+ pthread_cond_broadcast(rdrb->full);
pthread_mutex_unlock(rdrb->lock);
return -1;
}
@@ -486,31 +405,29 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
sdb = get_head_ptr(rdrb);
sdb->size = 0;
sdb->blocks = padblocks;
- sdb->dst_api = -1;
+ sdb->flags = SDB_NULL;
sdb->du_head = 0;
sdb->du_tail = 0;
- sdb->idx = *rdrb->ptr_head;
+ sdb->idx = *rdrb->head;
- *rdrb->ptr_head = 0;
+ *rdrb->head = 0;
}
#endif
sdb = get_head_ptr(rdrb);
sdb->size = size;
- sdb->dst_api = dst_api;
+ sdb->flags = SDB_VALID;
sdb->du_head = headspace;
sdb->du_tail = sdb->du_head + len;
#ifdef SHM_RDRB_MULTI_BLOCK
sdb->blocks = blocks;
#endif
- write_pos = ((uint8_t *) (sdb + 1)) + headspace;
-
- memcpy(write_pos, data, len);
+ memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len);
- sdb->idx = *rdrb->ptr_head;
+ sdb->idx = *rdrb->head;
#ifdef SHM_RDRB_MULTI_BLOCK
- *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & ((SHM_BUFFER_SIZE) - 1);
+ *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1);
#else
- *rdrb->ptr_head = (*rdrb->ptr_head + 1) & ((SHM_BUFFER_SIZE) - 1);
+ *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1);
#endif
pthread_mutex_unlock(rdrb->lock);
@@ -518,7 +435,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
}
ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
- pid_t dst_api,
size_t headspace,
size_t tailspace,
uint8_t * data,
@@ -531,7 +447,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
size_t padblocks = 0;
#endif
ssize_t sz = size + sizeof(*sdb);
- uint8_t * write_pos;
assert(rdrb);
assert(data);
@@ -559,14 +474,14 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
++blocks;
}
- if (blocks + *rdrb->ptr_head > (SHM_BUFFER_SIZE))
- padblocks = (SHM_BUFFER_SIZE) - *rdrb->ptr_head;
+ if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
+ padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
while (!shm_rdrb_free(rdrb, (blocks + padblocks))) {
#else
while (!shm_rdrb_free(rdrb, 1)) {
#endif
- pthread_cond_signal(rdrb->full);
+ pthread_cond_broadcast(rdrb->full);
pthread_cond_wait(rdrb->healthy, rdrb->lock);
}
@@ -575,31 +490,29 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
sdb = get_head_ptr(rdrb);
sdb->size = 0;
sdb->blocks = padblocks;
- sdb->dst_api = -1;
+ sdb->flags = SDB_NULL;
sdb->du_head = 0;
sdb->du_tail = 0;
- sdb->idx = *rdrb->ptr_head;
+ sdb->idx = *rdrb->head;
- *rdrb->ptr_head = 0;
+ *rdrb->head = 0;
}
#endif
sdb = get_head_ptr(rdrb);
sdb->size = size;
- sdb->dst_api = dst_api;
+ sdb->flags = SDB_VALID;
sdb->du_head = headspace;
sdb->du_tail = sdb->du_head + len;
#ifdef SHM_RDRB_MULTI_BLOCK
sdb->blocks = blocks;
#endif
- write_pos = ((uint8_t *) (sdb + 1)) + headspace;
-
- memcpy(write_pos, data, len);
+ memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len);
- sdb->idx = *rdrb->ptr_head;
+ sdb->idx = *rdrb->head;
#ifdef SHM_RDRB_MULTI_BLOCK
- *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & ((SHM_BUFFER_SIZE) - 1);
+ *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1);
#else
- *rdrb->ptr_head = (*rdrb->ptr_head + 1) & ((SHM_BUFFER_SIZE) - 1);
+ *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1);
#endif
pthread_cleanup_pop(true);
@@ -684,18 +597,15 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, size_t idx)
return -1;
}
- idx_to_du_buff_ptr(rdrb, idx)->dst_api = -1;
+ idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL;
- if (idx != *rdrb->ptr_tail) {
+ if (idx != *rdrb->tail) {
pthread_mutex_unlock(rdrb->lock);
return 0;
}
garbage_collect(rdrb);
- *rdrb->choked = 0;
-
- pthread_cond_broadcast(rdrb->healthy);
pthread_mutex_unlock(rdrb->lock);
return 0;
diff --git a/src/tools/cbr/cbr_client.c b/src/tools/cbr/cbr_client.c
index b2cf7d7f..58198b86 100644
--- a/src/tools/cbr/cbr_client.c
+++ b/src/tools/cbr/cbr_client.c
@@ -46,7 +46,7 @@ int client_main(char * server,
int result = 0;
bool stop = false;
char buf[size];
- int seqnr = 0;
+ long seqnr = 0;
long gap = size * 8.0 * (BILLION / (double) rate);
struct timespec start;
@@ -114,8 +114,8 @@ int client_main(char * server,
ms = ts_diff_ms(&start, &end);
printf("sent statistics: "
- "%9d SDUs, %12d bytes in %9d ms, %4.4f Mb/s\n",
- seqnr, seqnr * size, ms, (seqnr * size * 8.0)/(ms * 1000));
+ "%9ld SDUs, %12ld bytes in %9d ms, %4.4f Mb/s\n",
+ seqnr, seqnr * size, ms, (seqnr / (ms * 1000.0)) * size * 8.0);
flow_dealloc(fd);
diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c
index b52109cf..46dfc14d 100644
--- a/src/tools/operf/operf.c
+++ b/src/tools/operf/operf.c
@@ -41,6 +41,7 @@ struct c {
int size;
long rate;
bool flood;
+ bool sleep;
int duration;
size_t sent;
@@ -81,6 +82,7 @@ static void usage(void)
" -r, --rate Rate (b/s)\n"
" -s, --size Payload size (B, default 1500)\n"
" -f, --flood Send SDUs as fast as possible\n"
+ " --sleep Sleep in between sending SDUs\n"
" --help Display this help text and exit\n");
}
@@ -100,6 +102,7 @@ int main(int argc, char ** argv)
server.timeout = 1000; /* ms */
client.rate = 1000000;
client.flood = false;
+ client.sleep = false;
while (argc > 0) {
if (strcmp(*argv, "-n") == 0 ||
@@ -127,6 +130,8 @@ int main(int argc, char ** argv)
} else if (strcmp(*argv, "-f") == 0 ||
strcmp(*argv, "--flood") == 0) {
client.flood = true;
+ } else if (strcmp(*argv, "--sleep") == 0) {
+ client.sleep = true;
} else if (strcmp(*argv, "-l") == 0 ||
strcmp(*argv, "--listen") == 0) {
serv = true;
diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c
index 1f6226d4..902a7b41 100644
--- a/src/tools/operf/operf_client.c
+++ b/src/tools/operf/operf_client.c
@@ -36,6 +36,17 @@
#include <errno.h>
#include <float.h>
+static void busy_wait_until(const struct timespec * deadline)
+{
+ struct timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+ while (now.tv_sec < deadline->tv_sec)
+ clock_gettime(CLOCK_REALTIME, &now);
+ while (now.tv_sec == deadline->tv_sec
+ && now.tv_nsec < deadline->tv_nsec)
+ clock_gettime(CLOCK_REALTIME, &now);
+}
+
void shutdown_client(int signo, siginfo_t * info, void * c)
{
(void) info;
@@ -85,6 +96,7 @@ void * writer(void * o)
struct timespec now;
struct timespec start;
struct timespec intv = {(gap / BILLION), gap % BILLION};
+ struct timespec end = {0, 0};
char * buf = malloc(client.size);
if (buf == NULL)
@@ -123,6 +135,9 @@ void * writer(void * o)
}
} else {
while (ts_diff_ms(&start, &now) < client.duration) {
+ clock_gettime(CLOCK_REALTIME, &now);
+ ts_add(&now, &intv, &end);
+
if (flow_write(*fdp, buf, client.size) == -1) {
printf("Failed to send SDU.\n");
flow_dealloc(*fdp);
@@ -131,10 +146,10 @@ void * writer(void * o)
}
++client.sent;
-
- nanosleep(&intv, NULL);
-
- clock_gettime(CLOCK_REALTIME, &now);
+ if (client.sleep)
+ nanosleep(&intv, NULL);
+ else
+ busy_wait_until(&end);
}
}