Blob Blame History Raw
/*
   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#include <glusterfs/xlator.h>
#include <glusterfs/defaults.h>
#include <glusterfs/logging.h>
#include <glusterfs/iobuf.h>
#include <glusterfs/syscall.h>

#include "changelog-helpers.h"
#include "changelog-encoders.h"
#include "changelog-mem-types.h"
#include "changelog-messages.h"

#include "changelog-encoders.h"
#include "changelog-rpc-common.h"
#include <pthread.h>

/*
 * Cleanup handler registered through pthread_cleanup_push(): unlocks the
 * mutex handed over in @arg_mutex. A NULL argument is ignored.
 */
static void
changelog_cleanup_free_mutex(void *arg_mutex)
{
    pthread_mutex_t *mutex = arg_mutex;

    if (mutex == NULL)
        return;

    pthread_mutex_unlock(mutex);
}

/*
 * Cancel the thread @thr_id and wait for it to terminate.
 *
 * Returns 0 when both pthread_cancel() and pthread_join() succeed, otherwise
 * the error number returned by the failing call. A thread that was joined
 * but did not exit through cancellation is logged; the return value is then
 * still the (zero) result of pthread_join().
 */
int
changelog_thread_cleanup(xlator_t *this, pthread_t thr_id)
{
    int ret = 0;
    void *retval = NULL;

    /* send a cancel request to the thread */
    ret = pthread_cancel(thr_id);
    if (ret != 0) {
        /* pthread_*() report failure through the return value, not errno */
        gf_msg(this->name, GF_LOG_ERROR, ret,
               CHANGELOG_MSG_PTHREAD_CANCEL_FAILED, "could not cancel thread");
        goto out;
    }

    ret = pthread_join(thr_id, &retval);
    if ((ret != 0) || (retval != PTHREAD_CANCELED)) {
        gf_msg(this->name, GF_LOG_ERROR, ret,
               CHANGELOG_MSG_PTHREAD_CANCEL_FAILED,
               "cancel request not adhered as expected");
    }

out:
    return ret;
}

/*
 * Return the data pointer of the iobuf attached to @local's log data, or
 * NULL when @local is NULL or no iobuf is attached.
 */
void *
changelog_get_usable_buffer(changelog_local_t *local)
{
    if (local == NULL || local->cld.cld_iobuf == NULL)
        return NULL;

    return local->cld.cld_iobuf->ptr;
}

/*
 * Zero-based position of the least significant set bit in @selector,
 * or -1 when no bit is set.
 */
static int
changelog_selector_index(unsigned int selector)
{
    int first_set = ffs(selector); /* 1-based; 0 when selector is 0 */

    return first_set - 1;
}

/*
 * Tell whether the event identified by the lowest set bit of @selector is
 * currently selected, i.e. has a positive reference count in @selection.
 *
 * Returns non-zero when selected, 0 otherwise. A selector that maps outside
 * the ref[] table (no bit set, or bit index beyond
 * CHANGELOG_EV_SELECTION_RANGE) is reported as not selected without touching
 * the table.
 */
int
changelog_ev_selected(xlator_t *this, changelog_ev_selector_t *selection,
                      unsigned int selector)
{
    int idx = 0;

    idx = changelog_selector_index(selector);
    /* validate before indexing: idx is -1 when no bit is set */
    if (idx < 0 || idx >= CHANGELOG_EV_SELECTION_RANGE)
        return 0;

    gf_msg_debug(this->name, 0, "selector ref count for %d (idx: %d): %d",
                 selector, idx, selection->ref[idx]);
    /* this can be lockless */
    return (selection->ref[idx] > 0);
}

/*
 * Take one reference on every event whose bit is set in @selector.
 * Bits at or beyond CHANGELOG_EV_SELECTION_RANGE are skipped. The reference
 * table is updated under selection->reflock.
 */
void
changelog_select_event(xlator_t *this, changelog_ev_selector_t *selection,
                       unsigned int selector)
{
    int idx = 0;

    LOCK(&selection->reflock);
    {
        while (selector) {
            idx = changelog_selector_index(selector);
            if (idx < CHANGELOG_EV_SELECTION_RANGE) {
                selection->ref[idx]++;
                gf_msg_debug(this->name, 0, "selecting event %d", idx);
            }
            /* unsigned constant: shifting a signed 1 by 31 is undefined */
            selector &= ~(1U << idx);
        }
    }
    UNLOCK(&selection->reflock);
}

/*
 * Drop one reference on every event whose bit is set in @selector.
 * Bits at or beyond CHANGELOG_EV_SELECTION_RANGE are skipped. The reference
 * table is updated under selection->reflock.
 */
void
changelog_deselect_event(xlator_t *this, changelog_ev_selector_t *selection,
                         unsigned int selector)
{
    int idx = 0;

    LOCK(&selection->reflock);
    {
        while (selector) {
            idx = changelog_selector_index(selector);
            if (idx < CHANGELOG_EV_SELECTION_RANGE) {
                selection->ref[idx]--;
                gf_msg_debug(this->name, 0, "de-selecting event %d", idx);
            }
            /* unsigned constant: shifting a signed 1 by 31 is undefined */
            selector &= ~(1U << idx);
        }
    }
    UNLOCK(&selection->reflock);
}

/*
 * Initialise the selection lock and reset every event reference count.
 *
 * Returns 0 on success, -1 when the lock cannot be initialised.
 */
int
changelog_init_event_selection(xlator_t *this,
                               changelog_ev_selector_t *selection)
{
    int slot = 0;
    int ret = 0;

    ret = LOCK_INIT(&selection->reflock);
    if (ret != 0)
        return -1;

    LOCK(&selection->reflock);
    {
        for (slot = CHANGELOG_EV_SELECTION_RANGE - 1; slot >= 0; slot--)
            selection->ref[slot] = 0;
    }
    UNLOCK(&selection->reflock);

    return 0;
}

/*
 * Tear down the event selection state. A warning is logged for every event
 * that still holds references; the lock is then destroyed.
 *
 * Returns the result of LOCK_DESTROY().
 */
int
changelog_cleanup_event_selection(xlator_t *this,
                                  changelog_ev_selector_t *selection)
{
    int slot = 0;

    LOCK(&selection->reflock);
    {
        for (slot = CHANGELOG_EV_SELECTION_RANGE - 1; slot >= 0; slot--) {
            if (selection->ref[slot] <= 0)
                continue;

            gf_msg(this->name, GF_LOG_WARNING, 0,
                   CHANGELOG_MSG_CLEANUP_ON_ACTIVE_REF,
                   "changelog event selection cleaning up "
                   " on active references");
        }
    }
    UNLOCK(&selection->reflock);

    return LOCK_DESTROY(&selection->reflock);
}

/*
 * Copy @size bytes from @mem into a write area reserved on priv->rbuf and
 * mark the write as complete. When no area can be reserved a warning is
 * logged and the event is dropped.
 */
static void
changelog_perform_dispatch(xlator_t *this, changelog_priv_t *priv, void *mem,
                           size_t size)
{
    void *handle = NULL;
    char *area = NULL;

    area = rbuf_reserve_write_area(priv->rbuf, size, &handle);
    if (area != NULL) {
        memcpy(area, mem, size);
        rbuf_write_complete(handle);
        return;
    }

    gf_msg_callingfn(this->name, GF_LOG_WARNING, 0,
                     CHANGELOG_MSG_DISPATCH_EVENT_FAILED,
                     "failed to dispatch event");
}

/*
 * Dispatch @ev (CHANGELOG_EV_SIZE bytes) if its type is currently selected
 * in priv->ev_selection; otherwise do nothing.
 */
void
changelog_dispatch_event(xlator_t *this, changelog_priv_t *priv,
                         changelog_event_t *ev)
{
    if (!changelog_ev_selected(this, &priv->ev_selection, ev->ev_type))
        return;

    changelog_perform_dispatch(this, priv, ev, CHANGELOG_EV_SIZE);
}

/*
 * Record the used buffer length (@len) and the number of extra records (@xr)
 * in @local's log data.
 */
void
changelog_set_usable_record_and_length(changelog_local_t *local, size_t len,
                                       int xr)
{
    local->cld.cld_ptr_len = len;
    local->cld.cld_xtra_records = xr;
}

/*
 * Release everything held by @local: per-record dynamic data (through each
 * record's co_free callback), the iobuf, the inode reference, and finally
 * @local itself. A NULL @local is a no-op.
 */
void
changelog_local_cleanup(xlator_t *xl, changelog_local_t *local)
{
    changelog_log_data_t *data = NULL;
    changelog_opt_t *records = NULL;
    int idx = 0;

    if (local == NULL)
        return;

    data = &local->cld;

    /* the record array is only looked at when extra records exist */
    if (data->cld_xtra_records) {
        records = (changelog_opt_t *)data->cld_ptr;
        for (idx = 0; idx < data->cld_xtra_records; idx++) {
            if (records[idx].co_free)
                records[idx].co_free(&records[idx]);
        }
    }

    CHANGELOG_IOBUF_UNREF(data->cld_iobuf);

    if (local->inode)
        inode_unref(local->inode);

    mem_put(local);
}

/*
 * Write @len bytes from @buffer to @fd, continuing after short writes.
 *
 * Returns 0 when everything was written and 1 otherwise; the result is
 * never negative.
 */
