From e82a5c416673e1eb2fbafb1deaaa8ac07971215e Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 14 May 2016 21:10:08 +0200 Subject: lib: dev.c: added locking Locking is required for multi-threaded applications. Flows are locked separately. Read/Write locks are used for concurrent reads. --- src/lib/dev.c | 212 ++++++++++++++++++++++++++++++++++++++++--------- src/lib/shm_ap_rbuff.c | 4 +- 2 files changed, 179 insertions(+), 37 deletions(-) diff --git a/src/lib/dev.c b/src/lib/dev.c index ae27a05f..440f40f9 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -47,9 +48,11 @@ struct ap_data { instance_name_t * api; struct shm_du_map * dum; struct bmp * fds; - struct shm_ap_rbuff * rb; + rw_lock_t data_lock; + struct flow flows[AP_MAX_FLOWS]; + rw_lock_t flows_lock; } * _ap_instance; int ap_init(char * ap_name) @@ -92,14 +95,19 @@ int ap_init(char * ap_name) _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { instance_name_destroy(_ap_instance->api); + shm_du_map_close(_ap_instance->dum); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; } - for (i = 0; i < AP_MAX_FLOWS; ++i) + for (i = 0; i < AP_MAX_FLOWS; ++i) { _ap_instance->flows[i].rb = NULL; + _ap_instance->flows[i].port_id = -1; + } + rw_lock_init(&_ap_instance->flows_lock); + rw_lock_init(&_ap_instance->data_lock); return 0; } @@ -110,6 +118,9 @@ void ap_fini(void) if (_ap_instance == NULL) return; + + rw_lock_wrlock(&_ap_instance->data_lock); + if (_ap_instance->api != NULL) instance_name_destroy(_ap_instance->api); if (_ap_instance->fds != NULL) @@ -122,6 +133,8 @@ void ap_fini(void) if (_ap_instance->flows[i].rb != NULL) shm_ap_rbuff_close(_ap_instance->flows[i].rb); + rw_lock_unlock(&_ap_instance->data_lock); + free(_ap_instance); } @@ -142,7 +155,7 @@ int ap_reg(char ** difs, { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = bmp_allocate(_ap_instance->fds); + int fd = -1; if (difs == NULL || len == 0 || @@ -157,11 +170,16 @@ int ap_reg(char ** difs, msg.code = IRM_MSG_CODE__IRM_AP_REG; msg.has_pid = true; - msg.pid = _ap_instance->api->id; - msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; msg.n_dif_name = len; + rw_lock_rdlock(&_ap_instance->data_lock); + + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; + + rw_lock_unlock(&_ap_instance->data_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -176,6 +194,12 @@ int ap_reg(char ** difs, irm_msg__free_unpacked(recv_msg, NULL); + rw_lock_wrlock(&_ap_instance->data_lock); + + fd = bmp_allocate(_ap_instance->fds); + + rw_lock_unlock(&_ap_instance->data_lock); + return fd; } @@ -194,11 +218,16 @@ int ap_unreg(char ** difs, msg.code = IRM_MSG_CODE__IRM_AP_UNREG; msg.has_pid = true; - msg.pid = _ap_instance->api->id; - msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; msg.n_dif_name = len; + rw_lock_rdlock(&_ap_instance->data_lock); + + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; + + rw_lock_unlock(&_ap_instance->data_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -224,8 +253,13 @@ int flow_accept(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; + + rw_lock_rdlock(&_ap_instance->data_lock); + msg.pid = _ap_instance->api->id; + rw_lock_unlock(&_ap_instance->data_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -235,18 +269,8 @@ int flow_accept(int fd, return -1; } - cfd = bmp_allocate(_ap_instance->fds); - - _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); - if (_ap_instance->flows[cfd].rb == NULL) { - bmp_release(_ap_instance->fds, cfd); - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - *ap_name = strdup(recv_msg->ap_name); if (*ap_name == NULL) { - bmp_release(_ap_instance->fds, cfd); irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -254,21 +278,46 @@ int flow_accept(int fd, if (ae_name != NULL) { *ae_name = strdup(recv_msg->ae_name); if (*ae_name == NULL) { - bmp_release(_ap_instance->fds, cfd); irm_msg__free_unpacked(recv_msg, NULL); return -1; } } + rw_lock_wrlock(&_ap_instance->data_lock); + + cfd = bmp_allocate(_ap_instance->fds); + + rw_lock_unlock(&_ap_instance->data_lock); + + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[cfd].rb == NULL) { + rw_lock_wrlock(&_ap_instance->data_lock); + + bmp_release(_ap_instance->fds, cfd); + + rw_lock_unlock(&_ap_instance->data_lock); + + irm_msg__free_unpacked(recv_msg, NULL); + + rw_lock_unlock(&_ap_instance->flows_lock); + return -1; + } + _ap_instance->flows[cfd].port_id = recv_msg->port_id; _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT; - + rw_lock_unlock(&_ap_instance->flows_lock); irm_msg__free_unpacked(recv_msg, NULL); + rw_lock_wrlock(&_ap_instance->data_lock); + bmp_release(_ap_instance->fds, fd); + rw_lock_unlock(&_ap_instance->data_lock); + return cfd; } @@ -281,9 +330,21 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_pid = true; + + rw_lock_rdlock(&_ap_instance->data_lock); + msg.pid = _ap_instance->api->id; + + rw_lock_unlock(&_ap_instance->data_lock); + msg.has_port_id = true; + + rw_lock_rdlock(&_ap_instance->flows_lock); + msg.port_id = _ap_instance->flows[fd].port_id; + + rw_lock_unlock(&_ap_instance->flows_lock); + msg.has_response = true; msg.response = response; @@ -318,10 +379,15 @@ int flow_alloc(char * dst_name, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = dst_name; - msg.ap_name = _ap_instance->api->name; + msg.ae_name = src_ae_name; msg.has_pid = true; + + rw_lock_rdlock(&_ap_instance->data_lock); + msg.pid = _ap_instance->api->id; - msg.ae_name = src_ae_name; + msg.ap_name = _ap_instance->api->name; + + rw_lock_unlock(&_ap_instance->data_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) @@ -332,11 +398,23 @@ int flow_alloc(char * dst_name, return -1; } + rw_lock_wrlock(&_ap_instance->data_lock); + fd = bmp_allocate(_ap_instance->fds); + rw_lock_unlock(&_ap_instance->data_lock); + + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); if (_ap_instance->flows[fd].rb == NULL) { + rw_lock_wrlock(&_ap_instance->data_lock); + bmp_release(_ap_instance->fds, fd); + + rw_lock_unlock(&_ap_instance->data_lock); + + rw_lock_unlock(&_ap_instance->flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -344,6 +422,8 @@ int flow_alloc(char * dst_name, _ap_instance->flows[fd].port_id = recv_msg->port_id; _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + rw_lock_unlock(&_ap_instance->flows_lock); + irm_msg__free_unpacked(recv_msg, NULL); return fd; @@ -357,8 +437,13 @@ int flow_alloc_res(int fd) msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; msg.has_port_id = true; + + rw_lock_rdlock(&_ap_instance->flows_lock); + msg.port_id = _ap_instance->flows[fd].port_id; + rw_lock_unlock(&_ap_instance->flows_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -382,8 +467,14 @@ int flow_dealloc(int fd) msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; msg.has_port_id = true; + + + rw_lock_rdlock(&_ap_instance->data_lock); + msg.port_id = _ap_instance->flows[fd].port_id; + rw_lock_unlock(&_ap_instance->data_lock); + recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -401,42 +492,77 @@ int flow_dealloc(int fd) int flow_cntl(int fd, int cmd, int oflags) { - int old = _ap_instance->flows[fd].oflags; + int old; + + rw_lock_wrlock(&_ap_instance->flows_lock); + + old = _ap_instance->flows[fd].oflags; + switch (cmd) { case FLOW_F_GETFL: /* GET FLOW FLAGS */ - return _ap_instance->flows[fd].oflags; + rw_lock_unlock(&_ap_instance->flows_lock); + return old; case FLOW_F_SETFL: /* SET FLOW FLAGS */ _ap_instance->flows[fd].oflags = oflags; + rw_lock_unlock(&_ap_instance->flows_lock); return old; default: + rw_lock_unlock(&_ap_instance->flows_lock); return FLOW_O_INVALID; /* unknown command */ } } ssize_t flow_write(int fd, void * buf, size_t count) { - size_t index = shm_create_du_buff(_ap_instance->dum, - count + DU_BUFF_HEADSPACE + - DU_BUFF_TAILSPACE, - DU_BUFF_HEADSPACE, - (uint8_t *) buf, - count); - struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; - if (index == -1) + size_t index; + struct rb_entry e; + + if (buf == NULL) + return 0; + + rw_lock_rdlock(&_ap_instance->data_lock); + + index = shm_create_du_buff(_ap_instance->dum, + count + DU_BUFF_HEADSPACE + + DU_BUFF_TAILSPACE, + DU_BUFF_HEADSPACE, + (uint8_t *) buf, + count); + if (index == -1) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } + + rw_lock_rdlock(&_ap_instance->flows_lock); + + e.index = index; + e.port_id = _ap_instance->flows[fd].port_id; if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { shm_release_du_buff(_ap_instance->dum, index); + + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + return -EPIPE; } + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + return 0; } else { while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) - ; + LOG_DBGF("Couldn't write to rbuff."); } + rw_lock_unlock(&_ap_instance->data_lock); + + rw_lock_unlock(&_ap_instance->flows_lock); + return 0; } @@ -446,27 +572,41 @@ ssize_t flow_read(int fd, void * buf, size_t count) int n; uint8_t * sdu; + rw_lock_rdlock(&_ap_instance->data_lock); + + rw_lock_rdlock(&_ap_instance->flows_lock); + if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { e = shm_ap_rbuff_read(_ap_instance->rb); } else { - /* FIXME: move this to a thread */ + + /* FIXME: this will throw away packets for other fd's */ while (e == NULL || - e->port_id != _ap_instance->flows[fd].port_id) + e->port_id != _ap_instance->flows[fd].port_id) { e = shm_ap_rbuff_read(_ap_instance->rb); + } } - if (e == NULL) + rw_lock_unlock(&_ap_instance->flows_lock); + + if (e == NULL) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } n = shm_du_map_read_sdu(&sdu, _ap_instance->dum, e->index); - if (n < 0) + if (n < 0) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } memcpy(buf, sdu, MIN(n, count)); shm_release_du_buff(_ap_instance->dum, e->index); + rw_lock_unlock(&_ap_instance->data_lock); + return n; } diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6c04ccc5..da6f0e33 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -253,8 +253,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) } e = malloc(sizeof(*e)); - if (e == NULL) + if (e == NULL) { + pthread_mutex_unlock(rb->shm_mutex); return NULL; + } *e = *(rb->shm_base + *rb->ptr_tail); -- cgit v1.2.3