Blob Blame History Raw
/*
   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#include <unistd.h>
#include <fcntl.h>
#include <limits.h>

#include <glusterfs/glusterfs.h>
#include <glusterfs/compat.h>
#include <glusterfs/xlator.h>
#include <glusterfs/logging.h>
#include <glusterfs/common-utils.h>

#include <glusterfs/statedump.h>
#include <glusterfs/syncop.h>

#include "upcall.h"
#include "upcall-mem-types.h"
#include "glusterfs3-xdr.h"
#include "protocol-common.h"
#include <glusterfs/defaults.h>

/*
 * Report whether any upcall feature is switched on for this xlator.
 * Currently the only feature consulted is cache_invalidation.
 * Returns _gf_false when this->private has not been set.
 */
gf_boolean_t
is_upcall_enabled(xlator_t *this)
{
    upcall_private_t *priv = this->private;

    if (!priv)
        return _gf_false;

    return priv->cache_invalidation_enabled ? _gf_true : _gf_false;
}

/*
 * Return the configured cache_invalidation_timeout, or 0 when the
 * xlator's private data is not available.
 */
int32_t
get_cache_invalidation_timeout(xlator_t *this)
{
    upcall_private_t *priv = this->private;

    return priv ? priv->cache_invalidation_timeout : 0;
}

/*
 * Locked wrapper around __add_upcall_client(): takes the inode ctx's
 * client_list_lock, allocates and appends a new client entry, and
 * returns it (NULL if allocation failed).
 */
upcall_client_t *
add_upcall_client(call_frame_t *frame, client_t *client,
                  upcall_inode_ctx_t *up_inode_ctx)
{
    pthread_mutex_t *list_lock = &up_inode_ctx->client_list_lock;
    upcall_client_t *entry = NULL;

    pthread_mutex_lock(list_lock);
    entry = __add_upcall_client(frame, client, up_inode_ctx);
    pthread_mutex_unlock(list_lock);

    return entry;
}

/*
 * Allocate a client entry for 'client' and append it to
 * up_inode_ctx->client_list. The caller must hold
 * up_inode_ctx->client_list_lock (add_upcall_client() does this).
 *
 * The entry records a private copy of client->client_uid, the current
 * time as access_time, and the configured cache-invalidation timeout
 * as expire_time_attr.
 *
 * Returns the new entry, or NULL if any allocation fails. On failure
 * nothing is added to the list.
 */
upcall_client_t *
__add_upcall_client(call_frame_t *frame, client_t *client,
                    upcall_inode_ctx_t *up_inode_ctx)
{
    upcall_client_t *up_client_entry = NULL;

    up_client_entry = GF_CALLOC(1, sizeof(*up_client_entry),
                                gf_upcall_mt_upcall_client_entry_t);
    if (!up_client_entry) {
        gf_msg("upcall", GF_LOG_WARNING, 0, UPCALL_MSG_NO_MEMORY,
               "Memory allocation failed");
        return NULL;
    }
    INIT_LIST_HEAD(&up_client_entry->client_list);

    /* Every lookup strcmp()s client_uid, so an entry must never be
     * linked into the list with a NULL uid. */
    up_client_entry->client_uid = gf_strdup(client->client_uid);
    if (!up_client_entry->client_uid) {
        gf_msg("upcall", GF_LOG_WARNING, 0, UPCALL_MSG_NO_MEMORY,
               "Memory allocation failed");
        GF_FREE(up_client_entry);
        return NULL;
    }

    up_client_entry->access_time = time(NULL);
    up_client_entry->expire_time_attr = get_cache_invalidation_timeout(
        frame->this);

    list_add_tail(&up_client_entry->client_list, &up_inode_ctx->client_list);

    gf_log(THIS->name, GF_LOG_DEBUG, "upcall_entry_t client added - %s",
           up_client_entry->client_uid);

    return up_client_entry;
}

/*
 * Look up the entry whose client_uid matches client->client_uid in
 * up_inode_ctx->client_list. A matching entry has its access_time
 * refreshed and is returned. Otherwise a new entry is created via
 * __add_upcall_client() (which may return NULL on allocation failure).
 * The caller must hold up_inode_ctx->client_list_lock.
 */
