diff options
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/lib/cdap.c | 1 | ||||
-rw-r--r-- | src/lib/dev.c | 845 | ||||
-rw-r--r-- | src/lib/ipcp.c | 514 | ||||
-rw-r--r-- | src/lib/irm.c | 2 | ||||
-rw-r--r-- | src/lib/irmd_messages.proto | 3 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 50 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 46 | ||||
-rw-r--r-- | src/lib/sockets.c | 6 |
9 files changed, 678 insertions, 790 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 14e7051a..b94d0eea 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -30,7 +30,6 @@ set(SOURCE_FILES bitmap.c cdap.c dev.c - ipcp.c irm.c list.c lockfile.c diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 8b1b3bc6..92a05221 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -24,6 +24,7 @@ #include <ouroboros/cdap.h> #include <ouroboros/bitmap.h> #include <ouroboros/dev.h> +#include <ouroboros/fcntl.h> #include <stdlib.h> #include <pthread.h> diff --git a/src/lib/dev.c b/src/lib/dev.c index 391563da..178ee287 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -24,6 +24,7 @@ #include <ouroboros/errno.h> #include <ouroboros/dev.h> #include <ouroboros/sockets.h> +#include <ouroboros/fcntl.h> #include <ouroboros/bitmap.h> #include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> @@ -41,6 +42,87 @@ struct flow_set { pthread_rwlock_t lock; }; +enum port_state { + PORT_NULL = 0, + PORT_ID_PENDING, + PORT_ID_ASSIGNED, + PORT_DESTROY +}; + +struct port { + int fd; + + enum port_state state; + pthread_mutex_t state_lock; + pthread_cond_t state_cond; +}; + +static void port_destroy(struct port * p) +{ + pthread_mutex_lock(&p->state_lock); + + if (p->state == PORT_DESTROY) { + pthread_mutex_unlock(&p->state_lock); + return; + } + + if (p->state == PORT_ID_PENDING) + p->state = PORT_DESTROY; + else + p->state = PORT_NULL; + + pthread_cond_signal(&p->state_cond); + + while (p->state != PORT_NULL) + pthread_cond_wait(&p->state_cond, &p->state_lock); + + p->fd = -1; + p->state = PORT_ID_PENDING; + + pthread_mutex_unlock(&p->state_lock); +} + +static void port_set_state(struct port * p, enum port_state state) +{ + pthread_mutex_lock(&p->state_lock); + + if (p->state == PORT_DESTROY) { + pthread_mutex_unlock(&p->state_lock); + return; + } + + p->state = state; + pthread_cond_broadcast(&p->state_cond); + + pthread_mutex_unlock(&p->state_lock); +} + +enum port_state port_wait_assign(struct port * p) +{ + enum port_state state; + + pthread_mutex_lock(&p->state_lock); + + if (p->state != PORT_ID_PENDING) { + pthread_mutex_unlock(&p->state_lock); + return -1; + } + + while (!(p->state == PORT_ID_ASSIGNED || p->state == PORT_DESTROY)) + pthread_cond_wait(&p->state_cond, &p->state_lock); + + if (p->state == PORT_DESTROY) { + p->state = PORT_NULL; + pthread_cond_broadcast(&p->state_cond); + } + + state = p->state; + + pthread_mutex_unlock(&p->state_lock); + + return state; +} + struct flow { struct shm_ap_rbuff * rb; int port_id; @@ -48,24 +130,24 @@ struct flow { pid_t api; - struct timespec * timeout; + struct timespec timeout; }; -struct ap_instance { +struct { char * ap_name; char * daf_name; pid_t api; struct shm_rdrbuff * rdrb; - struct bmp * fds; struct shm_ap_rbuff * rb; pthread_rwlock_t data_lock; - struct flow flows[AP_MAX_FLOWS]; - int ports[AP_MAX_FLOWS]; + struct bmp * fds; + struct flow * flows; + struct port * ports; pthread_rwlock_t flows_lock; -} * ai; +} ai; static int api_announce(char * ap_name) { @@ -76,12 +158,12 @@ static int api_announce(char * ap_name) msg.code = IRM_MSG_CODE__IRM_API_ANNOUNCE; msg.has_api = true; - pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai.data_lock); - msg.api = ai->api; + msg.api = ai.api; msg.ap_name = ap_name; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { @@ -104,47 +186,61 @@ int ap_init(char * ap_name) ap_name = path_strip(ap_name); - ai = malloc(sizeof(*ai)); - if (ai == NULL) { - return -ENOMEM; - } - - ai->api = getpid(); - ai->ap_name = ap_name; - ai->daf_name = NULL; + ai.api = getpid(); + ai.ap_name = ap_name; + ai.daf_name = NULL; - ai->fds = bmp_create(AP_MAX_FLOWS, 0); - if (ai->fds == NULL) { - free(ai); + ai.fds = bmp_create(AP_MAX_FLOWS, 0); + if (ai.fds == NULL) return -ENOMEM; + + ai.rdrb = shm_rdrbuff_open(); + if (ai.rdrb == NULL) { + bmp_destroy(ai.fds); + return -1; } - ai->rdrb = shm_rdrbuff_open(); - if (ai->rdrb == NULL) { - bmp_destroy(ai->fds); - free(ai); + ai.rb = shm_ap_rbuff_create(); + if (ai.rb == NULL) { + shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); return -1; } - ai->rb = shm_ap_rbuff_create_s(); - if (ai->rb == NULL) { - shm_rdrbuff_close(ai->rdrb); - bmp_destroy(ai->fds); - free(ai); + ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS); + if (ai.flows == NULL) { + shm_ap_rbuff_destroy(ai.rb); + shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); return -1; } for (i = 0; i < AP_MAX_FLOWS; ++i) { - ai->flows[i].rb = NULL; - ai->flows[i].port_id = -1; - ai->flows[i].oflags = 0; - ai->flows[i].api = -1; - ai->flows[i].timeout = NULL; - ai->ports[i] = -1; + ai.flows[i].rb = NULL; + ai.flows[i].port_id = -1; + ai.flows[i].oflags = 0; + ai.flows[i].api = -1; + ai.flows[i].timeout.tv_sec = 0; + ai.flows[i].timeout.tv_nsec = 0; } - pthread_rwlock_init(&ai->flows_lock, NULL); - pthread_rwlock_init(&ai->data_lock, NULL); + ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); + if (ai.flows == NULL) { + free(ai.flows); + shm_ap_rbuff_destroy(ai.rb); + shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); + return -1; + } + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) { + ai.ports[i].state = PORT_ID_PENDING; + pthread_mutex_init(&ai.ports[i].state_lock, NULL); + pthread_cond_init(&ai.ports[i].state_cond, NULL); + } + + pthread_rwlock_init(&ai.flows_lock, NULL); + pthread_rwlock_init(&ai.data_lock, NULL); if (ap_name != NULL) return api_announce(ap_name); @@ -152,46 +248,49 @@ int ap_init(char * ap_name) return 0; } -void ap_fini(void) +void ap_fini() { int i = 0; - if (ai == NULL) - return; - - pthread_rwlock_wrlock(&ai->data_lock); + pthread_rwlock_wrlock(&ai.data_lock); /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0) - shm_rdrbuff_remove(ai->rdrb, i); + while ((i = shm_ap_rbuff_peek_idx(ai.rb)) >= 0) + shm_rdrbuff_remove(ai.rdrb, i); - if (ai->fds != NULL) - bmp_destroy(ai->fds); - if (ai->rb != NULL) - shm_ap_rbuff_destroy(ai->rb); - if (ai->rdrb != NULL) - shm_rdrbuff_close(ai->rdrb); + if (ai.fds != NULL) + bmp_destroy(ai.fds); + if (ai.rb != NULL) + shm_ap_rbuff_destroy(ai.rb); + if (ai.rdrb != NULL) + shm_rdrbuff_close(ai.rdrb); - if (ai->daf_name != NULL) - free(ai->daf_name); + if (ai.daf_name != NULL) + free(ai.daf_name); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (ai->flows[i].rb != NULL) - shm_ap_rbuff_close(ai->flows[i].rb); - ai->ports[ai->flows[i].port_id] = -1; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (ai.flows[i].rb != NULL) + shm_ap_rbuff_close(ai.flows[i].rb); + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) { + ai.ports[i].state = PORT_NULL; + pthread_mutex_destroy(&ai.ports[i].state_lock); + pthread_cond_destroy(&ai.ports[i].state_cond); } - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + free(ai.flows); + free(ai.ports); - pthread_rwlock_destroy(&ai->flows_lock); - pthread_rwlock_destroy(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); - free(ai); + pthread_rwlock_destroy(&ai.flows_lock); + pthread_rwlock_destroy(&ai.data_lock); } + int flow_accept(char ** ae_name) { irm_msg_t msg = IRM_MSG__INIT; @@ -201,11 +300,11 @@ int flow_accept(char ** ae_name) msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; - pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai.data_lock); - msg.api = ai->api; + msg.api = ai.api; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg_b(&msg); if (recv_msg == NULL) @@ -216,22 +315,22 @@ int flow_accept(char ** ae_name) return -1; } - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_wrlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); - fd = bmp_allocate(ai->fds); - if (!bmp_is_id_valid(ai->fds, fd)) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); - if (ai->flows[fd].rb == NULL) { - bmp_release(ai->fds, fd); - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); + if (ai.flows[fd].rb == NULL) { + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -239,31 +338,31 @@ int flow_accept(char ** ae_name) if (ae_name != NULL) { *ae_name = strdup(recv_msg->ae_name); if (*ae_name == NULL) { - shm_ap_rbuff_close(ai->flows[fd].rb); - bmp_release(ai->fds, fd); - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + shm_ap_rbuff_close(ai.flows[fd].rb); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -ENOMEM; } } - ai->flows[fd].port_id = recv_msg->port_id; - ai->flows[fd].oflags = FLOW_O_DEFAULT; - ai->flows[fd].api = recv_msg->api; + ai.flows[fd].port_id = recv_msg->port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = recv_msg->api; - ai->ports[recv_msg->port_id] = fd; + ai.ports[recv_msg->port_id].fd = fd; + ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return fd; } -int flow_alloc_resp(int fd, - int response) +int flow_alloc_resp(int fd, int response) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; @@ -274,49 +373,47 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_api = true; - msg.api = ai->api; + msg.api = ai.api; msg.has_port_id = true; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - msg.port_id = ai->flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; - pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai.flows_lock); msg.has_response = true; msg.response = response; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return -1; } if (!recv_msg->has_result) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } ret = recv_msg->result; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return ret; } -int flow_alloc(char * dst_name, - char * src_ae_name, - struct qos_spec * qos) +int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; @@ -333,11 +430,11 @@ int flow_alloc(char * dst_name, msg.ae_name = src_ae_name; msg.has_api = true; - pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai.data_lock); - msg.api = ai->api; + msg.api = ai.api; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { @@ -349,34 +446,35 @@ int flow_alloc(char * dst_name, return -1; } - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_wrlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); - fd = bmp_allocate(ai->fds); - if (!bmp_is_id_valid(ai->fds, fd)) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); - if (ai->flows[fd].rb == NULL) { - bmp_release(ai->fds, fd); - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); + if (ai.flows[fd].rb == NULL) { + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - ai->flows[fd].port_id = recv_msg->port_id; - ai->flows[fd].oflags = FLOW_O_DEFAULT; - ai->flows[fd].api = recv_msg->api; + ai.flows[fd].port_id = recv_msg->port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = recv_msg->api; - ai->ports[recv_msg->port_id] = fd; + ai.ports[recv_msg->port_id].fd = fd; + ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -395,19 +493,19 @@ int flow_alloc_res(int fd) msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; msg.has_port_id = true; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - msg.port_id = ai->flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg_b(&msg); if (recv_msg == NULL) { @@ -437,43 +535,43 @@ int flow_dealloc(int fd) msg.has_api = true; msg.api = getpid(); - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_wrlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - msg.port_id = ai->flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; - ai->ports[msg.port_id] = -1; + port_destroy(&ai.ports[msg.port_id]); - ai->flows[fd].port_id = -1; - shm_ap_rbuff_close(ai->flows[fd].rb); - ai->flows[fd].rb = NULL; - ai->flows[fd].api = -1; + ai.flows[fd].port_id = -1; + shm_ap_rbuff_close(ai.flows[fd].rb); + ai.flows[fd].rb = NULL; + ai.flows[fd].api = -1; - bmp_release(ai->fds, fd); + bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai.flows_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return -1; } if (!recv_msg->has_result) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } ret = recv_msg->result; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -487,30 +585,30 @@ int flow_cntl(int fd, int cmd, int oflags) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_wrlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - old = ai->flows[fd].oflags; + old = ai.flows[fd].oflags; switch (cmd) { case FLOW_F_GETFL: /* GET FLOW FLAGS */ - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return old; case FLOW_F_SETFL: /* SET FLOW FLAGS */ - ai->flows[fd].oflags = oflags; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + ai.flows[fd].oflags = oflags; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return old; default: - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return FLOW_O_INVALID; /* unknown command */ } } @@ -526,62 +624,62 @@ ssize_t flow_write(int fd, void * buf, size_t count) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_rdrbuff_write(ai->rdrb, - ai->flows[fd].api, + if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_rdrbuff_write(ai.rdrb, + ai.flows[fd].api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, - (uint8_t *) buf, + buf, count); if (idx == -1) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -EAGAIN; } e.index = idx; - e.port_id = ai->flows[fd].port_id; + e.port_id = ai.flows[fd].port_id; - if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) { - shm_rdrbuff_remove(ai->rdrb, idx); - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -1; } } else { /* blocking */ - struct shm_rdrbuff * rdrb = ai->rdrb; - pid_t api = ai->flows[fd].api; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + struct shm_rdrbuff * rdrb = ai.rdrb; + pid_t api = ai.flows[fd].api; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); idx = shm_rdrbuff_write_b(rdrb, - api, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - (uint8_t *) buf, - count); + api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + buf, + count); - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); e.index = idx; - e.port_id = ai->flows[fd].port_id; + e.port_id = ai.flows[fd].port_id; - while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) + while (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) ; } - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return 0; } @@ -595,47 +693,44 @@ ssize_t flow_read(int fd, void * buf, size_t count) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_ap_rbuff_read_port(ai->rb, - ai->flows[fd].port_id); - pthread_rwlock_unlock(&ai->flows_lock); + if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id); + pthread_rwlock_unlock(&ai.flows_lock); } else { - struct shm_ap_rbuff * rb = ai->rb; - int port_id = ai->flows[fd].port_id; - struct timespec * timeout = ai->flows[fd].timeout; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); - - idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); - - pthread_rwlock_rdlock(&ai->data_lock); + struct shm_ap_rbuff * rb = ai.rb; + int port_id = ai.flows[fd].port_id; + struct timespec timeout = ai.flows[fd].timeout; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + idx = shm_ap_rbuff_read_port_b(rb, port_id, &timeout); + pthread_rwlock_rdlock(&ai.data_lock); } if (idx < 0) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return -EAGAIN; } - n = shm_rdrbuff_read(&sdu, ai->rdrb, idx); + n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); if (n < 0) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return -1; } memcpy(buf, sdu, MIN(n, count)); - shm_rdrbuff_remove(ai->rdrb, idx); + shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return n; } @@ -671,7 +766,7 @@ void flow_set_zero(struct flow_set * set) void flow_set_add(struct flow_set * set, int fd) { pthread_rwlock_wrlock(&set->lock); - set->b[ai->flows[fd].port_id] = true; + set->b[ai.flows[fd].port_id] = true; set->dirty = true; pthread_rwlock_unlock(&set->lock); } @@ -679,7 +774,7 @@ void flow_set_add(struct flow_set * set, int fd) void flow_set_del(struct flow_set * set, int fd) { pthread_rwlock_wrlock(&set->lock); - set->b[ai->flows[fd].port_id] = false; + set->b[ai.flows[fd].port_id] = false; set->dirty = true; pthread_rwlock_unlock(&set->lock); } @@ -688,7 +783,7 @@ bool flow_set_has(struct flow_set * set, int fd) { bool ret; pthread_rwlock_rdlock(&set->lock); - ret = set->b[ai->flows[fd].port_id]; + ret = set->b[ai.flows[fd].port_id]; pthread_rwlock_unlock(&set->lock); return ret; } @@ -712,12 +807,324 @@ int flow_select(struct flow_set * set, const struct timespec * timeout) { int port_id; if (set == NULL) { - port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout); + port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout); } else { flow_set_cpy(set); - port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout); + port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout); } if (port_id < 0) return port_id; - return ai->ports[port_id]; + return ai.ports[port_id].fd; +} + +/* ipcp-dev functions */ + +int np1_flow_alloc(pid_t n_api, int port_id) +{ + int fd; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].rb = shm_ap_rbuff_open(n_api); + if (ai.flows[fd].rb == NULL) { + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].port_id = port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = n_api; + + ai.ports[port_id].fd = fd; + port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int np1_flow_dealloc(int port_id) +{ + int fd; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + fd = ai.ports[port_id].fd; + if (fd < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return fd; + } + + ai.flows[fd].port_id = -1; + shm_ap_rbuff_close(ai.flows[fd].rb); + ai.flows[fd].rb = NULL; + ai.flows[fd].api = -1; + + bmp_release(ai.fds, fd); + + port_destroy(&ai.ports[port_id]); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + + +int np1_flow_resp(pid_t n_api, int port_id) +{ + int fd; + struct shm_ap_rbuff * rb; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + port_wait_assign(&ai.ports[port_id]); + + fd = ai.ports[port_id].fd; + if (fd < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return fd; + } + + rb = shm_ap_rbuff_open(n_api); + if (rb == NULL) { + ai.flows[fd].port_id = -1; + port_destroy(&ai.ports[port_id]); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].rb = rb; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int ipcp_create_r(pid_t api) +{ + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int ret = -1; + + msg.code = IRM_MSG_CODE__IPCP_CREATE_R; + msg.has_api = true; + msg.api = api; + + recv_msg = send_recv_irm_msg(&msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + irm_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) +{ + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int port_id = -1; + int fd = -1; + + if (dst_name == NULL || src_ae_name == NULL) + return -EINVAL; + + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_api = true; + msg.api = api; + msg.dst_name = dst_name; + msg.ae_name = src_ae_name; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; /* -ENOMOREFDS */ + } + + ai.flows[fd].rb = NULL; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + recv_msg = send_recv_irm_msg(&msg); + if (recv_msg == NULL) + return -1; + + if (!recv_msg->has_port_id) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + port_id = recv_msg->port_id; + irm_msg__free_unpacked(recv_msg, NULL); + if (port_id < 0) + return -1; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + ai.flows[fd].port_id = port_id; + ai.flows[fd].rb = NULL; + + ai.ports[port_id].fd = fd; + port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int ipcp_flow_alloc_reply(int fd, int response) +{ + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int ret = -1; + + msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; + msg.has_port_id = true; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + msg.port_id = ai.flows[fd].port_id; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + msg.has_response = true; + msg.response = response; + + recv_msg = send_recv_irm_msg(&msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + irm_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_flow_read(struct shm_du_buff ** sdb) +{ + int fd; + struct rb_entry * e; + + e = shm_ap_rbuff_read(ai.rb); + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + fd = ai.ports[e->port_id].fd; + + *sdb = shm_rdrbuff_get(ai.rdrb, e->index); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int ipcp_flow_write(int fd, struct shm_du_buff * sdb) +{ + struct rb_entry e; + + if (sdb == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } + + e.index = shm_du_buff_get_idx(sdb); + e.port_id = ai.flows[fd].port_id; + + shm_ap_rbuff_write(ai.flows[fd].rb, &e); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return 0; +} + +int local_flow_read(struct rb_entry * e) +{ + int fd; + + *e = *(shm_ap_rbuff_read(ai.rb)); + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + fd = ai.ports[e->port_id].fd; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int local_flow_write(int fd, struct rb_entry * e) +{ + if (e == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } + + e->port_id = ai.flows[fd].port_id; + + shm_ap_rbuff_write(ai.flows[fd].rb, e); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return 0; +} + +void ipcp_flow_del(struct shm_du_buff * sdb) +{ + shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c deleted file mode 100644 index 01741121..00000000 --- a/src/lib/ipcp.c +++ /dev/null @@ -1,514 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * The API to instruct IPCPs - * - * Sander Vrijders <sander.vrijders@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#define OUROBOROS_PREFIX "lib-ipcp" - -#include <ouroboros/config.h> -#include <ouroboros/errno.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/common.h> -#include <ouroboros/logs.h> -#include <ouroboros/utils.h> -#include <ouroboros/sockets.h> - -#include <stdlib.h> -#include <string.h> -#include <signal.h> -#include <stdbool.h> -#include <pthread.h> -#include <sys/types.h> -#include <sys/wait.h> -#include <sys/socket.h> -#include <sys/time.h> - -static void close_ptr(void * o) -{ - close(*((int *) o)); -} - -static ipcp_msg_t * send_recv_ipcp_msg(pid_t api, - ipcp_msg_t * msg) -{ - int sockfd = 0; - buffer_t buf; - char * sock_path = NULL; - ssize_t count = 0; - ipcp_msg_t * recv_msg = NULL; - - struct timeval tv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; - - sock_path = ipcp_sock_path(api); - if (sock_path == NULL) - return NULL; - - sockfd = client_socket_open(sock_path); - if (sockfd < 0) { - free(sock_path); - return NULL; - } - - if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, - (void *) &tv, sizeof(tv))) - LOG_WARN("Failed to set timeout on socket."); - - free(sock_path); - - buf.len = ipcp_msg__get_packed_size(msg); - if (buf.len == 0) { - close(sockfd); - return NULL; - } - - buf.data = malloc(IPCP_MSG_BUF_SIZE); - if (buf.data == NULL) { - close(sockfd); - return NULL; - } - - pthread_cleanup_push(close_ptr, (void *) &sockfd); - pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data); - - ipcp_msg__pack(msg, buf.data); - - if (write(sockfd, buf.data, buf.len) != -1) - count = read(sockfd, buf.data, IPCP_MSG_BUF_SIZE); - - if (count > 0) - recv_msg = ipcp_msg__unpack(NULL, count, buf.data); - - pthread_cleanup_pop(true); - pthread_cleanup_pop(true); - - return recv_msg; -} - -pid_t ipcp_create(enum ipcp_type ipcp_type) -{ - pid_t api = -1; - char irmd_api[10]; - size_t len = 0; - char * ipcp_dir = "/sbin/"; - char * full_name = NULL; - char * exec_name = NULL; - char * log_file = NULL; - - sprintf(irmd_api, "%u", getpid()); - - api = fork(); - if (api == -1) { - LOG_ERR("Failed to fork"); - return api; - } - - if (api != 0) { - return api; - } - - if (ipcp_type == IPCP_NORMAL) - exec_name = IPCP_NORMAL_EXEC; - else if (ipcp_type == IPCP_SHIM_UDP) - exec_name = IPCP_SHIM_UDP_EXEC; - else if (ipcp_type == IPCP_SHIM_ETH_LLC) - exec_name = IPCP_SHIM_ETH_LLC_EXEC; - else if (ipcp_type == IPCP_LOCAL) - exec_name = IPCP_LOCAL_EXEC; - else - exit(EXIT_FAILURE); - - len += strlen(INSTALL_PREFIX); - len += strlen(ipcp_dir); - len += strlen(exec_name); - len += 1; - - full_name = malloc(len + 1); - if (full_name == NULL) { - LOG_ERR("Failed to malloc"); - exit(EXIT_FAILURE); - } - - strcpy(full_name, INSTALL_PREFIX); - strcat(full_name, ipcp_dir); - strcat(full_name, exec_name); - full_name[len] = '\0'; - - if (logfile != NULL) { - log_file = malloc(20); - if (log_file == NULL) { - LOG_ERR("Failed to malloc."); - exit(EXIT_FAILURE); - } - sprintf(log_file, "ipcpd-%u.log", getpid()); - } - - /* log_file to be placed at the end */ - char * argv[] = {full_name, - irmd_api, - log_file, - 0}; - - char * envp[] = {0}; - - execve(argv[0], &argv[0], envp); - - LOG_DBG("%s", strerror(errno)); - LOG_ERR("Failed to load IPCP daemon"); - LOG_ERR("Make sure to run the installed version"); - free(full_name); - exit(EXIT_FAILURE); -} - -int ipcp_create_r(pid_t api) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IRM_MSG_CODE__IPCP_CREATE_R; - msg.has_api = true; - msg.api = api; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int ipcp_destroy(pid_t api) -{ - int status; - - if (kill(api, SIGTERM)) { - LOG_ERR("Failed to destroy IPCP"); - return -1; - } - - if (waitpid(api, &status, 0) < 0) { - LOG_ERR("Failed to destroy IPCP"); - return -1; - } - - return 0; -} - -int ipcp_bootstrap(pid_t api, - dif_config_msg_t * conf) -{ - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; - - if (conf == NULL) - return -EINVAL; - - msg.code = IPCP_MSG_CODE__IPCP_BOOTSTRAP; - msg.conf = conf; - - recv_msg = send_recv_ipcp_msg(api, &msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - ipcp_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int ipcp_enroll(pid_t api, - char * dif_name) -{ - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; - - if (dif_name == NULL) - return -EINVAL; - - msg.code = IPCP_MSG_CODE__IPCP_ENROLL; - msg.dif_name = dif_name; - - recv_msg = send_recv_ipcp_msg(api, &msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - ipcp_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int ipcp_name_reg(pid_t api, - char * name) -{ - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; - - if (name == NULL) - return -1; - - msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; - msg.name = name; - - recv_msg = send_recv_ipcp_msg(api, &msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - ipcp_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int ipcp_name_unreg(pid_t api, - char * name) -{ - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IPCP_MSG_CODE__IPCP_NAME_UNREG; - msg.name = name; - - recv_msg = send_recv_ipcp_msg(api, &msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - ipcp_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int ipcp_flow_alloc(pid_t api, - int port_id, - pid_t n_api, - char * dst_name, - char * src_ae_name, - enum qos_cube qos) -{ - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; - - if (dst_name == NULL || src_ae_name == NULL) - return -EINVAL; - - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; - msg.has_port_id = true; - msg.port_id = port_id; - msg.has_api = true; - msg.api = n_api; - msg.src_ae_name = src_ae_name; - msg.dst_name = dst_name; - msg.has_qos_cube = true; - msg.qos_cube = qos; - - recv_msg = send_recv_ipcp_msg(api, &msg); - if (recv_msg == NULL) - return -1; - - if (!recv_msg->has_result) { - ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - ipcp_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int ipcp_flow_alloc_resp(pid_t api, - int port_id, - pid_t n_api, - int response) -{ - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; - msg.has_port_id = true; - msg.port_id = port_id; - msg.has_api = true; - msg.api = n_api; - msg.has_response = true; - msg.response = response; - - recv_msg = send_recv_ipcp_msg(api, &msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - ipcp_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int ipcp_flow_req_arr(pid_t api, - char * dst_name, - char * src_ae_name) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int port_id = -1; - - if (dst_name == NULL || src_ae_name == NULL) - return -EINVAL; - - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_api = true; - msg.api = api; - msg.dst_name = dst_name; - msg.ae_name = src_ae_name; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -1; - - if (!recv_msg->has_port_id) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - port_id = recv_msg->port_id; - irm_msg__free_unpacked(recv_msg, NULL); - - return port_id; -} - -int ipcp_flow_alloc_reply(pid_t api, - int port_id, - int response) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg.port_id = port_id; - msg.has_port_id = true; - msg.response = response; - msg.has_response = true; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - - -int ipcp_flow_dealloc(pid_t api, - int port_id) -{ - - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_port_id = true; - msg.port_id = port_id; - - recv_msg = send_recv_ipcp_msg(api, &msg); - if (recv_msg == NULL) - return 0; - - if (recv_msg->has_result == false) { - ipcp_msg__free_unpacked(recv_msg, NULL); - return 0; - } - - ret = recv_msg->result; - ipcp_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - -int irm_flow_dealloc(int port_id) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_port_id = true; - msg.port_id = port_id; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return 0; - - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return 0; - } - - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; -} diff --git a/src/lib/irm.c b/src/lib/irm.c index fce11ba5..c4c6395b 100644 --- a/src/lib/irm.c +++ b/src/lib/irm.c @@ -25,7 +25,7 @@ #include <ouroboros/config.h> #include <ouroboros/errno.h> #include <ouroboros/irm.h> -#include <ouroboros/common.h> +#include <ouroboros/utils.h> #include <ouroboros/logs.h> #include <ouroboros/sockets.h> diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 7a634201..61c27d01 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -43,8 +43,7 @@ enum irm_msg_code { IRM_FLOW_DEALLOC = 18; IPCP_FLOW_REQ_ARR = 19; IPCP_FLOW_ALLOC_REPLY = 20; - IPCP_FLOW_DEALLOC = 21; - IRM_REPLY = 22; + IRM_REPLY = 21; }; message irm_msg { diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index d9e332fe..184a1bf2 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -21,14 +21,14 @@ */ #include <ouroboros/config.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h> #include <ouroboros/errno.h> #define OUROBOROS_PREFIX "shm_ap_rbuff" #include <ouroboros/logs.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/lockfile.h> -#include <ouroboros/time_utils.h> #include <pthread.h> #include <sys/mman.h> @@ -41,8 +41,6 @@ #include <sys/stat.h> #define FN_MAX_CHARS 255 -#define NORTH false -#define SOUTH true #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ @@ -63,11 +61,10 @@ struct shm_ap_rbuff { pthread_cond_t * add; /* SDU arrived */ pthread_cond_t * del; /* SDU removed */ pid_t api; /* api to which this rb belongs */ - bool dir; /* direction, false = N */ int fd; }; -static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) +struct shm_ap_rbuff * shm_ap_rbuff_create() { struct shm_ap_rbuff * rb; int shm_fd; @@ -77,10 +74,7 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) char fn[FN_MAX_CHARS]; mode_t mask; - if (dir == SOUTH) - sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid()); - else - sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid()); + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -157,22 +151,18 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) rb->fd = shm_fd; rb->api = getpid(); - rb->dir = dir; return rb; } -static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir) +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) { struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; char fn[FN_MAX_CHARS]; - if (dir == SOUTH) - sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api); - else - sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api); + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -215,31 +205,10 @@ static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir) rb->fd = shm_fd; rb->api = api; - rb->dir = dir; return rb; } -struct shm_ap_rbuff * shm_ap_rbuff_create_n() -{ - return shm_ap_rbuff_create(NORTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_create_s() -{ - return shm_ap_rbuff_create(SOUTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api) -{ - return shm_ap_rbuff_open(api, NORTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api) -{ - return shm_ap_rbuff_open(api, SOUTH); -} - void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) { if (rb == NULL) { @@ -285,10 +254,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); - if (rb->dir == SOUTH) - sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api); - else - sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api); + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index bf5c7f16..fb58a4d6 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -24,7 +24,6 @@ #include <ouroboros/config.h> #include <ouroboros/errno.h> #include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/time_utils.h> #include <pthread.h> @@ -35,6 +34,7 @@ #include <string.h> #include <signal.h> #include <sys/stat.h> +#include <stdbool.h> #define OUROBOROS_PREFIX "shm_rdrbuff" @@ -76,6 +76,7 @@ struct shm_du_buff { size_t du_head; size_t du_tail; pid_t dst_api; + size_t idx; }; struct shm_rdrbuff { @@ -458,7 +459,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, #endif int sz = size + sizeof *sdb; uint8_t * write_pos; - ssize_t idx = -1; if (rdrb == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -505,6 +505,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb->dst_api = -1; sdb->du_head = 0; sdb->du_tail = 0; + sdb->idx = *rdrb->ptr_head; *rdrb->ptr_head = 0; } @@ -521,7 +522,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, memcpy(write_pos, data, len); - idx = *rdrb->ptr_head; + sdb->idx = *rdrb->ptr_head; #ifdef SHM_RDRB_MULTI_BLOCK *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); #else @@ -529,7 +530,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, #endif pthread_mutex_unlock(rdrb->lock); - return idx; + return sdb->idx; } ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, @@ -547,7 +548,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, #endif int sz = size + sizeof *sdb; uint8_t * write_pos; - ssize_t idx = -1; if (rdrb == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -596,6 +596,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb->dst_api = -1; sdb->du_head = 0; sdb->du_tail = 0; + sdb->idx = *rdrb->ptr_head; *rdrb->ptr_head = 0; } @@ -612,7 +613,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, memcpy(write_pos, data, len); - idx = *rdrb->ptr_head; + sdb->idx = *rdrb->ptr_head; #ifdef SHM_RDRB_MULTI_BLOCK *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); #else @@ -620,7 +621,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, #endif pthread_cleanup_pop(true); - return idx; + return sdb->idx; } int shm_rdrbuff_read(uint8_t ** dst, @@ -654,6 +655,32 @@ int shm_rdrbuff_read(uint8_t ** dst, return len; } +struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, ssize_t idx) +{ + struct shm_du_buff * sdb; + + if (idx > SHM_BUFFER_SIZE) + return NULL; +#ifdef __APPLE__ + pthread_mutex_lock(rdrb->lock); +#else + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } +#endif + if (shm_rdrb_empty(rdrb)) { + pthread_mutex_unlock(rdrb->lock); + return NULL; + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + pthread_mutex_unlock(rdrb->lock); + + return sdb; +} + int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) { if (idx > SHM_BUFFER_SIZE) @@ -688,6 +715,11 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) return 0; } +size_t shm_du_buff_get_idx(struct shm_du_buff * sdb) +{ + return sdb->idx; +} + uint8_t * shm_du_buff_head(struct shm_du_buff * sdb) { if (sdb == NULL) diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 751c61b2..408e79e7 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -25,7 +25,6 @@ #include <ouroboros/config.h> #include <ouroboros/errno.h> #include <ouroboros/logs.h> -#include <ouroboros/common.h> #include <ouroboros/sockets.h> #include <ouroboros/utils.h> @@ -102,13 +101,12 @@ int server_socket_open(char * file_name) return sockfd; } -void close_ptr(void * o) +static void close_ptr(void * o) { close(*(int *) o); } -static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, - bool timed) +static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed) { int sockfd; buffer_t buf; |