int
changelog_write(int fd, char *buffer, size_t len)
{
    size_t done = 0;
    ssize_t nbytes = 0;

    for (; done < len; done += nbytes) {
        nbytes = sys_write(fd, buffer + done, len - done);
        if (nbytes <= 0)
            break;
    }

    return (done != len);
}

/*
 * Append the changelog path in @buffer (including its terminating NUL) to
 * the htime file and store "<ts>:<rollover_count>" in the HTIME_KEY xattr
 * of that file. The xattr is first written with XATTR_REPLACE and, if that
 * fails, retried without flags. On success priv->rollover_count is
 * incremented.
 *
 * Returns 0 on success, -1 on failure (htime fd not open, path too long,
 * write failure, or xattr update failure).
 */
int
htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts,
             char *buffer)
{
    char changelog_path[PATH_MAX + 1] = {
        0,
    };
    int len = -1;
    /* time stamp(10) + : (1) + rolltime (12 ) + buffer (2) */
    char x_value[25] = {
        0,
    };
    int ret = 0;

    if (priv->htime_fd == -1) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR,
               "Htime fd not available for updation");
        ret = -1;
        goto out;
    }
    len = snprintf(changelog_path, PATH_MAX, "%s", buffer);
    if ((len < 0) || (len >= PATH_MAX)) {
        ret = -1;
        goto out;
    }
    /* changelog_write() reports failure as a non-zero (positive) value */
    if (changelog_write(priv->htime_fd, (void *)changelog_path, len + 1)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR,
               "Htime file content write failed");
        ret = -1;
        goto out;
    }

    len = snprintf(x_value, sizeof(x_value), "%lu:%d", ts,
                   priv->rollover_count);
    if ((len < 0) || ((size_t)len >= sizeof(x_value))) {
        ret = -1;
        goto out;
    }

    if (sys_fsetxattr(priv->htime_fd, HTIME_KEY, x_value, len, XATTR_REPLACE)) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_ERROR,
                "Htime xattr updation failed with XATTR_REPLACE",
                "changelog=%s", changelog_path, NULL);

        /* fall back to a plain set in case the xattr does not exist yet */
        if (sys_fsetxattr(priv->htime_fd, HTIME_KEY, x_value, len, 0)) {
            gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_ERROR,
                    "Htime xattr updation failed", "changelog=%s",
                    changelog_path, NULL);
            ret = -1;
            goto out;
        }
    }

    priv->rollover_count += 1;

out:
    return ret;
}

/*
 * Description: Check whether the changelog behind @fd holds nothing but its
 * header. The caller is expected to pass an already verified fd.
 *
 * The file is stat'ed, rewound to offset 0 and its header is read with
 * CHANGELOG_GET_HEADER_INFO; the length reported by that macro is compared
 * with the file size.
 *
 * Returns:
 *  1 : the reported header length equals the file size (empty changelog)
 *  0 : the changelog is NOT empty
 * -1 : fstat or lseek failed
 */
int
cl_is_empty(xlator_t *this, int fd)
{
    char header_line[1024] = {
        0,
    };
    struct stat st = {
        0,
    };
    size_t header_len = 0;
    int encoding = -1;
    int major_version = -1;
    int minor_version = -1;
    int ret = -1;

    ret = sys_fstat(fd, &st);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSTAT_OP_FAILED,
               "Could not stat (CHANGELOG)");
        goto out;
    }

    ret = sys_lseek(fd, 0, SEEK_SET);
    if (ret == -1) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_LSEEK_OP_FAILED,
               "Could not lseek (CHANGELOG)");
        goto out;
    }

    CHANGELOG_GET_HEADER_INFO(fd, header_line, sizeof(header_line), encoding,
                              major_version, minor_version, header_len);

    ret = (header_len == st.st_size) ? 1 : 0;

out:
    return ret;
}

/*
 * Description: Rewrites the first occurrence of "CHANGELOG" in @cl_path to
 * "changelog", in place, for writing the changelog path to the htime file.
 *
 * Returns:
 * 0  : Success
 * -1 : "CHANGELOG" does not occur in @cl_path
 */
int
update_path(xlator_t *this, char *cl_path)
{
    const char low_cl[] = "changelog";
    const char up_cl[] = "CHANGELOG";
    char *found = NULL;

    found = strstr(cl_path, up_cl);
    if (found == NULL) {
        /* strstr() does not set errno, so no error number is logged */
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_LSEEK_OP_FAILED,
               "Could not find CHANGELOG in changelog path");
        return -1;
    }

    /* both strings have the same length; the terminator is left alone */
    memcpy(found, low_cl, sizeof(low_cl) - 1);

    return 0;
}

/*
 * Roll over the current changelog.
 *
 * If a changelog fd is open it is fsync'ed, checked for emptiness and
 * closed. An empty changelog file is unlinked; otherwise it is renamed to
 * "<changelog_dir>/CHANGELOG_FILE_NAME.<ts>". On success the htime file is
 * updated with the new path and, for a non-empty changelog, a
 * CHANGELOG_OP_TYPE_JOURNAL event carrying that path is dispatched.
 *
 * When priv->explicit_rollover is set, the waiter on priv->bn.bnotify_cond
 * is signalled on every exit path, with priv->bn.bnotify_error set when the
 * rollover failed.
 *
 * Returns 0 on success (including the case where there was no changelog
 * file to rename), non-zero on failure.
 */
static int
changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv,
                             unsigned long ts)
{
    int ret = -1;
    int notify = 0;
    int cl_empty_flag = 0;
    char ofile[PATH_MAX] = {
        0,
    };
    char nfile[PATH_MAX] = {
        0,
    };
    changelog_event_t ev = {
        0,
    };

    if (priv->changelog_fd != -1) {
        /* an fsync failure is only logged; the rollover continues */
        ret = sys_fsync(priv->changelog_fd);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, errno,
                   CHANGELOG_MSG_FSYNC_OP_FAILED, "fsync failed");
        }
        ret = cl_is_empty(this, priv->changelog_fd);
        if (ret == 1) {
            cl_empty_flag = 1;
        } else if (ret == -1) {
            /* Log error but proceed as usual */
            gf_msg(this->name, GF_LOG_WARNING, 0,
                   CHANGELOG_MSG_DETECT_EMPTY_CHANGELOG_FAILED,
                   "Error detecting empty changelog");
        }
        sys_close(priv->changelog_fd);
        priv->changelog_fd = -1;
    }

    /* ofile: live changelog; nfile: its timestamped name after rollover */
    (void)snprintf(ofile, PATH_MAX, "%s/" CHANGELOG_FILE_NAME,
                   priv->changelog_dir);
    (void)snprintf(nfile, PATH_MAX, "%s/" CHANGELOG_FILE_NAME ".%lu",
                   priv->changelog_dir, ts);

    if (cl_empty_flag == 1) {
        ret = sys_unlink(ofile);
        if (ret) {
            gf_smsg(this->name, GF_LOG_ERROR, errno,
                    CHANGELOG_MSG_UNLINK_OP_FAILED,
                    "error unlinking empty changelog", "path=%s", ofile, NULL);
            ret = 0; /* Error in unlinking empty changelog should
                        not break further changelog operation, so
                        reset return value to 0*/
        }
    } else {
        ret = sys_rename(ofile, nfile);

        /* nothing to roll over: treated as success, htime is not updated */
        if (ret && (errno == ENOENT)) {
            ret = 0;
            goto out;
        }
        if (ret) {
            gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_RENAME_ERROR,
                    "error renaming", "from=%s", ofile, "to=%s", nfile, NULL);
        }
    }

    /* consumers are only notified about non-empty, renamed changelogs */
    if (!ret && (cl_empty_flag == 0)) {
        notify = 1;
    }

    if (!ret) {
        /* an empty changelog is recorded with the lower-case "changelog" */
        if (cl_empty_flag) {
            update_path(this, nfile);
        }
        ret = htime_update(this, priv, ts, nfile);
        if (ret == -1) {
            gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR,
                   "could not update htime file");
            goto out;
        }
    }

    if (notify) {
        ev.ev_type = CHANGELOG_OP_TYPE_JOURNAL;
        memcpy(ev.u.journal.path, nfile, strlen(nfile) + 1);
        changelog_dispatch_event(this, priv, &ev);
    }
out:
    /* If this is explicit rollover initiated by snapshot,
     * wakeup reconfigure thread waiting for changelog to
     * rollover. This should happen even in failure cases as
     * well otherwise snapshot will timeout and fail. Hence
     * moved under out.
     */
    if (priv->explicit_rollover) {
        priv->explicit_rollover = _gf_false;

        pthread_mutex_lock(&priv->bn.bnotify_mutex);
        {
            if (ret) {
                priv->bn.bnotify_error = _gf_true;
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED,
                       "Fail snapshot because of "
                       "previous errors");
            } else {
                gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BNOTIFY_INFO,
                        "Explicit "
                        "rollover changelog signaling "
                        "bnotify",
                        "changelog=%s", nfile, NULL);
            }
            priv->bn.bnotify = _gf_false;
            pthread_cond_signal(&priv->bn.bnotify_cond);
        }
        pthread_mutex_unlock(&priv->bn.bnotify_mutex);
    }
    return ret;
}

/*
 * scandir() filter: rejects (returns 0 for) a NULL entry and the "." and
 * ".." entries, accepts (returns 1 for) everything else.
 */
