Blob Blame History Raw
/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 1991-1999 University of Maryland at College Park
 * Copyright (c) 2007-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 * All Rights Reserved.
 *
 * Permission to use, copy, modify, distribute, and sell this software and its
 * documentation for any purpose is hereby granted without fee, provided that
 * the above copyright notice appear in all copies and that both that
 * copyright notice and this permission notice appear in supporting
 * documentation, and that the name of U.M. not be used in advertising or
 * publicity pertaining to distribution of the software without specific,
 * written prior permission.  U.M. makes no representations about the
 * suitability of this software for any purpose.  It is provided "as is"
 * without express or implied warranty.
 *
 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *
 * Authors: the Amanda Development Team.  Its members are listed in a
 * file named AUTHORS, in the root directory of this distribution.
 */
/*
 * $Id: protocol.c,v 1.45 2006/05/25 17:07:31 martinea Exp $
 *
 * implements amanda protocol
 */
#include "amanda.h"
#include "conffile.h"
#include "event.h"
#include "packet.h"
#include "security.h"
#include "protocol.h"

#define proto_debug(i, ...) do {	\
       if ((i) <= debug_protocol) {	\
           dbprintf(__VA_ARGS__);	\
       }				\
} while (0)

#ifdef BSD_SECURITY
extern const security_driver_t bsd_security_driver;
#endif
#ifdef KRB5_SECURITY
extern const security_driver_t krb5_security_driver;
#endif
#ifdef RSH_SECURITY
extern const security_driver_t rsh_security_driver;
#endif
#ifdef SSH_SECURITY
extern const security_driver_t ssh_security_driver;
#endif
#ifdef BSDTCP_SECURITY
extern const security_driver_t bsdtcp_security_driver;
#endif
#ifdef SSL_SECURITY
extern const security_driver_t ssl_security_driver;
#endif
#ifdef BSDUDP_SECURITY
extern const security_driver_t bsdudp_security_driver;
#endif
extern const security_driver_t local_security_driver;

/*
 * Valid actions that can be passed to the state machine
 */
typedef enum {
	PA_START,
	PA_TIMEOUT,
	PA_ERROR,
	PA_RCVDATA,
	PA_CONTPEND,
	PA_PENDING,
	PA_CONTINUE,
	PA_FINISH,
	PA_ABORT
} p_action_t;

/*
 * The current state type.  States are represented as function
 * vectors.
 */
struct proto;
typedef p_action_t (*pstate_t)(struct proto *, p_action_t, pkt_t *);

/*
 * This is a request structure that is wrapped around a packet while it
 * is being passed through amanda.  It holds the timeouts, state, and handles
 * for each request.
 */
typedef struct proto {
    pstate_t state;			/* current state of the request */
    char *hostname;			/* remote host */
    const security_driver_t *security_driver;	/* for connect retries */
    security_handle_t *security_handle;	/* network stream for this req */
    time_t timeout;			/* seconds for this timeout */
    time_t repwait;			/* seconds to wait for reply */
    time_t origtime;			/* orig start time of this request */
    time_t curtime;			/* time when this attempt started */
    int connecttries;			/* times we'll retry a connect */
    int resettries;			/* times we'll resend a REQ */
    int reqtries;			/* times we'll wait for an a ACK */
    pkt_t req;				/* the actual wire request */
    protocol_sendreq_callback continuation; /* call when req dies/finishes */
    void *datap;			/* opaque cookie passed to above */
    char *(*conf_fn)(char *, void *);	/* configuration function */
    security_status_t  status;
    event_handle_t    *event_handle;
} proto_t;

#define	CONNECT_WAIT	5	/* secs between connect attempts */
#define ACK_WAIT	10	/* time (secs) to wait for ACK - keep short */
#define RESET_TRIES	2	/* num restarts (reboot/crash) */
#define CURTIME	(time(0) - proto_init_time) /* time relative to start */

/* if no reply in an hour, just forget it */
#define	DROP_DEAD_TIME(t)	(CURTIME - (t) > (60 * 60))

