Blob Blame History Raw
/*
 * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 * 
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 * 
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
 *
 * Author: Dustin J. Mitchell <dustin@zmanda.com>
 */

#include "amanda.h"
#include "amxfer.h"
#include "glib-util.h"
#include "testutils.h"
#include "event.h"
#include "simpleprng.h"
#include "sockaddr-util.h"

/* Having tests repeat exactly is an advantage, so we use a hard-coded
 * random seed. */
#define RANDOM_SEED 0xf00d

/*
 * XferElement subclasses
 *
 * This file defines a few "private" element classes that each have only one
 * mechanism pair.  These classes are then used to test all of the possible
 * combinations of glue.
 */

/* constants to determine the total amount of data to be transfered; EXTRA is
 * to test out partial-block handling; it should be prime. */
#define TEST_BLOCK_SIZE 32768
#define TEST_BLOCK_COUNT 10
#define TEST_BLOCK_EXTRA 97
#define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)

/* READFD */

static GType xfer_source_readfd_get_type(void);
#define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
#define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
#define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
#define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
#define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
#define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)

typedef struct XferSourceReadfd {
    XferElement __parent__;

    int write_fd;
    GThread *thread;
    simpleprng_state_t prng;
} XferSourceReadfd;

typedef struct {
    XferElementClass __parent__;
} XferSourceReadfdClass;

static gpointer
source_readfd_thread(
    gpointer data)
{
    XferSourceReadfd *self = XFER_SOURCE_READFD(data);
    char buf[TEST_XFER_SIZE];
    int fd = self->write_fd;

    simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));

    if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
	error("error in full_write(): %s", strerror(errno));
    }

    close(fd);

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
source_readfd_setup_impl(
    XferElement *elt)
{
    XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
    int p[2];

    simpleprng_seed(&self->prng, RANDOM_SEED);

    if (pipe(p) < 0)
	g_critical("Error from pipe(): %s", strerror(errno));

    self->write_fd = p[1];
    g_assert(xfer_element_swap_output_fd(elt, p[0]) == -1);

    return TRUE;
}

static gboolean
source_readfd_start_impl(
    XferElement *elt)
{
    XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
    self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
source_readfd_class_init(
    XferSourceReadfdClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_NONE, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->setup = source_readfd_setup_impl;
    xec->start = source_readfd_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_source_readfd_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferSourceReadfdClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) source_readfd_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferSourceReadfd),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
    }

    return type;
}

/* WRITEFD */

static GType xfer_source_writefd_get_type(void);
#define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
#define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
#define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
#define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
#define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
#define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)

typedef struct XferSourceWritefd {
    XferElement __parent__;

    GThread *thread;
    simpleprng_state_t prng;
} XferSourceWritefd;

typedef struct {
    XferElementClass __parent__;
} XferSourceWritefdClass;

static gpointer
source_writefd_thread(
    gpointer data)
{
    XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
    XferElement *elt = XFER_ELEMENT(data);
    char buf[TEST_XFER_SIZE];
    int fd = xfer_element_swap_input_fd(elt->downstream, -1);

    /* this shouldn't happen, although non-test elements handle it gracefully */
    g_assert(fd != -1);

    simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));

    if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
	error("error in full_write(): %s", strerror(errno));
    }

    close(fd);

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
source_writefd_start_impl(
    XferElement *elt)
{
    XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
source_writefd_class_init(
    XferSourceWritefdClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_NONE, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->start = source_writefd_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_source_writefd_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferSourceWritefdClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) source_writefd_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferSourceWritefd),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
    }

    return type;
}

/* PUSH_BUFFER */

static GType xfer_source_push_get_type(void);
#define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
#define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
#define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
#define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
#define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
#define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)

typedef struct XferSourcePush {
    XferElement __parent__;

    GThread *thread;
    simpleprng_state_t prng;
} XferSourcePush;

typedef struct {
    XferElementClass __parent__;
} XferSourcePushClass;