int
filter_cur_par_dirs(const struct dirent *entry)
{
    const char *name = NULL;

    if (!entry)
        return 0;

    name = entry->d_name;
    if (!strcmp(name, ".") || !strcmp(name, ".."))
        return 0;

    return 1;
}

/*
 * find_current_htime:
 *       Scans @ht_dir_path (skipping "." and ".."), takes the entry that
 *       sorts last under alphasort as the latest htime file, copies its
 *       name into @ht_file_bname, stores that name in the HTIME_CURRENT
 *       xattr of @ht_dir_fd and fsyncs the directory fd.
 *       RETURN VALUE:
 *           -1 : Error (scandir, name too long, fsetxattr or fsync failed)
 *           otherwise: Number of directory entries found (may be 0)
 */

int
find_current_htime(int ht_dir_fd, const char *ht_dir_path, char *ht_file_bname)
{
    struct dirent **entries = NULL;
    xlator_t *this = NULL;
    int nr_entries = 0;
    int err = 0;
    int idx = 0;

    this = THIS;
    GF_ASSERT(this);
    GF_ASSERT(ht_dir_path);

    nr_entries = scandir(ht_dir_path, &entries, filter_cur_par_dirs,
                         alphasort);
    if (nr_entries < 0) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_SCAN_DIR_FAILED,
               "scandir failed");
        goto out;
    }

    /* empty directory: nothing to record */
    if (nr_entries == 0)
        goto out;

    if (snprintf(ht_file_bname, NAME_MAX, "%s",
                 entries[nr_entries - 1]->d_name) >= NAME_MAX) {
        err = -1;
        goto out;
    }

    if (sys_fsetxattr(ht_dir_fd, HTIME_CURRENT, ht_file_bname,
                      strlen(ht_file_bname), 0)) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSETXATTR_FAILED,
               "fsetxattr failed: HTIME_CURRENT");
        err = -1;
        goto out;
    }

    if (sys_fsync(ht_dir_fd) < 0) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSYNC_OP_FAILED,
               "fsync failed");
        err = -1;
        goto out;
    }

out:
    /* a negative count (scandir failure) leaves nothing to free */
    for (idx = 0; idx < nr_entries; idx++)
        free(entries[idx]);
    free(entries);

    return err ? err : nr_entries;
}

/*
 * Open the current htime file for appending and initialise
 * priv->rollover_count from it.
 *
 * The file name is taken from the HTIME_CURRENT xattr of the htime
 * directory. When that xattr cannot be read, the latest file in the
 * directory is looked up with find_current_htime(); if none is found a new
 * htime file is created through htime_create().
 *
 * The rollover count is the larger of the count stored in the HTIME_KEY
 * xattr and the count derived from the file size, plus one.
 *
 * Returns 0 on successful open of htime file
 * returns -1 on failure or error
 */
int
htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts)
{
    int ht_file_fd = -1;
    int ht_dir_fd = -1;
    int ret = 0;
    int cnt = 0;
    char ht_dir_path[PATH_MAX] = {
        0,
    };
    char ht_file_path[PATH_MAX] = {
        0,
    };
    char ht_file_bname[NAME_MAX] = {
        0,
    };
    char x_value[NAME_MAX] = {
        0,
    };
    int flags = 0;
    unsigned long min_ts = 0;
    unsigned long max_ts = 0;
    unsigned long total = 0;
    unsigned long total1 = 0;
    ssize_t size = 0;
    struct stat stat_buf = {
        0,
    };
    unsigned long record_len = 0;
    int32_t len = 0;

    CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, ht_dir_path);

    /* Open htime directory to get HTIME_CURRENT */
    ht_dir_fd = open(ht_dir_path, O_RDONLY);
    if (ht_dir_fd == -1) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED,
                "open failed", "path=%s", ht_dir_path, NULL);
        ret = -1;
        goto out;
    }

    /* The xattr value is stored without a terminating NUL; keep the last
     * byte of the zero-filled buffer untouched so the result is always a
     * valid C string.
     */
    size = sys_fgetxattr(ht_dir_fd, HTIME_CURRENT, ht_file_bname,
                         sizeof(ht_file_bname) - 1);
    if (size < 0) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FGETXATTR_FAILED,
               "Error extracting"
               " HTIME_CURRENT.");

        /* If upgrade scenario, find the latest HTIME.TSTAMP file
         * and use the same. If error, create a new HTIME.TSTAMP
         * file.
         */
        cnt = find_current_htime(ht_dir_fd, ht_dir_path, ht_file_bname);
        if (cnt <= 0) {
            gf_msg(this->name, GF_LOG_INFO, errno, CHANGELOG_MSG_HTIME_INFO,
                   "HTIME_CURRENT not found. Changelog enabled"
                   " before init");
            sys_close(ht_dir_fd);
            return htime_create(this, priv, ts);
        }

        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_ERROR,
               "Error extracting"
               " HTIME_CURRENT.");
    }

    gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_HTIME_INFO,
            "HTIME_CURRENT", "path=%s", ht_file_bname, NULL);
    len = snprintf(ht_file_path, PATH_MAX, "%s/%s", ht_dir_path, ht_file_bname);
    if ((len < 0) || (len >= PATH_MAX)) {
        ret = -1;
        goto out;
    }

    /* Open in append mode as existing htime file is used */
    flags |= (O_RDWR | O_SYNC | O_APPEND);
    ht_file_fd = open(ht_file_path, flags,
                      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
    if (ht_file_fd < 0) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED,
                "unable to open htime file", "path=%s", ht_file_path, NULL);
        ret = -1;
        goto out;
    }

    /* save this htime_fd in priv->htime_fd */
    priv->htime_fd = ht_file_fd;

    ret = sys_fstat(ht_file_fd, &stat_buf);
    if (ret < 0) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_ERROR,
                "unable to stat htime file", "path=%s", ht_file_path, NULL);
        ret = -1;
        goto out;
    }

    /* Initialize rollover-number in priv to current number; as above, one
     * byte is reserved so that x_value stays NUL-terminated for sscanf().
     */
    size = sys_fgetxattr(ht_file_fd, HTIME_KEY, x_value, sizeof(x_value) - 1);
    if (size < 0) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FGETXATTR_FAILED,
                "error extracting max"
                " timstamp from htime file",
                "path=%s", ht_file_path, NULL);
        ret = -1;
        goto out;
    }

    /* on a parse failure max_ts/total keep their initial value of 0 */
    sscanf(x_value, "%lu:%lu", &max_ts, &total);

    /* 22 = 1(/) + 20(CHANGELOG.TIMESTAMP) + 1(\x00) */
    record_len = strlen(priv->changelog_dir) + 22;
    total1 = stat_buf.st_size / record_len;
    if (total != total1) {
        gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_TOTAL_LOG_INFO,
                "Mismatch of changelog count. "
                "INIT CASE",
                "xattr_total=%lu", total, "size_total=%lu", total1, NULL);
    }

    gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_TOTAL_LOG_INFO,
            "INIT CASE", "min=%lu", min_ts, "max=%lu", max_ts,
            "total_changelogs=%lu", total, NULL);

    /* trust whichever source reports more changelogs */
    if (total < total1)
        priv->rollover_count = total1 + 1;
    else
        priv->rollover_count = total + 1;

out:
    if (ht_dir_fd != -1)
        sys_close(ht_dir_fd);
    return ret;
}

/*
 * Create a new htime file named "HTIME_FILE_NAME.<ts>" inside the htime
 * directory, initialise its HTIME_KEY xattr to HTIME_INITIAL_VALUE, keep
 * its fd in priv->htime_fd, point the directory's HTIME_CURRENT xattr at
 * the new file name and reset priv->rollover_count to 1.
 *
 * Returns 0 on successful creation of htime file
 * returns -1 on failure or error
 */
int
htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts)
{
    char htime_dir[PATH_MAX] = {
        0,
    };
    char htime_path[PATH_MAX] = {
        0,
    };
    char htime_name[NAME_MAX + 1] = {
        0,
    };
    int file_fd = -1;
    int dir_fd = -1;
    int32_t len = 0;
    int ret = 0;

    gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_HTIME_INFO,
            "Changelog enable: Creating new "
            "HTIME file",
            "name=%lu", ts, NULL);

    CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, htime_dir);

    /* full path of the htime file to create */
    len = snprintf(htime_path, PATH_MAX, "%s/%s.%lu", htime_dir,
                   HTIME_FILE_NAME, ts);
    if ((len < 0) || (len >= PATH_MAX)) {
        ret = -1;
        goto out;
    }

    file_fd = open(htime_path, O_CREAT | O_RDWR | O_SYNC,
                   S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
    if (file_fd < 0) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED,
                "unable to create htime file", "path=%s", htime_path, NULL);
        ret = -1;
        goto out;
    }

    if (sys_fsetxattr(file_fd, HTIME_KEY, HTIME_INITIAL_VALUE,
                      sizeof(HTIME_INITIAL_VALUE) - 1, 0)) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSETXATTR_FAILED,
               "Htime xattr initialization failed");
        ret = -1;
        goto out;
    }

    ret = sys_fsync(file_fd);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSYNC_OP_FAILED,
               "fsync failed");
        goto out;
    }

    /* priv owns the fd from here on; do not close it on the way out */
    priv->htime_fd = file_fd;
    file_fd = -1;

    /* record the new file name in HTIME_CURRENT on the htime directory */
    dir_fd = open(htime_dir, O_RDONLY);
    if (dir_fd == -1) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED,
                "open failed", "path=%s", htime_dir, NULL);
        ret = -1;
        goto out;
    }

    (void)snprintf(htime_name, sizeof(htime_name), "%s.%lu", HTIME_FILE_NAME,
                   ts);
    if (sys_fsetxattr(dir_fd, HTIME_CURRENT, htime_name, strlen(htime_name),
                      0)) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSETXATTR_FAILED,
               "fsetxattr failed:"
               " HTIME_CURRENT");
        ret = -1;
        goto out;
    }

    ret = sys_fsync(dir_fd);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSYNC_OP_FAILED,
               "fsync failed");
        goto out;
    }

    /* a fresh htime file starts counting rollovers at 1 */
    priv->rollover_count = 1;

