/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "thread.h"
#include "async_int.h"
#include "pipe.h"
#include <ucs/arch/atomic.h>
#include <ucs/sys/checker.h>
#include <ucs/sys/stubs.h>
#include <ucs/sys/event_set.h>
/* Upper bound on the number of events requested from ucs_event_set_wait() in
 * one iteration of the progress thread (further capped by
 * ucs_sys_event_set_max_wait_events). */
#define UCS_ASYNC_EPOLL_MAX_EVENTS 16
/* NOTE(review): not referenced by any code in this file - confirm whether it
 * is still needed. */
#define UCS_ASYNC_EPOLL_MIN_TIMEOUT_MS 2.0
/*
 * Shared async progress thread object. Created by ucs_async_thread_start() and
 * freed by ucs_async_thread_put() once refcnt drops to zero.
 */
typedef struct ucs_async_thread {
ucs_async_pipe_t wakeup; /* read end is registered in event_set; pushed to wake the thread */
ucs_sys_event_set_t *event_set; /* fds the progress thread waits on */
ucs_timer_queue_t timerq; /* timers; the shortest interval bounds the wait timeout */
pthread_t thread_id; /* id filled in by pthread_create() */
int stop; /* set to 1 by ucs_async_thread_stop() to end the thread loop */
uint32_t refcnt; /* one reference owned by the running thread, plus one held transiently by ucs_async_thread_stop() */
} ucs_async_thread_t;
/* Process-wide bookkeeping for the single shared progress thread. */
typedef struct ucs_async_thread_global_context {
ucs_async_thread_t *thread; /* running thread, or NULL when there is none */
unsigned use_count; /* ucs_async_thread_start() calls not yet matched by ucs_async_thread_stop() */
pthread_mutex_t lock; /* serializes start/stop updates of the fields above */
/* NOTE(review): the remove/modify/remove_timer handlers read `thread`
 * without taking `lock` - confirm callers guarantee it is stable then. */
} ucs_async_thread_global_context_t;
/* Argument passed through ucs_event_set_wait() to ucs_async_thread_ev_handler(). */
typedef struct ucs_async_thread_callback_arg {
ucs_async_thread_t *thread; /* thread whose event set is being waited on */
int *is_missed; /* set to 1 when a handler dispatch reports UCS_ERR_NO_PROGRESS */
} ucs_async_thread_callback_arg_t;
/* The single global context. It is statically initialized (including the
 * mutex), so no explicit init call is needed before the first start. */
static ucs_async_thread_global_context_t ucs_async_thread_global_context = {
.thread = NULL,
.use_count = 0,
.lock = PTHREAD_MUTEX_INITIALIZER
};
/* Take one extra reference on the progress thread object (atomic increment). */
static void ucs_async_thread_hold(ucs_async_thread_t *thread)
{
    uint32_t *refcnt_p = &thread->refcnt;

    ucs_atomic_add32(refcnt_p, 1);
}
/*
 * Drop one reference on the progress thread object. Whoever releases the last
 * reference tears down the event set, the wakeup pipe and the timer queue, and
 * then frees the object.
 */
static void ucs_async_thread_put(ucs_async_thread_t *thread)
{
    uint32_t prev_refcnt = ucs_atomic_fsub32(&thread->refcnt, 1);

    if (prev_refcnt != 1) {
        return; /* other references are still outstanding */
    }

    ucs_event_set_cleanup(thread->event_set);
    ucs_async_pipe_destroy(&thread->wakeup);
    ucs_timerq_cleanup(&thread->timerq);
    ucs_free(thread);
}
static void ucs_async_thread_ev_handler(void *callback_data, int event,
void *arg)
{
ucs_async_thread_callback_arg_t *cb_arg = (void*)arg;
int fd = (int)(uintptr_t)callback_data;
ucs_status_t status;
ucs_trace_async("ucs_async_thread_ev_handler(fd=%d, event=%d)",
fd, event);
if (fd == ucs_async_pipe_rfd(&cb_arg->thread->wakeup)) {
ucs_trace_async("progress thread woken up");
ucs_async_pipe_drain(&cb_arg->thread->wakeup);
return;
}
status = ucs_async_dispatch_handlers(&fd, 1);
if (status == UCS_ERR_NO_PROGRESS) {
*cb_arg->is_missed = 1;
}
}
/*
 * Main loop of the shared async progress thread.
 *
 * Each iteration does the following:
 *  - It waits on the event set for at most the remainder of the shortest timer
 *    interval, or with timeout -1 when there are no timers.
 *  - Ready fds are handled by ucs_async_thread_ev_handler().
 *  - The timer queue is dispatched once that interval has elapsed since the
 *    last timer dispatch.
 *  - If any dispatch reported UCS_ERR_NO_PROGRESS, the thread calls
 *    sched_yield() before the next wait.
 *
 * The loop ends when thread->stop is set. The thread then releases the
 * reference it owns on the thread object.
 *
 * @param arg  The ucs_async_thread_t this thread serves.
 * @return     Always NULL.
 */
