/** * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "signal.h" #include "async_int.h" #include #include #include #include #include #include #include #define UCS_SIGNAL_MAX_TIMERQS 64 /* * Per-thread system timer and software timer queue. We can dispatch timers only * on the same thread which added them. */ typedef struct ucs_async_signal_timer { pid_t tid; /* Thread ID */ timer_t sys_timer_id; /* System timer ID */ ucs_timer_queue_t timerq; /* Queue of timers for the thread */ } ucs_async_signal_timer_t; static struct { struct sigaction prev_sighandler; /* Previous signal handler */ int event_count; /* Number of events in use */ pthread_mutex_t event_lock; /* Lock for adding/removing events */ pthread_mutex_t timers_lock; /* Lock for timers array */ ucs_async_signal_timer_t timers[UCS_SIGNAL_MAX_TIMERQS];/* Array of all threads */ } ucs_async_signal_global_context = { .event_count = 0, .event_lock = PTHREAD_MUTEX_INITIALIZER, .timers_lock = PTHREAD_MUTEX_INITIALIZER, .timers = {{ .tid = 0 }} }; /** * In signal mode, we allow user to manipulate events only from the same thread. * Otherwise, we'd get into big synchronization issues. */ #define UCS_ASYNC_SIGNAL_CHECK_THREAD(_async) \ if (ucs_get_tid() != ucs_async_signal_context_tid(_async)) { \ ucs_error("cannot manipulate signal async from different thread"); \ return UCS_ERR_UNREACHABLE; \ } /** * @return To which thread the async context should deliver events to. */ static pid_t ucs_async_signal_context_tid(ucs_async_context_t *async) { static pid_t pid = -1; if (pid == -1) { pid = getpid(); } return (async == NULL) ? pid : async->signal.tid;; } static ucs_status_t ucs_async_signal_set_fd_owner(pid_t dest_tid, int fd) { #if HAVE_DECL_F_SETOWN_EX struct f_owner_ex owner; owner.type = F_OWNER_TID; owner.pid = dest_tid; ucs_trace_async("fcntl(F_SETOWN_EX, fd=%d, tid=%d)", fd, dest_tid); if (0 > fcntl(fd, F_SETOWN_EX, &owner)) { ucs_error("fcntl F_SETOWN_EX failed: %m"); return UCS_ERR_IO_ERROR; } return UCS_OK; #else if (dest_tid != getpid()) { ucs_error("Cannot use signaled events to threads without F_SETOWN_EX support"); return UCS_ERR_UNSUPPORTED; } if (0 > fcntl(fd, F_SETOWN, dest_tid)) { ucs_error("fcntl F_SETOWN failed: %m"); return UCS_ERR_IO_ERROR; } return UCS_OK; #endif } static ucs_status_t ucs_async_signal_sys_timer_create(int uid, pid_t tid, timer_t *sys_timer_id) { struct sigevent ev; timer_t timer; int ret; ucs_trace_func("tid=%d", tid); /* Create timer signal */ memset(&ev, 0, sizeof(ev)); ev.sigev_notify = SIGEV_THREAD_ID; ev.sigev_signo = ucs_global_opts.async_signo; ev.sigev_value.sival_int = uid; /* user parameter to timer */ #if defined(HAVE_SIGEVENT_SIGEV_UN_TID) ev._sigev_un._tid = tid; /* target thread */ #elif defined(HAVE_SIGEVENT_SIGEV_NOTIFY_THREAD_ID) ev.sigev_notify_thread_id = tid; /* target thread */ #else #error "Port me" #endif ret = timer_create(CLOCK_REALTIME, &ev, &timer); if (ret < 0) { ucs_error("failed to create an interval timer: %m"); return UCS_ERR_INVALID_PARAM; } *sys_timer_id = timer; return UCS_OK; } static ucs_status_t ucs_async_signal_sys_timer_set_interval(timer_t sys_timer_id, ucs_time_t interval) { struct itimerspec its; int ret; ucs_trace_func("sys_timer_id=%p interval=%.2f usec", sys_timer_id, ucs_time_to_usec(interval)); /* Modify the timer to have the desired accuracy */ ucs_sec_to_timespec(ucs_time_to_sec(interval), &its.it_interval); its.it_value = its.it_interval; ret = timer_settime(sys_timer_id, 0, &its, NULL); if (ret < 0) { ucs_error("failed to set the interval for the interval timer: %m"); return UCS_ERR_INVALID_PARAM; } return UCS_OK; } static void ucs_async_signal_sys_timer_delete(timer_t sys_timer_id) { int ret; ucs_trace_func("sys_timer_id=%p", sys_timer_id); ret = timer_delete(sys_timer_id); if (ret < 0) { ucs_warn("failed to remove the timer: %m"); } ucs_trace_async("removed system timer %p", sys_timer_id); } static ucs_status_t ucs_async_signal_dispatch_timer(int uid) { ucs_async_signal_timer_t *timer = &ucs_async_signal_global_context.timers[uid]; ucs_assertv_always((uid >= 0) && (uid < UCS_SIGNAL_MAX_TIMERQS), "uid=%d", uid); /* No need to take lock - remove operation blocks signals on the same thread */ if (timer->tid != ucs_get_tid()) { return UCS_OK; } return ucs_async_dispatch_timerq(&timer->timerq, ucs_get_time()); } static void ucs_async_signal_handler(int signo, siginfo_t *siginfo, void *arg) { ucs_assert(signo == ucs_global_opts.async_signo); /* Check event code */ switch (siginfo->si_code) { case SI_TIMER: ucs_trace_async("timer signal uid=%d", siginfo->si_value.sival_int); ucs_async_signal_dispatch_timer(siginfo->si_value.sival_int); return; case POLL_IN: case POLL_OUT: case POLL_HUP: case POLL_ERR: case POLL_MSG: case POLL_PRI: ucs_trace_async("async signal handler called for fd %d", siginfo->si_fd); ucs_async_dispatch_handlers(&siginfo->si_fd, 1); return; default: ucs_warn("signal handler called with unexpected event code %d, ignoring", siginfo->si_code); return; } } static void ucs_async_signal_allow(int allow) { sigset_t sigset; ucs_trace_func("enable=%d tid=%d", allow, ucs_get_tid()); sigemptyset(&sigset); sigaddset(&sigset, ucs_global_opts.async_signo); pthread_sigmask(allow ? SIG_UNBLOCK : SIG_BLOCK, &sigset, NULL); } static void ucs_async_signal_block_all() { pthread_mutex_lock(&ucs_async_signal_global_context.event_lock); if (ucs_async_signal_global_context.event_count > 0) { ucs_async_signal_allow(0); } pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock); } static void ucs_async_signal_unblock_all() { pthread_mutex_lock(&ucs_async_signal_global_context.event_lock); if (ucs_async_signal_global_context.event_count > 0) { ucs_async_signal_allow(1); } pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock); } static ucs_status_t ucs_async_signal_install_handler() { struct sigaction new_action; int ret; ucs_trace_func("event_count=%d", ucs_async_signal_global_context.event_count); pthread_mutex_lock(&ucs_async_signal_global_context.event_lock); if (ucs_async_signal_global_context.event_count == 0) { /* Set our signal handler */ new_action.sa_sigaction = ucs_async_signal_handler; sigemptyset(&new_action.sa_mask); new_action.sa_flags = SA_RESTART|SA_SIGINFO; #if HAVE_SIGACTION_SA_RESTORER new_action.sa_restorer = NULL; #endif ret = sigaction(ucs_global_opts.async_signo, &new_action, &ucs_async_signal_global_context.prev_sighandler); if (ret < 0) { ucs_error("failed to set a handler for signal %d: %m", ucs_global_opts.async_signo); pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock); return UCS_ERR_INVALID_PARAM; } ucs_trace_async("installed signal handler for %s", ucs_signal_names[ucs_global_opts.async_signo]); } ++ucs_async_signal_global_context.event_count; pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock); return UCS_OK; } static void fatal_sighandler(int signo, siginfo_t *siginfo, void *arg) { ucs_fatal("got timer signal"); } static void ucs_async_signal_uninstall_handler() { struct sigaction new_action; int ret; ucs_trace_func("event_count=%d", ucs_async_signal_global_context.event_count); pthread_mutex_lock(&ucs_async_signal_global_context.event_lock); if (--ucs_async_signal_global_context.event_count == 0) { new_action = ucs_async_signal_global_context.prev_sighandler; new_action.sa_sigaction = fatal_sighandler; ret = sigaction(ucs_global_opts.async_signo, &new_action, NULL); if (ret < 0) { ucs_warn("failed to restore the async signal handler: %m"); } ucs_trace_async("uninstalled signal handler for %s", ucs_signal_names[ucs_global_opts.async_signo]); } pthread_mutex_unlock(&ucs_async_signal_global_context.event_lock); } static ucs_status_t ucs_async_signal_init(ucs_async_context_t *async) { async->signal.block_count = 0; async->signal.tid = ucs_get_tid(); async->signal.pthread = pthread_self(); async->signal.timer = NULL; return UCS_OK; } static void ucs_async_signal_cleanup(ucs_async_context_t *async) { if (async->signal.block_count > 0) { ucs_warn("destroying async signal context with block_count %d", async->signal.block_count); } } static ucs_status_t ucs_async_signal_modify_event_fd(ucs_async_context_t *async, int event_fd, int events) { ucs_status_t status; int add, remove; UCS_ASYNC_SIGNAL_CHECK_THREAD(async); if (events) { add = O_ASYNC; /* Enable notifications */ remove = 0; } else { add = 0; /* Disable notifications */ remove = O_ASYNC; } ucs_trace_async("fcntl(fd=%d, add=0x%x, remove=0x%x)", event_fd, add, remove); status = ucs_sys_fcntl_modfl(event_fd, add, remove); if (status != UCS_OK) { ucs_error("fcntl F_SETFL failed: %m"); return UCS_ERR_IO_ERROR; } return UCS_OK; } static ucs_status_t ucs_async_signal_add_event_fd(ucs_async_context_t *async, int event_fd, int events) { ucs_status_t status; pid_t tid; UCS_ASYNC_SIGNAL_CHECK_THREAD(async); status = ucs_async_signal_install_handler(); if (status != UCS_OK) { goto err; } /* Send signal when fd is ready */ ucs_trace_async("fcntl(F_STSIG, fd=%d, sig=%s)", event_fd, ucs_signal_names[ucs_global_opts.async_signo]); if (0 > fcntl(event_fd, F_SETSIG, ucs_global_opts.async_signo)) { ucs_error("fcntl F_SETSIG failed: %m"); status = UCS_ERR_IO_ERROR; goto err_remove_handler; } /* Send the signal to the desired thread */ tid = ucs_async_signal_context_tid(async); status = ucs_async_signal_set_fd_owner(tid, event_fd); if (status != UCS_OK) { goto err_remove_handler; } /* Set async events on the file descriptor */ status = ucs_async_signal_modify_event_fd(async, event_fd, events); if (status != UCS_OK) { goto err_remove_handler; } return UCS_OK; err_remove_handler: ucs_async_signal_uninstall_handler(); err: return status; } static ucs_status_t ucs_async_signal_remove_event_fd(ucs_async_context_t *async, int event_fd) { ucs_status_t status; ucs_trace_func("event_fd=%d", event_fd); UCS_ASYNC_SIGNAL_CHECK_THREAD(async); ucs_async_signal_allow(0); status = ucs_sys_fcntl_modfl(event_fd, 0, O_ASYNC); ucs_async_signal_allow(1); ucs_async_signal_uninstall_handler(); return status; } static int ucs_async_signal_try_block(ucs_async_context_t *async) { if (async->signal.block_count > 0) { return 0; } UCS_ASYNC_SIGNAL_BLOCK(async); return 1; } static void ucs_async_signal_unblock(ucs_async_context_t *async) { UCS_ASYNC_SIGNAL_UNBLOCK(async); } static void ucs_timer_reset_if_empty(ucs_async_signal_timer_t *timer) { if (ucs_timerq_is_empty(&timer->timerq)) { ucs_async_signal_sys_timer_delete(timer->sys_timer_id); ucs_timerq_cleanup(&timer->timerq); timer->tid = 0; } } /* Add a timer, possible initializing the timerq */ static ucs_status_t ucs_async_signal_timerq_add_timer(ucs_async_signal_timer_t *timer, int tid, int timer_id, ucs_time_t interval) { ucs_time_t sys_interval; ucs_status_t status; int uid; if (timer->tid == 0) { timer->tid = tid; ucs_timerq_init(&timer->timerq); uid = (timer - ucs_async_signal_global_context.timers); status = ucs_async_signal_sys_timer_create(uid, timer->tid, &timer->sys_timer_id); if (status != UCS_OK) { goto err; } } status = ucs_timerq_add(&timer->timerq, timer_id, interval); if (status != UCS_OK) { goto err; } sys_interval = ucs_timerq_min_interval(&timer->timerq); status = ucs_async_signal_sys_timer_set_interval(timer->sys_timer_id, sys_interval); if (status != UCS_OK) { goto err_remove; } return UCS_OK; err_remove: ucs_timerq_remove(&timer->timerq, timer_id); err: ucs_timer_reset_if_empty(timer); return status; } /* Remove a timer, possible resetting the timerq */ static ucs_status_t ucs_async_signal_timerq_remove_timer(ucs_async_signal_timer_t *timer, int timer_id) { ucs_status_t status; status = ucs_timerq_remove(&timer->timerq, timer_id); if (status != UCS_OK) { return status; } ucs_timer_reset_if_empty(timer); return UCS_OK; } static ucs_async_signal_timer_t *ucs_async_signal_find_timer(pid_t tid) { ucs_async_signal_timer_t *timer; for (timer = ucs_async_signal_global_context.timers; timer < &ucs_async_signal_global_context.timers[UCS_SIGNAL_MAX_TIMERQS]; ++timer) { if (timer->tid == tid) { return timer; } } return NULL; } static ucs_status_t ucs_async_signal_add_timer(ucs_async_context_t *async, int timer_id, ucs_time_t interval) { ucs_async_signal_timer_t *timer; ucs_status_t status; pid_t tid; ucs_trace_func("async=%p interval=%.2fus timer_id=%d", async, ucs_time_to_usec(interval), timer_id); UCS_ASYNC_SIGNAL_CHECK_THREAD(async); /* Must install signal handler before arming the timer */ status = ucs_async_signal_install_handler(); if (status != UCS_OK) { goto err; } ucs_async_signal_allow(0); pthread_mutex_lock(&ucs_async_signal_global_context.timers_lock); /* Find existing or available timer queue for the current thread */ tid = ucs_async_signal_context_tid(async); timer = ucs_async_signal_find_timer(tid); if (timer == NULL) { timer = ucs_async_signal_find_timer(0); /* Search for free slot */ } if (timer == NULL) { status = UCS_ERR_EXCEEDS_LIMIT; } else { status = ucs_async_signal_timerq_add_timer(timer, tid, timer_id, interval); } pthread_mutex_unlock(&ucs_async_signal_global_context.timers_lock); ucs_async_signal_allow(1); if (status != UCS_OK) { goto err_uninstall_handler; } return UCS_OK; err_uninstall_handler: ucs_async_signal_uninstall_handler(); err: return status; } static ucs_status_t ucs_async_signal_remove_timer(ucs_async_context_t *async, int timer_id) { ucs_async_signal_timer_t *timer; ucs_status_t status; ucs_trace_func("async=%p timer_id=%d", async, timer_id); UCS_ASYNC_SIGNAL_CHECK_THREAD(async); ucs_async_signal_allow(0); pthread_mutex_lock(&ucs_async_signal_global_context.timers_lock); timer = ucs_async_signal_find_timer(ucs_async_signal_context_tid(async)); if (timer == NULL) { status = UCS_ERR_NO_ELEM; } else { status = ucs_async_signal_timerq_remove_timer(timer, timer_id); } pthread_mutex_unlock(&ucs_async_signal_global_context.timers_lock); ucs_async_signal_allow(1); if (status == UCS_OK) { ucs_async_signal_uninstall_handler(); } return status; } static void ucs_async_signal_global_init() { pthread_mutex_init(&ucs_async_signal_global_context.timers_lock, NULL); } static void ucs_async_signal_global_cleanup() { if (ucs_async_signal_global_context.event_count != 0) { ucs_warn("signal handler not removed (%d events remaining)", ucs_async_signal_global_context.event_count); } pthread_mutex_destroy(&ucs_async_signal_global_context.timers_lock); } ucs_async_ops_t ucs_async_signal_ops = { .init = ucs_async_signal_global_init, .cleanup = ucs_async_signal_global_cleanup, .block = ucs_async_signal_block_all, .unblock = ucs_async_signal_unblock_all, .context_init = ucs_async_signal_init, .context_cleanup = ucs_async_signal_cleanup, .context_try_block = ucs_async_signal_try_block, .context_unblock = ucs_async_signal_unblock, .add_event_fd = ucs_async_signal_add_event_fd, .remove_event_fd = ucs_async_signal_remove_event_fd, .modify_event_fd = ucs_async_signal_modify_event_fd, .add_timer = ucs_async_signal_add_timer, .remove_timer = ucs_async_signal_remove_timer, };