/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
* Copyright (c) 1991-1999 University of Maryland at College Park
* Copyright (c) 2007-2012 Zmanda, Inc. All Rights Reserved.
* Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
* All Rights Reserved.
*
* Permission to use, copy, modify, distribute, and sell this software and its
* documentation for any purpose is hereby granted without fee, provided that
* the above copyright notice appear in all copies and that both that
* copyright notice and this permission notice appear in supporting
* documentation, and that the name of U.M. not be used in advertising or
* publicity pertaining to distribution of the software without specific,
* written prior permission. U.M. makes no representations about the
* suitability of this software for any purpose. It is provided "as is"
* without express or implied warranty.
*
* U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
* BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* Authors: the Amanda Development Team. Its members are listed in a
* file named AUTHORS, in the root directory of this distribution.
*/
/*
* $Id: protocol.c,v 1.45 2006/05/25 17:07:31 martinea Exp $
*
* implements amanda protocol
*/
#include "amanda.h"
#include "conffile.h"
#include "event.h"
#include "packet.h"
#include "security.h"
#include "protocol.h"
/*
 * Debug logging helper: forwards the remaining arguments to dbprintf()
 * only when the level `i' is less than or equal to debug_protocol.
 * Wrapped in do/while(0) so it behaves as a single statement.
 */
#define proto_debug(i, ...) do { \
if ((i) <= debug_protocol) { \
dbprintf(__VA_ARGS__); \
} \
} while (0)
/*
 * Security drivers whose addresses are compared against in this file.
 * Each one is declared only when its *_SECURITY macro is defined; the
 * local driver is declared unconditionally.
 */
#ifdef BSD_SECURITY
extern const security_driver_t bsd_security_driver;
#endif
#ifdef KRB5_SECURITY
extern const security_driver_t krb5_security_driver;
#endif
#ifdef RSH_SECURITY
extern const security_driver_t rsh_security_driver;
#endif
#ifdef SSH_SECURITY
extern const security_driver_t ssh_security_driver;
#endif
#ifdef BSDTCP_SECURITY
extern const security_driver_t bsdtcp_security_driver;
#endif
#ifdef SSL_SECURITY
extern const security_driver_t ssl_security_driver;
#endif
#ifdef BSDUDP_SECURITY
extern const security_driver_t bsdudp_security_driver;
#endif
extern const security_driver_t local_security_driver;
/*
 * Valid actions that can be passed to the state machine, or returned
 * by a state function to tell the state machine what to do next.
 */
typedef enum {
PA_START, /* passed by connect_callback once the connection is up */
PA_TIMEOUT, /* passed by recvpkt_callback when the receive timed out */
PA_ERROR, /* not produced by the code in this file; only named in action2str */
PA_RCVDATA, /* passed by recvpkt_callback together with a received packet */
PA_CONTPEND, /* returned by a state: call the continuation, then wait for more */
PA_PENDING, /* returned by a state: wait for another packet */
PA_CONTINUE, /* returned by a state: state changed, run the new state now */
PA_FINISH, /* returned by a state: call the continuation and free the request */
PA_ABORT /* local failure: like PA_FINISH but the continuation gets a NULL pkt */
} p_action_t;
/*
 * The current state type. States are represented as function
 * vectors: a state receives the request, the action that triggered it
 * and the packet (NULL unless the action is PA_RCVDATA), and returns
 * the action the state machine should perform next.
 */
struct proto;
typedef p_action_t (*pstate_t)(struct proto *, p_action_t, pkt_t *);
/*
 * This is a request structure that is wrapped around a packet while it
 * is being passed through amanda. It holds the timeouts, state, and handles
 * for each request.
 *
 * A proto_t is allocated by protocol_sendreq() and freed by
 * state_machine() when the request finishes or aborts.
 */
