From af6756b94bb8c78d2d09a28966427e68b95c5a93 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 27 Sep 2018 11:49:20 +0200 Subject: lib: Remove configuration from FRCT This removes configuration from the FRCT protocol to send it during flow allocation. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/frct.c | 75 +++++++++++++--------------------------------------------- 1 file changed, 17 insertions(+), 58 deletions(-) (limited to 'src/lib/frct.c') diff --git a/src/lib/frct.c b/src/lib/frct.c index 296d5b2c..c057ba50 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -34,12 +34,9 @@ #define FRCT_CRCLEN (sizeof(uint32_t)) struct frct_cr { - bool drf; uint32_t lwe; uint32_t rwe; - uint32_t seqno; - bool conf; uint8_t cflags; time_t act; @@ -53,6 +50,8 @@ struct frcti { time_t a; time_t r; + uint32_t seqno; + struct frct_cr snd_cr; struct frct_cr rcv_cr; @@ -73,15 +72,12 @@ enum frct_flags { FRCT_ACK = 0x03, /* ACK field valid */ FRCT_FC = 0x08, /* FC window valid */ FRCT_RDVZ = 0x10, /* Rendez-vous */ - FRCT_CFG = 0x20, /* Configuration */ - FRCT_MFGM = 0x40, /* More fragments */ - FRCT_CRC = 0x80, /* CRC present */ + FRCT_MFGM = 0x20, /* More fragments */ + FRCT_CRC = 0x40, /* CRC present */ }; struct frct_pci { - uint8_t flags; - - uint8_t cflags; + uint16_t flags; uint16_t window; @@ -134,14 +130,13 @@ static struct frcti * frcti_create(int fd, delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; + frcti->snd_cr.inact = 3 * delta_t; + frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + if (qc == QOS_CUBE_DATA) frcti->snd_cr.cflags |= FRCTFRTX; - frcti->snd_cr.conf = true; - frcti->snd_cr.inact = 3 * delta_t + 1; - frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - - frcti->rcv_cr.inact = 2 * delta_t + 1; + frcti->rcv_cr.inact = 2 * delta_t; frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); return frcti; @@ -156,7 +151,7 @@ static void frcti_destroy(struct frcti * frcti) { /* * FIXME: In case of reliable transmission we should - * make sure everything is acked. + * make sure everything we sent is acked. */ pthread_rwlock_destroy(&frcti->lock); @@ -164,24 +159,6 @@ static void frcti_destroy(struct frcti * frcti) free(frcti); } -static int frcti_setconf(struct frcti * frcti, - uint16_t flags) -{ - assert(frcti); - - pthread_rwlock_wrlock(&frcti->lock); - - if (frcti->snd_cr.cflags != flags) { - frcti->snd_cr.cflags = flags; - frcti->snd_cr.conf = true; - frcti->snd_cr.drf = true; - } - - pthread_rwlock_unlock(&frcti->lock); - - return 0; -} - static uint16_t frcti_getconf(struct frcti * frcti) { uint16_t ret; @@ -219,14 +196,6 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); idx = frcti->rq[pos]; if (idx != -1) { - struct shm_du_buff * sdb; - struct frct_pci * pci; - - sdb = shm_rdrbuff_get(ai.rdrb, idx); - pci = (struct frct_pci *) shm_du_buff_head(sdb) - 1; - if (pci->flags & FRCT_CFG) - frcti->rcv_cr.cflags = pci->cflags; - ++frcti->rcv_cr.lwe; frcti->rq[pos] = -1; } @@ -297,28 +266,22 @@ static int __frcti_snd(struct frcti * frcti, } /* Set DRF if there are no unacknowledged packets. */ - if (snd_cr->seqno == snd_cr->lwe) + if (frcti->seqno == snd_cr->lwe) pci->flags |= FRCT_DRF; - if (snd_cr->conf) { - /* FIXME: This packet must be acked! */ - pci->flags |= FRCT_CFG; - pci->cflags = snd_cr->cflags; - } - /* Choose a new sequence number if sender inactivity expired. */ if (now.tv_sec - snd_cr->act > snd_cr->inact) { /* There are no unacknowledged packets. */ - assert(snd_cr->seqno == snd_cr->lwe); -#ifdef OUROBOROS_CONFIG_DEBUG - frcti->snd_cr.seqno = 0; + assert(frcti->seqno == snd_cr->lwe); +#ifdef CONFIG_OUROBOROS_DEBUG + frcti->seqno = 0; #else - random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); + random_buffer(&frcti->seqno, sizeof(frcti->seqno)); #endif - frcti->snd_cr.lwe = frcti->snd_cr.seqno; + frcti->snd_cr.lwe = frcti->seqno; } - pci->seqno = hton32(snd_cr->seqno++); + pci->seqno = hton32(frcti->seqno++); if (!(snd_cr->cflags & FRCTFRTX)) snd_cr->lwe++; else @@ -326,7 +289,6 @@ static int __frcti_snd(struct frcti * frcti, snd_cr->lwe++; snd_cr->act = now.tv_sec; - snd_cr->conf = false; pthread_rwlock_unlock(&frcti->lock); @@ -376,9 +338,6 @@ static int __frcti_rcv(struct frcti * frcti, if (seqno == rcv_cr->lwe) { ++rcv_cr->lwe; - /* Check for online reconfiguration. */ - if (pci->flags & FRCT_CFG) - rcv_cr->cflags = pci->cflags; } else { /* Out of order. */ if ((int32_t)(seqno - rcv_cr->lwe) < 0) /* Duplicate. */ goto drop_packet; -- cgit v1.2.3 From 937f2b345aa76272a1c80828e7666ab87611c0d1 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 27 Sep 2018 11:43:02 +0200 Subject: lib: Check return values init functions This will check the return values of init functions so that the code is more robust. It also removes a duplicate init in the timerwheel, checks for buffer overflows in the RIB and checks string lengths. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/lib/cacep.c | 7 ++++++- src/lib/frct.c | 6 +++--- src/lib/irm.c | 14 ++++++++++---- src/lib/rib.c | 15 ++++++++++++++- src/lib/shm_flow_set.c | 44 +++++++++++++++++++++++++------------------- src/lib/timerwheel.c | 3 --- 6 files changed, 58 insertions(+), 31 deletions(-) (limited to 'src/lib/frct.c') diff --git a/src/lib/cacep.c b/src/lib/cacep.c index 6efb7295..12751078 100644 --- a/src/lib/cacep.c +++ b/src/lib/cacep.c @@ -32,7 +32,7 @@ #include "cacep.pb-c.h" typedef CacepMsg cacep_msg_t; -#define BUF_SIZE 64 +#define BUF_SIZE 128 static int read_msg(int fd, struct conn_info * info) @@ -49,6 +49,11 @@ static int read_msg(int fd, if (msg == NULL) return -1; + if (strlen(msg->comp_name) > CACEP_BUF_STRLEN) { + cacep_msg__free_unpacked(msg, NULL); + return -1; + } + strcpy(info->comp_name, msg->comp_name); strcpy(info->protocol, msg->protocol); diff --git a/src/lib/frct.c b/src/lib/frct.c index c057ba50..516c958b 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -357,15 +357,15 @@ static int __frcti_rcv(struct frcti * frcti, rcv_cr->act = now.tv_sec; + pthread_rwlock_unlock(&frcti->lock); + if (!(pci->flags & FRCT_DATA)) shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&frcti->lock); - return ret; drop_packet: - shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&frcti->lock); + shm_rdrbuff_remove(ai.rdrb, idx); return -EAGAIN; } diff --git a/src/lib/irm.c b/src/lib/irm.c index bd34669f..d88475c4 100644 --- a/src/lib/irm.c +++ b/src/lib/irm.c @@ -319,10 +319,10 @@ static int check_prog(const char * prog) static int check_prog_path(char ** prog) { - char * path = getenv("PATH"); - char * path_end = path + strlen(path) + 1; + char * path; + char * path_end; char * pstart; - char * pstop = path; + char * pstop; char * tmp; char * tstop; char * tstart; @@ -331,9 +331,15 @@ static int check_prog_path(char ** prog) assert(prog); - if (*prog == NULL || path == NULL) + if (*prog == NULL) return -EINVAL; + path = getenv("PATH"); + if (path == NULL) + return -ENOENT; + + pstop = path; + path_end = path + strlen(path) + 1; if (!strlen(path) || strchr(*prog, '/') != NULL) { if ((ret = check_prog(*prog)) < 0) return ret; diff --git a/src/lib/rib.c b/src/lib/rib.c index 685575e5..88db9ed8 100644 --- a/src/lib/rib.c +++ b/src/lib/rib.c @@ -101,6 +101,9 @@ static int rib_read(const char * path, char comp[RIB_PATH_LEN + 1]; char * c; + if (strlen(path) > RIB_PATH_LEN) + return -1; + strcpy(comp, path + 1); c = strstr(comp, "/"); @@ -183,6 +186,9 @@ static size_t __getattr(const char * path, char comp[RIB_PATH_LEN + 1]; char * c; + if (strlen(path) > RIB_PATH_LEN) + return -1; + strcpy(comp, path + 1); c = strstr(comp, "/"); @@ -282,7 +288,8 @@ int rib_init(const char * mountpt) if (stat(rib.mnt, &st) == -1) switch(errno) { case ENOENT: - mkdir(rib.mnt, 0777); + if (mkdir(rib.mnt, 0777)) + return -1; break; case ENOTCONN: fuse_unmount(rib.mnt, rib.ch); @@ -385,6 +392,12 @@ int rib_reg(const char * path, return -ENOMEM; } + if (strlen(path) > RIB_PATH_LEN) { + pthread_rwlock_unlock(&rib.lock); + free(rc); + return -1; + } + strcpy(rc->path, path); rc->ops = ops; diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index bb9e3caa..008e4a0d 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -98,17 +98,14 @@ struct shm_flow_set * shm_flow_set_create() mask = umask(0); shm_fd = shm_open(fn, O_CREAT | O_RDWR, 0666); - if (shm_fd == -1) { - free(set); - return NULL; - } + if (shm_fd == -1) + goto fail_shm_open; umask(mask); if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) { - free(set); close(shm_fd); - return NULL; + goto fail_shm_open; } shm_base = mmap(NULL, @@ -120,11 +117,8 @@ struct shm_flow_set * shm_flow_set_create() close(shm_fd); - if (shm_base == MAP_FAILED) { - shm_unlink(fn); - free(set); - return NULL; - } + if (shm_base == MAP_FAILED) + goto fail_mmap; set->mtable = shm_base; set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS); @@ -133,21 +127,27 @@ struct shm_flow_set * shm_flow_set_create() set->lock = (pthread_mutex_t *) (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); - pthread_mutexattr_init(&mattr); + if (pthread_mutexattr_init(&mattr)) + goto fail_mmap; + #ifdef HAVE_ROBUST_MUTEX - pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); + if (pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST)) + goto fail_mmap; #endif - pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(set->lock, &mattr); + if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) || + pthread_mutex_init(set->lock, &mattr) || + pthread_condattr_init(&cattr) || + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED)) + goto fail_mmap; - pthread_condattr_init(&cattr); - pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); #ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); + if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) + goto fail_mmap; #endif for (i = 0; i < PROG_MAX_FQUEUES; ++i) { set->heads[i] = 0; - pthread_cond_init(&set->conds[i], &cattr); + if (pthread_cond_init(&set->conds[i], &cattr)) + goto fail_mmap; } for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -156,6 +156,12 @@ struct shm_flow_set * shm_flow_set_create() set->pid = getpid(); return set; + + fail_mmap: + shm_unlink(fn); + fail_shm_open: + free(set); + return NULL; } struct shm_flow_set * shm_flow_set_open(pid_t pid) diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index ef8489bf..0feda642 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -121,9 +121,6 @@ struct timerwheel * timerwheel_create(time_t resolution, if (tw == NULL) return NULL; - if (pthread_mutex_init(&tw->lock, NULL)) - return NULL; - tw->elements = 1; while (tw->elements < (size_t) max_delay / resolution) -- cgit v1.2.3 From b802b25ddfe6f1b6ecabe3ba70e3dac2e99e7a50 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 4 Oct 2018 18:06:32 +0200 Subject: lib: Pass qosspec at flow allocation The flow allocator now passes the full qos specification to the endpoint, instead of just a cube. This is a more flexible architecture, as it makes QoS cubes internal to the layers. Adds endianness transforms for the flow allocator protocol in the normal IPCP. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/ipcp-dev.h | 2 +- include/ouroboros/np1_flow.h | 2 +- include/ouroboros/qos.h | 22 ++++------ include/ouroboros/sockets.h.in | 11 +++++ src/ipcpd/eth/eth.c | 51 ++++++++++++++++------ src/ipcpd/ipcp.c | 13 +++++- src/ipcpd/ipcp.h | 2 +- src/ipcpd/local/main.c | 4 +- src/ipcpd/normal/dt.c | 2 - src/ipcpd/normal/fa.c | 57 +++++++++++++++++------- src/ipcpd/normal/fa.h | 2 +- src/ipcpd/raptor/CMakeLists.txt | 1 + src/ipcpd/raptor/main.c | 77 +++++++++++++++++++------------- src/ipcpd/udp/main.c | 48 +++++++++++++------- src/irmd/ipcp.c | 13 +++--- src/irmd/ipcp.h | 2 +- src/irmd/irm_flow.c | 4 +- src/irmd/irm_flow.h | 5 ++- src/irmd/main.c | 22 ++++++---- src/lib/CMakeLists.txt | 4 +- src/lib/dev.c | 87 ++++++++++++++++-------------------- src/lib/frct.c | 5 +-- src/lib/ipcpd_messages.proto | 3 +- src/lib/irmd_messages.proto | 3 +- src/lib/qos.c | 97 +++++++++++++++++++---------------------- src/lib/qoscube.c | 30 +++---------- src/lib/qosspec.proto | 33 ++++++++++++++ src/lib/sockets.c | 35 +++++++++++++++ 28 files changed, 388 insertions(+), 249 deletions(-) create mode 100644 src/lib/qosspec.proto (limited to 'src/lib/frct.c') diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 9a33c25a..a0ed026c 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -31,7 +31,7 @@ int ipcp_create_r(pid_t pid, int ipcp_flow_req_arr(pid_t pid, const uint8_t * dst, size_t len, - qoscube_t cube); + qosspec_t qs); int ipcp_flow_alloc_reply(int fd, int response); diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index 789e82df..3435c24a 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -29,7 +29,7 @@ int np1_flow_alloc(pid_t n_pid, int port_id, - qoscube_t qc); + qosspec_t qs); int np1_flow_resp(int port_id); diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h index 011828d7..2b93f1d0 100644 --- a/include/ouroboros/qos.h +++ b/include/ouroboros/qos.h @@ -27,26 +27,20 @@ #include typedef struct qos_spec { - uint32_t delay; /* In ms */ - uint64_t bandwidth; /* In bits/s */ - uint8_t availability; /* Class of 9s */ - uint32_t loss; /* Packet loss */ - uint8_t in_order; /* In-order delivery, enables FRCT */ - uint32_t maximum_interruption; /* In ms */ + uint32_t delay; /* In ms */ + uint64_t bandwidth; /* In bits/s */ + uint8_t availability; /* Class of 9s */ + uint32_t loss; /* Packet loss */ + uint32_t ber; /* Bit error rate, errors per billion bits */ + uint8_t in_order; /* In-order delivery, enables FRCT */ + uint32_t max_gap; /* In ms */ } qosspec_t; qosspec_t qos_raw; +qosspec_t qos_raw_no_errors; qosspec_t qos_best_effort; qosspec_t qos_video; qosspec_t qos_voice; qosspec_t qos_data; -__BEGIN_DECLS - -int qosspec_init(qosspec_t * qs); - -int qosspec_fini(qosspec_t * qs); - -__END_DECLS - #endif /* OUROBOROS_QOS_H */ diff --git a/include/ouroboros/sockets.h.in b/include/ouroboros/sockets.h.in index 4557a9ef..368923db 100644 --- a/include/ouroboros/sockets.h.in +++ b/include/ouroboros/sockets.h.in @@ -23,6 +23,8 @@ #ifndef OUROBOROS_SOCKETS_H #define OUROBOROS_SOCKETS_H +#include + #include #include "ipcp_config.pb-c.h" @@ -36,6 +38,9 @@ typedef IpcpInfoMsg ipcp_info_msg_t; #include "ipcpd_messages.pb-c.h" typedef IpcpMsg ipcp_msg_t; +#include "qosspec.pb-c.h" +typedef QosspecMsg qosspec_msg_t; + #define SOCK_PATH "/var/run/ouroboros/" #define SOCK_PATH_SUFFIX ".sock" @@ -53,4 +58,10 @@ int client_socket_open(char * file_name); irm_msg_t * send_recv_irm_msg(irm_msg_t * msg); + +/* qos message conversion needed in different components */ +qosspec_msg_t spec_to_msg(qosspec_t * qs); + +qosspec_t msg_to_spec(qosspec_msg_t * msg); + #endif diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 44ef3756..6fd7b805 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -146,15 +146,27 @@ #define NAME_QUERY_REPLY 3 struct mgmt_msg { - uint8_t code; #if defined(BUILD_ETH_DIX) uint16_t seid; uint16_t deid; #elif defined(BUILD_ETH_LLC) uint8_t ssap; uint8_t dsap; + /* QoS here for alignment */ + uint8_t code; + uint8_t availability; +#endif + /* QoS parameters from spec, aligned */ + uint32_t loss; + uint64_t bandwidth; + uint32_t ber; + uint32_t max_gap; + uint32_t delay; + uint8_t in_order; +#if defined (BUILD_ETH_DIX) + uint8_t code; + uint8_t availability; #endif - uint8_t qoscube; int8_t response; } __attribute__((packed)); @@ -433,7 +445,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, uint8_t ssap, #endif const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { uint8_t * buf; struct mgmt_msg * msg; @@ -453,7 +465,14 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, #elif defined(BUILD_ETH_LLC) msg->ssap = ssap; #endif - msg->qoscube = cube; + + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -523,7 +542,7 @@ static int eth_ipcp_req(uint8_t * r_addr, uint8_t r_sap, #endif const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, ALLOC_TIMEO * MILLION}; struct timespec abstime; @@ -547,7 +566,7 @@ static int eth_ipcp_req(uint8_t * r_addr, } /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -687,11 +706,20 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, uint8_t * r_addr) { struct mgmt_msg * msg; + qosspec_t qs; msg = (struct mgmt_msg *) buf; switch (msg->code) { case FLOW_REQ: + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { eth_ipcp_req(r_addr, @@ -701,7 +729,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, msg->ssap, #endif buf + sizeof(*msg), - msg->qoscube); + qs); } break; case FLOW_REPLY: @@ -1553,7 +1581,7 @@ static int eth_ipcp_query(const uint8_t * hash) static int eth_ipcp_flow_alloc(int fd, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { #ifdef BUILD_ETH_LLC uint8_t ssap = 0; @@ -1565,11 +1593,6 @@ static int eth_ipcp_flow_alloc(int fd, assert(hash); - if (cube > QOS_CUBE_DATA) { - log_dbg("Unsupported QoS requested."); - return -1; - } - if (!shim_data_dir_has(eth_data.shim_data, hash)) { log_err("Destination unreachable."); return -1; @@ -1597,7 +1620,7 @@ static int eth_ipcp_flow_alloc(int fd, #elif defined(BUILD_ETH_LLC) ssap, #endif - hash, cube) < 0) { + hash, qs) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 5ea54533..e415bbd9 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -20,6 +20,13 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#define __XSI_VISIBLE 500 +#endif + #if defined(__linux__) && !defined(DISABLE_CORE_LOCK) #define _GNU_SOURCE #define NPROC (sysconf(_SC_NPROCESSORS_ONLN)) @@ -198,6 +205,7 @@ static void * mainloop(void * o) layer_info_msg_t layer_info = LAYER_INFO_MSG__INIT; int fd = -1; struct cmd * cmd; + qosspec_t qs; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -422,9 +430,10 @@ static void * mainloop(void * o) break; } + qs = msg_to_spec(msg->qosspec); fd = np1_flow_alloc(msg->pid, msg->port_id, - msg->qoscube); + qs); if (fd < 0) { log_err("Failed allocating fd on port_id %d.", msg->port_id); @@ -435,7 +444,7 @@ static void * mainloop(void * o) ret_msg.result = ipcpi.ops->ipcp_flow_alloc(fd, msg->hash.data, - msg->qoscube); + qs); break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: ret_msg.has_result = true; diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 5417fc74..13751b6d 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -60,7 +60,7 @@ struct ipcp_ops { int (* ipcp_flow_alloc)(int fd, const uint8_t * dst, - qoscube_t qos); + qosspec_t qs); int (* ipcp_flow_alloc_resp)(int fd, int response); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index c83f85fe..8eae7503 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -183,7 +183,7 @@ static int ipcp_local_query(const uint8_t * hash) static int ipcp_local_flow_alloc(int fd, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; struct timespec abstime; @@ -212,7 +212,7 @@ static int ipcp_local_flow_alloc(int fd, assert(ipcpi.alloc_id == -1); - out_fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + out_fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (out_fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_dbg("Flow allocation failed: %d", out_fd); diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index c3f8f198..a350e4be 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -31,8 +31,6 @@ #define DT "dt" #define OUROBOROS_PREFIX DT -/* FIXME: fix #defines and remove endian.h include. */ -#include #include #include #include diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 10f0a863..4c82e0e0 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -57,8 +57,15 @@ struct fa_msg { uint32_t r_eid; uint32_t s_eid; uint8_t code; - uint8_t qc; int8_t response; + /* QoS parameters from spec, aligned */ + uint8_t availability; + uint8_t in_order; + uint32_t delay; + uint64_t bandwidth; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; } __attribute__((packed)); struct { @@ -100,6 +107,7 @@ static void fa_post_sdu(void * comp, int fd; uint8_t * buf; struct fa_msg * msg; + qosspec_t qs; (void) comp; @@ -142,10 +150,18 @@ static void fa_post_sdu(void * comp, assert(ipcpi.alloc_id == -1); + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + fd = ipcp_flow_req_arr(getpid(), (uint8_t *) (msg + 1), ipcp_dir_hash_len(), - msg->qc); + qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Failed to get fd for flow."); @@ -155,8 +171,8 @@ static void fa_post_sdu(void * comp, pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[fd] = msg->s_eid; - fa.r_addr[fd] = msg->s_addr; + fa.r_eid[fd] = ntoh32(msg->s_eid); + fa.r_addr[fd] = ntoh64(msg->s_addr); pthread_rwlock_unlock(&fa.flows_lock); @@ -169,14 +185,14 @@ static void fa_post_sdu(void * comp, case FLOW_REPLY: pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[msg->r_eid] = msg->s_eid; + fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); - ipcp_flow_alloc_reply(msg->r_eid, msg->response); + ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response); if (msg->response < 0) - destroy_conn(msg->r_eid); + destroy_conn(ntoh32(msg->r_eid)); else - sdu_sched_add(fa.sdu_sched, msg->r_eid); + sdu_sched_add(fa.sdu_sched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -227,11 +243,12 @@ void fa_stop(void) int fa_alloc(int fd, const uint8_t * dst, - qoscube_t qc) + qosspec_t qs) { struct fa_msg * msg; uint64_t addr; struct shm_du_buff * sdb; + qoscube_t qc; addr = dir_query(dst); if (addr == 0) @@ -240,14 +257,22 @@ int fa_alloc(int fd, if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) return -1; - msg = (struct fa_msg *) shm_du_buff_head(sdb); - msg->code = FLOW_REQ; - msg->qc = qc; - msg->s_eid = fd; - msg->s_addr = ipcpi.dt_addr; + msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg->code = FLOW_REQ; + msg->s_eid = hton32(fd); + msg->s_addr = hton64(ipcpi.dt_addr); + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, dst, ipcp_dir_hash_len()); + qc = qos_spec_to_cube(qs); + if (dt_write_sdu(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; @@ -302,8 +327,8 @@ int fa_alloc_resp(int fd, msg = (struct fa_msg *) shm_du_buff_head(sdb); msg->code = FLOW_REPLY; - msg->r_eid = fa.r_eid[fd]; - msg->s_eid = fd; + msg->r_eid = hton32(fa.r_eid[fd]); + msg->s_eid = hton32(fd); msg->response = response; if (response < 0) { diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h index 87819d6f..a98d834a 100644 --- a/src/ipcpd/normal/fa.h +++ b/src/ipcpd/normal/fa.h @@ -36,7 +36,7 @@ void fa_stop(void); int fa_alloc(int fd, const uint8_t * dst, - qoscube_t qos); + qosspec_t qs); int fa_alloc_resp(int fd, int response); diff --git a/src/ipcpd/raptor/CMakeLists.txt b/src/ipcpd/raptor/CMakeLists.txt index 06e6ee29..1883d9bb 100644 --- a/src/ipcpd/raptor/CMakeLists.txt +++ b/src/ipcpd/raptor/CMakeLists.txt @@ -16,6 +16,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux") find_path(RAPTOR_KERNEL_MODULE NAMES raptor.ko.gz + raptor.ko.xz HINTS /lib/modules/${CMAKE_SYSTEM_VERSION}/extra ) diff --git a/src/ipcpd/raptor/main.c b/src/ipcpd/raptor/main.c index 4f0099b3..a01889ec 100644 --- a/src/ipcpd/raptor/main.c +++ b/src/ipcpd/raptor/main.c @@ -90,11 +90,18 @@ #define NAME_QUERY_REPLY 3 struct mgmt_msg { - uint8_t code; - uint8_t ssap; - uint8_t dsap; - uint8_t qoscube; - int8_t response; + uint8_t code; + uint8_t ssap; + uint8_t dsap; + int8_t response; + /* QoS parameters from spec, aligned */ + uint32_t loss; + uint64_t bandwidth; + uint32_t ber; + uint32_t max_gap; + uint32_t delay; + uint8_t in_order; + uint8_t availability; } __attribute__((packed)); struct ef { @@ -278,7 +285,7 @@ static int raptor_send_frame(struct shm_du_buff * sdb, static int raptor_sap_alloc(uint8_t ssap, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { struct mgmt_msg * msg; struct shm_du_buff * sdb; @@ -288,10 +295,16 @@ static int raptor_sap_alloc(uint8_t ssap, return -1; } - msg = (struct mgmt_msg *) shm_du_buff_head(sdb); - msg->code = FLOW_REQ; - msg->ssap = ssap; - msg->qoscube = cube; + msg = (struct mgmt_msg *) shm_du_buff_head(sdb); + msg->code = FLOW_REQ; + msg->ssap = ssap; + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -306,15 +319,15 @@ static int raptor_sap_alloc(uint8_t ssap, return 0; } -static int raptor_sap_alloc_resp(uint8_t ssap, - uint8_t dsap, - int response) +static int raptor_sap_alloc_resp(uint8_t ssap, + uint8_t dsap, + int response) { - struct mgmt_msg * msg; + struct mgmt_msg * msg; struct shm_du_buff * sdb; if (ipcp_sdb_reserve(&sdb, sizeof(*msg)) < 0) { - log_err("failed to reserve sdb for management frame."); + log_err("Failed to reserve sdb for management frame."); return -1; } @@ -337,7 +350,7 @@ static int raptor_sap_alloc_resp(uint8_t ssap, static int raptor_sap_req(uint8_t r_sap, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000}; struct timespec abstime; @@ -361,7 +374,7 @@ static int raptor_sap_req(uint8_t r_sap, } /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -424,12 +437,12 @@ static int raptor_name_query_req(const uint8_t * hash) return 0; if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len()) < 0) { - log_err("failed to reserve sdb for management frame."); + log_err("Failed to reserve sdb for management frame."); return -1; } - msg = (struct mgmt_msg *) shm_du_buff_head(sdb); - msg->code = NAME_QUERY_REPLY; + msg = (struct mgmt_msg *) shm_du_buff_head(sdb); + msg->code = NAME_QUERY_REPLY; memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -456,8 +469,9 @@ static int raptor_name_query_reply(const uint8_t * hash) static int raptor_mgmt_frame(const uint8_t * buf, size_t len) { - struct mgmt_msg * msg = (struct mgmt_msg *) buf; - uint8_t * hash = (uint8_t *) (msg + 1); + struct mgmt_msg * msg = (struct mgmt_msg *) buf; + uint8_t * hash = (uint8_t *) (msg + 1); + qosspec_t qs; switch (msg->code) { case FLOW_REQ: @@ -466,8 +480,16 @@ static int raptor_mgmt_frame(const uint8_t * buf, return -1; } + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + if (shim_data_reg_has(raptor_data.shim_data, hash)) - raptor_sap_req(msg->ssap, hash, msg->qoscube); + raptor_sap_req(msg->ssap, hash, qs); break; case FLOW_REPLY: if (len != sizeof(*msg)) { @@ -901,7 +923,7 @@ static int raptor_query(const uint8_t * hash) static int raptor_flow_alloc(int fd, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { uint8_t ssap = 0; @@ -909,11 +931,6 @@ static int raptor_flow_alloc(int fd, assert(hash); - if (cube != QOS_CUBE_BE) { - log_dbg("Unsupported QoS requested."); - return -1; - } - if (!shim_data_dir_has(raptor_data.shim_data, hash)) { log_err("Destination unreachable."); return -1; @@ -932,7 +949,7 @@ static int raptor_flow_alloc(int fd, pthread_rwlock_unlock(&raptor_data.flows_lock); - if (raptor_sap_alloc(ssap, hash, cube) < 0) { + if (raptor_sap_alloc(ssap, hash, qs) < 0) { pthread_rwlock_wrlock(&raptor_data.flows_lock); bmp_release(raptor_data.saps, raptor_data.fd_to_ef[fd].sap); raptor_data.fd_to_ef[fd].sap = -1; diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 6a350da0..96820662 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -73,8 +73,15 @@ struct mgmt_msg { uint16_t src_udp_port; uint16_t dst_udp_port; uint8_t code; - uint8_t qoscube; uint8_t response; + /* QoS parameters from spec, aligned */ + uint8_t availability; + uint8_t in_order; + uint32_t delay; + uint64_t bandwidth; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; } __attribute__((packed)); struct uf { @@ -219,7 +226,7 @@ static int send_shim_udp_msg(uint8_t * buf, static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, uint16_t src_udp_port, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { uint8_t * buf; struct mgmt_msg * msg; @@ -235,7 +242,13 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, msg = (struct mgmt_msg *) buf; msg->code = FLOW_REQ; msg->src_udp_port = src_udp_port; - msg->qoscube = cube; + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, dst, ipcp_dir_hash_len()); @@ -272,7 +285,7 @@ static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr, static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; struct timespec abstime; @@ -331,7 +344,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, } /* reply to IRM */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -436,7 +449,7 @@ static void * ipcp_udp_listener(void * o) while (true) { struct mgmt_msg * msg = NULL; - + qosspec_t qs; memset(&buf, 0, SHIM_UDP_MSG_SIZE); n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, (struct sockaddr *) &c_saddr, @@ -455,9 +468,16 @@ static void * ipcp_udp_listener(void * o) switch (msg->code) { case FLOW_REQ: c_saddr.sin_port = msg->src_udp_port; + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); ipcp_udp_port_req(&c_saddr, (uint8_t *) (msg + 1), - msg->qoscube); + qs); break; case FLOW_REPLY: ipcp_udp_port_alloc_reply(msg->src_udp_port, @@ -555,7 +575,8 @@ static void * ipcp_udp_sdu_loop(void * o) pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_cleanup_push((void (*)(void *)) ipcp_sdb_release, + pthread_cleanup_push((void (*)(void *)) + ipcp_sdb_release, (void *) sdb); if (send(fd, shm_du_buff_head(sdb), @@ -968,7 +989,7 @@ static int ipcp_udp_query(const uint8_t * hash) static int ipcp_udp_flow_alloc(int fd, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct sockaddr_in r_saddr; /* server address */ struct sockaddr_in f_saddr; /* flow */ @@ -978,12 +999,9 @@ static int ipcp_udp_flow_alloc(int fd, log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst)); - assert(dst); + (void) qs; - if (cube > QOS_CUBE_DATA) { - log_dbg("Unsupported QoS requested."); - return -1; - } + assert(dst); skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (skfd < 0) @@ -1034,7 +1052,7 @@ static int ipcp_udp_flow_alloc(int fd, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, cube) < 0) { + if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, qs) < 0) { pthread_rwlock_wrlock(&udp_data.flows_lock); udp_data.fd_to_uf[fd].udp = -1; diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index dc8f1c6e..0bdf674b 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -433,11 +433,12 @@ int ipcp_flow_alloc(pid_t pid, pid_t n_pid, const uint8_t * dst, size_t len, - qoscube_t cube) + qosspec_t qs) { - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; + ipcp_msg_t msg = IPCP_MSG__INIT; + qosspec_msg_t qs_msg; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; assert(dst); @@ -449,8 +450,8 @@ int ipcp_flow_alloc(pid_t pid, msg.has_hash = true; msg.hash.len = len; msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = cube; + qs_msg = spec_to_msg(&qs); + msg.qosspec = &qs_msg; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 8ff062b2..28396333 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -67,7 +67,7 @@ int ipcp_flow_alloc(pid_t pid, pid_t n_pid, const uint8_t * dst, size_t len, - qoscube_t qos); + qosspec_t qs); int ipcp_flow_alloc_resp(pid_t pid, int port_id, diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index dfbe5e95..a5a9f28c 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -39,7 +39,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, int port_id, - qoscube_t qc) + qosspec_t qs) { pthread_condattr_t cattr; struct irm_flow * f = malloc(sizeof(*f)); @@ -61,7 +61,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, f->n_pid = n_pid; f->n_1_pid = n_1_pid; f->port_id = port_id; - f->qc = qc; + f->qs = qs; f->n_rb = shm_rbuff_create(n_pid, port_id); if (f->n_rb == NULL) { diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index d53984e8..f4de8187 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -43,11 +43,12 @@ struct irm_flow { struct list_head next; int port_id; - qoscube_t qc; pid_t n_pid; pid_t n_1_pid; + qosspec_t qs; + struct shm_rbuff * n_rb; struct shm_rbuff * n_1_rb; @@ -61,7 +62,7 @@ struct irm_flow { struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, int port_id, - qoscube_t qc); + qosspec_t qs); void irm_flow_destroy(struct irm_flow * f); diff --git a/src/irmd/main.c b/src/irmd/main.c index 634bf4de..9504f3b5 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1264,7 +1264,7 @@ static int flow_accept(pid_t pid, static int flow_alloc(pid_t pid, const char * dst, - qoscube_t cube, + qosspec_t qs, struct timespec * timeo, struct irm_flow ** e) { @@ -1288,7 +1288,7 @@ static int flow_alloc(pid_t pid, return -EBADF; } - f = irm_flow_create(pid, ipcp->pid, port_id, cube); + f = irm_flow_create(pid, ipcp->pid, port_id, qs); if (f == NULL) { bmp_release(irmd.port_ids, port_id); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1310,7 +1310,7 @@ static int flow_alloc(pid_t pid, str_hash(ipcp->dir_hash_algo, hash, dst); if (ipcp_flow_alloc(ipcp->pid, port_id, pid, hash, - IPCP_HASH_LEN(ipcp), cube)) { + IPCP_HASH_LEN(ipcp), qs)) { /* sanitizer cleans this */ log_info("Flow_allocation failed."); free(hash); @@ -1418,7 +1418,7 @@ static pid_t auto_execute(char ** argv) static struct irm_flow * flow_req_arr(pid_t pid, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { struct reg_entry * re = NULL; struct prog_entry * a = NULL; @@ -1521,7 +1521,7 @@ static struct irm_flow * flow_req_arr(pid_t pid, return NULL; } - f = irm_flow_create(h_pid, pid, port_id, cube); + f = irm_flow_create(h_pid, pid, port_id, qs); if (f == NULL) { bmp_release(irmd.port_ids, port_id); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1993,17 +1993,19 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IRM_FLOW_ACCEPT: result = flow_accept(msg->pid, timeo, &e); if (result == 0) { + qosspec_msg_t qs_msg; ret_msg->has_port_id = true; ret_msg->port_id = e->port_id; ret_msg->has_pid = true; ret_msg->pid = e->n_1_pid; - ret_msg->has_qoscube = true; - ret_msg->qoscube = e->qc; + qs_msg = spec_to_msg(&e->qs); + ret_msg->qosspec = &qs_msg; } break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: result = flow_alloc(msg->pid, msg->dst, - msg->qoscube, timeo, &e); + msg_to_spec(msg->qosspec), + timeo, &e); if (result == 0) { ret_msg->has_port_id = true; ret_msg->port_id = e->port_id; @@ -2017,7 +2019,7 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->pid, msg->hash.data, - msg->qoscube); + msg_to_spec(msg->qosspec)); result = (e == NULL ? -1 : 0); if (result == 0) { ret_msg->has_port_id = true; @@ -2061,6 +2063,8 @@ static void * mainloop(void * o) irm_msg__pack(ret_msg, buffer.data); + /* Can't free the qosspec. */ + ret_msg->qosspec = NULL; irm_msg__free_unpacked(ret_msg, NULL); pthread_cleanup_push(close_ptr, &sfd); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index e7e07802..aa4e5bf3 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -6,6 +6,8 @@ include_directories(${CMAKE_BINARY_DIR}/include) protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto) protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto) +protobuf_generate_c(QOSSPEC_PROTO_SRCS QOSSPEC_PROTO_HDRS + qosspec.proto) protobuf_generate_c(LAYER_CONFIG_PROTO_SRCS LAYER_CONFIG_PROTO_HDRS ipcp_config.proto) protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto) @@ -214,7 +216,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in" "${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY) add_library(ouroboros-common SHARED ${SOURCE_FILES_COMMON} ${IRM_PROTO_SRCS} - ${IPCP_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS}) + ${IPCP_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS} ${QOSSPEC_PROTO_SRCS}) add_library(ouroboros-dev SHARED ${SOURCE_FILES_DEV} ${CACEP_PROTO_SRCS}) diff --git a/src/lib/dev.c b/src/lib/dev.c index 3d9e1d49..a92c1e42 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -44,7 +44,6 @@ #include #include #include -#include #include #include @@ -94,7 +93,6 @@ struct flow { struct shm_flow_set * set; int port_id; int oflags; - qoscube_t cube; qosspec_t spec; ssize_t part_idx; @@ -235,7 +233,6 @@ static void flow_clear(int fd) ai.flows[fd].port_id = -1; ai.flows[fd].pid = -1; - ai.flows[fd].cube = QOS_CUBE_BE; } static void flow_fini(int fd) @@ -272,7 +269,7 @@ static void flow_fini(int fd) static int flow_init(int port_id, pid_t pid, - qoscube_t qc) + qosspec_t qs) { int fd; int err = -ENOMEM; @@ -300,9 +297,8 @@ static int flow_init(int port_id, ai.flows[fd].port_id = port_id; ai.flows[fd].oflags = FLOWFDEFAULT; ai.flows[fd].pid = pid; - ai.flows[fd].cube = qc; - ai.flows[fd].spec = qos_cube_to_spec(qc); ai.flows[fd].part_idx = NO_PART; + ai.flows[fd].spec = qs; ai.ports[port_id].fd = fd; @@ -499,7 +495,6 @@ int flow_accept(qosspec_t * qs, irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg; int fd; - qoscube_t qc; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; @@ -528,14 +523,13 @@ int flow_accept(qosspec_t * qs, } if (!recv_msg->has_pid || !recv_msg->has_port_id || - !recv_msg->has_qoscube) { + recv_msg->qosspec == NULL) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - qc = recv_msg->qoscube; - - fd = flow_init(recv_msg->port_id, recv_msg->pid, recv_msg->qoscube); + fd = flow_init(recv_msg->port_id, recv_msg->pid, + msg_to_spec(recv_msg->qosspec)); irm_msg__free_unpacked(recv_msg, NULL); @@ -544,12 +538,10 @@ int flow_accept(qosspec_t * qs, pthread_rwlock_wrlock(&ai.lock); - /* FIXME: check if FRCT is needed based on qc? */ - assert(ai.flows[fd].frcti == NULL); - if (qc != QOS_CUBE_RAW) { - ai.flows[fd].frcti = frcti_create(fd, qc); + if (ai.flows[fd].spec.in_order != 0) { + ai.flows[fd].frcti = frcti_create(fd); if (ai.flows[fd].frcti == NULL) { flow_fini(fd); pthread_rwlock_unlock(&ai.lock); @@ -569,21 +561,17 @@ int flow_alloc(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - qoscube_t qc = QOS_CUBE_RAW; - int fd; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.has_qoscube = true; - msg.pid = ai.pid; - - if (qs != NULL) - qc = qos_spec_to_cube(*qs); + irm_msg_t msg = IRM_MSG__INIT; + qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT; + irm_msg_t * recv_msg; + int fd; - msg.qoscube = qc; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; + msg.dst = (char *) dst; + msg.has_pid = true; + msg.pid = ai.pid; + qs_msg = spec_to_msg(qs); + msg.qosspec = &qs_msg; if (timeo != NULL) { msg.has_timeo_sec = true; @@ -612,7 +600,8 @@ int flow_alloc(const char * dst, return -EIRMD; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qc); + fd = flow_init(recv_msg->port_id, recv_msg->pid, + qs == NULL ? qos_raw : *qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -623,8 +612,8 @@ int flow_alloc(const char * dst, assert(ai.flows[fd].frcti == NULL); - if (qc != QOS_CUBE_RAW) { - ai.flows[fd].frcti = frcti_create(fd, qc); + if (ai.flows[fd].spec.in_order != 0) { + ai.flows[fd].frcti = frcti_create(fd); if (ai.flows[fd].frcti == NULL) { flow_fini(fd); pthread_rwlock_unlock(&ai.lock); @@ -1178,9 +1167,9 @@ int fevent(struct flow_set * set, int np1_flow_alloc(pid_t n_pid, int port_id, - qoscube_t qc) + qosspec_t qs) { - return flow_init(port_id, n_pid, qc); + return flow_init(port_id, n_pid, qs); } int np1_flow_dealloc(int port_id) @@ -1243,25 +1232,25 @@ int ipcp_create_r(pid_t pid, int ipcp_flow_req_arr(pid_t pid, const uint8_t * dst, size_t len, - qoscube_t qc) + qosspec_t qs) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg; + qosspec_msg_t qs_msg; + int fd; assert(dst != NULL); - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_pid = true; - msg.pid = pid; - msg.has_hash = true; - msg.hash.len = len; - msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = qc; + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_pid = true; + msg.pid = pid; + msg.has_hash = true; + msg.hash.len = len; + msg.hash.data = (uint8_t *) dst; + qs_msg = spec_to_msg(&qs); + msg.qosspec = &qs_msg; recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) return -EIRMD; @@ -1275,7 +1264,7 @@ int ipcp_flow_req_arr(pid_t pid, return -1; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qc); + fd = flow_init(recv_msg->port_id, recv_msg->pid, qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -1457,7 +1446,7 @@ int ipcp_flow_get_qoscube(int fd, assert(ai.flows[fd].port_id >= 0); - *cube = ai.flows[fd].cube; + *cube = qos_spec_to_cube(ai.flows[fd].spec); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/frct.c b/src/lib/frct.c index 516c958b..e9acb1dc 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -101,8 +101,7 @@ static void frct_fini(void) timerwheel_destroy(frct.tw); } -static struct frcti * frcti_create(int fd, - qoscube_t qc) +static struct frcti * frcti_create(int fd) { struct frcti * frcti; time_t delta_t; @@ -133,7 +132,7 @@ static struct frcti * frcti_create(int fd, frcti->snd_cr.inact = 3 * delta_t; frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - if (qc == QOS_CUBE_DATA) + if (ai.flows[fd].spec.loss == 0) frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.inact = 2 * delta_t; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index 454af0dc..48198a5b 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -23,6 +23,7 @@ syntax = "proto2"; import "ipcp_config.proto"; +import "qosspec.proto"; enum ipcp_msg_code { IPCP_BOOTSTRAP = 1; @@ -43,7 +44,7 @@ message ipcp_msg { optional bytes hash = 2; optional int32 port_id = 3; optional string dst = 4; - optional uint32 qoscube = 5; + optional qosspec_msg qosspec = 5; optional ipcp_config_msg conf = 6; optional int32 pid = 7; optional layer_info_msg layer_info = 8; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 16dfe828..2ed2ec37 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -23,6 +23,7 @@ syntax = "proto2"; import "ipcp_config.proto"; +import "qosspec.proto"; enum irm_msg_code { IRM_CREATE_IPCP = 1; @@ -67,7 +68,7 @@ message irm_msg { optional string dst = 9; optional bytes hash = 10; optional sint32 port_id = 11; - optional sint32 qoscube = 12; + optional qosspec_msg qosspec = 12; optional ipcp_config_msg conf = 13; optional uint32 opts = 14; repeated ipcp_info_msg ipcps = 15; diff --git a/src/lib/qos.c b/src/lib/qos.c index bee6ed71..8607031e 100644 --- a/src/lib/qos.c +++ b/src/lib/qos.c @@ -28,66 +28,61 @@ #include qosspec_t qos_raw = { - .delay = UINT32_MAX, - .bandwidth = 0, - .availability = 0, - .loss = 1, - .in_order = 0, - .maximum_interruption = UINT32_MAX + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 1, + .in_order = 0, + .max_gap = UINT32_MAX +}; + +qosspec_t qos_raw_no_errors = { + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 0, + .in_order = 0, + .max_gap = UINT32_MAX }; qosspec_t qos_best_effort = { - .delay = UINT32_MAX, - .bandwidth = 0, - .availability = 0, - .loss = 1, - .in_order = 1, - .maximum_interruption = UINT32_MAX + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = UINT32_MAX }; -qosspec_t qos_video = { - .delay = 100, - .bandwidth = UINT64_MAX, - .availability = 3, - .loss = 1, - .in_order = 1, - .maximum_interruption = 100 +qosspec_t qos_video = { + .delay = 100, + .bandwidth = UINT64_MAX, + .availability = 3, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = 100 }; qosspec_t qos_voice = { - .delay = 50, - .bandwidth = 100000, - .availability = 5, - .loss = 1, - .in_order = 1, - .maximum_interruption = 50 + .delay = 50, + .bandwidth = 100000, + .availability = 5, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = 50 }; qosspec_t qos_data = { - .delay = 1000, - .bandwidth = 0, - .availability = 0, - .in_order = 1, - .loss = 0, - .maximum_interruption = 2000 + .delay = 1000, + .bandwidth = 0, + .availability = 0, + .loss = 0, + .ber = 0, + .in_order = 1, + .max_gap = 2000 }; - -int qosspec_init(qosspec_t * qs) -{ - if (qs == NULL) - return -EINVAL; - - *qs = qos_best_effort; - - return 0; -} - -int qosspec_fini(qosspec_t * qs) -{ - if (qs == NULL) - return -EINVAL; - - memset(qs, 0, sizeof(*qs)); - - return 0; -} diff --git a/src/lib/qoscube.c b/src/lib/qoscube.c index 5dfa35ad..efca0e42 100644 --- a/src/lib/qoscube.c +++ b/src/lib/qoscube.c @@ -25,38 +25,20 @@ #include + + qoscube_t qos_spec_to_cube(qosspec_t qs) { - if (qs.loss == 0) - return QOS_CUBE_DATA; - else if (qs.delay <= qos_voice.delay && + if (qs.delay <= qos_voice.delay && qs.bandwidth <= qos_voice.bandwidth && qs.availability >= qos_voice.availability && - qs.maximum_interruption <= qos_voice.maximum_interruption) + qs.max_gap <= qos_voice.max_gap) return QOS_CUBE_VOICE; else if (qs.delay <= qos_video.delay && qs.bandwidth <= qos_video.bandwidth && qs.availability >= qos_video.availability && - qs.maximum_interruption <= qos_video.maximum_interruption) + qs.max_gap <= qos_video.max_gap) return QOS_CUBE_VIDEO; - else if (qs.in_order == 1) - return QOS_CUBE_BE; else - return QOS_CUBE_RAW; -} - -qosspec_t qos_cube_to_spec(qoscube_t qc) -{ - switch (qc) { - case QOS_CUBE_VOICE: - return qos_voice; - case QOS_CUBE_VIDEO: - return qos_video; - case QOS_CUBE_BE: - return qos_best_effort; - case QOS_CUBE_DATA: - return qos_data; - default: - return qos_raw; - } + return QOS_CUBE_BE; } diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto new file mode 100644 index 00000000..f355e345 --- /dev/null +++ b/src/lib/qosspec.proto @@ -0,0 +1,33 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * QoS specification message + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +syntax = "proto2"; + +message qosspec_msg { + required uint32 delay = 1; /* In ms */ + required uint64 bandwidth = 2; /* In bits/s */ + required uint32 availability = 3; /* Class of 9s */ + required uint32 loss = 4; /* Packet loss */ + required uint32 ber = 5; /* Bit error rate, ppb */ + required uint32 in_order = 6; /* In-order delivery */ + required uint32 max_gap = 7; /* In ms */ +}; diff --git a/src/lib/sockets.c b/src/lib/sockets.c index b148b7ca..85726783 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -165,3 +165,38 @@ char * ipcp_sock_path(pid_t pid) return full_name; } + +qosspec_msg_t spec_to_msg(qosspec_t * qs) +{ + qosspec_t spec; + qosspec_msg_t msg = QOSSPEC_MSG__INIT; + + spec = (qs == NULL ? qos_raw : *qs); + + msg.delay = spec.delay; + msg.bandwidth = spec.bandwidth; + msg.availability = spec.availability; + msg.loss = spec.loss; + msg.ber = spec.ber; + msg.in_order = spec.in_order; + msg.max_gap = spec.max_gap; + + return msg; +} + +qosspec_t msg_to_spec(qosspec_msg_t * msg) +{ + qosspec_t spec; + + assert(msg); + + spec.delay = msg->delay; + spec.bandwidth = msg->bandwidth; + spec.availability = msg->availability; + spec.loss = msg->loss; + spec.ber = msg->ber; + spec.in_order = msg->in_order; + spec.max_gap = msg->max_gap; + + return spec; +} -- cgit v1.2.3 From 5d11a6ad590133c92925c6162eb47b4401f16bef Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Fri, 5 Oct 2018 10:24:01 +0200 Subject: ipcpd, lib, irmd, tools: Change SDU to packet This will change SDU (Service Data Unit) to packet everywhere. SDU is OSI terminology, whereas packet is Ouroboros terminology. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- doc/man/ouroboros-tutorial.7 | 2 +- include/ouroboros/shm_rbuff.h | 2 +- include/ouroboros/timerwheel.h | 2 +- src/ipcpd/eth/eth.c | 46 ++++---- src/ipcpd/local/main.c | 12 +- src/ipcpd/normal/CMakeLists.txt | 2 +- src/ipcpd/normal/dht.c | 48 ++++---- src/ipcpd/normal/dt.c | 52 ++++----- src/ipcpd/normal/dt.h | 8 +- src/ipcpd/normal/fa.c | 36 +++--- src/ipcpd/normal/packet_sched.c | 241 ++++++++++++++++++++++++++++++++++++++++ src/ipcpd/normal/packet_sched.h | 43 +++++++ src/ipcpd/normal/sdu_sched.c | 241 ---------------------------------------- src/ipcpd/normal/sdu_sched.h | 43 ------- src/ipcpd/udp/main.c | 64 +++++------ src/irmd/main.c | 2 +- src/lib/CMakeLists.txt | 6 +- src/lib/dev.c | 14 +-- src/lib/frct.c | 2 +- src/lib/shm_rbuff.c | 6 +- src/lib/shm_rbuff_ll.c | 2 +- src/lib/shm_rbuff_pthr.c | 2 +- src/lib/shm_rdrbuff.c | 2 +- src/tools/ocbr/ocbr.c | 10 +- src/tools/ocbr/ocbr_client.c | 2 +- src/tools/ocbr/ocbr_server.c | 17 +-- src/tools/oecho/oecho.c | 8 +- src/tools/operf/operf.c | 4 +- src/tools/operf/operf_client.c | 9 +- src/tools/oping/oping_client.c | 4 +- 30 files changed, 468 insertions(+), 464 deletions(-) create mode 100644 src/ipcpd/normal/packet_sched.c create mode 100644 src/ipcpd/normal/packet_sched.h delete mode 100644 src/ipcpd/normal/sdu_sched.c delete mode 100644 src/ipcpd/normal/sdu_sched.h (limited to 'src/lib/frct.c') diff --git a/doc/man/ouroboros-tutorial.7 b/doc/man/ouroboros-tutorial.7 index 98e27254..76fa0068 100644 --- a/doc/man/ouroboros-tutorial.7 +++ b/doc/man/ouroboros-tutorial.7 @@ -108,7 +108,7 @@ Pinging my.oping.server with 64 bytes of data: --- my.oping.server ping statistics --- .br -3 SDUs transmitted, 3 received, 0% packet loss, time: 3001.011 ms +3 packets transmitted, 3 received, 0% packet loss, time: 3001.011 ms .br rtt min/avg/max/mdev = 0.304/0.392/0.475/0.086 ms .RE diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index b2e27c7b..223f6bf4 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h index 231e8103..34994185 100644 --- a/include/ouroboros/timerwheel.h +++ b/include/ouroboros/timerwheel.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Timerwheel * * Dimitri Staessens * Sander Vrijders diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 6fd7b805..1bbfac5b 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -123,7 +123,7 @@ #define DIX_HEADER_SIZE (DIX_EID_SIZE + DIX_LENGTH_SIZE) #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + DIX_HEADER_SIZE) #define MAX_EIDS (1 << (8 * DIX_EID_SIZE)) -#define ETH_MAX_SDU_SIZE (ETH_MTU - DIX_HEADER_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - DIX_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #elif defined(BUILD_ETH_LLC) #define THIS_TYPE IPCP_ETH_LLC @@ -131,7 +131,7 @@ #define LLC_HEADER_SIZE 3 #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE) #define MAX_SAPS 64 -#define ETH_MAX_SDU_SIZE (ETH_MTU - LLC_HEADER_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - LLC_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #endif @@ -230,8 +230,8 @@ struct { fset_t * np1_flows; pthread_rwlock_t flows_lock; - pthread_t sdu_writer[IPCP_ETH_WR_THR]; - pthread_t sdu_reader[IPCP_ETH_RD_THR]; + pthread_t packet_writer[IPCP_ETH_WR_THR]; + pthread_t packet_reader[IPCP_ETH_RD_THR]; #ifdef __linux__ pthread_t if_monitor; @@ -383,7 +383,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, assert(frame); - if (len > (size_t) ETH_MAX_SDU_SIZE) + if (len > (size_t) ETH_MAX_PACKET_SIZE) return -1; e_frame = (struct eth_frame *) frame; @@ -808,7 +808,7 @@ static void * eth_ipcp_mgmt_handler(void * o) return (void *) 0; } -static void * eth_ipcp_sdu_reader(void * o) +static void * eth_ipcp_packet_reader(void * o) { uint8_t br_addr[MAC_SIZE]; #if defined(BUILD_ETH_DIX) @@ -992,7 +992,7 @@ static void cleanup_writer(void * o) fqueue_destroy((fqueue_t *) o); } -static void * eth_ipcp_sdu_writer(void * o) +static void * eth_ipcp_packet_writer(void * o) { int fd; struct shm_du_buff * sdb; @@ -1443,22 +1443,22 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) } for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { - if (pthread_create(ð_data.sdu_reader[idx], + if (pthread_create(ð_data.packet_reader[idx], NULL, - eth_ipcp_sdu_reader, + eth_ipcp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } } for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { - if (pthread_create(ð_data.sdu_writer[idx], + if (pthread_create(ð_data.packet_writer[idx], NULL, - eth_ipcp_sdu_writer, + eth_ipcp_packet_writer, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_writer; + goto fail_packet_writer; } } @@ -1472,16 +1472,16 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sdu_writer: + fail_packet_writer: while (idx > 0) { - pthread_cancel(eth_data.sdu_writer[--idx]); - pthread_join(eth_data.sdu_writer[idx], NULL); + pthread_cancel(eth_data.packet_writer[--idx]); + pthread_join(eth_data.packet_writer[idx], NULL); } idx = IPCP_ETH_RD_THR; - fail_sdu_reader: + fail_packet_reader: while (idx > 0) { - pthread_cancel(eth_data.sdu_reader[--idx]); - pthread_join(eth_data.sdu_reader[idx], NULL); + pthread_cancel(eth_data.packet_reader[--idx]); + pthread_join(eth_data.packet_reader[idx], NULL); } pthread_cancel(eth_data.mgmt_handler); pthread_join(eth_data.mgmt_handler, NULL); @@ -1792,18 +1792,18 @@ int main(int argc, if (ipcp_get_state() == IPCP_SHUTDOWN) { for (i = 0; i < IPCP_ETH_WR_THR; ++i) - pthread_cancel(eth_data.sdu_writer[i]); + pthread_cancel(eth_data.packet_writer[i]); for (i = 0; i < IPCP_ETH_RD_THR; ++i) - pthread_cancel(eth_data.sdu_reader[i]); + pthread_cancel(eth_data.packet_reader[i]); pthread_cancel(eth_data.mgmt_handler); #ifdef __linux__ pthread_cancel(eth_data.if_monitor); #endif for (i = 0; i < IPCP_ETH_WR_THR; ++i) - pthread_join(eth_data.sdu_writer[i], NULL); + pthread_join(eth_data.packet_writer[i], NULL); for (i = 0; i < IPCP_ETH_RD_THR; ++i) - pthread_join(eth_data.sdu_reader[i], NULL); + pthread_join(eth_data.packet_reader[i], NULL); pthread_join(eth_data.mgmt_handler, NULL); #ifdef __linux__ diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 8eae7503..ab43f1f8 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -59,7 +59,7 @@ struct { fqueue_t * fq; pthread_rwlock_t lock; - pthread_t sduloop; + pthread_t packet_loop; } local_data; static int local_data_init(void) @@ -97,7 +97,7 @@ static void local_data_fini(void){ pthread_rwlock_destroy(&local_data.lock); } -static void * ipcp_local_sdu_loop(void * o) +static void * ipcp_local_packet_loop(void * o) { (void) o; @@ -139,8 +139,8 @@ static int ipcp_local_bootstrap(const struct ipcp_config * conf) ipcp_set_state(IPCP_OPERATIONAL); - if (pthread_create(&local_data.sduloop, NULL, - ipcp_local_sdu_loop, NULL)) { + if (pthread_create(&local_data.packet_loop, NULL, + ipcp_local_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); return -1; } @@ -364,8 +364,8 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(local_data.sduloop); - pthread_join(local_data.sduloop, NULL); + pthread_cancel(local_data.packet_loop); + pthread_join(local_data.packet_loop, NULL); } local_data_fini(); diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 1cba7630..0cb7b770 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -42,7 +42,7 @@ set(SOURCE_FILES main.c pff.c routing.c - sdu_sched.c + packet_sched.c # Add policies last pol/alternate_pff.c pol/flat.c diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index a2fa4863..4064bf5c 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -62,21 +62,21 @@ typedef KadContactMsg kad_contact_msg_t; #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ -#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ -#define KAD_K 8 /* Replication factor, MDHT value. */ -#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ -#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 8 /* Response time to wait for a join. */ -#define KAD_T_RESP 5 /* Response time to wait for a response. */ -#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ -#define KAD_QUEER 15 /* Time to declare peer questionable. */ -#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ -#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ -#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ -#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ -#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 8 /* Response time to wait for a join. */ +#define KAD_T_RESP 5 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ +#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ +#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ +#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ enum dht_state { DHT_INIT = 0, @@ -251,7 +251,7 @@ struct join_info { uint64_t addr; }; -struct sdu_info { +struct packet_info { struct dht * dht; struct shm_du_buff * sdb; }; @@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht, kad_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) + if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) break; ipcp_sdb_release(sdb); @@ -2400,7 +2400,7 @@ uint64_t dht_query(struct dht * dht, return 0; } -static void * dht_handle_sdu(void * o) +static void * dht_handle_packet(void * o) { struct dht * dht = (struct dht *) o; @@ -2584,8 +2584,8 @@ static void * dht_handle_sdu(void * o) return (void *) 0; } -static void dht_post_sdu(void * comp, - struct shm_du_buff * sdb) +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) { struct cmd * cmd; struct dht * dht = (struct dht *) comp; @@ -2800,19 +2800,19 @@ struct dht * dht_create(uint64_t addr) dht->addr = addr; dht->id = NULL; #ifndef __DHT_TEST__ - dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); + dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); if (dht->tpm == NULL) goto fail_tpm_create; if (tpm_start(dht->tpm)) goto fail_tpm_start; - dht->fd = dt_reg_comp(dht, &dht_post_sdu, DHT); + dht->fd = dt_reg_comp(dht, &dht_post_packet, DHT); notifier_reg(handle_event, dht); #else (void) handle_event; - (void) dht_handle_sdu; - (void) dht_post_sdu; + (void) dht_handle_packet; + (void) dht_post_packet; #endif dht->state = DHT_INIT; diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index a350e4be..08c937e7 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -46,7 +46,7 @@ #include "dt.h" #include "pff.h" #include "routing.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "comp.h" #include "fa.h" @@ -65,7 +65,7 @@ #endif struct comp_info { - void (* post_sdu)(void * comp, struct shm_du_buff * sdb); + void (* post_packet)(void * comp, struct shm_du_buff * sdb); void * comp; char * name; }; @@ -154,7 +154,7 @@ static void dt_pci_shrink(struct shm_du_buff * sdb) } struct { - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; @@ -421,24 +421,25 @@ static void handle_event(void * self, #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, c->conn_info.addr); #endif - sdu_sched_add(dt.sdu_sched, c->flow_info.fd); - log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd); + packet_sched_add(dt.packet_sched, c->flow_info.fd); + log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); break; case NOTIFY_DT_CONN_DEL: #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, INVALID_ADDR); #endif - sdu_sched_del(dt.sdu_sched, c->flow_info.fd); - log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd); + packet_sched_del(dt.packet_sched, c->flow_info.fd); + log_dbg("Removed fd %d from " + "packet scheduler.", c->flow_info.fd); break; default: break; } } -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { struct dt_pci dt_pci; int ret; @@ -491,7 +492,7 @@ static void sdu_handler(int fd, ret = ipcp_flow_write(ofd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", ofd); + log_dbg("Failed to write packet to fd %d.", ofd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); ipcp_sdb_release(sdb); @@ -560,7 +561,7 @@ static void sdu_handler(int fd, return; } - if (dt.comps[dt_pci.eid].post_sdu == NULL) { + if (dt.comps[dt_pci.eid].post_packet == NULL) { log_err("No registered component on eid %d.", dt_pci.eid); ipcp_sdb_release(sdb); @@ -596,7 +597,8 @@ static void sdu_handler(int fd, pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); #endif - dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb); + dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, + sdb); } } @@ -761,15 +763,15 @@ void dt_fini(void) int dt_start(void) { - dt.sdu_sched = sdu_sched_create(sdu_handler); - if (dt.sdu_sched == NULL) { - log_err("Failed to create N-1 SDU scheduler."); + dt.packet_sched = packet_sched_create(packet_handler); + if (dt.packet_sched == NULL) { + log_err("Failed to create N-1 packet scheduler."); return -1; } if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { log_err("Failed to create listener thread."); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); return -1; } @@ -780,7 +782,7 @@ void dt_stop(void) { pthread_cancel(dt.listener); pthread_join(dt.listener, NULL); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); } int dt_reg_comp(void * comp, @@ -800,11 +802,11 @@ int dt_reg_comp(void * comp, return -EBADF; } - assert(dt.comps[res_fd].post_sdu == NULL); + assert(dt.comps[res_fd].post_packet == NULL); assert(dt.comps[res_fd].comp == NULL); assert(dt.comps[res_fd].name == NULL); - dt.comps[res_fd].post_sdu = func; + dt.comps[res_fd].post_packet = func; dt.comps[res_fd].comp = comp; dt.comps[res_fd].name = name; @@ -815,10 +817,10 @@ int dt_reg_comp(void * comp, return res_fd; } -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int np1_fd, - struct shm_du_buff * sdb) +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int np1_fd, + struct shm_du_buff * sdb) { int fd; struct dt_pci dt_pci; @@ -863,7 +865,7 @@ int dt_write_sdu(uint64_t dst_addr, #endif ret = ipcp_flow_write(fd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", fd); + log_dbg("Failed to write packet to fd %d.", fd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); goto fail_write; diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index a17098b7..05b8220c 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -47,9 +47,9 @@ int dt_reg_comp(void * comp, void (* func)(void * comp, struct shm_du_buff * sdb), char * name); -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int res_fd, - struct shm_du_buff * sdb); +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int res_fd, + struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 4c82e0e0..d67ba61e 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -39,7 +39,7 @@ #include "dir.h" #include "fa.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "ipcp.h" #include "dt.h" @@ -74,19 +74,19 @@ struct { uint64_t r_addr[PROG_MAX_FLOWS]; int fd; - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; } fa; -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { pthread_rwlock_rdlock(&fa.flows_lock); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); - log_warn("Failed to forward SDU."); + log_warn("Failed to forward packet."); return; } @@ -99,7 +99,7 @@ static void destroy_conn(int fd) fa.r_addr[fd] = INVALID_ADDR; } -static void fa_post_sdu(void * comp, +static void fa_post_packet(void * comp, struct shm_du_buff * sdb) { struct timespec ts = {0, TIMEOUT * 1000}; @@ -192,7 +192,7 @@ static void fa_post_sdu(void * comp, if (msg->response < 0) destroy_conn(ntoh32(msg->r_eid)); else - sdu_sched_add(fa.sdu_sched, ntoh32(msg->r_eid)); + packet_sched_add(fa.packet_sched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -215,7 +215,7 @@ int fa_init(void) if (pthread_rwlock_init(&fa.flows_lock, NULL)) return -1; - fa.fd = dt_reg_comp(&fa, &fa_post_sdu, FA); + fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); return 0; } @@ -227,9 +227,9 @@ void fa_fini(void) int fa_start(void) { - fa.sdu_sched = sdu_sched_create(sdu_handler); - if (fa.sdu_sched == NULL) { - log_err("Failed to create SDU scheduler."); + fa.packet_sched = packet_sched_create(packet_handler); + if (fa.packet_sched == NULL) { + log_err("Failed to create packet scheduler."); return -1; } @@ -238,7 +238,7 @@ int fa_start(void) void fa_stop(void) { - sdu_sched_destroy(fa.sdu_sched); + packet_sched_destroy(fa.packet_sched); } int fa_alloc(int fd, @@ -273,7 +273,7 @@ int fa_alloc(int fd, qc = qos_spec_to_cube(qs); - if (dt_write_sdu(addr, qc, fa.fd, sdb)) { + if (dt_write_packet(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; } @@ -335,14 +335,14 @@ int fa_alloc_resp(int fd, destroy_conn(fd); ipcp_sdb_release(sdb); } else { - sdu_sched_add(fa.sdu_sched, fd); + packet_sched_add(fa.packet_sched, fd); } ipcp_flow_get_qoscube(fd, &qc); assert(qc >= 0 && qc < QOS_CUBE_MAX); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.fd, sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) { destroy_conn(fd); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); @@ -360,7 +360,7 @@ int fa_dealloc(int fd) pthread_rwlock_wrlock(&fa.flows_lock); - sdu_sched_del(fa.sdu_sched, fd); + packet_sched_del(fa.packet_sched, fd); destroy_conn(fd); diff --git a/src/ipcpd/normal/packet_sched.c b/src/ipcpd/normal/packet_sched.c new file mode 100644 index 00000000..fc01fb32 --- /dev/null +++ b/src/ipcpd/normal/packet_sched.c @@ -0,0 +1,241 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include +#include + +#include "ipcp.h" +#include "packet_sched.h" +#include "connmgr.h" + +#include +#include +#include +#include +#include + +static int qos_prio [] = { + QOS_PRIO_RAW, + QOS_PRIO_BE, + QOS_PRIO_VIDEO, + QOS_PRIO_VOICE, + QOS_PRIO_DATA +}; + +struct packet_sched { + fset_t * set[QOS_CUBE_MAX]; + next_packet_fn_t callback; + pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +}; + +struct sched_info { + struct packet_sched * sch; + qoscube_t qc; +}; + +static void cleanup_reader(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + +static void * packet_reader(void * o) +{ + struct packet_sched * sched; + struct shm_du_buff * sdb; + int fd; + fqueue_t * fq; + qoscube_t qc; + + sched = ((struct sched_info *) o)->sch; + qc = ((struct sched_info *) o)->qc; + + ipcp_lock_to_core(); + + free(o); + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + pthread_cleanup_push(cleanup_reader, fq); + + while (true) { + int ret = fevent(sched->set[qc], fq, NULL); + if (ret < 0) + continue; + + while ((fd = fqueue_next(fq)) >= 0) { + switch (fqueue_type(fq)) { + case FLOW_DEALLOC: + notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); + break; + case FLOW_DOWN: + notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); + break; + case FLOW_UP: + notifier_event(NOTIFY_DT_FLOW_UP, &fd); + break; + case FLOW_PKT: + if (ipcp_flow_read(fd, &sdb)) + continue; + + sched->callback(fd, qc, sdb); + break; + default: + break; + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +struct packet_sched * packet_sched_create(next_packet_fn_t callback) +{ + struct packet_sched * packet_sched; + struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; + int i; + int j; + + assert(callback); + + packet_sched = malloc(sizeof(*packet_sched)); + if (packet_sched == NULL) + goto fail_malloc; + + packet_sched->callback = callback; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + packet_sched->set[i] = fset_create(); + if (packet_sched->set[i] == NULL) { + for (j = 0; j < i; ++j) + fset_destroy(packet_sched->set[j]); + goto fail_flow_set; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + infos[i] = malloc(sizeof(*infos[i])); + if (infos[i] == NULL) { + for (j = 0; j < i; ++j) + free(infos[j]); + goto fail_infos; + } + infos[i]->sch = packet_sched; + infos[i]->qc = i % QOS_CUBE_MAX; + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + if (pthread_create(&packet_sched->readers[i], NULL, + packet_reader, infos[i])) { + for (j = 0; j < i; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < i; ++j) + pthread_join(packet_sched->readers[j], NULL); + for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + free(infos[i]); + goto fail_infos; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + struct sched_param par; + int pol = SCHED_RR; + int min; + int max; + + min = sched_get_priority_min(pol); + max = sched_get_priority_max(pol); + + min = (max - min) / 2; + + par.sched_priority = min + + (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); + + if (pthread_setschedparam(packet_sched->readers[i], pol, &par)) + goto fail_sched; + } + + return packet_sched; + + fail_sched: + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_join(packet_sched->readers[j], NULL); + fail_infos: + for (j = 0; j < QOS_CUBE_MAX; ++j) + fset_destroy(packet_sched->set[j]); + fail_flow_set: + free(packet_sched); + fail_malloc: + return NULL; +} + +void packet_sched_destroy(struct packet_sched * packet_sched) +{ + int i; + + assert(packet_sched); + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + pthread_cancel(packet_sched->readers[i]); + pthread_join(packet_sched->readers[i], NULL); + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fset_destroy(packet_sched->set[i]); + + free(packet_sched); +} + +void packet_sched_add(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_add(packet_sched->set[qc], fd); +} + +void packet_sched_del(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_del(packet_sched->set[qc], fd); +} diff --git a/src/ipcpd/normal/packet_sched.h b/src/ipcpd/normal/packet_sched.h new file mode 100644 index 00000000..13ff400d --- /dev/null +++ b/src/ipcpd/normal/packet_sched.h @@ -0,0 +1,43 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H +#define OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H + +#include +#include + +typedef void (* next_packet_fn_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); + +struct packet_sched * packet_sched_create(next_packet_fn_t callback); + +void packet_sched_destroy(struct packet_sched * packet_sched); + +void packet_sched_add(struct packet_sched * packet_sched, + int fd); + +void packet_sched_del(struct packet_sched * packet_sched, + int fd); + +#endif /* OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H */ diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c deleted file mode 100644 index e6d705fb..00000000 --- a/src/ipcpd/normal/sdu_sched.c +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L -#endif - -#include "config.h" - -#include -#include - -#include "ipcp.h" -#include "sdu_sched.h" -#include "connmgr.h" - -#include -#include -#include -#include -#include - -static int qos_prio [] = { - QOS_PRIO_RAW, - QOS_PRIO_BE, - QOS_PRIO_VIDEO, - QOS_PRIO_VOICE, - QOS_PRIO_DATA -}; - -struct sdu_sched { - fset_t * set[QOS_CUBE_MAX]; - next_sdu_fn_t callback; - pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; -}; - -struct sched_info { - struct sdu_sched * sch; - qoscube_t qc; -}; - -static void cleanup_reader(void * o) -{ - fqueue_destroy((fqueue_t *) o); -} - -static void * sdu_reader(void * o) -{ - struct sdu_sched * sched; - struct shm_du_buff * sdb; - int fd; - fqueue_t * fq; - qoscube_t qc; - - sched = ((struct sched_info *) o)->sch; - qc = ((struct sched_info *) o)->qc; - - ipcp_lock_to_core(); - - free(o); - - fq = fqueue_create(); - if (fq == NULL) - return (void *) -1; - - pthread_cleanup_push(cleanup_reader, fq); - - while (true) { - int ret = fevent(sched->set[qc], fq, NULL); - if (ret < 0) - continue; - - while ((fd = fqueue_next(fq)) >= 0) { - switch (fqueue_type(fq)) { - case FLOW_DEALLOC: - notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); - break; - case FLOW_DOWN: - notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); - break; - case FLOW_UP: - notifier_event(NOTIFY_DT_FLOW_UP, &fd); - break; - case FLOW_PKT: - if (ipcp_flow_read(fd, &sdb)) - continue; - - sched->callback(fd, qc, sdb); - break; - default: - break; - } - } - } - - pthread_cleanup_pop(true); - - return (void *) 0; -} - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback) -{ - struct sdu_sched * sdu_sched; - struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; - int i; - int j; - - assert(callback); - - sdu_sched = malloc(sizeof(*sdu_sched)); - if (sdu_sched == NULL) - goto fail_malloc; - - sdu_sched->callback = callback; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - sdu_sched->set[i] = fset_create(); - if (sdu_sched->set[i] == NULL) { - for (j = 0; j < i; ++j) - fset_destroy(sdu_sched->set[j]); - goto fail_flow_set; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - infos[i] = malloc(sizeof(*infos[i])); - if (infos[i] == NULL) { - for (j = 0; j < i; ++j) - free(infos[j]); - goto fail_infos; - } - infos[i]->sch = sdu_sched; - infos[i]->qc = i % QOS_CUBE_MAX; - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - if (pthread_create(&sdu_sched->readers[i], NULL, - sdu_reader, infos[i])) { - for (j = 0; j < i; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < i; ++j) - pthread_join(sdu_sched->readers[j], NULL); - for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - free(infos[i]); - goto fail_infos; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - struct sched_param par; - int pol = SCHED_RR; - int min; - int max; - - min = sched_get_priority_min(pol); - max = sched_get_priority_max(pol); - - min = (max - min) / 2; - - par.sched_priority = min + - (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); - - if (pthread_setschedparam(sdu_sched->readers[i], pol, &par)) - goto fail_sched; - } - - return sdu_sched; - - fail_sched: - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_join(sdu_sched->readers[j], NULL); - fail_infos: - for (j = 0; j < QOS_CUBE_MAX; ++j) - fset_destroy(sdu_sched->set[j]); - fail_flow_set: - free(sdu_sched); - fail_malloc: - return NULL; -} - -void sdu_sched_destroy(struct sdu_sched * sdu_sched) -{ - int i; - - assert(sdu_sched); - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - pthread_cancel(sdu_sched->readers[i]); - pthread_join(sdu_sched->readers[i], NULL); - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) - fset_destroy(sdu_sched->set[i]); - - free(sdu_sched); -} - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_add(sdu_sched->set[qc], fd); -} - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_del(sdu_sched->set[qc], fd); -} diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h deleted file mode 100644 index cdbda272..00000000 --- a/src/ipcpd/normal/sdu_sched.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H -#define OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H - -#include -#include - -typedef void (* next_sdu_fn_t)(int fd, - qoscube_t qc, - struct shm_du_buff * sdb); - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback); - -void sdu_sched_destroy(struct sdu_sched * sdu_sched); - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd); - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd); - -#endif /* OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H */ diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 96820662..a1af1e85 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -54,20 +54,20 @@ #include #include -#define FLOW_REQ 1 -#define FLOW_REPLY 2 +#define FLOW_REQ 1 +#define FLOW_REPLY 2 -#define THIS_TYPE IPCP_UDP -#define LISTEN_PORT htons(0x0D1F) -#define SHIM_UDP_BUF_SIZE 256 -#define SHIM_UDP_MSG_SIZE 256 -#define SHIM_UDP_MAX_SDU_SIZE 8980 -#define DNS_TTL 86400 -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ +#define THIS_TYPE IPCP_UDP +#define LISTEN_PORT htons(0x0D1F) +#define SHIM_UDP_BUF_SIZE 256 +#define SHIM_UDP_MSG_SIZE 256 +#define SHIM_UDP_MAX_PACKET_SIZE 8980 +#define DNS_TTL 86400 +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ -#define local_ip (udp_data.s_saddr.sin_addr.s_addr) +#define local_ip (udp_data.s_saddr.sin_addr.s_addr) -#define UDP_MAX_PORTS 0xFFFF +#define UDP_MAX_PORTS 0xFFFF struct mgmt_msg { uint16_t src_udp_port; @@ -106,9 +106,9 @@ struct { struct uf fd_to_uf[SYS_MAX_FLOWS]; pthread_rwlock_t flows_lock; - pthread_t sduloop; + pthread_t packet_loop; pthread_t handler; - pthread_t sdu_reader; + pthread_t packet_reader; bool fd_set_mod; pthread_cond_t fd_set_cond; @@ -495,13 +495,13 @@ static void * ipcp_udp_listener(void * o) return 0; } -static void * ipcp_udp_sdu_reader(void * o) +static void * ipcp_udp_packet_reader(void * o) { ssize_t n; int skfd; int fd; /* FIXME: avoid this copy */ - char buf[SHIM_UDP_MAX_SDU_SIZE]; + char buf[SHIM_UDP_MAX_PACKET_SIZE]; struct sockaddr_in r_saddr; struct timeval tv = {0, FD_UPDATE_TIMEOUT}; fd_set read_fds; @@ -533,7 +533,7 @@ static void * ipcp_udp_sdu_reader(void * o) n = sizeof(r_saddr); if ((n = recvfrom(skfd, &buf, - SHIM_UDP_MAX_SDU_SIZE, + SHIM_UDP_MAX_PACKET_SIZE, 0, (struct sockaddr *) &r_saddr, (unsigned *) &n)) <= 0) @@ -552,7 +552,7 @@ static void * ipcp_udp_sdu_reader(void * o) return (void *) 0; } -static void * ipcp_udp_sdu_loop(void * o) +static void * ipcp_udp_packet_loop(void * o) { int fd; struct shm_du_buff * sdb; @@ -582,7 +582,7 @@ static void * ipcp_udp_sdu_loop(void * o) if (send(fd, shm_du_buff_head(sdb), shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), 0) < 0) - log_err("Failed to send SDU."); + log_err("Failed to send PACKET."); pthread_cleanup_pop(true); } @@ -666,20 +666,20 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) goto fail_bind; } - if (pthread_create(&udp_data.sdu_reader, + if (pthread_create(&udp_data.packet_reader, NULL, - ipcp_udp_sdu_reader, + ipcp_udp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } - if (pthread_create(&udp_data.sduloop, + if (pthread_create(&udp_data.packet_loop, NULL, - ipcp_udp_sdu_loop, + ipcp_udp_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sduloop; + goto fail_packet_loop; } log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid()); @@ -688,10 +688,10 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sduloop: - pthread_cancel(udp_data.sdu_reader); - pthread_join(udp_data.sdu_reader, NULL); - fail_sdu_reader: + fail_packet_loop: + pthread_cancel(udp_data.packet_reader); + pthread_join(udp_data.packet_reader, NULL); + fail_packet_reader: pthread_cancel(udp_data.handler); pthread_join(udp_data.handler, NULL); fail_bind: @@ -1222,13 +1222,13 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(udp_data.sduloop); + pthread_cancel(udp_data.packet_loop); pthread_cancel(udp_data.handler); - pthread_cancel(udp_data.sdu_reader); + pthread_cancel(udp_data.packet_reader); - pthread_join(udp_data.sduloop, NULL); + pthread_join(udp_data.packet_loop, NULL); pthread_join(udp_data.handler, NULL); - pthread_join(udp_data.sdu_reader, NULL); + pthread_join(udp_data.packet_reader, NULL); } udp_data_fini(); diff --git a/src/irmd/main.c b/src/irmd/main.c index 9504f3b5..427e09d1 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -123,7 +123,7 @@ struct { pthread_rwlock_t flows_lock; /* lock for flows */ struct lockfile * lf; /* single irmd per system */ - struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + struct shm_rdrbuff * rdrb; /* rdrbuff for packets */ int sockfd; /* UNIX socket */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index aa4e5bf3..84a9fd72 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -138,7 +138,7 @@ mark_as_advanced(LIBRT_LIBRARIES LIBPTHREAD_LIBRARIES LIBGCRYPT_INCLUDE_DIR SYS_RND_HDR) set(SHM_BUFFER_SIZE 4096 CACHE STRING - "Number of blocks in SDU buffer, must be a power of 2") + "Number of blocks in packet buffer, must be a power of 2") set(SYS_MAX_FLOWS 10240 CACHE STRING "Maximum number of total flows for this system") set(PROG_MAX_FLOWS 4096 CACHE STRING @@ -171,9 +171,9 @@ set(SHM_FLOW_SET_PREFIX "/${SHM_PREFIX}.set." CACHE INTERNAL set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL "Name for the main POSIX shared memory buffer") set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING - "SDU buffer block size, multiple of pagesize for performance") + "Packet buffer block size, multiple of pagesize for performance") set(SHM_RDRB_MULTI_BLOCK true CACHE BOOL - "SDU buffer multiblock SDU support") + "Packet buffer multiblock packet support") set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL "Enable shared memory lockless rbuff support") diff --git a/src/lib/dev.c b/src/lib/dev.c index a92c1e42..00dcf991 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -895,7 +895,7 @@ ssize_t flow_read(int fd, { ssize_t idx; ssize_t n; - uint8_t * sdu; + uint8_t * packet; struct shm_rbuff * rb; struct shm_du_buff * sdb; struct timespec abs; @@ -948,19 +948,19 @@ ssize_t flow_read(int fd, } } - n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); + n = shm_rdrbuff_read(&packet, ai.rdrb, idx); assert(n >= 0); if (n <= (ssize_t) count) { - memcpy(buf, sdu, n); + memcpy(buf, packet, n); shm_rdrbuff_remove(ai.rdrb, idx); flow->part_idx = (partrd && n == (ssize_t) count) ? DONE_PART : NO_PART; return n; } else { if (partrd) { - memcpy(buf, sdu, count); + memcpy(buf, packet, count); sdb = shm_rdrbuff_get(ai.rdrb, idx); shm_du_buff_head_release(sdb, n); flow->part_idx = idx; @@ -1042,7 +1042,7 @@ int fset_add(struct flow_set * set, int fd) { int ret; - size_t sdus; + size_t packets; size_t i; if (set == NULL || fd < 0 || fd > SYS_MAX_FLOWS) @@ -1052,8 +1052,8 @@ int fset_add(struct flow_set * set, ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); - sdus = shm_rbuff_queued(ai.flows[fd].rx_rb); - for (i = 0; i < sdus; i++) + packets = shm_rbuff_queued(ai.flows[fd].rx_rb); + for (i = 0; i < packets; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/frct.c b/src/lib/frct.c index e9acb1dc..2e5c385d 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -294,7 +294,7 @@ static int __frcti_snd(struct frcti * frcti, return 0; } -/* Returns 0 when idx contains an SDU for the application. */ +/* Returns 0 when idx contains a packet for the application. */ static int __frcti_rcv(struct frcti * frcti, struct shm_du_buff * sdb) { diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 453f5183..5319e89c 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer implementations for incoming SDUs + * Ring buffer implementations for incoming packets * * Dimitri Staessens * Sander Vrijders @@ -63,8 +63,8 @@ struct shm_rbuff { size_t * tail; /* start of ringbuffer tail */ size_t * acl; /* access control */ pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * add; /* SDU arrived */ - pthread_cond_t * del; /* SDU removed */ + pthread_cond_t * add; /* packet arrived */ + pthread_cond_t * del; /* packet removed */ pid_t pid; /* pid of the owner */ int port_id; /* port_id of the flow */ }; diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index c488f274..43eac7f6 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Lockless ring buffer for incoming SDUs + * Lockless ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 3b7ea2d4..c74503ff 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 5ae2085d..8bbdda46 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -87,7 +87,7 @@ struct shm_rdrbuff { size_t * tail; /* start of ringbuffer tail */ pthread_mutex_t * lock; /* lock all free space in shm */ pthread_cond_t * full; /* flag when full */ - pthread_cond_t * healthy; /* flag when SDU is read */ + pthread_cond_t * healthy; /* flag when packet is read */ pid_t * pid; /* pid of the irmd owner */ }; diff --git a/src/tools/ocbr/ocbr.c b/src/tools/ocbr/ocbr.c index e2bd84af..12983da3 100644 --- a/src/tools/ocbr/ocbr.c +++ b/src/tools/ocbr/ocbr.c @@ -60,7 +60,7 @@ struct s { static void usage(void) { printf("Usage: cbr [OPTION]...\n" - "Sends SDUs from client to server at a constant bit rate.\n\n" + "Sends packets from client to server at a constant bit rate.\n\n" " -l, --listen Run in server mode\n" "\n" "Server options:\n" @@ -70,10 +70,10 @@ static void usage(void) "Client options:\n" " -n, --server_apn Specify the name of the server.\n" " -d, --duration Duration for sending (s)\n" - " -f, --flood Send SDUs as fast as possible\n" - " -s, --size SDU size (B, max %ld B)\n" + " -f, --flood Send packets as fast as possible\n" + " -s, --size packet size (B, max %ld B)\n" " -r, --rate Rate (b/s)\n" - " --sleep Sleep in between sending SDUs\n" + " --sleep Sleep in between sending packets\n" "\n\n" " --help Display this help text and exit\n", BUF_SIZE); @@ -82,7 +82,7 @@ static void usage(void) int main(int argc, char ** argv) { int duration = 60; /* One minute test */ - int size = 1000; /* 1000 byte SDUs */ + int size = 1000; /* 1000 byte packets */ long rate = 1000000; /* 1 Mb/s */ bool flood = false; bool sleep = false; diff --git a/src/tools/ocbr/ocbr_client.c b/src/tools/ocbr/ocbr_client.c index 026ab001..63b43721 100644 --- a/src/tools/ocbr/ocbr_client.c +++ b/src/tools/ocbr/ocbr_client.c @@ -155,7 +155,7 @@ int client_main(char * server, ms = ts_diff_ms(&start, &end); printf("sent statistics: " - "%9ld SDUs, %12ld bytes in %9d ms, %4.4f Mb/s\n", + "%9ld packets, %12ld bytes in %9d ms, %4.4f Mb/s\n", seqnr, seqnr * size, ms, (seqnr / (ms * 1000.0)) * size * 8.0); flow_dealloc(fd); diff --git a/src/tools/ocbr/ocbr_server.c b/src/tools/ocbr/ocbr_server.c index 4f080eff..75983201 100644 --- a/src/tools/ocbr/ocbr_server.c +++ b/src/tools/ocbr/ocbr_server.c @@ -90,8 +90,8 @@ static void handle_flow(int fd) bool stop = false; - long sdus = 0; - long sdus_intv = 0; + long packets = 0; + long packets_intv = 0; long bytes_read = 0; long bytes_read_intv = 0; @@ -109,7 +109,7 @@ static void handle_flow(int fd) if (count > 0) { clock_gettime(CLOCK_REALTIME, &alive); - sdus++; + packets++; bytes_read += count; } @@ -121,17 +121,18 @@ static void handle_flow(int fd) if (stop || ts_diff_ms(&now, &iv_end) < 0) { long us = ts_diff_us(&iv_start, &now); - printf("Flow %4d: %9ld SDUs (%12ld bytes) in %9ld ms" - " => %9.4f p/s, %9.4f Mb/s\n", + printf("Flow %4d: %9ld packets (%12ld bytes) in %9ld ms" + " => %9.4f pps, %9.4f Mbps\n", fd, - sdus - sdus_intv, + packets - packets_intv, bytes_read - bytes_read_intv, us / 1000, - ((sdus - sdus_intv) / (double) us) * MILLION, + ((packets - packets_intv) / (double) us) + * MILLION, 8 * ((bytes_read - bytes_read_intv) / (double)(us))); iv_start = iv_end; - sdus_intv = sdus; + packets_intv = packets; bytes_read_intv = bytes_read; ts_add(&iv_start, &intv, &iv_end); } diff --git a/src/tools/oecho/oecho.c b/src/tools/oecho/oecho.c index cc173988..b6a74aa5 100644 --- a/src/tools/oecho/oecho.c +++ b/src/tools/oecho/oecho.c @@ -72,7 +72,7 @@ static int server_main(void) count = flow_read(fd, &buf, BUF_SIZE); if (count < 0) { - printf("Failed to read SDU.\n"); + printf("Failed to read packet.\n"); flow_dealloc(fd); continue; } @@ -80,7 +80,7 @@ static int server_main(void) printf("Message from client is %.*s.\n", (int) count, buf); if (flow_write(fd, buf, count) == -1) { - printf("Failed to write SDU.\n"); + printf("Failed to write packet.\n"); flow_dealloc(fd); continue; } @@ -105,14 +105,14 @@ static int client_main(void) } if (flow_write(fd, message, strlen(message) + 1) < 0) { - printf("Failed to write SDU.\n"); + printf("Failed to write packet.\n"); flow_dealloc(fd); return -1; } count = flow_read(fd, buf, BUF_SIZE); if (count < 0) { - printf("Failed to read SDU.\n"); + printf("Failed to read packet.\n"); flow_dealloc(fd); return -1; } diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c index 137e8647..92555c23 100644 --- a/src/tools/operf/operf.c +++ b/src/tools/operf/operf.c @@ -119,8 +119,8 @@ static void usage(void) " -d, --duration Test duration (default 60s)\n" " -r, --rate Rate (b/s)\n" " -s, --size Payload size (B, default 1500)\n" - " -f, --flood Send SDUs as fast as possible\n" - " --sleep Sleep in between sending SDUs\n" + " -f, --flood Send packets as fast as possible\n" + " --sleep Sleep in between sending packets\n" "\n" " --help Display this help text and exit\n"); } diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c index c8873c54..6862944e 100644 --- a/src/tools/operf/operf_client.c +++ b/src/tools/operf/operf_client.c @@ -120,11 +120,12 @@ void * writer(void * o) msg = (struct msg *) buf; if (client.flood) - printf("Flooding %s with %d byte SDUs for %d seconds.\n\n", + printf("Flooding %s with %d byte packets for %d seconds.\n\n", client.server_name, client.size, client.duration / 1000); else - printf("Sending %d byte SDUs for %d s to %s at %.3lf Mb/s.\n\n", + printf("Sending %d byte packets for %d s to %s " + "at %.3lf Mb/s.\n\n", client.size, client.duration / 1000, client.server_name, client.rate / (double) MILLION); @@ -141,7 +142,7 @@ void * writer(void * o) msg->id = client.sent; if (flow_write(*fdp, buf, client.size) == -1) { - printf("Failed to send SDU.\n"); + printf("Failed to send packet.\n"); flow_dealloc(*fdp); free(buf); return (void *) -1; @@ -225,7 +226,7 @@ int client_main(void) printf("\n"); printf("--- %s perf statistics ---\n", client.server_name); - printf("%ld SDUs transmitted, ", client.sent); + printf("%ld packets transmitted, ", client.sent); printf("%ld received, ", client.rcvd); printf("%ld%% packet loss, ", client.sent == 0 ? 0 : 100 - ((100 * client.rcvd) / client.sent)); diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 0f7695b5..a978e659 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -176,7 +176,7 @@ void * writer(void * o) msg->tv_nsec = now.tv_nsec; if (flow_write(*fdp, buf, client.size) == -1) { - printf("Failed to send SDU.\n"); + printf("Failed to send packet.\n"); flow_dealloc(*fdp); free(buf); return (void *) -1; @@ -253,7 +253,7 @@ static int client_main(void) printf("\n"); printf("--- %s ping statistics ---\n", client.s_apn); - printf("%d SDUs transmitted, ", client.sent); + printf("%d packets transmitted, ", client.sent); printf("%d received, ", client.rcvd); printf("%zd out-of-order, ", client.ooo); printf("%.0lf%% packet loss, ", client.sent == 0 ? 0 : -- cgit v1.2.3 From 070075812728a9949a636e7629fa8235b41ce41b Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 5 Oct 2018 17:04:16 +0200 Subject: lib: Split error checking from FRCT This splits off the CRC from FRCT so it can be set independently. Ouroboros now allows raw flows with error checking. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/fccntl.h | 1 - src/lib/CMakeLists.txt | 2 ++ src/lib/config.h.in | 1 + src/lib/dev.c | 63 ++++++++++++++++++++++++++++++++++++++++------ src/lib/frct.c | 36 -------------------------- 5 files changed, 58 insertions(+), 45 deletions(-) (limited to 'src/lib/frct.c') diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h index 7d1f7fe8..b11c14db 100644 --- a/include/ouroboros/fccntl.h +++ b/include/ouroboros/fccntl.h @@ -47,7 +47,6 @@ /* FRCT flags */ #define FRCTFRESCNTRL 00000001 /* Feedback from receiver */ #define FRCTFRTX 00000002 /* Reliable flow */ -#define FRCTFERRCHCK 00000004 /* Check for errors */ /* Flow operations */ #define FLOWSRCVTIMEO 00000001 /* Set read timeout */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 84a9fd72..f66e7ab9 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -176,6 +176,8 @@ set(SHM_RDRB_MULTI_BLOCK true CACHE BOOL "Packet buffer multiblock packet support") set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL "Enable shared memory lockless rbuff support") +set(QOS_DISABLE_CRC 0 CACHE BOOL + "Ignores ber setting on all QoS cubes") set(SOURCE_FILES_DEV # Add source files here diff --git a/src/lib/config.h.in b/src/lib/config.h.in index 69e7f4b0..e8cfeba3 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -28,6 +28,7 @@ #cmakedefine SHM_RBUFF_LOCKLESS #cmakedefine SHM_RDRB_MULTI_BLOCK +#cmakedefine QOS_DISABLE_CRC #define SHM_RBUFF_PREFIX "@SHM_RBUFF_PREFIX@" #define SHM_LOCKFILE_NAME "@SHM_LOCKFILE_NAME@" diff --git a/src/lib/dev.c b/src/lib/dev.c index 117ce49d..a7b342ba 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -61,6 +61,8 @@ #define NO_PART -1 #define DONE_PART -2 +#define CRCLEN (sizeof(uint32_t)) + struct flow_set { size_t idx; }; @@ -566,6 +568,10 @@ int flow_alloc(const char * dst, irm_msg_t * recv_msg; int fd; +#ifdef QOS_DISABLE_CRC + if (qs != NULL) + qs->ber = 1; +#endif msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst = (char *) dst; msg.has_pid = true; @@ -813,16 +819,40 @@ int fccntl(int fd, return -EPERM; } +static int chk_crc(struct shm_du_buff * sdb) +{ + uint32_t crc; + uint8_t * head = shm_du_buff_head(sdb); + uint8_t * tail = shm_du_buff_tail_release(sdb, CRCLEN); + + mem_hash(HASH_CRC32, &crc, head, tail - head); + + return !(crc == *((uint32_t *) tail)); +} + +static int add_crc(struct shm_du_buff * sdb) +{ + uint8_t * head = shm_du_buff_head(sdb); + uint8_t * tail = shm_du_buff_tail_alloc(sdb, CRCLEN); + if (tail == NULL) + return -1; + + mem_hash(HASH_CRC32, tail, head, tail - head); + + return 0; +} + ssize_t flow_write(int fd, const void * buf, size_t count) { - struct flow * flow; - ssize_t idx; - int ret; - int flags; - struct timespec abs; - struct timespec * abstime = NULL; + struct flow * flow; + ssize_t idx; + int ret; + int flags; + struct timespec abs; + struct timespec * abstime = NULL; + struct shm_du_buff * sdb; if (buf == NULL) return 0; @@ -869,14 +899,21 @@ ssize_t flow_write(int fd, if (idx < 0) return idx; - if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) { + sdb = shm_rdrbuff_get(ai.rdrb, idx); + + if (frcti_snd(flow->frcti, sdb) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOMEM; + } + + if (flow->spec.ber == 0 && add_crc(sdb) != 0) { shm_rdrbuff_remove(ai.rdrb, idx); return -ENOMEM; } pthread_rwlock_rdlock(&ai.lock); - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else @@ -944,6 +981,8 @@ ssize_t flow_read(int fd, if (idx < 0) return idx; sdb = shm_rdrbuff_get(ai.rdrb, idx); + if (flow->spec.ber == 0 && chk_crc(sdb) != 0) + continue; } while (frcti_rcv(flow->frcti, sdb) != 0); } } @@ -1341,6 +1380,8 @@ int ipcp_flow_read(int fd, if (idx < 0) return idx; *sdb = shm_rdrbuff_get(ai.rdrb, idx); + if (flow->spec.ber == 0 && chk_crc(*sdb) != 0) + continue; } while (frcti_rcv(flow->frcti, *sdb) != 0); return 0; @@ -1376,6 +1417,12 @@ int ipcp_flow_write(int fd, return -ENOMEM; } + if (flow->spec.ber == 0 && add_crc(sdb) != 0) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOMEM; + } + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); diff --git a/src/lib/frct.c b/src/lib/frct.c index 2e5c385d..424367c3 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -31,7 +31,6 @@ #define TW_RESOLUTION 1 /* ms */ #define FRCT_PCILEN (sizeof(struct frct_pci)) -#define FRCT_CRCLEN (sizeof(uint32_t)) struct frct_cr { uint32_t lwe; @@ -204,22 +203,6 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) return idx; } -static int frct_chk_crc(uint8_t * head, - uint8_t * tail) -{ - uint32_t crc; - - mem_hash(HASH_CRC32, &crc, head, tail - head); - - return crc == *((uint32_t *) tail); -} - -static void frct_add_crc(uint8_t * head, - uint8_t * tail) -{ - mem_hash(HASH_CRC32, tail, head, tail - head); -} - static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) { struct frct_pci * pci = NULL; @@ -252,18 +235,6 @@ static int __frcti_snd(struct frcti * frcti, pci->flags |= FRCT_DATA; - if (snd_cr->cflags & FRCTFERRCHCK) { - uint8_t * tail = shm_du_buff_tail_alloc(sdb, FRCT_CRCLEN); - if (tail == NULL) { - pthread_rwlock_unlock(&frcti->lock); - return -1; - } - - frct_add_crc((uint8_t *) pci, tail); - - pci->flags |= FRCT_CRC; - } - /* Set DRF if there are no unacknowledged packets. */ if (frcti->seqno == snd_cr->lwe) pci->flags |= FRCT_DRF; @@ -317,13 +288,6 @@ static int __frcti_rcv(struct frcti * frcti, idx = shm_du_buff_get_idx(sdb); - /* PDU may be corrupted. */ - if (pci->flags & FRCT_CRC) { - uint8_t * tail = shm_du_buff_tail_release(sdb, FRCT_CRCLEN); - if (frct_chk_crc((uint8_t *) pci, tail)) - goto drop_packet; - } - seqno = ntoh32(pci->seqno); /* Check if receiver inactivity is true. */ -- cgit v1.2.3 From 0c1cfca084b0fc6a282f253d6e7c2dad6427de65 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sat, 6 Oct 2018 16:39:23 +0200 Subject: lib: Keep track of highest delivered seqno The FRCT kept only a left window edge in the receiver connection window, however, it needs to keep track of the left window edge (highest ACK'd sequence number) and the highest delivered sequence number, so it can delay ACKs that cannot be piggybacked. TCP recommends at most 500 ms for delayed ACKs (probably good to keep it near half of RTO). Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/frct.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) (limited to 'src/lib/frct.c') diff --git a/src/lib/frct.c b/src/lib/frct.c index 424367c3..200a9fe7 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -37,6 +37,7 @@ struct frct_cr { uint32_t rwe; uint8_t cflags; + uint32_t seqno; time_t act; time_t inact; @@ -49,8 +50,6 @@ struct frcti { time_t a; time_t r; - uint32_t seqno; - struct frct_cr snd_cr; struct frct_cr rcv_cr; @@ -236,22 +235,22 @@ static int __frcti_snd(struct frcti * frcti, pci->flags |= FRCT_DATA; /* Set DRF if there are no unacknowledged packets. */ - if (frcti->seqno == snd_cr->lwe) + if (snd_cr->seqno == snd_cr->lwe) pci->flags |= FRCT_DRF; /* Choose a new sequence number if sender inactivity expired. */ if (now.tv_sec - snd_cr->act > snd_cr->inact) { /* There are no unacknowledged packets. */ - assert(frcti->seqno == snd_cr->lwe); + assert(snd_cr->seqno == snd_cr->lwe); #ifdef CONFIG_OUROBOROS_DEBUG - frcti->seqno = 0; + snd_cr->seqno = 0; #else - random_buffer(&frcti->seqno, sizeof(frcti->seqno)); + random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); #endif - frcti->snd_cr.lwe = frcti->seqno; + frcti->snd_cr.lwe = snd_cr->seqno; } - pci->seqno = hton32(frcti->seqno++); + pci->seqno = hton32(snd_cr->seqno++); if (!(snd_cr->cflags & FRCTFRTX)) snd_cr->lwe++; else @@ -294,27 +293,27 @@ static int __frcti_rcv(struct frcti * frcti, if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { /* Inactive receiver, check for DRF. */ if (pci->flags & FRCT_DRF) /* New run. */ - rcv_cr->lwe = seqno; + rcv_cr->seqno = seqno; else goto drop_packet; } - if (seqno == rcv_cr->lwe) { - ++rcv_cr->lwe; + if (seqno == rcv_cr->seqno) { + ++rcv_cr->seqno; } else { /* Out of order. */ - if ((int32_t)(seqno - rcv_cr->lwe) < 0) /* Duplicate. */ + if ((int32_t)(seqno - rcv_cr->seqno) < 0) /* Duplicate. */ goto drop_packet; if (rcv_cr->cflags & FRCTFRTX) { size_t pos = seqno & (RQ_SIZE - 1); - if ((seqno - rcv_cr->lwe) > RQ_SIZE /* Out of rq. */ + if ((seqno - rcv_cr->seqno) > RQ_SIZE /* Out of rq. */ || frcti->rq[pos] != -1) /* Duplicate in rq. */ goto drop_packet; /* Queue. */ frcti->rq[pos] = idx; ret = -EAGAIN; } else { - rcv_cr->lwe = seqno; + rcv_cr->seqno = seqno; } } -- cgit v1.2.3