/*
 * Initialization time
 */
static time_t proto_init_time;
static int nb_thread = 0;
static GMutex *protocol_mutex;

/* local functions */

static const char *action2str(p_action_t);
static const char *pstate2str(pstate_t);

static gpointer connect_thread(gpointer data);
static void connect_thread_callback(void *cookie,
				    security_handle_t *	security_handle,
				    security_status_t	status);

static void connect_callback(void *cookie);
static void connect_callbackX(void *, security_handle_t *, security_status_t);
static void connect_wait_callback(void *);
static void recvpkt_callback(void *, pkt_t *, security_status_t);

static p_action_t s_sendreq(proto_t *, p_action_t, pkt_t *);
static p_action_t s_ackwait(proto_t *, p_action_t, pkt_t *);
static p_action_t s_repwait(proto_t *, p_action_t, pkt_t *);
static void state_machine(proto_t *, p_action_t, pkt_t *);

/*
 * -------------------
 * Interface functions
 */

/*
 * Initialize globals.
 */
void
protocol_init(void)
{

    proto_init_time = time(NULL);
    protocol_mutex = g_mutex_new();
}

/*
 * Generate a request packet, and submit it to the state machine
 * for transmission.
 */
void
protocol_sendreq(
    const char *		hostname,
    const security_driver_t *	security_driver,
    char *			(*conf_fn)(char *, void *),
    const char *		req,
    time_t			repwait,
    protocol_sendreq_callback	continuation,
    void *			datap)
{
    proto_t *p;
    char    *platform = NULL;
    char    *distro = NULL;

    p = g_malloc(sizeof(proto_t));
    p->state = s_sendreq;
    p->hostname = g_strdup(hostname);
    p->security_driver = security_driver;
    /* p->security_handle set in connect_callback */
    p->repwait = repwait;
    p->origtime = CURTIME;
    /* p->curtime set in the sendreq state */
    p->connecttries = getconf_int(CNF_CONNECT_TRIES);
    p->resettries = RESET_TRIES;
    p->reqtries = getconf_int(CNF_REQ_TRIES);
    p->conf_fn = conf_fn;
    pkt_init(&p->req, P_REQ, "%s", req);

    /*
     * These are here for the caller
     * We call the continuation function after processing is complete.
     * We pass the datap on through untouched.  It is here so the caller
     * has a way to keep state with each request.
     */
    p->continuation = continuation;
    p->datap = datap;
    p->event_handle = NULL;

    proto_debug(1, _("protocol: security_connect: host %s -> p %p\n"),
		    hostname, p);

    get_platform_and_distro(&platform, &distro);
    if (distro != NULL &&
	!g_str_equal(distro, "mac") &&
#if defined HAVE_FUNC_GETSERVBYNAME_R_4 || defined HAVE_FUNC_GETSERVBYNAME_R_5 || defined HAVE_FUNC_GETSERVBYNAME_R_6
	1 &&
#else
	0 &&
#endif
	(
#ifdef BSDTCP_SECURITY
	 security_driver == &bsdtcp_security_driver ||
#endif
	 security_driver == &local_security_driver ||
#ifdef RSH_SECURITY
	 security_driver == &rsh_security_driver ||
#endif
#ifdef SSL_SECURITY
	 security_driver == &ssl_security_driver ||
#endif
#ifdef SSH_SECURITY
	 security_driver == &ssh_security_driver ||
#endif
	 0)) {
	g_thread_create(connect_thread, (gpointer)p, TRUE, NULL);
	g_mutex_lock(protocol_mutex);
	nb_thread++;
	g_mutex_unlock(protocol_mutex);
    } else {
	// bsd_security_driver		no connect,all use same socket
	// bsdudp_security_driver	no connect,all use same socket
	// krb5_security_driver		untested
	security_connect(p->security_driver, p->hostname, p->conf_fn, connect_callbackX,
			 p, p->datap);
    }
    g_free(platform);
    g_free(distro);
}

