From 4f44c91c68a3706e04334066f28471d56cc71849 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 2 Sep 2016 13:48:36 +0200 Subject: lib: Add northbound ringbuffers Fast path is split in north and southbound paths. --- include/ouroboros/shm_ap_rbuff.h | 30 ++++++++++++++++++----- src/ipcpd/local/main.c | 6 ++--- src/ipcpd/normal/fmgr.c | 4 ++-- src/ipcpd/normal/main.c | 2 +- src/ipcpd/shim-eth-llc/main.c | 6 ++--- src/ipcpd/shim-udp/main.c | 6 ++--- src/irmd/main.c | 14 +++++++---- src/lib/dev.c | 6 ++--- src/lib/shm_ap_rbuff.c | 51 ++++++++++++++++++++++++++++++++++------ src/lib/shm_rdrbuff.c | 4 ++-- 10 files changed, 94 insertions(+), 35 deletions(-) diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index 9dad0863..594c9260 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -35,20 +35,38 @@ struct rb_entry { int port_id; }; -struct shm_ap_rbuff * shm_ap_rbuff_create(); -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api); +/* recv SDUs from N + 1 */ +struct shm_ap_rbuff * shm_ap_rbuff_create_n(); + +/* recv SDUs from N - 1 */ +struct shm_ap_rbuff * shm_ap_rbuff_create_s(); + +/* write SDUs to N - 1 */ +struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api); + +/* write SDUs to N + 1 */ +struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api); + void shm_ap_rbuff_close(struct shm_ap_rbuff * rb); + void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb); + int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, - struct rb_entry * e); + struct rb_entry * e); + struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb); + int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb); + int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, const struct timespec * timeout); + ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, - int port_id); -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, + int port_id); + +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, + int port_id, const struct timespec * timeout); + void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb); #endif /* OUROBOROS_SHM_AP_RBUFF_H */ diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index f1b6dd9e..c0809429 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -105,7 +105,7 @@ static int shim_ap_init() return -1; } - _ap_instance->rb = shm_ap_rbuff_create(); + _ap_instance->rb = shm_ap_rbuff_create_n(); if (_ap_instance->rb == NULL) { shm_rdrbuff_close(_ap_instance->rdrb); bmp_destroy(_ap_instance->fds); @@ -331,7 +331,7 @@ static int ipcp_local_flow_alloc(pid_t n_api, return -1; /* -ENOTENROLLED */ } - rb = shm_ap_rbuff_open(n_api); + rb = shm_ap_rbuff_open_s(n_api); if (rb == NULL) { pthread_rwlock_unlock(&_ipcp->state_lock); return -1; /* -ENORBUFF */ @@ -421,7 +421,7 @@ static int ipcp_local_flow_alloc_resp(pid_t n_api, return -1; } - rb = shm_ap_rbuff_open(n_api); + rb = shm_ap_rbuff_open_s(n_api); if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); _ap_instance->flows[in_fd].state = FLOW_NULL; diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 3056b46d..d74ad0c8 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -367,7 +367,7 @@ int fmgr_flow_alloc(pid_t n_api, free(buf.data); - flow->flow.rb = shm_ap_rbuff_open(n_api); + flow->flow.rb = shm_ap_rbuff_open_s(n_api); if (flow->flow.rb == NULL) { pthread_mutex_unlock(&fmgr->n_flows_lock); free(flow); @@ -478,7 +478,7 @@ int fmgr_flow_alloc_resp(pid_t n_api, flow->flow.state = FLOW_ALLOCATED; flow->flow.api = n_api; - flow->flow.rb = shm_ap_rbuff_open(n_api); + flow->flow.rb = shm_ap_rbuff_open_s(n_api); if (flow->flow.rb == NULL) { n_flow_dealloc(port_id); pthread_mutex_unlock(&fmgr->n_flows_lock); diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index cf4ae3f1..082973f4 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -212,7 +212,7 @@ struct normal_ipcp_data * normal_ipcp_data_create() return NULL; } - normal_data->rb = shm_ap_rbuff_open(getpid()); + normal_data->rb = shm_ap_rbuff_create_n(); if (normal_data->rb == NULL) { shm_rdrbuff_close(normal_data->rdrb); free(normal_data); diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index d1100001..028d249f 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -161,7 +161,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create() return NULL; } - eth_llc_data->rb = shm_ap_rbuff_create(); + eth_llc_data->rb = shm_ap_rbuff_create_n(); if (eth_llc_data->rb == NULL) { shm_rdrbuff_close(eth_llc_data->rdrb); free(eth_llc_data); @@ -1084,7 +1084,7 @@ static int eth_llc_ipcp_flow_alloc(pid_t n_api, if (qos != QOS_CUBE_BE) LOG_DBGF("QoS requested. Ethernet LLC can't do that. For now."); - rb = shm_ap_rbuff_open(n_api); + rb = shm_ap_rbuff_open_s(n_api); if (rb == NULL) return -1; /* -ENORBUFF */ @@ -1169,7 +1169,7 @@ static int eth_llc_ipcp_flow_alloc_resp(pid_t n_api, return -1; } - rb = shm_ap_rbuff_open(n_api); + rb = shm_ap_rbuff_open_s(n_api); if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); ipcp_flow(index)->state = FLOW_NULL; diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 451a2a4c..85de1eec 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -128,7 +128,7 @@ static int shim_ap_init() return -1; } - _ap_instance->rb = shm_ap_rbuff_create(); + _ap_instance->rb = shm_ap_rbuff_create_n(); if (_ap_instance->rb == NULL) { shm_rdrbuff_close(_ap_instance->rdrb); bmp_destroy(_ap_instance->fds); @@ -1179,7 +1179,7 @@ static int ipcp_udp_flow_alloc(pid_t n_api, if (qos != QOS_CUBE_BE) LOG_DBG("QoS requested. UDP/IP can't do that."); - rb = shm_ap_rbuff_open(n_api); + rb = shm_ap_rbuff_open_s(n_api); if (rb == NULL) return -1; /* -ENORBUFF */ @@ -1333,7 +1333,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_api, return -1; } - rb = shm_ap_rbuff_open(n_api); + rb = shm_ap_rbuff_open_s(n_api); if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); _ap_instance->flows[fd].state = FLOW_NULL; diff --git a/src/irmd/main.c b/src/irmd/main.c index a69dd526..a4962c3b 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1742,7 +1742,7 @@ void * irm_sanitize() if (kill(f->n_api, 0) < 0) { struct shm_ap_rbuff * n_rb = - shm_ap_rbuff_open(f->n_api); + shm_ap_rbuff_open_s(f->n_api); bmp_release(irmd->port_ids, f->port_id); list_del(&f->next); @@ -1755,13 +1755,17 @@ void * irm_sanitize() continue; } if (kill(f->n_1_api, 0) < 0) { - struct shm_ap_rbuff * n_1_rb = - shm_ap_rbuff_open(f->n_1_api); + struct shm_ap_rbuff * n_1_rb_s = + shm_ap_rbuff_open_s(f->n_1_api); + struct shm_ap_rbuff * n_1_rb_n = + shm_ap_rbuff_open_n(f->n_1_api); list_del(&f->next); LOG_ERR("IPCP %d gone, flow %d removed.", f->n_1_api, f->port_id); - if (n_1_rb != NULL) - shm_ap_rbuff_destroy(n_1_rb); + if (n_1_rb_n != NULL) + shm_ap_rbuff_destroy(n_1_rb_n); + if (n_1_rb_s != NULL) + shm_ap_rbuff_destroy(n_1_rb_s); irm_flow_destroy(f); } } diff --git a/src/lib/dev.c b/src/lib/dev.c index 17c473ed..25c3fcd4 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -112,7 +112,7 @@ int ap_init(char * ap_name) return -1; } - _ap_instance->rb = shm_ap_rbuff_create(); + _ap_instance->rb = shm_ap_rbuff_create_s(); if (_ap_instance->rb == NULL) { shm_rdrbuff_close(_ap_instance->rdrb); bmp_destroy(_ap_instance->fds); @@ -216,7 +216,7 @@ int flow_accept(char ** ae_name) return -1; } - _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->api); + _ap_instance->flows[cfd].rb = shm_ap_rbuff_open_n(recv_msg->api); if (_ap_instance->flows[cfd].rb == NULL) { bmp_release(_ap_instance->fds, cfd); pthread_rwlock_unlock(&_ap_instance->flows_lock); @@ -347,7 +347,7 @@ int flow_alloc(char * dst_name, return -1; } - _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); + _ap_instance->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); if (_ap_instance->flows[fd].rb == NULL) { bmp_release(_ap_instance->fds, fd); pthread_rwlock_unlock(&_ap_instance->flows_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 77e288a8..6cc9590e 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -40,6 +40,10 @@ #include #include +#define FN_MAX_CHARS 255 +#define NORTH false +#define SOUTH true + #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) @@ -59,19 +63,23 @@ struct shm_ap_rbuff { pthread_cond_t * add; /* SDU arrived */ pthread_cond_t * del; /* SDU removed */ pid_t api; /* api to which this rb belongs */ + bool dir; /* direction, false = N */ int fd; }; -struct shm_ap_rbuff * shm_ap_rbuff_create() +static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) { struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; pthread_mutexattr_t mattr; pthread_condattr_t cattr; - char fn[25]; + char fn[FN_MAX_CHARS]; - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); + if (dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid()); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid()); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -150,18 +158,22 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() rb->fd = shm_fd; rb->api = getpid(); + rb->dir = dir; return rb; } -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) +static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir) { struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; - char fn[25]; + char fn[FN_MAX_CHARS]; - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); + if (dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -204,9 +216,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) rb->fd = shm_fd; rb->api = api; + rb->dir = dir; return rb; } + +struct shm_ap_rbuff * shm_ap_rbuff_create_n() +{ + return shm_ap_rbuff_create(NORTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_create_s() +{ + return shm_ap_rbuff_create(SOUTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api) +{ + return shm_ap_rbuff_open(api, NORTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api) +{ + return shm_ap_rbuff_open(api, SOUTH); +} + void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) { if (rb == NULL) { @@ -252,7 +286,10 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); + if (rb->dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api); if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 93a889ce..07574f1a 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -139,13 +139,13 @@ static char * rdrb_filename(enum qos_cube qos) ++chars; } while (qm > 0); - str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2); + str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1); if (str == NULL) { LOG_ERR("Failed to create shm_rdrbuff: Out of Memory."); return NULL; } - sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos); + sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos); return str; } -- cgit v1.2.3