static gpointer
source_push_thread(
    gpointer data)
{
    XferSourcePush *self = XFER_SOURCE_PUSH(data);
    char *buf;
    int i;

    for (i = 0; i < TEST_BLOCK_COUNT; i++) {
	buf = g_malloc(TEST_BLOCK_SIZE);
	simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
	xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
	buf = NULL;
    }

    /* send a smaller block */
    buf = g_malloc(TEST_BLOCK_EXTRA);
    simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
    xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
    buf = NULL;

    /* send EOF */
    xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
source_push_start_impl(
    XferElement *elt)
{
    XferSourcePush *self = XFER_SOURCE_PUSH(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
source_push_class_init(
    XferSourcePushClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->start = source_push_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_source_push_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferSourcePushClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) source_push_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferSourcePush),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
    }

    return type;
}

/* PULL_BUFFER */

static GType xfer_source_pull_get_type(void);
#define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
#define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
#define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
#define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
#define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
#define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)

typedef struct XferSourcePull {
    XferElement __parent__;

    gint nbuffers;
    GThread *thread;
    simpleprng_state_t prng;
} XferSourcePull;

typedef struct {
    XferElementClass __parent__;
} XferSourcePullClass;

static gpointer
source_pull_pull_buffer_impl(
    XferElement *elt,
    size_t *size)
{
    XferSourcePull *self = XFER_SOURCE_PULL(elt);
    char *buf;
    size_t bufsiz;

    if (self->nbuffers > TEST_BLOCK_COUNT) {
	*size = 0;
	return NULL;
    }
    bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;

    self->nbuffers++;

    buf = g_malloc(bufsiz);
    simpleprng_fill_buffer(&self->prng, buf, bufsiz);
    *size = bufsiz;
    return buf;
}

static gboolean
source_pull_setup_impl(
    XferElement *elt)
{
    XferSourcePull *self = XFER_SOURCE_PULL(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    return TRUE;
}

static void
source_pull_class_init(
    XferSourcePullClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->pull_buffer = source_pull_pull_buffer_impl;
    xec->setup = source_pull_setup_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_source_pull_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferSourcePullClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) source_pull_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferSourcePull),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
    }

    return type;
}

/* LISTEN */

static GType xfer_source_listen_get_type(void);
#define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
#define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
#define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
#define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
#define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
#define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)

typedef struct XferSourceListen {
    XferElement __parent__;

    GThread *thread;
    simpleprng_state_t prng;
} XferSourceListen;

typedef struct {
    XferElementClass __parent__;
} XferSourceListenClass;

static gpointer
source_listen_thread(
    gpointer data)
{
    XferSourceListen *self = XFER_SOURCE_LISTEN(data);
    XferElement *elt = XFER_ELEMENT(self);
    DirectTCPAddr *addrs;
    int sock;
    char *buf;
    int i;

    /* set up the sockaddr -- IPv4 only */
    addrs = elt->downstream->input_listen_addrs;
    g_assert(addrs != NULL);

    tu_dbg("making data connection to %s\n", str_sockaddr(addrs));
    sock = socket(SU_GET_FAMILY(addrs), SOCK_STREAM, 0);
    if (sock < 0) {
	error("socket(): %s", strerror(errno));
    }
    if (connect(sock, (struct sockaddr *)addrs, SS_LEN(addrs)) < 0) {
	error("connect(): %s", strerror(errno));
    }

    tu_dbg("connected to %s\n", str_sockaddr(addrs));

    buf = g_malloc(TEST_BLOCK_SIZE);
    for (i = 0; i < TEST_BLOCK_COUNT; i++) {
	simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
	if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
	    error("error in full_write(): %s", strerror(errno));
	}
    }

    /* send a smaller block */
    simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
    if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
	error("error in full_write(): %s", strerror(errno));
    }
    g_free(buf);

    /* send EOF by closing the socket */
    close(sock);

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
source_listen_start_impl(
    XferElement *elt)
{
    XferSourceListen *self = XFER_SOURCE_LISTEN(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
source_listen_class_init(
    XferSourceListenClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->start = source_listen_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_source_listen_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferSourceListenClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) source_listen_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferSourceListen),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
    }

    return type;
}

/* CONNECT */

static GType xfer_source_connect_get_type(void);
#define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
#define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
#define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
#define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
#define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
#define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)