static gpointer
connect_thread(
    gpointer data)
{
    proto_t *p = (proto_t *)data;

    security_connect(p->security_driver, p->hostname, p->conf_fn,
		     connect_thread_callback, p, p->datap);
    g_mutex_lock(protocol_mutex);
    nb_thread--;
    g_mutex_unlock(protocol_mutex);
    return NULL;
}

static void
connect_thread_callback(
    void *		cookie,
    security_handle_t *	security_handle,
    security_status_t	status)
{
    proto_t *p = cookie;
    p->security_handle = security_handle;
    p->status = status;

    g_mutex_lock(protocol_mutex);
    p->event_handle = event_create((event_id_t)0, EV_TIME, connect_callback, p);
    event_activate(p->event_handle);
    g_mutex_unlock(protocol_mutex);
}

static void
connect_callbackX(
    void *		cookie,
    security_handle_t *	security_handle,
    security_status_t	status)
{
    proto_t *p = cookie;
    p->security_handle = security_handle;
    p->status = status;
    connect_callback(p);
}

/*
 * This is a callback for security_connect.  After the security layer
 * has initiated a connection to the given host, this will be called
 * with a security_handle_t.
 *
 * On error, the security_status_t arg will reflect errors which can
 * be had via security_geterror on the handle.
 */
static void
connect_callback(
    void *cookie)
{
    proto_t *p = cookie;

    assert(p != NULL);
    if (p->event_handle) {
	event_release(p->event_handle);
	p->event_handle = 0;
    }

    proto_debug(1, _("protocol: connect_callback: p %p\n"), p);

    switch (p->status) {
    case S_OK:
	state_machine(p, PA_START, NULL);
	break;

    case S_TIMEOUT:
	security_seterror(p->security_handle, _("timeout during connect"));
	/* FALLTHROUGH */

    case S_ERROR:
	/*
	 * For timeouts or errors, retry a few times, waiting CONNECT_WAIT
	 * seconds between each attempt.  If they all fail, just return
	 * an error back to the caller.
	 */
	if (--p->connecttries == 0) {
	    state_machine(p, PA_ABORT, NULL);
	} else {
	    proto_debug(1, _("protocol: connect_callback: p %p: retrying %s\n"),
			    p, p->hostname);
	    security_close(p->security_handle);
	    /* XXX overload p->security handle to hold the event handle */
	    p->security_handle =
		(security_handle_t *)event_create(CONNECT_WAIT, EV_TIME,
		connect_wait_callback, p);
	    event_activate((event_handle_t *) p->security_handle);
	}
	break;

    default:
	assert(0);
	break;
    }
}

/*
 * This gets called when a host has been put on a wait queue because
 * initial connection attempts failed.
 */
static void
connect_wait_callback(
    void *	cookie)
{
    proto_t *p = cookie;

    event_release((event_handle_t *)p->security_handle);
    if (
#ifdef BSDTCP_SECURITY
	 p->security_driver == &bsdtcp_security_driver ||
#endif
	 p->security_driver == &local_security_driver ||
#ifdef RSH_SECURITY
	 p->security_driver == &rsh_security_driver ||
#endif
#ifdef SSL_SECURITY
	 p->security_driver == &ssl_security_driver ||
#endif
#ifdef SSH_SECURITY
	 p->security_driver == &ssh_security_driver ||
#endif
	 0) {
	g_thread_create(connect_thread, (gpointer)p, TRUE, NULL);
	g_mutex_lock(protocol_mutex);
	nb_thread++;
	g_mutex_unlock(protocol_mutex);
    } else {
	// bsd_security_driver		no connect,all use same socket
	// bsdudp_security_driver	no connect,all use same socket
	// krb5_security_driver		untested
	security_connect(p->security_driver, p->hostname, p->conf_fn, connect_callbackX,
			 p, p->datap);
    }
}


/*
 * Does a one pass protocol sweep.  Handles any incoming packets that 
 * are waiting to be processed, and then deals with any pending
 * requests that have timed out.
 *
 * Callers should periodically call this after they have submitted
 * requests if they plan on doing a lot of work.
 */
