|
Packit Service |
392537 |
/*
|
|
Packit Service |
392537 |
* Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
|
|
Packit Service |
392537 |
* Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
|
|
Packit Service |
392537 |
*
|
|
Packit Service |
392537 |
* This program is free software; you can redistribute it and/or
|
|
Packit Service |
392537 |
* modify it under the terms of the GNU General Public License
|
|
Packit Service |
392537 |
* as published by the Free Software Foundation; either version 2
|
|
Packit Service |
392537 |
* of the License, or (at your option) any later version.
|
|
Packit Service |
392537 |
*
|
|
Packit Service |
392537 |
* This program is distributed in the hope that it will be useful, but
|
|
Packit Service |
392537 |
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
|
Packit Service |
392537 |
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
Packit Service |
392537 |
* for more details.
|
|
Packit Service |
392537 |
*
|
|
Packit Service |
392537 |
* You should have received a copy of the GNU General Public License along
|
|
Packit Service |
392537 |
* with this program; if not, write to the Free Software Foundation, Inc.,
|
|
Packit Service |
392537 |
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
Packit Service |
392537 |
*
|
|
Packit Service |
392537 |
* Contact information: Carbonite Inc., 756 N Pastoria Ave
|
|
Packit Service |
392537 |
* Sunnyvale, CA 94085, or: http://www.zmanda.com
|
|
Packit Service |
392537 |
*/
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
#include "amanda.h"
|
|
Packit Service |
392537 |
#include "amxfer.h"
|
|
Packit Service |
392537 |
#include "element-glue.h"
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* XMsgSource objects are GSource "subclasses" which manage
|
|
Packit Service |
392537 |
* a queue of messages, delivering those messages via callback
|
|
Packit Service |
392537 |
* in the mainloop. Messages can be *sent* from any thread without
|
|
Packit Service |
392537 |
* any concern for locking, but must only be received in the main
|
|
Packit Service |
392537 |
* thread, in the default GMainContext.
|
|
Packit Service |
392537 |
*
|
|
Packit Service |
392537 |
* An XMsgSource pointer can be cast to a GSource pointer as
|
|
Packit Service |
392537 |
* necessary.
|
|
Packit Service |
392537 |
*/
|
|
Packit Service |
392537 |
typedef struct XMsgSource {
|
|
Packit Service |
392537 |
GSource source; /* must be the first element of the struct */
|
|
Packit Service |
392537 |
Xfer *xfer;
|
|
Packit Service |
392537 |
} XMsgSource;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* forward prototypes */
|
|
Packit Service |
392537 |
static void xfer_set_status(Xfer *xfer, xfer_status status);
|
|
Packit Service |
392537 |
static XMsgSource *xmsgsource_new(Xfer *xfer);
|
|
Packit Service |
392537 |
static void link_elements(Xfer *xfer);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
Xfer *
|
|
Packit Service |
392537 |
xfer_new(
|
|
Packit Service |
392537 |
XferElement **elements,
|
|
Packit Service |
392537 |
unsigned int nelements)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
Xfer *xfer = g_new0(Xfer, 1);
|
|
Packit Service |
392537 |
unsigned int i;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_assert(elements);
|
|
Packit Service |
392537 |
g_assert(nelements >= 2);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
xfer->status = XFER_INIT;
|
|
Packit Service |
392537 |
xfer->status_mutex = g_mutex_new();
|
|
Packit Service |
392537 |
xfer->status_cond = g_cond_new();
|
|
Packit Service |
392537 |
xfer->fd_mutex = g_mutex_new();
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
xfer->refcount = 1;
|
|
Packit Service |
392537 |
xfer->repr = NULL;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Create our message source and corresponding queue */
|
|
Packit Service |
392537 |
xfer->msg_source = xmsgsource_new(xfer);
|
|
Packit Service |
392537 |
xfer->queue = g_async_queue_new();
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* copy the elements in, verifying that they're all XferElement objects */
|
|
Packit Service |
392537 |
xfer->elements = g_ptr_array_sized_new(nelements);
|
|
Packit Service |
392537 |
for (i = 0; i < nelements; i++) {
|
|
Packit Service |
392537 |
g_assert(elements[i] != NULL);
|
|
Packit Service |
392537 |
g_assert(IS_XFER_ELEMENT(elements[i]));
|
|
Packit Service |
392537 |
g_assert(elements[i]->xfer == NULL);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_ptr_array_add(xfer->elements, (gpointer)elements[i]);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_object_ref(elements[i]);
|
|
Packit Service |
392537 |
elements[i]->xfer = xfer;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
return xfer;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
void
|
|
Packit Service |
392537 |
xfer_ref(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
++xfer->refcount;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
void
|
|
Packit Service |
392537 |
xfer_unref(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
unsigned int i;
|
|
Packit Service |
392537 |
XMsg *msg;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (!xfer) return; /* be friendly to NULLs */
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (--xfer->refcount > 0) return;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_assert(xfer != NULL);
|
|
Packit Service |
392537 |
g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Divorce ourselves from the message source */
|
|
Packit Service |
392537 |
xfer->msg_source->xfer = NULL;
|
|
Packit Service |
392537 |
g_source_unref((GSource *)xfer->msg_source);
|
|
Packit Service |
392537 |
xfer->msg_source = NULL;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Try to empty the message queue */
|
|
Packit Service |
392537 |
while ((msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
|
|
Packit Service |
392537 |
g_warning("Dropping XMsg from %s because the XMsgSource is being destroyed",
|
|
Packit Service |
392537 |
xfer_element_repr(msg->elt));
|
|
Packit Service |
392537 |
g_debug("MSG: %s", xmsg_repr(msg));
|
|
Packit Service |
392537 |
xmsg_free(msg);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
g_async_queue_unref(xfer->queue);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_mutex_free(xfer->status_mutex);
|
|
Packit Service |
392537 |
g_cond_free(xfer->status_cond);
|
|
Packit Service |
392537 |
g_mutex_free(xfer->fd_mutex);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Free our references to the elements, and also set the 'xfer'
|
|
Packit Service |
392537 |
* attribute of each to NULL, making them "unattached" (although
|
|
Packit Service |
392537 |
* subsequent reuse of elements is untested). */
|
|
Packit Service |
392537 |
for (i = 0; i < xfer->elements->len; i++) {
|
|
Packit Service |
392537 |
XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
elt->xfer = NULL;
|
|
Packit Service |
392537 |
g_object_unref(elt);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
g_ptr_array_free(xfer->elements, TRUE);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (xfer->repr)
|
|
Packit Service |
392537 |
g_free(xfer->repr);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_free(xfer);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
GSource *
|
|
Packit Service |
392537 |
xfer_get_source(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
return (GSource *)xfer->msg_source;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
void
|
|
Packit Service |
392537 |
xfer_queue_message(
|
|
Packit Service |
392537 |
Xfer *xfer,
|
|
Packit Service |
392537 |
XMsg *msg)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
g_assert(xfer != NULL);
|
|
Packit Service |
392537 |
g_assert(msg != NULL);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_debug("xfer_queue_message: MSG: %s", xmsg_repr(msg));
|
|
Packit Service |
392537 |
g_async_queue_push(xfer->queue, (gpointer)msg);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* TODO: don't do this if we're in the main thread */
|
|
Packit Service |
392537 |
g_main_context_wakeup(NULL);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
char *
|
|
Packit Service |
392537 |
xfer_repr(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
char *tmpbuf;
|
|
Packit Service |
392537 |
unsigned int i;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (!xfer->repr) {
|
|
Packit Service |
392537 |
g_free(xfer->repr);
|
|
Packit Service |
392537 |
xfer->repr = g_strdup_printf("
|
|
Packit Service |
392537 |
for (i = 0; i < xfer->elements->len; i++) {
|
|
Packit Service |
392537 |
XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
|
|
Packit Service |
392537 |
tmpbuf = g_strconcat(xfer->repr, (i==0)?"":" -> ", xfer_element_repr(elt), NULL);
|
|
Packit Service |
392537 |
g_free(xfer->repr);
|
|
Packit Service |
392537 |
xfer->repr = tmpbuf;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
tmpbuf = g_strconcat(xfer->repr, ")>", NULL);
|
|
Packit Service |
392537 |
g_free(xfer->repr);
|
|
Packit Service |
392537 |
xfer->repr = tmpbuf;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
return xfer->repr;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
void
|
|
Packit Service |
392537 |
xfer_start(
|
|
Packit Service |
392537 |
Xfer *xfer,
|
|
Packit Service |
392537 |
gint64 offset,
|
|
Packit Service |
392537 |
gint64 size)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
unsigned int len;
|
|
Packit Service |
392537 |
unsigned int i;
|
|
Packit Service |
392537 |
gboolean setup_ok;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_assert(xfer != NULL);
|
|
Packit Service |
392537 |
g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
|
|
Packit Service |
392537 |
g_assert(xfer->elements->len >= 2);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_debug("Starting %s", xfer_repr(xfer));
|
|
Packit Service |
392537 |
/* set the status to XFER_START and add a reference to our count, so that
|
|
Packit Service |
392537 |
* we are not freed while still in operation. We'll drop this reference
|
|
Packit Service |
392537 |
* when the status becomes XFER_DONE. */
|
|
Packit Service |
392537 |
xfer_ref(xfer);
|
|
Packit Service |
392537 |
xfer->num_active_elements = 0;
|
|
Packit Service |
392537 |
xfer_set_status(xfer, XFER_START);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Link the elements. This calls error() on failure, and rewrites
|
|
Packit Service |
392537 |
* xfer->elements */
|
|
Packit Service |
392537 |
link_elements(xfer);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Tell all elements to set up. This is done before upstream and downstream
|
|
Packit Service |
392537 |
* are set so that elements cannot interfere with one another before setup()
|
|
Packit Service |
392537 |
* is completed. */
|
|
Packit Service |
392537 |
setup_ok = TRUE;
|
|
Packit Service |
392537 |
for (i = 0; i < xfer->elements->len; i++) {
|
|
Packit Service |
392537 |
XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
|
|
Packit Service |
392537 |
if (!xfer_element_setup(xe)) {
|
|
Packit Service |
392537 |
setup_ok = FALSE;
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* If setup_ok is false, then there is an XMSG_CANCEL in the message queue
|
|
Packit Service |
392537 |
* already, so skip calling start for any of the elements and send an
|
|
Packit Service |
392537 |
* XMSG_DONE, since none of the elements will do so. */
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (setup_ok) {
|
|
Packit Service |
392537 |
/* Set the upstream and downstream links between elements */
|
|
Packit Service |
392537 |
len = xfer->elements->len;
|
|
Packit Service |
392537 |
for (i = 0; i < len; i++) {
|
|
Packit Service |
392537 |
XferElement *elt = g_ptr_array_index(xfer->elements, i);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (i > 0)
|
|
Packit Service |
392537 |
elt->upstream = g_ptr_array_index(xfer->elements, i-1);
|
|
Packit Service |
392537 |
if (i < len-1)
|
|
Packit Service |
392537 |
elt->downstream = g_ptr_array_index(xfer->elements, i+1);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Set offset and size for first element */
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
|
|
Packit Service |
392537 |
xfer_element_set_offset(xe, offset);
|
|
Packit Service |
392537 |
xfer_element_set_size(xe, size);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* now tell them all to start, in order from destination to source */
|
|
Packit Service |
392537 |
for (i = xfer->elements->len; i >= 1; i--) {
|
|
Packit Service |
392537 |
XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
|
|
Packit Service |
392537 |
if (xfer_element_start(xe))
|
|
Packit Service |
392537 |
xfer->num_active_elements++;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* (note that status can only change in the main thread, so we can be
|
|
Packit Service |
392537 |
* certain that the status is still XFER_START and we have not yet been
|
|
Packit Service |
392537 |
* cancelled. We may have an XMSG_CANCEL already queued up for us, though) */
|
|
Packit Service |
392537 |
xfer_set_status(xfer, XFER_RUNNING);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* If this transfer involves no active processing, then we consider it to
|
|
Packit Service |
392537 |
* be done already. We send a "fake" XMSG_DONE from the destination element,
|
|
Packit Service |
392537 |
* so that all of the usual processing will take place. */
|
|
Packit Service |
392537 |
if (xfer->num_active_elements == 0) {
|
|
Packit Service |
392537 |
if (setup_ok)
|
|
Packit Service |
392537 |
g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
|
|
Packit Service |
392537 |
xfer->num_active_elements++;
|
|
Packit Service |
392537 |
xfer_queue_message(xfer,
|
|
Packit Service |
392537 |
xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
|
|
Packit Service |
392537 |
XMSG_DONE, 0));
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
void
|
|
Packit Service |
392537 |
xfer_set_offset_and_size(
|
|
Packit Service |
392537 |
Xfer *xfer,
|
|
Packit Service |
392537 |
gint64 offset,
|
|
Packit Service |
392537 |
gint64 size)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
/* Set offset for first element */
|
|
Packit Service |
392537 |
XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
|
|
Packit Service |
392537 |
xfer_element_set_offset(xe, offset);
|
|
Packit Service |
392537 |
xfer_element_set_size(xe, size);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
void
|
|
Packit Service |
392537 |
xfer_cancel(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
/* Since xfer_cancel can be called from any thread, we just send a message.
|
|
Packit Service |
392537 |
* The action takes place when the message is received. */
|
|
Packit Service |
392537 |
XferElement *src;
|
|
Packit Service |
392537 |
if (xfer->cancelled > 0) return;
|
|
Packit Service |
392537 |
xfer->cancelled++;
|
|
Packit Service |
392537 |
src = g_ptr_array_index(xfer->elements, 0);
|
|
Packit Service |
392537 |
xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
static void
|
|
Packit Service |
392537 |
xfer_set_status(
|
|
Packit Service |
392537 |
Xfer *xfer,
|
|
Packit Service |
392537 |
xfer_status status)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
if (xfer->status == status) return;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_mutex_lock(xfer->status_mutex);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* check that this state transition is valid */
|
|
Packit Service |
392537 |
switch (status) {
|
|
Packit Service |
392537 |
case XFER_START:
|
|
Packit Service |
392537 |
g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
case XFER_RUNNING:
|
|
Packit Service |
392537 |
g_assert(xfer->status == XFER_START);
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
case XFER_CANCELLING:
|
|
Packit Service |
392537 |
g_assert(xfer->status == XFER_RUNNING);
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
case XFER_CANCELLED:
|
|
Packit Service |
392537 |
g_assert(xfer->status == XFER_CANCELLING);
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
case XFER_DONE:
|
|
Packit Service |
392537 |
g_assert(xfer->status == XFER_CANCELLED || xfer->status == XFER_RUNNING);
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
case XFER_INIT:
|
|
Packit Service |
392537 |
default:
|
|
Packit Service |
392537 |
g_assert_not_reached();
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
xfer->status = status;
|
|
Packit Service |
392537 |
g_cond_broadcast(xfer->status_cond);
|
|
Packit Service |
392537 |
g_mutex_unlock(xfer->status_mutex);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/*
|
|
Packit Service |
392537 |
* Element linking
|
|
Packit Service |
392537 |
*/
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* How is ELT linked? link_recurse uses an array of these to track its progress
|
|
Packit Service |
392537 |
* and find the optimal overall linkage. */
|
|
Packit Service |
392537 |
typedef struct linkage {
|
|
Packit Service |
392537 |
XferElement *elt;
|
|
Packit Service |
392537 |
xfer_element_mech_pair_t *mech_pairs;
|
|
Packit Service |
392537 |
int elt_idx; /* index into elt's mech_pairs */
|
|
Packit Service |
392537 |
int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
|
|
Packit Service |
392537 |
} linkage;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Overall state of the recursive linking process */
|
|
Packit Service |
392537 |
typedef struct linking_state {
|
|
Packit Service |
392537 |
int nlinks; /* number of linkage objects in each array */
|
|
Packit Service |
392537 |
linkage *cur; /* "current" linkage */
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
linkage *best; /* best linkage so far */
|
|
Packit Service |
392537 |
gint32 best_cost; /* cost for best */
|
|
Packit Service |
392537 |
} linking_state;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* used for debugging messages */
|
|
Packit Service |
392537 |
static char *
|
|
Packit Service |
392537 |
xfer_mech_name(
|
|
Packit Service |
392537 |
xfer_mech mech)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
switch (mech) {
|
|
Packit Service |
392537 |
case XFER_MECH_NONE: return "NONE";
|
|
Packit Service |
392537 |
case XFER_MECH_READFD: return "READFD";
|
|
Packit Service |
392537 |
case XFER_MECH_WRITEFD: return "WRITEFD";
|
|
Packit Service |
392537 |
case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
|
|
Packit Service |
392537 |
case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
|
|
Packit Service |
392537 |
case XFER_MECH_PULL_BUFFER_STATIC: return "PULL_BUFFER_STATIC";
|
|
Packit Service |
392537 |
case XFER_MECH_PUSH_BUFFER_STATIC: return "PUSH_BUFFER_STATIC";
|
|
Packit Service |
392537 |
case XFER_MECH_DIRECTTCP_LISTEN: return "DIRECTTCP_LISTEN";
|
|
Packit Service |
392537 |
case XFER_MECH_DIRECTTCP_CONNECT: return "DIRECTTCP_CONNECT";
|
|
Packit Service |
392537 |
case XFER_MECH_MEM_RING: return "MEM_RING";
|
|
Packit Service |
392537 |
case XFER_MECH_SHM_RING: return "SHM_RING";
|
|
Packit Service |
392537 |
default: return "UNKNOWN";
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* calculate an integer representing the cost of a mech pair as a
|
|
Packit Service |
392537 |
* single integer. OPS_PER_BYTE is the most important metric,
|
|
Packit Service |
392537 |
* followed by NTHREADS.
|
|
Packit Service |
392537 |
*
|
|
Packit Service |
392537 |
* PAIR will be evaluated multiple times.
|
|
Packit Service |
392537 |
*/
|
|
Packit Service |
392537 |
#define PAIR_COST(pair) (((pair).ops_per_byte << 16) + ((pair).nalloc << 8) + (pair).nthreads)
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* maximum cost */
|
|
Packit Service |
392537 |
#define MAX_COST 0xffffff
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Generate all possible linkages of elements [idx:nlinks], where
|
|
Packit Service |
392537 |
* elements [0:idx-1] have cost 'cost' and end with mechanism
|
|
Packit Service |
392537 |
* 'input_mech'. */
|
|
Packit Service |
392537 |
static void
|
|
Packit Service |
392537 |
link_recurse(
|
|
Packit Service |
392537 |
linking_state *st,
|
|
Packit Service |
392537 |
int idx,
|
|
Packit Service |
392537 |
xfer_mech input_mech,
|
|
Packit Service |
392537 |
gint32 cost)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
xfer_element_mech_pair_t *elt_pairs, *glue_pairs;
|
|
Packit Service |
392537 |
linkage *my;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* if we've overrun the previous best cost already, then bail out */
|
|
Packit Service |
392537 |
if (cost >= st->best_cost)
|
|
Packit Service |
392537 |
return;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* have we linked everything? */
|
|
Packit Service |
392537 |
if (idx == st->nlinks) {
|
|
Packit Service |
392537 |
/* if we ended on other than XFER_MECH_NONE, then this is not a
|
|
Packit Service |
392537 |
* valid transfer */
|
|
Packit Service |
392537 |
if (input_mech != XFER_MECH_NONE) return;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* we already know this has lower cost than the previous best */
|
|
Packit Service |
392537 |
memcpy(st->best, st->cur, st->nlinks * sizeof(linkage));
|
|
Packit Service |
392537 |
st->best_cost = cost;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
return;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* recurse for each linkage we can make that starts with input_mech */
|
|
Packit Service |
392537 |
my = &st->cur[idx];
|
|
Packit Service |
392537 |
elt_pairs = my->mech_pairs;
|
|
Packit Service |
392537 |
glue_pairs = xfer_element_glue_mech_pairs;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
for (my->elt_idx = 0;
|
|
Packit Service |
392537 |
elt_pairs[my->elt_idx].input_mech != XFER_MECH_NONE
|
|
Packit Service |
392537 |
|| elt_pairs[my->elt_idx].output_mech != XFER_MECH_NONE;
|
|
Packit Service |
392537 |
my->elt_idx++) {
|
|
Packit Service |
392537 |
/* reject this pair if the input mech does not match */
|
|
Packit Service |
392537 |
if (elt_pairs[my->elt_idx].input_mech != input_mech)
|
|
Packit Service |
392537 |
continue;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* recurse with no glue */
|
|
Packit Service |
392537 |
my->glue_idx = -1;
|
|
Packit Service |
392537 |
link_recurse(st, idx+1,
|
|
Packit Service |
392537 |
elt_pairs[my->elt_idx].output_mech,
|
|
Packit Service |
392537 |
cost + PAIR_COST(elt_pairs[my->elt_idx]));
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* and recurse with glue */
|
|
Packit Service |
392537 |
for (my->glue_idx = 0;
|
|
Packit Service |
392537 |
glue_pairs[my->glue_idx].input_mech != XFER_MECH_NONE
|
|
Packit Service |
392537 |
|| glue_pairs[my->glue_idx].output_mech != XFER_MECH_NONE;
|
|
Packit Service |
392537 |
my->glue_idx++) {
|
|
Packit Service |
392537 |
/* reject this glue pair if it doesn't match with the element output */
|
|
Packit Service |
392537 |
if (glue_pairs[my->glue_idx].input_mech != elt_pairs[my->elt_idx].output_mech)
|
|
Packit Service |
392537 |
continue;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* and recurse with the glue */
|
|
Packit Service |
392537 |
link_recurse(st, idx+1,
|
|
Packit Service |
392537 |
glue_pairs[my->glue_idx].output_mech,
|
|
Packit Service |
392537 |
cost + PAIR_COST(elt_pairs[my->elt_idx])
|
|
Packit Service |
392537 |
+ PAIR_COST(glue_pairs[my->glue_idx]));
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
static void
|
|
Packit Service |
392537 |
link_elements(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
char *tmpbuf;
|
|
Packit Service |
392537 |
GPtrArray *new_elements;
|
|
Packit Service |
392537 |
XferElement *elt;
|
|
Packit Service |
392537 |
char *linkage_str;
|
|
Packit Service |
392537 |
linking_state st;
|
|
Packit Service |
392537 |
gint i, len;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Note that this algorithm's running time is polynomial in the length of
|
|
Packit Service |
392537 |
* the transfer, with a fairly high order. If Amanda is regularly assembling
|
|
Packit Service |
392537 |
* transfers with more than, say, 6 elements, then the algorithm should be
|
|
Packit Service |
392537 |
* redesigned. */
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* set up the state for recursion */
|
|
Packit Service |
392537 |
st.nlinks = xfer->elements->len;
|
|
Packit Service |
392537 |
st.cur = g_new0(linkage, st.nlinks);
|
|
Packit Service |
392537 |
st.best = g_new0(linkage, st.nlinks);
|
|
Packit Service |
392537 |
st.best_cost = MAX_COST;
|
|
Packit Service |
392537 |
for (i = 0; i < st.nlinks; i++) {
|
|
Packit Service |
392537 |
st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
|
|
Packit Service |
392537 |
st.cur[i].mech_pairs = xfer_element_get_mech_pairs(st.cur[i].elt);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* check that the first element is an XferSource and the last is an XferDest.
|
|
Packit Service |
392537 |
* A source is identified by having no input mechanisms. */
|
|
Packit Service |
392537 |
if (st.cur[0].mech_pairs[0].input_mech != XFER_MECH_NONE)
|
|
Packit Service |
392537 |
error("Transfer element 0 is not a transfer source");
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Similarly, a destination has no output mechanisms. */
|
|
Packit Service |
392537 |
if (st.cur[st.nlinks-1].mech_pairs[0].output_mech != XFER_MECH_NONE)
|
|
Packit Service |
392537 |
error("Last transfer element is not a transfer destination");
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* start recursing with the first element, asserting that its input mech is NONE */
|
|
Packit Service |
392537 |
link_recurse(&st, 0, XFER_MECH_NONE, 0);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* check that we got *some* solution */
|
|
Packit Service |
392537 |
if (st.best_cost == MAX_COST) {
|
|
Packit Service |
392537 |
error(_("Xfer %s cannot be linked."), xfer_repr(xfer));
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Now create a new list of elements, containing any glue elements
|
|
Packit Service |
392537 |
* that we need to add, and set their input_mech and output_mech fields */
|
|
Packit Service |
392537 |
new_elements = g_ptr_array_sized_new(xfer->elements->len);
|
|
Packit Service |
392537 |
for (i = 0; i < st.nlinks; i++) {
|
|
Packit Service |
392537 |
elt = st.best[i].elt;
|
|
Packit Service |
392537 |
elt->input_mech = st.best[i].mech_pairs[st.best[i].elt_idx].input_mech;
|
|
Packit Service |
392537 |
elt->output_mech = st.best[i].mech_pairs[st.best[i].elt_idx].output_mech;
|
|
Packit Service |
392537 |
g_ptr_array_add(new_elements, elt);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (st.best[i].glue_idx != -1) {
|
|
Packit Service |
392537 |
elt = xfer_element_glue();
|
|
Packit Service |
392537 |
elt->xfer = xfer;
|
|
Packit Service |
392537 |
elt->input_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].input_mech;
|
|
Packit Service |
392537 |
elt->output_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].output_mech;
|
|
Packit Service |
392537 |
g_ptr_array_add(new_elements, elt);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* install the new list of elements */
|
|
Packit Service |
392537 |
g_ptr_array_free(xfer->elements, FALSE);
|
|
Packit Service |
392537 |
xfer->elements = new_elements;
|
|
Packit Service |
392537 |
new_elements = NULL;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* debug-log the xfer's linkage */
|
|
Packit Service |
392537 |
len = xfer->elements->len;
|
|
Packit Service |
392537 |
linkage_str = g_strdup("Final linkage: ");
|
|
Packit Service |
392537 |
for (i = 0; i < len; i++) {
|
|
Packit Service |
392537 |
XferElement *elt = g_ptr_array_index(xfer->elements, i);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (i == 0) {
|
|
Packit Service |
392537 |
tmpbuf = g_strconcat(linkage_str, xfer_element_repr(elt), NULL);
|
|
Packit Service |
392537 |
g_free(linkage_str);
|
|
Packit Service |
392537 |
linkage_str = tmpbuf;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
else {
|
|
Packit Service |
392537 |
tmpbuf = g_strdup_printf("%s -(%s)-> %s",
|
|
Packit Service |
392537 |
linkage_str, xfer_mech_name(elt->input_mech), xfer_element_repr(elt));
|
|
Packit Service |
392537 |
g_free(linkage_str);
|
|
Packit Service |
392537 |
linkage_str = tmpbuf;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
g_debug("%s", linkage_str);
|
|
Packit Service |
392537 |
amfree(linkage_str);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
amfree(st.cur);
|
|
Packit Service |
392537 |
amfree(st.best);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/*
|
|
Packit Service |
392537 |
* XMsgSource
|
|
Packit Service |
392537 |
*/
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
static gboolean
|
|
Packit Service |
392537 |
xmsgsource_prepare(
|
|
Packit Service |
392537 |
GSource *source,
|
|
Packit Service |
392537 |
gint *timeout_)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
XMsgSource *xms = (XMsgSource *)source;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
*timeout_ = -1;
|
|
Packit Service |
392537 |
return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
static gboolean
|
|
Packit Service |
392537 |
xmsgsource_check(
|
|
Packit Service |
392537 |
GSource *source)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
XMsgSource *xms = (XMsgSource *)source;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
static gboolean
|
|
Packit Service |
392537 |
xmsgsource_dispatch(
|
|
Packit Service |
392537 |
GSource *source G_GNUC_UNUSED,
|
|
Packit Service |
392537 |
GSourceFunc callback,
|
|
Packit Service |
392537 |
gpointer user_data)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
XMsgSource *xms = (XMsgSource *)source;
|
|
Packit Service |
392537 |
Xfer *xfer = xms->xfer;
|
|
Packit Service |
392537 |
XMsgCallback my_cb = (XMsgCallback)callback;
|
|
Packit Service |
392537 |
XMsg *msg;
|
|
Packit Service |
392537 |
gboolean deliver_to_caller;
|
|
Packit Service |
392537 |
guint i;
|
|
Packit Service |
392537 |
gboolean xfer_done = FALSE;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* we're potentially calling Perl code within this loop, so we have to
|
|
Packit Service |
392537 |
* check that everything is ok on each iteration of the loop. */
|
|
Packit Service |
392537 |
while (xfer
|
|
Packit Service |
392537 |
&& xfer->status != XFER_DONE
|
|
Packit Service |
392537 |
&& (msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* We get first crack at interpreting messages, before calling the
|
|
Packit Service |
392537 |
* designated callback. */
|
|
Packit Service |
392537 |
deliver_to_caller = TRUE;
|
|
Packit Service |
392537 |
switch (msg->type) {
|
|
Packit Service |
392537 |
/* Intercept and count DONE messages so that we can determine when
|
|
Packit Service |
392537 |
* the entire transfer is finished. */
|
|
Packit Service |
392537 |
case XMSG_DONE:
|
|
Packit Service |
392537 |
if (--xfer->num_active_elements <= 0) {
|
|
Packit Service |
392537 |
/* mark the transfer as done, and take a note to break out
|
|
Packit Service |
392537 |
* of this loop after delivering the message to the user */
|
|
Packit Service |
392537 |
xfer_set_status(xfer, XFER_DONE);
|
|
Packit Service |
392537 |
xfer_done = TRUE;
|
|
Packit Service |
392537 |
} else {
|
|
Packit Service |
392537 |
/* eat this XMSG_DONE, since we expect more */
|
|
Packit Service |
392537 |
deliver_to_caller = FALSE;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
case XMSG_CANCEL:
|
|
Packit Service |
392537 |
if (xfer->status == XFER_CANCELLING || xfer->status == XFER_CANCELLED) {
|
|
Packit Service |
392537 |
/* ignore duplicate cancel messages */
|
|
Packit Service |
392537 |
deliver_to_caller = FALSE;
|
|
Packit Service |
392537 |
} else {
|
|
Packit Service |
392537 |
/* call cancel() on each child element */
|
|
Packit Service |
392537 |
gboolean expect_eof;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_debug("Cancelling %s", xfer_repr(xfer));
|
|
Packit Service |
392537 |
xfer_set_status(xfer, XFER_CANCELLING);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
expect_eof = FALSE;
|
|
Packit Service |
392537 |
for (i = 0; i < xfer->elements->len; i++) {
|
|
Packit Service |
392537 |
XferElement *elt = (XferElement *)
|
|
Packit Service |
392537 |
g_ptr_array_index(xfer->elements, i);
|
|
Packit Service |
392537 |
expect_eof = xfer_element_cancel(elt, expect_eof) || expect_eof;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* if nothing in the transfer can generate an EOF, then we
|
|
Packit Service |
392537 |
* can't cancel this transfer, and we'll just have to wait
|
|
Packit Service |
392537 |
* until it's finished. This may happen, for example, if
|
|
Packit Service |
392537 |
* the operating system is copying data for us
|
|
Packit Service |
392537 |
* asynchronously */
|
|
Packit Service |
392537 |
if (!expect_eof)
|
|
Packit Service |
392537 |
g_warning("Transfer %s cannot be cancelled.", xfer_repr(xfer));
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* and now we're done cancelling */
|
|
Packit Service |
392537 |
xfer_set_status(xfer, XFER_CANCELLED);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
default:
|
|
Packit Service |
392537 |
break; /* nothing interesting to do */
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (deliver_to_caller) {
|
|
Packit Service |
392537 |
if (my_cb) {
|
|
Packit Service |
392537 |
my_cb(user_data, msg, xfer);
|
|
Packit Service |
392537 |
} else {
|
|
Packit Service |
392537 |
g_warning("Dropping %s because no callback is set", xmsg_repr(msg));
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
xmsg_free(msg);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* This transfer is done, so kill it and exit the loop */
|
|
Packit Service |
392537 |
if (xfer_done) {
|
|
Packit Service |
392537 |
xfer_unref(xfer);
|
|
Packit Service |
392537 |
xfer = NULL;
|
|
Packit Service |
392537 |
break;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* Never automatically un-queue the event source */
|
|
Packit Service |
392537 |
return TRUE;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
XMsgSource *
|
|
Packit Service |
392537 |
xmsgsource_new(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
static GSourceFuncs *xmsgsource_funcs = NULL;
|
|
Packit Service |
392537 |
GSource *src;
|
|
Packit Service |
392537 |
XMsgSource *xms;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* initialize these here to avoid a compiler warning */
|
|
Packit Service |
392537 |
if (!xmsgsource_funcs) {
|
|
Packit Service |
392537 |
xmsgsource_funcs = g_new0(GSourceFuncs, 1);
|
|
Packit Service |
392537 |
xmsgsource_funcs->prepare = xmsgsource_prepare;
|
|
Packit Service |
392537 |
xmsgsource_funcs->check = xmsgsource_check;
|
|
Packit Service |
392537 |
xmsgsource_funcs->dispatch = xmsgsource_dispatch;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
src = g_source_new(xmsgsource_funcs, sizeof(XMsgSource));
|
|
Packit Service |
392537 |
xms = (XMsgSource *)src;
|
|
Packit Service |
392537 |
xms->xfer = xfer;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
return xms;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
xfer_status
|
|
Packit Service |
392537 |
wait_until_xfer_cancelled(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
xfer_status seen_status;
|
|
Packit Service |
392537 |
g_assert(xfer != NULL);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_mutex_lock(xfer->status_mutex);
|
|
Packit Service |
392537 |
while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE) {
|
|
Packit Service |
392537 |
g_cond_wait(xfer->status_cond, xfer->status_mutex);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
seen_status = xfer->status;
|
|
Packit Service |
392537 |
g_mutex_unlock(xfer->status_mutex);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
return seen_status;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
xfer_status
|
|
Packit Service |
392537 |
wait_until_xfer_running(
|
|
Packit Service |
392537 |
Xfer *xfer)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
xfer_status seen_status;
|
|
Packit Service |
392537 |
g_assert(xfer != NULL);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_mutex_lock(xfer->status_mutex);
|
|
Packit Service |
392537 |
while (xfer->status == XFER_START)
|
|
Packit Service |
392537 |
g_cond_wait(xfer->status_cond, xfer->status_mutex);
|
|
Packit Service |
392537 |
seen_status = xfer->status;
|
|
Packit Service |
392537 |
g_mutex_unlock(xfer->status_mutex);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
return seen_status;
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
void
|
|
Packit Service |
392537 |
xfer_cancel_with_error(
|
|
Packit Service |
392537 |
XferElement *elt,
|
|
Packit Service |
392537 |
const char *fmt,
|
|
Packit Service |
392537 |
...)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
va_list argp;
|
|
Packit Service |
392537 |
XMsg *msg;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_assert(elt != NULL);
|
|
Packit Service |
392537 |
g_assert(elt->xfer != NULL);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
msg = xmsg_new(elt, XMSG_ERROR, 0);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
arglist_start(argp, fmt);
|
|
Packit Service |
392537 |
msg->message = g_strdup_vprintf(fmt, argp);
|
|
Packit Service |
392537 |
arglist_end(argp);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
g_debug("xfer_cancel_with_error: %s", msg->message);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* send the XMSG_ERROR */
|
|
Packit Service |
392537 |
xfer_queue_message(elt->xfer, msg);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
/* cancel the transfer */
|
|
Packit Service |
392537 |
xfer_cancel(elt->xfer);
|
|
Packit Service |
392537 |
}
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
gint
|
|
Packit Service |
392537 |
xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd)
|
|
Packit Service |
392537 |
{
|
|
Packit Service |
392537 |
gint rv;
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
if (xfer)
|
|
Packit Service |
392537 |
g_mutex_lock(xfer->fd_mutex);
|
|
Packit Service |
392537 |
rv = *fdp;
|
|
Packit Service |
392537 |
*fdp = newfd;
|
|
Packit Service |
392537 |
if (xfer)
|
|
Packit Service |
392537 |
g_mutex_unlock(xfer->fd_mutex);
|
|
Packit Service |
392537 |
|
|
Packit Service |
392537 |
return rv;
|
|
Packit Service |
392537 |
}
|