typedef struct XferSourceConnect {
    XferElement __parent__;

    int listen_socket;

    GThread *thread;
    simpleprng_state_t prng;
} XferSourceConnect;

typedef struct {
    XferElementClass __parent__;
} XferSourceConnectClass;

static gpointer
source_connect_thread(
    gpointer data)
{
    XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
    int sock;
    char *buf;
    int i;

    g_assert(self->listen_socket != -1);

    if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
	xfer_cancel_with_error(XFER_ELEMENT(self),
	    _("Error accepting incoming connection: %s"), strerror(errno));
	wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
	return NULL;
    }

    /* close the listening socket now, for good measure */
    close(self->listen_socket);
    self->listen_socket = -1;

    tu_dbg("connection accepted\n");

    buf = g_malloc(TEST_BLOCK_SIZE);
    for (i = 0; i < TEST_BLOCK_COUNT; i++) {
	simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
	if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
	    error("error in full_write(): %s", strerror(errno));
	}
    }

    /* send a smaller block */
    simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
    if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
	error("error in full_write(): %s", strerror(errno));
    }
    g_free(buf);

    /* send EOF by closing the socket */
    close(sock);

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
source_connect_setup_impl(
    XferElement *elt)
{
    XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
    sockaddr_union addr;
    DirectTCPAddr *addrs;
    socklen_t len;
    int sock;

    /* set up self->listen_socket and set elt->output_listen_addrs */
    sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
	error("socket(): %s", strerror(errno));

    if (listen(sock, 1) < 0)
	error("listen(): %s", strerror(errno));

    len = sizeof(addr);
    if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
	error("getsockname(): %s", strerror(errno));
    g_assert(SU_GET_FAMILY(&addr) == AF_INET);

    addrs = g_new0(DirectTCPAddr, 2);
    copy_sockaddr(&addrs[0], &addr);
    elt->output_listen_addrs = addrs;

    return TRUE;
}

static gboolean
source_connect_start_impl(
    XferElement *elt)
{
    XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
source_connect_class_init(
    XferSourceConnectClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->setup = source_connect_setup_impl;
    xec->start = source_connect_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_source_connect_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferSourceConnectClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) source_connect_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferSourceConnect),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
    }

    return type;
}

/* READFD */

static GType xfer_dest_readfd_get_type(void);
#define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
#define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
#define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
#define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
#define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
#define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)

typedef struct XferDestReadfd {
    XferElement __parent__;

    GThread *thread;
    simpleprng_state_t prng;
} XferDestReadfd;

typedef struct {
    XferElementClass __parent__;
} XferDestReadfdClass;

static gpointer
dest_readfd_thread(
    gpointer data)
{
    XferDestReadfd *self = XFER_DEST_READFD(data);
    XferElement *elt = XFER_ELEMENT(data);
    char buf[TEST_XFER_SIZE];
    size_t remaining;
    int fd = xfer_element_swap_output_fd(elt->upstream, -1);
    ssize_t nread;

    /* this shouldn't happen, although non-test elements handle it gracefully */
    g_assert(fd != -1);

    remaining = sizeof(buf);
    while (remaining) {
	if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
	    error("error in read(): %s", strerror(errno));
	}
	remaining -= nread;
    }

    /* we should be at EOF here */
    if ((nread = read(fd, buf, 10)) != 0)
	g_critical("too much data entering XferDestReadfd");

    if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
	g_critical("data entering XferDestReadfd does not match");

    close(fd);

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
dest_readfd_start_impl(
    XferElement *elt)
{
    XferDestReadfd *self = XFER_DEST_READFD(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
dest_readfd_class_init(
    XferDestReadfdClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_READFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->start = dest_readfd_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_dest_readfd_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestReadfdClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) dest_readfd_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestReadfd),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
    }

    return type;
}

/* WRITEFD */

static GType xfer_dest_writefd_get_type(void);
#define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
#define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
#define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
#define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
#define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
#define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)

typedef struct XferDestWritefd {
    XferElement __parent__;

    int read_fd;
    GThread *thread;
    simpleprng_state_t prng;
} XferDestWritefd;

typedef struct {
    XferElementClass __parent__;
} XferDestWritefdClass;

