/*
Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include "glusterfs/syncop.h"
#include "glusterfs/syncop-utils.h"
#include "glusterfs/common-utils.h"
#include "glusterfs/libglusterfs-messages.h"
struct syncop_dir_scan_data {
xlator_t *subvol;
loc_t *parent;
void *data;
gf_dirent_t *q;
gf_dirent_t *entry;
pthread_cond_t *cond;
pthread_mutex_t *mut;
syncop_dir_scan_fn_t fn;
uint32_t *jobs_running;
uint32_t *qlen;
int32_t *retval;
};
int
syncop_dirfd(xlator_t *subvol, loc_t *loc, fd_t **fd, int pid)
{
int ret = 0;
fd_t *dirfd = NULL;
if (!fd)
return -EINVAL;
dirfd = fd_create(loc->inode, pid);
if (!dirfd) {
gf_msg(subvol->name, GF_LOG_ERROR, errno, LG_MSG_FD_CREATE_FAILED,
"fd_create of %s", uuid_utoa(loc->gfid));
ret = -errno;
goto out;
}
ret = syncop_opendir(subvol, loc, dirfd, NULL, NULL);
if (ret) {
/*
* On Linux, if the brick was not updated, opendir will
* fail. We therefore use backward compatible code
* that violate the standards by reusing offsets
* in seekdir() from different DIR *, but it works on Linux.
*
* On other systems it never worked, hence we do not need
* to provide backward-compatibility.
*/
#ifdef GF_LINUX_HOST_OS
fd_unref(dirfd);
dirfd = fd_anonymous(loc->inode);
if (!dirfd) {
gf_msg(subvol->name, GF_LOG_ERROR, errno,
LG_MSG_FD_ANONYMOUS_FAILED,
"fd_anonymous of "
"%s",
uuid_utoa(loc->gfid));
ret = -errno;
goto out;
}
ret = 0;
#else /* GF_LINUX_HOST_OS */
fd_unref(dirfd);
gf_msg(subvol->name, GF_LOG_ERROR, errno, LG_MSG_DIR_OP_FAILED,
"opendir of %s", uuid_utoa(loc->gfid));
goto out;
#endif /* GF_LINUX_HOST_OS */
} else {
fd_bind(dirfd);
}
out:
if (ret == 0)
*fd = dirfd;
return ret;
}
int
syncop_ftw(xlator_t *subvol, loc_t *loc, int pid, void *data,
int (*fn)(xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
void *data))
{
loc_t child_loc = {
0,
};
fd_t *fd = NULL;
uint64_t offset = 0;
gf_dirent_t *entry = NULL;
int ret = 0;
gf_dirent_t entries;
ret = syncop_dirfd(subvol, loc, &fd, pid);
if (ret)
goto out;
INIT_LIST_HEAD(&entries.list);
while ((ret = syncop_readdirp(subvol, fd, 131072, offset, &entries, NULL,
NULL))) {
if (ret < 0)
break;
if (ret > 0) {
/* If the entries are only '.', and '..' then ret
* value will be non-zero. so set it to zero here. */
ret = 0;
}
list_for_each_entry(entry, &entries.list, list)
{
offset = entry->d_off;
if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
continue;
gf_link_inode_from_dirent(NULL, fd->inode, entry);
ret = fn(subvol, entry, loc, data);
if (ret)
break;
if (entry->d_stat.ia_type == IA_IFDIR) {
child_loc.inode = inode_ref(entry->inode);
gf_uuid_copy(child_loc.gfid, entry->inode->gfid);
ret = syncop_ftw(subvol, &child_loc, pid, data, fn);
loc_wipe(&child_loc);
if (ret)
break;
}
}
gf_dirent_free(&entries);
if (ret)
break;
}
out:
if (fd)
fd_unref(fd);
return ret;
}
/**
* Syncop_ftw_throttle can be used in a configurable way to control
* the speed at which crawling is done. It takes 2 more arguments
* compared to syncop_ftw.
* After @count entries are finished in a directory (to be
* precise, @count files) sleep for @sleep_time seconds.
* If either @count or @sleep_time is <=0, then it behaves similar to
* syncop_ftw.
*/
int
syncop_ftw_throttle(xlator_t *subvol, loc_t *loc, int pid, void *data,
int (*fn)(xlator_t *subvol, gf_dirent_t *entry,
loc_t *parent, void *data),
int count, int sleep_time)
{
loc_t child_loc = {
0,
};
fd_t *fd = NULL;
uint64_t offset = 0;
gf_dirent_t *entry = NULL;
int ret = 0;
gf_dirent_t entries;
int tmp = 0;
if (sleep_time <= 0) {
ret = syncop_ftw(subvol, loc, pid, data, fn);
goto out;
}
ret = syncop_dirfd(subvol, loc, &fd, pid);
if (ret)
goto out;
INIT_LIST_HEAD(&entries.list);
while ((ret = syncop_readdirp(subvol, fd, 131072, offset, &entries, NULL,
NULL))) {
if (ret < 0)
break;
if (ret > 0) {
/* If the entries are only '.', and '..' then ret
* value will be non-zero. so set it to zero here. */
ret = 0;
}
tmp = 0;
list_for_each_entry(entry, &entries.list, list)
{
offset = entry->d_off;
if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
continue;
if (++tmp >= count) {
tmp = 0;
sleep(sleep_time);
}
gf_link_inode_from_dirent(NULL, fd->inode, entry);
ret = fn(subvol, entry, loc, data);
if (ret)
continue;
if (entry->d_stat.ia_type == IA_IFDIR) {
child_loc.inode = inode_ref(entry->inode);
gf_uuid_copy(child_loc.gfid, entry->inode->gfid);
ret = syncop_ftw_throttle(subvol, &child_loc, pid, data, fn,
count, sleep_time);
loc_wipe(&child_loc);
if (ret)
continue;
}
}
gf_dirent_free(&entries);
if (ret)
break;
}
out:
if (fd)
fd_unref(fd);
return ret;
}
static void
_scan_data_destroy(struct syncop_dir_scan_data *data)
{
GF_FREE(data);
}
static int
_dir_scan_job_fn_cbk(int ret, call_frame_t *frame, void *opaque)
{
struct syncop_dir_scan_data *scan_data = opaque;
_scan_data_destroy(scan_data);
return 0;
}
static int
_dir_scan_job_fn(void *data)
{
struct syncop_dir_scan_data *scan_data = data;
gf_dirent_t *entry = NULL;
int ret = 0;
entry = scan_data->entry;
scan_data->entry = NULL;
do {
ret = scan_data->fn(scan_data->subvol, entry, scan_data->parent,
scan_data->data);
gf_dirent_entry_free(entry);
entry = NULL;
pthread_mutex_lock(scan_data->mut);
{
if (ret)
*scan_data->retval |= ret;
if (list_empty(&scan_data->q->list)) {
(*scan_data->jobs_running)--;
pthread_cond_broadcast(scan_data->cond);
} else {
entry = list_first_entry(&scan_data->q->list,
typeof(*scan_data->q), list);
list_del_init(&entry->list);
(*scan_data->qlen)--;
}
}
pthread_mutex_unlock(scan_data->mut);
} while (entry);
return ret;
}
static int
_run_dir_scan_task(call_frame_t *frame, xlator_t *subvol, loc_t *parent,
gf_dirent_t *q, gf_dirent_t *entry, int *retval,
pthread_mutex_t *mut, pthread_cond_t *cond,
uint32_t *jobs_running, uint32_t *qlen,
syncop_dir_scan_fn_t fn, void *data)
{
int ret = 0;
struct syncop_dir_scan_data *scan_data = NULL;
scan_data = GF_CALLOC(1, sizeof(struct syncop_dir_scan_data),
gf_common_mt_scan_data);
if (!scan_data) {
ret = -ENOMEM;
goto out;
}
scan_data->subvol = subvol;
scan_data->parent = parent;
scan_data->data = data;
scan_data->mut = mut;
scan_data->cond = cond;
scan_data->fn = fn;
scan_data->jobs_running = jobs_running;
scan_data->entry = entry;
scan_data->q = q;
scan_data->qlen = qlen;
scan_data->retval = retval;
ret = synctask_new(subvol->ctx->env, _dir_scan_job_fn, _dir_scan_job_fn_cbk,
frame, scan_data);
out:
if (ret < 0) {
gf_dirent_entry_free(entry);
_scan_data_destroy(scan_data);
pthread_mutex_lock(mut);
{
*jobs_running = *jobs_running - 1;
}
pthread_mutex_unlock(mut);
/*No need to cond-broadcast*/
}
return ret;
}
int
syncop_mt_dir_scan(call_frame_t *frame, xlator_t *subvol, loc_t *loc, int pid,
void *data, syncop_dir_scan_fn_t fn, dict_t *xdata,
uint32_t max_jobs, uint32_t max_qlen)
{
fd_t *fd = NULL;
uint64_t offset = 0;
gf_dirent_t *last = NULL;
int ret = 0;
int retval = 0;
gf_dirent_t q;
gf_dirent_t *entry = NULL;
gf_dirent_t *tmp = NULL;
uint32_t jobs_running = 0;
uint32_t qlen = 0;
pthread_cond_t cond;
pthread_mutex_t mut;
gf_boolean_t cond_init = _gf_false;
gf_boolean_t mut_init = _gf_false;
gf_dirent_t entries;
/*For this functionality to be implemented in general, we need
* synccond_t infra which doesn't block the executing thread. Until then
* return failures inside synctask if they use this.*/
if (synctask_get())
return -ENOTSUP;
if (max_jobs == 0)
return -EINVAL;
/*Code becomes simpler this way. cond_wait just on qlength.
* Little bit of cheating*/
if (max_qlen == 0)
max_qlen = 1;
ret = syncop_dirfd(subvol, loc, &fd, pid);
if (ret)
goto out;
INIT_LIST_HEAD(&entries.list);
INIT_LIST_HEAD(&q.list);
ret = pthread_mutex_init(&mut, NULL);
if (ret)
goto out;
mut_init = _gf_true;
ret = pthread_cond_init(&cond, NULL);
if (ret)
goto out;
cond_init = _gf_true;
while ((ret = syncop_readdir(subvol, fd, 131072, offset, &entries, xdata,
NULL))) {
if (ret < 0)
break;
if (ret > 0) {
/* If the entries are only '.', and '..' then ret
* value will be non-zero. so set it to zero here. */
ret = 0;
}
last = list_last_entry(&entries.list, typeof(*last), list);
offset = last->d_off;
list_for_each_entry_safe(entry, tmp, &entries.list, list)
{
list_del_init(&entry->list);
if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, "..")) {
gf_dirent_entry_free(entry);
continue;
}
if (entry->d_stat.ia_type == IA_IFDIR) {
ret = fn(subvol, entry, loc, data);
gf_dirent_entry_free(entry);
if (ret)
goto out;
continue;
}
if (retval) /*Any jobs failed?*/
goto out;
pthread_mutex_lock(&mut);
{
while (qlen == max_qlen)
pthread_cond_wait(&cond, &mut);
if (max_jobs == jobs_running) {
list_add_tail(&entry->list, &q.list);
qlen++;
entry = NULL;
} else {
jobs_running++;
}
}
pthread_mutex_unlock(&mut);
if (!entry)
continue;
ret = _run_dir_scan_task(frame, subvol, loc, &q, entry, &retval,
&mut, &cond, &jobs_running, &qlen, fn,
data);
if (ret)
goto out;
}
}
out:
if (fd)
fd_unref(fd);
if (mut_init && cond_init) {
pthread_mutex_lock(&mut);
{
while (jobs_running)
pthread_cond_wait(&cond, &mut);
}
pthread_mutex_unlock(&mut);
gf_dirent_free(&q);
gf_dirent_free(&entries);
}
if (mut_init)
pthread_mutex_destroy(&mut);
if (cond_init)
pthread_cond_destroy(&cond);
return ret | retval;
}
int
syncop_dir_scan(xlator_t *subvol, loc_t *loc, int pid, void *data,
int (*fn)(xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
void *data))
{
fd_t *fd = NULL;
uint64_t offset = 0;
gf_dirent_t *entry = NULL;
int ret = 0;
gf_dirent_t entries;
ret = syncop_dirfd(subvol, loc, &fd, pid);
if (ret)
goto out;
INIT_LIST_HEAD(&entries.list);
while ((ret = syncop_readdir(subvol, fd, 131072, offset, &entries, NULL,
NULL))) {
if (ret < 0)
break;
if (ret > 0) {
/* If the entries are only '.', and '..' then ret
* value will be non-zero. so set it to zero here. */
ret = 0;
}
list_for_each_entry(entry, &entries.list, list)
{
offset = entry->d_off;
if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
continue;
ret = fn(subvol, entry, loc, data);
if (ret)
break;
}
gf_dirent_free(&entries);
if (ret)
break;
}
out:
if (fd)
fd_unref(fd);
return ret;
}
int
syncop_is_subvol_local(xlator_t *this, loc_t *loc, gf_boolean_t *is_local)
{
char *pathinfo = NULL;
dict_t *xattr = NULL;
int ret = 0;
if (!this || !this->type || !is_local)
return -EINVAL;
if (strcmp(this->type, "protocol/client") != 0)
return -EINVAL;
*is_local = _gf_false;
ret = syncop_getxattr(this, loc, &xattr, GF_XATTR_PATHINFO_KEY, NULL, NULL);
if (ret < 0) {
ret = -1;
goto out;
}
if (!xattr) {
ret = -EINVAL;
goto out;
}
ret = dict_get_str(xattr, GF_XATTR_PATHINFO_KEY, &pathinfo);
if (ret)
goto out;
ret = glusterfs_is_local_pathinfo(pathinfo, is_local);
gf_msg_debug(this->name, 0, "subvol %s is %slocal", this->name,
*is_local ? "" : "not ");
out:
if (xattr)
dict_unref(xattr);
return ret;
}
/**
* For hard resove, it it telling posix to make use of the
* gfid2path extended attribute stored on disk. Otherwise
* posix xlator (with GFID_TO_PATH_KEY as the key) will just
* do a in memory inode_path to get the path. Depending upon
* the consumer of this function, they can choose how they want
* to proceed. If doing a xattr operation sounds costly, then
* use GFID_TO_PATH_KEY as the key for getxattr.
**/
int
syncop_gfid_to_path_hard(inode_table_t *itable, xlator_t *subvol, uuid_t gfid,
inode_t *inode, char **path_p,
gf_boolean_t hard_resolve)
{
int ret = 0;
char *path = NULL;
loc_t loc = {
0,
};
dict_t *xattr = NULL;
gf_uuid_copy(loc.gfid, gfid);
if (!inode)
loc.inode = inode_new(itable);
else
loc.inode = inode_ref(inode);
if (!hard_resolve)
ret = syncop_getxattr(subvol, &loc, &xattr, GFID_TO_PATH_KEY, NULL,
NULL);
else
ret = syncop_getxattr(subvol, &loc, &xattr, GFID2PATH_VIRT_XATTR_KEY,
NULL, NULL);
if (ret < 0)
goto out;
/*
* posix will do dict_set_dynstr for GFID_TO_PATH_KEY i.e.
* for in memory search for the path. And for on disk xattr
* fetching of the path for the key GFID2PATH_VIRT_XATTR_KEY
* it uses dict_set_dynptr. So, for GFID2PATH_VIRT_XATTR_KEY
* use dict_get_ptr to avoid dict complaining about type
* mismatch (i.e. str vs ptr)
*/
if (!hard_resolve)
ret = dict_get_str(xattr, GFID_TO_PATH_KEY, &path);
else
ret = dict_get_ptr(xattr, GFID2PATH_VIRT_XATTR_KEY, (void **)&path);
if (ret || !path) {
ret = -EINVAL;
goto out;
}
if (path_p) {
*path_p = gf_strdup(path);
if (!*path_p) {
ret = -ENOMEM;
goto out;
}
}
ret = 0;
out:
if (xattr)
dict_unref(xattr);
loc_wipe(&loc);
return ret;
}
int
syncop_gfid_to_path(inode_table_t *itable, xlator_t *subvol, uuid_t gfid,
char **path_p)
{
return syncop_gfid_to_path_hard(itable, subvol, gfid, NULL, path_p,
_gf_false);
}
int
syncop_inode_find(xlator_t *this, xlator_t *subvol, uuid_t gfid,
inode_t **inode, dict_t *xdata, dict_t **rsp_dict)
{
int ret = 0;
loc_t loc = {
0,
};
struct iatt iatt = {
0,
};
*inode = NULL;
*inode = inode_find(this->itable, gfid);
if (*inode)
goto out;
loc.inode = inode_new(this->itable);
if (!loc.inode) {
ret = -ENOMEM;
goto out;
}
gf_uuid_copy(loc.gfid, gfid);
ret = syncop_lookup(subvol, &loc, &iatt, NULL, xdata, rsp_dict);
if (ret < 0)
goto out;
*inode = inode_link(loc.inode, NULL, NULL, &iatt);
if (!*inode) {
ret = -ENOMEM;
goto out;
}
out:
loc_wipe(&loc);
return ret;
}