diff options
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 269 |
1 files changed, 152 insertions, 117 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index e69fec26..2a5c3f83 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -20,9 +20,14 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + #include <ouroboros/endian.h> -#define _POSIX_C_SOURCE 200809L #include "config.h" #include <ouroboros/hash.h> @@ -39,7 +44,6 @@ #include <ouroboros/shm_rbuff.h> #include <ouroboros/utils.h> #include <ouroboros/fqueue.h> -#include <ouroboros/qoscube.h> #include <stdlib.h> #include <string.h> @@ -56,6 +60,8 @@ #define NO_PART -1 #define DONE_PART -2 +#define CRCLEN (sizeof(uint32_t)) + struct flow_set { size_t idx; }; @@ -89,9 +95,8 @@ struct flow { struct shm_rbuff * rx_rb; struct shm_rbuff * tx_rb; struct shm_flow_set * set; - int port_id; + int flow_id; int oflags; - qoscube_t cube; qosspec_t spec; ssize_t part_idx; @@ -166,12 +171,12 @@ static void port_set_state(struct port * p, pthread_mutex_unlock(&p->state_lock); } -static enum port_state port_wait_assign(int port_id) +static enum port_state port_wait_assign(int flow_id) { enum port_state state; struct port * p; - p = &ai.ports[port_id]; + p = &ai.ports[flow_id]; pthread_mutex_lock(&p->state_lock); @@ -230,17 +235,16 @@ static void flow_clear(int fd) { memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); - ai.flows[fd].port_id = -1; + ai.flows[fd].flow_id = -1; ai.flows[fd].pid = -1; - ai.flows[fd].cube = QOS_CUBE_BE; } static void flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); - if (ai.flows[fd].port_id != -1) { - port_destroy(&ai.ports[ai.flows[fd].port_id]); + if (ai.flows[fd].flow_id != -1) { + port_destroy(&ai.ports[ai.flows[fd].flow_id]); bmp_release(ai.fds, fd); } @@ -256,7 +260,7 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) { shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].port_id, + ai.flows[fd].flow_id, FLOW_DEALLOC); shm_flow_set_close(ai.flows[fd].set); } @@ -267,9 +271,9 @@ static void flow_fini(int fd) flow_clear(fd); } -static int flow_init(int port_id, +static int flow_init(int flow_id, pid_t pid, - qoscube_t qc) + qosspec_t qs) { int fd; int err = -ENOMEM; @@ -282,11 +286,11 @@ static int flow_init(int port_id, goto fail_fds; } - ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, port_id); + ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, flow_id); if (ai.flows[fd].rx_rb == NULL) goto fail; - ai.flows[fd].tx_rb = shm_rbuff_open(pid, port_id); + ai.flows[fd].tx_rb = shm_rbuff_open(pid, flow_id); if (ai.flows[fd].tx_rb == NULL) goto fail; @@ -294,16 +298,15 @@ static int flow_init(int port_id, if (ai.flows[fd].set == NULL) goto fail; - ai.flows[fd].port_id = port_id; + ai.flows[fd].flow_id = flow_id; ai.flows[fd].oflags = FLOWFDEFAULT; ai.flows[fd].pid = pid; - ai.flows[fd].cube = qc; - ai.flows[fd].spec = qos_cube_to_spec(qc); ai.flows[fd].part_idx = NO_PART; + ai.flows[fd].spec = qs; - ai.ports[port_id].fd = fd; + ai.ports[flow_id].fd = fd; - port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED); pthread_rwlock_unlock(&ai.lock); @@ -447,7 +450,7 @@ static void fini(void) pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < PROG_MAX_FLOWS; ++i) { - if (ai.flows[i].port_id != -1) { + if (ai.flows[i].flow_id != -1) { ssize_t idx; shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN); while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) @@ -493,7 +496,6 @@ int flow_accept(qosspec_t * qs, irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg; int fd; - qoscube_t qc; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; @@ -521,15 +523,14 @@ int flow_accept(qosspec_t * qs, return res; } - if (!recv_msg->has_pid || !recv_msg->has_port_id || - !recv_msg->has_qoscube) { + if (!recv_msg->has_pid || !recv_msg->has_flow_id || + recv_msg->qosspec == NULL) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - qc = recv_msg->qoscube; - - fd = flow_init(recv_msg->port_id, recv_msg->pid, recv_msg->qoscube); + fd = flow_init(recv_msg->flow_id, recv_msg->pid, + msg_to_spec(recv_msg->qosspec)); irm_msg__free_unpacked(recv_msg, NULL); @@ -538,12 +539,10 @@ int flow_accept(qosspec_t * qs, pthread_rwlock_wrlock(&ai.lock); - /* FIXME: check if FRCT is needed based on qc? */ - assert(ai.flows[fd].frcti == NULL); - if (qc != QOS_CUBE_RAW) { - ai.flows[fd].frcti = frcti_create(fd, qc); + if (ai.flows[fd].spec.in_order != 0) { + ai.flows[fd].frcti = frcti_create(fd); if (ai.flows[fd].frcti == NULL) { flow_fini(fd); pthread_rwlock_unlock(&ai.lock); @@ -563,21 +562,21 @@ int flow_alloc(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - qoscube_t qc = QOS_CUBE_RAW; - int fd; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.has_qoscube = true; - msg.pid = ai.pid; + irm_msg_t msg = IRM_MSG__INIT; + qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT; + irm_msg_t * recv_msg; + int fd; +#ifdef QOS_DISABLE_CRC if (qs != NULL) - qc = qos_spec_to_cube(*qs); - - msg.qoscube = qc; + qs->ber = 1; +#endif + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; + msg.dst = (char *) dst; + msg.has_pid = true; + msg.pid = ai.pid; + qs_msg = spec_to_msg(qs); + msg.qosspec = &qs_msg; if (timeo != NULL) { msg.has_timeo_sec = true; @@ -601,12 +600,13 @@ int flow_alloc(const char * dst, return res; } - if (!recv_msg->has_pid || !recv_msg->has_port_id) { + if (!recv_msg->has_pid || !recv_msg->has_flow_id) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qc); + fd = flow_init(recv_msg->flow_id, recv_msg->pid, + qs == NULL ? qos_raw : *qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -617,8 +617,8 @@ int flow_alloc(const char * dst, assert(ai.flows[fd].frcti == NULL); - if (qc != QOS_CUBE_RAW) { - ai.flows[fd].frcti = frcti_create(fd, qc); + if (ai.flows[fd].spec.in_order != 0) { + ai.flows[fd].frcti = frcti_create(fd); if (ai.flows[fd].frcti == NULL) { flow_fini(fd); pthread_rwlock_unlock(&ai.lock); @@ -640,15 +640,15 @@ int flow_dealloc(int fd) return -EINVAL; msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_port_id = true; + msg.has_flow_id = true; msg.has_pid = true; msg.pid = ai.pid; pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].port_id >= 0); + assert(ai.flows[fd].flow_id >= 0); - msg.port_id = ai.flows[fd].port_id; + msg.flow_id = ai.flows[fd].flow_id; pthread_rwlock_unlock(&ai.lock); @@ -676,7 +676,6 @@ int fccntl(int fd, int cmd, ...) { - uint16_t sflags; uint32_t * fflags; uint16_t * cflags; va_list l; @@ -696,7 +695,7 @@ int fccntl(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); va_end(l); return -ENOTALLOC; @@ -768,13 +767,13 @@ int fccntl(int fd, rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->port_id, + flow->flow_id, FLOW_DOWN); } else { rx_acl &= ~ACL_FLOWDOWN; tx_acl &= ~ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->port_id, + flow->flow_id, FLOW_UP); } @@ -788,11 +787,6 @@ int fccntl(int fd, goto einval; *fflags = flow->oflags; break; - case FRCTSFLAGS: - sflags = (uint16_t) va_arg(l, int); - if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags)) - goto eperm; - break; case FRCTGFLAGS: cflags = (uint16_t *) va_arg(l, int *); if (cflags == NULL) @@ -824,16 +818,40 @@ int fccntl(int fd, return -EPERM; } +static int chk_crc(struct shm_du_buff * sdb) +{ + uint32_t crc; + uint8_t * head = shm_du_buff_head(sdb); + uint8_t * tail = shm_du_buff_tail_release(sdb, CRCLEN); + + mem_hash(HASH_CRC32, &crc, head, tail - head); + + return !(crc == *((uint32_t *) tail)); +} + +static int add_crc(struct shm_du_buff * sdb) +{ + uint8_t * head = shm_du_buff_head(sdb); + uint8_t * tail = shm_du_buff_tail_alloc(sdb, CRCLEN); + if (tail == NULL) + return -1; + + mem_hash(HASH_CRC32, tail, head, tail - head); + + return 0; +} + ssize_t flow_write(int fd, const void * buf, size_t count) { - struct flow * flow; - ssize_t idx; - int ret; - int flags; - struct timespec abs; - struct timespec * abstime = NULL; + struct flow * flow; + ssize_t idx; + int ret; + int flags; + struct timespec abs; + struct timespec * abstime = NULL; + struct shm_du_buff * sdb; if (buf == NULL) return 0; @@ -847,7 +865,7 @@ ssize_t flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -880,18 +898,25 @@ ssize_t flow_write(int fd, if (idx < 0) return idx; - if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) { + sdb = shm_rdrbuff_get(ai.rdrb, idx); + + if (frcti_snd(flow->frcti, sdb) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOMEM; + } + + if (flow->spec.ber == 0 && add_crc(sdb) != 0) { shm_rdrbuff_remove(ai.rdrb, idx); return -ENOMEM; } pthread_rwlock_rdlock(&ai.lock); - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -906,7 +931,7 @@ ssize_t flow_read(int fd, { ssize_t idx; ssize_t n; - uint8_t * sdu; + uint8_t * packet; struct shm_rbuff * rb; struct shm_du_buff * sdb; struct timespec abs; @@ -929,7 +954,7 @@ ssize_t flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -955,23 +980,25 @@ ssize_t flow_read(int fd, if (idx < 0) return idx; sdb = shm_rdrbuff_get(ai.rdrb, idx); + if (flow->spec.ber == 0 && chk_crc(sdb) != 0) + continue; } while (frcti_rcv(flow->frcti, sdb) != 0); } } - n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); + n = shm_rdrbuff_read(&packet, ai.rdrb, idx); assert(n >= 0); if (n <= (ssize_t) count) { - memcpy(buf, sdu, n); + memcpy(buf, packet, n); shm_rdrbuff_remove(ai.rdrb, idx); flow->part_idx = (partrd && n == (ssize_t) count) ? DONE_PART : NO_PART; return n; } else { if (partrd) { - memcpy(buf, sdu, count); + memcpy(buf, packet, count); sdb = shm_rdrbuff_get(ai.rdrb, idx); shm_du_buff_head_release(sdb, n); flow->part_idx = idx; @@ -1053,7 +1080,7 @@ int fset_add(struct flow_set * set, int fd) { int ret; - size_t sdus; + size_t packets; size_t i; if (set == NULL || fd < 0 || fd > SYS_MAX_FLOWS) @@ -1061,11 +1088,11 @@ int fset_add(struct flow_set * set, pthread_rwlock_wrlock(&ai.lock); - ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id); - sdus = shm_rbuff_queued(ai.flows[fd].rx_rb); - for (i = 0; i < sdus; i++) - shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); + packets = shm_rbuff_queued(ai.flows[fd].rx_rb); + for (i = 0; i < packets; i++) + shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1080,8 +1107,8 @@ void fset_del(struct flow_set * set, pthread_rwlock_wrlock(&ai.lock); - if (ai.flows[fd].port_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); + if (ai.flows[fd].flow_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id); pthread_rwlock_unlock(&ai.lock); } @@ -1096,12 +1123,12 @@ bool fset_has(const struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (ai.flows[fd].flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return false; } - ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); + ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1); pthread_rwlock_unlock(&ai.lock); @@ -1177,35 +1204,35 @@ int fevent(struct flow_set * set, /* ipcp-dev functions. */ int np1_flow_alloc(pid_t n_pid, - int port_id, - qoscube_t qc) + int flow_id, + qosspec_t qs) { - return flow_init(port_id, n_pid, qc); + return flow_init(flow_id, n_pid, qs); } -int np1_flow_dealloc(int port_id) +int np1_flow_dealloc(int flow_id) { int fd; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[port_id].fd; + fd = ai.ports[flow_id].fd; pthread_rwlock_unlock(&ai.lock); return fd; } -int np1_flow_resp(int port_id) +int np1_flow_resp(int flow_id) { int fd; - if (port_wait_assign(port_id) != PORT_ID_ASSIGNED) + if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED) return -1; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[port_id].fd; + fd = ai.ports[flow_id].fd; pthread_rwlock_unlock(&ai.lock); @@ -1243,29 +1270,29 @@ int ipcp_create_r(pid_t pid, int ipcp_flow_req_arr(pid_t pid, const uint8_t * dst, size_t len, - qoscube_t qc) + qosspec_t qs) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg; + qosspec_msg_t qs_msg; + int fd; assert(dst != NULL); - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_pid = true; - msg.pid = pid; - msg.has_hash = true; - msg.hash.len = len; - msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = qc; + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_pid = true; + msg.pid = pid; + msg.has_hash = true; + msg.hash.len = len; + msg.hash.data = (uint8_t *) dst; + qs_msg = spec_to_msg(&qs); + msg.qosspec = &qs_msg; recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) return -EIRMD; - if (!recv_msg->has_port_id || !recv_msg->has_pid) { + if (!recv_msg->has_flow_id || !recv_msg->has_pid) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1275,7 +1302,7 @@ int ipcp_flow_req_arr(pid_t pid, return -1; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qc); + fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -1292,11 +1319,11 @@ int ipcp_flow_alloc_reply(int fd, assert(fd >= 0 && fd < SYS_MAX_FLOWS); msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg.has_port_id = true; + msg.has_flow_id = true; pthread_rwlock_rdlock(&ai.lock); - msg.port_id = ai.flows[fd].port_id; + msg.flow_id = ai.flows[fd].flow_id; pthread_rwlock_unlock(&ai.lock); @@ -1333,7 +1360,7 @@ int ipcp_flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(flow->port_id >= 0); + assert(flow->flow_id >= 0); rb = flow->rx_rb; @@ -1352,6 +1379,8 @@ int ipcp_flow_read(int fd, if (idx < 0) return idx; *sdb = shm_rdrbuff_get(ai.rdrb, idx); + if (flow->spec.ber == 0 && chk_crc(*sdb) != 0) + continue; } while (frcti_rcv(flow->frcti, *sdb) != 0); return 0; @@ -1371,7 +1400,7 @@ int ipcp_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(flow->port_id >= 0); + assert(flow->flow_id >= 0); if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { pthread_rwlock_unlock(&ai.lock); @@ -1387,9 +1416,15 @@ int ipcp_flow_write(int fd, return -ENOMEM; } + if (flow->spec.ber == 0 && add_crc(sdb) != 0) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOMEM; + } + ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1435,7 +1470,7 @@ void ipcp_flow_fini(int fd) shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].port_id, + ai.flows[fd].flow_id, FLOW_DEALLOC); rx_rb = ai.flows[fd].rx_rb; @@ -1455,9 +1490,9 @@ int ipcp_flow_get_qoscube(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].port_id >= 0); + assert(ai.flows[fd].flow_id >= 0); - *cube = ai.flows[fd].cube; + *cube = qos_spec_to_cube(ai.flows[fd].spec); pthread_rwlock_unlock(&ai.lock); @@ -1491,14 +1526,14 @@ int local_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); |