static gpointer
dest_writefd_thread(
    gpointer data)
{
    XferDestWritefd *self = XFER_DEST_WRITEFD(data);
    char buf[TEST_XFER_SIZE];
    size_t remaining;
    int fd = self->read_fd;
    ssize_t nread;

    remaining = sizeof(buf);
    while (remaining) {
	if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
	    error("error in read(): %s", strerror(errno));
	}
	remaining -= nread;
    }

    /* we should be at EOF here */
    if ((nread = read(fd, buf, 10)) != 0)
	g_critical("too much data entering XferDestWritefd");

    if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
	g_critical("data entering XferDestWritefd does not match");

    close(fd);

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
dest_writefd_setup_impl(
    XferElement *elt)
{
    XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
    int p[2];

    simpleprng_seed(&self->prng, RANDOM_SEED);

    if (pipe(p) < 0)
	g_critical("Error from pipe(): %s", strerror(errno));

    self->read_fd = p[0];
    g_assert(xfer_element_swap_input_fd(elt, p[1]) == -1);

    return TRUE;
}

static gboolean
dest_writefd_start_impl(
    XferElement *elt)
{
    XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
    self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
dest_writefd_class_init(
    XferDestWritefdClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_WRITEFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->setup = dest_writefd_setup_impl;
    xec->start = dest_writefd_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_dest_writefd_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestWritefdClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) dest_writefd_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestWritefd),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
    }

    return type;
}

/* PUSH_BUFFER */

static GType xfer_dest_push_get_type(void);
#define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
#define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
#define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
#define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
#define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
#define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)

typedef struct XferDestPush {
    XferElement __parent__;

    char *buf;
    size_t bufpos;

    GThread *thread;
    simpleprng_state_t prng;
} XferDestPush;

typedef struct {
    XferElementClass __parent__;
} XferDestPushClass;

static void
dest_push_push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    XferDestPush *self = XFER_DEST_PUSH(elt);

    if (buf == NULL) {
	/* if we're at EOF, verify we got the right bytes */
	g_assert(self->bufpos == TEST_XFER_SIZE);
	if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
	    g_critical("data entering XferDestPush does not match");
	g_free(self->buf);
	return;
    }

    g_assert(self->bufpos + size <= TEST_XFER_SIZE);
    memcpy(self->buf + self->bufpos, buf, size);
    self->bufpos += size;
}

static gboolean
dest_push_setup_impl(
    XferElement *elt)
{
    XferDestPush *self = XFER_DEST_PUSH(elt);

    self->buf = g_malloc(TEST_XFER_SIZE);
    simpleprng_seed(&self->prng, RANDOM_SEED);

    return TRUE;
}

static void
dest_push_class_init(
    XferDestPushClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(0), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->push_buffer = dest_push_push_buffer_impl;
    xec->setup = dest_push_setup_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_dest_push_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestPushClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) dest_push_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestPush),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
    }

    return type;
}

/* PULL_BUFFER */

static GType xfer_dest_pull_get_type(void);
#define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
#define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
#define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
#define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
#define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
#define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)

typedef struct XferDestPull {
    XferElement __parent__;

    GThread *thread;
    simpleprng_state_t prng;
} XferDestPull;

typedef struct {
    XferElementClass __parent__;
} XferDestPullClass;

static gpointer
dest_pull_thread(
    gpointer data)
{
    XferDestPull *self = XFER_DEST_PULL(data);
    char fullbuf[TEST_XFER_SIZE];
    char *buf;
    size_t bufpos = 0;
    size_t size;

    while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
	g_assert(bufpos + size <= TEST_XFER_SIZE);
	memcpy(fullbuf + bufpos, buf, size);
	bufpos += size;
    }

    /* we're at EOF, so verify we got the right bytes */
    g_assert(bufpos == TEST_XFER_SIZE);
    if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
	g_critical("data entering XferDestPull does not match");

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
dest_pull_start_impl(
    XferElement *elt)
{
    XferDestPull *self = XFER_DEST_PULL(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
dest_pull_class_init(
    XferDestPullClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->start = dest_pull_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_dest_pull_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestPullClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) dest_pull_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestPull),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
    }

    return type;
}

/* LISTEN */

