diff options
-rw-r--r-- | include/ouroboros/shm_rbuff.h | 2 | ||||
-rw-r--r-- | src/ipcpd/normal/pol/complete.c | 3 | ||||
-rw-r--r-- | src/ipcpd/shim-data.c | 10 | ||||
-rw-r--r-- | src/irmd/api_table.c | 23 | ||||
-rw-r--r-- | src/irmd/main.c | 2 | ||||
-rw-r--r-- | src/irmd/registry.c | 6 | ||||
-rw-r--r-- | src/lib/dev.c | 6 | ||||
-rw-r--r-- | src/lib/shm_flow_set.c | 2 | ||||
-rw-r--r-- | src/lib/shm_rbuff.c | 20 |
9 files changed, 60 insertions, 14 deletions
diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index f31dab63..8471f47f 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -53,4 +53,6 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb); ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, const struct timespec * timeout); +size_t shm_rbuff_queued(struct shm_rbuff * rb); + #endif /* OUROBOROS_SHM_RBUFF_H */ diff --git a/src/ipcpd/normal/pol/complete.c b/src/ipcpd/normal/pol/complete.c index 3a9dd548..1f3f6031 100644 --- a/src/ipcpd/normal/pol/complete.c +++ b/src/ipcpd/normal/pol/complete.c @@ -83,6 +83,9 @@ static void * allocator(void * o) qs.delay = 0; qs.jitter = 0; + /* FIXME: implement QoS specs */ + qs.cube = QOS_CUBE_BE; + /* FIXME: subscribe to members to keep the graph complete. */ len = rib_children("/" MEMBERS_NAME, &children); for (i = 0; i < len; ++i) { diff --git a/src/ipcpd/shim-data.c b/src/ipcpd/shim-data.c index 6f5832a1..eb4ec33f 100644 --- a/src/ipcpd/shim-data.c +++ b/src/ipcpd/shim-data.c @@ -492,12 +492,10 @@ int shim_data_dir_query_wait(struct dir_query * query, query->state = QUERY_PENDING; - while (query->state == QUERY_PENDING) { - if ((ret = -pthread_cond_timedwait(&query->cond, - &query->lock, - &abstime)) == -ETIMEDOUT) - break; - } + while (query->state == QUERY_PENDING && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&query->cond, + &query->lock, + &abstime); if (query->state == QUERY_DESTROY) ret = -1; diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c index 9ba0f551..3b80ac91 100644 --- a/src/irmd/api_table.c +++ b/src/irmd/api_table.c @@ -35,6 +35,7 @@ struct api_entry * api_entry_create(pid_t api, char * apn) { struct api_entry * e; + pthread_condattr_t cattr; if (apn == NULL) return NULL; @@ -54,8 +55,26 @@ struct api_entry * api_entry_create(pid_t api, char * apn) e->state = API_INIT; - pthread_mutex_init(&e->state_lock, NULL); - pthread_cond_init(&e->state_cond, NULL); + if (pthread_condattr_init(&cattr)) { + free(e); + return NULL; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_mutex_init(&e->state_lock, NULL)) { + free(e); + return NULL; + } + + + if (pthread_cond_init(&e->state_cond, &cattr)) { + pthread_mutex_destroy(&e->state_lock); + free(e); + return NULL; + } return e; } diff --git a/src/irmd/main.c b/src/irmd/main.c index 08e41f8d..9eb34f38 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1356,7 +1356,7 @@ static struct irm_flow * flow_req_arr(pid_t api, pid_t h_api = -1; int port_id = -1; - struct timespec wt = {IRMD_REQ_ARR_TIMEOUT % 1000, + struct timespec wt = {IRMD_REQ_ARR_TIMEOUT / 1000, (IRMD_REQ_ARR_TIMEOUT % 1000) * MILLION}; log_dbg("Flow req arrived from IPCP %d for %s.", api, dst_name); diff --git a/src/irmd/registry.c b/src/irmd/registry.c index a488d80a..a4588963 100644 --- a/src/irmd/registry.c +++ b/src/irmd/registry.c @@ -76,7 +76,7 @@ static struct reg_entry * reg_entry_init(struct reg_entry * e, if (pthread_condattr_init(&cattr)) return NULL; -#ifdef __APPLE__ +#ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif if (pthread_cond_init(&e->state_cond, &cattr)) @@ -429,7 +429,7 @@ int reg_entry_leave_state(struct reg_entry * e, if (timeout) ret = -pthread_cond_timedwait(&e->state_cond, &e->state_lock, - timeout); + &abstime); else ret = -pthread_cond_wait(&e->state_cond, &e->state_lock); @@ -468,7 +468,7 @@ int reg_entry_wait_state(struct reg_entry * e, if (timeout) ret = -pthread_cond_timedwait(&e->state_cond, &e->state_lock, - timeout); + &abstime); else ret = -pthread_cond_wait(&e->state_cond, &e->state_lock); diff --git a/src/lib/dev.c b/src/lib/dev.c index 9ddc5b84..bd706dc8 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1029,6 +1029,8 @@ int flow_set_add(struct flow_set * set, int fd) { int ret; + size_t sdus; + size_t i; if (set == NULL) return -EINVAL; @@ -1038,6 +1040,10 @@ int flow_set_add(struct flow_set * set, ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + sdus = shm_rbuff_queued(ai.flows[fd].rx_rb); + for (i = 0; i < sdus; i++) + shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index a8e3fc79..615fbd2b 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -361,8 +361,6 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set, if (ret == -EOWNERDEAD) pthread_mutex_consistent(set->lock); #endif - if (ret == -ETIMEDOUT) - break; } if (ret != -ETIMEDOUT) { diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index a206a019..b8db7c19 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -384,3 +384,23 @@ void shm_rbuff_fini(struct shm_rbuff * rb) #endif pthread_cleanup_pop(true); } + +size_t shm_rbuff_queued(struct shm_rbuff * rb) +{ + size_t ret; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + + ret = shm_rbuff_used(rb); + + pthread_mutex_unlock(rb->lock); + + return ret; +} |