/*
Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#define RPC_CLNT_DEFAULT_REQUEST_COUNT 512
#include "rpc-clnt.h"
#include "rpc-clnt-ping.h"
#include <glusterfs/byte-order.h>
#include "xdr-rpcclnt.h"
#include "rpc-transport.h"
#include "protocol-common.h"
#include <glusterfs/mem-pool.h>
#include "xdr-rpc.h"
#include "rpc-common-xdr.h"
void
rpc_clnt_reply_deinit(struct rpc_req *req, struct mem_pool *pool);
/* Pop the oldest non-lock frame from @frames if it has been pending for
 * @timeout seconds or more at @current.
 *
 * Frames are appended in send order, so the head of the regular list is
 * always the oldest outstanding request.  Only the regular (sf) list is
 * scanned; lock fops live on a separate list and are never timed out here.
 *
 * Returns the unlinked frame (count is decremented), or NULL when nothing
 * has timed out.  Caller must hold the connection lock.
 */
struct saved_frame *
__saved_frames_get_timedout(struct saved_frames *frames, uint32_t timeout,
                            struct timeval *current)
{
    struct saved_frame *oldest = NULL;

    if (list_empty(&frames->sf.list))
        return NULL;

    oldest = list_entry(frames->sf.list.next, typeof(*oldest), list);
    if ((oldest->saved_at.tv_sec + timeout) > current->tv_sec)
        return NULL; /* oldest frame has not crossed the deadline yet */

    list_del_init(&oldest->list);
    frames->count--;

    return oldest;
}
/* Return non-zero when @sframe carries a locking fop (LK/INODELK/FINODELK/
 * ENTRYLK/FENTRYLK) of the GlusterFS FOP program.  Lock fops are queued on
 * a separate saved-frames list by the caller. */
static int
_is_lock_fop(struct saved_frame *sframe)
{
    int fop = 0;

    /* Only requests of the FOP program/version can be lock fops. */
    if (SFRAME_GET_PROGNUM(sframe) == GLUSTER_FOP_PROGRAM &&
        SFRAME_GET_PROGVER(sframe) == GLUSTER_FOP_VERSION)
        fop = SFRAME_GET_PROCNUM(sframe);

    switch (fop) {
        case GFS3_OP_LK:
        case GFS3_OP_INODELK:
        case GFS3_OP_FINODELK:
        case GFS3_OP_ENTRYLK:
        case GFS3_OP_FENTRYLK:
            return 1;
        default:
            return 0;
    }
}
/* Allocate a saved_frame from the client's mem-pool, record @frame,
 * @rpcreq and the submission timestamp, and queue it on the connection's
 * pending lists.
 *
 * Returns the queued saved_frame, or NULL if the mem-pool is exhausted.
 * Caller must hold conn->lock.
 */
static struct saved_frame *
__saved_frames_put(struct saved_frames *frames, void *frame,
                   struct rpc_req *rpcreq)
{
    struct saved_frame *saved_frame = mem_get(
        rpcreq->conn->rpc_clnt->saved_frames_pool);

    if (!saved_frame) {
        goto out;
    }
    /* THIS should be saved and set back */

    INIT_LIST_HEAD(&saved_frame->list);

    saved_frame->capital_this = THIS;
    saved_frame->frame = frame;
    saved_frame->rpcreq = rpcreq;
    /* saved_at is what the bail-out timer compares against frame_timeout. */
    gettimeofday(&saved_frame->saved_at, NULL);
    memset(&saved_frame->rsp, 0, sizeof(rpc_transport_rsp_t));

    /* Lock fops go on their own list; __saved_frames_get_timedout scans
     * only the regular list, so lock requests are never bailed out. */
    if (_is_lock_fop(saved_frame))
        list_add_tail(&saved_frame->list, &frames->lk_sf.list);
    else
        list_add_tail(&saved_frame->list, &frames->sf.list);

    frames->count++;

out:
    return saved_frame;
}
/* Bail-out timer callback (armed by __save_frame, re-armed here every 10
 * seconds as long as requests are outstanding — a call-once timer chained
 * to get call-always behaviour).
 *
 * Force-unwinds (rpc_status == -1) every saved frame that has been pending
 * longer than conn->frame_timeout seconds.  @data is the struct rpc_clnt,
 * which holds a ref taken when the timer was queued; it is dropped at the
 * end.
 *
 * Fixes a corrupted token: the '&current' argument to gettimeofday() and
 * __saved_frames_get_timedout() had been mangled into an HTML-entity
 * residue ("currency sign" + "t"), which does not compile.
 */
static void
call_bail(void *data)
{
    rpc_transport_t *trans = NULL;
    struct rpc_clnt *clnt = NULL;
    rpc_clnt_connection_t *conn = NULL;
    struct timeval current;
    struct list_head list;
    struct saved_frame *saved_frame = NULL;
    struct saved_frame *trav = NULL;
    struct saved_frame *tmp = NULL;
    char frame_sent[256] = {
        0,
    };
    struct timespec timeout = {
        0,
    };
    char peerid[UNIX_PATH_MAX] = {0};
    gf_boolean_t need_unref = _gf_false;
    int len;

    GF_VALIDATE_OR_GOTO("client", data, out);

    clnt = data;
    conn = &clnt->conn;

    pthread_mutex_lock(&conn->lock);
    {
        trans = conn->trans;
        if (trans) {
            /* Copy the peer identifier while the lock pins the transport. */
            (void)snprintf(peerid, sizeof(peerid), "%s",
                           conn->trans->peerinfo.identifier);
        }
    }
    pthread_mutex_unlock(&conn->lock);

    /* rpc_clnt_connection_cleanup will be unwinding all saved frames,
     * bailed or otherwise */
    if (!trans)
        goto out;

    gettimeofday(&current, NULL);
    INIT_LIST_HEAD(&list);

    pthread_mutex_lock(&conn->lock);
    {
        /* Chaining to get call-always functionality from
           call-once timer */
        if (conn->timer) {
            timeout.tv_sec = 10;
            timeout.tv_nsec = 0;

            /* Ref rpc as it's added to timer event queue */
            rpc_clnt_ref(clnt);
            gf_timer_call_cancel(clnt->ctx, conn->timer);
            conn->timer = gf_timer_call_after(clnt->ctx, timeout, call_bail,
                                              (void *)clnt);
            if (conn->timer == NULL) {
                gf_log(conn->name, GF_LOG_WARNING,
                       "Cannot create bailout timer for %s", peerid);
                /* Timer will never fire, so drop the ref taken above. */
                need_unref = _gf_true;
            }
        }

        /* Collect all timed-out frames under the lock; they are unwound
         * below without conn->lock held, since cbkfn may re-enter rpc. */
        do {
            saved_frame = __saved_frames_get_timedout(
                conn->saved_frames, conn->frame_timeout, &current);
            if (saved_frame)
                list_add(&saved_frame->list, &list);
        } while (saved_frame);
    }
    pthread_mutex_unlock(&conn->lock);

    if (list_empty(&list))
        goto out;

    list_for_each_entry_safe(trav, tmp, &list, list)
    {
        /* Format the original send time as "<date>.<usec>" for the log. */
        gf_time_fmt(frame_sent, sizeof frame_sent, trav->saved_at.tv_sec,
                    gf_timefmt_FT);
        len = strlen(frame_sent);
        snprintf(frame_sent + len, sizeof(frame_sent) - len,
                 ".%" GF_PRI_SUSECONDS, trav->saved_at.tv_usec);

        gf_log(conn->name, GF_LOG_ERROR,
               "bailing out frame type(%s), op(%s(%d)), xid = 0x%x, "
               "unique = %" PRIu64 ", sent = %s, timeout = %d for %s",
               trav->rpcreq->prog->progname,
               (trav->rpcreq->prog->procnames)
                   ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
                   : "--",
               trav->rpcreq->procnum, trav->rpcreq->xid,
               ((call_frame_t *)(trav->frame))->root->unique, frame_sent,
               conn->frame_timeout, peerid);

        /* Hold a ref across the unwind; cbkfn may drop the caller's last
         * reference to the client. */
        clnt = rpc_clnt_ref(clnt);
        trav->rpcreq->rpc_status = -1;
        trav->rpcreq->cbkfn(trav->rpcreq, NULL, 0, trav->frame);

        rpc_clnt_reply_deinit(trav->rpcreq, clnt->reqpool);
        clnt = rpc_clnt_unref(clnt);
        list_del_init(&trav->list);
        mem_put(trav);
    }
out:
    /* Drop the ref taken when this callback was queued on the timer. */
    rpc_clnt_unref(clnt);
    if (need_unref)
        rpc_clnt_unref(clnt);
    return;
}
/* to be called with conn->lock held */
/* Queue (@frame, @rpcreq) on the connection's saved-frames lists and make
 * sure the bail-out timer is armed.  Returns the saved frame, or NULL if
 * the mem-pool is exhausted (in which case no timer state is touched).
 */
static struct saved_frame *
__save_frame(struct rpc_clnt *rpc_clnt, call_frame_t *frame,
             struct rpc_req *rpcreq)
{
    rpc_clnt_connection_t *conn = &rpc_clnt->conn;
    struct timespec timeout = {
        0,
    };
    struct saved_frame *saved_frame = __saved_frames_put(conn->saved_frames,
                                                         frame, rpcreq);

