summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2023-09-20 08:57:15 +0200
committerSander Vrijders <sander@ouroboros.rocks>2023-09-20 18:51:19 +0200
commit2df952d68587a6422f7462eee55c0f42904df4c9 (patch)
tree0b1cb62fd48d27e80e7abea02069c7748d0b0bf5
parent3be8a872c638b81f39596a2fd5e9b0c43533e002 (diff)
downloadouroboros-2df952d68587a6422f7462eee55c0f42904df4c9.tar.gz
ouroboros-2df952d68587a6422f7462eee55c0f42904df4c9.zip
lib: Revise port construct in application
The application had a port construct, which is a leftover from the early days implementing RINA specs, which had a "port_id" to access flows. O7s doesn't really have a "port" concept, only flows. The port_wait_assign function was used in the IPCP to wait for the IRMd to assign the flow_id and return so the flow object could be created. This renames things a bit, and also simplifies the locking to us a single lock/condvar for managing flows. This should be further improved to move the flow state into the flow object, maintain a double mapping of to flow objects (id_to_flow and fd_to_flow) and malloc flow objects at flow allocation, instead of keeping the full table in memory at init to further reduce memory footprint. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--src/lib/dev.c175
1 files changed, 91 insertions, 84 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index da34f420..ef909e1a 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -74,20 +74,18 @@
#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
-enum port_state {
- PORT_NULL = 0,
- PORT_INIT,
- PORT_ID_PENDING,
- PORT_ID_ASSIGNED,
- PORT_DESTROY
+enum flow_state {
+ FLOW_NULL = 0,
+ FLOW_INIT,
+ FLOW_ID_PENDING,
+ FLOW_ID_ASSIGNED,
+ FLOW_DESTROY
};
-struct port {
+/* map flow_ids to flow descriptors; track state of the flow */
+struct fmap {
int fd;
-
- enum port_state state;
- pthread_mutex_t state_lock;
- pthread_cond_t state_cond;
+ enum flow_state state;
};
#define frcti_to_flow(frcti) \
@@ -139,9 +137,12 @@ struct {
struct bmp * fqueues;
struct flow * flows;
- struct port * ports;
+ struct fmap * id_to_fd;
struct list_head flow_list;
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+
pthread_t tx;
pthread_t rx;
size_t n_frcti;
@@ -150,77 +151,81 @@ struct {
pthread_rwlock_t lock;
} ai;
-static void port_destroy(struct port * p)
+static void flow_destroy(struct fmap * p)
{
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_DESTROY) {
- pthread_mutex_unlock(&p->state_lock);
+ if (p->state == FLOW_DESTROY) {
+ pthread_mutex_unlock(&ai.mtx);
return;
}
- if (p->state == PORT_ID_PENDING)
- p->state = PORT_DESTROY;
+ if (p->state == FLOW_ID_PENDING)
+ p->state = FLOW_DESTROY;
else
- p->state = PORT_NULL;
+ p->state = FLOW_NULL;
- pthread_cond_signal(&p->state_cond);
+ pthread_cond_signal(&ai.cond);
- while (p->state != PORT_NULL)
- pthread_cond_wait(&p->state_cond, &p->state_lock);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
+
+ while (p->state != FLOW_NULL)
+ pthread_cond_wait(&ai.cond, &ai.mtx);
p->fd = -1;
- p->state = PORT_INIT;
+ p->state = FLOW_INIT;
- pthread_mutex_unlock(&p->state_lock);
+ pthread_cleanup_pop(true);
}
-static void port_set_state(struct port * p,
- enum port_state state)
+static void flow_set_state(struct fmap * p,
+ enum flow_state state)
{
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_DESTROY) {
- pthread_mutex_unlock(&p->state_lock);
+ if (p->state == FLOW_DESTROY) {
+ pthread_mutex_unlock(&ai.mtx);
return;
}
p->state = state;
- pthread_cond_broadcast(&p->state_cond);
+ pthread_cond_broadcast(&ai.cond);
- pthread_mutex_unlock(&p->state_lock);
+ pthread_mutex_unlock(&ai.mtx);
}
-static enum port_state port_wait_assign(int flow_id)
+static enum flow_state flow_wait_assign(int flow_id)
{
- enum port_state state;
- struct port * p;
+ enum flow_state state;
+ struct fmap * p;
- p = &ai.ports[flow_id];
+ p = &ai.id_to_fd[flow_id];
- pthread_mutex_lock(&p->state_lock);
+ pthread_mutex_lock(&ai.mtx);
- if (p->state == PORT_ID_ASSIGNED) {
- pthread_mutex_unlock(&p->state_lock);
- return PORT_ID_ASSIGNED;
+ if (p->state == FLOW_ID_ASSIGNED) {
+ pthread_mutex_unlock(&ai.mtx);
+ return FLOW_ID_ASSIGNED;
}
- if (p->state == PORT_INIT)
- p->state = PORT_ID_PENDING;
+ if (p->state == FLOW_INIT)
+ p->state = FLOW_ID_PENDING;
- while (p->state == PORT_ID_PENDING)
- pthread_cond_wait(&p->state_cond, &p->state_lock);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx);
- if (p->state == PORT_DESTROY) {
- p->state = PORT_NULL;
- pthread_cond_broadcast(&p->state_cond);
+ while (p->state == FLOW_ID_PENDING)
+ pthread_cond_wait(&ai.cond, &ai.mtx);
+
+ if (p->state == FLOW_DESTROY) {
+ p->state = FLOW_NULL;
+ pthread_cond_broadcast(&ai.cond);
}
state = p->state;
- assert(state != PORT_INIT);
+ pthread_cleanup_pop(true);
- pthread_mutex_unlock(&p->state_lock);
+ assert(state != FLOW_INIT);
return state;
}
@@ -403,7 +408,7 @@ static void flow_fini(int fd)
}
if (ai.flows[fd].flow_id != -1) {
- port_destroy(&ai.ports[ai.flows[fd].flow_id]);
+ flow_destroy(&ai.id_to_fd[ai.flows[fd].flow_id]);
bmp_release(ai.fds, fd);
}
@@ -500,9 +505,9 @@ static int flow_init(int flow_id,
list_add_tail(&flow->next, &ai.flow_list);
- ai.ports[flow_id].fd = fd;
+ ai.id_to_fd[flow_id].fd = fd;
- port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED);
+ flow_set_state(&ai.id_to_fd[flow_id], FLOW_ID_ASSIGNED);
pthread_rwlock_unlock(&ai.lock);
@@ -592,30 +597,34 @@ static void init(int argc,
}
ai.flows = malloc(sizeof(*ai.flows) * PROG_MAX_FLOWS);
- if (ai.flows == NULL)
+ if (ai.flows == NULL) {
+ fprintf(stderr, "FATAL: Could not allocate flows.");
goto fail_flows;
+ }
for (i = 0; i < PROG_MAX_FLOWS; ++i)
flow_clear(i);
- ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);
- if (ai.ports == NULL)
- goto fail_ports;
+ ai.id_to_fd = malloc(sizeof(*ai.id_to_fd) * SYS_MAX_FLOWS);
+ if (ai.id_to_fd == NULL) {
+ fprintf(stderr, "FATAL: Could not allocate id_to_fd.");
+ goto fail_id_to_fd;
+ }
+
+ for (i = 0; i < SYS_MAX_FLOWS; ++i)
+ ai.id_to_fd[i].state = FLOW_INIT;
- for (i = 0; i < SYS_MAX_FLOWS; ++i) {
- ai.ports[i].state = PORT_INIT;
- if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) {
- fprintf(stderr, "FATAL: Could not init lock %d.", i);
- goto fail_flow_lock;
- }
- if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) {
- pthread_mutex_destroy(&ai.ports[i].state_lock);
- fprintf(stderr, "FATAL: Could not init cond %d.", i);
- goto fail_flow_lock;
- }
- } /* Do not reuse i after this ! */
+ if (pthread_mutex_init(&ai.mtx, NULL)) {
+ fprintf(stderr, "FATAL: Could not init mutex.");
+ goto fail_mtx;
+ }
- if (pthread_rwlock_init(&ai.lock, NULL)) {
+ if (pthread_cond_init(&ai.cond, NULL) < 0) {
+ fprintf(stderr, "FATAL: Could not init condvar.");
+ goto fail_cond;
+ }
+
+ if (pthread_rwlock_init(&ai.lock, NULL) < 0) {
fprintf(stderr, "FATAL: Could not initialize flow lock");
goto fail_flow_lock;
}
@@ -668,12 +677,12 @@ static void init(int argc,
fail_fqset:
pthread_rwlock_destroy(&ai.lock);
fail_flow_lock:
- while (i-- > 0) {
- pthread_mutex_destroy(&ai.ports[i].state_lock);
- pthread_cond_destroy(&ai.ports[i].state_cond);
- }
- free(ai.ports);
- fail_ports:
+ pthread_cond_destroy(&ai.cond);
+ fail_cond:
+ pthread_mutex_destroy(&ai.mtx);
+ fail_mtx:
+ free(ai.id_to_fd);
+ fail_id_to_fd:
free(ai.flows);
fail_flows:
shm_rdrbuff_close(ai.rdrb);
@@ -709,10 +718,8 @@ static void fini(void)
}
}
- for (i = 0; i < SYS_MAX_FLOWS; ++i) {
- pthread_mutex_destroy(&ai.ports[i].state_lock);
- pthread_cond_destroy(&ai.ports[i].state_cond);
- }
+ pthread_cond_destroy(&ai.cond);
+ pthread_mutex_destroy(&ai.mtx);
pthread_rwlock_unlock(&ai.lock);
@@ -729,7 +736,7 @@ static void fini(void)
pthread_rwlock_destroy(&ai.lock);
free(ai.flows);
- free(ai.ports);
+ free(ai.id_to_fd);
shm_rdrbuff_close(ai.rdrb);
@@ -1701,7 +1708,7 @@ static int fqueue_filter(struct fqueue * fq)
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[fq->fqueue[fq->next].flow_id].fd;
+ fd = ai.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
if (fd < 0) {
++fq->next;
pthread_rwlock_unlock(&ai.lock);
@@ -1762,7 +1769,7 @@ int fqueue_next(struct fqueue * fq)
e = fq->fqueue + fq->next;
- fd = ai.ports[e->flow_id].fd;
+ fd = ai.id_to_fd[e->flow_id].fd;
++fq->next;
@@ -1842,7 +1849,7 @@ int np1_flow_dealloc(int flow_id,
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[flow_id].fd;
+ fd = ai.id_to_fd[flow_id].fd;
pthread_rwlock_unlock(&ai.lock);
@@ -1853,12 +1860,12 @@ int np1_flow_resp(int flow_id)
{
int fd;
- if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED)
+ if (flow_wait_assign(flow_id) != FLOW_ID_ASSIGNED)
return -1;
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[flow_id].fd;
+ fd = ai.id_to_fd[flow_id].fd;
pthread_rwlock_unlock(&ai.lock);