From 65f5aa30dc5cb6a284785835b0b14e5b19dfaa9a Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 13 Aug 2017 11:29:46 +0200 Subject: lib: Fix data race on fqueues bitmap This locks the process when allocating and destroying flow_sets. The flows_lock has been renamed to lock. Refactors and fixes a memleak in ouroboros_init. --- src/lib/dev.c | 269 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 143 insertions(+), 126 deletions(-) diff --git a/src/lib/dev.c b/src/lib/dev.c index c8e43778..daa41a23 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -95,7 +95,7 @@ struct { struct flow * flows; struct port * ports; - pthread_rwlock_t flows_lock; + pthread_rwlock_t lock; } ai; static void port_destroy(struct port * p) @@ -144,11 +144,11 @@ static enum port_state port_wait_assign(int port_id) enum port_state state; struct port * p; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); p = &ai.ports[port_id]; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); pthread_mutex_lock(&p->state_lock); @@ -240,18 +240,18 @@ static int flow_init(int port_id, { int fd; - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_wrlock(&ai.lock); fd = bmp_allocate(ai.fds); if (!bmp_is_id_valid(ai.fds, fd)) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -EBADF; } ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOMEM; } @@ -259,7 +259,7 @@ static int flow_init(int port_id, if (ai.flows[fd].tx_rb == NULL) { flow_fini(fd); bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOMEM; } @@ -267,7 +267,7 @@ static int flow_init(int port_id, if (ai.flows[fd].set == NULL) { flow_fini(fd); bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOMEM; } @@ -281,14 +281,15 @@ static int flow_init(int port_id, port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return fd; } int ouroboros_init(const char * ap_name) { - int i = 0; + int i = 0; + int ret = -ENOMEM; assert(ai.ap_name == NULL); @@ -296,82 +297,87 @@ int ouroboros_init(const char * ap_name) ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS); if (ai.fds == NULL) - return -ENOMEM; + goto fail_fds; ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0); - if (ai.fqueues == NULL) { - bmp_destroy(ai.fds); - return -ENOMEM; - } + if (ai.fqueues == NULL) + goto fail_fqueues; ai.fqset = shm_flow_set_create(); - if (ai.fqset == NULL) { - bmp_destroy(ai.fqueues); - bmp_destroy(ai.fds); - return -ENOMEM; - } + if (ai.fqset == NULL) + goto fail_fqset; ai.rdrb = shm_rdrbuff_open(); if (ai.rdrb == NULL) { - shm_flow_set_destroy(ai.fqset); - bmp_destroy(ai.fqueues); - bmp_destroy(ai.fds); - return -EIRMD; + ret = -EIRMD; + goto fail_rdrb; } ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS); - if (ai.flows == NULL) { - shm_rdrbuff_close(ai.rdrb); - shm_flow_set_destroy(ai.fqset); - bmp_destroy(ai.fqueues); - bmp_destroy(ai.fds); - return -ENOMEM; - } + if (ai.flows == NULL) + goto fail_flows; for (i = 0; i < AP_MAX_FLOWS; ++i) flow_clear(i); ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); - if (ai.ports == NULL) { - free(ai.flows); - shm_rdrbuff_close(ai.rdrb); - shm_flow_set_destroy(ai.fqset); - bmp_destroy(ai.fqueues); - bmp_destroy(ai.fds); - return -ENOMEM; - } + if (ai.ports == NULL) + goto fail_ports; if (ap_name != NULL) { ai.ap_name = strdup(path_strip((char *) ap_name)); - if (ai.ap_name == NULL) { - free(ai.flows); - shm_rdrbuff_close(ai.rdrb); - shm_flow_set_destroy(ai.fqset); - bmp_destroy(ai.fqueues); - bmp_destroy(ai.fds); - return -ENOMEM; - } + if (ai.ap_name == NULL) + goto fail_ap_name; if (api_announce((char *) ai.ap_name)) { - free(ai.ap_name); - free(ai.flows); - shm_rdrbuff_close(ai.rdrb); - shm_flow_set_destroy(ai.fqset); - bmp_destroy(ai.fqueues); - bmp_destroy(ai.fds); - return -EIRMD; + ret = -EIRMD; + goto fail_announce; } } for (i = 0; i < IRMD_MAX_FLOWS; ++i) { ai.ports[i].state = PORT_INIT; - pthread_mutex_init(&ai.ports[i].state_lock, NULL); - pthread_cond_init(&ai.ports[i].state_cond, NULL); + if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) { + int j; + for (j = 0; j < i; ++j) + pthread_mutex_destroy(&ai.ports[j].state_lock); + goto fail_announce; + } + if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) { + int j; + for (j = 0; j < i; ++j) + pthread_cond_destroy(&ai.ports[j].state_cond); + goto fail_state_cond; + } } - pthread_rwlock_init(&ai.flows_lock, NULL); + if (pthread_rwlock_init(&ai.lock, NULL)) + goto fail_lock; return 0; + + fail_lock: + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + pthread_cond_destroy(&ai.ports[i].state_cond); + fail_state_cond: + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + pthread_mutex_destroy(&ai.ports[i].state_lock); + fail_announce: + free(ai.ap_name); + fail_ap_name: + free(ai.ports); + fail_ports: + free(ai.flows); + fail_flows: + shm_rdrbuff_close(ai.rdrb); + fail_rdrb: + shm_flow_set_destroy(ai.fqset); + fail_fqset: + bmp_destroy(ai.fqueues); + fail_fqueues: + bmp_destroy(ai.fds); + fail_fds: + return ret; } void ouroboros_fini() @@ -386,7 +392,7 @@ void ouroboros_fini() if (ai.ap_name != NULL) free(ai.ap_name); - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); for (i = 0; i < AP_MAX_FLOWS; ++i) { if (ai.flows[i].port_id != -1) { @@ -407,9 +413,9 @@ void ouroboros_fini() free(ai.flows); free(ai.ports); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); - pthread_rwlock_destroy(&ai.flows_lock); + pthread_rwlock_destroy(&ai.lock); } int flow_accept(qosspec_t * qs, @@ -531,13 +537,13 @@ int flow_dealloc(int fd) msg.has_api = true; msg.api = ai.api; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); assert(!(ai.flows[fd].port_id < 0)); msg.port_id = ai.flows[fd].port_id; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) @@ -551,12 +557,12 @@ int flow_dealloc(int fd) irm_msg__free_unpacked(recv_msg, NULL); - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_wrlock(&ai.lock); flow_fini(fd); bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return 0; } @@ -569,10 +575,10 @@ int flow_set_flags(int fd, if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_wrlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -584,7 +590,7 @@ int flow_set_flags(int fd, if (flags & FLOW_O_RDWR) shm_rbuff_unblock(ai.flows[fd].rx_rb); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return old; } @@ -596,16 +602,16 @@ int flow_get_flags(int fd) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_wrlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } old = ai.flows[fd].oflags; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return old; } @@ -618,10 +624,10 @@ int flow_get_timeout(int fd, if (fd < 0 || fd >= AP_MAX_FLOWS || timeo == NULL) return -EINVAL; - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_wrlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -630,7 +636,7 @@ int flow_get_timeout(int fd, else ret = -EPERM; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return ret; } @@ -641,10 +647,10 @@ int flow_set_timeout(int fd, if (fd < 0 || fd >= AP_MAX_FLOWS) return -EINVAL; - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_wrlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -655,7 +661,7 @@ int flow_set_timeout(int fd, ai.flows[fd].rcv_timeo = *timeo; } - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return 0; } @@ -666,16 +672,16 @@ int flow_get_qosspec(int fd, if (fd < 0 || fd >= AP_MAX_FLOWS || qs == NULL) return -EINVAL; - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_wrlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } *qs = ai.flows[fd].spec; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return 0; } @@ -692,15 +698,15 @@ ssize_t flow_write(int fd, if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } if ((ai.flows[fd].oflags & FLOW_O_ACCMODE) == FLOW_O_RDONLY) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -EPERM; } @@ -711,20 +717,20 @@ ssize_t flow_write(int fd, buf, count); if (idx < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return idx; } if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } } else { /* blocking */ struct shm_rdrbuff * rdrb = ai.rdrb; struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); assert(tx_rb); @@ -739,12 +745,12 @@ ssize_t flow_write(int fd, return -ENOTALLOC; } - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); } shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return 0; } @@ -760,22 +766,22 @@ ssize_t flow_read(int fd, if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { idx = shm_rbuff_read(ai.flows[fd].rx_rb); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); } else { struct shm_rbuff * rb = ai.flows[fd].rx_rb; bool timeo = ai.flows[fd].timesout; struct timespec timeout = ai.flows[fd].rcv_timeo; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); if (timeo) idx = shm_rbuff_read_b(rb, &timeout); @@ -809,12 +815,17 @@ struct flow_set * flow_set_create() assert(ai.fqueues); + pthread_rwlock_wrlock(&ai.lock); + set->idx = bmp_allocate(ai.fqueues); if (!bmp_is_id_valid(ai.fqueues, set->idx)) { + pthread_rwlock_unlock(&ai.lock); free(set); return NULL; } + pthread_rwlock_unlock(&ai.lock); + return set; } @@ -824,7 +835,13 @@ void flow_set_destroy(struct flow_set * set) return; flow_set_zero(set); + + pthread_rwlock_wrlock(&ai.lock); + bmp_release(ai.fqueues, set->idx); + + pthread_rwlock_unlock(&ai.lock); + free(set); } @@ -867,7 +884,7 @@ int flow_set_add(struct flow_set * set, if (set == NULL) return -EINVAL; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); @@ -875,7 +892,7 @@ int flow_set_add(struct flow_set * set, for (i = 0; i < sdus; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return ret; } @@ -886,12 +903,12 @@ void flow_set_del(struct flow_set * set, if (set == NULL) return; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id >= 0) shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); } bool flow_set_has(const struct flow_set * set, @@ -902,16 +919,16 @@ bool flow_set_has(const struct flow_set * set, if (set == NULL || fd < 0) return false; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return false; } ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return ret; } @@ -926,11 +943,11 @@ int fqueue_next(struct fqueue * fq) if (fq->fqsize == 0) return -EPERM; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); fd = ai.ports[fq->fqueue[fq->next++]].fd; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); if (fq->next == fq->fqsize) { fq->fqsize = 0; @@ -980,11 +997,11 @@ int np1_flow_dealloc(int port_id) { int fd; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); fd = ai.ports[port_id].fd; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return fd; } @@ -996,11 +1013,11 @@ int np1_flow_resp(int port_id) if (port_wait_assign(port_id) != PORT_ID_ASSIGNED) return -1; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); fd = ai.ports[port_id].fd; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return fd; } @@ -1086,11 +1103,11 @@ int ipcp_flow_alloc_reply(int fd, msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; msg.has_port_id = true; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); msg.port_id = ai.flows[fd].port_id; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); msg.has_response = true; msg.response = response; @@ -1121,16 +1138,16 @@ int ipcp_flow_read(int fd, assert(fd >=0); assert(sdb); - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); if ((port_id = ai.flows[fd].port_id) < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } rb = ai.flows[fd].rx_rb; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); idx = shm_rbuff_read(rb); if (idx < 0) @@ -1149,15 +1166,15 @@ int ipcp_flow_write(int fd, if (sdb == NULL) return -EINVAL; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } if ((ai.flows[fd].oflags & FLOW_O_ACCMODE) == FLOW_O_RDONLY) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -EPERM; } @@ -1168,7 +1185,7 @@ int ipcp_flow_write(int fd, shm_rbuff_write(ai.flows[fd].tx_rb, idx); shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return 0; } @@ -1201,11 +1218,11 @@ void ipcp_flow_fini(int fd) flow_set_flags(fd, FLOW_O_WRONLY); - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); rx_rb = ai.flows[fd].rx_rb; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); shm_rbuff_fini(rx_rb); } @@ -1216,16 +1233,16 @@ int ipcp_flow_get_qoscube(int fd, if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL) return -EINVAL; - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_wrlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } *cube = ai.flows[fd].cube; - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return 0; } @@ -1236,11 +1253,11 @@ ssize_t local_flow_read(int fd) assert(fd >= 0); - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); ret = shm_rbuff_read(ai.flows[fd].rx_rb); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return ret; } @@ -1251,10 +1268,10 @@ int local_flow_write(int fd, if (fd < 0) return -EINVAL; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -1262,7 +1279,7 @@ int local_flow_write(int fd, shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return 0; } @@ -1272,19 +1289,19 @@ int ipcp_read_shim(int fd, { ssize_t idx; - pthread_rwlock_rdlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.lock); assert(ai.flows[fd].rx_rb); idx = shm_rbuff_read(ai.flows[fd].rx_rb); if (idx < 0) { - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return -EAGAIN; } *sdb = shm_rdrbuff_get(ai.rdrb, idx); - pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.lock); return 0; } -- cgit v1.2.3