diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/crypt.c | 3 | ||||
| -rw-r--r-- | src/lib/dev.c | 182 | ||||
| -rw-r--r-- | src/lib/frct.c | 1 | 
3 files changed, 163 insertions, 23 deletions
diff --git a/src/lib/crypt.c b/src/lib/crypt.c index 070f5113..043eae13 100644 --- a/src/lib/crypt.c +++ b/src/lib/crypt.c @@ -217,6 +217,9 @@ static int openssl_encrypt(struct flow *        f,          in = shm_du_buff_head(sdb);          in_sz = shm_du_buff_tail(sdb) - in; +        if (in_sz == 0) +                return 0; +          if (random_buffer(iv, IVSZ) < 0)                  goto fail_iv; diff --git a/src/lib/dev.c b/src/lib/dev.c index 0acc7455..4c21fcdf 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -68,6 +68,7 @@  #define SECMEMSZ  16384  #define SYMMKEYSZ 32  #define MSGBUFSZ  2048 +#define FLOWTIMEO 120 /* seconds */  enum port_state {          PORT_NULL = 0, @@ -102,6 +103,9 @@ struct flow {          pid_t                 pid; +        struct timespec       snd_act; +        struct timespec       rcv_act; +          bool                  snd_timesout;          bool                  rcv_timesout;          struct timespec       snd_timeo; @@ -119,6 +123,8 @@ struct flow_set_entry {  struct flow_set {          size_t idx; +        struct timespec  chk;   /* Last keepalive check */ +          struct list_head flows;          pthread_rwlock_t lock;  }; @@ -300,8 +306,11 @@ static int flow_init(int       flow_id,                       qosspec_t qs,                       uint8_t * s)  { -        int fd; -        int err = -ENOMEM; +        struct timespec now; +        int             fd; +        int             err = -ENOMEM; + +        clock_gettime(PTHREAD_COND_CLOCK, &now);          pthread_rwlock_wrlock(&ai.lock); @@ -328,6 +337,8 @@ static int flow_init(int       flow_id,          ai.flows[fd].pid      = pid;          ai.flows[fd].part_idx = NO_PART;          ai.flows[fd].qs       = qs; +        ai.flows[fd].snd_act  = now; +        ai.flows[fd].rcv_act  = now;          if (qs.cypher_s > 0) {                  assert(s != NULL); @@ -1033,6 +1044,43 @@ static int add_crc(struct shm_du_buff * sdb)          return 0;  } +static void flow_send_keepalive(int fd) +{ +        flow_write(fd, NULL, 0); +} + +static int flow_keepalive(int fd) +{ +        struct timespec    now; +        struct timespec    s_act; +        struct timespec    r_act; +        struct flow *      flow; +        int                flow_id; + +        flow = &ai.flows[fd]; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        pthread_rwlock_rdlock(&ai.lock); + +        s_act = flow->snd_act; +        r_act = flow->rcv_act; + +        flow_id = flow->flow_id; + +        pthread_rwlock_unlock(&ai.lock); + +        if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) { +                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT); +                return -EFLOWDOWN; +        } + +        if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION) +                flow_send_keepalive(fd); + +        return 0; +} +  ssize_t flow_write(int          fd,                     const void * buf,                     size_t       count) @@ -1048,17 +1096,17 @@ ssize_t flow_write(int          fd,          struct shm_du_buff * sdb;          uint8_t *            ptr; -        if (buf == NULL) +        if (buf == NULL && count != 0)                  return 0; -        if (fd < 0 || fd > PROG_MAX_FLOWS) +        if (fd < 0 || fd >= PROG_MAX_FLOWS)                  return -EBADF;          flow = &ai.flows[fd];          clock_gettime(PTHREAD_COND_CLOCK, &abs); -        pthread_rwlock_rdlock(&ai.lock); +        pthread_rwlock_wrlock(&ai.lock);          if (flow->flow_id < 0) {                  pthread_rwlock_unlock(&ai.lock); @@ -1091,6 +1139,9 @@ ssize_t flow_write(int          fd,                          if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0)                                  return -ETIMEDOUT; +                        if (flow_keepalive(fd)) +                                return -EFLOWDOWN; +                          frcti_tick(flow->frcti);                          ts_add(&tictime, &tic, &tictime); @@ -1101,11 +1152,20 @@ ssize_t flow_write(int          fd,          if (idx < 0)                  return idx; -        memcpy(ptr, buf, count); +        clock_gettime(PTHREAD_COND_CLOCK, &abs); + +        pthread_rwlock_wrlock(&ai.lock); + +        flow->snd_act = abs; + +        pthread_rwlock_unlock(&ai.lock); + +        if (count > 0) +                memcpy(ptr, buf, count);          pthread_rwlock_rdlock(&ai.lock); -        if (frcti_snd(flow->frcti, sdb) < 0) +        if (count != 0 && frcti_snd(flow->frcti, sdb) < 0)                  goto enomem;          if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) @@ -1114,6 +1174,8 @@ ssize_t flow_write(int          fd,          if (flow->qs.ber == 0 && add_crc(sdb) != 0)                  goto enomem; +        pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); +          if (flags & FLOWFWNOBLOCK)                  ret = shm_rbuff_write(flow->tx_rb, idx);          else @@ -1124,7 +1186,7 @@ ssize_t flow_write(int          fd,          else                  shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); -        pthread_rwlock_unlock(&ai.lock); +        pthread_cleanup_pop(true);          return ret < 0 ? (ssize_t) ret : (ssize_t) count; @@ -1144,6 +1206,7 @@ ssize_t flow_read(int    fd,          struct shm_rbuff *   rb;          struct shm_du_buff * sdb;          struct timespec      abs; +        struct timespec      now;          struct timespec      tic = {0, TICTIME};          struct timespec      tictime;          struct timespec *    abstime = NULL; @@ -1156,7 +1219,7 @@ ssize_t flow_read(int    fd,          flow = &ai.flows[fd]; -        clock_gettime(PTHREAD_COND_CLOCK, &abs); +        clock_gettime(PTHREAD_COND_CLOCK, &now);          pthread_rwlock_rdlock(&ai.lock); @@ -1175,15 +1238,14 @@ ssize_t flow_read(int    fd,          noblock = flow->oflags & FLOWFRNOBLOCK;          partrd = !(flow->oflags & FLOWFRNOPART); -        ts_add(&tic, &abs, &tictime); +        ts_add(&now, &tic, &tictime);          if (ai.flows[fd].rcv_timesout) { -                ts_add(&abs, &flow->rcv_timeo, &abs); +                ts_add(&now, &flow->rcv_timeo, &abs);                  abstime = &abs;          }          idx = flow->part_idx; -          if (idx < 0) {                  while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {                          pthread_rwlock_unlock(&ai.lock); @@ -1200,20 +1262,28 @@ ssize_t flow_read(int    fd,                                      && ts_diff_ns(&tictime, &abs) <= 0)                                          return -ETIMEDOUT; +                                if (flow_keepalive(fd) < 0) +                                        return -EFLOWDOWN; +                                  ts_add(&tictime, &tic, &tictime); -                                pthread_rwlock_rdlock(&ai.lock); + +                                pthread_rwlock_wrlock(&ai.lock);                                  continue;                          }                          sdb = shm_rdrbuff_get(ai.rdrb, idx); -                        if (flow->qs.ber == 0 && chk_crc(sdb) != 0) { -                                pthread_rwlock_rdlock(&ai.lock); + +                        pthread_rwlock_wrlock(&ai.lock); + +                        flow->rcv_act = tictime; + +                        if ((flow->qs.ber == 0 && chk_crc(sdb) != 0) || +                            shm_du_buff_head(sdb) == shm_du_buff_tail(sdb)) {                                  shm_rdrbuff_remove(ai.rdrb, idx); +                                idx = -EAGAIN;                                  continue;                          } -                        pthread_rwlock_rdlock(&ai.lock); -                          if (flow->qs.cypher_s > 0                              && crypt_decrypt(flow, sdb) < 0) {                                  pthread_rwlock_unlock(&ai.lock); @@ -1242,6 +1312,8 @@ ssize_t flow_read(int    fd,                  flow->part_idx = (partrd && n == (ssize_t) count) ?                          DONE_PART : NO_PART; +                flow->rcv_act = now; +                  pthread_rwlock_unlock(&ai.lock);                  return n;          } else { @@ -1251,6 +1323,9 @@ ssize_t flow_read(int    fd,                          shm_du_buff_head_release(sdb, n);                          pthread_rwlock_wrlock(&ai.lock);                          flow->part_idx = idx; + +                        flow->rcv_act = now; +                          pthread_rwlock_unlock(&ai.lock);                          return count;                  } else { @@ -1265,6 +1340,9 @@ ssize_t flow_read(int    fd,  struct flow_set * fset_create()  {          struct flow_set * set; +        struct timespec   now; + +        clock_gettime(PTHREAD_COND_CLOCK, &now);          set = malloc(sizeof(*set));          if (set == NULL) @@ -1281,6 +1359,8 @@ struct flow_set * fset_create()          if (!bmp_is_id_valid(ai.fqueues, set->idx))                  goto fail_bmp_alloc; +        set->chk = now; +          pthread_rwlock_unlock(&ai.lock);          list_head_init(&set->flows); @@ -1453,6 +1533,48 @@ bool fset_has(const struct flow_set * set,          return ret;  } +static void fset_keepalive(struct flow_set * set) +{ +        struct timespec    now; +        struct list_head * p; +        struct list_head * h; +        struct list_head   copy; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        pthread_rwlock_wrlock(&set->lock); + +        if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) { +                pthread_rwlock_unlock(&set->lock); +                return; +        } + +        set->chk = now; + +        list_head_init(©); + +        list_for_each(p, &set->flows) { +                struct flow_set_entry * c; +                struct flow_set_entry * e; +                e = list_entry(p, struct flow_set_entry, next); +                c = malloc(sizeof(*c)); +                if (c == NULL) +                        continue; +                c->fd = e->fd; +                list_add_tail(&c->next, ©); +        } + +        pthread_rwlock_unlock(&set->lock); + +        list_for_each_safe(p, h, ©) { +                struct flow_set_entry * e; +                e = list_entry(p, struct flow_set_entry, next); +                flow_send_keepalive(e->fd); +                list_del(&e->next); +                free(e); +        } +} +  int fqueue_next(struct fqueue * fq)  {          int fd; @@ -1525,6 +1647,7 @@ ssize_t fevent(struct flow_set *       set,                          ts_add(t, &tic, t);                          pthread_rwlock_rdlock(&ai.lock);                          timerwheel_move(); +                        fset_keepalive(set);                          pthread_rwlock_unlock(&ai.lock);                          continue;                  } @@ -1707,6 +1830,7 @@ int ipcp_flow_alloc_reply(int          fd,  int ipcp_flow_read(int                   fd,                     struct shm_du_buff ** sdb)  { +        struct timespec    now;          struct flow *      flow;          struct shm_rbuff * rb;          ssize_t            idx = -1; @@ -1729,11 +1853,18 @@ int ipcp_flow_read(int                   fd,                  if (idx < 0)                          return idx; -                pthread_rwlock_rdlock(&ai.lock); +                clock_gettime(PTHREAD_COND_CLOCK, &now); + +                pthread_rwlock_wrlock(&ai.lock);                  *sdb = shm_rdrbuff_get(ai.rdrb, idx); -                if (flow->qs.ber == 0 && chk_crc(*sdb) != 0) +                if ((flow->qs.ber == 0 && chk_crc(*sdb) != 0) || +                    (shm_du_buff_head(*sdb) == shm_du_buff_tail(*sdb))) { +                        shm_rdrbuff_remove(ai.rdrb, idx);                          continue; +                } + +                flow->rcv_act = now;                  frcti_rcv(flow->frcti, *sdb);          } @@ -1750,16 +1881,19 @@ int ipcp_flow_read(int                   fd,  int ipcp_flow_write(int                  fd,                      struct shm_du_buff * sdb)  { -        struct flow * flow; -        int           ret; -        ssize_t       idx; +        struct timespec now; +        struct flow *   flow; +        int             ret; +        ssize_t         idx;          assert(fd >= 0 && fd < SYS_MAX_FLOWS);          assert(sdb); +        clock_gettime(PTHREAD_COND_CLOCK, &now); +          flow = &ai.flows[fd]; -        pthread_rwlock_rdlock(&ai.lock); +        pthread_rwlock_wrlock(&ai.lock);          if (flow->flow_id < 0) {                  pthread_rwlock_unlock(&ai.lock); @@ -1792,6 +1926,8 @@ int ipcp_flow_write(int                  fd,          else                  shm_rdrbuff_remove(ai.rdrb, idx); +        flow->snd_act = now; +          pthread_rwlock_unlock(&ai.lock);          assert(ret <= 0); diff --git a/src/lib/frct.c b/src/lib/frct.c index ff938aec..e9741aaf 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -667,6 +667,7 @@ static int __frcti_snd(struct frcti *       frcti,          bool              rtx;          assert(frcti); +        assert(shm_du_buff_head(sdb) != shm_du_buff_tail(sdb));          snd_cr = &frcti->snd_cr;          rcv_cr = &frcti->rcv_cr;  | 