    if (saved_frame == NULL) {
        goto out;
    }

    /* TODO: make timeout configurable */
    if (conn->timer == NULL) {
        timeout.tv_sec = 10;
        timeout.tv_nsec = 0;
        /* This ref is dropped by call_bail (or on timer cancel). */
        rpc_clnt_ref(rpc_clnt);
        conn->timer = gf_timer_call_after(rpc_clnt->ctx, timeout, call_bail,
                                          (void *)rpc_clnt);
    }

out:
    return saved_frame;
}
/* Allocate a zeroed saved_frames container with both pending lists
 * (regular and lock-fop) initialized empty.  Returns NULL on OOM. */
struct saved_frames *
saved_frames_new(void)
{
    struct saved_frames *frames = GF_CALLOC(1, sizeof(*frames),
                                            gf_common_mt_rpcclnt_savedframe_t);

    if (frames) {
        INIT_LIST_HEAD(&frames->sf.list);
        INIT_LIST_HEAD(&frames->lk_sf.list);
    }

    return frames;
}
/* Copy (by value) the saved frame whose request xid equals @callid into
 * *@saved_frame, searching the regular list first and the lock-fop list
 * second.  The frame stays queued; nothing is unlinked.
 *
 * Returns 0 on success (or trivially when @saved_frame is NULL), -1 when
 * no pending request matches.  Caller must hold the connection lock.
 */
int
__saved_frame_copy(struct saved_frames *frames, int64_t callid,
                   struct saved_frame *saved_frame)
{
    struct saved_frame *each = NULL;
    struct list_head *heads[2];
    int i;

    if (!saved_frame)
        return 0;

    heads[0] = &frames->sf.list;
    heads[1] = &frames->lk_sf.list;

    for (i = 0; i < 2; i++) {
        list_for_each_entry(each, heads[i], list)
        {
            if (each->rpcreq->xid == callid) {
                *saved_frame = *each;
                return 0;
            }
        }
    }

    return -1;
}
/* Unlink and return the saved frame whose request xid equals @callid,
 * searching both the regular and lock-fop lists.
 *
 * Side effect: on a hit, THIS is switched to the xlator context captured
 * when the frame was saved (see __saved_frames_put), so the reply callback
 * runs under the submitter's context.
 *
 * Returns NULL when no pending request matches.  Caller must hold the
 * connection lock.
 */
struct saved_frame *
__saved_frame_get(struct saved_frames *frames, int64_t callid)
{
    struct saved_frame *saved_frame = NULL;
    struct saved_frame *tmp = NULL;

    list_for_each_entry(tmp, &frames->sf.list, list)
    {
        if (tmp->rpcreq->xid == callid) {
            list_del_init(&tmp->list);
            frames->count--;
            saved_frame = tmp;
            goto out;
        }
    }

    list_for_each_entry(tmp, &frames->lk_sf.list, list)
    {
        if (tmp->rpcreq->xid == callid) {
            list_del_init(&tmp->list);
            frames->count--;
            saved_frame = tmp;
            goto out;
        }
    }

out:
    if (saved_frame) {
        /* Restore the submitter's xlator context for the callback path. */
        THIS = saved_frame->capital_this;
    }

    return saved_frame;
}
/* Force-unwind every frame still pending in @saved_frames: merge the
 * lock-fop list into the regular one, then error each request out with
 * rpc_status == -1 and release it.  Called during connection cleanup.
 */
void
saved_frames_unwind(struct saved_frames *saved_frames)
{
    struct saved_frame *trav = NULL;
    struct saved_frame *tmp = NULL;
    char timestr[1024] = {
        0,
    };
    int len;

    list_splice_init(&saved_frames->lk_sf.list, &saved_frames->sf.list);

    list_for_each_entry_safe(trav, tmp, &saved_frames->sf.list, list)
    {
        /* Format the original send time as "<date>.<usec>" for the log. */
        gf_time_fmt(timestr, sizeof timestr, trav->saved_at.tv_sec,
                    gf_timefmt_FT);
        len = strlen(timestr);
        snprintf(timestr + len, sizeof(timestr) - len, ".%" GF_PRI_SUSECONDS,
                 trav->saved_at.tv_usec);

        /* NOTE(review): this 'continue' leaves the frame on the list and
         * never mem_put()s it, nor decrements count — presumably such
         * half-initialized frames cannot occur in practice; confirm. */
        if (!trav->rpcreq || !trav->rpcreq->prog)
            continue;

        gf_log_callingfn(
            trav->rpcreq->conn->name, GF_LOG_ERROR,
            "forced unwinding frame type(%s) op(%s(%d)) "
            "called at %s (xid=0x%x)",
            trav->rpcreq->prog->progname,
            ((trav->rpcreq->prog->procnames)
                 ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
                 : "--"),
            trav->rpcreq->procnum, timestr, trav->rpcreq->xid);

        saved_frames->count--;

        /* Error out the request and hand the frame back to its owner. */
        trav->rpcreq->rpc_status = -1;
        trav->rpcreq->cbkfn(trav->rpcreq, NULL, 0, trav->frame);

        rpc_clnt_reply_deinit(trav->rpcreq,
                              trav->rpcreq->conn->rpc_clnt->reqpool);

        list_del_init(&trav->list);
        mem_put(trav);
    }
}
/* Unwind (error out) any still-pending frames in @frames, then free the
 * container itself.  NULL is accepted and ignored. */
void
saved_frames_destroy(struct saved_frames *frames)
{
    if (frames) {
        saved_frames_unwind(frames);
        GF_FREE(frames);
    }
}
/* Reconnect timer callback.  While the connection is down and the client
 * is not disabled, attempts a transport connect and re-arms itself every
 * 3 seconds; otherwise the reconnect chain is broken.
 *
 * @conn_ptr is the rpc_clnt_connection_t.  The unconditional unref at the
 * end drops the ref taken when this callback was queued on the timer.
 */
void
rpc_clnt_reconnect(void *conn_ptr)
{
    rpc_transport_t *trans = NULL;
    rpc_clnt_connection_t *conn = NULL;
    struct timespec ts = {0, 0};
    struct rpc_clnt *clnt = NULL;
    gf_boolean_t need_unref = _gf_false;

    conn = conn_ptr;
    clnt = conn->rpc_clnt;

    pthread_mutex_lock(&conn->lock);
    {
        trans = conn->trans;
        if (!trans) {
            /* No transport left: nothing to reconnect to.  Note that the
             * trailing rpc_clnt_unref is skipped on this path. */
            pthread_mutex_unlock(&conn->lock);
            return;
        }

        if (conn->reconnect)
            gf_timer_call_cancel(clnt->ctx, conn->reconnect);
        conn->reconnect = 0;

        if ((conn->connected == 0) && !clnt->disabled) {
            ts.tv_sec = 3;
            ts.tv_nsec = 0;

            gf_log(conn->name, GF_LOG_TRACE, "attempting reconnect");
            (void)rpc_transport_connect(trans, conn->config.remote_port);
            /* Ref for the next timer-queued invocation of this callback. */
            rpc_clnt_ref(clnt);
            conn->reconnect = gf_timer_call_after(clnt->ctx, ts,
                                                  rpc_clnt_reconnect, conn);
            if (!conn->reconnect) {
                /* Timer will never fire; drop the ref below. */
                need_unref = _gf_true;
                gf_log(conn->name, GF_LOG_ERROR,
                       "Error adding to timer event queue");
            }
        } else {
            gf_log(conn->name, GF_LOG_TRACE, "breaking reconnect chain");
        }
    }
    pthread_mutex_unlock(&conn->lock);

    rpc_clnt_unref(clnt);
    if (need_unref)
        rpc_clnt_unref(clnt);
    return;
}
/* Fill @info (prognum/progver/procnum, rpc_req, rsp buffers) from the
 * pending request whose xid equals info->xid, by value-copying the saved
 * frame under the connection lock.  The request stays queued.
 *
 * Returns 0 on success, -1 when no matching request is pending.
 */
int
rpc_clnt_fill_request_info(struct rpc_clnt *clnt, rpc_request_info_t *info)
{
    struct saved_frame saved_frame;
    int ret = -1;

    pthread_mutex_lock(&clnt->conn.lock);
    {
        ret = __saved_frame_copy(clnt->conn.saved_frames, info->xid,
                                 &saved_frame);
    }
    pthread_mutex_unlock(&clnt->conn.lock);

    if (ret == -1) {
        gf_log(clnt->conn.name, GF_LOG_CRITICAL,
               "cannot lookup the saved "
               "frame corresponding to xid (%d)",
               info->xid);
        goto out;
    }

    info->prognum = saved_frame.rpcreq->prog->prognum;
    info->procnum = saved_frame.rpcreq->procnum;
    info->progver = saved_frame.rpcreq->prog->progver;
    info->rpc_req = saved_frame.rpcreq;
    info->rsp = saved_frame.rsp;

    ret = 0;
out:
    return ret;
}
/* Cancel a pending reconnect timer, if any.  On successful cancel the
 * timer's ref on the client is dropped and conn->cleanup_gen is bumped so
 * rpc_clnt_handle_disconnect can tell that cleanup already ran.
 *
 * Always returns 0 (the cancel result is only used internally).
 */
