/*
* Soft: Keepalived is a failover program for the LVS project
* <www.linuxvirtualserver.org>. It monitor & manipulate
* a loadbalanced server pool using multi-layer checks.
*
* Part: Scheduling framework. This code is highly inspired from
* the thread management routine (thread.c) present in the
* very nice zebra project (http://www.zebra.org).
*
* Author: Alexandre Cassen, <acassen@linux-vs.org>
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version
* 2 of the License, or (at your option) any later version.
*
* Copyright (C) 2001-2017 Alexandre Cassen, <acassen@gmail.com>
*/
#include "config.h"
/* SNMP should be included first: it redefines "FREE" */
#ifdef _WITH_SNMP_
#include <net-snmp/net-snmp-config.h>
#include <net-snmp/net-snmp-includes.h>
#include <net-snmp/agent/net-snmp-agent-includes.h>
#undef FREE
#endif
#ifndef _DEBUG_
#define NDEBUG
#endif
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <sys/wait.h>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#ifdef HAVE_SIGNALFD
#include <sys/signalfd.h>
#endif
#include <sys/utsname.h>
#include <linux/version.h>
#include "scheduler.h"
#include "memory.h"
#include "rbtree.h"
#include "utils.h"
#include "signals.h"
#include "logger.h"
#include "bitops.h"
#include "git-commit.h"
#include "timer.h"
#if !HAVE_EPOLL_CREATE1 || !defined TFD_NONBLOCK
#include "old_socket.h"
#endif
#ifdef THREAD_DUMP
/* Printable name of a thread/signal handler, keyed by the handler's address.
 * Entries live in the 'funcs' rbtree, ordered by function_cmp(). */
typedef struct _func_det {
	const char *name;		/* name registered for the handler */
	int (*func)(thread_t *);	/* handler address; the rbtree sort key */
	rb_node_t n;			/* linkage in 'funcs' */
} func_det_t;
#endif
/* global vars */
/* Scheduler master; presumably the process-wide instance - confirm with callers */
thread_master_t *master = NULL;
#ifndef _DEBUG_
prog_type_t prog_type; /* Parent/VRRP/Checker process */
#endif
#ifdef _WITH_SNMP_
bool snmp_running; /* True if this process is running SNMP */
#endif
/* local variables */
static bool shutting_down;
/* Command line saved by save_cmd_line_options(), read by log_command_line() */
static int sav_argc;
static char **sav_argv;
#ifdef _EPOLL_DEBUG_
/* Enables the timer_fd debug message in thread_set_timer() */
bool do_epoll_debug;
#endif
#ifdef _EPOLL_THREAD_DUMP_
bool do_epoll_thread_dump;
#endif
#ifdef THREAD_DUMP
/* Handler name table searched by get_function_name() */
static rb_root_t funcs = RB_ROOT;
#endif
#ifdef _VRRP_FD_DEBUG_
/* Hook installed by set_extra_threads_debug() */
static void (*extra_threads_debug)(void);
#endif
/* Function that returns prog_name if pid is a known child */
static char const * (*child_finder_name)(pid_t);
#ifdef THREAD_DUMP
/* Map a thread type to the short name used in thread dumps.
 * Any value without a name yields "unknown". */
static const char *
get_thread_type_str(thread_type_t id)
{
	switch (id) {
	case THREAD_READ:		return "READ";
	case THREAD_WRITE:		return "WRITE";
	case THREAD_TIMER:		return "TIMER";
	case THREAD_TIMER_SHUTDOWN:	return "TIMER_SHUTDOWN";
	case THREAD_EVENT:		return "EVENT";
	case THREAD_CHILD:		return "CHILD";
	case THREAD_READY:		return "READY";
	case THREAD_UNUSED:		return "UNUSED";
	case THREAD_WRITE_TIMEOUT:	return "WRITE_TIMEOUT";
	case THREAD_READ_TIMEOUT:	return "READ_TIMEOUT";
	case THREAD_CHILD_TIMEOUT:	return "CHILD_TIMEOUT";
	case THREAD_CHILD_TERMINATED:	return "CHILD_TERMINATED";
	case THREAD_TERMINATE_START:	return "TERMINATE_START";
	case THREAD_TERMINATE:		return "TERMINATE";
	case THREAD_READY_FD:		return "READY_FD";
	case THREAD_READ_ERROR:		return "READ_ERROR";
	case THREAD_WRITE_ERROR:	return "WRITE_ERROR";
#ifdef USE_SIGNAL_THREADS
	case THREAD_SIGNAL:		return "SIGNAL";
#endif
	default:
		break;
	}

	return "unknown";
}
/* rbtree comparator for func_det_t entries: orders them by handler address.
 *
 * ISO C only allows relational operators between pointers to objects, not
 * between function pointers. The addresses are therefore converted to
 * uintptr_t (a permitted, implementation-defined conversion) and compared
 * as integers. */
static inline int
function_cmp(const func_det_t *func1, const func_det_t *func2)
{
	uintptr_t addr1 = (uintptr_t)func1->func;
	uintptr_t addr2 = (uintptr_t)func2->func;

	if (addr1 < addr2)
		return -1;
	if (addr1 > addr2)
		return 1;
	return 0;
}
/* Return the registered name of a thread handler. If the handler was never
 * registered, return its address formatted with %p instead.
 *
 * The fallback text is held in a static buffer, so it is only valid until
 * the next call that has to format an unregistered address. */
static const char *
get_function_name(int (*func)(thread_t *))
{
	func_det_t func_det = { .func = func };
	func_det_t *match;
	static char address[19];	/* "0x" + 16 hex digits + NUL */

	if (!RB_EMPTY_ROOT(&funcs)) {
		match = rb_search(&funcs, &func_det, n, function_cmp);
		if (match)
			return match->name;
	}

	/* %p expects a void *; passing the function pointer unconverted is
	 * undefined behaviour for printf-style functions */
	snprintf(address, sizeof address, "%p", (void *)func);

	return address;
}
/* Return the registered name (or the address) of a signal handler.
 *
 * Signal handlers share the name table with thread handlers and are looked up
 * only by address. The pointer goes through void * because a direct cast to
 * int (*)(thread_t *) triggers gcc 8.1's -Wcast-function-type warning. The
 * pointer is only compared and never called, so the intermediate type does
 * not matter. */
const char *
get_signal_function_name(void (*func)(void *, int))
{
	void *handler_addr = (void *)func;

	return get_function_name(handler_addr);
}
/* Record a printable name for a thread handler so that dumps can show it.
 * If the entry cannot be allocated, the handler simply stays unnamed. */
void
register_thread_address(const char *func_name, int (*func)(thread_t *))
{
	func_det_t *entry = (func_det_t *) MALLOC(sizeof(func_det_t));

	if (entry) {
		entry->name = func_name;
		entry->func = func;
		rb_insert_sort(&funcs, entry, n, function_cmp);
	}
}
/* Record a printable name for a signal handler in the same address-keyed table
 * used for thread handlers. The void * conversion avoids gcc's
 * -Wcast-function-type warning; the pointer is never called through. */
void
register_signal_handler_address(const char *func_name, void (*func)(void *, int))
{
	void *handler_addr = (void *)func;

	register_thread_address(func_name, handler_addr);
}
/* Free every entry of the handler name table, leaving it empty. */
void
deregister_thread_addresses(void)
{
	func_det_t *entry, *next_entry;

	if (!RB_EMPTY_ROOT(&funcs)) {
		rb_for_each_entry_safe(entry, next_entry, &funcs, n) {
			rb_erase(&entry->n, &funcs);
			FREE(entry);
		}
	}
}
#endif
#ifdef _VRRP_FD_DEBUG_
/* Install the hook kept in extra_threads_debug (only built with
 * _VRRP_FD_DEBUG_). Where the hook is invoked is outside this section. */
void
set_extra_threads_debug(void (*func)(void))
{
	extra_threads_debug = func;
}
#endif
/* Detach a thread from the rbtree 'root' and append it to the master's ready
 * queue. The thread takes on 'type', except that a THREAD_TIMER_SHUTDOWN
 * thread keeps its own type. Always returns 0. */
static int
thread_move_ready(thread_master_t *m, rb_root_cached_t *root, thread_t *thread, int type)
{
	bool keep_type = (thread->type == THREAD_TIMER_SHUTDOWN);

	rb_erase_cached(&thread->n, root);

	INIT_LIST_HEAD(&thread->next);
	list_add_tail(&thread->next, &m->ready);

	if (!keep_type)
		thread->type = type;

	return 0;
}
/* Move each thread in 'root' whose deadline (sands) has been reached onto the
 * ready queue with the given type. The walk stops at the first thread that is
 * disabled (TIMER_DISABLED) or not yet due, which relies on the tree being
 * sorted by deadline. For read/write timeouts, the matching back-pointer in
 * the thread's event is cleared first. */
static void
thread_rb_move_ready(thread_master_t *m, rb_root_cached_t *root, int type)
{
	thread_t *thread, *next_thread;

	rb_for_each_entry_safe_cached(thread, next_thread, root, n) {
		bool disabled = (thread->sands.tv_sec == TIMER_DISABLED);

		if (disabled || timercmp(&time_now, &thread->sands, <))
			break;

		switch (type) {
		case THREAD_READ_TIMEOUT:
			thread->event->read = NULL;
			break;
		case THREAD_WRITE_TIMEOUT:
			thread->event->write = NULL;
			break;
		default:
			break;
		}

		thread_move_ready(m, root, thread, type);
	}
}
/* Lower *timer_min to the deadline of the first thread in 'root', unless that
 * thread is disabled (TIMER_DISABLED). An all-zero *timer_min counts as "not
 * set yet" and is always replaced. */