out:
    if (dir_fd != -1)
        sys_close(dir_fd);
    if (file_fd != -1)
        sys_close(file_fd);
    return ret;
}

/* Description:
 *      Opens the snap changelog used to log call path fops.
 *      The file is named CSNAP_FILE_NAME and lives in the directory
 *      filled in by CHANGELOG_FILL_CSNAP_DIR. It is created if missing,
 *      truncated, and the changelog header is written to it. On success
 *      the fd is kept in priv->c_snap_fd.
 * Returns:
 *       0  : On success.
 *      -1  : On failure (path too long, open failed or header write failed).
 */
int
changelog_snap_open(xlator_t *this, changelog_priv_t *priv)
{
    int fd = -1;
    int ret = 0;
    int flags = 0;
    char buffer[1024] = {
        0,
    };
    char c_snap_path[PATH_MAX] = {
        0,
    };
    char csnap_dir_path[PATH_MAX] = {
        0,
    };
    int32_t len = 0;

    CHANGELOG_FILL_CSNAP_DIR(priv->changelog_dir, csnap_dir_path);

    len = snprintf(c_snap_path, PATH_MAX, "%s/" CSNAP_FILE_NAME,
                   csnap_dir_path);
    if ((len < 0) || (len >= PATH_MAX)) {
        ret = -1;
        goto out;
    }

    flags |= (O_CREAT | O_RDWR | O_TRUNC);

    fd = open(c_snap_path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
    if (fd < 0) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED,
                "unable to open file", "path=%s", c_snap_path, NULL);
        ret = -1;
        goto out;
    }
    priv->c_snap_fd = fd;

    (void)snprintf(buffer, 1024, CHANGELOG_HEADER, CHANGELOG_VERSION_MAJOR,
                   CHANGELOG_VERSION_MINOR, priv->ce->encoder);
    ret = changelog_snap_write_change(priv, buffer, strlen(buffer));
    if (ret) {
        /* the writer reports failure as a positive value, never negative;
         * release the fd and map the failure to the documented -1
         */
        sys_close(priv->c_snap_fd);
        priv->c_snap_fd = -1;
        ret = -1;
        goto out;
    }

out:
    return ret;
}

/*
 * Description:
 *      Starts logging fop details in the CSNAP journal by opening it.
 *      The informational message is logged regardless of the outcome.
 * Returns:
 *      The result of changelog_snap_open().
 */
int
changelog_snap_logging_start(xlator_t *this, changelog_priv_t *priv)
{
    int status = changelog_snap_open(this, priv);

    gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_SNAP_INFO,
           "Now starting to log in call path");

    return status;
}

/*
 * Description:
 *      Stops logging fop details in the CSNAP journal: closes
 *      priv->c_snap_fd and marks it as unused (-1).
 * Returns:
 *       0 : Always.
 */
int
changelog_snap_logging_stop(xlator_t *this, changelog_priv_t *priv)
{
    sys_close(priv->c_snap_fd);
    priv->c_snap_fd = -1;

    gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_SNAP_INFO,
           "Stopped to log in call path");

    return 0;
}

/*
 * Open (creating it if needed) the live changelog file
 * "<changelog_dir>/CHANGELOG_FILE_NAME", remember its fd in
 * priv->changelog_fd and write the changelog header to it. O_SYNC is used
 * when priv->fsync_interval is 0.
 *
 * Returns 0 on success, -1 when the file cannot be opened, or the non-zero
 * result of changelog_write_change() when the header write fails (the fd is
 * then closed and priv->changelog_fd reset to -1).
 */
int
changelog_open_journal(xlator_t *this, changelog_priv_t *priv)
{
    char header[1024] = {
        0,
    };
    char journal_path[PATH_MAX] = {
        0,
    };
    int open_flags = O_CREAT | O_RDWR;
    int journal_fd = -1;
    int ret = -1;

    (void)snprintf(journal_path, PATH_MAX, "%s/" CHANGELOG_FILE_NAME,
                   priv->changelog_dir);

    if (priv->fsync_interval == 0)
        open_flags |= O_SYNC;

    journal_fd = open(journal_path, open_flags,
                      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
    if (journal_fd < 0) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED,
                "unable to open/create changelog file."
                " change-logging will be"
                " inactive",
                "path=%s", journal_path, NULL);
        return -1;
    }

    priv->changelog_fd = journal_fd;

    (void)snprintf(header, 1024, CHANGELOG_HEADER, CHANGELOG_VERSION_MAJOR,
                   CHANGELOG_VERSION_MINOR, priv->ce->encoder);
    ret = changelog_write_change(priv, header, strlen(header));
    if (ret) {
        sys_close(priv->changelog_fd);
        priv->changelog_fd = -1;
    }

    return ret;
}

/*
 * Roll over the current changelog and, unless this is the final rollover
 * (@finale) or the rollover failed, open a new journal.
 *
 * Returns the rollover result when it failed or @finale is set, otherwise
 * the result of changelog_open_journal().
 */
int
changelog_start_next_change(xlator_t *this, changelog_priv_t *priv,
                            unsigned long ts, gf_boolean_t finale)
{
    int ret = changelog_rollover_changelog(this, priv, ts);

    if (ret || finale)
        return ret;

    return changelog_open_journal(this, priv);
}

/**
 * return the length of entry, i.e. sizeof(changelog_log_data_t)
 */
size_t
changelog_entry_length(void)
{
    return sizeof(changelog_log_data_t);
}

/*
 * Prepare @cld as a rollover record: the type is set to
 * CHANGELOG_TYPE_ROLLOVER, the roll time to the current time in seconds and
 * the finale flag to @is_last.
 *
 * Returns 0 on success, -1 when gettimeofday() fails (the type has already
 * been set at that point).
 */
int
changelog_fill_rollover_data(changelog_log_data_t *cld, gf_boolean_t is_last)
{
    struct timeval now = {
        0,
    };

    cld->cld_type = CHANGELOG_TYPE_ROLLOVER;

    if (gettimeofday(&now, NULL) != 0)
        return -1;

    cld->cld_roll_time = (unsigned long)now.tv_sec;
    cld->cld_finale = is_last;

    return 0;
}

/*
 * Write @len bytes of @buffer to the snap changelog (priv->c_snap_fd).
 * Returns the result of changelog_write(): 0 on success, 1 on failure.
 */
int
changelog_snap_write_change(changelog_priv_t *priv, char *buffer, size_t len)
{
    int status = changelog_write(priv->c_snap_fd, buffer, len);

    return status;
}

/*
 * Write @len bytes of @buffer to the live changelog (priv->changelog_fd).
 * Returns the result of changelog_write(): 0 on success, 1 on failure.
 */
int
changelog_write_change(changelog_priv_t *priv, char *buffer, size_t len)
{
    int status = changelog_write(priv->changelog_fd, buffer, len);

    return status;
}

/*
 * Descriptions:
 *      Writes fop details in ascii format to CSNAP. The record is built
 *      in a stack buffer from the gfid string and the data referenced by
 *      @cld, followed by a NUL byte, and written to the snap changelog.
 * Issues:
 *      Not Encoding agnostic.
 * Returns:
 *      0 : On Success.
 *     -1 : On Failure (@this or its private data is NULL, or the write
 *          to the snap changelog failed).
 */
int
changelog_snap_handle_ascii_change(xlator_t *this, changelog_log_data_t *cld)
{
    size_t off = 0;
    size_t gfid_len = 0;
    char *gfid_str = NULL;
    char *buffer = NULL;
    changelog_priv_t *priv = NULL;
    int ret = 0;

    if (this == NULL) {
        ret = -1;
        goto out;
    }

    priv = this->private;

    if (priv == NULL) {
        ret = -1;
        goto out;
    }

    gfid_str = uuid_utoa(cld->cld_gfid);
    gfid_len = strlen(gfid_str);

    /*  extra bytes for decorations */
    buffer = alloca(gfid_len + cld->cld_ptr_len + 10);
    CHANGELOG_STORE_ASCII(priv, buffer, off, gfid_str, gfid_len, cld);

    CHANGELOG_FILL_BUFFER(buffer, off, "\0", 1);

    /* the writer returns 0 on success and a positive value on failure */
    if (changelog_snap_write_change(priv, buffer, off)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_WRITE_FAILED,
               "error writing csnap to disk");
        ret = -1;
        goto out;
    }

    gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_SNAP_INFO,
           "Successfully wrote to csnap");
    ret = 0;
out:
    return ret;
}

