From 7ba0fd0ce19244745c8d2512ce8a003783d914a7 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Thu, 30 Mar 2017 20:33:22 +0200 Subject: lib: Revise flow allocation API The flow_alloc_res and flow_alloc_resp calls have been removed. The flow_alloc and flow_accept calls are now both blocking and take an additional timeout argument. --- src/lib/dev.c | 193 +++++++++++++------------------------------- src/lib/irmd_messages.proto | 18 ++--- src/lib/shm_flow_set.c | 3 +- 3 files changed, 69 insertions(+), 145 deletions(-) (limited to 'src/lib') diff --git a/src/lib/dev.c b/src/lib/dev.c index 79797b92..e19083c3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -161,21 +161,21 @@ struct { } ai; /* FIXME: translate real spec to cube */ -static qoscube_t spec_to_cube(qosspec_t * spec) +static qoscube_t spec_to_cube(qosspec_t * qs) { - if (spec == NULL) + if (qs == NULL) return QOS_CUBE_BE; - return spec->cube; + return qs->cube; } /* FIXME: fill real spec */ -static void fill_qosspec(qosspec_t * spec, +static void fill_qosspec(qosspec_t * qs, qoscube_t cube) { - assert(spec); + assert(qs); - spec->cube = cube; + qs->cube = cube; } static int api_announce(char * ap_name) @@ -209,6 +209,17 @@ static int api_announce(char * ap_name) return ret; } +static void init_flow(int fd) +{ + assert(!(fd < 0)); + + memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); + + ai.flows[fd].port_id = -1; + ai.flows[fd].api = -1; + ai.flows[fd].cube = QOS_CUBE_BE; +} + static void reset_flow(int fd) { assert (!(fd < 0)); @@ -216,25 +227,17 @@ static void reset_flow(int fd) if (ai.flows[fd].port_id != -1) port_destroy(&ai.ports[ai.flows[fd].port_id]); - ai.flows[fd].port_id = -1; - if (ai.flows[fd].rx_rb != NULL) { + if (ai.flows[fd].rx_rb != NULL) shm_rbuff_close(ai.flows[fd].rx_rb); - ai.flows[fd].rx_rb = NULL; - } - if (ai.flows[fd].tx_rb != NULL) { + + if (ai.flows[fd].tx_rb != NULL) shm_rbuff_close(ai.flows[fd].tx_rb); - ai.flows[fd].tx_rb = NULL; - } - if (ai.flows[fd].set != NULL) { + if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); - ai.flows[fd].set = NULL; - } - ai.flows[fd].oflags = 0; - ai.flows[fd].api = -1; - ai.flows[fd].timesout = false; - ai.flows[fd].cube = QOS_CUBE_BE; + init_flow(fd); + } int ap_init(const char * ap_name) @@ -280,16 +283,8 @@ int ap_init(const char * ap_name) return -1; } - for (i = 0; i < AP_MAX_FLOWS; ++i) { - ai.flows[i].rx_rb = NULL; - ai.flows[i].tx_rb = NULL; - ai.flows[i].set = NULL; - ai.flows[i].port_id = -1; - ai.flows[i].oflags = 0; - ai.flows[i].api = -1; - ai.flows[i].timesout = false; - ai.flows[i].cube = QOS_CUBE_BE; - } + for (i = 0; i < AP_MAX_FLOWS; ++i) + init_flow(i); ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); if (ai.ports == NULL) { @@ -382,7 +377,8 @@ void ap_fini() pthread_rwlock_destroy(&ai.data_lock); } -int flow_accept(qosspec_t * spec) +int flow_accept(qosspec_t * qs, + struct timespec * timeo) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; @@ -391,6 +387,13 @@ int flow_accept(qosspec_t * spec) msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; + if (timeo != NULL) { + msg.has_timeo_sec = true; + msg.has_timeo_usec = true; + msg.timeo_sec = timeo->tv_sec; + msg.timeo_usec = timeo->tv_nsec / 1000; + } + pthread_rwlock_rdlock(&ai.data_lock); msg.api = ai.api; @@ -424,7 +427,6 @@ int flow_accept(qosspec_t * spec) ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); if (ai.flows[fd].rx_rb == NULL) { - reset_flow(fd); bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -435,8 +437,10 @@ int flow_accept(qosspec_t * spec) ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); if (ai.flows[fd].tx_rb == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -455,8 +459,8 @@ int flow_accept(qosspec_t * spec) ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; - if (spec != NULL) - fill_qosspec(spec, ai.flows[fd].cube); + if (qs != NULL) + fill_qosspec(qs, ai.flows[fd].cube); ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -469,69 +473,27 @@ int flow_accept(qosspec_t * spec) return fd; } -int flow_alloc_resp(int fd, - int response) +int flow_alloc(const char * dst_name, + qosspec_t * qs, + struct timespec * timeo) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int ret = -1; - - if (fd < 0 || fd >= AP_MAX_FLOWS) - return -EBADF; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; - msg.has_api = true; - msg.api = ai.api; - msg.has_port_id = true; - - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -ENOTALLOC; - } - - msg.port_id = ai.flows[fd].port_id; - - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - - msg.has_response = true; - msg.response = response; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int flow_alloc(const char * dst_name, - qosspec_t * spec) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int fd = -1; - - if (dst_name == NULL) - return -EINVAL; + int fd; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = (char *) dst_name; msg.has_api = true; msg.has_qoscube = true; - msg.qoscube = spec_to_cube(spec); + msg.qoscube = spec_to_cube(qs); + + if (timeo != NULL) { + msg.has_timeo_sec = true; + msg.has_timeo_usec = true; + msg.timeo_sec = timeo->tv_sec; + msg.timeo_usec = timeo->tv_nsec / 1000; + } pthread_rwlock_rdlock(&ai.data_lock); @@ -561,7 +523,6 @@ int flow_alloc(const char * dst_name, ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); if (ai.flows[fd].rx_rb == NULL) { - reset_flow(fd); bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -571,16 +532,21 @@ int flow_alloc(const char * dst_name, ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); if (ai.flows[fd].tx_rb == NULL) { + reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].set = shm_flow_set_open(recv_msg->api); if (ai.flows[fd].set == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -589,7 +555,6 @@ int flow_alloc(const char * dst_name, ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; - ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; pthread_rwlock_unlock(&ai.flows_lock); @@ -600,48 +565,6 @@ int flow_alloc(const char * dst_name, return fd; } -int flow_alloc_res(int fd) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int result = 0; - - if (fd < 0 || fd >= AP_MAX_FLOWS) - return -EBADF; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; - msg.has_port_id = true; - - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_rdlock(&ai.flows_lock); - - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - return -ENOTALLOC; - } - - msg.port_id = ai.flows[fd].port_id; - - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - - recv_msg = send_recv_irm_msg_b(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - result = recv_msg->result; - - irm_msg__free_unpacked(recv_msg, NULL); - - return result; -} - int flow_dealloc(int fd) { irm_msg_t msg = IRM_MSG__INIT; @@ -804,9 +727,9 @@ int flow_set_timeout(int fd, } int flow_get_qosspec(int fd, - qosspec_t * spec) + qosspec_t * qs) { - if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL) + if (fd < 0 || fd >= AP_MAX_FLOWS || qs == NULL) return -EINVAL; pthread_rwlock_rdlock(&ai.data_lock); @@ -818,7 +741,7 @@ int flow_get_qosspec(int fd, return -ENOTALLOC; } - fill_qosspec(spec, ai.flows[fd].cube); + fill_qosspec(qs, ai.flows[fd].cube); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index c25d2c18..4fbd676e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -39,14 +39,12 @@ enum irm_msg_code { IRM_UNBIND_API = 11; IRM_REG = 12; IRM_UNREG = 13; - IRM_FLOW_ACCEPT = 14; - IRM_FLOW_ALLOC_RESP = 15; - IRM_FLOW_ALLOC = 16; - IRM_FLOW_ALLOC_RES = 17; - IRM_FLOW_DEALLOC = 18; - IPCP_FLOW_REQ_ARR = 19; - IPCP_FLOW_ALLOC_REPLY = 20; - IRM_REPLY = 21; + IRM_FLOW_ALLOC = 14; + IRM_FLOW_ACCEPT = 15; + IRM_FLOW_DEALLOC = 16; + IPCP_FLOW_REQ_ARR = 17; + IPCP_FLOW_ALLOC_REPLY = 18; + IRM_REPLY = 19; }; message irm_msg { @@ -63,5 +61,7 @@ message irm_msg { optional dif_config_msg conf = 11; optional uint32 opts = 12; repeated sint32 apis = 13; - optional sint32 result = 14; + optional uint32 timeo_sec = 14; + optional uint32 timeo_usec = 15; + optional sint32 result = 16; }; diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 615fbd2b..67abbb5b 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -302,7 +302,8 @@ int shm_flow_set_has(struct shm_flow_set * set, return ret; } -void shm_flow_set_notify(struct shm_flow_set * set, int port_id) +void shm_flow_set_notify(struct shm_flow_set * set, + int port_id) { assert(set); assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); -- cgit v1.2.3