static void
thread_update_timer(rb_root_cached_t *root, timeval_t *timer_min)
{
	rb_node_t *node;
	thread_t *earliest;

	if (!root->rb_root.rb_node)
		return;

	node = rb_first_cached(root);
	if (!node)
		return;

	earliest = rb_entry(node, thread_t, n);
	if (earliest->sands.tv_sec == TIMER_DISABLED)
		return;

	/* Keep the current minimum if it is already earlier */
	if (timerisset(timer_min) && timercmp(&earliest->sands, timer_min, >))
		return;

	*timer_min = earliest->sands;
}
/* Compute the wait timer. Take care of timeouted fd
 *
 * Arms the master's timerfd (one-shot) for the earliest enabled deadline in
 * the timer, write, read and child trees. timerfd_settime() treats an
 * all-zero it_value as "disarm", so a deadline that has already passed is
 * armed 1ns ahead instead. When there is no deadline at all, the timerfd is
 * armed LONG_MAX seconds ahead. A failure to arm the timerfd is logged. */
static void
thread_set_timer(thread_master_t *m)
{
	timeval_t timer_wait;
	struct itimerspec its;

	/* Find the earliest deadline; all-zero means none was found */
	timerclear(&timer_wait);
	thread_update_timer(&m->timer, &timer_wait);
	thread_update_timer(&m->write, &timer_wait);
	thread_update_timer(&m->read, &timer_wait);
	thread_update_timer(&m->child, &timer_wait);

	if (timerisset(&timer_wait)) {
		/* Re-read the current time to get the maximum accuracy */
		set_time_now();

		/* Convert the absolute deadline into a delay from now */
		timersub(&timer_wait, &time_now, &timer_wait);
		if (timer_wait.tv_sec < 0) {
			/* Already overdue: zero it so that it fires as soon as
			 * possible (see the 1ns handling below) */
			timerclear(&timer_wait);
		}
	} else {
		/* set timer to a VERY long time */
		timer_wait.tv_sec = LONG_MAX;
		timer_wait.tv_usec = 0;
	}

	its.it_value.tv_sec = timer_wait.tv_sec;
	if (!timerisset(&timer_wait)) {
		/* A zero it_value would disarm the timerfd, so use the smallest
		 * non-zero delay. We could try to avoid doing the epoll_wait
		 * since testing shows it takes about 4 microseconds for the
		 * timer to expire. */
		its.it_value.tv_nsec = 1;
	} else
		its.it_value.tv_nsec = timer_wait.tv_usec * 1000;

	/* We don't want periodic timer expiry */
	its.it_interval.tv_sec = its.it_interval.tv_nsec = 0;

	/* If this fails the scheduler's next wakeup time is wrong, so at least
	 * make the failure visible */
	if (timerfd_settime(m->timer_fd, 0, &its, NULL) < 0)
		log_message(LOG_ERR, "scheduler: Error setting timerfd fd:%d (%m)", m->timer_fd);

#ifdef _EPOLL_DEBUG_
	if (do_epoll_debug)
		log_message(LOG_INFO, "Setting timer_fd %ld.%9.9ld", (long)its.it_value.tv_sec, its.it_value.tv_nsec);
#endif
}
/* Read handler for the master's timerfd. It drains the expiry counter, moves
 * every due read, write, timer and child thread onto the ready queue, and
 * registers a new read thread on the timerfd (kept in m->timer_thread). */
static int
thread_timerfd_handler(thread_t *thread)
{
	thread_master_t *m = thread->master;
	uint64_t expirations;

	if (read(m->timer_fd, &expirations, sizeof(expirations)) < 0)
		log_message(LOG_ERR, "scheduler: Error reading on timerfd fd:%d (%m)", m->timer_fd);

	/* Due threads, tree by tree: read, write, timer, child */
	thread_rb_move_ready(m, &m->read, THREAD_READ_TIMEOUT);
	thread_rb_move_ready(m, &m->write, THREAD_WRITE_TIMEOUT);
	thread_rb_move_ready(m, &m->timer, THREAD_READY);
	thread_rb_move_ready(m, &m->child, THREAD_CHILD_TIMEOUT);

	/* Re-arm: queue the next timerfd read thread */
	m->timer_thread = thread_add_read(m, thread_timerfd_handler, NULL, m->timer_fd, TIMER_NEVER);

	return 0;
}
/* Child PID cmp helper: rbtree comparator that orders child threads by PID.
 *
 * Uses explicit comparisons instead of subtracting the PIDs, so the result
 * cannot overflow whatever pid_t values are stored. This is the same
 * three-way style as function_cmp() and thread_event_cmp(). */
static inline int
thread_child_pid_cmp(thread_t *t1, thread_t *t2)
{
	pid_t pid1 = t1->u.c.pid;
	pid_t pid2 = t2->u.c.pid;

	if (pid1 < pid2)
		return -1;
	if (pid1 > pid2)
		return 1;
	return 0;
}
/* Install the callback that maps a child PID to a program name.
 * report_child_status() falls back to it when no name is supplied. */
void
set_child_finder_name(char const * (*func)(pid_t))
{
	child_finder_name = func;
}
/* Remember argc/argv so log_command_line() can print the command line later.
 * Only the argv pointer is kept; the strings are not copied. */
void
save_cmd_line_options(int argc, char **argv)
{
	sav_argv = argv;
	sav_argc = argc;
}
#ifndef _DEBUG_
/* Find where to cut str so that the piece is at most max_len characters.
 *
 * If the whole string fits, return a pointer to its terminating NUL.
 * Otherwise return the last space at or before str[max_len], ignoring a
 * space at str[0]. If there is no such space, cut hard at str + max_len. */
static const char *
get_end(const char *str, size_t max_len)
{
	size_t total = strlen(str);
	const char *limit;
	const char *scan;

	if (total <= max_len)
		return str + total;

	limit = str + max_len;

	/* Walk back from the limit looking for a space to break on */
	for (scan = limit; scan > str; scan--) {
		if (*scan == ' ')
			return scan;
	}

	return limit;
}
/* Log option_str under the label 'option', wrapped at spaces into pieces of
 * at most (100 - strlen(option)) characters (see get_end()).
 *
 * The first line is "<indent spaces>option: text". Continuation lines are
 * indented to line up with the text of the first line. Runs of spaces between
 * pieces are skipped, and no empty line is logged for trailing spaces. */
static void
log_options(const char *option, const char *option_str, unsigned indent)
{
	const char *p = option_str;
	size_t opt_len = strlen(option);
	/* If opt_len > 100 this wraps to a huge value, i.e. no wrapping */
	size_t max_len = 100 - opt_len;
	const char *end;
	bool first_line = true;

	while (*p) {
		/* Skip leading spaces */
		while (*p == ' ')
			p++;

		/* Only spaces were left: nothing more to log */
		if (!*p)
			break;

		end = get_end(p, max_len);

		/* get_end() returns p itself when max_len is 0; take the rest of
		 * the string rather than looping forever */
		if (end == p)
			end = p + strlen(p);

		if (first_line) {
			log_message(LOG_INFO, "%*s%s: %.*s", (int)indent, "", option, (int)(end - p), p);
			first_line = false;
		} else
			log_message(LOG_INFO, "%*s%.*s", (int)(indent + opt_len + 2), "", (int)(end - p), p);

		p = end;
	}
}
/* Log the command line saved by save_cmd_line_options(), with each argument
 * in single quotes, through log_options().
 *
 * Does nothing if no command line was saved or the buffer cannot be
 * allocated. An empty argv (argc == 0) is logged as an empty string. */
void
log_command_line(unsigned indent)
{
	size_t len = 1;		/* terminating NUL, needed even when argc is 0 */
	char *log_str;
	char *p;
	int i;

	if (!sav_argv)
		return;

	/* Each argument needs its opening and closing quotes plus a separating
	 * space (one more than the first argument actually uses) */
	for (i = 0; i < sav_argc; i++)
		len += strlen(sav_argv[i]) + 3;

	log_str = MALLOC(len);
	if (!log_str)
		return;

	/* Terminate up front so an empty argv still yields a valid string */
	*log_str = '\0';
	for (i = 0, p = log_str; i < sav_argc; i++)
		p += snprintf(p, len - (size_t)(p - log_str), "%s'%s'", i ? " " : "", sav_argv[i]);

	log_options("Command line", log_str, indent);

	FREE(log_str);
}
/* report_child_status returns true if the exit is a hard error, so unable to continue
 *
 * Logs how the child 'pid' terminated. 'status' is a wait status, decoded with
 * WIFEXITED()/WIFSIGNALED(). The child is named by prog_name, else by the
 * registered child finder, else as "pid N". An exit status of
 * KEEPALIVED_EXIT_FATAL or KEEPALIVED_EXIT_CONFIG is the hard error. Death by
 * SIGSEGV also logs the version, build, kernel and option details needed for
 * a bug report. */