upcall_client_t *
__get_upcall_client(call_frame_t *frame, client_t *client,
                    upcall_inode_ctx_t *up_inode_ctx)
{
    upcall_client_t *entry = NULL;
    upcall_client_t *next = NULL;

    list_for_each_entry_safe(entry, next, &up_inode_ctx->client_list,
                             client_list)
    {
        if (strcmp(client->client_uid, entry->client_uid) != 0)
            continue;

        /* Existing entry for this client: refresh and return it. */
        entry->access_time = time(NULL);
        gf_log(THIS->name, GF_LOG_DEBUG, "upcall_entry_t client found - %s",
               entry->client_uid);
        return entry;
    }

    /* First time this client is seen for the inode. */
    return __add_upcall_client(frame, client, up_inode_ctx);
}

/*
 * Create and attach an upcall_inode_ctx_t to 'inode' for xlator 'this',
 * unless a ctx is already set (in which case 0 is returned and nothing
 * changes). A newly created ctx is also appended to
 * priv->inode_ctx_list under priv->inode_ctx_lk, so the reaper thread
 * can find it.
 *
 * The leading "__" means this uses __inode_ctx_get/__inode_ctx_set.
 * Callers such as upcall_inode_ctx_get() hold inode->lock.
 *
 * Returns 0 on success, -ENOMEM if allocation fails, a negative errno
 * if the mutex cannot be initialized, or the non-zero result of
 * __inode_ctx_set on failure.
 */
int
__upcall_inode_ctx_set(inode_t *inode, xlator_t *this)
{
    upcall_inode_ctx_t *inode_ctx = NULL;
    upcall_private_t *priv = NULL;
    int ret = -1;
    uint64_t ctx = 0;

    priv = this->private;
    GF_ASSERT(priv);

    ret = __inode_ctx_get(inode, this, &ctx);
    if (!ret)
        goto out; /* already set; nothing to do */

    inode_ctx = GF_CALLOC(1, sizeof(*inode_ctx),
                          gf_upcall_mt_upcall_inode_ctx_t);
    if (!inode_ctx) {
        ret = -ENOMEM;
        goto out;
    }

    ret = pthread_mutex_init(&inode_ctx->client_list_lock, NULL);
    if (ret) {
        gf_log(this->name, GF_LOG_DEBUG,
               "failed to init client_list_lock (%p)", inode);
        GF_FREE(inode_ctx);
        ret = -ret; /* pthread returns a positive errno; callers test < 0 */
        goto out;
    }
    INIT_LIST_HEAD(&inode_ctx->inode_ctx_list);
    INIT_LIST_HEAD(&inode_ctx->client_list);
    inode_ctx->destroy = 0;
    gf_uuid_copy(inode_ctx->gfid, inode->gfid);

    ctx = (long)inode_ctx;
    ret = __inode_ctx_set(inode, this, &ctx);
    if (ret) {
        gf_log(this->name, GF_LOG_DEBUG, "failed to set inode ctx (%p)", inode);
        /* Release everything initialized above, including the mutex. */
        pthread_mutex_destroy(&inode_ctx->client_list_lock);
        GF_FREE(inode_ctx);
        goto out;
    }

    /* add this inode_ctx to the global list */
    LOCK(&priv->inode_ctx_lk);
    {
        list_add_tail(&inode_ctx->inode_ctx_list, &priv->inode_ctx_list);
    }
    UNLOCK(&priv->inode_ctx_lk);
out:
    return ret;
}

/*
 * Return the upcall ctx stored on 'inode' for xlator 'this'. If none
 * exists yet, create one with __upcall_inode_ctx_set() and read it back.
 * Returns NULL if the ctx can neither be found nor created.
 * Unlocked variant; upcall_inode_ctx_get() wraps it with inode->lock.
 */