/*
 * Process one changelog log-data item.
 *
 *  - rollover records trigger changelog_start_next_change();
 *  - everything else is ignored (returning 0) while no changelog is open;
 *  - fsync records fsync the changelog fd;
 *  - all other records are handed to the configured encoder.
 *
 * Returns the result of whichever operation was performed.
 */
int
changelog_handle_change(xlator_t *this, changelog_priv_t *priv,
                        changelog_log_data_t *cld)
{
    int ret = 0;

    if (CHANGELOG_TYPE_IS_ROLLOVER(cld->cld_type)) {
        changelog_encode_change(priv);
        ret = changelog_start_next_change(this, priv, cld->cld_roll_time,
                                          cld->cld_finale);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   CHANGELOG_MSG_GET_TIME_OP_FAILED,
                   "Problem rolling over changelog(s)");
        }
        return ret;
    }

    /**
     * changelog may have been disabled by a reconfigure while fops with
     * pending updates are still in flight; drop those updates.
     */
    if (priv->changelog_fd == -1)
        return 0;

    if (CHANGELOG_TYPE_IS_FSYNC(cld->cld_type)) {
        ret = sys_fsync(priv->changelog_fd);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, errno,
                   CHANGELOG_MSG_FSYNC_OP_FAILED, "fsync failed");
        }
        return ret;
    }

    ret = priv->ce->encode(this, cld);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_WRITE_FAILED,
               "error writing changelog to disk");
    }

    return ret;
}

/*
 * Allocate and set up a changelog_local_t for a fop.
 *
 * An iobuf sized for @xtra_records option records is attached when
 * @xtra_records is non-zero; @gfid is copied in and a reference on @inode
 * is taken when it is given. The extra-record count itself is left at 0
 * for the caller to fill in.
 *
 * Returns the new local, or NULL when @inode is missing while @update_flag
 * is false, or when an allocation fails.
 */
changelog_local_t *
changelog_local_init(xlator_t *this, inode_t *inode, uuid_t gfid,
                     int xtra_records, gf_boolean_t update_flag)
{
    struct iobuf *records_buf = NULL;
    changelog_local_t *local = NULL;

    /**
     * The inode may only be absent when @update_flag is true. Fop
     * implementations must then take care not to use local->inode blindly.
     */
    if (!inode && !update_flag) {
        gf_msg_callingfn(this->name, GF_LOG_WARNING, 0,
                         CHANGELOG_MSG_INODE_NOT_FOUND,
                         "inode needed for version checking !!!");
        return NULL;
    }

    if (xtra_records) {
        records_buf = iobuf_get2(this->ctx->iobuf_pool,
                                 xtra_records * CHANGELOG_OPT_RECORD_LEN);
        if (!records_buf)
            return NULL;
    }

    local = mem_get0(this->local_pool);
    if (!local) {
        CHANGELOG_IOBUF_UNREF(records_buf);
        return NULL;
    }

    local->update_no_check = update_flag;

    gf_uuid_copy(local->cld.cld_gfid, gfid);

    local->cld.cld_iobuf = records_buf;
    local->cld.cld_xtra_records = 0; /* set by the caller */

    if (inode)
        local->inode = inode_ref(inode);

    return local;
}

/*
 * Forget callback: detach this translator's context from @inode and free
 * it, if one was set. Always returns 0.
 */
int
changelog_forget(xlator_t *this, inode_t *inode)
{
    changelog_inode_ctx_t *ctx = NULL;
    uint64_t ctx_addr = 0;

    inode_ctx_del(inode, this, &ctx_addr);
    if (ctx_addr) {
        ctx = (changelog_inode_ctx_t *)(long)ctx_addr;
        GF_FREE(ctx);
    }

    return 0;
}

/*
 * Hand a single log-data item to the configured dispatcher
 * (priv->cd.dispatchfn) and return its result.
 */
int
changelog_inject_single_event(xlator_t *this, changelog_priv_t *priv,
                              changelog_log_data_t *cld)
{
    int status = priv->cd.dispatchfn(this, priv, priv->cd.cd_data, cld, NULL);

    return status;
}

/*
 * Wait till all the black fops are drained, i.e. until
 * priv->dm.black_fop_cnt drops to zero. The wait happens on
 * drain_black_cond under drain_black_mutex; drain_wait_black is raised
 * while waiting and cleared afterwards. pthread failures are logged with
 * the error number returned by the failing call.
 */
void
changelog_drain_black_fops(xlator_t *this, changelog_priv_t *priv)
{
    int ret = 0;

    /* clean up framework of pthread_mutex is required here as
     * 'reconfigure' terminates the changelog_rollover thread
     * on graph change.
     */
    pthread_cleanup_push(changelog_cleanup_free_mutex,
                         &priv->dm.drain_black_mutex);
    ret = pthread_mutex_lock(&priv->dm.drain_black_mutex);
    if (ret)
        /* pthread_*() return the error number and leave errno alone */
        gf_smsg(this->name, GF_LOG_ERROR, ret, CHANGELOG_MSG_PTHREAD_ERROR,
                "pthread error", "error=%d", ret, NULL);
    while (priv->dm.black_fop_cnt > 0) {
        gf_msg_debug(this->name, 0, "Conditional wait on black fops: %ld",
                     priv->dm.black_fop_cnt);
        priv->dm.drain_wait_black = _gf_true;
        ret = pthread_cond_wait(&priv->dm.drain_black_cond,
                                &priv->dm.drain_black_mutex);
        if (ret)
            gf_smsg(this->name, GF_LOG_ERROR, ret,
                    CHANGELOG_MSG_PTHREAD_COND_WAIT_FAILED,
                    "pthread cond wait failed", "error=%d", ret, NULL);
    }
    priv->dm.drain_wait_black = _gf_false;
    ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex);
    if (ret)
        gf_smsg(this->name, GF_LOG_ERROR, ret, CHANGELOG_MSG_PTHREAD_ERROR,
                "pthread error", "error=%d", ret, NULL);
    pthread_cleanup_pop(0);
    gf_msg_debug(this->name, 0, "Woke up: Conditional wait on black fops");
}

/*
 * Wait till all the white fops are drained, i.e. until
 * priv->dm.white_fop_cnt drops to zero. The wait happens on
 * drain_white_cond under drain_white_mutex; drain_wait_white is raised
 * while waiting and cleared afterwards. pthread failures are logged with
 * the error number returned by the failing call.
 */
void
changelog_drain_white_fops(xlator_t *this, changelog_priv_t *priv)
{
    int ret = 0;

    /* clean up framework of pthread_mutex is required here as
     * 'reconfigure' terminates the changelog_rollover thread
     * on graph change.
     */
    pthread_cleanup_push(changelog_cleanup_free_mutex,
                         &priv->dm.drain_white_mutex);
    ret = pthread_mutex_lock(&priv->dm.drain_white_mutex);
    if (ret)
        /* pthread_*() return the error number and leave errno alone */
        gf_smsg(this->name, GF_LOG_ERROR, ret, CHANGELOG_MSG_PTHREAD_ERROR,
                "pthread error", "error=%d", ret, NULL);
    while (priv->dm.white_fop_cnt > 0) {
        gf_msg_debug(this->name, 0, "Conditional wait on white fops : %ld",
                     priv->dm.white_fop_cnt);
        priv->dm.drain_wait_white = _gf_true;
        ret = pthread_cond_wait(&priv->dm.drain_white_cond,
                                &priv->dm.drain_white_mutex);
        if (ret)
            gf_smsg(this->name, GF_LOG_ERROR, ret,
                    CHANGELOG_MSG_PTHREAD_COND_WAIT_FAILED,
                    "pthread cond wait failed", "error=%d", ret, NULL);
    }
    priv->dm.drain_wait_white = _gf_false;
    ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex);
    if (ret)
        gf_smsg(this->name, GF_LOG_ERROR, ret, CHANGELOG_MSG_PTHREAD_ERROR,
                "pthread error", "error=%d", ret, NULL);
    pthread_cleanup_pop(0);
    gf_msg_debug(this->name, 0, "Woke up: Conditional wait on white fops");
}

/**
 * TODO: these threads have many things in common (waking up after
 * a certain time, etc.). Move them into a separate routine.
 */
