Blob Blame History Raw
/*
 * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 *
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
 */

/* An Xfer abstracts an active data transfer through the Amanda core.
 */

#ifndef XFER_H
#define XFER_H

#include <glib.h>

/* An Xfer represents a flow of data from a source, via zero or more filters,
 * to a destination.  Sources, filters, and destinations are "transfer elements".
 * The job of the Xfer is glue together a sequence of elements, and provide a
 * dispatch point for messages from those elements to the caller.
 *
 * Xfers are not implemented as GObjects because there is no reason to subclass an
 * Xfer or apply any of the other features that come with GObject.
 */

/* The moment-to-moment state of a transfer */
typedef enum {
    XFER_INIT = 1,	/* initializing */
    XFER_START = 2,	/* starting */
    XFER_RUNNING = 3,	/* data flowing */
    XFER_CANCELLING = 4,/* cancellation begun */
    XFER_CANCELLED = 5, /* all elements cancelled; draining data */
    XFER_DONE = 6,	/* data no longer flowing */
} xfer_status;

/* forward declarations */
struct XferElement;
struct XMsgSource;
struct XMsg;

/*
 * "Class" declaration
 */

typedef struct Xfer {
    /* The current status of this transfer.  This is read-only, and 
     * must only be accessed from the main thread or with status_mutex
     * held. */
    xfer_status status;

    /* lock this while checking status in a thread
     * other than the main thread */
    GMutex *status_mutex;

    /* and wait on this for status changes */
    GCond *status_cond;

    /* -- remaining fields are private -- */

    gint refcount;

    /* All transfer elements for this transfer, in order from
     * source to destination.  This is initialized when the Xfer is
     * created. */
    GPtrArray *elements;

    /* temporary string for a representation of this transfer */
    char *repr;

    /* GSource and queue for incoming messages */
    struct XMsgSource *msg_source;
    GAsyncQueue *queue;

    /* Number of active elements remaining (a.k.a. the number of
     * XMSG_DONE messages to expect) */
    gint num_active_elements;

    /* Used to coordinate handing off file descriptors among elements of this
     * xfer */
    GMutex *fd_mutex;

    int cancelled;
} Xfer;

/* Note that all functions must be called from the main thread unless
 * otherwise noted */

/* Create a new Xfer object, which should later be freed with xfref_free.
 *
 * This function adds a reference to each element.  The caller should
 * unreference the elements if it does not intend to use them directly.
 * The Xfer returned has a refcount of one.
 *
 * @param elements: array of pointers to transfer elements, in order from source
 *     to destination
 * @param nelements: length of 'elements'
 * @returns: new Xfer object
 */
Xfer *xfer_new(struct XferElement **elements, unsigned int nelements);

/* Increase the reference count of a transfer.
 *
 * @param xfer: the transfer
 */
void xfer_ref(Xfer *xfer);

/* Decrease the reference count of a transfer, possibly freeing it.  A running
 * transfer (state neither XFER_INIT nor XFER_DONE) will not be freed.
 *
 * @param xfer: the transfer
 */
void xfer_unref(Xfer *xfer);

/* Get a GSource which will produce events corresponding to messages from
 * this transfer.  This is a "peek" operation, so the reference count for the
 * GSource is not affected.  Note that the same GSource is returned on every
 * call for a particular transfer.
 *
 * @returns: GSource object
 */
GSource *xfer_get_source(Xfer *xfer);

/* Typedef for the callback to be set on the GSource returned from
 * xfer_get_source.
 */
typedef void (*XMsgCallback)(gpointer data, struct XMsg *msg, Xfer *xfer);

/* Queue a message for delivery via this transfer's GSource.  This can
 * be called in any thread.
 *
 * @param xfer: the transfer
 * @param msg: the message to queue
 */
void xfer_queue_message(Xfer *xfer, struct XMsg *msg);

/* Get a representation of this transfer.  The string belongs to the transfer, and
 * will be freed when the transfer is freed.
 *
 * @param xfer: the Xfer object
 * @returns: statically allocated string
 */
char *xfer_repr(Xfer *xfer);

/* Start a transfer.  This function will fail with an error message if it is
 * unable to set up the transfer (e.g., if the elements cannot be connected
 * correctly).
 *
 * @param xfer: the Xfer object
 * @param offset: the offset to start the transfer from (must be 0)
 * @param size: the Xfer object: the number of bytes to transfer.
 */
void xfer_start(Xfer *xfer, gint64 offset, gint64 size);

void xfer_set_offset_and_size(Xfer *xfer, gint64 offset, gint64 size);

/* Abort a running transfer.  This essentially tells the source to stop
 * producing data and allows the remainder of the transfer to "drain".  Thus
 * the transfer will signal its completion "normally" some time after
 * xfer_cancel returns.  In particular, the state transitions will occur
 * as follows:
 *
 * - XFER_RUNNING
 * - xfer_cancel()  (note state may still be XFER_RUNNING on return)
 * - XFER_CANCELLING
 * - (individual elements' cancel() methods are invoked)
 * - XFER_CANCELLED
 * - (data drains from the transfer)
 * - XFER_DONE
 *
 * This function can be called from any thread at any time.  It will return
 * without blocking.
 *
 * @param xfer: the Xfer object
 */
void xfer_cancel(Xfer *xfer);

/*
 * Utilities
 */

/* Wait for the xfer's state to become CANCELLED or DONE; this is useful to
 * wait until a cancelletion is in progress before returning an EOF or
 * otherwise handling a failure.  If you call this in the main thread, you'll
 * be waiting for a while.
 *
 * @param xfer: the transfer object
 * @returns: the new status (XFER_CANCELLED or XFER_DONE)
 */
xfer_status wait_until_xfer_cancelled(Xfer *xfer);

/* Wait for the xfer's state to become anything but START; this is
 * called *automatically* for every xfer_element_pull_buffer call, as the
 * upstream element may not be running and ready for a pull just yet.  But
 * the function may be useful in other places, too.
 *
 * @param xfer: the transfer object
 * @returns: the new status (XFER_CANCELLED or XFER_DONE)
 */
xfer_status wait_until_xfer_running(Xfer *xfer);

/* Send an XMSG_ERROR constructed with the given format and arguments, then
 * cancel the transfer, then wait until the transfer is completely cancelled.
 * This is the most common error-handling process for transfer elements.  All
 * that remains to be done on return is to branch to the appropriate point in
 * the cancellation-handling portion of the transfer.
 *
 * @param elt: the transfer element producing the error
 * @param fmt: the format for the error message
 * @param ...: arguments corresponding to the format
 */
void xfer_cancel_with_error(struct XferElement *elt, const char *fmt, ...)
	G_GNUC_PRINTF(2,3);

/* Return the fd in *FDP and set *FDP to NEWFD, all in one step.  The operation
 * is atomic with respect to all other such operations in this transfer, making
 * this a good way to "move" a file descriptor from one element to another.  If
 * xfer is NULL, the operation proceeds with no locking.
 *
 * @param xfer: the xfer within which this fd is used
 * @param fdp: pointer to the file descriptor to swap
 * @param newfd: the new value for *FDP
 * @returns: the previous contents of *fdp (may be -1)
 */
gint xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd);

#endif /* XFER_H */