int
rpc_clnt_reconnect_cleanup(rpc_clnt_connection_t *conn)
{
    struct rpc_clnt *clnt = NULL;
    int ret = 0;
    gf_boolean_t reconnect_unref = _gf_false;

    if (!conn) {
        goto out;
    }

    clnt = conn->rpc_clnt;

    pthread_mutex_lock(&conn->lock);
    {
        if (conn->reconnect) {
            ret = gf_timer_call_cancel(clnt->ctx, conn->reconnect);
            if (!ret) {
                /* Cancel succeeded: the callback will never run, so its
                 * ref must be released (outside the lock). */
                reconnect_unref = _gf_true;
                conn->cleanup_gen++;
            }
            conn->reconnect = NULL;
        }
    }
    pthread_mutex_unlock(&conn->lock);

    if (reconnect_unref)
        rpc_clnt_unref(clnt);

out:
    return 0;
}
/*
 * rpc_clnt_connection_cleanup - reset @conn after a disconnect: swap out
 * the saved-frames set (unwinding every pending request), cancel the
 * bail-out and ping timers, mark the connection disconnected and bump
 * cleanup_gen so disconnect handling knows cleanup has run.
 *
 * Always returns 0.
 */
int
rpc_clnt_connection_cleanup(rpc_clnt_connection_t *conn)
{
    struct saved_frames *saved_frames = NULL;
    struct rpc_clnt *clnt = NULL;
    int unref = 0;
    int ret = 0;
    gf_boolean_t timer_unref = _gf_false;

    if (!conn) {
        goto out;
    }

    clnt = conn->rpc_clnt;

    pthread_mutex_lock(&conn->lock);
    {
        /* Detach the pending frames under the lock; they are unwound
         * outside it (saved_frames_destroy below). */
        saved_frames = conn->saved_frames;
        conn->saved_frames = saved_frames_new();

        /* bailout logic cleanup */
        if (conn->timer) {
            ret = gf_timer_call_cancel(clnt->ctx, conn->timer);
            if (!ret)
                /* Cancel succeeded: release the timer's ref below. */
                timer_unref = _gf_true;
            conn->timer = NULL;
        }
        conn->connected = 0;
        conn->disconnected = 1;

        unref = rpc_clnt_remove_ping_timer_locked(clnt);
        /*reset rpc msgs stats*/
        conn->pingcnt = 0;
        conn->msgcnt = 0;
        conn->cleanup_gen++;
    }
    pthread_mutex_unlock(&conn->lock);

    saved_frames_destroy(saved_frames);
    if (unref)
        rpc_clnt_unref(clnt);

    if (timer_unref)
        rpc_clnt_unref(clnt);

out:
    return 0;
}
/*
 * lookup_frame - take the saved frame matching @callid off the
 * connection's pending lists, under conn->lock.
 *
 * Returns the unlinked frame, or NULL when no request with that xid is
 * outstanding.  Not for external reference.
 */
static struct saved_frame *
lookup_frame(rpc_clnt_connection_t *conn, int64_t callid)
{
    struct saved_frame *found = NULL;

    pthread_mutex_lock(&conn->lock);
    {
        found = __saved_frame_get(conn->saved_frames, callid);
    }
    pthread_mutex_unlock(&conn->lock);

    return found;
}
/* Populate @req from a decoded reply: set rpc_status (-1 when the reply
 * was denied or not accepted as SUCCESS, 0 otherwise), attach the program
 * payload as rsp[0] and, for vectored messages, the second vector as
 * rsp[1].  Takes a ref on the message's iobref.
 *
 * Returns 0 on success, -1 when any required argument is NULL.
 */
int
rpc_clnt_reply_fill(rpc_transport_pollin_t *msg, rpc_clnt_connection_t *conn,
                    struct rpc_msg *replymsg, struct iovec progmsg,
                    struct rpc_req *req, struct saved_frame *saved_frame)
{
    if (!conn || !replymsg || !req || !saved_frame || !msg)
        return -1;

    /* A denied or non-SUCCESS-accepted reply is a failed request. */
    if ((rpc_reply_status(replymsg) == MSG_DENIED) ||
        (rpc_accepted_reply_status(replymsg) != SUCCESS))
        req->rpc_status = -1;
    else
        req->rpc_status = 0;

    req->rsp[0] = progmsg;
    req->rsp_iobref = iobref_ref(msg->iobref);

    if (msg->vectored) {
        req->rsp[1] = msg->vector[1];
        req->rspcnt = 2;
    } else {
        req->rspcnt = 1;
    }

    /* By this time, the data bytes for the auth scheme would have already
     * been copied into the required sections of the req structure; the
     * verifier meta-data (flavour/length) is currently left unset. */

    return 0;
}
/* Release a reply's resources: drop the response iobref (if any) and hand
 * @req back to its mem-pool.  NULL @req is a no-op.
 *
 * NOTE(review): @pool is unused here — mem_put() finds the pool from the
 * object itself; presumably kept for interface stability.
 */
void
rpc_clnt_reply_deinit(struct rpc_req *req, struct mem_pool *pool)
{
    if (!req) {
        goto out;
    }

    if (req->rsp_iobref) {
        iobref_unref(req->rsp_iobref);
    }

    mem_put(req);
out:
    return;
}
/* TODO: use mem-pool for allocating requests */
/* Decode the RPC reply header from the first vector of @msg and fill
 * @req via rpc_clnt_reply_fill.  On any failure req->rpc_status is set
 * to -1.  Returns 0 on success, non-zero on decode/fill failure.
 */
int
rpc_clnt_reply_init(rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
                    struct rpc_req *req, struct saved_frame *saved_frame)
{
    char *msgbuf = NULL;
    struct rpc_msg rpcmsg;
    struct iovec progmsg; /* RPC Program payload */
    size_t msglen = 0;
    int ret = -1;

    msgbuf = msg->vector[0].iov_base;
    msglen = msg->vector[0].iov_len;

    ret = xdr_to_rpc_reply(msgbuf, msglen, &rpcmsg, &progmsg,
                           req->verf.authdata);
    if (ret != 0) {
        gf_log(conn->name, GF_LOG_WARNING, "RPC reply decoding failed");
        goto out;
    }

    ret = rpc_clnt_reply_fill(msg, conn, &rpcmsg, progmsg, req, saved_frame);
    if (ret != 0) {
        goto out;
    }

    gf_log(conn->name, GF_LOG_TRACE,
           "received rpc message (RPC XID: 0x%x"
           " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)",
           saved_frame->rpcreq->xid, saved_frame->rpcreq->prog->progname,
           saved_frame->rpcreq->prog->progver, saved_frame->rpcreq->procnum,
           conn->name);
out:
    if (ret != 0) {
        req->rpc_status = -1;
    }

    return ret;
}
/* Handle an incoming callback (server-to-client call): decode the RPC
 * call header, look up the registered callback program matching the
 * prognum/progver pair, and invoke the actor for the requested procedure.
 *
 * Fixes the misspelled "receivd" in the trace-log message.
 *
 * Returns the xdr_to_rpc_call result (-1 on decode failure); a missing
 * program/actor is not treated as an error.
 */
