diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/local/main.c | 78 | ||||
| -rw-r--r-- | src/irmd/api_table.c | 3 | ||||
| -rw-r--r-- | src/irmd/main.c | 69 | ||||
| -rw-r--r-- | src/irmd/registry.c | 61 | ||||
| -rw-r--r-- | src/irmd/registry.h | 1 | ||||
| -rw-r--r-- | src/lib/dev.c | 79 | 
6 files changed, 186 insertions, 105 deletions
| diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 718c8d7e..bb7f8325 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -85,7 +85,7 @@ static void * ipcp_local_sdu_loop(void * o)          (void) o; -        while (flow_event_wait(local_data.flows, local_data.fq, &timeout)) { +        while (true) {                  int fd;                  ssize_t idx; @@ -96,9 +96,14 @@ static void * ipcp_local_sdu_loop(void * o)                          return (void *) 1; /* -ENOTENROLLED */                  } -                pthread_rwlock_rdlock(&local_data.lock); +                pthread_rwlock_unlock(&ipcpi.state_lock); + +                flow_event_wait(local_data.flows, local_data.fq, &timeout);                  while ((fd = fqueue_next(local_data.fq)) >= 0) { +                        pthread_rwlock_rdlock(&ipcpi.state_lock); +                        pthread_rwlock_rdlock(&local_data.lock); +                          idx = local_flow_read(fd);                          assert(idx < (SHM_BUFFER_SIZE)); @@ -107,10 +112,11 @@ static void * ipcp_local_sdu_loop(void * o)                          if (fd != -1)                                  local_flow_write(fd, idx); + +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        pthread_rwlock_unlock(&local_data.lock);                  } -                pthread_rwlock_unlock(&local_data.lock); -                pthread_rwlock_unlock(&ipcpi.state_lock);          }          return (void *) 0; @@ -229,12 +235,6 @@ static int ipcp_local_flow_alloc(int       fd,          assert(dst_name); -        out_fd = ipcp_flow_req_arr(getpid(), dst_name, cube); -        if (out_fd < 0) { -                log_dbg("Flow allocation failed."); -                return -1; -        } -          pthread_rwlock_rdlock(&ipcpi.state_lock);          if (ipcp_get_state() != IPCP_OPERATIONAL) { @@ -243,16 +243,37 @@ static int ipcp_local_flow_alloc(int       fd,                  return -1; /* -ENOTENROLLED */          } +        /* +         * This function needs to return completely before +         * flow_resp. Taking the wrlock on the data is the simplest +         * way to achieve this. +         */ +          pthread_rwlock_wrlock(&local_data.lock); -        local_data.in_out[fd]  = out_fd; -        local_data.in_out[out_fd] = fd; +        out_fd = ipcp_flow_req_arr(getpid(), dst_name, cube); +        if (out_fd < 0) { +                pthread_rwlock_unlock(&ipcpi.state_lock); +                log_dbg("Flow allocation failed: %d", out_fd); +                return -1; +        } -        flow_set_add(local_data.flows, fd); +        /* +         * The idea of the port_wait_assign in dev.c was to do the +         * above synchronisation. But if the lock is not taken, the +         * resp() function may be called before a lock would be taken +         * here. This shim will be deprecated, but ideally the sync is +         * fixed in ipcp.c. +         */ + +        local_data.in_out[fd] = out_fd; +        local_data.in_out[out_fd] = fd;          pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock); +        flow_set_add(local_data.flows, fd); +          log_info("Pending local allocation request on fd %d.", fd);          return 0; @@ -264,24 +285,30 @@ static int ipcp_local_flow_alloc_resp(int fd,          int out_fd = -1;          int ret = -1; -        if (response) -                return 0; -          pthread_rwlock_rdlock(&ipcpi.state_lock); -        pthread_rwlock_rdlock(&local_data.lock); +        pthread_rwlock_wrlock(&local_data.lock); + +        if (response) { +                if (local_data.in_out[fd] != -1) +                        local_data.in_out[local_data.in_out[fd]] = fd; +                local_data.in_out[fd] = -1; +                pthread_rwlock_unlock(&local_data.lock); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                return 0; +        }          out_fd = local_data.in_out[fd]; -        if (out_fd < 0) { +        if (out_fd == -1) {                  pthread_rwlock_unlock(&local_data.lock);                  pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        flow_set_add(local_data.flows, fd); -          pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock); +        flow_set_add(local_data.flows, fd); +          if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0)                  return -1; @@ -297,24 +324,17 @@ static int ipcp_local_flow_dealloc(int fd)          ipcp_flow_fini(fd);          pthread_rwlock_rdlock(&ipcpi.state_lock); - -        if (ipcp_get_state() != IPCP_OPERATIONAL) { -                pthread_rwlock_unlock(&ipcpi.state_lock); -                log_dbg("Won't register with non-enrolled IPCP."); -                return -1; /* -ENOTENROLLED */ -        } -          pthread_rwlock_wrlock(&local_data.lock);          flow_set_del(local_data.flows, fd);          local_data.in_out[fd] = -1; -        flow_dealloc(fd); -          pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock); +        flow_dealloc(fd); +          log_info("Flow with fd %d deallocated.", fd);          return 0; diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c index 1c655004..5ff0fcf6 100644 --- a/src/irmd/api_table.c +++ b/src/irmd/api_table.c @@ -185,7 +185,8 @@ int api_entry_sleep(struct api_entry * e)                                                &dl);          if (e->state == API_DESTROY) { -                reg_entry_del_api(e->re, e->api); +                if (e->re != NULL) +                        reg_entry_del_api(e->re, e->api);                  ret = -1;          } diff --git a/src/irmd/main.c b/src/irmd/main.c index 19d27bf9..e6647285 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1063,16 +1063,21 @@ static struct irm_flow * flow_accept(pid_t api)          api_n1  = f->n_1_api;          port_id = f->port_id; -        log_info("Flow on port_id %d allocated.", f->port_id); -          pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_rdlock(&irmd->reg_lock);          e = api_table_get(&irmd->api_table, api);          if (e == NULL) {                  pthread_rwlock_unlock(&irmd->reg_lock); +                pthread_rwlock_wrlock(&irmd->flows_lock); +                list_del(&f->next); +                bmp_release(irmd->port_ids, f->port_id); +                pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); +                clear_irm_flow(f); +                irm_flow_set_state(f, FLOW_NULL); +                irm_flow_destroy(f);                  log_dbg("Process gone while accepting flow.");                  return NULL;          } @@ -1085,8 +1090,15 @@ static struct irm_flow * flow_accept(pid_t api)          if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {                  pthread_rwlock_unlock(&irmd->reg_lock); +                pthread_rwlock_wrlock(&irmd->flows_lock); +                list_del(&f->next); +                bmp_release(irmd->port_ids, f->port_id); +                pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock);                  ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); +                clear_irm_flow(f); +                irm_flow_set_state(f, FLOW_NULL); +                irm_flow_destroy(f);                  log_err("Entry in wrong state.");                  return NULL;          } @@ -1097,12 +1109,22 @@ static struct irm_flow * flow_accept(pid_t api)          pthread_rwlock_unlock(&irmd->state_lock);          if (ipcp_flow_alloc_resp(api_n1, port_id, api_n, 0)) { -                log_dbg("Failed to respond to alloc."); +                pthread_rwlock_rdlock(&irmd->state_lock); +                pthread_rwlock_wrlock(&irmd->flows_lock); +                list_del(&f->next); +                pthread_rwlock_unlock(&irmd->flows_lock); +                pthread_rwlock_unlock(&irmd->state_lock); +                log_dbg("Failed to respond to alloc. Port_id invalidated."); +                clear_irm_flow(f); +                irm_flow_set_state(f, FLOW_NULL); +                irm_flow_destroy(f);                  return NULL;          }          irm_flow_set_state(f, FLOW_ALLOCATED); +        log_info("Flow on port_id %d allocated.", f->port_id); +          return f;  } @@ -1157,17 +1179,9 @@ static struct irm_flow * flow_alloc(pid_t     api,          assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); -        if (ipcp_flow_alloc(ipcp, port_id, api, -                            dst_name, cube) < 0) { -                pthread_rwlock_rdlock(&irmd->state_lock); -                pthread_rwlock_wrlock(&irmd->flows_lock); -                list_del(&f->next); -                clear_irm_flow(f); -                bmp_release(irmd->port_ids, f->port_id); -                pthread_rwlock_unlock(&irmd->flows_lock); -                pthread_rwlock_unlock(&irmd->state_lock); -                irm_flow_set_state(f, FLOW_NULL); -                irm_flow_destroy(f); +        if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) { +                /* sanitizer cleans this */ +                log_info("Failed to respond to alloc.");                  return NULL;          } @@ -1351,7 +1365,6 @@ static struct irm_flow * flow_req_arr(pid_t     api,                  pthread_rwlock_rdlock(&irmd->state_lock);                  pthread_rwlock_wrlock(&irmd->reg_lock); -          case REG_NAME_FLOW_ACCEPT:                  h_api = reg_entry_get_api(re);                  if (h_api == -1) { @@ -1691,19 +1704,17 @@ void * irm_sanitize(void * o)                          if (irm_flow_get_state(f) == FLOW_ALLOC_PENDING                              && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { -                                list_del(&f->next);                                  log_dbg("Pending port_id %d timed out.",                                           f->port_id); -                                clear_irm_flow(f); +                                f->n_1_api = -1; +                                irm_flow_set_state(f, FLOW_DEALLOC_PENDING);                                  ipcp_flow_dealloc(f->n_1_api, f->port_id); -                                bmp_release(irmd->port_ids, f->port_id); -                                irm_flow_destroy(f);                                  continue;                          }                          if (kill(f->n_api, 0) < 0) {                                  struct shm_flow_set * set; -                                log_dbg("AP-I %d gone, flow %d deallocated.", +                                log_dbg("AP-I %d gone, deallocating flow %d.",                                           f->n_api, f->port_id);                                  set = shm_flow_set_open(f->n_api);                                  if (set != NULL) @@ -1711,22 +1722,18 @@ void * irm_sanitize(void * o)                                  f->n_api = -1;                                  irm_flow_set_state(f, FLOW_DEALLOC_PENDING);                                  ipcp_flow_dealloc(f->n_1_api, f->port_id); -                                clear_irm_flow(f);                                  continue;                          }                          if (kill(f->n_1_api, 0) < 0) {                                  struct shm_flow_set * set; -                                list_del(&f->next);                                  log_err("IPCP %d gone, flow %d removed.",                                          f->n_1_api, f->port_id);                                  set = shm_flow_set_open(f->n_api);                                  if (set != NULL)                                          shm_flow_set_destroy(set); - -                                clear_irm_flow(f); -                                bmp_release(irmd->port_ids, f->port_id); -                                irm_flow_destroy(f); +                                f->n_1_api = -1; +                                irm_flow_set_state(f, FLOW_DEALLOC_PENDING);                          }                  } @@ -2087,6 +2094,8 @@ static int irm_create(void)          if (irmd == NULL)                  return -ENOMEM; +        memset(irmd, 0, sizeof(*irmd)); +          memset(&st, 0, sizeof(st));          irmd->state = IRMD_NULL; @@ -2158,7 +2167,10 @@ static int irm_create(void)          if ((irmd->lf = lockfile_create()) == NULL) {                  if ((irmd->lf = lockfile_open()) == NULL) {                          log_err("Lockfile error."); -                        irm_destroy(); +                        free(irmd->threadpool); +                        bmp_destroy(irmd->thread_ids); +                        bmp_destroy(irmd->port_ids); +                        free(irmd);                          return -1;                  } @@ -2172,6 +2184,9 @@ static int irm_create(void)                          log_info("IRMd already running (%d), exiting.",                                   lockfile_owner(irmd->lf));                          lockfile_close(irmd->lf); +                        free(irmd->threadpool); +                        bmp_destroy(irmd->thread_ids); +                        bmp_destroy(irmd->port_ids);                          free(irmd);                          return -1;                  } diff --git a/src/irmd/registry.c b/src/irmd/registry.c index 2043ca46..53be77cd 100644 --- a/src/irmd/registry.c +++ b/src/irmd/registry.c @@ -37,6 +37,7 @@  #include <signal.h>  #include <unistd.h>  #include <limits.h> +#include <assert.h>  struct reg_dif {          struct list_head next; @@ -121,6 +122,12 @@ static void reg_entry_destroy(struct reg_entry * e)          if (e->name != NULL)                  free(e->name); +        list_for_each_safe(p, h, &e->reg_apis) { +                struct pid_el * pe = list_entry(p, struct pid_el, next); +                list_del(&pe->next); +                free(pe); +        } +          list_for_each_safe(p, h, &e->reg_apns) {                  struct str_el * a = list_entry(p, struct str_el, next);                  list_del(&a->next); @@ -224,9 +231,13 @@ int reg_entry_add_apn(struct reg_entry * e,          list_add(&n->next, &e->reg_apns); +        pthread_mutex_lock(&e->state_lock); +          if (e->state == REG_NAME_IDLE)                  e->state = REG_NAME_AUTO_ACCEPT; +        pthread_mutex_unlock(&e->state_lock); +          return 0;  } @@ -245,11 +256,14 @@ void reg_entry_del_apn(struct reg_entry * e,                  }          } +        pthread_mutex_lock(&e->state_lock); +          if (e->state == REG_NAME_AUTO_ACCEPT && list_is_empty(&e->reg_apns)) {                  e->state = REG_NAME_IDLE;                  pthread_cond_broadcast(&e->state_cond);          } +        pthread_mutex_unlock(&e->state_lock);  }  char * reg_entry_get_apn(struct reg_entry * e) @@ -279,27 +293,29 @@ int reg_entry_add_api(struct reg_entry * e,  {          struct pid_el * i; -        if (e == NULL) -                return -EINVAL; +        assert(e);          if (reg_entry_has_api(e, api)) {                  log_dbg("Instance already registered with this name.");                  return -EPERM;          } +        pthread_mutex_lock(&e->state_lock); +          if (e->state == REG_NAME_NULL) { +                pthread_mutex_unlock(&e->state_lock);                  log_dbg("Tried to add instance in NULL state.");                  return -EPERM;          }          i = malloc(sizeof(*i)); -        if (i == NULL) +        if (i == NULL) { +                pthread_mutex_unlock(&e->state_lock);                  return -ENOMEM; +        }          i->pid = api; -        pthread_mutex_lock(&e->state_lock); -          list_add(&i->next, &e->reg_apis);          if (e->state == REG_NAME_IDLE || @@ -316,6 +332,8 @@ int reg_entry_add_api(struct reg_entry * e,  static void reg_entry_check_state(struct reg_entry * e)  { +        assert(e); +          if (e->state == REG_NAME_DESTROY) {                  e->state = REG_NAME_NULL;                  pthread_cond_broadcast(&e->state_cond); @@ -337,6 +355,9 @@ static void reg_entry_check_state(struct reg_entry * e)  void reg_entry_del_pid_el(struct reg_entry * e,                            struct pid_el *    p)  { +        assert(e); +        assert(p); +          list_del(&p->next);          free(p); @@ -349,6 +370,8 @@ void reg_entry_del_api(struct reg_entry * e,          struct list_head * p;          struct list_head * h; +        assert(e); +          if (e == NULL)                  return; @@ -378,8 +401,7 @@ enum reg_name_state reg_entry_get_state(struct reg_entry * e)  {          enum reg_name_state state; -        if (e == NULL) -                return REG_NAME_NULL; +        assert(e);          pthread_mutex_lock(&e->state_lock); @@ -393,8 +415,7 @@ enum reg_name_state reg_entry_get_state(struct reg_entry * e)  int reg_entry_set_state(struct reg_entry *  e,                          enum reg_name_state state)  { -        if (state == REG_NAME_DESTROY) -                return -EPERM; +        assert(state != REG_NAME_DESTROY);          pthread_mutex_lock(&e->state_lock); @@ -413,8 +434,8 @@ int reg_entry_leave_state(struct reg_entry *  e,          struct timespec abstime;          int ret = 0; -        if (e == NULL || state == REG_NAME_DESTROY) -                return -EINVAL; +        assert(e); +        assert(state != REG_NAME_DESTROY);          if (timeout != NULL) {                  clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -450,8 +471,8 @@ int reg_entry_wait_state(struct reg_entry *  e,          struct timespec abstime;          int ret = 0; -        if (e == NULL || state == REG_NAME_DESTROY) -                return -EINVAL; +        assert(e); +        assert(state != REG_NAME_DESTROY);          if (timeout != NULL) {                  clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -487,6 +508,8 @@ struct reg_entry * registry_get_entry(struct list_head * registry,  {          struct list_head * p   = NULL; +        assert(registry); +          list_for_each(p, registry) {                  struct reg_entry * e = list_entry(p, struct reg_entry, next);                  if (!wildcard_match(name, e->name)) @@ -501,8 +524,8 @@ struct reg_entry * registry_add_name(struct list_head * registry,  {          struct reg_entry * e = NULL; -        if (name == NULL) -                return NULL; +        assert(registry); +        assert(name);          if (registry_has_name(registry, name)) {                  log_dbg("Name %s already registered.", name); @@ -545,12 +568,13 @@ void registry_del_api(struct list_head * registry,  {          struct list_head * p; -        if ( api == -1) -                return; +        assert(registry); +        assert(api > 0);          list_for_each(p, registry) {                  struct reg_entry * e = list_entry(p, struct reg_entry, next);                  pthread_mutex_lock(&e->state_lock); +                assert(e);                  reg_entry_del_api(e, api);                  pthread_mutex_unlock(&e->state_lock);          } @@ -586,8 +610,7 @@ void registry_destroy(struct list_head * registry)          struct list_head * p = NULL;          struct list_head * h = NULL; -        if (registry == NULL) -                return; +        assert(registry);          list_for_each_safe(p, h, registry) {                  struct reg_entry * e = list_entry(p, struct reg_entry, next); diff --git a/src/irmd/registry.h b/src/irmd/registry.h index 2c766732..08e78019 100644 --- a/src/irmd/registry.h +++ b/src/irmd/registry.h @@ -63,7 +63,6 @@ struct reg_entry {          struct list_head    reg_apis;          enum reg_name_state state; -        qoscube_t           qos;          pthread_cond_t      state_cond;          pthread_mutex_t     state_lock;  }; diff --git a/src/lib/dev.c b/src/lib/dev.c index 5acbada2..c063fd47 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -49,6 +49,7 @@ struct fqueue {  enum port_state {          PORT_NULL = 0, +        PORT_INIT,          PORT_ID_PENDING,          PORT_ID_ASSIGNED,          PORT_DESTROY @@ -82,7 +83,7 @@ static void port_destroy(struct port * p)                  pthread_cond_wait(&p->state_cond, &p->state_lock);          p->fd = -1; -        p->state = PORT_ID_PENDING; +        p->state = PORT_INIT;          pthread_mutex_unlock(&p->state_lock);  } @@ -109,11 +110,14 @@ static enum port_state port_wait_assign(struct port * p)          pthread_mutex_lock(&p->state_lock); -        if (p->state != PORT_ID_PENDING) { +        if (p->state == PORT_ID_ASSIGNED) {                  pthread_mutex_unlock(&p->state_lock); -                return -1; +                return PORT_ID_ASSIGNED;          } +        if(p->state == PORT_INIT) +                p->state = PORT_ID_PENDING; +          while (p->state == PORT_ID_PENDING)                  pthread_cond_wait(&p->state_cond, &p->state_lock); @@ -124,6 +128,8 @@ static enum port_state port_wait_assign(struct port * p)          state = p->state; +        assert(state != PORT_INIT); +          pthread_mutex_unlock(&p->state_lock);          return state; @@ -237,7 +243,6 @@ static void reset_flow(int fd)                  shm_flow_set_close(ai.flows[fd].set);          init_flow(fd); -  }  int ap_init(const char * ap_name) @@ -319,7 +324,7 @@ int ap_init(const char * ap_name)          }          for (i = 0; i < IRMD_MAX_FLOWS; ++i) { -                ai.ports[i].state = PORT_ID_PENDING; +                ai.ports[i].state = PORT_INIT;                  pthread_mutex_init(&ai.ports[i].state_lock, NULL);                  pthread_cond_init(&ai.ports[i].state_cond, NULL);          } @@ -354,8 +359,7 @@ void ap_fini()                          ssize_t idx;                          while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)                                  shm_rdrbuff_remove(ai.rdrb, idx); -                        port_set_state(&ai.ports[ai.flows[i].port_id], -                                       PORT_NULL); +                        port_destroy(&ai.ports[ai.flows[i].port_id]);                          reset_flow(i);                  }          } @@ -459,6 +463,8 @@ int flow_accept(qosspec_t *       qs,          ai.flows[fd].api     = recv_msg->api;          ai.flows[fd].cube    = recv_msg->qoscube; +        assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT); +          if (qs != NULL)                  fill_qosspec(qs, ai.flows[fd].cube); @@ -555,6 +561,8 @@ int flow_alloc(const char *      dst_name,          ai.flows[fd].api     = recv_msg->api;          ai.flows[fd].cube    = recv_msg->qoscube; +        assert(ai.ports[recv_msg->port_id].state == PORT_INIT); +          ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -600,6 +608,7 @@ int flow_dealloc(int fd)          if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL); +                assert(false);                  return -1;          } @@ -1113,8 +1122,8 @@ int np1_flow_alloc(pid_t n_api,          ai.flows[fd].oflags  = FLOW_O_DEFAULT;          ai.flows[fd].api     = n_api; -        ai.ports[port_id].fd = fd; -        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); +        ai.ports[port_id].fd    = fd; +        ai.ports[port_id].state = PORT_ID_ASSIGNED;          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -1127,7 +1136,7 @@ int np1_flow_dealloc(int port_id)          int fd;          pthread_rwlock_rdlock(&ai.data_lock); -        pthread_rwlock_wrlock(&ai.flows_lock); +        pthread_rwlock_rdlock(&ai.flows_lock);          fd = ai.ports[port_id].fd; @@ -1141,10 +1150,11 @@ int np1_flow_resp(int port_id)  {          int fd; -        port_wait_assign(&ai.ports[port_id]); +        if (port_wait_assign(&ai.ports[port_id]) != PORT_ID_ASSIGNED) +                return -1;          pthread_rwlock_rdlock(&ai.data_lock); -        pthread_rwlock_wrlock(&ai.flows_lock); +        pthread_rwlock_rdlock(&ai.flows_lock);          fd = ai.ports[port_id].fd; @@ -1211,66 +1221,78 @@ int ipcp_flow_req_arr(pid_t     api,                  return -1; /* -ENOMOREFDS */          } -        ai.flows[fd].tx_rb    = NULL; -          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock);          recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        if (recv_msg == NULL) { +                ai.ports[fd].state = PORT_INIT; +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  return -EIRMD; +        }          if (!recv_msg->has_port_id || !recv_msg->has_api) { +                ai.ports[fd].state = PORT_INIT; +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          if (recv_msg->has_result && recv_msg->result) { -                   irm_msg__free_unpacked(recv_msg, NULL); -                   return -1; +                ai.ports[fd].state = PORT_INIT; +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL); +                return -1;          }          port_id = recv_msg->port_id;          if (port_id < 0) { +                ai.ports[fd].state = PORT_INIT; +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } -        pthread_rwlock_rdlock(&ai.data_lock); -        pthread_rwlock_wrlock(&ai.flows_lock); -          ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);          if (ai.flows[fd].rx_rb == NULL) { -                irm_msg__free_unpacked(recv_msg, NULL);                  reset_flow(fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id);          if (ai.flows[fd].tx_rb == NULL) { -                irm_msg__free_unpacked(recv_msg, NULL);                  reset_flow(fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ai.flows[fd].set = shm_flow_set_open(recv_msg->api);          if (ai.flows[fd].set == NULL) { -                irm_msg__free_unpacked(recv_msg, NULL);                  reset_flow(fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ai.flows[fd].port_id = port_id; -        ai.flows[fd].oflags = FLOW_O_DEFAULT; +        ai.flows[fd].oflags  = FLOW_O_DEFAULT;          ai.ports[port_id].fd = fd; -        port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); +        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -1390,19 +1412,20 @@ int ipcp_flow_write(int                  fd,  int ipcp_flow_fini(int fd)  { -        struct shm_rbuff * rb; +        struct shm_rbuff * rx_rb;          flow_set_flags(fd, FLOW_O_WRONLY);          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        rb = ai.flows[fd].rx_rb; +        rx_rb = ai.flows[fd].rx_rb;          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); -        shm_rbuff_fini(rb); +        shm_rbuff_fini(rx_rb); +          return 0;  } | 