static GType xfer_dest_listen_get_type(void);
#define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
#define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
#define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
#define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
#define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
#define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)

typedef struct XferDestListen {
    XferElement __parent__;

    int listen_socket;

    GThread *thread;
    simpleprng_state_t prng;
} XferDestListen;

typedef struct {
    XferElementClass __parent__;
} XferDestListenClass;

static gpointer
dest_listen_thread(
    gpointer data)
{
    XferDestListen *self = XFER_DEST_LISTEN(data);
    char *buf;
    size_t bytes = 0;
    int sock;

    g_assert(self->listen_socket != -1);

    if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
	xfer_cancel_with_error(XFER_ELEMENT(self),
	    _("Error accepting incoming connection: %s"), strerror(errno));
	wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
	return NULL;
    }

    /* close the listening socket now, for good measure */
    close(self->listen_socket);
    self->listen_socket = -1;

    /* read from the socket until EOF or all of the data is read.  We try to
     * read one extra byte - if we get it, then upstream sent too much data */
    buf = g_malloc(TEST_XFER_SIZE+1);
    bytes = read_fully(sock, buf, TEST_XFER_SIZE+1, NULL);
    g_assert(bytes == TEST_XFER_SIZE);
    close(sock);

    /* we're at EOF, so verify we got the right bytes */
    g_assert(bytes == TEST_XFER_SIZE);
    if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
	g_critical("data entering XferDestListen does not match");

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
dest_listen_setup_impl(
    XferElement *elt)
{
    XferDestListen *self = XFER_DEST_LISTEN(elt);
    sockaddr_union addr;
    DirectTCPAddr *addrs;
    socklen_t len;
    int sock;

    /* set up self->listen_socket and set elt->input_listen_addrs */
    sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
	error("socket(): %s", strerror(errno));

    if (listen(sock, 1) < 0)
	error("listen(): %s", strerror(errno));

    len = sizeof(addr);
    if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
	error("getsockname(): %s", strerror(errno));
    g_assert(SU_GET_FAMILY(&addr) == AF_INET);

    addrs = g_new0(DirectTCPAddr, 2);
    copy_sockaddr(&addrs[0], &addr);
    elt->input_listen_addrs = addrs;

    return TRUE;
}

static gboolean
dest_listen_start_impl(
    XferElement *elt)
{
    XferDestListen *self = XFER_DEST_LISTEN(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
dest_listen_class_init(
    XferDestListenClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->setup = dest_listen_setup_impl;
    xec->start = dest_listen_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_dest_listen_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestListenClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) dest_listen_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestListen),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
    }

    return type;
}

/* CONNET */

static GType xfer_dest_connect_get_type(void);
#define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
#define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
#define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
#define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
#define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
#define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)

typedef struct XferDestConnect {
    XferElement __parent__;

    int connect_socket;

    GThread *thread;
    simpleprng_state_t prng;
} XferDestConnect;

typedef struct {
    XferElementClass __parent__;
} XferDestConnectClass;

static gpointer
dest_connect_thread(
    gpointer data)
{
    XferDestConnect *self = XFER_DEST_CONNECT(data);
    XferElement *elt = XFER_ELEMENT(self);
    DirectTCPAddr *addrs;
    sockaddr_union addr;
    char *buf;
    size_t bytes = 0;
    int sock;

    /* set up the sockaddr -- IPv4 only */
    SU_INIT(&addr, AF_INET);
    addrs = elt->upstream->output_listen_addrs;
    g_assert(addrs != NULL);
    copy_sockaddr(&addr, addrs);

    tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
    sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
    if (sock < 0) {
	error("socket(): %s", strerror(errno));
    }
    if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
	error("connect(): %s", strerror(errno));
    }

    tu_dbg("connected to %s\n", str_sockaddr(&addr));

    /* read from the socket until EOF or all of the data is read.  We try to
     * read one extra byte - if we get it, then upstream sent too much data */
    buf = g_malloc(TEST_XFER_SIZE+1);
    bytes = read_fully(sock, buf, TEST_XFER_SIZE+1, NULL);
    g_assert(bytes == TEST_XFER_SIZE);
    close(sock);

    /* we're at EOF, so verify we got the right bytes */
    g_assert(bytes == TEST_XFER_SIZE);
    if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
	g_critical("data entering XferDestConnect does not match");

    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

