Blame src/daemon/abrt-upload-watch.c

Packit 8ea169
/*
Packit 8ea169
    Copyright (C) 2013  ABRT Team
Packit 8ea169
    Copyright (C) 2013  Red Hat, Inc.
Packit 8ea169
Packit 8ea169
    This program is free software; you can redistribute it and/or modify
Packit 8ea169
    it under the terms of the GNU General Public License as published by
Packit 8ea169
    the Free Software Foundation; either version 2 of the License, or
Packit 8ea169
    (at your option) any later version.
Packit 8ea169
Packit 8ea169
    This program is distributed in the hope that it will be useful,
Packit 8ea169
    but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 8ea169
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit 8ea169
    GNU General Public License for more details.
Packit 8ea169
Packit 8ea169
    You should have received a copy of the GNU General Public License along
Packit 8ea169
    with this program; if not, write to the Free Software Foundation, Inc.,
Packit 8ea169
    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Packit 8ea169
 */
Packit 8ea169
#include "abrt-inotify.h"
Packit 8ea169
#include "abrt_glib.h"
Packit 8ea169
#include "libabrt.h"
Packit 8ea169
Packit 8ea169
#define STRINGIZE_DETAIL(str) #str
Packit 8ea169
#define STRINGIZE(str) STRINGIZE_DETAIL(str)
Packit 8ea169
Packit 8ea169
#define DEFAULT_COUNT_OF_WORKERS 10
Packit 8ea169
#define DEFAULT_CACHE_MIB_SIZE 4
Packit 8ea169
Packit 8ea169
static int g_signal_pipe[2];
Packit 8ea169
Packit 8ea169
struct queue
Packit 8ea169
{
Packit 8ea169
    unsigned capacity;
Packit 8ea169
    GQueue q;
Packit 8ea169
};
Packit 8ea169
Packit 8ea169
static int
Packit 8ea169
queue_push(struct queue *queue, char *value)
Packit 8ea169
{
Packit 8ea169
    if (g_queue_get_length(&queue->q) >= queue->capacity)
Packit 8ea169
        return 0;
Packit 8ea169
Packit 8ea169
    g_queue_push_head(&queue->q, value);
Packit 8ea169
Packit 8ea169
    return 1;
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static char *
Packit 8ea169
queue_pop(struct queue *queue)
Packit 8ea169
{
Packit 8ea169
    if (g_queue_is_empty(&queue->q))
Packit 8ea169
        return NULL;
Packit 8ea169
Packit 8ea169
    return (char *)g_queue_pop_tail(&queue->q);
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
struct process
Packit 8ea169
{
Packit 8ea169
    GMainLoop *main_loop;
Packit 8ea169
    const char *upload_directory;
Packit 8ea169
    unsigned children;
Packit 8ea169
    unsigned max_children;
Packit 8ea169
    struct queue queue;
Packit 8ea169
};
Packit 8ea169
Packit 8ea169
static void
Packit 8ea169
process_quit(struct process *proc)
Packit 8ea169
{
Packit 8ea169
    g_main_loop_quit(proc->main_loop);
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static void
Packit 8ea169
run_abrt_handle_upload(struct process *proc, const char *name)
Packit 8ea169
{
Packit 8ea169
    log_info("Processing file '%s' in directory '%s'", name, proc->upload_directory);
Packit 8ea169
Packit 8ea169
    ++proc->children;
Packit 8ea169
    log_debug("Running workers: %d", proc->children);
Packit 8ea169
Packit 8ea169
    fflush(NULL); /* paranoia */
Packit 8ea169
    pid_t pid = fork();
Packit 8ea169
    if (pid < 0)
Packit 8ea169
    {
Packit 8ea169
        --proc->children;
Packit 8ea169
        perror_msg("fork");
Packit 8ea169
        return;
Packit 8ea169
    }
Packit 8ea169
Packit 8ea169
    if (pid == 0)
Packit 8ea169
    {
Packit 8ea169
        /* child */
Packit 8ea169
        xchdir(proc->upload_directory);
Packit 8ea169
        if (g_settings_delete_uploaded)
Packit 8ea169
            execlp("abrt-handle-upload", "abrt-handle-upload", "-d",
Packit 8ea169
                           g_settings_dump_location, proc->upload_directory, name, (char*)NULL);
Packit 8ea169
        else
Packit 8ea169
            execlp("abrt-handle-upload", "abrt-handle-upload",
Packit 8ea169
                           g_settings_dump_location, proc->upload_directory, name, (char*)NULL);
Packit 8ea169
        perror_msg_and_die("Can't execute '%s'", "abrt-handle-upload");
Packit 8ea169
    }
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static void
Packit 8ea169
handle_new_path(struct process *proc, char *name)
Packit 8ea169
{
Packit 8ea169
    log_warning("Detected creation of file '%s' in upload directory '%s'", name, proc->upload_directory);
Packit 8ea169
Packit 8ea169
    if (proc->children < proc->max_children)
Packit 8ea169
    {
Packit 8ea169
        run_abrt_handle_upload(proc, name);
Packit 8ea169
        free(name);
Packit 8ea169
        return;
Packit 8ea169
    }
Packit 8ea169
Packit 8ea169
    log_debug("Pushing '%s' to deferred queue", name);
Packit 8ea169
    if (!queue_push(&proc->queue, name))
Packit 8ea169
    {
Packit 8ea169
        error_msg(_("No free workers and full buffer. Omitting archive '%s'"), name);
Packit 8ea169
        free(name);
Packit 8ea169
        return;
Packit 8ea169
    }
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static void
Packit 8ea169
print_stats(struct process *proc)
Packit 8ea169
{
Packit 8ea169
    /* this is meant only for debugging, so not marking it as translatable */
Packit 8ea169
    fprintf(stderr, "%i archives to process, %i active workers\n", g_queue_get_length(&proc->queue.q), proc->children);
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static void
Packit 8ea169
process_next_in_queue(struct process *proc)
Packit 8ea169
{
Packit 8ea169
    char *name = queue_pop(&proc->queue);
Packit 8ea169
    if (!name)
Packit 8ea169
    {
Packit 8ea169
        log_debug("Deferred queue is empty. Running workers: %d", proc->children);
Packit 8ea169
        return;
Packit 8ea169
    }
Packit 8ea169
Packit 8ea169
    run_abrt_handle_upload(proc, name);
Packit 8ea169
    free(name);
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static void
Packit 8ea169
handle_signal(int signo)
Packit 8ea169
{
Packit 8ea169
    int save_errno = errno;
Packit 8ea169
    uint8_t sig_caught = signo;
Packit 8ea169
    if (write(g_signal_pipe[1], &sig_caught, 1))
Packit 8ea169
        /* we ignore result, if () shuts up stupid compiler */;
Packit 8ea169
    errno = save_errno;
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static gboolean
Packit 8ea169
handle_signal_pipe_cb(GIOChannel *gio, GIOCondition condition, gpointer user_data)
Packit 8ea169
{
Packit 8ea169
    struct process *proc = (struct process *)user_data;
Packit 8ea169
    uint8_t signals[DEFAULT_COUNT_OF_WORKERS];
Packit 8ea169
    gsize len = 0;
Packit 8ea169
Packit 8ea169
    for (;;)
Packit 8ea169
    {
Packit 8ea169
        GError *error = NULL;
Packit 8ea169
        GIOStatus stat = g_io_channel_read_chars(gio, (void *)signals, sizeof(signals), &len, NULL);
Packit 8ea169
        if (stat == G_IO_STATUS_ERROR)
Packit 8ea169
        {
Packit 8ea169
            error_msg_and_die(_("Can't read from gio channel: '%s'"), error ? error->message : "");
Packit 8ea169
        }
Packit 8ea169
        if (stat == G_IO_STATUS_AGAIN)
Packit 8ea169
        {   /* We got all buffered data, but fd is still open. Done for now */
Packit 8ea169
            return TRUE; /* "glib, please don't remove this event (yet)" */
Packit 8ea169
        }
Packit 8ea169
        if (stat == G_IO_STATUS_EOF)
Packit 8ea169
            break;
Packit 8ea169
Packit 8ea169
        /* G_IO_STATUS_NORMAL */
Packit 8ea169
        for (unsigned signo = 0; signo < len; ++signo)
Packit 8ea169
        {
Packit 8ea169
            /* we did receive a signal */
Packit 8ea169
            log_debug("Got signal %d through signal pipe", signals[signo]);
Packit 8ea169
            if (signals[signo] == SIGUSR1)
Packit 8ea169
            {
Packit 8ea169
                print_stats(proc);
Packit 8ea169
            }
Packit 8ea169
            else if (signals[signo] != SIGCHLD)
Packit 8ea169
            {
Packit 8ea169
                process_quit(proc);
Packit 8ea169
                return FALSE; /* remove this event */
Packit 8ea169
            }
Packit 8ea169
            else
Packit 8ea169
            {
Packit 8ea169
                while (safe_waitpid(-1, NULL, WNOHANG) > 0)
Packit 8ea169
                {
Packit 8ea169
                    --proc->children;
Packit 8ea169
                    process_next_in_queue(proc);
Packit 8ea169
                    print_stats(proc);
Packit 8ea169
                }
Packit 8ea169
            }
Packit 8ea169
        }
Packit 8ea169
    }
Packit 8ea169
Packit 8ea169
    return TRUE; /* "please don't remove this event" */
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static void
Packit 8ea169
handle_inotify_cb(struct abrt_inotify_watch *watch, struct inotify_event *event, void *user_data)
Packit 8ea169
{
Packit 8ea169
    /* Was the (presumable newly created) file closed in upload dir,
Packit 8ea169
     * or a file moved to upload dir? */
Packit 8ea169
    if (!(event->mask & IN_ISDIR) && (event->mask & (IN_CLOSE_WRITE | IN_MOVED_TO)))
Packit 8ea169
    {
Packit 8ea169
        const char *ext = strrchr(event->name, '.');
Packit 8ea169
        if (ext && strcmp(ext + 1, "working") == 0)
Packit 8ea169
            return;
Packit 8ea169
Packit 8ea169
        handle_new_path((struct process *)user_data, xstrdup(event->name));
Packit 8ea169
    }
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
static void
Packit 8ea169
daemonize()
Packit 8ea169
{
Packit 8ea169
    /* forking to background */
Packit 8ea169
    fflush(NULL); /* paranoia */
Packit 8ea169
    pid_t pid = fork();
Packit 8ea169
    if (pid < 0)
Packit 8ea169
        perror_msg_and_die("fork");
Packit 8ea169
    if (pid > 0)
Packit 8ea169
        exit(0);
Packit 8ea169
Packit 8ea169
    /* Child (daemon) continues */
Packit 8ea169
    if (setsid() < 0)
Packit 8ea169
        perror_msg_and_die("setsid");
Packit 8ea169
Packit 8ea169
    /* Change the current working directory */
Packit 8ea169
    xchdir("/");
Packit 8ea169
Packit 8ea169
    /* Reopen the standard file descriptors to "/dev/null" */
Packit 8ea169
    xmove_fd(xopen("/dev/null", O_RDWR), STDIN_FILENO);
Packit 8ea169
    xdup2(STDIN_FILENO, STDOUT_FILENO);
Packit 8ea169
    xdup2(STDIN_FILENO, STDERR_FILENO);
Packit 8ea169
}
Packit 8ea169
Packit 8ea169
int
Packit 8ea169
main(int argc, char **argv)
Packit 8ea169
{
Packit 8ea169
    /* I18n */
Packit 8ea169
    setlocale(LC_ALL, "");
Packit 8ea169
#if ENABLE_NLS
Packit 8ea169
    bindtextdomain(PACKAGE, LOCALEDIR);
Packit 8ea169
    textdomain(PACKAGE);
Packit 8ea169
#endif
Packit 8ea169
Packit 8ea169
    abrt_init(argv);
Packit 8ea169
    /* Can't keep these strings/structs static: _() doesn't support that */
Packit 8ea169
    const char *program_usage_string = _(
Packit 8ea169
        "& [-vs] [-w NUM] [-c MiB] [UPLOAD_DIRECTORY]\n"
Packit 8ea169
        "\n"
Packit 8ea169
        "\nWatches UPLOAD_DIRECTORY and unpacks incoming archives into DumpLocation"
Packit 8ea169
        "\nspecified in abrt.conf"
Packit 8ea169
        "\n"
Packit 8ea169
        "\nIf UPLOAD_DIRECTORY is not provided, uses a value of"
Packit 8ea169
        "\nWatchCrashdumpArchiveDir option from abrt.conf"
Packit 8ea169
    );
Packit 8ea169
    enum {
Packit 8ea169
        OPT_v = 1 << 0,
Packit 8ea169
        OPT_s = 1 << 1,
Packit 8ea169
        OPT_d = 1 << 2,
Packit 8ea169
        OPT_w = 1 << 3,
Packit 8ea169
        OPT_c = 1 << 4,
Packit 8ea169
    };
Packit 8ea169
Packit 8ea169
    int concurrent_workers = DEFAULT_COUNT_OF_WORKERS;
Packit 8ea169
    int cache_size_mib = DEFAULT_CACHE_MIB_SIZE;
Packit 8ea169
Packit 8ea169
    /* Keep enum above and order of options below in sync! */
Packit 8ea169
    struct options program_options[] = {
Packit 8ea169
        OPT__VERBOSE(&g_verbose),
Packit 8ea169
        OPT_BOOL('s', NULL, NULL              , _("Log to syslog")),
Packit 8ea169
        OPT_BOOL('d', NULL, NULL              , _("Daemonize")),
Packit 8ea169
        OPT_INTEGER('w', NULL, &concurrent_workers, _("Number of concurrent workers. Default is "STRINGIZE(DEFAULT_COUNT_OF_WORKERS))),
Packit 8ea169
        OPT_INTEGER('c', NULL, &cache_size_mib, _("Maximal cache size in MiB. Default is "STRINGIZE(DEFAULT_CACHE_MIB_SIZE))),
Packit 8ea169
        OPT_END()
Packit 8ea169
    };
Packit 8ea169
    unsigned opts = parse_opts(argc, argv, program_options, program_usage_string);
Packit 8ea169
Packit 8ea169
    if (concurrent_workers <= 0)
Packit 8ea169
        error_msg_and_die("Invalid number of workers: %d", concurrent_workers);
Packit 8ea169
Packit 8ea169
    if (cache_size_mib <= 0)
Packit 8ea169
        error_msg_and_die("Invalid cache size in MiB: %d", cache_size_mib);
Packit 8ea169
Packit 8ea169
    if (cache_size_mib > UINT_MAX / (1024 * 1024 / FILENAME_MAX))
Packit 8ea169
        error_msg_and_die("Too big cache size. Maximum is : %u MiB", UINT_MAX / (1024 * 1024 / FILENAME_MAX));
Packit 8ea169
Packit 8ea169
    struct process proc = {0};
Packit 8ea169
    proc.max_children = concurrent_workers;
Packit 8ea169
    /* By default it is about 1024 entries */
Packit 8ea169
    g_queue_init(&proc.queue.q);
Packit 8ea169
    proc.queue.capacity = cache_size_mib * (1024 * 1024 / FILENAME_MAX);
Packit 8ea169
    log_debug("Max queue size %u", proc.queue.capacity);
Packit 8ea169
Packit 8ea169
    argv += optind;
Packit 8ea169
    if (argv[0])
Packit 8ea169
    {
Packit 8ea169
        proc.upload_directory = argv[0];
Packit 8ea169
Packit 8ea169
        if (argv[1])
Packit 8ea169
            show_usage_and_die(program_usage_string, program_options);
Packit 8ea169
    }
Packit 8ea169
Packit 8ea169
    /* Initialization */
Packit 8ea169
    log_info("Loading settings");
Packit 8ea169
    if (load_abrt_conf() != 0)
Packit 8ea169
        return 1;
Packit 8ea169
Packit 8ea169
    if (!proc.upload_directory)
Packit 8ea169
        proc.upload_directory = g_settings_sWatchCrashdumpArchiveDir;
Packit 8ea169
Packit 8ea169
    if (!proc.upload_directory)
Packit 8ea169
        error_msg_and_die("Neither UPLOAD_DIRECTORY nor WatchCrashdumpArchiveDir was specified");
Packit 8ea169
Packit 8ea169
    if (opts & OPT_d)
Packit 8ea169
        daemonize();
Packit 8ea169
Packit 8ea169
    msg_prefix = g_progname;
Packit 8ea169
    if ((opts & OPT_d) || (opts & OPT_s) || getenv("ABRT_SYSLOG"))
Packit 8ea169
    {
Packit 8ea169
        logmode = LOGMODE_JOURNAL;
Packit 8ea169
    }
Packit 8ea169
Packit 8ea169
    log_info("Creating glib main loop");
Packit 8ea169
    proc.main_loop = g_main_loop_new(NULL, FALSE);
Packit 8ea169
Packit 8ea169
    log_notice("Setting up a file monitor for '%s'", proc.upload_directory);
Packit 8ea169
    /* Never returns NULL; it will die if an error occurs */
Packit 8ea169
    struct abrt_inotify_watch *aiw = abrt_inotify_watch_init(proc.upload_directory,
Packit 8ea169
            IN_CLOSE_WRITE | IN_MOVED_TO,
Packit 8ea169
            handle_inotify_cb, &proc;;
Packit 8ea169
Packit 8ea169
    log_notice("Setting up a signal handler");
Packit 8ea169
    /* Set up signal pipe */
Packit 8ea169
    xpipe(g_signal_pipe);
Packit 8ea169
    close_on_exec_on(g_signal_pipe[0]);
Packit 8ea169
    close_on_exec_on(g_signal_pipe[1]);
Packit 8ea169
    ndelay_on(g_signal_pipe[0]);
Packit 8ea169
    ndelay_on(g_signal_pipe[1]);
Packit 8ea169
    signal(SIGUSR1, handle_signal);
Packit 8ea169
    signal(SIGTERM, handle_signal);
Packit 8ea169
    signal(SIGINT, handle_signal);
Packit 8ea169
    signal(SIGCHLD, handle_signal);
Packit 8ea169
    GIOChannel *channel_signal = abrt_gio_channel_unix_new(g_signal_pipe[0]);
Packit 8ea169
    guint channel_signal_source_id = g_io_add_watch(channel_signal,
Packit 8ea169
                G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP | G_IO_NVAL,
Packit 8ea169
                handle_signal_pipe_cb,
Packit 8ea169
                &proc;;
Packit 8ea169
Packit 8ea169
    log_info("Starting glib main loop");
Packit 8ea169
Packit 8ea169
    g_main_loop_run(proc.main_loop);
Packit 8ea169
Packit 8ea169
    log_info("Glib main loop finished");
Packit 8ea169
Packit 8ea169
    g_source_remove(channel_signal_source_id);
Packit 8ea169
Packit 8ea169
    GError *error = NULL;
Packit 8ea169
    g_io_channel_shutdown(channel_signal, FALSE, &error);
Packit 8ea169
    if (error)
Packit 8ea169
    {
Packit 8ea169
        log_notice("Can't shutdown gio channel: '%s'", error ? error->message : "");
Packit 8ea169
        g_error_free(error);
Packit 8ea169
    }
Packit 8ea169
Packit 8ea169
    g_io_channel_unref(channel_signal);
Packit 8ea169
Packit 8ea169
    abrt_inotify_watch_destroy(aiw);
Packit 8ea169
Packit 8ea169
    if (proc.main_loop)
Packit 8ea169
        g_main_loop_unref(proc.main_loop);
Packit 8ea169
Packit 8ea169
    free_abrt_conf_data();
Packit 8ea169
Packit 8ea169
    return 0;
Packit 8ea169
}