upcall_inode_ctx_t *
__upcall_inode_ctx_get(inode_t *inode, xlator_t *this)
{
    uint64_t ctx = 0;

    if (__inode_ctx_get(inode, this, &ctx) < 0) {
        if (__upcall_inode_ctx_set(inode, this) < 0)
            return NULL;

        if (__inode_ctx_get(inode, this, &ctx) < 0)
            return NULL;
    }

    return (upcall_inode_ctx_t *)(long)(ctx);
}

/*
 * Fetch (creating if necessary) the upcall ctx of 'inode', calling
 * __upcall_inode_ctx_get() while inode->lock is held.
 */
upcall_inode_ctx_t *
upcall_inode_ctx_get(inode_t *inode, xlator_t *this)
{
    upcall_inode_ctx_t *result = NULL;

    LOCK(&inode->lock);
    result = __upcall_inode_ctx_get(inode, this);
    UNLOCK(&inode->lock);

    return result;
}

/*
 * Remove every client entry of 'up_inode_ctx' whose access_time is more
 * than twice the cache-invalidation timeout in the past.
 *
 * Takes up_inode_ctx->client_list_lock for the whole scan and always
 * releases it before returning, including on the error path.
 *
 * Returns 0 on success, or the non-zero value from
 * __upcall_cleanup_client_entry() if an entry could not be cleaned up.
 * In that case the scan stops at that entry.
 */
int
upcall_cleanup_expired_clients(xlator_t *this, upcall_inode_ctx_t *up_inode_ctx)
{
    upcall_client_t *up_client = NULL;
    upcall_client_t *tmp = NULL;
    int ret = 0;
    time_t timeout = 0;
    time_t t_expired = 0;
    time_t now = 0;

    timeout = get_cache_invalidation_timeout(this);

    pthread_mutex_lock(&up_inode_ctx->client_list_lock);
    {
        /* One clock read per scan is enough to judge expiry. */
        now = time(NULL);

        list_for_each_entry_safe(up_client, tmp, &up_inode_ctx->client_list,
                                 client_list)
        {
            t_expired = now - up_client->access_time;

            if (t_expired > (2 * timeout)) {
                gf_log(THIS->name, GF_LOG_TRACE, "Cleaning up client_entry(%s)",
                       up_client->client_uid);

                ret = __upcall_cleanup_client_entry(up_client);

                if (ret) {
                    gf_msg("upcall", GF_LOG_WARNING, 0,
                           UPCALL_MSG_INTERNAL_ERROR,
                           "Client entry cleanup failed (%p)", up_client);
                    /* Leave the loop but still fall through to unlock;
                     * jumping past the unlock would leave the mutex held
                     * forever. */
                    break;
                }
            }
        }
    }
    pthread_mutex_unlock(&up_inode_ctx->client_list_lock);

    return ret;
}

/*
 * Unlink 'up_client' from whatever client list it is on, then free its
 * client_uid copy and the entry itself. The caller must hold the owning
 * list's lock. Always returns 0.
 */
int
__upcall_cleanup_client_entry(upcall_client_t *up_client)
{
    char *uid = up_client->client_uid;

    list_del_init(&up_client->client_list);

    GF_FREE(uid);
    GF_FREE(up_client);

    return 0;
}

/*
 * Free every client entry on inode_ctx->client_list, leaving the list
 * empty. The caller must hold inode_ctx->client_list_lock.
 * Always returns 0.
 */
int
__upcall_cleanup_inode_ctx_client_list(upcall_inode_ctx_t *inode_ctx)
{
    upcall_client_t *entry = NULL;
    upcall_client_t *next = NULL;

    /* The _safe iterator is required: each entry is freed in the body. */
    list_for_each_entry_safe(entry, next, &inode_ctx->client_list,
                             client_list)
    {
        (void)__upcall_cleanup_client_entry(entry);
    }

    return 0;
}

/*
 * Detach the upcall ctx from 'inode' and prepare it for release:
 *   1. send UP_FORGET to every registered client (upcall_cache_forget),
 *   2. free the client list under client_list_lock,
 *   3. set 'destroy' so upcall_reaper_thread frees the ctx itself.
 *
 * Returns the result of inode_ctx_del(). A negative value means no ctx
 * could be removed, and a warning is logged.
 */