bool
report_child_status(int status, pid_t pid, char const *prog_name)
{
	char const *prog_id = NULL;
	char pid_buf[16];	/* "pid " + any int (e.g. "-2147483648") + '\0' */
	int exit_status;

	if (prog_name)
		prog_id = prog_name;
	else if (child_finder_name)
		prog_id = child_finder_name(pid);

	if (!prog_id) {
		snprintf(pid_buf, sizeof(pid_buf), "pid %d", pid);
		prog_id = pid_buf;
	}

	if (WIFEXITED(status)) {
		exit_status = WEXITSTATUS(status);

		/* Handle exit codes of vrrp or checker child */
		if (exit_status == KEEPALIVED_EXIT_FATAL ||
		    exit_status == KEEPALIVED_EXIT_CONFIG) {
			log_message(LOG_INFO, "%s exited with permanent error %s. Terminating", prog_id, exit_status == KEEPALIVED_EXIT_CONFIG ? "CONFIG" : "FATAL" );
			return true;
		}

		if (exit_status != EXIT_SUCCESS)
			log_message(LOG_INFO, "%s exited with status %d", prog_id, exit_status);
	} else if (WIFSIGNALED(status)) {
		if (WTERMSIG(status) == SIGSEGV) {
			struct utsname uname_buf;

			log_message(LOG_INFO, "%s exited due to segmentation fault (SIGSEGV).", prog_id);
			log_message(LOG_INFO, " %s", "Please report a bug at https://github.com/acassen/keepalived/issues");
			log_message(LOG_INFO, " %s", "and include this log from when keepalived started, a description");
			log_message(LOG_INFO, " %s", "of what happened before the crash, your configuration file and the details below.");
			log_message(LOG_INFO, " %s", "Also provide the output of keepalived -v, what Linux distro and version");
			log_message(LOG_INFO, " %s", "you are running on, and whether keepalived is being run in a container or VM.");
			log_message(LOG_INFO, " %s", "A failure to provide all this information may mean the crash cannot be investigated.");
			log_message(LOG_INFO, " %s", "If you are able to provide a stack backtrace with gdb that would really help.");
			log_message(LOG_INFO, " Source version %s %s%s", PACKAGE_VERSION,
#ifdef GIT_COMMIT
				    ", git commit ", GIT_COMMIT
#else
				    "", ""
#endif
				   );
			log_message(LOG_INFO, " Built with kernel headers for Linux %d.%d.%d",
				    (LINUX_VERSION_CODE >> 16) & 0xff,
				    (LINUX_VERSION_CODE >>  8) & 0xff,
				    (LINUX_VERSION_CODE      ) & 0xff);

			/* uname() can fail, and uname_buf must not be read if it does */
			if (uname(&uname_buf) == 0)
				log_message(LOG_INFO, " Running on %s %s %s", uname_buf.sysname, uname_buf.release, uname_buf.version);

			log_command_line(2);
			log_options("configure options", KEEPALIVED_CONFIGURE_OPTIONS, 2);
			log_options("Config options", CONFIGURATION_OPTIONS, 2);
			log_options("System options", SYSTEM_OPTIONS, 2);

//			if (__test_bit(DONT_RESPAWN_BIT, &debug))
//				segv_termination = true;
		}
		else
			log_message(LOG_INFO, "%s exited due to signal %d", prog_id, WTERMSIG(status));
	}

	return false;
}
#endif
/* epoll related */

/* Add 'delta' to the master's registered-event count, then make sure the
 * epoll_events buffer has more slots than that count.
 *
 * When it has to grow, the buffer is freed and reallocated without keeping
 * its contents; presumably it is only an output buffer for epoll_wait() -
 * confirm. The new size is the next multiple of THREAD_EPOLL_REALLOC_THRESH
 * above epoll_count. Returns 0 on success. Returns -1 if the allocation
 * failed, in which case epoll_size is 0. */
static int
thread_events_resize(thread_master_t *m, int delta)
{
	unsigned int new_size;

	m->epoll_count += delta;
	if (m->epoll_count < m->epoll_size)
		return 0;

	new_size = m->epoll_count / THREAD_EPOLL_REALLOC_THRESH + 1;
	new_size *= THREAD_EPOLL_REALLOC_THRESH;

	if (m->epoll_events)
		FREE(m->epoll_events);
	m->epoll_events = MALLOC(new_size * sizeof(struct epoll_event));

	m->epoll_size = m->epoll_events ? new_size : 0;

	return m->epoll_events ? 0 : -1;
}
/* rbtree comparator for io_events: orders events by file descriptor. */
static inline int
thread_event_cmp(const thread_event_t *event1, const thread_event_t *event2)
{
	if (event1->fd == event2->fd)
		return 0;

	return event1->fd < event2->fd ? -1 : 1;
}
/* Allocate an event for fd, count it towards the epoll buffer size and insert
 * it into io_events. Returns NULL, with nothing inserted, if either the event
 * or the epoll buffer cannot be allocated. */
static thread_event_t *
thread_event_new(thread_master_t *m, int fd)
{
	thread_event_t *event = (thread_event_t *) MALLOC(sizeof(thread_event_t));

	if (!event)
		return NULL;

	if (thread_events_resize(m, 1) >= 0) {
		event->fd = fd;
		rb_insert_sort(&m->io_events, event, n, thread_event_cmp);
		return event;
	}

	FREE(event);
	return NULL;
}
/* Look up the event registered in io_events for fd; NULL if there is none. */
static thread_event_t *
thread_event_get(thread_master_t *m, int fd)
{
	thread_event_t key = { .fd = fd };

	return rb_search(&m->io_events, &key, n, thread_event_cmp);
}
/* Register or update event->fd in the epoll instance. EPOLLIN/EPOLLOUT are
 * requested according to the event's READ/WRITE flags, and the event pointer
 * is the epoll user data. THREAD_FL_EPOLL_BIT records that epoll already knows
 * the fd and selects EPOLL_CTL_MOD over EPOLL_CTL_ADD. Returns 0 on success,
 * -1 (logged) on failure. */
static int
thread_event_set(thread_t *thread)
{
	thread_event_t *event = thread->event;
	thread_master_t *m = thread->master;
	int op = __test_bit(THREAD_FL_EPOLL_BIT, &event->flags) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
	struct epoll_event ev;

	memset(&ev, 0, sizeof(ev));
	ev.data.ptr = event;
	if (__test_bit(THREAD_FL_READ_BIT, &event->flags))
		ev.events |= EPOLLIN;
	if (__test_bit(THREAD_FL_WRITE_BIT, &event->flags))
		ev.events |= EPOLLOUT;

	if (epoll_ctl(m->epoll_fd, op, event->fd, &ev) < 0) {
		log_message(LOG_INFO, "scheduler: Error performing control on EPOLL instance (%m)");
		return -1;
	}

	__set_bit(THREAD_FL_EPOLL_BIT, &event->flags);

	return 0;
}
/* Remove thread->event from the epoll instance and from io_events, then free
 * it. If the event is the master's current_event, that pointer is cleared.
 *
 * Returns -1 only if the thread has no event. A failing EPOLL_CTL_DEL is
 * logged, except EBADF since the fd may already be closed. The event is still
 * released in that case: keeping it in io_events would leave its read/write
 * back-pointers aimed at threads that callers then put on the unuse list for
 * reuse. */
static int
thread_event_cancel(thread_t *thread)
{
	thread_event_t *event = thread->event;
	thread_master_t *m = thread->master;

	if (!event) {
		log_message(LOG_INFO, "scheduler: Error performing epoll_ctl DEL op no event linked?!");
		return -1;
	}

	if (m->epoll_fd != -1 &&
	    epoll_ctl(m->epoll_fd, EPOLL_CTL_DEL, event->fd, NULL) < 0 &&
	    errno != EBADF)
		log_message(LOG_INFO, "scheduler: Error performing epoll_ctl DEL op for fd:%d (%m)", event->fd);

	rb_erase(&event->n, &m->io_events);
	if (event == m->current_event)
		m->current_event = NULL;
	FREE(thread->event);

	return 0;
}
/* Drop one direction of interest from thread->event, which must not be NULL.
 * 'flag' is THREAD_FL_EPOLL_READ_BIT or THREAD_FL_EPOLL_WRITE_BIT.
 *
 * If the other direction is not registered with epoll, the whole event is
 * released through thread_event_cancel() and its result is returned.
 * Otherwise the matching read/write back-pointer is cleared, the epoll
 * registration is updated to the remaining direction, and the EPOLL flag is
 * cleared. Returns 0 if the flag was not set or on success, -1 on failure. */
static int
thread_event_del(thread_t *thread, unsigned flag)
{
	thread_event_t *event = thread->event;
	/* Nothing registered with epoll for this direction */
	if (!__test_bit(flag, &event->flags))
		return 0;
	if (flag == THREAD_FL_EPOLL_READ_BIT) {
		__clear_bit(THREAD_FL_READ_BIT, &event->flags);
		/* No writer registered either: release the whole event */
		if (!__test_bit(THREAD_FL_EPOLL_WRITE_BIT, &event->flags))
			return thread_event_cancel(thread);
		event->read = NULL;
	}
	else if (flag == THREAD_FL_EPOLL_WRITE_BIT) {
		__clear_bit(THREAD_FL_WRITE_BIT, &event->flags);
		/* No reader registered either: release the whole event */
		if (!__test_bit(THREAD_FL_EPOLL_READ_BIT, &event->flags))
			return thread_event_cancel(thread);
		event->write = NULL;
	}
	/* Re-register with epoll for the remaining direction only */
	if (thread_event_set(thread) < 0)
		return -1;
	__clear_bit(flag, &event->flags);
	return 0;
}
/* Make thread master.
 *
 * Creates the epoll instance and a CLOCK_MONOTONIC timerfd (both
 * close-on-exec, the timerfd also non-blocking), initialises the thread trees
 * and lists, sets up the signal fd, and registers the timerfd and signal read
 * threads. Returns NULL if the master cannot be allocated or either fd cannot
 * be created; anything already created is released in that case. */
