summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c206
1 files changed, 128 insertions, 78 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1310afd8..9b41e5d4 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -60,7 +60,7 @@ struct flow_set {
};
struct fqueue {
- int fqueue[SHM_BUFFER_SIZE]; /* safe copy from shm */
+ int fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
size_t fqsize;
size_t next;
};
@@ -236,7 +236,7 @@ static int api_announce(char * ap_name)
return ret;
}
-/* Call under flows lock */
+/* Call under flows lock. */
static int finalize_write(int fd,
size_t idx)
{
@@ -264,8 +264,6 @@ static int frcti_init(int fd)
frcti->rcv_lwe = 0;
frcti->rcv_rwe = 0;
- frcti->conf_flags = CONF_ERROR_CHECK;
-
return 0;
}
@@ -284,25 +282,12 @@ static void frcti_fini(int fd)
frcti_clear(fd);
}
-static int frcti_configure(int fd,
- qosspec_t * qos)
+static int frcti_send(int fd,
+ struct frct_pci * pci,
+ struct shm_du_buff * sdb)
{
- /* FIXME: Send configuration message here to other side. */
-
- (void) fd;
- (void) qos;
-
- return 0;
-}
-
-static int frcti_write(int fd,
- struct shm_du_buff * sdb)
-{
- struct frcti * frcti;
- struct frct_pci pci;
struct timespec now = {0, 0};
-
- memset(&pci, 0, sizeof(pci));
+ struct frcti * frcti;
frcti = &(ai.frcti[fd]);
@@ -316,16 +301,15 @@ static int frcti_write(int fd,
/* Set the DRF in the first packet of a new run of SDUs. */
if (frcti->snd_drf) {
- pci.flags |= FLAG_DATA_RUN;
+ pci->flags |= FLAG_DATA_RUN;
frcti->snd_drf = false;
}
frcti->last_snd = now;
- pci.seqno = frcti->snd_lwe++;
- pci.type |= PDU_TYPE_DATA;
+ pci->seqno = frcti->snd_lwe++;
- if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) {
+ if (frct_pci_ser(sdb, pci, frcti->conf_flags & CONF_ERROR_CHECK)) {
pthread_rwlock_unlock(&ai.lock);
return -1;
}
@@ -340,6 +324,65 @@ static int frcti_write(int fd,
return 0;
}
+
+static int frcti_configure(int fd,
+ qosspec_t * qos)
+{
+ struct frcti * frcti;
+ struct frct_pci pci;
+ struct shm_du_buff * sdb;
+
+ frcti = &(ai.frcti[fd]);
+
+ memset(&pci, 0, sizeof(pci));
+
+ if (qos == NULL)
+ return 0;
+
+ if (ipcp_sdb_reserve(&sdb, 0))
+ return -1;
+
+ if (qos->resource_control)
+ pci.conf_flags |= CONF_RESOURCE_CONTROL;
+ if (qos->reliable)
+ pci.conf_flags |= CONF_RELIABLE;
+ if (qos->error_check)
+ pci.conf_flags |= CONF_ERROR_CHECK;
+ if (qos->ordered)
+ pci.conf_flags |= CONF_ORDERED;
+ if (qos->partial)
+ pci.conf_flags |= CONF_PARTIAL;
+
+ /* Always set the DRF on a configure message. */
+ pci.flags |= FLAG_DATA_RUN;
+ pci.type |= PDU_TYPE_CONFIG;
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ frcti->conf_flags = pci.conf_flags;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ if (frcti_send(fd, &pci, sdb)) {
+ shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
+ return -1;
+ }
+
+ return 0;
+}
+
+static int frcti_write(int fd,
+ struct shm_du_buff * sdb)
+{
+ struct frct_pci pci;
+
+ memset(&pci, 0, sizeof(pci));
+
+ pci.type |= PDU_TYPE_DATA;
+
+ return frcti_send(fd, &pci, sdb);
+}
+
static ssize_t frcti_read(int fd)
{
ssize_t idx = -1;
@@ -349,69 +392,77 @@ static ssize_t frcti_read(int fd)
struct shm_du_buff * sdb;
struct timespec now = {0, 0};
- pthread_rwlock_rdlock(&ai.lock);
+ do {
+ pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_rbuff_read(ai.flows[fd].rx_rb);
- pthread_rwlock_unlock(&ai.lock);
- } else {
- struct shm_rbuff * rb = ai.flows[fd].rx_rb;
- bool timeo = ai.flows[fd].timesout;
- struct timespec timeout = ai.flows[fd].rcv_timeo;
+ if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+ pthread_rwlock_unlock(&ai.lock);
+ } else {
+ struct shm_rbuff * rb = ai.flows[fd].rx_rb;
+ bool timeo = ai.flows[fd].timesout;
+ struct timespec timeout = ai.flows[fd].rcv_timeo;
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&ai.lock);
- if (timeo) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, &timeout, &abstime);
- idx = shm_rbuff_read_b(rb, &abstime);
- } else {
- idx = shm_rbuff_read_b(rb, NULL);
+ if (timeo) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, &timeout, &abstime);
+ idx = shm_rbuff_read_b(rb, &abstime);
+ } else {
+ idx = shm_rbuff_read_b(rb, NULL);
+ }
}
- }
- if (idx < 0)
- return idx;
+ if (idx < 0)
+ return idx;
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
- frcti = &(ai.frcti[fd]);
+ frcti = &(ai.frcti[fd]);
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
- /* SDU may be corrupted. */
- if (frct_pci_des(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) {
- pthread_rwlock_unlock(&ai.lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
- }
+ /* SDU may be corrupted. */
+ if (frct_pci_des(sdb, &pci,
+ frcti->conf_flags & CONF_ERROR_CHECK)) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -EAGAIN;
+ }
- /* Check if receiver inactivity is true. */
- if (!frcti->rcv_drf && ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
- frcti->rcv_drf = true;
+ /* Check if receiver inactivity is true. */
+ if (!frcti->rcv_drf &&
+ ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
+ frcti->rcv_drf = true;
- /* We don't accept packets when there is receiver inactivity. */
- if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
- pthread_rwlock_unlock(&ai.lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
- }
+ /* We don't accept packets when there is receiver inactivity. */
+ if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
+ pthread_rwlock_unlock(&ai.lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -EAGAIN;
+ }
- /*
- * If there is no receiver inactivity and the DRF is set,
- * reset the state of the connection.
- */
- if (pci.flags & FLAG_DATA_RUN)
- frcti->rcv_lwe = pci.seqno;
+ /* If the DRF is set, reset the state of the connection. */
+ if (pci.flags & FLAG_DATA_RUN)
+ frcti->rcv_lwe = pci.seqno;
- if (frcti->rcv_drf)
- frcti->rcv_drf = false;
+ if (pci.type & PDU_TYPE_CONFIG)
+ frcti->conf_flags = pci.conf_flags;
- frcti->last_rcv = now;
+ if (frcti->rcv_drf)
+ frcti->rcv_drf = false;
- pthread_rwlock_unlock(&ai.lock);
+ frcti->last_rcv = now;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ if (!(pci.type & PDU_TYPE_DATA))
+ shm_rdrbuff_remove(ai.rdrb, idx);
+
+ } while (!(pci.type & PDU_TYPE_DATA));
return idx;
}
@@ -768,15 +819,14 @@ int flow_alloc(const char * dst_name,
frcti_init(fd);
+ pthread_rwlock_unlock(&ai.lock);
+
if (frcti_configure(fd, qs)) {
flow_fini(fd);
bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
return -1;
}
- pthread_rwlock_unlock(&ai.lock);
-
return fd;
}
@@ -977,7 +1027,7 @@ ssize_t flow_write(int fd,
return idx;
}
- } else { /* blocking */
+ } else { /* Blocking. */
pthread_rwlock_unlock(&ai.lock);
idx = shm_rdrbuff_write_b(ai.rdrb,
@@ -1053,7 +1103,7 @@ ssize_t flow_read(int fd,
return n;
}
-/* fqueue functions */
+/* fqueue functions. */
struct flow_set * flow_set_create()
{
@@ -1240,7 +1290,7 @@ int flow_event_wait(struct flow_set * set,
return ret;
}
-/* ipcp-dev functions */
+/* ipcp-dev functions. */
int np1_flow_alloc(pid_t n_api,
int port_id,