void
protocol_check(void)
{

    /* arg == 1 means don't block */
    event_loop(1);
}


/*
 * Does an infinite pass protocol sweep.  This doesn't return until all
 * requests have been satisfied or have timed out.
 *
 * Callers should call this after they have finished submitting requests
 * and are just waiting for all of the answers to come back.
 */
void
protocol_run(void)
{

    /* arg == 0 means block forever until no more events are left */
    event_loop(0);
    g_mutex_lock(protocol_mutex);
    while (nb_thread > 0) {
	g_mutex_unlock(protocol_mutex);
	sleep(1);
	event_loop(0);
	g_mutex_lock(protocol_mutex);
    }
    g_mutex_unlock(protocol_mutex);
}


/*
 * ------------------
 * Internal functions
 */

/*
 * The guts of the protocol.  This handles the many paths a request can
 * make, including retrying the request and acknowledgements, and dealing
 * with timeouts and successfull replies.
 */
static void
state_machine(
    proto_t *	p,
    p_action_t	action,
    pkt_t *	pkt)
{
    pstate_t curstate;
    p_action_t retaction;

    proto_debug(1, _("protocol: state_machine: initial: p %p action %s pkt %p\n"),
		    p, action2str(action), (void *)NULL);

    assert(p != NULL);
    assert(action == PA_RCVDATA || pkt == NULL);
    assert(p->state != NULL);

    for (;;) {
	proto_debug(1, _("protocol: state_machine: p %p state %s action %s\n"),
			p, pstate2str(p->state), action2str(action));
	if (pkt != NULL) {
	    proto_debug(1, _("protocol: pkt: %s (t %d) orig REQ (t %d cur %d)\n"),
			    pkt_type2str(pkt->type), (int)CURTIME,
			    (int)p->origtime, (int)p->curtime);
	    proto_debug(1, _("protocol: pkt contents:\n-----\n%s-----\n"),
			    pkt->body);
	}

	/*
	 * p->state is a function pointer to the current state a request
	 * is in.
	 *
	 * We keep track of the last state we were in so we can make
	 * sure states which return PA_CONTINUE really have transitioned
	 * the request to a new state.
	 */
	curstate = p->state;

	if (action == PA_ABORT)
	    /*
	     * If the passed action indicates a terminal error, then we
	     * need to move to abort right away.
	     */
	    retaction = PA_ABORT;
	else
	    /*
	     * Else we run the state and perform the action it
	     * requests.
	     */
	    retaction = (*curstate)(p, action, pkt);

	proto_debug(1, _("protocol: state_machine: p %p state %s returned %s\n"),
			p, pstate2str(p->state), action2str(retaction));

	/*
	 * The state function is expected to return one of the following
	 * p_action_t's.
	 */
	switch (retaction) {

	/*
	 * Request is still waiting for more data off of the network.
	 * Setup to receive another pkt, and wait for the recv event
	 * to occur.
	 */
	case PA_CONTPEND:
	    (*p->continuation)(p->datap, pkt, p->security_handle);
	    /* FALLTHROUGH */

	case PA_PENDING:
	    proto_debug(1, _("protocol: state_machine: p %p state %s: timeout %d\n"),
			    p, pstate2str(p->state), (int)p->timeout);
	    /*
	     * Get the security layer to register a receive event for this
	     * security handle on our behalf.  Have it timeout in p->timeout
	     * seconds.
	     */
	    security_recvpkt(p->security_handle, recvpkt_callback, p,
		(int)p->timeout);

	    return;

	/*
	 * Request has moved to another state.  Loop and run it again.
	 */
	case PA_CONTINUE:
	    assert(p->state != curstate);
	    proto_debug(1, _("protocol: state_machine: p %p: moved from %s to %s\n"),
			    p, pstate2str(curstate),
			    pstate2str(p->state));
	    continue;

	/*
	 * Request has failed in some way locally.  The security_handle will
	 * contain an appropriate error message via security_geterror().  Set
	 * pkt to NULL to indicate failure to the callback, and then
	 * fall through to the common finish code.
	 *
	 * Note that remote failures finish via PA_FINISH, because they did
	 * complete successfully locally.
	 */
	case PA_ABORT:
	    pkt = NULL;
	    /* FALLTHROUGH */

	/*
	 * Request has completed successfully.
	 * Free up resources the request has used, call the continuation
	 * function specified by the caller and quit.
	 */
	case PA_FINISH:
	    (*p->continuation)(p->datap, pkt, p->security_handle);
	    security_close(p->security_handle);
	    amfree(p->hostname);
	    amfree(p->req.body);
	    amfree(p);
	    return;

	default:
	    assert(0);
	    break;	/* in case asserts are turned off */
	}
	/*NOTREACHED*/
    }
    /*NOTREACHED*/
}