thread_master_t *
thread_make_master(void)
{
	thread_master_t *new;

	new = (thread_master_t *) MALLOC(sizeof (thread_master_t));
	if (!new)
		return NULL;

#if HAVE_EPOLL_CREATE1
	new->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
#else
	new->epoll_fd = epoll_create(1);
#endif
	if (new->epoll_fd < 0) {
		log_message(LOG_INFO, "scheduler: Error creating EPOLL instance (%m)");
		FREE(new);
		return NULL;
	}

#if !HAVE_EPOLL_CREATE1
	if (set_sock_flags(new->epoll_fd, F_SETFD, FD_CLOEXEC))
		log_message(LOG_INFO, "Unable to set CLOEXEC on epoll_fd - %s (%d)", strerror(errno), errno);
#endif

	new->read = RB_ROOT_CACHED;
	new->write = RB_ROOT_CACHED;
	new->timer = RB_ROOT_CACHED;
	new->child = RB_ROOT_CACHED;
	new->io_events = RB_ROOT;
	new->child_pid = RB_ROOT;
	INIT_LIST_HEAD(&new->event);
#ifdef USE_SIGNAL_THREADS
	INIT_LIST_HEAD(&new->signal);
#endif
	INIT_LIST_HEAD(&new->ready);
	INIT_LIST_HEAD(&new->unuse);

	/* Register timerfd thread */
	new->timer_fd = timerfd_create(CLOCK_MONOTONIC,
#ifdef TFD_NONBLOCK				/* Since Linux 2.6.27 */
						TFD_NONBLOCK | TFD_CLOEXEC
#else
						0
#endif
									);
	if (new->timer_fd < 0) {
		log_message(LOG_ERR, "scheduler: Cant create timerfd (%m)");
		/* Don't leak the epoll instance created above */
		close(new->epoll_fd);
		FREE(new);
		return NULL;
	}

#ifndef TFD_NONBLOCK
	if (set_sock_flags(new->timer_fd, F_SETFL, O_NONBLOCK))
		log_message(LOG_INFO, "Unable to set NONBLOCK on timer_fd - %s (%d)", strerror(errno), errno);
	if (set_sock_flags(new->timer_fd, F_SETFD, FD_CLOEXEC))
		log_message(LOG_INFO, "Unable to set CLOEXEC on timer_fd - %s (%d)", strerror(errno), errno);
#endif

	new->signal_fd = signal_handler_init();

	new->timer_thread = thread_add_read(new, thread_timerfd_handler, NULL, new->timer_fd, TIMER_NEVER);

	add_signal_read_thread(new);

	return new;
}
#ifdef THREAD_DUMP
/* Describe how far 'sands' is from time_now, for thread dumps: "NEVER" for a
 * disabled timer, "UNSET" for an all-zero one, otherwise seconds.microseconds,
 * prefixed with '-' when the deadline has already passed.
 * Computed results live in a static buffer that the next call overwrites. */
static const char *
timer_delay(timeval_t sands)
{
	static char str[42];

	if (sands.tv_sec == TIMER_DISABLED)
		return "NEVER";
	if (sands.tv_sec == 0 && sands.tv_usec == 0)
		return "UNSET";

	/* tv_sec/tv_usec are signed, so print them as long with %ld */
	if (timercmp(&sands, &time_now, >=)) {
		sands = timer_sub_now(sands);
		snprintf(str, sizeof str, "%ld.%6.6ld", (long)sands.tv_sec, (long)sands.tv_usec);
	} else {
		timersub(&time_now, &sands, &sands);
		snprintf(str, sizeof str, "-%ld.%6.6ld", (long)sands.tv_sec, (long)sands.tv_usec);
	}

	return str;
}
/* Dump rbtree: write one line per thread in 'root' to fp, between begin/end
 * marker lines that name the tree. The id is printed with %lu because
 * thread_get_id() hands out unsigned long values. */
static void
thread_rb_dump(rb_root_cached_t *root, const char *tree, FILE *fp)
{
	thread_t *thread;
	int i = 1;

	conf_write(fp, "----[ Begin rb_dump %s ]----", tree);
	rb_for_each_entry_cached(thread, root, n)
		conf_write(fp, "#%.2d Thread type %s, event_fd %d, val/fd/pid %d, timer: %s, func %s(), id %lu", i++, get_thread_type_str(thread->type), thread->event ? thread->event->fd : -2, thread->u.val, timer_delay(thread->sands), get_function_name(thread->func), (unsigned long)thread->id);
	conf_write(fp, "----[ End rb_dump ]----");
}
/* Write one line per thread on list l to fp, between begin/end marker lines
 * that name the list. %p arguments are converted to void * and the
 * (unsigned long) id is printed with %lu, as printf-style formats require. */
static void
thread_list_dump(list_head_t *l, const char *list, FILE *fp)
{
	thread_t *thread;
	int i = 1;

	conf_write(fp, "----[ Begin list_dump %s ]----", list);
	list_for_each_entry(thread, l, next)
		conf_write(fp, "#%.2d Thread:%p type %s func %s() id %lu",
			   i++, (void *)thread, get_thread_type_str(thread->type), get_function_name(thread->func), (unsigned long)thread->id);
	conf_write(fp, "----[ End list_dump ]----");
}
/* Write one line per event in the io_events tree 'root' to fp, between
 * begin/end marker lines that name the tree. Pointers passed to %p are
 * converted to void *, as printf-style formats require. */
static void
event_rb_dump(rb_root_t *root, const char *tree, FILE *fp)
{
	thread_event_t *event;
	int i = 1;

	conf_write(fp, "----[ Begin rb_dump %s ]----", tree);
	rb_for_each_entry(event, root, n)
		conf_write(fp, "#%.2d event %p fd %d, flags: 0x%lx, read %p, write %p", i++, (void *)event, event->fd, event->flags, (void *)event->read, (void *)event->write);
	conf_write(fp, "----[ End rb_dump ]----");
}
/* Write the content of every thread tree and list of master m to fp, followed
 * by the io_events tree. The order of the calls fixes the order of the dump
 * sections. */
void
dump_thread_data(thread_master_t *m, FILE *fp)
{
	/* Threads kept in rbtrees */
	thread_rb_dump(&m->read, "read", fp);
	thread_rb_dump(&m->write, "write", fp);
	thread_rb_dump(&m->child, "child", fp);
	thread_rb_dump(&m->timer, "timer", fp);
	/* Threads kept in lists */
	thread_list_dump(&m->event, "event", fp);
	thread_list_dump(&m->ready, "ready", fp);
#ifdef USE_SIGNAL_THREADS
	thread_list_dump(&m->signal, "signal", fp);
#endif
	thread_list_dump(&m->unuse, "unuse", fp);
	/* Per-fd epoll events */
	event_rb_dump(&m->io_events, "io_events", fp);
}
#endif
/* declare thread_timer_cmp() for rbtree compares.
 * It is the comparator used for the read, write, timer and child trees;
 * presumably it orders threads by their 'sands' deadline - see the macro
 * definition to confirm. */
RB_TIMER_CMP(thread);
/* Free every thread parked on the unuse list, decrement the master's
 * allocation count for each one, and leave the list empty. */
static void
thread_clean_unuse(thread_master_t * m)
{
	list_head_t *unuse = &m->unuse;
	thread_t *victim, *following;

	list_for_each_entry_safe(victim, following, unuse, next) {
		list_head_del(&victim->next);
		FREE(victim);
		m->alloc--;
	}

	INIT_LIST_HEAD(unuse);
}
/* Mark a thread unused, detach it from its event, and park it on the unuse
 * list where thread_new() can recycle it. thread->next is reinitialised, so
 * callers are expected to have unlinked the thread already. */
static void
thread_add_unuse(thread_master_t *m, thread_t *thread)
{
	assert(m != NULL);

	thread->event = NULL;
	thread->type = THREAD_UNUSED;

	INIT_LIST_HEAD(&thread->next);
	list_add_tail(&thread->next, &m->unuse);
}
/* Move list element to unuse queue.
 *
 * Unlinks every thread on list l and parks it on the unuse list. A thread
 * that still has an event first drops its read and then its write interest.
 * The return values of thread_del_read()/thread_del_write() are ignored. */
static void
thread_destroy_list(thread_master_t *m, list_head_t *l)
{
	thread_t *thread, *thread_tmp;
	list_for_each_entry_safe(thread, thread_tmp, l, next) {
		if (thread->event) {
			thread_del_read(thread);
			thread_del_write(thread);
		}
		list_head_del(&thread->next);
		thread_add_unuse(m, thread);
	}
}
/* Empty an rbtree of threads. Read and write threads give up their event
 * interest, and every thread is parked on the unuse list. */
static void
thread_destroy_rb(thread_master_t *m, rb_root_cached_t *root)
{
	thread_t *thread, *next_thread;

	rb_for_each_entry_safe_cached(thread, next_thread, root, n) {
		rb_erase_cached(&thread->n, root);

		/* Do we have a thread_event, and does it need deleting? */
		switch (thread->type) {
		case THREAD_READ:
			thread_del_read(thread);
			break;
		case THREAD_WRITE:
			thread_del_write(thread);
			break;
		default:
			break;
		}

		thread_add_unuse(m, thread);
	}
}
/* Cleanup master.
 *
 * Releases every queued thread: first the read/write/timer/child trees, then
 * the event, signal and ready lists. It then frees all unused threads and the
 * epoll_events buffer and resets the bookkeeping fields. The master itself
 * stays allocated, and epoll_fd, timer_fd and signal_fd are not closed here.
 * NOTE(review): io_events and current_event are not reset - confirm that no
 * events can remain once all threads have been released. */
