summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/dev.c66
-rw-r--r--src/lib/qosspec.proto17
-rw-r--r--src/lib/sockets.c2
3 files changed, 57 insertions, 28 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 4c21fcdf..5c57a538 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -68,7 +68,6 @@
#define SECMEMSZ 16384
#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
-#define FLOWTIMEO 120 /* seconds */
enum port_state {
PORT_NULL = 0,
@@ -123,7 +122,8 @@ struct flow_set_entry {
struct flow_set {
size_t idx;
- struct timespec chk; /* Last keepalive check */
+ struct timespec chk; /* Last keepalive check. */
+ uint32_t min; /* Minimum keepalive time in set. */
struct list_head flows;
pthread_rwlock_t lock;
@@ -1056,6 +1056,7 @@ static int flow_keepalive(int fd)
struct timespec r_act;
struct flow * flow;
int flow_id;
+ uint32_t timeo;
flow = &ai.flows[fd];
@@ -1067,15 +1068,19 @@ static int flow_keepalive(int fd)
r_act = flow->rcv_act;
flow_id = flow->flow_id;
+ timeo = flow->qs.timeout;
pthread_rwlock_unlock(&ai.lock);
- if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) {
- shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT);
- return -EFLOWDOWN;
+ if (timeo == 0)
+ return 0;
+
+ if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
+ shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+ return -EFLOWPEER;
}
- if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION)
+ if (ts_diff_ns(&s_act, &now) > (timeo >> 2) * MILLION)
flow_send_keepalive(fd);
return 0;
@@ -1140,7 +1145,7 @@ ssize_t flow_write(int fd,
return -ETIMEDOUT;
if (flow_keepalive(fd))
- return -EFLOWDOWN;
+ return -EFLOWPEER;
frcti_tick(flow->frcti);
@@ -1165,14 +1170,16 @@ ssize_t flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (count != 0 && frcti_snd(flow->frcti, sdb) < 0)
- goto enomem;
+ if (count > 0) {
+ if (frcti_snd(flow->frcti, sdb) < 0)
+ goto enomem;
- if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
- goto enomem;
+ if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
+ goto enomem;
- if (flow->qs.ber == 0 && add_crc(sdb) != 0)
- goto enomem;
+ if (flow->qs.ber == 0 && add_crc(sdb) != 0)
+ goto enomem;
+ }
pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
@@ -1263,7 +1270,7 @@ ssize_t flow_read(int fd,
return -ETIMEDOUT;
if (flow_keepalive(fd) < 0)
- return -EFLOWDOWN;
+ return -EFLOWPEER;
ts_add(&tictime, &tic, &tictime);
@@ -1360,6 +1367,7 @@ struct flow_set * fset_create()
goto fail_bmp_alloc;
set->chk = now;
+ set->min = UINT32_MAX;
pthread_rwlock_unlock(&ai.lock);
@@ -1437,6 +1445,7 @@ void fset_zero(struct flow_set * set)
int fset_add(struct flow_set * set,
int fd)
{
+ struct flow * flow;
struct flow_set_entry * fse;
int ret;
size_t packets;
@@ -1445,13 +1454,17 @@ int fset_add(struct flow_set * set,
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return -EINVAL;
+ flow = &ai.flows[fd];
+
fse = malloc(sizeof(*fse));
if (fse == NULL)
return -ENOMEM;
- pthread_rwlock_wrlock(&ai.lock);
+ fse->fd = fd;
- if (ai.flows[fd].flow_id < 0) {
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (flow->flow_id < 0) {
ret = -EINVAL;
goto fail;
}
@@ -1462,6 +1475,9 @@ int fset_add(struct flow_set * set,
pthread_rwlock_wrlock(&set->lock);
+ if (flow->qs.timeout != 0 && flow->qs.timeout < set->min)
+ set->min = flow->qs.timeout;
+
list_add_tail(&fse->next, &set->flows);
pthread_rwlock_unlock(&set->lock);
@@ -1485,14 +1501,20 @@ void fset_del(struct flow_set * set,
{
struct list_head * p;
struct list_head * h;
+ struct flow * flow;
+ uint32_t min;
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return;
+ flow = &ai.flows[fd];
+
+ min = UINT32_MAX;
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id);
+ if (flow->flow_id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
pthread_rwlock_wrlock(&set->lock);
@@ -1502,10 +1524,14 @@ void fset_del(struct flow_set * set,
if (e->fd == fd) {
list_del(&e->next);
free(e);
- break;
+ } else {
+ if (flow->qs.timeout != 0 && flow->qs.timeout < min)
+ min = flow->qs.timeout;
}
}
+ set->min = min;
+
pthread_rwlock_unlock(&set->lock);
pthread_rwlock_unlock(&ai.lock);
@@ -1544,7 +1570,7 @@ static void fset_keepalive(struct flow_set * set)
pthread_rwlock_wrlock(&set->lock);
- if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) {
+ if (ts_diff_ns(&now, &set->chk) < set->min >> 2) {
pthread_rwlock_unlock(&set->lock);
return;
}
diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto
index 8a355363..3ceedd87 100644
--- a/src/lib/qosspec.proto
+++ b/src/lib/qosspec.proto
@@ -23,12 +23,13 @@
syntax = "proto2";
message qosspec_msg {
- required uint32 delay = 1; /* In ms */
- required uint64 bandwidth = 2; /* In bits/s */
- required uint32 availability = 3; /* Class of 9s */
- required uint32 loss = 4; /* Packet loss */
- required uint32 ber = 5; /* Bit error rate, ppb */
- required uint32 in_order = 6; /* In-order delivery */
- required uint32 max_gap = 7; /* In ms */
- required uint32 cypher_s = 8; /* Crypto strength in bits */
+ required uint32 delay = 1; /* In ms. */
+ required uint64 bandwidth = 2; /* In bits/s. */
+ required uint32 availability = 3; /* Class of 9s. */
+ required uint32 loss = 4; /* Packet loss. */
+ required uint32 ber = 5; /* Bit error rate, ppb. */
+ required uint32 in_order = 6; /* In-order delivery. */
+ required uint32 max_gap = 7; /* In ms. */
+ required uint32 cypher_s = 8; /* Crypto strength in bits. */
+ required uint32 timeout = 9; /* Timeout in ms. */
};
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 8179d2b3..48e95121 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -181,6 +181,7 @@ qosspec_msg_t spec_to_msg(const qosspec_t * qs)
msg.in_order = spec.in_order;
msg.max_gap = spec.max_gap;
msg.cypher_s = spec.cypher_s;
+ msg.timeout = spec.timeout;
return msg;
}
@@ -199,6 +200,7 @@ qosspec_t msg_to_spec(const qosspec_msg_t * msg)
spec.in_order = msg->in_order;
spec.max_gap = msg->max_gap;
spec.cypher_s = msg->cypher_s;
+ spec.timeout = msg->timeout;
return spec;
}