static void *ucs_async_thread_func(void *arg)
{
    ucs_async_thread_t *thread = arg;
    ucs_time_t last_time, curr_time, timer_interval, time_spent;
    int is_missed, timeout_ms;
    ucs_status_t status;
    unsigned num_events;
    ucs_async_thread_callback_arg_t cb_arg;

    is_missed = 0;
    /* Take a single clock sample for both values. With two separate
     * ucs_get_time() calls, last_time could be later than curr_time, and the
     * unsigned subtraction below would wrap around on the first iteration. */
    curr_time = ucs_get_time();
    last_time = curr_time;
    cb_arg.thread    = thread;
    cb_arg.is_missed = &is_missed;

    /* NOTE(review): thread->stop is a plain int written by
     * ucs_async_thread_stop() on another thread and read here without
     * atomics - confirm this is acceptable on all supported platforms. */
    while (!thread->stop) {
        num_events = ucs_min(UCS_ASYNC_EPOLL_MAX_EVENTS,
                             ucs_sys_event_set_max_wait_events);

        /* If we didn't get the lock, give other threads priority */
        if (is_missed) {
            sched_yield();
            is_missed = 0;
        }

        /* Wait until the remainder of current period */
        timer_interval = ucs_timerq_min_interval(&thread->timerq);
        if (timer_interval == UCS_TIME_INFINITY) {
            timeout_ms = -1; /* no timers; presumably an unbounded wait */
        } else {
            time_spent = curr_time - last_time;
            timeout_ms = ucs_time_to_msec(timer_interval -
                                          ucs_min(time_spent, timer_interval));
        }

        status = ucs_event_set_wait(thread->event_set,
                                    &num_events, timeout_ms,
                                    ucs_async_thread_ev_handler,
                                    (void*)&cb_arg);
        if (UCS_STATUS_IS_ERR(status)) {
            ucs_fatal("ucs_event_set_wait() failed: %d", status);
        }

        /* Check timers */
        curr_time = ucs_get_time();
        if (curr_time - last_time > timer_interval) {
            status = ucs_async_dispatch_timerq(&thread->timerq, curr_time);
            if (status == UCS_ERR_NO_PROGRESS) {
                is_missed = 1;
            }
            last_time = curr_time;
        }
    }

    /* Release the reference owned by the thread (the initial refcnt of 1) */
    ucs_async_thread_put(thread);
    return NULL;
}
/*
 * Take a reference on the shared async progress thread. The first caller
 * creates and starts the thread; later callers only bump the use count.
 *
 * @param [out] thread_p  On success, set to the running thread object. It is
 *                        left untouched on failure.
 *
 * @return One of the following:
 *         - UCS_OK on success.
 *         - UCS_ERR_NO_MEMORY if the thread object could not be allocated.
 *         - UCS_ERR_IO_ERROR if registering the wakeup pipe or
 *           pthread_create() failed.
 *         - Otherwise, the error returned by timer queue, pipe or event set
 *           initialization.
 *
 * Every successful call must be balanced by ucs_async_thread_stop(). On
 * failure the use count is restored and nothing needs to be released.
 */