void
thread_cleanup_master(thread_master_t * m)
{
	/* Unuse current thread lists */
	thread_destroy_rb(m, &m->read);
	thread_destroy_rb(m, &m->write);
	thread_destroy_rb(m, &m->timer);
	thread_destroy_rb(m, &m->child);
	thread_destroy_list(m, &m->event);
#ifdef USE_SIGNAL_THREADS
	thread_destroy_list(m, &m->signal);
#endif
	thread_destroy_list(m, &m->ready);
	/* The child threads were already released via the child tree */
	m->child_pid = RB_ROOT;
	/* Clean garbage */
	thread_clean_unuse(m);
	FREE(m->epoll_events);
	m->epoll_size = 0;
	m->epoll_count = 0;
	m->timer_thread = NULL;
#ifdef _WITH_SNMP_
	m->snmp_timer_thread = NULL;
	FD_ZERO(&m->snmp_fdset);
	m->snmp_fdsetsize = 0;
#endif
}
/* Stop thread scheduler.
 *
 * Closes the epoll and timer fds, tears down signal handling, releases all
 * threads and frees the master. epoll_fd is set to -1 before
 * thread_cleanup_master() runs, so thread_event_cancel() skips epoll_ctl() on
 * the closed instance. */
void
thread_destroy_master(thread_master_t * m)
{
	if (m->epoll_fd != -1) {
		close(m->epoll_fd);
		m->epoll_fd = -1;
	}
	if (m->timer_fd != -1)
		close(m->timer_fd);
	if (m->signal_fd != -1)
		signal_handler_destroy();
	thread_cleanup_master(m);
	FREE(m);
}
/* Unlink the first thread of list l (leaving its list head reinitialised)
 * and return it; NULL if l is empty. */
static thread_t *
thread_trim_head(list_head_t *l)
{
	thread_t *head;

	if (!list_empty(l)) {
		head = list_first_entry(l, thread_t, next);
		list_del_init(&head->next);
		return head;
	}

	return NULL;
}
/* Hand out the master's next sequential thread id. */
static inline unsigned long
thread_get_id(thread_master_t *m)
{
	unsigned long id = m->id;

	m->id++;
	return id;
}
/* Get a thread structure. One is recycled from the unuse list when possible
 * (recycled threads are not cleared), otherwise a new one is allocated and
 * counted in m->alloc. The thread then gets a fresh id and a reinitialised
 * list head. */
static thread_t *
thread_new(thread_master_t *m)
{
	thread_t *new = thread_trim_head(&m->unuse);

	if (new == NULL) {
		new = (thread_t *)MALLOC(sizeof(thread_t));
		m->alloc++;
	}

	INIT_LIST_HEAD(&new->next);
	new->id = thread_get_id(m);

	return new;
}
/* Add new read thread.
 *
 * Queues func(arg) to run when fd becomes readable, or as a read timeout
 * once the absolute deadline *sands passes (tv_sec == TIMER_DISABLED means no
 * timeout). The fd's thread_event_t is reused if it exists and created
 * otherwise. Returns the thread, or NULL if a read thread is already
 * registered on fd or the event cannot be allocated or registered with epoll.
 * If registration fails, the event's read state is rolled back. */
thread_t *
thread_add_read_sands(thread_master_t *m, int (*func) (thread_t *), void *arg, int fd, timeval_t *sands)
{
	thread_event_t *event;
	thread_t *thread;

	assert(m != NULL);

	/* Fast path: try the master's current_event before searching
	 * io_events. I feel lucky ! :D */
	if (m->current_event && m->current_event->fd == fd)
		event = m->current_event;
	else
		event = thread_event_get(m, fd);

	if (!event) {
		if (!(event = thread_event_new(m, fd))) {
			log_message(LOG_INFO, "scheduler: Cant allocate read event for fd [%d](%m)", fd);
			return NULL;
		}
	}
	else if (__test_bit(THREAD_FL_READ_BIT, &event->flags) && event->read) {
		log_message(LOG_INFO, "scheduler: There is already read event %p (read %p) registered on fd [%d]", (void *)event, (void *)event->read, fd);
		return NULL;
	}

	thread = thread_new(m);
	thread->type = THREAD_READ;
	thread->master = m;
	thread->func = func;
	thread->arg = arg;
	thread->u.fd = fd;
	thread->event = event;

	/* Set & flag event */
	__set_bit(THREAD_FL_READ_BIT, &event->flags);
	event->read = thread;
	if (!__test_bit(THREAD_FL_EPOLL_READ_BIT, &event->flags)) {
		if (thread_event_set(thread) < 0) {
			log_message(LOG_INFO, "scheduler: Cant register read event for fd [%d](%m)", fd);
			/* Undo the flagging above. Otherwise the event keeps
			 * pointing at a thread that is about to be recycled, and
			 * every later read thread on this fd would be refused. */
			__clear_bit(THREAD_FL_READ_BIT, &event->flags);
			event->read = NULL;
			thread_add_unuse(m, thread);
			return NULL;
		}
		__set_bit(THREAD_FL_EPOLL_READ_BIT, &event->flags);
	}

	thread->sands = *sands;

	/* Sort the thread. */
	rb_insert_sort_cached(&m->read, thread, n, thread_timer_cmp);

	return thread;
}
/* Add a read thread on fd that times out 'timer' from now, or never for
 * TIMER_NEVER; see thread_add_read_sands().
 *
 * The whole deadline is initialised, including tv_usec for TIMER_NEVER, so
 * no indeterminate value gets copied into thread->sands. */
thread_t *
thread_add_read(thread_master_t *m, int (*func) (thread_t *), void *arg, int fd, unsigned long timer)
{
	timeval_t sands = { .tv_sec = TIMER_DISABLED, .tv_usec = 0 };

	/* Compute read timeout value */
	if (timer != TIMER_NEVER) {
		set_time_now();
		sands = timer_add_long(time_now, timer);
	}

	return thread_add_read_sands(m, func, arg, fd, &sands);
}
/* Drop the read interest of thread's event (see thread_event_del()).
 * Returns -1 if thread or its event is NULL or the update fails, 0 otherwise. */
int
thread_del_read(thread_t *thread)
{
	if (thread && thread->event)
		return thread_event_del(thread, THREAD_FL_EPOLL_READ_BIT) < 0 ? -1 : 0;

	return -1;
}
#ifdef _WITH_SNMP_
/* Cancel the read thread registered on fd, if there is one. */
static void
thread_del_read_fd(thread_master_t *m, int fd)
{
	thread_event_t *event = thread_event_get(m, fd);

	if (event && event->read)
		thread_cancel(event->read);
}
#endif
/* Give the read thread on fd a new absolute deadline and re-sort it in the
 * read tree. Does nothing if fd has no read thread or that thread is no longer
 * of type THREAD_READ. */
static void
thread_read_requeue(thread_master_t *m, int fd, const timeval_t *new_sands)
{
	thread_event_t *event = thread_event_get(m, fd);
	thread_t *reader;

	if (!event)
		return;

	reader = event->read;

	/* If the thread is not on the read list, don't touch it */
	if (!reader || reader->type != THREAD_READ)
		return;

	reader->sands = *new_sands;
	rb_move_cached(&reader->master->read, reader, n, thread_timer_cmp);
}
/* Public entry point: move the read thread on fd to the absolute deadline
 * *sands (see thread_read_requeue()). */
void
thread_requeue_read(thread_master_t *m, int fd, const timeval_t *sands)
{
	thread_read_requeue(m, fd, sands);
}
/* Add new write thread.
 *
 * Queues func(arg) to run when fd becomes writable, or as a write timeout
 * 'timer' from now (TIMER_NEVER: no timeout). The fd's thread_event_t is
 * reused if it exists and created otherwise. Returns the thread, or NULL if a
 * write thread is already registered on fd or the event cannot be allocated
 * or registered with epoll. If registration fails, the event's write state is
 * rolled back. */
thread_t *
thread_add_write(thread_master_t *m, int (*func) (thread_t *), void *arg, int fd, unsigned long timer)
{
	thread_event_t *event;
	thread_t *thread;

	assert(m != NULL);

	/* Fast path: try the master's current_event before searching
	 * io_events. I feel lucky ! :D */
	if (m->current_event && m->current_event->fd == fd)
		event = m->current_event;
	else
		event = thread_event_get(m, fd);

	if (!event) {
		if (!(event = thread_event_new(m, fd))) {
			log_message(LOG_INFO, "scheduler: Cant allocate write event for fd [%d](%m)", fd);
			return NULL;
		}
	}
	else if (__test_bit(THREAD_FL_WRITE_BIT, &event->flags) && event->write) {
		log_message(LOG_INFO, "scheduler: There is already write event registered on fd [%d]", fd);
		return NULL;
	}

	thread = thread_new(m);
	thread->type = THREAD_WRITE;
	thread->master = m;
	thread->func = func;
	thread->arg = arg;
	thread->u.fd = fd;
	thread->event = event;

	/* Set & flag event */
	__set_bit(THREAD_FL_WRITE_BIT, &event->flags);
	event->write = thread;
	if (!__test_bit(THREAD_FL_EPOLL_WRITE_BIT, &event->flags)) {
		if (thread_event_set(thread) < 0) {
			log_message(LOG_INFO, "scheduler: Cant register write event for fd [%d](%m)" , fd);
			/* Undo the flagging above. Otherwise the event keeps
			 * pointing at a thread that is about to be recycled, and
			 * every later write thread on this fd would be refused. */
			__clear_bit(THREAD_FL_WRITE_BIT, &event->flags);
			event->write = NULL;
			thread_add_unuse(m, thread);
			return NULL;
		}
		__set_bit(THREAD_FL_EPOLL_WRITE_BIT, &event->flags);
	}

	/* Compute write timeout value */
	if (timer == TIMER_NEVER)
		thread->sands.tv_sec = TIMER_DISABLED;
	else {
		set_time_now();
		thread->sands = timer_add_long(time_now, timer);
	}

	/* Sort the thread. */
	rb_insert_sort_cached(&m->write, thread, n, thread_timer_cmp);

	return thread;
}
/* Drop the write interest of thread's event (see thread_event_del()).
 * Returns -1 if thread or its event is NULL or the update fails, 0 otherwise. */