int
rpc_clnt_handle_cbk(struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
{
    char *msgbuf = NULL;
    rpcclnt_cb_program_t *program = NULL;
    struct rpc_msg rpcmsg;
    struct iovec progmsg; /* RPC Program payload */
    size_t msglen = 0;
    int found = 0;
    int ret = -1;
    int procnum = 0;

    msgbuf = msg->vector[0].iov_base;
    msglen = msg->vector[0].iov_len;

    /* Keep the client alive while the actor runs. */
    clnt = rpc_clnt_ref(clnt);
    ret = xdr_to_rpc_call(msgbuf, msglen, &rpcmsg, &progmsg, NULL, NULL);
    if (ret == -1) {
        gf_log(clnt->conn.name, GF_LOG_WARNING, "RPC call decoding failed");
        goto out;
    }

    gf_log(clnt->conn.name, GF_LOG_TRACE,
           "received rpc message (XID: 0x%" GF_PRI_RPC_XID
           ", "
           "Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID
           ", "
           "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC
           ") "
           "from rpc-transport (%s)",
           rpc_call_xid(&rpcmsg), rpc_call_rpcvers(&rpcmsg),
           rpc_call_program(&rpcmsg), rpc_call_progver(&rpcmsg),
           rpc_call_progproc(&rpcmsg), clnt->conn.name);

    procnum = rpc_call_progproc(&rpcmsg);

    /* Find the registered callback program for this prognum/progver. */
    pthread_mutex_lock(&clnt->lock);
    {
        list_for_each_entry(program, &clnt->programs, program)
        {
            if ((program->prognum == rpc_call_program(&rpcmsg)) &&
                (program->progver == rpc_call_progver(&rpcmsg))) {
                found = 1;
                break;
            }
        }
    }
    pthread_mutex_unlock(&clnt->lock);

    /* 'program' is only valid when 'found' is set. */
    if (found && (procnum < program->numactors) &&
        (program->actors[procnum].actor)) {
        program->actors[procnum].actor(clnt, program->mydata, &progmsg);
    }

out:
    rpc_clnt_unref(clnt);
    return ret;
}
/* Handle an incoming reply: read the xid from the first four bytes of the
 * message, unlink the matching saved frame, decode the reply into its
 * rpc_req and invoke the request's callback (with rpc_status == -1 when
 * decoding failed).  Frees the req and the saved frame afterwards.
 *
 * Returns 0 on success, -1 when no matching frame/request is found or
 * reply initialization fails.
 */
int
rpc_clnt_handle_reply(struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
{
    rpc_clnt_connection_t *conn = NULL;
    struct saved_frame *saved_frame = NULL;
    int ret = -1;
    struct rpc_req *req = NULL;
    uint32_t xid = 0;

    clnt = rpc_clnt_ref(clnt);
    conn = &clnt->conn;

    /* The reply's xid is the first big-endian word of the RPC record. */
    xid = ntoh32(*((uint32_t *)pollin->vector[0].iov_base));
    saved_frame = lookup_frame(conn, xid);
    if (saved_frame == NULL) {
        gf_log(conn->name, GF_LOG_ERROR,
               "cannot lookup the saved frame for reply with xid (%u)", xid);
        goto out;
    }

    req = saved_frame->rpcreq;
    if (req == NULL) {
        gf_log(conn->name, GF_LOG_ERROR, "no request with frame for xid (%u)",
               xid);
        goto out;
    }

    ret = rpc_clnt_reply_init(conn, pollin, req, saved_frame);
    if (ret != 0) {
        req->rpc_status = -1;
        gf_log(conn->name, GF_LOG_WARNING, "initialising rpc reply failed");
    }

    /* Callback runs even on init failure; rpc_status tells it apart. */
    req->cbkfn(req, req->rsp, req->rspcnt, saved_frame->frame);

    /* NOTE(review): req is known non-NULL here; this check is redundant. */
    if (req) {
        rpc_clnt_reply_deinit(req, conn->rpc_clnt->reqpool);
    }
out:

    if (saved_frame) {
        mem_put(saved_frame);
    }

    rpc_clnt_unref(clnt);
    return ret;
}
/* Report the connection's disconnected flag (read under conn->lock).
 * A NULL connection is reported as disconnected. */
gf_boolean_t
is_rpc_clnt_disconnected(rpc_clnt_connection_t *conn)
{
    gf_boolean_t state = _gf_true;

    if (conn) {
        pthread_mutex_lock(&conn->lock);
        {
            state = conn->disconnected;
        }
        pthread_mutex_unlock(&conn->lock);
    }

    return state;
}
static void
rpc_clnt_destroy(struct rpc_clnt *rpc);

/* Save the current xlator context (into a local 'old_THIS', declared via
 * DECLARE_OLD_THIS at the call site) and switch THIS to @xl for the
 * duration of a notify handler.  Logs if THIS was never initialised. */
#define RPC_THIS_SAVE(xl)                                                      \
    do {                                                                       \
        old_THIS = THIS;                                                       \
        if (!old_THIS)                                                         \
            gf_log_callingfn("rpc", GF_LOG_CRITICAL,                           \
                             "THIS is not initialised.");                      \
        THIS = xl;                                                             \
    } while (0)

/* Restore the xlator context saved by RPC_THIS_SAVE. */
#define RPC_THIS_RESTORE (THIS = old_THIS)
/* React to a transport disconnect: notify the owner (RPC_CLNT_DISCONNECT),
 * run connection cleanup unless the owner's notify already did it (tracked
 * via conn->cleanup_gen before/after the notify), and arm a 10-second
 * reconnect timer if the client is not disabled.
 *
 * Always returns 0.
 */
static int
rpc_clnt_handle_disconnect(struct rpc_clnt *clnt, rpc_clnt_connection_t *conn)
{
    struct timespec ts = {
        0,
    };
    gf_boolean_t unref_clnt = _gf_false;
    uint64_t pre_notify_gen = 0, post_notify_gen = 0;

    pthread_mutex_lock(&conn->lock);
    {
        pre_notify_gen = conn->cleanup_gen;
    }
    pthread_mutex_unlock(&conn->lock);

    if (clnt->notifyfn)
        clnt->notifyfn(clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL);

    pthread_mutex_lock(&conn->lock);
    {
        post_notify_gen = conn->cleanup_gen;
    }
    pthread_mutex_unlock(&conn->lock);

    if (pre_notify_gen == post_notify_gen) {
        /* program didn't invoke cleanup, so rpc has to do it */
        rpc_clnt_connection_cleanup(conn);
    }

    pthread_mutex_lock(&conn->lock);
    {
        if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) {
            ts.tv_sec = 10;
            ts.tv_nsec = 0;

            /* Ref for the queued reconnect callback. */
            rpc_clnt_ref(clnt);
            conn->reconnect = gf_timer_call_after(clnt->ctx, ts,
                                                  rpc_clnt_reconnect, conn);
            if (conn->reconnect == NULL) {
                gf_log(conn->name, GF_LOG_WARNING,
                       "Cannot create rpc_clnt_reconnect timer");
                /* Timer will never fire; drop the ref below. */
                unref_clnt = _gf_true;
            }
        }
    }
    pthread_mutex_unlock(&conn->lock);

    if (unref_clnt)
        rpc_clnt_unref(clnt);

    return 0;
}
/* Transport event dispatcher registered via rpc_transport_register_notify.
 * @mydata is the rpc_clnt_connection_t.  Runs under the owner xlator's
 * context (RPC_THIS_SAVE/RESTORE).  Handles disconnect/cleanup, incoming
 * replies and callbacks, xid-to-request mapping, and connect bookkeeping.
 */
int
rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
                rpc_transport_event_t event, void *data, ...)
{
    rpc_clnt_connection_t *conn = NULL;
    struct rpc_clnt *clnt = NULL;
    int ret = -1;
    rpc_request_info_t *req_info = NULL;
    rpc_transport_pollin_t *pollin = NULL;
    void *clnt_mydata = NULL;
    DECLARE_OLD_THIS;

    conn = mydata;
    if (conn == NULL) {
        goto out;
    }
    clnt = conn->rpc_clnt;
    if (!clnt)
        goto out;

    RPC_THIS_SAVE(clnt->owner);

    switch (event) {
        case RPC_TRANSPORT_DISCONNECT: {
            rpc_clnt_handle_disconnect(clnt, conn);
            /* The auth_value was being reset to AUTH_GLUSTERFS_v2.
             *    if (clnt->auth_value)
             *           clnt->auth_value = AUTH_GLUSTERFS_v2;
             * It should not be reset here. The disconnect during
             * portmap request can race with handshake. If handshake
             * happens first and disconnect later, auth_value would set
             * to default value and it never sets back to actual auth_value
             * supported by server. But it's important to set to lower
             * version supported in the case where the server downgrades.
             * So moving this code to RPC_TRANSPORT_CONNECT. Note that
             * CONNECT cannot race with handshake as by nature it is
             * serialized with handhake. An handshake can happen only
             * on a connected transport and hence its strictly serialized.
             */
            break;
        }

        case RPC_TRANSPORT_CLEANUP:
            /* Transport is going away for good: hand ownership of mydata
             * back via RPC_CLNT_DESTROY, then destroy the client. */
            if (clnt->notifyfn) {
                clnt_mydata = clnt->mydata;
                clnt->mydata = NULL;
                ret = clnt->notifyfn(clnt, clnt_mydata, RPC_CLNT_DESTROY, NULL);
                if (ret < 0) {
                    gf_log(trans->name, GF_LOG_WARNING,
                           "client notify handler returned error "
                           "while handling RPC_CLNT_DESTROY");
                }
            }
            rpc_clnt_destroy(clnt);
            ret = 0;
            break;

        case RPC_TRANSPORT_MAP_XID_REQUEST: {
            req_info = data;
            ret = rpc_clnt_fill_request_info(clnt, req_info);
            break;
        }

        case RPC_TRANSPORT_MSG_RECEIVED: {
            clock_gettime(CLOCK_REALTIME, &conn->last_received);

            pollin = data;
            /* Replies go to the matching saved frame; calls (callbacks
             * from the server) are dispatched to registered programs. */
            if (pollin->is_reply)
                ret = rpc_clnt_handle_reply(clnt, pollin);
            else
                ret = rpc_clnt_handle_cbk(clnt, pollin);
            /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,
             * data);
             */
            break;
        }

        case RPC_TRANSPORT_MSG_SENT: {
            clock_gettime(CLOCK_REALTIME, &conn->last_sent);
            ret = 0;
            break;
        }

        case RPC_TRANSPORT_CONNECT: {
            pthread_mutex_lock(&conn->lock);
            {
                /* Every time there is a disconnection, processes
                 * should try to connect to 'glusterd' (ie, default
                 * port) or whichever port given as 'option remote-port'
                 * in volume file. */
                /* Below code makes sure the (re-)configured port lasts
                 * for just one successful attempt */
                conn->config.remote_port = 0;
                conn->connected = 1;
                conn->disconnected = 0;
            }
            pthread_mutex_unlock(&conn->lock);

            /* auth value should be set to lower version available
             * and will be set to appropriate version supported by
             * server after the handshake.
             */
            if (clnt->auth_value)
                clnt->auth_value = AUTH_GLUSTERFS_v2;
            if (clnt->notifyfn)
                ret = clnt->notifyfn(clnt, clnt->mydata, RPC_CLNT_CONNECT,
                                     NULL);

            break;
        }

        case RPC_TRANSPORT_ACCEPT:
            /* only meaningful on a server, no need of handling this event
             * in a client.
             */
            ret = 0;
            break;

        case RPC_TRANSPORT_EVENT_THREAD_DIED:
            /* only meaningful on a server, no need of handling this event on a
             * client */
            ret = 0;
            break;
    }