static ucs_status_t ucs_async_thread_start(ucs_async_thread_t **thread_p)
{
    ucs_async_thread_t *thread;
    ucs_status_t status;
    int wakeup_rfd;
    int ret;

    ucs_trace_func("");

    pthread_mutex_lock(&ucs_async_thread_global_context.lock);
    if (ucs_async_thread_global_context.use_count++ > 0) {
        /* Thread already started */
        status = UCS_OK;
        goto out_unlock;
    }

    ucs_assert_always(ucs_async_thread_global_context.thread == NULL);

    thread = ucs_malloc(sizeof(*thread), "async_thread_context");
    if (thread == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto err;
    }

    thread->stop   = 0;
    thread->refcnt = 1; /* owned by the thread; released when its loop exits */

    status = ucs_timerq_init(&thread->timerq);
    if (status != UCS_OK) {
        goto err_free;
    }

    status = ucs_async_pipe_create(&thread->wakeup);
    if (status != UCS_OK) {
        goto err_timerq_cleanup;
    }

    status = ucs_event_set_create(&thread->event_set);
    if (status != UCS_OK) {
        goto err_close_pipe;
    }

    /* Store file descriptor into void * storage without memory allocation. */
    wakeup_rfd = ucs_async_pipe_rfd(&thread->wakeup);
    status = ucs_event_set_add(thread->event_set, wakeup_rfd,
                               UCS_EVENT_SET_EVREAD,
                               (void *)(uintptr_t)wakeup_rfd);
    if (status != UCS_OK) {
        status = UCS_ERR_IO_ERROR;
        goto err_free_event_set;
    }

    ret = pthread_create(&thread->thread_id, NULL, ucs_async_thread_func, thread);
    if (ret != 0) {
        /* pthread_create() reports failure via its return value, not errno */
        ucs_error("pthread_create() returned %d: %s", ret, strerror(ret));
        status = UCS_ERR_IO_ERROR;
        goto err_free_event_set;
    }

    ucs_async_thread_global_context.thread = thread;
    status = UCS_OK;

out_unlock:
    ucs_assert_always(ucs_async_thread_global_context.thread != NULL);
    *thread_p = ucs_async_thread_global_context.thread;
    pthread_mutex_unlock(&ucs_async_thread_global_context.lock);
    return status;

err_free_event_set:
    ucs_event_set_cleanup(thread->event_set);
err_close_pipe:
    ucs_async_pipe_destroy(&thread->wakeup);
err_timerq_cleanup:
    ucs_timerq_cleanup(&thread->timerq);
err_free:
    ucs_free(thread);
err:
    /* Undo the increment above. No thread was published, so do not take the
     * out_unlock path: its non-NULL assertion would abort the process. */
    --ucs_async_thread_global_context.use_count;
    pthread_mutex_unlock(&ucs_async_thread_global_context.lock);
    return status;
}
/*
 * Release a reference obtained by ucs_async_thread_start().
 *
 * When the last user releases it, the thread is flagged to stop, woken
 * through its pipe, and removed from the global context under the lock.
 * Outside the lock it is then joined, or detached if the caller is the
 * progress thread itself. Finally the extra reference taken here is dropped.
 */
static void ucs_async_thread_stop(void)
{
    ucs_async_thread_t *thread = NULL;

    ucs_trace_func("");

    pthread_mutex_lock(&ucs_async_thread_global_context.lock);
    if (--ucs_async_thread_global_context.use_count == 0) {
        thread = ucs_async_thread_global_context.thread;
        /* Keep the object alive until the join/detach below completes; the
         * thread drops its own reference when its loop exits */
        ucs_async_thread_hold(thread);
        thread->stop = 1;
        ucs_async_pipe_push(&thread->wakeup);
        ucs_async_thread_global_context.thread = NULL;
    }
    pthread_mutex_unlock(&ucs_async_thread_global_context.lock);

    if (thread == NULL) {
        return;
    }

    /* pthread_t is opaque: compare with pthread_equal(), not '==' */
    if (pthread_equal(pthread_self(), thread->thread_id)) {
        /* Called from the progress thread itself - it cannot join itself */
        pthread_detach(thread->thread_id);
    } else {
        pthread_join(thread->thread_id, NULL);
    }
    ucs_async_thread_put(thread);
}
/* Initialize the per-context spinlock used by ucs_async_thread_spinlock_ops. */
static ucs_status_t ucs_async_thread_spinlock_init(ucs_async_context_t *async)
{
    ucs_status_t status = ucs_spinlock_init(&async->thread.spinlock);

    return status;
}
/* Destroy the per-context spinlock. A failure is only reported as a warning. */
static void ucs_async_thread_spinlock_cleanup(ucs_async_context_t *async)
{
    ucs_status_t status = ucs_spinlock_destroy(&async->thread.spinlock);

    if (status == UCS_OK) {
        return;
    }

    ucs_warn("ucs_spinlock_destroy() failed (%d)", status);
}
/* Try to take the per-context spinlock without spinning; returns the result
 * of ucs_spin_trylock() unchanged. */
static int ucs_async_thread_spinlock_try_block(ucs_async_context_t *async)
{
    int locked = ucs_spin_trylock(&async->thread.spinlock);

    return locked;
}
/* Release the per-context spinlock taken by a successful try_block. */
static void ucs_async_thread_spinlock_unblock(ucs_async_context_t *async)
{
    ucs_async_context_t *ctx = async;

    ucs_spin_unlock(&ctx->thread.spinlock);
}
/*
 * Initialize the per-context recursive pthread mutex used by
 * ucs_async_thread_mutex_ops.
 *
 * @return UCS_OK on success, or UCS_ERR_INVALID_PARAM if any pthread call
 *         fails (the error is logged).
 */
