diff options
-rw-r--r-- | include/ouroboros/shm_rdrbuff.h | 4 | ||||
-rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 2 | ||||
-rw-r--r-- | src/irmd/irm_flow.c | 38 | ||||
-rw-r--r-- | src/irmd/irm_flow.h | 4 | ||||
-rw-r--r-- | src/irmd/main.c | 136 | ||||
-rw-r--r-- | src/lib/dev.c | 5 | ||||
-rw-r--r-- | src/lib/shm_rbuff.c | 22 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 256 | ||||
-rw-r--r-- | src/tools/cbr/cbr_client.c | 6 | ||||
-rw-r--r-- | src/tools/operf/operf.c | 5 | ||||
-rw-r--r-- | src/tools/operf/operf_client.c | 23 |
11 files changed, 214 insertions, 287 deletions
diff --git a/include/ouroboros/shm_rdrbuff.h b/include/ouroboros/shm_rdrbuff.h index b16e2530..c4e1e7b0 100644 --- a/include/ouroboros/shm_rdrbuff.h +++ b/include/ouroboros/shm_rdrbuff.h @@ -43,18 +43,16 @@ void shm_rdrbuff_close(struct shm_rdrbuff * rdrb); void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb); -void * shm_rdrbuff_sanitize(void * o); +void shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb); /* returns the index of the buffer in the DU map */ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, - pid_t dst_api, size_t headspace, size_t tailspace, uint8_t * data, size_t data_len); ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, - pid_t dst_api, size_t headspace, size_t tailspace, uint8_t * data, diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 3f3c0e1e..fafe8651 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -67,13 +67,13 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t; #define THIS_TYPE IPCP_SHIM_ETH_LLC #define MGMT_SAP 0x01 -#define SHIM_ETH_LLC_MAX_SDU_SIZE 1500 #define MAC_SIZE 6 #define LLC_HEADER_SIZE 3 #define MAX_SAPS 64 #define ETH_HEADER_SIZE (2 * MAC_SIZE + 2) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \ + SHIM_ETH_LLC_MAX_SDU_SIZE) +#define SHIM_ETH_LLC_MAX_SDU_SIZE (1500 - LLC_HEADER_SIZE) #define EVENT_WAIT_TIMEOUT 100 /* us */ #define NAME_QUERY_TIMEOUT 100000000 /* ns */ diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index a228db06..2456f1e2 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -20,7 +20,10 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#define OUROBOROS_PREFIX "irm_flow" + #include <ouroboros/config.h> +#include <ouroboros/logs.h> #include "irm_flow.h" @@ -28,20 +31,12 @@ #include <stdbool.h> #include <assert.h> -struct irm_flow * irm_flow_create() +struct irm_flow * irm_flow_create(pid_t n_api, pid_t n_1_api, int port_id) { struct irm_flow * f = malloc(sizeof(*f)); if (f == NULL) return NULL; - f->n_api = -1; - f->n_1_api = -1; - f->port_id = -1; - f->n_rb = NULL; - f->n_1_rb = NULL; - - f->state = FLOW_NULL; - if (pthread_cond_init(&f->state_cond, NULL)) { free(f); return NULL; @@ -52,8 +47,29 @@ struct irm_flow * irm_flow_create() return NULL; } - f->t0.tv_sec = 0; - f->t0.tv_nsec = 0; + + f->n_api = n_api; + f->n_1_api = n_1_api; + f->port_id = port_id; + + f->n_rb = shm_rbuff_create(n_api, port_id); + if (f->n_rb == NULL) { + LOG_ERR("Could not create ringbuffer for AP-I %d.", n_api); + free(f); + return NULL; + } + + f->n_1_rb = shm_rbuff_create(n_1_api, port_id); + if (f->n_1_rb == NULL) { + LOG_ERR("Could not create ringbuffer for AP-I %d.", n_1_api); + free(f); + return NULL; + } + + f->state = FLOW_ALLOC_PENDING; + + if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) + LOG_WARN("Failed to set timestamp."); return f; } diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index 40a6bb8d..9cbc657d 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -56,7 +56,9 @@ struct irm_flow { pthread_mutex_t state_lock; }; -struct irm_flow * irm_flow_create(void); +struct irm_flow * irm_flow_create(pid_t n_api, + pid_t n_1_api, + int port_id); void irm_flow_destroy(struct irm_flow * f); diff --git a/src/irmd/main.c b/src/irmd/main.c index 6e3f952f..13bfa052 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -55,6 +55,7 @@ #include <sys/wait.h> #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ +#define SHM_SAN_HOLDOFF 1000 /* ms */ struct ipcp_entry { struct list_head next; @@ -1141,49 +1142,21 @@ static struct irm_flow * flow_alloc(pid_t api, } pthread_rwlock_unlock(&irmd->reg_lock); - - f = irm_flow_create(); - if (f == NULL) { - pthread_rwlock_unlock(&irmd->state_lock); - LOG_ERR("Failed to create irm_flow."); - return NULL; - } - - f->n_api = api; - f->state = FLOW_ALLOC_PENDING; - - if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) - LOG_WARN("Failed to set timestamp."); - pthread_rwlock_wrlock(&irmd->flows_lock); - - port_id = f->port_id = bmp_allocate(irmd->port_ids); + port_id = bmp_allocate(irmd->port_ids); if (!bmp_is_id_valid(irmd->port_ids, port_id)) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Could not allocate port_id."); - irm_flow_destroy(f); return NULL; } - f->n_1_api = ipcp; - f->n_rb = shm_rbuff_create(api, port_id); - if (f->n_rb == NULL) { - bmp_release(irmd->port_ids, port_id); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - LOG_ERR("Could not create ringbuffer for AP-I %d.", api); - irm_flow_destroy(f); - return NULL; - } - - f->n_1_rb = shm_rbuff_create(ipcp, port_id); - if (f->n_1_rb == NULL) { + f = irm_flow_create(api, ipcp, port_id); + if (f == NULL) { bmp_release(irmd->port_ids, port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - LOG_ERR("Could not create ringbuffer for AP-I %d.", ipcp); - irm_flow_destroy(f); + LOG_ERR("Could not allocate port_id."); return NULL; } @@ -1268,7 +1241,7 @@ static int flow_dealloc(pid_t api, int port_id) if (f == NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - LOG_DBG("Deallocate called for unknown port."); + LOG_DBG("Deallocate unknown port %d by %d.", port_id, api); return 0; } @@ -1297,11 +1270,12 @@ static int flow_dealloc(pid_t api, int port_id) } pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); if (n_1_api != -1) ret = ipcp_flow_dealloc(n_1_api, port_id); + pthread_rwlock_unlock(&irmd->state_lock); + return ret; } @@ -1351,21 +1325,11 @@ static struct irm_flow * flow_req_arr(pid_t api, struct pid_el * c_api; pid_t h_api = -1; + int port_id = -1; LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.", api, dst_name, ae_name); - f = irm_flow_create(); - if (f == NULL) { - LOG_ERR("Failed to create irm_flow."); - return NULL; - } - - f->state = FLOW_ALLOC_PENDING; - f->n_1_api = api; - if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) - LOG_WARN("Failed to set timestamp."); - pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->reg_lock); @@ -1374,7 +1338,6 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Unknown name: %s.", dst_name); - irm_flow_destroy(f); return NULL; } @@ -1387,14 +1350,12 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("No AP's for %s.", dst_name); - irm_flow_destroy(f); return NULL; case REG_NAME_AUTO_ACCEPT: c_api = malloc(sizeof(*c_api)); if (c_api == NULL) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - irm_flow_destroy(f); return NULL; } @@ -1412,7 +1373,6 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Could not get start apn for reg_entry %s.", re->name); - irm_flow_destroy(f); free(c_api); return NULL; } @@ -1439,7 +1399,6 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_mutex_unlock(&re->state_lock); pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - irm_flow_destroy(f); return NULL; } @@ -1447,13 +1406,12 @@ static struct irm_flow * flow_req_arr(pid_t api, case REG_NAME_FLOW_ACCEPT: pthread_mutex_lock(&re->state_lock); - h_api = f->n_api = reg_entry_get_api(re); + h_api = reg_entry_get_api(re); pthread_mutex_unlock(&re->state_lock); - if (f->n_api == -1) { + if (h_api == -1) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Invalid api returned."); - irm_flow_destroy(f); return NULL; } @@ -1462,39 +1420,25 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("IRMd in wrong state."); - irm_flow_destroy(f); return NULL; } - pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - f->port_id = bmp_allocate(irmd->port_ids); - if (!bmp_is_id_valid(irmd->port_ids, f->port_id)) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - LOG_ERR("Could not create ringbuffer for AP-I %d.", f->n_api); - irm_flow_destroy(f); - return NULL; - } - - f->n_rb = shm_rbuff_create(f->n_api, f->port_id); - if (f->n_rb == NULL) { - bmp_release(irmd->port_ids, f->port_id); + port_id = bmp_allocate(irmd->port_ids); + if (!bmp_is_id_valid(irmd->port_ids, port_id)) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - LOG_ERR("Could not create ringbuffer for AP-I %d.", f->n_api); - irm_flow_destroy(f); return NULL; } - f->n_1_rb = shm_rbuff_create(f->n_1_api, f->port_id); - if (f->n_1_rb == NULL) { - bmp_release(irmd->port_ids, f->port_id); + f = irm_flow_create(h_api, api, port_id); + if (f == NULL) { + bmp_release(irmd->port_ids, port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - LOG_ERR("Could not create ringbuffer for AP-I %d.", f->n_1_api); - irm_flow_destroy(f); + LOG_ERR("Could not allocate port_id."); return NULL; } @@ -1515,6 +1459,7 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_wrlock(&irmd->flows_lock); bmp_release(irmd->port_ids, f->port_id); + list_del(&f->next); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); LOG_ERR("Could not get api table entry for %d.", h_api); @@ -1580,7 +1525,6 @@ static void irm_destroy(void) list_for_each_safe(p, h, &irmd->irm_flows) { struct irm_flow * f = list_entry(p, struct irm_flow, next); list_del(&f->next); - ipcp_flow_dealloc(f->n_1_api, f->port_id); irm_flow_destroy(f); } @@ -1669,6 +1613,46 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) } } +void * shm_sanitize(void * o) +{ + struct list_head * p = NULL; + struct timespec ts = {SHM_SAN_HOLDOFF / 1000, + (SHM_SAN_HOLDOFF % 1000) * MILLION}; + ssize_t idx; + + (void) o; + + while (true) { + shm_rdrbuff_wait_full(irmd->rdrb); + + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); + + list_for_each(p, &irmd->irm_flows) { + struct irm_flow * f = + list_entry(p, struct irm_flow, next); + if (kill(f->n_api, 0) < 0) { + while ((idx = shm_rbuff_read(f->n_rb)) >= 0) + shm_rdrbuff_remove(irmd->rdrb, idx); + continue; + } + + if (kill(f->n_1_api, 0) < 0) { + while ((idx = shm_rbuff_read(f->n_1_rb)) >= 0) + shm_rdrbuff_remove(irmd->rdrb, idx); + continue; + } + } + + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); + + nanosleep(&ts, NULL); + } + + return (void *) 0; +} + void * irm_sanitize(void * o) { struct timespec now; @@ -2210,7 +2194,7 @@ int main(int argc, char ** argv) pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL); pthread_create(&irmd->shm_sanitize, NULL, - shm_rdrbuff_sanitize, irmd->rdrb); + shm_sanitize, irmd->rdrb); /* wait for (all of them) to return */ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) diff --git a/src/lib/dev.c b/src/lib/dev.c index fc8739a2..1c0d73a1 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -536,7 +536,7 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) return -1; } - ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); + ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); if (ai.flows[fd].rx_rb == NULL) { reset_flow(fd); bmp_release(ai.fds, fd); @@ -746,7 +746,6 @@ ssize_t flow_write(int fd, void * buf, size_t count) if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { idx = shm_rdrbuff_write(ai.rdrb, - ai.flows[fd].api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, buf, @@ -766,7 +765,6 @@ ssize_t flow_write(int fd, void * buf, size_t count) } else { /* blocking */ struct shm_rdrbuff * rdrb = ai.rdrb; struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; - pid_t api = ai.flows[fd].api; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -774,7 +772,6 @@ ssize_t flow_write(int fd, void * buf, size_t count) assert(tx_rb); idx = shm_rdrbuff_write_b(rdrb, - api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, buf, diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 301669e7..c0901ab1 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -43,12 +43,12 @@ #include <stdbool.h> #define FN_MAX_CHARS 255 -#define RB_CLOSED -1 #define RB_OPEN 0 +#define RB_CLOSED 1 #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \ - + 2 * sizeof(size_t) + sizeof(int8_t) \ - + sizeof(pthread_mutex_t) \ + + 3 * sizeof(size_t) \ + + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) #define shm_rbuff_used(rb) ((*rb->head + (SHM_BUFFER_SIZE) - *rb->tail) \ @@ -62,7 +62,7 @@ struct shm_rbuff { ssize_t * shm_base; /* start of entry */ size_t * head; /* start of ringbuffer head */ size_t * tail; /* start of ringbuffer tail */ - int8_t * acl; /* access control */ + size_t * acl; /* access control */ pthread_mutex_t * lock; /* lock all free space in shm */ pthread_cond_t * add; /* SDU arrived */ pthread_cond_t * del; /* SDU removed */ @@ -126,7 +126,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) rb->shm_base = shm_base; rb->head = (size_t *) (rb->shm_base + (SHM_BUFFER_SIZE)); rb->tail = rb->head + 1; - rb->acl = (int8_t *) (rb->tail + 1); + rb->acl = rb->tail + 1; rb->lock = (pthread_mutex_t *) (rb->acl + 1); rb->add = (pthread_cond_t *) (rb->lock + 1); rb->del = rb->add + 1; @@ -153,9 +153,6 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) rb->api = api; rb->port_id = port_id; - if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) - LOG_DBG("Couldn't unmap shared memory."); - return rb; } @@ -202,7 +199,7 @@ struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id) rb->shm_base = shm_base; rb->head = (size_t *) (rb->shm_base + (SHM_BUFFER_SIZE)); rb->tail = rb->head + 1; - rb->acl = (int8_t *) (rb->tail + 1); + rb->acl = rb->tail + 1; rb->lock = (pthread_mutex_t *) (rb->acl + 1); rb->add = (pthread_cond_t *) (rb->lock + 1); rb->del = rb->add + 1; @@ -225,13 +222,16 @@ void shm_rbuff_close(struct shm_rbuff * rb) void shm_rbuff_destroy(struct shm_rbuff * rb) { - char fn[25]; + char fn[FN_MAX_CHARS]; if (rb == NULL) return; sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id); + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + if (shm_unlink(fn) == -1) LOG_DBG("Failed to unlink shm %s.", fn); @@ -251,7 +251,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) pthread_mutex_consistent(rb->lock); } #endif - if (*rb->acl) { + if (*rb->acl == RB_CLOSED) { pthread_mutex_unlock(rb->lock); return -ENOTALLOC; } diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index dc1feb10..a8245447 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -42,16 +42,26 @@ #include <ouroboros/logs.h> #define SHM_BLOCKS_SIZE ((SHM_BUFFER_SIZE) * SHM_RDRB_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof(size_t) \ +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof(size_t) \ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) +#ifndef SHM_RDRB_MULTI_BLOCK +#define WAIT_BLOCKS 1 +#else +#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4) +#if WAIT_BLOCKS == 0 +#undef WAIT_BLOCKS +#define WAIT_BLOCKS 1 +#endif +#endif + #define get_head_ptr(rdrb) \ - ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->ptr_head \ + ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head \ * SHM_RDRB_BLOCK_SIZE))) #define get_tail_ptr(rdrb) \ - ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->ptr_tail \ + ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail \ * SHM_RDRB_BLOCK_SIZE))) #define idx_to_du_buff_ptr(rdrb, idx) \ @@ -61,13 +71,19 @@ (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE) #define shm_rdrb_used(rdrb) \ - ((*rdrb->ptr_head + (SHM_BUFFER_SIZE) - *rdrb->ptr_tail) \ + ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) \ & ((SHM_BUFFER_SIZE) - 1)) + #define shm_rdrb_free(rdrb, i) \ (shm_rdrb_used(rdrb) + i < (SHM_BUFFER_SIZE)) #define shm_rdrb_empty(rdrb) \ - (*rdrb->ptr_tail == *rdrb->ptr_head) + (*rdrb->tail == *rdrb->head) + +enum shm_du_buff_flags { + SDB_VALID = 0, + SDB_NULL +}; struct shm_du_buff { size_t size; @@ -76,20 +92,18 @@ struct shm_du_buff { #endif size_t du_head; size_t du_tail; - pid_t dst_api; + size_t flags; size_t idx; }; struct shm_rdrbuff { - uint8_t * shm_base; /* start of blocks */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * lock; /* lock all free space in shm */ - size_t * choked; /* stale sdu detection */ - pthread_cond_t * healthy; /* du map is healthy */ - pthread_cond_t * full; /* run sanitizer when buffer full */ - pid_t * api; /* api of the irmd owner */ - enum qos_cube qos; /* qos id which this buffer serves */ + uint8_t * shm_base; /* start of blocks */ + size_t * head; /* start of ringbuffer head */ + size_t * tail; /* start of ringbuffer tail */ + pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_cond_t * full; /* flag when full */ + pthread_cond_t * healthy; /* flag when SDU is read */ + pid_t * api; /* api of the irmd owner */ }; static void garbage_collect(struct shm_rdrbuff * rdrb) @@ -97,61 +111,31 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) #ifdef SHM_RDRB_MULTI_BLOCK struct shm_du_buff * sdb; while (!shm_rdrb_empty(rdrb) && - (sdb = get_tail_ptr(rdrb))->dst_api == -1) - *rdrb->ptr_tail = (*rdrb->ptr_tail + sdb->blocks) + (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL) + *rdrb->tail = (*rdrb->tail + sdb->blocks) & ((SHM_BUFFER_SIZE) - 1); #else - while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->dst_api == -1) - *rdrb->ptr_tail = - (*rdrb->ptr_tail + 1) & ((SHM_BUFFER_SIZE) - 1); - + while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL) + *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); #endif + pthread_cond_broadcast(rdrb->healthy); } -static void clean_sdus(struct shm_rdrbuff * rdrb, pid_t api) -{ - size_t idx = *rdrb->ptr_tail; - struct shm_du_buff * buf; - - while (idx != *rdrb->ptr_head) { - buf = idx_to_du_buff_ptr(rdrb, idx); - if (buf->dst_api == api) - buf->dst_api = -1; -#ifdef SHM_RDRB_MULTI_BLOCK - idx = (idx + buf->blocks) & ((SHM_BUFFER_SIZE) - 1); -#else - idx = (idx + 1) & ((SHM_BUFFER_SIZE) - 1); -#endif - } - - garbage_collect(rdrb); - - *rdrb->choked = 0; -} - -static char * rdrb_filename(enum qos_cube qos) +static char * rdrb_filename(void) { - size_t chars = 0; char * str; - int qm = QOS_MAX; - do { - qm /= 10; - ++chars; - } while (qm > 0); - - str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1); + str = malloc(strlen(SHM_RDRB_PREFIX) + 1); if (str == NULL) { LOG_ERR("Failed to create shm_rdrbuff: Out of Memory."); return NULL; } - sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos); + sprintf(str, "%s", SHM_RDRB_PREFIX); return str; } -/* FIXME: create a ringbuffer for each qos cube in the system */ struct shm_rdrbuff * shm_rdrbuff_create() { struct shm_rdrbuff * rdrb; @@ -160,8 +144,7 @@ struct shm_rdrbuff * shm_rdrbuff_create() uint8_t * shm_base; pthread_mutexattr_t mattr; pthread_condattr_t cattr; - enum qos_cube qos = QOS_CUBE_BE; - char * shm_rdrb_fn = rdrb_filename(qos); + char * shm_rdrb_fn = rdrb_filename(); if (shm_rdrb_fn == NULL) { LOG_ERR("Could not create rdrbuff. Out of Memory"); return NULL; @@ -212,14 +195,12 @@ struct shm_rdrbuff * shm_rdrbuff_create() } rdrb->shm_base = shm_base; - rdrb->ptr_head = (size_t *) - ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); - rdrb->ptr_tail = rdrb->ptr_head + 1; - rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); - rdrb->choked = (size_t *) (rdrb->lock + 1); - rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); - rdrb->full = rdrb->healthy + 1; - rdrb->api = (pid_t *) (rdrb->full + 1); + rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); + rdrb->tail = rdrb->head + 1; + rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1); + rdrb->full = (pthread_cond_t *) (rdrb->lock + 1); + rdrb->healthy = rdrb->full + 1; + rdrb->api = (pid_t *) (rdrb->healthy + 1); pthread_mutexattr_init(&mattr); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -236,29 +217,22 @@ struct shm_rdrbuff * shm_rdrbuff_create() pthread_cond_init(rdrb->full, &cattr); pthread_cond_init(rdrb->healthy, &cattr); - *rdrb->ptr_head = 0; - *rdrb->ptr_tail = 0; - - *rdrb->choked = 0; + *rdrb->head = 0; + *rdrb->tail = 0; *rdrb->api = getpid(); - rdrb->qos = qos; - free(shm_rdrb_fn); return rdrb; } -/* FIXME: open a ringbuffer for each qos cube in the system */ struct shm_rdrbuff * shm_rdrbuff_open() { struct shm_rdrbuff * rdrb; int shm_fd; uint8_t * shm_base; - - enum qos_cube qos = QOS_CUBE_BE; - char * shm_rdrb_fn = rdrb_filename(qos); + char * shm_rdrb_fn = rdrb_filename(); if (shm_rdrb_fn == NULL) { LOG_ERR("Could not create rdrbuff. Out of Memory"); return NULL; @@ -297,32 +271,20 @@ struct shm_rdrbuff * shm_rdrbuff_open() } rdrb->shm_base = shm_base; - rdrb->ptr_head = (size_t *) - ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); - rdrb->ptr_tail = rdrb->ptr_head + 1; - rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); - rdrb->choked = (size_t *) (rdrb->lock + 1); - rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); - rdrb->full = rdrb->healthy + 1; - rdrb->api = (pid_t *) (rdrb->full + 1); - - rdrb->qos = qos; + rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); + rdrb->tail = rdrb->head + 1; + rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1); + rdrb->full = (pthread_cond_t *) (rdrb->lock + 1); + rdrb->healthy = rdrb->full + 1; + rdrb->api = (pid_t *) (rdrb->healthy + 1); free(shm_rdrb_fn); return rdrb; } -void * shm_rdrbuff_sanitize(void * o) +void shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb) { - struct shm_rdrbuff * rdrb = (struct shm_rdrbuff *) o; - struct timespec intv - = {SHM_DU_TIMEOUT_MICROS / MILLION, - (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; - - pid_t api; - - assert(o); #ifdef __APPLE__ pthread_mutex_lock(rdrb->lock); @@ -332,14 +294,10 @@ void * shm_rdrbuff_sanitize(void * o) pthread_mutex_consistent(rdrb->lock); } #endif - pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, (void *) rdrb->lock); - while (true) { - int ret = 0; - struct timespec now; - struct timespec dl; + while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) { #ifdef __APPLE__ pthread_cond_wait(rdrb->full, rdrb->lock); #else @@ -348,49 +306,11 @@ void * shm_rdrbuff_sanitize(void * o) pthread_mutex_consistent(rdrb->lock); } #endif - *rdrb->choked = 1; - - garbage_collect(rdrb); - - if (shm_rdrb_empty(rdrb)) { - pthread_cond_broadcast(rdrb->healthy); - continue; - } - - api = get_tail_ptr(rdrb)->dst_api; - - if (kill(api, 0)) { - LOG_DBGF("Dead process %d left stale sdu.", api); - clean_sdus(rdrb, api); - pthread_cond_broadcast(rdrb->healthy); - continue; - } - - clock_gettime(CLOCK_REALTIME, &now); - ts_add(&now, &intv, &dl); - while (*rdrb->choked) { - ret = pthread_cond_timedwait(rdrb->healthy, - rdrb->lock, - &dl); - if (!ret) - continue; -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(rdrb->lock); - } -#endif - if (ret == ETIMEDOUT) { - LOG_DBGF("SDU timed out (dst: %d).", api); - clean_sdus(rdrb, api); - } - } - pthread_cond_broadcast(rdrb->healthy); } - pthread_cleanup_pop(true); + garbage_collect(rdrb); - return (void *) 0; + pthread_cleanup_pop(true); } void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) @@ -417,7 +337,7 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); - shm_rdrb_fn = rdrb_filename(rdrb->qos); + shm_rdrb_fn = rdrb_filename(); if (shm_rdrb_fn == NULL) { LOG_ERR("Could not create rdrbuff. Out of Memory"); return; @@ -431,7 +351,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) } ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, - pid_t dst_api, size_t headspace, size_t tailspace, uint8_t * data, @@ -444,7 +363,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, size_t padblocks = 0; #endif ssize_t sz = size + sizeof(*sdb); - uint8_t * write_pos; assert(rdrb); assert(data); @@ -469,14 +387,15 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, ++blocks; } - if (blocks + *rdrb->ptr_head > (SHM_BUFFER_SIZE)) - padblocks = (SHM_BUFFER_SIZE) - *rdrb->ptr_head; + if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) + padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; if (!shm_rdrb_free(rdrb, blocks + padblocks)) { #else if (!shm_rdrb_free(rdrb, 1)) { #endif - pthread_cond_signal(rdrb->full); + LOG_DBG("buffer full, idx = %ld.", *rdrb->tail); + pthread_cond_broadcast(rdrb->full); pthread_mutex_unlock(rdrb->lock); return -1; } @@ -486,31 +405,29 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->dst_api = -1; + sdb->flags = SDB_NULL; sdb->du_head = 0; sdb->du_tail = 0; - sdb->idx = *rdrb->ptr_head; + sdb->idx = *rdrb->head; - *rdrb->ptr_head = 0; + *rdrb->head = 0; } #endif sdb = get_head_ptr(rdrb); sdb->size = size; - sdb->dst_api = dst_api; + sdb->flags = SDB_VALID; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; #endif - write_pos = ((uint8_t *) (sdb + 1)) + headspace; - - memcpy(write_pos, data, len); + memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); - sdb->idx = *rdrb->ptr_head; + sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK - *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & ((SHM_BUFFER_SIZE) - 1); + *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); #else - *rdrb->ptr_head = (*rdrb->ptr_head + 1) & ((SHM_BUFFER_SIZE) - 1); + *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); #endif pthread_mutex_unlock(rdrb->lock); @@ -518,7 +435,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, } ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, - pid_t dst_api, size_t headspace, size_t tailspace, uint8_t * data, @@ -531,7 +447,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, size_t padblocks = 0; #endif ssize_t sz = size + sizeof(*sdb); - uint8_t * write_pos; assert(rdrb); assert(data); @@ -559,14 +474,14 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, ++blocks; } - if (blocks + *rdrb->ptr_head > (SHM_BUFFER_SIZE)) - padblocks = (SHM_BUFFER_SIZE) - *rdrb->ptr_head; + if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) + padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; while (!shm_rdrb_free(rdrb, (blocks + padblocks))) { #else while (!shm_rdrb_free(rdrb, 1)) { #endif - pthread_cond_signal(rdrb->full); + pthread_cond_broadcast(rdrb->full); pthread_cond_wait(rdrb->healthy, rdrb->lock); } @@ -575,31 +490,29 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->dst_api = -1; + sdb->flags = SDB_NULL; sdb->du_head = 0; sdb->du_tail = 0; - sdb->idx = *rdrb->ptr_head; + sdb->idx = *rdrb->head; - *rdrb->ptr_head = 0; + *rdrb->head = 0; } #endif sdb = get_head_ptr(rdrb); sdb->size = size; - sdb->dst_api = dst_api; + sdb->flags = SDB_VALID; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; #endif - write_pos = ((uint8_t *) (sdb + 1)) + headspace; - - memcpy(write_pos, data, len); + memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); - sdb->idx = *rdrb->ptr_head; + sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK - *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & ((SHM_BUFFER_SIZE) - 1); + *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); #else - *rdrb->ptr_head = (*rdrb->ptr_head + 1) & ((SHM_BUFFER_SIZE) - 1); + *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); #endif pthread_cleanup_pop(true); @@ -684,18 +597,15 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, size_t idx) return -1; } - idx_to_du_buff_ptr(rdrb, idx)->dst_api = -1; + idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; - if (idx != *rdrb->ptr_tail) { + if (idx != *rdrb->tail) { pthread_mutex_unlock(rdrb->lock); return 0; } garbage_collect(rdrb); - *rdrb->choked = 0; - - pthread_cond_broadcast(rdrb->healthy); pthread_mutex_unlock(rdrb->lock); return 0; diff --git a/src/tools/cbr/cbr_client.c b/src/tools/cbr/cbr_client.c index b2cf7d7f..58198b86 100644 --- a/src/tools/cbr/cbr_client.c +++ b/src/tools/cbr/cbr_client.c @@ -46,7 +46,7 @@ int client_main(char * server, int result = 0; bool stop = false; char buf[size]; - int seqnr = 0; + long seqnr = 0; long gap = size * 8.0 * (BILLION / (double) rate); struct timespec start; @@ -114,8 +114,8 @@ int client_main(char * server, ms = ts_diff_ms(&start, &end); printf("sent statistics: " - "%9d SDUs, %12d bytes in %9d ms, %4.4f Mb/s\n", - seqnr, seqnr * size, ms, (seqnr * size * 8.0)/(ms * 1000)); + "%9ld SDUs, %12ld bytes in %9d ms, %4.4f Mb/s\n", + seqnr, seqnr * size, ms, (seqnr / (ms * 1000.0)) * size * 8.0); flow_dealloc(fd); diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c index b52109cf..46dfc14d 100644 --- a/src/tools/operf/operf.c +++ b/src/tools/operf/operf.c @@ -41,6 +41,7 @@ struct c { int size; long rate; bool flood; + bool sleep; int duration; size_t sent; @@ -81,6 +82,7 @@ static void usage(void) " -r, --rate Rate (b/s)\n" " -s, --size Payload size (B, default 1500)\n" " -f, --flood Send SDUs as fast as possible\n" + " --sleep Sleep in between sending SDUs\n" " --help Display this help text and exit\n"); } @@ -100,6 +102,7 @@ int main(int argc, char ** argv) server.timeout = 1000; /* ms */ client.rate = 1000000; client.flood = false; + client.sleep = false; while (argc > 0) { if (strcmp(*argv, "-n") == 0 || @@ -127,6 +130,8 @@ int main(int argc, char ** argv) } else if (strcmp(*argv, "-f") == 0 || strcmp(*argv, "--flood") == 0) { client.flood = true; + } else if (strcmp(*argv, "--sleep") == 0) { + client.sleep = true; } else if (strcmp(*argv, "-l") == 0 || strcmp(*argv, "--listen") == 0) { serv = true; diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c index 1f6226d4..902a7b41 100644 --- a/src/tools/operf/operf_client.c +++ b/src/tools/operf/operf_client.c @@ -36,6 +36,17 @@ #include <errno.h> #include <float.h> +static void busy_wait_until(const struct timespec * deadline) +{ + struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); + while (now.tv_sec < deadline->tv_sec) + clock_gettime(CLOCK_REALTIME, &now); + while (now.tv_sec == deadline->tv_sec + && now.tv_nsec < deadline->tv_nsec) + clock_gettime(CLOCK_REALTIME, &now); +} + void shutdown_client(int signo, siginfo_t * info, void * c) { (void) info; @@ -85,6 +96,7 @@ void * writer(void * o) struct timespec now; struct timespec start; struct timespec intv = {(gap / BILLION), gap % BILLION}; + struct timespec end = {0, 0}; char * buf = malloc(client.size); if (buf == NULL) @@ -123,6 +135,9 @@ void * writer(void * o) } } else { while (ts_diff_ms(&start, &now) < client.duration) { + clock_gettime(CLOCK_REALTIME, &now); + ts_add(&now, &intv, &end); + if (flow_write(*fdp, buf, client.size) == -1) { printf("Failed to send SDU.\n"); flow_dealloc(*fdp); @@ -131,10 +146,10 @@ void * writer(void * o) } ++client.sent; - - nanosleep(&intv, NULL); - - clock_gettime(CLOCK_REALTIME, &now); + if (client.sleep) + nanosleep(&intv, NULL); + else + busy_wait_until(&end); } } |