diff options
| -rw-r--r-- | include/ouroboros/dev.h | 23 | ||||
| -rw-r--r-- | src/ipcpd/normal/connmgr.c | 15 | ||||
| -rw-r--r-- | src/irmd/ipcp.c | 7 | ||||
| -rw-r--r-- | src/irmd/irm_flow.c | 12 | ||||
| -rw-r--r-- | src/irmd/main.c | 203 | ||||
| -rw-r--r-- | src/lib/dev.c | 193 | ||||
| -rw-r--r-- | src/lib/irmd_messages.proto | 18 | ||||
| -rw-r--r-- | src/lib/shm_flow_set.c | 3 | ||||
| -rw-r--r-- | src/tools/cbr/cbr_client.c | 10 | ||||
| -rw-r--r-- | src/tools/cbr/cbr_server.c | 38 | ||||
| -rw-r--r-- | src/tools/echo/echo_client.c | 12 | ||||
| -rw-r--r-- | src/tools/echo/echo_server.c | 22 | ||||
| -rw-r--r-- | src/tools/operf/operf_client.c | 8 | ||||
| -rw-r--r-- | src/tools/operf/operf_server.c | 8 | ||||
| -rw-r--r-- | src/tools/oping/oping_client.c | 11 | ||||
| -rw-r--r-- | src/tools/oping/oping_server.c | 13 | 
16 files changed, 184 insertions, 412 deletions
| diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index e92cdd1c..4984736c 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -24,6 +24,7 @@  #include <ouroboros/qos.h>  #include <unistd.h> +#include <time.h>  #ifndef OUROBOROS_DEV_H  #define OUROBOROS_DEV_H @@ -33,20 +34,14 @@ int     ap_init(const char * ap_name);  void    ap_fini(void); -/* Returns flow descriptor (> 0) and qos spec. */ -int     flow_accept(qosspec_t * spec); +/* Returns flow descriptor, qs updates to supplied QoS. */ +int     flow_alloc(const char *      dst_name, +                   qosspec_t *       qs, +                   struct timespec * timeo); -int     flow_alloc_resp(int fd, -                        int response); - -/* - * Returns flow descriptor (> 0). - * On returning, spec will contain the actual supplied QoS. - */ -int     flow_alloc(const char * dst_name, -                   qosspec_t *  spec); - -int     flow_alloc_res(int fd); +/* Returns flow descriptor, qs updates to supplied QoS. */ +int     flow_accept(qosspec_t *       qs, +                    struct timespec * timeo);  int     flow_dealloc(int fd); @@ -58,4 +53,4 @@ ssize_t flow_read(int    fd,                    void * buf,                    size_t count); -#endif +#endif /* OUROBOROS_DEV_H */ diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index b8314917..8068d173 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -126,18 +126,13 @@ static void * flow_acceptor(void * o)                  pthread_rwlock_unlock(&ipcpi.state_lock); -                fd = flow_accept(&qs); +                fd = flow_accept(&qs, NULL);                  if (fd < 0) {                          if (fd != -EIRMD)                                  log_warn("Flow accept failed: %d", fd);                          continue;                  } -                if (flow_alloc_resp(fd, 0)) { -                        log_err("Failed to respond to flow alloc request."); -                        continue; -                } -                  if (cacep_rcv(fd, &rcv_info)) {                          log_err("Error establishing application connection.");                          flow_dealloc(fd); @@ -286,7 +281,7 @@ int connmgr_alloc(struct ae *   ae,          memset(&conn->conn_info, 0, sizeof(conn->conn_info)); -        conn->flow_info.fd = flow_alloc(dst_name, qs); +        conn->flow_info.fd = flow_alloc(dst_name, qs, NULL);          if (conn->flow_info.fd < 0) {                  log_err("Failed to allocate flow to %s.", dst_name);                  return -1; @@ -297,12 +292,6 @@ int connmgr_alloc(struct ae *   ae,          else                  memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs)); -        if (flow_alloc_res(conn->flow_info.fd)) { -                log_err("Flow allocation to %s failed.", dst_name); -                flow_dealloc(conn->flow_info.fd); -                return -1; -        } -          if (cacep_snd(conn->flow_info.fd, &ae->info)) {                  log_err("Failed to create application connection.");                  flow_dealloc(conn->flow_info.fd); diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 06b66d3b..a8263580 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -176,18 +176,11 @@ pid_t ipcp_create(char *         name,  int ipcp_destroy(pid_t api)  { -        int status; -          if (kill(api, SIGTERM)) {                  log_err("Failed to destroy IPCP");                  return -1;          } -        if (waitpid(api, &status, 0) < 0) { -                log_err("Failed to destroy IPCP"); -                return -1; -        } -          return 0;  } diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index 99966561..4e7c22ef 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -45,6 +45,7 @@ struct irm_flow * irm_flow_create(pid_t n_api,          }          if (pthread_mutex_init(&f->state_lock, NULL)) { +                pthread_cond_destroy(&f->state_cond);                  free(f);                  return NULL;          } @@ -63,6 +64,9 @@ struct irm_flow * irm_flow_create(pid_t n_api,          f->n_1_rb = shm_rbuff_create(n_1_api, port_id);          if (f->n_1_rb == NULL) {                  log_err("Could not create ringbuffer for AP-I %d.", n_1_api); +                shm_rbuff_destroy(f->n_rb); +                pthread_mutex_destroy(&f->state_lock); +                pthread_cond_destroy(&f->state_cond);                  free(f);                  return NULL;          } @@ -122,7 +126,8 @@ enum flow_state irm_flow_get_state(struct irm_flow * f)          return state;  } -void irm_flow_set_state(struct irm_flow * f, enum flow_state state) +void irm_flow_set_state(struct irm_flow * f, +                        enum flow_state   state)  {          assert(f);          assert(state != FLOW_DESTROY); @@ -135,7 +140,8 @@ void irm_flow_set_state(struct irm_flow * f, enum flow_state state)          pthread_mutex_unlock(&f->state_lock);  } -enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state) +enum flow_state irm_flow_wait_state(struct irm_flow * f, +                                    enum flow_state   state)  {          assert(f);          assert(state != FLOW_NULL); @@ -143,6 +149,8 @@ enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state)          pthread_mutex_lock(&f->state_lock); +        assert(f->state != FLOW_NULL); +          while (!(f->state == state || f->state == FLOW_DESTROY))                  pthread_cond_wait(&f->state_cond, &f->state_lock); diff --git a/src/irmd/main.c b/src/irmd/main.c index 9901a608..c7adf386 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -133,7 +133,8 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api)          list_for_each(pos, &irmd->irm_flows) {                  struct irm_flow * e = list_entry(pos, struct irm_flow, next); -                if (e->n_api == n_api) +                if (e->n_api == n_api && +                    irm_flow_get_state(e) == FLOW_ALLOC_PENDING)                          return e;          } @@ -982,7 +983,12 @@ static struct irm_flow * flow_accept(pid_t       api,          struct irm_flow *  f  = NULL;          struct api_entry * e  = NULL;          struct reg_entry * re = NULL; -        struct list_head * p; +        struct list_head * p  = NULL; + +        pid_t api_n1; +        pid_t api_n; +        int   port_id; +        int   ret;          pthread_rwlock_rdlock(&irmd->state_lock); @@ -1016,7 +1022,7 @@ static struct irm_flow * flow_accept(pid_t       api,          pthread_rwlock_unlock(&irmd->reg_lock);          pthread_rwlock_unlock(&irmd->state_lock); -        while (api_entry_sleep(e) == -ETIMEDOUT) { +        while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) {                  pthread_rwlock_rdlock(&irmd->state_lock);                  if (irmd->state != IRMD_RUNNING) {                          pthread_rwlock_unlock(&irmd->state_lock); @@ -1025,126 +1031,76 @@ static struct irm_flow * flow_accept(pid_t       api,                  pthread_rwlock_unlock(&irmd->state_lock);          } -        pthread_rwlock_rdlock(&irmd->state_lock); - -        if (irmd->state != IRMD_RUNNING) { -                reg_entry_set_state(re, REG_NAME_NULL); -                pthread_rwlock_unlock(&irmd->state_lock); +        if (ret == -1) { +                /* The process died, we can exit here. */                  return NULL;          } -        pthread_rwlock_rdlock(&irmd->reg_lock); +        pthread_rwlock_rdlock(&irmd->state_lock); -        e = api_table_get(&irmd->api_table, api); -        if (e == NULL) { -                pthread_rwlock_unlock(&irmd->reg_lock); +        if (irmd->state != IRMD_RUNNING) { +                reg_entry_set_state(re, REG_NAME_NULL);                  pthread_rwlock_unlock(&irmd->state_lock); -                log_dbg("Process gone while accepting flow.");                  return NULL;          } -        pthread_mutex_lock(&e->state_lock); - -        re = e->re; - -        pthread_mutex_unlock(&e->state_lock); - -        if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) { -                pthread_rwlock_unlock(&irmd->reg_lock); -                pthread_rwlock_unlock(&irmd->state_lock); -                log_err("Entry in wrong state."); -                return NULL; -        } -        pthread_rwlock_unlock(&irmd->reg_lock);          pthread_rwlock_rdlock(&irmd->flows_lock);          f = get_irm_flow_n(api);          if (f == NULL) {                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock); -                log_err("Port_id was not created yet."); +                log_warn("Port_id was not created yet.");                  return NULL;          }          *cube = re->qos; +        api_n   = f->n_api; +        api_n1  = f->n_1_api; +        port_id = f->port_id; +          log_info("Flow on port_id %d allocated.", f->port_id);          pthread_rwlock_unlock(&irmd->flows_lock); -        pthread_rwlock_unlock(&irmd->state_lock); - -        return f; -} - -static int flow_alloc_resp(pid_t n_api, -                           int   port_id, -                           int   response) -{ -        struct irm_flow *  f  = NULL; -        struct reg_entry * re = NULL; -        struct api_entry * e  = NULL; -        int ret = -1; - -        pid_t api_n1; -        pid_t api_n; - -        pthread_rwlock_rdlock(&irmd->state_lock); - -        if (irmd->state != IRMD_RUNNING) { -                pthread_rwlock_unlock(&irmd->state_lock); -                return -1; -        } - -        pthread_rwlock_wrlock(&irmd->reg_lock); +        pthread_rwlock_rdlock(&irmd->reg_lock); -        e = api_table_get(&irmd->api_table, n_api); +        e = api_table_get(&irmd->api_table, api);          if (e == NULL) {                  pthread_rwlock_unlock(&irmd->reg_lock);                  pthread_rwlock_unlock(&irmd->state_lock); -                log_err("Unknown AP-I %d responding for port_id %d.", -                        n_api, port_id); -                return -1; +                ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); +                log_dbg("Process gone while accepting flow."); +                return NULL;          } +        pthread_mutex_lock(&e->state_lock); +          re = e->re; -        if (re == NULL) { -                pthread_rwlock_unlock(&irmd->reg_lock); -                pthread_rwlock_unlock(&irmd->state_lock); -                log_err("AP-I %d is not handling a flow request.", n_api); -                return -1; -        } + +        pthread_mutex_unlock(&e->state_lock);          if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {                  pthread_rwlock_unlock(&irmd->reg_lock);                  pthread_rwlock_unlock(&irmd->state_lock); -                log_err("Name %s has no pending flow request.", re->name); -                return -1; +                ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); +                log_err("Entry in wrong state."); +                return NULL;          } -        registry_del_api(&irmd->registry, n_api); +        registry_del_api(&irmd->registry, api);          pthread_rwlock_unlock(&irmd->reg_lock); -        pthread_rwlock_wrlock(&irmd->flows_lock); - -        f = get_irm_flow(port_id); -        if (f == NULL) { -                pthread_rwlock_unlock(&irmd->flows_lock); -                pthread_rwlock_unlock(&irmd->state_lock); -                return -1; -        } - -        api_n  = f->n_api; -        api_n1 = f->n_1_api; - -        pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_unlock(&irmd->state_lock); -        ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response); +        if (ipcp_flow_alloc_resp(api_n1, port_id, api_n, 0)) { +                log_dbg("Failed to respond to alloc."); +                return NULL; +        } -        if (!(response || ret)) -                irm_flow_set_state(f, FLOW_ALLOCATED); +        irm_flow_set_state(f, FLOW_ALLOCATED); -        return ret; +        return f;  }  static struct irm_flow * flow_alloc(pid_t     api, @@ -1196,6 +1152,8 @@ static struct irm_flow * flow_alloc(pid_t     api,          pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_unlock(&irmd->state_lock); +        assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); +          if (ipcp_flow_alloc(ipcp, port_id, api,                              dst_name, cube) < 0) {                  pthread_rwlock_rdlock(&irmd->state_lock); @@ -1210,54 +1168,16 @@ static struct irm_flow * flow_alloc(pid_t     api,                  return NULL;          } -        return f; -} - -static int flow_alloc_res(int port_id) -{ -        struct irm_flow * f; - -        pthread_rwlock_rdlock(&irmd->state_lock); - -        if (irmd->state != IRMD_RUNNING) { -                pthread_rwlock_unlock(&irmd->state_lock); -                return -1; -        } -        pthread_rwlock_rdlock(&irmd->flows_lock); - -        f = get_irm_flow(port_id); -        if (f == NULL) { -                pthread_rwlock_unlock(&irmd->flows_lock); -                pthread_rwlock_unlock(&irmd->state_lock); -                log_err("Could not find port %d.", port_id); -                return -1; -        } - -        if (irm_flow_get_state(f) == FLOW_NULL) { -                pthread_rwlock_unlock(&irmd->flows_lock); -                pthread_rwlock_unlock(&irmd->state_lock); -                log_info("Port %d is deprecated.", port_id); -                return -1; -        } - -        if (irm_flow_get_state(f) == FLOW_ALLOCATED) { -                log_info("Flow on port_id %d allocated.", port_id); -                pthread_rwlock_unlock(&irmd->flows_lock); -                pthread_rwlock_unlock(&irmd->state_lock); -                return 0; +        if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) { +                log_info("Pending flow on port_id %d torn down.", port_id); +                return NULL;          } -        pthread_rwlock_unlock(&irmd->flows_lock); -        pthread_rwlock_unlock(&irmd->state_lock); - -        if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) { -                log_info("Flow on port_id %d allocated.", port_id); -                return 0; -        } +        assert(irm_flow_get_state(f) == FLOW_ALLOCATED); -        log_info("Pending flow on port_id %d torn down.", port_id); +        log_info("Flow on port_id %d allocated.", port_id); -        return -1; +        return f;  }  static int flow_dealloc(pid_t api, @@ -1293,6 +1213,9 @@ static int flow_dealloc(pid_t api,          if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) {                  list_del(&f->next); +                if ((kill(f->n_api, 0) < 0 && f->n_1_api == -1) || +                    (kill (f->n_1_api, 0) < 0 && f->n_api == -1)) +                        irm_flow_set_state(f, FLOW_NULL);                  clear_irm_flow(f);                  irm_flow_destroy(f);                  bmp_release(irmd->port_ids, port_id); @@ -1305,12 +1228,11 @@ static int flow_dealloc(pid_t api,          }          pthread_rwlock_unlock(&irmd->flows_lock); +        pthread_rwlock_unlock(&irmd->state_lock);          if (n_1_api != -1)                  ret = ipcp_flow_dealloc(n_1_api, port_id); -        pthread_rwlock_unlock(&irmd->state_lock); -          return ret;  } @@ -1501,7 +1423,7 @@ static int flow_alloc_reply(int port_id,          struct irm_flow * f;          pthread_rwlock_rdlock(&irmd->state_lock); -        pthread_rwlock_wrlock(&irmd->flows_lock); +        pthread_rwlock_rdlock(&irmd->flows_lock);          f = get_irm_flow(port_id);          if (f == NULL) { @@ -1551,18 +1473,19 @@ static void irm_destroy(void)          list_for_each_safe(p, h, &irmd->ipcps) {                  struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next);                  list_del(&e->next); -                ipcp_destroy(e->api); -                clear_spawned_api(e->api); -                registry_del_api(&irmd->registry, e->api);                  ipcp_entry_destroy(e);          } -        list_for_each_safe(p, h, &irmd->spawned_apis) { +        list_for_each(p, &irmd->spawned_apis) {                  struct pid_el * e = list_entry(p, struct pid_el, next); -                int status;                  if (kill(e->pid, SIGTERM))                          log_dbg("Could not send kill signal to %d.", e->pid); -                else if (waitpid(e->pid, &status, 0) < 0) +        } + +        list_for_each_safe(p, h, &irmd->spawned_apis) { +                struct pid_el * e = list_entry(p, struct pid_el, next); +                int status; +                if (waitpid(e->pid, &status, 0) < 0)                          log_dbg("Error waiting for %d to exit.", e->pid);                  list_del(&e->next);                  registry_del_api(&irmd->registry, e->pid); @@ -1940,12 +1863,6 @@ void * mainloop(void * o)                          ret_msg.has_api     = true;                          ret_msg.api         = e->n_1_api;                          break; -                case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP: -                        ret_msg.has_result = true; -                        ret_msg.result = flow_alloc_resp(msg->api, -                                                         msg->port_id, -                                                         msg->response); -                        break;                  case IRM_MSG_CODE__IRM_FLOW_ALLOC:                          e = flow_alloc(msg->api,                                         msg->dst_name, @@ -1960,10 +1877,6 @@ void * mainloop(void * o)                          ret_msg.has_api     = true;                          ret_msg.api         = e->n_1_api;                          break; -                case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: -                        ret_msg.has_result = true; -                        ret_msg.result = flow_alloc_res(msg->port_id); -                        break;                  case IRM_MSG_CODE__IRM_FLOW_DEALLOC:                          ret_msg.has_result = true;                          ret_msg.result = flow_dealloc(msg->api, msg->port_id); diff --git a/src/lib/dev.c b/src/lib/dev.c index 79797b92..e19083c3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -161,21 +161,21 @@ struct {  } ai;  /* FIXME: translate real spec to cube */ -static qoscube_t spec_to_cube(qosspec_t * spec) +static qoscube_t spec_to_cube(qosspec_t * qs)  { -        if (spec == NULL) +        if (qs == NULL)                  return QOS_CUBE_BE; -        return spec->cube; +        return qs->cube;  }  /* FIXME: fill real spec */ -static void fill_qosspec(qosspec_t * spec, +static void fill_qosspec(qosspec_t * qs,                           qoscube_t   cube)  { -        assert(spec); +        assert(qs); -        spec->cube = cube; +        qs->cube = cube;  }  static int api_announce(char * ap_name) @@ -209,6 +209,17 @@ static int api_announce(char * ap_name)          return ret;  } +static void init_flow(int fd) +{ +        assert(!(fd < 0)); + +        memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); + +        ai.flows[fd].port_id  = -1; +        ai.flows[fd].api      = -1; +        ai.flows[fd].cube     = QOS_CUBE_BE; +} +  static void reset_flow(int fd)  {          assert (!(fd < 0)); @@ -216,25 +227,17 @@ static void reset_flow(int fd)          if (ai.flows[fd].port_id != -1)                  port_destroy(&ai.ports[ai.flows[fd].port_id]); -        ai.flows[fd].port_id = -1; -        if (ai.flows[fd].rx_rb != NULL) { +        if (ai.flows[fd].rx_rb != NULL)                  shm_rbuff_close(ai.flows[fd].rx_rb); -                ai.flows[fd].rx_rb = NULL; -        } -        if (ai.flows[fd].tx_rb != NULL) { + +        if (ai.flows[fd].tx_rb != NULL)                  shm_rbuff_close(ai.flows[fd].tx_rb); -                ai.flows[fd].tx_rb = NULL; -        } -        if (ai.flows[fd].set != NULL) { +        if (ai.flows[fd].set != NULL)                  shm_flow_set_close(ai.flows[fd].set); -                ai.flows[fd].set = NULL; -        } -        ai.flows[fd].oflags = 0; -        ai.flows[fd].api = -1; -        ai.flows[fd].timesout = false; -        ai.flows[fd].cube = QOS_CUBE_BE; +        init_flow(fd); +  }  int ap_init(const char * ap_name) @@ -280,16 +283,8 @@ int ap_init(const char * ap_name)                  return -1;          } -        for (i = 0; i < AP_MAX_FLOWS; ++i) { -                ai.flows[i].rx_rb    = NULL; -                ai.flows[i].tx_rb    = NULL; -                ai.flows[i].set      = NULL; -                ai.flows[i].port_id  = -1; -                ai.flows[i].oflags   = 0; -                ai.flows[i].api      = -1; -                ai.flows[i].timesout = false; -                ai.flows[i].cube     = QOS_CUBE_BE; -        } +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                init_flow(i);          ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);          if (ai.ports == NULL) { @@ -382,7 +377,8 @@ void ap_fini()          pthread_rwlock_destroy(&ai.data_lock);  } -int flow_accept(qosspec_t * spec) +int flow_accept(qosspec_t *       qs, +                struct timespec * timeo)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; @@ -391,6 +387,13 @@ int flow_accept(qosspec_t * spec)          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_api = true; +        if (timeo != NULL) { +                msg.has_timeo_sec = true; +                msg.has_timeo_usec = true; +                msg.timeo_sec  = timeo->tv_sec; +                msg.timeo_usec = timeo->tv_nsec / 1000; +        } +          pthread_rwlock_rdlock(&ai.data_lock);          msg.api     = ai.api; @@ -424,7 +427,6 @@ int flow_accept(qosspec_t * spec)          ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);          if (ai.flows[fd].rx_rb == NULL) { -                reset_flow(fd);                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -435,8 +437,10 @@ int flow_accept(qosspec_t * spec)          ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);          if (ai.flows[fd].tx_rb == NULL) {                  reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -455,8 +459,8 @@ int flow_accept(qosspec_t * spec)          ai.flows[fd].api     = recv_msg->api;          ai.flows[fd].cube    = recv_msg->qoscube; -        if (spec != NULL) -                fill_qosspec(spec, ai.flows[fd].cube); +        if (qs != NULL) +                fill_qosspec(qs, ai.flows[fd].cube);          ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -469,69 +473,27 @@ int flow_accept(qosspec_t * spec)          return fd;  } -int flow_alloc_resp(int fd, -                    int response) +int flow_alloc(const char *      dst_name, +               qosspec_t *       qs, +               struct timespec * timeo)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; -        int ret = -1; - -        if (fd < 0 || fd >= AP_MAX_FLOWS) -                return -EBADF; - -        msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; -        msg.has_api      = true; -        msg.api          = ai.api; -        msg.has_port_id  = true; - -        pthread_rwlock_rdlock(&ai.data_lock); -        pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai.flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -ENOTALLOC; -        } - -        msg.port_id      = ai.flows[fd].port_id; - -        pthread_rwlock_unlock(&ai.flows_lock); -        pthread_rwlock_unlock(&ai.data_lock); - -        msg.has_response = true; -        msg.response     = response; - -        recv_msg = send_recv_irm_msg(&msg); -        if (recv_msg == NULL) -                return -EIRMD; - -        if (!recv_msg->has_result) { -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        ret = recv_msg->result; - -        irm_msg__free_unpacked(recv_msg, NULL); - -        return ret; -} - -int flow_alloc(const char * dst_name, -               qosspec_t *  spec) -{ -        irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int fd = -1; - -        if (dst_name == NULL) -                return -EINVAL; +        int fd;          msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;          msg.dst_name    = (char *) dst_name;          msg.has_api     = true;          msg.has_qoscube = true; -        msg.qoscube     = spec_to_cube(spec); +        msg.qoscube     = spec_to_cube(qs); + +        if (timeo != NULL) { +                msg.has_timeo_sec = true; +                msg.has_timeo_usec = true; +                msg.timeo_sec  = timeo->tv_sec; +                msg.timeo_usec = timeo->tv_nsec / 1000; +        }          pthread_rwlock_rdlock(&ai.data_lock); @@ -561,7 +523,6 @@ int flow_alloc(const char * dst_name,          ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);          if (ai.flows[fd].rx_rb == NULL) { -                reset_flow(fd);                  bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -571,16 +532,21 @@ int flow_alloc(const char * dst_name,          ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);          if (ai.flows[fd].tx_rb == NULL) { +                reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          }          ai.flows[fd].set = shm_flow_set_open(recv_msg->api);          if (ai.flows[fd].set == NULL) {                  reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); +                irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -589,7 +555,6 @@ int flow_alloc(const char * dst_name,          ai.flows[fd].api     = recv_msg->api;          ai.flows[fd].cube    = recv_msg->qoscube; -        ai.ports[recv_msg->port_id].fd    = fd;          ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;          pthread_rwlock_unlock(&ai.flows_lock); @@ -600,48 +565,6 @@ int flow_alloc(const char * dst_name,          return fd;  } -int flow_alloc_res(int fd) -{ -        irm_msg_t msg = IRM_MSG__INIT; -        irm_msg_t * recv_msg = NULL; -        int result = 0; - -        if (fd < 0 || fd >= AP_MAX_FLOWS) -                return -EBADF; - -        msg.code         = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; -        msg.has_port_id  = true; - -        pthread_rwlock_rdlock(&ai.data_lock); -        pthread_rwlock_rdlock(&ai.flows_lock); - -        if (ai.flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai.flows_lock); -                pthread_rwlock_unlock(&ai.data_lock); -                return -ENOTALLOC; -        } - -        msg.port_id = ai.flows[fd].port_id; - -        pthread_rwlock_unlock(&ai.flows_lock); -        pthread_rwlock_unlock(&ai.data_lock); - -        recv_msg = send_recv_irm_msg_b(&msg); -        if (recv_msg == NULL) -                return -EIRMD; - -        if (!recv_msg->has_result) { -                irm_msg__free_unpacked(recv_msg, NULL); -                return -1; -        } - -        result = recv_msg->result; - -        irm_msg__free_unpacked(recv_msg, NULL); - -        return result; -} -  int flow_dealloc(int fd)  {          irm_msg_t msg = IRM_MSG__INIT; @@ -804,9 +727,9 @@ int flow_set_timeout(int                     fd,  }  int flow_get_qosspec(int         fd, -                     qosspec_t * spec) +                     qosspec_t * qs)  { -        if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL) +        if (fd < 0 || fd >= AP_MAX_FLOWS || qs == NULL)                  return -EINVAL;          pthread_rwlock_rdlock(&ai.data_lock); @@ -818,7 +741,7 @@ int flow_get_qosspec(int         fd,                  return -ENOTALLOC;          } -        fill_qosspec(spec, ai.flows[fd].cube); +        fill_qosspec(qs, ai.flows[fd].cube);          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index c25d2c18..4fbd676e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -39,14 +39,12 @@ enum irm_msg_code {          IRM_UNBIND_API        = 11;          IRM_REG               = 12;          IRM_UNREG             = 13; -        IRM_FLOW_ACCEPT       = 14; -        IRM_FLOW_ALLOC_RESP   = 15; -        IRM_FLOW_ALLOC        = 16; -        IRM_FLOW_ALLOC_RES    = 17; -        IRM_FLOW_DEALLOC      = 18; -        IPCP_FLOW_REQ_ARR     = 19; -        IPCP_FLOW_ALLOC_REPLY = 20; -        IRM_REPLY             = 21; +        IRM_FLOW_ALLOC        = 14; +        IRM_FLOW_ACCEPT       = 15; +        IRM_FLOW_DEALLOC      = 16; +        IPCP_FLOW_REQ_ARR     = 17; +        IPCP_FLOW_ALLOC_REPLY = 18; +        IRM_REPLY             = 19;  };  message irm_msg { @@ -63,5 +61,7 @@ message irm_msg {          optional dif_config_msg conf = 11;          optional uint32 opts         = 12;          repeated sint32 apis         = 13; -        optional sint32 result       = 14; +        optional uint32 timeo_sec    = 14; +        optional uint32 timeo_usec   = 15; +        optional sint32 result       = 16;  }; diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 615fbd2b..67abbb5b 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -302,7 +302,8 @@ int shm_flow_set_has(struct shm_flow_set * set,          return ret;  } -void shm_flow_set_notify(struct shm_flow_set * set, int port_id) +void shm_flow_set_notify(struct shm_flow_set * set, +                         int                   port_id)  {          assert(set);          assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); diff --git a/src/tools/cbr/cbr_client.c b/src/tools/cbr/cbr_client.c index 16ade13d..5ec1d560 100644 --- a/src/tools/cbr/cbr_client.c +++ b/src/tools/cbr/cbr_client.c @@ -63,7 +63,6 @@ int client_main(char * server,          struct sigaction sig_act;          int fd = 0; -        int result = 0;          char buf[size];          long seqnr = 0;          long gap = size * 8.0 * (BILLION / (double) rate); @@ -90,19 +89,12 @@ int client_main(char * server,          printf("Client started, duration %d, rate %lu b/s, size %d B.\n",                 duration, rate, size); -        fd = flow_alloc(server, NULL); +        fd = flow_alloc(server, NULL, NULL);          if (fd < 0) {                  printf("Failed to allocate flow.\n");                  return -1;          } -        result = flow_alloc_res(fd); -        if (result < 0) { -                printf("Flow allocation refused.\n"); -                flow_dealloc(fd); -                return -1; -        } -          clock_gettime(CLOCK_REALTIME, &start);          if (!flood) {                  while (!stop) { diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c index 9198858c..1a963a64 100644 --- a/src/tools/cbr/cbr_server.c +++ b/src/tools/cbr/cbr_server.c @@ -146,6 +146,8 @@ static void * worker(void * o)                  pthread_mutex_lock(&fds_lock);                  fds_count--; + +                pthread_cond_signal(&fds_signal);                  pthread_mutex_unlock(&fds_lock);          } @@ -154,8 +156,7 @@ static void * worker(void * o)  static void * listener(void * o)  { -        int client_fd = 0; -        int response = 0; +        int fd = 0;          qosspec_t qs;          (void) o; @@ -164,8 +165,19 @@ static void * listener(void * o)                 server_settings.interval, server_settings.timeout);          while (true) { -                client_fd = flow_accept(&qs); -                if (client_fd < 0) { +                pthread_mutex_lock(&fds_lock); +                pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, +                                     (void *) &fds_lock); + +                while (fds_count == THREADS_SIZE) { +                        printf("Can't accept any more flows, waiting.\n"); +                        pthread_cond_wait(&fds_signal, &fds_lock); +                } + +                pthread_cleanup_pop(true); + +                fd = flow_accept(&qs, NULL); +                if (fd < 0) {                          printf("Failed to accept flow.\n");                          break;                  } @@ -174,26 +186,12 @@ static void * listener(void * o)                  pthread_mutex_lock(&fds_lock); -                response = (fds_count < THREADS_SIZE) ? 0 : -1; - -                if (flow_alloc_resp(client_fd, response)) { -                        printf("Failed to give an allocate response.\n"); -                        flow_dealloc(client_fd); -                        pthread_mutex_unlock(&fds_lock); -                        continue; -                } - -                if (response) { -                        printf("Can't accept any more flows, denying.\n"); -                        continue; -                } -                  fds_count++;                  fds_index = (fds_index + 1) % THREADS_SIZE; -                fds[fds_index] = client_fd; +                fds[fds_index] = fd; -                pthread_mutex_unlock(&fds_lock);                  pthread_cond_signal(&fds_signal); +                pthread_mutex_unlock(&fds_lock);          }          return 0; diff --git a/src/tools/echo/echo_client.c b/src/tools/echo/echo_client.c index f84de73a..5ec2051f 100644 --- a/src/tools/echo/echo_client.c +++ b/src/tools/echo/echo_client.c @@ -26,25 +26,17 @@  int client_main(void)  {          int fd = 0; -        int result = 0;          char buf[BUF_SIZE];          char * message  = "Client says hi!";          ssize_t count = 0; -        fd = flow_alloc("echo", NULL); +        fd = flow_alloc("echo", NULL, NULL);          if (fd < 0) {                  printf("Failed to allocate flow.\n");                  return -1;          } -        result = flow_alloc_res(fd); -        if (result < 0) { -                printf("Flow allocation refused.\n"); -                flow_dealloc(fd); -                return -1; -        } - -        if (flow_write(fd, message, strlen(message) + 1) == -1) { +        if (flow_write(fd, message, strlen(message) + 1) < 0) {                  printf("Failed to write SDU.\n");                  flow_dealloc(fd);                  return -1; diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c index aa136485..771155f4 100644 --- a/src/tools/echo/echo_server.c +++ b/src/tools/echo/echo_server.c @@ -37,7 +37,7 @@ void shutdown_server(int signo)  int server_main(void)  { -        int    client_fd = 0; +        int    fd = 0;          char   buf[BUF_SIZE];          ssize_t count = 0;          qosspec_t qs; @@ -51,36 +51,30 @@ int server_main(void)          }          while (true) { -                client_fd = flow_accept(&qs); -                if (client_fd < 0) { +                fd = flow_accept(&qs, NULL); +                if (fd < 0) {                          printf("Failed to accept flow.\n");                          break;                  }                  printf("New flow.\n"); -                if (flow_alloc_resp(client_fd, 0)) { -                        printf("Failed to give an allocate response.\n"); -                        flow_dealloc(client_fd); -                        continue; -                } - -                count = flow_read(client_fd, &buf, BUF_SIZE); +                count = flow_read(fd, &buf, BUF_SIZE);                  if (count < 0) {                          printf("Failed to read SDU.\n"); -                        flow_dealloc(client_fd); +                        flow_dealloc(fd);                          continue;                  }                  printf("Message from client is %.*s.\n", (int) count, buf); -                if (flow_write(client_fd, buf, count) == -1) { +                if (flow_write(fd, buf, count) == -1) {                          printf("Failed to write SDU.\n"); -                        flow_dealloc(client_fd); +                        flow_dealloc(fd);                          continue;                  } -                flow_dealloc(client_fd); +                flow_dealloc(fd);          }          return 0; diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c index d2f08ef4..7827b62b 100644 --- a/src/tools/operf/operf_client.c +++ b/src/tools/operf/operf_client.c @@ -182,18 +182,12 @@ int client_main(void)          client.sent = 0;          client.rcvd = 0; -        fd = flow_alloc(client.s_apn, NULL); +        fd = flow_alloc(client.s_apn, NULL, NULL);          if (fd < 0) {                  printf("Failed to allocate flow.\n");                  return -1;          } -        if (flow_alloc_res(fd)) { -                printf("Flow allocation refused.\n"); -                flow_dealloc(fd); -                return -1; -        } -          clock_gettime(CLOCK_REALTIME, &tic);          pthread_create(&client.reader_pt, NULL, reader, &fd); diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c index 3665d4cc..b17a4f7b 100644 --- a/src/tools/operf/operf_server.c +++ b/src/tools/operf/operf_server.c @@ -108,7 +108,7 @@ void * accept_thread(void * o)          printf("Ouroboros perf server started.\n");          while (true) { -                fd = flow_accept(&qs); +                fd = flow_accept(&qs, NULL);                  if (fd < 0) {                          printf("Failed to accept flow.\n");                          break; @@ -116,12 +116,6 @@ void * accept_thread(void * o)                  printf("New flow %d.\n", fd); -                if (flow_alloc_resp(fd, 0)) { -                        printf("Failed to give an allocate response.\n"); -                        flow_dealloc(fd); -                        continue; -                } -                  clock_gettime(CLOCK_REALTIME, &now);                  pthread_mutex_lock(&server.lock); diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index a91a126c..77a08db7 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -176,7 +176,6 @@ static int client_init(void)          client.rtt_m2 = 0;          pthread_mutex_init(&client.lock, NULL); -        pthread_mutex_lock(&client.lock);          return 0;  } @@ -213,21 +212,13 @@ int client_main(void)                  return -1;          } -        fd = flow_alloc(client.s_apn, NULL); +        fd = flow_alloc(client.s_apn, NULL, NULL);          if (fd < 0) {                  printf("Failed to allocate flow.\n"); -                return -1; -        } - -        if (flow_alloc_res(fd)) { -                printf("Flow allocation refused.\n"); -                flow_dealloc(fd);                  client_fini();                  return -1;          } -        pthread_mutex_unlock(&client.lock); -          clock_gettime(CLOCK_REALTIME, &tic);          pthread_create(&client.reader_pt, NULL, reader, &fd); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index e20e236d..44a301ba 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -57,6 +57,7 @@ void * cleaner_thread(void * o)                  for (i = 0; i < OPING_MAX_FLOWS; ++i)                          if (flow_set_has(server.flows, i) &&                              ts_diff_ms(&server.times[i], &now) > deadline_ms) { +                                printf("Flow %d timed out.\n", i);                                  flow_set_del(server.flows, i);                                  flow_dealloc(i);                          } @@ -110,8 +111,8 @@ void * server_thread(void *o)  void * accept_thread(void * o)  { -        int fd = 0; -        struct timespec now = {0, 0}; +        int fd; +        struct timespec now;          qosspec_t qs;          (void) o; @@ -119,7 +120,7 @@ void * accept_thread(void * o)          printf("Ouroboros ping server started.\n");          while (true) { -                fd = flow_accept(&qs); +                fd = flow_accept(&qs, NULL);                  if (fd < 0) {                          printf("Failed to accept flow.\n");                          break; @@ -127,12 +128,6 @@ void * accept_thread(void * o)                  printf("New flow %d.\n", fd); -                if (flow_alloc_resp(fd, 0)) { -                        printf("Failed to give an allocate response.\n"); -                        flow_dealloc(fd); -                        continue; -                } -                  clock_gettime(CLOCK_REALTIME, &now);                  pthread_mutex_lock(&server.lock); | 