typedef struct proto {
pstate_t state; /* current state of the request */
char *hostname; /* remote host (private copy, freed with the request) */
const security_driver_t *security_driver; /* for connect retries */
security_handle_t *security_handle; /* network stream for this req; also
					 * overloaded to hold an event handle
					 * while waiting to retry a connect */
time_t timeout; /* seconds handed to security_recvpkt for the current wait */
time_t repwait; /* seconds to wait for reply */
time_t origtime; /* orig start time of this request */
time_t curtime; /* time when this attempt started */
int connecttries; /* times we'll retry a connect */
int resettries; /* times we'll resend a REQ */
int reqtries; /* times we'll wait for an ACK */
pkt_t req; /* the actual wire request */
protocol_sendreq_callback continuation; /* call when req dies/finishes */
void *datap; /* opaque cookie passed to above */
char *(*conf_fn)(char *, void *); /* configuration function */
security_status_t status; /* result of the last connect attempt */
event_handle_t *event_handle; /* event scheduled by a connect thread to run
				 * connect_callback; NULL when none is pending */
} proto_t;
/*
 * Timing and retry constants. CURTIME is measured from proto_init_time,
 * which protocol_init() sets.
 */
#define CONNECT_WAIT 5 /* secs between connect attempts */
#define ACK_WAIT 10 /* time (secs) to wait for ACK - keep short */
#define RESET_TRIES 2 /* num restarts (reboot/crash) */
#define CURTIME (time(0) - proto_init_time) /* time relative to start */
/* if no reply in an hour, just forget it */
#define DROP_DEAD_TIME(t) (CURTIME - (t) > (60 * 60))
/*
 * Initialization time (set by protocol_init, base for CURTIME)
 */
static time_t proto_init_time;
/* number of connect threads that have been started and not yet finished */
static int nb_thread = 0;
/* guards nb_thread; created in protocol_init */
static GMutex *protocol_mutex;
/* local functions */
/* debug helpers */
static const char *action2str(p_action_t);
static const char *pstate2str(pstate_t);
/* connection setup, direct or through a helper thread */
static gpointer connect_thread(gpointer data);
static void connect_thread_callback(void *cookie,
security_handle_t * security_handle,
security_status_t status);
static void connect_callback(void *cookie);
static void connect_callbackX(void *, security_handle_t *, security_status_t);
static void connect_wait_callback(void *);
/* packet reception and the request states */
static void recvpkt_callback(void *, pkt_t *, security_status_t);
static p_action_t s_sendreq(proto_t *, p_action_t, pkt_t *);
static p_action_t s_ackwait(proto_t *, p_action_t, pkt_t *);
static p_action_t s_repwait(proto_t *, p_action_t, pkt_t *);
static void state_machine(proto_t *, p_action_t, pkt_t *);
/*
* -------------------
* Interface functions
*/
/*
 * One-time setup of this module's globals: creates the mutex that
 * protects the connect-thread counter and records the reference time
 * that CURTIME is measured from.
 */
void
protocol_init(void)
{
    protocol_mutex = g_mutex_new();
    proto_init_time = time(NULL);
}
/*
 * Generate a request packet, and submit it to the state machine
 * for transmission.
 *
 * hostname        - remote host; copied, the caller keeps ownership
 * security_driver - driver used to connect (and for connect retries)
 * conf_fn         - configuration function handed to security_connect
 * req             - body of the REQ packet
 * repwait         - seconds to wait for the reply
 * continuation    - called with datap when the request finishes or dies
 * datap           - opaque cookie passed through to continuation
 *
 * The connect is run in a helper thread for the stream-based drivers
 * when the platform is known, is not "mac", and a reentrant
 * getservbyname_r is available; otherwise it is performed directly
 * from this function.  If the helper thread cannot be started the
 * connect falls back to the direct path, so the request is never
 * silently dropped.
 */
