diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/ipcp.c | 6 | ||||
| -rw-r--r-- | src/ipcpd/normal/main.c | 18 | ||||
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 50 | ||||
| -rw-r--r-- | src/lib/cdap.c | 153 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 2 | 
5 files changed, 155 insertions, 74 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 544b10df..0263d7b5 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -33,6 +33,8 @@  struct ipcp * ipcp_instance_create()  { +        pthread_condattr_t cattr; +          struct ipcp * i = malloc(sizeof *i);          if (i == NULL)                  return NULL; @@ -43,7 +45,9 @@ struct ipcp * ipcp_instance_create()          i->state   = IPCP_INIT;          pthread_mutex_init(&i->state_lock, NULL); -        pthread_cond_init(&i->state_cond, NULL); +        pthread_condattr_init(&cattr); +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +        pthread_cond_init(&i->state_cond, &cattr);          return i;  } diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index cf6ac728..4173246d 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -28,12 +28,14 @@  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/dev.h>  #include <ouroboros/ipcp.h> +#include <ouroboros/time_utils.h>  #include <stdbool.h>  #include <signal.h>  #include <stdlib.h>  #include <pthread.h>  #include <string.h> +#include <errno.h>  #include "fmgr.h"  #include "ribmgr.h" @@ -131,6 +133,13 @@ static int normal_ipcp_name_unreg(char * name)  static int normal_ipcp_enroll(char * dif_name)  { +        struct timespec timeout = {(ENROLL_TIMEOUT / 1000), +                                   (ENROLL_TIMEOUT % 1000) * MILLION}; +        struct timespec abstime; + +        clock_gettime(PTHREAD_COND_CLOCK, &abstime); +        ts_add(&abstime, &timeout, &abstime); +          pthread_mutex_lock(&_ipcp->state_lock);          if (_ipcp->state != IPCP_INIT) { @@ -147,10 +156,15 @@ static int normal_ipcp_enroll(char * dif_name)                  return -1;          } -        /* FIXME: Change into timedwait, see solution in irmd first */          pthread_mutex_lock(&_ipcp->state_lock);          while (_ipcp->state != IPCP_ENROLLED) -                pthread_cond_wait(&_ipcp->state_cond, &_ipcp->state_lock); +                if (pthread_cond_timedwait(&_ipcp->state_cond, +                                           &_ipcp->state_lock, +                                           &abstime) == ETIMEDOUT) { +                        pthread_mutex_unlock(&_ipcp->state_lock); +                        LOG_ERR("Enrollment didn't complete in time."); +                        return -1; +                }          pthread_mutex_unlock(&_ipcp->state_lock);          return 0; diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index bbc29b64..c8d517b5 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -26,10 +26,12 @@  #include <ouroboros/logs.h>  #include <ouroboros/cdap.h>  #include <ouroboros/list.h> +#include <ouroboros/time_utils.h>  #include <stdlib.h>  #include <pthread.h>  #include <string.h> +#include <errno.h>  #include "ribmgr.h"  #include "dt_const.h" @@ -58,6 +60,9 @@ struct cdap_request {          char *           name;          int              invoke_id;          struct cdap *    instance; +        int              result; +        bool             replied; +        pthread_cond_t   cond;          struct list_head next;  }; @@ -81,12 +86,19 @@ struct rib {  } * rib = NULL;  /* Call while holding cdap_reqs_lock */ -int cdap_request_add(struct cdap * instance, +int cdap_result_wait(struct cdap * instance,                       enum cdap_opcode code,                       char * name,                       int invoke_id)  {          struct cdap_request * req; +        pthread_condattr_t cattr; +        struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000), +                                   (CDAP_REPLY_TIMEOUT % 1000) * MILLION}; +        struct timespec abstime; + +        clock_gettime(PTHREAD_COND_CLOCK, &abstime); +        ts_add(&abstime, &timeout, &abstime);          req = malloc(sizeof(*req));          if (req == NULL) @@ -95,6 +107,12 @@ int cdap_request_add(struct cdap * instance,          req->code = code;          req->invoke_id = invoke_id;          req->instance = instance; +        req->result = -1; +        req->replied = false; + +        pthread_condattr_init(&cattr); +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +        pthread_cond_init(&req->cond, &cattr);          req->name = strdup(name);          if (req->name == NULL) { @@ -106,7 +124,15 @@ int cdap_request_add(struct cdap * instance,          list_add(&req->next, &rib->cdap_reqs); -        return 0; +        while (req->replied == false) +                if (pthread_cond_timedwait(&req->cond, +                                           &rib->cdap_reqs_lock, +                                           &abstime) == ETIMEDOUT) { +                        LOG_ERR("Didn't receive a CDAP reply in time."); +                        return -1; +                } + +        return req->result;  }  int ribmgr_init() @@ -190,6 +216,14 @@ int ribmgr_cdap_reply(struct cdap * instance,                          /* FIXME: In case of a read, update values here */ +                        req->replied = true; +                        req->result = result; +                        pthread_cond_broadcast(&req->cond); +                        pthread_mutex_unlock(&rib->cdap_reqs_lock); + +                        sched_yield(); + +                        pthread_mutex_lock(&rib->cdap_reqs_lock);                          free(req->name);                          list_del(&req->next);                          free(req); @@ -350,11 +384,11 @@ int ribmgr_cdap_start(struct cdap * instance,                          return -1;                  } -                if (cdap_request_add(instance, WRITE, STATIC_INFO, iid)) { +                if (cdap_result_wait(instance, WRITE, STATIC_INFO, iid)) {                          pthread_mutex_unlock(&rib->cdap_reqs_lock);                          pthread_mutex_unlock(&_ipcp->state_lock);                          free(data); -                        LOG_ERR("Failed to add CDAP request to list."); +                        LOG_ERR("Remote did not receive static information.");                          return -1;                  }                  pthread_mutex_unlock(&rib->cdap_reqs_lock); @@ -374,11 +408,11 @@ int ribmgr_cdap_start(struct cdap * instance,                          return -1;                  } -                if (cdap_request_add(instance, STOP, ENROLLMENT, iid)) { +                if (cdap_result_wait(instance, STOP, ENROLLMENT, iid)) {                          pthread_mutex_unlock(&rib->cdap_reqs_lock);                          pthread_mutex_unlock(&_ipcp->state_lock);                          free(data); -                        LOG_ERR("Failed to add CDAP request to list."); +                        LOG_ERR("Remote failed to complete enrollment.");                          return -1;                  }                  pthread_mutex_unlock(&rib->cdap_reqs_lock); @@ -470,10 +504,10 @@ int ribmgr_add_flow(int fd)                          return -1;                  } -                if (cdap_request_add(instance, START, ENROLLMENT, iid)) { +                if (cdap_result_wait(instance, START, ENROLLMENT, iid)) {                          pthread_mutex_unlock(&rib->cdap_reqs_lock);                          pthread_rwlock_unlock(&rib->flows_lock); -                        LOG_ERR("Failed to add CDAP request to list."); +                        LOG_ERR("Failed to start enrollment.");                          cdap_destroy(instance);                          free(flow);                          return -1; diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 4c70b2e4..5dc050a4 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -43,6 +43,12 @@ struct cdap {          struct cdap_ops * ops;  }; +struct cdap_info { +        pthread_t thread; +        struct cdap * instance; +        cdap_t * msg; +}; +  static int next_invoke_id(struct cdap * instance)  {          int ret; @@ -66,12 +72,84 @@ static int release_invoke_id(struct cdap * instance,          return ret;  } +static void * handle_cdap_msg(void * o) +{ +        struct cdap_info * info = (struct cdap_info *) o; +        struct cdap * instance = info->instance; +        cdap_t * msg = info->msg; + +        switch (msg->opcode) { +        case OPCODE__READ: +                if (msg->name != NULL) +                        instance->ops->cdap_read(instance, +                                                 msg->invoke_id, +                                                 msg->name); +                break; +        case OPCODE__WRITE: +                if (msg->name != NULL && +                    msg->has_value) +                        instance->ops->cdap_write(instance, +                                                  msg->invoke_id, +                                                  msg->name, +                                                  msg->value.data, +                                                  msg->value.len, +                                                  msg->flags); +                break; +        case OPCODE__CREATE: +                if (msg->name != NULL && +                    msg->has_value) +                        instance->ops->cdap_create(instance, +                                                   msg->invoke_id, +                                                   msg->name, +                                                   msg->value.data, +                                                   msg->value.len); +                break; +        case OPCODE__DELETE: +                if (msg->name != NULL && +                    msg->has_value) +                        instance->ops->cdap_create(instance, +                                                   msg->invoke_id, +                                                   msg->name, +                                                   msg->value.data, +                                                   msg->value.len); +                break; +        case OPCODE__START: +                if (msg->name != NULL) +                        instance->ops->cdap_start(instance, +                                                  msg->invoke_id, +                                                  msg->name); +                break; +        case OPCODE__STOP: +                if (msg->name != NULL) +                        instance->ops->cdap_stop(instance, +                                                 msg->invoke_id, +                                                 msg->name); +                break; +        case OPCODE__REPLY: +                instance->ops->cdap_reply(instance, +                                          msg->invoke_id, +                                          msg->result, +                                          msg->value.data, +                                          msg->value.len); +                release_invoke_id(instance, msg->invoke_id); +                break; +        default: +                break; +        } + +        free(info); +        cdap__free_unpacked(msg, NULL); + +        return (void *) 0; +} +  static void * sdu_reader(void * o)  {          struct cdap * instance = (struct cdap *) o;          cdap_t * msg;          uint8_t buf[BUF_SIZE];          ssize_t len; +        struct cdap_info * cdap_info;          while (true) {                  len = flow_read(instance->fd, buf, BUF_SIZE); @@ -82,69 +160,22 @@ static void * sdu_reader(void * o)                  if (msg == NULL)                          continue; -                switch (msg->opcode) { -                case OPCODE__READ: -                        if (msg->name != NULL) -                                instance->ops->cdap_read(instance, -                                                         msg->invoke_id, -                                                         msg->name); -                        break; -                case OPCODE__WRITE: -                        if (msg->name != NULL && -                            msg->has_value) { -                                instance->ops->cdap_write(instance, -                                                          msg->invoke_id, -                                                          msg->name, -                                                          msg->value.data, -                                                          msg->value.len, -                                                          msg->flags); -                        } -                        break; -                case OPCODE__CREATE: -                        if (msg->name != NULL && -                            msg->has_value) { -                                instance->ops->cdap_create(instance, -                                                           msg->invoke_id, -                                                           msg->name, -                                                           msg->value.data, -                                                           msg->value.len); -                        } -                        break; -                case OPCODE__DELETE: -                        if (msg->name != NULL && -                            msg->has_value) { -                                instance->ops->cdap_create(instance, -                                                           msg->invoke_id, -                                                           msg->name, -                                                           msg->value.data, -                                                           msg->value.len); -                        } -                        break; -                case OPCODE__START: -                        if (msg->name != NULL) -                                instance->ops->cdap_start(instance, -                                                          msg->invoke_id, -                                                          msg->name); -                        break; -                case OPCODE__STOP: -                        if (msg->name != NULL) -                                instance->ops->cdap_stop(instance, -                                                         msg->invoke_id, -                                                         msg->name); -                        break; -                case OPCODE__REPLY: -                        instance->ops->cdap_reply(instance, -                                                  msg->invoke_id, -                                                  msg->result, -                                                  msg->value.data, -                                                  msg->value.len); -                        release_invoke_id(instance, msg->invoke_id); -                        break; -                default: -                        break; +                cdap_info = malloc(sizeof(*cdap_info)); +                if (cdap_info == NULL) { +                        cdap__free_unpacked(msg, NULL); +                        continue;                  } -                cdap__free_unpacked(msg, NULL); +                cdap_info->instance = instance; +                cdap_info->msg = msg; + +                pthread_create(&cdap_info->thread, +                               NULL, +                               handle_cdap_msg, +                               (void *) cdap_info); + +                pthread_detach(cdap_info->thread); +          }          return (void *) 0; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 1c7fd600..4ca29636 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -40,8 +40,6 @@  #include <signal.h>  #include <sys/stat.h> -#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC -  #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry)         \                               + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \                               + 2 * sizeof (pthread_cond_t)) | 