static ucs_status_t ucs_async_thread_mutex_init(ucs_async_context_t *async)
{
    pthread_mutexattr_t attr;
    int ret;

    ret = pthread_mutexattr_init(&attr);
    if (ret != 0) {
        goto err;
    }

    /* If settype fails, do not silently create a non-recursive mutex */
    ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    if (ret == 0) {
        ret = pthread_mutex_init(&async->thread.mutex, &attr);
    }

    /* The attribute object is not needed once the mutex is initialized */
    pthread_mutexattr_destroy(&attr);

    if (ret == 0) {
        return UCS_OK;
    }

err:
    ucs_error("failed to initialize async lock: %s", strerror(ret));
    return UCS_ERR_INVALID_PARAM;
}
/* Destroy the per-context mutex. A failure is only reported as a warning. */
static void ucs_async_thread_mutex_cleanup(ucs_async_context_t *async)
{
    int err;

    err = pthread_mutex_destroy(&async->thread.mutex);
    if (err == 0) {
        return;
    }

    ucs_warn("failed to destroy async lock: %s", strerror(err));
}
/*
 * Register event_fd with the shared progress thread, starting the thread on
 * first use, and wake the thread so it picks up the new fd.
 *
 * @return One of the following:
 *         - UCS_OK on success.
 *         - The error from ucs_async_thread_start().
 *         - UCS_ERR_IO_ERROR if the fd could not be added to the event set.
 *           In that case the thread reference taken here is released again.
 */
static ucs_status_t ucs_async_thread_add_event_fd(ucs_async_context_t *async,
                                                  int event_fd, int events)
{
    ucs_async_thread_t *thread;
    ucs_status_t status;

    status = ucs_async_thread_start(&thread);
    if (status != UCS_OK) {
        return status;
    }

    /* The fd itself is the callback data, packed into the pointer value */
    if (ucs_event_set_add(thread->event_set, event_fd,
                          (ucs_event_set_type_t)events,
                          (void *)(uintptr_t)event_fd) != UCS_OK) {
        ucs_async_thread_stop();
        return UCS_ERR_IO_ERROR;
    }

    ucs_async_pipe_push(&thread->wakeup);
    return UCS_OK;
}
/*
 * Unregister event_fd from the progress thread's event set. On success, drop
 * the thread reference taken by ucs_async_thread_add_event_fd().
 *
 * NOTE(review): reads the global thread pointer without the global lock and
 * assumes it is non-NULL (i.e. a matching add succeeded) - confirm callers
 * guarantee this.
 */
static ucs_status_t ucs_async_thread_remove_event_fd(ucs_async_context_t *async,
                                                     int event_fd)
{
    ucs_status_t status;

    status = ucs_event_set_del(ucs_async_thread_global_context.thread->event_set,
                               event_fd);
    if (status == UCS_OK) {
        ucs_async_thread_stop();
    }

    return status;
}
/*
 * Change the event mask of an fd already registered with the progress thread.
 * The fd is stored again as the callback data, packed into the pointer.
 *
 * NOTE(review): reads the global thread pointer without the global lock and
 * assumes it is non-NULL.
 */
static ucs_status_t ucs_async_thread_modify_event_fd(ucs_async_context_t *async,
                                                     int event_fd, int events)
{
    ucs_async_thread_t *thread  = ucs_async_thread_global_context.thread;
    void               *cb_data = (void *)(uintptr_t)event_fd;

    return ucs_event_set_mod(thread->event_set, event_fd,
                             (ucs_event_set_type_t)events, cb_data);
}
/* Try to take the per-context mutex without blocking; returns 1 if it was
 * acquired and 0 otherwise. */
static int ucs_async_thread_mutex_try_block(ucs_async_context_t *async)
{
    int ret = pthread_mutex_trylock(&async->thread.mutex);

    return (ret == 0) ? 1 : 0;
}
/* Release the per-context mutex; the unlock result is intentionally ignored. */
static void ucs_async_thread_mutex_unblock(ucs_async_context_t *async)
{
    int ret = pthread_mutex_unlock(&async->thread.mutex);

    (void)ret;
}
/*
 * Add a timer to the shared progress thread, starting the thread on first
 * use, and wake the thread so it recomputes its wait timeout.
 *
 * @return One of the following:
 *         - UCS_OK on success.
 *         - UCS_ERR_INVALID_PARAM if the interval converts to 0 milliseconds
 *           (the thread's wait timeout is expressed in milliseconds).
 *         - The error from ucs_async_thread_start() or ucs_timerq_add().
 *           If ucs_timerq_add() fails, the thread reference is released again.
 */
