summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/irmd/main.c40
-rw-r--r--src/lib/shm_rbuff.c10
-rw-r--r--src/tools/cbr/cbr_server.c2
3 files changed, 44 insertions, 8 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 13bfa052..d8cb27fa 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -30,6 +30,7 @@
#include <ouroboros/utils.h>
#include <ouroboros/irm_config.h>
#include <ouroboros/lockfile.h>
+#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rbuff.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/bitmap.h>
@@ -103,6 +104,18 @@ struct irm {
pthread_t shm_sanitize;
} * irmd;
+static void clear_irm_flow(struct irm_flow * f) {
+ ssize_t idx;
+
+ assert(f);
+
+ while ((idx = shm_rbuff_read(f->n_rb)) >= 0)
+ shm_rdrbuff_remove(irmd->rdrb, idx);
+
+ while ((idx = shm_rbuff_read(f->n_1_rb)) >= 0)
+ shm_rdrbuff_remove(irmd->rdrb, idx);
+}
+
static struct irm_flow * get_irm_flow(int port_id)
{
struct list_head * pos = NULL;
@@ -1170,6 +1183,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
list_del(&f->next);
+ clear_irm_flow(f);
bmp_release(irmd->port_ids, f->port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1259,6 +1273,7 @@ static int flow_dealloc(pid_t api, int port_id)
if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) {
list_del(&f->next);
+ clear_irm_flow(f);
irm_flow_destroy(f);
bmp_release(irmd->port_ids, port_id);
LOG_INFO("Completed deallocation of port_id %d by AP-I %d.",
@@ -1458,6 +1473,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
if (e == NULL) {
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
+ clear_irm_flow(f);
bmp_release(irmd->port_ids, f->port_id);
list_del(&f->next);
pthread_rwlock_unlock(&irmd->flows_lock);
@@ -1525,6 +1541,7 @@ static void irm_destroy(void)
list_for_each_safe(p, h, &irmd->irm_flows) {
struct irm_flow * f = list_entry(p, struct irm_flow, next);
list_del(&f->next);
+ clear_irm_flow(f);
irm_flow_destroy(f);
}
@@ -1734,28 +1751,41 @@ void * irm_sanitize(void * o)
if (irm_flow_get_state(f) == FLOW_ALLOC_PENDING
&& ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
- bmp_release(irmd->port_ids, f->port_id);
list_del(&f->next);
LOG_DBG("Pending port_id %d timed out.",
f->port_id);
+ clear_irm_flow(f);
ipcp_flow_dealloc(f->n_1_api, f->port_id);
+ bmp_release(irmd->port_ids, f->port_id);
irm_flow_destroy(f);
continue;
}
if (kill(f->n_api, 0) < 0) {
- bmp_release(irmd->port_ids, f->port_id);
- list_del(&f->next);
+ struct shm_flow_set * set;
LOG_DBG("AP-I %d gone, flow %d deallocated.",
f->n_api, f->port_id);
+ set = shm_flow_set_open(f->n_api);
+ if (set != NULL)
+ shm_flow_set_destroy(set);
+ f->n_api = -1;
+ irm_flow_set_state(f, FLOW_DEALLOC_PENDING);
ipcp_flow_dealloc(f->n_1_api, f->port_id);
- irm_flow_destroy(f);
+ clear_irm_flow(f);
continue;
}
+
if (kill(f->n_1_api, 0) < 0) {
+ struct shm_flow_set * set;
list_del(&f->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
f->n_1_api, f->port_id);
+ set = shm_flow_set_open(f->n_api);
+ if (set != NULL)
+ shm_flow_set_destroy(set);
+
+ clear_irm_flow(f);
+ bmp_release(irmd->port_ids, f->port_id);
irm_flow_destroy(f);
}
}
@@ -2064,7 +2094,6 @@ static int irm_create(void)
if (kill(lockfile_owner(irmd->lf), 0) < 0) {
LOG_INFO("IRMd didn't properly shut down last time.");
- /* FIXME: do this for each QOS_CUBE in the system */
shm_rdrbuff_destroy(shm_rdrbuff_open());
LOG_INFO("Stale resources cleaned");
lockfile_destroy(irmd->lf);
@@ -2083,7 +2112,6 @@ static int irm_create(void)
return -1;
}
- /* FIXME: create an rdrb for each QOS_CUBE in the system */
if ((irmd->rdrb = shm_rdrbuff_create()) == NULL) {
irm_destroy();
return -1;
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
index 1e97364c..5d6d30c7 100644
--- a/src/lib/shm_rbuff.c
+++ b/src/lib/shm_rbuff.c
@@ -224,9 +224,15 @@ void shm_rbuff_destroy(struct shm_rbuff * rb)
{
char fn[FN_MAX_CHARS];
- if (rb == NULL)
- return;
+ assert(rb);
+
+#ifdef CONFIG_OUROBOROS_DEBUG
+ pthread_mutex_lock(rb->lock);
+ assert(shm_rbuff_empty(rb));
+
+ pthread_mutex_unlock(rb->lock);
+#endif
sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id);
if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c
index eb45a2c6..52c2aa73 100644
--- a/src/tools/cbr/cbr_server.c
+++ b/src/tools/cbr/cbr_server.c
@@ -121,6 +121,8 @@ void handle_flow(int fd)
ts_add(&iv_start, &intv, &iv_end);
}
}
+
+ flow_dealloc(fd);
}
void * worker(void * o)