Blob Blame History Raw
/*
   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#include <errno.h>
#include <dirent.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/resource.h>

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <string.h>

#include <glusterfs/globals.h>
#include <glusterfs/glusterfs.h>
#include <glusterfs/logging.h>
#include <glusterfs/defaults.h>
#include <glusterfs/syncop.h>

#include "gf-changelog-rpc.h"
#include "gf-changelog-helpers.h"

/* from the changelog translator */
#include "changelog-misc.h"
#include "changelog-mem-types.h"
#include "changelog-lib-messages.h"

/**
 * Global singleton xlator pointer for the library, initialized
 * during library load. This should probably be hidden inside
 * an initialized object which is an handle for the consumer.
 *
 * TODO: do away with the global..
 */
xlator_t *master = NULL;

static inline gf_private_t *
gf_changelog_alloc_priv()
{
    int ret = 0;
    gf_private_t *priv = NULL;

    priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t);
    if (!priv)
        goto error_return;
    INIT_LIST_HEAD(&priv->connections);
    INIT_LIST_HEAD(&priv->cleanups);

    ret = pthread_mutex_init(&priv->lock, NULL);
    if (ret != 0)
        goto free_priv;
    ret = pthread_cond_init(&priv->cond, NULL);
    if (ret != 0)
        goto cleanup_mutex;

    priv->api = NULL;
    return priv;

cleanup_mutex:
    (void)pthread_mutex_destroy(&priv->lock);
free_priv:
    GF_FREE(priv);
error_return:
    return NULL;
}

#define GF_CHANGELOG_EVENT_POOL_SIZE 16384
#define GF_CHANGELOG_EVENT_THREAD_COUNT 4

static int
gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx)
{
    cmd_args_t *cmd_args = NULL;
    struct rlimit lim = {
        0,
    };
    call_pool_t *pool = NULL;
    int ret = -1;

    ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end);
    if (ret != 0)
        return -1;

    ctx->process_uuid = generate_glusterfs_ctx_id();
    if (!ctx->process_uuid)
        return -1;

    ctx->page_size = 128 * GF_UNIT_KB;

    ctx->iobuf_pool = iobuf_pool_new();
    if (!ctx->iobuf_pool)
        goto free_pool;

    ctx->event_pool = event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE,
                                     GF_CHANGELOG_EVENT_THREAD_COUNT);
    if (!ctx->event_pool)
        goto free_pool;

    pool = GF_CALLOC(1, sizeof(call_pool_t),
                     gf_changelog_mt_libgfchangelog_call_pool_t);
    if (!pool)
        goto free_pool;

    /* frame_mem_pool size 112 * 64 */
    pool->frame_mem_pool = mem_pool_new(call_frame_t, 32);
    if (!pool->frame_mem_pool)
        goto free_pool;

    /* stack_mem_pool size 256 * 128 */
    pool->stack_mem_pool = mem_pool_new(call_stack_t, 16);

    if (!pool->stack_mem_pool)
        goto free_pool;

    ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16);
    if (!ctx->stub_mem_pool)
        goto free_pool;

    ctx->dict_pool = mem_pool_new(dict_t, 32);
    if (!ctx->dict_pool)
        goto free_pool;

    ctx->dict_pair_pool = mem_pool_new(data_pair_t, 512);
    if (!ctx->dict_pair_pool)
        goto free_pool;

    ctx->dict_data_pool = mem_pool_new(data_t, 512);
    if (!ctx->dict_data_pool)
        goto free_pool;

    ctx->logbuf_pool = mem_pool_new(log_buf_t, 256);
    if (!ctx->logbuf_pool)
        goto free_pool;

    INIT_LIST_HEAD(&pool->all_frames);
    LOCK_INIT(&pool->lock);
    ctx->pool = pool;

    LOCK_INIT(&ctx->lock);

    cmd_args = &ctx->cmd_args;

    INIT_LIST_HEAD(&cmd_args->xlator_options);

    lim.rlim_cur = RLIM_INFINITY;
    lim.rlim_max = RLIM_INFINITY;
    setrlimit(RLIMIT_CORE, &lim);

    return 0;