out:
    RPC_THIS_RESTORE;
    return ret;
}
/* Initialize @clnt's connection: lock, name, frame/ping timeouts from
 * @options, transport load + notify registration, and the saved-frames
 * container.  On any failure the partially-built connection is cleaned up
 * (transport unref + rpc_clnt_connection_cleanup).
 *
 * Returns 0 on success, -1 on failure.
 */
static int
rpc_clnt_connection_init(struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
                         dict_t *options, char *name)
{
    int ret = -1;
    rpc_clnt_connection_t *conn = NULL;
    rpc_transport_t *trans = NULL;

    conn = &clnt->conn;
    pthread_mutex_init(&clnt->conn.lock, NULL);

    conn->name = gf_strdup(name);
    if (!conn->name) {
        ret = -1;
        goto out;
    }

    /* Seconds before an outstanding request is bailed out (default 30min). */
    ret = dict_get_int32(options, "frame-timeout", &conn->frame_timeout);
    if (ret >= 0) {
        gf_log(name, GF_LOG_INFO, "setting frame-timeout to %d",
               conn->frame_timeout);
    } else {
        gf_log(name, GF_LOG_DEBUG, "defaulting frame-timeout to 30mins");
        conn->frame_timeout = 1800;
    }
    conn->rpc_clnt = clnt;

    /* 0 disables the ping timer entirely. */
    ret = dict_get_int32(options, "ping-timeout", &conn->ping_timeout);
    if (ret >= 0) {
        gf_log(name, GF_LOG_DEBUG, "setting ping-timeout to %d",
               conn->ping_timeout);
    } else {
        /*TODO: Once the epoll thread model is fixed,
          change the default ping-timeout to 30sec */
        gf_log(name, GF_LOG_DEBUG, "disable ping-timeout");
        conn->ping_timeout = 0;
    }

    trans = rpc_transport_load(ctx, options, name);
    if (!trans) {
        gf_log(name, GF_LOG_WARNING,
               "loading of new rpc-transport"
               " failed");
        ret = -1;
        goto out;
    }
    rpc_transport_ref(trans);

    pthread_mutex_lock(&conn->lock);
    {
        /* Ownership of the transport ref moves to conn->trans. */
        conn->trans = trans;
        trans = NULL;
    }
    pthread_mutex_unlock(&conn->lock);

    ret = rpc_transport_register_notify(conn->trans, rpc_clnt_notify, conn);
    if (ret == -1) {
        gf_log(name, GF_LOG_WARNING, "registering notify failed");
        goto out;
    }

    conn->saved_frames = saved_frames_new();
    if (!conn->saved_frames) {
        gf_log(name, GF_LOG_WARNING,
               "creation of saved_frames "
               "failed");
        ret = -1;
        goto out;
    }

    ret = 0;

out:
    if (ret) {
        /* Take the transport back from the connection (if it got there)
         * and drop our ref before the connection is torn down. */
        pthread_mutex_lock(&conn->lock);
        {
            trans = conn->trans;
            conn->trans = NULL;
        }
        pthread_mutex_unlock(&conn->lock);
        if (trans)
            rpc_transport_unref(trans);
        // conn cleanup needs to be done since we might have failed to
        // register notification.
        rpc_clnt_connection_cleanup(conn);
    }

    return ret;
}
/* Create a new RPC client owned by @owner: allocates the client, its
 * request and saved-frame mem-pools (capacity @reqpool_size, default
 * RPC_CLNT_DEFAULT_REQUEST_COUNT), initializes the connection from
 * @options, and picks AUTH_NULL vs AUTH_GLUSTERFS_v2 from "auth-null".
 *
 * Returns a referenced client, or NULL on failure.
 * NOTE(review): @options is unref'd only on the connection-init failure
 * path — presumably ownership transfers to the transport on success;
 * confirm against callers.
 */
struct rpc_clnt *
rpc_clnt_new(dict_t *options, xlator_t *owner, char *name,
             uint32_t reqpool_size)
{
    int ret = -1;
    struct rpc_clnt *rpc = NULL;
    glusterfs_ctx_t *ctx = owner->ctx;

    rpc = GF_CALLOC(1, sizeof(*rpc), gf_common_mt_rpcclnt_t);
    if (!rpc) {
        goto out;
    }

    pthread_mutex_init(&rpc->lock, NULL);
    rpc->ctx = ctx;
    rpc->owner = owner;
    GF_ATOMIC_INIT(rpc->xid, 1);

    if (!reqpool_size)
        reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT;

    rpc->reqpool = mem_pool_new(struct rpc_req, reqpool_size);
    if (rpc->reqpool == NULL) {
        pthread_mutex_destroy(&rpc->lock);
        GF_FREE(rpc);
        rpc = NULL;
        goto out;
    }

    rpc->saved_frames_pool = mem_pool_new(struct saved_frame, reqpool_size);
    if (rpc->saved_frames_pool == NULL) {
        pthread_mutex_destroy(&rpc->lock);
        mem_pool_destroy(rpc->reqpool);
        GF_FREE(rpc);
        rpc = NULL;
        goto out;
    }

    ret = rpc_clnt_connection_init(rpc, ctx, options, name);
    if (ret == -1) {
        pthread_mutex_destroy(&rpc->lock);
        mem_pool_destroy(rpc->reqpool);
        mem_pool_destroy(rpc->saved_frames_pool);
        GF_FREE(rpc);
        rpc = NULL;
        if (options)
            dict_unref(options);
        goto out;
    }

    /* This is handled to make sure we have modularity in getting the
       auth data changed */
    gf_boolean_t auth_null = dict_get_str_boolean(options, "auth-null", 0);

    rpc->auth_value = (auth_null) ? 0 : AUTH_GLUSTERFS_v2;

    rpc = rpc_clnt_ref(rpc);
    INIT_LIST_HEAD(&rpc->programs);

out:
    return rpc;
}
/* Enable @rpc and kick off the connect/reconnect cycle.
 * Returns 0, or -1 when @rpc is NULL. */
int
rpc_clnt_start(struct rpc_clnt *rpc)
{
    struct rpc_clnt_connection *connection;

    if (rpc == NULL)
        return -1;

    connection = &rpc->conn;

    pthread_mutex_lock(&connection->lock);
    rpc->disabled = 0;
    pthread_mutex_unlock(&connection->lock);

    /* This ref pairs with the unref done on a successful timer cancel or
     * by the final rpc_clnt_reconnect invocation. */
    rpc_clnt_ref(rpc);
    rpc_clnt_reconnect(connection);

    return 0;
}
/* Tear down the current connection state (unwinding pending requests),
 * then re-enable @rpc and restart the connect/reconnect cycle.
 * Returns 0, or -1 when @rpc is NULL. */
int
rpc_clnt_cleanup_and_start(struct rpc_clnt *rpc)
{
    struct rpc_clnt_connection *connection;

    if (rpc == NULL)
        return -1;

    connection = &rpc->conn;

    rpc_clnt_connection_cleanup(connection);

    pthread_mutex_lock(&connection->lock);
    rpc->disabled = 0;
    pthread_mutex_unlock(&connection->lock);

    /* This ref pairs with the unref done on a successful timer cancel or
     * by the final rpc_clnt_reconnect invocation. */
    rpc_clnt_ref(rpc);
    rpc_clnt_reconnect(connection);

    return 0;
}
/* Install the owner's event callback @fn and its opaque context @mydata
 * on @rpc.  Always returns 0. */
int
rpc_clnt_register_notify(struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
                         void *mydata)
{
    rpc->notifyfn = fn;
    rpc->mydata = mydata;

    return 0;
}
/* used for GF_LOG_OCCASIONALLY() */
static int gf_auth_max_groups_log = 0;
/* Fill the AUTH_GLUSTERFS_v3 credential structure @au from @frame's root
 * (pid/uid/gid/flags/ctime, lk_owner, groups).  The group list is
 * truncated if it would not fit in the auth header alongside the
 * lk_owner; an oversized lk_owner is an error (E2BIG), never truncated.
 *
 * Returns 0 on success, -1 (with errno set) on failure.
 */
