/*
    Copyright (C) 2013  ABRT Team
    Copyright (C) 2013  Red Hat, Inc.

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along
    with this program; if not, write to the Free Software Foundation, Inc.,
    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "abrt-inotify.h"
#include "abrt_glib.h"
#include "libabrt.h"

#define STRINGIZE_DETAIL(str) #str
#define STRINGIZE(str) STRINGIZE_DETAIL(str)

#define DEFAULT_COUNT_OF_WORKERS 10
#define DEFAULT_CACHE_MIB_SIZE 4

/* Self-pipe: handle_signal() writes the signal number into [1],
 * the glib main loop reads it from [0] in handle_signal_pipe_cb().
 */
static int g_signal_pipe[2];

/* Bounded FIFO of heap-allocated file names waiting for a free worker. */
struct queue
{
    unsigned capacity; /* maximal number of queued entries */
    GQueue q;
};

/*
 * Appends value to the queue.
 *
 * Returns 1 on success, 0 if the queue already holds 'capacity' entries
 * (in that case the queue is left untouched and the caller keeps ownership
 * of value).
 */
static int
queue_push(struct queue *queue, char *value)
{
    if (g_queue_get_length(&queue->q) >= queue->capacity)
        return 0;

    g_queue_push_head(&queue->q, value);
    return 1;
}

/*
 * Removes and returns the oldest entry (entries are pushed at the head and
 * popped from the tail), or NULL if the queue is empty.
 */
static char *
queue_pop(struct queue *queue)
{
    if (g_queue_is_empty(&queue->q))
        return NULL;

    return (char *)g_queue_pop_tail(&queue->q);
}

/* State shared by all callbacks of the watcher. */
struct process
{
    GMainLoop *main_loop;
    const char *upload_directory; /* watched directory */
    unsigned children;            /* number of currently running workers */
    unsigned max_children;        /* upper limit for 'children' */
    struct queue queue;           /* files deferred until a worker is free */
};

/* Asks the glib main loop of proc to stop. */
static void
process_quit(struct process *proc)
{
    g_main_loop_quit(proc->main_loop);
}

/*
 * Forks a worker which executes abrt-handle-upload for the file 'name'
 * located in the upload directory.
 *
 * The worker counter is incremented before fork() and rolled back if
 * fork() fails. The function does not take ownership of name.
 */
static void
run_abrt_handle_upload(struct process *proc, const char *name)
{
    log_info("Processing file '%s' in directory '%s'", name, proc->upload_directory);

    ++proc->children;
    /* 'children' is unsigned, hence %u */
    log_debug("Running workers: %u", proc->children);

    fflush(NULL); /* paranoia */
    pid_t pid = fork();
    if (pid < 0)
    {
        --proc->children;
        perror_msg("fork");
        return;
    }

    if (pid == 0)
    {
        /* child */
        xchdir(proc->upload_directory);
        if (g_settings_delete_uploaded)
            execlp("abrt-handle-upload", "abrt-handle-upload", "-d",
                   g_settings_dump_location, proc->upload_directory, name, (char*)NULL);
        else
            execlp("abrt-handle-upload", "abrt-handle-upload",
                   g_settings_dump_location, proc->upload_directory, name, (char*)NULL);

        /* Reached only if execlp() failed */
        perror_msg_and_die("Can't execute '%s'", "abrt-handle-upload");
    }
}

/*
 * Handles a newly detected file in the upload directory.
 *
 * Takes ownership of name: it is either freed here (a worker was started
 * immediately, or the deferred queue is full and the file is dropped) or
 * stored in the deferred queue and freed later by process_next_in_queue().
 */
static void
handle_new_path(struct process *proc, char *name)
{
    log_warning("Detected creation of file '%s' in upload directory '%s'", name, proc->upload_directory);

    if (proc->children < proc->max_children)
    {
        run_abrt_handle_upload(proc, name);
        free(name);
        return;
    }

    log_debug("Pushing '%s' to deferred queue", name);
    if (!queue_push(&proc->queue, name))
    {
        error_msg(_("No free workers and full buffer. Omitting archive '%s'"), name);
        free(name);
        return;
    }
}

/* Prints the number of deferred archives and active workers to stderr. */
static void
print_stats(struct process *proc)
{
    /* this is meant only for debugging, so not marking it as translatable */
    /* both values are unsigned, hence %u */
    fprintf(stderr, "%u archives to process, %u active workers\n",
            g_queue_get_length(&proc->queue.q), proc->children);
}

/*
 * Starts a worker for the oldest deferred file, if there is one, and
 * releases the queued file name.
 */
static void
process_next_in_queue(struct process *proc)
{
    char *name = queue_pop(&proc->queue);
    if (!name)
    {
        log_debug("Deferred queue is empty. Running workers: %u", proc->children);
        return;
    }

    run_abrt_handle_upload(proc, name);
    free(name);
}

/*
 * Signal handler: forwards the signal number as a single byte into the
 * signal pipe. errno is saved and restored around write().
 */