free_pool:
    if (pool) {
        GF_FREE(pool->frame_mem_pool);

        GF_FREE(pool->stack_mem_pool);

        GF_FREE(pool);
    }

    GF_FREE(ctx->stub_mem_pool);

    GF_FREE(ctx->dict_pool);

    GF_FREE(ctx->dict_pair_pool);

    GF_FREE(ctx->dict_data_pool);

    GF_FREE(ctx->logbuf_pool);

    GF_FREE(ctx->iobuf_pool);

    GF_FREE(ctx->event_pool);

    return -1;
}

/* TODO: cleanup ctx defaults */
void
gf_changelog_cleanup_this(xlator_t *this)
{
    glusterfs_ctx_t *ctx = NULL;

    if (!this)
        return;

    ctx = this->ctx;
    syncenv_destroy(ctx->env);
    free(ctx);

    this->private = NULL;
    this->ctx = NULL;

    mem_pools_fini();
}

static int
gf_changelog_init_context()
{
    glusterfs_ctx_t *ctx = NULL;

    ctx = glusterfs_ctx_new();
    if (!ctx)
        goto error_return;

    if (glusterfs_globals_init(ctx))
        goto free_ctx;

    THIS->ctx = ctx;
    if (gf_changelog_ctx_defaults_init(ctx))
        goto free_ctx;

    ctx->env = syncenv_new(0, 0, 0);
    if (!ctx->env)
        goto free_ctx;
    return 0;

free_ctx:
    free(ctx);
    THIS->ctx = NULL;
error_return:
    return -1;
}

static int
gf_changelog_init_master()
{
    int ret = 0;

    ret = gf_changelog_init_context();
    mem_pools_init();

    return ret;
}

/* TODO: cleanup clnt/svc on failure */
int
gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc)
{
    int ret = 0;
    rpcsvc_t *svc = NULL;
    struct rpc_clnt *rpc = NULL;

    /**
     * Initialize a connect back socket. A probe() RPC call to the server
     * triggers a reverse connect.
     */
    svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick,
                                               RPC_SOCK(entry), entry);
    if (!svc)
        goto error_return;
    RPC_REBORP(entry) = svc;

    /* Initialize an RPC client */
    rpc = gf_changelog_rpc_init(this, entry);
    if (!rpc)
        goto error_return;
    RPC_PROBER(entry) = rpc;

    /**
     * @FIXME
     * till we have connection state machine, let's delay the RPC call
     * for now..
     */
    sleep(2);

    /**
     * Probe changelog translator for reverse connection. After a successful
     * call, there's less use of the client and can be disconnected, but
     * let's leave the connection active for any future RPC calls.
     */
    ret = gf_changelog_invoke_rpc(this, entry, proc);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED,
               "Could not initiate probe RPC, bailing out!!!");
        goto error_return;
    }

    return 0;

error_return:
    return -1;
}

int
gf_cleanup_event(xlator_t *this, struct gf_event_list *ev)
{
    int ret = 0;

    ret = gf_thread_cleanup(this, ev->invoker);
    if (ret) {
        gf_msg(this->name, GF_LOG_WARNING, -ret,
               CHANGELOG_LIB_MSG_CLEANUP_ERROR,
               "cannot cleanup callback invoker thread."
               " Not freeing resources");
        return -1;
    }

    ev->entry = NULL;

    return 0;
}

