/* * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved. * Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Contact information: Carbonite Inc., 756 N Pastoria Ave * Sunnyvale, CA 94085, or: http://www.zmanda.com */ /* An Xfer abstracts an active data transfer through the Amanda core. */ #ifndef XFER_H #define XFER_H #include /* An Xfer represents a flow of data from a source, via zero or more filters, * to a destination. Sources, filters, and destinations are "transfer elements". * The job of the Xfer is glue together a sequence of elements, and provide a * dispatch point for messages from those elements to the caller. * * Xfers are not implemented as GObjects because there is no reason to subclass an * Xfer or apply any of the other features that come with GObject. */ /* The moment-to-moment state of a transfer */ typedef enum { XFER_INIT = 1, /* initializing */ XFER_START = 2, /* starting */ XFER_RUNNING = 3, /* data flowing */ XFER_CANCELLING = 4,/* cancellation begun */ XFER_CANCELLED = 5, /* all elements cancelled; draining data */ XFER_DONE = 6, /* data no longer flowing */ } xfer_status; /* forward declarations */ struct XferElement; struct XMsgSource; struct XMsg; /* * "Class" declaration */ typedef struct Xfer { /* The current status of this transfer. This is read-only, and * must only be accessed from the main thread or with status_mutex * held. */ xfer_status status; /* lock this while checking status in a thread * other than the main thread */ GMutex *status_mutex; /* and wait on this for status changes */ GCond *status_cond; /* -- remaining fields are private -- */ gint refcount; /* All transfer elements for this transfer, in order from * source to destination. This is initialized when the Xfer is * created. */ GPtrArray *elements; /* temporary string for a representation of this transfer */ char *repr; /* GSource and queue for incoming messages */ struct XMsgSource *msg_source; GAsyncQueue *queue; /* Number of active elements remaining (a.k.a. the number of * XMSG_DONE messages to expect) */ gint num_active_elements; /* Used to coordinate handing off file descriptors among elements of this * xfer */ GMutex *fd_mutex; int cancelled; } Xfer; /* Note that all functions must be called from the main thread unless * otherwise noted */ /* Create a new Xfer object, which should later be freed with xfref_free. * * This function adds a reference to each element. The caller should * unreference the elements if it does not intend to use them directly. * The Xfer returned has a refcount of one. * * @param elements: array of pointers to transfer elements, in order from source * to destination * @param nelements: length of 'elements' * @returns: new Xfer object */ Xfer *xfer_new(struct XferElement **elements, unsigned int nelements); /* Increase the reference count of a transfer. * * @param xfer: the transfer */ void xfer_ref(Xfer *xfer); /* Decrease the reference count of a transfer, possibly freeing it. A running * transfer (state neither XFER_INIT nor XFER_DONE) will not be freed. * * @param xfer: the transfer */ void xfer_unref(Xfer *xfer); /* Get a GSource which will produce events corresponding to messages from * this transfer. This is a "peek" operation, so the reference count for the * GSource is not affected. Note that the same GSource is returned on every * call for a particular transfer. * * @returns: GSource object */ GSource *xfer_get_source(Xfer *xfer); /* Typedef for the callback to be set on the GSource returned from * xfer_get_source. */ typedef void (*XMsgCallback)(gpointer data, struct XMsg *msg, Xfer *xfer); /* Queue a message for delivery via this transfer's GSource. This can * be called in any thread. * * @param xfer: the transfer * @param msg: the message to queue */ void xfer_queue_message(Xfer *xfer, struct XMsg *msg); /* Get a representation of this transfer. The string belongs to the transfer, and * will be freed when the transfer is freed. * * @param xfer: the Xfer object * @returns: statically allocated string */ char *xfer_repr(Xfer *xfer); /* Start a transfer. This function will fail with an error message if it is * unable to set up the transfer (e.g., if the elements cannot be connected * correctly). * * @param xfer: the Xfer object * @param offset: the offset to start the transfer from (must be 0) * @param size: the Xfer object: the number of bytes to transfer. */ void xfer_start(Xfer *xfer, gint64 offset, gint64 size); void xfer_set_offset_and_size(Xfer *xfer, gint64 offset, gint64 size); /* Abort a running transfer. This essentially tells the source to stop * producing data and allows the remainder of the transfer to "drain". Thus * the transfer will signal its completion "normally" some time after * xfer_cancel returns. In particular, the state transitions will occur * as follows: * * - XFER_RUNNING * - xfer_cancel() (note state may still be XFER_RUNNING on return) * - XFER_CANCELLING * - (individual elements' cancel() methods are invoked) * - XFER_CANCELLED * - (data drains from the transfer) * - XFER_DONE * * This function can be called from any thread at any time. It will return * without blocking. * * @param xfer: the Xfer object */ void xfer_cancel(Xfer *xfer); /* * Utilities */ /* Wait for the xfer's state to become CANCELLED or DONE; this is useful to * wait until a cancelletion is in progress before returning an EOF or * otherwise handling a failure. If you call this in the main thread, you'll * be waiting for a while. * * @param xfer: the transfer object * @returns: the new status (XFER_CANCELLED or XFER_DONE) */ xfer_status wait_until_xfer_cancelled(Xfer *xfer); /* Wait for the xfer's state to become anything but START; this is * called *automatically* for every xfer_element_pull_buffer call, as the * upstream element may not be running and ready for a pull just yet. But * the function may be useful in other places, too. * * @param xfer: the transfer object * @returns: the new status (XFER_CANCELLED or XFER_DONE) */ xfer_status wait_until_xfer_running(Xfer *xfer); /* Send an XMSG_ERROR constructed with the given format and arguments, then * cancel the transfer, then wait until the transfer is completely cancelled. * This is the most common error-handling process for transfer elements. All * that remains to be done on return is to branch to the appropriate point in * the cancellation-handling portion of the transfer. * * @param elt: the transfer element producing the error * @param fmt: the format for the error message * @param ...: arguments corresponding to the format */ void xfer_cancel_with_error(struct XferElement *elt, const char *fmt, ...) G_GNUC_PRINTF(2,3); /* Return the fd in *FDP and set *FDP to NEWFD, all in one step. The operation * is atomic with respect to all other such operations in this transfer, making * this a good way to "move" a file descriptor from one element to another. If * xfer is NULL, the operation proceeds with no locking. * * @param xfer: the xfer within which this fd is used * @param fdp: pointer to the file descriptor to swap * @param newfd: the new value for *FDP * @returns: the previous contents of *fdp (may be -1) */ gint xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd); #endif /* XFER_H */