/*
 * The request send state.  Here, the packet is actually transmitted
 * across the network.  After setting up timeouts, the request
 * moves to the acknowledgement wait state.  We return from the state
 * machine at this point, and let the request be received from the network.
 */
static p_action_t
s_sendreq(
    proto_t *	p,
    p_action_t	action,
    pkt_t *	pkt)
{

    assert(p != NULL);
    (void)action;	/* Quiet unused parameter warning */
    (void)pkt;		/* Quiet unused parameter warning */

    if (security_sendpkt(p->security_handle, &p->req) < 0) {
	/* XXX should retry */
	security_seterror(p->security_handle, _("error sending REQ: %s"),
	    security_geterror(p->security_handle));
	return (PA_ABORT);
    }

    /*
     * Remember when this request was first sent
     */
    p->curtime = CURTIME;

    /*
     * Move to the ackwait state
     */
    p->state = s_ackwait;
    p->timeout = ACK_WAIT;
    return (PA_PENDING);
}

/*
 * The acknowledge wait state.  We can enter here two ways:
 *
 *  - the caller has received a packet, located the request for
 *    that packet, and called us with an action of PA_RCVDATA.
 *    
 *  - the caller has determined that a request has timed out,
 *    and has called us with PA_TIMEOUT.
 *
 * Here we process the acknowledgment, which usually means that
 * the client has agreed to our request and is working on it.
 * It will later send a reply when finished.
 */
static p_action_t
s_ackwait(
    proto_t *	p,
    p_action_t	action,
    pkt_t *	pkt)
{

    assert(p != NULL);

    /*
     * The timeout case.  If our retry count has gone to zero
     * fail this request.  Otherwise, move to the send state
     * to retry the request.
     */
    if (action == PA_TIMEOUT) {
	assert(pkt == NULL);

	if (--p->reqtries == 0) {
	    security_seterror(p->security_handle, _("timeout waiting for ACK"));
	    return (PA_ABORT);
	}

	p->state = s_sendreq;
	return (PA_CONTINUE);
    }

    assert(action == PA_RCVDATA);
    assert(pkt != NULL);

    /*
     * The packet-received state.  Determine what kind of
     * packet we received, and act based on the reply type.
     */
    switch (pkt->type) {

    /*
     * Received an ACK.  Everything's good.  The client is
     * now working on the request.  We queue up again and
     * wait for the reply.
     */
    case P_ACK:
	p->state = s_repwait;
	p->timeout = p->repwait;
	return (PA_PENDING);

    /*
     * Received a NAK.  The request failed, so free up the
     * resources associated with it and return.
     *
     * This should NOT return PA_ABORT because it is not a local failure.
     */
    case P_NAK:
	return (PA_FINISH);

    /*
     * The client skipped the ACK, and replied right away.
     * Move to the reply state to handle it.
     */
    case P_REP:
    case P_PREP:
	p->state = s_repwait;
	return (PA_CONTINUE);

    /*
     * Unexpected packet.  Requeue this request and hope
     * we get what we want later.
     */
    default:
	return (PA_PENDING);
    }
}

