From c69dd846c5aa2bed4db16961c5774a20cea7f828 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Tue, 11 Oct 2016 13:39:43 +0200 Subject: lib: Track SDUs in the fast path This will allow to finalize deallocating flows until all SDUs have been processed. Read and write calls will now block when a flow was deallocated. Replaces NULL checks in the fast path with asserts. --- src/lib/dev.c | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) (limited to 'src/lib/dev.c') diff --git a/src/lib/dev.c b/src/lib/dev.c index e20d23d4..c1f769ad 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -356,6 +356,8 @@ int flow_accept(char ** ae_name) ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; + shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -478,6 +480,8 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); + irm_msg__free_unpacked(recv_msg, NULL); return fd; @@ -559,6 +563,8 @@ int flow_dealloc(int fd) bmp_release(ai.fds, fd); + shm_ap_rbuff_close_port(ai.rb, msg.port_id); + pthread_rwlock_unlock(&ai.flows_lock); send_irm_msg(&msg); @@ -630,10 +636,10 @@ ssize_t flow_write(int fd, void * buf, size_t count) DU_BUFF_TAILSPACE, buf, count); - if (idx == -1) { + if (idx < 0) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return -EAGAIN; + return -idx; } e.index = idx; @@ -643,7 +649,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return -1; + return -ENOTALLOC; } } else { /* blocking */ struct shm_rdrbuff * rdrb = ai.rdrb; @@ -664,8 +670,12 @@ ssize_t flow_write(int fd, void * buf, size_t count) e.index = idx; e.port_id = ai.flows[fd].port_id; - while (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) - ; + if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { + shm_rdrbuff_remove(ai.rdrb, e.index); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -ENOTALLOC; + } } pthread_rwlock_unlock(&ai.flows_lock); @@ -838,6 +848,8 @@ int np1_flow_alloc(pid_t n_api, int port_id) ai.ports[port_id].fd = fd; port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + shm_ap_rbuff_open_port(ai.rb, port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -867,6 +879,8 @@ int np1_flow_dealloc(int port_id) port_destroy(&ai.ports[port_id]); + shm_ap_rbuff_close_port(ai.rb, port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -902,6 +916,8 @@ int np1_flow_resp(pid_t n_api, int port_id) ai.flows[fd].rb = rb; + shm_ap_rbuff_open_port(ai.rb, port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1011,6 +1027,9 @@ int ipcp_flow_alloc_reply(int fd, int response) msg.has_response = true; msg.response = response; + if (response) + shm_ap_rbuff_open_port(ai.rb, msg.port_id); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; -- cgit v1.2.3