summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2017-04-21 10:49:26 +0000
committerdimitri staessens <dimitri.staessens@ugent.be>2017-04-21 10:49:26 +0000
commit61ec9ed4da2938d8dfc06e05cc4212f080db398e (patch)
tree67b8576e9747d7815c7eed7170f49a10e5a4e0e0 /src/ipcpd/normal/fmgr.c
parent4bfd6c07281847405e127e9588376fcf20d07a7e (diff)
parenta9d71381a84886007625958b9daea6b2d4a50563 (diff)
downloadouroboros-61ec9ed4da2938d8dfc06e05cc4212f080db398e.tar.gz
ouroboros-61ec9ed4da2938d8dfc06e05cc4212f080db398e.zip
Merged in sandervrijders/ouroboros/be-fmgr-split (pull request #490)
ipcpd: normal: Split flow manager into DT and FA
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c748
1 files changed, 0 insertions, 748 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
deleted file mode 100644
index d055b311..00000000
--- a/src/ipcpd/normal/fmgr.c
+++ /dev/null
@@ -1,748 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Flow manager of the IPC Process
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define OUROBOROS_PREFIX "flow-manager"
-
-#include <ouroboros/config.h>
-#include <ouroboros/logs.h>
-#include <ouroboros/dev.h>
-#include <ouroboros/list.h>
-#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/cacep.h>
-#include <ouroboros/rib.h>
-
-#include "connmgr.h"
-#include "fmgr.h"
-#include "frct.h"
-#include "ipcp.h"
-#include "shm_pci.h"
-#include "ribconfig.h"
-#include "pff.h"
-#include "neighbors.h"
-#include "gam.h"
-#include "routing.h"
-#include "sdu_sched.h"
-
-#include <stdlib.h>
-#include <stdbool.h>
-#include <pthread.h>
-#include <string.h>
-#include <inttypes.h>
-
-#include "flow_alloc.pb-c.h"
-typedef FlowAllocMsg flow_alloc_msg_t;
-
-#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */
-
-struct {
- pthread_rwlock_t np1_flows_lock;
- cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
- int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
-
- flow_set_t * np1_set[QOS_CUBE_MAX];
- struct sdu_sched * np1_sdu_sched;
-
- flow_set_t * nm1_set[QOS_CUBE_MAX];
- struct sdu_sched * nm1_sdu_sched;
-
- struct pff * pff[QOS_CUBE_MAX];
- struct routing_i * routing[QOS_CUBE_MAX];
-
- struct gam * gam;
- struct nbs * nbs;
- struct ae * ae;
-
- struct nb_notifier nb_notifier;
-} fmgr;
-
-static int fmgr_neighbor_event(enum nb_event event,
- struct conn conn)
-{
- qoscube_t cube;
-
- /* We are only interested in neighbors being added and removed. */
- switch (event) {
- case NEIGHBOR_ADDED:
- ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
- flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd);
- log_dbg("Added fd %d to flow set.", conn.flow_info.fd);
- break;
- case NEIGHBOR_REMOVED:
- ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
- flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd);
- log_dbg("Removed fd %d from flow set.", conn.flow_info.fd);
- break;
- default:
- break;
- }
-
- return 0;
-}
-
-static int np1_sdu_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
-{
- (void) qc;
-
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
-
- if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- log_warn("Failed to hand SDU to FRCT.");
- return -1;
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-static int nm1_sdu_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
-{
- struct pci pci;
-
- memset(&pci, 0, sizeof(pci));
-
- shm_pci_des(sdb, &pci);
-
- if (pci.dst_addr != ipcpi.dt_addr) {
- if (pci.ttl == 0) {
- log_dbg("TTL was zero.");
- ipcp_flow_del(sdb);
- return 0;
- }
-
- pff_lock(fmgr.pff[qc]);
-
- fd = pff_nhop(fmgr.pff[qc], pci.dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[qc]);
- log_err("No next hop for %" PRIu64, pci.dst_addr);
- ipcp_flow_del(sdb);
- return -1;
- }
-
- pff_unlock(fmgr.pff[qc]);
-
- if (ipcp_flow_write(fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fd);
- ipcp_flow_del(sdb);
- return -1;
- }
- } else {
- shm_pci_shrink(sdb);
-
- if (frct_nm1_post_sdu(&pci, sdb)) {
- log_err("Failed to hand PDU to FRCT.");
- return -1;
- }
- }
-
- return 0;
-}
-
-static void fmgr_destroy_flows(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- flow_set_destroy(fmgr.nm1_set[i]);
- flow_set_destroy(fmgr.np1_set[i]);
- }
-}
-
-static void fmgr_destroy_routing(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- routing_i_destroy(fmgr.routing[i]);
-}
-
-static void fmgr_destroy_pff(void)
-{
- int i;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- pff_destroy(fmgr.pff[i]);
-}
-
-int fmgr_init(void)
-{
- int i;
- int j;
- struct conn_info info;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
-
- for (i = 0; i < IPCPD_MAX_CONNS; ++i)
- fmgr.np1_cep_id_to_fd[i] = -1;
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fmgr.np1_set[i] = flow_set_create();
- if (fmgr.np1_set[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
-
- fmgr.nm1_set[i] = flow_set_create();
- if (fmgr.nm1_set[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
- }
-
- if (shm_pci_init()) {
- log_err("Failed to init shm pci.");
- fmgr_destroy_flows();
- return -1;
- }
-
- memset(&info, 0, sizeof(info));
-
- strcpy(info.ae_name, DT_AE);
- strcpy(info.protocol, FRCT_PROTO);
- info.pref_version = 1;
- info.pref_syntax = PROTO_FIXED;
- info.addr = ipcpi.dt_addr;
-
- fmgr.ae = connmgr_ae_create(info);
- if (fmgr.ae == NULL) {
- log_err("Failed to create AE struct.");
- fmgr_destroy_flows();
- return -1;
- }
-
- fmgr.nbs = nbs_create();
- if (fmgr.nbs == NULL) {
- log_err("Failed to create neighbors struct.");
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- fmgr.nb_notifier.notify_call = fmgr_neighbor_event;
- if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) {
- log_err("Failed to register notifier.");
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- if (routing_init(fmgr.nbs)) {
- log_err("Failed to init routing.");
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) {
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- fmgr.pff[i] = pff_create();
- if (fmgr.pff[i] == NULL) {
- for (j = 0; j < i; ++j)
- pff_destroy(fmgr.pff[j]);
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
-
- fmgr.routing[i] = routing_i_create(fmgr.pff[i]);
- if (fmgr.routing[i] == NULL) {
- for (j = 0; j < i; ++j)
- routing_i_destroy(fmgr.routing[j]);
- fmgr_destroy_pff();
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- routing_fini();
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
- nbs_destroy(fmgr.nbs);
- fmgr_destroy_flows();
- connmgr_ae_destroy(fmgr.ae);
- return -1;
- }
- }
-
- return 0;
-}
-
-void fmgr_fini()
-{
- nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
-
- fmgr_destroy_routing();
-
- fmgr_destroy_pff();
-
- routing_fini();
-
- fmgr_destroy_flows();
-
- connmgr_ae_destroy(fmgr.ae);
-
- nbs_destroy(fmgr.nbs);
-}
-
-int fmgr_start(void)
-{
- enum pol_gam pg;
-
- if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg))
- != sizeof(pg)) {
- log_err("Failed to read policy for ribmgr gam.");
- return -1;
- }
-
- fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae);
- if (fmgr.gam == NULL) {
- log_err("Failed to init dt graph adjacency manager.");
- return -1;
- }
-
- fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler);
- if (fmgr.nm1_sdu_sched == NULL) {
- log_err("Failed to create N-1 SDU scheduler.");
- gam_destroy(fmgr.gam);
- return -1;
- }
-
- fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler);
- if (fmgr.np1_sdu_sched == NULL) {
- log_err("Failed to create N+1 SDU scheduler.");
- sdu_sched_destroy(fmgr.nm1_sdu_sched);
- gam_destroy(fmgr.gam);
- return -1;
- }
-
- return 0;
-}
-
-void fmgr_stop(void)
-{
- sdu_sched_destroy(fmgr.np1_sdu_sched);
-
- sdu_sched_destroy(fmgr.nm1_sdu_sched);
-
- gam_destroy(fmgr.gam);
-}
-
-int fmgr_np1_alloc(int fd,
- const uint8_t * dst,
- qoscube_t cube)
-{
- cep_id_t cep_id;
- buffer_t buf;
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- char path[RIB_MAX_PATH_LEN + 1];
- uint64_t addr;
- ssize_t ch;
- ssize_t i;
- char ** children;
- char hashstr[ipcp_dir_hash_strlen() + 1];
- char * dst_ipcp = NULL;
-
- ipcp_hash_str(hashstr, dst);
-
- assert(strlen(hashstr) + strlen(DIR_PATH) + 1
- < RIB_MAX_PATH_LEN);
-
- strcpy(path, DIR_PATH);
-
- rib_path_append(path, hashstr);
-
- ch = rib_children(path, &children);
- if (ch <= 0)
- return -1;
-
- for (i = 0; i < ch; ++i)
- if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0)
- dst_ipcp = children[i];
- else
- free(children[i]);
-
- free(children);
-
- if (dst_ipcp == NULL)
- return -1;
-
- strcpy(path, MEMBERS_PATH);
-
- rib_path_append(path, dst_ipcp);
-
- free(dst_ipcp);
-
- if (rib_read(path, &addr, sizeof(addr)) < 0)
- return -1;
-
- msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
- msg.has_hash = true;
- msg.hash.len = ipcp_dir_hash_len();
- msg.hash.data = (uint8_t *) dst;
- msg.has_qoscube = true;
- msg.qoscube = cube;
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -1;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- cep_id = frct_i_create(addr, &buf, cube);
- if (cep_id == INVALID_CEP_ID) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- free(buf.data);
- return -1;
- }
-
- free(buf.data);
-
- fmgr.np1_fd_to_cep_id[fd] = cep_id;
- fmgr.np1_cep_id_to_fd[cep_id] = fd;
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-/* Call under np1_flows lock */
-static int np1_flow_dealloc(int fd)
-{
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- buffer_t buf;
- int ret;
- qoscube_t cube;
-
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_del(fmgr.np1_set[cube], fd);
-
- msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -ENOMEM;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
-
- fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID;
- fmgr.np1_fd_to_cep_id[fd] = -1;
-
- free(buf.data);
-
- return ret;
-}
-
-int fmgr_np1_alloc_resp(int fd,
- int response)
-{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- buffer_t buf;
-
- msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
- msg.response = response;
- msg.has_response = true;
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &ts);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0)
- return -1;
-
- buf.data = malloc(buf.len);
- if (buf.data == NULL)
- return -ENOMEM;
-
- flow_alloc_msg__pack(&msg, buf.data);
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- if (response < 0) {
- frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
- free(buf.data);
- fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]]
- = INVALID_CEP_ID;
- fmgr.np1_fd_to_cep_id[fd] = -1;
- } else {
- qoscube_t cube;
- ipcp_flow_get_qoscube(fd, &cube);
- if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- free(buf.data);
- return -1;
- }
- flow_set_add(fmgr.np1_set[cube], fd);
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- free(buf.data);
-
- return 0;
-}
-
-int fmgr_np1_dealloc(int fd)
-{
- int ret;
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- ret = np1_flow_dealloc(fd);
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return ret;
-}
-
-int fmgr_np1_post_buf(cep_id_t cep_id,
- buffer_t * buf)
-{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- int ret = 0;
- int fd;
- flow_alloc_msg_t * msg;
- qoscube_t cube;
-
- /* Depending on the message call the function in ipcp-dev.h */
-
- msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
- if (msg == NULL) {
- log_err("Failed to unpack flow alloc message");
- return -1;
- }
-
- switch (msg->code) {
- case FLOW_ALLOC_CODE__FLOW_REQ:
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- if (!msg->has_hash) {
- log_err("Bad flow request.");
- return -1;
- }
-
- while (ipcpi.alloc_id != -1 &&
- ipcp_get_state() == IPCP_OPERATIONAL)
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &ts);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- assert(ipcpi.alloc_id == -1);
-
- fd = ipcp_flow_req_arr(getpid(),
- msg->hash.data,
- ipcp_dir_hash_len(),
- msg->qoscube);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- log_err("Failed to get fd for flow.");
- return -1;
- }
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- fmgr.np1_fd_to_cep_id[fd] = cep_id;
- fmgr.np1_cep_id_to_fd[cep_id] = fd;
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- break;
- case FLOW_ALLOC_CODE__FLOW_REPLY:
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- ret = ipcp_flow_alloc_reply(fd, msg->response);
- if (msg->response < 0) {
- fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID;
- fmgr.np1_cep_id_to_fd[cep_id] = -1;
- } else {
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_add(fmgr.np1_set[cube],
- fmgr.np1_cep_id_to_fd[cep_id]);
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- break;
- case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_del(fmgr.np1_set[cube], fd);
- ret = flow_dealloc(fd);
- break;
- default:
- log_err("Got an unknown flow allocation message.");
- ret = -1;
- break;
- }
-
- flow_alloc_msg__free_unpacked(msg, NULL);
-
- return ret;
-}
-
-int fmgr_np1_post_sdu(cep_id_t cep_id,
- struct shm_du_buff * sdb)
-{
- int fd;
-
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
-
- fd = fmgr.np1_cep_id_to_fd[cep_id];
- if (ipcp_flow_write(fd, sdb)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- log_err("Failed to hand SDU to N flow.");
- return -1;
- }
-
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
-
- return 0;
-}
-
-int fmgr_nm1_write_sdu(struct pci * pci,
- struct shm_du_buff * sdb)
-{
- int fd;
-
- if (pci == NULL || sdb == NULL)
- return -EINVAL;
-
- pff_lock(fmgr.pff[pci->qos_id]);
- fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[pci->qos_id]);
- log_err("Could not get nhop for address %" PRIu64,
- pci->dst_addr);
- ipcp_flow_del(sdb);
- return -1;
- }
- pff_unlock(fmgr.pff[pci->qos_id]);
-
- if (shm_pci_ser(sdb, pci)) {
- log_err("Failed to serialize PDU.");
- ipcp_flow_del(sdb);
- return -1;
- }
-
- if (ipcp_flow_write(fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fd);
- ipcp_flow_del(sdb);
- return -1;
- }
-
- return 0;
-}
-
-int fmgr_nm1_write_buf(struct pci * pci,
- buffer_t * buf)
-{
- buffer_t * buffer;
- int fd;
-
- if (pci == NULL || buf == NULL || buf->data == NULL)
- return -EINVAL;
-
- pff_lock(fmgr.pff[pci->qos_id]);
- fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[pci->qos_id]);
- log_err("Could not get nhop for address %" PRIu64,
- pci->dst_addr);
- return -1;
- }
- pff_unlock(fmgr.pff[pci->qos_id]);
-
- buffer = shm_pci_ser_buf(buf, pci);
- if (buffer == NULL) {
- log_err("Failed to serialize buffer.");
- return -1;
- }
-
- if (flow_write(fd, buffer->data, buffer->len) == -1) {
- log_err("Failed to write buffer to fd.");
- free(buffer);
- return -1;
- }
-
- free(buffer->data);
- free(buffer);
- return 0;
-}