void *
changelog_rollover(void *data)
{
    int ret = 0;
    xlator_t *this = NULL;
    struct timespec tv = {
        0,
    };
    changelog_log_data_t cld = {
        0,
    };
    changelog_time_slice_t *slice = NULL;
    changelog_priv_t *priv = data;

    this = priv->cr.this;
    slice = &priv->slice;

    while (1) {
        (void)pthread_testcancel();

        tv.tv_sec = time(NULL) + priv->rollover_time;
        tv.tv_nsec = 0;
        ret = 0; /* Reset ret to zero */

        /* The race between actual rollover and explicit rollover is
         * handled. If actual rollover is being done and the
         * explicit rollover event comes, the event is not missed.
         * Since explicit rollover sets 'cr.notify' to true, this
         * thread doesn't wait on 'pthread_cond_timedwait'.
         */
        pthread_cleanup_push(changelog_cleanup_free_mutex, &priv->cr.lock);
        pthread_mutex_lock(&priv->cr.lock);
        {
            while (ret == 0 && !priv->cr.notify)
                ret = pthread_cond_timedwait(&priv->cr.cond, &priv->cr.lock,
                                             &tv);
            if (ret == 0)
                priv->cr.notify = _gf_false;
        }
        pthread_mutex_unlock(&priv->cr.lock);
        pthread_cleanup_pop(0);

        if (ret == 0) {
            gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO,
                   "Explicit wakeup on barrier notify");
            priv->explicit_rollover = _gf_true;
        } else if (ret && ret != ETIMEDOUT) {
            gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_SELECT_FAILED,
                   "pthread_cond_timedwait failed");
            continue;
        } else if (ret && ret == ETIMEDOUT) {
            gf_msg_debug(this->name, 0, "Wokeup on timeout");
        }

        /* Reading curent_color without lock is fine here
         * as it is only modified here and is next to reading.
         */
        if (priv->current_color == FOP_COLOR_BLACK) {
            LOCK(&priv->lock);
            priv->current_color = FOP_COLOR_WHITE;
            UNLOCK(&priv->lock);
            gf_msg_debug(this->name, 0,
                         "Black fops"
                         " to be drained:%ld",
                         priv->dm.black_fop_cnt);
            changelog_drain_black_fops(this, priv);
        } else {
            LOCK(&priv->lock);
            priv->current_color = FOP_COLOR_BLACK;
            UNLOCK(&priv->lock);
            gf_msg_debug(this->name, 0,
                         "White fops"
                         " to be drained:%ld",
                         priv->dm.white_fop_cnt);
            changelog_drain_white_fops(this, priv);
        }

        /* Adding delay of 1 second only during explicit rollover:
         *
         * Changelog rollover can happen either due to actual
         * or the explicit rollover during snapshot. Actual
         * rollover is controlled by tuneable called 'rollover-time'.
         * The minimum granularity for rollover-time is 1 second.
         * Explicit rollover is asynchronous in nature and happens
         * during snapshot.
         *
         * Basically, rollover renames the current CHANGELOG file
         * to CHANGELOG.TIMESTAMP. Let's assume, at time 't1',
         * actual and explicit rollover raced against  each
         * other and actual rollover won the race renaming the
         * CHANGELOG file to CHANGELOG.t1 and opens a new
         * CHANGELOG file. There is high chance that, an immediate
         * explicit rollover at time 't1' can happen with in the same
         * second to rename CHANGELOG file to CHANGELOG.t1 resulting in
         * purging the earlier CHANGELOG.t1 file created by actual
         * rollover. So adding a delay of 1 second guarantees unique
         * CHANGELOG.TIMESTAMP during  explicit rollover.
         */
        if (priv->explicit_rollover == _gf_true)
            sleep(1);

        ret = changelog_fill_rollover_data(&cld, _gf_false);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   CHANGELOG_MSG_GET_TIME_OP_FAILED,
                   "failed to fill rollover data");
            continue;
        }

        _mask_cancellation();

        LOCK(&priv->lock);
        {
            ret = changelog_inject_single_event(this, priv, &cld);
            if (!ret)
                SLICE_VERSION_UPDATE(slice);
        }
        UNLOCK(&priv->lock);

        _unmask_cancellation();
    }

    return NULL;
}

/**
 * Periodic fsync thread entry point.
 *
 * @data is the changelog_priv_t of the translator. The thread sleeps for
 * priv->fsync_interval seconds (via select() with no descriptors) and then
 * injects a CHANGELOG_TYPE_FSYNC event with cancellation masked. A non-zero
 * return from select() skips the injection for that round. The loop never
 * terminates on its own.
 */
void *
changelog_fsync_thread(void *data)
{
    changelog_priv_t *priv = data;
    xlator_t *this = priv->cf.this;
    changelog_log_data_t fsync_cld = {
        0,
    };
    struct timeval timeout = {
        0,
    };
    int rc = 0;

    fsync_cld.cld_type = CHANGELOG_TYPE_FSYNC;

    for (;;) {
        (void)pthread_testcancel();

        /* re-arm the timeout on every round */
        timeout.tv_sec = priv->fsync_interval;
        timeout.tv_usec = 0;

        rc = select(0, NULL, NULL, NULL, &timeout);
        if (rc != 0)
            continue;

        _mask_cancellation();

        rc = changelog_inject_single_event(this, priv, &fsync_cld);
        if (rc != 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0,
                   CHANGELOG_MSG_INJECT_FSYNC_FAILED,
                   "failed to inject fsync event");
        }

        _unmask_cancellation();
    }

    return NULL;
}

/* macros for inode/changelog version checks */

/*
 * INODE_VERSION_UPDATE: copy the slice's changelog version for 'type' into
 * '*iver'. inode->lock is taken first and priv->lock is taken inside it.
 * Every argument is parenthesized in the expansion so that non-trivial
 * expressions (e.g. 'base + 1') can be passed safely.
 */
#define INODE_VERSION_UPDATE(priv, inode, iver, slice, type)                   \
    do {                                                                       \
        LOCK(&(inode)->lock);                                                  \
        {                                                                      \
            LOCK(&(priv)->lock);                                               \
            {                                                                  \
                *(iver) = (slice)->changelog_version[type];                    \
            }                                                                  \
            UNLOCK(&(priv)->lock);                                             \
        }                                                                      \
        UNLOCK(&(inode)->lock);                                                \
    } while (0)

/*
 * INODE_VERSION_EQUALS_SLICE: under priv->lock, set 'upd' to _gf_false when
 * 'ver' equals the slice's changelog version for 'type', and to _gf_true
 * otherwise. Every argument is parenthesized in the expansion so that
 * non-trivial expressions can be passed safely.
 */
#define INODE_VERSION_EQUALS_SLICE(priv, ver, slice, type, upd)                \
    do {                                                                       \
        LOCK(&(priv)->lock);                                                   \
        {                                                                      \
            (upd) = ((ver) == (slice)->changelog_version[type]) ? _gf_false    \
                                                                : _gf_true;    \
        }                                                                      \
        UNLOCK(&(priv)->lock);                                                 \
    } while (0)

/**
 * Store the address of @ctx as this xlator's context value on @inode and
 * return whatever __inode_ctx_set() returns.
 *
 * NOTE(review): callers in this file reach this with inode->lock held
 * (via changelog_inode_ctx_get) - confirm for any new caller.
 */
static int
__changelog_inode_ctx_set(xlator_t *this, inode_t *inode,
                          changelog_inode_ctx_t *ctx)
{
    uint64_t encoded = 0;

    /* pointer -> integer round trip goes through uintptr_t */
    encoded = (uint64_t)(uintptr_t)ctx;

    return __inode_ctx_set(inode, this, &encoded);
}

/**
 * one shot routine to get the address and the value of a inode version
 * for a particular type.
 *
 * Looks up this xlator's changelog context on @inode; when none is stored
 * (or the lookup fails) a zeroed context is allocated and attached.
 *
 * @iver:    out, optional - receives the address of the version slot for
 *           @type inside the context
 * @version: out, optional - receives the value currently in that slot
 *
 * The out parameters are filled only when both are non-NULL and a context
 * is available.
 *
 * Returns the context, or NULL if allocating or attaching a new one failed.
 */
changelog_inode_ctx_t *
__changelog_inode_ctx_get(xlator_t *this, inode_t *inode, unsigned long **iver,
                          unsigned long *version, changelog_log_type type)
{
    int ret = 0;
    uint64_t ctx_addr = 0;
    changelog_inode_ctx_t *ctx = NULL;

    ret = __inode_ctx_get(inode, this, &ctx_addr);
    if (ret < 0)
        ctx_addr = 0;
    if (ctx_addr != 0) {
        /* mirror the uintptr_t conversion used when the value was
         * stored; a cast through 'long' would truncate the address on
         * platforms where long is narrower than a pointer.
         */
        ctx = (changelog_inode_ctx_t *)(uintptr_t)ctx_addr;
        goto out;
    }

    ctx = GF_CALLOC(1, sizeof(*ctx), gf_changelog_mt_inode_ctx_t);
    if (!ctx)
        goto out;

    ret = __changelog_inode_ctx_set(this, inode, ctx);
    if (ret) {
        /* could not attach it to the inode: do not leak it */
        GF_FREE(ctx);
        ctx = NULL;
    }

out:
    if (ctx && iver && version) {
        *iver = CHANGELOG_INODE_VERSION_TYPE(ctx, type);
        *version = **iver;
    }

    return ctx;
}

/**
 * Locked wrapper around __changelog_inode_ctx_get(): performs the lookup
 * (and possible creation) of the inode's changelog context while holding
 * inode->lock, and hands back the result unchanged.
 */
static changelog_inode_ctx_t *
changelog_inode_ctx_get(xlator_t *this, inode_t *inode, unsigned long **iver,
                        unsigned long *version, changelog_log_type type)
{
    changelog_inode_ctx_t *found = NULL;

    LOCK(&inode->lock);
    found = __changelog_inode_ctx_get(this, inode, iver, version, type);
    UNLOCK(&inode->lock);

    return found;
}