int
thread_del_write(thread_t *thread)
{
	if (thread && thread->event)
		return thread_event_del(thread, THREAD_FL_EPOLL_WRITE_BIT) < 0 ? -1 : 0;

	return -1;
}
/* Close thread->u.fd and mark it closed (-1), first releasing the thread's
 * event (if any) through thread_event_cancel(). Does nothing if the fd is
 * already -1. The thread itself is not removed from any tree or list here.
 * NOTE(review): thread_event_cancel() frees the event without checking
 * whether another read/write thread on the same fd still references it -
 * confirm callers never close an fd with both directions registered. */
void
thread_close_fd(thread_t *thread)
{
	if (thread->u.fd == -1)
		return;
	if (thread->event)
		thread_event_cancel(thread);
	close(thread->u.fd);
	thread->u.fd = -1;
}
/* Add a timer thread that runs func(arg) 'timer' from now; with TIMER_NEVER
 * the thread is queued with its timer disabled. */
thread_t *
thread_add_timer(thread_master_t *m, int (*func) (thread_t *), void *arg, unsigned long timer)
{
	thread_t *thread;

	assert(m != NULL);

	thread = thread_new(m);
	thread->master = m;
	thread->type = THREAD_TIMER;
	thread->arg = arg;
	thread->func = func;

	if (timer != TIMER_NEVER) {
		/* Do we need jitter here? */
		set_time_now();
		thread->sands = timer_add_long(time_now, timer);
	} else
		thread->sands.tv_sec = TIMER_DISABLED;

	/* Sort by timeval. */
	rb_insert_sort_cached(&m->timer, thread, n, thread_timer_cmp);

	return thread;
}
/* Reschedule a timer thread to fire 'timer' from now and re-sort it in the
 * timer tree. Threads whose type is beyond THREAD_MAX_WAITING are probably on
 * the ready list already and are left alone. Nothing is done if the deadline
 * would not change. */
void
timer_thread_update_timeout(thread_t *thread, unsigned long timer)
{
	timeval_t new_sands;

	if (thread->type <= THREAD_MAX_WAITING) {
		set_time_now();
		new_sands = timer_add_long(time_now, timer);

		if (!(timercmp(&thread->sands, &new_sands, ==))) {
			thread->sands = new_sands;
			rb_move_cached(&thread->master->timer, thread, n, thread_timer_cmp);
		}
	}
}
/* Add a timer thread of type THREAD_TIMER_SHUTDOWN. Such a thread keeps that
 * type when it is moved to the ready queue (see thread_move_ready()). */
thread_t *
thread_add_timer_shutdown(thread_master_t *m, int(*func)(thread_t *), void *arg, unsigned long timer)
{
	thread_t *shutdown_thread;

	shutdown_thread = thread_add_timer(m, func, arg, timer);
	shutdown_thread->type = THREAD_TIMER_SHUTDOWN;

	return shutdown_thread;
}
/* Add a child thread for 'pid' running func(arg), with a timeout 'timer' from
 * now (TIMER_NEVER: no timeout). The thread is indexed both by deadline in the
 * child tree and by PID in the child_pid tree. How a child's termination is
 * dispatched happens outside this section. */
thread_t *
thread_add_child(thread_master_t * m, int (*func) (thread_t *), void * arg, pid_t pid, unsigned long timer)
{
	thread_t *thread;

	assert(m != NULL);

	thread = thread_new(m);
	thread->master = m;
	thread->type = THREAD_CHILD;
	thread->func = func;
	thread->arg = arg;
	thread->u.c.pid = pid;
	thread->u.c.status = 0;

	/* Compute child timeout value */
	if (timer != TIMER_NEVER) {
		set_time_now();
		thread->sands = timer_add_long(time_now, timer);
	} else
		thread->sands.tv_sec = TIMER_DISABLED;

	/* Index by deadline ... */
	rb_insert_sort_cached(&m->child, thread, n, thread_timer_cmp);
	/* ... and by PID */
	rb_insert_sort(&m->child_pid, thread, rb_data, thread_child_pid_cmp);

	return thread;
}
/* Give every pending child thread the handler func and a new timeout 'timer'
 * from now. TIMER_NEVER disables the timeout, as thread_add_child() does,
 * instead of producing a far-future deadline.
 *
 * All children get the same deadline, so the child tree stays correctly
 * ordered without re-sorting. */
void
thread_children_reschedule(thread_master_t *m, int (*func)(thread_t *), unsigned long timer)
{
	thread_t *thread;
	timeval_t sands = { .tv_sec = TIMER_DISABLED, .tv_usec = 0 };

	if (timer != TIMER_NEVER) {
		set_time_now();
		sands = timer_add_long(time_now, timer);
	}

	rb_for_each_entry_cached(thread, &m->child, n) {
		thread->func = func;
		thread->sands = sands;
	}
}
/* Queue a simple event thread running func(arg) with value 'val' at the tail
 * of the event list. */
thread_t *
thread_add_event(thread_master_t * m, int (*func) (thread_t *), void *arg, int val)
{
	thread_t *event_thread;

	assert(m != NULL);

	event_thread = thread_new(m);
	event_thread->master = m;
	event_thread->type = THREAD_EVENT;
	event_thread->func = func;
	event_thread->arg = arg;
	event_thread->u.val = val;

	INIT_LIST_HEAD(&event_thread->next);
	list_add_tail(&event_thread->next, &m->event);

	return event_thread;
}
/* Queue a terminate thread of the given type, with handler func and no
 * argument (value 0), at the tail of the event list. */
static thread_t *
thread_add_generic_terminate_event(thread_master_t * m, thread_type_t type, int (*func)(thread_t *))
{
	thread_t *term;

	assert(m != NULL);

	term = thread_new(m);
	term->master = m;
	term->type = type;
	term->func = func;
	term->arg = NULL;
	term->u.val = 0;

	INIT_LIST_HEAD(&term->next);
	list_add_tail(&term->next, &m->event);

	return term;
}
/* Queue a THREAD_TERMINATE event that has no handler function. */
thread_t *
thread_add_terminate_event(thread_master_t *m)
{
	int (*no_handler)(thread_t *) = NULL;

	return thread_add_generic_terminate_event(m, THREAD_TERMINATE, no_handler);
}
/* Queue a THREAD_TERMINATE_START event that runs func. */
thread_t *
thread_add_start_terminate_event(thread_master_t *m, int(*func)(thread_t *))
{
	thread_type_t start_type = THREAD_TERMINATE_START;

	return thread_add_generic_terminate_event(m, start_type, func);
}
#ifdef USE_SIGNAL_THREADS
/* Queue a signal thread running func(arg) for 'signum' on the signal list.
 * If signum is not yet in the master's signal mask, it is added and the
 * signalfd is updated with the new mask. */
thread_t *
thread_add_signal(thread_master_t *m, int (*func) (thread_t *), void *arg, int signum)
{
	thread_t *thread;

	assert(m != NULL);

	thread = thread_new(m);
	thread->master = m;
	thread->type = THREAD_SIGNAL;
	thread->func = func;
	thread->arg = arg;
	thread->u.val = signum;

	INIT_LIST_HEAD(&thread->next);
	list_add_tail(&thread->next, &m->signal);

	/* Update signalfd accordingly */
	if (!sigismember(&m->signal_mask, signum)) {
		sigaddset(&m->signal_mask, signum);
		signalfd(m->signal_fd, &m->signal_mask, 0);
	}

	return thread;
}
#endif
/* Cancel thread from scheduler.
 *
 * Unlinks the thread from whichever tree or list its type says it is on,
 * dropping its epoll interest or freeing its event where applicable, and
 * parks it on the unuse list. NULL and already-unused threads are ignored.
 * Terminate threads, which thread_add_generic_terminate_event() queues on the
 * event list, are unlinked like the other list-based types. When an event is
 * freed here, m->current_event is cleared if it pointed at it, as
 * thread_event_cancel() does. */
