diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/irmd/ipcp.c | 13 | ||||
| -rw-r--r-- | src/irmd/ipcp.h | 5 | ||||
| -rw-r--r-- | src/irmd/main.c | 20 | ||||
| -rw-r--r-- | src/lib/dev.c | 133 | ||||
| -rw-r--r-- | src/lib/frct.c | 244 | ||||
| -rw-r--r-- | src/lib/ipcpd_messages.proto | 3 | ||||
| -rw-r--r-- | src/lib/rxmwheel.c | 261 | ||||
| -rw-r--r-- | src/lib/timerwheel.c | 409 | 
8 files changed, 690 insertions, 398 deletions
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 78408185..cbd9ee15 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -543,16 +543,19 @@ int ipcp_flow_alloc_resp(pid_t        pid,          return ret;  } -int ipcp_flow_dealloc(pid_t pid, -                      int   flow_id) +int ipcp_flow_dealloc(pid_t  pid, +                      int    flow_id, +                      time_t timeo)  {          ipcp_msg_t   msg      = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL;          int          ret      = -1; -        msg.code        = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; -        msg.has_flow_id = true; -        msg.flow_id     = flow_id; +        msg.code          = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; +        msg.has_flow_id   = true; +        msg.flow_id       = flow_id; +        msg.has_timeo_sec = true; +        msg.timeo_sec     = timeo;          recv_msg = send_recv_ipcp_msg(pid, &msg);          if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index ae00792b..652316ba 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -85,7 +85,8 @@ int   ipcp_flow_alloc_resp(pid_t        pid,                             const void * data,                             size_t       len); -int   ipcp_flow_dealloc(pid_t pid, -                        int   flow_id); +int   ipcp_flow_dealloc(pid_t  pid, +                        int    flow_id, +                        time_t timeo);  #endif /* OUROBOROS_IRMD_IPCP_H */ diff --git a/src/irmd/main.c b/src/irmd/main.c index 3709a3e5..3a0ad544 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -68,10 +68,11 @@  #endif  #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ -#define SHM_SAN_HOLDOFF 1000 /* ms */ -#define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo) -#define IB_LEN SOCK_BUF_SIZE -#define BIND_TIMEOUT  10 /* ms */ +#define SHM_SAN_HOLDOFF    1000 /* ms */ +#define IPCP_HASH_LEN(e)   hash_len(e->dir_hash_algo) +#define IB_LEN             SOCK_BUF_SIZE +#define BIND_TIMEOUT       10   /* ms */ +#define DEALLOC_TIME       300  /*  s */  enum init_state {          IPCP_NULL = 0, @@ -1475,7 +1476,8 @@ static int flow_alloc(pid_t              pid,  }  static int flow_dealloc(pid_t pid, -                        int   flow_id) +                        int   flow_id, +                        time_t timeo)  {          pid_t n_1_pid = -1;          int   ret = 0; @@ -1521,7 +1523,7 @@ static int flow_dealloc(pid_t pid,          pthread_rwlock_unlock(&irmd.flows_lock);          if (n_1_pid != -1) -                ret = ipcp_flow_dealloc(n_1_pid, flow_id); +                ret = ipcp_flow_dealloc(n_1_pid, flow_id, timeo);          return ret;  } @@ -1927,7 +1929,7 @@ void * irm_sanitize(void * o)                                  ipcpi   = f->n_1_pid;                                  flow_id = f->flow_id;                                  pthread_rwlock_unlock(&irmd.flows_lock); -                                ipcp_flow_dealloc(ipcpi, flow_id); +                                ipcp_flow_dealloc(ipcpi, flow_id, DEALLOC_TIME);                                  pthread_rwlock_wrlock(&irmd.flows_lock);                                  continue;                          } @@ -2190,7 +2192,9 @@ static void * mainloop(void * o)                          }                          break;                  case IRM_MSG_CODE__IRM_FLOW_DEALLOC: -                        result = flow_dealloc(msg->pid, msg->flow_id); +                        result = flow_dealloc(msg->pid, +                                              msg->flow_id, +                                              msg->timeo_sec);                          break;                  case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:                          assert(msg->pk.len > 0 ? msg->pk.data != NULL diff --git a/src/lib/dev.c b/src/lib/dev.c index df616ead..8d7d7934 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -63,6 +63,7 @@  #define SECMEMSZ  16384  #define SYMMKEYSZ 32  #define MSGBUFSZ  2048 +#define TICTIME   1000000  /* ns */  struct flow_set {          size_t idx; @@ -255,6 +256,9 @@ static void flow_fini(int fd)                  bmp_release(ai.fds, fd);          } +        if (ai.flows[fd].frcti != NULL) +                frcti_destroy(ai.flows[fd].frcti); +          if (ai.flows[fd].rx_rb != NULL) {                  shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);                  shm_rbuff_close(ai.flows[fd].rx_rb); @@ -272,9 +276,6 @@ static void flow_fini(int fd)                  shm_flow_set_close(ai.flows[fd].set);          } -        if (ai.flows[fd].frcti != NULL) -                frcti_destroy(ai.flows[fd].frcti); -          if (ai.flows[fd].ctx != NULL)                  crypt_fini(ai.flows[fd].ctx); @@ -433,8 +434,13 @@ static void init(int     argc,          if (ai.fqset == NULL)                  goto fail_fqset; +        if (timerwheel_init() < 0) +                goto fail_timerwheel; +          return; + fail_timerwheel: +        shm_flow_set_close(ai.fqset);   fail_fqset:          pthread_rwlock_destroy(&ai.lock);   fail_lock: @@ -491,6 +497,8 @@ static void fini(void)                  pthread_cond_destroy(&ai.ports[i].state_cond);          } +        timerwheel_fini(); +          shm_rdrbuff_close(ai.rdrb);          free(ai.flows); @@ -747,25 +755,59 @@ int flow_join(const char *            dst,  int flow_dealloc(int fd)  { -        irm_msg_t   msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg; +        irm_msg_t     msg = IRM_MSG__INIT; +        irm_msg_t *   recv_msg; +        struct flow * f; +        time_t        timeo;          if (fd < 0 || fd >= SYS_MAX_FLOWS )                  return -EINVAL; -        msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC; -        msg.has_flow_id  = true; -        msg.has_pid      = true; -        msg.pid          = ai.pid; +        msg.code           = IRM_MSG_CODE__IRM_FLOW_DEALLOC; +        msg.has_flow_id    = true; +        msg.has_pid        = true; +        msg.pid            = ai.pid; +        msg.has_timeo_sec  = true; +        msg.has_timeo_nsec = true; +        msg.timeo_nsec     = 0; + +        f = &ai.flows[fd];          pthread_rwlock_rdlock(&ai.lock); -        if (ai.flows[fd].flow_id < 0) { +        if (f->flow_id < 0) {                  pthread_rwlock_unlock(&ai.lock);                  return -ENOTALLOC;          } -        msg.flow_id = ai.flows[fd].flow_id; +        msg.flow_id = f->flow_id; + +        timeo = frcti_dealloc(f->frcti); +        while (timeo < 0) { /* keep the flow active for rtx */ +                ssize_t ret; +                uint8_t buf[128]; + +                f->oflags = FLOWFDEFAULT | FLOWFRNOPART; + +                f->rcv_timesout = true; +                f->rcv_timeo.tv_sec = -timeo; +                f->rcv_timeo.tv_nsec = 0; + +                pthread_rwlock_unlock(&ai.lock); + +                ret = flow_read(fd, buf, 128); + +                pthread_rwlock_rdlock(&ai.lock); + +                timeo = frcti_dealloc(f->frcti); + +                if (ret == -ETIMEDOUT && timeo < 0) +                        timeo = -timeo; +        } + +        msg.timeo_sec = timeo; + +        shm_rbuff_fini(ai.flows[fd].tx_rb);          pthread_rwlock_unlock(&ai.lock); @@ -904,13 +946,21 @@ int fccntl(int fd,                          goto einval;                  *fflags = flow->oflags;                  break; +        case FRCTSFLAGS: +                cflags = va_arg(l, uint16_t *); +                if (cflags == NULL) +                        goto einval; +                if (flow->frcti == NULL) +                        goto eperm; +                frcti_setflags(flow->frcti, *cflags); +                break;          case FRCTGFLAGS:                  cflags = (uint16_t *) va_arg(l, int *);                  if (cflags == NULL)                          goto einval;                  if (flow->frcti == NULL)                          goto eperm; -                *cflags = frcti_getconf(flow->frcti); +                *cflags = frcti_getflags(flow->frcti);                  break;          default:                  pthread_rwlock_unlock(&ai.lock); @@ -1067,6 +1117,8 @@ ssize_t flow_read(int    fd,          struct shm_rbuff *   rb;          struct shm_du_buff * sdb;          struct timespec      abs; +        struct timespec      tic = {0, TICTIME}; +        struct timespec      tictime;          struct timespec *    abstime = NULL;          struct flow *        flow;          bool                 noblock; @@ -1096,6 +1148,8 @@ ssize_t flow_read(int    fd,          noblock = flow->oflags & FLOWFRNOBLOCK;          partrd = !(flow->oflags & FLOWFRNOPART); +        ts_add(&tic, &abs, &tictime); +          if (ai.flows[fd].rcv_timesout) {                  ts_add(&abs, &flow->rcv_timeo, &abs);                  abstime = &abs; @@ -1108,9 +1162,21 @@ ssize_t flow_read(int    fd,                          pthread_rwlock_unlock(&ai.lock);                          idx = noblock ? shm_rbuff_read(rb) : -                                shm_rbuff_read_b(rb, abstime); -                        if (idx < 0) -                                return idx; +                                shm_rbuff_read_b(rb, &tictime); +                        if (idx < 0) { +                                frcti_tick(flow->frcti); + +                                if (idx != -ETIMEDOUT) +                                        return idx; + +                                if (abstime != NULL +                                    && ts_diff_ns(&tictime, &abs) < 0) +                                        return -ETIMEDOUT; + +                                ts_add(&tictime, &tic, &tictime); +                                pthread_rwlock_rdlock(&ai.lock); +                                continue; +                        }                          sdb = shm_rdrbuff_get(ai.rdrb, idx);                          if (flow->qs.ber == 0 && chk_crc(sdb) != 0) { @@ -1339,7 +1405,9 @@ ssize_t fevent(struct flow_set *       set,                 const struct timespec * timeo)  {          ssize_t           ret = 0; -        struct timespec   abstime; +        struct timespec   tic = {0, TICTIME}; +        struct timespec   tictime; +        struct timespec   abs;          struct timespec * t = NULL;          if (set == NULL || fq == NULL) @@ -1348,17 +1416,25 @@ ssize_t fevent(struct flow_set *       set,          if (fq->fqsize > 0 && fq->next != fq->fqsize)                  return fq->fqsize; -        if (timeo != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeo, &abstime); -                t = &abstime; -        } +        clock_gettime(PTHREAD_COND_CLOCK, &abs); + +        ts_add(&tic, &abs, &tictime); +        t = &tictime; + +        if (timeo != NULL) +                ts_add(&abs, timeo, &abs);          while (ret == 0) {                  ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);                  if (ret == -ETIMEDOUT) { -                        fq->fqsize = 0; -                        return -ETIMEDOUT; +                        if (timeo != NULL && ts_diff_ns(t, &abs) < 0) { +                                fq->fqsize = 0; +                                return -ETIMEDOUT; +                        } +                        ret = 0; +                        ts_add(t, &tic, t); +                        timerwheel_move(); +                        continue;                  }                  fq->fqsize = ret << 1; @@ -1382,10 +1458,19 @@ int np1_flow_alloc(pid_t     n_pid,          return flow_init(flow_id, n_pid, qs, NULL);  } -int np1_flow_dealloc(int flow_id) +int np1_flow_dealloc(int    flow_id, +                     time_t timeo)  {          int fd; +        /* +         * TODO: Don't pass timeo to the IPCP but wait in IRMd. +         * This will need async ops, waiting until we bootstrap +         * the IRMd over ouroboros. +         */ + +        sleep(timeo); +          pthread_rwlock_rdlock(&ai.lock);          fd = ai.ports[flow_id].fd; diff --git a/src/lib/frct.c b/src/lib/frct.c index 2bd126f4..c26910fa 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,7 +21,7 @@   */  /* Default Delta-t parameters */ -#define DELT_MPL       (60 * BILLION) /* ns */ +#define DELT_MPL        (5 * BILLION) /* ns */  #define DELT_A          (1 * BILLION) /* ns */  #define DELT_R         (20 * BILLION) /* ns */ @@ -59,8 +59,6 @@ struct frcti {          struct frct_cr    snd_cr;          struct frct_cr    rcv_cr; -        struct rxmwheel * rw; -          ssize_t           rq[RQ_SIZE];          pthread_rwlock_t  lock;  }; @@ -86,7 +84,84 @@ struct frct_pci {          uint32_t ackno;  } __attribute__((packed)); -#include <rxmwheel.c> +static bool before(uint32_t seq1, +                   uint32_t seq2) +{ +        return (int32_t)(seq1 - seq2) < 0; +} + +static bool after(uint32_t seq1, +                  uint32_t seq2) +{ +        return (int32_t)(seq2 - seq1) < 0; +} + +static void __send_ack(int fd, +                       int ackno) +{ +        struct shm_du_buff * sdb; +        struct frct_pci *    pci; +        ssize_t              idx; +        struct flow *        f; + +        /* Raw calls needed to bypass frcti. */ +        idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); +        if (idx < 0) +                return; + +        pci = (struct frct_pci *) shm_du_buff_head(sdb); +        memset(pci, 0, sizeof(*pci)); + +        pci->flags = FRCT_ACK; +        pci->ackno = hton32(ackno); + +        f = &ai.flows[fd]; + +        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { +                ipcp_sdb_release(sdb); +                return; +        } + +        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); +} + +static void frct_send_ack(struct frcti * frcti) +{ +        struct timespec      now; +        time_t               diff; +        uint32_t             ackno; +        int                  fd; + +        assert(frcti); + +        pthread_rwlock_rdlock(&frcti->lock); + +        if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { +                pthread_rwlock_unlock(&frcti->lock); +                return; +        } + +        ackno = frcti->rcv_cr.lwe; +        fd    = frcti->fd; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        diff = ts_diff_ns(&frcti->rcv_cr.act, &now); + +        pthread_rwlock_unlock(&frcti->lock); + +        if (diff > frcti->a || diff < DELT_ACK) +                return; + +        __send_ack(fd, ackno); + +        pthread_rwlock_wrlock(&frcti->lock); + +        if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) +                frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + +        pthread_rwlock_unlock(&frcti->lock); +}  static struct frcti * frcti_create(int fd)  { @@ -123,14 +198,10 @@ static struct frcti * frcti_create(int fd)          frcti->srtt = 0;            /* Updated on first ACK */          frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */          frcti->rto  = 20 * MILLION; /* Initial rxm will be after 20 ms */ -        frcti->rw   = NULL;          if (ai.flows[fd].qs.loss == 0) { -                frcti->snd_cr.cflags |= FRCTFRTX; +                frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;                  frcti->rcv_cr.cflags |= FRCTFRTX; -                frcti->rw = rxmwheel_create(); -                if (frcti->rw == NULL) -                        goto fail_rw;          }          frcti->snd_cr.inact  = (3 * mpl + a + r) / BILLION + 1; /* s */ @@ -141,8 +212,6 @@ static struct frcti * frcti_create(int fd)          return frcti; - fail_rw: -        pthread_rwlock_destroy(&frcti->lock);   fail_lock:          free(frcti);   fail_malloc: @@ -151,24 +220,16 @@ static struct frcti * frcti_create(int fd)  static void frcti_destroy(struct frcti * frcti)  { -        /* -         * FIXME: In case of reliable transmission we should -         * make sure everything we sent is acked. -         */ - -        if (frcti->rw != NULL) -                rxmwheel_destroy(frcti->rw); -          pthread_rwlock_destroy(&frcti->lock);          free(frcti);  } -static uint16_t frcti_getconf(struct frcti * frcti) +static uint16_t frcti_getflags(struct frcti * frcti)  {          uint16_t ret; -        assert (frcti); +        assert(frcti);          pthread_rwlock_rdlock(&frcti->lock); @@ -179,6 +240,22 @@ static uint16_t frcti_getconf(struct frcti * frcti)          return ret;  } +static void frcti_setflags(struct frcti * frcti, +                           uint16_t       flags) +{ +        flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */ + +        assert(frcti); + +        pthread_rwlock_wrlock(&frcti->lock); + +        frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */ + +        frcti->snd_cr.cflags &= flags; + +        pthread_rwlock_unlock(&frcti->lock); +} +  #define frcti_queued_pdu(frcti)                         \          (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) @@ -189,8 +266,10 @@ static uint16_t frcti_getconf(struct frcti * frcti)          (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))  #define frcti_tick(frcti)                               \ -        (frcti == NULL ? 0 : __frcti_tick(frcti)) +        (frcti == NULL ? 0 : __frcti_tick()) +#define frcti_dealloc(frcti)                            \ +        (frcti == NULL ? 0 : __frcti_dealloc(frcti))  static ssize_t __frcti_queued_pdu(struct frcti * frcti)  { @@ -233,78 +312,41 @@ static ssize_t __frcti_pdu_ready(struct frcti * frcti)          return idx;  } -static bool before(uint32_t seq1, -                   uint32_t seq2) -{ -        return (int32_t)(seq1 - seq2) < 0; -} - -static bool after(uint32_t seq1, -                  uint32_t seq2) -{ -        return (int32_t)(seq2 - seq1) < 0; -} +#include <timerwheel.c> -static void frct_send_ack(struct frcti * frcti) +/* + * Send a final ACK for everything that has not been ACK'd. + * If the flow should be kept active for retransmission, + * the returned time will be negative. + */ +static time_t __frcti_dealloc(struct frcti * frcti)  { -        struct shm_du_buff * sdb; -        struct frct_pci *    pci; -        ssize_t              idx; -        struct timespec      now; -        time_t               diff; -        uint32_t             ackno; -        struct flow *        f; +        struct timespec now; +        time_t          wait; +        int             ackno; +        int             fd = -1; -        assert(frcti); +        clock_gettime(PTHREAD_COND_CLOCK, &now);          pthread_rwlock_rdlock(&frcti->lock); -        if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { -                pthread_rwlock_unlock(&frcti->lock); -                return; -        } -          ackno = frcti->rcv_cr.lwe; +        if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno) +                fd = frcti->fd; -        pthread_rwlock_unlock(&frcti->lock); +        wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec, +                   frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec); -        clock_gettime(PTHREAD_COND_CLOCK, &now); +        if (frcti->snd_cr.cflags & FRCTFLINGER +            && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno)) +                wait = -wait; -        diff = ts_diff_ns(&frcti->rcv_cr.act, &now); - -        if (diff > frcti->a) -                return; - -        if (diff < DELT_ACK) -                return; - -        /* Raw calls needed to bypass frcti. */ -        idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); -        if (idx < 0) -                return; - -        pci = (struct frct_pci *) shm_du_buff_head(sdb); -        memset(pci, 0, sizeof(*pci)); - -        pci->flags = FRCT_ACK; -        pci->ackno = hton32(ackno); - -        f = &ai.flows[frcti->fd]; - -        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { -                pthread_rwlock_rdlock(&ai.lock); -                ipcp_sdb_release(sdb); -                return; -        } - -        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - -        pthread_rwlock_wrlock(&frcti->lock); +        pthread_rwlock_unlock(&frcti->lock); -        if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) -                frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; +        if (fd != -1) +                __send_ack(fd, ackno); -        pthread_rwlock_unlock(&frcti->lock); +        return wait;  }  static int __frcti_snd(struct frcti *       frcti, @@ -315,14 +357,14 @@ static int __frcti_snd(struct frcti *       frcti,          struct frct_cr *  snd_cr;          struct frct_cr *  rcv_cr;          uint32_t          seqno; +        bool              rtx;          assert(frcti);          snd_cr = &frcti->snd_cr;          rcv_cr = &frcti->rcv_cr; -        if (frcti->rw != NULL) -                rxmwheel_move(frcti->rw); +        timerwheel_move();          pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);          if (pci == NULL) @@ -334,6 +376,8 @@ static int __frcti_snd(struct frcti *       frcti,          pthread_rwlock_wrlock(&frcti->lock); +        rtx = snd_cr->cflags & FRCTFRTX; +          pci->flags |= FRCT_DATA;          /* Set DRF if there are no unacknowledged packets. */ @@ -351,7 +395,7 @@ static int __frcti_snd(struct frcti *       frcti,          seqno = snd_cr->seqno;          pci->seqno = hton32(seqno); -        if (!(snd_cr->cflags & FRCTFRTX)) { +        if (!rtx) {                  snd_cr->lwe++;          } else {                  if (!frcti->probe) { @@ -372,8 +416,8 @@ static int __frcti_snd(struct frcti *       frcti,          pthread_rwlock_unlock(&frcti->lock); -        if (frcti->rw != NULL) -                rxmwheel_add(frcti->rw, frcti, seqno, sdb); +        if (rtx) +                timerwheel_rxm(frcti, seqno, sdb);          return 0;  } @@ -387,12 +431,10 @@ static void rtt_estimator(struct frcti * frcti,          if (srtt == 0) { /* first measurement */                  srtt   = mrtt;                  rttvar = mrtt >> 1; -          } else {                  time_t delta = mrtt - srtt;                  srtt += (delta >> 3); -                rttvar -= rttvar >> 2; -                rttvar += ABS(delta) >> 2; +                rttvar += (ABS(delta) - rttvar) >> 2;          }          frcti->srtt     = MAX(1000U, srtt); @@ -401,12 +443,9 @@ static void rtt_estimator(struct frcti * frcti,                                frcti->srtt + (frcti->mdev << 1));  } -static void __frcti_tick(struct frcti * frcti) +static void __frcti_tick(void)  { -        if (frcti->rw != NULL) { -                rxmwheel_move(frcti->rw); -                frct_send_ack(frcti); -        } +        timerwheel_move();  }  /* Always queues the next application packet on the RQ. */ @@ -420,6 +459,7 @@ static void __frcti_rcv(struct frcti *       frcti,          struct frct_cr *  rcv_cr;          uint32_t          seqno;          uint32_t          ackno; +        int               fd = -1;          assert(frcti); @@ -456,8 +496,10 @@ static void __frcti_rcv(struct frcti *       frcti,          if (!(pci->flags & FRCT_DATA))                  goto drop_packet; -        if (before(seqno, rcv_cr->lwe)) +        if (before(seqno, rcv_cr->lwe)) { +                rcv_cr->seqno = seqno;                  goto drop_packet; +        }          if (rcv_cr->cflags & FRCTFRTX) {                  if ((seqno - rcv_cr->lwe) >= RQ_SIZE) @@ -465,6 +507,8 @@ static void __frcti_rcv(struct frcti *       frcti,                  if (frcti->rq[pos] != -1)                          goto drop_packet; /* Duplicate in rq. */ + +                fd = frcti->fd;          } else {                  rcv_cr->lwe = seqno;          } @@ -475,10 +519,16 @@ static void __frcti_rcv(struct frcti *       frcti,          pthread_rwlock_unlock(&frcti->lock); +        if (fd != -1) +                timerwheel_ack(fd, frcti); +          return;   drop_packet:          pthread_rwlock_unlock(&frcti->lock); + +        frct_send_ack(frcti); +          shm_rdrbuff_remove(ai.rdrb, idx);          return;  } @@ -492,7 +542,7 @@ int frcti_filter(struct fqueue * fq)          struct frcti *       frcti;          struct shm_rbuff *   rb; -        while(fq->next < fq->fqsize) { +        while (fq->next < fq->fqsize) {                  if (fq->fqueue[fq->next + 1] != FLOW_PKT)                          return 1; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index b0efe9ab..809117b8 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -52,5 +52,6 @@ message ipcp_msg {          optional layer_info_msg layer_info =  9;          optional int32 response            = 10;          optional string comp               = 11; -        optional int32 result              = 12; +        optional uint32 timeo_sec          = 12; +        optional int32 result              = 13;  }; diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c deleted file mode 100644 index 0572c7b7..00000000 --- a/src/lib/rxmwheel.c +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2020 - * - * Timerwheel - * - *    Dimitri Staessens <dimitri.staessens@ugent.be> - *    Sander Vrijders   <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#include <ouroboros/list.h> - -#define RXMQ_S     14                 /* defines #slots           */ -#define RXMQ_M     34                 /* defines max delay  (ns)  */ -#define RXMQ_R     (RXMQ_M - RXMQ_S)  /* defines resolution (ns)  */ -#define RXMQ_SLOTS (1 << RXMQ_S) -#define RXMQ_MAX   (1 << RXMQ_M)      /* us                       */ - -/* Overflow limits range to about 6 hours. */ -#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) -#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) - -struct rxm { -        struct list_head     next; -        uint32_t             seqno; -        struct shm_du_buff * sdb; -        uint8_t *            head; -        uint8_t *            tail; -        time_t               t0;    /* Time when original was sent (us). */ -        size_t               mul;   /* RTO multiplier.                   */ -        struct frcti *       frcti; -}; - -struct rxmwheel { -        struct list_head wheel[RXMQ_SLOTS]; - -        size_t           prv; /* Last processed slot. */ -        pthread_mutex_t  lock; -}; - -static void rxmwheel_destroy(struct rxmwheel * rw) -{ -        size_t             i; -        struct list_head * p; -        struct list_head * h; - -        pthread_mutex_destroy(&rw->lock); - -        for (i = 0; i < RXMQ_SLOTS; ++i) { -                list_for_each_safe(p, h, &rw->wheel[i]) { -                        struct rxm * rxm = list_entry(p, struct rxm, next); -                        list_del(&rxm->next); -                        shm_du_buff_ack(rxm->sdb); -                        ipcp_sdb_release(rxm->sdb); -                        free(rxm); -                } -        } -} - -static struct rxmwheel * rxmwheel_create(void) -{ -        struct rxmwheel * rw; -        struct timespec   now; -        size_t            i; - -        rw = malloc(sizeof(*rw)); -        if (rw == NULL) -                return NULL; - -        if (pthread_mutex_init(&rw->lock, NULL)) { -                free(rw); -                return NULL; -        } - -        clock_gettime(PTHREAD_COND_CLOCK, &now); - -        /* Mark the previous timeslot as the last one processed. */ -        rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); - -        for (i = 0; i < RXMQ_SLOTS; ++i) -                list_head_init(&rw->wheel[i]); - -        return rw; -} - -static void rxmwheel_move(struct rxmwheel * rw) -{ -        struct timespec    now; -        struct list_head * p; -        struct list_head * h; -        size_t             slot; -        size_t             i; - -        pthread_mutex_lock(&rw->lock); - -        pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, -                             (void *) &rw->lock); - -        clock_gettime(PTHREAD_COND_CLOCK, &now); - -        slot = ts_to_slot(now); - -        i = rw->prv; - -        if (slot < i) -                slot += RXMQ_SLOTS; - -        while (i++ < slot) { -                list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) { -                        struct rxm *         r; -                        struct frct_cr *     snd_cr; -                        struct frct_cr *     rcv_cr; -                        size_t               rslot; -                        ssize_t              idx; -                        struct shm_du_buff * sdb; -                        uint8_t *            head; -                        struct flow *        f; -                        int                  fd; -                        uint32_t             snd_lwe; -                        uint32_t             rcv_lwe; -                        time_t               rto; - -                        r = list_entry(p, struct rxm, next); - -                        list_del(&r->next); - -                        snd_cr = &r->frcti->snd_cr; -                        rcv_cr = &r->frcti->rcv_cr; -                        fd     = r->frcti->fd; -                        f      = &ai.flows[fd]; - -                        shm_du_buff_ack(r->sdb); - -                        pthread_rwlock_wrlock(&r->frcti->lock); - -                        snd_lwe = snd_cr->lwe; -                        rcv_lwe = rcv_cr->lwe; -                        rto     = r->frcti->rto; -                        /* Assume last RTX is the one that's ACK'd. */ -                        if (r->frcti->probe -                            && (r->frcti->rttseq + 1) == r->seqno) -                                r->frcti->t_probe = now; - -                        pthread_rwlock_unlock(&r->frcti->lock); - -                        /* Has been ack'd, remove. */ -                        if ((int) (r->seqno - snd_lwe) < 0) { -                                ipcp_sdb_release(r->sdb); -                                free(r); -                                continue; -                        } - -                        /* Check for r-timer expiry. */ -                        if (ts_to_ns(now) - r->t0 > r->frcti->r) { -                                ipcp_sdb_release(r->sdb); -                                free(r); -                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); -                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); -                                continue; -                        } - -                        /* Copy the payload, safe rtx in other layers. */ -                        if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { -                                ipcp_sdb_release(r->sdb); -                                free(r); -                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); -                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); -                                continue; -                        } - -                        idx = shm_du_buff_get_idx(sdb); - -                        head = shm_du_buff_head(sdb); -                        memcpy(head, r->head, r->tail - r->head); - -                        ipcp_sdb_release(r->sdb); - -                        ((struct frct_pci *) head)->ackno = hton32(rcv_lwe); - -                        /* Retransmit the copy. */ -                        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { -                                ipcp_sdb_release(sdb); -                                free(r); -                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); -                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); -                                continue; -                        } - -                        /* Reschedule. */ -                        shm_du_buff_wait_ack(sdb); - -                        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - -                        r->head = head; -                        r->tail = shm_du_buff_tail(sdb); -                        r->sdb  = sdb; - -                        /* Schedule at least in the next time slot */ -                        rslot = (slot + MAX((rto >> RXMQ_R), 1)) -                                & (RXMQ_SLOTS - 1); - -                        list_add_tail(&r->next, &rw->wheel[rslot]); -                } -        } - -        rw->prv = slot & (RXMQ_SLOTS - 1); - -        pthread_cleanup_pop(true); -} - -static int rxmwheel_add(struct rxmwheel *    rw, -                        struct frcti *       frcti, -                        uint32_t             seqno, -                        struct shm_du_buff * sdb) -{ -        struct timespec now; -        struct rxm *    r; -        size_t          slot; - -        r = malloc(sizeof(*r)); -        if (r == NULL) -                return -ENOMEM; - -        clock_gettime(PTHREAD_COND_CLOCK, &now); - -        r->t0    = ts_to_ns(now); -        r->mul   = 0; -        r->seqno = seqno; -        r->sdb   = sdb; -        r->head  = shm_du_buff_head(sdb); -        r->tail  = shm_du_buff_tail(sdb); -        r->frcti = frcti; - -        pthread_rwlock_rdlock(&r->frcti->lock); - -        slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); - -        pthread_rwlock_unlock(&r->frcti->lock); - -        pthread_mutex_lock(&rw->lock); - -        list_add_tail(&r->next, &rw->wheel[slot]); - -        shm_du_buff_wait_ack(sdb); - -        pthread_mutex_unlock(&rw->lock); - -        return 0; -} diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c new file mode 100644 index 00000000..33fcbc1c --- /dev/null +++ b/src/lib/timerwheel.c @@ -0,0 +1,409 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * Timerwheel + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/list.h> + +#define RXMQ_SLOTS (1 << 8)           /* #slots / level.           */ +#define RXMQ_LVLS  3                  /* #levels, bump for DTN.    */ +#define RXMQ_BUMP  4                  /* factor to bump lvl.       */ +#define RXMQ_RES   20                 /* res (ns) of lowest lvl.   */ + +#define ACKQ_SLOTS (1 << 7)           /* #slots for delayed ACK.   */ +#define ACKQ_RES   20                 /* resolution for dACK.      */ + +/* Overflow limits range to about 6 hours. */ +#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) +#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES) +#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES) + +struct rxm { +        struct list_head     next; +        uint32_t             seqno; +        struct shm_du_buff * sdb; +        uint8_t *            head; +        uint8_t *            tail; +        time_t               t0;      /* Time when original was sent (us). */ +        size_t               mul;     /* RTO multiplier.                   */ +        struct frcti *       frcti; +        int                  fd; +        int                  flow_id; /* Prevent rtx when fd reused.       */ +}; + +struct ack { +        struct list_head next; +        struct frcti *   frcti; +        int              fd; +        int              flow_id; +}; + +struct { +        /* +         * At a 1 ms min resolution, every level bumps the +         * resolution by a factor of 16. +         */ +        struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS]; + +        struct list_head acks[ACKQ_SLOTS]; +        bool             map[ACKQ_SLOTS][PROG_MAX_FLOWS]; + +        size_t           prv_rxm; /* Last processed rxm slot at lvl 0. */ +        size_t           prv_ack; /* Last processed ack slot.          */ +        pthread_mutex_t  lock; +} rw; + +static void timerwheel_fini(void) +{ +        size_t             i; +        size_t             j; +        struct list_head * p; +        struct list_head * h; + +        pthread_mutex_lock(&rw.lock); + +        for (i = 0; i < RXMQ_LVLS; ++i) { +                for (j = 0; j < RXMQ_SLOTS; j++) { +                        list_for_each_safe(p, h, &rw.rxms[i][j]) { +                                struct rxm * rxm; +                                rxm = list_entry(p, struct rxm, next); +                                list_del(&rxm->next); +                                shm_du_buff_ack(rxm->sdb); +                                ipcp_sdb_release(rxm->sdb); +                                free(rxm); +                        } +                } +        } + +        for (i = 0; i < ACKQ_SLOTS; ++i) { +                list_for_each_safe(p, h, &rw.acks[i]) { +                        struct ack * a = list_entry(p, struct ack, next); +                        list_del(&a->next); +                        free(a); +                } +        } + +        pthread_mutex_unlock(&rw.lock); + +        pthread_mutex_destroy(&rw.lock); +} + +static int timerwheel_init(void) +{ +        struct timespec   now; +        size_t            i; +        size_t            j; + +        if (pthread_mutex_init(&rw.lock, NULL)) +                return -1; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1); +        for (i = 0; i < RXMQ_LVLS; ++i) { +                for (j = 0; j < RXMQ_SLOTS; ++j) +                        list_head_init(&rw.rxms[i][j]); +        } + +        rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1); +        for (i = 0; i < ACKQ_SLOTS; ++i) +                list_head_init(&rw.acks[i]); + +        return 0; +} + +static void timerwheel_move(void) +{ +        struct timespec    now; +        struct list_head * p; +        struct list_head * h; +        size_t             rxm_slot; +        size_t             ack_slot; +        size_t             i; +        size_t             j; + +        pthread_mutex_lock(&rw.lock); + +        pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, +                             (void *) &rw.lock); + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        rxm_slot = ts_to_ns(now) >> RXMQ_RES; +        j = rw.prv_rxm; +        rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1); + +        for (i = 0; i < RXMQ_LVLS; ++i) { +                size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1); +                if (j_max_slot < j) +                        j_max_slot += RXMQ_SLOTS; + +                while (j++ < j_max_slot) { +                        list_for_each_safe(p, +                                           h, +                                           &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) { +                                struct rxm *         r; +                                struct frct_cr *     snd_cr; +                                struct frct_cr *     rcv_cr; +                                size_t               rslot; +                                ssize_t              idx; +                                struct shm_du_buff * sdb; +                                uint8_t *            head; +                                struct flow *        f; +                                uint32_t             snd_lwe; +                                uint32_t             rcv_lwe; +                                time_t               rto; + +                                r = list_entry(p, struct rxm, next); + +                                list_del(&r->next); + +                                snd_cr = &r->frcti->snd_cr; +                                rcv_cr = &r->frcti->rcv_cr; +                                f      = &ai.flows[r->fd]; + +                                shm_du_buff_ack(r->sdb); + +                                if (f->frcti == NULL +                                    || f->flow_id != r->flow_id) { +                                        ipcp_sdb_release(r->sdb); +                                        free(r); +                                        continue; +                                } + +                                pthread_rwlock_wrlock(&r->frcti->lock); + +                                snd_lwe = snd_cr->lwe; +                                rcv_lwe = rcv_cr->lwe; +                                rto     = r->frcti->rto; + +                                pthread_rwlock_unlock(&r->frcti->lock); + +                                /* Has been ack'd, remove. */ +                                if ((int) (r->seqno - snd_lwe) < 0) { +                                        ipcp_sdb_release(r->sdb); +                                        free(r); +                                        continue; +                                } + +                                /* Check for r-timer expiry. */ +                                if (ts_to_ns(now) - r->t0 > r->frcti->r) { +                                        ipcp_sdb_release(r->sdb); +                                        free(r); +                                        shm_rbuff_set_acl(f->rx_rb, +                                                          ACL_FLOWDOWN); +                                        shm_rbuff_set_acl(f->tx_rb, +                                                          ACL_FLOWDOWN); +                                        continue; +                                } + +                                if (r->frcti->probe +                                    && (r->frcti->rttseq + 1) == r->seqno) +                                        r->frcti->probe = false; + +                                /* Copy the data, safe rtx in other layers. */ +                                if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { +                                        ipcp_sdb_release(r->sdb); +                                        free(r); +                                        shm_rbuff_set_acl(f->rx_rb, +                                                          ACL_FLOWDOWN); +                                        shm_rbuff_set_acl(f->tx_rb, +                                                          ACL_FLOWDOWN); +                                        continue; +                                } + +                                idx = shm_du_buff_get_idx(sdb); + +                                head = shm_du_buff_head(sdb); +                                memcpy(head, r->head, r->tail - r->head); + +                                ipcp_sdb_release(r->sdb); + +                                ((struct frct_pci *) head)->ackno = +                                        hton32(rcv_lwe); + +                                /* Retransmit the copy. */ +                                if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { +                                        ipcp_sdb_release(sdb); +                                        free(r); +                                        shm_rbuff_set_acl(f->rx_rb, +                                                          ACL_FLOWDOWN); +                                        shm_rbuff_set_acl(f->tx_rb, +                                                          ACL_FLOWDOWN); +                                        continue; +                                } + +                                /* Reschedule. */ +                                shm_du_buff_wait_ack(sdb); + +                                shm_flow_set_notify(f->set, +                                                    f->flow_id, +                                                    FLOW_PKT); + +                                r->head = head; +                                r->tail = shm_du_buff_tail(sdb); +                                r->sdb  = sdb; +                                r->mul++; + +                                /* Schedule at least in the next time slot. */ +                                rslot = (rxm_slot +                                         + MAX(((rto * r->mul) >> RXMQ_RES), 1)) +                                        & (RXMQ_SLOTS - 1); + +                                list_add_tail(&r->next, &rw.rxms[i][rslot]); +                        } +                } +                /* Move up a level in the wheel. */ +                rxm_slot >>= RXMQ_BUMP; +        } + +        ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; + +        j = rw.prv_ack; + +        if (ack_slot < j) +                ack_slot += ACKQ_SLOTS; + +        while (j++ < ack_slot) { +                list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) { +                        struct ack *  a; +                        struct flow * f; + +                        a = list_entry(p, struct ack, next); + +                        list_del(&a->next); + +                        f = &ai.flows[a->fd]; + +                        rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false; + +                        if (f->flow_id == a->flow_id && f->frcti != NULL) +                                frct_send_ack(a->frcti); + +                        free(a); + +                } +        } + +        rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1); + +        pthread_cleanup_pop(true); +} + +static int timerwheel_rxm(struct frcti *       frcti, +                          uint32_t             seqno, +                          struct shm_du_buff * sdb) +{ +        struct timespec now; +        struct rxm *    r; +        size_t          slot; +        size_t          lvl = 0; +        time_t          rto_slot; + +        r = malloc(sizeof(*r)); +        if (r == NULL) +                return -ENOMEM; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        r->t0    = ts_to_ns(now); +        r->mul   = 0; +        r->seqno = seqno; +        r->sdb   = sdb; +        r->head  = shm_du_buff_head(sdb); +        r->tail  = shm_du_buff_tail(sdb); +        r->frcti = frcti; + +        pthread_rwlock_rdlock(&r->frcti->lock); + +        rto_slot = frcti->rto >> RXMQ_RES; +        slot     = r->t0 >> RXMQ_RES; + +        r->fd      = frcti->fd; +        r->flow_id = ai.flows[r->fd].flow_id; + +        pthread_rwlock_unlock(&r->frcti->lock); + +        while (rto_slot >= RXMQ_SLOTS) { +                ++lvl; +                rto_slot >>= RXMQ_BUMP; +                slot >>= RXMQ_BUMP; +        } + +        if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */ +                free(r); +                return -EPERM; +        } + +        slot = (slot + rto_slot) & (RXMQ_SLOTS - 1); + +        pthread_mutex_lock(&rw.lock); + +        list_add_tail(&r->next, &rw.rxms[lvl][slot]); + +        shm_du_buff_wait_ack(sdb); + +        pthread_mutex_unlock(&rw.lock); + +        return 0; +} + +static int timerwheel_ack(int            fd, +                          struct frcti * frcti) +{ +        struct timespec now; +        struct ack *    a; +        size_t          slot; + +        a = malloc(sizeof(*a)); +        if (a == NULL) +                return -ENOMEM; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        slot = DELT_ACK >> ACKQ_RES; +        if (slot >= ACKQ_SLOTS) { /* Out of timerwheel range. */ +                free(a); +                return -EPERM; +        } + +        slot = (((ts_to_ns(now) + DELT_ACK) >> ACKQ_RES) + 1) +                & (ACKQ_SLOTS - 1); + +        a->fd    = fd; +        a->frcti = frcti; +        a->flow_id = ai.flows[fd].flow_id; + +        pthread_mutex_lock(&rw.lock); + +        if (rw.map[slot][fd]) { +                pthread_mutex_unlock(&rw.lock); +                free(a); +                return 0; +        } + +        rw.map[slot][fd] = true; + +        list_add_tail(&a->next, &rw.acks[slot]); + +        pthread_mutex_unlock(&rw.lock); + +        return 0; +}  | 