void
protocol_sendreq(
    const char *		hostname,
    const security_driver_t *	security_driver,
    char *			(*conf_fn)(char *, void *),
    const char *		req,
    time_t			repwait,
    protocol_sendreq_callback	continuation,
    void *			datap)
{
    proto_t *p;
    char *platform = NULL;
    char *distro = NULL;
    gboolean use_thread;

    p = g_malloc(sizeof(*p));
    p->state = s_sendreq;
    p->hostname = g_strdup(hostname);
    p->security_driver = security_driver;
    /* the real handle and status are stored by the connect callbacks */
    p->security_handle = NULL;
    p->timeout = 0;
    p->repwait = repwait;
    p->origtime = CURTIME;
    /* p->curtime is set for real in the sendreq state */
    p->curtime = 0;
    p->connecttries = getconf_int(CNF_CONNECT_TRIES);
    p->resettries = RESET_TRIES;
    p->reqtries = getconf_int(CNF_REQ_TRIES);
    p->conf_fn = conf_fn;
    pkt_init(&p->req, P_REQ, "%s", req);

    /*
     * These are here for the caller
     * We call the continuation function after processing is complete.
     * We pass the datap on through untouched.  It is here so the caller
     * has a way to keep state with each request.
     */
    p->continuation = continuation;
    p->datap = datap;
    p->event_handle = NULL;

    proto_debug(1, _("protocol: security_connect: host %s -> p %p\n"),
		hostname, p);

    get_platform_and_distro(&platform, &distro);
    use_thread = (distro != NULL &&
	!g_str_equal(distro, "mac") &&
#if defined HAVE_FUNC_GETSERVBYNAME_R_4 || defined HAVE_FUNC_GETSERVBYNAME_R_5 || defined HAVE_FUNC_GETSERVBYNAME_R_6
	1 &&
#else
	0 &&
#endif
	(
#ifdef BSDTCP_SECURITY
	 security_driver == &bsdtcp_security_driver ||
#endif
	 security_driver == &local_security_driver ||
#ifdef RSH_SECURITY
	 security_driver == &rsh_security_driver ||
#endif
#ifdef SSL_SECURITY
	 security_driver == &ssl_security_driver ||
#endif
#ifdef SSH_SECURITY
	 security_driver == &ssh_security_driver ||
#endif
	 0));
    g_free(platform);
    g_free(distro);

    if (use_thread) {
	/*
	 * Account for the thread before it exists, so the counter is
	 * never decremented by the thread before it was incremented.
	 */
	g_mutex_lock(protocol_mutex);
	nb_thread++;
	g_mutex_unlock(protocol_mutex);

	/*
	 * The thread is never joined, so create it non-joinable to let
	 * its resources be reclaimed when it exits.
	 */
	if (g_thread_create(connect_thread, (gpointer)p, FALSE, NULL) != NULL)
	    return;

	/* Thread creation failed: undo the accounting and connect here. */
	g_mutex_lock(protocol_mutex);
	nb_thread--;
	g_mutex_unlock(protocol_mutex);
    }

    // bsd_security_driver    no connect,all use same socket
    // bsdudp_security_driver no connect,all use same socket
    // krb5_security_driver   untested
    security_connect(p->security_driver, p->hostname, p->conf_fn,
		     connect_callbackX, p, p->datap);
}
/*
 * Thread entry point: runs security_connect for the request passed in
 * `data', reporting the result through connect_thread_callback, then
 * removes itself from the running-thread count.  Always returns NULL.
 */
static gpointer
connect_thread(
    gpointer	data)
{
    proto_t *request = data;

    security_connect(request->security_driver, request->hostname,
		     request->conf_fn, connect_thread_callback,
		     request, request->datap);

    g_mutex_lock(protocol_mutex);
    --nb_thread;
    g_mutex_unlock(protocol_mutex);

    return NULL;
}
/*
 * security_connect callback used by connect_thread.  Stores the
 * connect result in the request and, while holding protocol_mutex,
 * schedules an event that runs connect_callback for it.
 */
static void
connect_thread_callback(
    void *		cookie,
    security_handle_t *	security_handle,
    security_status_t	status)
{
    proto_t *request = cookie;
    event_handle_t *wakeup;

    request->status = status;
    request->security_handle = security_handle;

    g_mutex_lock(protocol_mutex);
    wakeup = event_create((event_id_t)0, EV_TIME, connect_callback, request);
    request->event_handle = wakeup;
    event_activate(wakeup);
    g_mutex_unlock(protocol_mutex);
}
/*
 * security_connect callback for the non-threaded path: records the
 * handle and status in the request and runs connect_callback directly.
 */
static void
connect_callbackX(
    void *		cookie,
    security_handle_t *	security_handle,
    security_status_t	status)
{
    proto_t *request = cookie;

    request->status = status;
    request->security_handle = security_handle;

    connect_callback(request);
}
/*
 * Completion handler for a connect attempt.  It is reached either
 * directly from connect_callbackX or through the event scheduled by
 * connect_thread_callback; in both cases p->security_handle and
 * p->status already hold the result of security_connect.
 *
 * On S_OK the request is started.  On S_TIMEOUT or S_ERROR the error
 * can be had via security_geterror on the handle; the connect is
 * retried after CONNECT_WAIT seconds until the retry budget is spent,
 * at which point the request is aborted.
 */