static inline int
setup_glusterfs_auth_param_v3(call_frame_t *frame, auth_glusterfs_params_v3 *au,
                              int lk_owner_len, char *owner_data)
{
    int ret = -1;
    unsigned int max_groups = 0;
    int max_lkowner_len = 0;

    au->pid = frame->root->pid;
    au->uid = frame->root->uid;
    au->gid = frame->root->gid;
    au->flags = frame->root->flags;
    au->ctime_sec = frame->root->ctime.tv_sec;
    au->ctime_nsec = frame->root->ctime.tv_nsec;

    au->lk_owner.lk_owner_val = owner_data;
    au->lk_owner.lk_owner_len = lk_owner_len;
    au->groups.groups_val = frame->root->groups;
    au->groups.groups_len = frame->root->ngrps;

    /* The number of groups and the size of lk_owner depend on oneother.
     * We can truncate the groups, but should not touch the lk_owner. */
    max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS(lk_owner_len, AUTH_GLUSTERFS_v3);
    if (au->groups.groups_len > max_groups) {
        GF_LOG_OCCASIONALLY(gf_auth_max_groups_log, "rpc-auth", GF_LOG_WARNING,
                            "truncating grouplist "
                            "from %d to %d",
                            au->groups.groups_len, max_groups);

        au->groups.groups_len = max_groups;
    }

    max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER(au->groups.groups_len,
                                                    AUTH_GLUSTERFS_v3);
    if (lk_owner_len > max_lkowner_len) {
        /* NOTE(review): logs under "rpc-clnt" while the v2 variant uses
         * "rpc-auth" for the same condition — possibly unintentional. */
        gf_log("rpc-clnt", GF_LOG_ERROR,
               "lkowner field is too "
               "big (%d), it does not fit in the rpc-header",
               au->lk_owner.lk_owner_len);
        errno = E2BIG;
        goto out;
    }

    ret = 0;
out:
    return ret;
}
/* Fill the AUTH_GLUSTERFS_v2 credential structure @au from @frame's root
 * (pid/uid/gid, lk_owner, groups).  The group list is truncated if it
 * would not fit in the auth header alongside the lk_owner; an oversized
 * lk_owner is an error (E2BIG), never truncated.
 *
 * Returns 0 on success, -1 (with errno set) on failure.
 */
static inline int
setup_glusterfs_auth_param_v2(call_frame_t *frame, auth_glusterfs_parms_v2 *au,
                              int lk_owner_len, char *owner_data)
{
    unsigned int max_groups = 0;
    int max_lkowner_len = 0;
    int ret = -1;

    au->pid = frame->root->pid;
    au->uid = frame->root->uid;
    au->gid = frame->root->gid;

    au->lk_owner.lk_owner_val = owner_data;
    au->lk_owner.lk_owner_len = lk_owner_len;
    au->groups.groups_val = frame->root->groups;
    au->groups.groups_len = frame->root->ngrps;

    /* The number of groups and the size of lk_owner depend on oneother.
     * We can truncate the groups, but should not touch the lk_owner. */
    max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS(lk_owner_len, AUTH_GLUSTERFS_v2);
    if (au->groups.groups_len > max_groups) {
        GF_LOG_OCCASIONALLY(gf_auth_max_groups_log, "rpc-auth", GF_LOG_WARNING,
                            "truncating grouplist "
                            "from %d to %d",
                            au->groups.groups_len, max_groups);

        au->groups.groups_len = max_groups;
    }

    max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER(au->groups.groups_len,
                                                    AUTH_GLUSTERFS_v2);
    if (lk_owner_len > max_lkowner_len) {
        gf_log("rpc-auth", GF_LOG_ERROR,
               "lkowner field is too "
               "big (%d), it does not fit in the rpc-header",
               au->lk_owner.lk_owner_len);
        errno = E2BIG;
        goto out;
    }

    ret = 0;
out:
    return ret;
}
/* XDR-encode the glusterfs auth credentials for @frame into @dest
 * (capacity GF_MAX_AUTH_BYTES), using the v2 or v3 layout depending on
 * clnt->auth_value.  When the frame has no lk_owner set, a 4-byte
 * little-endian encoding of the pid is used as the owner.
 *
 * Returns the number of encoded bytes, or -1 on failure.
 */
static ssize_t
xdr_serialize_glusterfs_auth(struct rpc_clnt *clnt, call_frame_t *frame,
                             char *dest)
{
    ssize_t ret = -1;
    XDR xdr;
    char owner[4] = {
        0,
    };
    int32_t pid = 0;
    char *lk_owner_data = NULL;
    int lk_owner_len = 0;

    if ((!dest))
        return -1;

    xdrmem_create(&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE);

    if (frame->root->lk_owner.len) {
        lk_owner_data = frame->root->lk_owner.data;
        lk_owner_len = frame->root->lk_owner.len;
    } else {
        /* Fall back to the pid, serialized little-endian, as the owner. */
        pid = frame->root->pid;
        owner[0] = (char)(pid & 0xff);
        owner[1] = (char)((pid >> 8) & 0xff);
        owner[2] = (char)((pid >> 16) & 0xff);
        owner[3] = (char)((pid >> 24) & 0xff);

        lk_owner_data = owner;
        lk_owner_len = 4;
    }

    if (clnt->auth_value == AUTH_GLUSTERFS_v2) {
        auth_glusterfs_parms_v2 au_v2 = {
            0,
        };

        ret = setup_glusterfs_auth_param_v2(frame, &au_v2, lk_owner_len,
                                            lk_owner_data);
        if (ret)
            goto out;

        if (!xdr_auth_glusterfs_parms_v2(&xdr, &au_v2)) {
            gf_log(THIS->name, GF_LOG_WARNING,
                   "failed to encode auth glusterfs elements");
            ret = -1;
            goto out;
        }
    } else if (clnt->auth_value == AUTH_GLUSTERFS_v3) {
        auth_glusterfs_params_v3 au_v3 = {
            0,
        };

        ret = setup_glusterfs_auth_param_v3(frame, &au_v3, lk_owner_len,
                                            lk_owner_data);
        if (ret)
            goto out;

        if (!xdr_auth_glusterfs_params_v3(&xdr, &au_v3)) {
            gf_log(THIS->name, GF_LOG_WARNING,
                   "failed to encode auth glusterfs elements");
            ret = -1;
            goto out;
        }
    } else {
        /* Unknown auth flavour: nothing we can encode. */
        gf_log(THIS->name, GF_LOG_WARNING,
               "failed to encode auth glusterfs elements");
        ret = -1;
        goto out;
    }

    /* Encoded length = XDR cursor position minus buffer base. */
    ret = (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base));

out:
    return ret;
}
/* Build the RPC call header @request (RFC 5531 CALL, rpcvers 2) for the
 * given program/version/procedure and @xid.  When the client uses an auth
 * flavour, the glusterfs credentials are serialized into @auth_data
 * (caller-provided, GF_MAX_AUTH_BYTES) and attached; otherwise AUTH_NULL
 * is used.  No verifier is sent on calls.
 *
 * Returns 0 on success, -1 on failure.
 */
int
rpc_clnt_fill_request(struct rpc_clnt *clnt, int prognum, int progver,
                      int procnum, uint64_t xid, call_frame_t *fr,
                      struct rpc_msg *request, char *auth_data)
{
    int ret = -1;

    if (!request)
        goto out;

    memset(request, 0, sizeof(*request));

    request->rm_xid = xid;
    request->rm_direction = CALL;
    request->rm_call.cb_rpcvers = 2;
    request->rm_call.cb_prog = prognum;
    request->rm_call.cb_vers = progver;
    request->rm_call.cb_proc = procnum;

    if (clnt->auth_value) {
        ret = xdr_serialize_glusterfs_auth(clnt, fr, auth_data);
        if (ret == -1) {
            gf_log("rpc-clnt", GF_LOG_WARNING,
                   "cannot encode auth credentials");
            goto out;
        }

        request->rm_call.cb_cred.oa_flavor = clnt->auth_value;
        request->rm_call.cb_cred.oa_base = auth_data;
        request->rm_call.cb_cred.oa_length = ret;
    } else {
        request->rm_call.cb_cred.oa_flavor = AUTH_NULL;
        request->rm_call.cb_cred.oa_base = NULL;
        request->rm_call.cb_cred.oa_length = 0;
    }

    request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
    request->rm_call.cb_verf.oa_base = NULL;
    request->rm_call.cb_verf.oa_length = 0;

    ret = 0;
out:
    return ret;
}
/* XDR-encode the RPC call header @request at @recordstart (capacity
 * @rlen).  @payload is the size of the program payload that will follow;
 * it is only needed to log the full fragment length, not encoded here.
 *
 * Returns an iovec covering just the encoded header, or {NULL, 0} on
 * encoding failure.
 */
struct iovec
rpc_clnt_record_build_header(char *recordstart, size_t rlen,
                             struct rpc_msg *request, size_t payload)
{
    struct iovec hdr = {
        0,
    };
    struct iovec txrecord = {0, 0};
    size_t fraglen = 0;

    if (rpc_request_to_xdr(request, recordstart, rlen, &hdr) == -1) {
        gf_log("rpc-clnt", GF_LOG_DEBUG, "Failed to create RPC request");
        return txrecord;
    }

    fraglen = payload + hdr.iov_len;
    gf_log("rpc-clnt", GF_LOG_TRACE,
           "Request fraglen %zu, payload: %zu, "
           "rpc hdr: %zu",
           fraglen, payload, hdr.iov_len);

    /* The returned vector is only the RPC header; the payload size was
     * needed just for the fragment-length computation logged above. */
    txrecord.iov_base = recordstart;
    txrecord.iov_len = hdr.iov_len;

    return txrecord;
}
/* Build a complete request record: fill the RPC call header (including
 * auth credentials), size it with xdr_sizeof, obtain an iobuf large
 * enough for header + @hdrsize, and encode the header into it.  On
 * success *@recbuf describes the encoded header inside the returned
 * iobuf; the caller owns the iobuf ref.
 *
 * Returns the iobuf, or NULL on failure.
 */
