Blob Blame History Raw
/*
   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#ifndef _CHANGELOG_HELPERS_H
#define _CHANGELOG_HELPERS_H

#include <glusterfs/locking.h>
#include <glusterfs/timer.h>
#include "pthread.h"
#include <glusterfs/iobuf.h>
#include <glusterfs/rot-buffs.h>

#include "changelog-misc.h"
#include <glusterfs/call-stub.h>

#include "rpcsvc.h"
#include "changelog-ev-handle.h"

#include "changelog.h"
#include "changelog-messages.h"

/**
 * the changelog entry
 */
typedef struct changelog_log_data {
    /* rollover related */
    unsigned long cld_roll_time;

    /* reopen changelog? */
    gf_boolean_t cld_finale;

    changelog_log_type cld_type;

    /**
     * sincd gfid is _always_ a necessity, it's not a part
     * of the iobuf. by doing this we do not add any overhead
     * for data and metadata related fops.
     */
    uuid_t cld_gfid;

    /**
     * iobufs are used for optionals records: pargfid, path,
     * write offsets etc.. It's the fop implementers job
     * to allocate (iobuf_get() in the fop) and get unref'ed
     * in the callback (CHANGELOG_STACK_UNWIND).
     */
    struct iobuf *cld_iobuf;

#define cld_ptr cld_iobuf->ptr

    /**
     * after allocation you can point this to the length of
     * usable data, but make sure it does not exceed the
     * the size of the requested iobuf.
     */
    size_t cld_iobuf_len;

#define cld_ptr_len cld_iobuf_len

    /**
     * number of optional records
     */
    int cld_xtra_records;
} changelog_log_data_t;

/**
 * holder for dispatch function and private data
 */

typedef struct changelog_priv changelog_priv_t;

typedef struct changelog_dispatcher {
    void *cd_data;
    int (*dispatchfn)(xlator_t *, changelog_priv_t *, void *,
                      changelog_log_data_t *, changelog_log_data_t *);
} changelog_dispatcher_t;

struct changelog_bootstrap {
    changelog_mode_t mode;
    int (*ctor)(xlator_t *, changelog_dispatcher_t *);
    int (*dtor)(xlator_t *, changelog_dispatcher_t *);
};

struct changelog_encoder {
    changelog_encoder_t encoder;
    int (*encode)(xlator_t *, changelog_log_data_t *);
};

/* xlator private */

typedef struct changelog_time_slice {
    /**
     * just in case we need nanosecond granularity some day.
     * field is unused as of now (maybe we'd need it later).
     */
    struct timeval tv_start;

    /**
     * version of changelog file, incremented each time changes
     * rollover.
     */
    unsigned long changelog_version[CHANGELOG_MAX_TYPE];
} changelog_time_slice_t;

typedef struct changelog_rollover {
    /* rollover thread */
    pthread_t rollover_th;

    xlator_t *this;

    pthread_mutex_t lock;
    pthread_cond_t cond;
    gf_boolean_t notify;
} changelog_rollover_t;

typedef struct changelog_fsync {
    /* fsync() thread */
    pthread_t fsync_th;

    xlator_t *this;
} changelog_fsync_t;

/* Draining during changelog rollover (for geo-rep snapshot dependency):
 * --------------------------------------------------------------------
 * The introduction of draining of in-transit fops during changelog rollover
 * (both explicit/timeout triggered) requires coloring of fops. Basically the
 * implementation requires two counters, one counter which keeps the count of
 * current intransit fops which should end up in current changelog and the other
 * counter to keep track of incoming fops which should be drained as part of
 * next changelog rollover event. The fops are colored w.r.t these counters.
 * The fops that are to be drained as part of current changelog rollover is
 * given one color and the fops which keep incoming during this and not
 * necessarily should end up in current changelog and should be drained as part
 * of next changelog rollover are given other color. The color switching
 * continues with each changelog rollover. Two colors(black and white) are
 * chosen here and initially black is chosen is default.
 */