int
upcall_cleanup_inode_ctx(xlator_t *this, inode_t *inode)
{
    upcall_private_t *priv = this->private;
    upcall_inode_ctx_t *inode_ctx = NULL;
    uint64_t ctx = 0;
    int ret = 0;

    GF_ASSERT(priv);

    ret = inode_ctx_del(inode, this, &ctx);
    if (ret < 0) {
        gf_msg("upcall", GF_LOG_WARNING, 0, UPCALL_MSG_INTERNAL_ERROR,
               "Failed to del upcall_inode_ctx (%p)", inode);
        return ret;
    }

    inode_ctx = (upcall_inode_ctx_t *)(long)ctx;
    if (!inode_ctx)
        return ret;

    upcall_cache_forget(this, inode, inode_ctx);

    /* The reaper thread also prunes this list, so the lock is needed. */
    pthread_mutex_lock(&inode_ctx->client_list_lock);
    if (!list_empty(&inode_ctx->client_list))
        __upcall_cleanup_inode_ctx_client_list(inode_ctx);
    pthread_mutex_unlock(&inode_ctx->client_list_lock);

    /* Must happen only after the unlock above: once the reaper sees
     * 'destroy' it destroys client_list_lock and frees the ctx. */
    inode_ctx->destroy = 1;
    gf_msg_debug("upcall", 0, "set upcall_inode_ctx (%p) to destroy mode",
                 inode_ctx);

    return ret;
}

/*
 * Background reaper, started by upcall_reaper_thread_init() with the
 * xlator as 'data'. Runs until priv->fini is set.
 *
 * On each pass over priv->inode_ctx_list it:
 *   - drops client entries that have expired
 *     (upcall_cleanup_expired_clients), and
 *   - for contexts flagged 'destroy' by upcall_cleanup_inode_ctx(),
 *     unlinks them under priv->inode_ctx_lk, destroys their mutex and
 *     frees them.
 * Between passes it sleeps for half the cache-invalidation timeout, but
 * never less than one second.
 *
 * NOTE(review): the list itself is traversed without holding
 * priv->inode_ctx_lk, while __upcall_inode_ctx_set() appends to it
 * under that lock. Confirm this concurrent traversal is acceptable.
 */
void *
upcall_reaper_thread(void *data)
{
    upcall_private_t *priv = NULL;
    upcall_inode_ctx_t *inode_ctx = NULL;
    upcall_inode_ctx_t *tmp = NULL;
    xlator_t *this = NULL;
    time_t timeout = 0;

    this = (xlator_t *)data;
    GF_ASSERT(this);

    priv = this->private;
    GF_ASSERT(priv);

    while (!priv->fini) {
        list_for_each_entry_safe(inode_ctx, tmp, &priv->inode_ctx_list,
                                 inode_ctx_list)
        {
            /* cleanup expired clients */
            upcall_cleanup_expired_clients(this, inode_ctx);

            if (!inode_ctx->destroy) {
                continue;
            }

            LOCK(&priv->inode_ctx_lk);
            {
                /* client list would have been cleaned up*/
                gf_msg_debug("upcall", 0, "Freeing upcall_inode_ctx (%p)",
                             inode_ctx);
                list_del_init(&inode_ctx->inode_ctx_list);
                pthread_mutex_destroy(&inode_ctx->client_list_lock);
                GF_FREE(inode_ctx);
                inode_ctx = NULL;
            }
            UNLOCK(&priv->inode_ctx_lk);
        }

        /* Don't busy-loop: a timeout of 0 or 1 would give sleep(0), and
         * a negative one would wrap to a huge unsigned sleep. */
        timeout = get_cache_invalidation_timeout(this);
        sleep((timeout > 1) ? (unsigned int)(timeout / 2) : 1);
    }

    return NULL;
}

/*
 * Start upcall_reaper_thread (thread name "upreaper") with 'this' as its
 * argument, storing the handle in priv->reaper_thr.
 * Returns the result of gf_thread_create().
 */
int
upcall_reaper_thread_init(xlator_t *this)
{
    upcall_private_t *priv = this->private;

    GF_ASSERT(priv);

    return gf_thread_create(&priv->reaper_thr, NULL, upcall_reaper_thread,
                            this, "upreaper");
}

