diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 175 | 
1 files changed, 91 insertions, 84 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index da34f420..ef909e1a 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -74,20 +74,18 @@  #define SYMMKEYSZ 32  #define MSGBUFSZ  2048 -enum port_state { -        PORT_NULL = 0, -        PORT_INIT, -        PORT_ID_PENDING, -        PORT_ID_ASSIGNED, -        PORT_DESTROY +enum flow_state { +        FLOW_NULL = 0, +        FLOW_INIT, +        FLOW_ID_PENDING, +        FLOW_ID_ASSIGNED, +        FLOW_DESTROY  }; -struct port { +/* map flow_ids to flow descriptors; track state of the flow */ +struct fmap {          int             fd; - -        enum port_state state; -        pthread_mutex_t state_lock; -        pthread_cond_t  state_cond; +        enum flow_state state;  };  #define frcti_to_flow(frcti) \ @@ -139,9 +137,12 @@ struct {          struct bmp *          fqueues;          struct flow *         flows; -        struct port *         ports; +        struct fmap *         id_to_fd;          struct list_head      flow_list; +        pthread_mutex_t       mtx; +        pthread_cond_t        cond; +          pthread_t             tx;          pthread_t             rx;          size_t                n_frcti; @@ -150,77 +151,81 @@ struct {          pthread_rwlock_t      lock;  } ai; -static void port_destroy(struct port * p) +static void flow_destroy(struct fmap * p)  { -        pthread_mutex_lock(&p->state_lock); +        pthread_mutex_lock(&ai.mtx); -        if (p->state == PORT_DESTROY) { -                pthread_mutex_unlock(&p->state_lock); +        if (p->state == FLOW_DESTROY) { +                pthread_mutex_unlock(&ai.mtx);                  return;          } -        if (p->state == PORT_ID_PENDING) -                p->state = PORT_DESTROY; +        if (p->state == FLOW_ID_PENDING) +                p->state = FLOW_DESTROY;          else -                p->state = PORT_NULL; +                p->state = FLOW_NULL; -        pthread_cond_signal(&p->state_cond); +        pthread_cond_signal(&ai.cond); -        while (p->state != PORT_NULL) -                pthread_cond_wait(&p->state_cond, &p->state_lock); +        pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx); + +        while (p->state != FLOW_NULL) +                pthread_cond_wait(&ai.cond, &ai.mtx);          p->fd    = -1; -        p->state = PORT_INIT; +        p->state = FLOW_INIT; -        pthread_mutex_unlock(&p->state_lock); +        pthread_cleanup_pop(true);  } -static void port_set_state(struct port *   p, -                           enum port_state state) +static void flow_set_state(struct fmap *   p, +                           enum flow_state state)  { -        pthread_mutex_lock(&p->state_lock); +        pthread_mutex_lock(&ai.mtx); -        if (p->state == PORT_DESTROY) { -                pthread_mutex_unlock(&p->state_lock); +        if (p->state == FLOW_DESTROY) { +                pthread_mutex_unlock(&ai.mtx);                  return;          }          p->state = state; -        pthread_cond_broadcast(&p->state_cond); +        pthread_cond_broadcast(&ai.cond); -        pthread_mutex_unlock(&p->state_lock); +        pthread_mutex_unlock(&ai.mtx);  } -static enum port_state port_wait_assign(int flow_id) +static enum flow_state flow_wait_assign(int flow_id)  { -        enum port_state state; -        struct port *   p; +        enum flow_state state; +        struct fmap *   p; -        p = &ai.ports[flow_id]; +        p = &ai.id_to_fd[flow_id]; -        pthread_mutex_lock(&p->state_lock); +        pthread_mutex_lock(&ai.mtx); -        if (p->state == PORT_ID_ASSIGNED) { -                pthread_mutex_unlock(&p->state_lock); -                return PORT_ID_ASSIGNED; +        if (p->state == FLOW_ID_ASSIGNED) { +                pthread_mutex_unlock(&ai.mtx); +                return FLOW_ID_ASSIGNED;          } -        if (p->state == PORT_INIT) -                p->state = PORT_ID_PENDING; +        if (p->state == FLOW_INIT) +                p->state = FLOW_ID_PENDING; -        while (p->state == PORT_ID_PENDING) -                pthread_cond_wait(&p->state_cond, &p->state_lock); +        pthread_cleanup_push(__cleanup_mutex_unlock, &ai.mtx); -        if (p->state == PORT_DESTROY) { -                p->state = PORT_NULL; -                pthread_cond_broadcast(&p->state_cond); +        while (p->state == FLOW_ID_PENDING) +                pthread_cond_wait(&ai.cond, &ai.mtx); + +        if (p->state == FLOW_DESTROY) { +                p->state = FLOW_NULL; +                pthread_cond_broadcast(&ai.cond);          }          state = p->state; -        assert(state != PORT_INIT); +        pthread_cleanup_pop(true); -        pthread_mutex_unlock(&p->state_lock); +        assert(state != FLOW_INIT);          return state;  } @@ -403,7 +408,7 @@ static void flow_fini(int fd)          }          if (ai.flows[fd].flow_id != -1) { -                port_destroy(&ai.ports[ai.flows[fd].flow_id]); +                flow_destroy(&ai.id_to_fd[ai.flows[fd].flow_id]);                  bmp_release(ai.fds, fd);          } @@ -500,9 +505,9 @@ static int flow_init(int       flow_id,          list_add_tail(&flow->next, &ai.flow_list); -        ai.ports[flow_id].fd = fd; +        ai.id_to_fd[flow_id].fd = fd; -        port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED); +        flow_set_state(&ai.id_to_fd[flow_id], FLOW_ID_ASSIGNED);          pthread_rwlock_unlock(&ai.lock); @@ -592,30 +597,34 @@ static void init(int     argc,          }          ai.flows = malloc(sizeof(*ai.flows) * PROG_MAX_FLOWS); -        if (ai.flows == NULL) +        if (ai.flows == NULL) { +                fprintf(stderr, "FATAL: Could not allocate flows.");                  goto fail_flows; +        }          for (i = 0; i < PROG_MAX_FLOWS; ++i)                  flow_clear(i); -        ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS); -        if (ai.ports == NULL) -                goto fail_ports; +        ai.id_to_fd = malloc(sizeof(*ai.id_to_fd) * SYS_MAX_FLOWS); +        if (ai.id_to_fd == NULL) { +                fprintf(stderr, "FATAL: Could not allocate id_to_fd."); +                goto fail_id_to_fd; +        } + +        for (i = 0; i < SYS_MAX_FLOWS; ++i) +                ai.id_to_fd[i].state = FLOW_INIT; -        for (i = 0; i < SYS_MAX_FLOWS; ++i) { -                ai.ports[i].state = PORT_INIT; -                if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) { -                        fprintf(stderr, "FATAL: Could not init lock %d.", i); -                        goto fail_flow_lock; -                } -                if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) { -                        pthread_mutex_destroy(&ai.ports[i].state_lock); -                        fprintf(stderr, "FATAL: Could not init cond %d.", i); -                        goto fail_flow_lock; -                } -        } /* Do not reuse i after this ! */ +        if (pthread_mutex_init(&ai.mtx, NULL)) { +                fprintf(stderr, "FATAL: Could not init mutex."); +                goto fail_mtx; +        } -        if (pthread_rwlock_init(&ai.lock, NULL)) { +        if (pthread_cond_init(&ai.cond, NULL) < 0) { +                fprintf(stderr, "FATAL: Could not init condvar."); +                goto fail_cond; +        } + +        if (pthread_rwlock_init(&ai.lock, NULL) < 0) {                  fprintf(stderr, "FATAL: Could not initialize flow lock");                  goto fail_flow_lock;          } @@ -668,12 +677,12 @@ static void init(int     argc,   fail_fqset:          pthread_rwlock_destroy(&ai.lock);   fail_flow_lock: -        while (i-- > 0) { -                pthread_mutex_destroy(&ai.ports[i].state_lock); -                pthread_cond_destroy(&ai.ports[i].state_cond); -        } -        free(ai.ports); - fail_ports: +        pthread_cond_destroy(&ai.cond); + fail_cond: +        pthread_mutex_destroy(&ai.mtx); + fail_mtx: +        free(ai.id_to_fd); + fail_id_to_fd:          free(ai.flows);   fail_flows:          shm_rdrbuff_close(ai.rdrb); @@ -709,10 +718,8 @@ static void fini(void)                  }          } -        for (i = 0; i < SYS_MAX_FLOWS; ++i) { -                pthread_mutex_destroy(&ai.ports[i].state_lock); -                pthread_cond_destroy(&ai.ports[i].state_cond); -        } +        pthread_cond_destroy(&ai.cond); +        pthread_mutex_destroy(&ai.mtx);          pthread_rwlock_unlock(&ai.lock); @@ -729,7 +736,7 @@ static void fini(void)          pthread_rwlock_destroy(&ai.lock);          free(ai.flows); -        free(ai.ports); +        free(ai.id_to_fd);          shm_rdrbuff_close(ai.rdrb); @@ -1701,7 +1708,7 @@ static int fqueue_filter(struct fqueue * fq)                  pthread_rwlock_rdlock(&ai.lock); -                fd = ai.ports[fq->fqueue[fq->next].flow_id].fd; +                fd = ai.id_to_fd[fq->fqueue[fq->next].flow_id].fd;                  if (fd < 0) {                          ++fq->next;                          pthread_rwlock_unlock(&ai.lock); @@ -1762,7 +1769,7 @@ int fqueue_next(struct fqueue * fq)          e = fq->fqueue + fq->next; -        fd = ai.ports[e->flow_id].fd; +        fd = ai.id_to_fd[e->flow_id].fd;          ++fq->next; @@ -1842,7 +1849,7 @@ int np1_flow_dealloc(int    flow_id,          pthread_rwlock_rdlock(&ai.lock); -        fd = ai.ports[flow_id].fd; +        fd = ai.id_to_fd[flow_id].fd;          pthread_rwlock_unlock(&ai.lock); @@ -1853,12 +1860,12 @@ int np1_flow_resp(int flow_id)  {          int fd; -        if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED) +        if (flow_wait_assign(flow_id) != FLOW_ID_ASSIGNED)                  return -1;          pthread_rwlock_rdlock(&ai.lock); -        fd = ai.ports[flow_id].fd; +        fd = ai.id_to_fd[flow_id].fd;          pthread_rwlock_unlock(&ai.lock); | 
