Blame xfer-src/xfer-element.c

Packit 23ab03
/*
Packit 23ab03
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
Packit 23ab03
 * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
Packit 23ab03
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
Packit 23ab03
 *
Packit 23ab03
 * This program is free software; you can redistribute it and/or
Packit 23ab03
 * modify it under the terms of the GNU General Public License
Packit 23ab03
 * as published by the Free Software Foundation; either version 2
Packit 23ab03
 * of the License, or (at your option) any later version.
Packit 23ab03
 *
Packit 23ab03
 * This program is distributed in the hope that it will be useful, but
Packit 23ab03
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
Packit 23ab03
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
Packit 23ab03
 * for more details.
Packit 23ab03
 *
Packit 23ab03
 * You should have received a copy of the GNU General Public License along
Packit 23ab03
 * with this program; if not, write to the Free Software Foundation, Inc.,
Packit 23ab03
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
Packit 23ab03
 *
Packit 23ab03
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
Packit 23ab03
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
Packit 23ab03
 */
Packit 23ab03
Packit 23ab03
#include "amanda.h"
Packit 23ab03
#include "amxfer.h"
Packit 23ab03
Packit 23ab03
/* parent class for XferElement */
Packit 23ab03
static GObjectClass *parent_class = NULL;
Packit 23ab03
Packit 23ab03
/* parent class for XferDest, XferFilter, and XferSource */
Packit 23ab03
static XferElementClass *xfer_element_class = NULL;
Packit 23ab03
Packit 23ab03
/***********************
Packit 23ab03
 * XferElement */
Packit 23ab03
Packit 23ab03
static void
Packit 23ab03
xfer_element_init(
Packit 23ab03
    XferElement *xe)
