/*
Copyright (C) 2013 ABRT Team
Copyright (C) 2013 Red Hat, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "abrt-inotify.h"
#include "abrt_glib.h"
#include "libabrt.h"
#define STRINGIZE_DETAIL(str) #str
#define STRINGIZE(str) STRINGIZE_DETAIL(str)
#define DEFAULT_COUNT_OF_WORKERS 10
#define DEFAULT_CACHE_MIB_SIZE 4
static int g_signal_pipe[2];
struct queue
{
unsigned capacity;
GQueue q;
};
static int
queue_push(struct queue *queue, char *value)
{
if (g_queue_get_length(&queue->q) >= queue->capacity)
return 0;
g_queue_push_head(&queue->q, value);
return 1;
}
static char *
queue_pop(struct queue *queue)
{
if (g_queue_is_empty(&queue->q))
return NULL;
return (char *)g_queue_pop_tail(&queue->q);
}
struct process
{
GMainLoop *main_loop;
const char *upload_directory;
unsigned children;
unsigned max_children;
struct queue queue;
};
static void
process_quit(struct process *proc)
{
g_main_loop_quit(proc->main_loop);
}
static void
run_abrt_handle_upload(struct process *proc, const char *name)
{
log_info("Processing file '%s' in directory '%s'", name, proc->upload_directory);
++proc->children;
log_debug("Running workers: %d", proc->children);
fflush(NULL); /* paranoia */
pid_t pid = fork();
if (pid < 0)
{
--proc->children;
perror_msg("fork");
return;
}
if (pid == 0)
{
/* child */
xchdir(proc->upload_directory);
if (g_settings_delete_uploaded)
execlp("abrt-handle-upload", "abrt-handle-upload", "-d",
g_settings_dump_location, proc->upload_directory, name, (char*)NULL);
else
execlp("abrt-handle-upload", "abrt-handle-upload",
g_settings_dump_location, proc->upload_directory, name, (char*)NULL);
perror_msg_and_die("Can't execute '%s'", "abrt-handle-upload");
}
}
static void
handle_new_path(struct process *proc, char *name)
{
log_warning("Detected creation of file '%s' in upload directory '%s'", name, proc->upload_directory);
if (proc->children < proc->max_children)
{
run_abrt_handle_upload(proc, name);
free(name);
return;
}
log_debug("Pushing '%s' to deferred queue", name);
if (!queue_push(&proc->queue, name))
{
error_msg(_("No free workers and full buffer. Omitting archive '%s'"), name);
free(name);
return;
}
}
static void
print_stats(struct process *proc)
{
/* this is meant only for debugging, so not marking it as translatable */
fprintf(stderr, "%i archives to process, %i active workers\n", g_queue_get_length(&proc->queue.q), proc->children);
}
static void
process_next_in_queue(struct process *proc)
{
char *name = queue_pop(&proc->queue);
if (!name)
{
log_debug("Deferred queue is empty. Running workers: %d", proc->children);
return;
}
run_abrt_handle_upload(proc, name);
free(name);
}
static void
handle_signal(int signo)
{
int save_errno = errno;
uint8_t sig_caught = signo;
if (write(g_signal_pipe[1], &sig_caught, 1))
/* we ignore result, if () shuts up stupid compiler */;
errno = save_errno;
}
static gboolean
handle_signal_pipe_cb(GIOChannel *gio, GIOCondition condition, gpointer user_data)
{
struct process *proc = (struct process *)user_data;
uint8_t signals[DEFAULT_COUNT_OF_WORKERS];
gsize len = 0;
for (;;)
{
GError *error = NULL;
GIOStatus stat = g_io_channel_read_chars(gio, (void *)signals, sizeof(signals), &len, NULL);
if (stat == G_IO_STATUS_ERROR)
{
error_msg_and_die(_("Can't read from gio channel: '%s'"), error ? error->message : "");
}
if (stat == G_IO_STATUS_AGAIN)
{ /* We got all buffered data, but fd is still open. Done for now */
return TRUE; /* "glib, please don't remove this event (yet)" */
}
if (stat == G_IO_STATUS_EOF)
break;
/* G_IO_STATUS_NORMAL */
for (unsigned signo = 0; signo < len; ++signo)
{
/* we did receive a signal */
log_debug("Got signal %d through signal pipe", signals[signo]);
if (signals[signo] == SIGUSR1)
{
print_stats(proc);
}
else if (signals[signo] != SIGCHLD)
{
process_quit(proc);
return FALSE; /* remove this event */
}
else
{
while (safe_waitpid(-1, NULL, WNOHANG) > 0)
{
--proc->children;
process_next_in_queue(proc);
print_stats(proc);
}
}
}
}
return TRUE; /* "please don't remove this event" */
}
static void
handle_inotify_cb(struct abrt_inotify_watch *watch, struct inotify_event *event, void *user_data)
{
/* Was the (presumable newly created) file closed in upload dir,
* or a file moved to upload dir? */
if (!(event->mask & IN_ISDIR) && (event->mask & (IN_CLOSE_WRITE | IN_MOVED_TO)))
{
const char *ext = strrchr(event->name, '.');
if (ext && strcmp(ext + 1, "working") == 0)
return;
handle_new_path((struct process *)user_data, xstrdup(event->name));
}
}
static void
daemonize()
{
/* forking to background */
fflush(NULL); /* paranoia */
pid_t pid = fork();
if (pid < 0)
perror_msg_and_die("fork");
if (pid > 0)
exit(0);
/* Child (daemon) continues */
if (setsid() < 0)
perror_msg_and_die("setsid");
/* Change the current working directory */
xchdir("/");
/* Reopen the standard file descriptors to "/dev/null" */
xmove_fd(xopen("/dev/null", O_RDWR), STDIN_FILENO);
xdup2(STDIN_FILENO, STDOUT_FILENO);
xdup2(STDIN_FILENO, STDERR_FILENO);
}
int
main(int argc, char **argv)
{
/* I18n */
setlocale(LC_ALL, "");
#if ENABLE_NLS
bindtextdomain(PACKAGE, LOCALEDIR);
textdomain(PACKAGE);
#endif
abrt_init(argv);
/* Can't keep these strings/structs static: _() doesn't support that */
const char *program_usage_string = _(
"& [-vs] [-w NUM] [-c MiB] [UPLOAD_DIRECTORY]\n"
"\n"
"\nWatches UPLOAD_DIRECTORY and unpacks incoming archives into DumpLocation"
"\nspecified in abrt.conf"
"\n"
"\nIf UPLOAD_DIRECTORY is not provided, uses a value of"
"\nWatchCrashdumpArchiveDir option from abrt.conf"
);
enum {
OPT_v = 1 << 0,
OPT_s = 1 << 1,
OPT_d = 1 << 2,
OPT_w = 1 << 3,
OPT_c = 1 << 4,
};
int concurrent_workers = DEFAULT_COUNT_OF_WORKERS;
int cache_size_mib = DEFAULT_CACHE_MIB_SIZE;
/* Keep enum above and order of options below in sync! */
struct options program_options[] = {
OPT__VERBOSE(&g_verbose),
OPT_BOOL('s', NULL, NULL , _("Log to syslog")),
OPT_BOOL('d', NULL, NULL , _("Daemonize")),
OPT_INTEGER('w', NULL, &concurrent_workers, _("Number of concurrent workers. Default is "STRINGIZE(DEFAULT_COUNT_OF_WORKERS))),
OPT_INTEGER('c', NULL, &cache_size_mib, _("Maximal cache size in MiB. Default is "STRINGIZE(DEFAULT_CACHE_MIB_SIZE))),
OPT_END()
};
unsigned opts = parse_opts(argc, argv, program_options, program_usage_string);
if (concurrent_workers <= 0)
error_msg_and_die("Invalid number of workers: %d", concurrent_workers);
if (cache_size_mib <= 0)
error_msg_and_die("Invalid cache size in MiB: %d", cache_size_mib);
if (cache_size_mib > UINT_MAX / (1024 * 1024 / FILENAME_MAX))
error_msg_and_die("Too big cache size. Maximum is : %u MiB", UINT_MAX / (1024 * 1024 / FILENAME_MAX));
struct process proc = {0};
proc.max_children = concurrent_workers;
/* By default it is about 1024 entries */
g_queue_init(&proc.queue.q);
proc.queue.capacity = cache_size_mib * (1024 * 1024 / FILENAME_MAX);
log_debug("Max queue size %u", proc.queue.capacity);
argv += optind;
if (argv[0])
{
proc.upload_directory = argv[0];
if (argv[1])
show_usage_and_die(program_usage_string, program_options);
}
/* Initialization */
log_info("Loading settings");
if (load_abrt_conf() != 0)
return 1;
if (!proc.upload_directory)
proc.upload_directory = g_settings_sWatchCrashdumpArchiveDir;
if (!proc.upload_directory)
error_msg_and_die("Neither UPLOAD_DIRECTORY nor WatchCrashdumpArchiveDir was specified");
if (opts & OPT_d)
daemonize();
msg_prefix = g_progname;
if ((opts & OPT_d) || (opts & OPT_s) || getenv("ABRT_SYSLOG"))
{
logmode = LOGMODE_JOURNAL;
}
log_info("Creating glib main loop");
proc.main_loop = g_main_loop_new(NULL, FALSE);
log_notice("Setting up a file monitor for '%s'", proc.upload_directory);
/* Never returns NULL; it will die if an error occurs */
struct abrt_inotify_watch *aiw = abrt_inotify_watch_init(proc.upload_directory,
IN_CLOSE_WRITE | IN_MOVED_TO,
handle_inotify_cb, &proc);
log_notice("Setting up a signal handler");
/* Set up signal pipe */
xpipe(g_signal_pipe);
close_on_exec_on(g_signal_pipe[0]);
close_on_exec_on(g_signal_pipe[1]);
ndelay_on(g_signal_pipe[0]);
ndelay_on(g_signal_pipe[1]);
signal(SIGUSR1, handle_signal);
signal(SIGTERM, handle_signal);
signal(SIGINT, handle_signal);
signal(SIGCHLD, handle_signal);
GIOChannel *channel_signal = abrt_gio_channel_unix_new(g_signal_pipe[0]);
guint channel_signal_source_id = g_io_add_watch(channel_signal,
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP | G_IO_NVAL,
handle_signal_pipe_cb,
&proc);
log_info("Starting glib main loop");
g_main_loop_run(proc.main_loop);
log_info("Glib main loop finished");
g_source_remove(channel_signal_source_id);
GError *error = NULL;
g_io_channel_shutdown(channel_signal, FALSE, &error);
if (error)
{
log_notice("Can't shutdown gio channel: '%s'", error ? error->message : "");
g_error_free(error);
}
g_io_channel_unref(channel_signal);
abrt_inotify_watch_destroy(aiw);
if (proc.main_loop)
g_main_loop_unref(proc.main_loop);
free_abrt_conf_data();
return 0;
}