static gboolean
dest_connect_start_impl(
    XferElement *elt)
{
    XferDestConnect *self = XFER_DEST_CONNECT(elt);

    simpleprng_seed(&self->prng, RANDOM_SEED);

    self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);

    return TRUE;
}

static void
dest_connect_class_init(
    XferDestConnectClass * klass)
{
    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    xec->start = dest_connect_start_impl;
    xec->mech_pairs = mech_pairs;
}

GType
xfer_dest_connect_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestConnectClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) dest_connect_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestConnect),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
    }

    return type;
}


/*
 * Tests
 */

static void
test_xfer_generic_callback(
    gpointer data G_GNUC_UNUSED,
    XMsg *msg,
    Xfer *xfer)
{
    tu_dbg("Received message %s\n", xmsg_repr(msg));

    switch (msg->type) {
	case XMSG_DONE:
	    /* are we done? */
	    if (xfer->status == XFER_DONE) {
		tu_dbg("all elements are done!\n");
		g_main_loop_quit(default_main_loop());
	    }
	    break;

	default:
	    break;
    }
}

/****
 * Run a simple transfer with some xor filters
 */

static int
test_xfer_simple(void)
{
    unsigned int i;
    GSource *src;
    XferElement *elements[] = {
	xfer_source_random(100*1024, RANDOM_SEED),
	xfer_filter_xor('d'),
	xfer_filter_xor('d'),
	xfer_dest_null(RANDOM_SEED),
    };

    Xfer *xfer = xfer_new(elements, G_N_ELEMENTS(elements));
    src = xfer_get_source(xfer);
    g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
    g_source_attach(src, NULL);
    tu_dbg("Transfer: %s\n", xfer_repr(xfer));

    /* unreference the elements */
    for (i = 0; i < G_N_ELEMENTS(elements); i++) {
	g_object_unref(elements[i]);
	g_assert(G_OBJECT(elements[i])->ref_count == 1);
	elements[i] = NULL;
    }

    xfer_start(xfer, 0, 0);

    g_main_loop_run(default_main_loop());
    g_assert(xfer->status == XFER_DONE);

    xfer_unref(xfer);

    return 1;
}

/****
 * Run a transfer between two files, with or without filters
 */

static int
test_xfer_files(gboolean add_filters)
{
    unsigned int i;
    unsigned int elts;
    GSource *src;
    char *in_filename = __FILE__;
    char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
    int rfd, wfd;
    Xfer *xfer;
    XferElement *elements[4];

    rfd = open(in_filename, O_RDONLY, 0);
    if (rfd < 0) {
	g_critical("Could not open '%s': %s", in_filename, strerror(errno));
	exit(1);
    }

    wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
    if (wfd < 0) {
	g_critical("Could not open '%s': %s", out_filename, strerror(errno));
	exit(1);
    }

    elts = 0;
    elements[elts++] = xfer_source_fd(rfd);
    if (add_filters) {
	elements[elts++] = xfer_filter_xor(0xab);
	elements[elts++] = xfer_filter_xor(0xab);
    }
    elements[elts++] = xfer_dest_fd(wfd);

    xfer = xfer_new(elements, elts);
    src = xfer_get_source(xfer);
    g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
    g_source_attach(src, NULL);
    tu_dbg("Transfer: %s\n", xfer_repr(xfer));

    /* unreference the elements */
    for (i = 0; i < elts; i++) {
	g_object_unref(elements[i]);
	g_assert(G_OBJECT(elements[i])->ref_count == 1);
	elements[i] = NULL;
    }

    xfer_start(xfer, 0, 0);

    g_main_loop_run(default_main_loop());
    g_assert(xfer->status == XFER_DONE);

    if (fcntl(rfd, F_GETFD) == -1) {
	fprintf(stderr,"rfd is already closed\n");
	exit(1);
    }
    if (fcntl(wfd, F_GETFD) == -1) {
	fprintf(stderr,"wfd is already closed\n");
	exit(1);
    }
    close(rfd);
    close(wfd);
    xfer_unref(xfer);

    unlink(out_filename); /* ignore any errors */

    return 1;
}

