summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ipcpd/shim-udp/main.c2
-rw-r--r--src/irmd/main.c66
2 files changed, 61 insertions, 7 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index f583a998..186f0ebc 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -910,7 +910,7 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
SO_REUSEADDR,
&enable,
sizeof(int)) < 0)
- LOG_WARN("Setsockopt(SO_REUSEADDR) failed.");
+ LOG_WARN("Failed to set SO_REUSEADDR.");
memset((char *) &s_saddr, 0, sizeof(s_saddr));
shim_data(_ipcp)->s_saddr.sin_family = AF_INET;
diff --git a/src/irmd/main.c b/src/irmd/main.c
index db96b6ed..c20d63db 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -37,6 +37,7 @@
#include <ouroboros/flow.h>
#include <ouroboros/qos.h>
#include <ouroboros/rw_lock.h>
+#include <ouroboros/time_utils.h>
#include "utils.h"
@@ -50,13 +51,19 @@
#include <pthread.h>
#ifndef IRMD_MAX_FLOWS
- #define IRMD_MAX_FLOWS 4096
+#define IRMD_MAX_FLOWS 4096
#endif
#ifndef IRMD_THREADPOOL_SIZE
- #define IRMD_THREADPOOL_SIZE 3
+#define IRMD_THREADPOOL_SIZE 3
#endif
+#ifndef IRMD_FLOW_TIMEOUT
+#define IRMD_FLOW_TIMEOUT 5000 /* ms */
+#endif
+
+#define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */
+
struct ipcp_entry {
struct list_head next;
instance_name_t * api;
@@ -98,6 +105,7 @@ struct port_map_entry {
pthread_mutex_t res_lock;
enum flow_state state;
+ struct timespec t0;
};
struct irm {
@@ -116,6 +124,8 @@ struct irm {
pthread_t * threadpool;
int sockfd;
rw_lock_t state_lock;
+
+ pthread_t cleanup_flows;
} * instance = NULL;
static struct port_map_entry * port_map_entry_create()
@@ -139,6 +149,9 @@ static struct port_map_entry * port_map_entry_create()
return NULL;
}
+ e->t0.tv_sec = 0;
+ e->t0.tv_nsec = 0;
+
return e;
}
@@ -945,6 +958,8 @@ static struct port_map_entry * flow_alloc(pid_t pid,
pme->n_pid = pid;
pme->state = FLOW_PENDING;
+ if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)
+ LOG_WARN("Failed to set timestamp.");
rw_lock_rdlock(&instance->state_lock);
rw_lock_rdlock(&instance->reg_lock);
@@ -1314,6 +1329,8 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)
}
+ pthread_cancel(instance->cleanup_flows);
+
rw_lock_unlock(&instance->state_lock);
case SIGPIPE:
@@ -1323,6 +1340,43 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)
}
}
+void * irm_flow_cleaner()
+{
+ struct timespec now;
+ struct list_head * pos = NULL;
+ struct list_head * n = NULL;
+ struct timespec timeout = {IRMD_CLEANUP_TIMER / BILLION,
+ IRMD_CLEANUP_TIMER % BILLION};
+
+ while (true) {
+ if(clock_gettime(CLOCK_MONOTONIC, &now) < 0)
+ LOG_WARN("Failed to get time.");
+ /* cleanup stale PENDING flows */
+ rw_lock_rdlock(&instance->state_lock);
+
+ list_for_each_safe(pos, n, &(instance->port_map)) {
+ struct port_map_entry * e =
+ list_entry(pos, struct port_map_entry, next);
+
+ pthread_mutex_lock(&e->res_lock);
+
+ if (e->state == FLOW_PENDING &&
+ ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) {
+ LOG_DBGF("Flow time exceeded on port ID %d.",
+ e->port_id);
+ e->state = FLOW_NULL;
+ pthread_cond_broadcast(&e->res_signal);
+ }
+
+ pthread_mutex_unlock(&e->res_lock);
+ }
+
+ rw_lock_unlock(&instance->state_lock);
+
+ nanosleep(&timeout, NULL);
+ }
+}
+
void * mainloop()
{
uint8_t buf[IRM_MSG_BUF_SIZE];
@@ -1579,10 +1633,8 @@ int main()
if (instance == NULL)
return 1;
- /*
- * FIXME: we need a main loop that delegates messages to subthreads in a
- * way that avoids all possible deadlocks for local apps
- */
+
+ pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL);
for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
pthread_create(&instance->threadpool[t], NULL, mainloop, NULL);
@@ -1591,6 +1643,8 @@ int main()
for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
pthread_join(instance->threadpool[t], NULL);
+ pthread_join(instance->cleanup_flows, NULL);
+
irm_destroy(instance);
return 0;