static int
gf_init_event(gf_changelog_t *entry)
{
    int ret = 0;
    struct gf_event_list *ev = NULL;

    ev = &entry->event;
    ev->entry = entry;

    ret = pthread_mutex_init(&ev->lock, NULL);
    if (ret != 0)
        goto error_return;
    ret = pthread_cond_init(&ev->cond, NULL);
    if (ret != 0)
        goto cleanup_mutex;
    INIT_LIST_HEAD(&ev->events);

    ev->next_seq = 0; /* bootstrap sequencing */

    if (GF_NEED_ORDERED_EVENTS(entry)) {
        entry->pickevent = pick_event_ordered;
        entry->queueevent = queue_ordered_event;
    } else {
        entry->pickevent = pick_event_unordered;
        entry->queueevent = queue_unordered_event;
    }

    ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker,
                           ev, "clogcbki");
    if (ret != 0) {
        entry->pickevent = NULL;
        entry->queueevent = NULL;
        goto cleanup_cond;
    }

    return 0;

cleanup_cond:
    (void)pthread_cond_destroy(&ev->cond);
cleanup_mutex:
    (void)pthread_mutex_destroy(&ev->lock);
error_return:
    return -1;
}

/**
 * TODO:
 *  - cleanup invoker thread
 *  - cleanup event list
 *  - destroy rpc{-clnt, svc}
 */
int
gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry)
{
    return 0;
}

int
gf_cleanup_connections(xlator_t *this)
{
    return 0;
}

static int
gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick,
                          gf_boolean_t ordered, void *xl)
{
    int ret = 0;
    gf_private_t *priv = NULL;
    gf_changelog_t *entry = NULL;

    priv = this->private;

    if (!brick->callback || !brick->init || !brick->fini)
        goto error_return;

    entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t);
    if (!entry)
        goto error_return;
    INIT_LIST_HEAD(&entry->list);

    LOCK_INIT(&entry->statelock);
    entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING;

    entry->notify = brick->filter;
    if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX)
        goto free_entry;

    entry->this = this;
    entry->invokerxl = xl;

    entry->ordered = ordered;
    ret = gf_init_event(entry);
    if (ret)
        goto free_entry;

    entry->fini = brick->fini;
    entry->callback = brick->callback;
    entry->connected = brick->connected;
    entry->disconnected = brick->disconnected;

    entry->ptr = brick->init(this, brick);
    if (!entry->ptr)
        goto cleanup_event;
    priv->api = entry->ptr; /* pointer to API, if required */

    pthread_mutex_lock(&priv->lock);
    {
        list_add_tail(&entry->list, &priv->connections);
    }
    pthread_mutex_unlock(&priv->lock);

    ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER);
    if (ret)
        goto cleanup_event;
    return 0;

cleanup_event:
    (void)gf_cleanup_event(this, &entry->event);
free_entry:
    gf_msg_debug(this->name, 0, "freeing entry %p", entry);
    list_del(&entry->list); /* FIXME: kludge for now */
    GF_FREE(entry);
error_return:
    return -1;
}

int
gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick,
                            gf_boolean_t ordered, void *xl)
{
    return gf_setup_brick_connection(this, brick, ordered, xl);
}

static int
gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel)
{
    /* passing ident as NULL means to use default ident for syslog */
    if (gf_log_init(this->ctx, logfile, NULL))
        return -1;

    gf_log_set_loglevel(this->ctx, (loglevel == -1) ? GF_LOG_INFO : loglevel);
    return 0;
}

static int
gf_changelog_set_master(xlator_t *master, void *xl)
{
    int32_t ret = 0;
    xlator_t *this = NULL;
    xlator_t *old_this = NULL;
    gf_private_t *priv = NULL;

    this = xl;
    if (!this || !this->ctx) {
        ret = gf_changelog_init_master();
        if (ret)
            return -1;
        this = THIS;
    }

    master->ctx = this->ctx;

    INIT_LIST_HEAD(&master->volume_options);
    SAVE_THIS(THIS);

    ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end);
    if (ret != 0)
        goto restore_this;

    priv = gf_changelog_alloc_priv();
    if (!priv) {
        ret = -1;
        goto restore_this;
    }

    if (!xl) {
        /* poller thread */
        ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS,
                               "clogpoll");
        if (ret != 0) {
            GF_FREE(priv);
            gf_msg(master->name, GF_LOG_ERROR, 0,
                   CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED,
                   "failed to spawn poller thread");
            goto restore_this;
        }
    }

    master->private = priv;

