/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "signal.h"
#include "async_int.h"
#include <ucs/arch/atomic.h>
#include <ucs/datastruct/list.h>
#include <ucs/debug/debug.h>
#include <ucs/debug/log.h>
#include <ucs/sys/compiler.h>
#include <ucs/sys/sys.h>
#include <signal.h>
#define UCS_SIGNAL_MAX_TIMERQS 64
/*
* Per-thread system timer and software timer queue. We can dispatch timers only
* on the same thread which added them.
*/
typedef struct ucs_async_signal_timer {
pid_t tid; /* Thread ID */
timer_t sys_timer_id; /* System timer ID */
ucs_timer_queue_t timerq; /* Queue of timers for the thread */
} ucs_async_signal_timer_t;
static struct {
struct sigaction prev_sighandler; /* Previous signal handler */
int event_count; /* Number of events in use */
pthread_mutex_t event_lock; /* Lock for adding/removing events */
pthread_mutex_t timers_lock; /* Lock for timers array */
ucs_async_signal_timer_t timers[UCS_SIGNAL_MAX_TIMERQS];/* Array of all threads */
} ucs_async_signal_global_context = {
.event_count = 0,
.event_lock = PTHREAD_MUTEX_INITIALIZER,
.timers_lock = PTHREAD_MUTEX_INITIALIZER,
.timers = {{ .tid = 0 }}
};
/**
* In signal mode, we allow user to manipulate events only from the same thread.
* Otherwise, we'd get into big synchronization issues.
*/
#define UCS_ASYNC_SIGNAL_CHECK_THREAD(_async) \
if (ucs_get_tid() != ucs_async_signal_context_tid(_async)) { \
ucs_error("cannot manipulate signal async from different thread"); \
return UCS_ERR_UNREACHABLE; \
}
/**
* @return To which thread the async context should deliver events to.
*/
static pid_t ucs_async_signal_context_tid(ucs_async_context_t *async)
{
static pid_t pid = -1;
if (pid == -1) {
pid = getpid();
}
return (async == NULL) ? pid : async->signal.tid;;
}
static ucs_status_t
ucs_async_signal_set_fd_owner(pid_t dest_tid, int fd)
{
#if HAVE_DECL_F_SETOWN_EX
struct f_owner_ex owner;
owner.type = F_OWNER_TID;
owner.pid = dest_tid;
ucs_trace_async("fcntl(F_SETOWN_EX, fd=%d, tid=%d)", fd, dest_tid);
if (0 > fcntl(fd, F_SETOWN_EX, &owner)) {
ucs_error("fcntl F_SETOWN_EX failed: %m");
return UCS_ERR_IO_ERROR;
}
return UCS_OK;
#else
if (dest_tid != getpid()) {
ucs_error("Cannot use signaled events to threads without F_SETOWN_EX support");
return UCS_ERR_UNSUPPORTED;
}
if (0 > fcntl(fd, F_SETOWN, dest_tid)) {
ucs_error("fcntl F_SETOWN failed: %m");
return UCS_ERR_IO_ERROR;
}
return UCS_OK;
#endif
}
static ucs_status_t
ucs_async_signal_sys_timer_create(int uid, pid_t tid, timer_t *sys_timer_id)
{
struct sigevent ev;
timer_t timer;
int ret;
ucs_trace_func("tid=%d", tid);
/* Create timer signal */
memset(&ev, 0, sizeof(ev));
ev.sigev_notify = SIGEV_THREAD_ID;
ev.sigev_signo = ucs_global_opts.async_signo;
ev.sigev_value.sival_int = uid; /* user parameter to timer */
#if defined(HAVE_SIGEVENT_SIGEV_UN_TID)
ev._sigev_un._tid = tid; /* target thread */
#elif defined(HAVE_SIGEVENT_SIGEV_NOTIFY_THREAD_ID)
ev.sigev_notify_thread_id = tid; /* target thread */
#else
#error "Port me"
#endif
ret = timer_create(CLOCK_REALTIME, &ev, &timer);
if (ret < 0) {
ucs_error("failed to create an interval timer: %m");
return UCS_ERR_INVALID_PARAM;
}
*sys_timer_id = timer;
return UCS_OK;
}
static ucs_status_t
ucs_async_signal_sys_timer_set_interval(timer_t sys_timer_id, ucs_time_t interval)
{
struct itimerspec its;
int ret;
ucs_trace_func("sys_timer_id=%p interval=%.2f usec", sys_timer_id,
ucs_time_to_usec(interval));
/* Modify the timer to have the desired accuracy */
ucs_sec_to_timespec(ucs_time_to_sec(interval), &its.it_interval);
its.it_value = its.it_interval;
ret = timer_settime(sys_timer_id, 0, &its, NULL);
if (ret < 0) {
ucs_error("failed to set the interval for the interval timer: %m");
return UCS_ERR_INVALID_PARAM;
}
return UCS_OK;
}
static void ucs_async_signal_sys_timer_delete(timer_t sys_timer_id)
{
int ret;
ucs_trace_func("sys_timer_id=%p", sys_timer_id);
ret = timer_delete(sys_timer_id);
if (ret < 0) {
ucs_warn("failed to remove the timer: %m");
}
ucs_trace_async("removed system timer %p", sys_timer_id);
}
static ucs_status_t ucs_async_signal_dispatch_timer(int uid)
{
ucs_async_signal_timer_t *timer = &ucs_async_signal_global_context.timers[uid];
ucs_assertv_always((uid >= 0) && (uid < UCS_SIGNAL_MAX_TIMERQS), "uid=%d", uid);
/* No need to take lock - remove operation blocks signals on the same thread */
if (timer->tid != ucs_get_tid()) {
return UCS_OK;
}
return ucs_async_dispatch_timerq(&timer->timerq, ucs_get_time());
}
static void ucs_async_signal_handler(int signo, siginfo_t *siginfo, void *arg)
{
ucs_assert(signo == ucs_global_opts.async_signo);
/* Check event code */
switch (siginfo->si_code) {
case SI_TIMER:
ucs_trace_async("timer signal uid=%d", siginfo->si_value.sival_int);
ucs_async_signal_dispatch_timer(siginfo->si_value.sival_int);
return;
case POLL_IN:
case POLL_OUT:
case POLL_HUP:
case POLL_ERR:
case POLL_MSG:
case POLL_PRI:
ucs_trace_async("async signal handler called for fd %d", siginfo->si_fd);
ucs_async_dispatch_handlers(&siginfo->si_fd, 1);
return;
default:
ucs_warn("signal handler called with unexpected event code %d, ignoring",
siginfo->si_code);
return;
}
}
static void ucs_async_signal_allow(int allow)
{
sigset_t sigset;
ucs_trace_func("enable=%d tid=%d", allow, ucs_get_tid());
sigemptyset(&sigset);
sigaddset(&sigset, ucs_global_opts.async_signo);
pthread_sigmask(allow ? SIG_UNBLOCK : SIG_BLOCK, &sigset, NULL);
}
static void ucs_async_signal_block_all()
{
pthread_mutex_lock(&ucs_async_signal_global_context.event_lock);
if (ucs_async_signal_global_context.event_count > 0) {
ucs_async_signal_allow(0);
}
pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock);
}
static void ucs_async_signal_unblock_all()
{
pthread_mutex_lock(&ucs_async_signal_global_context.event_lock);
if (ucs_async_signal_global_context.event_count > 0) {
ucs_async_signal_allow(1);
}
pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock);
}
static ucs_status_t ucs_async_signal_install_handler()
{
struct sigaction new_action;
int ret;
ucs_trace_func("event_count=%d", ucs_async_signal_global_context.event_count);
pthread_mutex_lock(&ucs_async_signal_global_context.event_lock);
if (ucs_async_signal_global_context.event_count == 0) {
/* Set our signal handler */
new_action.sa_sigaction = ucs_async_signal_handler;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = SA_RESTART|SA_SIGINFO;
#if HAVE_SIGACTION_SA_RESTORER
new_action.sa_restorer = NULL;
#endif
ret = sigaction(ucs_global_opts.async_signo, &new_action,
&ucs_async_signal_global_context.prev_sighandler);
if (ret < 0) {
ucs_error("failed to set a handler for signal %d: %m",
ucs_global_opts.async_signo);
pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock);
return UCS_ERR_INVALID_PARAM;
}
ucs_trace_async("installed signal handler for %s",
ucs_signal_names[ucs_global_opts.async_signo]);
}
++ucs_async_signal_global_context.event_count;
pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock);
return UCS_OK;
}
static void fatal_sighandler(int signo, siginfo_t *siginfo, void *arg)
{
ucs_fatal("got timer signal");
}
static void ucs_async_signal_uninstall_handler()
{
struct sigaction new_action;
int ret;
ucs_trace_func("event_count=%d", ucs_async_signal_global_context.event_count);
pthread_mutex_lock(&ucs_async_signal_global_context.event_lock);
if (--ucs_async_signal_global_context.event_count == 0) {
new_action = ucs_async_signal_global_context.prev_sighandler;
new_action.sa_sigaction = fatal_sighandler;
ret = sigaction(ucs_global_opts.async_signo, &new_action, NULL);
if (ret < 0) {
ucs_warn("failed to restore the async signal handler: %m");
}
ucs_trace_async("uninstalled signal handler for %s",
ucs_signal_names[ucs_global_opts.async_signo]);
}
pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock);
}
static ucs_status_t ucs_async_signal_init(ucs_async_context_t *async)
{
async->signal.block_count = 0;
async->signal.tid = ucs_get_tid();
async->signal.pthread = pthread_self();
async->signal.timer = NULL;
return UCS_OK;
}
static void ucs_async_signal_cleanup(ucs_async_context_t *async)
{
if (async->signal.block_count > 0) {
ucs_warn("destroying async signal context with block_count %d",
async->signal.block_count);
}
}
static ucs_status_t ucs_async_signal_modify_event_fd(ucs_async_context_t *async,
int event_fd, int events)
{
ucs_status_t status;
int add, remove;
UCS_ASYNC_SIGNAL_CHECK_THREAD(async);
if (events) {
add = O_ASYNC; /* Enable notifications */
remove = 0;
} else {
add = 0; /* Disable notifications */
remove = O_ASYNC;
}
ucs_trace_async("fcntl(fd=%d, add=0x%x, remove=0x%x)", event_fd, add, remove);
status = ucs_sys_fcntl_modfl(event_fd, add, remove);
if (status != UCS_OK) {
ucs_error("fcntl F_SETFL failed: %m");
return UCS_ERR_IO_ERROR;
}
return UCS_OK;
}
static ucs_status_t ucs_async_signal_add_event_fd(ucs_async_context_t *async,
int event_fd, int events)
{
ucs_status_t status;
pid_t tid;
UCS_ASYNC_SIGNAL_CHECK_THREAD(async);
status = ucs_async_signal_install_handler();
if (status != UCS_OK) {
goto err;
}
/* Send signal when fd is ready */
ucs_trace_async("fcntl(F_STSIG, fd=%d, sig=%s)", event_fd,
ucs_signal_names[ucs_global_opts.async_signo]);
if (0 > fcntl(event_fd, F_SETSIG, ucs_global_opts.async_signo)) {
ucs_error("fcntl F_SETSIG failed: %m");
status = UCS_ERR_IO_ERROR;
goto err_remove_handler;
}
/* Send the signal to the desired thread */
tid = ucs_async_signal_context_tid(async);
status = ucs_async_signal_set_fd_owner(tid, event_fd);
if (status != UCS_OK) {
goto err_remove_handler;
}
/* Set async events on the file descriptor */
status = ucs_async_signal_modify_event_fd(async, event_fd, events);
if (status != UCS_OK) {
goto err_remove_handler;
}
return UCS_OK;
err_remove_handler:
ucs_async_signal_uninstall_handler();
err:
return status;
}
static ucs_status_t ucs_async_signal_remove_event_fd(ucs_async_context_t *async, int event_fd)
{
ucs_status_t status;
ucs_trace_func("event_fd=%d", event_fd);
UCS_ASYNC_SIGNAL_CHECK_THREAD(async);
ucs_async_signal_allow(0);
status = ucs_sys_fcntl_modfl(event_fd, 0, O_ASYNC);
ucs_async_signal_allow(1);
ucs_async_signal_uninstall_handler();
return status;
}
static int ucs_async_signal_try_block(ucs_async_context_t *async)
{
if (async->signal.block_count > 0) {
return 0;
}
UCS_ASYNC_SIGNAL_BLOCK(async);
return 1;
}
static void ucs_async_signal_unblock(ucs_async_context_t *async)
{
UCS_ASYNC_SIGNAL_UNBLOCK(async);
}
static void ucs_timer_reset_if_empty(ucs_async_signal_timer_t *timer)
{
if (ucs_timerq_is_empty(&timer->timerq)) {
ucs_async_signal_sys_timer_delete(timer->sys_timer_id);
ucs_timerq_cleanup(&timer->timerq);
timer->tid = 0;
}
}
/* Add a timer, possible initializing the timerq */
static ucs_status_t
ucs_async_signal_timerq_add_timer(ucs_async_signal_timer_t *timer, int tid,
int timer_id, ucs_time_t interval)
{
ucs_time_t sys_interval;
ucs_status_t status;
int uid;
if (timer->tid == 0) {
timer->tid = tid;
ucs_timerq_init(&timer->timerq);
uid = (timer - ucs_async_signal_global_context.timers);
status = ucs_async_signal_sys_timer_create(uid, timer->tid,
&timer->sys_timer_id);
if (status != UCS_OK) {
goto err;
}
}
status = ucs_timerq_add(&timer->timerq, timer_id, interval);
if (status != UCS_OK) {
goto err;
}
sys_interval = ucs_timerq_min_interval(&timer->timerq);
status = ucs_async_signal_sys_timer_set_interval(timer->sys_timer_id,
sys_interval);
if (status != UCS_OK) {
goto err_remove;
}
return UCS_OK;
err_remove:
ucs_timerq_remove(&timer->timerq, timer_id);
err:
ucs_timer_reset_if_empty(timer);
return status;
}
/* Remove a timer, possible resetting the timerq */
static ucs_status_t
ucs_async_signal_timerq_remove_timer(ucs_async_signal_timer_t *timer,
int timer_id)
{
ucs_status_t status;
status = ucs_timerq_remove(&timer->timerq, timer_id);
if (status != UCS_OK) {
return status;
}
ucs_timer_reset_if_empty(timer);
return UCS_OK;
}
static ucs_async_signal_timer_t *ucs_async_signal_find_timer(pid_t tid)
{
ucs_async_signal_timer_t *timer;
for (timer = ucs_async_signal_global_context.timers;
timer < &ucs_async_signal_global_context.timers[UCS_SIGNAL_MAX_TIMERQS];
++timer)
{
if (timer->tid == tid) {
return timer;
}
}
return NULL;
}
static ucs_status_t ucs_async_signal_add_timer(ucs_async_context_t *async,
int timer_id, ucs_time_t interval)
{
ucs_async_signal_timer_t *timer;
ucs_status_t status;
pid_t tid;
ucs_trace_func("async=%p interval=%.2fus timer_id=%d",
async, ucs_time_to_usec(interval), timer_id);
UCS_ASYNC_SIGNAL_CHECK_THREAD(async);
/* Must install signal handler before arming the timer */
status = ucs_async_signal_install_handler();
if (status != UCS_OK) {
goto err;
}
ucs_async_signal_allow(0);
pthread_mutex_lock(&ucs_async_signal_global_context.timers_lock);
/* Find existing or available timer queue for the current thread */
tid = ucs_async_signal_context_tid(async);
timer = ucs_async_signal_find_timer(tid);
if (timer == NULL) {
timer = ucs_async_signal_find_timer(0); /* Search for free slot */
}
if (timer == NULL) {
status = UCS_ERR_EXCEEDS_LIMIT;
} else {
status = ucs_async_signal_timerq_add_timer(timer, tid, timer_id, interval);
}
pthread_mutex_unlock(&ucs_async_signal_global_context.timers_lock);
ucs_async_signal_allow(1);
if (status != UCS_OK) {
goto err_uninstall_handler;
}
return UCS_OK;
err_uninstall_handler:
ucs_async_signal_uninstall_handler();
err:
return status;
}
static ucs_status_t ucs_async_signal_remove_timer(ucs_async_context_t *async,
int timer_id)
{
ucs_async_signal_timer_t *timer;
ucs_status_t status;
ucs_trace_func("async=%p timer_id=%d", async, timer_id);
UCS_ASYNC_SIGNAL_CHECK_THREAD(async);
ucs_async_signal_allow(0);
pthread_mutex_lock(&ucs_async_signal_global_context.timers_lock);
timer = ucs_async_signal_find_timer(ucs_async_signal_context_tid(async));
if (timer == NULL) {
status = UCS_ERR_NO_ELEM;
} else {
status = ucs_async_signal_timerq_remove_timer(timer, timer_id);
}
pthread_mutex_unlock(&ucs_async_signal_global_context.timers_lock);
ucs_async_signal_allow(1);
if (status == UCS_OK) {
ucs_async_signal_uninstall_handler();
}
return status;
}
static void ucs_async_signal_global_init()
{
pthread_mutex_init(&ucs_async_signal_global_context.timers_lock, NULL);
}
static void ucs_async_signal_global_cleanup()
{
if (ucs_async_signal_global_context.event_count != 0) {
ucs_warn("signal handler not removed (%d events remaining)",
ucs_async_signal_global_context.event_count);
}
pthread_mutex_destroy(&ucs_async_signal_global_context.timers_lock);
}
ucs_async_ops_t ucs_async_signal_ops = {
.init = ucs_async_signal_global_init,
.cleanup = ucs_async_signal_global_cleanup,
.block = ucs_async_signal_block_all,
.unblock = ucs_async_signal_unblock_all,
.context_init = ucs_async_signal_init,
.context_cleanup = ucs_async_signal_cleanup,
.context_try_block = ucs_async_signal_try_block,
.context_unblock = ucs_async_signal_unblock,
.add_event_fd = ucs_async_signal_add_event_fd,
.remove_event_fd = ucs_async_signal_remove_event_fd,
.modify_event_fd = ucs_async_signal_modify_event_fd,
.add_timer = ucs_async_signal_add_timer,
.remove_timer = ucs_async_signal_remove_timer,
};