From f8c14e0246a6c9cb5e8ff47869b5968abb63f010 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Mon, 12 Dec 2016 13:24:17 +0100 Subject: src, tools: Set/get timeout and get qos for flows Receiver timeouts can now be set on a flow using the flow_set_timeout function. Specifying NULL disables the timeout. The flow_get_timeout function gets the value for the timeout. This commit also deprecates fcntl in favor of flow_get_flags and flow_set_flags functions. struct qos_spec is typedef'd as a qosspec_t. The tools and cdap.c are updated to use the new API. Fixes a bug in operf client where the client's writer thread wouldn't cancel on SIGINT. --- src/ipcpd/normal/fmgr.c | 6 +- src/irmd/main.c | 10 +-- src/lib/cdap.c | 2 +- src/lib/dev.c | 190 +++++++++++++++++++++++++++++++++-------- src/tools/cbr/cbr_server.c | 4 +- src/tools/echo/echo_server.c | 2 +- src/tools/operf/operf.c | 3 - src/tools/operf/operf_client.c | 68 +++------------ src/tools/operf/operf_server.c | 2 +- src/tools/oping/oping.c | 3 - src/tools/oping/oping_client.c | 115 ++++++++++--------------- src/tools/oping/oping_server.c | 4 +- 12 files changed, 227 insertions(+), 182 deletions(-) (limited to 'src') diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 41785ae4..8e416aa4 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -123,9 +123,9 @@ static int add_np1_fd(int fd, static void * fmgr_nm1_acceptor(void * o) { - int fd; - char * ae_name; - struct qos_spec qs; + int fd; + char * ae_name; + qosspec_t qs; (void) o; diff --git a/src/irmd/main.c b/src/irmd/main.c index dc0c26f2..6a6aedf8 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1125,16 +1125,16 @@ static int flow_alloc_resp(pid_t n_api, return ret; } -static struct irm_flow * flow_alloc(pid_t api, - char * dst_name, - char * src_ae_name, - struct qos_spec * qos) +static struct irm_flow * flow_alloc(pid_t api, + char * dst_name, + char * src_ae_name, + qosspec_t * qos) { struct irm_flow * f; pid_t ipcp; int port_id; - /* FIXME: Map qos_spec to qos_cube */ + /* FIXME: Map qosspec to qos_cube */ (void) qos; pthread_rwlock_rdlock(&irmd->state_lock); diff --git a/src/lib/cdap.c b/src/lib/cdap.c index d06a7d39..df79be54 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -203,7 +203,7 @@ struct cdap * cdap_create(struct cdap_ops * ops, ops->cdap_request == NULL) return NULL; - flags = flow_cntl(fd, FLOW_F_GETFL, 0); + flags = flow_get_flags(fd); if (flags & FLOW_O_NONBLOCK) return NULL; diff --git a/src/lib/dev.c b/src/lib/dev.c index 1c0d73a1..bad56129 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -138,7 +138,8 @@ struct flow { pid_t api; - struct timespec * timeout; + bool timesout; + struct timespec rcv_timeo; }; struct { @@ -213,11 +214,7 @@ static void reset_flow(int fd) ai.flows[fd].oflags = 0; ai.flows[fd].api = -1; - - if (ai.flows[fd].timeout != NULL) { - free(ai.flows[fd].timeout); - ai.flows[fd].timeout = NULL; - } + ai.flows[fd].timesout = false; } int ap_init(char * ap_name) @@ -265,13 +262,13 @@ int ap_init(char * ap_name) } for (i = 0; i < AP_MAX_FLOWS; ++i) { - ai.flows[i].rx_rb = NULL; - ai.flows[i].tx_rb = NULL; - ai.flows[i].set = NULL; - ai.flows[i].port_id = -1; - ai.flows[i].oflags = 0; - ai.flows[i].api = -1; - ai.flows[i].timeout = NULL; + ai.flows[i].rx_rb = NULL; + ai.flows[i].tx_rb = NULL; + ai.flows[i].set = NULL; + ai.flows[i].port_id = -1; + ai.flows[i].oflags = 0; + ai.flows[i].api = -1; + ai.flows[i].timesout = false; } ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); @@ -341,7 +338,7 @@ void ap_fini() pthread_rwlock_destroy(&ai.data_lock); } -int flow_accept(char ** ae_name, struct qos_spec * qos) +int flow_accept(char ** ae_name, qosspec_t * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; @@ -490,7 +487,7 @@ int flow_alloc_resp(int fd, int response) return ret; } -int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) +int flow_alloc(char * dst_name, char * src_ae_name, qosspec_t * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; @@ -680,7 +677,7 @@ int flow_dealloc(int fd) return 0; } -int flow_cntl(int fd, int cmd, int oflags) +int flow_set_flags(int fd, int flags) { int old; @@ -698,25 +695,115 @@ int flow_cntl(int fd, int cmd, int oflags) old = ai.flows[fd].oflags; - switch (cmd) { - case FLOW_F_GETFL: /* GET FLOW FLAGS */ + ai.flows[fd].oflags = flags; + if (flags & FLOW_O_WRONLY) + shm_rbuff_block(ai.flows[fd].rx_rb); + if (flags & FLOW_O_RDWR) + shm_rbuff_unblock(ai.flows[fd].rx_rb); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return old; +} + +int flow_get_flags(int fd) +{ + int old; + + if (fd < 0 || fd >= AP_MAX_FLOWS) + return -EBADF; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -ENOTALLOC; + } + + old = ai.flows[fd].oflags; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return old; +} + +int flow_get_timeout(int fd, struct timespec * timeo) +{ + int ret = 0; + + if (fd < 0 || fd >= AP_MAX_FLOWS || timeo == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + if (ai.flows[fd].port_id < 0) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return old; - case FLOW_F_SETFL: /* SET FLOW FLAGS */ - ai.flows[fd].oflags = oflags; - if (oflags & FLOW_O_WRONLY) - shm_rbuff_block(ai.flows[fd].rx_rb); - if (oflags & FLOW_O_RDWR) - shm_rbuff_unblock(ai.flows[fd].rx_rb); + return -ENOTALLOC; + } + + if (ai.flows[fd].timesout) + *timeo = ai.flows[fd].rcv_timeo; + else + ret = -EPERM; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return ret; +} + +int flow_set_timeout(int fd, struct timespec * timeo) +{ + if (fd < 0 || fd >= AP_MAX_FLOWS) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + if (ai.flows[fd].port_id < 0) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return old; - default: + return -ENOTALLOC; + } + + if (timeo == NULL) { + ai.flows[fd].timesout = false; + } else { + ai.flows[fd].timesout = true; + ai.flows[fd].rcv_timeo = *timeo; + } + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return 0; +} + +int flow_get_qosspec(int fd, qosspec_t * spec) +{ + if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + if (ai.flows[fd].port_id < 0) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return FLOW_O_INVALID; /* unknown command */ + return -ENOTALLOC; } + + /* FIXME: map cube to spec */ + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return 0; } ssize_t flow_write(int fd, void * buf, size_t count) @@ -764,7 +851,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) } } else { /* blocking */ struct shm_rdrbuff * rdrb = ai.rdrb; - struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; + struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -816,19 +903,28 @@ ssize_t flow_read(int fd, void * buf, size_t count) idx = shm_rbuff_read(ai.flows[fd].rx_rb); pthread_rwlock_unlock(&ai.flows_lock); } else { - struct shm_rbuff * rb = ai.flows[fd].rx_rb; - struct timespec * timeout = ai.flows[fd].timeout; + struct shm_rbuff * rb = ai.flows[fd].rx_rb; + bool timeo = ai.flows[fd].timesout; + struct timespec timeout = ai.flows[fd].rcv_timeo; + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - idx = shm_rbuff_read_b(rb, timeout); + + if (timeo) + idx = shm_rbuff_read_b(rb, &timeout); + else + idx = shm_rbuff_read_b(rb, NULL); + pthread_rwlock_rdlock(&ai.data_lock); } - if (idx < 0) { + if (idx == -ETIMEDOUT) { pthread_rwlock_unlock(&ai.data_lock); - return -EAGAIN; + return -ETIMEDOUT; } + assert(idx >= 0); + n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); if (n < 0) { pthread_rwlock_unlock(&ai.data_lock); @@ -844,7 +940,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) return n; } -/* select functions */ +/* fqueue functions */ struct flow_set * flow_set_create() { @@ -1328,7 +1424,7 @@ int ipcp_flow_fini(int fd) { struct shm_rbuff * rb; - flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); + flow_set_flags(fd, FLOW_O_WRONLY); pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); @@ -1343,6 +1439,28 @@ int ipcp_flow_fini(int fd) return 0; } +int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube) +{ + if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -ENOTALLOC; + } + + *cube = ai.flows[fd].qos; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return 0; +} + ssize_t local_flow_read(int fd) { ssize_t ret; diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c index 104c5e9e..64055cfb 100644 --- a/src/tools/cbr/cbr_server.c +++ b/src/tools/cbr/cbr_server.c @@ -85,7 +85,7 @@ static void handle_flow(int fd) alive = iv_start; ts_add(&iv_start, &intv, &iv_end); - flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK); + flow_set_flags(fd, FLOW_O_NONBLOCK); while (!stop) { clock_gettime(CLOCK_REALTIME, &now); @@ -157,7 +157,7 @@ static void * listener(void * o) { int client_fd = 0; int response = 0; - struct qos_spec qs; + qosspec_t qs; (void) o; diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c index 09575364..c369d3e6 100644 --- a/src/tools/echo/echo_server.c +++ b/src/tools/echo/echo_server.c @@ -40,7 +40,7 @@ int server_main(void) int client_fd = 0; char buf[BUF_SIZE]; ssize_t count = 0; - struct qos_spec qs; + qosspec_t qs; printf("Starting the server.\n"); diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c index bc7ade3a..1716a598 100644 --- a/src/tools/operf/operf.c +++ b/src/tools/operf/operf.c @@ -47,9 +47,6 @@ struct c { unsigned long sent; unsigned long rcvd; - flow_set_t * flows; - fqueue_t * fq; - pthread_t reader_pt; pthread_t writer_pt; } client; diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c index 902a7b41..44f25893 100644 --- a/src/tools/operf/operf_client.c +++ b/src/tools/operf/operf_client.c @@ -22,6 +22,7 @@ */ #include +#include #include #ifdef __FreeBSD__ @@ -45,6 +46,7 @@ static void busy_wait_until(const struct timespec * deadline) while (now.tv_sec == deadline->tv_sec && now.tv_nsec < deadline->tv_nsec) clock_gettime(CLOCK_REALTIME, &now); + pthread_testcancel(); } void shutdown_client(int signo, siginfo_t * info, void * c) @@ -68,23 +70,20 @@ void * reader(void * o) struct timespec timeout = {2, 0}; char buf[OPERF_BUF_SIZE]; - int fd = 0; + int fd = *((int *) o); int msg_len = 0; - (void) o; + flow_set_timeout(fd, &timeout); - /* FIXME: use flow timeout option once we have it */ - while (flow_event_wait(client.flows, client.fq, &timeout) != -ETIMEDOUT) - while ((fd = fqueue_next(client.fq)) >= 0) { - msg_len = flow_read(fd, buf, OPERF_BUF_SIZE); - if (msg_len != client.size) { - printf("Invalid message on fd %d.\n", fd); - continue; - } - - ++client.rcvd; + while ((msg_len = flow_read(fd, buf, OPERF_BUF_SIZE)) != -ETIMEDOUT) { + if (msg_len != client.size) { + printf("Invalid message on fd %d.\n", fd); + continue; } + ++client.rcvd; + } + return (void *) 0; } @@ -160,33 +159,6 @@ void * writer(void * o) return (void *) 0; } -static int client_init(void) -{ - client.flows = flow_set_create(); - if (client.flows == NULL) - return -ENOMEM; - - client.fq = fqueue_create(); - if (client.fq == NULL) { - flow_set_destroy(client.flows); - return -ENOMEM; - } - - client.sent = 0; - client.rcvd = 0; - - return 0; -} - -void client_fini(void) -{ - if (client.flows != NULL) - flow_set_destroy(client.flows); - - if (client.fq != NULL) - fqueue_destroy(client.fq); -} - int client_main(void) { struct sigaction sig_act; @@ -208,32 +180,24 @@ int client_main(void) return -1; } - if (client_init()) { - printf("Failed to initialize client.\n"); - return -1; - } + client.sent = 0; + client.rcvd = 0; fd = flow_alloc(client.s_apn, NULL, NULL); if (fd < 0) { - flow_set_destroy(client.flows); - fqueue_destroy(client.fq); printf("Failed to allocate flow.\n"); return -1; } - flow_set_add(client.flows, fd); - if (flow_alloc_res(fd)) { printf("Flow allocation refused.\n"); - flow_set_del(client.flows, fd); flow_dealloc(fd); - client_fini(); return -1; } clock_gettime(CLOCK_REALTIME, &tic); - pthread_create(&client.reader_pt, NULL, reader, NULL); + pthread_create(&client.reader_pt, NULL, reader, &fd); pthread_create(&client.writer_pt, NULL, writer, &fd); pthread_join(client.writer_pt, NULL); @@ -253,11 +217,7 @@ int client_main(void) (client.rcvd * client.size * 8) / (double) ts_diff_us(&tic, &toc)); - flow_set_del(client.flows, fd); - flow_dealloc(fd); - client_fini(); - return 0; } diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c index 4eb93879..340103d2 100644 --- a/src/tools/operf/operf_server.c +++ b/src/tools/operf/operf_server.c @@ -102,7 +102,7 @@ void * accept_thread(void * o) { int fd = 0; struct timespec now = {0, 0}; - struct qos_spec qs; + qosspec_t qs; (void) o; diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 98d12a7b..224b182b 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -54,9 +54,6 @@ struct c { double rtt_avg; double rtt_m2; - flow_set_t * flows; - fqueue_t * fq; - /* needs locking */ struct timespec * times; pthread_mutex_t lock; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index b30ba5f4..c439cf46 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -60,56 +60,54 @@ void * reader(void * o) char buf[OPING_BUF_SIZE]; struct oping_msg * msg = (struct oping_msg *) buf; - int fd = 0; + int fd = *((int *) o); int msg_len = 0; double ms = 0; double d = 0; - (void) o; - - /* FIXME: use flow timeout option once we have it */ - while (client.rcvd != client.count - && (flow_event_wait(client.flows, client.fq, &timeout) - != -ETIMEDOUT)) { - while ((fd = fqueue_next(client.fq)) >= 0) { - msg_len = flow_read(fd, buf, OPING_BUF_SIZE); - if (msg_len < 0) - continue; - - if (ntohl(msg->type) != ECHO_REPLY) { - printf("Invalid message on fd %d.\n", fd); - continue; - } - - if (ntohl(msg->id) >= client.count) { - printf("Invalid id.\n"); - continue; - } - - ++client.rcvd; - - clock_gettime(CLOCK_REALTIME, &now); - - pthread_mutex_lock(&client.lock); - ms = ts_diff_us(&client.times[ntohl(msg->id)], &now) - / 1000.0; - pthread_mutex_unlock(&client.lock); - - printf("%d bytes from %s: seq=%d time=%.3f ms\n", - msg_len, - client.s_apn, - ntohl(msg->id), - ms); - - if (ms < client.rtt_min) - client.rtt_min = ms; - if (ms > client.rtt_max) - client.rtt_max = ms; - - d = (ms - client.rtt_avg); - client.rtt_avg += d / (float) client.rcvd; - client.rtt_m2 += d * (ms - client.rtt_avg); + flow_set_timeout(fd, &timeout); + + while (client.rcvd != client.count) { + msg_len = flow_read(fd, buf, OPING_BUF_SIZE); + if (msg_len == -ETIMEDOUT) + break; + + if (msg_len < 0) + continue; + + if (ntohl(msg->type) != ECHO_REPLY) { + printf("Invalid message on fd %d.\n", fd); + continue; } + + if (ntohl(msg->id) >= client.count) { + printf("Invalid id.\n"); + continue; + } + + ++client.rcvd; + + clock_gettime(CLOCK_REALTIME, &now); + + pthread_mutex_lock(&client.lock); + ms = ts_diff_us(&client.times[ntohl(msg->id)], &now) + / 1000.0; + pthread_mutex_unlock(&client.lock); + + printf("%d bytes from %s: seq=%d time=%.3f ms\n", + msg_len, + client.s_apn, + ntohl(msg->id), + ms); + + if (ms < client.rtt_min) + client.rtt_min = ms; + if (ms > client.rtt_max) + client.rtt_max = ms; + + d = (ms - client.rtt_avg); + client.rtt_avg += d / (float) client.rcvd; + client.rtt_m2 += d * (ms - client.rtt_avg); } return (void *) 0; @@ -164,20 +162,8 @@ void * writer(void * o) static int client_init(void) { - client.flows = flow_set_create(); - if (client.flows == NULL) - return -ENOMEM; - - client.fq = fqueue_create(); - if (client.fq == NULL) { - flow_set_destroy(client.flows); - return -ENOMEM; - } - client.times = malloc(sizeof(struct timespec) * client.count); if (client.times == NULL) { - flow_set_destroy(client.flows); - fqueue_destroy(client.fq); pthread_mutex_unlock(&client.lock); return -ENOMEM; } @@ -197,12 +183,6 @@ static int client_init(void) void client_fini(void) { - if (client.flows != NULL) - flow_set_destroy(client.flows); - - if (client.fq != NULL) - fqueue_destroy(client.fq); - if (client.times != NULL) free(client.times); } @@ -235,17 +215,12 @@ int client_main(void) fd = flow_alloc(client.s_apn, NULL, NULL); if (fd < 0) { - flow_set_destroy(client.flows); - fqueue_destroy(client.fq); printf("Failed to allocate flow.\n"); return -1; } - flow_set_add(client.flows, fd); - if (flow_alloc_res(fd)) { printf("Flow allocation refused.\n"); - flow_set_del(client.flows, fd); flow_dealloc(fd); client_fini(); return -1; @@ -255,7 +230,7 @@ int client_main(void) clock_gettime(CLOCK_REALTIME, &tic); - pthread_create(&client.reader_pt, NULL, reader, NULL); + pthread_create(&client.reader_pt, NULL, reader, &fd); pthread_create(&client.writer_pt, NULL, writer, &fd); pthread_join(client.writer_pt, NULL); @@ -283,8 +258,6 @@ int client_main(void) printf("NaN ms\n"); } - flow_set_del(client.flows, fd); - flow_dealloc(fd); client_fini(); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 8d7ab1db..63fca567 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -115,7 +115,7 @@ void * accept_thread(void * o) { int fd = 0; struct timespec now = {0, 0}; - struct qos_spec qs; + qosspec_t qs; (void) o; @@ -143,7 +143,7 @@ void * accept_thread(void * o) server.times[fd] = now; pthread_mutex_unlock(&server.lock); - flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK | FLOW_O_RDWR); + flow_set_flags(fd, FLOW_O_NONBLOCK | FLOW_O_RDWR); } return (void *) 0; -- cgit v1.2.3