diff options
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r-- | src/ipcpd/unicast/fa.c | 853 |
1 files changed, 657 insertions, 196 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index e0727e85..3631fd7b 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2020 + * Ouroboros - Copyright (C) 2016 - 2024 * * Flow allocator of the IPC Process * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as @@ -31,43 +31,57 @@ #define FA "flow-allocator" #define OUROBOROS_PREFIX FA +#include <ouroboros/endian.h> #include <ouroboros/logs.h> #include <ouroboros/fqueue.h> #include <ouroboros/errno.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> +#include <ouroboros/rib.h> +#include <ouroboros/random.h> +#include <ouroboros/pthread.h> #include "dir.h" #include "fa.h" #include "psched.h" #include "ipcp.h" #include "dt.h" +#include "ca.h" -#include <pthread.h> +#include <inttypes.h> #include <stdlib.h> #include <string.h> -#define TIMEOUT 10000 /* nanoseconds */ +#if defined (IPCP_FLOW_STATS) && !defined(CLOCK_REALTIME_COARSE) +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + +#define TIMEOUT 10 * MILLION /* nanoseconds */ + +#define FLOW_REQ 0 +#define FLOW_REPLY 1 +#define FLOW_UPDATE 2 +#define MSGBUFSZ 2048 -#define FLOW_REQ 0 -#define FLOW_REPLY 1 -#define MSGBUFSZ 2048 +#define STAT_FILE_LEN 0 struct fa_msg { uint64_t s_addr; - uint32_t r_eid; - uint32_t s_eid; + uint64_t r_eid; + uint64_t s_eid; uint8_t code; int8_t response; + uint16_t ece; /* QoS parameters from spec, aligned */ - uint8_t availability; - uint8_t in_order; uint32_t delay; uint64_t bandwidth; uint32_t loss; uint32_t ber; uint32_t max_gap; + uint32_t timeout; uint16_t cypher_s; + uint8_t availability; + uint8_t in_order; } __attribute__((packed)); struct cmd { @@ -75,11 +89,33 @@ struct cmd { struct shm_du_buff * sdb; }; +struct fa_flow { +#ifdef IPCP_FLOW_STATS + time_t stamp; /* Flow creation */ + size_t p_snd; /* Packets sent */ + size_t p_snd_f; /* Packets sent fail */ + size_t b_snd; /* Bytes sent */ + size_t b_snd_f; /* Bytes sent fail */ + size_t p_rcv; /* Packets received */ + size_t p_rcv_f; /* Packets received fail */ + size_t b_rcv; /* Bytes received */ + size_t b_rcv_f; /* Bytes received fail */ + size_t u_snd; /* Flow updates sent */ + size_t u_rcv; /* Flow updates received */ +#endif + uint64_t s_eid; /* Local endpoint id */ + uint64_t r_eid; /* Remote endpoint id */ + uint64_t r_addr; /* Remote address */ + void * ctx; /* Congestion avoidance context */ +}; + struct { pthread_rwlock_t flows_lock; - int r_eid[PROG_MAX_FLOWS]; - uint64_t r_addr[PROG_MAX_FLOWS]; - int fd; + struct fa_flow flows[PROG_MAX_FLOWS]; +#ifdef IPCP_FLOW_STATS + size_t n_flows; +#endif + uint32_t eid; struct list_head cmds; pthread_cond_t cond; @@ -89,26 +125,289 @@ struct { struct psched * psched; } fa; +static int fa_rib_read(const char * path, + char * buf, + size_t len) +{ +#ifdef IPCP_FLOW_STATS + struct fa_flow * flow; + int fd; + char r_addrstr[21]; + char s_eidstr[21]; + char r_eidstr[21]; + char tmstr[20]; + char castr[1024]; + char * entry; + struct tm * tm; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); + + if (fd < 0 || fd >= PROG_MAX_FLOWS) + return -1; + + if (len < 1536) + return 0; + + flow = &fa.flows[fd]; + + buf[0] = '\0'; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (flow->stamp ==0) { + pthread_rwlock_unlock(&fa.flows_lock); + return 0; + } + + sprintf(r_addrstr, "%" PRIu64, flow->r_addr); + sprintf(s_eidstr, "%" PRIu64, flow->s_eid); + sprintf(r_eidstr, "%" PRIu64, flow->r_eid); + + tm = localtime(&flow->stamp); + strftime(tmstr, sizeof(tmstr), "%F %T", tm); + + ca_print_stats(flow->ctx, castr, 1024); + + sprintf(buf, + "Flow established at: %20s\n" + "Remote address: %20s\n" + "Local endpoint ID: %20s\n" + "Remote endpoint ID: %20s\n" + "Sent (packets): %20zu\n" + "Sent (bytes): %20zu\n" + "Send failed (packets): %20zu\n" + "Send failed (bytes): %20zu\n" + "Received (packets): %20zu\n" + "Received (bytes): %20zu\n" + "Receive failed (packets): %20zu\n" + "Receive failed (bytes): %20zu\n" + "Sent flow updates (packets): %20zu\n" + "Received flow updates (packets): %20zu\n" + "%s", + tmstr, r_addrstr, + s_eidstr, r_eidstr, + flow->p_snd, flow->b_snd, + flow->p_snd_f, flow->b_snd_f, + flow->p_rcv, flow->b_rcv, + flow->b_rcv_f, flow->b_rcv_f, + flow->u_snd, flow->u_rcv, + castr); + + pthread_rwlock_unlock(&fa.flows_lock); + + return strlen(buf); +#else + (void) path; + (void) buf; + (void) len; + return 0; +#endif +} + +static int fa_rib_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS + char entry[RIB_PATH_LEN + 1]; + size_t i; + int idx = 0; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (fa.n_flows < 1) { + pthread_rwlock_unlock(&fa.flows_lock); + return 0; + } + + *buf = malloc(sizeof(**buf) * fa.n_flows); + if (*buf == NULL) { + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOMEM; + } + + for (i = 0; i < PROG_MAX_FLOWS; ++i) { + struct fa_flow * flow; + + flow = &fa.flows[i]; + if (flow->stamp == 0) + continue; + + sprintf(entry, "%zu", i); + + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) { + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOMEM; + } + + strcpy((*buf)[idx++], entry); + } + + assert((size_t) idx == fa.n_flows); + + pthread_rwlock_unlock(&fa.flows_lock); + + return idx; +#else + (void) buf; + return 0; +#endif +} + +static int fa_rib_getattr(const char * path, + struct rib_attr * attr) +{ +#ifdef IPCP_FLOW_STATS + int fd; + char * entry; + struct fa_flow * flow; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); + + flow = &fa.flows[fd]; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (flow->stamp != 0) { + attr->size = 1536; + attr->mtime = flow->stamp; + } else { + attr->size = 0; + attr->mtime = 0; + } + + pthread_rwlock_unlock(&fa.flows_lock); +#else + (void) path; + (void) attr; +#endif + return 0; +} + +static struct rib_ops r_ops = { + .read = fa_rib_read, + .readdir = fa_rib_readdir, + .getattr = fa_rib_getattr +}; + +static int eid_to_fd(uint64_t eid) +{ + struct fa_flow * flow; + int fd; + + fd = eid & 0xFFFFFFFF; + + if (fd < 0 || fd >= PROG_MAX_FLOWS) + return -1; + + flow = &fa.flows[fd]; + + if (flow->s_eid == eid) + return fd; + + return -1; +} + +static uint64_t gen_eid(int fd) +{ + uint32_t rnd; + + if (random_buffer(&rnd, sizeof(rnd)) < 0) + return fa.eid; /* INVALID */ + + fd &= 0xFFFFFFFF; + + return ((uint64_t) rnd << 32) + fd; +} + static void packet_handler(int fd, qoscube_t qc, struct shm_du_buff * sdb) { - pthread_rwlock_rdlock(&fa.flows_lock); + struct fa_flow * flow; + uint64_t r_addr; + uint64_t r_eid; + ca_wnd_t wnd; + size_t len; - if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { - pthread_rwlock_unlock(&fa.flows_lock); + flow = &fa.flows[fd]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + len = shm_du_buff_len(sdb); + +#ifdef IPCP_FLOW_STATS + ++flow->p_snd; + flow->b_snd += len; +#endif + wnd = ca_ctx_update_snd(flow->ctx, len); + + r_addr = flow->r_addr; + r_eid = flow->r_eid; + + pthread_rwlock_unlock(&fa.flows_lock); + + ca_wnd_wait(wnd); + + if (dt_write_packet(r_addr, qc, r_eid, sdb)) { ipcp_sdb_release(sdb); - log_warn("Failed to forward packet."); + log_dbg("Failed to forward packet."); +#ifdef IPCP_FLOW_STATS + pthread_rwlock_wrlock(&fa.flows_lock); + ++flow->p_snd_f; + flow->b_snd_f += len; + pthread_rwlock_unlock(&fa.flows_lock); +#endif return; } +} - pthread_rwlock_unlock(&fa.flows_lock); +static int fa_flow_init(struct fa_flow * flow) +{ +#ifdef IPCP_FLOW_STATS + struct timespec now; +#endif + memset(flow, 0, sizeof(*flow)); + + flow->r_eid = -1; + flow->s_eid = -1; + flow->r_addr = INVALID_ADDR; + + flow->ctx = ca_ctx_create(); + if (flow->ctx == NULL) + return -1; + +#ifdef IPCP_FLOW_STATS + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + flow->stamp = now.tv_sec; + + ++fa.n_flows; +#endif + return 0; } -static void destroy_conn(int fd) +static void fa_flow_fini(struct fa_flow * flow) { - fa.r_eid[fd] = -1; - fa.r_addr[fd] = INVALID_ADDR; + ca_ctx_destroy(flow->ctx); + + memset(flow, 0, sizeof(*flow)); + + flow->r_eid = -1; + flow->s_eid = -1; + flow->r_addr = INVALID_ADDR; + +#ifdef IPCP_FLOW_STATS + --fa.n_flows; +#endif } static void fa_post_packet(void * comp, @@ -138,146 +437,200 @@ static void fa_post_packet(void * comp, pthread_mutex_unlock(&fa.mtx); } -static void * fa_handle_packet(void * o) +static size_t fa_wait_for_fa_msg(struct fa_msg * msg) { - struct timespec ts = {0, TIMEOUT * 1000}; + struct cmd * cmd; + size_t len; - (void) o; + pthread_mutex_lock(&fa.mtx); - while (true) { - struct timespec abstime; - int fd; - uint8_t buf[MSGBUFSZ]; - struct fa_msg * msg; - qosspec_t qs; - struct cmd * cmd; - size_t len; - size_t msg_len; + pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); - pthread_mutex_lock(&fa.mtx); + while (list_is_empty(&fa.cmds)) + pthread_cond_wait(&fa.cond, &fa.mtx); - pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, - &fa.mtx); + cmd = list_last_entry(&fa.cmds, struct cmd, next); + list_del(&cmd->next); - while (list_is_empty(&fa.cmds)) - pthread_cond_wait(&fa.cond, &fa.mtx); + pthread_cleanup_pop(true); - cmd = list_last_entry(&fa.cmds, struct cmd, next); - list_del(&cmd->next); + len = shm_du_buff_len(cmd->sdb); + if (len > MSGBUFSZ || len < sizeof(*msg)) { + log_warn("Invalid flow allocation message (len: %zd).", len); + free(cmd); + return 0; /* No valid message */ + } - pthread_cleanup_pop(true); + memcpy(msg, shm_du_buff_head(cmd->sdb), len); - len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); + ipcp_sdb_release(cmd->sdb); - if (len > MSGBUFSZ) { - log_err("Message over buffer size."); - free(cmd); - continue; - } + free(cmd); - msg = (struct fa_msg *) buf; + return len; +} - /* Depending on the message call the function in ipcp-dev.h */ +static int fa_handle_flow_req(struct fa_msg * msg, + size_t len) +{ + size_t msg_len; + int fd; + qosspec_t qs; + struct fa_flow * flow; + uint8_t * dst; + buffer_t data; /* Piggbacked data on flow alloc request. */ + + msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + if (len < msg_len) { + log_err("Invalid flow allocation request"); + return -EPERM; + } - memcpy(msg, shm_du_buff_head(cmd->sdb), len); + dst = (uint8_t *)(msg + 1); + data.data = (uint8_t *) msg + msg_len; + data.len = len - msg_len; - ipcp_sdb_release(cmd->sdb); + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); - free(cmd); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data); + if (fd < 0) + return fd; - switch (msg->code) { - case FLOW_REQ: - msg_len = sizeof(*msg) + ipcp_dir_hash_len(); - - assert(len >= msg_len); - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Won't allocate over non-operational" - "IPCP."); - continue; - } - - assert(ipcpi.alloc_id == -1); - - qs.delay = ntoh32(msg->delay); - qs.bandwidth = ntoh64(msg->bandwidth); - qs.availability = msg->availability; - qs.loss = ntoh32(msg->loss); - qs.ber = ntoh32(msg->ber); - qs.in_order = msg->in_order; - qs.max_gap = ntoh32(msg->max_gap); - qs.cypher_s = ntoh16(msg->cypher_s); - - fd = ipcp_flow_req_arr((uint8_t *) (msg + 1), - ipcp_dir_hash_len(), - qs, - buf + msg_len, - len - msg_len); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_err("Failed to get fd for flow."); - continue; - } - - pthread_rwlock_wrlock(&fa.flows_lock); - - fa.r_eid[fd] = ntoh32(msg->s_eid); - fa.r_addr[fd] = ntoh64(msg->s_addr); + flow = &fa.flows[fd]; - pthread_rwlock_unlock(&fa.flows_lock); + pthread_rwlock_wrlock(&fa.flows_lock); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); + fa_flow_init(flow); - pthread_mutex_unlock(&ipcpi.alloc_lock); + flow->s_eid = gen_eid(fd); + flow->r_eid = ntoh64(msg->s_eid); + flow->r_addr = ntoh64(msg->s_addr); - break; - case FLOW_REPLY: - assert(len >= sizeof(*msg)); + pthread_rwlock_unlock(&fa.flows_lock); - pthread_rwlock_wrlock(&fa.flows_lock); + return fd; +} - fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); +static int fa_handle_flow_reply(struct fa_msg * msg, + size_t len) +{ + int fd; + struct fa_flow * flow; + buffer_t data; /* Piggbacked data on flow alloc request. */ + time_t mpl = IPCP_UNICAST_MPL; - ipcp_flow_alloc_reply(ntoh32(msg->r_eid), - msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); + assert(len >= sizeof(*msg)); - if (msg->response < 0) - destroy_conn(ntoh32(msg->r_eid)); - else - psched_add(fa.psched, ntoh32(msg->r_eid)); + data.data = (uint8_t *) msg + sizeof(*msg); + data.len = len - sizeof(*msg); - pthread_rwlock_unlock(&fa.flows_lock); + pthread_rwlock_wrlock(&fa.flows_lock); + + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow reply for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); + return -ENOTALLOC; + } + + flow = &fa.flows[fd]; + + flow->r_eid = ntoh64(msg->s_eid); + + if (msg->response < 0) + fa_flow_fini(flow); + else + psched_add(fa.psched, fd); + + pthread_rwlock_unlock(&fa.flows_lock); + + if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) { + log_err("Failed to reply for flow allocation on fd %d.", fd); + return -EIRMD; + } + + return 0; +} + +static int fa_handle_flow_update(struct fa_msg * msg, + size_t len) +{ + struct fa_flow * flow; + int fd; + + (void) len; + assert(len >= sizeof(*msg)); + + pthread_rwlock_wrlock(&fa.flows_lock); + + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow update for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); + return -EPERM; + } + + flow = &fa.flows[fd]; +#ifdef IPCP_FLOW_STATS + flow->u_rcv++; +#endif + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); + + pthread_rwlock_unlock(&fa.flows_lock); + + return 0; +} + +static void * fa_handle_packet(void * o) +{ + (void) o; + + while (true) { + uint8_t buf[MSGBUFSZ]; + struct fa_msg * msg; + size_t len; + + msg = (struct fa_msg *) buf; + + len = fa_wait_for_fa_msg(msg); + if (len == 0) + continue; + switch (msg->code) { + case FLOW_REQ: + if (fa_handle_flow_req(msg, len) < 0) + log_err("Error handling flow alloc request."); + break; + case FLOW_REPLY: + if (fa_handle_flow_reply(msg, len) < 0) + log_err("Error handling flow reply."); + break; + case FLOW_UPDATE: + if (fa_handle_flow_update(msg, len) < 0) + log_err("Error handling flow update."); break; default: - log_err("Got an unknown flow allocation message."); + log_warn("Recieved unknown flow allocation message."); break; } } + + return (void *) 0; } int fa_init(void) { - int i; - - for (i = 0; i < PROG_MAX_FLOWS; ++i) - destroy_conn(i); + pthread_condattr_t cattr; if (pthread_rwlock_init(&fa.flows_lock, NULL)) goto fail_rwlock; @@ -285,26 +638,45 @@ int fa_init(void) if (pthread_mutex_init(&fa.mtx, NULL)) goto fail_mtx; - if (pthread_cond_init(&fa.cond, NULL)) + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&fa.cond, &cattr)) goto fail_cond; + pthread_condattr_destroy(&cattr); + list_head_init(&fa.cmds); - fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); + if (rib_reg(FA, &r_ops)) + goto fail_rib_reg; + + fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); + if ((int) fa.eid < 0) + goto fail_rib_reg; return 0; + fail_rib_reg: + pthread_cond_destroy(&fa.cond); fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: pthread_mutex_destroy(&fa.mtx); fail_mtx: pthread_rwlock_destroy(&fa.flows_lock); fail_rwlock: - log_err("Failed to initialize flow allocator."); + return -1; } void fa_fini(void) { + rib_unreg(FA); + pthread_cond_destroy(&fa.cond);; pthread_mutex_destroy(&fa.mtx); pthread_rwlock_destroy(&fa.flows_lock); @@ -316,7 +688,7 @@ int fa_start(void) int pol; int max; - fa.psched = psched_create(packet_handler); + fa.psched = psched_create(packet_handler, np1_flow_read); if (fa.psched == NULL) { log_err("Failed to start packet scheduler."); goto fail_psched; @@ -353,7 +725,6 @@ int fa_start(void) fail_thread: psched_destroy(fa.psched); fail_psched: - log_err("Failed to start flow allocator."); return -1; } @@ -365,17 +736,18 @@ void fa_stop(void) psched_destroy(fa.psched); } -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t dlen) +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct fa_msg * msg; - uint64_t addr; struct shm_du_buff * sdb; - qoscube_t qc; + struct fa_flow * flow; + uint64_t addr; + qoscube_t qc = QOS_CUBE_BE; size_t len; + uint64_t eid; addr = dir_query(dst); if (addr == 0) @@ -383,12 +755,16 @@ int fa_alloc(int fd, len = sizeof(*msg) + ipcp_dir_hash_len(); - if (ipcp_sdb_reserve(&sdb, len + dlen)) + if (ipcp_sdb_reserve(&sdb, len + data->len)) return -1; - msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg = (struct fa_msg *) shm_du_buff_head(sdb); + memset(msg, 0, sizeof(*msg)); + + eid = gen_eid(fd); + msg->code = FLOW_REQ; - msg->s_eid = hton32(fd); + msg->s_eid = hton64(eid); msg->s_addr = hton64(ipcpi.dt_addr); msg->delay = hton32(qs.delay); msg->bandwidth = hton64(qs.bandwidth); @@ -398,111 +774,196 @@ int fa_alloc(int fd, msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - memcpy(shm_du_buff_head(sdb) + len, data, dlen); - - qc = qos_spec_to_cube(qs); + if (data->len > 0) + memcpy(shm_du_buff_head(sdb) + len, data->data, data->len); - if (dt_write_packet(addr, qc, fa.fd, sdb)) { + if (dt_write_packet(addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation request packet."); ipcp_sdb_release(sdb); return -1; } + flow = &fa.flows[fd]; + pthread_rwlock_wrlock(&fa.flows_lock); - assert(fa.r_eid[fd] == -1); - fa.r_addr[fd] = addr; + fa_flow_init(flow); + flow->r_addr = addr; + flow->s_eid = eid; pthread_rwlock_unlock(&fa.flows_lock); return 0; } -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len) +int fa_alloc_resp(int fd, + int response, + const buffer_t * data) { - struct timespec ts = {0, TIMEOUT * 1000}; - struct timespec abstime; struct fa_msg * msg; struct shm_du_buff * sdb; - qoscube_t qc; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); + struct fa_flow * flow; + qoscube_t qc = QOS_CUBE_BE; - pthread_mutex_lock(&ipcpi.alloc_lock); + flow = &fa.flows[fd]; - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); + goto fail_alloc_resp; } - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) { + log_err("Failed to reserve sdb (%zu bytes).", + sizeof(*msg) + data->len); + goto fail_reserve; } - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); + msg = (struct fa_msg *) shm_du_buff_head(sdb); + memset(msg, 0, sizeof(*msg)); - pthread_mutex_unlock(&ipcpi.alloc_lock); + msg->code = FLOW_REPLY; + msg->response = response; + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { - destroy_conn(fd); - return -1; - } + pthread_rwlock_rdlock(&fa.flows_lock); - pthread_rwlock_wrlock(&fa.flows_lock); + msg->r_eid = hton64(flow->r_eid); + msg->s_eid = hton64(flow->s_eid); - msg = (struct fa_msg *) shm_du_buff_head(sdb); - msg->code = FLOW_REPLY; - msg->r_eid = hton32(fa.r_eid[fd]); - msg->s_eid = hton32(fd); - msg->response = response; + pthread_rwlock_unlock(&fa.flows_lock); - memcpy(msg + 1, data, len); + if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation response packet."); + goto fail_packet; + } if (response < 0) { - destroy_conn(fd); - ipcp_sdb_release(sdb); + pthread_rwlock_rdlock(&fa.flows_lock); + fa_flow_fini(flow); + pthread_rwlock_unlock(&fa.flows_lock); } else { psched_add(fa.psched, fd); } - ipcp_flow_get_qoscube(fd, &qc); + return 0; + + fail_packet: + ipcp_sdb_release(sdb); + fail_reserve: + pthread_rwlock_wrlock(&fa.flows_lock); + fa_flow_fini(flow); + pthread_rwlock_unlock(&fa.flows_lock); + fail_alloc_resp: + return -1; +} + +int fa_dealloc(int fd) +{ + if (ipcp_flow_fini(fd) < 0) + return 0; + + psched_del(fa.psched, fd); - assert(qc >= 0 && qc < QOS_CUBE_MAX); + pthread_rwlock_wrlock(&fa.flows_lock); - if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) { - destroy_conn(fd); - pthread_rwlock_unlock(&fa.flows_lock); - ipcp_sdb_release(sdb); + fa_flow_fini(&fa.flows[fd]); + + pthread_rwlock_unlock(&fa.flows_lock); + + ipcp_flow_dealloc(fd); + + return 0; +} + +static int fa_update_remote(int fd, + uint16_t ece) +{ + struct fa_msg * msg; + struct shm_du_buff * sdb; + qoscube_t qc = QOS_CUBE_BE; + struct fa_flow * flow; + uint64_t r_addr; + + if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { + log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg)); return -1; } + msg = (struct fa_msg *) shm_du_buff_head(sdb); + + memset(msg, 0, sizeof(*msg)); + + flow = &fa.flows[fd]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + msg->code = FLOW_UPDATE; + msg->r_eid = hton64(flow->r_eid); + msg->ece = hton16(ece); + + r_addr = flow->r_addr; +#ifdef IPCP_FLOW_STATS + flow->u_snd++; +#endif pthread_rwlock_unlock(&fa.flows_lock); + + if (dt_write_packet(r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow update packet."); + ipcp_sdb_release(sdb); + return -1; + } + return 0; } -int fa_dealloc(int fd) +void fa_np1_rcv(uint64_t eid, + uint8_t ecn, + struct shm_du_buff * sdb) { - if (ipcp_flow_fini(fd) < 0) - return 0; + struct fa_flow * flow; + bool update; + uint16_t ece; + int fd; + size_t len; + + len = shm_du_buff_len(sdb); pthread_rwlock_wrlock(&fa.flows_lock); - psched_del(fa.psched, fd); + fd = eid_to_fd(eid); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_dbg("Received packet for unknown EID %" PRIu64 ".", eid); + ipcp_sdb_release(sdb); + return; + } + + flow = &fa.flows[fd]; - destroy_conn(fd); +#ifdef IPCP_FLOW_STATS + ++flow->p_rcv; + flow->b_rcv += len; +#endif + update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece); pthread_rwlock_unlock(&fa.flows_lock); - flow_dealloc(fd); + if (ipcp_flow_write(fd, sdb) < 0) { + log_dbg("Failed to write to flow %d.", fd); + ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_rwlock_wrlock(&fa.flows_lock); + ++flow->p_rcv_f; + flow->b_rcv_f += len; + pthread_rwlock_unlock(&fa.flows_lock); +#endif + } - return 0; + if (update) + fa_update_remote(eid, ece); } |