typedef enum chlog_fop_color {
    FOP_COLOR_BLACK,
    FOP_COLOR_WHITE
} chlog_fop_color_t;

/* Barrier notify variable */
typedef struct barrier_notify {
    pthread_mutex_t bnotify_mutex;
    pthread_cond_t bnotify_cond;
    gf_boolean_t bnotify;
    gf_boolean_t bnotify_error;
} barrier_notify_t;

/* Two separate mutex and conditional variable set is used
 * to drain white and black fops. */

typedef struct drain_mgmt {
    pthread_mutex_t drain_black_mutex;
    pthread_cond_t drain_black_cond;
    pthread_mutex_t drain_white_mutex;
    pthread_cond_t drain_white_cond;
    /* Represents black fops count in-transit */
    unsigned long black_fop_cnt;
    /* Represents white fops count in-transit */
    unsigned long white_fop_cnt;
    gf_boolean_t drain_wait_black;
    gf_boolean_t drain_wait_white;
} drain_mgmt_t;

/* External barrier as a result of snap on/off indicating flag*/
typedef struct barrier_flags {
    gf_lock_t lock;
    gf_boolean_t barrier_ext;
} barrier_flags_t;

/* Event selection */
typedef struct changelog_ev_selector {
    gf_lock_t reflock;

    /**
     * Array of references for each selection bit.
     */
    unsigned int ref[CHANGELOG_EV_SELECTION_RANGE];
} changelog_ev_selector_t;

/* changelog's private structure */
struct changelog_priv {
    /* changelog journalling */
    gf_boolean_t active;

    /* changelog live notifications */
    gf_boolean_t rpc_active;

    /* to generate unique socket file per brick */
    char *changelog_brick;

    /* logging directory */
    char *changelog_dir;

    /* htime directory */
    char *htime_dir;

    /* one file for all changelog types */
    int changelog_fd;

    /* htime fd for current changelog session */
    int htime_fd;

    /*  c_snap_fd is fd for call-path changelog */
    int c_snap_fd;

    /* rollover_count used by htime */
    int rollover_count;

    gf_lock_t lock;

    /*  lock to synchronize CSNAP updation */
    gf_lock_t c_snap_lock;

    /* written end of the pipe */
    int wfd;

    /* rollover time */
    int32_t rollover_time;

    /* fsync() interval */
    int32_t fsync_interval;

    /* changelog type maps */
    const char *maps[CHANGELOG_MAX_TYPE];

    /* time slicer */
    changelog_time_slice_t slice;

    /* context of the updater */
    changelog_dispatcher_t cd;

    /* context of the rollover thread */
    changelog_rollover_t cr;

    /* context of fsync thread */
    changelog_fsync_t cf;

    /* operation mode */
    changelog_mode_t op_mode;

    /* bootstrap routine for 'current' logger */
    struct changelog_bootstrap *cb;

    /* encoder mode */
    changelog_encoder_t encode_mode;

    /* encoder */
    struct changelog_encoder *ce;

    /**
     * snapshot dependency changes
     */

    /* Draining of fops*/
    drain_mgmt_t dm;

    /* Represents the active color. Initially by default black */
    chlog_fop_color_t current_color;

    /* flag to determine explicit rollover is triggered */
    gf_boolean_t explicit_rollover;

    /* barrier notification variable protected by mutex */
    barrier_notify_t bn;

    /* barrier on/off indicating flags */
    barrier_flags_t bflags;

    /* changelog barrier on/off indicating flag */
    gf_boolean_t barrier_enabled;
    struct list_head queue;
    uint32_t queue_size;
    gf_timer_t *timer;
    struct timespec timeout;

    /**
     * buffers, RPC, event selection, notifications and other
     * beasts.
     */

    /* epoll pthread */
    pthread_t poller;

    /* rotational buffer */
    rbuf_t *rbuf;

