Blob Blame History Raw
/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 *
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
 */

#include "amanda.h"
#include "amxfer.h"
#include "xfer-device.h"
#include "conffile.h"

/* A transfer destination that writes and entire dumpfile to one or more files
 * on one or more devices via DirectTCP, handling the work of spanning a
 * directtcp connection over multiple devices.  Note that this assumes the
 * devices support early EOM warning. */

/*
 * Xfer Dest Taper DirectTCP
 */

static GType xfer_dest_taper_directtcp_get_type(void);
#define XFER_DEST_TAPER_DIRECTTCP_TYPE (xfer_dest_taper_directtcp_get_type())
#define XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP)
#define XFER_DEST_TAPER_DIRECTTCP_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP const)
#define XFER_DEST_TAPER_DIRECTTCP_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
#define IS_XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_directtcp_get_type ())
#define XFER_DEST_TAPER_DIRECTTCP_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)

static GObjectClass *parent_class = NULL;

typedef struct XferDestTaperDirectTCP {
    XferDestTaper __parent__;

    /* constructor parameters */
    guint64 part_size; /* (bytes) */

    /* thread */
    GThread *worker_thread;

    /* state (governs everything below) */
    GMutex *state_mutex;

    /* part parameters */
    Device *volatile device; /* device to write to (refcounted) */
    dumpfile_t *volatile part_header;

    /* did the device listen proceed without error? */
    gboolean listen_ok;

    /* part number in progress */
    volatile guint64 partnum;

    /* connection we're writing to (refcounted) */
    DirectTCPConnection *conn;

    /* is the element paused, waiting to start a new part? this is set to FALSE
     * by the main thread to start a part, and the worker thread waits on the
     * corresponding condition variable. */
    volatile gboolean paused;
    GCond *paused_cond;
    GCond *abort_cond; /* condition to trigger to abort an NDMP command */

} XferDestTaperDirectTCP;

typedef struct {
    XferDestTaperClass __parent__;
} XferDestTaperDirectTCPClass;

/*
 * Debug logging
 */

#define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
static void
_xdt_dbg(const char *fmt, ...)
{
    va_list argp;
    char msg[1024];

    arglist_start(argp, fmt);
    g_vsnprintf(msg, sizeof(msg), fmt, argp);
    arglist_end(argp);
    g_debug("XDTD: %s", msg);
}
/*
 * Worker Thread
 */