void
thread_cancel(thread_t *thread)
{
	thread_master_t *m;

	if (!thread || thread->type == THREAD_UNUSED)
		return;

	m = thread->master;

	switch (thread->type) {
	case THREAD_READ:
		thread_event_del(thread, THREAD_FL_EPOLL_READ_BIT);
		rb_erase_cached(&thread->n, &m->read);
		break;
	case THREAD_WRITE:
		thread_event_del(thread, THREAD_FL_EPOLL_WRITE_BIT);
		rb_erase_cached(&thread->n, &m->write);
		break;
	case THREAD_TIMER:
		rb_erase_cached(&thread->n, &m->timer);
		break;
	case THREAD_CHILD:
		/* Does this need to kill the child, or is that the
		 * caller's job?
		 * This function is currently unused, so leave it for now.
		 */
		rb_erase_cached(&thread->n, &m->child);
		rb_erase(&thread->rb_data, &m->child_pid);
		break;
	case THREAD_READY_FD:
	case THREAD_READ_TIMEOUT:
	case THREAD_WRITE_TIMEOUT:
		if (thread->event) {
			rb_erase(&thread->event->n, &m->io_events);
			/* Don't leave the add_read/add_write fast path
			 * pointing at freed memory */
			if (thread->event == m->current_event)
				m->current_event = NULL;
			FREE(thread->event);
		}
		/* ... falls through ... */
	case THREAD_EVENT:
	case THREAD_READY:
#ifdef USE_SIGNAL_THREADS
	case THREAD_SIGNAL:
#endif
	case THREAD_CHILD_TIMEOUT:
	case THREAD_CHILD_TERMINATED:
	case THREAD_TERMINATE_START:
	case THREAD_TERMINATE:
		list_head_del(&thread->next);
		break;
	default:
		/* NOTE(review): THREAD_TIMER_SHUTDOWN (in the timer tree or, once
		 * due, on the ready list), THREAD_READ_ERROR and
		 * THREAD_WRITE_ERROR are not unlinked here - confirm they are
		 * never cancelled while still queued. */
		break;
	}

	thread_add_unuse(m, thread);
}
/*
 * Cancel the read thread registered on fd, if any.
 *
 * If the read thread's io event also has a write thread bound, that write
 * thread is cancelled first and detached from the event. At most one read
 * thread (the first match) is cancelled.
 */
void
thread_cancel_read(thread_master_t *m, int fd)
{
	thread_t *rd_thread, *rd_next;

	rb_for_each_entry_safe_cached(rd_thread, rd_next, &m->read, n) {
		if (rd_thread->u.fd != fd)
			continue;

		if (rd_thread->event->write) {
			thread_cancel(rd_thread->event->write);
			rd_thread->event->write = NULL;
		}

		thread_cancel(rd_thread);
		return;
	}
}
#ifdef _INCLUDE_UNUSED_CODE_
/*
 * Remove from m->event every queued thread whose arg equals arg,
 * handing each removed thread to thread_add_unuse().
 */
void
thread_cancel_event(thread_master_t *m, void *arg)
{
	thread_t *ev_thread, *ev_next;

	// Why doesn't this use thread_cancel() above
	list_for_each_entry_safe(ev_thread, ev_next, &m->event, next) {
		if (ev_thread->arg != arg)
			continue;

		list_head_del(&ev_thread->next);
		thread_add_unuse(m, ev_thread);
	}
}
#endif
#ifdef _WITH_SNMP_
/*
 * Read handler for an SNMP agent fd: let net-snmp process the readable fd,
 * flush any outstanding agent requests, then re-register itself for the
 * next read on the same fd with no timeout.
 */
static int
snmp_read_thread(thread_t *thread)
{
	fd_set readable_fds;
	int snmp_fd = thread->u.fd;

	FD_ZERO(&readable_fds);
	FD_SET(snmp_fd, &readable_fds);

	snmp_read(&readable_fds);
	netsnmp_check_outstanding_agent_requests();

	/* Re-arm for the next read on this fd */
	thread_add_read(thread->master, snmp_read_thread, thread->arg, snmp_fd, TIMER_NEVER);

	return 0;
}
/*
 * SNMP timer handler: run net-snmp's timeout and alarm processing, flush
 * outstanding agent requests, and re-arm a new timer thread. The new thread
 * is stored in m->snmp_timer_thread, whose timeout snmp_epoll_info() adjusts.
 */
int
snmp_timeout_thread(thread_t *thread)
{
	thread_master_t *m = thread->master;

	snmp_timeout();
	run_alarms();
	netsnmp_check_outstanding_agent_requests();

	m->snmp_timer_thread = thread_add_timer(m, snmp_timeout_thread, thread->arg, TIMER_NEVER);

	return 0;
}
// See https://vincent.bernat.im/en/blog/2012-snmp-event-loop
/*
 * Synchronise the scheduler with what net-snmp currently wants to wait on.
 *
 * snmp_select_info() reports a fresh fd set and timeout. The timeout is
 * applied to m->snmp_timer_thread. Each fd whose membership differs from
 * the previously recorded m->snmp_fdset gets a read thread added (fd is new)
 * or removed (fd has gone). m->snmp_fdset and m->snmp_fdsetsize are then
 * updated to match.
 */
static void
snmp_epoll_info(thread_master_t *m)
{
	fd_set snmp_fdset;
	int fdsetsize = 0;
	int max_fdsetsize;
	struct timeval snmp_timer_wait = { .tv_sec = TIMER_DISABLED };
	int snmpblock = true;
	unsigned long *old_set, *new_set; // Must be unsigned for ffsl() to work for us
	unsigned long diff;
	const int bits_per_word = (int)(sizeof(*old_set) * CHAR_BIT);
	const int words_per_fdset = (int)(sizeof(fd_set) / sizeof(*old_set));
	int num_words;
	int i;
	int fd;
	int bit;

#if 0
	// TODO
#if sizeof fd_mask != sizeof diff
#error "snmp_epoll_info sizeof(fd_mask) does not match old_set/new_set/diff"
#endif
#endif

	FD_ZERO(&snmp_fdset);

	/* When SNMP is enabled, we may have to select() on additional
	 * FD. snmp_select_info() will add them to `readfd'. The trick
	 * with this function is its last argument. We need to set it
	 * true to set its own timer that we then compare against ours. */
	snmp_select_info(&fdsetsize, &snmp_fdset, &snmp_timer_wait, &snmpblock);

	if (snmpblock)
		snmp_timer_wait.tv_sec = TIMER_DISABLED;
	timer_thread_update_timeout(m->snmp_timer_thread, timer_long(snmp_timer_wait));

	max_fdsetsize = m->snmp_fdsetsize > fdsetsize ? m->snmp_fdsetsize : fdsetsize;
	if (!max_fdsetsize)
		return;

	/* fdsetsize is a number of fds (highest fd + 1), i.e. a number of bits,
	 * not bytes. Convert it to the number of words to scan, rounding up, and
	 * never scan beyond the end of an fd_set. */
	num_words = (max_fdsetsize + bits_per_word - 1) / bits_per_word;
	if (num_words > words_per_fdset)
		num_words = words_per_fdset;

	for (i = 0, old_set = (unsigned long *)&m->snmp_fdset, new_set = (unsigned long *)&snmp_fdset;
	     i < num_words;
	     i++, old_set++, new_set++) {
		if (*old_set == *new_set)
			continue;

		diff = *old_set ^ *new_set;
		fd = i * bits_per_word - 1;
		do {
			/* ffsl() is 1-based: bit == position of lowest set bit + 1 */
			bit = ffsl(diff);

			/* Shifting by the full word width (bit == bits_per_word)
			 * is undefined behaviour, so shift in two steps. */
			diff >>= bit - 1;
			diff >>= 1;
			fd += bit;

			if (FD_ISSET(fd, &snmp_fdset)) {
				/* Add the fd */
				thread_add_read(m, snmp_read_thread, NULL, fd, TIMER_NEVER);
				FD_SET(fd, &m->snmp_fdset);
			} else {
				/* Remove the fd */
				thread_del_read_fd(m, fd);
				FD_CLR(fd, &m->snmp_fdset);
			}
		} while (diff);
	}

	m->snmp_fdsetsize = fdsetsize;
}
#endif
/*
 * Fetch next ready thread queue.
 *
 * Returns &m->event if any event threads are queued, otherwise &m->ready if
 * any ready threads are queued. Otherwise it blocks in epoll_wait(). Epoll
 * results are translated into ready threads (READY_FD, or READ_ERROR/
 * WRITE_ERROR on HUP/ERR/RDHUP), and the call returns once the ready list is
 * non-empty. It never returns NULL.
 */