    /* changelog RPC server */
    rpcsvc_t *rpc;

    /* event selection */
    changelog_ev_selector_t ev_selection;

    /* client handling (reverse connection) */
    pthread_t connector;

    int nr_dispatchers;
    pthread_t *ev_dispatcher;

    changelog_clnt_t connections;

    /* glusterfind dependency to capture paths on deleted entries*/
    gf_boolean_t capture_del_path;

    /* Save total no. of listners */
    gf_atomic_t listnercnt;

    /* Save total no. of xprt are associated with listner */
    gf_atomic_t xprtcnt;

    /* Save xprt list */
    struct list_head xprt_list;

    /* Save total no. of client connection */
    gf_atomic_t clntcnt;

    /* Save cleanup brick in victim */
    xlator_t *victim;

    /* Status to save cleanup notify status */
    gf_boolean_t notify_down;
};

struct changelog_local {
    inode_t *inode;
    gf_boolean_t update_no_check;

    changelog_log_data_t cld;

    /**
     * ->prev_entry is used in cases when there needs to be
     * additional changelog entry for the parent (eg. rename)
     * It's analogous to ->next in single linked list world,
     * but we call it as ->prev_entry... ha ha ha
     */
    struct changelog_local *prev_entry;

    /* snap dependency changes */
    chlog_fop_color_t color;
};

typedef struct changelog_local changelog_local_t;

/* inode version is stored in inode ctx */
typedef struct changelog_inode_ctx {
    unsigned long iversion[CHANGELOG_MAX_TYPE];
} changelog_inode_ctx_t;

#define CHANGELOG_INODE_VERSION_TYPE(ctx, type) &(ctx->iversion[type])

/**
 * Optional Records:
 *  fops that need to save additional information request a array of
 *  @changelog_opt_t struct. The array is allocated via @iobufs.
 */
typedef enum {
    CHANGELOG_OPT_REC_FOP,
    CHANGELOG_OPT_REC_ENTRY,
    CHANGELOG_OPT_REC_UINT32,
} changelog_optional_rec_type_t;

struct changelog_entry_fields {
    uuid_t cef_uuid;
    char *cef_bname;
    char *cef_path;
};

typedef struct {
    /**
     * @co_covert can be used to do post-processing of the record before
     * it's persisted to the CHANGELOG. If this is NULL, then the record
     * is persisted as per it's in memory format.
     */
    size_t (*co_convert)(void *data, char *buffer, gf_boolean_t encode);

    /* release routines */
    void (*co_free)(void *data);

    /* type of the field */
    changelog_optional_rec_type_t co_type;

    /**
     * sizeof of the 'valid' field in the union. This field is not used if
     * @co_convert is specified.
     */
    size_t co_len;

    union {
        unsigned int co_uint32;
        glusterfs_fop_t co_fop;
        struct changelog_entry_fields co_entry;
    };
} changelog_opt_t;

#define CHANGELOG_OPT_RECORD_LEN sizeof(changelog_opt_t)

/**
 * helpers routines
 */

int
changelog_thread_cleanup(xlator_t *this, pthread_t thr_id);

void *
changelog_get_usable_buffer(changelog_local_t *local);

void
changelog_set_usable_record_and_length(changelog_local_t *local, size_t len,
                                       int xr);
void
changelog_local_cleanup(xlator_t *xl, changelog_local_t *local);
changelog_local_t *
changelog_local_init(xlator_t *this, inode_t *inode, uuid_t gfid,
                     int xtra_records, gf_boolean_t update_flag);
int
changelog_start_next_change(xlator_t *this, changelog_priv_t *priv,
                            unsigned long ts, gf_boolean_t finale);
int
changelog_open_journal(xlator_t *this, changelog_priv_t *priv);
int
changelog_fill_rollover_data(changelog_log_data_t *cld, gf_boolean_t is_last);
int
changelog_inject_single_event(xlator_t *this, changelog_priv_t *priv,
                              changelog_log_data_t *cld);