/*
 * dict_foreach-style callback. For keys starting with AFR_XATTR_PREFIX,
 * compare value 'v' with the value of the same key in the dict passed
 * as 'tmp'. Returns -1 on a mismatch and 0 otherwise. Keys without the
 * AFR prefix always return 0.
 */
int
up_compare_afr_xattr(dict_t *d, char *k, data_t *v, void *tmp)
{
    dict_t *other = tmp;

    if (strncmp(k, AFR_XATTR_PREFIX, SLEN(AFR_XATTR_PREFIX)) != 0)
        return 0;

    return is_data_equal(v, dict_get(other, k)) ? 0 : -1;
}

/*
 * Remove 'xattr' from 'xattrs' when it is an AFR pending xattr (key
 * starts with AFR_XATTR_PREFIX) whose value is entirely zero bytes.
 * Strictly this only matters for xattrop. Setxattr and removexattr are
 * expected not to carry AFR_XATTR_PREFIX keys, so running it for every
 * fop is harmless.
 */
static void
up_filter_afr_xattr(dict_t *xattrs, char *xattr, data_t *v)
{
    if (strncmp(xattr, AFR_XATTR_PREFIX, SLEN(AFR_XATTR_PREFIX)) != 0)
        return;

    if (mem_0filled(v->data, v->len) == 0)
        dict_del(xattrs, xattr);
}

/*
 * dict_foreach_match() predicate. 'xattr' (passed as opaque data) is the
 * key being tested, and 'regd_xattr' is a registered key used as an
 * fnmatch(3) pattern. Returns _gf_true when the key matches the pattern.
 */
static gf_boolean_t
up_key_is_regd_xattr(dict_t *regd_xattrs, char *regd_xattr, data_t *v,
                     void *xattr)
{
    const char *key = xattr;

    return (fnmatch(regd_xattr, key, 0) == 0) ? _gf_true : _gf_false;
}

/*
 * dict_foreach callback applied to a changed-xattr dict. An xattr that
 * matches no registered pattern in 'regd_xattrs' is deleted, so no
 * notification is sent for it. A registered xattr is further filtered by
 * up_filter_afr_xattr(). Always returns 0 so the iteration continues.
 */
int
up_filter_unregd_xattr(dict_t *xattrs, char *xattr, data_t *v,
                       void *regd_xattrs)
{
    int matches = dict_foreach_match(regd_xattrs, up_key_is_regd_xattr, xattr,
                                     dict_null_foreach_fn, NULL);

    if (matches == 0)
        dict_del(xattrs, xattr);
    else
        up_filter_afr_xattr(xattrs, xattr, v);

    return 0;
}

/*
 * Remove from 'xattr' every key not covered by 'regd_xattrs' (and
 * zero-valued AFR pending xattrs). Returns the dict_foreach() result.
 */
int
up_filter_xattr(dict_t *xattr, dict_t *regd_xattrs)
{
    return dict_foreach(xattr, up_filter_unregd_xattr, regd_xattrs);
}

/*
 * Returns _gf_true if 'xattrs' still holds at least one key (normally
 * after up_filter_xattr() has been applied). Otherwise logs a trace
 * message and returns _gf_false.
 */
gf_boolean_t
up_invalidate_needed(dict_t *xattrs)
{
    if (dict_key_count(xattrs) != 0)
        return _gf_true;

    gf_msg_trace("upcall", 0,
                 "None of xattrs requested for"
                 " invalidation, were changed. Nothing to "
                 "invalidate");
    return _gf_false;
}

/*
 * Record that 'client' touched 'inode' and notify every other client
 * registered on the inode's upcall ctx about the change described by
 * 'flags', 'stbuf', 'p_stbuf', 'oldp_stbuf' and 'xattr'.
 *
 *  - Does nothing if upcall is disabled or 'client' is NULL (internal
 *    server-side fops).
 *  - For an IA_INVAL inode with a stbuf (e.g. a nameless LOOKUP), the
 *    already-linked inode with the same gfid is used when one exists.
 *  - The calling client's entry has its access_time refreshed, or is
 *    created if it is missing.
 *  - When flags contain only UP_ATIME, no notifications are sent.
 *
 * Notification is best effort: errors are logged and ignored. Any
 * reference taken by inode_find() is released on every exit path after
 * it was taken.
 */