static void
connect_callback(
    void *	cookie)
{
    proto_t *p = cookie;

    assert(p != NULL);

    /* Drop the one-shot event that brought us here, if there was one. */
    if (p->event_handle) {
	event_release(p->event_handle);
	p->event_handle = NULL;
    }

    proto_debug(1, _("protocol: connect_callback: p %p\n"), p);

    switch (p->status) {
    case S_OK:
	state_machine(p, PA_START, NULL);
	break;

    case S_TIMEOUT:
	security_seterror(p->security_handle, _("timeout during connect"));
	/* FALLTHROUGH */

    case S_ERROR:
	/*
	 * For timeouts or errors, retry a few times, waiting CONNECT_WAIT
	 * seconds between each attempt.  If they all fail, just return
	 * an error back to the caller.
	 *
	 * "<= 0" rather than "== 0" so that a non-positive retry count
	 * aborts immediately instead of retrying without end.
	 */
	if (--p->connecttries <= 0) {
	    state_machine(p, PA_ABORT, NULL);
	} else {
	    proto_debug(1, _("protocol: connect_callback: p %p: retrying %s\n"),
			p, p->hostname);
	    security_close(p->security_handle);
	    /* XXX overload p->security handle to hold the event handle */
	    p->security_handle =
		(security_handle_t *)event_create(CONNECT_WAIT, EV_TIME,
						  connect_wait_callback, p);
	    event_activate((event_handle_t *) p->security_handle);
	}
	break;

    default:
	assert(0);
	break;
    }
}
/*
* This gets called when a host has been put on a wait queue because
* initial connection attempts failed.
*/
static void
connect_wait_callback(
void * cookie)
{
proto_t *p = cookie;
event_release((event_handle_t *)p->security_handle);
if (
#ifdef BSDTCP_SECURITY
p->security_driver == &bsdtcp_security_driver ||
#endif
p->security_driver == &local_security_driver ||
#ifdef RSH_SECURITY
p->security_driver == &rsh_security_driver ||
#endif
#ifdef SSL_SECURITY
p->security_driver == &ssl_security_driver ||
#endif
#ifdef SSH_SECURITY
p->security_driver == &ssh_security_driver ||
#endif
0) {
g_thread_create(connect_thread, (gpointer)p, TRUE, NULL);
g_mutex_lock(protocol_mutex);
nb_thread++;
g_mutex_unlock(protocol_mutex);
} else {
// bsd_security_driver no connect,all use same socket
// bsdudp_security_driver no connect,all use same socket
// krb5_security_driver untested
security_connect(p->security_driver, p->hostname, p->conf_fn, connect_callbackX,
p, p->datap);
}
}
/*
 * Does a one pass protocol sweep.  Handles any incoming packets that
 * are waiting to be processed, and then deals with any pending
 * requests that have timed out.  Never blocks.
 *
 * Callers should periodically call this after they have submitted
 * requests if they plan on doing a lot of work.
 */
void
protocol_check(void)
{
    /* a non-zero argument tells event_loop not to block */
    const int nonblocking = 1;

    event_loop(nonblocking);
}
/*
 * Does an infinite pass protocol sweep.  This doesn't return until all
 * requests have been satisfied or have timed out.
 *
 * Callers should call this after they have finished submitting requests
 * and are just waiting for all of the answers to come back.
 *
 * Connect threads queue their completion event first and only then
 * decrement nb_thread.  A thread can therefore finish after event_loop
 * has returned but before the counter is read; seeing nb_thread == 0
 * does not by itself mean that every event has been processed.  For
 * that reason the event loop is drained once more after the counter is
 * seen at zero, and the counter is re-checked afterwards because the
 * drained events may themselves have started new connect threads.
 *
 * NOTE(review): a thread that is both started by the final drain and
 * finished before the re-check can still slip through; closing that
 * window completely needs a spawn counter shared with the spawn sites.
 */
