diff options
| -rw-r--r-- | include/ouroboros/ipcp-dev.h | 6 | ||||
| -rw-r--r-- | include/ouroboros/np1_flow.h | 19 | ||||
| -rw-r--r-- | src/ipcpd/eth/eth.c | 11 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.c | 10 | ||||
| -rw-r--r-- | src/ipcpd/udp/main.c | 20 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dt.c | 2 | ||||
| -rw-r--r-- | src/ipcpd/unicast/fa.c | 2 | ||||
| -rw-r--r-- | src/ipcpd/unicast/psched.c | 7 | ||||
| -rw-r--r-- | src/ipcpd/unicast/psched.h | 6 | ||||
| -rw-r--r-- | src/lib/dev.c | 91 | 
10 files changed, 140 insertions, 34 deletions
diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 6472c9fe..307cf3a2 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -47,6 +47,12 @@ int    ipcp_flow_read(int                   fd,  int    ipcp_flow_write(int                  fd,                         struct shm_du_buff * sdb); +int    np1_flow_read(int                   fd, +                     struct shm_du_buff ** sdb); + +int    np1_flow_write(int                  fd, +                      struct shm_du_buff * sdb); +  int    ipcp_flow_fini(int fd);  int    ipcp_flow_get_qoscube(int         fd, diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index b764de91..fdef443b 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -27,13 +27,24 @@  #include <unistd.h> -int  np1_flow_alloc(pid_t     n_pid, -                    int       flow_id, -                    qosspec_t qs); +int  np1_flow_alloc(pid_t n_pid, +                    int   flow_id);  int  np1_flow_resp(int flow_id); -int  np1_flow_dealloc(int flow_id, +int  np1_flow_dealloc(int    flow_id,                        time_t timeo); +static const qosspec_t qos_np1 = { +        .delay        = UINT32_MAX, +        .bandwidth    = 0, +        .availability = 0, +        .loss         = UINT32_MAX, +        .ber          = UINT32_MAX, +        .in_order     = 0, +        .max_gap      = UINT32_MAX, +        .cypher_s     = 0, +        .timeout      = 0 +}; +  #endif /* OUROBOROS_NP1_FLOW_H */ diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 53dc3b69..b7b3a41d 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1016,10 +1016,15 @@ static void * eth_ipcp_packet_reader(void * o)  #ifndef HAVE_NETMAP                          shm_du_buff_head_release(sdb, ETH_HEADER_TOT_SIZE);                          shm_du_buff_truncate(sdb, length); -                        ipcp_flow_write(fd, sdb);  #else -                        flow_write(fd, &e_frame->payload, length); +                        if (ipcp_sdb_reserve(&sdb, length)) +                                continue; + +                        buf = shm_du_buff_head(sdb); +                        memcpy(buf, &e_frame->payload, length);  #endif +                        if (np1_flow_write(fd, sdb) < 0) +                                ipcp_sdb_release(sdb);                  }          } @@ -1062,7 +1067,7 @@ static void * eth_ipcp_packet_writer(void * o)                          if (fqueue_type(fq) != FLOW_PKT)                                  continue; -                        if (ipcp_flow_read(fd, &sdb)) { +                        if (np1_flow_read(fd, &sdb)) {                                  log_dbg("Bad read from fd %d.", fd);                                  continue;                          } diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index d19d8e43..2426fbab 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -514,10 +514,8 @@ static void * mainloop(void * o)                                  break;                          } -                        qs = msg_to_spec(msg->qosspec);                          fd = np1_flow_alloc(msg->pid, -                                            msg->flow_id, -                                            qs); +                                            msg->flow_id);                          if (fd < 0) {                                  log_err("Failed allocating fd on flow_id %d.",                                          msg->flow_id); @@ -525,6 +523,7 @@ static void * mainloop(void * o)                                  break;                          } +                        qs = msg_to_spec(msg->qosspec);                          ret_msg.result =                                  ipcpi.ops->ipcp_flow_alloc(fd,                                                             msg->hash.data, @@ -549,10 +548,8 @@ static void * mainloop(void * o)                                  break;                          } -                        qs = msg_to_spec(msg->qosspec);                          fd = np1_flow_alloc(msg->pid, -                                            msg->flow_id, -                                            qs); +                                            msg->flow_id);                          if (fd < 0) {                                  log_err("Failed allocating fd on flow_id %d.",                                          msg->flow_id); @@ -560,6 +557,7 @@ static void * mainloop(void * o)                                  break;                          } +                        qs = msg_to_spec(msg->qosspec);                          ret_msg.result =                                  ipcpi.ops->ipcp_flow_join(fd,                                                            msg->hash.data, diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index d3104163..6e32638d 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -450,9 +450,11 @@ static void * ipcp_udp_packet_reader(void * o)          eid_p = (uint32_t *) buf;          while (true) { -                struct mgmt_frame * frame; -                struct sockaddr_in  r_saddr; -                socklen_t           len; +                struct mgmt_frame *  frame; +                struct sockaddr_in   r_saddr; +                socklen_t            len; +                struct shm_du_buff * sdb; +                uint8_t *            head;                  len = sizeof(r_saddr); @@ -493,7 +495,15 @@ static void * ipcp_udp_packet_reader(void * o)                          continue;                  } -                flow_write(eid, data, n - sizeof(eid)); +                n-= sizeof(eid); + +                if (ipcp_sdb_reserve(&sdb, n)) +                        continue; + +                head = shm_du_buff_head(sdb); +                memcpy(head, data, n); +                if (np1_flow_write(eid, sdb) < 0) +                        ipcp_sdb_release(sdb);          }          return 0; @@ -536,7 +546,7 @@ static void * ipcp_udp_packet_writer(void * o)                          if (fqueue_type(fq) != FLOW_PKT)                                  continue; -                        if (ipcp_flow_read(fd, &sdb)) { +                        if (np1_flow_read(fd, &sdb)) {                                  log_dbg("Bad read from fd %d.", fd);                                  continue;                          } diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index 9c16e5d0..9cc53edc 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -713,7 +713,7 @@ void dt_fini(void)  int dt_start(void)  { -        dt.psched = psched_create(packet_handler); +        dt.psched = psched_create(packet_handler, ipcp_flow_read);          if (dt.psched == NULL) {                  log_err("Failed to create N-1 packet scheduler.");                  return -1; diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 5f3dd1a7..345d4031 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -756,7 +756,7 @@ int fa_start(void)          int                 pol;          int                 max; -        fa.psched = psched_create(packet_handler); +        fa.psched = psched_create(packet_handler, np1_flow_read);          if (fa.psched == NULL) {                  log_err("Failed to start packet scheduler.");                  goto fail_psched; diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c index 33ac5afe..bb452726 100644 --- a/src/ipcpd/unicast/psched.c +++ b/src/ipcpd/unicast/psched.c @@ -50,6 +50,7 @@ static int qos_prio [] = {  struct psched {          fset_t *         set[QOS_CUBE_MAX];          next_packet_fn_t callback; +        read_fn_t        read;          pthread_t        readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];  }; @@ -101,7 +102,7 @@ static void * packet_reader(void * o)                                  notifier_event(NOTIFY_DT_FLOW_UP, &fd);                                  break;                          case FLOW_PKT: -                                if (ipcp_flow_read(fd, &sdb)) +                                if (sched->read(fd, &sdb) < 0)                                          continue;                                  sched->callback(fd, qc, sdb); @@ -117,7 +118,8 @@ static void * packet_reader(void * o)          return (void *) 0;  } -struct psched * psched_create(next_packet_fn_t callback) +struct psched * psched_create(next_packet_fn_t callback, +                              read_fn_t        read)  {          struct psched *       psched;          struct sched_info *   infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; @@ -131,6 +133,7 @@ struct psched * psched_create(next_packet_fn_t callback)                  goto fail_malloc;          psched->callback = callback; +        psched->read     = read;          for (i = 0; i < QOS_CUBE_MAX; ++i) {                  psched->set[i] = fset_create(); diff --git a/src/ipcpd/unicast/psched.h b/src/ipcpd/unicast/psched.h index 1f22b34b..654d73d9 100644 --- a/src/ipcpd/unicast/psched.h +++ b/src/ipcpd/unicast/psched.h @@ -30,7 +30,11 @@ typedef void (* next_packet_fn_t)(int                  fd,                                    qoscube_t            qc,                                    struct shm_du_buff * sdb); -struct psched * psched_create(next_packet_fn_t callback); +typedef int (* read_fn_t)(int                   fd, +                          struct shm_du_buff ** sdb); + +struct psched * psched_create(next_packet_fn_t callback, +                              read_fn_t        read);  void            psched_destroy(struct psched * psched); diff --git a/src/lib/dev.c b/src/lib/dev.c index ac885711..b3e9c69e 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -38,6 +38,7 @@  #include <ouroboros/sockets.h>  #include <ouroboros/fccntl.h>  #include <ouroboros/bitmap.h> +#include <ouroboros/np1_flow.h>  #include <ouroboros/pthread.h>  #include <ouroboros/random.h>  #include <ouroboros/shm_flow_set.h> @@ -1330,7 +1331,7 @@ ssize_t flow_read(int    fd,                          idx = flow_rx_sdb(flow, &sdb, block, &tictime);                          if (idx < 0) { -                                if (idx != -ETIMEDOUT) +                                if (idx != -ETIMEDOUT && idx != -EAGAIN)                                          return idx;                                  if (abstime != NULL @@ -1740,12 +1741,9 @@ ssize_t fevent(struct flow_set *       set,  /* ipcp-dev functions. */  int np1_flow_alloc(pid_t     n_pid, -                   int       flow_id, -                   qosspec_t qs) +                   int       flow_id)  { -        qs.cypher_s = 0; /* No encryption ctx for np1 */ -        qs.in_order = 0; /* No frct for np1           */ -        return flow_init(flow_id, n_pid, qs, NULL, 0); +        return flow_init(flow_id, n_pid, qos_np1, NULL, 0);  }  int np1_flow_dealloc(int    flow_id, @@ -1855,9 +1853,7 @@ int ipcp_flow_req_arr(const uint8_t * dst,                  return -1;          } -        qs.cypher_s = 0; /* No encryption ctx for np1 */ -        qs.in_order = 0; /* No frct for np1           */ -        fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs, NULL, 0); +        fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0);          irm_msg__free_unpacked(recv_msg, NULL); @@ -1928,8 +1924,14 @@ int ipcp_flow_read(int                   fd,                  pthread_rwlock_unlock(&ai.lock);                  idx = flow_rx_sdb(flow, sdb, false, NULL); -                if (idx < 0) +                if (idx < 0) { +                        if (idx == -EAGAIN) { +                                pthread_rwlock_rdlock(&ai.lock); +                                continue; +                        } +                          return idx; +                }                  pthread_rwlock_rdlock(&ai.lock); @@ -1964,7 +1966,74 @@ int ipcp_flow_write(int                  fd,                  return -EPERM;          } -        ret = flow_tx_sdb(flow, sdb, false, NULL); +        pthread_rwlock_unlock(&ai.lock); + +        ret = flow_tx_sdb(flow, sdb, true, NULL); + +        return ret; +} + +int np1_flow_read(int                   fd, +                  struct shm_du_buff ** sdb) +{ +        struct flow *    flow; +        ssize_t          idx = -1; + +        assert(fd >= 0 && fd < SYS_MAX_FLOWS); +        assert(sdb); + +        flow = &ai.flows[fd]; + +        assert(flow->flow_id >= 0); + +        pthread_rwlock_rdlock(&ai.lock); + +        idx = shm_rbuff_read(flow->rx_rb);; +        if (idx < 0) { +                pthread_rwlock_unlock(&ai.lock); +                return idx; +        } + +        pthread_rwlock_unlock(&ai.lock); + +        *sdb = shm_rdrbuff_get(ai.rdrb, idx); + +        return 0; +} + +int np1_flow_write(int                  fd, +                   struct shm_du_buff * sdb) +{ +        struct flow * flow; +        int           ret; +        ssize_t       idx; + +        assert(fd >= 0 && fd < SYS_MAX_FLOWS); +        assert(sdb); + +        flow = &ai.flows[fd]; + +        pthread_rwlock_rdlock(&ai.lock); + +        if (flow->flow_id < 0) { +                pthread_rwlock_unlock(&ai.lock); +                return -ENOTALLOC; +        } + +        if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { +                pthread_rwlock_unlock(&ai.lock); +                return -EPERM; +        } + +        pthread_rwlock_unlock(&ai.lock); + +        idx = shm_du_buff_get_idx(sdb); + +        ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); +        if (ret < 0) +                shm_rdrbuff_remove(ai.rdrb, idx); +        else +                shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);          return ret;  }  | 
