diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/cdap.c | 153 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 2 | 
2 files changed, 92 insertions, 63 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 4c70b2e4..5dc050a4 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -43,6 +43,12 @@ struct cdap {          struct cdap_ops * ops;  }; +struct cdap_info { +        pthread_t thread; +        struct cdap * instance; +        cdap_t * msg; +}; +  static int next_invoke_id(struct cdap * instance)  {          int ret; @@ -66,12 +72,84 @@ static int release_invoke_id(struct cdap * instance,          return ret;  } +static void * handle_cdap_msg(void * o) +{ +        struct cdap_info * info = (struct cdap_info *) o; +        struct cdap * instance = info->instance; +        cdap_t * msg = info->msg; + +        switch (msg->opcode) { +        case OPCODE__READ: +                if (msg->name != NULL) +                        instance->ops->cdap_read(instance, +                                                 msg->invoke_id, +                                                 msg->name); +                break; +        case OPCODE__WRITE: +                if (msg->name != NULL && +                    msg->has_value) +                        instance->ops->cdap_write(instance, +                                                  msg->invoke_id, +                                                  msg->name, +                                                  msg->value.data, +                                                  msg->value.len, +                                                  msg->flags); +                break; +        case OPCODE__CREATE: +                if (msg->name != NULL && +                    msg->has_value) +                        instance->ops->cdap_create(instance, +                                                   msg->invoke_id, +                                                   msg->name, +                                                   msg->value.data, +                                                   msg->value.len); +                break; +        case OPCODE__DELETE: +                if (msg->name != NULL && +                    msg->has_value) +                        instance->ops->cdap_create(instance, +                                                   msg->invoke_id, +                                                   msg->name, +                                                   msg->value.data, +                                                   msg->value.len); +                break; +        case OPCODE__START: +                if (msg->name != NULL) +                        instance->ops->cdap_start(instance, +                                                  msg->invoke_id, +                                                  msg->name); +                break; +        case OPCODE__STOP: +                if (msg->name != NULL) +                        instance->ops->cdap_stop(instance, +                                                 msg->invoke_id, +                                                 msg->name); +                break; +        case OPCODE__REPLY: +                instance->ops->cdap_reply(instance, +                                          msg->invoke_id, +                                          msg->result, +                                          msg->value.data, +                                          msg->value.len); +                release_invoke_id(instance, msg->invoke_id); +                break; +        default: +                break; +        } + +        free(info); +        cdap__free_unpacked(msg, NULL); + +        return (void *) 0; +} +  static void * sdu_reader(void * o)  {          struct cdap * instance = (struct cdap *) o;          cdap_t * msg;          uint8_t buf[BUF_SIZE];          ssize_t len; +        struct cdap_info * cdap_info;          while (true) {                  len = flow_read(instance->fd, buf, BUF_SIZE); @@ -82,69 +160,22 @@ static void * sdu_reader(void * o)                  if (msg == NULL)                          continue; -                switch (msg->opcode) { -                case OPCODE__READ: -                        if (msg->name != NULL) -                                instance->ops->cdap_read(instance, -                                                         msg->invoke_id, -                                                         msg->name); -                        break; -                case OPCODE__WRITE: -                        if (msg->name != NULL && -                            msg->has_value) { -                                instance->ops->cdap_write(instance, -                                                          msg->invoke_id, -                                                          msg->name, -                                                          msg->value.data, -                                                          msg->value.len, -                                                          msg->flags); -                        } -                        break; -                case OPCODE__CREATE: -                        if (msg->name != NULL && -                            msg->has_value) { -                                instance->ops->cdap_create(instance, -                                                           msg->invoke_id, -                                                           msg->name, -                                                           msg->value.data, -                                                           msg->value.len); -                        } -                        break; -                case OPCODE__DELETE: -                        if (msg->name != NULL && -                            msg->has_value) { -                                instance->ops->cdap_create(instance, -                                                           msg->invoke_id, -                                                           msg->name, -                                                           msg->value.data, -                                                           msg->value.len); -                        } -                        break; -                case OPCODE__START: -                        if (msg->name != NULL) -                                instance->ops->cdap_start(instance, -                                                          msg->invoke_id, -                                                          msg->name); -                        break; -                case OPCODE__STOP: -                        if (msg->name != NULL) -                                instance->ops->cdap_stop(instance, -                                                         msg->invoke_id, -                                                         msg->name); -                        break; -                case OPCODE__REPLY: -                        instance->ops->cdap_reply(instance, -                                                  msg->invoke_id, -                                                  msg->result, -                                                  msg->value.data, -                                                  msg->value.len); -                        release_invoke_id(instance, msg->invoke_id); -                        break; -                default: -                        break; +                cdap_info = malloc(sizeof(*cdap_info)); +                if (cdap_info == NULL) { +                        cdap__free_unpacked(msg, NULL); +                        continue;                  } -                cdap__free_unpacked(msg, NULL); +                cdap_info->instance = instance; +                cdap_info->msg = msg; + +                pthread_create(&cdap_info->thread, +                               NULL, +                               handle_cdap_msg, +                               (void *) cdap_info); + +                pthread_detach(cdap_info->thread); +          }          return (void *) 0; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 1c7fd600..4ca29636 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -40,8 +40,6 @@  #include <signal.h>  #include <sys/stat.h> -#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC -  #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry)         \                               + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \                               + 2 * sizeof (pthread_cond_t))  | 