static void
handle_signal(int signo)
{
    int save_errno = errno;
    uint8_t sig_caught = signo;
    if (write(g_signal_pipe[1], &sig_caught, 1))
        /* we ignore result, if () shuts up stupid compiler */;
    errno = save_errno;
}

/*
 * glib watch callback for the read end of the signal pipe.
 *
 * Reads the signal numbers written by handle_signal() and reacts to them:
 *  - SIGUSR1: print statistics
 *  - SIGCHLD: reap finished workers and start deferred work
 *  - any other signal: quit the main loop
 *
 * user_data is the 'struct process' passed to g_io_add_watch().
 * Always returns TRUE; the watch is removed by main() after the main loop
 * has finished.
 */
static gboolean
handle_signal_pipe_cb(GIOChannel *gio, GIOCondition condition, gpointer user_data)
{
    struct process *proc = (struct process *)user_data;
    uint8_t signals[DEFAULT_COUNT_OF_WORKERS];
    gsize len = 0;

    for (;;)
    {
        GError *error = NULL;
        /* Pass &error so that the failure reason can actually be reported */
        GIOStatus stat = g_io_channel_read_chars(gio, (void *)signals, sizeof(signals), &len, &error);
        if (stat == G_IO_STATUS_ERROR)
        {
            error_msg_and_die(_("Can't read from gio channel: '%s'"), error ? error->message : "");
        }
        if (stat == G_IO_STATUS_AGAIN)
        {
            /* We got all buffered data, but fd is still open. Done for now */
            return TRUE; /* "glib, please don't remove this event (yet)" */
        }
        if (stat == G_IO_STATUS_EOF)
            break;

        /* G_IO_STATUS_NORMAL */
        for (gsize i = 0; i < len; ++i)
        {
            /* we did receive a signal */
            log_debug("Got signal %d through signal pipe", signals[i]);
            if (signals[i] == SIGUSR1)
            {
                print_stats(proc);
            }
            else if (signals[i] != SIGCHLD)
            {
                process_quit(proc);
                /* Keep the source registered: main() removes it with
                 * g_source_remove() once the main loop has returned.
                 * Returning FALSE here would make that call operate on
                 * an already removed source id.
                 */
                return TRUE;
            }
            else
            {
                /* Several children may have exited; reap all of them */
                while (safe_waitpid(-1, NULL, WNOHANG) > 0)
                {
                    --proc->children;
                    process_next_in_queue(proc);
                    print_stats(proc);
                }
            }
        }
    }

    return TRUE; /* "please don't remove this event" */
}

/*
 * inotify callback: hands every finished regular file over to
 * handle_new_path(). user_data is the 'struct process'.
 */
static void
handle_inotify_cb(struct abrt_inotify_watch *watch, struct inotify_event *event, void *user_data)
{
    /* Was the (presumable newly created) file closed in upload dir,
     * or a file moved to upload dir? */
    if (!(event->mask & IN_ISDIR)
     && (event->mask & (IN_CLOSE_WRITE | IN_MOVED_TO))
    ) {
        /* Files whose last extension is "working" are skipped */
        const char *ext = strrchr(event->name, '.');
        if (ext && strcmp(ext + 1, "working") == 0)
            return;

        /* handle_new_path() takes ownership of the duplicated name */
        handle_new_path((struct process *)user_data, xstrdup(event->name));
    }
}

/*
 * Detaches the program from the caller: the parent exits, the child starts
 * a new session, changes directory to "/" and redirects stdin, stdout and
 * stderr to /dev/null.
 */
static void
daemonize(void)
{
    /* forking to background */
    fflush(NULL); /* paranoia */
    pid_t pid = fork();
    if (pid < 0)
        perror_msg_and_die("fork");
    if (pid > 0)
        exit(0);

    /* Child (daemon) continues */
    if (setsid() < 0)
        perror_msg_and_die("setsid");

    /* Change the current working directory */
    xchdir("/");

    /* Reopen the standard file descriptors to "/dev/null" */
    xmove_fd(xopen("/dev/null", O_RDWR), STDIN_FILENO);
    xdup2(STDIN_FILENO, STDOUT_FILENO);
    xdup2(STDIN_FILENO, STDERR_FILENO);
}

/*
 * Entry point: parses options, sets up the inotify watch on the upload
 * directory and the signal pipe, and runs the glib main loop until a
 * terminating signal is received.
 */