static list_head_t *
thread_fetch_next_queue(thread_master_t *m)
{
	int sav_errno;
	int last_epoll_errno = 0;
	int ret;
	int i;

	assert(m != NULL);

	/* If there is event process it first. */
	if (m->event.next != &m->event)
		return &m->event;

	/* If there are ready threads process them */
	if (m->ready.next != &m->ready)
		return &m->ready;

	do {
#ifdef _WITH_SNMP_
		if (snmp_running)
			snmp_epoll_info(m);
#endif

		/* Calculate and set wait timer. Take care of timeouted fd. */
		thread_set_timer(m);

#ifdef _VRRP_FD_DEBUG_
		if (extra_threads_debug)
			extra_threads_debug();
#endif

#ifdef _EPOLL_THREAD_DUMP_
		if (do_epoll_thread_dump)
			dump_thread_data(m, NULL);
#endif

#ifdef _EPOLL_DEBUG_
		if (do_epoll_debug)
			log_message(LOG_INFO, "calling epoll_wait");
#endif

		/* Call epoll function. */
		ret = epoll_wait(m->epoll_fd, m->epoll_events, m->epoll_count, -1);
		sav_errno = errno;

#ifdef _EPOLL_DEBUG_
		if (do_epoll_debug) {
			if (ret == -1)
				log_message(LOG_INFO, "epoll_wait returned %d, errno %d", ret, sav_errno);
			else
				log_message(LOG_INFO, "epoll_wait returned %d fds", ret);
		}
#endif

		if (ret < 0) {
			if (sav_errno == EINTR)
				continue;

			/* Real error. */
			if (sav_errno != last_epoll_errno) {
				/* Log the error first time only */
				log_message(LOG_INFO, "scheduler: epoll_wait error: %s", strerror(sav_errno));
				last_epoll_errno = sav_errno;
			}

			/* Make sure we don't sit in a tight loop */
			if (sav_errno == EBADF || sav_errno == EFAULT || sav_errno == EINVAL)
				sleep(1);

			continue;
		}

		/* Handle epoll events */
		for (i = 0; i < ret; i++) {
			struct epoll_event *ep_ev;
			thread_event_t *ev;

			ep_ev = &m->epoll_events[i];
			ev = ep_ev->data.ptr;

			/* Error. Any event carrying HUP/ERR/RDHUP is handled here only,
			 * so no HUP/ERR handling is needed further down. */
			// TODO - no thread processing function handles THREAD_READ_ERROR/THREAD_WRITE_ERROR yet
			if (ep_ev->events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) {
				if (ev->read) {
					thread_move_ready(m, &m->read, ev->read, THREAD_READ_ERROR);
					ev->read = NULL;
				}

				if (ev->write) {
					thread_move_ready(m, &m->write, ev->write, THREAD_WRITE_ERROR);
					ev->write = NULL;
				}

				continue;
			}

			/* READ. assert() is compiled out unless _DEBUG_, so never pass
			 * a NULL thread on to thread_move_ready(). */
			if (ep_ev->events & EPOLLIN) {
				if (ev->read) {
					thread_move_ready(m, &m->read, ev->read, THREAD_READY_FD);
					ev->read = NULL;
				} else {
					log_message(LOG_INFO, "scheduler: No read thread bound on fd:%d (fl:0x%.4X)"
						      , ev->fd, ep_ev->events);
					assert(0);
				}
			}

			/* WRITE - same NULL guard as for READ */
			if (ep_ev->events & EPOLLOUT) {
				if (ev->write) {
					thread_move_ready(m, &m->write, ev->write, THREAD_READY_FD);
					ev->write = NULL;
				} else {
					log_message(LOG_INFO, "scheduler: No write thread bound on fd:%d (fl:0x%.4X)"
						      , ev->fd, ep_ev->events);
					assert(0);
				}
			}
		}

		/* Update current time */
		set_time_now();

		/* If there is a ready thread, return it. */
		if (m->ready.next != &m->ready)
			return &m->ready;
	} while (true);
}
/*
 * Invoke the thread's handler function, passing the thread itself.
 * The handler's return value is discarded.
 */
static inline void
thread_call(thread_t *thread)
{
#ifdef _EPOLL_DEBUG_
	if (do_epoll_debug)
		log_message(LOG_INFO, "Calling thread function %s(), type %s, val/fd/pid %d, status %d id %lu", get_function_name(thread->func), get_thread_type_str(thread->type), thread->u.val, thread->u.c.status, thread->id);
#endif

	thread->func(thread);
}
/*
 * Main dispatch loop.
 *
 * Repeatedly takes the head thread of the queue returned by
 * thread_fetch_next_queue(), runs it (subject to the shutdown filter below),
 * and passes it to thread_add_unuse(). The loop exits after a THREAD_TERMINATE
 * thread has been processed, or once shutting down with no shutdown timer
 * running and no child threads left in m->child.
 *
 * All bookkeeping uses the master passed in (m), not the global `master`.
 */
void
process_threads(thread_master_t *m)
{
	thread_t *thread;
	list_head_t *thread_list;
	int thread_type;

	/*
	 * Processing the master thread queues,
	 * return and execute one ready thread.
	 */
	while ((thread_list = thread_fetch_next_queue(m))) {
		/* Run until error, used for debugging only */
#if defined _DEBUG_ && defined _MEM_CHECK_
		if (__test_bit(MEM_ERR_DETECT_BIT, &debug)
#ifdef _WITH_VRRP_
		    && __test_bit(DONT_RELEASE_VRRP_BIT, &debug)
#endif
						    ) {
			__clear_bit(MEM_ERR_DETECT_BIT, &debug);
#ifdef _WITH_VRRP_
			__clear_bit(DONT_RELEASE_VRRP_BIT, &debug);
#endif
			thread_add_terminate_event(m);
		}
#endif

		/* If we are shutting down, only process relevant thread types.
		 * We only want timer and signal fd, and don't want inotify, vrrp socket,
		 * snmp_read, bfd_receiver, bfd pipe in vrrp/check, dbus pipe or netlink fds. */
		thread = thread_trim_head(thread_list);
		if (!shutting_down ||
		    (thread->type == THREAD_READY_FD &&
		     (thread->u.fd == m->timer_fd || thread->u.fd == m->signal_fd)) ||
		    thread->type == THREAD_CHILD ||
		    thread->type == THREAD_CHILD_TIMEOUT ||
		    thread->type == THREAD_CHILD_TERMINATED ||
		    thread->type == THREAD_TIMER_SHUTDOWN ||
		    thread->type == THREAD_TERMINATE) {
			if (thread->func)
				thread_call(thread);

			if (thread->type == THREAD_TERMINATE_START)
				shutting_down = true;
		}

		m->current_event = (thread->type == THREAD_READY_FD) ? thread->event : NULL;
		thread_type = thread->type;
		thread_add_unuse(m, thread);

		/* If we are shutting down, and the shutdown timer is not running and
		 * all children have terminated, then we can terminate */
		if (shutting_down && !m->shutdown_timer_running && !m->child.rb_root.rb_node)
			break;

		/* If daemon hanging event is received stop processing */
		if (thread_type == THREAD_TERMINATE)
			break;
	}
}
static void
process_child_termination(pid_t pid, int status)
{
thread_master_t * m = master;
thread_t th = { .u.c.pid = pid };
thread_t *thread;
bool permanent_vrrp_checker_error = false;
#ifndef _DEBUG_
if (prog_type == PROG_TYPE_PARENT)
permanent_vrrp_checker_error = report_child_status(status, pid, NULL);
#endif
thread = rb_search(&master->child_pid, &th, rb_data, thread_child_pid_cmp);
#ifdef _EPOLL_DEBUG_
if (do_epoll_debug)
log_message(LOG_INFO, "Child %d terminated with status 0x%x, thread_id %lu", pid, status, thread ? thread->id : 0);
#endif
if (!thread)
return;
rb_erase(&thread->rb_data, &master->child_pid);
thread->u.c.status = status;
if (permanent_vrrp_checker_error)
{
/* The child had a permanant error, so no point in respawning */
rb_erase_cached(&thread->n, &m->child);
thread_add_unuse(m, thread);
thread_add_terminate_event(m);
}
else
thread_move_ready(m, &m->child, thread, THREAD_CHILD_TERMINATED);
}
/*
 * Synchronous SIGCHLD handler to reap child processes.
 *
 * Reaps every terminated child without blocking (WNOHANG) and passes each
 * (pid, status) to process_child_termination(). The loop stops when waitpid()
 * returns 0 (children exist but none has changed state) or fails with ECHILD
 * (no children). It retries on EINTR. Any other error is reported and
 * handling stops. assert() is compiled out in non-debug builds, so this path
 * must not fall through and process `status`, which waitpid() did not fill in.
 */
void
thread_child_handler(__attribute__((unused)) void *v, __attribute__((unused)) int unused)
{
	pid_t pid;
	int status;

	while ((pid = waitpid(-1, &status, WNOHANG))) {
		if (pid == -1) {
			if (errno == EINTR)
				continue;
			if (errno == ECHILD)
				return;

			DBG("waitpid error: %s", strerror(errno));
			assert(0);
			return;
		}

		process_child_termination(pid, status);
	}
}
/*
 * Register the scheduler's own infrastructure threads on m:
 *  - a read thread on the timerfd, stored in m->timer_thread,
 *  - the signal read thread (via add_signal_read_thread()),
 *  - when built with SNMP, the SNMP timeout timer, stored in
 *    m->snmp_timer_thread.
 */
void
thread_add_base_threads(thread_master_t *m)
{
	thread_t *timerfd_thread = thread_add_read(m, thread_timerfd_handler, NULL, m->timer_fd, TIMER_NEVER);

	m->timer_thread = timerfd_thread;
	add_signal_read_thread(m);

#ifdef _WITH_SNMP_
	m->snmp_timer_thread = thread_add_timer(m, snmp_timeout_thread, NULL, TIMER_NEVER);
#endif
}
/*
 * Scheduler entry point: install thread_child_handler() for SIGCHLD, then
 * run the dispatch loop until process_threads() returns.
 */
void
launch_thread_scheduler(thread_master_t *m)
{
	// TODO - do this somewhere better
	signal_set(SIGCHLD, thread_child_handler, m);

	/* Runs until a terminate condition is reached */
	process_threads(m);
}
#ifdef THREAD_DUMP
/*
 * Record human-readable names for this file's thread functions and signal
 * handlers, for use by the thread dump facility.
 */
void
register_scheduler_addresses(void)
{
	/* Thread functions */
#ifdef _WITH_SNMP_
	register_thread_address("snmp_timeout_thread", snmp_timeout_thread);
	register_thread_address("snmp_read_thread", snmp_read_thread);
#endif
	register_thread_address("thread_timerfd_handler", thread_timerfd_handler);

	/* Signal handlers */
	register_signal_handler_address("thread_child_handler", thread_child_handler);
}
#endif