From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 19 Oct 2016 22:25:46 +0200 Subject: lib: Demultiplex the fast path The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API. --- CMakeLists.txt | 1 + include/ouroboros/CMakeLists.txt | 4 +- include/ouroboros/config.h.in | 4 +- include/ouroboros/fqueue.h | 62 ++++ include/ouroboros/ipcp-dev.h | 3 - include/ouroboros/local-dev.h | 8 +- include/ouroboros/select.h | 52 --- include/ouroboros/shm_ap_rbuff.h | 73 ---- include/ouroboros/shm_flow_set.h | 63 ++++ include/ouroboros/shm_rbuff.h | 53 +++ include/ouroboros/wrap/ouroboros.i | 4 +- src/ipcpd/ipcp.c | 6 +- src/ipcpd/local/main.c | 68 +++- src/ipcpd/normal/fmgr.c | 143 ++++---- src/ipcpd/shim-eth-llc/main.c | 106 ++++-- src/ipcpd/shim-udp/main.c | 84 +++-- src/irmd/ipcp.c | 13 +- src/irmd/main.c | 14 +- src/lib/CMakeLists.txt | 3 +- src/lib/dev.c | 500 ++++++++++++++++++---------- src/lib/lockfile.c | 39 +-- src/lib/shm_ap_rbuff.c | 661 ------------------------------------- src/lib/shm_flow_set.c | 408 +++++++++++++++++++++++ src/lib/shm_rbuff.c | 424 ++++++++++++++++++++++++ src/lib/shm_rdrbuff.c | 29 +- src/tools/oping/oping.c | 4 +- src/tools/oping/oping_client.c | 25 +- src/tools/oping/oping_server.c | 21 +- 28 files changed, 1686 insertions(+), 1189 deletions(-) create mode 100644 include/ouroboros/fqueue.h delete mode 100644 include/ouroboros/select.h delete mode 100644 include/ouroboros/shm_ap_rbuff.h create mode 100644 include/ouroboros/shm_flow_set.h create mode 100644 include/ouroboros/shm_rbuff.h delete mode 100644 src/lib/shm_ap_rbuff.c create mode 100644 src/lib/shm_flow_set.c create mode 100644 src/lib/shm_rbuff.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 0bfb46d8..95ed6b8a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,6 +43,7 @@ test_and_set_c_compiler_flag_global(-std=c89) test_and_set_c_compiler_flag_global(-Wall) test_and_set_c_compiler_flag_global(-Werror) test_and_set_c_compiler_flag_global(-Wundef) +test_and_set_c_compiler_flag_global(-Wdeclaration-after-statement) test_and_set_c_compiler_flag_global(-fmax-errors=5) configure_file( diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt index f24857ed..41feb65e 100644 --- a/include/ouroboros/CMakeLists.txt +++ b/include/ouroboros/CMakeLists.txt @@ -7,11 +7,11 @@ set(HEADER_FILES dev.h errno.h fcntl.h + fqueue.h irm.h irm_config.h nsm.h - qos.h - select.h) + qos.h) install(FILES ${HEADER_FILES} DESTINATION usr/include/ouroboros) diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 143ae7c8..a9d65aec 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -36,6 +36,7 @@ #define IPCP_NORMAL_EXEC "@IPCP_NORMAL_TARGET@" #define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@" #define AP_MAX_FLOWS 256 +#define AP_MAX_FQUEUES 64 #define SHM_RDRB_BLOCK_SIZE sysconf(_SC_PAGESIZE) #define SHM_RDRB_MULTI_BLOCK #define SHM_RDRB_PREFIX "/ouroboros.rdrb." @@ -43,7 +44,8 @@ #define SHM_BUFFER_SIZE (1 << 14) #define DU_BUFF_HEADSPACE 128 #define DU_BUFF_TAILSPACE 0 -#define SHM_AP_RBUFF_PREFIX "/ouroboros.rbuff." +#define SHM_RBUFF_PREFIX "/ouroboros.rbuff." +#define SHM_FLOW_SET_PREFIX "/ouroboros.sets." #define IRMD_MAX_FLOWS 4096 #define IRMD_THREADPOOL_SIZE 5 #define LOG_DIR "/@LOG_DIR@/" diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h new file mode 100644 index 00000000..943d6510 --- /dev/null +++ b/include/ouroboros/fqueue.h @@ -0,0 +1,62 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Flow queues + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_FQUEUE_H +#define OUROBOROS_FQUEUE_H + +#include +#include + +struct flow_set; + +struct fqueue; + +typedef struct flow_set flow_set_t; +typedef struct fqueue fqueue_t; + +flow_set_t * flow_set_create(); + +void flow_set_destroy(flow_set_t * set); + +fqueue_t * fqueue_create(); + +void fqueue_destroy(struct fqueue * fq); + +void flow_set_zero(flow_set_t * set); + +int flow_set_add(flow_set_t * set, + int fd); + +bool flow_set_has(flow_set_t * set, + int fd); + +void flow_set_del(flow_set_t * set, + int fd); + +int fqueue_next(fqueue_t * fq); + +int flow_event_wait(flow_set_t * set, + fqueue_t * fq, + const struct timespec * timeout); + +#endif /* OUROBOROS_SELECT_H */ diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 9343aeaa..3ab05bd7 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -47,7 +47,4 @@ int ipcp_flow_write(int fd, void ipcp_flow_del(struct shm_du_buff * sdb); -/* returns flow descriptor and du buff */ -int ipcp_read_shim(struct shm_du_buff ** sdb); - #endif /* OUROBOROS_IPCP_DEV_H */ diff --git a/include/ouroboros/local-dev.h b/include/ouroboros/local-dev.h index 77ff47e9..30f440b1 100644 --- a/include/ouroboros/local-dev.h +++ b/include/ouroboros/local-dev.h @@ -20,14 +20,12 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#include - #ifndef OUROBOROS_LOCAL_DEV_H #define OUROBOROS_LOCAL_DEV_H -struct rb_entry * local_flow_read(int fd); +ssize_t local_flow_read(int fd); -int local_flow_write(int fd, - struct rb_entry * e); +int local_flow_write(int fd, + ssize_t idx); #endif /* OUROBOROS_LOCAL_DEV_H */ diff --git a/include/ouroboros/select.h b/include/ouroboros/select.h deleted file mode 100644 index de309b8d..00000000 --- a/include/ouroboros/select.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * A select call for flows - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_SELECT_H -#define OUROBOROS_SELECT_H - -#include -#include - -struct flow_set; - -typedef struct flow_set flow_set_t; - -flow_set_t * flow_set_create(); - -void flow_set_destroy(flow_set_t * set); - -void flow_set_zero(flow_set_t * set); - -void flow_set_add(flow_set_t * set, - int fd); - -void flow_set_del(flow_set_t * set, - int fd); - -bool flow_set_has(flow_set_t * set, - int fd); - -int flow_select(flow_set_t * set, - const struct timespec * timeout); - -#endif /* OUROBOROS_SELECT_H */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h deleted file mode 100644 index 453e4bf8..00000000 --- a/include/ouroboros/shm_ap_rbuff.h +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Ring buffer for application processes - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_SHM_AP_RBUFF_H -#define OUROBOROS_SHM_AP_RBUFF_H - -#include -#include -#include -#include - -struct shm_ap_rbuff; - -struct rb_entry { - ssize_t index; - int port_id; -}; - -struct shm_ap_rbuff * shm_ap_rbuff_create(); - -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api); - -void shm_ap_rbuff_close(struct shm_ap_rbuff * rb); - -void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb); - -void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, - int port_id); - -int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, - int port_id); - -int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, - struct rb_entry * e); - -struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb); - -int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb); - -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - bool * set, - const struct timespec * timeout); - -ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, - int port_id); - -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, - const struct timespec * timeout); - -void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb); - -#endif /* OUROBOROS_SHM_AP_RBUFF_H */ diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h new file mode 100644 index 00000000..32db5d36 --- /dev/null +++ b/include/ouroboros/shm_flow_set.h @@ -0,0 +1,63 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Management of flow_sets for fqueue + * + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SHM_FLOW_SET_H +#define OUROBOROS_SHM_FLOW_SET_H + +#include + +#include + +struct shm_flow_set; + +struct shm_flow_set * shm_flow_set_create(); + +void shm_flow_set_destroy(struct shm_flow_set * set); + +struct shm_flow_set * shm_flow_set_open(pid_t api); + +void shm_flow_set_close(struct shm_flow_set * set); + +void shm_flow_set_zero(struct shm_flow_set * shm_set, + ssize_t idx); + +int shm_flow_set_add(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id); + +int shm_flow_set_has(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id); + +void shm_flow_set_del(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id); + +void shm_flow_set_notify(struct shm_flow_set * set, + int port_id); + +int shm_flow_set_wait(const struct shm_flow_set * shm_set, + ssize_t idx, + int * fqueue, + const struct timespec * timeout); + +#endif /* OUROBOROS_SHM_FLOW_SET_H */ diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h new file mode 100644 index 00000000..03660b88 --- /dev/null +++ b/include/ouroboros/shm_rbuff.h @@ -0,0 +1,53 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for incoming SDUs + * + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SHM_RBUFF_H +#define OUROBOROS_SHM_RBUFF_H + +#include +#include + +struct shm_rbuff; + +struct shm_rbuff * shm_rbuff_create(int port_id); + +struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id); + +void shm_rbuff_close(struct shm_rbuff * rb); + +void shm_rbuff_destroy(struct shm_rbuff * rb); + +int shm_rbuff_block(struct shm_rbuff * rb); + +void shm_rbuff_unblock(struct shm_rbuff * rb); + +int shm_rbuff_write(struct shm_rbuff * rb, + ssize_t idx); + +ssize_t shm_rbuff_read(struct shm_rbuff * rb); + +ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, + const struct timespec * timeout); + +void shm_rbuff_reset(struct shm_rbuff * rb); + +#endif /* OUROBOROS_SHM_RBUFF_H */ diff --git a/include/ouroboros/wrap/ouroboros.i b/include/ouroboros/wrap/ouroboros.i index 394b505a..26cc6076 100644 --- a/include/ouroboros/wrap/ouroboros.i +++ b/include/ouroboros/wrap/ouroboros.i @@ -26,11 +26,11 @@ #include "ouroboros/dev.h" #include "ouroboros/errno.h" #include "ouroboros/fcntl.h" +#include "ouroboros/fqueue.h" #include "ouroboros/irm.h" #include "ouroboros/irm_config.h" #include "ouroboros/nsm.h" #include "ouroboros/qos.h" -#include "ouroboros/select.h" %} typedef int pid_t; @@ -39,8 +39,8 @@ typedef int pid_t; %include "ouroboros/dev.h" %include "ouroboros/errno.h" %include "ouroboros/fcntl.h" +%include "ouroboros/fqueue.h" %include "ouroboros/irm.h" %include "ouroboros/irm_config.h" %include "ouroboros/nsm.h" %include "ouroboros/qos.h" -%include "ouroboros/select.h" diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a9f80ee7..f9246c7a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -323,9 +323,9 @@ void * ipcp_main_loop(void * o) ret_msg.has_result = true; ret_msg.result = ipcpi.ops->ipcp_flow_alloc(fd, - msg->dst_name, - msg->src_ae_name, - msg->qos_cube); + msg->dst_name, + msg->src_ae_name, + msg->qos_cube); if (ret_msg.result < 0) { LOG_DBG("Deallocate failed on port_id %d.", msg->port_id); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 4e500a8a..68c9ae8c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include #define OUROBOROS_PREFIX "ipcpd/local" @@ -39,6 +39,7 @@ #include #include +#define EVENT_WAIT_TIMEOUT 100 /* us */ #define THIS_TYPE IPCP_LOCAL /* global for trapping signal */ @@ -46,18 +47,25 @@ int irmd_api; struct { int in_out[IRMD_MAX_FLOWS]; + flow_set_t * flows; pthread_rwlock_t lock; pthread_t sduloop; } local_data; -void local_data_init() +int local_data_init() { int i; for (i = 0; i < IRMD_MAX_FLOWS; ++i) local_data.in_out[i] = -1; + local_data.flows = flow_set_create(); + if (local_data.flows == NULL) + return -ENFILE; + pthread_rwlock_init(&local_data.lock, NULL); + + return 0; } void local_data_fini() @@ -67,11 +75,24 @@ void local_data_fini() static void * ipcp_local_sdu_loop(void * o) { + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { int fd; - struct rb_entry * e; + int ret; + ssize_t idx; + + ret = flow_event_wait(local_data.flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = flow_select(NULL, NULL); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o) pthread_rwlock_rdlock(&local_data.lock); - e = local_flow_read(fd); + while ((fd = fqueue_next(fq)) >= 0) { + idx = local_flow_read(fd); - fd = local_data.in_out[fd]; + fd = local_data.in_out[fd]; - if (fd != -1) - local_flow_write(fd, e); + if (fd != -1) + local_flow_write(fd, idx); + } pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); - - free(e); } - return (void *) 1; + return (void *) 0; } void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name) if (ipcp_data_add_reg_entry(ipcpi.data, name)) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Failed to add %s to local registry.", name); + LOG_DBG("Failed to add %s to local registry.", name); return -1; } @@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int fd, if (ipcp_get_state() != IPCP_ENROLLED) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Won't register with non-enrolled IPCP."); + LOG_DBG("Won't register with non-enrolled IPCP."); return -1; /* -ENOTENROLLED */ } pthread_rwlock_wrlock(&local_data.lock); + flow_set_add(local_data.flows, fd); + out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); local_data.in_out[fd] = out_fd; @@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) return 0; pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&local_data.lock); out_fd = local_data.in_out[fd]; if (out_fd < 0) { @@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(local_data.flows, fd); + + pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) @@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd) if (fd < 0) return -EINVAL; + flow_set_del(local_data.flows, fd); + while (flow_dealloc(fd) == -EBUSY) nanosleep(&t, NULL); @@ -289,9 +318,14 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - local_data_init(); - if (ap_init(NULL) < 0) { + LOG_ERR("Failed to init application."); + close_logfile(); + exit(EXIT_FAILURE); + } + + if (local_data_init() < 0) { + LOG_ERR("Failed to init local data."); close_logfile(); exit(EXIT_FAILURE); } @@ -331,10 +365,10 @@ int main(int argc, char * argv[]) pthread_cancel(local_data.sduloop); pthread_join(local_data.sduloop, NULL); - ap_fini(); - local_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8c627641..2800dcb2 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include #include @@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o) struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct np1_flow * flow; + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.np1_set, &timeout); - if (fd == -ETIMEDOUT) + int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to retrieve flow."); - continue; - } + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + + flow = fmgr.np1_flows[fd]; + if (flow == NULL) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to retrieve flow."); + continue; + } + + if (frct_i_write_sdu(flow->cep_id, sdb)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to hand SDU to FRCT."); + continue; + } - if (frct_i_write_sdu(flow->cep_id, sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to hand SDU to FRCT."); - continue; - } - pthread_rwlock_unlock(&fmgr.np1_flows_lock); + } } return (void *) 0; @@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o) struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct shm_du_buff * sdb; struct pci * pci; - + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.nm1_set, &timeout); - if (fd == -ETIMEDOUT) - continue; - - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - pci = shm_pci_des(sdb); - if (pci == NULL) { - LOG_ERR("Failed to get PCI."); - ipcp_flow_del(sdb); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - if (pci->dst_addr != ribmgr_address()) { - LOG_DBG("PDU needs to be forwarded."); + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } - if (pci->ttl == 0) { - LOG_DBG("TTL was zero."); + if (pci->dst_addr != ribmgr_address()) { + LOG_DBG("PDU needs to be forwarded."); + + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ ipcp_flow_del(sdb); free(pci); continue; } - if (shm_pci_dec_ttl(sdb)) { - LOG_ERR("Failed to decrease TTL."); + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); ipcp_flow_del(sdb); free(pci); continue; } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_shrink(sdb)) { - LOG_ERR("Failed to shrink PDU."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - if (frct_nm1_post_sdu(pci, sdb)) { - LOG_ERR("Failed to hand PDU to FRCT."); - ipcp_flow_del(sdb); - free(pci); - continue; + if (frct_nm1_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } } } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 399d3dc8..db258c8b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -30,6 +30,8 @@ #include #include #include +#include +#include #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc" @@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t; #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \ + SHIM_ETH_LLC_MAX_SDU_SIZE) +#define EVENT_WAIT_TIMEOUT 100 /* us */ + /* global for trapping signal */ int irmd_api; @@ -110,6 +114,7 @@ struct { uint8_t * tx_ring; int tx_offset; #endif + flow_set_t * np1_flows; int * ef_to_fd; struct ef * fd_to_ef; pthread_rwlock_t flows_lock; @@ -139,6 +144,14 @@ static int eth_llc_data_init() return -ENOMEM; } + eth_llc_data.np1_flows = flow_set_create(); + if (eth_llc_data.np1_flows == NULL) { + bmp_destroy(eth_llc_data.saps); + free(eth_llc_data.ef_to_fd); + free(eth_llc_data.fd_to_ef); + return -ENOMEM; + } + for (i = 0; i < MAX_SAPS; ++i) eth_llc_data.ef_to_fd[i] = -1; @@ -156,6 +169,7 @@ static int eth_llc_data_init() void eth_llc_data_fini() { bmp_destroy(eth_llc_data.saps); + flow_set_destroy(eth_llc_data.np1_flows); free(eth_llc_data.fd_to_ef); free(eth_llc_data.ef_to_fd); pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr) return 0; } - bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); - pthread_rwlock_unlock(ð_llc_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); - - LOG_DBG("Flow with fd %d deallocated.", fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); return 0; } static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr) { - shim_eth_llc_msg_t * msg = NULL; - - msg = shim_eth_llc_msg__unpack(NULL, len, buf); + shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf); if (msg == NULL) { LOG_ERR("Failed to unpack."); return -1; @@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o) static void * eth_llc_ipcp_sdu_writer(void * o) { + int fd; + struct shm_du_buff * sdb; + uint8_t ssap; + uint8_t dsap; + uint8_t r_addr[MAC_SIZE]; + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { - int fd; - struct shm_du_buff * sdb; - uint8_t ssap; - uint8_t dsap; - uint8_t r_addr[MAC_SIZE]; - - fd = ipcp_read_shim(&sdb); - if (fd < 0) + int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(ð_llc_data.flows_lock); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } - ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); - dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); - memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); + dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); + memcpy(r_addr, + eth_llc_data.fd_to_ef[fd].r_addr, + MAC_SIZE); + + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - eth_llc_ipcp_send_frame(r_addr, dsap, ssap, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - - shm_du_buff_head(sdb)); - ipcp_flow_del(sdb); + eth_llc_ipcp_send_frame(r_addr, dsap, ssap, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) + - shm_du_buff_head(sdb)); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int fd, uint8_t ssap = 0; uint8_t r_addr[MAC_SIZE]; - LOG_INFO("Allocating flow to %s.", dst_name); + LOG_DBG("Allocating flow to %s.", dst_name); if (dst_name == NULL || src_ae_name == NULL) return -1; @@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int fd, return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap); return 0; @@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap); return 0; @@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) static int eth_llc_ipcp_flow_dealloc(int fd) { + struct timespec t = {0, 10000}; + uint8_t sap; uint8_t r_sap; uint8_t addr[MAC_SIZE]; int ret; + flow_set_del(eth_llc_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(ð_llc_data.flows_lock); @@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd) if (ret < 0) LOG_DBG("Could not notify remote."); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1008,10 +1042,12 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - if (eth_llc_data_init() < 0) + if (ap_init(NULL) < 0) { + close_logfile(); exit(EXIT_FAILURE); + } - if (ap_init(NULL) < 0) { + if (eth_llc_data_init() < 0) { close_logfile(); exit(EXIT_FAILURE); } @@ -1054,10 +1090,10 @@ int main(int argc, char * argv[]) pthread_join(eth_llc_data.sdu_writer, NULL); pthread_join(eth_llc_data.sdu_reader, NULL); - ap_fini(); - eth_llc_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 7c109a8a..050623e4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -27,6 +27,9 @@ #include #include #include +#include +#include +#include #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -75,6 +78,7 @@ struct { struct sockaddr_in s_saddr; int s_fd; + flow_set_t * np1_flows; fd_set flow_fd_s; /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ int uf_to_fd[FD_SETSIZE]; @@ -90,7 +94,7 @@ struct { pthread_mutex_t fd_set_lock; } udp_data; -static void udp_data_init() +static int udp_data_init() { int i; @@ -104,13 +108,21 @@ static void udp_data_init() FD_ZERO(&udp_data.flow_fd_s); + udp_data.np1_flows = flow_set_create(); + if (udp_data.np1_flows == NULL) + return -ENOMEM; + pthread_rwlock_init(&udp_data.flows_lock, NULL); pthread_cond_init(&udp_data.fd_set_cond, NULL); pthread_mutex_init(&udp_data.fd_set_lock, NULL); + + return 0; } static void udp_data_fini() { + flow_set_destroy(udp_data.np1_flows); + pthread_rwlock_destroy(&udp_data.flows_lock); pthread_mutex_destroy(&udp_data.fd_set_lock); pthread_cond_destroy(&udp_data.fd_set_cond); @@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port) pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); close(skfd); @@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader() static void * ipcp_udp_sdu_loop(void * o) { + int fd; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; + struct shm_du_buff * sdb; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd; - struct shm_du_buff * sdb; + int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = ipcp_read_shim(&sdb); - if (fd < 0) + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); continue; + } - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } - fd = udp_data.fd_to_uf[fd].skfd; + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + fd = udp_data.fd_to_uf[fd].skfd; + + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (send(fd, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - 0) < 0) - LOG_ERR("Failed to send SDU."); + if (send(fd, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + 0) < 0) + LOG_ERR("Failed to send SDU."); - ipcp_flow_del(sdb); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd, udp_data.fd_to_uf[fd].skfd = skfd; udp_data.uf_to_fd[skfd] = fd; + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response) set_fd(skfd); + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd) { int skfd = -1; int remote_udp = -1; + struct timespec t = {0, 10000}; struct sockaddr_in r_saddr; socklen_t r_saddr_len = sizeof(r_saddr); + flow_set_del(udp_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd) close(skfd); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1149,13 +1184,16 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - udp_data_init(); - if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } + if (udp_data_init() < 0) { + close_logfile(); + exit(EXIT_FAILURE); + } + /* store the process id of the irmd */ irmd_api = atoi(argv[1]); @@ -1196,10 +1234,10 @@ int main(int argc, char * argv[]) pthread_join(udp_data.handler, NULL); pthread_join(udp_data.sdu_reader, NULL); - ap_fini(); - udp_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index f79e6caf..33f7650a 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -111,6 +111,7 @@ pid_t ipcp_create(enum ipcp_type ipcp_type) char * full_name = NULL; char * exec_name = NULL; char * log_file = NULL; + char * argv[4]; sprintf(irmd_api, "%u", getpid()); @@ -161,14 +162,12 @@ pid_t ipcp_create(enum ipcp_type ipcp_type) } /* log_file to be placed at the end */ - char * argv[] = {full_name, - irmd_api, - log_file, - 0}; + argv[0] = full_name; + argv[1] = irmd_api; + argv[2] = log_file; + argv[3] = NULL; - char * envp[] = {0}; - - execve(argv[0], &argv[0], envp); + execv(argv[0], &argv[0]); LOG_DBG("%s", strerror(errno)); LOG_ERR("Failed to load IPCP daemon"); diff --git a/src/irmd/main.c b/src/irmd/main.c index 157fd8eb..67941e41 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include #include #include @@ -1692,26 +1692,26 @@ void * irm_sanitize() } if (kill(f->n_api, 0) < 0) { - struct shm_ap_rbuff * rb = - shm_ap_rbuff_open(f->n_api); + struct shm_rbuff * rb = + shm_rbuff_open(f->n_api, f->port_id); bmp_release(irmd->port_ids, f->port_id); list_del(&f->next); LOG_INFO("AP-I %d gone, flow %d deallocated.", f->n_api, f->port_id); ipcp_flow_dealloc(f->n_1_api, f->port_id); if (rb != NULL) - shm_ap_rbuff_destroy(rb); + shm_rbuff_destroy(rb); irm_flow_destroy(f); continue; } if (kill(f->n_1_api, 0) < 0) { - struct shm_ap_rbuff * rb = - shm_ap_rbuff_open(f->n_1_api); + struct shm_rbuff * rb = + shm_rbuff_open(f->n_1_api, f->port_id); list_del(&f->next); LOG_ERR("IPCP %d gone, flow %d removed.", f->n_1_api, f->port_id); if (rb != NULL) - shm_ap_rbuff_destroy(rb); + shm_rbuff_destroy(rb); irm_flow_destroy(f); } } diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index b94d0eea..20ea473d 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -35,7 +35,8 @@ set(SOURCE_FILES lockfile.c logs.c nsm.c - shm_ap_rbuff.c + shm_flow_set.c + shm_rbuff.c shm_rdrbuff.c sockets.c time_utils.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 77c2d06a..f735e72b 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -3,7 +3,8 @@ * * API for applications * - * Sander Vrijders + * Dimitri Staessens + * Sander Vrijders * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -26,20 +27,24 @@ #include #include #include +#include #include -#include +#include #include -#include +#include #include #include #include struct flow_set { - bool dirty; - bool b[IRMD_MAX_FLOWS]; /* working copy */ - bool s[IRMD_MAX_FLOWS]; /* safe copy */ - pthread_rwlock_t lock; + size_t idx; +}; + +struct fqueue { + int fqueue[SHM_BUFFER_SIZE]; /* safe copy from shm */ + size_t fqsize; + size_t next; }; enum port_state { @@ -124,7 +129,9 @@ enum port_state port_wait_assign(struct port * p) } struct flow { - struct shm_ap_rbuff * rb; + struct shm_rbuff * rx_rb; + struct shm_rbuff * tx_rb; + struct shm_flow_set * set; int port_id; int oflags; @@ -139,10 +146,11 @@ struct { pid_t api; struct shm_rdrbuff * rdrb; - struct shm_ap_rbuff * rb; + struct shm_flow_set * fqset; pthread_rwlock_t data_lock; struct bmp * fds; + struct bmp * fqueues; struct flow * flows; struct port * ports; @@ -194,40 +202,52 @@ int ap_init(char * ap_name) if (ai.fds == NULL) return -ENOMEM; - ai.rdrb = shm_rdrbuff_open(); - if (ai.rdrb == NULL) { + ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0); + if (ai.fqueues == NULL) { + bmp_destroy(ai.fds); + return -ENOMEM; + } + + ai.fqset = shm_flow_set_create(); + if (ai.fqset == NULL) { + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } - ai.rb = shm_ap_rbuff_create(); - if (ai.rb == NULL) { - shm_rdrbuff_close(ai.rdrb); + ai.rdrb = shm_rdrbuff_open(); + if (ai.rdrb == NULL) { + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS); if (ai.flows == NULL) { - shm_ap_rbuff_destroy(ai.rb); shm_rdrbuff_close(ai.rdrb); + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } for (i = 0; i < AP_MAX_FLOWS; ++i) { - ai.flows[i].rb = NULL; + ai.flows[i].rx_rb = NULL; + ai.flows[i].tx_rb = NULL; + ai.flows[i].set = NULL; ai.flows[i].port_id = -1; - ai.flows[i].oflags = 0; - ai.flows[i].api = -1; + ai.flows[i].oflags = 0; + ai.flows[i].api = -1; ai.flows[i].timeout = NULL; } ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); - if (ai.flows == NULL) { + if (ai.ports == NULL) { free(ai.flows); - shm_ap_rbuff_destroy(ai.rb); shm_rdrbuff_close(ai.rdrb); + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } @@ -253,16 +273,10 @@ void ap_fini() pthread_rwlock_wrlock(&ai.data_lock); - /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_pop_idx(ai.rb)) >= 0) - shm_rdrbuff_remove(ai.rdrb, i); - - if (ai.fds != NULL) - bmp_destroy(ai.fds); - if (ai.rb != NULL) - shm_ap_rbuff_destroy(ai.rb); - if (ai.rdrb != NULL) - shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); + bmp_destroy(ai.fqueues); + shm_flow_set_destroy(ai.fqset); + shm_rdrbuff_close(ai.rdrb); if (ai.daf_name != NULL) free(ai.daf_name); @@ -270,8 +284,15 @@ void ap_fini() pthread_rwlock_rdlock(&ai.flows_lock); for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (ai.flows[i].rb != NULL) - shm_ap_rbuff_close(ai.flows[i].rb); + if (ai.flows[i].tx_rb != NULL) { + int idx; + while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) + shm_rdrbuff_remove(ai.rdrb, idx); + shm_rbuff_destroy(ai.flows[i].rx_rb); + shm_rbuff_close(ai.flows[i].tx_rb); + shm_flow_set_close(ai.flows[i].set); + } + if (ai.flows[i].timeout != NULL) free(ai.flows[i].timeout); } @@ -328,8 +349,8 @@ int flow_accept(char ** ae_name) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -337,10 +358,24 @@ int flow_accept(char ** ae_name) return -1; } + ai.flows[fd].set = shm_flow_set_open(recv_msg->api); + if (ai.flows[fd].set == NULL) { + bmp_release(ai.fds, fd); + shm_rbuff_destroy(ai.flows[fd].rx_rb); + shm_rbuff_close(ai.flows[fd].tx_rb); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + if (ae_name != NULL) { *ae_name = strdup(recv_msg->ae_name); if (*ae_name == NULL) { - shm_ap_rbuff_close(ai.flows[fd].rb); + shm_rbuff_destroy(ai.flows[fd].tx_rb); + shm_rbuff_close(ai.flows[fd].tx_rb); + shm_flow_set_close(ai.flows[fd].set); bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -356,8 +391,6 @@ int flow_accept(char ** ae_name) ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -410,6 +443,17 @@ int flow_alloc_resp(int fd, int response) ret = recv_msg->result; + pthread_rwlock_wrlock(&ai.flows_lock); + + ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, + ai.flows[fd].port_id); + if (ai.flows[fd].tx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -461,8 +505,11 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].port_id = recv_msg->port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = recv_msg->api; + ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -470,9 +517,26 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) return -1; } - ai.flows[fd].port_id = recv_msg->port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; - ai.flows[fd].api = recv_msg->api; + ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); + if (ai.flows[fd].tx_rb == NULL) { + shm_rbuff_destroy(ai.flows[fd].rx_rb); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ai.flows[fd].set = shm_flow_set_open(recv_msg->api); + if (ai.flows[fd].set == NULL) { + shm_rbuff_close(ai.flows[fd].tx_rb); + shm_rbuff_destroy(ai.flows[fd].rx_rb); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -480,8 +544,6 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); - irm_msg__free_unpacked(recv_msg, NULL); return fd; @@ -548,7 +610,7 @@ int flow_dealloc(int fd) return -ENOTALLOC; } - if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) { + if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EBUSY; @@ -559,8 +621,10 @@ int flow_dealloc(int fd) port_destroy(&ai.ports[msg.port_id]); ai.flows[fd].port_id = -1; - shm_ap_rbuff_close(ai.flows[fd].rb); - ai.flows[fd].rb = NULL; + shm_rbuff_destroy(ai.flows[fd].rx_rb); + ai.flows[fd].rx_rb = NULL; + shm_rbuff_close(ai.flows[fd].tx_rb); + ai.flows[fd].tx_rb = NULL; ai.flows[fd].oflags = 0; ai.flows[fd].api = -1; if (ai.flows[fd].timeout != NULL) { @@ -604,9 +668,9 @@ int flow_cntl(int fd, int cmd, int oflags) case FLOW_F_SETFL: /* SET FLOW FLAGS */ ai.flows[fd].oflags = oflags; if (oflags & FLOW_O_WRONLY) - shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id); + shm_rbuff_block(ai.flows[fd].rx_rb); if (oflags & FLOW_O_RDWR) - shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id); + shm_rbuff_unblock(ai.flows[fd].rx_rb); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return old; @@ -620,7 +684,6 @@ int flow_cntl(int fd, int cmd, int oflags) ssize_t flow_write(int fd, void * buf, size_t count) { ssize_t idx; - struct rb_entry e; if (buf == NULL) return 0; @@ -653,13 +716,10 @@ ssize_t flow_write(int fd, void * buf, size_t count) if (idx < 0) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return -idx; + return idx; } - e.index = idx; - e.port_id = ai.flows[fd].port_id; - - if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -667,7 +727,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) } } else { /* blocking */ struct shm_rdrbuff * rdrb = ai.rdrb; - pid_t api = ai.flows[fd].api; + pid_t api = ai.flows[fd].api; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -681,17 +741,16 @@ ssize_t flow_write(int fd, void * buf, size_t count) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - e.index = idx; - e.port_id = ai.flows[fd].port_id; - - if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { - shm_rdrbuff_remove(ai.rdrb, e.index); + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } } + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -717,15 +776,14 @@ ssize_t flow_read(int fd, void * buf, size_t count) } if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); pthread_rwlock_unlock(&ai.flows_lock); } else { - struct shm_ap_rbuff * rb = ai.rb; - int port_id = ai.flows[fd].port_id; - struct timespec * timeout = ai.flows[fd].timeout; + struct shm_rbuff * rb = ai.flows[fd].rx_rb; + struct timespec * timeout = ai.flows[fd].timeout; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); + idx = shm_rbuff_read_b(rb, timeout); pthread_rwlock_rdlock(&ai.data_lock); } @@ -757,79 +815,163 @@ struct flow_set * flow_set_create() if (set == NULL) return NULL; - if (pthread_rwlock_init(&set->lock, NULL)) { + assert(ai.fqueues); + + set->idx = bmp_allocate(ai.fqueues); + if (!bmp_is_id_valid(ai.fqueues, set->idx)) { free(set); return NULL; } - memset(set->b, 0, IRMD_MAX_FLOWS); - memset(set->s, 0, IRMD_MAX_FLOWS); + return set; +} - set->dirty = true; +void flow_set_destroy(struct flow_set * set) +{ + if (set == NULL) + return; - return set; + flow_set_zero(set); + bmp_release(ai.fqueues, set->idx); + free(set); } -void flow_set_zero(struct flow_set * set) +struct fqueue * fqueue_create() { - pthread_rwlock_wrlock(&set->lock); - memset(set->b, 0, IRMD_MAX_FLOWS); - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + struct fqueue * fq = malloc(sizeof(*fq)); + if (fq == NULL) + return NULL; + + memset(fq->fqueue, -1, SHM_BUFFER_SIZE); + fq->fqsize = 0; + fq->next = 0; + + return fq; } -void flow_set_add(struct flow_set * set, int fd) +void fqueue_destroy(struct fqueue * fq) { - pthread_rwlock_wrlock(&set->lock); - set->b[ai.flows[fd].port_id] = true; - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + if (fq == NULL) + return + free(fq); } -void flow_set_del(struct flow_set * set, int fd) +void flow_set_zero(struct flow_set * set) { - pthread_rwlock_wrlock(&set->lock); - set->b[ai.flows[fd].port_id] = false; - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + if (set == NULL) + return; + + pthread_rwlock_rdlock(&ai.data_lock); + + shm_flow_set_zero(ai.fqset, set->idx); + + pthread_rwlock_unlock(&ai.data_lock); } -bool flow_set_has(struct flow_set * set, int fd) +int flow_set_add(struct flow_set * set, int fd) { - bool ret; - pthread_rwlock_rdlock(&set->lock); - ret = set->b[ai.flows[fd].port_id]; - pthread_rwlock_unlock(&set->lock); + int ret; + + if (set == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return ret; } -void flow_set_destroy(struct flow_set * set) +void flow_set_del(struct flow_set * set, int fd) { - pthread_rwlock_destroy(&set->lock); - free(set); + if (set == NULL) + return; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].port_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); } -static void flow_set_cpy(struct flow_set * set) +bool flow_set_has(struct flow_set * set, int fd) { - pthread_rwlock_rdlock(&set->lock); - if (set->dirty) - memcpy(set->s, set->b, IRMD_MAX_FLOWS); - set->dirty = false; - pthread_rwlock_unlock(&set->lock); + bool ret = false; + + if (set == NULL || fd < 0) + return false; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return false; + } + + ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return ret; } -int flow_select(struct flow_set * set, const struct timespec * timeout) +int fqueue_next(struct fqueue * fq) { - int port_id; - if (set == NULL) { - port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout); - } else { - flow_set_cpy(set); - port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout); + int fd; + + if (fq == NULL) + return -EINVAL; + + if (fq->next == fq->fqsize) { + fq->fqsize = 0; + fq->next = 0; + return -EPERM; } - if (port_id < 0) - return port_id; - return ai.ports[port_id].fd; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + fd = ai.ports[fq->fqueue[fq->next++]].fd; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int flow_event_wait(struct flow_set * set, + struct fqueue * fq, + const struct timespec * timeout) +{ + int ret; + + if (set == NULL) + return -EINVAL; + + if (fq->fqsize > 0) + return 0; + + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (ret == -ETIMEDOUT) + return -ETIMEDOUT; + + if (ret < 0) + return ret; + + fq->fqsize = ret; + fq->next = 0; + + return 0; } /* ipcp-dev functions */ @@ -848,8 +990,8 @@ int np1_flow_alloc(pid_t n_api, int port_id) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(n_api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].rx_rb = shm_rbuff_create(port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -863,8 +1005,6 @@ int np1_flow_alloc(pid_t n_api, int port_id) ai.ports[port_id].fd = fd; port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); - shm_ap_rbuff_open_port(ai.rb, port_id); - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -890,7 +1030,6 @@ int np1_flow_dealloc(int port_id) int np1_flow_resp(pid_t n_api, int port_id) { int fd; - struct shm_ap_rbuff * rb; port_wait_assign(&ai.ports[port_id]); @@ -904,18 +1043,26 @@ int np1_flow_resp(pid_t n_api, int port_id) return fd; } - rb = shm_ap_rbuff_open(n_api); - if (rb == NULL) { + ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); + if (ai.flows[fd].tx_rb == NULL) { ai.flows[fd].port_id = -1; + shm_rbuff_destroy(ai.flows[fd].rx_rb); port_destroy(&ai.ports[port_id]); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -1; } - ai.flows[fd].rb = rb; - - shm_ap_rbuff_open_port(ai.rb, port_id); + ai.flows[fd].set = shm_flow_set_open(n_api); + if (ai.flows[fd].set == NULL) { + shm_rbuff_close(ai.flows[fd].tx_rb); + ai.flows[fd].port_id = -1; + shm_rbuff_destroy(ai.flows[fd].rx_rb); + port_destroy(&ai.ports[port_id]); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -929,9 +1076,9 @@ int ipcp_create_r(pid_t api) irm_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IRM_MSG_CODE__IPCP_CREATE_R; - msg.has_api = true; - msg.api = api; + msg.code = IRM_MSG_CODE__IPCP_CREATE_R; + msg.has_api = true; + msg.api = api; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) @@ -958,11 +1105,11 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) if (dst_name == NULL || src_ae_name == NULL) return -EINVAL; - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_api = true; - msg.api = api; - msg.dst_name = dst_name; - msg.ae_name = src_ae_name; + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_api = true; + msg.api = api; + msg.dst_name = dst_name; + msg.ae_name = src_ae_name; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_wrlock(&ai.flows_lock); @@ -974,7 +1121,7 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) return -1; /* -ENOMOREFDS */ } - ai.flows[fd].rb = NULL; + ai.flows[fd].tx_rb = NULL; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -996,8 +1143,16 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_wrlock(&ai.flows_lock); + ai.flows[fd].rx_rb = shm_rbuff_create(port_id); + if (ai.flows[fd].rx_rb == NULL) { + ai.flows[fd].port_id = -1; + port_destroy(&ai.ports[port_id]); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + ai.flows[fd].port_id = port_id; - ai.flows[fd].rb = NULL; ai.ports[port_id].fd = fd; port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); @@ -1019,16 +1174,13 @@ int ipcp_flow_alloc_reply(int fd, int response) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - msg.port_id = ai.flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); msg.has_response = true; msg.response = response; - if (response) - shm_ap_rbuff_open_port(ai.rb, msg.port_id); - recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -1039,6 +1191,26 @@ int ipcp_flow_alloc_reply(int fd, int response) } ret = recv_msg->result; + + pthread_rwlock_wrlock(&ai.flows_lock); + + ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, + ai.flows[fd].port_id); + if (ai.flows[fd].tx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); + if (ai.flows[fd].set == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + pthread_rwlock_unlock(&ai.flows_lock); + irm_msg__free_unpacked(recv_msg, NULL); return ret; @@ -1061,7 +1233,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - idx = shm_ap_rbuff_read_port(ai.rb, port_id); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); if (idx < 0) { pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); @@ -1081,7 +1253,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - struct rb_entry e; + ssize_t idx; if (sdb == NULL) return -EINVAL; @@ -1095,16 +1267,16 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return -EPERM; } - if (ai.flows[fd].rb == NULL) { + if (ai.flows[fd].tx_rb == NULL) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EPERM; } - e.index = shm_du_buff_get_idx(sdb); - e.port_id = ai.flows[fd].port_id; + idx = shm_du_buff_get_idx(sdb); - shm_ap_rbuff_write(ai.flows[fd].rb, &e); + shm_rbuff_write(ai.flows[fd].tx_rb, idx); + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1112,46 +1284,28 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return 0; } -struct rb_entry * local_flow_read(int fd) +ssize_t local_flow_read(int fd) { - int port_id; - struct rb_entry * e = NULL; - - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_rdlock(&ai.flows_lock); - - port_id = ai.flows[fd].port_id; - - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - - if (port_id != -1) { - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; - e->index = shm_ap_rbuff_read_port(ai.rb, port_id); - } - - return e; + return shm_rbuff_read(ai.flows[fd].rx_rb); } -int local_flow_write(int fd, struct rb_entry * e) +int local_flow_write(int fd, ssize_t idx) { - if (e == NULL || fd < 0) + if (fd < 0) return -EINVAL; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].rb == NULL) { + if (ai.flows[fd].tx_rb == NULL) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EPERM; } - e->port_id = ai.flows[fd].port_id; + shm_rbuff_write(ai.flows[fd].tx_rb, idx); - shm_ap_rbuff_write(ai.flows[fd].rb, e); + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1159,22 +1313,26 @@ int local_flow_write(int fd, struct rb_entry * e) return 0; } -int ipcp_read_shim(struct shm_du_buff ** sdb) +int ipcp_read_shim(int fd, struct shm_du_buff ** sdb) { - int fd; - struct rb_entry * e = shm_ap_rbuff_read(ai.rb); + ssize_t idx; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - fd = ai.ports[e->port_id].fd; + if (ai.flows[fd].rx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } - *sdb = shm_rdrbuff_get(ai.rdrb, e->index); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return fd; + return 0; } void ipcp_flow_del(struct shm_du_buff * sdb) diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c index 04ce9324..a0222f18 100644 --- a/src/lib/lockfile.c +++ b/src/lib/lockfile.c @@ -39,10 +39,10 @@ struct lockfile { pid_t * api; - int fd; }; struct lockfile * lockfile_create() { + int fd; mode_t mask; struct lockfile * lf = malloc(sizeof(*lf)); if (lf == NULL) @@ -50,8 +50,8 @@ struct lockfile * lockfile_create() { mask = umask(0); - lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); - if (lf->fd == -1) { + fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); + if (fd == -1) { LOG_DBGF("Could not create lock file."); free(lf); return NULL; @@ -59,30 +59,24 @@ struct lockfile * lockfile_create() { umask(mask); - if (ftruncate(lf->fd, LF_SIZE - 1) < 0) { + if (ftruncate(fd, LF_SIZE - 1) < 0) { LOG_DBGF("Failed to extend lockfile."); free(lf); return NULL; } -#ifndef __APPLE__ - if (write(lf->fd, "", 1) != 1) { - LOG_DBGF("Failed to finalise lockfile."); - free(lf); - return NULL; - } -#endif + lf->api = mmap(NULL, LF_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, - lf->fd, + fd, 0); + close (fd); + if (lf->api == MAP_FAILED) { LOG_DBGF("Failed to map lockfile."); - if (shm_unlink(LOCKFILE_NAME) == -1) LOG_DBGF("Failed to remove invalid lockfile."); - free(lf); return NULL; } @@ -93,12 +87,13 @@ struct lockfile * lockfile_create() { } struct lockfile * lockfile_open() { + int fd; struct lockfile * lf = malloc(sizeof(*lf)); if (lf == NULL) return NULL; - lf->fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); - if (lf->fd < 0) { + fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); + if (fd < 0) { LOG_DBGF("Could not open lock file."); free(lf); return NULL; @@ -107,15 +102,15 @@ struct lockfile * lockfile_open() { lf->api = mmap(NULL, LF_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, - lf->fd, + fd, 0); + close(fd); + if (lf->api == MAP_FAILED) { LOG_DBGF("Failed to map lockfile."); - if (shm_unlink(LOCKFILE_NAME) == -1) LOG_DBGF("Failed to remove invalid lockfile."); - free(lf); return NULL; } @@ -130,9 +125,6 @@ void lockfile_close(struct lockfile * lf) return; } - if (close(lf->fd) < 0) - LOG_DBGF("Couldn't close lockfile."); - if (munmap(lf->api, LF_SIZE) == -1) LOG_DBGF("Couldn't unmap lockfile."); @@ -151,9 +143,6 @@ void lockfile_destroy(struct lockfile * lf) return; } - if (close(lf->fd) < 0) - LOG_DBGF("Couldn't close lockfile."); - if (munmap(lf->api, LF_SIZE) == -1) LOG_DBGF("Couldn't unmap lockfile."); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c deleted file mode 100644 index 5cbf5bd0..00000000 --- a/src/lib/shm_ap_rbuff.c +++ /dev/null @@ -1,661 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Ring buffer for application processes - * - * Dimitri Staessens - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include -#include -#include -#include -#include - -#define OUROBOROS_PREFIX "shm_ap_rbuff" - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define FN_MAX_CHARS 255 - -#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ - + IRMD_MAX_FLOWS * sizeof(int8_t) \ - + IRMD_MAX_FLOWS * sizeof (ssize_t) \ - + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ - + 2 * sizeof (pthread_cond_t)) - -#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \ - & (SHM_BUFFER_SIZE - 1)) -#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) -#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) - -struct shm_ap_rbuff { - struct rb_entry * shm_base; /* start of entry */ - size_t * head; /* start of ringbuffer head */ - size_t * tail; /* start of ringbuffer tail */ - int8_t * acl; /* start of port_id access table */ - ssize_t * cntrs; /* start of port_id counters */ - pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * add; /* SDU arrived */ - pthread_cond_t * del; /* SDU removed */ - pid_t api; /* api to which this rb belongs */ - int fd; -}; - -struct shm_ap_rbuff * shm_ap_rbuff_create() -{ - struct shm_ap_rbuff * rb; - int shm_fd; - struct rb_entry * shm_base; - pthread_mutexattr_t mattr; - pthread_condattr_t cattr; - char fn[FN_MAX_CHARS]; - mode_t mask; - int i; - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); - - rb = malloc(sizeof(*rb)); - if (rb == NULL) { - LOG_DBG("Could not allocate struct."); - return NULL; - } - - mask = umask(0); - - shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); - if (shm_fd == -1) { - LOG_DBG("Failed creating ring buffer."); - free(rb); - return NULL; - } - - umask(mask); - - if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { - LOG_DBG("Failed to extend ringbuffer."); - free(rb); - return NULL; - } -#ifndef __APPLE__ - if (write(shm_fd, "", 1) != 1) { - LOG_DBG("Failed to finalise extension of ringbuffer."); - free(rb); - return NULL; - } -#endif - shm_base = mmap(NULL, - SHM_RBUFF_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - - if (shm_base == MAP_FAILED) { - LOG_DBG("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to remove invalid shm."); - - free(rb); - return NULL; - } - - rb->shm_base = shm_base; - rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->tail = rb->head + 1; - rb->acl = (int8_t *) (rb->tail + 1); - rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); - rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); - rb->add = (pthread_cond_t *) (rb->lock + 1); - rb->del = rb->add + 1; - - pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ - pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); -#endif - pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(rb->lock, &mattr); - - pthread_condattr_init(&cattr); - pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - for (i = 0; i < IRMD_MAX_FLOWS; ++i) { - rb->cntrs[i] = 0; - rb->acl[i] = -1; - } - - pthread_cond_init(rb->add, &cattr); - pthread_cond_init(rb->del, &cattr); - - *rb->head = 0; - *rb->tail = 0; - - rb->fd = shm_fd; - rb->api = getpid(); - - return rb; -} - -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) -{ - struct shm_ap_rbuff * rb; - int shm_fd; - struct rb_entry * shm_base; - char fn[FN_MAX_CHARS]; - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); - - rb = malloc(sizeof(*rb)); - if (rb == NULL) { - LOG_DBG("Could not allocate struct."); - return NULL; - } - - shm_fd = shm_open(fn, O_RDWR, 0666); - if (shm_fd == -1) { - LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); - free(rb); - return NULL; - } - - shm_base = mmap(NULL, - SHM_RBUFF_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - - if (shm_base == MAP_FAILED) { - LOG_DBG("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to remove invalid shm."); - - free(rb); - return NULL; - } - - rb->shm_base = shm_base; - rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->tail = rb->head + 1; - rb->acl = (int8_t *) (rb->tail + 1); - rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); - rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); - rb->add = (pthread_cond_t *) (rb->lock + 1); - rb->del = rb->add + 1; - - rb->fd = shm_fd; - rb->api = api; - - return rb; -} - -void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) -{ - assert(rb); - - if (close(rb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - - if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) - LOG_DBG("Couldn't unmap shared memory."); - - free(rb); -} - -void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id) -{ - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - rb->acl[port_id] = 0; /* open */ - - pthread_mutex_unlock(rb->lock); -} - -int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) -{ - int ret = 0; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - rb->acl[port_id] = -1; - - if (rb->cntrs[port_id] > 0) - ret = -EBUSY; - - pthread_mutex_unlock(rb->lock); - - return ret; -} - -void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) -{ - char fn[25]; - struct lockfile * lf = NULL; - - assert(rb); - - if (rb->api != getpid()) { - lf = lockfile_open(); - if (lf == NULL) - return; - if (lockfile_owner(lf) == getpid()) { - LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", - rb->api, getpid()); - lockfile_close(lf); - } else { - LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", - getpid(), rb->api); - lockfile_close(lf); - return; - } - } - - if (close(rb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); - - if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) - LOG_DBG("Couldn't unmap shared memory."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to unlink shm."); - - free(rb); -} - -int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) -{ - assert(rb); - assert(e); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (rb->acl[e->port_id]) { - pthread_mutex_unlock(rb->lock); - return -ENOTALLOC; - } - - if (!shm_rbuff_free(rb)) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - if (shm_rbuff_empty(rb)) - pthread_cond_broadcast(rb->add); - - *head_el_ptr(rb) = *e; - *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); - - ++rb->cntrs[e->port_id]; - - pthread_mutex_unlock(rb->lock); - - return 0; -} - -int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) -{ - int ret = 0; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (shm_rbuff_empty(rb)) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - ret = tail_el_ptr(rb)->index; - --rb->cntrs[tail_el_ptr(rb)->port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_mutex_unlock(rb->lock); - - return ret; -} - -static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret = 0; - - assert(rb); - - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - if (ret != ETIMEDOUT) - ret = tail_el_ptr(rb)->port_id; - else - ret = -ETIMEDOUT; - - pthread_cleanup_pop(true); - - return ret; -} - -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - bool * set, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret; - - assert(rb); - - if (set == NULL) - return shm_ap_rbuff_peek_b_all(rb, timeout); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); - - while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) - && (ret != ETIMEDOUT)) { - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); - -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - while (!set[tail_el_ptr(rb)->port_id]) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->del, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->del, rb->lock); - -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - } - - if (ret != ETIMEDOUT) - ret = tail_el_ptr(rb)->port_id; - else - ret = -ETIMEDOUT; - - pthread_cleanup_pop(true); - - return ret; -} - - -struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) -{ - struct rb_entry * e = NULL; - - assert(rb); - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - while (shm_rbuff_empty(rb)) -#ifdef __APPLE__ - pthread_cond_wait(rb->add, rb->lock); -#else - if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - e = malloc(sizeof(*e)); - if (e != NULL) { - *e = *(rb->shm_base + *rb->tail); - --rb->cntrs[e->port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - } - - pthread_cleanup_pop(true); - - return e; -} - -ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) -{ - ssize_t idx = -1; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - idx = tail_el_ptr(rb)->index; - --rb->cntrs[port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_cond_broadcast(rb->del); - pthread_mutex_unlock(rb->lock); - - return idx; -} - -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret = 0; - ssize_t idx = -1; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (timeout != NULL) { - idx = -ETIMEDOUT; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); - - while ((shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) - && (ret != ETIMEDOUT)) { - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - while (tail_el_ptr(rb)->port_id != port_id) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->del, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->del, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - } - - if (ret != ETIMEDOUT) { - idx = tail_el_ptr(rb)->index; - --rb->cntrs[port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_cond_broadcast(rb->del); - } - - pthread_cleanup_pop(true); - - return idx; -} - -void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) -{ - assert(rb); - - pthread_mutex_lock(rb->lock); - *rb->tail = 0; - *rb->head = 0; - pthread_mutex_unlock(rb->lock); -} diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c new file mode 100644 index 00000000..c960bd25 --- /dev/null +++ b/src/lib/shm_flow_set.c @@ -0,0 +1,408 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Management of flow_sets for fqueue + * + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include +#include + +#define OUROBOROS_PREFIX "shm_flow_set" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define FN_MAX_CHARS 255 + +#define FQUEUESIZE (SHM_BUFFER_SIZE * sizeof(int)) + +#define SHM_FLOW_SET_FILE_SIZE (IRMD_MAX_FLOWS * sizeof(ssize_t) \ + + AP_MAX_FQUEUES * sizeof(size_t) \ + + AP_MAX_FQUEUES * sizeof(pthread_cond_t) \ + + AP_MAX_FQUEUES * FQUEUESIZE \ + + sizeof(pthread_mutex_t)) + +#define fqueue_ptr(fs, idx) (fs->fqueues + SHM_BUFFER_SIZE * idx) + +struct shm_flow_set { + ssize_t * mtable; + size_t * heads; + pthread_cond_t * conds; + int * fqueues; + pthread_mutex_t * lock; + + pid_t api; +}; + +struct shm_flow_set * shm_flow_set_create() +{ + struct shm_flow_set * set; + ssize_t * shm_base; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + char fn[FN_MAX_CHARS]; + mode_t mask; + int shm_fd; + int i; + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid()); + + set = malloc(sizeof(*set)); + if (set == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + mask = umask(0); + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("Failed creating flag file."); + free(set); + return NULL; + } + + umask(mask); + + if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) { + LOG_DBG("Failed to extend flag file."); + free(set); + close(shm_fd); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FLOW_SET_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + + free(set); + return NULL; + } + + set->mtable = shm_base; + set->heads = (size_t *) (set->mtable + IRMD_MAX_FLOWS); + set->conds = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); + set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); + set->lock = (pthread_mutex_t *) + (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + + pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(set->lock, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + for (i = 0; i < AP_MAX_FQUEUES; ++i) { + set->heads[i] = 0; + pthread_cond_init(&set->conds[i], &cattr); + } + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + set->mtable[i] = -1; + + set->api = getpid(); + + return set; +} + +struct shm_flow_set * shm_flow_set_open(pid_t api) +{ + struct shm_flow_set * set; + ssize_t * shm_base; + char fn[FN_MAX_CHARS]; + int shm_fd; + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", api); + + set = malloc(sizeof(*set)); + if (set == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); + free(set); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FLOW_SET_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + free(set); + return NULL; + } + + set->mtable = shm_base; + set->heads = (size_t *) (set->mtable + IRMD_MAX_FLOWS); + set->conds = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); + set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); + set->lock = (pthread_mutex_t *) + (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + + set->api = api; + + return set; +} + +void shm_flow_set_destroy(struct shm_flow_set * set) +{ + char fn[25]; + struct lockfile * lf = NULL; + + assert(set); + + if (set->api != getpid()) { + lf = lockfile_open(); + if (lf == NULL) { + LOG_ERR("Failed to open lockfile."); + return; + } + + if (lockfile_owner(lf) == getpid()) { + LOG_DBG("Flow set %d destroyed by IRMd %d.", + set->api, getpid()); + lockfile_close(lf); + } else { + LOG_ERR("AP-I %d tried to destroy flowset owned by %d.", + getpid(), set->api); + lockfile_close(lf); + return; + } + } + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->api); + + if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to unlink shm."); + + free(set); +} + +void shm_flow_set_close(struct shm_flow_set * set) +{ + assert(set); + + if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + free(set); +} + +void shm_flow_set_zero(struct shm_flow_set * shm_set, + ssize_t idx) +{ + ssize_t i = 0; + + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + if (shm_set->mtable[i] == idx) + shm_set->mtable[i] = -1; + + shm_set->heads[idx] = 0; + + pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_add(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] != -1) { + pthread_mutex_unlock(shm_set->lock); + return -EPERM; + } + + shm_set->mtable[port_id] = idx; + + pthread_mutex_unlock(shm_set->lock); + + return 0; +} + +void shm_flow_set_del(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == idx) + shm_set->mtable[port_id] = -1; + + pthread_mutex_unlock(shm_set->lock); +} + +int shm_flow_set_has(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + int ret = 0; + + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == idx) + ret = 1; + + pthread_mutex_unlock(shm_set->lock); + + return ret; +} + +void shm_flow_set_notify(struct shm_flow_set * shm_set, int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == -1) { + pthread_mutex_unlock(shm_set->lock); + return; + } + + *(fqueue_ptr(shm_set, shm_set->mtable[port_id]) + + (shm_set->heads[shm_set->mtable[port_id]])++) = port_id; + + pthread_cond_signal(&shm_set->conds[shm_set->mtable[port_id]]); + + pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_wait(const struct shm_flow_set * shm_set, + ssize_t idx, + int * fqueue, + const struct timespec * timeout) +{ + int ret = 0; + struct timespec abstime; + + assert(shm_set); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +#ifdef __APPLE__ + pthread_mutex_lock(shm_set->lock); +#else + if (pthread_mutex_lock(shm_set->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(shm_set->lock); + } +#endif + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) shm_set->lock); + + while (shm_set->heads[idx] == 0 && ret != -ETIMEDOUT) { + if (timeout != NULL) + ret = pthread_cond_timedwait(shm_set->conds + idx, + shm_set->lock, + &abstime); + else + ret = pthread_cond_wait(shm_set->conds + idx, + shm_set->lock); +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(shm_set->lock); + } +#endif + if (ret == ETIMEDOUT) { + ret = -ETIMEDOUT; + break; + } + } + + if (ret != -ETIMEDOUT) { + memcpy(fqueue, + fqueue_ptr(shm_set, idx), + shm_set->heads[idx] * sizeof(int)); + ret = shm_set->heads[idx]; + shm_set->heads[idx] = 0; + } + + pthread_cleanup_pop(true); + + return ret; +} diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c new file mode 100644 index 00000000..cf094488 --- /dev/null +++ b/src/lib/shm_rbuff.c @@ -0,0 +1,424 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for incoming SDUs + * + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include + +#define OUROBOROS_PREFIX "shm_rbuff" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define FN_MAX_CHARS 255 + +#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(ssize_t) \ + + 2 * sizeof(size_t) + sizeof(int8_t) \ + + sizeof(pthread_mutex_t) \ + + 2 * sizeof (pthread_cond_t)) + +#define shm_rbuff_used(rb) ((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \ + & (SHM_BUFFER_SIZE - 1)) +#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) +#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) + +struct shm_rbuff { + ssize_t * shm_base; /* start of entry */ + size_t * head; /* start of ringbuffer head */ + size_t * tail; /* start of ringbuffer tail */ + int8_t * acl; /* access control */ + pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_cond_t * add; /* SDU arrived */ + pthread_cond_t * del; /* SDU removed */ + pid_t api; /* api of the owner */ + int port_id; /* port_id of the flow */ +}; + +struct shm_rbuff * shm_rbuff_create(int port_id) +{ + struct shm_rbuff * rb; + int shm_fd; + ssize_t * shm_base; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + char fn[FN_MAX_CHARS]; + mode_t mask; + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", getpid(), port_id); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + mask = umask(0); + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("Failed creating ring buffer."); + free(rb); + return NULL; + } + + umask(mask); + + if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { + LOG_DBG("Failed to extend ringbuffer."); + free(rb); + close(shm_fd); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->lock = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; + + pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(rb->lock, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + pthread_cond_init(rb->add, &cattr); + pthread_cond_init(rb->del, &cattr); + + *rb->acl = 0; + *rb->head = 0; + *rb->tail = 0; + + rb->api = getpid(); + rb->port_id = port_id; + + return rb; +} + +struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id) +{ + struct shm_rbuff * rb; + int shm_fd; + ssize_t * shm_base; + char fn[FN_MAX_CHARS]; + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", api, port_id); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); + free(rb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->lock = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; + + rb->api = api; + rb->port_id = port_id; + + return rb; +} + +void shm_rbuff_close(struct shm_rbuff * rb) +{ + assert(rb); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + free(rb); +} + +void shm_rbuff_destroy(struct shm_rbuff * rb) +{ + char fn[25]; + struct lockfile * lf = NULL; + + assert(rb); + + if (rb->api != getpid()) { + lf = lockfile_open(); + if (lf == NULL) { + LOG_ERR("Failed to open lockfile."); + return; + } + + if (lockfile_owner(lf) == getpid()) { + LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", + rb->api, getpid()); + lockfile_close(lf); + } else { + LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", + getpid(), rb->api); + lockfile_close(lf); + return; + } + } + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to unlink shm."); + + free(rb); +} + +int shm_rbuff_write(struct shm_rbuff * rb, ssize_t idx) +{ + assert(rb); + assert(idx >= 0); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (*rb->acl) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + + if (!shm_rbuff_free(rb)) { + pthread_mutex_unlock(rb->lock); + return -1; + } + + if (shm_rbuff_empty(rb)) + pthread_cond_broadcast(rb->add); + + *head_el_ptr(rb) = idx; + *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); + + pthread_mutex_unlock(rb->lock); + + return 0; +} + +ssize_t shm_rbuff_read(struct shm_rbuff * rb) +{ + int ret = 0; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (shm_rbuff_empty(rb)) { + pthread_mutex_unlock(rb->lock); + return -1; + } + + ret = *tail_el_ptr(rb); + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); + + pthread_mutex_unlock(rb->lock); + + return ret; +} + +ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + ssize_t idx = -1; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (timeout != NULL) { + idx = -ETIMEDOUT; + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (shm_rbuff_empty(rb) && (ret != ETIMEDOUT)) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->add, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->add, rb->lock); +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) { + idx = -ETIMEDOUT; + break; + } + } + + if (idx != -ETIMEDOUT) { + idx = *tail_el_ptr(rb); + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); + pthread_cond_broadcast(rb->del); + } + + pthread_cleanup_pop(true); + + return idx; +} + +int shm_rbuff_block(struct shm_rbuff * rb) +{ + int ret = 0; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + *rb->acl = -1; + + if (!shm_rbuff_empty(rb)) + ret = -EBUSY; + + pthread_mutex_unlock(rb->lock); + + return ret; +} + +void shm_rbuff_unblock(struct shm_rbuff * rb) +{ + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + *rb->acl = 0; /* open */ + + pthread_mutex_unlock(rb->lock); +} + +void shm_rbuff_reset(struct shm_rbuff * rb) +{ + assert(rb); + + pthread_mutex_lock(rb->lock); + *rb->tail = 0; + *rb->head = 0; + pthread_mutex_unlock(rb->lock); +} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index f6683dc2..e5a37577 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -90,7 +90,6 @@ struct shm_rdrbuff { pthread_cond_t * full; /* run sanitizer when buffer full */ pid_t * api; /* api of the irmd owner */ enum qos_cube qos; /* qos id which this buffer serves */ - int fd; }; static void garbage_collect(struct shm_rdrbuff * rdrb) @@ -189,17 +188,11 @@ struct shm_rdrbuff * shm_rdrbuff_create() if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { LOG_DBGF("Failed to extend shared memory map."); free(shm_rdrb_fn); + close(shm_fd); free(rdrb); return NULL; } -#ifndef __APPLE - if (write(shm_fd, "", 1) != 1) { - LOG_DBGF("Failed to finalise extension of shared memory map."); - free(shm_rdrb_fn); - free(rdrb); - return NULL; - } -#endif + shm_base = mmap(NULL, SHM_FILE_SIZE, PROT_READ | PROT_WRITE, @@ -207,6 +200,8 @@ struct shm_rdrbuff * shm_rdrbuff_create() shm_fd, 0); + close(shm_fd); + if (shm_base == MAP_FAILED) { LOG_DBGF("Failed to map shared memory."); if (shm_unlink(shm_rdrb_fn) == -1) @@ -235,6 +230,9 @@ struct shm_rdrbuff * shm_rdrbuff_create() pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif pthread_cond_init(rdrb->full, &cattr); pthread_cond_init(rdrb->healthy, &cattr); @@ -246,7 +244,6 @@ struct shm_rdrbuff * shm_rdrbuff_create() *rdrb->api = getpid(); rdrb->qos = qos; - rdrb->fd = shm_fd; free(shm_rdrb_fn); @@ -287,10 +284,11 @@ struct shm_rdrbuff * shm_rdrbuff_open() MAP_SHARED, shm_fd, 0); + + close(shm_fd); + if (shm_base == MAP_FAILED) { LOG_DBGF("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); if (shm_unlink(shm_rdrb_fn) == -1) LOG_DBG("Failed to unlink invalid shm."); free(shm_rdrb_fn); @@ -309,7 +307,6 @@ struct shm_rdrbuff * shm_rdrbuff_open() rdrb->api = (pid_t *) (rdrb->full + 1); rdrb->qos = qos; - rdrb->fd = shm_fd; free(shm_rdrb_fn); @@ -400,9 +397,6 @@ void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) { assert(rdrb); - if (close(rdrb->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) LOG_DBGF("Couldn't unmap shared memory."); @@ -420,9 +414,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) return; } - if (close(rdrb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 7d41b497..0ca40326 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -23,7 +23,7 @@ #define _POSIX_C_SOURCE 199506L -#include +#include #include #include @@ -53,6 +53,8 @@ struct c { float rtt_avg; float rtt_m2; + flow_set_t * flows; + /* needs locking */ struct timespec * times; pthread_mutex_t lock; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 4742d0de..40f75785 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -55,20 +55,21 @@ void * reader(void * o) struct timespec timeout = {2, 0}; struct timespec now = {0, 0}; - struct oping_msg * msg; char buf[OPING_BUF_SIZE]; + struct oping_msg * msg = (struct oping_msg *) buf; int fd = 0; int msg_len = 0; float ms = 0; float d = 0; - - msg = (struct oping_msg *) buf; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; /* FIXME: use flow timeout option once we have it */ - while(client.rcvd != client.count && - (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) { - flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK); - while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { + while (client.rcvd != client.count + && flow_event_wait(client.flows, fq, &timeout) != -ETIMEDOUT) { + while ((fd = fqueue_next(fq)) >= 0) { + msg_len = flow_read(fd, buf, OPING_BUF_SIZE); if (msg_len < 0) continue; @@ -165,12 +166,20 @@ int client_main() struct timespec tic; struct timespec toc; - int fd = flow_alloc(client.s_apn, NULL, NULL); + int fd; + + client.flows = flow_set_create(); + if (client.flows == NULL) + return 0; + + fd = flow_alloc(client.s_apn, NULL, NULL); if (fd < 0) { printf("Failed to allocate flow.\n"); return -1; } + flow_set_add(client.flows, fd); + if (flow_alloc_res(fd)) { printf("Flow allocation refused.\n"); flow_dealloc(fd); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 845f0cbd..8a5a3512 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -69,16 +69,23 @@ void * server_thread(void *o) struct oping_msg * msg = (struct oping_msg *) buf; struct timespec now = {0, 0}; struct timespec timeout = {0, 100 * MILLION}; + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(server.flows, &timeout); - if (fd == -ETIMEDOUT) - continue; - if (fd < 0) { - printf("Failed to get active fd.\n"); + int ret = flow_event_wait(server.flows, fq, &timeout); + if (ret == -ETIMEDOUT) continue; + + if (ret < 0) { + printf("Event error.\n"); + break; } - while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { + + while ((fd = fqueue_next(fq)) >= 0) { + msg_len = flow_read(fd, buf, OPING_BUF_SIZE); if (msg_len < 0) continue; @@ -160,8 +167,6 @@ int server_main() if (server.flows == NULL) return 0; - flow_set_zero(server.flows); - pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL); pthread_create(&server.accept_pt, NULL, accept_thread, NULL); pthread_create(&server.server_pt, NULL, server_thread, NULL); -- cgit v1.2.3