size_t
changelog_entry_length();
int
changelog_write(int fd, char *buffer, size_t len);
int
changelog_write_change(changelog_priv_t *priv, char *buffer, size_t len);
int
changelog_handle_change(xlator_t *this, changelog_priv_t *priv,
                        changelog_log_data_t *cld);
void
changelog_update(xlator_t *this, changelog_priv_t *priv,
                 changelog_local_t *local, changelog_log_type type);
void *
changelog_rollover(void *data);
void *
changelog_fsync_thread(void *data);
int
changelog_forget(xlator_t *this, inode_t *inode);
int
htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts,
             char *buffer);
int
htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts);
int
htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts);

/* Geo-Rep snapshot dependency changes */
void
changelog_color_fop_and_inc_cnt(xlator_t *this, changelog_priv_t *priv,
                                changelog_local_t *local);
void
changelog_inc_fop_cnt(xlator_t *this, changelog_priv_t *priv,
                      changelog_local_t *local);
void
changelog_dec_fop_cnt(xlator_t *this, changelog_priv_t *priv,
                      changelog_local_t *local);
int
changelog_barrier_notify(changelog_priv_t *priv, char *buf);
void
changelog_barrier_cleanup(xlator_t *this, changelog_priv_t *priv,
                          struct list_head *queue);
void
changelog_drain_white_fops(xlator_t *this, changelog_priv_t *priv);
void
changelog_drain_black_fops(xlator_t *this, changelog_priv_t *priv);

/* Crash consistency of changelog wrt snapshot */
int
changelog_snap_logging_stop(xlator_t *this, changelog_priv_t *priv);
int
changelog_snap_logging_start(xlator_t *this, changelog_priv_t *priv);
int
changelog_snap_open(xlator_t *this, changelog_priv_t *priv);
int
changelog_snap_handle_ascii_change(xlator_t *this, changelog_log_data_t *cld);
int
changelog_snap_write_change(changelog_priv_t *priv, char *buffer, size_t len);

/* Changelog barrier routines */
void
__chlog_barrier_enqueue(xlator_t *this, call_stub_t *stub);
void
__chlog_barrier_disable(xlator_t *this, struct list_head *queue);
void
chlog_barrier_dequeue_all(xlator_t *this, struct list_head *queue);
call_stub_t *
__chlog_barrier_dequeue(xlator_t *this, struct list_head *queue);
int
__chlog_barrier_enable(xlator_t *this, changelog_priv_t *priv);

int32_t
changelog_fill_entry_buf(call_frame_t *frame, xlator_t *this, loc_t *loc,
                         changelog_local_t **local);

/* event selection routines */
void
changelog_select_event(xlator_t *, changelog_ev_selector_t *, unsigned int);
void
changelog_deselect_event(xlator_t *, changelog_ev_selector_t *, unsigned int);
int
changelog_init_event_selection(xlator_t *, changelog_ev_selector_t *);
int
changelog_cleanup_event_selection(xlator_t *, changelog_ev_selector_t *);
int
changelog_ev_selected(xlator_t *, changelog_ev_selector_t *, unsigned int);
void
changelog_dispatch_event(xlator_t *, changelog_priv_t *, changelog_event_t *);

changelog_inode_ctx_t *
__changelog_inode_ctx_get(xlator_t *, inode_t *, unsigned long **,
                          unsigned long *, changelog_log_type);
int
resolve_pargfid_to_path(xlator_t *this, const uuid_t gfid, char **path,
                        char *bname);

/* macros */

