summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c71
1 files changed, 50 insertions, 21 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index efd08146..df616ead 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1017,18 +1017,22 @@ ssize_t flow_write(int fd,
memcpy(ptr, buf, count);
+ pthread_rwlock_wrlock(&ai.lock);
+
if (frcti_snd(flow->frcti, sdb) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
- pthread_rwlock_wrlock(&ai.lock);
- if (flow->qs.cypher_s > 0)
+ if (flow->qs.cypher_s > 0) {
if (crypt_encrypt(flow, sdb) < 0) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
+ }
+
pthread_rwlock_unlock(&ai.lock);
if (flow->qs.ber == 0 && add_crc(sdb) != 0) {
@@ -1097,12 +1101,12 @@ ssize_t flow_read(int fd,
abstime = &abs;
}
- pthread_rwlock_unlock(&ai.lock);
-
idx = flow->part_idx;
+
if (idx < 0) {
- idx = frcti_queued_pdu(flow->frcti);
- while (idx < 0) {
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+
idx = noblock ? shm_rbuff_read(rb) :
shm_rbuff_read_b(rb, abstime);
if (idx < 0)
@@ -1110,20 +1114,28 @@ ssize_t flow_read(int fd,
sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
+ pthread_rwlock_rdlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
continue;
}
+ pthread_rwlock_rdlock(&ai.lock);
+
if (flow->qs.cypher_s > 0
&& crypt_decrypt(flow, sdb) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
- idx = frcti_rcv(flow->frcti, sdb);
+ frcti_rcv(flow->frcti, sdb);
}
}
+ frcti_tick(flow->frcti);
+
+ pthread_rwlock_unlock(&ai.lock);
+
n = shm_rdrbuff_read(&packet, ai.rdrb, idx);
assert(n >= 0);
@@ -1144,7 +1156,9 @@ ssize_t flow_read(int fd,
memcpy(buf, packet, count);
sdb = shm_rdrbuff_get(ai.rdrb, idx);
shm_du_buff_head_release(sdb, n);
+ pthread_rwlock_wrlock(&ai.lock);
flow->part_idx = idx;
+ pthread_rwlock_unlock(&ai.lock);
return count;
} else {
shm_rdrbuff_remove(ai.rdrb, idx);
@@ -1295,6 +1309,11 @@ int fqueue_next(struct fqueue * fq)
pthread_rwlock_rdlock(&ai.lock);
+ if (fq->next != 0 && frcti_filter(fq) == 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -EPERM;
+ }
+
fd = ai.ports[fq->fqueue[fq->next]].fd;
fq->next += 2;
@@ -1319,7 +1338,7 @@ ssize_t fevent(struct flow_set * set,
struct fqueue * fq,
const struct timespec * timeo)
{
- ssize_t ret;
+ ssize_t ret = 0;
struct timespec abstime;
struct timespec * t = NULL;
@@ -1335,18 +1354,22 @@ ssize_t fevent(struct flow_set * set,
t = &abstime;
}
- ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
- if (ret == -ETIMEDOUT) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
- }
+ while (ret == 0) {
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
+ if (ret == -ETIMEDOUT) {
+ fq->fqsize = 0;
+ return -ETIMEDOUT;
+ }
- fq->fqsize = ret << 1;
- fq->next = 0;
+ fq->fqsize = ret << 1;
+ fq->next = 0;
+
+ ret = frcti_filter(fq);
+ }
assert(ret);
- return ret;
+ return 1;
}
/* ipcp-dev functions. */
@@ -1509,7 +1532,7 @@ int ipcp_flow_read(int fd,
{
struct flow * flow;
struct shm_rbuff * rb;
- ssize_t idx;
+ ssize_t idx = -1;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(sdb);
@@ -1522,20 +1545,26 @@ int ipcp_flow_read(int fd,
rb = flow->rx_rb;
- pthread_rwlock_unlock(&ai.lock);
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
- idx = frcti_queued_pdu(flow->frcti);
- while (idx < 0) {
idx = shm_rbuff_read(rb);
if (idx < 0)
return idx;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(*sdb) != 0)
continue;
- idx = frcti_rcv(flow->frcti, *sdb);
+ frcti_rcv(flow->frcti, *sdb);
}
+ frcti_tick(flow->frcti);
+
+ pthread_rwlock_unlock(&ai.lock);
+
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
return 0;