static int
test_xfer_files_simple(void)
{
    return test_xfer_files(FALSE);
}

static int
test_xfer_files_filter(void)
{
    return test_xfer_files(TRUE);
}

/*****
 * test each possible combination of source and destination mechansim
 */

static int
test_glue_combo(
    XferElement *source,
    XferElement *dest)
{
    unsigned int i;
    GSource *src;
    XferElement *elements[] = { source, dest };

    Xfer *xfer = xfer_new(elements, G_N_ELEMENTS(elements));
    src = xfer_get_source(xfer);
    g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
    g_source_attach(src, NULL);

    /* unreference the elements */
    for (i = 0; i < G_N_ELEMENTS(elements); i++) {
	g_object_unref(elements[i]);
	g_assert(G_OBJECT(elements[i])->ref_count == 1);
	elements[i] = NULL;
    }

    xfer_start(xfer, 0, 0);

    g_main_loop_run(default_main_loop());
    g_assert(xfer->status == XFER_DONE);

    xfer_unref(xfer);

    return 1;
}

#define make_test_glue(n, s, d) static int n(void) \
{\
    return test_glue_combo((XferElement *)g_object_new(s, NULL), \
			   (XferElement *)g_object_new(d, NULL)); \
}
make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)

/*
 * Main driver
 */

int
main(int argc, char **argv)
{
    static TestUtilsTest tests[] = {
	TU_TEST(test_xfer_simple, 90),
	TU_TEST(test_xfer_files_simple, 90),
	TU_TEST(test_xfer_files_filter, 90),
        TU_TEST(test_glue_READFD_READFD, 90),
        TU_TEST(test_glue_READFD_WRITEFD, 90),
        TU_TEST(test_glue_READFD_PUSH, 90),
        TU_TEST(test_glue_READFD_PULL, 90),
        TU_TEST(test_glue_READFD_LISTEN, 90),
        TU_TEST(test_glue_READFD_CONNECT, 90),
        TU_TEST(test_glue_WRITEFD_READFD, 90),
        TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
        TU_TEST(test_glue_WRITEFD_PUSH, 90),
        TU_TEST(test_glue_WRITEFD_PULL, 90),
        TU_TEST(test_glue_WRITEFD_LISTEN, 90),
        TU_TEST(test_glue_WRITEFD_CONNECT, 90),
        TU_TEST(test_glue_PUSH_READFD, 90),
        TU_TEST(test_glue_PUSH_WRITEFD, 90),
        TU_TEST(test_glue_PUSH_PUSH, 90),
        TU_TEST(test_glue_PUSH_PULL, 90),
        TU_TEST(test_glue_PUSH_LISTEN, 90),
        TU_TEST(test_glue_PUSH_CONNECT, 90),
        TU_TEST(test_glue_PULL_READFD, 90),
        TU_TEST(test_glue_PULL_WRITEFD, 90),
        TU_TEST(test_glue_PULL_PUSH, 90),
        TU_TEST(test_glue_PULL_PULL, 90),
        TU_TEST(test_glue_PULL_LISTEN, 90),
        TU_TEST(test_glue_PULL_CONNECT, 90),
        TU_TEST(test_glue_LISTEN_READFD, 90),
        TU_TEST(test_glue_LISTEN_WRITEFD, 90),
        TU_TEST(test_glue_LISTEN_PUSH, 90),
        TU_TEST(test_glue_LISTEN_PULL, 90),
        TU_TEST(test_glue_LISTEN_LISTEN, 90),
        TU_TEST(test_glue_LISTEN_CONNECT, 90),
        TU_TEST(test_glue_CONNECT_READFD, 90),
        TU_TEST(test_glue_CONNECT_WRITEFD, 90),
        TU_TEST(test_glue_CONNECT_PUSH, 90),
        TU_TEST(test_glue_CONNECT_PULL, 90),
        TU_TEST(test_glue_CONNECT_LISTEN, 90),
        TU_TEST(test_glue_CONNECT_CONNECT, 90),
	TU_END()
    };

    glib_init();

    config_init(0, NULL);
    return testutils_run_tests(argc, argv, tests);
}