void
upcall_cache_invalidate(call_frame_t *frame, xlator_t *this, client_t *client,
                        inode_t *inode, uint32_t flags, struct iatt *stbuf,
                        struct iatt *p_stbuf, struct iatt *oldp_stbuf,
                        dict_t *xattr)
{
    upcall_client_t *up_client_entry = NULL;
    upcall_client_t *tmp = NULL;
    upcall_inode_ctx_t *up_inode_ctx = NULL;
    gf_boolean_t found = _gf_false;
    gf_boolean_t atime_only = _gf_false;
    inode_t *linked_inode = NULL;

    if (!is_upcall_enabled(this))
        return;

    /* server-side generated fops like quota/marker will not have any
     * client associated with them. Ignore such fops.
     */
    if (!client) {
        gf_msg_debug("upcall", 0, "Internal fop - client NULL");
        return;
    }

    /* For nameless LOOKUPs, inode created shall always be
     * invalid. Hence check if there is any already linked inode.
     * If yes, update the inode_ctx of that valid inode
     */
    if (inode && (inode->ia_type == IA_INVAL) && stbuf) {
        linked_inode = inode_find(inode->table, stbuf->ia_gfid);
        if (linked_inode) {
            gf_log("upcall", GF_LOG_DEBUG,
                   "upcall_inode_ctx_get of linked inode (%p)", linked_inode);
            up_inode_ctx = upcall_inode_ctx_get(linked_inode, this);
        }
    }

    if (inode && !up_inode_ctx)
        up_inode_ctx = upcall_inode_ctx_get(inode, this);

    if (!up_inode_ctx) {
        gf_msg("upcall", GF_LOG_WARNING, 0, UPCALL_MSG_INTERNAL_ERROR,
               "upcall_inode_ctx_get failed (%p)", inode);
        /* Go through 'out' so the inode_find() ref is dropped. */
        goto out;
    }

    /* In case of LOOKUP, if first time, inode created shall be
     * invalid till it gets linked to inode table. Read gfid from
     * the stat returned in such cases.
     */
    if (gf_uuid_is_null(up_inode_ctx->gfid) && stbuf) {
        gf_uuid_copy(up_inode_ctx->gfid, stbuf->ia_gfid);
    }

    if (gf_uuid_is_null(up_inode_ctx->gfid)) {
        gf_msg_debug(this->name, 0,
                     "up_inode_ctx->gfid and "
                     "stbuf->ia_gfid is NULL, fop:%s",
                     gf_fop_list[frame->root->op]);
        goto out;
    }

    /* Pure atime updates are not worth notifying other clients about. */
    atime_only = !(flags & ~(UP_ATIME));

    pthread_mutex_lock(&up_inode_ctx->client_list_lock);
    {
        /* _safe iteration: upcall_client_cache_invalidate() may free
         * the current entry. */
        list_for_each_entry_safe(up_client_entry, tmp,
                                 &up_inode_ctx->client_list, client_list)
        {
            /* Do not send UPCALL event if same client. */
            if (!strcmp(client->client_uid, up_client_entry->client_uid)) {
                up_client_entry->access_time = time(NULL);
                found = _gf_true;
                continue;
            }

            if (atime_only) {
                if (found)
                    break;
                continue; /* we still need to find current client entry */
            }

            /* XXX: Send notifications asynchronously
             * instead of in the I/O path - BZ 1200264
             *  Also if the file is frequently accessed, set
             *  expire_time_attr to 0.
             */
            upcall_client_cache_invalidate(this, up_inode_ctx->gfid,
                                           up_client_entry, flags, stbuf,
                                           p_stbuf, oldp_stbuf, xattr);
        }

        if (!found) {
            (void)__add_upcall_client(frame, client, up_inode_ctx);
        }
    }
    pthread_mutex_unlock(&up_inode_ctx->client_list_lock);
out:
    /* release the ref from inode_find */
    if (linked_inode)
        inode_unref(linked_inode);
    return;
}

