/*
Copyright (c) 2010-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/resource.h>
#include <sys/file.h>
#include <netdb.h>
#include <signal.h>
#include <libgen.h>
#include <sys/utsname.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <time.h>
#include <semaphore.h>
#include <errno.h>
#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif
#ifdef HAVE_MALLOC_STATS
#ifdef DEBUG
#include <mcheck.h>
#endif
#endif
#include "quota.h"
#include "quota-messages.h"
extern struct rpc_clnt_program quota_enforcer_clnt;
int32_t
quota_validate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, inode_t *inode,
struct iatt *buf, dict_t *xdata, struct iatt *postparent);
int
quota_enforcer_submit_request(void *req, call_frame_t *frame,
rpc_clnt_prog_t *prog, int procnum,
struct iobref *iobref, xlator_t *this,
fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
{
int ret = -1;
int count = 0;
struct iovec iov = {
0,
};
struct iobuf *iobuf = NULL;
char new_iobref = 0;
ssize_t xdr_size = 0;
quota_priv_t *priv = NULL;
GF_ASSERT(this);
priv = this->private;
if (req) {
xdr_size = xdr_sizeof(xdrproc, req);
iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size);
if (!iobuf) {
goto out;
}
if (!iobref) {
iobref = iobref_new();
if (!iobref) {
goto out;
}
new_iobref = 1;
}
iobref_add(iobref, iobuf);
iov.iov_base = iobuf->ptr;
iov.iov_len = iobuf_size(iobuf);
/* Create the xdr payload */
ret = xdr_serialize_generic(iov, req, xdrproc);
if (ret == -1) {
goto out;
}
iov.iov_len = ret;
count = 1;
}
/* Send the msg */
ret = rpc_clnt_submit(priv->rpc_clnt, prog, procnum, cbkfn, &iov, count,
NULL, 0, iobref, frame, NULL, 0, NULL, 0, NULL);
ret = 0;
out:
if (new_iobref)
iobref_unref(iobref);
if (iobuf)
iobuf_unref(iobuf);
return ret;
}
int
quota_enforcer_lookup_cbk(struct rpc_req *req, struct iovec *iov, int count,
void *myframe)
{
quota_local_t *local = NULL;
call_frame_t *frame = NULL;
int ret = 0;
gfs3_lookup_rsp rsp = {
0,
};
struct iatt stbuf = {
0,
};
struct iatt postparent = {
0,
};
int op_errno = EINVAL;
dict_t *xdata = NULL;
inode_t *inode = NULL;
xlator_t *this = NULL;
quota_priv_t *priv = NULL;
struct timespec retry_delay = {
0,
};
gf_timer_t *timer = NULL;
this = THIS;
frame = myframe;
local = frame->local;
inode = local->validate_loc.inode;
priv = this->private;
if (-1 == req->rpc_status) {
rsp.op_ret = -1;
op_errno = ENOTCONN;
goto out;
}
ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gfs3_lookup_rsp);
if (ret < 0) {
gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_XDR_DECODING_FAILED,
"XDR decoding failed");
rsp.op_ret = -1;
op_errno = EINVAL;
goto out;
}
op_errno = gf_error_to_errno(rsp.op_errno);
gf_stat_to_iatt(&rsp.postparent, &postparent);
if (rsp.op_ret == -1)
goto out;
rsp.op_ret = -1;
gf_stat_to_iatt(&rsp.stat, &stbuf);
GF_PROTOCOL_DICT_UNSERIALIZE(frame->this, xdata, (rsp.xdata.xdata_val),
(rsp.xdata.xdata_len), rsp.op_ret, op_errno,
out);
if ((!gf_uuid_is_null(inode->gfid)) &&
(gf_uuid_compare(stbuf.ia_gfid, inode->gfid) != 0)) {
gf_msg_debug(frame->this->name, ESTALE, "gfid changed for %s",
local->validate_loc.path);
rsp.op_ret = -1;
op_errno = ESTALE;
goto out;
}
rsp.op_ret = 0;
out:
rsp.op_errno = op_errno;
/* We need to retry connecting to quotad on ENOTCONN error.
* Suppose if there are two volumes vol1 and vol2,
* and quota is enabled and limit is set on vol1.
* Now if IO is happening on vol1 and quota is enabled/disabled
* on vol2, quotad gets restarted and client will receive
* ENOTCONN in the IO path of vol1
*/
if (rsp.op_ret == -1 && rsp.op_errno == ENOTCONN) {
if (local->quotad_conn_retry >= 12) {
priv->quotad_conn_status = 1;
gf_log(this->name, GF_LOG_WARNING,
"failed to connect "
"to quotad after retry count %d)",
local->quotad_conn_retry);
} else {
local->quotad_conn_retry++;
}
if (priv->quotad_conn_status == 0) {
/* retry connecting after 5secs for 12 retries
* (up to 60sec).
*/
gf_log(this->name, GF_LOG_DEBUG,
"retry connecting to "
"quotad (retry count %d)",
local->quotad_conn_retry);
retry_delay.tv_sec = 5;
retry_delay.tv_nsec = 0;
timer = gf_timer_call_after(this->ctx, retry_delay,
_quota_enforcer_lookup, (void *)frame);
if (timer == NULL) {
gf_log(this->name, GF_LOG_WARNING,
"failed to "
"set quota_enforcer_lookup with timer");
} else {
goto clean;
}
}
} else {
priv->quotad_conn_status = 0;
}
if (rsp.op_ret == -1) {
/* any error other than ENOENT */
if (rsp.op_errno != ENOENT)
gf_msg(
this->name, GF_LOG_WARNING, rsp.op_errno, Q_MSG_LOOKUP_FAILED,
"Getting cluster-wide size of directory failed "
"(path: %s gfid:%s)",
local->validate_loc.path, loc_gfid_utoa(&local->validate_loc));
else
gf_msg_trace(this->name, ENOENT, "not found on remote node");
} else if (local->quotad_conn_retry) {
gf_log(this->name, GF_LOG_DEBUG,
"connected to quotad after "
"retry count %d",
local->quotad_conn_retry);
}
local->validate_cbk(frame, NULL, this, rsp.op_ret, rsp.op_errno, inode,
&stbuf, xdata, &postparent);
clean:
if (xdata)
dict_unref(xdata);
free(rsp.xdata.xdata_val);
return 0;
}
void
_quota_enforcer_lookup(void *data)
{
quota_local_t *local = NULL;
gfs3_lookup_req req = {
{
0,
},
};
int ret = 0;
int op_errno = ESTALE;
quota_priv_t *priv = NULL;
call_frame_t *frame = NULL;
loc_t *loc = NULL;
xlator_t *this = NULL;
char *dir_path = NULL;
frame = data;
local = frame->local;
this = local->this;
loc = &local->validate_loc;
priv = this->private;
if (!(loc && loc->inode))
goto unwind;
if (!gf_uuid_is_null(loc->inode->gfid))
memcpy(req.gfid, loc->inode->gfid, 16);
else
memcpy(req.gfid, loc->gfid, 16);
if (local->validate_xdata) {
GF_PROTOCOL_DICT_SERIALIZE(this, local->validate_xdata,
(&req.xdata.xdata_val), req.xdata.xdata_len,
op_errno, unwind);
}
if (loc->name)
req.bname = (char *)loc->name;
else
req.bname = "";
if (loc->path)
dir_path = (char *)loc->path;
else
dir_path = "";
ret = quota_enforcer_submit_request(
&req, frame, priv->quota_enforcer, GF_AGGREGATOR_LOOKUP, NULL, this,
quota_enforcer_lookup_cbk, (xdrproc_t)xdr_gfs3_lookup_req);
if (ret) {
gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_RPC_SUBMIT_FAILED,
"Couldn't send the request to "
"fetch cluster wide size of directory (path:%s gfid:%s)",
dir_path, req.gfid);
}
GF_FREE(req.xdata.xdata_val);
return;
unwind:
local->validate_cbk(frame, NULL, this, -1, op_errno, NULL, NULL, NULL,
NULL);
GF_FREE(req.xdata.xdata_val);
return;
}
int
quota_enforcer_lookup(call_frame_t *frame, xlator_t *this, dict_t *xdata,
fop_lookup_cbk_t validate_cbk)
{
quota_local_t *local = NULL;
if (!frame || !this)
goto unwind;
local = frame->local;
local->this = this;
local->validate_cbk = validate_cbk;
local->validate_xdata = dict_ref(xdata);
_quota_enforcer_lookup(frame);
return 0;
unwind:
validate_cbk(frame, NULL, this, -1, ESTALE, NULL, NULL, NULL, NULL);
return 0;
}
int
quota_enforcer_notify(struct rpc_clnt *rpc, void *mydata,
rpc_clnt_event_t event, void *data)
{
xlator_t *this = NULL;
int ret = 0;
quota_priv_t *priv = NULL;
this = mydata;
priv = this->private;
switch (event) {
case RPC_CLNT_CONNECT: {
pthread_mutex_lock(&priv->conn_mutex);
{
priv->conn_status = _gf_true;
}
pthread_mutex_unlock(&priv->conn_mutex);
gf_msg_trace(this->name, 0, "got RPC_CLNT_CONNECT");
break;
}
case RPC_CLNT_DISCONNECT: {
pthread_mutex_lock(&priv->conn_mutex);
{
priv->conn_status = _gf_false;
pthread_cond_signal(&priv->conn_cond);
}
pthread_mutex_unlock(&priv->conn_mutex);
gf_msg_trace(this->name, 0, "got RPC_CLNT_DISCONNECT");
break;
}
default:
gf_msg_trace(this->name, 0, "got some other RPC event %d", event);
ret = 0;
break;
}
return ret;
}
int
quota_enforcer_blocking_connect(rpc_clnt_t *rpc)
{
dict_t *options = NULL;
int ret = -1;
options = dict_new();
if (options == NULL)
goto out;
ret = dict_set_sizen_str_sizen(options, "non-blocking-io", "no");
if (ret)
goto out;
rpc->conn.trans->reconfigure(rpc->conn.trans, options);
rpc_clnt_start(rpc);
ret = dict_set_sizen_str_sizen(options, "non-blocking-io", "yes");
if (ret)
goto out;
rpc->conn.trans->reconfigure(rpc->conn.trans, options);
ret = 0;
out:
if (options)
dict_unref(options);
return ret;
}
// Returns a started rpc_clnt. Creates a new rpc_clnt if quota_priv doesn't have
// one already
struct rpc_clnt *
quota_enforcer_init(xlator_t *this, dict_t *options)
{
struct rpc_clnt *rpc = NULL;
quota_priv_t *priv = NULL;
int ret = -1;
priv = this->private;
LOCK(&priv->lock);
{
if (priv->rpc_clnt) {
ret = 0;
rpc = priv->rpc_clnt;
}
}
UNLOCK(&priv->lock);
if (rpc)
goto out;
priv->quota_enforcer = "a_enforcer_clnt;
ret = dict_set_sizen_str_sizen(options, "transport.address-family", "unix");
if (ret)
goto out;
ret = dict_set_sizen_str_sizen(options, "transport-type", "socket");
if (ret)
goto out;
ret = dict_set_sizen_str_sizen(options, "transport.socket.connect-path",
"/var/run/gluster/quotad.socket");
if (ret)
goto out;
rpc = rpc_clnt_new(options, this, this->name, 16);
if (!rpc) {
ret = -1;
goto out;
}
ret = rpc_clnt_register_notify(rpc, quota_enforcer_notify, this);
if (ret) {
gf_msg("quota", GF_LOG_ERROR, 0, Q_MSG_RPCCLNT_REGISTER_NOTIFY_FAILED,
"failed to register notify");
goto out;
}
ret = quota_enforcer_blocking_connect(rpc);
if (ret)
goto out;
ret = 0;
out:
if (ret) {
if (rpc)
rpc_clnt_unref(rpc);
rpc = NULL;
}
return rpc;
}
struct rpc_clnt_procedure quota_enforcer_actors[GF_AGGREGATOR_MAXVALUE] = {
[GF_AGGREGATOR_NULL] = {"NULL", NULL},
[GF_AGGREGATOR_LOOKUP] = {"LOOKUP", NULL},
};
struct rpc_clnt_program quota_enforcer_clnt = {
.progname = "Quota enforcer",
.prognum = GLUSTER_AGGREGATOR_PROGRAM,
.progver = GLUSTER_AGGREGATOR_VERSION,
.numproc = GF_AGGREGATOR_MAXVALUE,
.proctable = quota_enforcer_actors,
};