void
protocol_run(void)
{
    for (;;) {
	int running;

	/* arg == 0 means block forever until no more events are left */
	event_loop(0);

	g_mutex_lock(protocol_mutex);
	running = nb_thread;
	g_mutex_unlock(protocol_mutex);

	if (running == 0) {
	    /* pick up events queued by threads that have just finished */
	    event_loop(0);

	    g_mutex_lock(protocol_mutex);
	    running = nb_thread;
	    g_mutex_unlock(protocol_mutex);

	    if (running == 0)
		break;
	}

	/* connect threads are still working; give them some time */
	sleep(1);
    }
}
/*
* ------------------
* Internal functions
*/
/*
 * The guts of the protocol.  This handles the many paths a request can
 * make, including retrying the request and acknowledgements, and dealing
 * with timeouts and successful replies.
 *
 * p      - the request; freed here when it finishes or aborts, so the
 *          caller must not touch it after this function returns
 * action - what happened (PA_START, PA_RCVDATA, PA_TIMEOUT, PA_ABORT)
 * pkt    - the received packet; must be NULL unless action is PA_RCVDATA
 */
static void
state_machine(
    proto_t *	p,
    p_action_t	action,
    pkt_t *	pkt)
{
    pstate_t curstate;
    p_action_t retaction;

    proto_debug(1, _("protocol: state_machine: initial: p %p action %s pkt %p\n"),
	    p, action2str(action), (void *)pkt);

    assert(p != NULL);
    assert(action == PA_RCVDATA || pkt == NULL);
    assert(p->state != NULL);

    for (;;) {
	proto_debug(1, _("protocol: state_machine: p %p state %s action %s\n"),
		    p, pstate2str(p->state), action2str(action));
	if (pkt != NULL) {
	    proto_debug(1, _("protocol: pkt: %s (t %d) orig REQ (t %d cur %d)\n"),
		    pkt_type2str(pkt->type), (int)CURTIME,
		    (int)p->origtime, (int)p->curtime);
	    proto_debug(1, _("protocol: pkt contents:\n-----\n%s-----\n"),
		    pkt->body);
	}

	/*
	 * p->state is a function pointer to the current state a request
	 * is in.
	 *
	 * We keep track of the last state we were in so we can make
	 * sure states which return PA_CONTINUE really have transitioned
	 * the request to a new state.
	 */
	curstate = p->state;

	if (action == PA_ABORT)
	    /*
	     * If the passed action indicates a terminal error, then we
	     * need to move to abort right away.
	     */
	    retaction = PA_ABORT;
	else
	    /*
	     * Else we run the state and perform the action it
	     * requests.
	     */
	    retaction = (*curstate)(p, action, pkt);

	proto_debug(1, _("protocol: state_machine: p %p state %s returned %s\n"),
		    p, pstate2str(p->state), action2str(retaction));

	/*
	 * The state function is expected to return one of the following
	 * p_action_t's.
	 */
	switch (retaction) {

	/*
	 * Request is still waiting for more data off of the network.
	 * Setup to receive another pkt, and wait for the recv event
	 * to occur.
	 */
	case PA_CONTPEND:
	    (*p->continuation)(p->datap, pkt, p->security_handle);
	    /* FALLTHROUGH */

	case PA_PENDING:
	    proto_debug(1, _("protocol: state_machine: p %p state %s: timeout %d\n"),
			p, pstate2str(p->state), (int)p->timeout);
	    /*
	     * Get the security layer to register a receive event for this
	     * security handle on our behalf.  Have it timeout in p->timeout
	     * seconds.
	     */
	    security_recvpkt(p->security_handle, recvpkt_callback, p,
		(int)p->timeout);

	    return;

	/*
	 * Request has moved to another state.  Loop and run it again.
	 */
	case PA_CONTINUE:
	    assert(p->state != curstate);
	    proto_debug(1, _("protocol: state_machine: p %p: moved from %s to %s\n"),
			    p, pstate2str(curstate),
			    pstate2str(p->state));
	    continue;

	/*
	 * Request has failed in some way locally.  The security_handle will
	 * contain an appropriate error message via security_geterror().  Set
	 * pkt to NULL to indicate failure to the callback, and then
	 * fall through to the common finish code.
	 *
	 * Note that remote failures finish via PA_FINISH, because they did
	 * complete successfully locally.
	 */
	case PA_ABORT:
	    pkt = NULL;
	    /* FALLTHROUGH */

	/*
	 * Request has completed successfully.
	 * Free up resources the request has used, call the continuation
	 * function specified by the caller and quit.
	 */
	case PA_FINISH:
	    (*p->continuation)(p->datap, pkt, p->security_handle);
	    security_close(p->security_handle);
	    amfree(p->hostname);
	    amfree(p->req.body);
	    amfree(p);
	    return;

	default:
	    assert(0);
	    break;	/* in case asserts are turned off */
	}
	/*NOTREACHED*/
    }
    /*NOTREACHED*/
}
/*
 * The request send state.  The REQ packet is handed to the security
 * layer here.  On success the attempt's start time is recorded, the
 * request moves to the ackwait state with an ACK_WAIT timeout, and
 * PA_PENDING is returned so the state machine waits for a packet.
 * If the send fails, the handle's error is rewritten and PA_ABORT is
 * returned.  The action and pkt arguments are ignored.
 */
