summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri.staessens@ugent.be>2018-10-06 18:06:47 +0200
committerDimitri Staessens <dimitri.staessens@ugent.be>2018-10-06 18:06:47 +0200
commit0b2e5c5410580c755cef02114e51f15b19cfaffa (patch)
tree63d684e6057c9caa43739b599d54a72f9959d4f8 /src/lib/dev.c
parentbfc29ca20406ccd69363b0f9796987534318e7ae (diff)
parentd9ad3852613cda026d4520b5c608ada7433dd7d9 (diff)
downloadouroboros-0b2e5c5410580c755cef02114e51f15b19cfaffa.tar.gz
ouroboros-0b2e5c5410580c755cef02114e51f15b19cfaffa.zip
Merge branch 'testing' into be
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c269
1 files changed, 152 insertions, 117 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index e69fec26..2a5c3f83 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -20,9 +20,14 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200809L
+#endif
+
#include <ouroboros/endian.h>
-#define _POSIX_C_SOURCE 200809L
#include "config.h"
#include <ouroboros/hash.h>
@@ -39,7 +44,6 @@
#include <ouroboros/shm_rbuff.h>
#include <ouroboros/utils.h>
#include <ouroboros/fqueue.h>
-#include <ouroboros/qoscube.h>
#include <stdlib.h>
#include <string.h>
@@ -56,6 +60,8 @@
#define NO_PART -1
#define DONE_PART -2
+#define CRCLEN (sizeof(uint32_t))
+
struct flow_set {
size_t idx;
};
@@ -89,9 +95,8 @@ struct flow {
struct shm_rbuff * rx_rb;
struct shm_rbuff * tx_rb;
struct shm_flow_set * set;
- int port_id;
+ int flow_id;
int oflags;
- qoscube_t cube;
qosspec_t spec;
ssize_t part_idx;
@@ -166,12 +171,12 @@ static void port_set_state(struct port * p,
pthread_mutex_unlock(&p->state_lock);
}
-static enum port_state port_wait_assign(int port_id)
+static enum port_state port_wait_assign(int flow_id)
{
enum port_state state;
struct port * p;
- p = &ai.ports[port_id];
+ p = &ai.ports[flow_id];
pthread_mutex_lock(&p->state_lock);
@@ -230,17 +235,16 @@ static void flow_clear(int fd)
{
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
- ai.flows[fd].port_id = -1;
+ ai.flows[fd].flow_id = -1;
ai.flows[fd].pid = -1;
- ai.flows[fd].cube = QOS_CUBE_BE;
}
static void flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
- if (ai.flows[fd].port_id != -1) {
- port_destroy(&ai.ports[ai.flows[fd].port_id]);
+ if (ai.flows[fd].flow_id != -1) {
+ port_destroy(&ai.ports[ai.flows[fd].flow_id]);
bmp_release(ai.fds, fd);
}
@@ -256,7 +260,7 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL) {
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].port_id,
+ ai.flows[fd].flow_id,
FLOW_DEALLOC);
shm_flow_set_close(ai.flows[fd].set);
}
@@ -267,9 +271,9 @@ static void flow_fini(int fd)
flow_clear(fd);
}
-static int flow_init(int port_id,
+static int flow_init(int flow_id,
pid_t pid,
- qoscube_t qc)
+ qosspec_t qs)
{
int fd;
int err = -ENOMEM;
@@ -282,11 +286,11 @@ static int flow_init(int port_id,
goto fail_fds;
}
- ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, port_id);
+ ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, flow_id);
if (ai.flows[fd].rx_rb == NULL)
goto fail;
- ai.flows[fd].tx_rb = shm_rbuff_open(pid, port_id);
+ ai.flows[fd].tx_rb = shm_rbuff_open(pid, flow_id);
if (ai.flows[fd].tx_rb == NULL)
goto fail;
@@ -294,16 +298,15 @@ static int flow_init(int port_id,
if (ai.flows[fd].set == NULL)
goto fail;
- ai.flows[fd].port_id = port_id;
+ ai.flows[fd].flow_id = flow_id;
ai.flows[fd].oflags = FLOWFDEFAULT;
ai.flows[fd].pid = pid;
- ai.flows[fd].cube = qc;
- ai.flows[fd].spec = qos_cube_to_spec(qc);
ai.flows[fd].part_idx = NO_PART;
+ ai.flows[fd].spec = qs;
- ai.ports[port_id].fd = fd;
+ ai.ports[flow_id].fd = fd;
- port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+ port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED);
pthread_rwlock_unlock(&ai.lock);
@@ -447,7 +450,7 @@ static void fini(void)
pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
- if (ai.flows[i].port_id != -1) {
+ if (ai.flows[i].flow_id != -1) {
ssize_t idx;
shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN);
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
@@ -493,7 +496,6 @@ int flow_accept(qosspec_t * qs,
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg;
int fd;
- qoscube_t qc;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_pid = true;
@@ -521,15 +523,14 @@ int flow_accept(qosspec_t * qs,
return res;
}
- if (!recv_msg->has_pid || !recv_msg->has_port_id ||
- !recv_msg->has_qoscube) {
+ if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
+ recv_msg->qosspec == NULL) {
irm_msg__free_unpacked(recv_msg, NULL);
return -EIRMD;
}
- qc = recv_msg->qoscube;
-
- fd = flow_init(recv_msg->port_id, recv_msg->pid, recv_msg->qoscube);
+ fd = flow_init(recv_msg->flow_id, recv_msg->pid,
+ msg_to_spec(recv_msg->qosspec));
irm_msg__free_unpacked(recv_msg, NULL);
@@ -538,12 +539,10 @@ int flow_accept(qosspec_t * qs,
pthread_rwlock_wrlock(&ai.lock);
- /* FIXME: check if FRCT is needed based on qc? */
-
assert(ai.flows[fd].frcti == NULL);
- if (qc != QOS_CUBE_RAW) {
- ai.flows[fd].frcti = frcti_create(fd, qc);
+ if (ai.flows[fd].spec.in_order != 0) {
+ ai.flows[fd].frcti = frcti_create(fd);
if (ai.flows[fd].frcti == NULL) {
flow_fini(fd);
pthread_rwlock_unlock(&ai.lock);
@@ -563,21 +562,21 @@ int flow_alloc(const char * dst,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- qoscube_t qc = QOS_CUBE_RAW;
- int fd;
-
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.has_qoscube = true;
- msg.pid = ai.pid;
+ irm_msg_t msg = IRM_MSG__INIT;
+ qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT;
+ irm_msg_t * recv_msg;
+ int fd;
+#ifdef QOS_DISABLE_CRC
if (qs != NULL)
- qc = qos_spec_to_cube(*qs);
-
- msg.qoscube = qc;
+ qs->ber = 1;
+#endif
+ msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
+ msg.dst = (char *) dst;
+ msg.has_pid = true;
+ msg.pid = ai.pid;
+ qs_msg = spec_to_msg(qs);
+ msg.qosspec = &qs_msg;
if (timeo != NULL) {
msg.has_timeo_sec = true;
@@ -601,12 +600,13 @@ int flow_alloc(const char * dst,
return res;
}
- if (!recv_msg->has_pid || !recv_msg->has_port_id) {
+ if (!recv_msg->has_pid || !recv_msg->has_flow_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -EIRMD;
}
- fd = flow_init(recv_msg->port_id, recv_msg->pid, qc);
+ fd = flow_init(recv_msg->flow_id, recv_msg->pid,
+ qs == NULL ? qos_raw : *qs);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -617,8 +617,8 @@ int flow_alloc(const char * dst,
assert(ai.flows[fd].frcti == NULL);
- if (qc != QOS_CUBE_RAW) {
- ai.flows[fd].frcti = frcti_create(fd, qc);
+ if (ai.flows[fd].spec.in_order != 0) {
+ ai.flows[fd].frcti = frcti_create(fd);
if (ai.flows[fd].frcti == NULL) {
flow_fini(fd);
pthread_rwlock_unlock(&ai.lock);
@@ -640,15 +640,15 @@ int flow_dealloc(int fd)
return -EINVAL;
msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_port_id = true;
+ msg.has_flow_id = true;
msg.has_pid = true;
msg.pid = ai.pid;
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].port_id >= 0);
+ assert(ai.flows[fd].flow_id >= 0);
- msg.port_id = ai.flows[fd].port_id;
+ msg.flow_id = ai.flows[fd].flow_id;
pthread_rwlock_unlock(&ai.lock);
@@ -676,7 +676,6 @@ int fccntl(int fd,
int cmd,
...)
{
- uint16_t sflags;
uint32_t * fflags;
uint16_t * cflags;
va_list l;
@@ -696,7 +695,7 @@ int fccntl(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->port_id < 0) {
+ if (flow->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -768,13 +767,13 @@ int fccntl(int fd,
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->port_id,
+ flow->flow_id,
FLOW_DOWN);
} else {
rx_acl &= ~ACL_FLOWDOWN;
tx_acl &= ~ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->port_id,
+ flow->flow_id,
FLOW_UP);
}
@@ -788,11 +787,6 @@ int fccntl(int fd,
goto einval;
*fflags = flow->oflags;
break;
- case FRCTSFLAGS:
- sflags = (uint16_t) va_arg(l, int);
- if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags))
- goto eperm;
- break;
case FRCTGFLAGS:
cflags = (uint16_t *) va_arg(l, int *);
if (cflags == NULL)
@@ -824,16 +818,40 @@ int fccntl(int fd,
return -EPERM;
}
+static int chk_crc(struct shm_du_buff * sdb)
+{
+ uint32_t crc;
+ uint8_t * head = shm_du_buff_head(sdb);
+ uint8_t * tail = shm_du_buff_tail_release(sdb, CRCLEN);
+
+ mem_hash(HASH_CRC32, &crc, head, tail - head);
+
+ return !(crc == *((uint32_t *) tail));
+}
+
+static int add_crc(struct shm_du_buff * sdb)
+{
+ uint8_t * head = shm_du_buff_head(sdb);
+ uint8_t * tail = shm_du_buff_tail_alloc(sdb, CRCLEN);
+ if (tail == NULL)
+ return -1;
+
+ mem_hash(HASH_CRC32, tail, head, tail - head);
+
+ return 0;
+}
+
ssize_t flow_write(int fd,
const void * buf,
size_t count)
{
- struct flow * flow;
- ssize_t idx;
- int ret;
- int flags;
- struct timespec abs;
- struct timespec * abstime = NULL;
+ struct flow * flow;
+ ssize_t idx;
+ int ret;
+ int flags;
+ struct timespec abs;
+ struct timespec * abstime = NULL;
+ struct shm_du_buff * sdb;
if (buf == NULL)
return 0;
@@ -847,7 +865,7 @@ ssize_t flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->port_id < 0) {
+ if (flow->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -880,18 +898,25 @@ ssize_t flow_write(int fd,
if (idx < 0)
return idx;
- if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) {
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+ if (frcti_snd(flow->frcti, sdb) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -ENOMEM;
+ }
+
+ if (flow->spec.ber == 0 && add_crc(sdb) != 0) {
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
pthread_rwlock_rdlock(&ai.lock);
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ ret = shm_rbuff_write(flow->tx_rb, idx);
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -906,7 +931,7 @@ ssize_t flow_read(int fd,
{
ssize_t idx;
ssize_t n;
- uint8_t * sdu;
+ uint8_t * packet;
struct shm_rbuff * rb;
struct shm_du_buff * sdb;
struct timespec abs;
@@ -929,7 +954,7 @@ ssize_t flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->port_id < 0) {
+ if (flow->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -955,23 +980,25 @@ ssize_t flow_read(int fd,
if (idx < 0)
return idx;
sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ if (flow->spec.ber == 0 && chk_crc(sdb) != 0)
+ continue;
} while (frcti_rcv(flow->frcti, sdb) != 0);
}
}
- n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
+ n = shm_rdrbuff_read(&packet, ai.rdrb, idx);
assert(n >= 0);
if (n <= (ssize_t) count) {
- memcpy(buf, sdu, n);
+ memcpy(buf, packet, n);
shm_rdrbuff_remove(ai.rdrb, idx);
flow->part_idx = (partrd && n == (ssize_t) count) ?
DONE_PART : NO_PART;
return n;
} else {
if (partrd) {
- memcpy(buf, sdu, count);
+ memcpy(buf, packet, count);
sdb = shm_rdrbuff_get(ai.rdrb, idx);
shm_du_buff_head_release(sdb, n);
flow->part_idx = idx;
@@ -1053,7 +1080,7 @@ int fset_add(struct flow_set * set,
int fd)
{
int ret;
- size_t sdus;
+ size_t packets;
size_t i;
if (set == NULL || fd < 0 || fd > SYS_MAX_FLOWS)
@@ -1061,11 +1088,11 @@ int fset_add(struct flow_set * set,
pthread_rwlock_wrlock(&ai.lock);
- ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id);
+ ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);
- sdus = shm_rbuff_queued(ai.flows[fd].rx_rb);
- for (i = 0; i < sdus; i++)
- shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT);
+ packets = shm_rbuff_queued(ai.flows[fd].rx_rb);
+ for (i = 0; i < packets; i++)
+ shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1080,8 +1107,8 @@ void fset_del(struct flow_set * set,
pthread_rwlock_wrlock(&ai.lock);
- if (ai.flows[fd].port_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id);
+ if (ai.flows[fd].flow_id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id);
pthread_rwlock_unlock(&ai.lock);
}
@@ -1096,12 +1123,12 @@ bool fset_has(const struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (ai.flows[fd].flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return false;
}
- ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1);
+ ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1);
pthread_rwlock_unlock(&ai.lock);
@@ -1177,35 +1204,35 @@ int fevent(struct flow_set * set,
/* ipcp-dev functions. */
int np1_flow_alloc(pid_t n_pid,
- int port_id,
- qoscube_t qc)
+ int flow_id,
+ qosspec_t qs)
{
- return flow_init(port_id, n_pid, qc);
+ return flow_init(flow_id, n_pid, qs);
}
-int np1_flow_dealloc(int port_id)
+int np1_flow_dealloc(int flow_id)
{
int fd;
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[port_id].fd;
+ fd = ai.ports[flow_id].fd;
pthread_rwlock_unlock(&ai.lock);
return fd;
}
-int np1_flow_resp(int port_id)
+int np1_flow_resp(int flow_id)
{
int fd;
- if (port_wait_assign(port_id) != PORT_ID_ASSIGNED)
+ if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED)
return -1;
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[port_id].fd;
+ fd = ai.ports[flow_id].fd;
pthread_rwlock_unlock(&ai.lock);
@@ -1243,29 +1270,29 @@ int ipcp_create_r(pid_t pid,
int ipcp_flow_req_arr(pid_t pid,
const uint8_t * dst,
size_t len,
- qoscube_t qc)
+ qosspec_t qs)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg;
+ qosspec_msg_t qs_msg;
+ int fd;
assert(dst != NULL);
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_pid = true;
- msg.pid = pid;
- msg.has_hash = true;
- msg.hash.len = len;
- msg.hash.data = (uint8_t *) dst;
- msg.has_qoscube = true;
- msg.qoscube = qc;
+ msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg.has_pid = true;
+ msg.pid = pid;
+ msg.has_hash = true;
+ msg.hash.len = len;
+ msg.hash.data = (uint8_t *) dst;
+ qs_msg = spec_to_msg(&qs);
+ msg.qosspec = &qs_msg;
recv_msg = send_recv_irm_msg(&msg);
-
if (recv_msg == NULL)
return -EIRMD;
- if (!recv_msg->has_port_id || !recv_msg->has_pid) {
+ if (!recv_msg->has_flow_id || !recv_msg->has_pid) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -1275,7 +1302,7 @@ int ipcp_flow_req_arr(pid_t pid,
return -1;
}
- fd = flow_init(recv_msg->port_id, recv_msg->pid, qc);
+ fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -1292,11 +1319,11 @@ int ipcp_flow_alloc_reply(int fd,
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg.has_port_id = true;
+ msg.has_flow_id = true;
pthread_rwlock_rdlock(&ai.lock);
- msg.port_id = ai.flows[fd].port_id;
+ msg.flow_id = ai.flows[fd].flow_id;
pthread_rwlock_unlock(&ai.lock);
@@ -1333,7 +1360,7 @@ int ipcp_flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(flow->port_id >= 0);
+ assert(flow->flow_id >= 0);
rb = flow->rx_rb;
@@ -1352,6 +1379,8 @@ int ipcp_flow_read(int fd,
if (idx < 0)
return idx;
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ if (flow->spec.ber == 0 && chk_crc(*sdb) != 0)
+ continue;
} while (frcti_rcv(flow->frcti, *sdb) != 0);
return 0;
@@ -1371,7 +1400,7 @@ int ipcp_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(flow->port_id >= 0);
+ assert(flow->flow_id >= 0);
if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
pthread_rwlock_unlock(&ai.lock);
@@ -1387,9 +1416,15 @@ int ipcp_flow_write(int fd,
return -ENOMEM;
}
+ if (flow->spec.ber == 0 && add_crc(sdb) != 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -ENOMEM;
+ }
+
ret = shm_rbuff_write(flow->tx_rb, idx);
if (ret == 0)
- shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1435,7 +1470,7 @@ void ipcp_flow_fini(int fd)
shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].port_id,
+ ai.flows[fd].flow_id,
FLOW_DEALLOC);
rx_rb = ai.flows[fd].rx_rb;
@@ -1455,9 +1490,9 @@ int ipcp_flow_get_qoscube(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].port_id >= 0);
+ assert(ai.flows[fd].flow_id >= 0);
- *cube = ai.flows[fd].cube;
+ *cube = qos_spec_to_cube(ai.flows[fd].spec);
pthread_rwlock_unlock(&ai.lock);
@@ -1491,14 +1526,14 @@ int local_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->port_id < 0) {
+ if (flow->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
ret = shm_rbuff_write(flow->tx_rb, idx);
if (ret == 0)
- shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);