summaryrefslogtreecommitdiff
path: root/src/irmd
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-09-18 06:27:43 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-04 15:16:00 +0200
commitc96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (patch)
treeacd08f09f5a094e897020e97961b2847209df043 /src/irmd
parent2e561a634ae3e747b293a4e05eaf44726968dc1a (diff)
downloadouroboros-c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f.tar.gz
ouroboros-c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f.zip
lib, ipcp: Revise fast path and flow interfaces
IPCPs can now use ap_init() to initialize the memory. All flows are accessed using flow descriptors, this greatly simplifies IPCP development. Reverts the fast path to a single ap_rbuff per process. Splits lib/ipcp into irmd/ipcp and lib/ipcp-dev. Adds a lib/shim-dev holding tailored functions for shims. Moves the buffer_t to utils.h. Fixes the shim-eth-llc length field. Removes the flow from shared.h. Fixes #4 Fixes #5
Diffstat (limited to 'src/irmd')
-rw-r--r--src/irmd/CMakeLists.txt1
-rw-r--r--src/irmd/ipcp.c402
-rw-r--r--src/irmd/ipcp.h62
-rw-r--r--src/irmd/irm_flow.c47
-rw-r--r--src/irmd/irm_flow.h18
-rw-r--r--src/irmd/main.c176
-rw-r--r--src/irmd/utils.h3
7 files changed, 582 insertions, 127 deletions
diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt
index 05919326..16b53414 100644
--- a/src/irmd/CMakeLists.txt
+++ b/src/irmd/CMakeLists.txt
@@ -8,6 +8,7 @@ set(SOURCE_FILES
# Add source files here
api_table.c
apn_table.c
+ ipcp.c
irm_flow.c
main.c
registry.c
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
new file mode 100644
index 00000000..f79e6caf
--- /dev/null
+++ b/src/irmd/ipcp.c
@@ -0,0 +1,402 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The API to instruct IPCPs
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/utils.h>
+#include <ouroboros/sockets.h>
+
+#define OUROBOROS_PREFIX "irmd/ipcp"
+
+#include <ouroboros/logs.h>
+
+#include "ipcp.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+
+static void close_ptr(void * o)
+{
+ close(*(int *) o);
+}
+
+ipcp_msg_t * send_recv_ipcp_msg(pid_t api, ipcp_msg_t * msg)
+{
+ int sockfd = 0;
+ buffer_t buf;
+ char * sock_path = NULL;
+ ssize_t count = 0;
+ ipcp_msg_t * recv_msg = NULL;
+
+ struct timeval tv = {(SOCKET_TIMEOUT / 1000),
+ (SOCKET_TIMEOUT % 1000) * 1000};
+
+ sock_path = ipcp_sock_path(api);
+ if (sock_path == NULL)
+ return NULL;
+
+ sockfd = client_socket_open(sock_path);
+ if (sockfd < 0) {
+ free(sock_path);
+ return NULL;
+ }
+
+ if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+ (void *) &tv, sizeof(tv)))
+ LOG_WARN("Failed to set timeout on socket.");
+
+ free(sock_path);
+
+ buf.len = ipcp_msg__get_packed_size(msg);
+ if (buf.len == 0) {
+ close(sockfd);
+ return NULL;
+ }
+
+ buf.data = malloc(IPCP_MSG_BUF_SIZE);
+ if (buf.data == NULL) {
+ close(sockfd);
+ return NULL;
+ }
+
+ pthread_cleanup_push(close_ptr, (void *) &sockfd);
+ pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data);
+
+ ipcp_msg__pack(msg, buf.data);
+
+ if (write(sockfd, buf.data, buf.len) != -1)
+ count = read(sockfd, buf.data, IPCP_MSG_BUF_SIZE);
+
+ if (count > 0)
+ recv_msg = ipcp_msg__unpack(NULL, count, buf.data);
+
+ pthread_cleanup_pop(true);
+ pthread_cleanup_pop(true);
+
+ return recv_msg;
+}
+
+pid_t ipcp_create(enum ipcp_type ipcp_type)
+{
+ pid_t api = -1;
+ char irmd_api[10];
+ size_t len = 0;
+ char * ipcp_dir = "/sbin/";
+ char * full_name = NULL;
+ char * exec_name = NULL;
+ char * log_file = NULL;
+
+ sprintf(irmd_api, "%u", getpid());
+
+ api = fork();
+ if (api == -1) {
+ LOG_ERR("Failed to fork");
+ return api;
+ }
+
+ if (api != 0) {
+ return api;
+ }
+
+ if (ipcp_type == IPCP_NORMAL)
+ exec_name = IPCP_NORMAL_EXEC;
+ else if (ipcp_type == IPCP_SHIM_UDP)
+ exec_name = IPCP_SHIM_UDP_EXEC;
+ else if (ipcp_type == IPCP_SHIM_ETH_LLC)
+ exec_name = IPCP_SHIM_ETH_LLC_EXEC;
+ else if (ipcp_type == IPCP_LOCAL)
+ exec_name = IPCP_LOCAL_EXEC;
+ else
+ exit(EXIT_FAILURE);
+
+ len += strlen(INSTALL_PREFIX);
+ len += strlen(ipcp_dir);
+ len += strlen(exec_name);
+ len += 1;
+
+ full_name = malloc(len + 1);
+ if (full_name == NULL) {
+ LOG_ERR("Failed to malloc");
+ exit(EXIT_FAILURE);
+ }
+
+ strcpy(full_name, INSTALL_PREFIX);
+ strcat(full_name, ipcp_dir);
+ strcat(full_name, exec_name);
+ full_name[len] = '\0';
+
+ if (logfile != NULL) {
+ log_file = malloc(20);
+ if (log_file == NULL) {
+ LOG_ERR("Failed to malloc.");
+ exit(EXIT_FAILURE);
+ }
+ sprintf(log_file, "ipcpd-%u.log", getpid());
+ }
+
+ /* log_file to be placed at the end */
+ char * argv[] = {full_name,
+ irmd_api,
+ log_file,
+ 0};
+
+ char * envp[] = {0};
+
+ execve(argv[0], &argv[0], envp);
+
+ LOG_DBG("%s", strerror(errno));
+ LOG_ERR("Failed to load IPCP daemon");
+ LOG_ERR("Make sure to run the installed version");
+ free(full_name);
+ exit(EXIT_FAILURE);
+}
+
+int ipcp_destroy(pid_t api)
+{
+ int status;
+
+ if (kill(api, SIGTERM)) {
+ LOG_ERR("Failed to destroy IPCP");
+ return -1;
+ }
+
+ if (waitpid(api, &status, 0) < 0) {
+ LOG_ERR("Failed to destroy IPCP");
+ return -1;
+ }
+
+ return 0;
+}
+
+int ipcp_bootstrap(pid_t api,
+ dif_config_msg_t * conf)
+{
+ ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ if (conf == NULL)
+ return -EINVAL;
+
+ msg.code = IPCP_MSG_CODE__IPCP_BOOTSTRAP;
+ msg.conf = conf;
+
+ recv_msg = send_recv_ipcp_msg(api, &msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_enroll(pid_t api,
+ char * dif_name)
+{
+ ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ if (dif_name == NULL)
+ return -EINVAL;
+
+ msg.code = IPCP_MSG_CODE__IPCP_ENROLL;
+ msg.dif_name = dif_name;
+
+ recv_msg = send_recv_ipcp_msg(api, &msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_name_reg(pid_t api,
+ char * name)
+{
+ ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ if (name == NULL)
+ return -1;
+
+ msg.code = IPCP_MSG_CODE__IPCP_NAME_REG;
+ msg.name = name;
+
+ recv_msg = send_recv_ipcp_msg(api, &msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_name_unreg(pid_t api,
+ char * name)
+{
+ ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ msg.code = IPCP_MSG_CODE__IPCP_NAME_UNREG;
+ msg.name = name;
+
+ recv_msg = send_recv_ipcp_msg(api, &msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_flow_alloc(pid_t api,
+ int port_id,
+ pid_t n_api,
+ char * dst_name,
+ char * src_ae_name,
+ enum qos_cube qos)
+{
+ ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ if (dst_name == NULL || src_ae_name == NULL)
+ return -EINVAL;
+
+ msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC;
+ msg.has_port_id = true;
+ msg.port_id = port_id;
+ msg.has_api = true;
+ msg.api = n_api;
+ msg.src_ae_name = src_ae_name;
+ msg.dst_name = dst_name;
+ msg.has_qos_cube = true;
+ msg.qos_cube = qos;
+
+ recv_msg = send_recv_ipcp_msg(api, &msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (!recv_msg->has_result) {
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_flow_alloc_resp(pid_t api,
+ int port_id,
+ pid_t n_api,
+ int response)
+{
+ ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP;
+ msg.has_port_id = true;
+ msg.port_id = port_id;
+ msg.has_api = true;
+ msg.api = n_api;
+ msg.has_response = true;
+ msg.response = response;
+
+ recv_msg = send_recv_ipcp_msg(api, &msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_flow_dealloc(pid_t api,
+ int port_id)
+{
+
+ ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC;
+ msg.has_port_id = true;
+ msg.port_id = port_id;
+
+ recv_msg = send_recv_ipcp_msg(api, &msg);
+ if (recv_msg == NULL)
+ return 0;
+
+ if (recv_msg->has_result == false) {
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+ return 0;
+ }
+
+ ret = recv_msg->result;
+ ipcp_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h
new file mode 100644
index 00000000..930695fa
--- /dev/null
+++ b/src/irmd/ipcp.h
@@ -0,0 +1,62 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The API for the IRM to instruct IPCPs
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/irm_config.h>
+#include <ouroboros/sockets.h>
+#include <ouroboros/shared.h>
+
+#include <sys/types.h>
+
+#ifndef OUROBOROS_IPCP_H
+#define OUROBOROS_IPCP_H
+
+/* Returns the process id */
+pid_t ipcp_create(enum ipcp_type ipcp_type);
+
+int ipcp_destroy(pid_t api);
+
+int ipcp_enroll(pid_t api,
+ char * dif_name);
+
+int ipcp_bootstrap(pid_t api,
+ dif_config_msg_t * conf);
+
+int ipcp_name_reg(pid_t api,
+ char * name);
+int ipcp_name_unreg(pid_t api,
+ char * name);
+
+int ipcp_flow_alloc(pid_t api,
+ int port_id,
+ pid_t n_api,
+ char * dst_name,
+ char * src_ae_name,
+ enum qos_cube qos);
+int ipcp_flow_alloc_resp(pid_t api,
+ int port_id,
+ pid_t n_api,
+ int response);
+
+int ipcp_flow_dealloc(pid_t api,
+ int port_id);
+
+#endif /* OUROBOROS_IPCP_H */
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index d9fe3fb3..b99c6f97 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -58,6 +58,11 @@ void irm_flow_destroy(struct irm_flow * f)
{
pthread_mutex_lock(&f->state_lock);
+ if (f->state == FLOW_DESTROY) {
+ pthread_mutex_unlock(&f->state_lock);
+ return;
+ }
+
if (f->state == FLOW_PENDING)
f->state = FLOW_DESTROY;
else
@@ -75,3 +80,45 @@ void irm_flow_destroy(struct irm_flow * f)
free(f);
}
+
+enum flow_state irm_flow_get_state(struct irm_flow * f)
+{
+ enum flow_state state;
+
+ pthread_mutex_lock(&f->state_lock);
+
+ state = f->state;
+
+ pthread_mutex_unlock(&f->state_lock);
+
+ return state;
+}
+
+void irm_flow_set_state(struct irm_flow * f, enum flow_state state)
+{
+ pthread_mutex_lock(&f->state_lock);
+
+ f->state = state;
+ pthread_cond_broadcast(&f->state_cond);
+
+ pthread_mutex_unlock(&f->state_lock);
+}
+
+enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state)
+{
+ pthread_mutex_lock(&f->state_lock);
+
+ while (!(f->state == state || f->state == FLOW_DESTROY))
+ pthread_cond_wait(&f->state_cond, &f->state_lock);
+
+ if (state == FLOW_DESTROY) {
+ f->state = FLOW_NULL;
+ pthread_cond_broadcast(&f->state_cond);
+ }
+
+ state = f->state;
+
+ pthread_mutex_unlock(&f->state_lock);
+
+ return state;
+}
diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h
index b7e5a1be..db6598bf 100644
--- a/src/irmd/irm_flow.h
+++ b/src/irmd/irm_flow.h
@@ -24,12 +24,18 @@
#define OUROBOROS_IRMD_IRM_FLOW_H
#include <ouroboros/list.h>
-#include <ouroboros/shared.h>
#include <sys/types.h>
#include <pthread.h>
#include <time.h>
+enum flow_state {
+ FLOW_NULL = 0,
+ FLOW_PENDING,
+ FLOW_ALLOCATED,
+ FLOW_DESTROY
+};
+
struct irm_flow {
struct list_head next;
@@ -46,6 +52,16 @@ struct irm_flow {
};
struct irm_flow * irm_flow_create();
+
void irm_flow_destroy(struct irm_flow * f);
+enum flow_state irm_flow_get_state(struct irm_flow * f);
+
+
+void irm_flow_set_state(struct irm_flow * f,
+ enum flow_state state);
+
+enum flow_state irm_flow_wait_state(struct irm_flow * f,
+ enum flow_state state);
+
#endif /* OUROBOROS_IRMD_IRM_FLOW_H */
diff --git a/src/irmd/main.c b/src/irmd/main.c
index cc9160bf..523741ef 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -21,14 +21,9 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "irmd"
-
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
-#include <ouroboros/logs.h>
#include <ouroboros/sockets.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/nsm.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
#include <ouroboros/irm_config.h>
@@ -36,14 +31,19 @@
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/bitmap.h>
-#include <ouroboros/flow.h>
#include <ouroboros/qos.h>
#include <ouroboros/time_utils.h>
+#define OUROBOROS_PREFIX "irmd"
+
+#include <ouroboros/logs.h>
+
+
#include "utils.h"
#include "registry.h"
#include "irm_flow.h"
#include "api_table.h"
+#include "ipcp.h"
#include <sys/socket.h>
#include <sys/un.h>
@@ -60,10 +60,12 @@
struct ipcp_entry {
struct list_head next;
+
char * name;
pid_t api;
enum ipcp_type type;
char * dif_name;
+
pthread_cond_t init_cond;
pthread_mutex_t init_lock;
bool init;
@@ -100,7 +102,7 @@ struct irm {
pthread_t irm_sanitize;
pthread_t shm_sanitize;
-} * irmd = NULL;
+} * irmd;
static struct irm_flow * get_irm_flow(int port_id)
{
@@ -108,7 +110,6 @@ static struct irm_flow * get_irm_flow(int port_id)
list_for_each(pos, &irmd->irm_flows) {
struct irm_flow * e = list_entry(pos, struct irm_flow, next);
-
if (e->port_id == port_id)
return e;
}
@@ -122,7 +123,6 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api)
list_for_each(pos, &irmd->irm_flows) {
struct irm_flow * e = list_entry(pos, struct irm_flow, next);
-
if (e->n_api == n_api)
return e;
}
@@ -965,8 +965,7 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name)
return NULL;
}
- LOG_INFO("New instance (%d) of %s added.", api, e->apn);
-
+ LOG_DBG("New instance (%d) of %s added.", api, e->apn);
LOG_DBG("This instance accepts flows for:");
list_for_each(p, &e->names) {
struct str_el * s = list_entry(p, struct str_el, next);
@@ -1053,8 +1052,8 @@ static int flow_alloc_resp(pid_t n_api,
struct api_entry * e = NULL;
int ret = -1;
- pid_t f_n_1_api;
- pid_t f_n_api;
+ pid_t api_n1;
+ pid_t api_n;
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -1107,21 +1106,17 @@ static int flow_alloc_resp(pid_t n_api,
return -1;
}
- f_n_api = f->n_api;
- f_n_1_api = f->n_1_api;
-
- if (!response) {
- f->state = FLOW_ALLOCATED;
- pthread_cond_signal(&f->state_cond);
- }
+ api_n = f->n_api;
+ api_n1 = f->n_1_api;
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- ret = ipcp_flow_alloc_resp(f_n_1_api,
- port_id,
- f_n_api,
- response);
+ ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response);
+
+ if (!(response || ret))
+ irm_flow_set_state(f, FLOW_ALLOCATED);
+
return ret;
}
@@ -1132,6 +1127,7 @@ static struct irm_flow * flow_alloc(pid_t api,
{
struct irm_flow * f;
pid_t ipcp;
+ int port_id;
/* FIXME: Map qos_spec to qos_cube */
@@ -1151,6 +1147,7 @@ static struct irm_flow * flow_alloc(pid_t api,
f->n_api = api;
f->state = FLOW_PENDING;
+
if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
LOG_WARN("Failed to set timestamp.");
@@ -1167,7 +1164,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- f->port_id = bmp_allocate(irmd->port_ids);
+ port_id = f->port_id = bmp_allocate(irmd->port_ids);
f->n_1_api = ipcp;
list_add(&f->next, &irmd->irm_flows);
@@ -1175,19 +1172,15 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- if (ipcp_flow_alloc(ipcp,
- f->port_id,
- f->n_api,
- dst_name,
- src_ae_name,
- QOS_CUBE_BE) < 0) {
+ if (ipcp_flow_alloc(ipcp, port_id, api,
+ dst_name, src_ae_name, QOS_CUBE_BE) < 0) {
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
list_del(&f->next);
bmp_release(irmd->port_ids, f->port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- free(f);
+ irm_flow_destroy(f);
return NULL;
}
@@ -1208,20 +1201,20 @@ static int flow_alloc_res(int port_id)
f = get_irm_flow(port_id);
if (f == NULL) {
- LOG_ERR("Could not find port %d.", port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
+ LOG_ERR("Could not find port %d.", port_id);
return -1;
}
- if (f->state == FLOW_NULL) {
- LOG_INFO("Port %d is deprecated.", port_id);
+ if (irm_flow_get_state(f) == FLOW_NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
+ LOG_INFO("Port %d is deprecated.", port_id);
return -1;
}
- if (f->state == FLOW_ALLOCATED) {
+ if (irm_flow_get_state(f) == FLOW_ALLOCATED) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
@@ -1230,35 +1223,13 @@ static int flow_alloc_res(int port_id)
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- pthread_mutex_lock(&f->state_lock);
-
- while (f->state == FLOW_PENDING)
- pthread_cond_wait(&f->state_cond, &f->state_lock);
-
- pthread_mutex_unlock(&f->state_lock);
-
- pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
- pthread_mutex_lock(&f->state_lock);
-
- if (f->state == FLOW_ALLOCATED) {
- pthread_cond_broadcast(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
+ if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED)
return 0;
- }
-
- f->state = FLOW_NULL;
- pthread_cond_broadcast(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
-static int flow_dealloc(int port_id)
+static int flow_dealloc(pid_t api, int port_id)
{
pid_t n_1_api;
int ret = 0;
@@ -1282,7 +1253,8 @@ static int flow_dealloc(int port_id)
pthread_rwlock_unlock(&irmd->flows_lock);
- ret = ipcp_flow_dealloc(n_1_api, port_id);
+ if (api != n_1_api)
+ ret = ipcp_flow_dealloc(n_1_api, port_id);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1340,6 +1312,9 @@ static struct irm_flow * flow_req_arr(pid_t api,
struct pid_el * c_api;
pid_t h_api = -1;
+ LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.",
+ api, dst_name, ae_name);
+
f = irm_flow_create();
if (f == NULL) {
LOG_ERR("Failed to create irm_flow.");
@@ -1490,8 +1465,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
return f;
}
-static int flow_alloc_reply(int port_id,
- int response)
+static int flow_alloc_reply(int port_id, int response)
{
struct irm_flow * f;
@@ -1505,18 +1479,10 @@ static int flow_alloc_reply(int port_id,
return -1;
}
- pthread_mutex_lock(&f->state_lock);
-
if (!response)
- f->state = FLOW_ALLOCATED;
-
+ irm_flow_set_state(f, FLOW_ALLOCATED);
else
- f->state = FLOW_NULL;
-
- if (pthread_cond_signal(&f->state_cond))
- LOG_ERR("Failed to send signal.");
-
- pthread_mutex_unlock(&f->state_lock);
+ irm_flow_set_state(f, FLOW_NULL);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1524,30 +1490,6 @@ static int flow_alloc_reply(int port_id,
return 0;
}
-static int flow_dealloc_ipcp(int port_id)
-{
- struct irm_flow * f = NULL;
-
- pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
-
- f = get_irm_flow(port_id);
- if (f == NULL) {
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- return 0;
- }
-
- list_del(&f->next);
-
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
-
- irm_flow_destroy(f);
-
- return 0;
-}
-
static void irm_destroy()
{
struct list_head * p;
@@ -1729,46 +1671,35 @@ void * irm_sanitize()
struct irm_flow * f =
list_entry(p, struct irm_flow, next);
- pthread_mutex_lock(&f->state_lock);
-
- if (f->state == FLOW_PENDING &&
- ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
+ if (irm_flow_get_state(f) == FLOW_PENDING
+ && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
LOG_INFO("Pending port_id %d timed out.",
f->port_id);
- f->state = FLOW_NULL;
- pthread_cond_signal(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
+ irm_flow_set_state(f, FLOW_NULL);
continue;
}
- pthread_mutex_unlock(&f->state_lock);
-
if (kill(f->n_api, 0) < 0) {
- struct shm_ap_rbuff * n_rb =
- shm_ap_rbuff_open_s(f->n_api);
+ struct shm_ap_rbuff * rb =
+ shm_ap_rbuff_open(f->n_api);
bmp_release(irmd->port_ids, f->port_id);
-
list_del(&f->next);
LOG_INFO("AP-I %d gone, flow %d deallocated.",
f->n_api, f->port_id);
ipcp_flow_dealloc(f->n_1_api, f->port_id);
- if (n_rb != NULL)
- shm_ap_rbuff_destroy(n_rb);
+ if (rb != NULL)
+ shm_ap_rbuff_destroy(rb);
irm_flow_destroy(f);
continue;
}
if (kill(f->n_1_api, 0) < 0) {
- struct shm_ap_rbuff * n_1_rb_s =
- shm_ap_rbuff_open_s(f->n_1_api);
- struct shm_ap_rbuff * n_1_rb_n =
- shm_ap_rbuff_open_n(f->n_1_api);
+ struct shm_ap_rbuff * rb =
+ shm_ap_rbuff_open(f->n_1_api);
list_del(&f->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
f->n_1_api, f->port_id);
- if (n_1_rb_n != NULL)
- shm_ap_rbuff_destroy(n_1_rb_n);
- if (n_1_rb_s != NULL)
- shm_ap_rbuff_destroy(n_1_rb_s);
+ if (rb != NULL)
+ shm_ap_rbuff_destroy(rb);
irm_flow_destroy(f);
}
}
@@ -1939,7 +1870,7 @@ void * mainloop()
break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_dealloc(msg->port_id);
+ ret_msg.result = flow_dealloc(msg->api, msg->port_id);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
e = flow_req_arr(msg->api,
@@ -1950,7 +1881,6 @@ void * mainloop()
ret_msg.result = -1;
break;
}
- /* FIXME: badly timed dealloc may give SEGV */
ret_msg.has_port_id = true;
ret_msg.port_id = e->port_id;
ret_msg.has_api = true;
@@ -1961,10 +1891,6 @@ void * mainloop()
ret_msg.result = flow_alloc_reply(msg->port_id,
msg->response);
break;
- case IRM_MSG_CODE__IPCP_FLOW_DEALLOC:
- ret_msg.has_result = true;
- ret_msg.result = flow_dealloc_ipcp(msg->port_id);
- break;
default:
LOG_ERR("Don't know that message code.");
break;
diff --git a/src/irmd/utils.h b/src/irmd/utils.h
index 37c745af..2fbc8ef2 100644
--- a/src/irmd/utils.h
+++ b/src/irmd/utils.h
@@ -40,7 +40,8 @@ struct pid_el {
pid_t pid;
};
-int wildcard_match(const char * pattern, const char * string);
+int wildcard_match(const char * pattern,
+ const char * string);
/* functions for copying and destroying arguments list */
char ** argvdup(char ** argv);