struct iobuf *
rpc_clnt_record_build_record(struct rpc_clnt *clnt, call_frame_t *fr,
                             int prognum, int progver, int procnum,
                             size_t hdrsize, uint64_t xid, struct iovec *recbuf)
{
    struct rpc_msg request = {
        0,
    };
    struct iobuf *request_iob = NULL;
    char *record = NULL;
    struct iovec recordhdr = {
        0,
    };
    size_t pagesize = 0;
    int ret = -1;
    size_t xdr_size = 0;
    char auth_data[GF_MAX_AUTH_BYTES] = {
        0,
    };

    if ((!clnt) || (!recbuf)) {
        goto out;
    }

    /* Fill the rpc structure and XDR it into the buffer got above. */
    ret = rpc_clnt_fill_request(clnt, prognum, progver, procnum, xid, fr,
                                &request, auth_data);
    if (ret == -1) {
        gf_log(clnt->conn.name, GF_LOG_WARNING,
               "cannot build a rpc-request xid (%" PRIu64 ")", xid);
        goto out;
    }

    xdr_size = xdr_sizeof((xdrproc_t)xdr_callmsg, &request);

    /* First, try to get a pointer into the buffer which the RPC
     * layer can use.
     */
    request_iob = iobuf_get2(clnt->ctx->iobuf_pool, (xdr_size + hdrsize));
    if (!request_iob) {
        goto out;
    }

    pagesize = iobuf_pagesize(request_iob);

    record = iobuf_ptr(request_iob); /* Now we have it. */

    recordhdr = rpc_clnt_record_build_header(record, pagesize, &request,
                                             hdrsize);
    if (!recordhdr.iov_base) {
        gf_log(clnt->conn.name, GF_LOG_ERROR, "Failed to build record header");
        iobuf_unref(request_iob);
        request_iob = NULL;
        recbuf->iov_base = NULL;
        goto out;
    }

    recbuf->iov_base = recordhdr.iov_base;
    recbuf->iov_len = recordhdr.iov_len;

out:
    return request_iob;
}
/* Thin validating wrapper around rpc_clnt_record_build_record(). */
static inline struct iobuf *
rpc_clnt_record(struct rpc_clnt *clnt, call_frame_t *call_frame,
                rpc_clnt_prog_t *prog, int procnum, size_t hdrlen,
                struct iovec *rpchdr, uint64_t callid)
{
    if (prog && rpchdr && call_frame)
        return rpc_clnt_record_build_record(clnt, call_frame, prog->prognum,
                                            prog->progver, procnum, hdrlen,
                                            callid, rpchdr);

    return NULL;
}
/* Register a callback program (server-initiated RPC directions) on the
 * client. The program descriptor is copied, so the caller may reuse or
 * free `program` after return. Registering the same prognum/progver
 * pair twice is a successful no-op.
 *
 * Returns 0 on success (or already registered), -1 on failure.
 */
int
rpcclnt_cbk_program_register(struct rpc_clnt *clnt,
                             rpcclnt_cb_program_t *program, void *mydata)
{
    int ret = -1;
    char already_registered = 0;
    rpcclnt_cb_program_t *tmp = NULL;

    if (!clnt)
        goto out;

    /* Guard against a NULL program before any dereference; previously a
     * NULL program crashed on the actors check below. */
    if (!program || program->actors == NULL)
        goto out;

    pthread_mutex_lock(&clnt->lock);
    {
        list_for_each_entry(tmp, &clnt->programs, program)
        {
            if ((program->prognum == tmp->prognum) &&
                (program->progver == tmp->progver)) {
                already_registered = 1;
                break;
            }
        }
    }
    pthread_mutex_unlock(&clnt->lock);

    if (already_registered) {
        gf_log_callingfn(clnt->conn.name, GF_LOG_DEBUG, "already registered");
        ret = 0;
        goto out;
    }

    tmp = GF_MALLOC(sizeof(*tmp), gf_common_mt_rpcclnt_cb_program_t);
    if (tmp == NULL) {
        goto out;
    }

    /* Keep a private copy; freed in rpc_clnt_destroy. */
    memcpy(tmp, program, sizeof(*tmp));
    INIT_LIST_HEAD(&tmp->program);
    tmp->mydata = mydata;

    pthread_mutex_lock(&clnt->lock);
    {
        list_add_tail(&tmp->program, &clnt->programs);
    }
    pthread_mutex_unlock(&clnt->lock);

    ret = 0;
    gf_log(clnt->conn.name, GF_LOG_DEBUG,
           "New program registered: %s, Num: %d, Ver: %d", program->progname,
           program->prognum, program->progver);
out:
    /* program may be NULL on the failure path; only log details when it
     * is safe to dereference. */
    if (ret == -1 && clnt && program) {
        gf_log(clnt->conn.name, GF_LOG_ERROR,
               "Program registration failed:"
               " %s, Num: %d, Ver: %d",
               program->progname, program->prognum, program->progver);
    }
    return ret;
}
/* Build and transmit one RPC request over the client's transport.
 *
 * rpc, prog and frame are mandatory. proghdr/progpayload (with their
 * counts) form the program-level message; rsphdr/rsp_payload/rsp_iobref
 * tell the transport where to place the reply. On success the frame is
 * queued in saved-frames until the reply (or bail timer) invokes the
 * callback. On failure (return -1) cbkfn is invoked synchronously with
 * rpc_status == -1, so the caller's callback always runs exactly once.
 */
int
rpc_clnt_submit(struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum,
fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount,
struct iovec *progpayload, int progpayloadcount,
struct iobref *iobref, void *frame, struct iovec *rsphdr,
int rsphdr_count, struct iovec *rsp_payload,
int rsp_payload_count, struct iobref *rsp_iobref)
{
rpc_clnt_connection_t *conn = NULL;
struct iobuf *request_iob = NULL;
struct iovec rpchdr = {
0,
};
struct rpc_req *rpcreq = NULL;
rpc_transport_req_t req;
int ret = -1;
int proglen = 0;
char new_iobref = 0; /* set when we allocated iobref ourselves */
uint64_t callid = 0;
gf_boolean_t need_unref = _gf_false;
call_frame_t *cframe = frame;
if (!rpc || !prog || !frame) {
goto out;
}
conn = &rpc->conn;
rpcreq = mem_get(rpc->reqpool);
if (rpcreq == NULL) {
goto out;
}
memset(rpcreq, 0, sizeof(*rpcreq));
memset(&req, 0, sizeof(req));
if (!iobref) {
/* Caller supplied no iobref; create one to hold the record iobuf. */
iobref = iobref_new();
if (!iobref) {
goto out;
}
new_iobref = 1;
}
/* xid: monotonically increasing transaction id for this client. */
callid = GF_ATOMIC_INC(rpc->xid);
rpcreq->prog = prog;
rpcreq->procnum = procnum;
rpcreq->conn = conn;
rpcreq->xid = callid;
rpcreq->cbkfn = cbkfn;
ret = -1;
if (proghdr) {
proglen += iov_length(proghdr, proghdrcount);
}
/* Build the XDR-encoded RPC header record; proglen is needed only to
* compute the fragment length written in the record marker. */
request_iob = rpc_clnt_record(rpc, frame, prog, procnum, proglen, &rpchdr,
callid);
if (!request_iob) {
gf_log(conn->name, GF_LOG_WARNING, "cannot build rpc-record");
goto out;
}
iobref_add(iobref, request_iob);
req.msg.rpchdr = &rpchdr;
req.msg.rpchdrcount = 1;
req.msg.proghdr = proghdr;
req.msg.proghdrcount = proghdrcount;
req.msg.progpayload = progpayload;
req.msg.progpayloadcount = progpayloadcount;
req.msg.iobref = iobref;
req.rsp.rsphdr = rsphdr;
req.rsp.rsphdr_count = rsphdr_count;
req.rsp.rsp_payload = rsp_payload;
req.rsp.rsp_payload_count = rsp_payload_count;
req.rsp.rsp_iobref = rsp_iobref;
req.rpc_req = rpcreq;
pthread_mutex_lock(&conn->lock);
{
if (conn->connected == 0) {
/* Never (re)connect a deliberately disabled client. */
if (rpc->disabled)
goto unlock;
ret = rpc_transport_connect(conn->trans, conn->config.remote_port);
if (ret < 0) {
gf_log(conn->name,
(errno == EINPROGRESS) ? GF_LOG_DEBUG : GF_LOG_WARNING,
"error returned while attempting to "
"connect to host:%s, port:%d",
conn->config.remote_host, conn->config.remote_port);
goto unlock;
}
}
ret = rpc_transport_submit_request(conn->trans, &req);
if (ret == -1) {
gf_log(conn->name, GF_LOG_WARNING,
"failed to submit rpc-request "
"(unique: %" PRIu64
", XID: 0x%x Program: %s, "
"ProgVers: %d, Proc: %d) to rpc-transport (%s)",
cframe->root->unique, rpcreq->xid, rpcreq->prog->progname,
rpcreq->prog->progver, rpcreq->procnum, conn->name);
} else if ((ret >= 0) && frame) {
/* Save the frame in queue */
__save_frame(rpc, frame, rpcreq);
/* A ref on rpc-clnt object is taken while registering
* call_bail to timer in __save_frame. If it fails to
* register, it needs an unref and should happen outside
* conn->lock which otherwise leads to deadlocks */
if (conn->timer == NULL)
need_unref = _gf_true;
conn->msgcnt++;
gf_log("rpc-clnt", GF_LOG_TRACE,
"submitted request "
"(unique: %" PRIu64
", XID: 0x%x, Program: %s, "
"ProgVers: %d, Proc: %d) to rpc-transport (%s)",
cframe->root->unique, rpcreq->xid, rpcreq->prog->progname,
rpcreq->prog->progver, rpcreq->procnum, conn->name);
}
}
unlock:
pthread_mutex_unlock(&conn->lock);
/* Unref outside conn->lock; see comment at __save_frame above. */
if (need_unref)
rpc_clnt_unref(rpc);
if (ret == -1) {
goto out;
}
rpc_clnt_check_and_start_ping(rpc);
ret = 0;
out:
if (request_iob) {
/* The iobref (if added) holds its own ref; drop ours. */
iobuf_unref(request_iob);
}
if (new_iobref && iobref) {
iobref_unref(iobref);
}
if (frame && (ret == -1)) {
if (rpcreq) {
/* Fail the request synchronously so the caller still gets
* exactly one completion callback. */
rpcreq->rpc_status = -1;
cbkfn(rpcreq, NULL, 0, frame);
mem_put(rpcreq);
}
}
return ret;
}
/* Take a reference on the client; NULL passes through unchanged. */
struct rpc_clnt *
rpc_clnt_ref(struct rpc_clnt *rpc)
{
    if (rpc)
        GF_ATOMIC_INC(rpc->refcount);

    return rpc;
}
/* First stage of teardown, run when the last reference is dropped:
 * disconnect the transport and release the connection's ref on it. */
