diff options
-rw-r--r-- | include/ouroboros/fcntl.h | 18 | ||||
-rw-r--r-- | src/ipcpd/normal/enroll.c | 8 | ||||
-rw-r--r-- | src/lib/cdap.c | 11 | ||||
-rw-r--r-- | src/lib/cdap_req.c | 6 | ||||
-rw-r--r-- | src/lib/cdap_req.h | 1 | ||||
-rw-r--r-- | src/lib/dev.c | 18 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 12 |
7 files changed, 47 insertions, 27 deletions
diff --git a/include/ouroboros/fcntl.h b/include/ouroboros/fcntl.h index c07b7bf0..65e8c378 100644 --- a/include/ouroboros/fcntl.h +++ b/include/ouroboros/fcntl.h @@ -37,18 +37,18 @@ #define FLOW_O_INVALID (FLOW_O_WRONLY | FLOW_O_RDWR) -int flow_set_flags(int fd, - int flags); +int flow_set_flags(int fd, + int flags); -int flow_get_flags(int fd); +int flow_get_flags(int fd); -int flow_set_timeout(int fd, - const struct timespec * to); +int flow_set_timeout(int fd, + const struct timespec * to); -int flow_get_timeout(int fd, - struct timespec * to); +int flow_get_timeout(int fd, + struct timespec * to); -int flow_get_qosspec(int fd, - qosspec_t * spec); +int flow_get_qosspec(int fd, + qosspec_t * qs); #endif /* OUROBOROS_FCNTL_H */ diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c index e2762993..3e6a0197 100644 --- a/src/ipcpd/normal/enroll.c +++ b/src/ipcpd/normal/enroll.c @@ -200,7 +200,7 @@ int enroll_boot(char * dst_name) clock_gettime(CLOCK_REALTIME, &t0); key = cdap_request_send(cdap, CDAP_READ, TIME_PATH, NULL, 0, 0); - if (key == NULL) { + if (key == NULL || key[0] == INVALID_CDAP_KEY) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); @@ -232,7 +232,7 @@ int enroll_boot(char * dst_name) free(data); key = cdap_request_send(cdap, CDAP_READ, boot_ro, NULL, 0, 0); - if (key == NULL) { + if (key == NULL || key[0] == INVALID_CDAP_KEY) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); @@ -263,7 +263,7 @@ int enroll_boot(char * dst_name) log_dbg("Packed information inserted into RIB."); key = cdap_request_send(cdap, CDAP_READ, members_ro, NULL, 0, 0); - if (key == NULL) { + if (key == NULL || key[0] == INVALID_CDAP_KEY) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); @@ -294,7 +294,7 @@ int enroll_boot(char * dst_name) log_dbg("Packed information inserted into RIB."); key = cdap_request_send(cdap, CDAP_READ, dif_ro, NULL, 0, 0); - if (key == NULL) { + if (key == NULL || key[0] == INVALID_CDAP_KEY) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 824f2c5d..f0db2419 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -183,8 +183,8 @@ static struct cdap_rcvd * cdap_rcvd_get_by_key(struct cdap * instance, list_for_each_safe(p, h, &instance->rcvd) { rcvd = list_entry(p, struct cdap_rcvd, next); if (rcvd->key == key) { - pthread_mutex_unlock(&instance->rcvd_lock); list_del(&rcvd->next); + pthread_mutex_unlock(&instance->rcvd_lock); return rcvd; } } @@ -669,6 +669,7 @@ cdap_key_t * cdap_request_send(struct cdap * instance, pthread_rwlock_unlock(&instance->flows_lock); release_id(instance, *key); release_id(instance, iid); + *key = INVALID_CDAP_KEY; return keys; } @@ -678,13 +679,17 @@ cdap_key_t * cdap_request_send(struct cdap * instance, cdap_sent_del(instance, req); release_id(instance, *key); release_id(instance, iid); + *key = INVALID_CDAP_KEY; return keys; } if (ret < 0) { + pthread_rwlock_unlock(&instance->flows_lock); cdap_sent_del(instance, req); release_id(instance, *key); release_id(instance, iid); + *key = INVALID_CDAP_KEY; + return keys; } ++key; @@ -717,6 +722,7 @@ int cdap_reply_wait(struct cdap * instance, if (ret < 0) { cdap_sent_del(instance, r); release_id(instance, iid); + release_id(instance, key); return ret; } @@ -731,6 +737,7 @@ int cdap_reply_wait(struct cdap * instance, cdap_sent_del(instance, r); release_id(instance, iid); + release_id(instance, key); return ret; } @@ -766,6 +773,8 @@ cdap_key_t cdap_request_wait(struct cdap * instance, } } + assert(rcv->proc == false); + rcv->proc = true; list_del(&rcv->next); list_add_tail(&rcv->next, &instance->rcvd); diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c index df748058..4eab6fa6 100644 --- a/src/lib/cdap_req.c +++ b/src/lib/cdap_req.c @@ -76,6 +76,7 @@ void cdap_req_destroy(struct cdap_req * creq) creq->state = REQ_NULL; pthread_cond_broadcast(&creq->cond); break; + case REQ_INIT_PENDING: case REQ_PENDING: case REQ_RESPONSE: creq->state = REQ_DESTROY; @@ -151,7 +152,10 @@ void cdap_req_respond(struct cdap_req * creq, pthread_mutex_lock(&creq->lock); - while (creq->state == REQ_INIT) + if (creq->state == REQ_INIT) + creq->state = REQ_INIT_PENDING; + + while (creq->state == REQ_INIT_PENDING) pthread_cond_wait(&creq->cond, &creq->lock); if (creq->state != REQ_PENDING) { diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h index 648ebc75..b21467f3 100644 --- a/src/lib/cdap_req.h +++ b/src/lib/cdap_req.h @@ -36,6 +36,7 @@ typedef cdap_key_t invoke_id_t; enum creq_state { REQ_NULL = 0, REQ_INIT, + REQ_INIT_PENDING, REQ_PENDING, REQ_RESPONSE, REQ_DONE, diff --git a/src/lib/dev.c b/src/lib/dev.c index e19083c3..5acbada2 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -555,6 +555,7 @@ int flow_alloc(const char * dst_name, ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; + ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; pthread_rwlock_unlock(&ai.flows_lock); @@ -921,7 +922,7 @@ struct fqueue * fqueue_create() if (fq == NULL) return NULL; - memset(fq->fqueue, -1, SHM_BUFFER_SIZE); + memset(fq->fqueue, -1, (SHM_BUFFER_SIZE) * sizeof(*fq->fqueue)); fq->fqsize = 0; fq->next = 0; @@ -1021,11 +1022,8 @@ int fqueue_next(struct fqueue * fq) if (fq == NULL) return -EINVAL; - if (fq->next == fq->fqsize) { - fq->fqsize = 0; - fq->next = 0; + if (fq->fqsize == 0) return -EPERM; - } pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); @@ -1035,6 +1033,11 @@ int fqueue_next(struct fqueue * fq) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + if (fq->next == fq->fqsize) { + fq->fqsize = 0; + fq->next = 0; + } + return fd; } @@ -1319,6 +1322,9 @@ int ipcp_flow_read(int fd, int port_id = -1; struct shm_rbuff * rb; + assert(fd >=0); + assert(sdb); + pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); @@ -1427,6 +1433,8 @@ ssize_t local_flow_read(int fd) { ssize_t ret; + assert(fd >= 0); + pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index b8d73650..9dffdf74 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -380,6 +380,8 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb->flags = SDB_VALID; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK + sdb->blocks = blocks; + *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); #else *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); @@ -389,9 +391,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb->size = size; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; -#ifdef SHM_RDRB_MULTI_BLOCK - sdb->blocks = blocks; -#endif + memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); return sdb->idx; @@ -461,6 +461,8 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb->flags = SDB_VALID; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK + sdb->blocks = blocks; + *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); #else *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); @@ -470,12 +472,8 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb->size = size; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; -#ifdef SHM_RDRB_MULTI_BLOCK - sdb->blocks = blocks; -#endif memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); - return sdb->idx; } |