From f8c14e0246a6c9cb5e8ff47869b5968abb63f010 Mon Sep 17 00:00:00 2001
From: dimitri staessens
Date: Mon, 12 Dec 2016 13:24:17 +0100
Subject: src, tools: Set/get timeout and get qos for flows

Receiver timeouts can now be set on a flow using the flow_set_timeout
function. Specifying NULL disables the timeout. The flow_get_timeout
function gets the current value of the timeout. This commit also
deprecates flow_cntl in favor of the flow_get_flags and flow_set_flags
functions. struct qos_spec is typedef'd as qosspec_t. The tools and
cdap.c are updated to use the new API.

Fixes a bug in the operf client where the client's writer thread
wouldn't cancel on SIGINT.
---
 src/lib/cdap.c |   2 +-
 src/lib/dev.c  | 190 ++++++++++++++++++++++++++++++++++++++++++++++-----------
 2 files changed, 155 insertions(+), 37 deletions(-)

(limited to 'src/lib')

diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index d06a7d39..df79be54 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -203,7 +203,7 @@ struct cdap * cdap_create(struct cdap_ops * ops,
             ops->cdap_request == NULL)
                 return NULL;
 
-        flags = flow_cntl(fd, FLOW_F_GETFL, 0);
+        flags = flow_get_flags(fd);
 
         if (flags & FLOW_O_NONBLOCK)
                 return NULL;
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1c0d73a1..bad56129 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -138,7 +138,8 @@ struct flow {
 
         pid_t             api;
 
-        struct timespec * timeout;
+        bool              timesout;
+        struct timespec   rcv_timeo;
 };
 
 struct {
@@ -213,11 +214,7 @@ static void reset_flow(int fd)
         ai.flows[fd].oflags = 0;
         ai.flows[fd].api    = -1;
-
-        if (ai.flows[fd].timeout != NULL) {
-                free(ai.flows[fd].timeout);
-                ai.flows[fd].timeout = NULL;
-        }
+        ai.flows[fd].timesout = false;
 }
 
 int ap_init(char * ap_name)
 {
@@ -265,13 +262,13 @@ int ap_init(char * ap_name)
         }
 
         for (i = 0; i < AP_MAX_FLOWS; ++i) {
-                ai.flows[i].rx_rb   = NULL;
-                ai.flows[i].tx_rb   = NULL;
-                ai.flows[i].set     = NULL;
-                ai.flows[i].port_id = -1;
-                ai.flows[i].oflags  = 0;
-                ai.flows[i].api     = -1;
-                ai.flows[i].timeout = NULL;
+                ai.flows[i].rx_rb    = NULL;
+                ai.flows[i].tx_rb    = NULL;
+                ai.flows[i].set      = NULL;
+                ai.flows[i].port_id  = -1;
+                ai.flows[i].oflags   = 0;
+                ai.flows[i].api      = -1;
+                ai.flows[i].timesout = false;
         }
 
         ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
@@ -341,7 +338,7 @@ void ap_fini()
         pthread_rwlock_destroy(&ai.data_lock);
 }
 
-int flow_accept(char ** ae_name, struct qos_spec * qos)
+int flow_accept(char ** ae_name, qosspec_t * qos)
 {
         irm_msg_t msg = IRM_MSG__INIT;
         irm_msg_t * recv_msg = NULL;
@@ -490,7 +487,7 @@ int flow_alloc_resp(int fd, int response)
         return ret;
 }
 
-int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
+int flow_alloc(char * dst_name, char * src_ae_name, qosspec_t * qos)
 {
         irm_msg_t msg = IRM_MSG__INIT;
         irm_msg_t * recv_msg = NULL;
@@ -680,7 +677,7 @@ int flow_dealloc(int fd)
         return 0;
 }
 
-int flow_cntl(int fd, int cmd, int oflags)
+int flow_set_flags(int fd, int flags)
 {
         int old;
 
@@ -698,25 +695,115 @@ int flow_cntl(int fd, int cmd, int oflags)
 
         old = ai.flows[fd].oflags;
 
-        switch (cmd) {
-        case FLOW_F_GETFL: /* GET FLOW FLAGS */
+        ai.flows[fd].oflags = flags;
+        if (flags & FLOW_O_WRONLY)
+                shm_rbuff_block(ai.flows[fd].rx_rb);
+        if (flags & FLOW_O_RDWR)
+                shm_rbuff_unblock(ai.flows[fd].rx_rb);
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return old;
+}
+
+int flow_get_flags(int fd)
+{
+        int old;
+
+        if (fd < 0 || fd >= AP_MAX_FLOWS)
+                return -EBADF;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
+                pthread_rwlock_unlock(&ai.flows_lock);
+                pthread_rwlock_unlock(&ai.data_lock);
+                return -ENOTALLOC;
+        }
+
+        old = ai.flows[fd].oflags;
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return old;
+}
+
+int flow_get_timeout(int fd, struct timespec * timeo)
+{
+        int ret = 0;
+
+        if (fd < 0 || fd >= AP_MAX_FLOWS || timeo == NULL)
+                return -EINVAL;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
-                return old;
-        case FLOW_F_SETFL: /* SET FLOW FLAGS */
-                ai.flows[fd].oflags = oflags;
-                if (oflags & FLOW_O_WRONLY)
-                        shm_rbuff_block(ai.flows[fd].rx_rb);
-                if (oflags & FLOW_O_RDWR)
-                        shm_rbuff_unblock(ai.flows[fd].rx_rb);
+                return -ENOTALLOC;
+        }
+
+        if (ai.flows[fd].timesout)
+                *timeo = ai.flows[fd].rcv_timeo;
+        else
+                ret = -EPERM;
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return ret;
+}
+
+int flow_set_timeout(int fd, struct timespec * timeo)
+{
+        if (fd < 0 || fd >= AP_MAX_FLOWS)
+                return -EINVAL;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
-                return old;
-        default:
+                return -ENOTALLOC;
+        }
+
+        if (timeo == NULL) {
+                ai.flows[fd].timesout = false;
+        } else {
+                ai.flows[fd].timesout = true;
+                ai.flows[fd].rcv_timeo = *timeo;
+        }
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return 0;
+}
+
+int flow_get_qosspec(int fd, qosspec_t * spec)
+{
+        if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL)
+                return -EINVAL;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
-                return FLOW_O_INVALID; /* unknown command */
+                return -ENOTALLOC;
         }
+
+        /* FIXME: map cube to spec */
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return 0;
 }
 
 ssize_t flow_write(int fd, void * buf, size_t count)
@@ -764,7 +851,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
                 }
         } else { /* blocking */
                 struct shm_rdrbuff * rdrb = ai.rdrb;
-                struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
+                struct shm_rbuff *   tx_rb = ai.flows[fd].tx_rb;
 
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
@@ -816,19 +903,28 @@ ssize_t flow_read(int fd, void * buf, size_t count)
                 idx = shm_rbuff_read(ai.flows[fd].rx_rb);
                 pthread_rwlock_unlock(&ai.flows_lock);
         } else {
-                struct shm_rbuff * rb = ai.flows[fd].rx_rb;
-                struct timespec * timeout = ai.flows[fd].timeout;
+                struct shm_rbuff * rb   = ai.flows[fd].rx_rb;
+                bool timeo              = ai.flows[fd].timesout;
+                struct timespec timeout = ai.flows[fd].rcv_timeo;
+
                 pthread_rwlock_unlock(&ai.flows_lock);
                 pthread_rwlock_unlock(&ai.data_lock);
-                idx = shm_rbuff_read_b(rb, timeout);
+
+                if (timeo)
+                        idx = shm_rbuff_read_b(rb, &timeout);
+                else
+                        idx = shm_rbuff_read_b(rb, NULL);
+
                 pthread_rwlock_rdlock(&ai.data_lock);
         }
 
-        if (idx < 0) {
+        if (idx == -ETIMEDOUT) {
                 pthread_rwlock_unlock(&ai.data_lock);
-                return -EAGAIN;
+                return -ETIMEDOUT;
         }
 
+        assert(idx >= 0);
+
         n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
         if (n < 0) {
                 pthread_rwlock_unlock(&ai.data_lock);
@@ -844,7 +940,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
         return n;
 }
 
-/* select functions */
+/* fqueue functions */
 
 struct flow_set * flow_set_create()
 {
@@ -1328,7 +1424,7 @@ int ipcp_flow_fini(int fd)
 {
         struct shm_rbuff * rb;
 
-        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
+        flow_set_flags(fd, FLOW_O_WRONLY);
 
         pthread_rwlock_rdlock(&ai.data_lock);
         pthread_rwlock_rdlock(&ai.flows_lock);
@@ -1343,6 +1439,28 @@ int ipcp_flow_fini(int fd)
         return 0;
 }
 
+int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube)
+{
+        if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)
+                return -EINVAL;
+
+        pthread_rwlock_rdlock(&ai.data_lock);
+        pthread_rwlock_wrlock(&ai.flows_lock);
+
+        if (ai.flows[fd].port_id < 0) {
+                pthread_rwlock_unlock(&ai.flows_lock);
+                pthread_rwlock_unlock(&ai.data_lock);
+                return -ENOTALLOC;
+        }
+
+        *cube = ai.flows[fd].qos;
+
+        pthread_rwlock_unlock(&ai.flows_lock);
+        pthread_rwlock_unlock(&ai.data_lock);
+
+        return 0;
+}
+
 ssize_t local_flow_read(int fd)
 {
         ssize_t ret;
-- 
cgit v1.2.3
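
For reference, the sketch below shows how an application might use the receiver timeout introduced by this patch. It is a minimal illustration, not part of the commit: the <ouroboros/dev.h> header name, the "my_client" and "server" names, and the simplified flow-allocation handshake are assumptions; only ap_init, flow_alloc, flow_set_timeout, flow_read, flow_dealloc and ap_fini are taken from the API shown in the diff.

/*
 * Illustrative sketch (not part of the patch): block on flow_read with a
 * two-second receiver timeout set through the new flow_set_timeout call.
 * "my_client" and "server" are placeholder names; error paths and the
 * flow allocation handshake are simplified.
 */
#include <ouroboros/dev.h>

#include <errno.h>
#include <stdio.h>
#include <time.h>

int main(void)
{
        struct timespec timeo = {2, 0};        /* 2 s, 0 ns */
        char            buf[128];
        ssize_t         n;
        int             fd;

        if (ap_init("my_client") < 0)          /* register this AP first */
                return -1;

        fd = flow_alloc("server", NULL, NULL); /* no AE name, default QoS */
        if (fd < 0) {
                ap_fini();
                return -1;
        }

        flow_set_timeout(fd, &timeo);          /* NULL would disable it */

        n = flow_read(fd, buf, sizeof(buf));
        if (n == -ETIMEDOUT)
                printf("no SDU within 2 seconds\n");
        else if (n >= 0)
                printf("read SDU of %zd bytes\n", n);

        flow_dealloc(fd);
        ap_fini();

        return 0;
}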