static gpointer
worker_thread(
    gpointer data)
{
    XferElement *elt = (XferElement *)data;
    XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
    GTimer *timer = g_timer_new();
    int result;

    /* This thread's job is to accept() an incoming connection, then call
     * write_from_connection for each part, and then close the connection */

    /* If the device_listen failed, then we will soon be cancelled, so wait
     * for that to occur and then send XMSG_DONE */
    if (!self->listen_ok) {
	DBG(2, "listen failed; waiting for cancellation without attempting an accept");
	wait_until_xfer_cancelled(elt->xfer);
	goto send_xmsg_done;
    }

    g_mutex_lock(self->state_mutex);

    /* first, accept a new connection from the device */
    DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
    result = device_accept(self->device, &self->conn, &elt->cancelled,
			   self->state_mutex, self->abort_cond);
    if (result == 1 && !elt->cancelled) {
	xfer_cancel_with_error(XFER_ELEMENT(self),
	    "accepting DirectTCP connection: %s",
	    device_error_or_status(self->device));
	g_mutex_unlock(self->state_mutex);
	wait_until_xfer_cancelled(elt->xfer);
	goto send_xmsg_done;
    } else if (result == 2 || elt->cancelled) {
	g_mutex_unlock(self->state_mutex);
	wait_until_xfer_cancelled(elt->xfer);
	goto send_xmsg_done;
    }

    DBG(2, "connection accepted; sending XMSG_READY");
    xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));

    /* round the part size up to the next multiple of the block size */
    if (self->part_size) {
	self->part_size += self->device->block_size-1;
	self->part_size -= self->part_size % self->device->block_size;
    }

    /* now loop until we're out of parts */
    while (1) {
	guint64 size;
	int fileno;
	XMsg *msg = NULL;
	gboolean eom, eof;

	/* wait to be un-paused */
	while (!elt->cancelled && self->paused) {
	    DBG(9, "waiting to be un-paused");
	    g_cond_wait(self->paused_cond, self->state_mutex);
	}
	DBG(9, "worker_thread done waiting");

	if (elt->cancelled)
	    break;

	DBG(2, "writing part to %s", self->device->device_name);
	if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
	    /* this is not fatal to the transfer, since no data was lost.  We
	     * just need a new device.  The scribe special-cases 0-byte parts, and will
	     * not record this in the catalog. */

	    /* clean up */
	    dumpfile_free(self->part_header);
	    self->part_header = NULL;

	    goto empty_part;
	}

	dumpfile_free(self->part_header);
	self->part_header = NULL;

	fileno = self->device->file;
	g_assert(fileno > 0);

	/* write the part */
	g_timer_start(timer);
	result = device_write_from_connection(self->device,
		self->part_size, &size, &elt->cancelled,
		self->state_mutex, self->abort_cond);
	if (result == 1 && !elt->cancelled) {
	    /* even if this is just a physical EOM, we may have lost data, so
	     * the whole transfer is dead. */
	    xfer_cancel_with_error(XFER_ELEMENT(self),
		"Error writing from DirectTCP connection: %s",
		device_error_or_status(self->device));
	    device_finish_file(self->device);
	    goto cancelled;
	} else if (result == 2 || elt->cancelled) {
	    device_finish_file(self->device);
	    goto cancelled;
	}
	g_timer_stop(timer);

	eom = self->device->is_eom;
	eof = self->device->is_eof;

	/* finish the file, even if we're at EOM, but if this fails then we may
	 * have lost data */
	if (!device_finish_file(self->device)) {
	    xfer_cancel_with_error(XFER_ELEMENT(self),
		"Error finishing tape file: %s",
		device_error_or_status(self->device));
	    goto cancelled;
	}

	/* if we wrote zero bytes and reached EOM, then this is an empty part */
	if (eom && !eof && size == 0) {
	    goto empty_part;
	}

	msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
	msg->size = size;
	msg->duration = g_timer_elapsed(timer, NULL);
	msg->partnum = self->partnum;
	msg->fileno = fileno;
	msg->successful = TRUE;
	msg->eom = eom;
	msg->eof = eof;

	/* time runs backward on some test boxes, so make sure this is positive */
	if (msg->duration < 0) msg->duration = 0;

	xfer_queue_message(elt->xfer, msg);

        self->partnum++;

	/* we're done at EOF */
	if (eof)
	    break;

	/* wait to be unpaused again */
	self->paused = TRUE;
	continue;

empty_part:
	msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
	msg->size = 0;
	msg->duration = 0;
	msg->partnum = 0;
	msg->fileno = 0;
	msg->successful = TRUE;
	msg->eom = TRUE;
	msg->eof = FALSE;
	xfer_queue_message(elt->xfer, msg);

	/* wait to be unpaused again */
	self->paused = TRUE;
	continue;

cancelled:
	/* drop the mutex and wait until all elements have been cancelled
	 * before closing the connection */
	g_mutex_unlock(self->state_mutex);
	wait_until_xfer_cancelled(elt->xfer);
	g_mutex_lock(self->state_mutex);
	break;
    }

    /* close the DirectTCP connection */
    directtcp_connection_close(self->conn);
    g_object_unref(self->conn);
    self->conn = NULL;

    g_mutex_unlock(self->state_mutex);
    g_timer_destroy(timer);

send_xmsg_done:
    xfer_queue_message(elt->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

/*
 * Element mechanics
 */

static gboolean
setup_impl(
    XferElement *elt)
{
    XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;

    /* start the device listening, and get the addresses */
    if (!device_listen(self->device, TRUE, &elt->input_listen_addrs)) {
	elt->input_listen_addrs = NULL;
	xfer_cancel_with_error(XFER_ELEMENT(self),
	    "Error starting DirectTCP listen: %s",
	    device_error_or_status(self->device));
	self->listen_ok = FALSE;
	return FALSE;
    }

    self->listen_ok = TRUE;
    return TRUE;
}

static gboolean
start_impl(
    XferElement *elt)
{
    XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
    GError *error = NULL;

    self->paused = TRUE;

    /* start up the thread */
    self->worker_thread = g_thread_create(worker_thread, (gpointer)self, TRUE, &error);
    if (!self->worker_thread) {
	g_critical(_("Error creating new thread: %s (%s)"),
	    error->message, errno? strerror(errno) : _("no error code"));
    }

    return TRUE;
}

static gboolean
cancel_impl(
    XferElement *elt,
    gboolean expect_eof)
{
    XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
    gboolean rv;

    /* chain up first */
    rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);

    /* signal all of the condition variables to realize that we're no
     * longer paused */
    g_mutex_lock(self->state_mutex);
    g_cond_broadcast(self->paused_cond);
    g_cond_broadcast(self->abort_cond);
    g_mutex_unlock(self->state_mutex);

    return rv;
}