restore_this:
    RESTORE_THIS();

    return ret;
}

int
gf_changelog_init(void *xl)
{
    int ret = 0;
    gf_private_t *priv = NULL;

    if (master)
        return 0;

    master = calloc(1, sizeof(*master));
    if (!master)
        goto error_return;

    master->name = strdup("gfchangelog");
    if (!master->name)
        goto dealloc_master;

    ret = gf_changelog_set_master(master, xl);
    if (ret)
        goto dealloc_name;

    priv = master->private;
    ret = gf_thread_create(&priv->connectionjanitor, NULL,
                           gf_changelog_connection_janitor, master, "clogjan");
    if (ret != 0) {
        /* TODO: cleanup priv, mutex (poller thread for !xl) */
        goto dealloc_name;
    }

    return 0;

dealloc_name:
    free(master->name);
dealloc_master:
    free(master);
    master = NULL;
error_return:
    return -1;
}

int
gf_changelog_register_generic(struct gf_brick_spec *bricks, int count,
                              int ordered, char *logfile, int lvl, void *xl)
{
    int ret = 0;
    xlator_t *this = NULL;
    xlator_t *old_this = NULL;
    struct gf_brick_spec *brick = NULL;
    gf_boolean_t need_order = _gf_false;

    SAVE_THIS(xl);

    this = THIS;
    if (!this)
        goto error_return;

    ret = gf_changelog_setup_logging(this, logfile, lvl);
    if (ret)
        goto error_return;

    need_order = (ordered) ? _gf_true : _gf_false;

    brick = bricks;
    while (count--) {
        gf_smsg(this->name, GF_LOG_INFO, 0,
                CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "Registering brick",
                "brick=%s", brick->brick_path, "notify_filter=%d",
                brick->filter, NULL);

        ret = gf_changelog_register_brick(this, brick, need_order, xl);
        if (ret != 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED,
                   "Error registering with changelog xlator");
            break;
        }

        brick++;
    }

    if (ret != 0)
        goto cleanup_inited_bricks;

    RESTORE_THIS();
    return 0;

cleanup_inited_bricks:
    gf_cleanup_connections(this);
error_return:
    RESTORE_THIS();
    return -1;
}

/**
 * @API
 *  gf_changelog_register()
 *
 * This is _NOT_ a generic register API. It's a special API to handle
 * updates at a journal granulality. This is used by consumers wanting
 * to process persistent journal such as geo-replication via a set of
 * APIs. All of this is required to maintain backward compatibility.
 * Owner specific private data is stored in ->api (in gf_private_t),
 * which is used by APIs to access it's private data. This limits
 * the library access to a single brick, but that's how it used to
 * be anyway. Furthermore, this API solely _owns_ "this", therefore
 * callers already having a notion of "this" are expected to use the
 * newer API.
 *
 * Newer applications wanting to use this library need not face this
 * limitation and reply of the much more feature rich generic register
 * API, which is purely callback based.
 *
 * NOTE: @max_reconnects is not used but required for backward compat.
 *
 * For generic API, refer gf_changelog_register_generic().
 */
int
gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file,
                      int log_level, int max_reconnects)
{
    struct gf_brick_spec brick = {
        0,
    };

    if (master)
        THIS = master;
    else
        return -1;

    brick.brick_path = brick_path;
    brick.filter = CHANGELOG_OP_TYPE_JOURNAL;

    brick.init = gf_changelog_journal_init;
    brick.fini = gf_changelog_journal_fini;
    brick.callback = gf_changelog_handle_journal;
    brick.connected = gf_changelog_journal_connect;
    brick.disconnected = gf_changelog_journal_disconnect;

    brick.ptr = scratch_dir;

    return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level,
                                         NULL);
}