#define CHANGELOG_STACK_UNWIND(fop, frame, params...)                          \
    do {                                                                       \
        changelog_local_t *__local = NULL;                                     \
        xlator_t *__xl = NULL;                                                 \
        if (frame) {                                                           \
            __local = frame->local;                                            \
            __xl = frame->this;                                                \
            frame->local = NULL;                                               \
        }                                                                      \
        STACK_UNWIND_STRICT(fop, frame, params);                               \
        if (__local && __local->prev_entry)                                    \
            changelog_local_cleanup(__xl, __local->prev_entry);                \
        changelog_local_cleanup(__xl, __local);                                \
    } while (0)

#define CHANGELOG_IOBUF_REF(iobuf)                                             \
    do {                                                                       \
        if (iobuf)                                                             \
            iobuf_ref(iobuf);                                                  \
    } while (0)

#define CHANGELOG_IOBUF_UNREF(iobuf)                                           \
    do {                                                                       \
        if (iobuf)                                                             \
            iobuf_unref(iobuf);                                                \
    } while (0)

#define CHANGELOG_FILL_BUFFER(buffer, off, val, len)                           \
    do {                                                                       \
        memcpy(buffer + off, val, len);                                        \
        off += len;                                                            \
    } while (0)

#define SLICE_VERSION_UPDATE(slice)                                            \
    do {                                                                       \
        int i = 0;                                                             \
        for (; i < CHANGELOG_MAX_TYPE; i++) {                                  \
            slice->changelog_version[i]++;                                     \
        }                                                                      \
    } while (0)

#define CHANGELOG_FILL_UINT32(co, number, converter, xlen)                     \
    do {                                                                       \
        co->co_convert = converter;                                            \
        co->co_free = NULL;                                                    \
        co->co_type = CHANGELOG_OPT_REC_UINT32;                                \
        co->co_uint32 = number;                                                \
        xlen += sizeof(unsigned int);                                          \
    } while (0)

#define CHANGLOG_FILL_FOP_NUMBER(co, fop, converter, xlen)                     \
    do {                                                                       \
        co->co_convert = converter;                                            \
        co->co_free = NULL;                                                    \
        co->co_type = CHANGELOG_OPT_REC_FOP;                                   \
        co->co_fop = fop;                                                      \
        xlen += sizeof(fop);                                                   \
    } while (0)

#define CHANGELOG_FILL_ENTRY(co, pargfid, bname, converter, freefn, xlen,      \
                             label)                                            \
    do {                                                                       \
        co->co_convert = converter;                                            \
        co->co_free = freefn;                                                  \
        co->co_type = CHANGELOG_OPT_REC_ENTRY;                                 \
        gf_uuid_copy(co->co_entry.cef_uuid, pargfid);                          \
        co->co_entry.cef_bname = gf_strdup(bname);                             \
        if (!co->co_entry.cef_bname)                                           \
            goto label;                                                        \
        xlen += (UUID_CANONICAL_FORM_LEN + strlen(bname));                     \
    } while (0)

#define CHANGELOG_FILL_ENTRY_DIR_PATH(co, pargfid, bname, converter,           \
                                      del_freefn, xlen, label, capture_del)    \
    do {                                                                       \
        co->co_convert = converter;                                            \
        co->co_free = del_freefn;                                              \
        co->co_type = CHANGELOG_OPT_REC_ENTRY;                                 \
        gf_uuid_copy(co->co_entry.cef_uuid, pargfid);                          \
        co->co_entry.cef_bname = gf_strdup(bname);                             \
        if (!co->co_entry.cef_bname)                                           \
            goto label;                                                        \
        xlen += (UUID_CANONICAL_FORM_LEN + strlen(bname));                     \
        if (!capture_del ||                                                    \
            resolve_pargfid_to_path(this, pargfid, &(co->co_entry.cef_path),   \
                                    co->co_entry.cef_bname)) {                 \
            co->co_entry.cef_path = gf_strdup("\0");                           \
            xlen += 1;                                                         \
        } else {                                                               \
            xlen += (strlen(co->co_entry.cef_path));                           \
        }                                                                      \
    } while (0)

