diff options
-rw-r--r-- | include/ouroboros/shm_du_buff.h | 2 | ||||
-rw-r--r-- | src/ipcpd/eth/eth.c | 4 | ||||
-rw-r--r-- | src/ipcpd/udp/main.c | 2 | ||||
-rw-r--r-- | src/ipcpd/unicast/dir/dht.c | 2 | ||||
-rw-r--r-- | src/ipcpd/unicast/dt.c | 6 | ||||
-rw-r--r-- | src/ipcpd/unicast/fa.c | 6 | ||||
-rw-r--r-- | src/lib/crypt.c | 3 | ||||
-rw-r--r-- | src/lib/dev.c | 131 | ||||
-rw-r--r-- | src/lib/frct.c | 2 | ||||
-rw-r--r-- | src/lib/shm_flow_set.c | 12 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 7 | ||||
-rw-r--r-- | src/lib/timerwheel.c | 2 |
12 files changed, 99 insertions, 80 deletions
diff --git a/include/ouroboros/shm_du_buff.h b/include/ouroboros/shm_du_buff.h index da350055..0b83f913 100644 --- a/include/ouroboros/shm_du_buff.h +++ b/include/ouroboros/shm_du_buff.h @@ -34,6 +34,8 @@ uint8_t * shm_du_buff_head(struct shm_du_buff * sdb); uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb); +size_t shm_du_buff_len(struct shm_du_buff * sdb); + uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, size_t size); diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 25f42fc8..53dc3b69 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -950,7 +950,7 @@ static void * eth_ipcp_packet_reader(void * o) deid = ntohs(e_frame->eid); if (deid == MGMT_EID) { #elif defined (BUILD_ETH_LLC) - if (length > 0x05FF) {/* DIX */ + if (length > 0x05FF) { /* DIX */ #ifndef HAVE_NETMAP ipcp_sdb_release(sdb); #endif @@ -1067,7 +1067,7 @@ static void * eth_ipcp_packet_writer(void * o) continue; } - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); if (shm_du_buff_head_alloc(sdb, ETH_HEADER_TOT_SIZE) == NULL) { diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 7def856b..d3104163 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -541,7 +541,7 @@ static void * ipcp_udp_packet_writer(void * o) continue; } - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); if (len > IPCP_UDP_MAX_PACKET_SIZE) { log_dbg("Packet length exceeds MTU."); ipcp_sdb_release(sdb); diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index a847edc0..65d7c3ec 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -2461,7 +2461,7 @@ static void * dht_handle_packet(void * o) pthread_cleanup_pop(true); - i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); + i = shm_du_buff_len(cmd->sdb); msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); #ifndef __DHT_TEST__ diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index f2013809..9c16e5d0 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -435,7 +435,7 @@ static void packet_handler(int fd, uint8_t * head; size_t len; - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); #ifndef IPCP_FLOW_STATS (void) fd; @@ -781,7 +781,7 @@ int dt_write_packet(uint64_t dst_addr, assert(sdb); assert(dst_addr != ipcpi.dt_addr); - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); #ifdef IPCP_FLOW_STATS if (eid < PROG_RES_FDS) { @@ -815,7 +815,7 @@ int dt_write_packet(uint64_t dst_addr, goto fail_write; } - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); dt_pci.dst_addr = dst_addr; dt_pci.qc = qc; diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 508f2d73..5f3dd1a7 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -341,7 +341,7 @@ static void packet_handler(int fd, pthread_rwlock_wrlock(&fa.flows_lock); - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); #ifdef IPCP_FLOW_STATS ++flow->p_snd; @@ -453,7 +453,7 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg) pthread_cleanup_pop(true); - len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); + len = shm_du_buff_len(cmd->sdb); if (len > MSGBUFSZ || len < sizeof(*msg)) { log_warn("Invalid flow allocation message (len: %zd)\n", len); free(cmd); @@ -988,7 +988,7 @@ void fa_np1_rcv(uint64_t eid, int fd; size_t len; - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); pthread_rwlock_wrlock(&fa.flows_lock); diff --git a/src/lib/crypt.c b/src/lib/crypt.c index e19981bc..2985fc6a 100644 --- a/src/lib/crypt.c +++ b/src/lib/crypt.c @@ -282,8 +282,7 @@ static int openssl_decrypt(struct flow * f, int in_sz; int tmp_sz; - in = shm_du_buff_head(sdb); - in_sz = shm_du_buff_tail(sdb) - in; + in_sz = shm_du_buff_len(sdb); if (in_sz < IVSZ) return -ECRYPT; diff --git a/src/lib/dev.c b/src/lib/dev.c index d73205e2..5d12beec 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -93,7 +93,7 @@ struct flow { struct shm_rbuff * tx_rb; struct shm_flow_set * set; int flow_id; - int oflags; + uint16_t oflags; qosspec_t qs; ssize_t part_idx; @@ -120,7 +120,7 @@ struct flow_set_entry { }; struct flow_set { - size_t idx; + size_t idx; struct timespec chk; /* Last keepalive check. */ uint32_t min; /* Minimum keepalive time in set. */ @@ -1214,6 +1214,52 @@ ssize_t flow_write(int fd, return -ENOMEM; } +static bool invalid_pkt(struct flow * flow, + struct shm_du_buff * sdb) +{ + if (shm_du_buff_len(sdb) == 0) + return true; + + if (flow->qs.ber == 0 && chk_crc(sdb) != 0) + return true; + + if (flow->qs.cypher_s > 0 && crypt_decrypt(flow, sdb) < 0) + return true; + + return false; +} + +static ssize_t flow_rx_sdb(struct flow * flow, + struct shm_du_buff ** sdb, + bool block, + struct timespec * abstime) +{ + ssize_t idx; + struct timespec now; + + idx = block ? shm_rbuff_read_b(flow->rx_rb, abstime) : + shm_rbuff_read(flow->rx_rb); + if (idx < 0) + return idx; + + *sdb = shm_rdrbuff_get(ai.rdrb, idx); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_wrlock(&ai.lock); + + flow->rcv_act = now; + + pthread_rwlock_unlock(&ai.lock); + + if (invalid_pkt(flow, *sdb)) { + shm_rdrbuff_remove(ai.rdrb, idx); + return -EAGAIN; + } + + return idx; +} + ssize_t flow_read(int fd, void * buf, size_t count) @@ -1221,7 +1267,6 @@ ssize_t flow_read(int fd, ssize_t idx; ssize_t n; uint8_t * packet; - struct shm_rbuff * rb; struct shm_du_buff * sdb; struct timespec abs; struct timespec now; @@ -1229,10 +1274,10 @@ ssize_t flow_read(int fd, struct timespec tictime; struct timespec * abstime = NULL; struct flow * flow; - bool noblock; + bool block; bool partrd; - if (fd < 0 || fd > PROG_MAX_FLOWS) + if (fd < 0 || fd >= PROG_MAX_FLOWS) return -EBADF; flow = &ai.flows[fd]; @@ -1241,24 +1286,23 @@ ssize_t flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->part_idx == DONE_PART) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); - flow->part_idx = NO_PART; - return 0; + return -ENOTALLOC; } - if (flow->flow_id < 0) { + if (flow->part_idx == DONE_PART) { pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; + flow->part_idx = NO_PART; + return 0; } - rb = flow->rx_rb; - noblock = flow->oflags & FLOWFRNOBLOCK; + block = !(flow->oflags & FLOWFRNOBLOCK); partrd = !(flow->oflags & FLOWFRNOPART); ts_add(&now, &tic, &tictime); - if (ai.flows[fd].rcv_timesout) { + if (flow->rcv_timesout) { ts_add(&now, &flow->rcv_timeo, &abs); abstime = &abs; } @@ -1268,8 +1312,7 @@ ssize_t flow_read(int fd, while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { pthread_rwlock_unlock(&ai.lock); - idx = noblock ? shm_rbuff_read(rb) : - shm_rbuff_read_b(rb, &tictime); + idx = flow_rx_sdb(flow, &sdb, block, &tictime); if (idx < 0) { frcti_tick(flow->frcti); @@ -1285,45 +1328,31 @@ ssize_t flow_read(int fd, ts_add(&tictime, &tic, &tictime); - pthread_rwlock_wrlock(&ai.lock); - continue; - } - - sdb = shm_rdrbuff_get(ai.rdrb, idx); - - pthread_rwlock_wrlock(&ai.lock); - - flow->rcv_act = tictime; - - if ((flow->qs.ber == 0 && chk_crc(sdb) != 0) || - shm_du_buff_head(sdb) == shm_du_buff_tail(sdb)) { - shm_rdrbuff_remove(ai.rdrb, idx); - idx = -EAGAIN; + pthread_rwlock_rdlock(&ai.lock); continue; } - if (flow->qs.cypher_s > 0 - && crypt_decrypt(flow, sdb) < 0) { - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -ENOMEM; - } + pthread_rwlock_rdlock(&ai.lock); frcti_rcv(flow->frcti, sdb); } } + sdb = shm_rdrbuff_get(ai.rdrb, idx); + frcti_tick(flow->frcti); pthread_rwlock_unlock(&ai.lock); - n = shm_rdrbuff_read(&packet, ai.rdrb, idx); + packet = shm_du_buff_head(sdb); + + n = shm_du_buff_len(sdb); assert(n >= 0); if (n <= (ssize_t) count) { memcpy(buf, packet, n); - shm_rdrbuff_remove(ai.rdrb, idx); + ipcp_sdb_release(sdb); pthread_rwlock_wrlock(&ai.lock); @@ -1337,7 +1366,6 @@ ssize_t flow_read(int fd, } else { if (partrd) { memcpy(buf, packet, count); - sdb = shm_rdrbuff_get(ai.rdrb, idx); shm_du_buff_head_release(sdb, n); pthread_rwlock_wrlock(&ai.lock); flow->part_idx = idx; @@ -1347,7 +1375,7 @@ ssize_t flow_read(int fd, pthread_rwlock_unlock(&ai.lock); return count; } else { - shm_rdrbuff_remove(ai.rdrb, idx); + ipcp_sdb_release(sdb); return -EMSGSIZE; } } @@ -1870,10 +1898,8 @@ int ipcp_flow_alloc_reply(int fd, int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) { - struct timespec now; - struct flow * flow; - struct shm_rbuff * rb; - ssize_t idx = -1; + struct flow * flow; + ssize_t idx = -1; assert(fd >= 0 && fd < SYS_MAX_FLOWS); assert(sdb); @@ -1884,27 +1910,14 @@ int ipcp_flow_read(int fd, assert(flow->flow_id >= 0); - rb = flow->rx_rb; - while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { pthread_rwlock_unlock(&ai.lock); - idx = shm_rbuff_read(rb); + idx = flow_rx_sdb(flow, sdb, false, NULL); if (idx < 0) return idx; - clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_wrlock(&ai.lock); - - *sdb = shm_rdrbuff_get(ai.rdrb, idx); - if ((flow->qs.ber == 0 && chk_crc(*sdb) != 0) || - (shm_du_buff_head(*sdb) == shm_du_buff_tail(*sdb))) { - shm_rdrbuff_remove(ai.rdrb, idx); - continue; - } - - flow->rcv_act = now; + pthread_rwlock_rdlock(&ai.lock); frcti_rcv(flow->frcti, *sdb); } @@ -1913,8 +1926,6 @@ int ipcp_flow_read(int fd, pthread_rwlock_unlock(&ai.lock); - *sdb = shm_rdrbuff_get(ai.rdrb, idx); - return 0; } diff --git a/src/lib/frct.c b/src/lib/frct.c index 99962868..a93a1006 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -681,7 +681,7 @@ static int __frcti_snd(struct frcti * frcti, bool rtx; assert(frcti); - assert(shm_du_buff_head(sdb) != shm_du_buff_tail(sdb)); + assert(shm_du_buff_len(sdb) != 0); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 5a9bee6c..d325a253 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -96,10 +96,8 @@ static struct shm_flow_set * flow_set_create(pid_t pid, if (shm_fd == -1) goto fail_shm_open; - if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) { - close(shm_fd); - goto fail_shm_open; - } + if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) + goto fail_truncate; shm_base = mmap(NULL, SHM_FLOW_SET_FILE_SIZE, @@ -108,11 +106,11 @@ static struct shm_flow_set * flow_set_create(pid_t pid, shm_fd, 0); - close(shm_fd); - if (shm_base == MAP_FAILED) goto fail_mmap; + close(shm_fd); + set->mtable = shm_base; set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS); set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); @@ -125,6 +123,8 @@ static struct shm_flow_set * flow_set_create(pid_t pid, fail_mmap: if (flags & O_CREAT) shm_unlink(fn); + fail_truncate: + close(shm_fd); fail_shm_open: free(set); fail_malloc: diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index dfa45af6..e283388f 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -532,6 +532,13 @@ uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb) return (uint8_t *) (sdb + 1) + sdb->du_tail; } +size_t shm_du_buff_len(struct shm_du_buff * sdb) +{ + assert(sdb); + + return sdb->du_tail - sdb->du_head; +} + uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, size_t size) { diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 580f838d..c3be08e0 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -337,7 +337,7 @@ static int timerwheel_rxm(struct frcti * frcti, r->mul = 0; r->seqno = seqno; r->frcti = frcti; - r->len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + r->len = shm_du_buff_len(sdb); #ifdef RXM_BUFFER_ON_HEAP r->pkt = malloc(r->len); if (r->pkt == NULL) { |