static p_action_t
s_sendreq(
    proto_t *	p,
    p_action_t	action,
    pkt_t *	pkt)
{
    (void)pkt;		/* Quiet unused parameter warning */
    (void)action;	/* Quiet unused parameter warning */

    assert(p != NULL);

    if (security_sendpkt(p->security_handle, &p->req) >= 0) {
	/* note when this attempt went out */
	p->curtime = CURTIME;

	/* next stop: wait for the ACK */
	p->timeout = ACK_WAIT;
	p->state = s_ackwait;
	return (PA_PENDING);
    }

    /* XXX should retry */
    security_seterror(p->security_handle, _("error sending REQ: %s"),
		      security_geterror(p->security_handle));
    return (PA_ABORT);
}
/*
 * The acknowledge wait state.  We can enter here two ways:
 *
 *  - the caller has received a packet, located the request for
 *  that packet, and called us with an action of PA_RCVDATA.
 *
 *  - the caller has determined that a request has timed out,
 *  and has called us with PA_TIMEOUT.
 *
 * Here we process the acknowledgment, which usually means that
 * the client has agreed to our request and is working on it.
 * It will later send a reply when finished.
 *
 * Returns PA_ABORT when the retry budget is spent, PA_CONTINUE when
 * the request moved to another state, PA_PENDING to keep waiting and
 * PA_FINISH when the client sent a NAK.
 */
static p_action_t
s_ackwait(
    proto_t *	p,
    p_action_t	action,
    pkt_t *	pkt)
{
    assert(p != NULL);

    /*
     * The timeout case.  If our retry count has run out,
     * fail this request.  Otherwise, move to the send state
     * to retry the request.
     *
     * "<= 0" rather than "== 0" so that a non-positive retry count
     * fails the request instead of resending without end.
     */
    if (action == PA_TIMEOUT) {
	assert(pkt == NULL);

	if (--p->reqtries <= 0) {
	    security_seterror(p->security_handle, _("timeout waiting for ACK"));
	    return (PA_ABORT);
	}

	p->state = s_sendreq;
	return (PA_CONTINUE);
    }

    assert(action == PA_RCVDATA);
    assert(pkt != NULL);

    /*
     * The packet-received state.  Determine what kind of
     * packet we received, and act based on the reply type.
     */
    switch (pkt->type) {

    /*
     * Received an ACK.  Everything's good.  The client is
     * now working on the request.  We queue up again and
     * wait for the reply.
     */
    case P_ACK:
	p->state = s_repwait;
	p->timeout = p->repwait;
	return (PA_PENDING);

    /*
     * Received a NAK.  The request failed, so free up the
     * resources associated with it and return.
     *
     * This should NOT return PA_ABORT because it is not a local failure.
     */
    case P_NAK:
	return (PA_FINISH);

    /*
     * The client skipped the ACK, and replied right away.
     * Move to the reply state to handle it.
     */
    case P_REP:
    case P_PREP:
	p->state = s_repwait;
	return (PA_CONTINUE);

    /*
     * Unexpected packet.  Requeue this request and hope
     * we get what we want later.
     */
    default:
	return (PA_PENDING);
    }
}
/*
 * The reply wait state.  Entered, like s_ackwait, with either
 * PA_TIMEOUT (no packet) or PA_RCVDATA (with a packet).
 *
 * On timeout the request is resent while reset tries remain and the
 * request is younger than the drop-dead limit; otherwise it aborts.
 * A NAK finishes the request, a REP is acknowledged and finishes it,
 * a PREP (partial reply) is passed on and the wait continues with the
 * remaining reply time, and anything else keeps waiting.
 */
