diff options
-rw-r--r-- | src/lib/dev.c | 175 |
1 files changed, 91 insertions, 84 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index da34f420..ef909e1a 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -74,20 +74,18 @@ #define SYMMKEYSZ 32 #define MSGBUFSZ 2048 -enum port_state { - PORT_NULL = 0, - PORT_INIT, - PORT_ID_PENDING, - PORT_ID_ASSIGNED, - PORT_DESTROY +enum flow_state { + FLOW_NULL = 0, + FLOW_INIT, + FLOW_ID_PENDING, + FLOW_ID_ASSIGNED, + FLOW_DESTROY }; -struct port { +/* map flow_ids to flow descriptors; track state of the flow */ +struct fmap { int fd; - - enum port_state state; - pthread_mutex_t state_lock; - pthread_cond_t state_cond; + enum flow_state state; }; #define frcti_to_flow(frcti) \ @@ -139,9 +137,12 @@ struct { struct bmp * fqueues; struct flow * flows; - struct port * ports; + struct fmap * id_to_fd; struct list_head flow_list; + pthread_mutex_t mtx; + pthread_cond_t cond; + pthread_t tx; pthread_t rx; size_t n_frcti; @@ -150,77 +151,81 @@ struct { pthread_rwlock_t lock; } ai; -static void port_destroy(struct port * p) +static void flow_destroy(struct fmap * p) { - pthread_mutex_lock(&p->state_lock); + pthread_mutex_lock(&ai.mtx); - if (p->state == PORT_DESTROY) { - pthread_mutex_unlock(&p->state_lock); + if (p->state == FLOW_DESTROY) { + pthread_mutex_unlock(&ai.mtx); return; } - if (p->state == PORT_ID_PENDING) - p->state = PORT_DESTROY; + if (p->state == FLOW_ID_PENDING) + p->state = FLOW_DESTROY; else - p->state = PORT_NULL; + p->state = FLOW_NULL; - pthread_cond_signal(&p->state_cond); + pthread_cond_signal(&ai.cond); - while (p->state != PORT_NULL) - pthread_cond_wait(&p->state_cond, &p->state_lock); + pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx); + + while (p->state != FLOW_NULL) + pthread_cond_wait(&ai.cond, &ai.mtx); p->fd = -1; - p->state = PORT_INIT; + p->state = FLOW_INIT; - pthread_mutex_unlock(&p->state_lock); + pthread_cleanup_pop(true); } -static void port_set_state(struct port * p, - enum port_state state) +static void flow_set_state(struct fmap * p, + enum flow_state state) { - pthread_mutex_lock(&p->state_lock); + pthread_mutex_lock(&ai.mtx); - if (p->state == PORT_DESTROY) { - pthread_mutex_unlock(&p->state_lock); + if (p->state == FLOW_DESTROY) { + pthread_mutex_unlock(&ai.mtx); return; } p->state = state; - pthread_cond_broadcast(&p->state_cond); + pthread_cond_broadcast(&ai.cond); - pthread_mutex_unlock(&p->state_lock); + pthread_mutex_unlock(&ai.mtx); } -static enum port_state port_wait_assign(int flow_id) +static enum flow_state flow_wait_assign(int flow_id) { - enum port_state state; - struct port * p; + enum flow_state state; + struct fmap * p; - p = &ai.ports[flow_id]; + p = &ai.id_to_fd[flow_id]; - pthread_mutex_lock(&p->state_lock); + pthread_mutex_lock(&ai.mtx); - if (p->state == PORT_ID_ASSIGNED) { - pthread_mutex_unlock(&p->state_lock); - return PORT_ID_ASSIGNED; + if (p->state == FLOW_ID_ASSIGNED) { + pthread_mutex_unlock(&ai.mtx); + return FLOW_ID_ASSIGNED; } - if (p->state == PORT_INIT) - p->state = PORT_ID_PENDING; + if (p->state == FLOW_INIT) + p->state = FLOW_ID_PENDING; - while (p->state == PORT_ID_PENDING) - pthread_cond_wait(&p->state_cond, &p->state_lock); + pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx); - if (p->state == PORT_DESTROY) { - p->state = PORT_NULL; - pthread_cond_broadcast(&p->state_cond); + while (p->state == FLOW_ID_PENDING) + pthread_cond_wait(&ai.cond, &ai.mtx); + + if (p->state == FLOW_DESTROY) { + p->state = FLOW_NULL; + pthread_cond_broadcast(&ai.cond); } state = p->state; - assert(state != PORT_INIT); + pthread_cleanup_pop(true); - pthread_mutex_unlock(&p->state_lock); + assert(state != FLOW_INIT); return state; } @@ -403,7 +408,7 @@ static void flow_fini(int fd) } if (ai.flows[fd].flow_id != -1) { - port_destroy(&ai.ports[ai.flows[fd].flow_id]); + flow_destroy(&ai.id_to_fd[ai.flows[fd].flow_id]); bmp_release(ai.fds, fd); } @@ -500,9 +505,9 @@ static int flow_init(int flow_id, list_add_tail(&flow->next, &ai.flow_list); - ai.ports[flow_id].fd = fd; + ai.id_to_fd[flow_id].fd = fd; - port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED); + flow_set_state(&ai.id_to_fd[flow_id], FLOW_ID_ASSIGNED); pthread_rwlock_unlock(&ai.lock); @@ -592,30 +597,34 @@ static void init(int argc, } ai.flows = malloc(sizeof(*ai.flows) * PROG_MAX_FLOWS); - if (ai.flows == NULL) + if (ai.flows == NULL) { + fprintf(stderr, "FATAL: Could not allocate flows."); goto fail_flows; + } for (i = 0; i < PROG_MAX_FLOWS; ++i) flow_clear(i); - ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS); - if (ai.ports == NULL) - goto fail_ports; + ai.id_to_fd = malloc(sizeof(*ai.id_to_fd) * SYS_MAX_FLOWS); + if (ai.id_to_fd == NULL) { + fprintf(stderr, "FATAL: Could not allocate id_to_fd."); + goto fail_id_to_fd; + } + + for (i = 0; i < SYS_MAX_FLOWS; ++i) + ai.id_to_fd[i].state = FLOW_INIT; - for (i = 0; i < SYS_MAX_FLOWS; ++i) { - ai.ports[i].state = PORT_INIT; - if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) { - fprintf(stderr, "FATAL: Could not init lock %d.", i); - goto fail_flow_lock; - } - if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) { - pthread_mutex_destroy(&ai.ports[i].state_lock); - fprintf(stderr, "FATAL: Could not init cond %d.", i); - goto fail_flow_lock; - } - } /* Do not reuse i after this ! */ + if (pthread_mutex_init(&ai.mtx, NULL)) { + fprintf(stderr, "FATAL: Could not init mutex."); + goto fail_mtx; + } - if (pthread_rwlock_init(&ai.lock, NULL)) { + if (pthread_cond_init(&ai.cond, NULL) < 0) { + fprintf(stderr, "FATAL: Could not init condvar."); + goto fail_cond; + } + + if (pthread_rwlock_init(&ai.lock, NULL) < 0) { fprintf(stderr, "FATAL: Could not initialize flow lock"); goto fail_flow_lock; } @@ -668,12 +677,12 @@ static void init(int argc, fail_fqset: pthread_rwlock_destroy(&ai.lock); fail_flow_lock: - while (i-- > 0) { - pthread_mutex_destroy(&ai.ports[i].state_lock); - pthread_cond_destroy(&ai.ports[i].state_cond); - } - free(ai.ports); - fail_ports: + pthread_cond_destroy(&ai.cond); + fail_cond: + pthread_mutex_destroy(&ai.mtx); + fail_mtx: + free(ai.id_to_fd); + fail_id_to_fd: free(ai.flows); fail_flows: shm_rdrbuff_close(ai.rdrb); @@ -709,10 +718,8 @@ static void fini(void) } } - for (i = 0; i < SYS_MAX_FLOWS; ++i) { - pthread_mutex_destroy(&ai.ports[i].state_lock); - pthread_cond_destroy(&ai.ports[i].state_cond); - } + pthread_cond_destroy(&ai.cond); + pthread_mutex_destroy(&ai.mtx); pthread_rwlock_unlock(&ai.lock); @@ -729,7 +736,7 @@ static void fini(void) pthread_rwlock_destroy(&ai.lock); free(ai.flows); - free(ai.ports); + free(ai.id_to_fd); shm_rdrbuff_close(ai.rdrb); @@ -1701,7 +1708,7 @@ static int fqueue_filter(struct fqueue * fq) pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[fq->fqueue[fq->next].flow_id].fd; + fd = ai.id_to_fd[fq->fqueue[fq->next].flow_id].fd; if (fd < 0) { ++fq->next; pthread_rwlock_unlock(&ai.lock); @@ -1762,7 +1769,7 @@ int fqueue_next(struct fqueue * fq) e = fq->fqueue + fq->next; - fd = ai.ports[e->flow_id].fd; + fd = ai.id_to_fd[e->flow_id].fd; ++fq->next; @@ -1842,7 +1849,7 @@ int np1_flow_dealloc(int flow_id, pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[flow_id].fd; + fd = ai.id_to_fd[flow_id].fd; pthread_rwlock_unlock(&ai.lock); @@ -1853,12 +1860,12 @@ int np1_flow_resp(int flow_id) { int fd; - if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED) + if (flow_wait_assign(flow_id) != FLOW_ID_ASSIGNED) return -1; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[flow_id].fd; + fd = ai.id_to_fd[flow_id].fd; pthread_rwlock_unlock(&ai.lock); |