/*
 * The reply wait state.  We enter here much like we do with s_ackwait.
 */
static p_action_t
s_repwait(
    proto_t *	p,
    p_action_t	action,
    pkt_t *	pkt)
{
    pkt_t ack;

    /*
     * Timeout waiting for a reply.
     */
    if (action == PA_TIMEOUT) {
	assert(pkt == NULL);

	/*
	 * If we've blown our timeout limit, free up this packet and
	 * return.
	 */
	if (p->resettries == 0 || DROP_DEAD_TIME(p->origtime)) {
	    security_seterror(p->security_handle, _("timeout waiting for REP"));
	    return (PA_ABORT);
	}

	/*
	 * We still have some tries left.  Resend the request.
	 */
	p->resettries--;
	p->state = s_sendreq;
	p->reqtries = getconf_int(CNF_REQ_TRIES);
	return (PA_CONTINUE);
    }

    assert(action == PA_RCVDATA);

    /* Finish if we get a NAK */
    if (pkt->type == P_NAK)
	return (PA_FINISH);

    /*
     * We've received some data.  If we didn't get a reply,
     * requeue the packet and retry.  Otherwise, acknowledge
     * the reply, cleanup this packet, and return.
     */
    if (pkt->type != P_REP && pkt->type != P_PREP)
	return (PA_PENDING);

    if(pkt->type == P_REP) {
	pkt_init_empty(&ack, P_ACK);
	if (security_sendpkt(p->security_handle, &ack) < 0) {
	    /* XXX should retry */
	    amfree(ack.body);
	    security_seterror(p->security_handle, _("error sending ACK: %s"),
		security_geterror(p->security_handle));
	    return (PA_ABORT);
	}
	amfree(ack.body);
	return (PA_FINISH);
    }
    else if(pkt->type == P_PREP) {
	p->timeout = p->repwait - CURTIME + p->curtime + 1;
	if (p->timeout <= 0)
	    p->timeout = 1;
	return (PA_CONTPEND);
    }

    /* should never go here, shut up compiler warning */
    return (PA_FINISH);
}

/*
 * event callback that receives a packet
 */
static void
recvpkt_callback(
    void *		cookie,
    pkt_t *		pkt,
    security_status_t	status)
{
    proto_t *p = cookie;

    assert(p != NULL);

    switch (status) {
    case S_OK:
	state_machine(p, PA_RCVDATA, pkt);
	break;
    case S_TIMEOUT:
	state_machine(p, PA_TIMEOUT, NULL);
	break;
    case S_ERROR:
	state_machine(p, PA_ABORT, NULL);
	break;
    default:
	assert(0);
	break;
    }
}

/*
 * --------------
 * Misc functions
 */

/*
 * Convert a pstate_t into a printable form.
 */
static const char *
pstate2str(
    pstate_t	pstate)
{
    static const struct {
	pstate_t type;
	const char name[12];
    } pstates[] = {
#define	X(s)	{ s, stringize(s) }
	X(s_sendreq),
	X(s_ackwait),
	X(s_repwait),
#undef X
    };

    guint i;

    for (i = 0; i < G_N_ELEMENTS(pstates); i++)
	if (pstate == pstates[i].type)
	    return (pstates[i].name);
    return (_("BOGUS PSTATE"));
}

/*
 * Convert an p_action_t into a printable form
 */
static const char *
action2str(
    p_action_t	action)
{
    static const struct {
	p_action_t type;
	const char name[12];
    } actions[] = {
#define	X(s)	{ s, stringize(s) }
	X(PA_START),
	X(PA_TIMEOUT),
	X(PA_ERROR),
	X(PA_RCVDATA),
	X(PA_CONTPEND),
	X(PA_PENDING),
	X(PA_CONTINUE),
	X(PA_FINISH),
	X(PA_ABORT),
#undef X
    };

    guint i;

    for (i = 0; i < G_N_ELEMENTS(actions); i++)
	if (action == actions[i].type)
	    return (actions[i].name);
    return (_("BOGUS ACTION"));
}