static void
rpc_clnt_trigger_destroy(struct rpc_clnt *rpc)
{
    rpc_transport_t *trans = NULL;
    rpc_clnt_connection_t *conn = NULL;

    if (!rpc)
        return;

    /* We hold the last ref, so reading conn->trans without taking
     * conn->lock is safe here. */
    conn = &rpc->conn;
    trans = conn->trans;

    rpc_clnt_disconnect(rpc);

    /* Accounts for an rpc_clnt_disable that may have preceded this
     * unref. Clear conn->trans before dropping the transport ref,
     * because rpc_transport_unref can potentially free conn. */
    if (trans != NULL) {
        conn->trans = NULL;
        rpc_transport_unref(trans);
    }
}
/* Final teardown of an rpc_clnt: frees saved frames, mutexes, memory
 * pools, registered callback programs and the client itself. Called
 * only after the last reference is gone (via rpc_clnt_trigger_destroy,
 * which has already released the transport). */
static void
rpc_clnt_destroy(struct rpc_clnt *rpc)
{
rpcclnt_cb_program_t *program = NULL;
rpcclnt_cb_program_t *tmp = NULL;
struct saved_frames *saved_frames = NULL;
rpc_clnt_connection_t *conn = NULL;
if (!rpc)
return;
conn = &rpc->conn;
GF_FREE(rpc->conn.name);
/* Access saved_frames in critical-section to avoid
crash in rpc_clnt_connection_cleanup at the time
of destroying saved frames
*/
pthread_mutex_lock(&conn->lock);
{
saved_frames = conn->saved_frames;
conn->saved_frames = NULL;
}
pthread_mutex_unlock(&conn->lock);
saved_frames_destroy(saved_frames);
pthread_mutex_destroy(&rpc->lock);
pthread_mutex_destroy(&rpc->conn.lock);
/* mem-pool should be destroyed, otherwise,
it will cause huge memory leaks */
mem_pool_destroy(rpc->reqpool);
mem_pool_destroy(rpc->saved_frames_pool);
/* Free the callback-program copies made in
* rpcclnt_cbk_program_register (each was GF_MALLOC'd there). */
list_for_each_entry_safe(program, tmp, &rpc->programs, program)
{
GF_FREE(program);
}
GF_FREE(rpc);
return;
}
/* Drop one reference; tears the client down (and returns NULL) when it
 * was the last one, otherwise returns the still-live client. */
struct rpc_clnt *
rpc_clnt_unref(struct rpc_clnt *rpc)
{
    if (!rpc)
        return NULL;

    if (GF_ATOMIC_DEC(rpc->refcount) == 0) {
        rpc_clnt_trigger_destroy(rpc);
        return NULL;
    }

    return rpc;
}
/* Permanently disable the client: once rpc->disabled is set,
 * rpc_clnt_submit refuses to (re)connect. Cancels the bail timer, the
 * reconnect timer and the ping timer under conn->lock; each timer that
 * is successfully cancelled held a ref on the client, which is dropped
 * after the lock is released (unref under conn->lock can deadlock).
 * Finally disconnects the transport, if one is attached. */
void
rpc_clnt_disable(struct rpc_clnt *rpc)
{
rpc_clnt_connection_t *conn = NULL;
rpc_transport_t *trans = NULL;
int unref = 0;
int ret = 0;
gf_boolean_t timer_unref = _gf_false;
gf_boolean_t reconnect_unref = _gf_false;
if (!rpc) {
goto out;
}
conn = &rpc->conn;
pthread_mutex_lock(&conn->lock);
{
rpc->disabled = 1;
if (conn->timer) {
ret = gf_timer_call_cancel(rpc->ctx, conn->timer);
/* If the event is not fired and it actually cancelled
* the timer, do the unref else registered call back
* function will take care of it.
*/
if (!ret)
timer_unref = _gf_true;
conn->timer = NULL;
}
if (conn->reconnect) {
ret = gf_timer_call_cancel(rpc->ctx, conn->reconnect);
if (!ret)
reconnect_unref = _gf_true;
conn->reconnect = NULL;
}
conn->connected = 0;
/* Non-zero when the ping timer was removed here and its ref must
* be dropped by us below. */
unref = rpc_clnt_remove_ping_timer_locked(rpc);
trans = conn->trans;
}
pthread_mutex_unlock(&conn->lock);
if (trans) {
rpc_transport_disconnect(trans, _gf_true);
/* The auth_value was being reset to AUTH_GLUSTERFS_v2.
* if (clnt->auth_value)
* clnt->auth_value = AUTH_GLUSTERFS_v2;
* It should not be reset here. The disconnect during
* portmap request can race with handshake. If handshake
* happens first and disconnect later, auth_value would set
* to default value and it never sets back to actual auth_value
* supported by server. But it's important to set to lower
* version supported in the case where the server downgrades.
* So moving this code to RPC_TRANSPORT_CONNECT. Note that
* CONNECT cannot race with handshake as by nature it is
* serialized with handhake. An handshake can happen only
* on a connected transport and hence its strictly serialized.
*/
}
/* Drop the refs owned by the timers cancelled above, outside the
* lock. */
if (unref)
rpc_clnt_unref(rpc);
if (timer_unref)
rpc_clnt_unref(rpc);
if (reconnect_unref)
rpc_clnt_unref(rpc);
out:
return;
}
/* Disconnect the client's transport.
 *
 * The body of this function was a line-for-line duplicate of
 * rpc_clnt_disable(): both set rpc->disabled, cancel the bail,
 * reconnect and ping timers (dropping the refs those timers held,
 * outside conn->lock), clear conn->connected and disconnect the
 * transport. Delegate to rpc_clnt_disable instead of maintaining two
 * copies that can silently drift apart. */
void
rpc_clnt_disconnect(struct rpc_clnt *rpc)
{
    rpc_clnt_disable(rpc);
}
void
rpc_clnt_reconfig(struct rpc_clnt *rpc, struct rpc_clnt_config *config)
{
if (config->ping_timeout) {
if (config->ping_timeout != rpc->conn.ping_timeout)
gf_log(rpc->conn.name, GF_LOG_INFO,
"changing ping timeout to %d (from %d)",
config->ping_timeout, rpc->conn.ping_timeout);
pthread_mutex_lock(&rpc->conn.lock);
{
rpc->conn.ping_timeout = config->ping_timeout;
}
pthread_mutex_unlock(&rpc->conn.lock);
}
if (config->rpc_timeout) {
if (config->rpc_timeout != rpc->conn.config.rpc_timeout)
gf_log(rpc->conn.name, GF_LOG_INFO,
"changing timeout to %d (from %d)", config->rpc_timeout,
rpc->conn.config.rpc_timeout);
rpc->conn.config.rpc_timeout = config->rpc_timeout;
}
if (config->remote_port) {
if (config->remote_port != rpc->conn.config.remote_port)
gf_log(rpc->conn.name, GF_LOG_INFO, "changing port to %d (from %d)",
config->remote_port, rpc->conn.config.remote_port);
rpc->conn.config.remote_port = config->remote_port;
}
if (config->remote_host) {
if (rpc->conn.config.remote_host) {
if (strcmp(rpc->conn.config.remote_host, config->remote_host))
gf_log(rpc->conn.name, GF_LOG_INFO,
"changing hostname to %s (from %s)", config->remote_host,
rpc->conn.config.remote_host);
GF_FREE(rpc->conn.config.remote_host);
} else {
gf_log(rpc->conn.name, GF_LOG_INFO, "setting hostname to %s",
config->remote_host);
}
rpc->conn.config.remote_host = gf_strdup(config->remote_host);
}
}