diff options
Diffstat (limited to 'src/ipcpd/normal/frct.c')
-rw-r--r-- | src/ipcpd/normal/frct.c | 300 |
1 files changed, 211 insertions, 89 deletions
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 417815b7..abbde779 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -22,8 +22,6 @@ #define OUROBOROS_PREFIX "flow-rtx-control" -#define IDS_SIZE 2048 - #include <ouroboros/config.h> #include <ouroboros/logs.h> #include <ouroboros/bitmap.h> @@ -34,7 +32,8 @@ #include <pthread.h> #include "frct.h" - +#include "rmt.h" +#include "fmgr.h" enum conn_state { CONN_PENDING = 0, @@ -45,29 +44,29 @@ struct frct_i { uint32_t cep_id; uint32_t r_address; uint32_t r_cep_id; + enum qos_cube cube; + uint64_t seqno; enum conn_state state; - struct list_head next; }; -struct frct { - struct dt_const * dtc; +struct { uint32_t address; - struct list_head instances; pthread_mutex_t instances_lock; + struct frct_i ** instances; struct bmp * cep_ids; pthread_mutex_t cep_ids_lock; -} * frct = NULL; +} frct; static int next_cep_id() { int ret; - pthread_mutex_lock(&frct->cep_ids_lock); - ret = bmp_allocate(frct->cep_ids); - pthread_mutex_unlock(&frct->cep_ids_lock); + pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_allocate(frct.cep_ids); + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; } @@ -76,40 +75,34 @@ static int release_cep_id(int id) { int ret; - pthread_mutex_lock(&frct->cep_ids_lock); - ret = bmp_release(frct->cep_ids, id); - pthread_mutex_unlock(&frct->cep_ids_lock); + pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_release(frct.cep_ids, id); + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; } -int frct_init(struct dt_const * dtc, uint32_t address) +int frct_init(uint32_t address) { - if (dtc == NULL) - return -1; + int i; + frct.address = address; - frct = malloc(sizeof(*frct)); - if (frct == NULL) + if (pthread_mutex_init(&frct.cep_ids_lock, NULL)) return -1; - frct->dtc = dtc; - frct->address = address; - - INIT_LIST_HEAD(&frct->instances); - - if (pthread_mutex_init(&frct->cep_ids_lock, NULL)) { - free(frct); + if (pthread_mutex_init(&frct.instances_lock, NULL)) return -1; - } - if (pthread_mutex_init(&frct->instances_lock, NULL)) { - free(frct); + frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); + if (frct.instances == NULL) return -1; - } - frct->cep_ids = bmp_create(IDS_SIZE, 0); - if (frct->cep_ids == NULL) { - free(frct); + for (i = 0; i < IRMD_MAX_FLOWS; i++) + frct.instances[i] = NULL; + + frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1)); + if (frct.cep_ids == NULL) { + free(frct.instances); return -1; } @@ -118,117 +111,246 @@ int frct_init(struct dt_const * dtc, uint32_t address) int frct_fini() { - pthread_mutex_lock(&frct->cep_ids_lock); - bmp_destroy(frct->cep_ids); - pthread_mutex_unlock(&frct->cep_ids_lock); - free(frct); + pthread_mutex_lock(&frct.cep_ids_lock); + bmp_destroy(frct.cep_ids); + pthread_mutex_unlock(&frct.cep_ids_lock); + + free(frct.instances); return 0; } -struct dt_const * frct_dt_const() +static struct frct_i * create_frct_i(uint32_t address, + cep_id_t r_cep_id) { - if (frct == NULL) + struct frct_i * instance; + cep_id_t id; + + instance = malloc(sizeof(*instance)); + if (instance == NULL) return NULL; - return frct->dtc; -} + id = next_cep_id(); + instance->r_address = address; + instance->cep_id = id; + instance->r_cep_id = r_cep_id; + instance->state = CONN_PENDING; + instance->seqno = 0; -int frct_dt_flow(int fd, - enum qos_cube qos) -{ - LOG_MISSING; + frct.instances[id] = instance; - return -1; + return instance; } -int frct_rmt_post() +int frct_rmt_post_sdu(struct pci * pci, + struct shm_du_buff * sdb) { - LOG_MISSING; + struct frct_i * instance; + buffer_t buf; + cep_id_t id; - return -1; + if (pci == NULL || sdb == NULL) + return -1; + + if (pci->dst_cep_id == INVALID_CEP_ID) { + pthread_mutex_lock(&frct.instances_lock); + instance = create_frct_i(pci->src_addr, + pci->src_cep_id); + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + return -1; + } + id = instance->cep_id; + instance->r_cep_id = pci->src_cep_id; + pthread_mutex_unlock(&frct.instances_lock); + + buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + buf.data = shm_du_buff_head(sdb); + + if (fmgr_frct_post_buf(id, &buf)) { + LOG_ERR("Failed to hand buffer to FMGR."); + free(pci); + return -1; + } + } else { + /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ + if (fmgr_frct_post_sdu(pci->dst_cep_id, sdb)) { + LOG_ERR("Failed to hand SDU to FMGR."); + free(pci); + return -1; + } + } + + free(pci); + + return 0; } /* Call under instances lock */ static void destroy_frct_i(struct frct_i * instance) { release_cep_id(instance->cep_id); - list_del(&instance->next); + frct.instances[instance->cep_id] = NULL; free(instance); } -struct frct_i * frct_i_create(uint32_t address, - buffer_t * buf, - enum qos_cube cube) +cep_id_t frct_i_create(uint32_t address, + buffer_t * buf, + enum qos_cube cube) { struct frct_i * instance; + struct pci pci; + cep_id_t id; - if (buf == NULL || - buf->data == NULL) - return NULL; + if (buf == NULL || buf->data == NULL) + return INVALID_CEP_ID; - instance = malloc(sizeof(*instance)); - if (instance == NULL) - return NULL; - - pthread_mutex_lock(&frct->instances_lock); - - instance->r_address = address; - instance->cep_id = next_cep_id(); - instance->state = CONN_PENDING; + pthread_mutex_lock(&frct.instances_lock); + instance = create_frct_i(address, INVALID_CEP_ID); + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + return INVALID_CEP_ID; + } + id = instance->cep_id; + instance->cube = cube; + pthread_mutex_unlock(&frct.instances_lock); + + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = address; + pci.src_addr = frct.address; + pci.dst_cep_id = 0; + pci.src_cep_id = id; + pci.seqno = 0; + pci.qos_id = cube; + + if (rmt_frct_write_buf(&pci, buf)) { + free(instance); + LOG_ERR("Failed to hand PDU to RMT."); + return INVALID_CEP_ID; + } - INIT_LIST_HEAD(&instance->next); - list_add(&instance->next, &frct->instances); + return id; +} - pthread_mutex_unlock(&frct->instances_lock); +int frct_i_accept(cep_id_t id, + buffer_t * buf, + enum qos_cube cube) +{ + struct pci pci; + struct frct_i * instance; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + if (buf == NULL || buf->data == NULL) + return -1; - return instance; -} + pthread_mutex_lock(&frct.instances_lock); -int frct_i_accept(struct frct_i * instance, - buffer_t * buf) -{ - if (instance == NULL || buf == NULL || buf->data == NULL) + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); return -1; + } - pthread_mutex_lock(&frct->instances_lock); if (instance->state != CONN_PENDING) { - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); return -1; } instance->state = CONN_ESTABLISHED; - instance->cep_id = next_cep_id(); + instance->cube = cube; + instance->seqno = 0; - pthread_mutex_unlock(&frct->instances_lock); + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = 0; + pci.qos_id = cube; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + pthread_mutex_unlock(&frct.instances_lock); + + if (rmt_frct_write_buf(&pci, buf)) + return -1; return 0; } -int frct_i_destroy(struct frct_i * instance, - buffer_t * buf) +int frct_i_destroy(cep_id_t id, + buffer_t * buf) { - if (instance == NULL) - return -1; + struct pci pci; + struct frct_i * instance; + + pthread_mutex_lock(&frct.instances_lock); - pthread_mutex_lock(&frct->instances_lock); + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); + return -1; + } if (!(instance->state == CONN_PENDING || instance->state == CONN_ESTABLISHED)) { - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); return -1; } + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = 0; + pci.qos_id = instance->cube; + destroy_frct_i(instance); - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); + + if (buf != NULL && buf->data != NULL) + if (rmt_frct_write_buf(&pci, buf)) + return -1; + + return 0; +} - if (buf != NULL && buf->data != NULL) { +int frct_i_write_sdu(cep_id_t id, + struct shm_du_buff * sdb) +{ + struct pci pci; + struct frct_i * instance; + + if (sdb == NULL) + return -1; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + pthread_mutex_lock(&frct.instances_lock); + + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); + return -1; + } + + if (instance->state != CONN_ESTABLISHED) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Connection is not established."); + return -1; + } + + pci.pdu_type = PDU_TYPE_DTP; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = (instance->seqno)++; + pci.qos_id = instance->cube; + + if (rmt_frct_write_sdu(&pci, sdb)) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Failed to hand SDU to RMT."); + return -1; } return 0; |