/** * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "thread.h" #include "async_int.h" #include "pipe.h" #include <ucs/arch/atomic.h> #include <ucs/sys/checker.h> #include <ucs/sys/stubs.h> #include <ucs/sys/event_set.h> #define UCS_ASYNC_EPOLL_MAX_EVENTS 16 #define UCS_ASYNC_EPOLL_MIN_TIMEOUT_MS 2.0 typedef struct ucs_async_thread { ucs_async_pipe_t wakeup; ucs_sys_event_set_t *event_set; ucs_timer_queue_t timerq; pthread_t thread_id; int stop; uint32_t refcnt; } ucs_async_thread_t; typedef struct ucs_async_thread_global_context { ucs_async_thread_t *thread; unsigned use_count; pthread_mutex_t lock; } ucs_async_thread_global_context_t; typedef struct ucs_async_thread_callback_arg { ucs_async_thread_t *thread; int *is_missed; } ucs_async_thread_callback_arg_t; static ucs_async_thread_global_context_t ucs_async_thread_global_context = { .thread = NULL, .use_count = 0, .lock = PTHREAD_MUTEX_INITIALIZER }; static void ucs_async_thread_hold(ucs_async_thread_t *thread) { ucs_atomic_add32(&thread->refcnt, 1); } static void ucs_async_thread_put(ucs_async_thread_t *thread) { if (ucs_atomic_fsub32(&thread->refcnt, 1) == 1) { ucs_event_set_cleanup(thread->event_set); ucs_async_pipe_destroy(&thread->wakeup); ucs_timerq_cleanup(&thread->timerq); ucs_free(thread); } } static void ucs_async_thread_ev_handler(void *callback_data, int event, void *arg) { ucs_async_thread_callback_arg_t *cb_arg = (void*)arg; int fd = (int)(uintptr_t)callback_data; ucs_status_t status; ucs_trace_async("ucs_async_thread_ev_handler(fd=%d, event=%d)", fd, event); if (fd == ucs_async_pipe_rfd(&cb_arg->thread->wakeup)) { ucs_trace_async("progress thread woken up"); ucs_async_pipe_drain(&cb_arg->thread->wakeup); return; } status = ucs_async_dispatch_handlers(&fd, 1); if (status == UCS_ERR_NO_PROGRESS) { *cb_arg->is_missed = 1; } } static void *ucs_async_thread_func(void *arg) { ucs_async_thread_t *thread = arg; ucs_time_t last_time, curr_time, timer_interval, time_spent; int is_missed, timeout_ms; ucs_status_t status; unsigned num_events; ucs_async_thread_callback_arg_t cb_arg; is_missed = 0; curr_time = ucs_get_time(); last_time = ucs_get_time(); cb_arg.thread = thread; cb_arg.is_missed = &is_missed; while (!thread->stop) { num_events = ucs_min(UCS_ASYNC_EPOLL_MAX_EVENTS, ucs_sys_event_set_max_wait_events); /* If we didn't get the lock, give other threads priority */ if (is_missed) { sched_yield(); is_missed = 0; } /* Wait until the remainder of current period */ timer_interval = ucs_timerq_min_interval(&thread->timerq); if (timer_interval == UCS_TIME_INFINITY) { timeout_ms = -1; } else { time_spent = curr_time - last_time; timeout_ms = ucs_time_to_msec(timer_interval - ucs_min(time_spent, timer_interval)); } status = ucs_event_set_wait(thread->event_set, &num_events, timeout_ms, ucs_async_thread_ev_handler, (void*)&cb_arg); if (UCS_STATUS_IS_ERR(status)) { ucs_fatal("ucs_event_set_wait() failed: %d", status); } /* Check timers */ curr_time = ucs_get_time(); if (curr_time - last_time > timer_interval) { status = ucs_async_dispatch_timerq(&thread->timerq, curr_time); if (status == UCS_ERR_NO_PROGRESS) { is_missed = 1; } last_time = curr_time; } } ucs_async_thread_put(thread); return NULL; } static ucs_status_t ucs_async_thread_start(ucs_async_thread_t **thread_p) { ucs_async_thread_t *thread; ucs_status_t status; int wakeup_rfd; int ret; ucs_trace_func(""); pthread_mutex_lock(&ucs_async_thread_global_context.lock); if (ucs_async_thread_global_context.use_count++ > 0) { /* Thread already started */ status = UCS_OK; goto out_unlock; } ucs_assert_always(ucs_async_thread_global_context.thread == NULL); thread = ucs_malloc(sizeof(*thread), "async_thread_context"); if (thread == NULL) { status = UCS_ERR_NO_MEMORY; goto err; } thread->stop = 0; thread->refcnt = 1; status = ucs_timerq_init(&thread->timerq); if (status != UCS_OK) { goto err_free; } status = ucs_async_pipe_create(&thread->wakeup); if (status != UCS_OK) { goto err_timerq_cleanup; } status = ucs_event_set_create(&thread->event_set); if (status != UCS_OK) { goto err_close_pipe; } /* Store file descriptor into void * storage without memory allocation. */ wakeup_rfd = ucs_async_pipe_rfd(&thread->wakeup); status = ucs_event_set_add(thread->event_set, wakeup_rfd, UCS_EVENT_SET_EVREAD, (void *)(uintptr_t)wakeup_rfd); if (status != UCS_OK) { status = UCS_ERR_IO_ERROR; goto err_free_event_set; } ret = pthread_create(&thread->thread_id, NULL, ucs_async_thread_func, thread); if (ret != 0) { ucs_error("pthread_create() returned %d: %m", ret); status = UCS_ERR_IO_ERROR; goto err_free_event_set; } ucs_async_thread_global_context.thread = thread; status = UCS_OK; goto out_unlock; err_free_event_set: ucs_event_set_cleanup(thread->event_set); err_close_pipe: ucs_async_pipe_destroy(&thread->wakeup); err_timerq_cleanup: ucs_timerq_cleanup(&thread->timerq); err_free: ucs_free(thread); err: --ucs_async_thread_global_context.use_count; out_unlock: ucs_assert_always(ucs_async_thread_global_context.thread != NULL); *thread_p = ucs_async_thread_global_context.thread; pthread_mutex_unlock(&ucs_async_thread_global_context.lock); return status; } static void ucs_async_thread_stop() { ucs_async_thread_t *thread = NULL; ucs_trace_func(""); pthread_mutex_lock(&ucs_async_thread_global_context.lock); if (--ucs_async_thread_global_context.use_count == 0) { thread = ucs_async_thread_global_context.thread; ucs_async_thread_hold(thread); thread->stop = 1; ucs_async_pipe_push(&thread->wakeup); ucs_async_thread_global_context.thread = NULL; } pthread_mutex_unlock(&ucs_async_thread_global_context.lock); if (thread != NULL) { if (pthread_self() == thread->thread_id) { pthread_detach(thread->thread_id); } else { pthread_join(thread->thread_id, NULL); } ucs_async_thread_put(thread); } } static ucs_status_t ucs_async_thread_spinlock_init(ucs_async_context_t *async) { return ucs_spinlock_init(&async->thread.spinlock); } static void ucs_async_thread_spinlock_cleanup(ucs_async_context_t *async) { ucs_status_t status; status = ucs_spinlock_destroy(&async->thread.spinlock); if (status != UCS_OK) { ucs_warn("ucs_spinlock_destroy() failed (%d)", status); } } static int ucs_async_thread_spinlock_try_block(ucs_async_context_t *async) { return ucs_spin_trylock(&async->thread.spinlock); } static void ucs_async_thread_spinlock_unblock(ucs_async_context_t *async) { ucs_spin_unlock(&async->thread.spinlock); } static ucs_status_t ucs_async_thread_mutex_init(ucs_async_context_t *async) { pthread_mutexattr_t attr; int ret; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); ret = pthread_mutex_init(&async->thread.mutex, &attr); if (ret == 0) { return UCS_OK; } ucs_error("failed to initialize async lock: %s", strerror(ret)); return UCS_ERR_INVALID_PARAM; } static void ucs_async_thread_mutex_cleanup(ucs_async_context_t *async) { int ret = pthread_mutex_destroy(&async->thread.mutex); if (ret != 0) { ucs_warn("failed to destroy async lock: %s", strerror(ret)); } } static ucs_status_t ucs_async_thread_add_event_fd(ucs_async_context_t *async, int event_fd, int events) { ucs_async_thread_t *thread; ucs_status_t status; status = ucs_async_thread_start(&thread); if (status != UCS_OK) { goto err; } /* Store file descriptor into void * storage without memory allocation. */ status = ucs_event_set_add(thread->event_set, event_fd, (ucs_event_set_type_t)events, (void *)(uintptr_t)event_fd); if (status != UCS_OK) { status = UCS_ERR_IO_ERROR; goto err_removed; } ucs_async_pipe_push(&thread->wakeup); return UCS_OK; err_removed: ucs_async_thread_stop(); err: return status; } static ucs_status_t ucs_async_thread_remove_event_fd(ucs_async_context_t *async, int event_fd) { ucs_async_thread_t *thread = ucs_async_thread_global_context.thread; ucs_status_t status; status = ucs_event_set_del(thread->event_set, event_fd); if (status != UCS_OK) { return status; } ucs_async_thread_stop(); return UCS_OK; } static ucs_status_t ucs_async_thread_modify_event_fd(ucs_async_context_t *async, int event_fd, int events) { /* Store file descriptor into void * storage without memory allocation. */ return ucs_event_set_mod(ucs_async_thread_global_context.thread->event_set, event_fd, (ucs_event_set_type_t)events, (void *)(uintptr_t)event_fd); } static int ucs_async_thread_mutex_try_block(ucs_async_context_t *async) { return !pthread_mutex_trylock(&async->thread.mutex); } static void ucs_async_thread_mutex_unblock(ucs_async_context_t *async) { (void)pthread_mutex_unlock(&async->thread.mutex); } static ucs_status_t ucs_async_thread_add_timer(ucs_async_context_t *async, int timer_id, ucs_time_t interval) { ucs_async_thread_t *thread; ucs_status_t status; if (ucs_time_to_msec(interval) == 0) { ucs_error("timer interval is too small (%.2f usec)", ucs_time_to_usec(interval)); status = UCS_ERR_INVALID_PARAM; goto err; } status = ucs_async_thread_start(&thread); if (status != UCS_OK) { goto err; } status = ucs_timerq_add(&thread->timerq, timer_id, interval); if (status != UCS_OK) { goto err_stop; } ucs_async_pipe_push(&thread->wakeup); return UCS_OK; err_stop: ucs_async_thread_stop(); err: return status; } static ucs_status_t ucs_async_thread_remove_timer(ucs_async_context_t *async, int timer_id) { ucs_async_thread_t *thread = ucs_async_thread_global_context.thread; ucs_timerq_remove(&thread->timerq, timer_id); ucs_async_pipe_push(&thread->wakeup); ucs_async_thread_stop(); return UCS_OK; } static void ucs_async_signal_global_cleanup() { if (ucs_async_thread_global_context.thread != NULL) { ucs_debug("async thread still running (use count %u)", ucs_async_thread_global_context.use_count); } } ucs_async_ops_t ucs_async_thread_spinlock_ops = { .init = ucs_empty_function, .cleanup = ucs_async_signal_global_cleanup, .block = ucs_empty_function, .unblock = ucs_empty_function, .context_init = ucs_async_thread_spinlock_init, .context_cleanup = ucs_async_thread_spinlock_cleanup, .context_try_block = ucs_async_thread_spinlock_try_block, .context_unblock = ucs_async_thread_spinlock_unblock, .add_event_fd = ucs_async_thread_add_event_fd, .remove_event_fd = ucs_async_thread_remove_event_fd, .modify_event_fd = ucs_async_thread_modify_event_fd, .add_timer = ucs_async_thread_add_timer, .remove_timer = ucs_async_thread_remove_timer, }; ucs_async_ops_t ucs_async_thread_mutex_ops = { .init = ucs_empty_function, .cleanup = ucs_async_signal_global_cleanup, .block = ucs_empty_function, .unblock = ucs_empty_function, .context_init = ucs_async_thread_mutex_init, .context_cleanup = ucs_async_thread_mutex_cleanup, .context_try_block = ucs_async_thread_mutex_try_block, .context_unblock = ucs_async_thread_mutex_unblock, .add_event_fd = ucs_async_thread_add_event_fd, .remove_event_fd = ucs_async_thread_remove_event_fd, .modify_event_fd = ucs_async_thread_modify_event_fd, .add_timer = ucs_async_thread_add_timer, .remove_timer = ucs_async_thread_remove_timer, };