diff options
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 133 |
1 files changed, 109 insertions, 24 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index df616ead..8d7d7934 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -63,6 +63,7 @@ #define SECMEMSZ 16384 #define SYMMKEYSZ 32 #define MSGBUFSZ 2048 +#define TICTIME 1000000 /* ns */ struct flow_set { size_t idx; @@ -255,6 +256,9 @@ static void flow_fini(int fd) bmp_release(ai.fds, fd); } + if (ai.flows[fd].frcti != NULL) + frcti_destroy(ai.flows[fd].frcti); + if (ai.flows[fd].rx_rb != NULL) { shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); shm_rbuff_close(ai.flows[fd].rx_rb); @@ -272,9 +276,6 @@ static void flow_fini(int fd) shm_flow_set_close(ai.flows[fd].set); } - if (ai.flows[fd].frcti != NULL) - frcti_destroy(ai.flows[fd].frcti); - if (ai.flows[fd].ctx != NULL) crypt_fini(ai.flows[fd].ctx); @@ -433,8 +434,13 @@ static void init(int argc, if (ai.fqset == NULL) goto fail_fqset; + if (timerwheel_init() < 0) + goto fail_timerwheel; + return; + fail_timerwheel: + shm_flow_set_close(ai.fqset); fail_fqset: pthread_rwlock_destroy(&ai.lock); fail_lock: @@ -491,6 +497,8 @@ static void fini(void) pthread_cond_destroy(&ai.ports[i].state_cond); } + timerwheel_fini(); + shm_rdrbuff_close(ai.rdrb); free(ai.flows); @@ -747,25 +755,59 @@ int flow_join(const char * dst, int flow_dealloc(int fd) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg; + struct flow * f; + time_t timeo; if (fd < 0 || fd >= SYS_MAX_FLOWS ) return -EINVAL; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_flow_id = true; - msg.has_pid = true; - msg.pid = ai.pid; + msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; + msg.has_flow_id = true; + msg.has_pid = true; + msg.pid = ai.pid; + msg.has_timeo_sec = true; + msg.has_timeo_nsec = true; + msg.timeo_nsec = 0; + + f = &ai.flows[fd]; pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id < 0) { + if (f->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - msg.flow_id = ai.flows[fd].flow_id; + msg.flow_id = f->flow_id; + + timeo = frcti_dealloc(f->frcti); + while (timeo < 0) { /* keep the flow active for rtx */ + ssize_t ret; + uint8_t buf[128]; + + f->oflags = FLOWFDEFAULT | FLOWFRNOPART; + + f->rcv_timesout = true; + f->rcv_timeo.tv_sec = -timeo; + f->rcv_timeo.tv_nsec = 0; + + pthread_rwlock_unlock(&ai.lock); + + ret = flow_read(fd, buf, 128); + + pthread_rwlock_rdlock(&ai.lock); + + timeo = frcti_dealloc(f->frcti); + + if (ret == -ETIMEDOUT && timeo < 0) + timeo = -timeo; + } + + msg.timeo_sec = timeo; + + shm_rbuff_fini(ai.flows[fd].tx_rb); pthread_rwlock_unlock(&ai.lock); @@ -904,13 +946,21 @@ int fccntl(int fd, goto einval; *fflags = flow->oflags; break; + case FRCTSFLAGS: + cflags = va_arg(l, uint16_t *); + if (cflags == NULL) + goto einval; + if (flow->frcti == NULL) + goto eperm; + frcti_setflags(flow->frcti, *cflags); + break; case FRCTGFLAGS: cflags = (uint16_t *) va_arg(l, int *); if (cflags == NULL) goto einval; if (flow->frcti == NULL) goto eperm; - *cflags = frcti_getconf(flow->frcti); + *cflags = frcti_getflags(flow->frcti); break; default: pthread_rwlock_unlock(&ai.lock); @@ -1067,6 +1117,8 @@ ssize_t flow_read(int fd, struct shm_rbuff * rb; struct shm_du_buff * sdb; struct timespec abs; + struct timespec tic = {0, TICTIME}; + struct timespec tictime; struct timespec * abstime = NULL; struct flow * flow; bool noblock; @@ -1096,6 +1148,8 @@ ssize_t flow_read(int fd, noblock = flow->oflags & FLOWFRNOBLOCK; partrd = !(flow->oflags & FLOWFRNOPART); + ts_add(&tic, &abs, &tictime); + if (ai.flows[fd].rcv_timesout) { ts_add(&abs, &flow->rcv_timeo, &abs); abstime = &abs; @@ -1108,9 +1162,21 @@ ssize_t flow_read(int fd, pthread_rwlock_unlock(&ai.lock); idx = noblock ? shm_rbuff_read(rb) : - shm_rbuff_read_b(rb, abstime); - if (idx < 0) - return idx; + shm_rbuff_read_b(rb, &tictime); + if (idx < 0) { + frcti_tick(flow->frcti); + + if (idx != -ETIMEDOUT) + return idx; + + if (abstime != NULL + && ts_diff_ns(&tictime, &abs) < 0) + return -ETIMEDOUT; + + ts_add(&tictime, &tic, &tictime); + pthread_rwlock_rdlock(&ai.lock); + continue; + } sdb = shm_rdrbuff_get(ai.rdrb, idx); if (flow->qs.ber == 0 && chk_crc(sdb) != 0) { @@ -1339,7 +1405,9 @@ ssize_t fevent(struct flow_set * set, const struct timespec * timeo) { ssize_t ret = 0; - struct timespec abstime; + struct timespec tic = {0, TICTIME}; + struct timespec tictime; + struct timespec abs; struct timespec * t = NULL; if (set == NULL || fq == NULL) @@ -1348,17 +1416,25 @@ ssize_t fevent(struct flow_set * set, if (fq->fqsize > 0 && fq->next != fq->fqsize) return fq->fqsize; - if (timeo != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeo, &abstime); - t = &abstime; - } + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + ts_add(&tic, &abs, &tictime); + t = &tictime; + + if (timeo != NULL) + ts_add(&abs, timeo, &abs); while (ret == 0) { ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); if (ret == -ETIMEDOUT) { - fq->fqsize = 0; - return -ETIMEDOUT; + if (timeo != NULL && ts_diff_ns(t, &abs) < 0) { + fq->fqsize = 0; + return -ETIMEDOUT; + } + ret = 0; + ts_add(t, &tic, t); + timerwheel_move(); + continue; } fq->fqsize = ret << 1; @@ -1382,10 +1458,19 @@ int np1_flow_alloc(pid_t n_pid, return flow_init(flow_id, n_pid, qs, NULL); } -int np1_flow_dealloc(int flow_id) +int np1_flow_dealloc(int flow_id, + time_t timeo) { int fd; + /* + * TODO: Don't pass timeo to the IPCP but wait in IRMd. + * This will need async ops, waiting until we bootstrap + * the IRMd over ouroboros. + */ + + sleep(timeo); + pthread_rwlock_rdlock(&ai.lock); fd = ai.ports[flow_id].fd; |