From 51bb05f15b78fbfdf53417ec1d1c21e999c0e556 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Wed, 26 Sep 2018 14:49:39 +0200 Subject: include: Remove _DEFAULT_SOURCE in endian.h This removes the _DEFAULT_SOURCE definition in the endian header as it should not be there. This avoids double and conflicting definitions. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/irmd/main.c | 7 +++++-- src/irmd/proc_table.c | 4 ++++ src/irmd/registry.c | 4 ++++ 3 files changed, 13 insertions(+), 2 deletions(-) (limited to 'src/irmd') diff --git a/src/irmd/main.c b/src/irmd/main.c index aeb43f0d..4c6908bc 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -20,8 +20,11 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#define _POSIX_C_SOURCE 200812L -#define __XSI_VISIBLE 500 +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif #include "config.h" diff --git a/src/irmd/proc_table.c b/src/irmd/proc_table.c index e8d08447..115eb2da 100644 --- a/src/irmd/proc_table.c +++ b/src/irmd/proc_table.c @@ -20,7 +20,11 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else #define _POSIX_C_SOURCE 200112L +#endif #include "config.h" diff --git a/src/irmd/registry.c b/src/irmd/registry.c index 145a7452..6c86da24 100644 --- a/src/irmd/registry.c +++ b/src/irmd/registry.c @@ -20,7 +20,11 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else #define _POSIX_C_SOURCE 200809L +#endif #include "config.h" -- cgit v1.2.3 From e569fed013e7e01b49b591b192eabdb0431b4976 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 27 Sep 2018 11:45:41 +0200 Subject: irmd: Add missing unlocks and avoid NULL dereference There were missing unlocks in certain error conditions and some NULL pointers were passed to strcmp which is undefined behavior. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/irmd/main.c | 46 ++++++++++++++++++++++++---------------------- src/irmd/proc_table.c | 3 +-- src/irmd/prog_table.c | 3 +-- 3 files changed, 26 insertions(+), 26 deletions(-) (limited to 'src/irmd') diff --git a/src/irmd/main.c b/src/irmd/main.c index 4c6908bc..634bf4de 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -349,8 +349,10 @@ static struct ipcp_entry * get_ipcp_by_dst_name(const char * name, len = IPCP_HASH_LEN(e); hash = malloc(len); - if (hash == NULL) + if (hash == NULL) { + pthread_rwlock_unlock(&irmd.reg_lock); return NULL; + } str_hash(e->dir_hash_algo, hash, name); @@ -828,13 +830,13 @@ static int unbind_program(char * prog, if (name == NULL) prog_table_del(&irmd.prog_table, prog); else { - struct prog_entry * e = prog_table_get(&irmd.prog_table, prog); - prog_entry_del_name(e, name); - } + struct prog_entry * en = prog_table_get(&irmd.prog_table, prog); + prog_entry_del_name(en, name); - e = registry_get_entry(&irmd.registry, name); - if (e != NULL) - reg_entry_del_prog(e, prog); + e = registry_get_entry(&irmd.registry, name); + if (e != NULL) + reg_entry_del_prog(e, prog); + } pthread_rwlock_unlock(&irmd.reg_lock); @@ -856,13 +858,14 @@ static int unbind_process(pid_t pid, if (name == NULL) proc_table_del(&irmd.proc_table, pid); else { - struct proc_entry * e = proc_table_get(&irmd.proc_table, pid); - proc_entry_del_name(e, name); - } + struct proc_entry * en = proc_table_get(&irmd.proc_table, pid); + if (en != NULL) + proc_entry_del_name(en, name); - e = registry_get_entry(&irmd.registry, name); - if (e != NULL) - reg_entry_del_pid(e, pid); + e = registry_get_entry(&irmd.registry, name); + if (e != NULL) + reg_entry_del_pid(e, pid); + } pthread_rwlock_unlock(&irmd.reg_lock); @@ -922,6 +925,7 @@ static ssize_t list_ipcps(ipcp_info_msg_t *** ipcps, return 0; fail: + pthread_rwlock_unlock(&irmd.reg_lock); while (i >= 0) { free((*ipcps)[i]->layer); free((*ipcps)[i]->name); @@ -1180,10 +1184,8 @@ static int flow_accept(pid_t pid, if (ret == -1) return -EPIPE; - if (irmd_get_state() != IRMD_RUNNING) { - reg_entry_set_state(re, REG_NAME_NULL); + if (irmd_get_state() != IRMD_RUNNING) return -EIRMD; - } pthread_rwlock_rdlock(&irmd.flows_lock); @@ -2163,6 +2165,11 @@ static int irm_init(void) } } + if (irmd.lf == NULL) { + log_err("Failed to create lockfile."); + goto fail_lockfile; + } + if (stat(SOCK_PATH, &st) == -1) { if (mkdir(SOCK_PATH, 0777)) { log_err("Failed to create sockets directory."); @@ -2187,11 +2194,6 @@ static int irm_init(void) goto fail_sock_opt; } - if (irmd.lf == NULL) { - log_err("Failed to create lockfile."); - goto fail_sock_opt; - } - if ((irmd.rdrb = shm_rdrbuff_create()) == NULL) { log_err("Failed to create rdrbuff."); goto fail_rdrbuff; @@ -2210,7 +2212,7 @@ static int irm_init(void) gcry_control(GCRYCTL_INITIALIZATION_FINISHED); #endif - irmd.state = IRMD_RUNNING; + irmd_set_state(IRMD_RUNNING); log_info("Ouroboros IPC Resource Manager daemon started..."); diff --git a/src/irmd/proc_table.c b/src/irmd/proc_table.c index 115eb2da..6f9d8e20 100644 --- a/src/irmd/proc_table.c +++ b/src/irmd/proc_table.c @@ -172,8 +172,7 @@ void proc_entry_del_name(struct proc_entry * e, struct str_el * s = list_entry(p, struct str_el, next); if (!strcmp(name, s->str)) { list_del(&s->next); - if (s->str != NULL) - free(s->str); + free(s->str); free(s); } } diff --git a/src/irmd/prog_table.c b/src/irmd/prog_table.c index bd69e156..9aa9be9d 100644 --- a/src/irmd/prog_table.c +++ b/src/irmd/prog_table.c @@ -81,8 +81,7 @@ void prog_entry_destroy(struct prog_entry * e) list_for_each_safe(p, h, &e->names) { struct str_el * s = list_entry(p, struct str_el, next); list_del(&s->next); - if (s->str != NULL) - free(s->str); + free(s->str); free(s); } -- cgit v1.2.3 From b802b25ddfe6f1b6ecabe3ba70e3dac2e99e7a50 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 4 Oct 2018 18:06:32 +0200 Subject: lib: Pass qosspec at flow allocation The flow allocator now passes the full qos specification to the endpoint, instead of just a cube. This is a more flexible architecture, as it makes QoS cubes internal to the layers. Adds endianness transforms for the flow allocator protocol in the normal IPCP. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/ipcp-dev.h | 2 +- include/ouroboros/np1_flow.h | 2 +- include/ouroboros/qos.h | 22 ++++------ include/ouroboros/sockets.h.in | 11 +++++ src/ipcpd/eth/eth.c | 51 ++++++++++++++++------ src/ipcpd/ipcp.c | 13 +++++- src/ipcpd/ipcp.h | 2 +- src/ipcpd/local/main.c | 4 +- src/ipcpd/normal/dt.c | 2 - src/ipcpd/normal/fa.c | 57 +++++++++++++++++------- src/ipcpd/normal/fa.h | 2 +- src/ipcpd/raptor/CMakeLists.txt | 1 + src/ipcpd/raptor/main.c | 77 +++++++++++++++++++------------- src/ipcpd/udp/main.c | 48 +++++++++++++------- src/irmd/ipcp.c | 13 +++--- src/irmd/ipcp.h | 2 +- src/irmd/irm_flow.c | 4 +- src/irmd/irm_flow.h | 5 ++- src/irmd/main.c | 22 ++++++---- src/lib/CMakeLists.txt | 4 +- src/lib/dev.c | 87 ++++++++++++++++-------------------- src/lib/frct.c | 5 +-- src/lib/ipcpd_messages.proto | 3 +- src/lib/irmd_messages.proto | 3 +- src/lib/qos.c | 97 +++++++++++++++++++---------------------- src/lib/qoscube.c | 30 +++---------- src/lib/qosspec.proto | 33 ++++++++++++++ src/lib/sockets.c | 35 +++++++++++++++ 28 files changed, 388 insertions(+), 249 deletions(-) create mode 100644 src/lib/qosspec.proto (limited to 'src/irmd') diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 9a33c25a..a0ed026c 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -31,7 +31,7 @@ int ipcp_create_r(pid_t pid, int ipcp_flow_req_arr(pid_t pid, const uint8_t * dst, size_t len, - qoscube_t cube); + qosspec_t qs); int ipcp_flow_alloc_reply(int fd, int response); diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index 789e82df..3435c24a 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -29,7 +29,7 @@ int np1_flow_alloc(pid_t n_pid, int port_id, - qoscube_t qc); + qosspec_t qs); int np1_flow_resp(int port_id); diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h index 011828d7..2b93f1d0 100644 --- a/include/ouroboros/qos.h +++ b/include/ouroboros/qos.h @@ -27,26 +27,20 @@ #include typedef struct qos_spec { - uint32_t delay; /* In ms */ - uint64_t bandwidth; /* In bits/s */ - uint8_t availability; /* Class of 9s */ - uint32_t loss; /* Packet loss */ - uint8_t in_order; /* In-order delivery, enables FRCT */ - uint32_t maximum_interruption; /* In ms */ + uint32_t delay; /* In ms */ + uint64_t bandwidth; /* In bits/s */ + uint8_t availability; /* Class of 9s */ + uint32_t loss; /* Packet loss */ + uint32_t ber; /* Bit error rate, errors per billion bits */ + uint8_t in_order; /* In-order delivery, enables FRCT */ + uint32_t max_gap; /* In ms */ } qosspec_t; qosspec_t qos_raw; +qosspec_t qos_raw_no_errors; qosspec_t qos_best_effort; qosspec_t qos_video; qosspec_t qos_voice; qosspec_t qos_data; -__BEGIN_DECLS - -int qosspec_init(qosspec_t * qs); - -int qosspec_fini(qosspec_t * qs); - -__END_DECLS - #endif /* OUROBOROS_QOS_H */ diff --git a/include/ouroboros/sockets.h.in b/include/ouroboros/sockets.h.in index 4557a9ef..368923db 100644 --- a/include/ouroboros/sockets.h.in +++ b/include/ouroboros/sockets.h.in @@ -23,6 +23,8 @@ #ifndef OUROBOROS_SOCKETS_H #define OUROBOROS_SOCKETS_H +#include + #include #include "ipcp_config.pb-c.h" @@ -36,6 +38,9 @@ typedef IpcpInfoMsg ipcp_info_msg_t; #include "ipcpd_messages.pb-c.h" typedef IpcpMsg ipcp_msg_t; +#include "qosspec.pb-c.h" +typedef QosspecMsg qosspec_msg_t; + #define SOCK_PATH "/var/run/ouroboros/" #define SOCK_PATH_SUFFIX ".sock" @@ -53,4 +58,10 @@ int client_socket_open(char * file_name); irm_msg_t * send_recv_irm_msg(irm_msg_t * msg); + +/* qos message conversion needed in different components */ +qosspec_msg_t spec_to_msg(qosspec_t * qs); + +qosspec_t msg_to_spec(qosspec_msg_t * msg); + #endif diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 44ef3756..6fd7b805 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -146,15 +146,27 @@ #define NAME_QUERY_REPLY 3 struct mgmt_msg { - uint8_t code; #if defined(BUILD_ETH_DIX) uint16_t seid; uint16_t deid; #elif defined(BUILD_ETH_LLC) uint8_t ssap; uint8_t dsap; + /* QoS here for alignment */ + uint8_t code; + uint8_t availability; +#endif + /* QoS parameters from spec, aligned */ + uint32_t loss; + uint64_t bandwidth; + uint32_t ber; + uint32_t max_gap; + uint32_t delay; + uint8_t in_order; +#if defined (BUILD_ETH_DIX) + uint8_t code; + uint8_t availability; #endif - uint8_t qoscube; int8_t response; } __attribute__((packed)); @@ -433,7 +445,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, uint8_t ssap, #endif const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { uint8_t * buf; struct mgmt_msg * msg; @@ -453,7 +465,14 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, #elif defined(BUILD_ETH_LLC) msg->ssap = ssap; #endif - msg->qoscube = cube; + + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -523,7 +542,7 @@ static int eth_ipcp_req(uint8_t * r_addr, uint8_t r_sap, #endif const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, ALLOC_TIMEO * MILLION}; struct timespec abstime; @@ -547,7 +566,7 @@ static int eth_ipcp_req(uint8_t * r_addr, } /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -687,11 +706,20 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, uint8_t * r_addr) { struct mgmt_msg * msg; + qosspec_t qs; msg = (struct mgmt_msg *) buf; switch (msg->code) { case FLOW_REQ: + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { eth_ipcp_req(r_addr, @@ -701,7 +729,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, msg->ssap, #endif buf + sizeof(*msg), - msg->qoscube); + qs); } break; case FLOW_REPLY: @@ -1553,7 +1581,7 @@ static int eth_ipcp_query(const uint8_t * hash) static int eth_ipcp_flow_alloc(int fd, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { #ifdef BUILD_ETH_LLC uint8_t ssap = 0; @@ -1565,11 +1593,6 @@ static int eth_ipcp_flow_alloc(int fd, assert(hash); - if (cube > QOS_CUBE_DATA) { - log_dbg("Unsupported QoS requested."); - return -1; - } - if (!shim_data_dir_has(eth_data.shim_data, hash)) { log_err("Destination unreachable."); return -1; @@ -1597,7 +1620,7 @@ static int eth_ipcp_flow_alloc(int fd, #elif defined(BUILD_ETH_LLC) ssap, #endif - hash, cube) < 0) { + hash, qs) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 5ea54533..e415bbd9 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -20,6 +20,13 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#define __XSI_VISIBLE 500 +#endif + #if defined(__linux__) && !defined(DISABLE_CORE_LOCK) #define _GNU_SOURCE #define NPROC (sysconf(_SC_NPROCESSORS_ONLN)) @@ -198,6 +205,7 @@ static void * mainloop(void * o) layer_info_msg_t layer_info = LAYER_INFO_MSG__INIT; int fd = -1; struct cmd * cmd; + qosspec_t qs; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -422,9 +430,10 @@ static void * mainloop(void * o) break; } + qs = msg_to_spec(msg->qosspec); fd = np1_flow_alloc(msg->pid, msg->port_id, - msg->qoscube); + qs); if (fd < 0) { log_err("Failed allocating fd on port_id %d.", msg->port_id); @@ -435,7 +444,7 @@ static void * mainloop(void * o) ret_msg.result = ipcpi.ops->ipcp_flow_alloc(fd, msg->hash.data, - msg->qoscube); + qs); break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: ret_msg.has_result = true; diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 5417fc74..13751b6d 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -60,7 +60,7 @@ struct ipcp_ops { int (* ipcp_flow_alloc)(int fd, const uint8_t * dst, - qoscube_t qos); + qosspec_t qs); int (* ipcp_flow_alloc_resp)(int fd, int response); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index c83f85fe..8eae7503 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -183,7 +183,7 @@ static int ipcp_local_query(const uint8_t * hash) static int ipcp_local_flow_alloc(int fd, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; struct timespec abstime; @@ -212,7 +212,7 @@ static int ipcp_local_flow_alloc(int fd, assert(ipcpi.alloc_id == -1); - out_fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + out_fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (out_fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_dbg("Flow allocation failed: %d", out_fd); diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index c3f8f198..a350e4be 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -31,8 +31,6 @@ #define DT "dt" #define OUROBOROS_PREFIX DT -/* FIXME: fix #defines and remove endian.h include. */ -#include #include #include #include diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 10f0a863..4c82e0e0 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -57,8 +57,15 @@ struct fa_msg { uint32_t r_eid; uint32_t s_eid; uint8_t code; - uint8_t qc; int8_t response; + /* QoS parameters from spec, aligned */ + uint8_t availability; + uint8_t in_order; + uint32_t delay; + uint64_t bandwidth; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; } __attribute__((packed)); struct { @@ -100,6 +107,7 @@ static void fa_post_sdu(void * comp, int fd; uint8_t * buf; struct fa_msg * msg; + qosspec_t qs; (void) comp; @@ -142,10 +150,18 @@ static void fa_post_sdu(void * comp, assert(ipcpi.alloc_id == -1); + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + fd = ipcp_flow_req_arr(getpid(), (uint8_t *) (msg + 1), ipcp_dir_hash_len(), - msg->qc); + qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Failed to get fd for flow."); @@ -155,8 +171,8 @@ static void fa_post_sdu(void * comp, pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[fd] = msg->s_eid; - fa.r_addr[fd] = msg->s_addr; + fa.r_eid[fd] = ntoh32(msg->s_eid); + fa.r_addr[fd] = ntoh64(msg->s_addr); pthread_rwlock_unlock(&fa.flows_lock); @@ -169,14 +185,14 @@ static void fa_post_sdu(void * comp, case FLOW_REPLY: pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[msg->r_eid] = msg->s_eid; + fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); - ipcp_flow_alloc_reply(msg->r_eid, msg->response); + ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response); if (msg->response < 0) - destroy_conn(msg->r_eid); + destroy_conn(ntoh32(msg->r_eid)); else - sdu_sched_add(fa.sdu_sched, msg->r_eid); + sdu_sched_add(fa.sdu_sched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -227,11 +243,12 @@ void fa_stop(void) int fa_alloc(int fd, const uint8_t * dst, - qoscube_t qc) + qosspec_t qs) { struct fa_msg * msg; uint64_t addr; struct shm_du_buff * sdb; + qoscube_t qc; addr = dir_query(dst); if (addr == 0) @@ -240,14 +257,22 @@ int fa_alloc(int fd, if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) return -1; - msg = (struct fa_msg *) shm_du_buff_head(sdb); - msg->code = FLOW_REQ; - msg->qc = qc; - msg->s_eid = fd; - msg->s_addr = ipcpi.dt_addr; + msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg->code = FLOW_REQ; + msg->s_eid = hton32(fd); + msg->s_addr = hton64(ipcpi.dt_addr); + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, dst, ipcp_dir_hash_len()); + qc = qos_spec_to_cube(qs); + if (dt_write_sdu(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; @@ -302,8 +327,8 @@ int fa_alloc_resp(int fd, msg = (struct fa_msg *) shm_du_buff_head(sdb); msg->code = FLOW_REPLY; - msg->r_eid = fa.r_eid[fd]; - msg->s_eid = fd; + msg->r_eid = hton32(fa.r_eid[fd]); + msg->s_eid = hton32(fd); msg->response = response; if (response < 0) { diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h index 87819d6f..a98d834a 100644 --- a/src/ipcpd/normal/fa.h +++ b/src/ipcpd/normal/fa.h @@ -36,7 +36,7 @@ void fa_stop(void); int fa_alloc(int fd, const uint8_t * dst, - qoscube_t qos); + qosspec_t qs); int fa_alloc_resp(int fd, int response); diff --git a/src/ipcpd/raptor/CMakeLists.txt b/src/ipcpd/raptor/CMakeLists.txt index 06e6ee29..1883d9bb 100644 --- a/src/ipcpd/raptor/CMakeLists.txt +++ b/src/ipcpd/raptor/CMakeLists.txt @@ -16,6 +16,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux") find_path(RAPTOR_KERNEL_MODULE NAMES raptor.ko.gz + raptor.ko.xz HINTS /lib/modules/${CMAKE_SYSTEM_VERSION}/extra ) diff --git a/src/ipcpd/raptor/main.c b/src/ipcpd/raptor/main.c index 4f0099b3..a01889ec 100644 --- a/src/ipcpd/raptor/main.c +++ b/src/ipcpd/raptor/main.c @@ -90,11 +90,18 @@ #define NAME_QUERY_REPLY 3 struct mgmt_msg { - uint8_t code; - uint8_t ssap; - uint8_t dsap; - uint8_t qoscube; - int8_t response; + uint8_t code; + uint8_t ssap; + uint8_t dsap; + int8_t response; + /* QoS parameters from spec, aligned */ + uint32_t loss; + uint64_t bandwidth; + uint32_t ber; + uint32_t max_gap; + uint32_t delay; + uint8_t in_order; + uint8_t availability; } __attribute__((packed)); struct ef { @@ -278,7 +285,7 @@ static int raptor_send_frame(struct shm_du_buff * sdb, static int raptor_sap_alloc(uint8_t ssap, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { struct mgmt_msg * msg; struct shm_du_buff * sdb; @@ -288,10 +295,16 @@ static int raptor_sap_alloc(uint8_t ssap, return -1; } - msg = (struct mgmt_msg *) shm_du_buff_head(sdb); - msg->code = FLOW_REQ; - msg->ssap = ssap; - msg->qoscube = cube; + msg = (struct mgmt_msg *) shm_du_buff_head(sdb); + msg->code = FLOW_REQ; + msg->ssap = ssap; + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -306,15 +319,15 @@ static int raptor_sap_alloc(uint8_t ssap, return 0; } -static int raptor_sap_alloc_resp(uint8_t ssap, - uint8_t dsap, - int response) +static int raptor_sap_alloc_resp(uint8_t ssap, + uint8_t dsap, + int response) { - struct mgmt_msg * msg; + struct mgmt_msg * msg; struct shm_du_buff * sdb; if (ipcp_sdb_reserve(&sdb, sizeof(*msg)) < 0) { - log_err("failed to reserve sdb for management frame."); + log_err("Failed to reserve sdb for management frame."); return -1; } @@ -337,7 +350,7 @@ static int raptor_sap_alloc_resp(uint8_t ssap, static int raptor_sap_req(uint8_t r_sap, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000}; struct timespec abstime; @@ -361,7 +374,7 @@ static int raptor_sap_req(uint8_t r_sap, } /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -424,12 +437,12 @@ static int raptor_name_query_req(const uint8_t * hash) return 0; if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len()) < 0) { - log_err("failed to reserve sdb for management frame."); + log_err("Failed to reserve sdb for management frame."); return -1; } - msg = (struct mgmt_msg *) shm_du_buff_head(sdb); - msg->code = NAME_QUERY_REPLY; + msg = (struct mgmt_msg *) shm_du_buff_head(sdb); + msg->code = NAME_QUERY_REPLY; memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -456,8 +469,9 @@ static int raptor_name_query_reply(const uint8_t * hash) static int raptor_mgmt_frame(const uint8_t * buf, size_t len) { - struct mgmt_msg * msg = (struct mgmt_msg *) buf; - uint8_t * hash = (uint8_t *) (msg + 1); + struct mgmt_msg * msg = (struct mgmt_msg *) buf; + uint8_t * hash = (uint8_t *) (msg + 1); + qosspec_t qs; switch (msg->code) { case FLOW_REQ: @@ -466,8 +480,16 @@ static int raptor_mgmt_frame(const uint8_t * buf, return -1; } + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + if (shim_data_reg_has(raptor_data.shim_data, hash)) - raptor_sap_req(msg->ssap, hash, msg->qoscube); + raptor_sap_req(msg->ssap, hash, qs); break; case FLOW_REPLY: if (len != sizeof(*msg)) { @@ -901,7 +923,7 @@ static int raptor_query(const uint8_t * hash) static int raptor_flow_alloc(int fd, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { uint8_t ssap = 0; @@ -909,11 +931,6 @@ static int raptor_flow_alloc(int fd, assert(hash); - if (cube != QOS_CUBE_BE) { - log_dbg("Unsupported QoS requested."); - return -1; - } - if (!shim_data_dir_has(raptor_data.shim_data, hash)) { log_err("Destination unreachable."); return -1; @@ -932,7 +949,7 @@ static int raptor_flow_alloc(int fd, pthread_rwlock_unlock(&raptor_data.flows_lock); - if (raptor_sap_alloc(ssap, hash, cube) < 0) { + if (raptor_sap_alloc(ssap, hash, qs) < 0) { pthread_rwlock_wrlock(&raptor_data.flows_lock); bmp_release(raptor_data.saps, raptor_data.fd_to_ef[fd].sap); raptor_data.fd_to_ef[fd].sap = -1; diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 6a350da0..96820662 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -73,8 +73,15 @@ struct mgmt_msg { uint16_t src_udp_port; uint16_t dst_udp_port; uint8_t code; - uint8_t qoscube; uint8_t response; + /* QoS parameters from spec, aligned */ + uint8_t availability; + uint8_t in_order; + uint32_t delay; + uint64_t bandwidth; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; } __attribute__((packed)); struct uf { @@ -219,7 +226,7 @@ static int send_shim_udp_msg(uint8_t * buf, static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, uint16_t src_udp_port, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { uint8_t * buf; struct mgmt_msg * msg; @@ -235,7 +242,13 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, msg = (struct mgmt_msg *) buf; msg->code = FLOW_REQ; msg->src_udp_port = src_udp_port; - msg->qoscube = cube; + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, dst, ipcp_dir_hash_len()); @@ -272,7 +285,7 @@ static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr, static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; struct timespec abstime; @@ -331,7 +344,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, } /* reply to IRM */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -436,7 +449,7 @@ static void * ipcp_udp_listener(void * o) while (true) { struct mgmt_msg * msg = NULL; - + qosspec_t qs; memset(&buf, 0, SHIM_UDP_MSG_SIZE); n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, (struct sockaddr *) &c_saddr, @@ -455,9 +468,16 @@ static void * ipcp_udp_listener(void * o) switch (msg->code) { case FLOW_REQ: c_saddr.sin_port = msg->src_udp_port; + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); ipcp_udp_port_req(&c_saddr, (uint8_t *) (msg + 1), - msg->qoscube); + qs); break; case FLOW_REPLY: ipcp_udp_port_alloc_reply(msg->src_udp_port, @@ -555,7 +575,8 @@ static void * ipcp_udp_sdu_loop(void * o) pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_cleanup_push((void (*)(void *)) ipcp_sdb_release, + pthread_cleanup_push((void (*)(void *)) + ipcp_sdb_release, (void *) sdb); if (send(fd, shm_du_buff_head(sdb), @@ -968,7 +989,7 @@ static int ipcp_udp_query(const uint8_t * hash) static int ipcp_udp_flow_alloc(int fd, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct sockaddr_in r_saddr; /* server address */ struct sockaddr_in f_saddr; /* flow */ @@ -978,12 +999,9 @@ static int ipcp_udp_flow_alloc(int fd, log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst)); - assert(dst); + (void) qs; - if (cube > QOS_CUBE_DATA) { - log_dbg("Unsupported QoS requested."); - return -1; - } + assert(dst); skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (skfd < 0) @@ -1034,7 +1052,7 @@ static int ipcp_udp_flow_alloc(int fd, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, cube) < 0) { + if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, qs) < 0) { pthread_rwlock_wrlock(&udp_data.flows_lock); udp_data.fd_to_uf[fd].udp = -1; diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index dc8f1c6e..0bdf674b 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -433,11 +433,12 @@ int ipcp_flow_alloc(pid_t pid, pid_t n_pid, const uint8_t * dst, size_t len, - qoscube_t cube) + qosspec_t qs) { - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; + ipcp_msg_t msg = IPCP_MSG__INIT; + qosspec_msg_t qs_msg; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; assert(dst); @@ -449,8 +450,8 @@ int ipcp_flow_alloc(pid_t pid, msg.has_hash = true; msg.hash.len = len; msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = cube; + qs_msg = spec_to_msg(&qs); + msg.qosspec = &qs_msg; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 8ff062b2..28396333 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -67,7 +67,7 @@ int ipcp_flow_alloc(pid_t pid, pid_t n_pid, const uint8_t * dst, size_t len, - qoscube_t qos); + qosspec_t qs); int ipcp_flow_alloc_resp(pid_t pid, int port_id, diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index dfbe5e95..a5a9f28c 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -39,7 +39,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, int port_id, - qoscube_t qc) + qosspec_t qs) { pthread_condattr_t cattr; struct irm_flow * f = malloc(sizeof(*f)); @@ -61,7 +61,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, f->n_pid = n_pid; f->n_1_pid = n_1_pid; f->port_id = port_id; - f->qc = qc; + f->qs = qs; f->n_rb = shm_rbuff_create(n_pid, port_id); if (f->n_rb == NULL) { diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index d53984e8..f4de8187 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -43,11 +43,12 @@ struct irm_flow { struct list_head next; int port_id; - qoscube_t qc; pid_t n_pid; pid_t n_1_pid; + qosspec_t qs; + struct shm_rbuff * n_rb; struct shm_rbuff * n_1_rb; @@ -61,7 +62,7 @@ struct irm_flow { struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, int port_id, - qoscube_t qc); + qosspec_t qs); void irm_flow_destroy(struct irm_flow * f); diff --git a/src/irmd/main.c b/src/irmd/main.c index 634bf4de..9504f3b5 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1264,7 +1264,7 @@ static int flow_accept(pid_t pid, static int flow_alloc(pid_t pid, const char * dst, - qoscube_t cube, + qosspec_t qs, struct timespec * timeo, struct irm_flow ** e) { @@ -1288,7 +1288,7 @@ static int flow_alloc(pid_t pid, return -EBADF; } - f = irm_flow_create(pid, ipcp->pid, port_id, cube); + f = irm_flow_create(pid, ipcp->pid, port_id, qs); if (f == NULL) { bmp_release(irmd.port_ids, port_id); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1310,7 +1310,7 @@ static int flow_alloc(pid_t pid, str_hash(ipcp->dir_hash_algo, hash, dst); if (ipcp_flow_alloc(ipcp->pid, port_id, pid, hash, - IPCP_HASH_LEN(ipcp), cube)) { + IPCP_HASH_LEN(ipcp), qs)) { /* sanitizer cleans this */ log_info("Flow_allocation failed."); free(hash); @@ -1418,7 +1418,7 @@ static pid_t auto_execute(char ** argv) static struct irm_flow * flow_req_arr(pid_t pid, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { struct reg_entry * re = NULL; struct prog_entry * a = NULL; @@ -1521,7 +1521,7 @@ static struct irm_flow * flow_req_arr(pid_t pid, return NULL; } - f = irm_flow_create(h_pid, pid, port_id, cube); + f = irm_flow_create(h_pid, pid, port_id, qs); if (f == NULL) { bmp_release(irmd.port_ids, port_id); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1993,17 +1993,19 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IRM_FLOW_ACCEPT: result = flow_accept(msg->pid, timeo, &e); if (result == 0) { + qosspec_msg_t qs_msg; ret_msg->has_port_id = true; ret_msg->port_id = e->port_id; ret_msg->has_pid = true; ret_msg->pid = e->n_1_pid; - ret_msg->has_qoscube = true; - ret_msg->qoscube = e->qc; + qs_msg = spec_to_msg(&e->qs); + ret_msg->qosspec = &qs_msg; } break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: result = flow_alloc(msg->pid, msg->dst, - msg->qoscube, timeo, &e); + msg_to_spec(msg->qosspec), + timeo, &e); if (result == 0) { ret_msg->has_port_id = true; ret_msg->port_id = e->port_id; @@ -2017,7 +2019,7 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->pid, msg->hash.data, - msg->qoscube); + msg_to_spec(msg->qosspec)); result = (e == NULL ? -1 : 0); if (result == 0) { ret_msg->has_port_id = true; @@ -2061,6 +2063,8 @@ static void * mainloop(void * o) irm_msg__pack(ret_msg, buffer.data); + /* Can't free the qosspec. */ + ret_msg->qosspec = NULL; irm_msg__free_unpacked(ret_msg, NULL); pthread_cleanup_push(close_ptr, &sfd); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index e7e07802..aa4e5bf3 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -6,6 +6,8 @@ include_directories(${CMAKE_BINARY_DIR}/include) protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto) protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto) +protobuf_generate_c(QOSSPEC_PROTO_SRCS QOSSPEC_PROTO_HDRS + qosspec.proto) protobuf_generate_c(LAYER_CONFIG_PROTO_SRCS LAYER_CONFIG_PROTO_HDRS ipcp_config.proto) protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto) @@ -214,7 +216,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in" "${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY) add_library(ouroboros-common SHARED ${SOURCE_FILES_COMMON} ${IRM_PROTO_SRCS} - ${IPCP_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS}) + ${IPCP_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS} ${QOSSPEC_PROTO_SRCS}) add_library(ouroboros-dev SHARED ${SOURCE_FILES_DEV} ${CACEP_PROTO_SRCS}) diff --git a/src/lib/dev.c b/src/lib/dev.c index 3d9e1d49..a92c1e42 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -44,7 +44,6 @@ #include #include #include -#include #include #include @@ -94,7 +93,6 @@ struct flow { struct shm_flow_set * set; int port_id; int oflags; - qoscube_t cube; qosspec_t spec; ssize_t part_idx; @@ -235,7 +233,6 @@ static void flow_clear(int fd) ai.flows[fd].port_id = -1; ai.flows[fd].pid = -1; - ai.flows[fd].cube = QOS_CUBE_BE; } static void flow_fini(int fd) @@ -272,7 +269,7 @@ static void flow_fini(int fd) static int flow_init(int port_id, pid_t pid, - qoscube_t qc) + qosspec_t qs) { int fd; int err = -ENOMEM; @@ -300,9 +297,8 @@ static int flow_init(int port_id, ai.flows[fd].port_id = port_id; ai.flows[fd].oflags = FLOWFDEFAULT; ai.flows[fd].pid = pid; - ai.flows[fd].cube = qc; - ai.flows[fd].spec = qos_cube_to_spec(qc); ai.flows[fd].part_idx = NO_PART; + ai.flows[fd].spec = qs; ai.ports[port_id].fd = fd; @@ -499,7 +495,6 @@ int flow_accept(qosspec_t * qs, irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg; int fd; - qoscube_t qc; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; @@ -528,14 +523,13 @@ int flow_accept(qosspec_t * qs, } if (!recv_msg->has_pid || !recv_msg->has_port_id || - !recv_msg->has_qoscube) { + recv_msg->qosspec == NULL) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - qc = recv_msg->qoscube; - - fd = flow_init(recv_msg->port_id, recv_msg->pid, recv_msg->qoscube); + fd = flow_init(recv_msg->port_id, recv_msg->pid, + msg_to_spec(recv_msg->qosspec)); irm_msg__free_unpacked(recv_msg, NULL); @@ -544,12 +538,10 @@ int flow_accept(qosspec_t * qs, pthread_rwlock_wrlock(&ai.lock); - /* FIXME: check if FRCT is needed based on qc? */ - assert(ai.flows[fd].frcti == NULL); - if (qc != QOS_CUBE_RAW) { - ai.flows[fd].frcti = frcti_create(fd, qc); + if (ai.flows[fd].spec.in_order != 0) { + ai.flows[fd].frcti = frcti_create(fd); if (ai.flows[fd].frcti == NULL) { flow_fini(fd); pthread_rwlock_unlock(&ai.lock); @@ -569,21 +561,17 @@ int flow_alloc(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - qoscube_t qc = QOS_CUBE_RAW; - int fd; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.has_qoscube = true; - msg.pid = ai.pid; - - if (qs != NULL) - qc = qos_spec_to_cube(*qs); + irm_msg_t msg = IRM_MSG__INIT; + qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT; + irm_msg_t * recv_msg; + int fd; - msg.qoscube = qc; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; + msg.dst = (char *) dst; + msg.has_pid = true; + msg.pid = ai.pid; + qs_msg = spec_to_msg(qs); + msg.qosspec = &qs_msg; if (timeo != NULL) { msg.has_timeo_sec = true; @@ -612,7 +600,8 @@ int flow_alloc(const char * dst, return -EIRMD; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qc); + fd = flow_init(recv_msg->port_id, recv_msg->pid, + qs == NULL ? qos_raw : *qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -623,8 +612,8 @@ int flow_alloc(const char * dst, assert(ai.flows[fd].frcti == NULL); - if (qc != QOS_CUBE_RAW) { - ai.flows[fd].frcti = frcti_create(fd, qc); + if (ai.flows[fd].spec.in_order != 0) { + ai.flows[fd].frcti = frcti_create(fd); if (ai.flows[fd].frcti == NULL) { flow_fini(fd); pthread_rwlock_unlock(&ai.lock); @@ -1178,9 +1167,9 @@ int fevent(struct flow_set * set, int np1_flow_alloc(pid_t n_pid, int port_id, - qoscube_t qc) + qosspec_t qs) { - return flow_init(port_id, n_pid, qc); + return flow_init(port_id, n_pid, qs); } int np1_flow_dealloc(int port_id) @@ -1243,25 +1232,25 @@ int ipcp_create_r(pid_t pid, int ipcp_flow_req_arr(pid_t pid, const uint8_t * dst, size_t len, - qoscube_t qc) + qosspec_t qs) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg; + qosspec_msg_t qs_msg; + int fd; assert(dst != NULL); - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_pid = true; - msg.pid = pid; - msg.has_hash = true; - msg.hash.len = len; - msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = qc; + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_pid = true; + msg.pid = pid; + msg.has_hash = true; + msg.hash.len = len; + msg.hash.data = (uint8_t *) dst; + qs_msg = spec_to_msg(&qs); + msg.qosspec = &qs_msg; recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) return -EIRMD; @@ -1275,7 +1264,7 @@ int ipcp_flow_req_arr(pid_t pid, return -1; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qc); + fd = flow_init(recv_msg->port_id, recv_msg->pid, qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -1457,7 +1446,7 @@ int ipcp_flow_get_qoscube(int fd, assert(ai.flows[fd].port_id >= 0); - *cube = ai.flows[fd].cube; + *cube = qos_spec_to_cube(ai.flows[fd].spec); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/frct.c b/src/lib/frct.c index 516c958b..e9acb1dc 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -101,8 +101,7 @@ static void frct_fini(void) timerwheel_destroy(frct.tw); } -static struct frcti * frcti_create(int fd, - qoscube_t qc) +static struct frcti * frcti_create(int fd) { struct frcti * frcti; time_t delta_t; @@ -133,7 +132,7 @@ static struct frcti * frcti_create(int fd, frcti->snd_cr.inact = 3 * delta_t; frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - if (qc == QOS_CUBE_DATA) + if (ai.flows[fd].spec.loss == 0) frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.inact = 2 * delta_t; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index 454af0dc..48198a5b 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -23,6 +23,7 @@ syntax = "proto2"; import "ipcp_config.proto"; +import "qosspec.proto"; enum ipcp_msg_code { IPCP_BOOTSTRAP = 1; @@ -43,7 +44,7 @@ message ipcp_msg { optional bytes hash = 2; optional int32 port_id = 3; optional string dst = 4; - optional uint32 qoscube = 5; + optional qosspec_msg qosspec = 5; optional ipcp_config_msg conf = 6; optional int32 pid = 7; optional layer_info_msg layer_info = 8; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 16dfe828..2ed2ec37 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -23,6 +23,7 @@ syntax = "proto2"; import "ipcp_config.proto"; +import "qosspec.proto"; enum irm_msg_code { IRM_CREATE_IPCP = 1; @@ -67,7 +68,7 @@ message irm_msg { optional string dst = 9; optional bytes hash = 10; optional sint32 port_id = 11; - optional sint32 qoscube = 12; + optional qosspec_msg qosspec = 12; optional ipcp_config_msg conf = 13; optional uint32 opts = 14; repeated ipcp_info_msg ipcps = 15; diff --git a/src/lib/qos.c b/src/lib/qos.c index bee6ed71..8607031e 100644 --- a/src/lib/qos.c +++ b/src/lib/qos.c @@ -28,66 +28,61 @@ #include qosspec_t qos_raw = { - .delay = UINT32_MAX, - .bandwidth = 0, - .availability = 0, - .loss = 1, - .in_order = 0, - .maximum_interruption = UINT32_MAX + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 1, + .in_order = 0, + .max_gap = UINT32_MAX +}; + +qosspec_t qos_raw_no_errors = { + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 0, + .in_order = 0, + .max_gap = UINT32_MAX }; qosspec_t qos_best_effort = { - .delay = UINT32_MAX, - .bandwidth = 0, - .availability = 0, - .loss = 1, - .in_order = 1, - .maximum_interruption = UINT32_MAX + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = UINT32_MAX }; -qosspec_t qos_video = { - .delay = 100, - .bandwidth = UINT64_MAX, - .availability = 3, - .loss = 1, - .in_order = 1, - .maximum_interruption = 100 +qosspec_t qos_video = { + .delay = 100, + .bandwidth = UINT64_MAX, + .availability = 3, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = 100 }; qosspec_t qos_voice = { - .delay = 50, - .bandwidth = 100000, - .availability = 5, - .loss = 1, - .in_order = 1, - .maximum_interruption = 50 + .delay = 50, + .bandwidth = 100000, + .availability = 5, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = 50 }; qosspec_t qos_data = { - .delay = 1000, - .bandwidth = 0, - .availability = 0, - .in_order = 1, - .loss = 0, - .maximum_interruption = 2000 + .delay = 1000, + .bandwidth = 0, + .availability = 0, + .loss = 0, + .ber = 0, + .in_order = 1, + .max_gap = 2000 }; - -int qosspec_init(qosspec_t * qs) -{ - if (qs == NULL) - return -EINVAL; - - *qs = qos_best_effort; - - return 0; -} - -int qosspec_fini(qosspec_t * qs) -{ - if (qs == NULL) - return -EINVAL; - - memset(qs, 0, sizeof(*qs)); - - return 0; -} diff --git a/src/lib/qoscube.c b/src/lib/qoscube.c index 5dfa35ad..efca0e42 100644 --- a/src/lib/qoscube.c +++ b/src/lib/qoscube.c @@ -25,38 +25,20 @@ #include + + qoscube_t qos_spec_to_cube(qosspec_t qs) { - if (qs.loss == 0) - return QOS_CUBE_DATA; - else if (qs.delay <= qos_voice.delay && + if (qs.delay <= qos_voice.delay && qs.bandwidth <= qos_voice.bandwidth && qs.availability >= qos_voice.availability && - qs.maximum_interruption <= qos_voice.maximum_interruption) + qs.max_gap <= qos_voice.max_gap) return QOS_CUBE_VOICE; else if (qs.delay <= qos_video.delay && qs.bandwidth <= qos_video.bandwidth && qs.availability >= qos_video.availability && - qs.maximum_interruption <= qos_video.maximum_interruption) + qs.max_gap <= qos_video.max_gap) return QOS_CUBE_VIDEO; - else if (qs.in_order == 1) - return QOS_CUBE_BE; else - return QOS_CUBE_RAW; -} - -qosspec_t qos_cube_to_spec(qoscube_t qc) -{ - switch (qc) { - case QOS_CUBE_VOICE: - return qos_voice; - case QOS_CUBE_VIDEO: - return qos_video; - case QOS_CUBE_BE: - return qos_best_effort; - case QOS_CUBE_DATA: - return qos_data; - default: - return qos_raw; - } + return QOS_CUBE_BE; } diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto new file mode 100644 index 00000000..f355e345 --- /dev/null +++ b/src/lib/qosspec.proto @@ -0,0 +1,33 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * QoS specification message + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +syntax = "proto2"; + +message qosspec_msg { + required uint32 delay = 1; /* In ms */ + required uint64 bandwidth = 2; /* In bits/s */ + required uint32 availability = 3; /* Class of 9s */ + required uint32 loss = 4; /* Packet loss */ + required uint32 ber = 5; /* Bit error rate, ppb */ + required uint32 in_order = 6; /* In-order delivery */ + required uint32 max_gap = 7; /* In ms */ +}; diff --git a/src/lib/sockets.c b/src/lib/sockets.c index b148b7ca..85726783 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -165,3 +165,38 @@ char * ipcp_sock_path(pid_t pid) return full_name; } + +qosspec_msg_t spec_to_msg(qosspec_t * qs) +{ + qosspec_t spec; + qosspec_msg_t msg = QOSSPEC_MSG__INIT; + + spec = (qs == NULL ? qos_raw : *qs); + + msg.delay = spec.delay; + msg.bandwidth = spec.bandwidth; + msg.availability = spec.availability; + msg.loss = spec.loss; + msg.ber = spec.ber; + msg.in_order = spec.in_order; + msg.max_gap = spec.max_gap; + + return msg; +} + +qosspec_t msg_to_spec(qosspec_msg_t * msg) +{ + qosspec_t spec; + + assert(msg); + + spec.delay = msg->delay; + spec.bandwidth = msg->bandwidth; + spec.availability = msg->availability; + spec.loss = msg->loss; + spec.ber = msg->ber; + spec.in_order = msg->in_order; + spec.max_gap = msg->max_gap; + + return spec; +} -- cgit v1.2.3 From 5d11a6ad590133c92925c6162eb47b4401f16bef Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Fri, 5 Oct 2018 10:24:01 +0200 Subject: ipcpd, lib, irmd, tools: Change SDU to packet This will change SDU (Service Data Unit) to packet everywhere. SDU is OSI terminology, whereas packet is Ouroboros terminology. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- doc/man/ouroboros-tutorial.7 | 2 +- include/ouroboros/shm_rbuff.h | 2 +- include/ouroboros/timerwheel.h | 2 +- src/ipcpd/eth/eth.c | 46 ++++---- src/ipcpd/local/main.c | 12 +- src/ipcpd/normal/CMakeLists.txt | 2 +- src/ipcpd/normal/dht.c | 48 ++++---- src/ipcpd/normal/dt.c | 52 ++++----- src/ipcpd/normal/dt.h | 8 +- src/ipcpd/normal/fa.c | 36 +++--- src/ipcpd/normal/packet_sched.c | 241 ++++++++++++++++++++++++++++++++++++++++ src/ipcpd/normal/packet_sched.h | 43 +++++++ src/ipcpd/normal/sdu_sched.c | 241 ---------------------------------------- src/ipcpd/normal/sdu_sched.h | 43 ------- src/ipcpd/udp/main.c | 64 +++++------ src/irmd/main.c | 2 +- src/lib/CMakeLists.txt | 6 +- src/lib/dev.c | 14 +-- src/lib/frct.c | 2 +- src/lib/shm_rbuff.c | 6 +- src/lib/shm_rbuff_ll.c | 2 +- src/lib/shm_rbuff_pthr.c | 2 +- src/lib/shm_rdrbuff.c | 2 +- src/tools/ocbr/ocbr.c | 10 +- src/tools/ocbr/ocbr_client.c | 2 +- src/tools/ocbr/ocbr_server.c | 17 +-- src/tools/oecho/oecho.c | 8 +- src/tools/operf/operf.c | 4 +- src/tools/operf/operf_client.c | 9 +- src/tools/oping/oping_client.c | 4 +- 30 files changed, 468 insertions(+), 464 deletions(-) create mode 100644 src/ipcpd/normal/packet_sched.c create mode 100644 src/ipcpd/normal/packet_sched.h delete mode 100644 src/ipcpd/normal/sdu_sched.c delete mode 100644 src/ipcpd/normal/sdu_sched.h (limited to 'src/irmd') diff --git a/doc/man/ouroboros-tutorial.7 b/doc/man/ouroboros-tutorial.7 index 98e27254..76fa0068 100644 --- a/doc/man/ouroboros-tutorial.7 +++ b/doc/man/ouroboros-tutorial.7 @@ -108,7 +108,7 @@ Pinging my.oping.server with 64 bytes of data: --- my.oping.server ping statistics --- .br -3 SDUs transmitted, 3 received, 0% packet loss, time: 3001.011 ms +3 packets transmitted, 3 received, 0% packet loss, time: 3001.011 ms .br rtt min/avg/max/mdev = 0.304/0.392/0.475/0.086 ms .RE diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index b2e27c7b..223f6bf4 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h index 231e8103..34994185 100644 --- a/include/ouroboros/timerwheel.h +++ b/include/ouroboros/timerwheel.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Timerwheel * * Dimitri Staessens * Sander Vrijders diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 6fd7b805..1bbfac5b 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -123,7 +123,7 @@ #define DIX_HEADER_SIZE (DIX_EID_SIZE + DIX_LENGTH_SIZE) #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + DIX_HEADER_SIZE) #define MAX_EIDS (1 << (8 * DIX_EID_SIZE)) -#define ETH_MAX_SDU_SIZE (ETH_MTU - DIX_HEADER_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - DIX_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #elif defined(BUILD_ETH_LLC) #define THIS_TYPE IPCP_ETH_LLC @@ -131,7 +131,7 @@ #define LLC_HEADER_SIZE 3 #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE) #define MAX_SAPS 64 -#define ETH_MAX_SDU_SIZE (ETH_MTU - LLC_HEADER_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - LLC_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #endif @@ -230,8 +230,8 @@ struct { fset_t * np1_flows; pthread_rwlock_t flows_lock; - pthread_t sdu_writer[IPCP_ETH_WR_THR]; - pthread_t sdu_reader[IPCP_ETH_RD_THR]; + pthread_t packet_writer[IPCP_ETH_WR_THR]; + pthread_t packet_reader[IPCP_ETH_RD_THR]; #ifdef __linux__ pthread_t if_monitor; @@ -383,7 +383,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, assert(frame); - if (len > (size_t) ETH_MAX_SDU_SIZE) + if (len > (size_t) ETH_MAX_PACKET_SIZE) return -1; e_frame = (struct eth_frame *) frame; @@ -808,7 +808,7 @@ static void * eth_ipcp_mgmt_handler(void * o) return (void *) 0; } -static void * eth_ipcp_sdu_reader(void * o) +static void * eth_ipcp_packet_reader(void * o) { uint8_t br_addr[MAC_SIZE]; #if defined(BUILD_ETH_DIX) @@ -992,7 +992,7 @@ static void cleanup_writer(void * o) fqueue_destroy((fqueue_t *) o); } -static void * eth_ipcp_sdu_writer(void * o) +static void * eth_ipcp_packet_writer(void * o) { int fd; struct shm_du_buff * sdb; @@ -1443,22 +1443,22 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) } for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { - if (pthread_create(ð_data.sdu_reader[idx], + if (pthread_create(ð_data.packet_reader[idx], NULL, - eth_ipcp_sdu_reader, + eth_ipcp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } } for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { - if (pthread_create(ð_data.sdu_writer[idx], + if (pthread_create(ð_data.packet_writer[idx], NULL, - eth_ipcp_sdu_writer, + eth_ipcp_packet_writer, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_writer; + goto fail_packet_writer; } } @@ -1472,16 +1472,16 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sdu_writer: + fail_packet_writer: while (idx > 0) { - pthread_cancel(eth_data.sdu_writer[--idx]); - pthread_join(eth_data.sdu_writer[idx], NULL); + pthread_cancel(eth_data.packet_writer[--idx]); + pthread_join(eth_data.packet_writer[idx], NULL); } idx = IPCP_ETH_RD_THR; - fail_sdu_reader: + fail_packet_reader: while (idx > 0) { - pthread_cancel(eth_data.sdu_reader[--idx]); - pthread_join(eth_data.sdu_reader[idx], NULL); + pthread_cancel(eth_data.packet_reader[--idx]); + pthread_join(eth_data.packet_reader[idx], NULL); } pthread_cancel(eth_data.mgmt_handler); pthread_join(eth_data.mgmt_handler, NULL); @@ -1792,18 +1792,18 @@ int main(int argc, if (ipcp_get_state() == IPCP_SHUTDOWN) { for (i = 0; i < IPCP_ETH_WR_THR; ++i) - pthread_cancel(eth_data.sdu_writer[i]); + pthread_cancel(eth_data.packet_writer[i]); for (i = 0; i < IPCP_ETH_RD_THR; ++i) - pthread_cancel(eth_data.sdu_reader[i]); + pthread_cancel(eth_data.packet_reader[i]); pthread_cancel(eth_data.mgmt_handler); #ifdef __linux__ pthread_cancel(eth_data.if_monitor); #endif for (i = 0; i < IPCP_ETH_WR_THR; ++i) - pthread_join(eth_data.sdu_writer[i], NULL); + pthread_join(eth_data.packet_writer[i], NULL); for (i = 0; i < IPCP_ETH_RD_THR; ++i) - pthread_join(eth_data.sdu_reader[i], NULL); + pthread_join(eth_data.packet_reader[i], NULL); pthread_join(eth_data.mgmt_handler, NULL); #ifdef __linux__ diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 8eae7503..ab43f1f8 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -59,7 +59,7 @@ struct { fqueue_t * fq; pthread_rwlock_t lock; - pthread_t sduloop; + pthread_t packet_loop; } local_data; static int local_data_init(void) @@ -97,7 +97,7 @@ static void local_data_fini(void){ pthread_rwlock_destroy(&local_data.lock); } -static void * ipcp_local_sdu_loop(void * o) +static void * ipcp_local_packet_loop(void * o) { (void) o; @@ -139,8 +139,8 @@ static int ipcp_local_bootstrap(const struct ipcp_config * conf) ipcp_set_state(IPCP_OPERATIONAL); - if (pthread_create(&local_data.sduloop, NULL, - ipcp_local_sdu_loop, NULL)) { + if (pthread_create(&local_data.packet_loop, NULL, + ipcp_local_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); return -1; } @@ -364,8 +364,8 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(local_data.sduloop); - pthread_join(local_data.sduloop, NULL); + pthread_cancel(local_data.packet_loop); + pthread_join(local_data.packet_loop, NULL); } local_data_fini(); diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 1cba7630..0cb7b770 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -42,7 +42,7 @@ set(SOURCE_FILES main.c pff.c routing.c - sdu_sched.c + packet_sched.c # Add policies last pol/alternate_pff.c pol/flat.c diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index a2fa4863..4064bf5c 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -62,21 +62,21 @@ typedef KadContactMsg kad_contact_msg_t; #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ -#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ -#define KAD_K 8 /* Replication factor, MDHT value. */ -#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ -#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 8 /* Response time to wait for a join. */ -#define KAD_T_RESP 5 /* Response time to wait for a response. */ -#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ -#define KAD_QUEER 15 /* Time to declare peer questionable. */ -#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ -#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ -#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ -#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ -#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 8 /* Response time to wait for a join. */ +#define KAD_T_RESP 5 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ +#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ +#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ +#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ enum dht_state { DHT_INIT = 0, @@ -251,7 +251,7 @@ struct join_info { uint64_t addr; }; -struct sdu_info { +struct packet_info { struct dht * dht; struct shm_du_buff * sdb; }; @@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht, kad_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) + if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) break; ipcp_sdb_release(sdb); @@ -2400,7 +2400,7 @@ uint64_t dht_query(struct dht * dht, return 0; } -static void * dht_handle_sdu(void * o) +static void * dht_handle_packet(void * o) { struct dht * dht = (struct dht *) o; @@ -2584,8 +2584,8 @@ static void * dht_handle_sdu(void * o) return (void *) 0; } -static void dht_post_sdu(void * comp, - struct shm_du_buff * sdb) +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) { struct cmd * cmd; struct dht * dht = (struct dht *) comp; @@ -2800,19 +2800,19 @@ struct dht * dht_create(uint64_t addr) dht->addr = addr; dht->id = NULL; #ifndef __DHT_TEST__ - dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); + dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); if (dht->tpm == NULL) goto fail_tpm_create; if (tpm_start(dht->tpm)) goto fail_tpm_start; - dht->fd = dt_reg_comp(dht, &dht_post_sdu, DHT); + dht->fd = dt_reg_comp(dht, &dht_post_packet, DHT); notifier_reg(handle_event, dht); #else (void) handle_event; - (void) dht_handle_sdu; - (void) dht_post_sdu; + (void) dht_handle_packet; + (void) dht_post_packet; #endif dht->state = DHT_INIT; diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index a350e4be..08c937e7 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -46,7 +46,7 @@ #include "dt.h" #include "pff.h" #include "routing.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "comp.h" #include "fa.h" @@ -65,7 +65,7 @@ #endif struct comp_info { - void (* post_sdu)(void * comp, struct shm_du_buff * sdb); + void (* post_packet)(void * comp, struct shm_du_buff * sdb); void * comp; char * name; }; @@ -154,7 +154,7 @@ static void dt_pci_shrink(struct shm_du_buff * sdb) } struct { - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; @@ -421,24 +421,25 @@ static void handle_event(void * self, #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, c->conn_info.addr); #endif - sdu_sched_add(dt.sdu_sched, c->flow_info.fd); - log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd); + packet_sched_add(dt.packet_sched, c->flow_info.fd); + log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); break; case NOTIFY_DT_CONN_DEL: #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, INVALID_ADDR); #endif - sdu_sched_del(dt.sdu_sched, c->flow_info.fd); - log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd); + packet_sched_del(dt.packet_sched, c->flow_info.fd); + log_dbg("Removed fd %d from " + "packet scheduler.", c->flow_info.fd); break; default: break; } } -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { struct dt_pci dt_pci; int ret; @@ -491,7 +492,7 @@ static void sdu_handler(int fd, ret = ipcp_flow_write(ofd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", ofd); + log_dbg("Failed to write packet to fd %d.", ofd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); ipcp_sdb_release(sdb); @@ -560,7 +561,7 @@ static void sdu_handler(int fd, return; } - if (dt.comps[dt_pci.eid].post_sdu == NULL) { + if (dt.comps[dt_pci.eid].post_packet == NULL) { log_err("No registered component on eid %d.", dt_pci.eid); ipcp_sdb_release(sdb); @@ -596,7 +597,8 @@ static void sdu_handler(int fd, pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); #endif - dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb); + dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, + sdb); } } @@ -761,15 +763,15 @@ void dt_fini(void) int dt_start(void) { - dt.sdu_sched = sdu_sched_create(sdu_handler); - if (dt.sdu_sched == NULL) { - log_err("Failed to create N-1 SDU scheduler."); + dt.packet_sched = packet_sched_create(packet_handler); + if (dt.packet_sched == NULL) { + log_err("Failed to create N-1 packet scheduler."); return -1; } if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { log_err("Failed to create listener thread."); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); return -1; } @@ -780,7 +782,7 @@ void dt_stop(void) { pthread_cancel(dt.listener); pthread_join(dt.listener, NULL); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); } int dt_reg_comp(void * comp, @@ -800,11 +802,11 @@ int dt_reg_comp(void * comp, return -EBADF; } - assert(dt.comps[res_fd].post_sdu == NULL); + assert(dt.comps[res_fd].post_packet == NULL); assert(dt.comps[res_fd].comp == NULL); assert(dt.comps[res_fd].name == NULL); - dt.comps[res_fd].post_sdu = func; + dt.comps[res_fd].post_packet = func; dt.comps[res_fd].comp = comp; dt.comps[res_fd].name = name; @@ -815,10 +817,10 @@ int dt_reg_comp(void * comp, return res_fd; } -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int np1_fd, - struct shm_du_buff * sdb) +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int np1_fd, + struct shm_du_buff * sdb) { int fd; struct dt_pci dt_pci; @@ -863,7 +865,7 @@ int dt_write_sdu(uint64_t dst_addr, #endif ret = ipcp_flow_write(fd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", fd); + log_dbg("Failed to write packet to fd %d.", fd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); goto fail_write; diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index a17098b7..05b8220c 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -47,9 +47,9 @@ int dt_reg_comp(void * comp, void (* func)(void * comp, struct shm_du_buff * sdb), char * name); -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int res_fd, - struct shm_du_buff * sdb); +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int res_fd, + struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 4c82e0e0..d67ba61e 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -39,7 +39,7 @@ #include "dir.h" #include "fa.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "ipcp.h" #include "dt.h" @@ -74,19 +74,19 @@ struct { uint64_t r_addr[PROG_MAX_FLOWS]; int fd; - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; } fa; -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { pthread_rwlock_rdlock(&fa.flows_lock); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); - log_warn("Failed to forward SDU."); + log_warn("Failed to forward packet."); return; } @@ -99,7 +99,7 @@ static void destroy_conn(int fd) fa.r_addr[fd] = INVALID_ADDR; } -static void fa_post_sdu(void * comp, +static void fa_post_packet(void * comp, struct shm_du_buff * sdb) { struct timespec ts = {0, TIMEOUT * 1000}; @@ -192,7 +192,7 @@ static void fa_post_sdu(void * comp, if (msg->response < 0) destroy_conn(ntoh32(msg->r_eid)); else - sdu_sched_add(fa.sdu_sched, ntoh32(msg->r_eid)); + packet_sched_add(fa.packet_sched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -215,7 +215,7 @@ int fa_init(void) if (pthread_rwlock_init(&fa.flows_lock, NULL)) return -1; - fa.fd = dt_reg_comp(&fa, &fa_post_sdu, FA); + fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); return 0; } @@ -227,9 +227,9 @@ void fa_fini(void) int fa_start(void) { - fa.sdu_sched = sdu_sched_create(sdu_handler); - if (fa.sdu_sched == NULL) { - log_err("Failed to create SDU scheduler."); + fa.packet_sched = packet_sched_create(packet_handler); + if (fa.packet_sched == NULL) { + log_err("Failed to create packet scheduler."); return -1; } @@ -238,7 +238,7 @@ int fa_start(void) void fa_stop(void) { - sdu_sched_destroy(fa.sdu_sched); + packet_sched_destroy(fa.packet_sched); } int fa_alloc(int fd, @@ -273,7 +273,7 @@ int fa_alloc(int fd, qc = qos_spec_to_cube(qs); - if (dt_write_sdu(addr, qc, fa.fd, sdb)) { + if (dt_write_packet(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; } @@ -335,14 +335,14 @@ int fa_alloc_resp(int fd, destroy_conn(fd); ipcp_sdb_release(sdb); } else { - sdu_sched_add(fa.sdu_sched, fd); + packet_sched_add(fa.packet_sched, fd); } ipcp_flow_get_qoscube(fd, &qc); assert(qc >= 0 && qc < QOS_CUBE_MAX); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.fd, sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) { destroy_conn(fd); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); @@ -360,7 +360,7 @@ int fa_dealloc(int fd) pthread_rwlock_wrlock(&fa.flows_lock); - sdu_sched_del(fa.sdu_sched, fd); + packet_sched_del(fa.packet_sched, fd); destroy_conn(fd); diff --git a/src/ipcpd/normal/packet_sched.c b/src/ipcpd/normal/packet_sched.c new file mode 100644 index 00000000..fc01fb32 --- /dev/null +++ b/src/ipcpd/normal/packet_sched.c @@ -0,0 +1,241 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include +#include + +#include "ipcp.h" +#include "packet_sched.h" +#include "connmgr.h" + +#include +#include +#include +#include +#include + +static int qos_prio [] = { + QOS_PRIO_RAW, + QOS_PRIO_BE, + QOS_PRIO_VIDEO, + QOS_PRIO_VOICE, + QOS_PRIO_DATA +}; + +struct packet_sched { + fset_t * set[QOS_CUBE_MAX]; + next_packet_fn_t callback; + pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +}; + +struct sched_info { + struct packet_sched * sch; + qoscube_t qc; +}; + +static void cleanup_reader(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + +static void * packet_reader(void * o) +{ + struct packet_sched * sched; + struct shm_du_buff * sdb; + int fd; + fqueue_t * fq; + qoscube_t qc; + + sched = ((struct sched_info *) o)->sch; + qc = ((struct sched_info *) o)->qc; + + ipcp_lock_to_core(); + + free(o); + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + pthread_cleanup_push(cleanup_reader, fq); + + while (true) { + int ret = fevent(sched->set[qc], fq, NULL); + if (ret < 0) + continue; + + while ((fd = fqueue_next(fq)) >= 0) { + switch (fqueue_type(fq)) { + case FLOW_DEALLOC: + notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); + break; + case FLOW_DOWN: + notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); + break; + case FLOW_UP: + notifier_event(NOTIFY_DT_FLOW_UP, &fd); + break; + case FLOW_PKT: + if (ipcp_flow_read(fd, &sdb)) + continue; + + sched->callback(fd, qc, sdb); + break; + default: + break; + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +struct packet_sched * packet_sched_create(next_packet_fn_t callback) +{ + struct packet_sched * packet_sched; + struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; + int i; + int j; + + assert(callback); + + packet_sched = malloc(sizeof(*packet_sched)); + if (packet_sched == NULL) + goto fail_malloc; + + packet_sched->callback = callback; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + packet_sched->set[i] = fset_create(); + if (packet_sched->set[i] == NULL) { + for (j = 0; j < i; ++j) + fset_destroy(packet_sched->set[j]); + goto fail_flow_set; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + infos[i] = malloc(sizeof(*infos[i])); + if (infos[i] == NULL) { + for (j = 0; j < i; ++j) + free(infos[j]); + goto fail_infos; + } + infos[i]->sch = packet_sched; + infos[i]->qc = i % QOS_CUBE_MAX; + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + if (pthread_create(&packet_sched->readers[i], NULL, + packet_reader, infos[i])) { + for (j = 0; j < i; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < i; ++j) + pthread_join(packet_sched->readers[j], NULL); + for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + free(infos[i]); + goto fail_infos; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + struct sched_param par; + int pol = SCHED_RR; + int min; + int max; + + min = sched_get_priority_min(pol); + max = sched_get_priority_max(pol); + + min = (max - min) / 2; + + par.sched_priority = min + + (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); + + if (pthread_setschedparam(packet_sched->readers[i], pol, &par)) + goto fail_sched; + } + + return packet_sched; + + fail_sched: + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_join(packet_sched->readers[j], NULL); + fail_infos: + for (j = 0; j < QOS_CUBE_MAX; ++j) + fset_destroy(packet_sched->set[j]); + fail_flow_set: + free(packet_sched); + fail_malloc: + return NULL; +} + +void packet_sched_destroy(struct packet_sched * packet_sched) +{ + int i; + + assert(packet_sched); + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + pthread_cancel(packet_sched->readers[i]); + pthread_join(packet_sched->readers[i], NULL); + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fset_destroy(packet_sched->set[i]); + + free(packet_sched); +} + +void packet_sched_add(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_add(packet_sched->set[qc], fd); +} + +void packet_sched_del(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_del(packet_sched->set[qc], fd); +} diff --git a/src/ipcpd/normal/packet_sched.h b/src/ipcpd/normal/packet_sched.h new file mode 100644 index 00000000..13ff400d --- /dev/null +++ b/src/ipcpd/normal/packet_sched.h @@ -0,0 +1,43 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H +#define OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H + +#include +#include + +typedef void (* next_packet_fn_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); + +struct packet_sched * packet_sched_create(next_packet_fn_t callback); + +void packet_sched_destroy(struct packet_sched * packet_sched); + +void packet_sched_add(struct packet_sched * packet_sched, + int fd); + +void packet_sched_del(struct packet_sched * packet_sched, + int fd); + +#endif /* OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H */ diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c deleted file mode 100644 index e6d705fb..00000000 --- a/src/ipcpd/normal/sdu_sched.c +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L -#endif - -#include "config.h" - -#include -#include - -#include "ipcp.h" -#include "sdu_sched.h" -#include "connmgr.h" - -#include -#include -#include -#include -#include - -static int qos_prio [] = { - QOS_PRIO_RAW, - QOS_PRIO_BE, - QOS_PRIO_VIDEO, - QOS_PRIO_VOICE, - QOS_PRIO_DATA -}; - -struct sdu_sched { - fset_t * set[QOS_CUBE_MAX]; - next_sdu_fn_t callback; - pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; -}; - -struct sched_info { - struct sdu_sched * sch; - qoscube_t qc; -}; - -static void cleanup_reader(void * o) -{ - fqueue_destroy((fqueue_t *) o); -} - -static void * sdu_reader(void * o) -{ - struct sdu_sched * sched; - struct shm_du_buff * sdb; - int fd; - fqueue_t * fq; - qoscube_t qc; - - sched = ((struct sched_info *) o)->sch; - qc = ((struct sched_info *) o)->qc; - - ipcp_lock_to_core(); - - free(o); - - fq = fqueue_create(); - if (fq == NULL) - return (void *) -1; - - pthread_cleanup_push(cleanup_reader, fq); - - while (true) { - int ret = fevent(sched->set[qc], fq, NULL); - if (ret < 0) - continue; - - while ((fd = fqueue_next(fq)) >= 0) { - switch (fqueue_type(fq)) { - case FLOW_DEALLOC: - notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); - break; - case FLOW_DOWN: - notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); - break; - case FLOW_UP: - notifier_event(NOTIFY_DT_FLOW_UP, &fd); - break; - case FLOW_PKT: - if (ipcp_flow_read(fd, &sdb)) - continue; - - sched->callback(fd, qc, sdb); - break; - default: - break; - } - } - } - - pthread_cleanup_pop(true); - - return (void *) 0; -} - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback) -{ - struct sdu_sched * sdu_sched; - struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; - int i; - int j; - - assert(callback); - - sdu_sched = malloc(sizeof(*sdu_sched)); - if (sdu_sched == NULL) - goto fail_malloc; - - sdu_sched->callback = callback; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - sdu_sched->set[i] = fset_create(); - if (sdu_sched->set[i] == NULL) { - for (j = 0; j < i; ++j) - fset_destroy(sdu_sched->set[j]); - goto fail_flow_set; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - infos[i] = malloc(sizeof(*infos[i])); - if (infos[i] == NULL) { - for (j = 0; j < i; ++j) - free(infos[j]); - goto fail_infos; - } - infos[i]->sch = sdu_sched; - infos[i]->qc = i % QOS_CUBE_MAX; - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - if (pthread_create(&sdu_sched->readers[i], NULL, - sdu_reader, infos[i])) { - for (j = 0; j < i; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < i; ++j) - pthread_join(sdu_sched->readers[j], NULL); - for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - free(infos[i]); - goto fail_infos; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - struct sched_param par; - int pol = SCHED_RR; - int min; - int max; - - min = sched_get_priority_min(pol); - max = sched_get_priority_max(pol); - - min = (max - min) / 2; - - par.sched_priority = min + - (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); - - if (pthread_setschedparam(sdu_sched->readers[i], pol, &par)) - goto fail_sched; - } - - return sdu_sched; - - fail_sched: - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_join(sdu_sched->readers[j], NULL); - fail_infos: - for (j = 0; j < QOS_CUBE_MAX; ++j) - fset_destroy(sdu_sched->set[j]); - fail_flow_set: - free(sdu_sched); - fail_malloc: - return NULL; -} - -void sdu_sched_destroy(struct sdu_sched * sdu_sched) -{ - int i; - - assert(sdu_sched); - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - pthread_cancel(sdu_sched->readers[i]); - pthread_join(sdu_sched->readers[i], NULL); - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) - fset_destroy(sdu_sched->set[i]); - - free(sdu_sched); -} - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_add(sdu_sched->set[qc], fd); -} - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_del(sdu_sched->set[qc], fd); -} diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h deleted file mode 100644 index cdbda272..00000000 --- a/src/ipcpd/normal/sdu_sched.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H -#define OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H - -#include -#include - -typedef void (* next_sdu_fn_t)(int fd, - qoscube_t qc, - struct shm_du_buff * sdb); - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback); - -void sdu_sched_destroy(struct sdu_sched * sdu_sched); - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd); - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd); - -#endif /* OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H */ diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 96820662..a1af1e85 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -54,20 +54,20 @@ #include #include -#define FLOW_REQ 1 -#define FLOW_REPLY 2 +#define FLOW_REQ 1 +#define FLOW_REPLY 2 -#define THIS_TYPE IPCP_UDP -#define LISTEN_PORT htons(0x0D1F) -#define SHIM_UDP_BUF_SIZE 256 -#define SHIM_UDP_MSG_SIZE 256 -#define SHIM_UDP_MAX_SDU_SIZE 8980 -#define DNS_TTL 86400 -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ +#define THIS_TYPE IPCP_UDP +#define LISTEN_PORT htons(0x0D1F) +#define SHIM_UDP_BUF_SIZE 256 +#define SHIM_UDP_MSG_SIZE 256 +#define SHIM_UDP_MAX_PACKET_SIZE 8980 +#define DNS_TTL 86400 +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ -#define local_ip (udp_data.s_saddr.sin_addr.s_addr) +#define local_ip (udp_data.s_saddr.sin_addr.s_addr) -#define UDP_MAX_PORTS 0xFFFF +#define UDP_MAX_PORTS 0xFFFF struct mgmt_msg { uint16_t src_udp_port; @@ -106,9 +106,9 @@ struct { struct uf fd_to_uf[SYS_MAX_FLOWS]; pthread_rwlock_t flows_lock; - pthread_t sduloop; + pthread_t packet_loop; pthread_t handler; - pthread_t sdu_reader; + pthread_t packet_reader; bool fd_set_mod; pthread_cond_t fd_set_cond; @@ -495,13 +495,13 @@ static void * ipcp_udp_listener(void * o) return 0; } -static void * ipcp_udp_sdu_reader(void * o) +static void * ipcp_udp_packet_reader(void * o) { ssize_t n; int skfd; int fd; /* FIXME: avoid this copy */ - char buf[SHIM_UDP_MAX_SDU_SIZE]; + char buf[SHIM_UDP_MAX_PACKET_SIZE]; struct sockaddr_in r_saddr; struct timeval tv = {0, FD_UPDATE_TIMEOUT}; fd_set read_fds; @@ -533,7 +533,7 @@ static void * ipcp_udp_sdu_reader(void * o) n = sizeof(r_saddr); if ((n = recvfrom(skfd, &buf, - SHIM_UDP_MAX_SDU_SIZE, + SHIM_UDP_MAX_PACKET_SIZE, 0, (struct sockaddr *) &r_saddr, (unsigned *) &n)) <= 0) @@ -552,7 +552,7 @@ static void * ipcp_udp_sdu_reader(void * o) return (void *) 0; } -static void * ipcp_udp_sdu_loop(void * o) +static void * ipcp_udp_packet_loop(void * o) { int fd; struct shm_du_buff * sdb; @@ -582,7 +582,7 @@ static void * ipcp_udp_sdu_loop(void * o) if (send(fd, shm_du_buff_head(sdb), shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), 0) < 0) - log_err("Failed to send SDU."); + log_err("Failed to send PACKET."); pthread_cleanup_pop(true); } @@ -666,20 +666,20 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) goto fail_bind; } - if (pthread_create(&udp_data.sdu_reader, + if (pthread_create(&udp_data.packet_reader, NULL, - ipcp_udp_sdu_reader, + ipcp_udp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } - if (pthread_create(&udp_data.sduloop, + if (pthread_create(&udp_data.packet_loop, NULL, - ipcp_udp_sdu_loop, + ipcp_udp_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sduloop; + goto fail_packet_loop; } log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid()); @@ -688,10 +688,10 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sduloop: - pthread_cancel(udp_data.sdu_reader); - pthread_join(udp_data.sdu_reader, NULL); - fail_sdu_reader: + fail_packet_loop: + pthread_cancel(udp_data.packet_reader); + pthread_join(udp_data.packet_reader, NULL); + fail_packet_reader: pthread_cancel(udp_data.handler); pthread_join(udp_data.handler, NULL); fail_bind: @@ -1222,13 +1222,13 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(udp_data.sduloop); + pthread_cancel(udp_data.packet_loop); pthread_cancel(udp_data.handler); - pthread_cancel(udp_data.sdu_reader); + pthread_cancel(udp_data.packet_reader); - pthread_join(udp_data.sduloop, NULL); + pthread_join(udp_data.packet_loop, NULL); pthread_join(udp_data.handler, NULL); - pthread_join(udp_data.sdu_reader, NULL); + pthread_join(udp_data.packet_reader, NULL); } udp_data_fini(); diff --git a/src/irmd/main.c b/src/irmd/main.c index 9504f3b5..427e09d1 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -123,7 +123,7 @@ struct { pthread_rwlock_t flows_lock; /* lock for flows */ struct lockfile * lf; /* single irmd per system */ - struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + struct shm_rdrbuff * rdrb; /* rdrbuff for packets */ int sockfd; /* UNIX socket */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index aa4e5bf3..84a9fd72 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -138,7 +138,7 @@ mark_as_advanced(LIBRT_LIBRARIES LIBPTHREAD_LIBRARIES LIBGCRYPT_INCLUDE_DIR SYS_RND_HDR) set(SHM_BUFFER_SIZE 4096 CACHE STRING - "Number of blocks in SDU buffer, must be a power of 2") + "Number of blocks in packet buffer, must be a power of 2") set(SYS_MAX_FLOWS 10240 CACHE STRING "Maximum number of total flows for this system") set(PROG_MAX_FLOWS 4096 CACHE STRING @@ -171,9 +171,9 @@ set(SHM_FLOW_SET_PREFIX "/${SHM_PREFIX}.set." CACHE INTERNAL set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL "Name for the main POSIX shared memory buffer") set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING - "SDU buffer block size, multiple of pagesize for performance") + "Packet buffer block size, multiple of pagesize for performance") set(SHM_RDRB_MULTI_BLOCK true CACHE BOOL - "SDU buffer multiblock SDU support") + "Packet buffer multiblock packet support") set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL "Enable shared memory lockless rbuff support") diff --git a/src/lib/dev.c b/src/lib/dev.c index a92c1e42..00dcf991 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -895,7 +895,7 @@ ssize_t flow_read(int fd, { ssize_t idx; ssize_t n; - uint8_t * sdu; + uint8_t * packet; struct shm_rbuff * rb; struct shm_du_buff * sdb; struct timespec abs; @@ -948,19 +948,19 @@ ssize_t flow_read(int fd, } } - n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); + n = shm_rdrbuff_read(&packet, ai.rdrb, idx); assert(n >= 0); if (n <= (ssize_t) count) { - memcpy(buf, sdu, n); + memcpy(buf, packet, n); shm_rdrbuff_remove(ai.rdrb, idx); flow->part_idx = (partrd && n == (ssize_t) count) ? DONE_PART : NO_PART; return n; } else { if (partrd) { - memcpy(buf, sdu, count); + memcpy(buf, packet, count); sdb = shm_rdrbuff_get(ai.rdrb, idx); shm_du_buff_head_release(sdb, n); flow->part_idx = idx; @@ -1042,7 +1042,7 @@ int fset_add(struct flow_set * set, int fd) { int ret; - size_t sdus; + size_t packets; size_t i; if (set == NULL || fd < 0 || fd > SYS_MAX_FLOWS) @@ -1052,8 +1052,8 @@ int fset_add(struct flow_set * set, ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); - sdus = shm_rbuff_queued(ai.flows[fd].rx_rb); - for (i = 0; i < sdus; i++) + packets = shm_rbuff_queued(ai.flows[fd].rx_rb); + for (i = 0; i < packets; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/frct.c b/src/lib/frct.c index e9acb1dc..2e5c385d 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -294,7 +294,7 @@ static int __frcti_snd(struct frcti * frcti, return 0; } -/* Returns 0 when idx contains an SDU for the application. */ +/* Returns 0 when idx contains a packet for the application. */ static int __frcti_rcv(struct frcti * frcti, struct shm_du_buff * sdb) { diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 453f5183..5319e89c 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer implementations for incoming SDUs + * Ring buffer implementations for incoming packets * * Dimitri Staessens * Sander Vrijders @@ -63,8 +63,8 @@ struct shm_rbuff { size_t * tail; /* start of ringbuffer tail */ size_t * acl; /* access control */ pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * add; /* SDU arrived */ - pthread_cond_t * del; /* SDU removed */ + pthread_cond_t * add; /* packet arrived */ + pthread_cond_t * del; /* packet removed */ pid_t pid; /* pid of the owner */ int port_id; /* port_id of the flow */ }; diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index c488f274..43eac7f6 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Lockless ring buffer for incoming SDUs + * Lockless ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 3b7ea2d4..c74503ff 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 5ae2085d..8bbdda46 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -87,7 +87,7 @@ struct shm_rdrbuff { size_t * tail; /* start of ringbuffer tail */ pthread_mutex_t * lock; /* lock all free space in shm */ pthread_cond_t * full; /* flag when full */ - pthread_cond_t * healthy; /* flag when SDU is read */ + pthread_cond_t * healthy; /* flag when packet is read */ pid_t * pid; /* pid of the irmd owner */ }; diff --git a/src/tools/ocbr/ocbr.c b/src/tools/ocbr/ocbr.c index e2bd84af..12983da3 100644 --- a/src/tools/ocbr/ocbr.c +++ b/src/tools/ocbr/ocbr.c @@ -60,7 +60,7 @@ struct s { static void usage(void) { printf("Usage: cbr [OPTION]...\n" - "Sends SDUs from client to server at a constant bit rate.\n\n" + "Sends packets from client to server at a constant bit rate.\n\n" " -l, --listen Run in server mode\n" "\n" "Server options:\n" @@ -70,10 +70,10 @@ static void usage(void) "Client options:\n" " -n, --server_apn Specify the name of the server.\n" " -d, --duration Duration for sending (s)\n" - " -f, --flood Send SDUs as fast as possible\n" - " -s, --size SDU size (B, max %ld B)\n" + " -f, --flood Send packets as fast as possible\n" + " -s, --size packet size (B, max %ld B)\n" " -r, --rate Rate (b/s)\n" - " --sleep Sleep in between sending SDUs\n" + " --sleep Sleep in between sending packets\n" "\n\n" " --help Display this help text and exit\n", BUF_SIZE); @@ -82,7 +82,7 @@ static void usage(void) int main(int argc, char ** argv) { int duration = 60; /* One minute test */ - int size = 1000; /* 1000 byte SDUs */ + int size = 1000; /* 1000 byte packets */ long rate = 1000000; /* 1 Mb/s */ bool flood = false; bool sleep = false; diff --git a/src/tools/ocbr/ocbr_client.c b/src/tools/ocbr/ocbr_client.c index 026ab001..63b43721 100644 --- a/src/tools/ocbr/ocbr_client.c +++ b/src/tools/ocbr/ocbr_client.c @@ -155,7 +155,7 @@ int client_main(char * server, ms = ts_diff_ms(&start, &end); printf("sent statistics: " - "%9ld SDUs, %12ld bytes in %9d ms, %4.4f Mb/s\n", + "%9ld packets, %12ld bytes in %9d ms, %4.4f Mb/s\n", seqnr, seqnr * size, ms, (seqnr / (ms * 1000.0)) * size * 8.0); flow_dealloc(fd); diff --git a/src/tools/ocbr/ocbr_server.c b/src/tools/ocbr/ocbr_server.c index 4f080eff..75983201 100644 --- a/src/tools/ocbr/ocbr_server.c +++ b/src/tools/ocbr/ocbr_server.c @@ -90,8 +90,8 @@ static void handle_flow(int fd) bool stop = false; - long sdus = 0; - long sdus_intv = 0; + long packets = 0; + long packets_intv = 0; long bytes_read = 0; long bytes_read_intv = 0; @@ -109,7 +109,7 @@ static void handle_flow(int fd) if (count > 0) { clock_gettime(CLOCK_REALTIME, &alive); - sdus++; + packets++; bytes_read += count; } @@ -121,17 +121,18 @@ static void handle_flow(int fd) if (stop || ts_diff_ms(&now, &iv_end) < 0) { long us = ts_diff_us(&iv_start, &now); - printf("Flow %4d: %9ld SDUs (%12ld bytes) in %9ld ms" - " => %9.4f p/s, %9.4f Mb/s\n", + printf("Flow %4d: %9ld packets (%12ld bytes) in %9ld ms" + " => %9.4f pps, %9.4f Mbps\n", fd, - sdus - sdus_intv, + packets - packets_intv, bytes_read - bytes_read_intv, us / 1000, - ((sdus - sdus_intv) / (double) us) * MILLION, + ((packets - packets_intv) / (double) us) + * MILLION, 8 * ((bytes_read - bytes_read_intv) / (double)(us))); iv_start = iv_end; - sdus_intv = sdus; + packets_intv = packets; bytes_read_intv = bytes_read; ts_add(&iv_start, &intv, &iv_end); } diff --git a/src/tools/oecho/oecho.c b/src/tools/oecho/oecho.c index cc173988..b6a74aa5 100644 --- a/src/tools/oecho/oecho.c +++ b/src/tools/oecho/oecho.c @@ -72,7 +72,7 @@ static int server_main(void) count = flow_read(fd, &buf, BUF_SIZE); if (count < 0) { - printf("Failed to read SDU.\n"); + printf("Failed to read packet.\n"); flow_dealloc(fd); continue; } @@ -80,7 +80,7 @@ static int server_main(void) printf("Message from client is %.*s.\n", (int) count, buf); if (flow_write(fd, buf, count) == -1) { - printf("Failed to write SDU.\n"); + printf("Failed to write packet.\n"); flow_dealloc(fd); continue; } @@ -105,14 +105,14 @@ static int client_main(void) } if (flow_write(fd, message, strlen(message) + 1) < 0) { - printf("Failed to write SDU.\n"); + printf("Failed to write packet.\n"); flow_dealloc(fd); return -1; } count = flow_read(fd, buf, BUF_SIZE); if (count < 0) { - printf("Failed to read SDU.\n"); + printf("Failed to read packet.\n"); flow_dealloc(fd); return -1; } diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c index 137e8647..92555c23 100644 --- a/src/tools/operf/operf.c +++ b/src/tools/operf/operf.c @@ -119,8 +119,8 @@ static void usage(void) " -d, --duration Test duration (default 60s)\n" " -r, --rate Rate (b/s)\n" " -s, --size Payload size (B, default 1500)\n" - " -f, --flood Send SDUs as fast as possible\n" - " --sleep Sleep in between sending SDUs\n" + " -f, --flood Send packets as fast as possible\n" + " --sleep Sleep in between sending packets\n" "\n" " --help Display this help text and exit\n"); } diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c index c8873c54..6862944e 100644 --- a/src/tools/operf/operf_client.c +++ b/src/tools/operf/operf_client.c @@ -120,11 +120,12 @@ void * writer(void * o) msg = (struct msg *) buf; if (client.flood) - printf("Flooding %s with %d byte SDUs for %d seconds.\n\n", + printf("Flooding %s with %d byte packets for %d seconds.\n\n", client.server_name, client.size, client.duration / 1000); else - printf("Sending %d byte SDUs for %d s to %s at %.3lf Mb/s.\n\n", + printf("Sending %d byte packets for %d s to %s " + "at %.3lf Mb/s.\n\n", client.size, client.duration / 1000, client.server_name, client.rate / (double) MILLION); @@ -141,7 +142,7 @@ void * writer(void * o) msg->id = client.sent; if (flow_write(*fdp, buf, client.size) == -1) { - printf("Failed to send SDU.\n"); + printf("Failed to send packet.\n"); flow_dealloc(*fdp); free(buf); return (void *) -1; @@ -225,7 +226,7 @@ int client_main(void) printf("\n"); printf("--- %s perf statistics ---\n", client.server_name); - printf("%ld SDUs transmitted, ", client.sent); + printf("%ld packets transmitted, ", client.sent); printf("%ld received, ", client.rcvd); printf("%ld%% packet loss, ", client.sent == 0 ? 0 : 100 - ((100 * client.rcvd) / client.sent)); diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 0f7695b5..a978e659 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -176,7 +176,7 @@ void * writer(void * o) msg->tv_nsec = now.tv_nsec; if (flow_write(*fdp, buf, client.size) == -1) { - printf("Failed to send SDU.\n"); + printf("Failed to send packet.\n"); flow_dealloc(*fdp); free(buf); return (void *) -1; @@ -253,7 +253,7 @@ static int client_main(void) printf("\n"); printf("--- %s ping statistics ---\n", client.s_apn); - printf("%d SDUs transmitted, ", client.sent); + printf("%d packets transmitted, ", client.sent); printf("%d received, ", client.rcvd); printf("%zd out-of-order, ", client.ooo); printf("%.0lf%% packet loss, ", client.sent == 0 ? 0 : -- cgit v1.2.3 From 04384c0eab88615902025023bb3e1339ea0f9d1a Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 5 Oct 2018 11:24:55 +0200 Subject: lib: Rename port_id to flow_id Renames port_id to flow_id according to updated nomenclature. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- doc/man/flow_alloc.3 | 4 +- doc/man/ouroboros-tutorial.7 | 18 +++--- include/ouroboros/np1_flow.h | 6 +- include/ouroboros/shm_flow_set.h | 8 +-- include/ouroboros/shm_rbuff.h | 4 +- src/ipcpd/ipcp.c | 16 ++--- src/irmd/ipcp.c | 18 +++--- src/irmd/ipcp.h | 6 +- src/irmd/irm_flow.c | 8 +-- src/irmd/irm_flow.h | 4 +- src/irmd/main.c | 122 +++++++++++++++++++-------------------- src/lib/dev.c | 102 ++++++++++++++++---------------- src/lib/ipcpd_messages.proto | 2 +- src/lib/irmd_messages.proto | 2 +- src/lib/shm_flow_set.c | 40 ++++++------- src/lib/shm_rbuff.c | 18 +++--- src/lib/shm_rbuff_ll.c | 2 +- src/lib/shm_rbuff_pthr.c | 2 +- 18 files changed, 191 insertions(+), 191 deletions(-) (limited to 'src/irmd') diff --git a/doc/man/flow_alloc.3 b/doc/man/flow_alloc.3 index dda0c877..8a21eda8 100644 --- a/doc/man/flow_alloc.3 +++ b/doc/man/flow_alloc.3 @@ -2,7 +2,7 @@ .\" Dimitri Staessens .\" Sander Vrijders -.TH FLOW_ALLOC 3 2017-12-02 Ouroboros "Ouroboros Programmer's Manual" +.TH FLOW_ALLOC 3 2018-10-05 Ouroboros "Ouroboros Programmer's Manual" .SH NAME @@ -80,7 +80,7 @@ Failed to contact an IRMd instance. \fBflow_accept\fR() and \fBflow_alloc\fR() can also return .B -EBADF -No more flow desciptors or port_ids available. +No more flow desciptors or flow_ids available. .B -ENOMEM Not enough system memory resources available to allocate the flow. diff --git a/doc/man/ouroboros-tutorial.7 b/doc/man/ouroboros-tutorial.7 index 76fa0068..b7a22208 100644 --- a/doc/man/ouroboros-tutorial.7 +++ b/doc/man/ouroboros-tutorial.7 @@ -2,7 +2,7 @@ .\" Dimitri Staessens .\" Sander Vrijders -.TH OUROBOROS-TUTORIAL 7 2017-12-02 Ouroboros "Ouroboros User Manual" +.TH OUROBOROS-TUTORIAL 7 2018-10-05 Ouroboros "Ouroboros User Manual" .SH NAME @@ -114,25 +114,25 @@ rtt min/avg/max/mdev = 0.304/0.392/0.475/0.086 ms .RE That's all there is to it! The IRMd should log the flow -allocation. There are two endpoints of the flow (port_id's 0 and 1), +allocation. There are two endpoints of the flow (flow_id's 0 and 1), one for the server (1) and one for the client (0). After the flow -request, a new port_id is created at the server side (port_id 1) and -then a previously pending flow (on port_id 0) is allocated following +request, a new flow_id is created at the server side (flow_id 1) and +then a previously pending flow (on flow_id 0) is allocated following the response from the server. When the communication is done, the flow is deallocated and the -resources (port_id's 0 and 1) are released. +resources (flow_id's 0 and 1) are released. .RS 4 ==23918== irmd(II): Flow request arrived for my.oping.server. .br -==23918== irmd(II): Flow on port_id 1 allocated. +==23918== irmd(II): Flow on flow_id 1 allocated. .br -==23918== irmd(II): Flow on port_id 0 allocated. +==23918== irmd(II): Flow on flow_id 0 allocated. .br -==23918== irmd(II): Completed deallocation of port_id 0 by process 23932. +==23918== irmd(II): Completed deallocation of flow_id 0 by process 23932. .br -==23918== irmd(II): Completed deallocation of port_id 1 by process 23932. +==23918== irmd(II): Completed deallocation of flow_id 1 by process 23932. .RE .SH TERMINOLOGY diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index 3435c24a..3a2bbd12 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -28,11 +28,11 @@ #include int np1_flow_alloc(pid_t n_pid, - int port_id, + int flow_id, qosspec_t qs); -int np1_flow_resp(int port_id); +int np1_flow_resp(int flow_id); -int np1_flow_dealloc(int port_id); +int np1_flow_dealloc(int flow_id); #endif /* OUROBOROS_NP1_FLOW_H */ diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h index ebf63af5..45d372a0 100644 --- a/include/ouroboros/shm_flow_set.h +++ b/include/ouroboros/shm_flow_set.h @@ -42,18 +42,18 @@ void shm_flow_set_zero(struct shm_flow_set * shm_set, int shm_flow_set_add(struct shm_flow_set * shm_set, size_t idx, - int port_id); + int flow_id); int shm_flow_set_has(struct shm_flow_set * shm_set, size_t idx, - int port_id); + int flow_id); void shm_flow_set_del(struct shm_flow_set * shm_set, size_t idx, - int port_id); + int flow_id); void shm_flow_set_notify(struct shm_flow_set * set, - int port_id, + int flow_id, int event); ssize_t shm_flow_set_wait(const struct shm_flow_set * shm_set, diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index 223f6bf4..447e081e 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -35,10 +35,10 @@ struct shm_rbuff; struct shm_rbuff * shm_rbuff_create(pid_t pid, - int port_id); + int flow_id); struct shm_rbuff * shm_rbuff_open(pid_t pid, - int port_id); + int flow_id); void shm_rbuff_close(struct shm_rbuff * rb); diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index e415bbd9..f8df5640 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -432,11 +432,11 @@ static void * mainloop(void * o) qs = msg_to_spec(msg->qosspec); fd = np1_flow_alloc(msg->pid, - msg->port_id, + msg->flow_id, qs); if (fd < 0) { - log_err("Failed allocating fd on port_id %d.", - msg->port_id); + log_err("Failed allocating fd on flow_id %d.", + msg->flow_id); ret_msg.result = -1; break; } @@ -461,10 +461,10 @@ static void * mainloop(void * o) } if (!msg->response) { - fd = np1_flow_resp(msg->port_id); + fd = np1_flow_resp(msg->flow_id); if (fd < 0) { log_warn("Port_id %d is not known.", - msg->port_id); + msg->flow_id); ret_msg.result = -1; break; } @@ -488,10 +488,10 @@ static void * mainloop(void * o) break; } - fd = np1_flow_dealloc(msg->port_id); + fd = np1_flow_dealloc(msg->flow_id); if (fd < 0) { - log_warn("Could not deallocate port_id %d.", - msg->port_id); + log_warn("Could not deallocate flow_id %d.", + msg->flow_id); ret_msg.result = -1; break; } diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 0bdf674b..20aee79f 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -429,7 +429,7 @@ int ipcp_query(pid_t pid, } int ipcp_flow_alloc(pid_t pid, - int port_id, + int flow_id, pid_t n_pid, const uint8_t * dst, size_t len, @@ -443,8 +443,8 @@ int ipcp_flow_alloc(pid_t pid, assert(dst); msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; - msg.has_port_id = true; - msg.port_id = port_id; + msg.has_flow_id = true; + msg.flow_id = flow_id; msg.has_pid = true; msg.pid = n_pid; msg.has_hash = true; @@ -469,7 +469,7 @@ int ipcp_flow_alloc(pid_t pid, } int ipcp_flow_alloc_resp(pid_t pid, - int port_id, + int flow_id, pid_t n_pid, int response) { @@ -478,8 +478,8 @@ int ipcp_flow_alloc_resp(pid_t pid, int ret = -1; msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; - msg.has_port_id = true; - msg.port_id = port_id; + msg.has_flow_id = true; + msg.flow_id = flow_id; msg.has_pid = true; msg.pid = n_pid; msg.has_response = true; @@ -501,15 +501,15 @@ int ipcp_flow_alloc_resp(pid_t pid, } int ipcp_flow_dealloc(pid_t pid, - int port_id) + int flow_id) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; int ret = -1; msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_port_id = true; - msg.port_id = port_id; + msg.has_flow_id = true; + msg.flow_id = flow_id; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 28396333..5d096788 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -63,18 +63,18 @@ int ipcp_query(pid_t pid, size_t len); int ipcp_flow_alloc(pid_t pid, - int port_id, + int flow_id, pid_t n_pid, const uint8_t * dst, size_t len, qosspec_t qs); int ipcp_flow_alloc_resp(pid_t pid, - int port_id, + int flow_id, pid_t n_pid, int response); int ipcp_flow_dealloc(pid_t pid, - int port_id); + int flow_id); #endif /* OUROBOROS_IRMD_IPCP_H */ diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index a5a9f28c..a0889f09 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -38,7 +38,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, - int port_id, + int flow_id, qosspec_t qs) { pthread_condattr_t cattr; @@ -60,16 +60,16 @@ struct irm_flow * irm_flow_create(pid_t n_pid, f->n_pid = n_pid; f->n_1_pid = n_1_pid; - f->port_id = port_id; + f->flow_id = flow_id; f->qs = qs; - f->n_rb = shm_rbuff_create(n_pid, port_id); + f->n_rb = shm_rbuff_create(n_pid, flow_id); if (f->n_rb == NULL) { log_err("Could not create ringbuffer for process %d.", n_pid); goto fail_n_rbuff; } - f->n_1_rb = shm_rbuff_create(n_1_pid, port_id); + f->n_1_rb = shm_rbuff_create(n_1_pid, flow_id); if (f->n_1_rb == NULL) { log_err("Could not create ringbuffer for process %d.", n_1_pid); goto fail_n_1_rbuff; diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index f4de8187..1cd2e662 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -42,7 +42,7 @@ enum flow_state { struct irm_flow { struct list_head next; - int port_id; + int flow_id; pid_t n_pid; pid_t n_1_pid; @@ -61,7 +61,7 @@ struct irm_flow { struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, - int port_id, + int flow_id, qosspec_t qs); void irm_flow_destroy(struct irm_flow * f); diff --git a/src/irmd/main.c b/src/irmd/main.c index 427e09d1..78fcf7b5 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -118,7 +118,7 @@ struct { struct list_head spawned_pids; /* child processes */ pthread_rwlock_t reg_lock; /* lock for registration info */ - struct bmp * port_ids; /* port_ids for flows */ + struct bmp * flow_ids; /* flow_ids for flows */ struct list_head irm_flows; /* flow information */ pthread_rwlock_t flows_lock; /* lock for flows */ @@ -174,13 +174,13 @@ static void clear_irm_flow(struct irm_flow * f) { shm_rdrbuff_remove(irmd.rdrb, idx); } -static struct irm_flow * get_irm_flow(int port_id) +static struct irm_flow * get_irm_flow(int flow_id) { struct list_head * pos = NULL; list_for_each(pos, &irmd.irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->port_id == port_id) + if (e->flow_id == flow_id) return e; } @@ -1152,7 +1152,7 @@ static int flow_accept(pid_t pid, pid_t pid_n1; pid_t pid_n; - int port_id; + int flow_id; int ret; pthread_rwlock_wrlock(&irmd.reg_lock); @@ -1198,7 +1198,7 @@ static int flow_accept(pid_t pid, pid_n = f->n_pid; pid_n1 = f->n_1_pid; - port_id = f->port_id; + flow_id = f->flow_id; pthread_rwlock_unlock(&irmd.flows_lock); pthread_rwlock_rdlock(&irmd.reg_lock); @@ -1208,9 +1208,9 @@ static int flow_accept(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); list_del(&f->next); - bmp_release(irmd.port_ids, f->port_id); + bmp_release(irmd.flow_ids, f->flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_alloc_resp(pid_n1, port_id, pid_n, -1); + ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1); clear_irm_flow(f); irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); @@ -1228,9 +1228,9 @@ static int flow_accept(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); list_del(&f->next); - bmp_release(irmd.port_ids, f->port_id); + bmp_release(irmd.flow_ids, f->flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_alloc_resp(pid_n1, port_id, pid_n, -1); + ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1); clear_irm_flow(f); irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); @@ -1242,7 +1242,7 @@ static int flow_accept(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); - if (ipcp_flow_alloc_resp(pid_n1, port_id, pid_n, 0)) { + if (ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, 0)) { pthread_rwlock_wrlock(&irmd.flows_lock); list_del(&f->next); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1255,7 +1255,7 @@ static int flow_accept(pid_t pid, irm_flow_set_state(f, FLOW_ALLOCATED); - log_info("Flow on port_id %d allocated.", f->port_id); + log_info("Flow on flow_id %d allocated.", f->flow_id); *fl = f; @@ -1270,7 +1270,7 @@ static int flow_alloc(pid_t pid, { struct irm_flow * f; struct ipcp_entry * ipcp; - int port_id; + int flow_id; int state; uint8_t * hash; @@ -1281,18 +1281,18 @@ static int flow_alloc(pid_t pid, } pthread_rwlock_wrlock(&irmd.flows_lock); - port_id = bmp_allocate(irmd.port_ids); - if (!bmp_is_id_valid(irmd.port_ids, port_id)) { + flow_id = bmp_allocate(irmd.flow_ids); + if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) { pthread_rwlock_unlock(&irmd.flows_lock); - log_err("Could not allocate port_id."); + log_err("Could not allocate flow_id."); return -EBADF; } - f = irm_flow_create(pid, ipcp->pid, port_id, qs); + f = irm_flow_create(pid, ipcp->pid, flow_id, qs); if (f == NULL) { - bmp_release(irmd.port_ids, port_id); + bmp_release(irmd.flow_ids, flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - log_err("Could not allocate port_id."); + log_err("Could not allocate flow_id."); return -ENOMEM; } @@ -1309,7 +1309,7 @@ static int flow_alloc(pid_t pid, str_hash(ipcp->dir_hash_algo, hash, dst); - if (ipcp_flow_alloc(ipcp->pid, port_id, pid, hash, + if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, IPCP_HASH_LEN(ipcp), qs)) { /* sanitizer cleans this */ log_info("Flow_allocation failed."); @@ -1334,13 +1334,13 @@ static int flow_alloc(pid_t pid, *e = f; - log_info("Flow on port_id %d allocated.", port_id); + log_info("Flow on flow_id %d allocated.", flow_id); return 0; } static int flow_dealloc(pid_t pid, - int port_id) + int flow_id) { pid_t n_1_pid = -1; int ret = 0; @@ -1349,10 +1349,10 @@ static int flow_dealloc(pid_t pid, pthread_rwlock_wrlock(&irmd.flows_lock); - f = get_irm_flow(port_id); + f = get_irm_flow(flow_id); if (f == NULL) { pthread_rwlock_unlock(&irmd.flows_lock); - log_dbg("Deallocate unknown port %d by %d.", port_id, pid); + log_dbg("Deallocate unknown port %d by %d.", flow_id, pid); return 0; } @@ -1374,19 +1374,19 @@ static int flow_dealloc(pid_t pid, irm_flow_set_state(f, FLOW_NULL); clear_irm_flow(f); irm_flow_destroy(f); - bmp_release(irmd.port_ids, port_id); - log_info("Completed deallocation of port_id %d by process %d.", - port_id, pid); + bmp_release(irmd.flow_ids, flow_id); + log_info("Completed deallocation of flow_id %d by process %d.", + flow_id, pid); } else { irm_flow_set_state(f, FLOW_DEALLOC_PENDING); - log_dbg("Partial deallocation of port_id %d by process %d.", - port_id, pid); + log_dbg("Partial deallocation of flow_id %d by process %d.", + flow_id, pid); } pthread_rwlock_unlock(&irmd.flows_lock); if (n_1_pid != -1) - ret = ipcp_flow_dealloc(n_1_pid, port_id); + ret = ipcp_flow_dealloc(n_1_pid, flow_id); return ret; } @@ -1428,7 +1428,7 @@ static struct irm_flow * flow_req_arr(pid_t pid, struct pid_el * c_pid; struct ipcp_entry * ipcp; pid_t h_pid = -1; - int port_id = -1; + int flow_id = -1; struct timespec wt = {IRMD_REQ_ARR_TIMEOUT / 1000, (IRMD_REQ_ARR_TIMEOUT % 1000) * MILLION}; @@ -1515,17 +1515,17 @@ static struct irm_flow * flow_req_arr(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); - port_id = bmp_allocate(irmd.port_ids); - if (!bmp_is_id_valid(irmd.port_ids, port_id)) { + flow_id = bmp_allocate(irmd.flow_ids); + if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) { pthread_rwlock_unlock(&irmd.flows_lock); return NULL; } - f = irm_flow_create(h_pid, pid, port_id, qs); + f = irm_flow_create(h_pid, pid, flow_id, qs); if (f == NULL) { - bmp_release(irmd.port_ids, port_id); + bmp_release(irmd.flow_ids, flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - log_err("Could not allocate port_id."); + log_err("Could not allocate flow_id."); return NULL; } @@ -1541,7 +1541,7 @@ static struct irm_flow * flow_req_arr(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); clear_irm_flow(f); - bmp_release(irmd.port_ids, f->port_id); + bmp_release(irmd.flow_ids, f->flow_id); list_del(&f->next); pthread_rwlock_unlock(&irmd.flows_lock); log_err("Could not get process table entry for %d.", h_pid); @@ -1558,14 +1558,14 @@ static struct irm_flow * flow_req_arr(pid_t pid, return f; } -static int flow_alloc_reply(int port_id, +static int flow_alloc_reply(int flow_id, int response) { struct irm_flow * f; pthread_rwlock_rdlock(&irmd.flows_lock); - f = get_irm_flow(port_id); + f = get_irm_flow(flow_id); if (f == NULL) { pthread_rwlock_unlock(&irmd.flows_lock); return -1; @@ -1631,8 +1631,8 @@ static void irm_fini(void) pthread_rwlock_wrlock(&irmd.flows_lock); - if (irmd.port_ids != NULL) - bmp_destroy(irmd.port_ids); + if (irmd.flow_ids != NULL) + bmp_destroy(irmd.flow_ids); list_for_each_safe(p, h, &irmd.irm_flows) { struct irm_flow * f = list_entry(p, struct irm_flow, next); @@ -1759,14 +1759,14 @@ void * irm_sanitize(void * o) list_for_each_safe(p, h, &irmd.irm_flows) { int ipcpi; - int port_id; + int flow_id; struct irm_flow * f = list_entry(p, struct irm_flow, next); if (irm_flow_get_state(f) == FLOW_ALLOC_PENDING && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { - log_dbg("Pending port_id %d timed out.", - f->port_id); + log_dbg("Pending flow_id %d timed out.", + f->flow_id); f->n_pid = -1; irm_flow_set_state(f, FLOW_DEALLOC_PENDING); continue; @@ -1776,16 +1776,16 @@ void * irm_sanitize(void * o) struct shm_flow_set * set; log_dbg("Process %d gone, deallocating " "flow %d.", - f->n_pid, f->port_id); + f->n_pid, f->flow_id); set = shm_flow_set_open(f->n_pid); if (set != NULL) shm_flow_set_destroy(set); f->n_pid = -1; irm_flow_set_state(f, FLOW_DEALLOC_PENDING); ipcpi = f->n_1_pid; - port_id = f->port_id; + flow_id = f->flow_id; pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_dealloc(ipcpi, port_id); + ipcp_flow_dealloc(ipcpi, flow_id); pthread_rwlock_wrlock(&irmd.flows_lock); continue; } @@ -1793,7 +1793,7 @@ void * irm_sanitize(void * o) if (kill(f->n_1_pid, 0) < 0) { struct shm_flow_set * set; log_err("IPCP %d gone, flow %d removed.", - f->n_1_pid, f->port_id); + f->n_1_pid, f->flow_id); set = shm_flow_set_open(f->n_pid); if (set != NULL) shm_flow_set_destroy(set); @@ -1994,8 +1994,8 @@ static void * mainloop(void * o) result = flow_accept(msg->pid, timeo, &e); if (result == 0) { qosspec_msg_t qs_msg; - ret_msg->has_port_id = true; - ret_msg->port_id = e->port_id; + ret_msg->has_flow_id = true; + ret_msg->flow_id = e->flow_id; ret_msg->has_pid = true; ret_msg->pid = e->n_1_pid; qs_msg = spec_to_msg(&e->qs); @@ -2007,14 +2007,14 @@ static void * mainloop(void * o) msg_to_spec(msg->qosspec), timeo, &e); if (result == 0) { - ret_msg->has_port_id = true; - ret_msg->port_id = e->port_id; + ret_msg->has_flow_id = true; + ret_msg->flow_id = e->flow_id; ret_msg->has_pid = true; ret_msg->pid = e->n_1_pid; } break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: - result = flow_dealloc(msg->pid, msg->port_id); + result = flow_dealloc(msg->pid, msg->flow_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->pid, @@ -2022,14 +2022,14 @@ static void * mainloop(void * o) msg_to_spec(msg->qosspec)); result = (e == NULL ? -1 : 0); if (result == 0) { - ret_msg->has_port_id = true; - ret_msg->port_id = e->port_id; + ret_msg->has_flow_id = true; + ret_msg->flow_id = e->flow_id; ret_msg->has_pid = true; ret_msg->pid = e->n_pid; } break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: - result = flow_alloc_reply(msg->port_id, msg->response); + result = flow_alloc_reply(msg->flow_id, msg->response); break; default: log_err("Don't know that message code."); @@ -2143,10 +2143,10 @@ static int irm_init(void) list_head_init(&irmd.irm_flows); list_head_init(&irmd.cmds); - irmd.port_ids = bmp_create(SYS_MAX_FLOWS, 0); - if (irmd.port_ids == NULL) { - log_err("Failed to create port_ids bitmap."); - goto fail_port_ids; + irmd.flow_ids = bmp_create(SYS_MAX_FLOWS, 0); + if (irmd.flow_ids == NULL) { + log_err("Failed to create flow_ids bitmap."); + goto fail_flow_ids; } if ((irmd.lf = lockfile_create()) == NULL) { @@ -2235,8 +2235,8 @@ static int irm_init(void) fail_stat: lockfile_destroy(irmd.lf); fail_lockfile: - bmp_destroy(irmd.port_ids); - fail_port_ids: + bmp_destroy(irmd.flow_ids); + fail_flow_ids: pthread_cond_destroy(&irmd.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&irmd.cmd_lock); diff --git a/src/lib/dev.c b/src/lib/dev.c index 00dcf991..117ce49d 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -91,7 +91,7 @@ struct flow { struct shm_rbuff * rx_rb; struct shm_rbuff * tx_rb; struct shm_flow_set * set; - int port_id; + int flow_id; int oflags; qosspec_t spec; ssize_t part_idx; @@ -167,12 +167,12 @@ static void port_set_state(struct port * p, pthread_mutex_unlock(&p->state_lock); } -static enum port_state port_wait_assign(int port_id) +static enum port_state port_wait_assign(int flow_id) { enum port_state state; struct port * p; - p = &ai.ports[port_id]; + p = &ai.ports[flow_id]; pthread_mutex_lock(&p->state_lock); @@ -231,7 +231,7 @@ static void flow_clear(int fd) { memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); - ai.flows[fd].port_id = -1; + ai.flows[fd].flow_id = -1; ai.flows[fd].pid = -1; } @@ -239,8 +239,8 @@ static void flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); - if (ai.flows[fd].port_id != -1) { - port_destroy(&ai.ports[ai.flows[fd].port_id]); + if (ai.flows[fd].flow_id != -1) { + port_destroy(&ai.ports[ai.flows[fd].flow_id]); bmp_release(ai.fds, fd); } @@ -256,7 +256,7 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) { shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].port_id, + ai.flows[fd].flow_id, FLOW_DEALLOC); shm_flow_set_close(ai.flows[fd].set); } @@ -267,7 +267,7 @@ static void flow_fini(int fd) flow_clear(fd); } -static int flow_init(int port_id, +static int flow_init(int flow_id, pid_t pid, qosspec_t qs) { @@ -282,11 +282,11 @@ static int flow_init(int port_id, goto fail_fds; } - ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, port_id); + ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, flow_id); if (ai.flows[fd].rx_rb == NULL) goto fail; - ai.flows[fd].tx_rb = shm_rbuff_open(pid, port_id); + ai.flows[fd].tx_rb = shm_rbuff_open(pid, flow_id); if (ai.flows[fd].tx_rb == NULL) goto fail; @@ -294,15 +294,15 @@ static int flow_init(int port_id, if (ai.flows[fd].set == NULL) goto fail; - ai.flows[fd].port_id = port_id; + ai.flows[fd].flow_id = flow_id; ai.flows[fd].oflags = FLOWFDEFAULT; ai.flows[fd].pid = pid; ai.flows[fd].part_idx = NO_PART; ai.flows[fd].spec = qs; - ai.ports[port_id].fd = fd; + ai.ports[flow_id].fd = fd; - port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED); pthread_rwlock_unlock(&ai.lock); @@ -446,7 +446,7 @@ static void fini(void) pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < PROG_MAX_FLOWS; ++i) { - if (ai.flows[i].port_id != -1) { + if (ai.flows[i].flow_id != -1) { ssize_t idx; shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN); while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) @@ -522,13 +522,13 @@ int flow_accept(qosspec_t * qs, return res; } - if (!recv_msg->has_pid || !recv_msg->has_port_id || + if (!recv_msg->has_pid || !recv_msg->has_flow_id || recv_msg->qosspec == NULL) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, + fd = flow_init(recv_msg->flow_id, recv_msg->pid, msg_to_spec(recv_msg->qosspec)); irm_msg__free_unpacked(recv_msg, NULL); @@ -595,12 +595,12 @@ int flow_alloc(const char * dst, return res; } - if (!recv_msg->has_pid || !recv_msg->has_port_id) { + if (!recv_msg->has_pid || !recv_msg->has_flow_id) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, + fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs == NULL ? qos_raw : *qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -635,15 +635,15 @@ int flow_dealloc(int fd) return -EINVAL; msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_port_id = true; + msg.has_flow_id = true; msg.has_pid = true; msg.pid = ai.pid; pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].port_id >= 0); + assert(ai.flows[fd].flow_id >= 0); - msg.port_id = ai.flows[fd].port_id; + msg.flow_id = ai.flows[fd].flow_id; pthread_rwlock_unlock(&ai.lock); @@ -690,7 +690,7 @@ int fccntl(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); va_end(l); return -ENOTALLOC; @@ -762,13 +762,13 @@ int fccntl(int fd, rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->port_id, + flow->flow_id, FLOW_DOWN); } else { rx_acl &= ~ACL_FLOWDOWN; tx_acl &= ~ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->port_id, + flow->flow_id, FLOW_UP); } @@ -836,7 +836,7 @@ ssize_t flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -880,7 +880,7 @@ ssize_t flow_write(int fd, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -918,7 +918,7 @@ ssize_t flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -1050,11 +1050,11 @@ int fset_add(struct flow_set * set, pthread_rwlock_wrlock(&ai.lock); - ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id); packets = shm_rbuff_queued(ai.flows[fd].rx_rb); for (i = 0; i < packets; i++) - shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); + shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1069,8 +1069,8 @@ void fset_del(struct flow_set * set, pthread_rwlock_wrlock(&ai.lock); - if (ai.flows[fd].port_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); + if (ai.flows[fd].flow_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id); pthread_rwlock_unlock(&ai.lock); } @@ -1085,12 +1085,12 @@ bool fset_has(const struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (ai.flows[fd].flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return false; } - ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); + ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1); pthread_rwlock_unlock(&ai.lock); @@ -1166,35 +1166,35 @@ int fevent(struct flow_set * set, /* ipcp-dev functions. */ int np1_flow_alloc(pid_t n_pid, - int port_id, + int flow_id, qosspec_t qs) { - return flow_init(port_id, n_pid, qs); + return flow_init(flow_id, n_pid, qs); } -int np1_flow_dealloc(int port_id) +int np1_flow_dealloc(int flow_id) { int fd; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[port_id].fd; + fd = ai.ports[flow_id].fd; pthread_rwlock_unlock(&ai.lock); return fd; } -int np1_flow_resp(int port_id) +int np1_flow_resp(int flow_id) { int fd; - if (port_wait_assign(port_id) != PORT_ID_ASSIGNED) + if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED) return -1; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[port_id].fd; + fd = ai.ports[flow_id].fd; pthread_rwlock_unlock(&ai.lock); @@ -1254,7 +1254,7 @@ int ipcp_flow_req_arr(pid_t pid, if (recv_msg == NULL) return -EIRMD; - if (!recv_msg->has_port_id || !recv_msg->has_pid) { + if (!recv_msg->has_flow_id || !recv_msg->has_pid) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1264,7 +1264,7 @@ int ipcp_flow_req_arr(pid_t pid, return -1; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qs); + fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -1281,11 +1281,11 @@ int ipcp_flow_alloc_reply(int fd, assert(fd >= 0 && fd < SYS_MAX_FLOWS); msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg.has_port_id = true; + msg.has_flow_id = true; pthread_rwlock_rdlock(&ai.lock); - msg.port_id = ai.flows[fd].port_id; + msg.flow_id = ai.flows[fd].flow_id; pthread_rwlock_unlock(&ai.lock); @@ -1322,7 +1322,7 @@ int ipcp_flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(flow->port_id >= 0); + assert(flow->flow_id >= 0); rb = flow->rx_rb; @@ -1360,7 +1360,7 @@ int ipcp_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(flow->port_id >= 0); + assert(flow->flow_id >= 0); if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { pthread_rwlock_unlock(&ai.lock); @@ -1378,7 +1378,7 @@ int ipcp_flow_write(int fd, ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1424,7 +1424,7 @@ void ipcp_flow_fini(int fd) shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].port_id, + ai.flows[fd].flow_id, FLOW_DEALLOC); rx_rb = ai.flows[fd].rx_rb; @@ -1444,7 +1444,7 @@ int ipcp_flow_get_qoscube(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].port_id >= 0); + assert(ai.flows[fd].flow_id >= 0); *cube = qos_spec_to_cube(ai.flows[fd].spec); @@ -1480,14 +1480,14 @@ int local_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index 48198a5b..ae1014ac 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -42,7 +42,7 @@ enum ipcp_msg_code { message ipcp_msg { required ipcp_msg_code code = 1; optional bytes hash = 2; - optional int32 port_id = 3; + optional int32 flow_id = 3; optional string dst = 4; optional qosspec_msg qosspec = 5; optional ipcp_config_msg conf = 6; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 2ed2ec37..351b4a8e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -67,7 +67,7 @@ message irm_msg { optional sint32 response = 8; optional string dst = 9; optional bytes hash = 10; - optional sint32 port_id = 11; + optional sint32 flow_id = 11; optional qosspec_msg qosspec = 12; optional ipcp_config_msg conf = 13; optional uint32 opts = 14; diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 008e4a0d..1c94c599 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -64,7 +64,7 @@ #define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx) struct portevent { - int port_id; + int flow_id; int event; }; @@ -268,20 +268,20 @@ void shm_flow_set_zero(struct shm_flow_set * set, int shm_flow_set_add(struct shm_flow_set * set, size_t idx, - int port_id) + int flow_id) { assert(set); - assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); assert(idx < PROG_MAX_FQUEUES); pthread_mutex_lock(set->lock); - if (set->mtable[port_id] != -1) { + if (set->mtable[flow_id] != -1) { pthread_mutex_unlock(set->lock); return -EPERM; } - set->mtable[port_id] = idx; + set->mtable[flow_id] = idx; pthread_mutex_unlock(set->lock); @@ -290,33 +290,33 @@ int shm_flow_set_add(struct shm_flow_set * set, void shm_flow_set_del(struct shm_flow_set * set, size_t idx, - int port_id) + int flow_id) { assert(set); - assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); assert(idx < PROG_MAX_FQUEUES); pthread_mutex_lock(set->lock); - if (set->mtable[port_id] == (ssize_t) idx) - set->mtable[port_id] = -1; + if (set->mtable[flow_id] == (ssize_t) idx) + set->mtable[flow_id] = -1; pthread_mutex_unlock(set->lock); } int shm_flow_set_has(struct shm_flow_set * set, size_t idx, - int port_id) + int flow_id) { int ret = 0; assert(set); - assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); assert(idx < PROG_MAX_FQUEUES); pthread_mutex_lock(set->lock); - if (set->mtable[port_id] == (ssize_t) idx) + if (set->mtable[flow_id] == (ssize_t) idx) ret = 1; pthread_mutex_unlock(set->lock); @@ -325,25 +325,25 @@ int shm_flow_set_has(struct shm_flow_set * set, } void shm_flow_set_notify(struct shm_flow_set * set, - int port_id, + int flow_id, int event) { assert(set); - assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); pthread_mutex_lock(set->lock); - if (set->mtable[port_id] == -1) { + if (set->mtable[flow_id] == -1) { pthread_mutex_unlock(set->lock); return; } - (fqueue_ptr(set, set->mtable[port_id]) + - (set->heads[set->mtable[port_id]]))->port_id = port_id; - (fqueue_ptr(set, set->mtable[port_id]) + - (set->heads[set->mtable[port_id]])++)->event = event; + (fqueue_ptr(set, set->mtable[flow_id]) + + (set->heads[set->mtable[flow_id]]))->flow_id = flow_id; + (fqueue_ptr(set, set->mtable[flow_id]) + + (set->heads[set->mtable[flow_id]])++)->event = event; - pthread_cond_signal(&set->conds[set->mtable[port_id]]); + pthread_cond_signal(&set->conds[set->mtable[flow_id]]); pthread_mutex_unlock(set->lock); } diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 5319e89c..a6eab699 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -66,7 +66,7 @@ struct shm_rbuff { pthread_cond_t * add; /* packet arrived */ pthread_cond_t * del; /* packet removed */ pid_t pid; /* pid of the owner */ - int port_id; /* port_id of the flow */ + int flow_id; /* flow_id of the flow */ }; void shm_rbuff_close(struct shm_rbuff * rb) @@ -81,7 +81,7 @@ void shm_rbuff_close(struct shm_rbuff * rb) #define MM_FLAGS (PROT_READ | PROT_WRITE) struct shm_rbuff * rbuff_create(pid_t pid, - int port_id, + int flow_id, int flags) { struct shm_rbuff * rb; @@ -89,7 +89,7 @@ struct shm_rbuff * rbuff_create(pid_t pid, ssize_t * shm_base; char fn[FN_MAX_CHARS]; - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", pid, port_id); + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", pid, flow_id); rb = malloc(sizeof(*rb)); if (rb == NULL) @@ -116,7 +116,7 @@ struct shm_rbuff * rbuff_create(pid_t pid, rb->add = (pthread_cond_t *) (rb->lock + 1); rb->del = rb->add + 1; rb->pid = pid; - rb->port_id = port_id; + rb->flow_id = flow_id; return rb; @@ -131,7 +131,7 @@ struct shm_rbuff * rbuff_create(pid_t pid, } struct shm_rbuff * shm_rbuff_create(pid_t pid, - int port_id) + int flow_id) { struct shm_rbuff * rb; pthread_mutexattr_t mattr; @@ -140,7 +140,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, mask = umask(0); - rb = rbuff_create(pid, port_id, O_CREAT | O_EXCL | O_RDWR); + rb = rbuff_create(pid, flow_id, O_CREAT | O_EXCL | O_RDWR); umask(mask); @@ -175,7 +175,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, *rb->tail = 0; rb->pid = pid; - rb->port_id = port_id; + rb->flow_id = flow_id; pthread_mutexattr_destroy(&mattr); pthread_condattr_destroy(&cattr); @@ -197,9 +197,9 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, } struct shm_rbuff * shm_rbuff_open(pid_t pid, - int port_id) + int flow_id) { - return rbuff_create(pid, port_id, O_RDWR); + return rbuff_create(pid, flow_id, O_RDWR); } #if (defined(SHM_RBUFF_LOCKLESS) && \ diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 43eac7f6..0fc9ae7b 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -29,7 +29,7 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) assert(rb); - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->port_id); + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); shm_rbuff_close(rb); diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index c74503ff..51d801f6 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -33,7 +33,7 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) pthread_mutex_unlock(rb->lock); #endif - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->port_id); + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); shm_rbuff_close(rb); -- cgit v1.2.3 From 10c0c07265d91dbbadb53b77aa5be8a27b7b3cd0 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 5 Oct 2018 21:07:26 +0200 Subject: include: Fix QoS include files A lot of files were unnecessarily including qoscube.h. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/ipcp-dev.h | 1 + include/ouroboros/np1_flow.h | 2 +- include/ouroboros/shm_rdrbuff.h | 1 - src/ipcpd/ipcp.h | 1 - src/ipcpd/normal/dt.h | 1 + src/ipcpd/normal/fa.h | 2 +- src/ipcpd/shim-data.h | 1 - src/irmd/ipcp.h | 1 - src/irmd/irm_flow.h | 2 +- src/irmd/registry.h | 1 - 10 files changed, 5 insertions(+), 8 deletions(-) (limited to 'src/irmd') diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index a0ed026c..4f8b5da8 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -21,6 +21,7 @@ */ #include +#include #ifndef OUROBOROS_IPCP_DEV_H #define OUROBOROS_IPCP_DEV_H diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index 3a2bbd12..3f80161a 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -23,7 +23,7 @@ #ifndef OUROBOROS_NP1_FLOW_H #define OUROBOROS_NP1_FLOW_H -#include +#include #include diff --git a/include/ouroboros/shm_rdrbuff.h b/include/ouroboros/shm_rdrbuff.h index 277609c5..49551497 100644 --- a/include/ouroboros/shm_rdrbuff.h +++ b/include/ouroboros/shm_rdrbuff.h @@ -24,7 +24,6 @@ #define OUROBOROS_SHM_RDRBUFF_H #include -#include #include #include diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 13751b6d..1d25fb3f 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -26,7 +26,6 @@ #include #include #include -#include #include #include diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index 05b8220c..b74e84b0 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -24,6 +24,7 @@ #define OUROBOROS_IPCPD_NORMAL_DT_H #include +#include #include #define DT_COMP "Data Transfer" diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h index a98d834a..6a836e17 100644 --- a/src/ipcpd/normal/fa.h +++ b/src/ipcpd/normal/fa.h @@ -23,7 +23,7 @@ #ifndef OUROBOROS_IPCPD_NORMAL_FA_H #define OUROBOROS_IPCPD_NORMAL_FA_H -#include +#include #include int fa_init(void); diff --git a/src/ipcpd/shim-data.h b/src/ipcpd/shim-data.h index 336ffa35..7a8c01c4 100644 --- a/src/ipcpd/shim-data.h +++ b/src/ipcpd/shim-data.h @@ -23,7 +23,6 @@ #ifndef OUROBOROS_IPCPD_IPCP_DATA_H #define OUROBOROS_IPCPD_IPCP_DATA_H -#include #include #include diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 5d096788..8d9686c2 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -22,7 +22,6 @@ #include #include -#include #include diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index 1cd2e662..26263107 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -24,8 +24,8 @@ #define OUROBOROS_IRMD_IRM_FLOW_H #include +#include #include -#include #include #include diff --git a/src/irmd/registry.h b/src/irmd/registry.h index 62d90c39..c9ea8cce 100644 --- a/src/irmd/registry.h +++ b/src/irmd/registry.h @@ -26,7 +26,6 @@ #include #include #include -#include #include "proc_table.h" #include "prog_table.h" -- cgit v1.2.3