int
main(int argc, char **argv)
{
    /* I18n */
    setlocale(LC_ALL, "");
#if ENABLE_NLS
    bindtextdomain(PACKAGE, LOCALEDIR);
    textdomain(PACKAGE);
#endif

    abrt_init(argv);

    /* Can't keep these strings/structs static: _() doesn't support that */
    const char *program_usage_string = _(
        "& [-vs] [-w NUM] [-c MiB] [UPLOAD_DIRECTORY]\n"
        "\n"
        "\nWatches UPLOAD_DIRECTORY and unpacks incoming archives into DumpLocation"
        "\nspecified in abrt.conf"
        "\n"
        "\nIf UPLOAD_DIRECTORY is not provided, uses a value of"
        "\nWatchCrashdumpArchiveDir option from abrt.conf"
    );
    enum {
        OPT_v = 1 << 0,
        OPT_s = 1 << 1,
        OPT_d = 1 << 2,
        OPT_w = 1 << 3,
        OPT_c = 1 << 4,
    };

    int concurrent_workers = DEFAULT_COUNT_OF_WORKERS;
    int cache_size_mib = DEFAULT_CACHE_MIB_SIZE;

    /* Keep enum above and order of options below in sync! */
    struct options program_options[] = {
        OPT__VERBOSE(&g_verbose),
        OPT_BOOL('s', NULL, NULL, _("Log to syslog")),
        OPT_BOOL('d', NULL, NULL, _("Daemonize")),
        OPT_INTEGER('w', NULL, &concurrent_workers, _("Number of concurrent workers. Default is "STRINGIZE(DEFAULT_COUNT_OF_WORKERS))),
        OPT_INTEGER('c', NULL, &cache_size_mib, _("Maximal cache size in MiB. Default is "STRINGIZE(DEFAULT_CACHE_MIB_SIZE))),
        OPT_END()
    };
    unsigned opts = parse_opts(argc, argv, program_options, program_usage_string);

    if (concurrent_workers <= 0)
        error_msg_and_die("Invalid number of workers: %d", concurrent_workers);

    if (cache_size_mib <= 0)
        error_msg_and_die("Invalid cache size in MiB: %d", cache_size_mib);

    /* Guard the multiplication used for the queue capacity below */
    if (cache_size_mib > UINT_MAX / (1024 * 1024 / FILENAME_MAX))
        error_msg_and_die("Too big cache size. Maximum is : %u MiB", UINT_MAX / (1024 * 1024 / FILENAME_MAX));

    struct process proc = {0};
    proc.max_children = concurrent_workers;
    /* By default it is about 1024 entries */
    g_queue_init(&proc.queue.q);
    proc.queue.capacity = cache_size_mib * (1024 * 1024 / FILENAME_MAX);
    log_debug("Max queue size %u", proc.queue.capacity);

    argv += optind;
    if (argv[0])
    {
        proc.upload_directory = argv[0];

        /* At most one positional argument is accepted */
        if (argv[1])
            show_usage_and_die(program_usage_string, program_options);
    }

    /* Initialization */
    log_info("Loading settings");
    if (load_abrt_conf() != 0)
        return 1;

    if (!proc.upload_directory)
        proc.upload_directory = g_settings_sWatchCrashdumpArchiveDir;

    if (!proc.upload_directory)
        error_msg_and_die("Neither UPLOAD_DIRECTORY nor WatchCrashdumpArchiveDir was specified");

    if (opts & OPT_d)
        daemonize();

    msg_prefix = g_progname;
    if ((opts & OPT_d) || (opts & OPT_s) || getenv("ABRT_SYSLOG"))
    {
        logmode = LOGMODE_JOURNAL;
    }

    log_info("Creating glib main loop");
    proc.main_loop = g_main_loop_new(NULL, FALSE);

    log_notice("Setting up a file monitor for '%s'", proc.upload_directory);
    /* Never returns NULL; it will die if an error occurs */
    struct abrt_inotify_watch *aiw = abrt_inotify_watch_init(proc.upload_directory,
                IN_CLOSE_WRITE | IN_MOVED_TO, handle_inotify_cb, &proc);

    log_notice("Setting up a signal handler");
    /* Set up signal pipe */
    xpipe(g_signal_pipe);
    close_on_exec_on(g_signal_pipe[0]);
    close_on_exec_on(g_signal_pipe[1]);
    ndelay_on(g_signal_pipe[0]);
    ndelay_on(g_signal_pipe[1]);
    signal(SIGUSR1, handle_signal);
    signal(SIGTERM, handle_signal);
    signal(SIGINT, handle_signal);
    signal(SIGCHLD, handle_signal);
    GIOChannel *channel_signal = abrt_gio_channel_unix_new(g_signal_pipe[0]);
    guint channel_signal_source_id = g_io_add_watch(channel_signal,
                G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP | G_IO_NVAL,
                handle_signal_pipe_cb,
                &proc);

    log_info("Starting glib main loop");

    g_main_loop_run(proc.main_loop);

    log_info("Glib main loop finished");

    /* The signal callback never removes its own source, so the id is
     * still valid here.
     */
    g_source_remove(channel_signal_source_id);

    GError *error = NULL;
    g_io_channel_shutdown(channel_signal, FALSE, &error);
    if (error)
    {
        log_notice("Can't shutdown gio channel: '%s'", error->message);
        g_error_free(error);
    }

    g_io_channel_unref(channel_signal);

    abrt_inotify_watch_destroy(aiw);

    if (proc.main_loop)
        g_main_loop_unref(proc.main_loop);

    free_abrt_conf_data();

    return 0;
}