/**
 * This is the main update routine. Locking has been made granular so as to
 * maximize parallelism of fops - I'll try to explain it below using execution
 * timelines.
 *
 * Basically, the contention is between multiple execution threads of this
 * routine and the roll-over thread. So, instead of having a big lock, we hold
 * granular locks: inode->lock and priv->lock. Now I'll explain what happens
 * when there is an update and a roll-over at just about the same time.
 * NOTE:
 *  - the dispatcher itself synchronizes updates via its own lock
 *  - the slice version is incremented by the roll-over thread
 *
 * Case 1: When the rollover thread wins before the inode version can be
 * compared with the slice version.
 *
 *          [updater]                 |             [rollover]
 *                                    |
 *                                    |           <SLICE: 1, 1, 1>
 * <changelog_update>                 |
 *   <changelog_inode_ctx_get>        |
 *      <CTX: 1, 1, 1>                |
 *                                    |         <dispatch-rollover-event>
 *                                    |         LOCK (&priv->lock)
 *                                    |            <SLICE_VERSION_UPDATE>
 *                                    |              <SLICE: 2, 2, 2>
 *                                    |         UNLOCK (&priv->lock)
 *                                    |
 * LOCK (&priv->lock)                 |
 *   <INODE_VERSION_EQUALS_SLICE>     |
 *    I: 1 <-> S: 2                   |
 *    update: true                    |
 * UNLOCK (&priv->lock)               |
 *                                    |
 * <if update == true>                |
 *  <dispatch-update-event>           |
 *  <INODE_VERSION_UPDATE>            |
 *   LOCK (&inode->lock)              |
 *    LOCK (&priv->lock)              |
 *     <CTX: 2, 1, 1>                 |
 *    UNLOCK (&priv->lock)            |
 *   UNLOCK (&inode->lock)            |
 *
 * Therefore, the change gets recorded in the next change (no lost change). If
 * the slice version was ahead of the inode version (say I:1, S: 2), then
 * anyway the comparison would result in a update (I: 1, S: 3).
 *
 * If the rollover time is too short, then there is another contention when the
 * updater tries to bring up inode version to the slice version (this is also
 * the case when the roll-over thread wakes up during INODE_VERSION_UPDATE).
 *
 *   <CTX: 1, 1, 1>                   |       <SLICE: 2, 2, 2>
 *                                    |
 *                                    |
 * <dispatch-update-event>            |
 * <INODE_VERSION_UPDATE>             |
 *  LOCK (&inode->lock)               |
 *   LOCK (&priv->lock)               |
 *    <CTX: 2, 1, 1>                  |
 *   UNLOCK (&priv->lock)             |
 *  UNLOCK (&inode->lock)             |
 *                                    |         <dispatch-rollover-event>
 *                                    |         LOCK (&priv->lock)
 *                                    |            <SLICE_VERSION_UPDATE>
 *                                    |              <SLICE: 3, 3, 3>
 *                                    |         UNLOCK (&priv->lock)
 *
 *
 * Case 2: When the fop thread wins
 *
 *          [updater]                 |             [rollover]
 *                                    |
 *                                    |           <SLICE: 1, 1, 1>
 * <changelog_update>                 |
 *   <changelog_inode_ctx_get>        |
 *      <CTX: 0, 0, 0>                |
 *                                    |
 * LOCK (&priv->lock)                 |
 *   <INODE_VERSION_EQUALS_SLICE>     |
 *    I: 0 <-> S: 1                   |
 *    update: true                    |
 * UNLOCK (&priv->lock)               |
 *                                    |         <dispatch-rollover-event>
 *                                    |         LOCK (&priv->lock)
 *                                    |            <SLICE_VERSION_UPDATE>
 *                                    |              <SLICE: 2, 2, 2>
 *                                    |         UNLOCK (&priv->lock)
 * <if update == true>                |
 *  <dispatch-update-event>           |
 *  <INODE_VERSION_UPDATE>            |
 *   LOCK (&inode->lock)              |
 *    LOCK (&priv->lock)              |
 *     <CTX: 2, 0, 0>                 |
 *    UNLOCK (&priv->lock)            |
 *   UNLOCK (&inode->lock)            |
 *
 * Here again, if the inode version was equal to the slice version (I: 1, S: 1)
 * then there is no need to record an update (as the equality of the two version
 * signifies an update was recorded in the current time slice).
 */
void
changelog_update(xlator_t *this, changelog_priv_t *priv,
                 changelog_local_t *local, changelog_log_type type)
{
    changelog_time_slice_t *slice = &priv->slice;
    changelog_local_t *linked = NULL;
    changelog_log_data_t *primary = NULL;
    changelog_log_data_t *secondary = NULL;
    inode_t *inode = NULL;
    unsigned long *iver = NULL;
    unsigned long version = 0;
    gf_boolean_t must_record = _gf_true;
    int ret = 0;

    /* Fops flagged update_no_check are always recorded. For the rest,
     * record only if the inode's version differs from the slice's; when
     * no inode context is available the change is recorded as well.
     */
    if (!local->update_no_check) {
        inode = local->inode;

        if (changelog_inode_ctx_get(this, inode, &iver, &version, type) !=
            NULL) {
            INODE_VERSION_EQUALS_SLICE(priv, version, slice, type,
                                       must_record);
        }
    }

    if (!must_record)
        return;

    primary = &local->cld;
    primary->cld_type = type;

    /* a chained local, if any, is dispatched with the same type */
    linked = local->prev_entry;
    if (linked != NULL) {
        secondary = &linked->cld;
        secondary->cld_type = type;
    }

    ret = priv->cd.dispatchfn(this, priv, priv->cd.cd_data, primary,
                              secondary);

    /* bring the inode version up to the slice version only once the
     * dispatcher has done its job successfully.
     */
    if (ret == 0 && iver != NULL && !local->update_no_check) {
        INODE_VERSION_UPDATE(priv, inode, iver, slice, type);
    }
}

/* Begin: Geo-rep snapshot dependency changes */

/* changelog_color_fop_and_inc_cnt: assign the current color to a fop and
 * increment the matching fop count.
 *
 * Both steps happen under priv->lock so that there is no window between
 * them. With a window, a fop could already carry a color while its count is
 * not yet incremented: if that color is being drained and its count hits
 * zero, draining would be declared complete although such a fop is still
 * in flight.
 *
 * Does nothing when either priv or local is NULL.
 */

void
changelog_color_fop_and_inc_cnt(xlator_t *this, changelog_priv_t *priv,
                                changelog_local_t *local)
{
    if (priv && local) {
        LOCK(&priv->lock);
        local->color = priv->current_color;
        changelog_inc_fop_cnt(this, priv, local);
        UNLOCK(&priv->lock);
    }
}

/* Bump the in-flight counter that matches the fop's color: black_fop_cnt for
 * FOP_COLOR_BLACK, white_fop_cnt for anything else. The counter is modified
 * under the drain mutex of the same color. A NULL local is a no-op.
 */
void
changelog_inc_fop_cnt(xlator_t *this, changelog_priv_t *priv,
                      changelog_local_t *local)
{
    pthread_mutex_t *count_lock = NULL;
    int is_black = 0;
    int ret = 0;

    if (!local)
        goto out;

    is_black = (local->color == FOP_COLOR_BLACK);
    count_lock = is_black ? &priv->dm.drain_black_mutex
                          : &priv->dm.drain_white_mutex;

    ret = pthread_mutex_lock(count_lock);
    CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, out);

    if (is_black)
        priv->dm.black_fop_cnt++;
    else
        priv->dm.white_fop_cnt++;

    ret = pthread_mutex_unlock(count_lock);
    CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, out);

out:
    return;
}

/* Drop the in-flight counter that matches the fop's color. When the counter
 * reaches zero while the corresponding drain_wait_* flag is set, the drain
 * condition of that color is signalled. All of it happens under the drain
 * mutex of the same color. A NULL local is a no-op.
 */
void
changelog_dec_fop_cnt(xlator_t *this, changelog_priv_t *priv,
                      changelog_local_t *local)
{
    int ret = 0;
    int drained = 0;

    if (!local)
        goto out;

    if (local->color == FOP_COLOR_BLACK) {
        ret = pthread_mutex_lock(&priv->dm.drain_black_mutex);
        CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, out);

        priv->dm.black_fop_cnt--;
        drained = (priv->dm.black_fop_cnt == 0);
        if (drained && priv->dm.drain_wait_black == _gf_true) {
            ret = pthread_cond_signal(&priv->dm.drain_black_cond);
            CHANGELOG_PTHREAD_ERROR_HANDLE_2(ret, out,
                                             priv->dm.drain_black_mutex);
            gf_msg_debug(this->name, 0, "Signalled draining of black");
        }

        ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex);
        CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, out);
    } else {
        ret = pthread_mutex_lock(&priv->dm.drain_white_mutex);
        CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, out);

        priv->dm.white_fop_cnt--;
        drained = (priv->dm.white_fop_cnt == 0);
        if (drained && priv->dm.drain_wait_white == _gf_true) {
            ret = pthread_cond_signal(&priv->dm.drain_white_cond);
            CHANGELOG_PTHREAD_ERROR_HANDLE_2(ret, out,
                                             priv->dm.drain_white_mutex);
            gf_msg_debug(this->name, 0, "Signalled draining of white");
        }

        ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex);
        CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, out);
    }

out:
    return;
}

/* Ask the changelog rollover thread for an explicit rollover of the
 * changelog journal: under priv->cr.lock, signal priv->cr.cond and raise
 * priv->cr.notify, the flag the rollover thread checks before waiting.
 *
 * 'buf' is not used. Returns the result of pthread_cond_signal().
 */
int
changelog_barrier_notify(changelog_priv_t *priv, char *buf)
{
    int signal_rc = 0;

    pthread_mutex_lock(&priv->cr.lock);
    signal_rc = pthread_cond_signal(&priv->cr.cond);
    priv->cr.notify = _gf_true;
    pthread_mutex_unlock(&priv->cr.lock);

    return signal_rc;
}