static void
start_part_impl(
    XferDestTaper *xdtself,
    gboolean retry_part,
    dumpfile_t *header)
{
    XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);

    /* the only way self->device can become NULL is if use_device fails, in
     * which case an error is already queued up, so just return silently */
    if (self->device == NULL)
	return;

    g_assert(!self->device->in_file);
    g_assert(header != NULL);

    DBG(1, "start_part(retry_part=%d)", retry_part);

    g_mutex_lock(self->state_mutex);
    g_assert(self->paused);

    if (self->part_header)
	dumpfile_free(self->part_header);
    self->part_header = dumpfile_copy(header);

    DBG(1, "unpausing");
    self->paused = FALSE;
    g_cond_broadcast(self->paused_cond);

    g_mutex_unlock(self->state_mutex);
}

static void
use_device_impl(
    XferDestTaper *xdtself,
    Device *device)
{
    XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);

    /* short-circuit if nothing is changing */
    if (self->device == device)
	return;

    g_mutex_lock(self->state_mutex);

    if (self->device)
	g_object_unref(self->device);
    self->device = NULL;

    /* if we already have a connection, then make this device use it */
    if (self->conn) {
	if (!device_use_connection(device, self->conn)) {
	    /* queue up an error for later, and leave the device NULL.
	     * start_part will see this and fail silently */
	    xfer_cancel_with_error(XFER_ELEMENT(self),
		_("Failed part was not cached; cannot retry"));
	    return;
	}
    }

    self->device = device;
    g_object_ref(device);

    g_mutex_unlock(self->state_mutex);
}

static guint64
get_part_bytes_written_impl(
    XferDestTaper *xdtself G_GNUC_UNUSED)
{
    /* This operation is not supported for this taper dest.  Maybe someday. */
    return 0;
}

static void
instance_init(
    XferElement *elt)
{
    XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
    elt->can_generate_eof = FALSE;

    self->worker_thread = NULL;
    self->paused = TRUE;
    self->conn = NULL;
    self->state_mutex = g_mutex_new();
    self->paused_cond = g_cond_new();
    self->abort_cond = g_cond_new();
}

static void
finalize_impl(
    GObject * obj_self)
{
    XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(obj_self);

    if (self->conn)
	g_object_unref(self->conn);
    self->conn = NULL;

    if (self->device)
	g_object_unref(self->device);
    self->device = NULL;

    if (self->device)
	g_object_unref(self->device);
    self->device = NULL;

    g_mutex_free(self->state_mutex);
    g_cond_free(self->paused_cond);
    g_cond_free(self->abort_cond);

    if (self->part_header)
	dumpfile_free(self->part_header);
    self->part_header = NULL;

    /* chain up */
    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}

static void
class_init(
    XferDestTaperDirectTCPClass * selfc)
{
    XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
    XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
    GObjectClass *goc = G_OBJECT_CLASS(selfc);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) }
    };

    klass->start = start_impl;
    klass->setup = setup_impl;
    klass->cancel = cancel_impl;
    xdt_klass->start_part = start_part_impl;
    xdt_klass->use_device = use_device_impl;
    xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
    goc->finalize = finalize_impl;

    klass->perl_class = "Amanda::Xfer::Dest::Taper::DirectTCP";
    klass->mech_pairs = mech_pairs;

    parent_class = g_type_class_peek_parent(selfc);
}

static GType
xfer_dest_taper_directtcp_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestTaperDirectTCPClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestTaperDirectTCP),
            0 /* n_preallocs */,
            (GInstanceInitFunc) instance_init,
            NULL
        };

        type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperDirectTCP", &info, 0);
    }

    return type;
}

/*
 * Constructor
 */

XferElement *
xfer_dest_taper_directtcp(Device *first_device, guint64 part_size)
{
    XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)g_object_new(XFER_DEST_TAPER_DIRECTTCP_TYPE, NULL);

    g_assert(device_directtcp_supported(first_device));

    self->part_size = part_size;
    self->device = first_device;
    self->partnum = 1;
    g_object_ref(self->device);

    return XFER_ELEMENT(self);
}