static p_action_t
s_repwait(
    proto_t *	p,
    p_action_t	action,
    pkt_t *	pkt)
{
    pkt_t ack;
    time_t remaining;

    if (action == PA_TIMEOUT) {
	assert(pkt == NULL);

	if (p->resettries != 0 && !DROP_DEAD_TIME(p->origtime)) {
	    /* tries are left: start over with a fresh ACK budget */
	    p->resettries--;
	    p->state = s_sendreq;
	    p->reqtries = getconf_int(CNF_REQ_TRIES);
	    return (PA_CONTINUE);
	}

	/* out of tries, or the request is too old: give up */
	security_seterror(p->security_handle, _("timeout waiting for REP"));
	return (PA_ABORT);
    }

    assert(action == PA_RCVDATA);

    switch (pkt->type) {

    /* the client refused: done, but not a local failure */
    case P_NAK:
	return (PA_FINISH);

    /* complete reply: acknowledge it and finish */
    case P_REP:
	pkt_init_empty(&ack, P_ACK);
	if (security_sendpkt(p->security_handle, &ack) < 0) {
	    /* XXX should retry */
	    amfree(ack.body);
	    security_seterror(p->security_handle, _("error sending ACK: %s"),
			      security_geterror(p->security_handle));
	    return (PA_ABORT);
	}
	amfree(ack.body);
	return (PA_FINISH);

    /* partial reply: keep waiting for whatever reply time is left */
    case P_PREP:
	remaining = p->repwait - CURTIME + p->curtime + 1;
	p->timeout = remaining;
	if (p->timeout <= 0)
	    p->timeout = 1;
	return (PA_CONTPEND);

    /* not a reply at all: requeue and wait again */
    default:
	return (PA_PENDING);
    }
}
/*
 * Callback registered with security_recvpkt.  Translates the receive
 * status into a state machine action: a packet on S_OK, a timeout on
 * S_TIMEOUT, and an abort on S_ERROR.
 */
static void
recvpkt_callback(
    void *		cookie,
    pkt_t *		pkt,
    security_status_t	status)
{
    proto_t *request = cookie;

    assert(request != NULL);

    if (status == S_OK) {
	state_machine(request, PA_RCVDATA, pkt);
    } else if (status == S_TIMEOUT) {
	state_machine(request, PA_TIMEOUT, NULL);
    } else if (status == S_ERROR) {
	state_machine(request, PA_ABORT, NULL);
    } else {
	assert(0);
    }
}
/*
* --------------
* Misc functions
*/
/*
 * Return the name of a state function for debug output, or a
 * "BOGUS PSTATE" marker when the pointer is not one of the known states.
 */
static const char *
pstate2str(
    pstate_t	pstate)
{
    static const struct {
	pstate_t type;
	const char name[12];
    } pstates[] = {
#define	X(s)	{ s, stringize(s) }
	X(s_sendreq),
	X(s_ackwait),
	X(s_repwait),
#undef X
    };
    const guint count = G_N_ELEMENTS(pstates);
    guint idx = 0;

    while (idx < count) {
	if (pstates[idx].type == pstate)
	    return (pstates[idx].name);
	idx++;
    }
    return (_("BOGUS PSTATE"));
}
/*
 * Return the name of a p_action_t for debug output, or a
 * "BOGUS ACTION" marker when the value is not a known action.
 */
static const char *
action2str(
    p_action_t	action)
{
    static const struct {
	p_action_t type;
	const char name[12];
    } actions[] = {
#define	X(s)	{ s, stringize(s) }
	X(PA_START),
	X(PA_TIMEOUT),
	X(PA_ERROR),
	X(PA_RCVDATA),
	X(PA_CONTPEND),
	X(PA_PENDING),
	X(PA_CONTINUE),
	X(PA_FINISH),
	X(PA_ABORT),
#undef X
    };
    const guint count = G_N_ELEMENTS(actions);
    guint idx = 0;

    while (idx < count) {
	if (actions[idx].type == action)
	    return (actions[idx].name);
	idx++;
    }
    return (_("BOGUS ACTION"));
}