/* Undo the state set up on barrier notification:
 *   - clear bflags.barrier_ext (under bflags.lock),
 *   - clear bn.bnotify (under bn.bnotify_mutex),
 *   - if the changelog barrier is enabled, disable it under priv->lock and
 *     then dequeue every fop held in 'queue'.
 * A failure while handling bnotify_mutex skips the barrier handling.
 */
void
changelog_barrier_cleanup(xlator_t *this, changelog_priv_t *priv,
                          struct list_head *queue)
{
    int ret = 0;
    int barrier_was_on = 0;

    LOCK(&priv->bflags.lock);
    {
        priv->bflags.barrier_ext = _gf_false;
    }
    UNLOCK(&priv->bflags.lock);

    ret = pthread_mutex_lock(&priv->bn.bnotify_mutex);
    CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, out);

    priv->bn.bnotify = _gf_false;

    ret = pthread_mutex_unlock(&priv->bn.bnotify_mutex);
    CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, out);

    /* Disable changelog barrier and dequeue fops */
    LOCK(&priv->lock);
    {
        barrier_was_on = (priv->barrier_enabled == _gf_true);
        if (barrier_was_on)
            __chlog_barrier_disable(this, queue);
    }
    UNLOCK(&priv->lock);

    /* the dequeue itself runs outside priv->lock */
    if (ret == 0 && barrier_was_on)
        chlog_barrier_dequeue_all(this, queue);

out:
    return;
}
/* End: Geo-Rep snapshot dependency changes */

/**
 * Build the changelog record describing the entry at @loc.
 *
 * Initializes *@local for loc->inode and fills its usable buffer with five
 * fields: the fop number (GF_FOP_MKDIR for directories, GF_FOP_CREATE
 * otherwise), a mode (S_IFDIR | 0755 or S_IFREG | 0644), the uid and gid
 * taken from @frame, and the entry itself (parent gfid + basename of
 * loc->path).
 *
 * Returns 0 on success and -1 on failure (parent inode not found, local
 * initialization failure, no usable buffer, or out of memory). The parent
 * reference and the temporary path copy are released on every path.
 */
int32_t
changelog_fill_entry_buf(call_frame_t *frame, xlator_t *this, loc_t *loc,
                         changelog_local_t **local)
{
    changelog_opt_t *co = NULL;
    size_t xtra_len = 0;
    char *dup_path = NULL;
    char *bname = NULL;
    inode_t *parent = NULL;
    int32_t ret = -1;

    GF_ASSERT(this);

    parent = inode_parent(loc->inode, 0, 0);
    if (!parent) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_INODE_NOT_FOUND,
                "Parent inode not found", "gfid=%s",
                uuid_utoa(loc->inode->gfid), NULL);
        goto out;
    }

    CHANGELOG_INIT_NOCHECK(this, *local, loc->inode, loc->inode->gfid, 5);
    if (!(*local)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_LOCAL_INIT_FAILED,
               "changelog local"
               " initiatilization failed");
        goto out;
    }

    co = changelog_get_usable_buffer(*local);
    if (!co) {
        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_NO_MEMORY,
               "Failed to get buffer");
        goto out;
    }

    if (loc->inode->ia_type == IA_IFDIR) {
        CHANGLOG_FILL_FOP_NUMBER(co, GF_FOP_MKDIR, fop_fn, xtra_len);
        co++;
        CHANGELOG_FILL_UINT32(co, S_IFDIR | 0755, number_fn, xtra_len);
        co++;
    } else {
        CHANGLOG_FILL_FOP_NUMBER(co, GF_FOP_CREATE, fop_fn, xtra_len);
        co++;
        CHANGELOG_FILL_UINT32(co, S_IFREG | 0644, number_fn, xtra_len);
        co++;
    }

    CHANGELOG_FILL_UINT32(co, frame->root->uid, number_fn, xtra_len);
    co++;

    CHANGELOG_FILL_UINT32(co, frame->root->gid, number_fn, xtra_len);
    co++;

    /* basename() is applied to a private copy of the path; bail out
     * instead of handing it a NULL pointer when the copy fails.
     */
    dup_path = gf_strdup(loc->path);
    if (!dup_path) {
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
               "Failed to duplicate path");
        goto out;
    }
    bname = basename(dup_path);

    /* jumps to 'out' (ret still -1) if the entry cannot be filled */
    CHANGELOG_FILL_ENTRY(co, parent->gfid, bname, entry_fn, entry_free_fn,
                         xtra_len, out);
    changelog_set_usable_record_and_length(*local, xtra_len, 5);

    ret = 0;

out:
    GF_FREE(dup_path);
    if (parent)
        inode_unref(parent);
    return ret;
}

/*
 * resolve_pargfid_to_path:
 *      Converts the given parent gfid to a path by repeatedly reading the
 * gfid handle symlinks under <brick>/.glusterfs at the backend, walking up
 * until the root gfid is reached. If bname is given, it is appended to the
 * resolved directory path; otherwise the directory path alone is returned.
 * If bname is NULL and pargfid is ROOT, "." is returned.
 *
 * On success 0 is returned and *path points to newly allocated memory that
 * the caller must free. On failure -1 is returned and *path is left
 * untouched. Failures include: NULL path, null gfid, a handle that cannot
 * be read or does not have the expected "../../xx/yy/<pgfid>/<name>" shape,
 * a result that does not fit in PATH_MAX, and allocation failure.
 */

int
resolve_pargfid_to_path(xlator_t *this, const uuid_t pgfid, char **path,
                        char *bname)
{
    /* every directory handle link starts with a prefix of this length */
    static const char handle_prefix[] = "../../00/00/";
    const size_t handle_prefix_len = sizeof(handle_prefix) - 1;
    char *pgfidstr = NULL;
    char *saveptr = NULL;
    char *dir_name = NULL;
    char *resolved = NULL;
    ssize_t len = 0;
    size_t used = 0;
    int ret = -1;
    uuid_t tmp_gfid = {
        0,
    };
    uuid_t pargfid = {
        0,
    };
    changelog_priv_t *priv = NULL;
    char gpath[PATH_MAX] = {
        0,
    };
    char result[PATH_MAX] = {
        0,
    };
    char pre_dir_name[PATH_MAX] = {
        0,
    };
    char dir_handle[PATH_MAX];
    char linkname[PATH_MAX];

    GF_ASSERT(this);
    priv = this->private;
    GF_ASSERT(priv);

    gf_uuid_copy(pargfid, pgfid);
    if (!path || gf_uuid_is_null(pargfid))
        goto out;

    if (__is_root_gfid(pargfid)) {
        resolved = gf_strdup(bname ? bname : ".");
        goto done;
    }

    len = snprintf(gpath, sizeof(gpath), "%s/.glusterfs/",
                   priv->changelog_brick);
    if ((len < 0) || (len >= PATH_MAX))
        goto out;

    while (!(__is_root_gfid(pargfid))) {
        len = snprintf(dir_handle, sizeof(dir_handle), "%s/%02x/%02x/%s",
                       gpath, pargfid[0], pargfid[1], uuid_utoa(pargfid));
        if ((len < 0) || (len >= PATH_MAX))
            goto out;

        /* readlink does not NUL-terminate: keep one byte for the
         * terminator so that linkname[len] stays inside the buffer.
         */
        len = sys_readlink(dir_handle, linkname, sizeof(linkname) - 1);
        if (len < 0) {
            gf_smsg(this->name, GF_LOG_ERROR, errno,
                    CHANGELOG_MSG_READLINK_OP_FAILED,
                    "could not read the "
                    "link from the gfid handle",
                    "handle=%s", dir_handle, NULL);
            goto out;
        }

        linkname[len] = '\0';

        /* the link must extend past the fixed prefix, otherwise the
         * tokenizer below would start beyond the string.
         */
        if ((size_t)len <= handle_prefix_len) {
            gf_msg_debug(this->name, 0, "unexpected gfid handle link: %s",
                         linkname);
            goto out;
        }

        /* remainder is "<parent-gfid>/<directory-name>" */
        pgfidstr = strtok_r(linkname + handle_prefix_len, "/", &saveptr);
        dir_name = strtok_r(NULL, "/", &saveptr);
        if (!pgfidstr || !dir_name) {
            gf_msg_debug(this->name, 0, "malformed gfid handle link for %s",
                         dir_handle);
            goto out;
        }

        /* prepend this level's name to what has been resolved so far */
        len = snprintf(result, sizeof(result), "%s/%s", dir_name,
                       pre_dir_name);
        if ((len < 0) || (len >= PATH_MAX))
            goto out;
        if (snprintf(pre_dir_name, len + 1, "%s", result) >= len + 1)
            goto out;

        /* an unparsable parent gfid would leave pargfid unchanged and
         * the walk would never reach the root.
         */
        if (gf_uuid_parse(pgfidstr, tmp_gfid) != 0)
            goto out;
        gf_uuid_copy(pargfid, tmp_gfid);
    }

    if (bname) {
        /* bounded append: fail rather than overflow 'result' */
        used = strlen(result);
        len = snprintf(result + used, sizeof(result) - used, "%s", bname);
        if ((len < 0) || ((size_t)len >= sizeof(result) - used))
            goto out;
    }

    resolved = gf_strdup(result);

done:
    if (!resolved)
        goto out;

    *path = resolved;
    ret = 0;

out:
    return ret;
}