static ucs_status_t ucs_async_thread_add_timer(ucs_async_context_t *async,
                                               int timer_id, ucs_time_t interval)
{
    ucs_async_thread_t *thread;
    ucs_status_t status;

    if (ucs_time_to_msec(interval) == 0) {
        ucs_error("timer interval is too small (%.2f usec)", ucs_time_to_usec(interval));
        return UCS_ERR_INVALID_PARAM;
    }

    status = ucs_async_thread_start(&thread);
    if (status != UCS_OK) {
        return status;
    }

    status = ucs_timerq_add(&thread->timerq, timer_id, interval);
    if (status != UCS_OK) {
        ucs_async_thread_stop();
        return status;
    }

    ucs_async_pipe_push(&thread->wakeup);
    return UCS_OK;
}
/*
 * Remove a timer from the progress thread, wake the thread so it re-evaluates
 * the shortest interval, and drop the reference taken by
 * ucs_async_thread_add_timer(). Always returns UCS_OK; the result of
 * ucs_timerq_remove() is not checked.
 *
 * NOTE(review): reads the global thread pointer without the global lock and
 * assumes it is non-NULL.
 */
static ucs_status_t ucs_async_thread_remove_timer(ucs_async_context_t *async,
                                                  int timer_id)
{
    ucs_async_thread_t *th   = ucs_async_thread_global_context.thread;
    ucs_timer_queue_t  *tq   = &th->timerq;
    ucs_async_pipe_t   *wake = &th->wakeup;

    ucs_timerq_remove(tq, timer_id);
    ucs_async_pipe_push(wake);

    ucs_async_thread_stop();
    return UCS_OK;
}
/*
 * Global cleanup hook shared by both thread-based ops tables. It does not stop
 * the progress thread. It only reports, at debug level, that the thread is
 * still alive, i.e. some ucs_async_thread_start() calls were never balanced by
 * ucs_async_thread_stop().
 *
 * NOTE(review): reads the global context without taking its lock.
 */
static void ucs_async_signal_global_cleanup(void)
{
    if (ucs_async_thread_global_context.thread != NULL) {
        ucs_debug("async thread still running (use count %u)",
                  ucs_async_thread_global_context.use_count);
    }
}
/*
 * Async ops for the thread mode that guards each async context with a
 * spinlock. Process-wide init/block/unblock are no-ops. All fd and timer
 * operations go through the single shared progress thread.
 */
ucs_async_ops_t ucs_async_thread_spinlock_ops = {
.init = ucs_empty_function,
.cleanup = ucs_async_signal_global_cleanup,
.block = ucs_empty_function,
.unblock = ucs_empty_function,
.context_init = ucs_async_thread_spinlock_init,
.context_cleanup = ucs_async_thread_spinlock_cleanup,
.context_try_block = ucs_async_thread_spinlock_try_block,
.context_unblock = ucs_async_thread_spinlock_unblock,
.add_event_fd = ucs_async_thread_add_event_fd,
.remove_event_fd = ucs_async_thread_remove_event_fd,
.modify_event_fd = ucs_async_thread_modify_event_fd,
.add_timer = ucs_async_thread_add_timer,
.remove_timer = ucs_async_thread_remove_timer,
};
/*
 * Async ops for the thread mode that guards each async context with a
 * recursive pthread mutex. Apart from the per-context lock functions, it is
 * identical to ucs_async_thread_spinlock_ops.
 */
ucs_async_ops_t ucs_async_thread_mutex_ops = {
.init = ucs_empty_function,
.cleanup = ucs_async_signal_global_cleanup,
.block = ucs_empty_function,
.unblock = ucs_empty_function,
.context_init = ucs_async_thread_mutex_init,
.context_cleanup = ucs_async_thread_mutex_cleanup,
.context_try_block = ucs_async_thread_mutex_try_block,
.context_unblock = ucs_async_thread_mutex_unblock,
.add_event_fd = ucs_async_thread_add_event_fd,
.remove_event_fd = ucs_async_thread_remove_event_fd,
.modify_event_fd = ucs_async_thread_modify_event_fd,
.add_timer = ucs_async_thread_add_timer,
.remove_timer = ucs_async_thread_remove_timer,
};