/* * Soft: Keepalived is a failover program for the LVS project * . It monitor & manipulate * a loadbalanced server pool using multi-layer checks. * * Part: Scheduling framework. This code is highly inspired from * the thread management routine (thread.c) present in the * very nice zebra project (http://www.zebra.org). * * Author: Alexandre Cassen, * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU General Public License for more details. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version * 2 of the License, or (at your option) any later version. * * Copyright (C) 2001-2017 Alexandre Cassen, */ #include "config.h" /* SNMP should be included first: it redefines "FREE" */ #ifdef _WITH_SNMP_ #include #include #include #undef FREE #endif #ifndef _DEBUG_ #define NDEBUG #endif #include #include #include #include #include #include #ifdef HAVE_SIGNALFD #include #endif #include #include #include "scheduler.h" #include "memory.h" #include "rbtree.h" #include "utils.h" #include "signals.h" #include "logger.h" #include "bitops.h" #include "git-commit.h" #include "timer.h" #if !HAVE_EPOLL_CREATE1 || !defined TFD_NONBLOCK #include "old_socket.h" #endif #ifdef THREAD_DUMP typedef struct _func_det { const char *name; int (*func)(thread_t *); rb_node_t n; } func_det_t; #endif /* global vars */ thread_master_t *master = NULL; #ifndef _DEBUG_ prog_type_t prog_type; /* Parent/VRRP/Checker process */ #endif #ifdef _WITH_SNMP_ bool snmp_running; /* True if this process is running SNMP */ #endif /* local variables */ static bool shutting_down; static int sav_argc; static char **sav_argv; #ifdef _EPOLL_DEBUG_ bool do_epoll_debug; #endif #ifdef _EPOLL_THREAD_DUMP_ bool do_epoll_thread_dump; #endif #ifdef THREAD_DUMP static rb_root_t funcs = RB_ROOT; #endif #ifdef _VRRP_FD_DEBUG_ static void (*extra_threads_debug)(void); #endif /* Function that returns prog_name if pid is a known child */ static char const * (*child_finder_name)(pid_t); #ifdef THREAD_DUMP static const char * get_thread_type_str(thread_type_t id) { if (id == THREAD_READ) return "READ"; if (id == THREAD_WRITE) return "WRITE"; if (id == THREAD_TIMER) return "TIMER"; if (id == THREAD_TIMER_SHUTDOWN) return "TIMER_SHUTDOWN"; if (id == THREAD_EVENT) return "EVENT"; if (id == THREAD_CHILD) return "CHILD"; if (id == THREAD_READY) return "READY"; if (id == THREAD_UNUSED) return "UNUSED"; if (id == THREAD_WRITE_TIMEOUT) return "WRITE_TIMEOUT"; if (id == THREAD_READ_TIMEOUT) return "READ_TIMEOUT"; if (id == THREAD_CHILD_TIMEOUT) return "CHILD_TIMEOUT"; if (id == THREAD_CHILD_TERMINATED) return "CHILD_TERMINATED"; if (id == THREAD_TERMINATE_START) return "TERMINATE_START"; if (id == THREAD_TERMINATE) return "TERMINATE"; if (id == THREAD_READY_FD) return "READY_FD"; if (id == THREAD_READ_ERROR) return "READ_ERROR"; if (id == THREAD_WRITE_ERROR) return "WRITE_ERROR"; #ifdef USE_SIGNAL_THREADS if (id == THREAD_SIGNAL) return "SIGNAL"; #endif return "unknown"; } static inline int function_cmp(const func_det_t *func1, const func_det_t *func2) { if (func1->func < func2->func) return -1; if (func1->func > func2->func) return 1; return 0; } static const char * get_function_name(int (*func)(thread_t *)) { func_det_t func_det = { .func = func }; func_det_t *match; static char address[19]; if (!RB_EMPTY_ROOT(&funcs)) { match = rb_search(&funcs, &func_det, n, function_cmp); if (match) return match->name; } snprintf(address, sizeof address, "%p", func); return address; } const char * get_signal_function_name(void (*func)(void *, int)) { /* The cast should really be (int (*)(thread_t *))func, but gcc 8.1 produces * a warning with -Wcast-function-type, that the cast is to an incompatible * function type. Since we don't actually call the function, but merely use * it to compare function addresses, what we cast it do doesn't really matter */ return get_function_name((void *)func); } void register_thread_address(const char *func_name, int (*func)(thread_t *)) { func_det_t *func_det; func_det = (func_det_t *) MALLOC(sizeof(func_det_t)); if (!func_det) return; func_det->name = func_name; func_det->func = func; rb_insert_sort(&funcs, func_det, n, function_cmp); } void register_signal_handler_address(const char *func_name, void (*func)(void *, int)) { /* See comment in get_signal_function_name() above */ register_thread_address(func_name, (void *)func); } void deregister_thread_addresses(void) { func_det_t *func_det, *func_det_tmp; if (RB_EMPTY_ROOT(&funcs)) return; rb_for_each_entry_safe(func_det, func_det_tmp, &funcs, n) { rb_erase(&func_det->n, &funcs); FREE(func_det); } } #endif #ifdef _VRRP_FD_DEBUG_ void set_extra_threads_debug(void (*func)(void)) { extra_threads_debug = func; } #endif /* Move ready thread into ready queue */ static int thread_move_ready(thread_master_t *m, rb_root_cached_t *root, thread_t *thread, int type) { rb_erase_cached(&thread->n, root); INIT_LIST_HEAD(&thread->next); list_add_tail(&thread->next, &m->ready); if (thread->type != THREAD_TIMER_SHUTDOWN) thread->type = type; return 0; } /* Move ready thread into ready queue */ static void thread_rb_move_ready(thread_master_t *m, rb_root_cached_t *root, int type) { thread_t *thread, *thread_tmp; rb_for_each_entry_safe_cached(thread, thread_tmp, root, n) { if (thread->sands.tv_sec == TIMER_DISABLED || timercmp(&time_now, &thread->sands, <)) break; if (type == THREAD_READ_TIMEOUT) thread->event->read = NULL; else if (type == THREAD_WRITE_TIMEOUT) thread->event->write = NULL; thread_move_ready(m, root, thread, type); } } /* Update timer value */ static void thread_update_timer(rb_root_cached_t *root, timeval_t *timer_min) { thread_t *first; if (!root->rb_root.rb_node) return; first = rb_entry(rb_first_cached(root), thread_t, n); if (!first) return; if (first->sands.tv_sec == TIMER_DISABLED) return; if (!timerisset(timer_min) || timercmp(&first->sands, timer_min, <=)) *timer_min = first->sands; } /* Compute the wait timer. Take care of timeouted fd */ static void thread_set_timer(thread_master_t *m) { timeval_t timer_wait; struct itimerspec its; /* Prepare timer */ timerclear(&timer_wait); thread_update_timer(&m->timer, &timer_wait); thread_update_timer(&m->write, &timer_wait); thread_update_timer(&m->read, &timer_wait); thread_update_timer(&m->child, &timer_wait); if (timerisset(&timer_wait)) { /* Re-read the current time to get the maximum accuracy */ set_time_now(); /* Take care about monotonic clock */ timersub(&timer_wait, &time_now, &timer_wait); if (timer_wait.tv_sec < 0) { /* This will disable the timerfd */ timerclear(&timer_wait); } } else { /* set timer to a VERY long time */ timer_wait.tv_sec = LONG_MAX; timer_wait.tv_usec = 0; } its.it_value.tv_sec = timer_wait.tv_sec; if (!timerisset(&timer_wait)) { /* We could try to avoid doing the epoll_wait since * testing shows it takes about 4 microseconds * for the timer to expire. */ its.it_value.tv_nsec = 1; } else its.it_value.tv_nsec = timer_wait.tv_usec * 1000; /* We don't want periodic timer expiry */ its.it_interval.tv_sec = its.it_interval.tv_nsec = 0; timerfd_settime(m->timer_fd, 0, &its, NULL); #ifdef _EPOLL_DEBUG_ if (do_epoll_debug) log_message(LOG_INFO, "Setting timer_fd %lu.%9.9ld", its.it_value.tv_sec, its.it_value.tv_nsec); #endif } static int thread_timerfd_handler(thread_t *thread) { thread_master_t *m = thread->master; uint64_t expired; ssize_t len; len = read(m->timer_fd, &expired, sizeof(expired)); if (len < 0) log_message(LOG_ERR, "scheduler: Error reading on timerfd fd:%d (%m)", m->timer_fd); /* Read, Write, Timer, Child thread. */ thread_rb_move_ready(m, &m->read, THREAD_READ_TIMEOUT); thread_rb_move_ready(m, &m->write, THREAD_WRITE_TIMEOUT); thread_rb_move_ready(m, &m->timer, THREAD_READY); thread_rb_move_ready(m, &m->child, THREAD_CHILD_TIMEOUT); /* Register next timerfd thread */ m->timer_thread = thread_add_read(m, thread_timerfd_handler, NULL, m->timer_fd, TIMER_NEVER); return 0; } /* Child PID cmp helper */ static inline int thread_child_pid_cmp(thread_t *t1, thread_t *t2) { return t1->u.c.pid - t2->u.c.pid; } void set_child_finder_name(char const * (*func)(pid_t)) { child_finder_name = func; } void save_cmd_line_options(int argc, char **argv) { sav_argc = argc; sav_argv = argv; } #ifndef _DEBUG_ static const char * get_end(const char *str, size_t max_len) { size_t len = strlen(str); const char *end; if (len <= max_len) return str + len; end = str + max_len; if (*end == ' ') return end; while (end > str && *--end != ' '); if (end > str) return end; return str + max_len; } static void log_options(const char *option, const char *option_str, unsigned indent) { const char *p = option_str; size_t opt_len = strlen(option); const char *end; bool first_line = true; while (*p) { /* Skip leading spaces */ while (*p == ' ') p++; end = get_end(p, 100 - opt_len); if (first_line) { log_message(LOG_INFO, "%*s%s: %.*s", indent, "", option, (int)(end - p), p); first_line = false; } else log_message(LOG_INFO, "%*s%.*s", (int)(indent + opt_len + 2), "", (int)(end - p), p); p = end; } } void log_command_line(unsigned indent) { size_t len = 0; char *log_str; char *p; int i; if (!sav_argv) return; for (i = 0; i < sav_argc; i++) len += strlen(sav_argv[i]) + 3; /* Add opening and closing 's, and following space or '\0' */ log_str = MALLOC(len); for (i = 0, p = log_str; i < sav_argc; i++) p += sprintf(p, "%s'%s'", i ? " " : "", sav_argv[i]); log_options("Command line", log_str, indent); FREE(log_str); } /* report_child_status returns true if the exit is a hard error, so unable to continue */ bool report_child_status(int status, pid_t pid, char const *prog_name) { char const *prog_id = NULL; char pid_buf[12]; /* "pid 4194303" + '\0' - see definition of PID_MAX_LIMIT in include/linux/threads.h */ int exit_status ; if (prog_name) prog_id = prog_name; else if (child_finder_name) prog_id = child_finder_name(pid); if (!prog_id) { snprintf(pid_buf, sizeof(pid_buf), "pid %d", pid); prog_id = pid_buf; } if (WIFEXITED(status)) { exit_status = WEXITSTATUS(status); /* Handle exit codes of vrrp or checker child */ if (exit_status == KEEPALIVED_EXIT_FATAL || exit_status == KEEPALIVED_EXIT_CONFIG) { log_message(LOG_INFO, "%s exited with permanent error %s. Terminating", prog_id, exit_status == KEEPALIVED_EXIT_CONFIG ? "CONFIG" : "FATAL" ); return true; } if (exit_status != EXIT_SUCCESS) log_message(LOG_INFO, "%s exited with status %d", prog_id, exit_status); } else if (WIFSIGNALED(status)) { if (WTERMSIG(status) == SIGSEGV) { struct utsname uname_buf; log_message(LOG_INFO, "%s exited due to segmentation fault (SIGSEGV).", prog_id); log_message(LOG_INFO, " %s", "Please report a bug at https://github.com/acassen/keepalived/issues"); log_message(LOG_INFO, " %s", "and include this log from when keepalived started, a description"); log_message(LOG_INFO, " %s", "of what happened before the crash, your configuration file and the details below."); log_message(LOG_INFO, " %s", "Also provide the output of keepalived -v, what Linux distro and version"); log_message(LOG_INFO, " %s", "you are running on, and whether keepalived is being run in a container or VM."); log_message(LOG_INFO, " %s", "A failure to provide all this information may mean the crash cannot be investigated."); log_message(LOG_INFO, " %s", "If you are able to provide a stack backtrace with gdb that would really help."); log_message(LOG_INFO, " Source version %s %s%s", PACKAGE_VERSION, #ifdef GIT_COMMIT ", git commit ", GIT_COMMIT #else "", "" #endif ); log_message(LOG_INFO, " Built with kernel headers for Linux %d.%d.%d", (LINUX_VERSION_CODE >> 16) & 0xff, (LINUX_VERSION_CODE >> 8) & 0xff, (LINUX_VERSION_CODE ) & 0xff); uname(&uname_buf); log_message(LOG_INFO, " Running on %s %s %s", uname_buf.sysname, uname_buf.release, uname_buf.version); log_command_line(2); log_options("configure options", KEEPALIVED_CONFIGURE_OPTIONS, 2); log_options("Config options", CONFIGURATION_OPTIONS, 2); log_options("System options", SYSTEM_OPTIONS, 2); // if (__test_bit(DONT_RESPAWN_BIT, &debug)) // segv_termination = true; } else log_message(LOG_INFO, "%s exited due to signal %d", prog_id, WTERMSIG(status)); } return false; } #endif /* epoll related */ static int thread_events_resize(thread_master_t *m, int delta) { unsigned int new_size; m->epoll_count += delta; if (m->epoll_count < m->epoll_size) return 0; new_size = ((m->epoll_count / THREAD_EPOLL_REALLOC_THRESH) + 1); new_size *= THREAD_EPOLL_REALLOC_THRESH; if (m->epoll_events) FREE(m->epoll_events); m->epoll_events = MALLOC(new_size * sizeof(struct epoll_event)); if (!m->epoll_events) { m->epoll_size = 0; return -1; } m->epoll_size = new_size; return 0; } static inline int thread_event_cmp(const thread_event_t *event1, const thread_event_t *event2) { if (event1->fd < event2->fd) return -1; if (event1->fd > event2->fd) return 1; return 0; } static thread_event_t * thread_event_new(thread_master_t *m, int fd) { thread_event_t *event; event = (thread_event_t *) MALLOC(sizeof(thread_event_t)); if (!event) return NULL; if (thread_events_resize(m, 1) < 0) { FREE(event); return NULL; } event->fd = fd; rb_insert_sort(&m->io_events, event, n, thread_event_cmp); return event; } static thread_event_t * thread_event_get(thread_master_t *m, int fd) { thread_event_t event = { .fd = fd }; return rb_search(&m->io_events, &event, n, thread_event_cmp); } static int thread_event_set(thread_t *thread) { thread_event_t *event = thread->event; thread_master_t *m = thread->master; struct epoll_event ev; int op; memset(&ev, 0, sizeof(struct epoll_event)); ev.data.ptr = event; if (__test_bit(THREAD_FL_READ_BIT, &event->flags)) ev.events |= EPOLLIN; if (__test_bit(THREAD_FL_WRITE_BIT, &event->flags)) ev.events |= EPOLLOUT; if (__test_bit(THREAD_FL_EPOLL_BIT, &event->flags)) op = EPOLL_CTL_MOD; else op = EPOLL_CTL_ADD; if (epoll_ctl(m->epoll_fd, op, event->fd, &ev) < 0) { log_message(LOG_INFO, "scheduler: Error performing control on EPOLL instance (%m)"); return -1; } __set_bit(THREAD_FL_EPOLL_BIT, &event->flags); return 0; } static int thread_event_cancel(thread_t *thread) { thread_event_t *event = thread->event; thread_master_t *m = thread->master; if (!event) { log_message(LOG_INFO, "scheduler: Error performing epoll_ctl DEL op no event linked?!"); return -1; } if (m->epoll_fd != -1 && epoll_ctl(m->epoll_fd, EPOLL_CTL_DEL, event->fd, NULL) < 0 && errno != EBADF) { log_message(LOG_INFO, "scheduler: Error performing epoll_ctl DEL op for fd:%d (%m)", event->fd); return -1; } rb_erase(&event->n, &m->io_events); if (event == m->current_event) m->current_event = NULL; FREE(thread->event); return 0; } static int thread_event_del(thread_t *thread, unsigned flag) { thread_event_t *event = thread->event; if (!__test_bit(flag, &event->flags)) return 0; if (flag == THREAD_FL_EPOLL_READ_BIT) { __clear_bit(THREAD_FL_READ_BIT, &event->flags); if (!__test_bit(THREAD_FL_EPOLL_WRITE_BIT, &event->flags)) return thread_event_cancel(thread); event->read = NULL; } else if (flag == THREAD_FL_EPOLL_WRITE_BIT) { __clear_bit(THREAD_FL_WRITE_BIT, &event->flags); if (!__test_bit(THREAD_FL_EPOLL_READ_BIT, &event->flags)) return thread_event_cancel(thread); event->write = NULL; } if (thread_event_set(thread) < 0) return -1; __clear_bit(flag, &event->flags); return 0; } /* Make thread master. */ thread_master_t * thread_make_master(void) { thread_master_t *new; new = (thread_master_t *) MALLOC(sizeof (thread_master_t)); #if HAVE_EPOLL_CREATE1 new->epoll_fd = epoll_create1(EPOLL_CLOEXEC); #else new->epoll_fd = epoll_create(1); #endif if (new->epoll_fd < 0) { log_message(LOG_INFO, "scheduler: Error creating EPOLL instance (%m)"); FREE(new); return NULL; } #if !HAVE_EPOLL_CREATE1 if (set_sock_flags(new->epoll_fd, F_SETFD, FD_CLOEXEC)) log_message(LOG_INFO, "Unable to set CLOEXEC on epoll_fd - %s (%d)", strerror(errno), errno); #endif new->read = RB_ROOT_CACHED; new->write = RB_ROOT_CACHED; new->timer = RB_ROOT_CACHED; new->child = RB_ROOT_CACHED; new->io_events = RB_ROOT; new->child_pid = RB_ROOT; INIT_LIST_HEAD(&new->event); #ifdef USE_SIGNAL_THREADS INIT_LIST_HEAD(&new->signal); #endif INIT_LIST_HEAD(&new->ready); INIT_LIST_HEAD(&new->unuse); /* Register timerfd thread */ new->timer_fd = timerfd_create(CLOCK_MONOTONIC, #ifdef TFD_NONBLOCK /* Since Linux 2.6.27 */ TFD_NONBLOCK | TFD_CLOEXEC #else 0 #endif ); if (new->timer_fd < 0) { log_message(LOG_ERR, "scheduler: Cant create timerfd (%m)"); FREE(new); return NULL; } #ifndef TFD_NONBLOCK if (set_sock_flags(new->timer_fd, F_SETFL, O_NONBLOCK)) log_message(LOG_INFO, "Unable to set NONBLOCK on timer_fd - %s (%d)", strerror(errno), errno); if (set_sock_flags(new->timer_fd, F_SETFD, FD_CLOEXEC)) log_message(LOG_INFO, "Unable to set CLOEXEC on timer_fd - %s (%d)", strerror(errno), errno); #endif new->signal_fd = signal_handler_init(); new->timer_thread = thread_add_read(new, thread_timerfd_handler, NULL, new->timer_fd, TIMER_NEVER); add_signal_read_thread(new); return new; } #ifdef THREAD_DUMP static char * timer_delay(timeval_t sands) { static char str[42]; if (sands.tv_sec == TIMER_DISABLED) return "NEVER"; if (sands.tv_sec == 0 && sands.tv_usec == 0) return "UNSET"; if (timercmp(&sands, &time_now, >=)) { sands = timer_sub_now(sands); snprintf(str, sizeof str, "%lu.%6.6ld", sands.tv_sec, sands.tv_usec); } else { timersub(&time_now, &sands, &sands); snprintf(str, sizeof str, "-%lu.%6.6ld", sands.tv_sec, sands.tv_usec); } return str; } /* Dump rbtree */ static void thread_rb_dump(rb_root_cached_t *root, const char *tree, FILE *fp) { thread_t *thread; int i = 1; conf_write(fp, "----[ Begin rb_dump %s ]----", tree); rb_for_each_entry_cached(thread, root, n) conf_write(fp, "#%.2d Thread type %s, event_fd %d, val/fd/pid %d, timer: %s, func %s(), id %ld", i++, get_thread_type_str(thread->type), thread->event ? thread->event->fd: -2, thread->u.val, timer_delay(thread->sands), get_function_name(thread->func), thread->id); conf_write(fp, "----[ End rb_dump ]----"); } static void thread_list_dump(list_head_t *l, const char *list, FILE *fp) { thread_t *thread; int i = 1; conf_write(fp, "----[ Begin list_dump %s ]----", list); list_for_each_entry(thread, l, next) conf_write(fp, "#%.2d Thread:%p type %s func %s() id %ld", i++, thread, get_thread_type_str(thread->type), get_function_name(thread->func), thread->id); conf_write(fp, "----[ End list_dump ]----"); } static void event_rb_dump(rb_root_t *root, const char *tree, FILE *fp) { thread_event_t *event; int i = 1; conf_write(fp, "----[ Begin rb_dump %s ]----", tree); rb_for_each_entry(event, root, n) conf_write(fp, "#%.2d event %p fd %d, flags: 0x%lx, read %p, write %p", i++, event, event->fd, event->flags, event->read, event->write); conf_write(fp, "----[ End rb_dump ]----"); } void dump_thread_data(thread_master_t *m, FILE *fp) { thread_rb_dump(&m->read, "read", fp); thread_rb_dump(&m->write, "write", fp); thread_rb_dump(&m->child, "child", fp); thread_rb_dump(&m->timer, "timer", fp); thread_list_dump(&m->event, "event", fp); thread_list_dump(&m->ready, "ready", fp); #ifdef USE_SIGNAL_THREADS thread_list_dump(&m->signal, "signal", fp); #endif thread_list_dump(&m->unuse, "unuse", fp); event_rb_dump(&m->io_events, "io_events", fp); } #endif /* declare thread_timer_cmp() for rbtree compares */ RB_TIMER_CMP(thread); /* Free all unused thread. */ static void thread_clean_unuse(thread_master_t * m) { thread_t *thread, *thread_tmp; list_head_t *l = &m->unuse; list_for_each_entry_safe(thread, thread_tmp, l, next) { list_head_del(&thread->next); /* free the thread */ FREE(thread); m->alloc--; } INIT_LIST_HEAD(l); } /* Move thread to unuse list. */ static void thread_add_unuse(thread_master_t *m, thread_t *thread) { assert(m != NULL); thread->type = THREAD_UNUSED; thread->event = NULL; INIT_LIST_HEAD(&thread->next); list_add_tail(&thread->next, &m->unuse); } /* Move list element to unuse queue */ static void thread_destroy_list(thread_master_t *m, list_head_t *l) { thread_t *thread, *thread_tmp; list_for_each_entry_safe(thread, thread_tmp, l, next) { if (thread->event) { thread_del_read(thread); thread_del_write(thread); } list_head_del(&thread->next); thread_add_unuse(m, thread); } } static void thread_destroy_rb(thread_master_t *m, rb_root_cached_t *root) { thread_t *thread, *thread_tmp; rb_for_each_entry_safe_cached(thread, thread_tmp, root, n) { rb_erase_cached(&thread->n, root); /* Do we have a thread_event, and does it need deleting? */ if (thread->type == THREAD_READ) thread_del_read(thread); else if (thread->type == THREAD_WRITE) thread_del_write(thread); thread_add_unuse(m, thread); } } /* Cleanup master */ void thread_cleanup_master(thread_master_t * m) { /* Unuse current thread lists */ thread_destroy_rb(m, &m->read); thread_destroy_rb(m, &m->write); thread_destroy_rb(m, &m->timer); thread_destroy_rb(m, &m->child); thread_destroy_list(m, &m->event); #ifdef USE_SIGNAL_THREADS thread_destroy_list(m, &m->signal); #endif thread_destroy_list(m, &m->ready); m->child_pid = RB_ROOT; /* Clean garbage */ thread_clean_unuse(m); FREE(m->epoll_events); m->epoll_size = 0; m->epoll_count = 0; m->timer_thread = NULL; #ifdef _WITH_SNMP_ m->snmp_timer_thread = NULL; FD_ZERO(&m->snmp_fdset); m->snmp_fdsetsize = 0; #endif } /* Stop thread scheduler. */ void thread_destroy_master(thread_master_t * m) { if (m->epoll_fd != -1) { close(m->epoll_fd); m->epoll_fd = -1; } if (m->timer_fd != -1) close(m->timer_fd); if (m->signal_fd != -1) signal_handler_destroy(); thread_cleanup_master(m); FREE(m); } /* Delete top of the list and return it. */ static thread_t * thread_trim_head(list_head_t *l) { thread_t *thread; if (list_empty(l)) return NULL; thread = list_first_entry(l, thread_t, next); list_del_init(&thread->next); return thread; } /* Make unique thread id for non pthread version of thread manager. */ static inline unsigned long thread_get_id(thread_master_t *m) { return m->id++; } /* Make new thread. */ static thread_t * thread_new(thread_master_t *m) { thread_t *new; /* If one thread is already allocated return it */ new = thread_trim_head(&m->unuse); if (!new) { new = (thread_t *)MALLOC(sizeof(thread_t)); m->alloc++; } INIT_LIST_HEAD(&new->next); new->id = thread_get_id(m); return new; } /* Add new read thread. */ thread_t * thread_add_read_sands(thread_master_t *m, int (*func) (thread_t *), void *arg, int fd, timeval_t *sands) { thread_event_t *event; thread_t *thread; assert(m != NULL); /* I feel lucky ! :D */ if (m->current_event && m->current_event->fd == fd) event = m->current_event; else event = thread_event_get(m, fd); if (!event) { if (!(event = thread_event_new(m, fd))) { log_message(LOG_INFO, "scheduler: Cant allocate read event for fd [%d](%m)", fd); return NULL; } } else if (__test_bit(THREAD_FL_READ_BIT, &event->flags) && event->read) { log_message(LOG_INFO, "scheduler: There is already read event %p (read %p) registered on fd [%d]", event, event->read, fd); return NULL; } thread = thread_new(m); thread->type = THREAD_READ; thread->master = m; thread->func = func; thread->arg = arg; thread->u.fd = fd; thread->event = event; /* Set & flag event */ __set_bit(THREAD_FL_READ_BIT, &event->flags); event->read = thread; if (!__test_bit(THREAD_FL_EPOLL_READ_BIT, &event->flags)) { if (thread_event_set(thread) < 0) { log_message(LOG_INFO, "scheduler: Cant register read event for fd [%d](%m)", fd); thread_add_unuse(m, thread); return NULL; } __set_bit(THREAD_FL_EPOLL_READ_BIT, &event->flags); } thread->sands = *sands; /* Sort the thread. */ rb_insert_sort_cached(&m->read, thread, n, thread_timer_cmp); return thread; } thread_t * thread_add_read(thread_master_t *m, int (*func) (thread_t *), void *arg, int fd, unsigned long timer) { timeval_t sands; /* Compute read timeout value */ if (timer == TIMER_NEVER) sands.tv_sec = TIMER_DISABLED; else { set_time_now(); sands = timer_add_long(time_now, timer); } return thread_add_read_sands(m, func, arg, fd, &sands); } int thread_del_read(thread_t *thread) { if (!thread || !thread->event) return -1; if (thread_event_del(thread, THREAD_FL_EPOLL_READ_BIT) < 0) return -1; return 0; } #ifdef _WITH_SNMP_ static void thread_del_read_fd(thread_master_t *m, int fd) { thread_event_t *event; event = thread_event_get(m, fd); if (!event || !event->read) return; thread_cancel(event->read); } #endif static void thread_read_requeue(thread_master_t *m, int fd, const timeval_t *new_sands) { thread_t *thread; thread_event_t *event; event = thread_event_get(m, fd); if (!event || !event->read) return; thread = event->read; if (thread->type != THREAD_READ) { /* If the thread is not on the read list, don't touch it */ return; } thread->sands = *new_sands; rb_move_cached(&thread->master->read, thread, n, thread_timer_cmp); } void thread_requeue_read(thread_master_t *m, int fd, const timeval_t *sands) { thread_read_requeue(m, fd, sands); } /* Add new write thread. */ thread_t * thread_add_write(thread_master_t *m, int (*func) (thread_t *), void *arg, int fd, unsigned long timer) { thread_event_t *event; thread_t *thread; assert(m != NULL); /* I feel lucky ! :D */ if (m->current_event && m->current_event->fd == fd) event = m->current_event; else event = thread_event_get(m, fd); if (!event) { if (!(event = thread_event_new(m, fd))) { log_message(LOG_INFO, "scheduler: Cant allocate write event for fd [%d](%m)", fd); return NULL; } } else if (__test_bit(THREAD_FL_WRITE_BIT, &event->flags) && event->write) { log_message(LOG_INFO, "scheduler: There is already write event registered on fd [%d]", fd); return NULL; } thread = thread_new(m); thread->type = THREAD_WRITE; thread->master = m; thread->func = func; thread->arg = arg; thread->u.fd = fd; thread->event = event; /* Set & flag event */ __set_bit(THREAD_FL_WRITE_BIT, &event->flags); event->write = thread; if (!__test_bit(THREAD_FL_EPOLL_WRITE_BIT, &event->flags)) { if (thread_event_set(thread) < 0) { log_message(LOG_INFO, "scheduler: Cant register write event for fd [%d](%m)" , fd); thread_add_unuse(m, thread); return NULL; } __set_bit(THREAD_FL_EPOLL_WRITE_BIT, &event->flags); } /* Compute write timeout value */ if (timer == TIMER_NEVER) thread->sands.tv_sec = TIMER_DISABLED; else { set_time_now(); thread->sands = timer_add_long(time_now, timer); } /* Sort the thread. */ rb_insert_sort_cached(&m->write, thread, n, thread_timer_cmp); return thread; } int thread_del_write(thread_t *thread) { if (!thread || !thread->event) return -1; if (thread_event_del(thread, THREAD_FL_EPOLL_WRITE_BIT) < 0) return -1; return 0; } void thread_close_fd(thread_t *thread) { if (thread->u.fd == -1) return; if (thread->event) thread_event_cancel(thread); close(thread->u.fd); thread->u.fd = -1; } /* Add timer event thread. */ thread_t * thread_add_timer(thread_master_t *m, int (*func) (thread_t *), void *arg, unsigned long timer) { thread_t *thread; assert(m != NULL); thread = thread_new(m); thread->type = THREAD_TIMER; thread->master = m; thread->func = func; thread->arg = arg; /* Do we need jitter here? */ if (timer == TIMER_NEVER) thread->sands.tv_sec = TIMER_DISABLED; else { set_time_now(); thread->sands = timer_add_long(time_now, timer); } /* Sort by timeval. */ rb_insert_sort_cached(&m->timer, thread, n, thread_timer_cmp); return thread; } void timer_thread_update_timeout(thread_t *thread, unsigned long timer) { timeval_t sands; if (thread->type > THREAD_MAX_WAITING) { /* It is probably on the ready list, so we'd better just let it run */ return; } set_time_now(); sands = timer_add_long(time_now, timer); if (timercmp(&thread->sands, &sands, ==)) return; thread->sands = sands; rb_move_cached(&thread->master->timer, thread, n, thread_timer_cmp); } thread_t * thread_add_timer_shutdown(thread_master_t *m, int(*func)(thread_t *), void *arg, unsigned long timer) { thread_t *thread = thread_add_timer(m, func, arg, timer); thread->type = THREAD_TIMER_SHUTDOWN; return thread; } /* Add a child thread. */ thread_t * thread_add_child(thread_master_t * m, int (*func) (thread_t *), void * arg, pid_t pid, unsigned long timer) { thread_t *thread; assert(m != NULL); thread = thread_new(m); thread->type = THREAD_CHILD; thread->master = m; thread->func = func; thread->arg = arg; thread->u.c.pid = pid; thread->u.c.status = 0; /* Compute child timeout value */ if (timer == TIMER_NEVER) thread->sands.tv_sec = TIMER_DISABLED; else { set_time_now(); thread->sands = timer_add_long(time_now, timer); } /* Sort by timeval. */ rb_insert_sort_cached(&m->child, thread, n, thread_timer_cmp); /* Sort by PID */ rb_insert_sort(&m->child_pid, thread, rb_data, thread_child_pid_cmp); return thread; } void thread_children_reschedule(thread_master_t *m, int (*func)(thread_t *), unsigned long timer) { thread_t *thread; // What is this used for ?? set_time_now(); rb_for_each_entry_cached(thread, &m->child, n) { thread->func = func; thread->sands = timer_add_long(time_now, timer); } } /* Add simple event thread. */ thread_t * thread_add_event(thread_master_t * m, int (*func) (thread_t *), void *arg, int val) { thread_t *thread; assert(m != NULL); thread = thread_new(m); thread->type = THREAD_EVENT; thread->master = m; thread->func = func; thread->arg = arg; thread->u.val = val; INIT_LIST_HEAD(&thread->next); list_add_tail(&thread->next, &m->event); return thread; } /* Add terminate event thread. */ static thread_t * thread_add_generic_terminate_event(thread_master_t * m, thread_type_t type, int (*func)(thread_t *)) { thread_t *thread; assert(m != NULL); thread = thread_new(m); thread->type = type; thread->master = m; thread->func = func; thread->arg = NULL; thread->u.val = 0; INIT_LIST_HEAD(&thread->next); list_add_tail(&thread->next, &m->event); return thread; } thread_t * thread_add_terminate_event(thread_master_t *m) { return thread_add_generic_terminate_event(m, THREAD_TERMINATE, NULL); } thread_t * thread_add_start_terminate_event(thread_master_t *m, int(*func)(thread_t *)) { return thread_add_generic_terminate_event(m, THREAD_TERMINATE_START, func); } #ifdef USE_SIGNAL_THREADS /* Add signal thread. */ thread_t * thread_add_signal(thread_master_t *m, int (*func) (thread_t *), void *arg, int signum) { thread_t *thread; assert(m != NULL); thread = thread_new(m); thread->type = THREAD_SIGNAL; thread->master = m; thread->func = func; thread->arg = arg; thread->u.val = signum; INIT_LIST_HEAD(&thread->next); list_add_tail(&thread->next, &m->signal); /* Update signalfd accordingly */ if (sigismember(&m->signal_mask, signum)) return thread; sigaddset(&m->signal_mask, signum); signalfd(m->signal_fd, &m->signal_mask, 0); return thread; } #endif /* Cancel thread from scheduler. */ void thread_cancel(thread_t *thread) { thread_master_t *m; if (!thread || thread->type == THREAD_UNUSED) return; m = thread->master; switch (thread->type) { case THREAD_READ: thread_event_del(thread, THREAD_FL_EPOLL_READ_BIT); rb_erase_cached(&thread->n, &m->read); break; case THREAD_WRITE: thread_event_del(thread, THREAD_FL_EPOLL_WRITE_BIT); rb_erase_cached(&thread->n, &m->write); break; case THREAD_TIMER: rb_erase_cached(&thread->n, &m->timer); break; case THREAD_CHILD: /* Does this need to kill the child, or is that the * caller's job? * This function is currently unused, so leave it for now. */ rb_erase_cached(&thread->n, &m->child); rb_erase(&thread->rb_data, &m->child_pid); break; case THREAD_READY_FD: case THREAD_READ_TIMEOUT: case THREAD_WRITE_TIMEOUT: if (thread->event) { rb_erase(&thread->event->n, &m->io_events); FREE(thread->event); } /* ... falls through ... */ case THREAD_EVENT: case THREAD_READY: #ifdef USE_SIGNAL_THREADS case THREAD_SIGNAL: #endif case THREAD_CHILD_TIMEOUT: case THREAD_CHILD_TERMINATED: list_head_del(&thread->next); break; default: break; } thread_add_unuse(m, thread); } void thread_cancel_read(thread_master_t *m, int fd) { thread_t *thread, *thread_tmp; rb_for_each_entry_safe_cached(thread, thread_tmp, &m->read, n) { if (thread->u.fd == fd) { if (thread->event->write) { thread_cancel(thread->event->write); thread->event->write = NULL; } thread_cancel(thread); break; } } } #ifdef _INCLUDE_UNUSED_CODE_ /* Delete all events which has argument value arg. */ void thread_cancel_event(thread_master_t *m, void *arg) { thread_t *thread, *thread_tmp; list_head_t *l = &m->event; // Why doesn't this use thread_cancel() above list_for_each_entry_safe(thread, thread_tmp, l, next) { if (thread->arg == arg) { list_head_del(&thread->next); thread_add_unuse(m, thread); } } } #endif #ifdef _WITH_SNMP_ static int snmp_read_thread(thread_t *thread) { fd_set snmp_fdset; FD_ZERO(&snmp_fdset); FD_SET(thread->u.fd, &snmp_fdset); snmp_read(&snmp_fdset); netsnmp_check_outstanding_agent_requests(); thread_add_read(thread->master, snmp_read_thread, thread->arg, thread->u.fd, TIMER_NEVER); return 0; } int snmp_timeout_thread(thread_t *thread) { snmp_timeout(); run_alarms(); netsnmp_check_outstanding_agent_requests(); thread->master->snmp_timer_thread = thread_add_timer(thread->master, snmp_timeout_thread, thread->arg, TIMER_NEVER); return 0; } // See https://vincent.bernat.im/en/blog/2012-snmp-event-loop static void snmp_epoll_info(thread_master_t *m) { fd_set snmp_fdset; int fdsetsize = 0; int max_fdsetsize; struct timeval snmp_timer_wait = { .tv_sec = TIMER_DISABLED }; int snmpblock = true; unsigned long *old_set, *new_set; // Must be unsigned for ffsl() to work for us unsigned long diff; int i; int fd; int bit; #if 0 // TODO #if sizeof fd_mask != sizeof diff #error "snmp_epoll_info sizeof(fd_mask) does not match old_set/new_set/diff" #endif #endif FD_ZERO(&snmp_fdset); /* When SNMP is enabled, we may have to select() on additional * FD. snmp_select_info() will add them to `readfd'. The trick * with this function is its last argument. We need to set it * true to set its own timer that we then compare against ours. */ snmp_select_info(&fdsetsize, &snmp_fdset, &snmp_timer_wait, &snmpblock); if (snmpblock) snmp_timer_wait.tv_sec = TIMER_DISABLED; timer_thread_update_timeout(m->snmp_timer_thread, timer_long(snmp_timer_wait)); max_fdsetsize = m->snmp_fdsetsize > fdsetsize ? m->snmp_fdsetsize : fdsetsize; if (!max_fdsetsize) return; for (i = 0, old_set = (unsigned long *)&m->snmp_fdset, new_set = (unsigned long *)&snmp_fdset; i <= max_fdsetsize / (int)sizeof(*new_set); i++, old_set++, new_set++) { if (*old_set == *new_set) continue; diff = *old_set ^ *new_set; fd = i * sizeof(*old_set) * CHAR_BIT - 1; do { bit = ffsl(diff); diff >>= bit; fd += bit; if (FD_ISSET(fd, &snmp_fdset)) { /* Add the fd */ thread_add_read(m, snmp_read_thread, 0, fd, TIMER_NEVER); FD_SET(fd, &m->snmp_fdset); } else { /* Remove the fd */ thread_del_read_fd(m, fd); FD_CLR(fd, &m->snmp_fdset); } } while (diff); } m->snmp_fdsetsize = fdsetsize; } #endif /* Fetch next ready thread. */ static list_head_t * thread_fetch_next_queue(thread_master_t *m) { int sav_errno; int last_epoll_errno = 0; int ret; int i; assert(m != NULL); /* If there is event process it first. */ if (m->event.next != &m->event) return &m->event; /* If there are ready threads process them */ if (m->ready.next != &m->ready) return &m->ready; do { #ifdef _WITH_SNMP_ if (snmp_running) snmp_epoll_info(m); #endif /* Calculate and set wait timer. Take care of timeouted fd. */ thread_set_timer(m); #ifdef _VRRP_FD_DEBUG_ if (extra_threads_debug) extra_threads_debug(); #endif #ifdef _EPOLL_THREAD_DUMP_ if (do_epoll_thread_dump) dump_thread_data(m, NULL); #endif #ifdef _EPOLL_DEBUG_ if (do_epoll_debug) log_message(LOG_INFO, "calling epoll_wait"); #endif /* Call epoll function. */ ret = epoll_wait(m->epoll_fd, m->epoll_events, m->epoll_count, -1); sav_errno = errno; #ifdef _EPOLL_DEBUG_ if (do_epoll_debug) { if (ret == -1) log_message(LOG_INFO, "epoll_wait returned %d, errno %d", ret, sav_errno); else log_message(LOG_INFO, "epoll_wait returned %d fds", ret); } #endif if (ret < 0) { if (sav_errno == EINTR) continue; /* Real error. */ if (sav_errno != last_epoll_errno) { /* Log the error first time only */ log_message(LOG_INFO, "scheduler: epoll_wait error: %s", strerror(sav_errno)); last_epoll_errno = sav_errno; } /* Make sure we don't sit it a tight loop */ if (sav_errno == EBADF || sav_errno == EFAULT || sav_errno == EINVAL) sleep(1); continue; } /* Handle epoll events */ for (i = 0; i < ret; i++) { struct epoll_event *ep_ev; thread_event_t *ev; ep_ev = &m->epoll_events[i]; ev = ep_ev->data.ptr; /* Error */ // TODO - no thread processing function handles THREAD_READ_ERROR/THREAD_WRITE_ERROR yet if (ep_ev->events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) { if (ev->read) { thread_move_ready(m, &m->read, ev->read, THREAD_READ_ERROR); ev->read = NULL; } if (ev->write) { thread_move_ready(m, &m->write, ev->write, THREAD_WRITE_ERROR); ev->write = NULL; } continue; } /* READ */ if (ep_ev->events & EPOLLIN) { if (!ev->read) { log_message(LOG_INFO, "scheduler: No read thread bound on fd:%d (fl:0x%.4X)" , ev->fd, ep_ev->events); assert(0); } thread_move_ready(m, &m->read, ev->read, THREAD_READY_FD); ev->read = NULL; } /* WRITE */ if (ep_ev->events & EPOLLOUT) { if (!ev->write) { log_message(LOG_INFO, "scheduler: No write thread bound on fd:%d (fl:0x%.4X)" , ev->fd, ep_ev->events); assert(0); } thread_move_ready(m, &m->write, ev->write, THREAD_READY_FD); ev->write = NULL; } if (ep_ev->events & EPOLLHUP) { log_message(LOG_INFO, "Received EPOLLHUP for fd %d", ev->fd); } if (ep_ev->events & EPOLLERR) { log_message(LOG_INFO, "Received EPOLLERR for fd %d", ev->fd); } } /* Update current time */ set_time_now(); /* If there is a ready thread, return it. */ if (m->ready.next != &m->ready) return &m->ready; } while (true); } /* Call thread ! */ static inline void thread_call(thread_t * thread) { #ifdef _EPOLL_DEBUG_ if (do_epoll_debug) log_message(LOG_INFO, "Calling thread function %s(), type %s, val/fd/pid %d, status %d id %lu", get_function_name(thread->func), get_thread_type_str(thread->type), thread->u.val, thread->u.c.status, thread->id); #endif (*thread->func) (thread); } void process_threads(thread_master_t *m) { thread_t* thread; list_head_t *thread_list; int thread_type; /* * Processing the master thread queues, * return and execute one ready thread. */ while ((thread_list = thread_fetch_next_queue(m))) { /* Run until error, used for debuging only */ #if defined _DEBUG_ && defined _MEM_CHECK_ if (__test_bit(MEM_ERR_DETECT_BIT, &debug) #ifdef _WITH_VRRP_ && __test_bit(DONT_RELEASE_VRRP_BIT, &debug) #endif ) { __clear_bit(MEM_ERR_DETECT_BIT, &debug); #ifdef _WITH_VRRP_ __clear_bit(DONT_RELEASE_VRRP_BIT, &debug); #endif thread_add_terminate_event(master); } #endif /* If we are shutting down, only process relevant thread types. * We only want timer and signal fd, and don't want inotify, vrrp socket, * snmp_read, bfd_receiver, bfd pipe in vrrp/check, dbus pipe or netlink fds. */ thread = thread_trim_head(thread_list); if (!shutting_down || (thread->type == THREAD_READY_FD && (thread->u.fd == m->timer_fd || thread->u.fd == m->signal_fd)) || thread->type == THREAD_CHILD || thread->type == THREAD_CHILD_TIMEOUT || thread->type == THREAD_CHILD_TERMINATED || thread->type == THREAD_TIMER_SHUTDOWN || thread->type == THREAD_TERMINATE) { if (thread->func) thread_call(thread); if (thread->type == THREAD_TERMINATE_START) shutting_down = true; } m->current_event = (thread->type == THREAD_READY_FD) ? thread->event : NULL; thread_type = thread->type; thread_add_unuse(master, thread); /* If we are shutting down, and the shutdown timer is not running and * all children have terminated, then we can terminate */ if (shutting_down && !m->shutdown_timer_running && !m->child.rb_root.rb_node) break; /* If daemon hanging event is received stop processing */ if (thread_type == THREAD_TERMINATE) break; } } static void process_child_termination(pid_t pid, int status) { thread_master_t * m = master; thread_t th = { .u.c.pid = pid }; thread_t *thread; bool permanent_vrrp_checker_error = false; #ifndef _DEBUG_ if (prog_type == PROG_TYPE_PARENT) permanent_vrrp_checker_error = report_child_status(status, pid, NULL); #endif thread = rb_search(&master->child_pid, &th, rb_data, thread_child_pid_cmp); #ifdef _EPOLL_DEBUG_ if (do_epoll_debug) log_message(LOG_INFO, "Child %d terminated with status 0x%x, thread_id %lu", pid, status, thread ? thread->id : 0); #endif if (!thread) return; rb_erase(&thread->rb_data, &master->child_pid); thread->u.c.status = status; if (permanent_vrrp_checker_error) { /* The child had a permanant error, so no point in respawning */ rb_erase_cached(&thread->n, &m->child); thread_add_unuse(m, thread); thread_add_terminate_event(m); } else thread_move_ready(m, &m->child, thread, THREAD_CHILD_TERMINATED); } /* Synchronous signal handler to reap child processes */ void thread_child_handler(__attribute__((unused)) void *v, __attribute__((unused)) int unused) { pid_t pid; int status; while ((pid = waitpid(-1, &status, WNOHANG))) { if (pid == -1) { if (errno == ECHILD) return; DBG("waitpid error: %s", strerror(errno)); assert(0); } process_child_termination(pid, status); } } void thread_add_base_threads(thread_master_t *m) { m->timer_thread = thread_add_read(m, thread_timerfd_handler, NULL, m->timer_fd, TIMER_NEVER); add_signal_read_thread(m); #ifdef _WITH_SNMP_ m->snmp_timer_thread = thread_add_timer(m, snmp_timeout_thread, 0, TIMER_NEVER); #endif } /* Our infinite scheduling loop */ void launch_thread_scheduler(thread_master_t *m) { // TODO - do this somewhere better signal_set(SIGCHLD, thread_child_handler, m); process_threads(m); } #ifdef THREAD_DUMP void register_scheduler_addresses(void) { #ifdef _WITH_SNMP_ register_thread_address("snmp_timeout_thread", snmp_timeout_thread); register_thread_address("snmp_read_thread", snmp_read_thread); #endif register_thread_address("thread_timerfd_handler", thread_timerfd_handler); register_signal_handler_address("thread_child_handler", thread_child_handler); } #endif