diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-03-31 09:58:23 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-03-31 09:58:23 +0000 |
commit | ad01a7fd0b6cd798b2d5a2901ae8499b25360707 (patch) | |
tree | 16b6fd66c3fe93d178e10a137179923b513851f9 | |
parent | 5f79a21b80e68ba59616f0fa431287c3e94c43cf (diff) | |
parent | 7ba0fd0ce19244745c8d2512ce8a003783d914a7 (diff) | |
download | ouroboros-ad01a7fd0b6cd798b2d5a2901ae8499b25360707.tar.gz ouroboros-ad01a7fd0b6cd798b2d5a2901ae8499b25360707.zip |
Merged in dstaesse/ouroboros/be-new-api (pull request #439)
lib: Revise flow allocation API
-rw-r--r-- | include/ouroboros/dev.h | 23 | ||||
-rw-r--r-- | src/ipcpd/normal/connmgr.c | 15 | ||||
-rw-r--r-- | src/irmd/ipcp.c | 7 | ||||
-rw-r--r-- | src/irmd/irm_flow.c | 12 | ||||
-rw-r--r-- | src/irmd/main.c | 203 | ||||
-rw-r--r-- | src/lib/dev.c | 193 | ||||
-rw-r--r-- | src/lib/irmd_messages.proto | 18 | ||||
-rw-r--r-- | src/lib/shm_flow_set.c | 3 | ||||
-rw-r--r-- | src/tools/cbr/cbr_client.c | 10 | ||||
-rw-r--r-- | src/tools/cbr/cbr_server.c | 38 | ||||
-rw-r--r-- | src/tools/echo/echo_client.c | 12 | ||||
-rw-r--r-- | src/tools/echo/echo_server.c | 22 | ||||
-rw-r--r-- | src/tools/operf/operf_client.c | 8 | ||||
-rw-r--r-- | src/tools/operf/operf_server.c | 8 | ||||
-rw-r--r-- | src/tools/oping/oping_client.c | 11 | ||||
-rw-r--r-- | src/tools/oping/oping_server.c | 13 |
16 files changed, 184 insertions, 412 deletions
diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index e92cdd1c..4984736c 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -24,6 +24,7 @@ #include <ouroboros/qos.h> #include <unistd.h> +#include <time.h> #ifndef OUROBOROS_DEV_H #define OUROBOROS_DEV_H @@ -33,20 +34,14 @@ int ap_init(const char * ap_name); void ap_fini(void); -/* Returns flow descriptor (> 0) and qos spec. */ -int flow_accept(qosspec_t * spec); +/* Returns flow descriptor, qs updates to supplied QoS. */ +int flow_alloc(const char * dst_name, + qosspec_t * qs, + struct timespec * timeo); -int flow_alloc_resp(int fd, - int response); - -/* - * Returns flow descriptor (> 0). - * On returning, spec will contain the actual supplied QoS. - */ -int flow_alloc(const char * dst_name, - qosspec_t * spec); - -int flow_alloc_res(int fd); +/* Returns flow descriptor, qs updates to supplied QoS. */ +int flow_accept(qosspec_t * qs, + struct timespec * timeo); int flow_dealloc(int fd); @@ -58,4 +53,4 @@ ssize_t flow_read(int fd, void * buf, size_t count); -#endif +#endif /* OUROBOROS_DEV_H */ diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index b8314917..8068d173 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -126,18 +126,13 @@ static void * flow_acceptor(void * o) pthread_rwlock_unlock(&ipcpi.state_lock); - fd = flow_accept(&qs); + fd = flow_accept(&qs, NULL); if (fd < 0) { if (fd != -EIRMD) log_warn("Flow accept failed: %d", fd); continue; } - if (flow_alloc_resp(fd, 0)) { - log_err("Failed to respond to flow alloc request."); - continue; - } - if (cacep_rcv(fd, &rcv_info)) { log_err("Error establishing application connection."); flow_dealloc(fd); @@ -286,7 +281,7 @@ int connmgr_alloc(struct ae * ae, memset(&conn->conn_info, 0, sizeof(conn->conn_info)); - conn->flow_info.fd = flow_alloc(dst_name, qs); + conn->flow_info.fd = flow_alloc(dst_name, qs, NULL); if (conn->flow_info.fd < 0) { log_err("Failed to allocate flow to %s.", dst_name); return -1; @@ -297,12 +292,6 @@ int connmgr_alloc(struct ae * ae, else memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs)); - if (flow_alloc_res(conn->flow_info.fd)) { - log_err("Flow allocation to %s failed.", dst_name); - flow_dealloc(conn->flow_info.fd); - return -1; - } - if (cacep_snd(conn->flow_info.fd, &ae->info)) { log_err("Failed to create application connection."); flow_dealloc(conn->flow_info.fd); diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 06b66d3b..a8263580 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -176,18 +176,11 @@ pid_t ipcp_create(char * name, int ipcp_destroy(pid_t api) { - int status; - if (kill(api, SIGTERM)) { log_err("Failed to destroy IPCP"); return -1; } - if (waitpid(api, &status, 0) < 0) { - log_err("Failed to destroy IPCP"); - return -1; - } - return 0; } diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index 99966561..4e7c22ef 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -45,6 +45,7 @@ struct irm_flow * irm_flow_create(pid_t n_api, } if (pthread_mutex_init(&f->state_lock, NULL)) { + pthread_cond_destroy(&f->state_cond); free(f); return NULL; } @@ -63,6 +64,9 @@ struct irm_flow * irm_flow_create(pid_t n_api, f->n_1_rb = shm_rbuff_create(n_1_api, port_id); if (f->n_1_rb == NULL) { log_err("Could not create ringbuffer for AP-I %d.", n_1_api); + shm_rbuff_destroy(f->n_rb); + pthread_mutex_destroy(&f->state_lock); + pthread_cond_destroy(&f->state_cond); free(f); return NULL; } @@ -122,7 +126,8 @@ enum flow_state irm_flow_get_state(struct irm_flow * f) return state; } -void irm_flow_set_state(struct irm_flow * f, enum flow_state state) +void irm_flow_set_state(struct irm_flow * f, + enum flow_state state) { assert(f); assert(state != FLOW_DESTROY); @@ -135,7 +140,8 @@ void irm_flow_set_state(struct irm_flow * f, enum flow_state state) pthread_mutex_unlock(&f->state_lock); } -enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state) +enum flow_state irm_flow_wait_state(struct irm_flow * f, + enum flow_state state) { assert(f); assert(state != FLOW_NULL); @@ -143,6 +149,8 @@ enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state) pthread_mutex_lock(&f->state_lock); + assert(f->state != FLOW_NULL); + while (!(f->state == state || f->state == FLOW_DESTROY)) pthread_cond_wait(&f->state_cond, &f->state_lock); diff --git a/src/irmd/main.c b/src/irmd/main.c index 9901a608..c7adf386 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -133,7 +133,8 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api) list_for_each(pos, &irmd->irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->n_api == n_api) + if (e->n_api == n_api && + irm_flow_get_state(e) == FLOW_ALLOC_PENDING) return e; } @@ -982,7 +983,12 @@ static struct irm_flow * flow_accept(pid_t api, struct irm_flow * f = NULL; struct api_entry * e = NULL; struct reg_entry * re = NULL; - struct list_head * p; + struct list_head * p = NULL; + + pid_t api_n1; + pid_t api_n; + int port_id; + int ret; pthread_rwlock_rdlock(&irmd->state_lock); @@ -1016,7 +1022,7 @@ static struct irm_flow * flow_accept(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - while (api_entry_sleep(e) == -ETIMEDOUT) { + while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) { pthread_rwlock_rdlock(&irmd->state_lock); if (irmd->state != IRMD_RUNNING) { pthread_rwlock_unlock(&irmd->state_lock); @@ -1025,126 +1031,76 @@ static struct irm_flow * flow_accept(pid_t api, pthread_rwlock_unlock(&irmd->state_lock); } - pthread_rwlock_rdlock(&irmd->state_lock); - - if (irmd->state != IRMD_RUNNING) { - reg_entry_set_state(re, REG_NAME_NULL); - pthread_rwlock_unlock(&irmd->state_lock); + if (ret == -1) { + /* The process died, we can exit here. */ return NULL; } - pthread_rwlock_rdlock(&irmd->reg_lock); + pthread_rwlock_rdlock(&irmd->state_lock); - e = api_table_get(&irmd->api_table, api); - if (e == NULL) { - pthread_rwlock_unlock(&irmd->reg_lock); + if (irmd->state != IRMD_RUNNING) { + reg_entry_set_state(re, REG_NAME_NULL); pthread_rwlock_unlock(&irmd->state_lock); - log_dbg("Process gone while accepting flow."); return NULL; } - pthread_mutex_lock(&e->state_lock); - - re = e->re; - - pthread_mutex_unlock(&e->state_lock); - - if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) { - pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_unlock(&irmd->state_lock); - log_err("Entry in wrong state."); - return NULL; - } - pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_rdlock(&irmd->flows_lock); f = get_irm_flow_n(api); if (f == NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - log_err("Port_id was not created yet."); + log_warn("Port_id was not created yet."); return NULL; } *cube = re->qos; + api_n = f->n_api; + api_n1 = f->n_1_api; + port_id = f->port_id; + log_info("Flow on port_id %d allocated.", f->port_id); pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - - return f; -} - -static int flow_alloc_resp(pid_t n_api, - int port_id, - int response) -{ - struct irm_flow * f = NULL; - struct reg_entry * re = NULL; - struct api_entry * e = NULL; - int ret = -1; - - pid_t api_n1; - pid_t api_n; - - pthread_rwlock_rdlock(&irmd->state_lock); - - if (irmd->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&irmd->state_lock); - return -1; - } - - pthread_rwlock_wrlock(&irmd->reg_lock); + pthread_rwlock_rdlock(&irmd->reg_lock); - e = api_table_get(&irmd->api_table, n_api); + e = api_table_get(&irmd->api_table, api); if (e == NULL) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - log_err("Unknown AP-I %d responding for port_id %d.", - n_api, port_id); - return -1; + ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); + log_dbg("Process gone while accepting flow."); + return NULL; } + pthread_mutex_lock(&e->state_lock); + re = e->re; - if (re == NULL) { - pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_unlock(&irmd->state_lock); - log_err("AP-I %d is not handling a flow request.", n_api); - return -1; - } + + pthread_mutex_unlock(&e->state_lock); if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) { pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_unlock(&irmd->state_lock); - log_err("Name %s has no pending flow request.", re->name); - return -1; + ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); + log_err("Entry in wrong state."); + return NULL; } - registry_del_api(&irmd->registry, n_api); + registry_del_api(&irmd->registry, api); pthread_rwlock_unlock(&irmd->reg_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - - f = get_irm_flow(port_id); - if (f == NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - return -1; - } - - api_n = f->n_api; - api_n1 = f->n_1_api; - - pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response); + if (ipcp_flow_alloc_resp(api_n1, port_id, api_n, 0)) { + log_dbg("Failed to respond to alloc."); + return NULL; + } - if (!(response || ret)) - irm_flow_set_state(f, FLOW_ALLOCATED); + irm_flow_set_state(f, FLOW_ALLOCATED); - return ret; + return f; } static struct irm_flow * flow_alloc(pid_t api, @@ -1196,6 +1152,8 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); + assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); + if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) { pthread_rwlock_rdlock(&irmd->state_lock); @@ -1210,54 +1168,16 @@ static struct irm_flow * flow_alloc(pid_t api, return NULL; } - return f; -} - -static int flow_alloc_res(int port_id) -{ - struct irm_flow * f; - - pthread_rwlock_rdlock(&irmd->state_lock); - - if (irmd->state != IRMD_RUNNING) { - pthread_rwlock_unlock(&irmd->state_lock); - return -1; - } - pthread_rwlock_rdlock(&irmd->flows_lock); - - f = get_irm_flow(port_id); - if (f == NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - log_err("Could not find port %d.", port_id); - return -1; - } - - if (irm_flow_get_state(f) == FLOW_NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - log_info("Port %d is deprecated.", port_id); - return -1; - } - - if (irm_flow_get_state(f) == FLOW_ALLOCATED) { - log_info("Flow on port_id %d allocated.", port_id); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - return 0; + if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) { + log_info("Pending flow on port_id %d torn down.", port_id); + return NULL; } - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - - if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) { - log_info("Flow on port_id %d allocated.", port_id); - return 0; - } + assert(irm_flow_get_state(f) == FLOW_ALLOCATED); - log_info("Pending flow on port_id %d torn down.", port_id); + log_info("Flow on port_id %d allocated.", port_id); - return -1; + return f; } static int flow_dealloc(pid_t api, @@ -1293,6 +1213,9 @@ static int flow_dealloc(pid_t api, if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) { list_del(&f->next); + if ((kill(f->n_api, 0) < 0 && f->n_1_api == -1) || + (kill (f->n_1_api, 0) < 0 && f->n_api == -1)) + irm_flow_set_state(f, FLOW_NULL); clear_irm_flow(f); irm_flow_destroy(f); bmp_release(irmd->port_ids, port_id); @@ -1305,12 +1228,11 @@ static int flow_dealloc(pid_t api, } pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); if (n_1_api != -1) ret = ipcp_flow_dealloc(n_1_api, port_id); - pthread_rwlock_unlock(&irmd->state_lock); - return ret; } @@ -1501,7 +1423,7 @@ static int flow_alloc_reply(int port_id, struct irm_flow * f; pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); + pthread_rwlock_rdlock(&irmd->flows_lock); f = get_irm_flow(port_id); if (f == NULL) { @@ -1551,18 +1473,19 @@ static void irm_destroy(void) list_for_each_safe(p, h, &irmd->ipcps) { struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next); list_del(&e->next); - ipcp_destroy(e->api); - clear_spawned_api(e->api); - registry_del_api(&irmd->registry, e->api); ipcp_entry_destroy(e); } - list_for_each_safe(p, h, &irmd->spawned_apis) { + list_for_each(p, &irmd->spawned_apis) { struct pid_el * e = list_entry(p, struct pid_el, next); - int status; if (kill(e->pid, SIGTERM)) log_dbg("Could not send kill signal to %d.", e->pid); - else if (waitpid(e->pid, &status, 0) < 0) + } + + list_for_each_safe(p, h, &irmd->spawned_apis) { + struct pid_el * e = list_entry(p, struct pid_el, next); + int status; + if (waitpid(e->pid, &status, 0) < 0) log_dbg("Error waiting for %d to exit.", e->pid); list_del(&e->next); registry_del_api(&irmd->registry, e->pid); @@ -1940,12 +1863,6 @@ void * mainloop(void * o) ret_msg.has_api = true; ret_msg.api = e->n_1_api; break; - case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP: - ret_msg.has_result = true; - ret_msg.result = flow_alloc_resp(msg->api, - msg->port_id, - msg->response); - break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: e = flow_alloc(msg->api, msg->dst_name, @@ -1960,10 +1877,6 @@ void * mainloop(void * o) ret_msg.has_api = true; ret_msg.api = e->n_1_api; break; - case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: - ret_msg.has_result = true; - ret_msg.result = flow_alloc_res(msg->port_id); - break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; ret_msg.result = flow_dealloc(msg->api, msg->port_id); diff --git a/src/lib/dev.c b/src/lib/dev.c index 79797b92..e19083c3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -161,21 +161,21 @@ struct { } ai; /* FIXME: translate real spec to cube */ -static qoscube_t spec_to_cube(qosspec_t * spec) +static qoscube_t spec_to_cube(qosspec_t * qs) { - if (spec == NULL) + if (qs == NULL) return QOS_CUBE_BE; - return spec->cube; + return qs->cube; } /* FIXME: fill real spec */ -static void fill_qosspec(qosspec_t * spec, +static void fill_qosspec(qosspec_t * qs, qoscube_t cube) { - assert(spec); + assert(qs); - spec->cube = cube; + qs->cube = cube; } static int api_announce(char * ap_name) @@ -209,6 +209,17 @@ static int api_announce(char * ap_name) return ret; } +static void init_flow(int fd) +{ + assert(!(fd < 0)); + + memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); + + ai.flows[fd].port_id = -1; + ai.flows[fd].api = -1; + ai.flows[fd].cube = QOS_CUBE_BE; +} + static void reset_flow(int fd) { assert (!(fd < 0)); @@ -216,25 +227,17 @@ static void reset_flow(int fd) if (ai.flows[fd].port_id != -1) port_destroy(&ai.ports[ai.flows[fd].port_id]); - ai.flows[fd].port_id = -1; - if (ai.flows[fd].rx_rb != NULL) { + if (ai.flows[fd].rx_rb != NULL) shm_rbuff_close(ai.flows[fd].rx_rb); - ai.flows[fd].rx_rb = NULL; - } - if (ai.flows[fd].tx_rb != NULL) { + + if (ai.flows[fd].tx_rb != NULL) shm_rbuff_close(ai.flows[fd].tx_rb); - ai.flows[fd].tx_rb = NULL; - } - if (ai.flows[fd].set != NULL) { + if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); - ai.flows[fd].set = NULL; - } - ai.flows[fd].oflags = 0; - ai.flows[fd].api = -1; - ai.flows[fd].timesout = false; - ai.flows[fd].cube = QOS_CUBE_BE; + init_flow(fd); + } int ap_init(const char * ap_name) @@ -280,16 +283,8 @@ int ap_init(const char * ap_name) return -1; } - for (i = 0; i < AP_MAX_FLOWS; ++i) { - ai.flows[i].rx_rb = NULL; - ai.flows[i].tx_rb = NULL; - ai.flows[i].set = NULL; - ai.flows[i].port_id = -1; - ai.flows[i].oflags = 0; - ai.flows[i].api = -1; - ai.flows[i].timesout = false; - ai.flows[i].cube = QOS_CUBE_BE; - } + for (i = 0; i < AP_MAX_FLOWS; ++i) + init_flow(i); ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); if (ai.ports == NULL) { @@ -382,7 +377,8 @@ void ap_fini() pthread_rwlock_destroy(&ai.data_lock); } -int flow_accept(qosspec_t * spec) +int flow_accept(qosspec_t * qs, + struct timespec * timeo) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; @@ -391,6 +387,13 @@ int flow_accept(qosspec_t * spec) msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; + if (timeo != NULL) { + msg.has_timeo_sec = true; + msg.has_timeo_usec = true; + msg.timeo_sec = timeo->tv_sec; + msg.timeo_usec = timeo->tv_nsec / 1000; + } + pthread_rwlock_rdlock(&ai.data_lock); msg.api = ai.api; @@ -424,7 +427,6 @@ int flow_accept(qosspec_t * spec) ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); if (ai.flows[fd].rx_rb == NULL) { - reset_flow(fd); bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -435,8 +437,10 @@ int flow_accept(qosspec_t * spec) ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); if (ai.flows[fd].tx_rb == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -455,8 +459,8 @@ int flow_accept(qosspec_t * spec) ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; - if (spec != NULL) - fill_qosspec(spec, ai.flows[fd].cube); + if (qs != NULL) + fill_qosspec(qs, ai.flows[fd].cube); ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -469,69 +473,27 @@ int flow_accept(qosspec_t * spec) return fd; } -int flow_alloc_resp(int fd, - int response) +int flow_alloc(const char * dst_name, + qosspec_t * qs, + struct timespec * timeo) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int ret = -1; - - if (fd < 0 || fd >= AP_MAX_FLOWS) - return -EBADF; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; - msg.has_api = true; - msg.api = ai.api; - msg.has_port_id = true; - - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -ENOTALLOC; - } - - msg.port_id = ai.flows[fd].port_id; - - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - - msg.has_response = true; - msg.response = response; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int flow_alloc(const char * dst_name, - qosspec_t * spec) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int fd = -1; - - if (dst_name == NULL) - return -EINVAL; + int fd; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = (char *) dst_name; msg.has_api = true; msg.has_qoscube = true; - msg.qoscube = spec_to_cube(spec); + msg.qoscube = spec_to_cube(qs); + + if (timeo != NULL) { + msg.has_timeo_sec = true; + msg.has_timeo_usec = true; + msg.timeo_sec = timeo->tv_sec; + msg.timeo_usec = timeo->tv_nsec / 1000; + } pthread_rwlock_rdlock(&ai.data_lock); @@ -561,7 +523,6 @@ int flow_alloc(const char * dst_name, ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); if (ai.flows[fd].rx_rb == NULL) { - reset_flow(fd); bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -571,16 +532,21 @@ int flow_alloc(const char * dst_name, ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); if (ai.flows[fd].tx_rb == NULL) { + reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].set = shm_flow_set_open(recv_msg->api); if (ai.flows[fd].set == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -589,7 +555,6 @@ int flow_alloc(const char * dst_name, ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; - ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; pthread_rwlock_unlock(&ai.flows_lock); @@ -600,48 +565,6 @@ int flow_alloc(const char * dst_name, return fd; } -int flow_alloc_res(int fd) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int result = 0; - - if (fd < 0 || fd >= AP_MAX_FLOWS) - return -EBADF; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; - msg.has_port_id = true; - - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_rdlock(&ai.flows_lock); - - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -ENOTALLOC; - } - - msg.port_id = ai.flows[fd].port_id; - - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - - recv_msg = send_recv_irm_msg_b(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - result = recv_msg->result; - - irm_msg__free_unpacked(recv_msg, NULL); - - return result; -} - int flow_dealloc(int fd) { irm_msg_t msg = IRM_MSG__INIT; @@ -804,9 +727,9 @@ int flow_set_timeout(int fd, } int flow_get_qosspec(int fd, - qosspec_t * spec) + qosspec_t * qs) { - if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL) + if (fd < 0 || fd >= AP_MAX_FLOWS || qs == NULL) return -EINVAL; pthread_rwlock_rdlock(&ai.data_lock); @@ -818,7 +741,7 @@ int flow_get_qosspec(int fd, return -ENOTALLOC; } - fill_qosspec(spec, ai.flows[fd].cube); + fill_qosspec(qs, ai.flows[fd].cube); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index c25d2c18..4fbd676e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -39,14 +39,12 @@ enum irm_msg_code { IRM_UNBIND_API = 11; IRM_REG = 12; IRM_UNREG = 13; - IRM_FLOW_ACCEPT = 14; - IRM_FLOW_ALLOC_RESP = 15; - IRM_FLOW_ALLOC = 16; - IRM_FLOW_ALLOC_RES = 17; - IRM_FLOW_DEALLOC = 18; - IPCP_FLOW_REQ_ARR = 19; - IPCP_FLOW_ALLOC_REPLY = 20; - IRM_REPLY = 21; + IRM_FLOW_ALLOC = 14; + IRM_FLOW_ACCEPT = 15; + IRM_FLOW_DEALLOC = 16; + IPCP_FLOW_REQ_ARR = 17; + IPCP_FLOW_ALLOC_REPLY = 18; + IRM_REPLY = 19; }; message irm_msg { @@ -63,5 +61,7 @@ message irm_msg { optional dif_config_msg conf = 11; optional uint32 opts = 12; repeated sint32 apis = 13; - optional sint32 result = 14; + optional uint32 timeo_sec = 14; + optional uint32 timeo_usec = 15; + optional sint32 result = 16; }; diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 615fbd2b..67abbb5b 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -302,7 +302,8 @@ int shm_flow_set_has(struct shm_flow_set * set, return ret; } -void shm_flow_set_notify(struct shm_flow_set * set, int port_id) +void shm_flow_set_notify(struct shm_flow_set * set, + int port_id) { assert(set); assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); diff --git a/src/tools/cbr/cbr_client.c b/src/tools/cbr/cbr_client.c index 16ade13d..5ec1d560 100644 --- a/src/tools/cbr/cbr_client.c +++ b/src/tools/cbr/cbr_client.c @@ -63,7 +63,6 @@ int client_main(char * server, struct sigaction sig_act; int fd = 0; - int result = 0; char buf[size]; long seqnr = 0; long gap = size * 8.0 * (BILLION / (double) rate); @@ -90,19 +89,12 @@ int client_main(char * server, printf("Client started, duration %d, rate %lu b/s, size %d B.\n", duration, rate, size); - fd = flow_alloc(server, NULL); + fd = flow_alloc(server, NULL, NULL); if (fd < 0) { printf("Failed to allocate flow.\n"); return -1; } - result = flow_alloc_res(fd); - if (result < 0) { - printf("Flow allocation refused.\n"); - flow_dealloc(fd); - return -1; - } - clock_gettime(CLOCK_REALTIME, &start); if (!flood) { while (!stop) { diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c index 9198858c..1a963a64 100644 --- a/src/tools/cbr/cbr_server.c +++ b/src/tools/cbr/cbr_server.c @@ -146,6 +146,8 @@ static void * worker(void * o) pthread_mutex_lock(&fds_lock); fds_count--; + + pthread_cond_signal(&fds_signal); pthread_mutex_unlock(&fds_lock); } @@ -154,8 +156,7 @@ static void * worker(void * o) static void * listener(void * o) { - int client_fd = 0; - int response = 0; + int fd = 0; qosspec_t qs; (void) o; @@ -164,8 +165,19 @@ static void * listener(void * o) server_settings.interval, server_settings.timeout); while (true) { - client_fd = flow_accept(&qs); - if (client_fd < 0) { + pthread_mutex_lock(&fds_lock); + pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, + (void *) &fds_lock); + + while (fds_count == THREADS_SIZE) { + printf("Can't accept any more flows, waiting.\n"); + pthread_cond_wait(&fds_signal, &fds_lock); + } + + pthread_cleanup_pop(true); + + fd = flow_accept(&qs, NULL); + if (fd < 0) { printf("Failed to accept flow.\n"); break; } @@ -174,26 +186,12 @@ static void * listener(void * o) pthread_mutex_lock(&fds_lock); - response = (fds_count < THREADS_SIZE) ? 0 : -1; - - if (flow_alloc_resp(client_fd, response)) { - printf("Failed to give an allocate response.\n"); - flow_dealloc(client_fd); - pthread_mutex_unlock(&fds_lock); - continue; - } - - if (response) { - printf("Can't accept any more flows, denying.\n"); - continue; - } - fds_count++; fds_index = (fds_index + 1) % THREADS_SIZE; - fds[fds_index] = client_fd; + fds[fds_index] = fd; - pthread_mutex_unlock(&fds_lock); pthread_cond_signal(&fds_signal); + pthread_mutex_unlock(&fds_lock); } return 0; diff --git a/src/tools/echo/echo_client.c b/src/tools/echo/echo_client.c index f84de73a..5ec2051f 100644 --- a/src/tools/echo/echo_client.c +++ b/src/tools/echo/echo_client.c @@ -26,25 +26,17 @@ int client_main(void) { int fd = 0; - int result = 0; char buf[BUF_SIZE]; char * message = "Client says hi!"; ssize_t count = 0; - fd = flow_alloc("echo", NULL); + fd = flow_alloc("echo", NULL, NULL); if (fd < 0) { printf("Failed to allocate flow.\n"); return -1; } - result = flow_alloc_res(fd); - if (result < 0) { - printf("Flow allocation refused.\n"); - flow_dealloc(fd); - return -1; - } - - if (flow_write(fd, message, strlen(message) + 1) == -1) { + if (flow_write(fd, message, strlen(message) + 1) < 0) { printf("Failed to write SDU.\n"); flow_dealloc(fd); return -1; diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c index aa136485..771155f4 100644 --- a/src/tools/echo/echo_server.c +++ b/src/tools/echo/echo_server.c @@ -37,7 +37,7 @@ void shutdown_server(int signo) int server_main(void) { - int client_fd = 0; + int fd = 0; char buf[BUF_SIZE]; ssize_t count = 0; qosspec_t qs; @@ -51,36 +51,30 @@ int server_main(void) } while (true) { - client_fd = flow_accept(&qs); - if (client_fd < 0) { + fd = flow_accept(&qs, NULL); + if (fd < 0) { printf("Failed to accept flow.\n"); break; } printf("New flow.\n"); - if (flow_alloc_resp(client_fd, 0)) { - printf("Failed to give an allocate response.\n"); - flow_dealloc(client_fd); - continue; - } - - count = flow_read(client_fd, &buf, BUF_SIZE); + count = flow_read(fd, &buf, BUF_SIZE); if (count < 0) { printf("Failed to read SDU.\n"); - flow_dealloc(client_fd); + flow_dealloc(fd); continue; } printf("Message from client is %.*s.\n", (int) count, buf); - if (flow_write(client_fd, buf, count) == -1) { + if (flow_write(fd, buf, count) == -1) { printf("Failed to write SDU.\n"); - flow_dealloc(client_fd); + flow_dealloc(fd); continue; } - flow_dealloc(client_fd); + flow_dealloc(fd); } return 0; diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c index d2f08ef4..7827b62b 100644 --- a/src/tools/operf/operf_client.c +++ b/src/tools/operf/operf_client.c @@ -182,18 +182,12 @@ int client_main(void) client.sent = 0; client.rcvd = 0; - fd = flow_alloc(client.s_apn, NULL); + fd = flow_alloc(client.s_apn, NULL, NULL); if (fd < 0) { printf("Failed to allocate flow.\n"); return -1; } - if (flow_alloc_res(fd)) { - printf("Flow allocation refused.\n"); - flow_dealloc(fd); - return -1; - } - clock_gettime(CLOCK_REALTIME, &tic); pthread_create(&client.reader_pt, NULL, reader, &fd); diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c index 3665d4cc..b17a4f7b 100644 --- a/src/tools/operf/operf_server.c +++ b/src/tools/operf/operf_server.c @@ -108,7 +108,7 @@ void * accept_thread(void * o) printf("Ouroboros perf server started.\n"); while (true) { - fd = flow_accept(&qs); + fd = flow_accept(&qs, NULL); if (fd < 0) { printf("Failed to accept flow.\n"); break; @@ -116,12 +116,6 @@ void * accept_thread(void * o) printf("New flow %d.\n", fd); - if (flow_alloc_resp(fd, 0)) { - printf("Failed to give an allocate response.\n"); - flow_dealloc(fd); - continue; - } - clock_gettime(CLOCK_REALTIME, &now); pthread_mutex_lock(&server.lock); diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index a91a126c..77a08db7 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -176,7 +176,6 @@ static int client_init(void) client.rtt_m2 = 0; pthread_mutex_init(&client.lock, NULL); - pthread_mutex_lock(&client.lock); return 0; } @@ -213,21 +212,13 @@ int client_main(void) return -1; } - fd = flow_alloc(client.s_apn, NULL); + fd = flow_alloc(client.s_apn, NULL, NULL); if (fd < 0) { printf("Failed to allocate flow.\n"); - return -1; - } - - if (flow_alloc_res(fd)) { - printf("Flow allocation refused.\n"); - flow_dealloc(fd); client_fini(); return -1; } - pthread_mutex_unlock(&client.lock); - clock_gettime(CLOCK_REALTIME, &tic); pthread_create(&client.reader_pt, NULL, reader, &fd); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index e20e236d..44a301ba 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -57,6 +57,7 @@ void * cleaner_thread(void * o) for (i = 0; i < OPING_MAX_FLOWS; ++i) if (flow_set_has(server.flows, i) && ts_diff_ms(&server.times[i], &now) > deadline_ms) { + printf("Flow %d timed out.\n", i); flow_set_del(server.flows, i); flow_dealloc(i); } @@ -110,8 +111,8 @@ void * server_thread(void *o) void * accept_thread(void * o) { - int fd = 0; - struct timespec now = {0, 0}; + int fd; + struct timespec now; qosspec_t qs; (void) o; @@ -119,7 +120,7 @@ void * accept_thread(void * o) printf("Ouroboros ping server started.\n"); while (true) { - fd = flow_accept(&qs); + fd = flow_accept(&qs, NULL); if (fd < 0) { printf("Failed to accept flow.\n"); break; @@ -127,12 +128,6 @@ void * accept_thread(void * o) printf("New flow %d.\n", fd); - if (flow_alloc_resp(fd, 0)) { - printf("Failed to give an allocate response.\n"); - flow_dealloc(fd); - continue; - } - clock_gettime(CLOCK_REALTIME, &now); pthread_mutex_lock(&server.lock); |