Packit 23ab03
{
Packit 23ab03
    xe->xfer = NULL;
Packit 23ab03
    xe->output_mech = XFER_MECH_NONE;
Packit 23ab03
    xe->input_mech = XFER_MECH_NONE;
Packit 23ab03
    xe->upstream = xe->downstream = NULL;
Packit 23ab03
    xe->_input_fd = xe->_output_fd = -1;
Packit 23ab03
    xe->repr = NULL;
Packit 23ab03
    xe->must_drain = FALSE;
Packit 23ab03
    xe->cancel_on_success = FALSE;
Packit 23ab03
    xe->ignore_broken_pipe = FALSE;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static gboolean
Packit 23ab03
xfer_element_setup_impl(
Packit 23ab03
    XferElement *elt G_GNUC_UNUSED)
Packit 23ab03
{
Packit 23ab03
    return TRUE; /* success */
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static gboolean
Packit 23ab03
xfer_element_set_offset_impl(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gint64       offset)
Packit 23ab03
{
Packit 23ab03
Packit 23ab03
    elt->offset = offset;
Packit 23ab03
Packit 23ab03
    return TRUE; /* success */
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static gboolean
Packit 23ab03
xfer_element_set_size_impl(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gint64       size)
Packit 23ab03
{
Packit 23ab03
    elt->orig_size = size;
Packit 23ab03
    elt->size = size;
Packit 23ab03
Packit 23ab03
    return TRUE; /* success */
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static off_t
Packit 23ab03
xfer_element_get_offset_impl(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return elt->offset;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static off_t
Packit 23ab03
xfer_element_get_orig_size_impl(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return elt->orig_size;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static off_t
Packit 23ab03
xfer_element_get_size_impl(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return elt->size;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static size_t
Packit 23ab03
xfer_element_get_block_size_impl(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return elt->block_size;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static gboolean
Packit 23ab03
xfer_element_start_impl(
Packit 23ab03
    XferElement *elt G_GNUC_UNUSED)
Packit 23ab03
{
Packit 23ab03
    return FALSE; /* will not send XMSG_DONE */
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static gboolean
Packit 23ab03
xfer_element_cancel_impl(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gboolean expect_eof)
Packit 23ab03
{
Packit 23ab03
    elt->cancelled = TRUE;
Packit 23ab03
    elt->expect_eof = expect_eof;
Packit 23ab03
    return elt->can_generate_eof;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static gpointer
Packit 23ab03
xfer_element_pull_buffer_impl(
Packit 23ab03
    XferElement *elt G_GNUC_UNUSED,
Packit 23ab03
    size_t *size G_GNUC_UNUSED)
Packit 23ab03
{
Packit 23ab03
    *size = 0;
Packit 23ab03
    return NULL;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static gpointer
Packit 23ab03
xfer_element_pull_buffer_static_impl(
Packit 23ab03
    XferElement *elt G_GNUC_UNUSED,
Packit 23ab03
    gpointer buf G_GNUC_UNUSED,
Packit 23ab03
    size_t block_size G_GNUC_UNUSED,
Packit 23ab03
    size_t *size G_GNUC_UNUSED)
Packit 23ab03
{
Packit 23ab03
    *size = 0;
Packit 23ab03
    return NULL;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static void
Packit 23ab03
xfer_element_push_buffer_impl(
Packit 23ab03
    XferElement *elt G_GNUC_UNUSED,
Packit 23ab03
    gpointer buf G_GNUC_UNUSED,
Packit 23ab03
    size_t size G_GNUC_UNUSED)
Packit 23ab03
{
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static void
Packit 23ab03
xfer_element_push_buffer_static_impl(
Packit 23ab03
    XferElement *elt G_GNUC_UNUSED,
Packit 23ab03
    gpointer buf G_GNUC_UNUSED,
Packit 23ab03
    size_t size G_GNUC_UNUSED)
Packit 23ab03
{
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static xfer_element_mech_pair_t *
Packit 23ab03
xfer_element_get_mech_pairs_impl(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->mech_pairs;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static char *
Packit 23ab03
xfer_element_repr_impl(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    if (!elt->repr) {
Packit 23ab03
        g_free(elt->repr);
Packit 23ab03
        elt->repr = g_strdup_printf("<%s@%p>",
Packit 23ab03
                                    G_OBJECT_TYPE_NAME(G_OBJECT(elt)), elt);
Packit 23ab03
    }
Packit 23ab03
Packit 23ab03
    return elt->repr;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static void
Packit 23ab03
xfer_element_finalize(
Packit 23ab03
    GObject * obj_self)
Packit 23ab03
{
Packit 23ab03
    XferElement *elt = XFER_ELEMENT(obj_self);
Packit 23ab03
    gint fd;
Packit 23ab03
Packit 23ab03
    /* free the repr cache */
Packit 23ab03
    if (elt->repr) g_free(elt->repr);
Packit 23ab03
Packit 23ab03
    /* close up the input/output file descriptors, being careful to do so
Packit 23ab03
     * atomically, and making any errors doing so into mere warnings */
Packit 23ab03
    fd = xfer_element_swap_input_fd(elt, -1);
Packit 23ab03
    if (fd != -1 && close(fd) != 0)
Packit 23ab03
	g_warning("error closing fd %d: %s", fd, strerror(errno));
Packit 23ab03
    fd = xfer_element_swap_output_fd(elt, -1);
Packit 23ab03
    if (fd != -1 && close(fd) != 0)
Packit 23ab03
	g_warning("error closing fd %d: %s", fd, strerror(errno));
Packit 23ab03
Packit 23ab03
    /* chain up */
Packit 23ab03
    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
static void
Packit 23ab03
xfer_element_class_init(
Packit 23ab03
    XferElementClass * klass)
Packit 23ab03
{
Packit 23ab03
    GObjectClass *goc = (GObjectClass*) klass;
Packit 23ab03
Packit 23ab03
    klass->repr = xfer_element_repr_impl;
Packit 23ab03
    klass->setup = xfer_element_setup_impl;
Packit 23ab03
    klass->set_offset = xfer_element_set_offset_impl;
Packit 23ab03
    klass->set_size = xfer_element_set_size_impl;
Packit 23ab03
    klass->get_offset = xfer_element_get_offset_impl;
Packit 23ab03
    klass->get_orig_size = xfer_element_get_orig_size_impl;
Packit 23ab03
    klass->get_size = xfer_element_get_size_impl;
Packit 23ab03
    klass->get_block_size = xfer_element_get_block_size_impl;
Packit 23ab03
    klass->start = xfer_element_start_impl;
Packit 23ab03
    klass->cancel = xfer_element_cancel_impl;
Packit 23ab03
    klass->pull_buffer = xfer_element_pull_buffer_impl;
Packit 23ab03
    klass->pull_buffer_static = xfer_element_pull_buffer_static_impl;
Packit 23ab03
    klass->push_buffer = xfer_element_push_buffer_impl;
Packit 23ab03
    klass->push_buffer_static = xfer_element_push_buffer_static_impl;
Packit 23ab03
    klass->get_mech_pairs = xfer_element_get_mech_pairs_impl;
Packit 23ab03
Packit 23ab03
    goc->finalize = xfer_element_finalize;
Packit 23ab03
Packit 23ab03
    klass->perl_class = NULL;
Packit 23ab03
Packit 23ab03
    parent_class = g_type_class_peek_parent(klass);
Packit 23ab03
    xfer_element_class = klass;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
GType
Packit 23ab03
xfer_element_get_type(void)
Packit 23ab03
{
Packit 23ab03
    static GType type = 0;
Packit 23ab03
Packit 23ab03
    if (G_UNLIKELY(type == 0)) {
Packit 23ab03
        static const GTypeInfo info = {
Packit 23ab03
            sizeof (XferElementClass),
Packit 23ab03
            (GBaseInitFunc) NULL,
Packit 23ab03
            (GBaseFinalizeFunc) NULL,
Packit 23ab03
            (GClassInitFunc) xfer_element_class_init,
Packit 23ab03
            (GClassFinalizeFunc) NULL,
Packit 23ab03
            NULL /* class_data */,
Packit 23ab03
            sizeof (XferElement),
Packit 23ab03
            0 /* n_preallocs */,
Packit 23ab03
            (GInstanceInitFunc) xfer_element_init,
Packit 23ab03
            NULL
Packit 23ab03
        };
Packit 23ab03
Packit 23ab03
        type = g_type_register_static (G_TYPE_OBJECT, "XferElement", &info,
Packit 23ab03
                                       (GTypeFlags)G_TYPE_FLAG_ABSTRACT);
Packit 23ab03
    }
Packit 23ab03
Packit 23ab03
    return type;
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
/*
Packit 23ab03
 * Method stubs
Packit 23ab03
 */
Packit 23ab03
Packit 23ab03
void
Packit 23ab03
xfer_element_unref(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    if (elt) g_object_unref(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
char *
Packit 23ab03
xfer_element_repr(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->repr(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
gboolean
Packit 23ab03
xfer_element_setup(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->setup(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
gboolean
Packit 23ab03
xfer_element_set_offset(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gint64       offset)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->set_offset(elt, offset);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
gboolean
Packit 23ab03
xfer_element_set_size(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gint64       size)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->set_size(elt, size);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
off_t
Packit 23ab03
xfer_element_get_offset(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->get_offset(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
off_t
Packit 23ab03
xfer_element_get_orig_size(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->get_orig_size(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
off_t
Packit 23ab03
xfer_element_get_size(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->get_size(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
size_t
Packit 23ab03
xfer_element_get_block_size(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->get_block_size(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
gboolean
Packit 23ab03
xfer_element_start(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->start(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
gboolean
Packit 23ab03
xfer_element_cancel(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gboolean expect_eof)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->cancel(elt, expect_eof);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
gpointer
Packit 23ab03
xfer_element_pull_buffer(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    size_t *size)
Packit 23ab03
{
Packit 23ab03
    xfer_status status;
Packit 23ab03
    /* Make sure that the xfer is running before calling upstream's
Packit 23ab03
     * pull_buffer method; this avoids a race condition where upstream
Packit 23ab03
     * hasn't finished its xfer_element_start yet, and isn't ready for
Packit 23ab03
     * a pull */
Packit 23ab03
    g_mutex_lock(elt->xfer->status_mutex);
Packit 23ab03
    status = elt->xfer->status;
Packit 23ab03
    g_mutex_unlock(elt->xfer->status_mutex);
Packit 23ab03
    if (status == XFER_START)
Packit 23ab03
	wait_until_xfer_running(elt->xfer);
Packit 23ab03
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->pull_buffer(elt, size);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
gpointer
Packit 23ab03
xfer_element_pull_buffer_static(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gpointer buf,
Packit 23ab03
    size_t block_size,
Packit 23ab03
    size_t *size)
Packit 23ab03
{
Packit 23ab03
    xfer_status status;
Packit 23ab03
    /* Make sure that the xfer is running before calling upstream's
Packit 23ab03
     * pull_bufferi_static method; this avoids a race condition where upstream
Packit 23ab03
     * hasn't finished its xfer_element_start yet, and isn't ready for
Packit 23ab03
     * a pull */
Packit 23ab03
    g_mutex_lock(elt->xfer->status_mutex);
Packit 23ab03
    status = elt->xfer->status;
Packit 23ab03
    g_mutex_unlock(elt->xfer->status_mutex);
Packit 23ab03
    if (status == XFER_START)
Packit 23ab03
	wait_until_xfer_running(elt->xfer);
Packit 23ab03
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->pull_buffer_static(elt, buf, block_size, size);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
void
Packit 23ab03
xfer_element_push_buffer(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gpointer buf,
Packit 23ab03
    size_t size)
Packit 23ab03
{
Packit 23ab03
    /* There is no race condition with push_buffer, because downstream
Packit 23ab03
     * elements are started first. */
Packit 23ab03
    XFER_ELEMENT_GET_CLASS(elt)->push_buffer(elt, buf, size);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
void
Packit 23ab03
xfer_element_push_buffer_static(
Packit 23ab03
    XferElement *elt,
Packit 23ab03
    gpointer buf,
Packit 23ab03
    size_t size)
Packit 23ab03
{
Packit 23ab03
    /* There is no race condition with push_buffer, because downstream
Packit 23ab03
     * elements are started first. */
Packit 23ab03
    XFER_ELEMENT_GET_CLASS(elt)->push_buffer_static(elt, buf, size);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
xfer_element_mech_pair_t *
Packit 23ab03
xfer_element_get_mech_pairs(
Packit 23ab03
	XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->get_mech_pairs(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
/****
Packit 23ab03
 * Utilities
Packit 23ab03
 */
Packit 23ab03
Packit 23ab03
void
Packit 23ab03
xfer_element_drain_buffers(
Packit 23ab03
    XferElement *upstream)
Packit 23ab03
{
Packit 23ab03
    gpointer buf;
Packit 23ab03
    size_t size;
Packit 23ab03
Packit 23ab03
    while ((buf =xfer_element_pull_buffer(upstream, &size))) {
Packit 23ab03
	amfree(buf);
Packit 23ab03
    }
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
void
Packit 23ab03
xfer_element_drain_fd(
Packit 23ab03
    int fd)
Packit 23ab03
{
Packit 23ab03
    size_t len;
Packit 23ab03
    char buf[1024];
Packit 23ab03
Packit 23ab03
    while (1) {
Packit 23ab03
	len = read_fully(fd, buf, sizeof(buf), NULL);
Packit 23ab03
	if (len < sizeof(buf))
Packit 23ab03
	    return;
Packit 23ab03
    }
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
mem_ring_t *
Packit 23ab03
xfer_element_get_mem_ring(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    return XFER_ELEMENT_GET_CLASS(elt)->get_mem_ring(elt);
Packit 23ab03
}
Packit 23ab03
Packit 23ab03
shm_ring_t *
Packit 23ab03
xfer_element_get_shm_ring(
Packit 23ab03
    XferElement *elt)
Packit 23ab03
{
Packit 23ab03
    if (elt->shm_ring) {
Packit 23ab03
	return elt->shm_ring;
Packit 23ab03
    } else if (elt->downstream) {
Packit 23ab03
	return xfer_element_get_shm_ring(elt->downstream);
Packit 23ab03
    } else {
Packit 23ab03
	return NULL;
Packit 23ab03
    }
Packit 23ab03
}
Packit 23ab03