#define CHANGELOG_INIT(this, local, inode, gfid, xrec)                         \
    local = changelog_local_init(this, inode, gfid, xrec, _gf_false)

#define CHANGELOG_INIT_NOCHECK(this, local, inode, gfid, xrec)                 \
    local = changelog_local_init(this, inode, gfid, xrec, _gf_true)

#define CHANGELOG_NOT_ACTIVE_THEN_GOTO(frame, priv, label)                     \
    do {                                                                       \
        if (!priv->active)                                                     \
            goto label;                                                        \
        /* ignore rebalance process's activity. */                             \
        if ((frame->root->pid == GF_CLIENT_PID_DEFRAG) ||                      \
            (frame->root->pid == GF_CLIENT_PID_TIER_DEFRAG))                   \
            goto label;                                                        \
    } while (0)

/* If it is a METADATA entry and fop num being GF_FOP_NULL, don't
 * log in the changelog as it is of no use. And also if it is
 * logged, since slicing version checking is done for metadata
 * entries, the subsequent entries with valid fop num which falls
 * to same changelog will be missed. Hence check for boundary
 * condition.
 */
#define CHANGELOG_OP_BOUNDARY_CHECK(frame, label)                              \
    do {                                                                       \
        if (frame->root->op <= GF_FOP_NULL ||                                  \
            frame->root->op >= GF_FOP_MAXVALUE)                                \
            goto label;                                                        \
    } while (0)

/**
 * ignore internal fops for all clients except AFR self-heal daemon
 */
#define CHANGELOG_IF_INTERNAL_FOP_THEN_GOTO(frame, dict, label)                \
    do {                                                                       \
        if ((frame->root->pid != GF_CLIENT_PID_SELF_HEALD) && dict &&          \
            dict_get(dict, GLUSTERFS_INTERNAL_FOP_KEY))                        \
            goto label;                                                        \
    } while (0)

#define CHANGELOG_COND_GOTO(priv, cond, label)                                 \
    do {                                                                       \
        if (!priv->active || cond)                                             \
            goto label;                                                        \
    } while (0)

/* Begin: Geo-Rep snapshot dependency changes */

#define DICT_ERROR -1
#define BARRIER_OFF 0
#define BARRIER_ON 1
#define DICT_DEFAULT 2

#define CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, label)                           \
    do {                                                                       \
        if (!priv->active) {                                                   \
            gf_msg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_NOT_ACTIVE,    \
                   "Changelog is not active, return success");                 \
            ret = 0;                                                           \
            goto label;                                                        \
        }                                                                      \
    } while (0)

/* Log pthread error and goto label */
#define CHANGELOG_PTHREAD_ERROR_HANDLE_0(ret, label)                           \
    do {                                                                       \
        if (ret) {                                                             \
            gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_PTHREAD_ERROR,  \
                    "pthread error", "error=%d", ret, NULL);                   \
            ret = -1;                                                          \
            goto label;                                                        \
        }                                                                      \
    } while (0);

/* Log pthread error, set flag and goto label */
#define CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret, label, flag)                     \
    do {                                                                       \
        if (ret) {                                                             \
            gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_PTHREAD_ERROR,  \
                    "pthread error", "error=%d", ret, NULL);                   \
            ret = -1;                                                          \
            flag = _gf_true;                                                   \
            goto label;                                                        \
        }                                                                      \
    } while (0)

/* Log pthread error, unlock mutex and goto label */
#define CHANGELOG_PTHREAD_ERROR_HANDLE_2(ret, label, mutex)                    \
    do {                                                                       \
        if (ret) {                                                             \
            gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_PTHREAD_ERROR,  \
                    "pthread error", "error=%d", ret, NULL);                   \
            ret = -1;                                                          \
            pthread_mutex_unlock(&mutex);                                      \
            goto label;                                                        \
        }                                                                      \
    } while (0)

/* End: Geo-Rep snapshot dependency changes */

#endif /* _CHANGELOG_HELPERS_H */