/*
  Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <dlfcn.h>

#include "dht-common.h"
#include "tier.h"
#include "tier-common.h"
#include <glusterfs/syscall.h>
#include <glusterfs/events.h>
#include "tier-ctr-interface.h"

/*Hard coded DB info*/
static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3;
/*Hard coded DB info*/

/*Mutex for updating the data movement stats*/
static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Stores the path location of promotion query files */
static char *promotion_qfile;
/* Stores the path location of demotion query files */
static char *demotion_qfile;

static void *libhandle;
static gfdb_methods_t gfdb_methods;

#define DB_QUERY_RECORD_SIZE 4096

/*
 * Closes all the fds and frees the qfile_array
 * */
static void
qfile_array_free(tier_qfile_array_t *qfile_array)
{
    ssize_t i = 0;

    if (qfile_array) {
        if (qfile_array->fd_array) {
            for (i = 0; i < qfile_array->array_size; i++) {
                if (qfile_array->fd_array[i] != -1) {
                    sys_close(qfile_array->fd_array[i]);
                }
            }
        }
        GF_FREE(qfile_array->fd_array);
    }
    GF_FREE(qfile_array);
}

/* Create a new query file list with given size */
static tier_qfile_array_t *
qfile_array_new(ssize_t array_size)
{
    int ret = -1;
    tier_qfile_array_t *qfile_array = NULL;
    ssize_t i = 0;

    GF_VALIDATE_OR_GOTO("tier", (array_size > 0), out);

    qfile_array = GF_CALLOC(1, sizeof(tier_qfile_array_t),
                            gf_tier_mt_qfile_array_t);
    if (!qfile_array) {
        gf_msg("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to allocate memory for tier_qfile_array_t");
        goto out;
    }

    qfile_array->fd_array = GF_MALLOC(array_size * sizeof(int),
                                      gf_dht_mt_int32_t);
    if (!qfile_array->fd_array) {
        gf_msg("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to allocate memory for "
               "tier_qfile_array_t->fd_array");
        goto out;
    }

    /* Init all the fds to -1 */
    for (i = 0; i < array_size; i++) {
        qfile_array->fd_array[i] = -1;
    }

    qfile_array->array_size = array_size;
    qfile_array->next_index = 0;

    /* Set exhausted count to list size as the list is empty */
    qfile_array->exhausted_count = qfile_array->array_size;

    ret = 0;
out:
    if (ret) {
        qfile_array_free(qfile_array);
        qfile_array = NULL;
    }
    return qfile_array;
}

/* Checks if the query file list is empty or totally exhausted. */
static gf_boolean_t
is_qfile_array_empty(tier_qfile_array_t *qfile_array)
{
    return (qfile_array->exhausted_count == qfile_array->array_size)
               ? _gf_true
               : _gf_false;
}

/* Shifts next_index to the next available fd in the list */
static void
shift_next_index(tier_qfile_array_t *qfile_array)
{
    int qfile_fd = 0;
    int spin_count = 0;

    if (is_qfile_array_empty(qfile_array)) {
        return;
    }

    do {
        /* advance next_index in a rotating (round-robin) manner */
        (qfile_array->next_index == (qfile_array->array_size - 1))
            ? qfile_array->next_index = 0
            : qfile_array->next_index++;

        qfile_fd = (qfile_array->fd_array[qfile_array->next_index]);

        spin_count++;

    } while ((qfile_fd == -1) && (spin_count < qfile_array->array_size));
}

/*
 * This is a non-thread-safe function to read query records
 * from a list of query files in a round-robin manner.
 * Query files are closed as they get exhausted.
 * Returns:
 * 0   if all the query records in all the query files of the list are
 *     exhausted.
 * > 0 if a query record is successfully read. Indicates the size of the query
 *     record read.
 * < 0 if there was a failure.
 * */
static int
read_query_record_list(tier_qfile_array_t *qfile_array,
                       gfdb_query_record_t **query_record)
{
    int ret = -1;
    int qfile_fd = 0;

    GF_VALIDATE_OR_GOTO("tier", qfile_array, out);
    GF_VALIDATE_OR_GOTO("tier", qfile_array->fd_array, out);

    do {
        if (is_qfile_array_empty(qfile_array)) {
            ret = 0;
            break;
        }

        qfile_fd = qfile_array->fd_array[qfile_array->next_index];
        ret = gfdb_methods.gfdb_read_query_record(qfile_fd, query_record);
        if (ret <= 0) {
            /*The qfile_fd has reached EOF or
             * there was an error.
             * 1. Close the exhausted fd
             * 2. increment the exhausted count
             * 3. shift next_qfile to next qfile
             **/
            sys_close(qfile_fd);
            qfile_array->fd_array[qfile_array->next_index] = -1;
            qfile_array->exhausted_count++;
            /* shift next_qfile to next qfile */
            shift_next_index(qfile_array);
            continue;
        } else {
            /* shift next_qfile to next qfile */
            shift_next_index(qfile_array);
            break;
        }
    } while (1);
out:
    return ret;
}

/* Check and update the watermark every WM_INTERVAL seconds */
#define WM_INTERVAL 5
#define WM_INTERVAL_EMERG 1

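/* Check whether the file at loc resides on this node by comparing its
 * node-uuid xattr against ours. Returns 0 if it does, 1 if it belongs
 * to another node, and -1 on error. */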
static int
tier_check_same_node(xlator_t *this, loc_t *loc, gf_defrag_info_t *defrag)
{
    int ret = -1;
    dict_t *dict = NULL;
    char *uuid_str = NULL;
    uuid_t node_uuid = {
        0,
    };

    GF_VALIDATE_OR_GOTO("tier", this, out);
    GF_VALIDATE_OR_GOTO(this->name, loc, out);
    GF_VALIDATE_OR_GOTO(this->name, defrag, out);

    if (syncop_getxattr(this, loc, &dict, GF_XATTR_NODE_UUID_KEY, NULL, NULL)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Unable to get NODE_UUID_KEY %s %s\n", loc->name, loc->path);
        goto out;
    }

    if (dict_get_str(dict, GF_XATTR_NODE_UUID_KEY, &uuid_str) < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to get node-uuids for %s", loc->path);
        goto out;
    }

    if (gf_uuid_parse(uuid_str, node_uuid)) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "uuid_parse failed for %s", loc->path);
        goto out;
    }

    if (gf_uuid_compare(node_uuid, defrag->node_uuid)) {
        gf_msg_debug(this->name, 0, "%s does not belong to this node",
                     loc->path);
        ret = 1;
        goto out;
    }

    ret = 0;
out:
    if (dict)
        dict_unref(dict);

    return ret;
}

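/* Query the free space on the hot tier and cache the block counts and
 * percent_full in the tier_conf structure. */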
int
tier_get_fs_stat(xlator_t *this, loc_t *root_loc)
{
    int ret = 0;
    gf_defrag_info_t *defrag = NULL;
    dht_conf_t *conf = NULL;
    dict_t *xdata = NULL;
    struct statvfs statfs = {
        0,
    };
    gf_tier_conf_t *tier_conf = NULL;

    conf = this->private;
    if (!conf) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_STATUS,
               "conf is NULL");
        ret = -1;
        goto exit;
    }

    defrag = conf->defrag;
    if (!defrag) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_STATUS,
               "defrag is NULL");
        ret = -1;
        goto exit;
    }

    tier_conf = &defrag->tier_conf;

    xdata = dict_new();
    if (!xdata) {
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY,
               "failed to allocate dictionary");
        ret = -1;
        goto exit;
    }

    ret = dict_set_int8(xdata, GF_INTERNAL_IGNORE_DEEM_STATFS, 1);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
               "Failed to set " GF_INTERNAL_IGNORE_DEEM_STATFS " in dict");
        ret = -1;
        goto exit;
    }

    /* Find how much free space is on the hot subvolume. Then see if
     * that value is less than or greater than the user defined
     * watermarks. Stash the results in the tier_conf data structure. */

    ret = syncop_statfs(conf->subvolumes[1], root_loc, &statfs, xdata, NULL);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_LOG_TIER_STATUS,
               "Unable to obtain statfs.");
        goto exit;
    }

    pthread_mutex_lock(&dm_stat_mutex);

    tier_conf->block_size = statfs.f_bsize;
    tier_conf->blocks_total = statfs.f_blocks;
    tier_conf->blocks_used = statfs.f_blocks - statfs.f_bfree;

    tier_conf->percent_full = GF_PERCENTAGE(tier_conf->blocks_used,
                                            statfs.f_blocks);
    pthread_mutex_unlock(&dm_stat_mutex);

exit:
    if (xdata)
        dict_unref(xdata);
    return ret;
}

static void
tier_send_watermark_event(const char *volname, tier_watermark_op_t old_wm,
                          tier_watermark_op_t new_wm)
{
    if (old_wm == TIER_WM_LOW || old_wm == TIER_WM_NONE) {
        if (new_wm == TIER_WM_MID) {
            gf_event(EVENT_TIER_WATERMARK_RAISED_TO_MID, "vol=%s", volname);
        } else if (new_wm == TIER_WM_HI) {
            gf_event(EVENT_TIER_WATERMARK_HI, "vol=%s", volname);
        }
    } else if (old_wm == TIER_WM_MID) {
        if (new_wm == TIER_WM_LOW) {
            gf_event(EVENT_TIER_WATERMARK_DROPPED_TO_LOW, "vol=%s", volname);
        } else if (new_wm == TIER_WM_HI) {
            gf_event(EVENT_TIER_WATERMARK_HI, "vol=%s", volname);
        }
    } else if (old_wm == TIER_WM_HI) {
        if (new_wm == TIER_WM_MID) {
            gf_event(EVENT_TIER_WATERMARK_DROPPED_TO_MID, "vol=%s", volname);
        } else if (new_wm == TIER_WM_LOW) {
            gf_event(EVENT_TIER_WATERMARK_DROPPED_TO_LOW, "vol=%s", volname);
        }
    }
}

int
tier_check_watermark(xlator_t *this)
{
    int ret = -1;
    gf_defrag_info_t *defrag = NULL;
    dht_conf_t *conf = NULL;
    gf_tier_conf_t *tier_conf = NULL;
    tier_watermark_op_t wm = TIER_WM_NONE;

    conf = this->private;
    if (!conf)
        goto exit;

    defrag = conf->defrag;
    if (!defrag)
        goto exit;

    tier_conf = &defrag->tier_conf;

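    /* Map the current fill level to a watermark band:
     *   percent_full <  watermark_low -> TIER_WM_LOW
     *   percent_full <  watermark_hi  -> TIER_WM_MID
     *   otherwise                     -> TIER_WM_HI
     * e.g. with illustrative settings watermark_low = 75 and
     * watermark_hi = 90, a percent_full of 80 maps to TIER_WM_MID. */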
    if (tier_conf->percent_full < tier_conf->watermark_low) {
        wm = TIER_WM_LOW;

    } else if (tier_conf->percent_full < tier_conf->watermark_hi) {
        wm = TIER_WM_MID;

    } else {
        wm = TIER_WM_HI;
    }

    if (wm != tier_conf->watermark_last) {
        tier_send_watermark_event(tier_conf->volname, tier_conf->watermark_last,
                                  wm);

        tier_conf->watermark_last = wm;
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
               "Tier watermark now %d", wm);
    }

    ret = 0;

exit:
    return ret;
}

static gf_boolean_t
is_hot_tier_full(gf_tier_conf_t *tier_conf)
{
    if (tier_conf && (tier_conf->mode == TIER_MODE_WM) &&
        (tier_conf->watermark_last == TIER_WM_HI))
        return _gf_true;

    return _gf_false;
}

int
tier_do_migration(xlator_t *this, int promote)
{
    gf_defrag_info_t *defrag = NULL;
    dht_conf_t *conf = NULL;
    long rand = 0;
    int migrate = 0;
    gf_tier_conf_t *tier_conf = NULL;

    conf = this->private;
    if (!conf)
        goto exit;

    defrag = conf->defrag;
    if (!defrag)
        goto exit;

    if (tier_check_watermark(this) != 0) {
        gf_msg(this->name, GF_LOG_CRITICAL, errno, DHT_MSG_LOG_TIER_ERROR,
               "Failed to get watermark");
        goto exit;
    }

    tier_conf = &defrag->tier_conf;

    switch (tier_conf->watermark_last) {
        case TIER_WM_LOW:
            migrate = promote ? 1 : 0;
            break;
        case TIER_WM_HI:
            migrate = promote ? 0 : 1;
            break;
        case TIER_WM_MID:
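            /* In the MID band migration is probabilistic: the fuller
             * the hot tier gets, the less likely a promotion and the
             * more likely a demotion. */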
            /* coverity[DC.WEAK_CRYPTO] */
            rand = random() % 100;
            if (promote) {
                migrate = (rand > tier_conf->percent_full);
            } else {
                migrate = (rand <= tier_conf->percent_full);
            }
            break;
    }

exit:
    return migrate;
}

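/* Trigger the actual data migration of one file via setxattr, marking
 * the promotion/demotion as in-progress around the call so that a
 * pause request can detect and wait for it. */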
int
tier_migrate(xlator_t *this, int is_promotion, dict_t *migrate_data, loc_t *loc,
             gf_tier_conf_t *tier_conf)
{
    int ret = -1;

    pthread_mutex_lock(&tier_conf->pause_mutex);
    if (is_promotion)
        tier_conf->promote_in_progress = 1;
    else
        tier_conf->demote_in_progress = 1;
    pthread_mutex_unlock(&tier_conf->pause_mutex);

    /* Data migration */
    ret = syncop_setxattr(this, loc, migrate_data, 0, NULL, NULL);

    pthread_mutex_lock(&tier_conf->pause_mutex);
    if (is_promotion)
        tier_conf->promote_in_progress = 0;
    else
        tier_conf->demote_in_progress = 0;
    pthread_mutex_unlock(&tier_conf->pause_mutex);

    return ret;
}

/* returns  _gf_true: if file can be promoted
 * returns _gf_false: if file cannot be promoted
 */
static gf_boolean_t
tier_can_promote_file(xlator_t *this, char const *file_name,
                      struct iatt *current, gf_defrag_info_t *defrag)
{
    gf_boolean_t ret = _gf_false;
    fsblkcnt_t estimated_usage = 0;

    if (defrag->tier_conf.tier_max_promote_size &&
        (current->ia_size > defrag->tier_conf.tier_max_promote_size)) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
               "File %s (gfid:%s) with size (%" PRIu64
               ") exceeds maxsize "
               "(%d) for promotion. File will not be promoted.",
               file_name, uuid_utoa(current->ia_gfid), current->ia_size,
               defrag->tier_conf.tier_max_promote_size);
        goto err;
    }

    /* bypass further validations for TEST mode */
    if (defrag->tier_conf.mode != TIER_MODE_WM) {
        ret = _gf_true;
        goto err;
    }

    /* convert the file size to blocks as per the block size of the
     * destination tier
     * NOTE: add (block_size - 1) to get the correct block size when
     *       there is a remainder after a modulo
     */
    estimated_usage = ((current->ia_size + defrag->tier_conf.block_size - 1) /
                       defrag->tier_conf.block_size) +
                      defrag->tier_conf.blocks_used;

    /* test if the estimated block usage goes above HI watermark */
    if (GF_PERCENTAGE(estimated_usage, defrag->tier_conf.blocks_total) >=
        defrag->tier_conf.watermark_hi) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
               "Estimated block count consumption on "
               "hot tier (%" PRIu64
               ") exceeds hi watermark (%d%%). "
               "File will not be promoted.",
               estimated_usage, defrag->tier_conf.watermark_hi);
        goto err;
    }
    ret = _gf_true;
err:
    return ret;
}

static int
tier_set_migrate_data(dict_t *migrate_data)
{
    int failed = 1;

    failed = dict_set_str(migrate_data, GF_XATTR_FILE_MIGRATE_KEY, "force");
    if (failed) {
        goto bail_out;
    }

    /* Flag to suggest the xattr call is from migrator */
    failed = dict_set_str(migrate_data, "from.migrator", "yes");
    if (failed) {
        goto bail_out;
    }

    /* Flag to suggest it's a tiering migration.
     * The reason for this dict key-value is that
     * promotions and demotions are multithreaded,
     * so the original frame from gf_defrag_start()
     * is not carried. A new frame will be created when
     * we do syncop_setxattr(). This does not have the
     * frame->root->pid of the original frame. So we pass
     * this dict key-value when we do syncop_setxattr() to do
     * data migration and set the frame->root->pid to
     * GF_CLIENT_PID_TIER_DEFRAG in dht_setxattr() just before
     * calling dht_start_rebalance_task() */
    failed = dict_set_str(migrate_data, TIERING_MIGRATION_KEY, "yes");
    if (failed) {
        goto bail_out;
    }

    failed = 0;

bail_out:
    return failed;
}

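/* Look up the parent of the file being migrated and return its
 * ancestry path. On failure, NULL is returned and *per_link_status is
 * set (1 to skip the file, -1 on error). */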
static char *
tier_get_parent_path(xlator_t *this, loc_t *p_loc, struct iatt *par_stbuf,
                     int *per_link_status)
{
    int ret = -1;
    char *parent_path = NULL;
    dict_t *xdata_request = NULL;
    dict_t *xdata_response = NULL;

    xdata_request = dict_new();
    if (!xdata_request) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to create xdata_request dict");
        goto err;
    }
    ret = dict_set_int32(xdata_request, GET_ANCESTRY_PATH_KEY, 42);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to set value to dict : key %s \n",
               GET_ANCESTRY_PATH_KEY);
        goto err;
    }

    ret = syncop_lookup(this, p_loc, par_stbuf, NULL, xdata_request,
                        &xdata_response);
    /* When the parent gfid is a stale entry, the lookup
     * will fail and stop the demotion process.
     * The parent gfid can be stale when a huge folder is
     * deleted while the files within it are being migrated
     */
    if (ret == -ESTALE) {
        gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_STALE_LOOKUP,
               "Stale entry in parent lookup for %s", uuid_utoa(p_loc->gfid));
        *per_link_status = 1;
        goto err;
    } else if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_LOG_TIER_ERROR,
               "Error in parent lookup for %s", uuid_utoa(p_loc->gfid));
        *per_link_status = -1;
        goto err;
    }
    ret = dict_get_str(xdata_response, GET_ANCESTRY_PATH_KEY, &parent_path);
    if (ret || !parent_path) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to get parent path for %s", uuid_utoa(p_loc->gfid));
        *per_link_status = -1;
        goto err;
    }

err:
    if (xdata_request) {
        dict_unref(xdata_request);
    }

    if (xdata_response) {
        dict_unref(xdata_response);
        xdata_response = NULL;
    }

    return parent_path;
}

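/* Fill in loc->name and loc->path for the file from its hard link
 * info and the parent path obtained earlier. */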
static int
tier_get_file_name_and_path(xlator_t *this, uuid_t gfid,
                            gfdb_link_info_t *link_info,
                            char const *parent_path, loc_t *loc,
                            int *per_link_status)
{
    int ret = -1;

    loc->name = gf_strdup(link_info->file_name);
    if (!loc->name) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Memory "
               "allocation failed for %s",
               uuid_utoa(gfid));
        *per_link_status = -1;
        goto err;
    }
    ret = gf_asprintf((char **)&(loc->path), "%s/%s", parent_path, loc->name);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to "
               "construct file path for %s %s\n",
               parent_path, loc->name);
        *per_link_status = -1;
        goto err;
    }

    ret = 0;

err:
    return ret;
}

static int
tier_lookup_file(xlator_t *this, loc_t *p_loc, loc_t *loc, struct iatt *current,
                 int *per_link_status)
{
    int ret = -1;

    ret = syncop_lookup(this, loc, current, NULL, NULL, NULL);

    /* The file may be deleted even when the parent
     * is available and the lookup will
     * return a stale entry, which would stop the
     * migration. So if it's a stale entry, skip
     * the file and keep migrating.
     */
    if (ret == -ESTALE) {
        gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_STALE_LOOKUP,
               "Stale lookup for %s", uuid_utoa(p_loc->gfid));
        *per_link_status = 1;
        goto err;
    } else if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_LOG_TIER_ERROR,
               "Failed to "
               "lookup file %s\n",
               loc->name);
        *per_link_status = -1;
        goto err;
    }
    ret = 0;

err:
    return ret;
}

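/* A file is already at its destination if we are promoting and it is
 * cached on the hot tier (conf->subvolumes[1]), or demoting and it is
 * cached on the cold tier (conf->subvolumes[0]). */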
static gf_boolean_t
tier_is_file_already_at_destination(xlator_t *src_subvol,
                                    query_cbk_args_t *query_cbk_args,
                                    dht_conf_t *conf, int *per_link_status)
{
    gf_boolean_t at_destination = _gf_true;

    if (src_subvol == NULL) {
        *per_link_status = 1;
        goto err;
    }
    if (query_cbk_args->is_promotion && src_subvol == conf->subvolumes[1]) {
        *per_link_status = 1;
        goto err;
    }

    if (!query_cbk_args->is_promotion && src_subvol == conf->subvolumes[0]) {
        *per_link_status = 1;
        goto err;
    }
    at_destination = _gf_false;

err:
    return at_destination;
}

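/* Update the promotion/demotion counters and the estimated hot tier
 * usage after a successful migration. */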
static void
tier_update_migration_counters(query_cbk_args_t *query_cbk_args,
                               gf_defrag_info_t *defrag,
                               uint64_t *total_migrated_bytes, int *total_files)
{
    if (query_cbk_args->is_promotion) {
        defrag->total_files_promoted++;
        *total_migrated_bytes += defrag->tier_conf.st_last_promoted_size;
        pthread_mutex_lock(&dm_stat_mutex);
        defrag->tier_conf.blocks_used += defrag->tier_conf
                                             .st_last_promoted_size;
        pthread_mutex_unlock(&dm_stat_mutex);
    } else {
        defrag->total_files_demoted++;
        *total_migrated_bytes += defrag->tier_conf.st_last_demoted_size;
        pthread_mutex_lock(&dm_stat_mutex);
        defrag->tier_conf.blocks_used -= defrag->tier_conf.st_last_demoted_size;
        pthread_mutex_unlock(&dm_stat_mutex);
    }
    if (defrag->tier_conf.blocks_total) {
        pthread_mutex_lock(&dm_stat_mutex);
        defrag->tier_conf.percent_full = GF_PERCENTAGE(
            defrag->tier_conf.blocks_used, defrag->tier_conf.blocks_total);
        pthread_mutex_unlock(&dm_stat_mutex);
    }

    (*total_files)++;
}

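/* Migrate a single hard link of a file: resolve the parent path, look
 * up the file, run the eligibility checks and trigger the actual
 * migration. */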
static int
tier_migrate_link(xlator_t *this, dht_conf_t *conf, uuid_t gfid,
                  gfdb_link_info_t *link_info, gf_defrag_info_t *defrag,
                  query_cbk_args_t *query_cbk_args, dict_t *migrate_data,
                  int *per_link_status, int *total_files,
                  uint64_t *total_migrated_bytes)
{
    int ret = -1;
    struct iatt current = {
        0,
    };
    struct iatt par_stbuf = {
        0,
    };
    loc_t p_loc = {
        0,
    };
    loc_t loc = {
        0,
    };
    xlator_t *src_subvol = NULL;
    inode_t *linked_inode = NULL;
    char *parent_path = NULL;

    /* Lookup for parent and get the path of parent */
    gf_uuid_copy(p_loc.gfid, link_info->pargfid);
    p_loc.inode = inode_new(defrag->root_inode->table);
    if (!p_loc.inode) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to create reference to inode"
               " for %s",
               uuid_utoa(p_loc.gfid));

        *per_link_status = -1;
        goto err;
    }

    parent_path = tier_get_parent_path(this, &p_loc, &par_stbuf,
                                       per_link_status);
    if (!parent_path) {
        goto err;
    }

    linked_inode = inode_link(p_loc.inode, NULL, NULL, &par_stbuf);
    inode_unref(p_loc.inode);
    p_loc.inode = linked_inode;

    /* Preparing File Inode */
    gf_uuid_copy(loc.gfid, gfid);
    loc.inode = inode_new(defrag->root_inode->table);
    gf_uuid_copy(loc.pargfid, link_info->pargfid);
    loc.parent = inode_ref(p_loc.inode);

    /* Get filename and Construct file path */
    if (tier_get_file_name_and_path(this, gfid, link_info, parent_path, &loc,
                                    per_link_status) != 0) {
        goto err;
    }
    gf_uuid_copy(loc.parent->gfid, link_info->pargfid);

    /* lookup file inode */
    if (tier_lookup_file(this, &p_loc, &loc, &current, per_link_status) != 0) {
        goto err;
    }

    if (query_cbk_args->is_promotion) {
        if (!tier_can_promote_file(this, link_info->file_name, &current,
                                   defrag)) {
            *per_link_status = 1;
            goto err;
        }
    }

    linked_inode = inode_link(loc.inode, NULL, NULL, &current);
    inode_unref(loc.inode);
    loc.inode = linked_inode;

    /*
     * Do not promote/demote if the file is already where it
     * should be. It means another brick moved the file, so it
     * is not an error. We set per_link_status = 1 so that we
     * ignore counting this.
     */
    src_subvol = dht_subvol_get_cached(this, loc.inode);

    if (tier_is_file_already_at_destination(src_subvol, query_cbk_args, conf,
                                            per_link_status)) {
        goto err;
    }

    gf_msg_debug(this->name, 0, "Tier %s: src_subvol %s file %s",
                 (query_cbk_args->is_promotion ? "promote" : "demote"),
                 src_subvol->name, loc.path);

    ret = tier_check_same_node(this, &loc, defrag);
    if (ret != 0) {
        if (ret < 0) {
            *per_link_status = -1;
            goto err;
        }
        ret = 0;
        /* By setting per_link_status to 1 we are
         * ignoring this status and will not be counting
         * this file for migration */
        *per_link_status = 1;
        goto err;
    }

    gf_uuid_copy(loc.gfid, loc.inode->gfid);

    if (gf_defrag_get_pause_state(&defrag->tier_conf) != TIER_RUNNING) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
               "Tiering paused. "
               "Exiting tier_migrate_link");
        goto err;
    }

    ret = tier_migrate(this, query_cbk_args->is_promotion, migrate_data, &loc,
                       &defrag->tier_conf);

    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_LOG_TIER_ERROR,
               "Failed to "
               "migrate %s ",
               loc.path);
        *per_link_status = -1;
        goto err;
    }

    tier_update_migration_counters(query_cbk_args, defrag, total_migrated_bytes,
                                   total_files);

    ret = 0;

err:
    GF_FREE((char *)loc.name);
    loc.name = NULL;
    loc_wipe(&loc);
    loc_wipe(&p_loc);

    if ((*total_files >= defrag->tier_conf.max_migrate_files) ||
        (*total_migrated_bytes > defrag->tier_conf.max_migrate_bytes)) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
               "Reached cycle migration limit."
               "migrated bytes %" PRId64 " files %d",
               *total_migrated_bytes, *total_files);
        ret = -1;
    }

    return ret;
}

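/* Worker that walks the query file records and promotes/demotes each
 * listed file, honoring the watermark, pause state, per-cycle time and
 * migration limits. Returns the accumulated per-file status. */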
static int
tier_migrate_using_query_file(void *_args)
{
    int ret = -1;
    query_cbk_args_t *query_cbk_args = (query_cbk_args_t *)_args;
    xlator_t *this = NULL;
    gf_defrag_info_t *defrag = NULL;
    gfdb_query_record_t *query_record = NULL;
    gfdb_link_info_t *link_info = NULL;
    dict_t *migrate_data = NULL;
    /*
     * per_file_status and per_link_status
     *  0  : success
     * -1 : failure
     *  1  : ignore the status and don't count for migration
     * */
    int per_file_status = 0;
    int per_link_status = 0;
    int total_status = 0;
    dht_conf_t *conf = NULL;
    uint64_t total_migrated_bytes = 0;
    int total_files = 0;
    loc_t root_loc = {0};
    gfdb_time_t start_time = {0};
    gfdb_time_t current_time = {0};
    int total_time = 0;
    int max_time = 0;
    gf_boolean_t emergency_demote_mode = _gf_false;

    GF_VALIDATE_OR_GOTO("tier", query_cbk_args, out);
    GF_VALIDATE_OR_GOTO("tier", query_cbk_args->this, out);
    this = query_cbk_args->this;
    GF_VALIDATE_OR_GOTO(this->name, query_cbk_args->defrag, out);
    GF_VALIDATE_OR_GOTO(this->name, query_cbk_args->qfile_array, out);
    GF_VALIDATE_OR_GOTO(this->name, this->private, out);

    conf = this->private;

    defrag = query_cbk_args->defrag;
    migrate_data = dict_new();
    if (!migrate_data)
        goto out;

    emergency_demote_mode = (!query_cbk_args->is_promotion &&
                             is_hot_tier_full(&defrag->tier_conf));

    if (tier_set_migrate_data(migrate_data) != 0) {
        goto out;
    }

    dht_build_root_loc(defrag->root_inode, &root_loc);

    ret = gettimeofday(&start_time, NULL);
    if (ret < 0) {
        gf_msg(this->name, GF_LOG_ERROR, errno, DHT_MSG_LOG_TIER_ERROR,
               "Could not get start time.");
        goto out;
    }
    if (query_cbk_args->is_promotion) {
        max_time = defrag->tier_conf.tier_promote_frequency;
    } else {
        max_time = defrag->tier_conf.tier_demote_frequency;
    }

    /* Per file */
    while ((ret = read_query_record_list(query_cbk_args->qfile_array,
                                         &query_record)) != 0) {
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                   "Failed to fetch query record "
                   "from query file");
            goto out;
        }

        if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
            ret = -1;
            gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                   "Exiting tier migration as"
                   "defrag status is not started");
            goto out;
        }

        ret = gettimeofday(&current_time, NULL);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                   "Could not get current time.");
            goto out;
        }

        total_time = current_time.tv_sec - start_time.tv_sec;
        if (total_time > max_time) {
            gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
                   "Max cycle time reached. Exiting migration.");
            goto out;
        }

        per_file_status = 0;
        per_link_status = 0;

        if (gf_defrag_get_pause_state(&defrag->tier_conf) != TIER_RUNNING) {
            gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
                   "Tiering paused. "
                   "Exiting tier_migrate_using_query_file");
            break;
        }

        if (defrag->tier_conf.mode == TIER_MODE_WM) {
            ret = tier_get_fs_stat(this, &root_loc);
            if (ret != 0) {
                gfdb_methods.gfdb_query_record_free(query_record);
                query_record = NULL;
                gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_STATUS,
                       "tier_get_fs_stat() FAILED ... "
                       "skipping file migrations until next cycle");
                break;
            }

            if (!tier_do_migration(this, query_cbk_args->is_promotion)) {
                gfdb_methods.gfdb_query_record_free(query_record);
                query_record = NULL;

                /* We have crossed the high watermark. Stop processing
                 * files if this is a promotion cycle so demotion gets
                 * a chance to start if not already running*/

                if (query_cbk_args->is_promotion &&
                    is_hot_tier_full(&defrag->tier_conf)) {
                    gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
                           "High watermark crossed during "
                           "promotion. Exiting "
                           "tier_migrate_using_query_file");
                    break;
                }
                continue;
            }
        }

        per_link_status = 0;
        /* Reset link_info so a previous record's link does not leak
         * into this iteration when the link list is empty */
        link_info = NULL;

        /* For now we only support single link migration, and we will
         * ignore other hard links in the link info list of the query
         * record.
         * TODO: multiple hard link migration */
        if (!list_empty(&query_record->link_list)) {
            link_info = list_first_entry(&query_record->link_list,
                                         gfdb_link_info_t, list);
        }
        if (link_info != NULL) {
            if (tier_migrate_link(this, conf, query_record->gfid, link_info,
                                  defrag, query_cbk_args, migrate_data,
                                  &per_link_status, &total_files,
                                  &total_migrated_bytes) != 0) {
                gf_msg(
                    this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
                    "%s failed for %s(gfid:%s)",
                    (query_cbk_args->is_promotion ? "Promotion" : "Demotion"),
                    link_info->file_name, uuid_utoa(query_record->gfid));
            }
        }
        per_file_status = per_link_status;

        if (per_file_status < 0) { /* Failure */
            pthread_mutex_lock(&dm_stat_mutex);
            defrag->total_failures++;
            pthread_mutex_unlock(&dm_stat_mutex);
        } else if (per_file_status == 0) { /* Success */
            pthread_mutex_lock(&dm_stat_mutex);
            defrag->total_files++;
            pthread_mutex_unlock(&dm_stat_mutex);
        } else if (per_file_status == 1) { /* Ignore */
            per_file_status = 0;
            /* Since this attempt was ignored we
             * decrement the lookup count*/
            pthread_mutex_lock(&dm_stat_mutex);
            defrag->num_files_lookedup--;
            pthread_mutex_unlock(&dm_stat_mutex);
        }
        total_status = total_status + per_file_status;
        per_link_status = 0;
        per_file_status = 0;

        gfdb_methods.gfdb_query_record_free(query_record);
        query_record = NULL;

        /* If we are demoting and the watermark on entry was HI, then
         * we are done with emergency demotions once the current
         * watermark has fallen below the hi-watermark level.
         */
        if (emergency_demote_mode) {
            if (tier_check_watermark(this) == 0) {
                if (!is_hot_tier_full(&defrag->tier_conf)) {
                    break;
                }
            }
        }
    }

out:
    if (migrate_data)
        dict_unref(migrate_data);

    gfdb_methods.gfdb_query_record_free(query_record);
    query_record = NULL;

    return total_status;
}

/* This is the callback function invoked per record/file from the database */
static int
tier_gf_query_callback(gfdb_query_record_t *gfdb_query_record, void *_args)
{
    int ret = -1;
    query_cbk_args_t *query_cbk_args = _args;

    GF_VALIDATE_OR_GOTO("tier", query_cbk_args, out);
    GF_VALIDATE_OR_GOTO("tier", query_cbk_args->defrag, out);
    GF_VALIDATE_OR_GOTO("tier", (query_cbk_args->query_fd > 0), out);

    ret = gfdb_methods.gfdb_write_query_record(query_cbk_args->query_fd,
                                               gfdb_query_record);
    if (ret) {
        gf_msg("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed writing query record to query file");
        goto out;
    }

    pthread_mutex_lock(&dm_stat_mutex);
    query_cbk_args->defrag->num_files_lookedup++;
    pthread_mutex_unlock(&dm_stat_mutex);

    ret = 0;
out:
    return ret;
}

/* Create query file in tier process */
static int
tier_process_self_query(tier_brick_list_t *local_brick, void *args)
{
    int ret = -1;
    char *db_path = NULL;
    query_cbk_args_t *query_cbk_args = NULL;
    xlator_t *this = NULL;
    gfdb_conn_node_t *conn_node = NULL;
    dict_t *params_dict = NULL;
    dict_t *ctr_ipc_dict = NULL;
    gfdb_brick_info_t *gfdb_brick_info = args;

    /*Init of all the essentials*/
    GF_VALIDATE_OR_GOTO("tier", gfdb_brick_info, out);
    query_cbk_args = gfdb_brick_info->_query_cbk_args;

    GF_VALIDATE_OR_GOTO("tier", query_cbk_args->this, out);
    this = query_cbk_args->this;

    GF_VALIDATE_OR_GOTO(this->name, gfdb_brick_info->_query_cbk_args, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick->xlator, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick->brick_db_path, out);

    db_path = local_brick->brick_db_path;

    /*Preparing DB parameters before init_db i.e getting db connection*/
    params_dict = dict_new();
    if (!params_dict) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "DB Params cannot initialized");
        goto out;
    }
    SET_DB_PARAM_TO_DICT(this->name, params_dict,
                         (char *)gfdb_methods.get_db_path_key(), db_path, ret,
                         out);

    /*Get the db connection*/
    conn_node = gfdb_methods.init_db((void *)params_dict, dht_tier_db_type);
    if (!conn_node) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "FATAL: Failed initializing db operations");
        goto out;
    }

    /* Query for eligible files from db */
    query_cbk_args->query_fd = open(local_brick->qfile_path,
                                    O_WRONLY | O_CREAT | O_APPEND,
                                    S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
    if (query_cbk_args->query_fd < 0) {
        gf_msg(this->name, GF_LOG_ERROR, errno, DHT_MSG_LOG_TIER_ERROR,
               "Failed to open query file %s", local_brick->qfile_path);
        goto out;
    }
    if (!gfdb_brick_info->_gfdb_promote) {
        if (query_cbk_args->defrag->tier_conf.watermark_last == TIER_WM_HI) {
            /* emergency demotion mode */
            ret = gfdb_methods.find_all(
                conn_node, tier_gf_query_callback, (void *)query_cbk_args,
                query_cbk_args->defrag->tier_conf.query_limit);
        } else {
            if (query_cbk_args->defrag->write_freq_threshold == 0 &&
                query_cbk_args->defrag->read_freq_threshold == 0) {
                ret = gfdb_methods.find_unchanged_for_time(
                    conn_node, tier_gf_query_callback, (void *)query_cbk_args,
                    gfdb_brick_info->time_stamp);
            } else {
                ret = gfdb_methods.find_unchanged_for_time_freq(
                    conn_node, tier_gf_query_callback, (void *)query_cbk_args,
                    gfdb_brick_info->time_stamp,
                    query_cbk_args->defrag->write_freq_threshold,
                    query_cbk_args->defrag->read_freq_threshold, _gf_false);
            }
        }
    } else {
        if (query_cbk_args->defrag->write_freq_threshold == 0 &&
            query_cbk_args->defrag->read_freq_threshold == 0) {
            ret = gfdb_methods.find_recently_changed_files(
                conn_node, tier_gf_query_callback, (void *)query_cbk_args,
                gfdb_brick_info->time_stamp);
        } else {
            ret = gfdb_methods.find_recently_changed_files_freq(
                conn_node, tier_gf_query_callback, (void *)query_cbk_args,
                gfdb_brick_info->time_stamp,
                query_cbk_args->defrag->write_freq_threshold,
                query_cbk_args->defrag->read_freq_threshold, _gf_false);
        }
    }
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "FATAL: query from db failed");
        goto out;
    }

    /*Clear the heat on the DB entries*/
    /*Preparing ctr_ipc_dict*/
    ctr_ipc_dict = dict_new();
    if (!ctr_ipc_dict) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "ctr_ipc_dict cannot initialized");
        goto out;
    }

    SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_dict, GFDB_IPC_CTR_KEY,
                         GFDB_IPC_CTR_CLEAR_OPS, ret, out);

    ret = syncop_ipc(local_brick->xlator, GF_IPC_TARGET_CTR, ctr_ipc_dict,
                     NULL);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed clearing the heat "
               "on db %s error %d",
               local_brick->brick_db_path, ret);
        goto out;
    }

    ret = 0;
out:
    if (params_dict) {
        dict_unref(params_dict);
        params_dict = NULL;
    }

    if (ctr_ipc_dict) {
        dict_unref(ctr_ipc_dict);
        ctr_ipc_dict = NULL;
    }

    if (query_cbk_args && query_cbk_args->query_fd >= 0) {
        sys_close(query_cbk_args->query_fd);
        query_cbk_args->query_fd = -1;
    }
    gfdb_methods.fini_db(conn_node);

    return ret;
}

/*Ask CTR to create the query file*/
static int
tier_process_ctr_query(tier_brick_list_t *local_brick, void *args)
{
    int ret = -1;
    query_cbk_args_t *query_cbk_args = NULL;
    xlator_t *this = NULL;
    dict_t *ctr_ipc_in_dict = NULL;
    dict_t *ctr_ipc_out_dict = NULL;
    gfdb_brick_info_t *gfdb_brick_info = args;
    gfdb_ipc_ctr_params_t *ipc_ctr_params = NULL;
    int count = 0;

    /*Init of all the essentials*/
    GF_VALIDATE_OR_GOTO("tier", gfdb_brick_info, out);
    query_cbk_args = gfdb_brick_info->_query_cbk_args;

    GF_VALIDATE_OR_GOTO("tier", query_cbk_args->this, out);
    this = query_cbk_args->this;

    GF_VALIDATE_OR_GOTO(this->name, gfdb_brick_info->_query_cbk_args, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick->xlator, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick->brick_db_path, out);

    /*Preparing ctr_ipc_in_dict*/
    ctr_ipc_in_dict = dict_new();
    if (!ctr_ipc_in_dict) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "ctr_ipc_in_dict cannot initialized");
        goto out;
    }

    ipc_ctr_params = GF_CALLOC(1, sizeof(gfdb_ipc_ctr_params_t),
                               gf_tier_mt_ipc_ctr_params_t);
    if (!ipc_ctr_params) {
        goto out;
    }

    /* set all the query params*/
    ipc_ctr_params->is_promote = gfdb_brick_info->_gfdb_promote;

    ipc_ctr_params->write_freq_threshold = query_cbk_args->defrag
                                               ->write_freq_threshold;

    ipc_ctr_params->read_freq_threshold = query_cbk_args->defrag
                                              ->read_freq_threshold;

    ipc_ctr_params->query_limit = query_cbk_args->defrag->tier_conf.query_limit;

    ipc_ctr_params->emergency_demote = (!gfdb_brick_info->_gfdb_promote &&
                                        query_cbk_args->defrag->tier_conf
                                                .watermark_last == TIER_WM_HI);

    memcpy(&ipc_ctr_params->time_stamp, gfdb_brick_info->time_stamp,
           sizeof(gfdb_time_t));

    SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict, GFDB_IPC_CTR_KEY,
                         GFDB_IPC_CTR_QUERY_OPS, ret, out);

    SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict,
                         GFDB_IPC_CTR_GET_QFILE_PATH, local_brick->qfile_path,
                         ret, out);

    ret = dict_set_bin(ctr_ipc_in_dict, GFDB_IPC_CTR_GET_QUERY_PARAMS,
                       ipc_ctr_params, sizeof(*ipc_ctr_params));
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED,
               "Failed setting %s to params dictionary",
               GFDB_IPC_CTR_GET_QUERY_PARAMS);
        GF_FREE(ipc_ctr_params);
        goto out;
    }
    ipc_ctr_params = NULL;

    ret = syncop_ipc(local_brick->xlator, GF_IPC_TARGET_CTR, ctr_ipc_in_dict,
                     &ctr_ipc_out_dict);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_IPC_TIER_ERROR,
               "Failed query on %s ret %d", local_brick->brick_db_path, ret);
        goto out;
    }

    ret = dict_get_int32(ctr_ipc_out_dict, GFDB_IPC_CTR_RET_QUERY_COUNT,
                         &count);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed getting count "
               "of records on %s",
               local_brick->brick_db_path);
        goto out;
    }

    if (count < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed query on %s", local_brick->brick_db_path);
        ret = -1;
        goto out;
    }

    pthread_mutex_lock(&dm_stat_mutex);
    query_cbk_args->defrag->num_files_lookedup = count;
    pthread_mutex_unlock(&dm_stat_mutex);

    ret = 0;
out:

    if (ctr_ipc_in_dict) {
        dict_unref(ctr_ipc_in_dict);
        ctr_ipc_in_dict = NULL;
    }

    if (ctr_ipc_out_dict) {
        dict_unref(ctr_ipc_out_dict);
        ctr_ipc_out_dict = NULL;
    }

    GF_FREE(ipc_ctr_params);

    return ret;
}

/* This is the callback function for each brick from the hot/cold brick list.
 * It picks up each brick's db and queries for the files eligible for
 * migration. The list of eligible files is populated in the appropriate
 * query files. */
static int
tier_process_brick(tier_brick_list_t *local_brick, void *args)
{
    int ret = -1;
    dict_t *ctr_ipc_in_dict = NULL;
    dict_t *ctr_ipc_out_dict = NULL;
    char *strval = NULL;

    GF_VALIDATE_OR_GOTO("tier", local_brick, out);

    GF_VALIDATE_OR_GOTO("tier", local_brick->xlator, out);

    if (dht_tier_db_type == GFDB_SQLITE3) {
        /*Preparing ctr_ipc_in_dict*/
        ctr_ipc_in_dict = dict_new();
        if (!ctr_ipc_in_dict) {
            gf_msg("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                   "ctr_ipc_in_dict cannot initialized");
            goto out;
        }

        ret = dict_set_str(ctr_ipc_in_dict, GFDB_IPC_CTR_KEY,
                           GFDB_IPC_CTR_GET_DB_PARAM_OPS);
        if (ret) {
            gf_msg("tier", GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED,
                   "Failed to set %s "
                   "to params dictionary",
                   GFDB_IPC_CTR_KEY);
            goto out;
        }

        ret = dict_set_str(ctr_ipc_in_dict, GFDB_IPC_CTR_GET_DB_PARAM_OPS, "");
        if (ret) {
            gf_msg("tier", GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED,
                   "Failed to set %s "
                   "to params dictionary",
                   GFDB_IPC_CTR_GET_DB_PARAM_OPS);
            goto out;
        }

        ret = dict_set_str(ctr_ipc_in_dict, GFDB_IPC_CTR_GET_DB_KEY,
                           "journal_mode");
        if (ret) {
            gf_msg("tier", GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED,
                   "Failed to set %s "
                   "to params dictionary",
                   GFDB_IPC_CTR_GET_DB_KEY);
            goto out;
        }

        ret = syncop_ipc(local_brick->xlator, GF_IPC_TARGET_CTR,
                         ctr_ipc_in_dict, &ctr_ipc_out_dict);
        if (ret || ctr_ipc_out_dict == NULL) {
            gf_msg("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                   "Failed to get "
                   "journal_mode of sql db %s",
                   local_brick->brick_db_path);
            goto out;
        }

        ret = dict_get_str(ctr_ipc_out_dict, "journal_mode", &strval);
        if (ret) {
            gf_msg("tier", GF_LOG_ERROR, 0, LG_MSG_GET_PARAM_FAILED,
                   "Failed to get %s "
                   "from params dictionary"
                   "journal_mode",
                   strval);
            goto out;
        }

        if (strval && (strncmp(strval, "wal", SLEN("wal")) == 0)) {
            ret = tier_process_self_query(local_brick, args);
            if (ret) {
                goto out;
            }
        } else {
            ret = tier_process_ctr_query(local_brick, args);
            if (ret) {
                goto out;
            }
        }
        ret = 0;

    } else {
        ret = tier_process_self_query(local_brick, args);
        if (ret) {
            goto out;
        }
    }

    ret = 0;
out:
    if (ctr_ipc_in_dict)
        dict_unref(ctr_ipc_in_dict);

    if (ctr_ipc_out_dict)
        dict_unref(ctr_ipc_out_dict);

    return ret;
}

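/* Build one query file per brick in the list, containing the files
 * that are eligible for migration based on the time stamp and the
 * read/write frequency thresholds. */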
static int
tier_build_migration_qfile(migration_args_t *args,
                           query_cbk_args_t *query_cbk_args,
                           gf_boolean_t is_promotion)
{
    gfdb_time_t current_time;
    gfdb_brick_info_t gfdb_brick_info;
    gfdb_time_t time_in_past;
    int ret = -1;
    tier_brick_list_t *local_brick = NULL;
    int i = 0;
    time_in_past.tv_sec = args->freq_time;
    time_in_past.tv_usec = 0;

    ret = gettimeofday(&current_time, NULL);
    if (ret == -1) {
        gf_msg(args->this->name, GF_LOG_ERROR, errno,
               DHT_MSG_SYS_CALL_GET_TIME_FAILED, "Failed to get current time");
        goto out;
    }
    time_in_past.tv_sec = current_time.tv_sec - time_in_past.tv_sec;

    /* The migration daemon may run a varying number of usec after the */
    /* sleep call triggers. A file may be registered in CTR some number */
    /* of usec X after the daemon started and missed in the subsequent */
    /* cycle if the daemon starts Y usec after the period in seconds */
    /* where Y>X. Normalize away this problem by always setting usec */
    /* to 0. */
    time_in_past.tv_usec = 0;

    gfdb_brick_info.time_stamp = &time_in_past;
    gfdb_brick_info._gfdb_promote = is_promotion;
    gfdb_brick_info._query_cbk_args = query_cbk_args;

    list_for_each_entry(local_brick, args->brick_list, list)
    {
        /* Construct the query file path for this brick,
         * i.e.
         * /var/run/gluster/xlator_name/
         * {promote/demote}-brickname-indexinbricklist
         * so that no two query files will have the same path even if
         * bricks have the same name.
         * */
        snprintf(local_brick->qfile_path, PATH_MAX, "%s-%s-%d",
                 GET_QFILE_PATH(gfdb_brick_info._gfdb_promote),
                 local_brick->brick_name, i);

        /* Delete any old query files for this brick */
        sys_unlink(local_brick->qfile_path);

        ret = tier_process_brick(local_brick, &gfdb_brick_info);
        if (ret) {
            gf_msg(args->this->name, GF_LOG_ERROR, 0,
                   DHT_MSG_BRICK_QUERY_FAILED, "Brick %s query failed\n",
                   local_brick->brick_db_path);
        }
        i++;
    }
    ret = 0;
out:
    return ret;
}

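/* Open the per-brick query files built earlier and migrate the files
 * they list, reading the records in a round-robin manner across the
 * bricks. */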
static int
tier_migrate_files_using_qfile(migration_args_t *comp,
                               query_cbk_args_t *query_cbk_args)
{
    int ret = -1;
    tier_brick_list_t *local_brick = NULL;
    tier_brick_list_t *temp = NULL;
    gfdb_time_t current_time = {
        0,
    };
    ssize_t qfile_array_size = 0;
    int count = 0;
    int temp_fd = 0;
    gf_tier_conf_t *tier_conf = NULL;

    tier_conf = &(query_cbk_args->defrag->tier_conf);

    /* Time for error query files */
    gettimeofday(&current_time, NULL);

    /* Build the qfile list */
    list_for_each_entry_safe(local_brick, temp, comp->brick_list, list)
    {
        qfile_array_size++;
    }
    query_cbk_args->qfile_array = qfile_array_new(qfile_array_size);
    if (!query_cbk_args->qfile_array) {
        gf_msg("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to create new "
               "qfile_array");
        goto out;
    }

    /*Open all qfiles*/
    count = 0;
    query_cbk_args->qfile_array->exhausted_count = 0;
    list_for_each_entry_safe(local_brick, temp, comp->brick_list, list)
    {
        temp_fd = open(local_brick->qfile_path, O_RDONLY,
                       S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
        if (temp_fd < 0) {
            gf_msg("tier", GF_LOG_ERROR, errno, DHT_MSG_LOG_TIER_ERROR,
                   "Failed to open "
                   "%s to the query file",
                   local_brick->qfile_path);
            query_cbk_args->qfile_array->exhausted_count++;
        }
        query_cbk_args->qfile_array->fd_array[count] = temp_fd;
        count++;
    }

    /* Move the query file index to the next one, so that we won't read
     * the same query file first every cycle */
    query_cbk_args->qfile_array
        ->next_index = (query_cbk_args->is_promotion)
                           ? tier_conf->last_promote_qfile_index
                           : tier_conf->last_demote_qfile_index;
    shift_next_index(query_cbk_args->qfile_array);
    if (query_cbk_args->is_promotion) {
        tier_conf->last_promote_qfile_index = query_cbk_args->qfile_array
                                                  ->next_index;
    } else {
        tier_conf->last_demote_qfile_index = query_cbk_args->qfile_array
                                                 ->next_index;
    }

    /* Migrate files using query file list */
    ret = tier_migrate_using_query_file((void *)query_cbk_args);
out:
    qfile_array_free(query_cbk_args->qfile_array);

    /* If there is an error rename all the query files to .err files
     * with a timestamp for better debugging */
    if (ret) {
        struct tm tm = {
            0,
        };
        char time_str[128] = {
            0,
        };
        char query_file_path_err[PATH_MAX] = {
            0,
        };
        int32_t len = 0;

        /* Time format for error query files */
        gmtime_r(&current_time.tv_sec, &tm);
        strftime(time_str, sizeof(time_str), "%F-%T", &tm);

        list_for_each_entry_safe(local_brick, temp, comp->brick_list, list)
        {
            /* rename error qfile*/
            len = snprintf(query_file_path_err, sizeof(query_file_path_err),
                           "%s-%s.err", local_brick->qfile_path, time_str);
            if ((len >= 0) && (len < sizeof(query_file_path_err))) {
                if (sys_rename(local_brick->qfile_path, query_file_path_err) ==
                    -1)
                    gf_msg_debug("tier", 0,
                                 "rename "
                                 "failed");
            }
        }
    }

    query_cbk_args->qfile_array = NULL;

    return ret;
}

int
tier_demote(migration_args_t *demotion_args)
{
    query_cbk_args_t query_cbk_args;
    int ret = -1;

    GF_VALIDATE_OR_GOTO("tier", demotion_args, out);
    GF_VALIDATE_OR_GOTO("tier", demotion_args->this, out);
    GF_VALIDATE_OR_GOTO(demotion_args->this->name, demotion_args->brick_list,
                        out);
    GF_VALIDATE_OR_GOTO(demotion_args->this->name, demotion_args->defrag, out);

    THIS = demotion_args->this;

    query_cbk_args.this = demotion_args->this;
    query_cbk_args.defrag = demotion_args->defrag;
    query_cbk_args.is_promotion = 0;

    /*Build the query file using bricklist*/
    ret = tier_build_migration_qfile(demotion_args, &query_cbk_args, _gf_false);
    if (ret)
        goto out;

    /* Migrate files using the query file */
    ret = tier_migrate_files_using_qfile(demotion_args, &query_cbk_args);
    if (ret)
        goto out;

out:
    demotion_args->return_value = ret;
    return ret;
}

int
tier_promote(migration_args_t *promotion_args)
{
    int ret = -1;
    query_cbk_args_t query_cbk_args;

    GF_VALIDATE_OR_GOTO("tier", promotion_args->this, out);
    GF_VALIDATE_OR_GOTO(promotion_args->this->name, promotion_args->brick_list,
                        out);
    GF_VALIDATE_OR_GOTO(promotion_args->this->name, promotion_args->defrag,
                        out);

    THIS = promotion_args->this;

    query_cbk_args.this = promotion_args->this;
    query_cbk_args.defrag = promotion_args->defrag;
    query_cbk_args.is_promotion = 1;

    /*Build the query file using bricklist*/
    ret = tier_build_migration_qfile(promotion_args, &query_cbk_args, _gf_true);
    if (ret)
        goto out;

    /* Migrate files using the query file */
    ret = tier_migrate_files_using_qfile(promotion_args, &query_cbk_args);
    if (ret)
        goto out;

out:
    promotion_args->return_value = ret;
    return ret;
}

/*
 * Command the CTR on a brick to compact the local database using an IPC
 */
static int
tier_process_self_compact(tier_brick_list_t *local_brick, void *args)
{
    int ret = -1;
    char *db_path = NULL;
    query_cbk_args_t *query_cbk_args = NULL;
    xlator_t *this = NULL;
    gfdb_conn_node_t *conn_node = NULL;
    dict_t *params_dict = NULL;
    dict_t *ctr_ipc_dict = NULL;
    gfdb_brick_info_t *gfdb_brick_info = args;

    /*Init of all the essentials*/
    GF_VALIDATE_OR_GOTO("tier", gfdb_brick_info, out);
    query_cbk_args = gfdb_brick_info->_query_cbk_args;

    GF_VALIDATE_OR_GOTO("tier", query_cbk_args->this, out);
    this = query_cbk_args->this;

    GF_VALIDATE_OR_GOTO(this->name, gfdb_brick_info->_query_cbk_args, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick->xlator, out);

    GF_VALIDATE_OR_GOTO(this->name, local_brick->brick_db_path, out);

    db_path = local_brick->brick_db_path;

    /*Preparing DB parameters before init_db i.e getting db connection*/
    params_dict = dict_new();
    if (!params_dict) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "DB Params cannot initialized");
        goto out;
    }
    SET_DB_PARAM_TO_DICT(this->name, params_dict,
                         (char *)gfdb_methods.get_db_path_key(), db_path, ret,
                         out);

    /*Get the db connection*/
    conn_node = gfdb_methods.init_db((void *)params_dict, dht_tier_db_type);
    if (!conn_node) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "FATAL: Failed initializing db operations");
        goto out;
    }

    ret = 0;

    /*Preparing ctr_ipc_dict*/
    ctr_ipc_dict = dict_new();
    if (!ctr_ipc_dict) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "ctr_ipc_dict cannot initialized");
        goto out;
    }

    ret = dict_set_int32(ctr_ipc_dict, "compact_active",
                         query_cbk_args->defrag->tier_conf.compact_active);

    if (ret) {
        gf_msg("tier", GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED,
               "Failed to set %s "
               "to params dictionary",
               "compact_active");
        goto out;
    }

    ret = dict_set_int32(
        ctr_ipc_dict, "compact_mode_switched",
        query_cbk_args->defrag->tier_conf.compact_mode_switched);

    if (ret) {
        gf_msg("tier", GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED,
               "Failed to set %s "
               "to params dictionary",
               "compact_mode_switched");
        goto out;
    }

    SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_dict, GFDB_IPC_CTR_KEY,
                         GFDB_IPC_CTR_SET_COMPACT_PRAGMA, ret, out);

    gf_msg(this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
           "Starting Compaction IPC");

    ret = syncop_ipc(local_brick->xlator, GF_IPC_TARGET_CTR, ctr_ipc_dict,
                     NULL);

    gf_msg(this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
           "Ending Compaction IPC");

    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed compaction "
               "on db %s error %d",
               local_brick->brick_db_path, ret);
        goto out;
    }

    gf_msg(this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
           "SUCCESS: %s Compaction", local_brick->brick_name);

    ret = 0;
out:
    if (params_dict) {
        dict_unref(params_dict);
        params_dict = NULL;
    }

    if (ctr_ipc_dict) {
        dict_unref(ctr_ipc_dict);
        ctr_ipc_dict = NULL;
    }

    gfdb_methods.fini_db(conn_node);

    return ret;
}

/*
 * Callback invoked for each brick on the hot/cold bricklist. It hands the
 * brick to tier_process_self_compact, which prepares and sends the
 * compaction IPC.
 */
static int
tier_compact_db_brick(tier_brick_list_t *local_brick, void *args)
{
    int ret = -1;

    GF_VALIDATE_OR_GOTO("tier", local_brick, out);

    GF_VALIDATE_OR_GOTO("tier", local_brick->xlator, out);

    ret = tier_process_self_compact(local_brick, args);
    if (ret) {
        gf_msg("tier", GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
               "Brick %s did not compact", local_brick->brick_name);
        goto out;
    }

    ret = 0;

out:

    return ret;
}

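/*
 * Send the compaction IPC to every brick on the list. time_in_past marks
 * the start of the current frequency window: for example (illustrative
 * values only), with args->freq_time = 120 and current_time.tv_sec = 1000,
 * time_in_past.tv_sec becomes 880, i.e. the beginning of the cycle this
 * pass is responsible for.
 */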
static int
tier_send_compact(migration_args_t *args, query_cbk_args_t *query_cbk_args)
{
    gfdb_time_t current_time;
    gfdb_brick_info_t gfdb_brick_info;
    gfdb_time_t time_in_past;
    int ret = -1;
    tier_brick_list_t *local_brick = NULL;

    time_in_past.tv_sec = args->freq_time;
    time_in_past.tv_usec = 0;

    ret = gettimeofday(&current_time, NULL);
    if (ret == -1) {
        gf_msg(args->this->name, GF_LOG_ERROR, errno,
               DHT_MSG_SYS_CALL_GET_TIME_FAILED, "Failed to get current time");
        goto out;
    }
    time_in_past.tv_sec = current_time.tv_sec - time_in_past.tv_sec;

    /* The migration daemon may run a varying number of usec after the
       sleep call triggers. A file registered in CTR X usec after the
       daemon started would be missed in the subsequent cycle if the
       daemon starts Y usec after the period in seconds, where Y > X.
       Normalize away this problem by always setting usec to 0. */
    time_in_past.tv_usec = 0;

    gfdb_brick_info.time_stamp = &time_in_past;

    /* This is meant to say we are always compacting at this point */
    /* We simply borrow the promotion flag to do this */
    gfdb_brick_info._gfdb_promote = 1;

    gfdb_brick_info._query_cbk_args = query_cbk_args;

    list_for_each_entry(local_brick, args->brick_list, list)
    {
        gf_msg(args->this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
               "Start compaction for %s", local_brick->brick_name);

        ret = tier_compact_db_brick(local_brick, &gfdb_brick_info);
        if (ret) {
            gf_msg(args->this->name, GF_LOG_ERROR, 0,
                   DHT_MSG_BRICK_QUERY_FAILED, "Brick %s compaction failed",
                   local_brick->brick_db_path);
        }

        gf_msg(args->this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
               "End compaction for %s", local_brick->brick_name);
    }
    ret = 0;
out:
    return ret;
}

static int
tier_compact(void *args)
{
    int ret = -1;
    query_cbk_args_t query_cbk_args;
    migration_args_t *compaction_args = args;

    GF_VALIDATE_OR_GOTO("tier", compaction_args->this, out);
    GF_VALIDATE_OR_GOTO(compaction_args->this->name,
                        compaction_args->brick_list, out);
    GF_VALIDATE_OR_GOTO(compaction_args->this->name, compaction_args->defrag,
                        out);

    THIS = compaction_args->this;

    query_cbk_args.this = compaction_args->this;
    query_cbk_args.defrag = compaction_args->defrag;
    query_cbk_args.is_promotion = 0;
    query_cbk_args.is_compaction = 1;

    /* Send the compaction pragma out to all the bricks on the bricklist. */
    /* tier_get_bricklist ensures all bricks on the list are local to */
    /* this node. */
    ret = tier_send_compact(compaction_args, &query_cbk_args);
    if (ret)
        goto out;

    ret = 0;
out:
    compaction_args->return_value = ret;
    return ret;
}

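/*
 * Walk the xlator graph looking for protocol/client subvolumes that live
 * on this node and record where each brick keeps its CTR database.
 * Assuming GF_HIDDEN_PATH expands to ".glusterfs", a brick exported at
 * /bricks/b1 would yield a db path like /bricks/b1/.glusterfs/b1.db.
 */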
static int
tier_get_bricklist(xlator_t *xl, struct list_head *local_bricklist_head)
{
    xlator_list_t *child = NULL;
    char *rv = NULL;
    char *rh = NULL;
    char *brickname = NULL;
    char db_name[PATH_MAX] = "";
    int ret = 0;
    tier_brick_list_t *local_brick = NULL;
    int32_t len = 0;

    GF_VALIDATE_OR_GOTO("tier", xl, out);
    GF_VALIDATE_OR_GOTO("tier", local_bricklist_head, out);

    /*
     * This function obtains remote subvolumes and filters out only
     * those running on the same node as the tier daemon.
     */
    if (strcmp(xl->type, "protocol/client") == 0) {
        ret = dict_get_str(xl->options, "remote-host", &rh);
        if (ret < 0)
            goto out;

        if (gf_is_local_addr(rh)) {
            local_brick = GF_CALLOC(1, sizeof(tier_brick_list_t),
                                    gf_tier_mt_bricklist_t);
            if (!local_brick) {
                goto out;
            }

            ret = dict_get_str(xl->options, "remote-subvolume", &rv);
            if (ret < 0)
                goto out;

            brickname = strrchr(rv, '/');
            if (!brickname) {
                ret = -1;
                goto out;
            }
            brickname++;
            snprintf(db_name, sizeof(db_name), "%s.db", brickname);

            local_brick->brick_db_path = GF_MALLOC(PATH_MAX, gf_common_mt_char);
            if (!local_brick->brick_db_path) {
                gf_msg("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_STATUS,
                       "Failed to allocate memory for"
                       " bricklist.");
                ret = -1;
                goto out;
            }

            len = snprintf(local_brick->brick_db_path, PATH_MAX, "%s/%s/%s", rv,
                           GF_HIDDEN_PATH, db_name);
            if ((len < 0) || (len >= PATH_MAX)) {
                gf_msg("tier", GF_LOG_ERROR, EINVAL, DHT_MSG_LOG_TIER_STATUS,
                       "DB path too long");
                ret = -1;
                goto out;
            }

            local_brick->xlator = xl;

            snprintf(local_brick->brick_name, NAME_MAX, "%s", brickname);

            list_add_tail(&(local_brick->list), local_bricklist_head);

            ret = 0;
            goto out;
        }
    }

    for (child = xl->children; child; child = child->next) {
        ret = tier_get_bricklist(child->xlator, local_bricklist_head);
        if (ret) {
            goto out;
        }
    }

    ret = 0;
out:

    if (ret) {
        if (local_brick) {
            GF_FREE(local_brick->brick_db_path);
        }
        GF_FREE(local_brick);
    }

    return ret;
}

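/*
 * Frequency helpers: how often (in seconds) each kind of cycle should run.
 * When running in watermark mode and the hi watermark was breached on the
 * last check, demotion falls back to DEFAULT_DEMOTE_DEGRADED, which is
 * assumed to be more aggressive than the configured frequency.
 */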
int
tier_get_freq_demote(gf_tier_conf_t *tier_conf)
{
    if ((tier_conf->mode == TIER_MODE_WM) &&
        (tier_conf->watermark_last == TIER_WM_HI))
        return DEFAULT_DEMOTE_DEGRADED;
    else
        return tier_conf->tier_demote_frequency;
}

int
tier_get_freq_promote(gf_tier_conf_t *tier_conf)
{
    return tier_conf->tier_promote_frequency;
}

int
tier_get_freq_compact_hot(gf_tier_conf_t *tier_conf)
{
    return tier_conf->tier_compact_hot_frequency;
}

int
tier_get_freq_compact_cold(gf_tier_conf_t *tier_conf)
{
    return tier_conf->tier_compact_cold_frequency;
}

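/*
 * The check helpers below gate work on wall-clock alignment: a cycle fires
 * only in the second where tv_sec is an exact multiple of the configured
 * frequency. For example, with freq = 120, every node whose clock reads
 * tv_sec % 120 == 0 starts its cycle in the same second, which is why
 * tier_run insists on clocks being kept in sync across nodes.
 */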
static gf_boolean_t
tier_check_demote(gfdb_time_t current_time, int freq)
{
    return ((current_time.tv_sec % freq) == 0) ? _gf_true : _gf_false;
}

static gf_boolean_t
tier_check_promote(gf_tier_conf_t *tier_conf, gfdb_time_t current_time,
                   int freq)
{
    if ((tier_conf->mode == TIER_MODE_WM) &&
        (tier_conf->watermark_last == TIER_WM_HI))
        return _gf_false;

    else
        return ((current_time.tv_sec % freq) == 0) ? _gf_true : _gf_false;
}

static gf_boolean_t
tier_check_compact(gf_tier_conf_t *tier_conf, gfdb_time_t current_time,
                   int freq_compact)
{
    if (!(tier_conf->compact_active || tier_conf->compact_mode_switched))
        return _gf_false;

    return ((current_time.tv_sec % freq_compact) == 0) ? _gf_true : _gf_false;
}

void
clear_bricklist(struct list_head *brick_list)
{
    tier_brick_list_t *local_brick = NULL;
    tier_brick_list_t *temp = NULL;

    if (list_empty(brick_list)) {
        return;
    }

    list_for_each_entry_safe(local_brick, temp, brick_list, list)
    {
        list_del(&local_brick->list);
        GF_FREE(local_brick->brick_db_path);
        GF_FREE(local_brick);
    }
}

static void
set_brick_list_qpath(struct list_head *brick_list, gf_boolean_t is_cold)
{
    tier_brick_list_t *local_brick = NULL;
    int i = 0;

    GF_VALIDATE_OR_GOTO("tier", brick_list, out);

    list_for_each_entry(local_brick, brick_list, list)
    {
        /* Construct the query file path for this brick, i.e.
         * /var/run/gluster/xlator_name/
         * {promote/demote}-brickname-indexinbricklist
         * so that no two query files share the same path even when
         * bricks have the same name.
         * */
        snprintf(local_brick->qfile_path, PATH_MAX, "%s-%s-%d",
                 GET_QFILE_PATH(is_cold), local_brick->brick_name, i);
        i++;
    }
out:
    return;
}

static int
tier_prepare_compact(migration_args_t *args, gfdb_time_t current_time)
{
    xlator_t *this = NULL;
    dht_conf_t *conf = NULL;
    gf_defrag_info_t *defrag = NULL;
    gf_tier_conf_t *tier_conf = NULL;
    gf_boolean_t is_hot_tier = args->is_hot_tier;
    int freq = 0;
    int ret = -1;
    const char *tier_type = is_hot_tier ? "hot" : "cold";

    this = args->this;

    conf = this->private;

    defrag = conf->defrag;

    tier_conf = &defrag->tier_conf;

    freq = is_hot_tier ? tier_get_freq_compact_hot(tier_conf)
                       : tier_get_freq_compact_cold(tier_conf);

    defrag->tier_conf.compact_mode_switched =
        is_hot_tier ? defrag->tier_conf.compact_mode_switched_hot
                    : defrag->tier_conf.compact_mode_switched_cold;

    gf_msg(this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
           "Compact mode %i", defrag->tier_conf.compact_mode_switched);

    if (tier_check_compact(tier_conf, current_time, freq)) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
               "Start compaction on %s tier", tier_type);

        args->freq_time = freq;
        ret = tier_compact(args);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                   "Compaction failed on "
                   "%s tier",
                   tier_type);
            goto out;
        }

        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
               "End compaction on %s tier", tier_type);

        if (is_hot_tier) {
            defrag->tier_conf.compact_mode_switched_hot = _gf_false;
        } else {
            defrag->tier_conf.compact_mode_switched_cold = _gf_false;
        }
    }

    /* A cycle with no compaction due is not an error */
    ret = 0;
out:
    return ret;
}

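/*
 * Emergency demotion needs frequent watermark monitoring, so at the hi
 * watermark in watermark mode the main loop re-checks statfs every
 * WM_INTERVAL_EMERG iterations instead of every WM_INTERVAL (the
 * emergency value is assumed to be the smaller of the two).
 */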
static int
tier_get_wm_interval(tier_mode_t mode, tier_watermark_op_t wm)
{
    if (mode == TIER_MODE_WM && wm == TIER_WM_HI)
        return WM_INTERVAL_EMERG;

    return WM_INTERVAL;
}

/*
 * Main tiering loop. This is called from the promotion and the
 * demotion threads spawned in tier_start().
 *
 * Every second, wake from sleep to perform tasks.
 * 1. Check trigger to migrate data.
 * 2. Check for state changes (pause, unpause, stop).
 */
static void *
tier_run(void *in_args)
{
    dht_conf_t *conf = NULL;
    gfdb_time_t current_time = {0};
    int freq = 0;
    int ret = 0;
    xlator_t *any = NULL;
    xlator_t *xlator = NULL;
    gf_tier_conf_t *tier_conf = NULL;
    loc_t root_loc = {0};
    int check_watermark = 0;
    gf_defrag_info_t *defrag = NULL;
    xlator_t *this = NULL;
    migration_args_t *args = in_args;
    GF_VALIDATE_OR_GOTO("tier", args, out);
    GF_VALIDATE_OR_GOTO("tier", args->brick_list, out);

    this = args->this;
    GF_VALIDATE_OR_GOTO("tier", this, out);

    conf = this->private;
    GF_VALIDATE_OR_GOTO("tier", conf, out);

    defrag = conf->defrag;
    GF_VALIDATE_OR_GOTO("tier", defrag, out);

    if (list_empty(args->brick_list)) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_ERROR,
               "Brick list for tier is empty. Exiting.");
        goto out;
    }

    defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
    tier_conf = &defrag->tier_conf;

    dht_build_root_loc(defrag->root_inode, &root_loc);

    while (1) {
        /*
         * Check if a graph switch occurred. If so, stop migration
         * thread. It will need to be restarted manually.
         */
        any = THIS->ctx->active->first;
        xlator = xlator_search_by_name(any, this->name);

        if (xlator != this) {
            gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
                   "Detected graph switch. Exiting migration "
                   "daemon.");
            goto out;
        }

        gf_defrag_check_pause_tier(tier_conf);

        sleep(1);

        if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
            ret = 1;
            gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                   "defrag->defrag_status != "
                   "GF_DEFRAG_STATUS_STARTED");
            goto out;
        }

        if (defrag->cmd == GF_DEFRAG_CMD_START_DETACH_TIER ||
            defrag->cmd == GF_DEFRAG_CMD_DETACH_START) {
            ret = 0;
            defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;
            gf_msg(this->name, GF_LOG_DEBUG, 0, DHT_MSG_LOG_TIER_ERROR,
                   "defrag->defrag_cmd == "
                   "GF_DEFRAG_CMD_START_DETACH_TIER");
            goto out;
        }

        if (gf_defrag_get_pause_state(&defrag->tier_conf) != TIER_RUNNING)
            continue;

        /* For proper synchronization amongst all brick-holding nodes,
         * so that promotions and demotions start atomically w.r.t. the
         * promotion/demotion frequency period, all nodes should keep
         * their system time in sync with each other, either set
         * manually or via an NTP server. */
        ret = gettimeofday(&current_time, NULL);
        if (ret == -1) {
            gf_msg(this->name, GF_LOG_ERROR, errno,
                   DHT_MSG_SYS_CALL_GET_TIME_FAILED,
                   "Failed to get current time");
            goto out;
        }

        check_watermark++;

        /* emergency demotion requires frequent watermark monitoring */
        if (check_watermark >=
            tier_get_wm_interval(tier_conf->mode, tier_conf->watermark_last)) {
            check_watermark = 0;
            if (tier_conf->mode == TIER_MODE_WM) {
                ret = tier_get_fs_stat(this, &root_loc);
                if (ret != 0) {
                    continue;
                }
                ret = tier_check_watermark(this);
                if (ret != 0) {
                    gf_msg(this->name, GF_LOG_CRITICAL, errno,
                           DHT_MSG_LOG_TIER_ERROR, "Failed to get watermark");
                    continue;
                }
            }
        }

        if (args->is_promotion) {
            freq = tier_get_freq_promote(tier_conf);

            if (tier_check_promote(tier_conf, current_time, freq)) {
                args->freq_time = freq;
                ret = tier_promote(args);
                if (ret) {
                    gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                           "Promotion failed");
                }
            }
        } else if (args->is_compaction) {
            tier_prepare_compact(args, current_time);
        } else {
            freq = tier_get_freq_demote(tier_conf);

            if (tier_check_demote(current_time, freq)) {
                args->freq_time = freq;
                ret = tier_demote(args);
                if (ret) {
                    gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                           "Demotion failed");
                }
            }
        }

        /* Check the statfs immediately after the processing threads
           return */
        check_watermark = WM_INTERVAL;
    }

    ret = 0;
out:

    args->return_value = ret;

    return NULL;
}

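/*
 * Spawn the four worker threads (demotion, promotion, hot compaction,
 * cold compaction), all running tier_run() with different argument
 * blocks. The waitforspawned* labels form a join ladder: if spawning
 * thread N fails, only the threads already created are joined before
 * the bricklists are freed.
 */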
int
tier_start(xlator_t *this, gf_defrag_info_t *defrag)
{
    pthread_t promote_thread;
    pthread_t demote_thread;
    pthread_t hot_compact_thread;
    pthread_t cold_compact_thread;
    int ret = -1;
    struct list_head bricklist_hot = {0};
    struct list_head bricklist_cold = {0};
    migration_args_t promotion_args = {0};
    migration_args_t demotion_args = {0};
    migration_args_t hot_compaction_args = {0};
    migration_args_t cold_compaction_args = {0};
    dht_conf_t *conf = NULL;

    INIT_LIST_HEAD(&bricklist_hot);
    INIT_LIST_HEAD(&bricklist_cold);

    conf = this->private;

    tier_get_bricklist(conf->subvolumes[1], &bricklist_hot);
    set_brick_list_qpath(&bricklist_hot, _gf_false);

    demotion_args.this = this;
    demotion_args.brick_list = &bricklist_hot;
    demotion_args.defrag = defrag;
    demotion_args.is_promotion = _gf_false;
    demotion_args.is_compaction = _gf_false;

    ret = gf_thread_create(&demote_thread, NULL, &tier_run, &demotion_args,
                           "tierdem");
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to start demotion thread.");
        defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
        goto cleanup;
    }

    tier_get_bricklist(conf->subvolumes[0], &bricklist_cold);
    set_brick_list_qpath(&bricklist_cold, _gf_true);

    promotion_args.this = this;
    promotion_args.brick_list = &bricklist_cold;
    promotion_args.defrag = defrag;
    promotion_args.is_promotion = _gf_true;

    ret = gf_thread_create(&promote_thread, NULL, &tier_run, &promotion_args,
                           "tierpro");
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to start promotion thread.");
        defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
        goto waitforspawned;
    }

    hot_compaction_args.this = this;
    hot_compaction_args.brick_list = &bricklist_hot;
    hot_compaction_args.defrag = defrag;
    hot_compaction_args.is_promotion = _gf_false;
    hot_compaction_args.is_compaction = _gf_true;
    hot_compaction_args.is_hot_tier = _gf_true;

    ret = gf_thread_create(&hot_compact_thread, NULL, &tier_run,
                           &hot_compaction_args, "tierhcom");
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to start compaction thread.");
        defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
        goto waitforspawnedpromote;
    }

    cold_compaction_args.this = this;
    cold_compaction_args.brick_list = &bricklist_cold;
    cold_compaction_args.defrag = defrag;
    cold_compaction_args.is_promotion = _gf_false;
    cold_compaction_args.is_compaction = _gf_true;
    cold_compaction_args.is_hot_tier = _gf_false;

    ret = gf_thread_create(&cold_compact_thread, NULL, &tier_run,
                           &cold_compaction_args, "tierccom");
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Failed to start compaction thread.");
        defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
        goto waitforspawnedhotcompact;
    }
    pthread_join(cold_compact_thread, NULL);

waitforspawnedhotcompact:
    pthread_join(hot_compact_thread, NULL);

waitforspawnedpromote:
    pthread_join(promote_thread, NULL);

waitforspawned:
    pthread_join(demote_thread, NULL);

cleanup:
    clear_bricklist(&bricklist_cold);
    clear_bricklist(&bricklist_hot);
    return ret;
}

int32_t
tier_migration_needed(xlator_t *this)
{
    gf_defrag_info_t *defrag = NULL;
    dht_conf_t *conf = NULL;
    int ret = 0;

    conf = this->private;

    GF_VALIDATE_OR_GOTO(this->name, conf, out);
    GF_VALIDATE_OR_GOTO(this->name, conf->defrag, out);

    defrag = conf->defrag;

    if ((defrag->cmd == GF_DEFRAG_CMD_START_TIER) ||
        (defrag->cmd == GF_DEFRAG_CMD_START_DETACH_TIER))
        ret = 1;
out:
    return ret;
}

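/*
 * Pick the destination subvolume for a file migration. Throughout this
 * file subvolumes[0] is the cold tier and subvolumes[1] the hot tier
 * (see tier_start), so during a detach everything is driven to the cold
 * tier, and otherwise a file moves to whichever tier it is not currently
 * cached on.
 */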
int32_t
tier_migration_get_dst(xlator_t *this, dht_local_t *local)
{
    dht_conf_t *conf = NULL;
    int32_t ret = -1;
    gf_defrag_info_t *defrag = NULL;

    GF_VALIDATE_OR_GOTO("tier", this, out);
    GF_VALIDATE_OR_GOTO(this->name, this->private, out);

    conf = this->private;

    defrag = conf->defrag;

    if (defrag && defrag->cmd == GF_DEFRAG_CMD_START_DETACH_TIER) {
        local->rebalance.target_node = conf->subvolumes[0];

    } else if (conf->subvolumes[0] == local->cached_subvol)
        local->rebalance.target_node = conf->subvolumes[1];
    else
        local->rebalance.target_node = conf->subvolumes[0];

    if (local->rebalance.target_node)
        ret = 0;

out:
    return ret;
}

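/*
 * Namespace lookups are always routed to the hashed subvolume.
 * TIER_HASHED_SUBVOL is assumed to expand to the cold tier
 * (conf->subvolumes[0]); layout and name are intentionally unused.
 */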
xlator_t *
tier_search(xlator_t *this, dht_layout_t *layout, const char *name)
{
    xlator_t *subvol = NULL;
    dht_conf_t *conf = NULL;

    GF_VALIDATE_OR_GOTO("tier", this, out);
    GF_VALIDATE_OR_GOTO(this->name, this->private, out);

    conf = this->private;

    subvol = TIER_HASHED_SUBVOL;

out:
    return subvol;
}

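/*
 * libgfdb.so is loaded with dlopen() rather than linked directly, so the
 * tier xlator carries no hard link-time dependency on the database
 * backend. get_gfdb_methods() fills the gfdb_methods dispatch table
 * (init_db, fini_db, get_db_path_key, ...) used throughout this file.
 */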
static int
tier_load_externals(xlator_t *this)
{
    int ret = -1;
    char *libpathfull = (LIBDIR "/libgfdb.so.0");
    get_gfdb_methods_t get_gfdb_methods;

    GF_VALIDATE_OR_GOTO("this", this, out);

    libhandle = dlopen(libpathfull, RTLD_NOW);
    if (!libhandle) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Error loading libgfdb.so %s\n", dlerror());
        ret = -1;
        goto out;
    }

    get_gfdb_methods = dlsym(libhandle, "get_gfdb_methods");
    if (!get_gfdb_methods) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Error loading get_gfdb_methods()");
        ret = -1;
        goto out;
    }

    get_gfdb_methods(&gfdb_methods);

    ret = 0;

out:
    if (ret && libhandle)
        dlclose(libhandle);

    return ret;
}

static tier_mode_t
tier_validate_mode(char *mode)
{
    if (strcmp(mode, "test") == 0)
        return TIER_MODE_TEST;

    return TIER_MODE_WM;
}

static gf_boolean_t
tier_validate_compact_mode(char *mode)
{
    gf_boolean_t ret = _gf_false;

    gf_msg("tier", GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
           "tier_validate_compact_mode: mode = %s", mode);

    ret = (strcmp(mode, "on") == 0) ? _gf_true : _gf_false;

    gf_msg("tier", GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
           "tier_validate_compact_mode: ret = %i", ret);

    return ret;
}

int
tier_init_methods(xlator_t *this)
{
    int ret = -1;
    dht_conf_t *conf = NULL;
    dht_methods_t *methods = NULL;

    GF_VALIDATE_OR_GOTO("tier", this, err);

    conf = this->private;

    methods = &(conf->methods);

    methods->migration_get_dst_subvol = tier_migration_get_dst;
    methods->migration_other = tier_start;
    methods->migration_needed = tier_migration_needed;
    methods->layout_search = tier_search;

    ret = 0;
err:
    return ret;
}

static void
tier_save_vol_name(xlator_t *this)
{
    dht_conf_t *conf = NULL;
    gf_defrag_info_t *defrag = NULL;
    char *suffix = NULL;
    int name_len = 0;

    conf = this->private;
    defrag = conf->defrag;

    suffix = strstr(this->name, "-tier-dht");

    if (suffix)
        name_len = suffix - this->name;
    else
        name_len = strlen(this->name);

    if (name_len > GD_VOLUME_NAME_MAX)
        name_len = GD_VOLUME_NAME_MAX;

    strncpy(defrag->tier_conf.volname, this->name, name_len);
    defrag->tier_conf.volname[name_len] = 0;
}

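/*
 * Initialization order: run the stock dht_init(), override the dht
 * method table with the tier variants, insist on exactly two subvolumes
 * (cold and hot), short-circuit on the client side, then load libgfdb
 * and read every tier tunable, falling back to the built-in default
 * whenever an option is absent.
 */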
int
tier_init(xlator_t *this)
{
    int ret = -1;
    int freq = 0;
    int maxsize = 0;
    dht_conf_t *conf = NULL;
    gf_defrag_info_t *defrag = NULL;
    char *voldir = NULL;
    char *mode = NULL;
    char *paused = NULL;
    tier_mode_t tier_mode = DEFAULT_TIER_MODE;
    gf_boolean_t compact_mode = _gf_false;

    ret = dht_init(this);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "tier_init failed");
        goto out;
    }

    conf = this->private;

    ret = tier_init_methods(this);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "tier_init_methods failed");
        goto out;
    }

    if (conf->subvolume_cnt != 2) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Invalid number of subvolumes %d", conf->subvolume_cnt);
        goto out;
    }

    /* If instantiated from the client side, initialization is complete. */
    if (!conf->defrag) {
        ret = 0;
        goto out;
    }

    /* If instantiated from the server side, load the db libraries. */
    ret = tier_load_externals(this);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "Could not load externals. Aborting");
        goto out;
    }

    defrag = conf->defrag;

    defrag->tier_conf.last_demote_qfile_index = 0;
    defrag->tier_conf.last_promote_qfile_index = 0;

    defrag->tier_conf.is_tier = 1;
    defrag->this = this;

    ret = dict_get_int32(this->options, "tier-max-promote-file-size", &maxsize);
    if (ret) {
        maxsize = 0;
    }

    defrag->tier_conf.tier_max_promote_size = maxsize;

    ret = dict_get_int32(this->options, "tier-promote-frequency", &freq);
    if (ret) {
        freq = DEFAULT_PROMOTE_FREQ_SEC;
    }

    defrag->tier_conf.tier_promote_frequency = freq;

    ret = dict_get_int32(this->options, "tier-demote-frequency", &freq);
    if (ret) {
        freq = DEFAULT_DEMOTE_FREQ_SEC;
    }

    defrag->tier_conf.tier_demote_frequency = freq;

    ret = dict_get_int32(this->options, "tier-hot-compact-frequency", &freq);
    if (ret) {
        freq = DEFAULT_HOT_COMPACT_FREQ_SEC;
    }

    defrag->tier_conf.tier_compact_hot_frequency = freq;

    ret = dict_get_int32(this->options, "tier-cold-compact-frequency", &freq);
    if (ret) {
        freq = DEFAULT_COLD_COMPACT_FREQ_SEC;
    }

    defrag->tier_conf.tier_compact_cold_frequency = freq;

    ret = dict_get_int32(this->options, "watermark-hi", &freq);
    if (ret) {
        freq = DEFAULT_WM_HI;
    }

    defrag->tier_conf.watermark_hi = freq;

    ret = dict_get_int32(this->options, "watermark-low", &freq);
    if (ret) {
        freq = DEFAULT_WM_LOW;
    }

    defrag->tier_conf.watermark_low = freq;

    ret = dict_get_int32(this->options, "write-freq-threshold", &freq);
    if (ret) {
        freq = DEFAULT_WRITE_FREQ_SEC;
    }

    defrag->write_freq_threshold = freq;

    ret = dict_get_int32(this->options, "read-freq-threshold", &freq);
    if (ret) {
        freq = DEFAULT_READ_FREQ_SEC;
    }

    defrag->read_freq_threshold = freq;

    ret = dict_get_int32(this->options, "tier-max-mb", &freq);
    if (ret) {
        freq = DEFAULT_TIER_MAX_MIGRATE_MB;
    }

    defrag->tier_conf.max_migrate_bytes = (uint64_t)freq * 1024 * 1024;

    ret = dict_get_int32(this->options, "tier-max-files", &freq);
    if (ret) {
        freq = DEFAULT_TIER_MAX_MIGRATE_FILES;
    }

    defrag->tier_conf.max_migrate_files = freq;

    ret = dict_get_int32(this->options, "tier-query-limit",
                         &(defrag->tier_conf.query_limit));
    if (ret) {
        defrag->tier_conf.query_limit = DEFAULT_TIER_QUERY_LIMIT;
    }

    ret = dict_get_str(this->options, "tier-compact", &mode);

    if (ret) {
        defrag->tier_conf.compact_active = DEFAULT_COMP_MODE;
    } else {
        compact_mode = tier_validate_compact_mode(mode);
        /* If compaction is now active, we need to inform the bricks on
           the hot and cold tier of this. See dht-common.h for more. */
        defrag->tier_conf.compact_active = compact_mode;
        if (compact_mode) {
            defrag->tier_conf.compact_mode_switched_hot = _gf_true;
            defrag->tier_conf.compact_mode_switched_cold = _gf_true;
        }
    }

    ret = dict_get_str(this->options, "tier-mode", &mode);
    if (ret) {
        defrag->tier_conf.mode = DEFAULT_TIER_MODE;
    } else {
        tier_mode = tier_validate_mode(mode);
        defrag->tier_conf.mode = tier_mode;
    }

    pthread_mutex_init(&defrag->tier_conf.pause_mutex, 0);

    gf_defrag_set_pause_state(&defrag->tier_conf, TIER_RUNNING);

    ret = dict_get_str(this->options, "tier-pause", &paused);

    if (paused && strcmp(paused, "on") == 0)
        gf_defrag_set_pause_state(&defrag->tier_conf, TIER_REQUEST_PAUSE);

    ret = gf_asprintf(&voldir, "%s/%s", DEFAULT_VAR_RUN_DIRECTORY, this->name);
    if (ret < 0)
        goto out;

    ret = mkdir_p(voldir, 0777, _gf_true);
    if (ret == -1 && errno != EEXIST) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
               "tier_init failed");

        GF_FREE(voldir);
        goto out;
    }

    GF_FREE(voldir);

    ret = gf_asprintf(&promotion_qfile, "%s/%s/promote",
                      DEFAULT_VAR_RUN_DIRECTORY, this->name);
    if (ret < 0)
        goto out;

    ret = gf_asprintf(&demotion_qfile, "%s/%s/demote",
                      DEFAULT_VAR_RUN_DIRECTORY, this->name);
    if (ret < 0) {
        GF_FREE(promotion_qfile);
        goto out;
    }

    gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
           "Promote/demote frequency %d/%d "
           "Write/Read freq thresholds %d/%d",
           defrag->tier_conf.tier_promote_frequency,
           defrag->tier_conf.tier_demote_frequency,
           defrag->write_freq_threshold, defrag->read_freq_threshold);

    tier_save_vol_name(this);

    ret = 0;

out:

    return ret;
}

int
tier_cli_pause_done(int op_ret, call_frame_t *sync_frame, void *data)
{
    gf_msg("tier", GF_LOG_INFO, 0, DHT_MSG_TIER_PAUSED,
           "Migrate file paused with op_ret %d", op_ret);

    return op_ret;
}

int
tier_cli_pause(void *data)
{
    gf_defrag_info_t *defrag = NULL;
    xlator_t *this = NULL;
    dht_conf_t *conf = NULL;
    int ret = -1;

    this = data;

    conf = this->private;
    GF_VALIDATE_OR_GOTO(this->name, conf, exit);

    defrag = conf->defrag;
    GF_VALIDATE_OR_GOTO(this->name, defrag, exit);

    gf_defrag_pause_tier(this, defrag);

    ret = 0;
exit:
    return ret;
}

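/*
 * Runtime reconfiguration. Each tunable is re-read with GF_OPTION_RECONF;
 * flipping tier-compact raises the compact_mode_switched_{hot,cold} flags
 * so the next compaction cycle informs the bricks' CTR of the change, and
 * tier-pause is handled in a synctask, presumably because
 * gf_defrag_pause_tier can block until migration quiesces.
 */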
int
tier_reconfigure(xlator_t *this, dict_t *options)
{
    dht_conf_t *conf = NULL;
    gf_defrag_info_t *defrag = NULL;
    char *mode = NULL;
    int migrate_mb = 0;
    gf_boolean_t req_pause = _gf_false;
    int ret = 0;
    call_frame_t *frame = NULL;
    gf_boolean_t last_compact_setting = _gf_false;

    conf = this->private;

    if (conf->defrag) {
        defrag = conf->defrag;
        GF_OPTION_RECONF("tier-max-promote-file-size",
                         defrag->tier_conf.tier_max_promote_size, options,
                         int32, out);

        GF_OPTION_RECONF("tier-promote-frequency",
                         defrag->tier_conf.tier_promote_frequency, options,
                         int32, out);

        GF_OPTION_RECONF("tier-demote-frequency",
                         defrag->tier_conf.tier_demote_frequency, options,
                         int32, out);

        GF_OPTION_RECONF("write-freq-threshold", defrag->write_freq_threshold,
                         options, int32, out);

        GF_OPTION_RECONF("read-freq-threshold", defrag->read_freq_threshold,
                         options, int32, out);

        GF_OPTION_RECONF("watermark-hi", defrag->tier_conf.watermark_hi,
                         options, int32, out);

        GF_OPTION_RECONF("watermark-low", defrag->tier_conf.watermark_low,
                         options, int32, out);

        last_compact_setting = defrag->tier_conf.compact_active;

        GF_OPTION_RECONF("tier-compact", defrag->tier_conf.compact_active,
                         options, bool, out);

        if (last_compact_setting != defrag->tier_conf.compact_active) {
            defrag->tier_conf.compact_mode_switched_hot = _gf_true;
            defrag->tier_conf.compact_mode_switched_cold = _gf_true;
            gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
                   "compact mode switched");
        }

        GF_OPTION_RECONF("tier-hot-compact-frequency",
                         defrag->tier_conf.tier_compact_hot_frequency, options,
                         int32, out);

        GF_OPTION_RECONF("tier-cold-compact-frequency",
                         defrag->tier_conf.tier_compact_cold_frequency, options,
                         int32, out);

        GF_OPTION_RECONF("tier-mode", mode, options, str, out);
        defrag->tier_conf.mode = tier_validate_mode(mode);

        GF_OPTION_RECONF("tier-max-mb", migrate_mb, options, int32, out);
        defrag->tier_conf.max_migrate_bytes = (uint64_t)migrate_mb * 1024 *
                                              1024;

        GF_OPTION_RECONF("tier-max-files", defrag->tier_conf.max_migrate_files,
                         options, int32, out);

        GF_OPTION_RECONF("tier-query-limit", defrag->tier_conf.query_limit,
                         options, int32, out);

        GF_OPTION_RECONF("tier-pause", req_pause, options, bool, out);

        if (req_pause == _gf_true) {
            frame = create_frame(this, this->ctx->pool);
            if (!frame)
                goto out;

            frame->root->pid = GF_CLIENT_PID_DEFRAG;

            ret = synctask_new(this->ctx->env, tier_cli_pause,
                               tier_cli_pause_done, frame, this);

            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                       "pause tier failed on reconfigure");
            }
        } else {
            ret = gf_defrag_resume_tier(this, defrag);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                       "resume tier failed on reconfigure");
            }
        }
    }

out:
    return dht_reconfigure(this, options);
}

void
tier_fini(xlator_t *this)
{
    if (libhandle)
        dlclose(libhandle);

    GF_FREE(demotion_qfile);
    GF_FREE(promotion_qfile);

    dht_fini(this);
}

struct xlator_fops fops = {
    .lookup = dht_lookup,
    .create = tier_create,
    .mknod = dht_mknod,

    .open = dht_open,
    .statfs = tier_statfs,
    .opendir = dht_opendir,
    .readdir = tier_readdir,
    .readdirp = tier_readdirp,
    .fsyncdir = dht_fsyncdir,
    .symlink = dht_symlink,
    .unlink = tier_unlink,
    .link = tier_link,
    .mkdir = dht_mkdir,
    .rmdir = dht_rmdir,
    .rename = dht_rename,
    .entrylk = dht_entrylk,
    .fentrylk = dht_fentrylk,

    /* Inode read operations */
    .stat = dht_stat,
    .fstat = dht_fstat,
    .access = dht_access,
    .readlink = dht_readlink,
    .getxattr = dht_getxattr,
    .fgetxattr = dht_fgetxattr,
    .readv = dht_readv,
    .flush = dht_flush,
    .fsync = dht_fsync,
    .inodelk = dht_inodelk,
    .finodelk = dht_finodelk,
    .lk = dht_lk,

    /* Inode write operations */
    .fremovexattr = dht_fremovexattr,
    .removexattr = dht_removexattr,
    .setxattr = dht_setxattr,
    .fsetxattr = dht_fsetxattr,
    .truncate = dht_truncate,
    .ftruncate = dht_ftruncate,
    .writev = dht_writev,
    .xattrop = dht_xattrop,
    .fxattrop = dht_fxattrop,
    .setattr = dht_setattr,
    .fsetattr = dht_fsetattr,
    .fallocate = dht_fallocate,
    .discard = dht_discard,
    .zerofill = dht_zerofill,
};

struct xlator_cbks cbks = {.release = dht_release, .forget = dht_forget};

extern int32_t
mem_acct_init(xlator_t *this);

extern struct volume_options dht_options[];

xlator_api_t xlator_api = {
    .init = tier_init,
    .fini = tier_fini,
    .notify = dht_notify,
    .reconfigure = tier_reconfigure,
    .mem_acct_init = mem_acct_init,
    .op_version = {GD_OP_VERSION_3_7_0}, /* Present from the initial version */
    .fops = &fops,
    .cbks = &cbks,
    .options = dht_options,
    .identifier = "tier",
    .category = GF_MAINTAINED,
};