/*
 * Send a GF_UPCALL_CACHE_INVALIDATION event for 'gfid' to a single
 * client if that client accessed the file within the cache-invalidation
 * timeout (time(NULL) - access_time < timeout).
 *
 * Side effect: 'up_client_entry' is unlinked and freed when
 *   - this->notify() returns < 0 (client likely disconnected), or
 *   - no event is sent and the entry is older than twice the timeout.
 * Callers must therefore hold the owning client_list_lock and iterate
 * with a _safe list iterator.
 *
 * Nothing happens if 'gfid' is null.
 */
void
upcall_client_cache_invalidate(xlator_t *this, uuid_t gfid,
                               upcall_client_t *up_client_entry, uint32_t flags,
                               struct iatt *stbuf, struct iatt *p_stbuf,
                               struct iatt *oldp_stbuf, dict_t *xattr)
{
    struct gf_upcall up_req = {
        0,
    };
    struct gf_upcall_cache_invalidation ca_req = {
        0,
    };
    time_t elapsed = time(NULL) - up_client_entry->access_time;
    time_t timeout = 0;

    GF_VALIDATE_OR_GOTO("upcall_client_cache_invalidate",
                        !(gf_uuid_is_null(gfid)), out);
    timeout = get_cache_invalidation_timeout(this);

    if (elapsed >= timeout) {
        /* Client has not touched the file recently: skip it, and drop
         * the entry once it is well past the timeout. */
        gf_log(THIS->name, GF_LOG_TRACE,
               "Cache invalidation notification NOT sent to %s",
               up_client_entry->client_uid);

        if (elapsed > (2 * timeout))
            __upcall_cleanup_client_entry(up_client_entry);
        goto out;
    }

    ca_req.flags = flags;
    ca_req.expire_time_attr = up_client_entry->expire_time_attr;
    if (stbuf)
        ca_req.stat = *stbuf;
    if (p_stbuf)
        ca_req.p_stat = *p_stbuf;
    if (oldp_stbuf)
        ca_req.oldp_stat = *oldp_stbuf;
    ca_req.dict = xattr;

    up_req.client_uid = up_client_entry->client_uid;
    gf_uuid_copy(up_req.gfid, gfid);
    up_req.data = &ca_req;
    up_req.event_type = GF_UPCALL_CACHE_INVALIDATION;

    gf_log(THIS->name, GF_LOG_TRACE,
           "Cache invalidation notification sent to %s",
           up_client_entry->client_uid);

    /* A failed notify usually means the client dis(re)connected. */
    if (this->notify(this, GF_EVENT_UPCALL, &up_req) < 0)
        __upcall_cleanup_client_entry(up_client_entry);

out:
    return;
}

/*
 * Called while tearing down an upcall ctx (upcall_cleanup_inode_ctx).
 * Sends UP_FORGET for the ctx's gfid to every registered client, so
 * that clients drop their cached entry and look the file up again next
 * time. Does nothing if 'up_inode_ctx' is NULL. 'inode' is not used.
 */
void
upcall_cache_forget(xlator_t *this, inode_t *inode,
                    upcall_inode_ctx_t *up_inode_ctx)
{
    upcall_client_t *entry = NULL;
    upcall_client_t *next = NULL;

    if (up_inode_ctx == NULL)
        return;

    pthread_mutex_lock(&up_inode_ctx->client_list_lock);
    list_for_each_entry_safe(entry, next, &up_inode_ctx->client_list,
                             client_list)
    {
        /* Stamp "now" so the entry falls inside the invalidation window
         * checked by upcall_client_cache_invalidate() and gets notified. */
        entry->access_time = time(NULL);

        upcall_client_cache_invalidate(this, up_inode_ctx->gfid, entry,
                                       UP_FORGET, NULL, NULL, NULL, NULL);
    }
    pthread_mutex_unlock(&up_inode_ctx->client_list_lock);
}