diff options
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/dev.c | 489 | ||||
-rw-r--r-- | src/lib/lockfile.c | 9 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 148 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 18 |
4 files changed, 432 insertions, 232 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 17c473ed..391563da 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -28,9 +28,18 @@ #include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/utils.h> +#include <ouroboros/select.h> #include <stdlib.h> #include <string.h> +#include <stdio.h> + +struct flow_set { + bool dirty; + bool b[IRMD_MAX_FLOWS]; /* working copy */ + bool s[IRMD_MAX_FLOWS]; /* safe copy */ + pthread_rwlock_t lock; +}; struct flow { struct shm_ap_rbuff * rb; @@ -42,17 +51,21 @@ struct flow { struct timespec * timeout; }; -struct ap_data { +struct ap_instance { char * ap_name; + char * daf_name; pid_t api; + struct shm_rdrbuff * rdrb; struct bmp * fds; struct shm_ap_rbuff * rb; pthread_rwlock_t data_lock; struct flow flows[AP_MAX_FLOWS]; + int ports[AP_MAX_FLOWS]; + pthread_rwlock_t flows_lock; -} * _ap_instance; +} * ai; static int api_announce(char * ap_name) { @@ -63,12 +76,12 @@ static int api_announce(char * ap_name) msg.code = IRM_MSG_CODE__IRM_API_ANNOUNCE; msg.has_api = true; - pthread_rwlock_rdlock(&_ap_instance->data_lock); + pthread_rwlock_rdlock(&ai->data_lock); - msg.api = _ap_instance->api; + msg.api = ai->api; msg.ap_name = ap_name; - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { @@ -91,45 +104,47 @@ int ap_init(char * ap_name) ap_name = path_strip(ap_name); - _ap_instance = malloc(sizeof(struct ap_data)); - if (_ap_instance == NULL) { + ai = malloc(sizeof(*ai)); + if (ai == NULL) { return -ENOMEM; } - _ap_instance->api = getpid(); - _ap_instance->ap_name = ap_name; + ai->api = getpid(); + ai->ap_name = ap_name; + ai->daf_name = NULL; - _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); - if (_ap_instance->fds == NULL) { - free(_ap_instance); + ai->fds = bmp_create(AP_MAX_FLOWS, 0); + if (ai->fds == NULL) { + free(ai); return -ENOMEM; } - _ap_instance->rdrb = shm_rdrbuff_open(); - if (_ap_instance->rdrb == NULL) { - bmp_destroy(_ap_instance->fds); - free(_ap_instance); + ai->rdrb = shm_rdrbuff_open(); + if (ai->rdrb == NULL) { + bmp_destroy(ai->fds); + free(ai); return -1; } - _ap_instance->rb = shm_ap_rbuff_create(); - if (_ap_instance->rb == NULL) { - shm_rdrbuff_close(_ap_instance->rdrb); - bmp_destroy(_ap_instance->fds); - free(_ap_instance); + ai->rb = shm_ap_rbuff_create_s(); + if (ai->rb == NULL) { + shm_rdrbuff_close(ai->rdrb); + bmp_destroy(ai->fds); + free(ai); return -1; } for (i = 0; i < AP_MAX_FLOWS; ++i) { - _ap_instance->flows[i].rb = NULL; - _ap_instance->flows[i].port_id = -1; - _ap_instance->flows[i].oflags = 0; - _ap_instance->flows[i].api = -1; - _ap_instance->flows[i].timeout = NULL; + ai->flows[i].rb = NULL; + ai->flows[i].port_id = -1; + ai->flows[i].oflags = 0; + ai->flows[i].api = -1; + ai->flows[i].timeout = NULL; + ai->ports[i] = -1; } - pthread_rwlock_init(&_ap_instance->flows_lock, NULL); - pthread_rwlock_init(&_ap_instance->data_lock, NULL); + pthread_rwlock_init(&ai->flows_lock, NULL); + pthread_rwlock_init(&ai->data_lock, NULL); if (ap_name != NULL) return api_announce(ap_name); @@ -141,60 +156,56 @@ void ap_fini(void) { int i = 0; - if (_ap_instance == NULL) + if (ai == NULL) return; - pthread_rwlock_wrlock(&_ap_instance->data_lock); + pthread_rwlock_wrlock(&ai->data_lock); /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) - shm_rdrbuff_remove(_ap_instance->rdrb, i); + while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0) + shm_rdrbuff_remove(ai->rdrb, i); - if (_ap_instance->fds != NULL) - bmp_destroy(_ap_instance->fds); - if (_ap_instance->rb != NULL) - shm_ap_rbuff_destroy(_ap_instance->rb); - if (_ap_instance->rdrb != NULL) - shm_rdrbuff_close(_ap_instance->rdrb); + if (ai->fds != NULL) + bmp_destroy(ai->fds); + if (ai->rb != NULL) + shm_ap_rbuff_destroy(ai->rb); + if (ai->rdrb != NULL) + shm_rdrbuff_close(ai->rdrb); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + if (ai->daf_name != NULL) + free(ai->daf_name); - for (i = 0; i < AP_MAX_FLOWS; ++i) - if (_ap_instance->flows[i].rb != NULL) - shm_ap_rbuff_close(_ap_instance->flows[i].rb); + pthread_rwlock_rdlock(&ai->flows_lock); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + for (i = 0; i < AP_MAX_FLOWS; ++i) { + if (ai->flows[i].rb != NULL) + shm_ap_rbuff_close(ai->flows[i].rb); + ai->ports[ai->flows[i].port_id] = -1; + } - pthread_rwlock_destroy(&_ap_instance->flows_lock); - pthread_rwlock_destroy(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); - free(_ap_instance); -} + pthread_rwlock_destroy(&ai->flows_lock); + pthread_rwlock_destroy(&ai->data_lock); -static int port_id_to_fd(int port_id) -{ - int i; - for (i = 0; i < AP_MAX_FLOWS; ++i) - if (_ap_instance->flows[i].port_id == port_id) - return i; - return -1; + free(ai); } int flow_accept(char ** ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int cfd = -1; + int fd = -1; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; - pthread_rwlock_rdlock(&_ap_instance->data_lock); + pthread_rwlock_rdlock(&ai->data_lock); - msg.api = _ap_instance->api; + msg.api = ai->api; - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); recv_msg = send_recv_irm_msg_b(&msg); if (recv_msg == NULL) @@ -205,22 +216,22 @@ int flow_accept(char ** ae_name) return -1; } - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_wrlock(&ai->flows_lock); - cfd = bmp_allocate(_ap_instance->fds); - if (!bmp_is_id_valid(_ap_instance->fds, cfd)) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + fd = bmp_allocate(ai->fds); + if (!bmp_is_id_valid(ai->fds, fd)) { + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->api); - if (_ap_instance->flows[cfd].rb == NULL) { - bmp_release(_ap_instance->fds, cfd); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); + if (ai->flows[fd].rb == NULL) { + bmp_release(ai->fds, fd); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -228,25 +239,27 @@ int flow_accept(char ** ae_name) if (ae_name != NULL) { *ae_name = strdup(recv_msg->ae_name); if (*ae_name == NULL) { - shm_ap_rbuff_close(_ap_instance->flows[cfd].rb); - bmp_release(_ap_instance->fds, cfd); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + shm_ap_rbuff_close(ai->flows[fd].rb); + bmp_release(ai->fds, fd); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -ENOMEM; } } - _ap_instance->flows[cfd].port_id = recv_msg->port_id; - _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT; - _ap_instance->flows[cfd].api = recv_msg->api; + ai->flows[fd].port_id = recv_msg->port_id; + ai->flows[fd].oflags = FLOW_O_DEFAULT; + ai->flows[fd].api = recv_msg->api; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + ai->ports[recv_msg->port_id] = fd; + + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); - return cfd; + return fd; } int flow_alloc_resp(int fd, @@ -261,40 +274,40 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_api = true; - msg.api = _ap_instance->api; + msg.api = ai->api; msg.has_port_id = true; - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai->flows_lock); - if (_ap_instance->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + if (ai->flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return -ENOTALLOC; } - msg.port_id = _ap_instance->flows[fd].port_id; + msg.port_id = ai->flows[fd].port_id; - pthread_rwlock_unlock(&_ap_instance->flows_lock); + pthread_rwlock_unlock(&ai->flows_lock); msg.has_response = true; msg.response = response; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); return -1; } if (!recv_msg->has_result) { - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } ret = recv_msg->result; - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -320,11 +333,11 @@ int flow_alloc(char * dst_name, msg.ae_name = src_ae_name; msg.has_api = true; - pthread_rwlock_rdlock(&_ap_instance->data_lock); + pthread_rwlock_rdlock(&ai->data_lock); - msg.api = _ap_instance->api; + msg.api = ai->api; - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { @@ -336,32 +349,34 @@ int flow_alloc(char * dst_name, return -1; } - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_wrlock(&ai->flows_lock); - fd = bmp_allocate(_ap_instance->fds); - if (!bmp_is_id_valid(_ap_instance->fds, fd)) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + fd = bmp_allocate(ai->fds); + if (!bmp_is_id_valid(ai->fds, fd)) { + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); - if (_ap_instance->flows[fd].rb == NULL) { - bmp_release(_ap_instance->fds, fd); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); + if (ai->flows[fd].rb == NULL) { + bmp_release(ai->fds, fd); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - _ap_instance->flows[fd].port_id = recv_msg->port_id; - _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; - _ap_instance->flows[fd].api = recv_msg->api; + ai->flows[fd].port_id = recv_msg->port_id; + ai->flows[fd].oflags = FLOW_O_DEFAULT; + ai->flows[fd].api = recv_msg->api; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + ai->ports[recv_msg->port_id] = fd; + + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -380,19 +395,19 @@ int flow_alloc_res(int fd) msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; msg.has_port_id = true; - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai->flows_lock); - if (_ap_instance->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + if (ai->flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return -ENOTALLOC; } - msg.port_id = _ap_instance->flows[fd].port_id; + msg.port_id = ai->flows[fd].port_id; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); recv_msg = send_recv_irm_msg_b(&msg); if (recv_msg == NULL) { @@ -422,41 +437,43 @@ int flow_dealloc(int fd) msg.has_api = true; msg.api = getpid(); - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_wrlock(&ai->flows_lock); - if (_ap_instance->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + if (ai->flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return -ENOTALLOC; } - msg.port_id = _ap_instance->flows[fd].port_id; + msg.port_id = ai->flows[fd].port_id; + + ai->ports[msg.port_id] = -1; - _ap_instance->flows[fd].port_id = -1; - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); - _ap_instance->flows[fd].rb = NULL; - _ap_instance->flows[fd].api = -1; + ai->flows[fd].port_id = -1; + shm_ap_rbuff_close(ai->flows[fd].rb); + ai->flows[fd].rb = NULL; + ai->flows[fd].api = -1; - bmp_release(_ap_instance->fds, fd); + bmp_release(ai->fds, fd); - pthread_rwlock_unlock(&_ap_instance->flows_lock); + pthread_rwlock_unlock(&ai->flows_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); return -1; } if (!recv_msg->has_result) { - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } ret = recv_msg->result; - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -470,30 +487,30 @@ int flow_cntl(int fd, int cmd, int oflags) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_wrlock(&ai->flows_lock); - if (_ap_instance->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + if (ai->flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return -ENOTALLOC; } - old = _ap_instance->flows[fd].oflags; + old = ai->flows[fd].oflags; switch (cmd) { case FLOW_F_GETFL: /* GET FLOW FLAGS */ - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return old; case FLOW_F_SETFL: /* SET FLOW FLAGS */ - _ap_instance->flows[fd].oflags = oflags; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + ai->flows[fd].oflags = oflags; + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return old; default: - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return FLOW_O_INVALID; /* unknown command */ } } @@ -509,42 +526,42 @@ ssize_t flow_write(int fd, void * buf, size_t count) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai->flows_lock); - if (_ap_instance->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + if (ai->flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return -ENOTALLOC; } - if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_rdrbuff_write(_ap_instance->rdrb, - _ap_instance->flows[fd].api, + if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_rdrbuff_write(ai->rdrb, + ai->flows[fd].api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, (uint8_t *) buf, count); if (idx == -1) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return -EAGAIN; } e.index = idx; - e.port_id = _ap_instance->flows[fd].port_id; + e.port_id = ai->flows[fd].port_id; - if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { - shm_rdrbuff_remove(_ap_instance->rdrb, idx); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) { + shm_rdrbuff_remove(ai->rdrb, idx); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return -1; } } else { /* blocking */ - struct shm_rdrbuff * rdrb = _ap_instance->rdrb; - pid_t api = _ap_instance->flows[fd].api; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + struct shm_rdrbuff * rdrb = ai->rdrb; + pid_t api = ai->flows[fd].api; + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); idx = shm_rdrbuff_write_b(rdrb, api, @@ -553,30 +570,22 @@ ssize_t flow_write(int fd, void * buf, size_t count) (uint8_t *) buf, count); - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai->flows_lock); e.index = idx; - e.port_id = _ap_instance->flows[fd].port_id; + e.port_id = ai->flows[fd].port_id; - while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) + while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) ; } - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return 0; } -int flow_select(const struct timespec * timeout) -{ - int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout); - if (port_id < 0) - return port_id; - return port_id_to_fd(port_id); -} - ssize_t flow_read(int fd, void * buf, size_t count) { int idx = -1; @@ -586,47 +595,129 @@ ssize_t flow_read(int fd, void * buf, size_t count) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&_ap_instance->data_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai->flows_lock); - if (_ap_instance->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + if (ai->flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); return -ENOTALLOC; } - if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_ap_rbuff_read_port(_ap_instance->rb, - _ap_instance->flows[fd].port_id); - pthread_rwlock_unlock(&_ap_instance->flows_lock); + if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_ap_rbuff_read_port(ai->rb, + ai->flows[fd].port_id); + pthread_rwlock_unlock(&ai->flows_lock); } else { - struct shm_ap_rbuff * rb = _ap_instance->rb; - int port_id = _ap_instance->flows[fd].port_id; - struct timespec * timeout = _ap_instance->flows[fd].timeout; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ap_instance->data_lock); + struct shm_ap_rbuff * rb = ai->rb; + int port_id = ai->flows[fd].port_id; + struct timespec * timeout = ai->flows[fd].timeout; + pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai->data_lock); idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); - pthread_rwlock_rdlock(&_ap_instance->data_lock); + pthread_rwlock_rdlock(&ai->data_lock); } if (idx < 0) { - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); return -EAGAIN; } - n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx); + n = shm_rdrbuff_read(&sdu, ai->rdrb, idx); if (n < 0) { - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); return -1; } memcpy(buf, sdu, MIN(n, count)); - shm_rdrbuff_remove(_ap_instance->rdrb, idx); + shm_rdrbuff_remove(ai->rdrb, idx); - pthread_rwlock_unlock(&_ap_instance->data_lock); + pthread_rwlock_unlock(&ai->data_lock); return n; } + +/* select functions */ + +struct flow_set * flow_set_create() +{ + struct flow_set * set = malloc(sizeof(*set)); + if (set == NULL) + return NULL; + + if (pthread_rwlock_init(&set->lock, NULL)) { + free(set); + return NULL; + } + + memset(&set->b, 0, sizeof(set->b)); + + set->dirty = true; + + return set; +} + +void flow_set_zero(struct flow_set * set) +{ + pthread_rwlock_wrlock(&set->lock); + memset(&set->b, 0, sizeof(set->b)); + set->dirty = true; + pthread_rwlock_unlock(&set->lock); +} + +void flow_set_add(struct flow_set * set, int fd) +{ + pthread_rwlock_wrlock(&set->lock); + set->b[ai->flows[fd].port_id] = true; + set->dirty = true; + pthread_rwlock_unlock(&set->lock); +} + +void flow_set_del(struct flow_set * set, int fd) +{ + pthread_rwlock_wrlock(&set->lock); + set->b[ai->flows[fd].port_id] = false; + set->dirty = true; + pthread_rwlock_unlock(&set->lock); +} + +bool flow_set_has(struct flow_set * set, int fd) +{ + bool ret; + pthread_rwlock_rdlock(&set->lock); + ret = set->b[ai->flows[fd].port_id]; + pthread_rwlock_unlock(&set->lock); + return ret; +} + +void flow_set_destroy(struct flow_set * set) +{ + pthread_rwlock_destroy(&set->lock); + free(set); +} + +static void flow_set_cpy(struct flow_set * set) +{ + pthread_rwlock_rdlock(&set->lock); + if (set->dirty) + memcpy(set->s, set->b, IRMD_MAX_FLOWS); + set->dirty = false; + pthread_rwlock_unlock(&set->lock); +} + +int flow_select(struct flow_set * set, const struct timespec * timeout) +{ + int port_id; + if (set == NULL) { + port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout); + } else { + flow_set_cpy(set); + port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout); + } + if (port_id < 0) + return port_id; + return ai->ports[port_id]; +} diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c index 75ee2085..81bed687 100644 --- a/src/lib/lockfile.c +++ b/src/lib/lockfile.c @@ -43,10 +43,13 @@ struct lockfile { }; struct lockfile * lockfile_create() { + mode_t mask; struct lockfile * lf = malloc(sizeof(*lf)); if (lf == NULL) return NULL; + mask = umask(0); + lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); if (lf->fd == -1) { LOG_DBGF("Could not create lock file."); @@ -54,11 +57,7 @@ struct lockfile * lockfile_create() { return NULL; } - if (fchmod(lf->fd, 0666)) { - LOG_DBGF("Failed to chmod lockfile."); - free(lf); - return NULL; - } + umask(mask); if (ftruncate(lf->fd, LF_SIZE - 1) < 0) { LOG_DBGF("Failed to extend lockfile."); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 77e288a8..473894d5 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -40,6 +40,10 @@ #include <signal.h> #include <sys/stat.h> +#define FN_MAX_CHARS 255 +#define NORTH false +#define SOUTH true + #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) @@ -59,19 +63,24 @@ struct shm_ap_rbuff { pthread_cond_t * add; /* SDU arrived */ pthread_cond_t * del; /* SDU removed */ pid_t api; /* api to which this rb belongs */ + bool dir; /* direction, false = N */ int fd; }; -struct shm_ap_rbuff * shm_ap_rbuff_create() +static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) { struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; pthread_mutexattr_t mattr; pthread_condattr_t cattr; - char fn[25]; + char fn[FN_MAX_CHARS]; + mode_t mask; - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); + if (dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid()); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid()); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -79,6 +88,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() return NULL; } + mask = umask(0); + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); if (shm_fd == -1) { LOG_DBG("Failed creating ring buffer."); @@ -86,11 +97,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() return NULL; } - if (fchmod(shm_fd, 0666)) { - LOG_DBG("Failed to chmod shared memory."); - free(rb); - return NULL; - } + umask(mask); if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { LOG_DBG("Failed to extend ringbuffer."); @@ -150,18 +157,22 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() rb->fd = shm_fd; rb->api = getpid(); + rb->dir = dir; return rb; } -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) +static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir) { struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; - char fn[25]; + char fn[FN_MAX_CHARS]; - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); + if (dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -204,9 +215,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) rb->fd = shm_fd; rb->api = api; + rb->dir = dir; return rb; } + +struct shm_ap_rbuff * shm_ap_rbuff_create_n() +{ + return shm_ap_rbuff_create(NORTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_create_s() +{ + return shm_ap_rbuff_create(SOUTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api) +{ + return shm_ap_rbuff_open(api, NORTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api) +{ + return shm_ap_rbuff_open(api, SOUTH); +} + void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) { if (rb == NULL) { @@ -252,7 +285,10 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); + if (rb->dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api); if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); @@ -311,15 +347,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb) return -1; } - ret = (rb->shm_base + *rb->ptr_tail)->index; + ret = tail_el_ptr(rb)->index; pthread_mutex_unlock(rb->lock); return ret; } -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - const struct timespec * timeout) +static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, + const struct timespec * timeout) { struct timespec abstime; int ret = 0; @@ -360,7 +396,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, } if (ret != ETIMEDOUT) - ret = (rb->shm_base + *rb->ptr_tail)->port_id; + ret = tail_el_ptr(rb)->port_id; + else + ret = -ETIMEDOUT; + + pthread_cleanup_pop(true); + + return ret; +} + +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, + bool * set, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret; + + if (set == NULL) + return shm_ap_rbuff_peek_b_all(rb, timeout); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) + && (ret != ETIMEDOUT)) { + while (shm_rbuff_empty(rb)) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->add, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->add, rb->lock); + +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) + break; + } + + while (!set[tail_el_ptr(rb)->port_id]) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->del, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->del, rb->lock); + +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) + break; + } + } + + if (ret != ETIMEDOUT) + ret = tail_el_ptr(rb)->port_id; else ret = -ETIMEDOUT; @@ -369,6 +480,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, return ret; } + struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) { struct rb_entry * e = NULL; @@ -434,8 +546,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) return idx; } -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, + int port_id, const struct timespec * timeout) { struct timespec abstime; diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index b0d295d9..7c4927fc 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -139,13 +139,13 @@ static char * rdrb_filename(enum qos_cube qos) ++chars; } while (qm > 0); - str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2); + str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1); if (str == NULL) { LOG_ERR("Failed to create shm_rdrbuff: Out of Memory."); return NULL; } - sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos); + sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos); return str; } @@ -154,6 +154,7 @@ static char * rdrb_filename(enum qos_cube qos) struct shm_rdrbuff * shm_rdrbuff_create() { struct shm_rdrbuff * rdrb; + mode_t mask; int shm_fd; uint8_t * shm_base; pthread_mutexattr_t mattr; @@ -171,6 +172,8 @@ struct shm_rdrbuff * shm_rdrbuff_create() return NULL; } + mask = umask(0); + shm_fd = shm_open(shm_rdrb_fn, O_CREAT | O_EXCL | O_RDWR, 0666); if (shm_fd == -1) { LOG_DBGF("Failed creating shared memory map."); @@ -179,12 +182,7 @@ struct shm_rdrbuff * shm_rdrbuff_create() return NULL; } - if (fchmod(shm_fd, 0666)) { - LOG_DBGF("Failed to chmod shared memory map."); - free(shm_rdrb_fn); - free(rdrb); - return NULL; - } + umask(mask); if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { LOG_DBGF("Failed to extend shared memory map."); @@ -469,7 +467,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, #ifndef SHM_RDRB_MULTI_BLOCK if (sz > SHM_RDRB_BLOCK_SIZE) { - LOG_DBGF("Multi-block SDU's disabled. Dropping."); + LOG_DBGF("Multi-block SDUs disabled. Dropping."); return -1; } #endif @@ -558,7 +556,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, #ifndef SHM_RDRB_MULTI_BLOCK if (sz > SHM_RDRB_BLOCK_SIZE) { - LOG_DBGF("Multi-block SDU's disabled. Dropping."); + LOG_DBGF("Multi-block SDUs disabled. Dropping."); return -1; } #endif |