Blob Blame History Raw
/*
 * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 *
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
 */

#include "amanda.h"
#include "amutil.h"
#include "device.h"
#include "directtcp.h"
#include "stream.h"
#include "ndmlib.h"
#include "ndmpconnobj.h"
#include "sockaddr-util.h"

/*
 * Type checking and casting macros
 */
#define TYPE_NDMP_DEVICE	(ndmp_device_get_type())
#define NDMP_DEVICE(obj)	G_TYPE_CHECK_INSTANCE_CAST((obj), ndmp_device_get_type(), NdmpDevice)
#define NDMP_DEVICE_CONST(obj)	G_TYPE_CHECK_INSTANCE_CAST((obj), ndmp_device_get_type(), NdmpDevice const)
#define NDMP_DEVICE_CLASS(klass)	G_TYPE_CHECK_CLASS_CAST((klass), ndmp_device_get_type(), NdmpDeviceClass)
#define IS_NDMP_DEVICE(obj)	G_TYPE_CHECK_INSTANCE_TYPE((obj), ndmp_device_get_type ())
#define NDMP_DEVICE_GET_CLASS(obj)	G_TYPE_INSTANCE_GET_CLASS((obj), ndmp_device_get_type(), NdmpDeviceClass)
static GType	ndmp_device_get_type	(void);

/*
 * Main object structure
 */

typedef struct NdmpDevice_ NdmpDevice;
struct NdmpDevice_ {
    Device __parent__;

    NDMPConnection *ndmp;

    /* true if tape service is open on the NDMP server */
    gboolean tape_open;
    guint64      bytes_moved_before;

    /* addresses the object is listening on, and how the connection
     * was opened */
    DirectTCPAddr *listen_addrs;
    gboolean for_writing;

    /* support for IndirectTCP */
    int indirecttcp_sock; /* -1 if not in use */
    int indirect;

    /* Current DirectTCPConnectionNDMP */
    struct DirectTCPConnectionNDMP_ *directtcp_conn;

    /* constructor parameters and properties */
    gchar	 *ndmp_hostname;
    gint	 ndmp_port;
    gchar        *ndmp_device_name;
    gchar	 *ndmp_username;
    gchar	 *ndmp_password;
    gchar	 *ndmp_auth;
    gboolean	 verbose;
    gsize	 read_block_size;

    GMutex	*abort_mutex;
    GCond	*abort_cond;
    gboolean	 cancel;
    int         *cancelled;
};

/*
 * Class definition
 */

typedef struct NdmpDeviceClass_ NdmpDeviceClass;
struct NdmpDeviceClass_ {
    DeviceClass __parent__;
};

/*
 * A directtcp connection subclass representing a running mover on the other end of
 * the given NDMP connection
 */

#define TYPE_DIRECTTCP_CONNECTION_NDMP	(directtcp_connection_ndmp_get_type())
#define DIRECTTCP_CONNECTION_NDMP(obj)	G_TYPE_CHECK_INSTANCE_CAST((obj), directtcp_connection_ndmp_get_type(), DirectTCPConnectionNDMP)
#define DIRECTTCP_CONNECTION_NDMP_CONST(obj)	G_TYPE_CHECK_INSTANCE_CAST((obj), directtcp_connection_ndmp_get_type(), DirectTCPConnectionNDMP const)
#define DIRECTTCP_CONNECTION_NDMP_CLASS(klass)	G_TYPE_CHECK_CLASS_CAST((klass), directtcp_connection_ndmp_get_type(), DirectTCPConnectionNDMPClass)
#define IS_DIRECTTCP_CONNECTION_NDMP(obj)	G_TYPE_CHECK_INSTANCE_TYPE((obj), directtcp_connection_ndmp_get_type ())
#define DIRECTTCP_CONNECTION_NDMP_GET_CLASS(obj)	G_TYPE_INSTANCE_GET_CLASS((obj), directtcp_connection_ndmp_get_type(), DirectTCPConnectionNDMPClass)
GType directtcp_connection_ndmp_get_type(void);

typedef struct DirectTCPConnectionNDMP_ {
    DirectTCPConnection __parent__;

    /* NDMP connection controlling the mover */
    NDMPConnection *ndmp;

    /* mode for this operation */
    ndmp9_mover_mode mode;

    /* last reported mover position in the datastream */
    guint64 offset;
} DirectTCPConnectionNDMP;

typedef struct DirectTCPConnectionNDMPClass_ {
    DirectTCPConnectionClass __parent__;
} DirectTCPConnectionNDMPClass;

static DirectTCPConnectionNDMP *directtcp_connection_ndmp_new(
	NDMPConnection *ndmp,
	ndmp9_mover_mode mode);

/*
 * Constants and static data
 */

#define NDMP_DEVICE_NAME "ndmp"

/* pointer to the class of our parent */
static DeviceClass *parent_class = NULL;

/* robust_write results */
typedef enum {
    ROBUST_WRITE_OK,
    ROBUST_WRITE_OK_LEOM,
    ROBUST_WRITE_ERROR, /* device error already set */
    ROBUST_WRITE_NO_SPACE
} robust_write_result;

/*
 * device-specific properties
 */

/* Authentication information for NDMP agent. Both of these are strings. */
static DevicePropertyBase device_property_ndmp_username;
static DevicePropertyBase device_property_ndmp_password;
static DevicePropertyBase device_property_ndmp_auth;
static DevicePropertyBase device_property_indirect;
#define PROPERTY_NDMP_USERNAME (device_property_ndmp_username.ID)
#define PROPERTY_NDMP_PASSWORD (device_property_ndmp_password.ID)
#define PROPERTY_NDMP_AUTH (device_property_ndmp_auth.ID)
#define PROPERTY_INDIRECT (device_property_indirect.ID)


/*
 * prototypes
 */

void ndmp_device_register(void);
static void set_error_from_ndmp(NdmpDevice *self);

#define ndmp_device_read_size(self) \
    (((NdmpDevice *)(self))->read_block_size? \
	((NdmpDevice *)(self))->read_block_size : ((Device *)(self))->block_size)

/*
 * Utility functions
 */

static gboolean
open_connection(
	NdmpDevice *self)
{
    if (!self->ndmp) {
	self->ndmp = ndmp_connection_new(
	    self->ndmp_hostname,
	    self->ndmp_port,
	    self->ndmp_username,
	    self->ndmp_password,
	    self->ndmp_auth);

	if (ndmp_connection_err_code(self->ndmp)) {
	    char *errmsg = ndmp_connection_err_msg(self->ndmp);
	    device_set_error(DEVICE(self),
		g_strdup_printf("could not connect to ndmp-server '%s:%d': %s",
		    self->ndmp_hostname, self->ndmp_port, errmsg),
		DEVICE_STATUS_DEVICE_ERROR);
	    g_object_unref(self->ndmp);
	    self->ndmp = NULL;
	    return FALSE;
	}

	if (self->verbose)
	    ndmp_connection_set_verbose(self->ndmp, TRUE);

	self->tape_open = FALSE;
    }

    return TRUE;
}

static void
close_connection(
	NdmpDevice *self)
{
    /* note that this does not send NDMP_TAPE_CLOSE, as it's used in error
     * situations too */
    if (self->ndmp) {
	g_object_unref(self->ndmp);
	self->ndmp = NULL;
	self->tape_open = FALSE;
    }
}

static gboolean
open_tape_agent(
    NdmpDevice *self)
{
    guint64 file_num, blockno, blocksize;

    /* if already open, stop now */
    if (self->tape_open) {
	return TRUE;
    }

    if (!open_connection(self)) {
	/* error message set by open_connection */
	return FALSE;
    }

    g_debug("opening tape device '%s' on NDMP server '%s:%d'",
	self->ndmp_device_name, self->ndmp_hostname, self->ndmp_port);

    /* send NDMP_TAPE_OPEN, using RAW mode so that it will open even with no tape */
    if (!ndmp_connection_tape_open(self->ndmp,
		self->ndmp_device_name, NDMP9_TAPE_RAW_MODE)) {
	set_error_from_ndmp(self);
	return FALSE;
    }

    /* check that the block sizes match */
    if (!ndmp_connection_tape_get_state(self->ndmp,
	&blocksize, &file_num, &blockno)) {
	set_error_from_ndmp(self);
	return FALSE;
    }
    if (blocksize != 0 && blocksize != DEVICE(self)->block_size) {
	device_set_error(DEVICE(self),
	    g_strdup_printf("NDMP device has fixed block size %ju, but Amanda "
		    "device is configured with blocksize %ju", (uintmax_t)blocksize,
		    (uintmax_t)(DEVICE(self)->block_size)),
	    DEVICE_STATUS_DEVICE_ERROR);
    }

    self->tape_open = TRUE;

    return TRUE;
}

static gboolean
close_tape_agent(
	NdmpDevice *self)
{
    if (self->tape_open) {
	g_debug("closing tape device '%s' on NDMP server '%s:%d'",
	    self->ndmp_device_name, self->ndmp_hostname, self->ndmp_port);
	self->tape_open = FALSE; /* count it as closed even if there is an error */
	if (!ndmp_connection_tape_close(self->ndmp)) {
	    set_error_from_ndmp(self);
	    return FALSE;
	}
    }

    return TRUE;
}

static gboolean
single_ndmp_mtio(
    NdmpDevice *self,
    ndmp9_tape_mtio_op tape_op)
{
    guint resid;

    if (!ndmp_connection_tape_mtio(self->ndmp, tape_op, 1, &resid)) {
	set_error_from_ndmp(self);
	return FALSE;
    }

    if (resid != 0) {
	device_set_error(DEVICE(self),
		g_strdup_printf("NDMP MTIO operation %d did not complete", tape_op),
		DEVICE_STATUS_DEVICE_ERROR);
    }

    return TRUE;
}

/* get the tape state straight from the device; we try to track these things
 * accurately in the device, but sometimes it's good to check. */
static gboolean
ndmp_get_state(
    NdmpDevice *self)
{
    Device *dself = DEVICE(self);
    guint64 file_num, blockno, blocksize;

    if (!ndmp_connection_tape_get_state(self->ndmp,
	&blocksize, &file_num, &blockno)) {
	set_error_from_ndmp(self);
	return FALSE;
    }

    g_assert(file_num < INT_MAX);
    dself->file = (int)file_num;
    dself->block = blockno;

    return TRUE;
}

static robust_write_result
robust_write(
    NdmpDevice *self,
    char *buf,
    guint64 count)
{
    guint64 actual;
    robust_write_result subresult;

    if (!ndmp_connection_tape_write(self->ndmp, buf, count, &actual)) {
	switch (ndmp_connection_err_code(self->ndmp)) {
	    case NDMP9_IO_ERR:
		/* We encountered PEOM; this only happens when the caller ignores
		 * LEOM */
		return ROBUST_WRITE_NO_SPACE;

	    case NDMP9_EOM_ERR:
		/* We encountered LEOM; retry the write (which should succeed) */
		subresult = robust_write(self, buf, count);
		if (subresult != ROBUST_WRITE_OK)
		    return subresult;
		g_debug("ndmp device hit logical EOM");
		return ROBUST_WRITE_OK_LEOM;

	    default:
		set_error_from_ndmp(self);
		return ROBUST_WRITE_ERROR;
	}
    }

    g_assert(count == actual);
    return ROBUST_WRITE_OK;
}

static void
set_error_from_ndmp(
    NdmpDevice *self)
{
    /* translate some error codes to the corresponding Device API status */
    switch (ndmp_connection_err_code(self->ndmp)) {
	case NDMP9_NO_TAPE_LOADED_ERR:
	    device_set_error(DEVICE(self),
		    g_strdup(_("no tape loaded")),
			    DEVICE_STATUS_VOLUME_MISSING);
	    break;

	case NDMP9_DEVICE_BUSY_ERR:
	    device_set_error(DEVICE(self),
		    g_strdup(_("device busy")),
			    DEVICE_STATUS_DEVICE_BUSY);
	    break;

	case NDMP9_IO_ERR:
	    device_set_error(DEVICE(self),
		    g_strdup(_("IO error")),
			    DEVICE_STATUS_VOLUME_UNLABELED |
			    DEVICE_STATUS_VOLUME_ERROR |
			    DEVICE_STATUS_DEVICE_ERROR);
	    break;

	default:
	    device_set_error(DEVICE(self),
		    ndmp_connection_err_msg(self->ndmp),
		    DEVICE_STATUS_DEVICE_ERROR);
	    break;
	}
    close_connection(self);
}

/*
 * Virtual function overrides
 */

static void
ndmp_device_open_device(
    Device *dself,
    char   *device_name,
    char   *device_type,
    char   *device_node)
{
    NdmpDevice          *self = NDMP_DEVICE(dself);
    char *colon, *at;

    /* first, extract the various parts of the device_node:
     * HOST[:PORT]@DEVICE */
    colon = strchr(device_node, ':');
    at = strchr(device_node, '@');
    if (colon > at)
	colon = NULL; /* :PORT only counts if it's before the device name */
    if (!at) {
	device_set_error(dself,
			 g_strdup_printf("invalid ndmp device name '%s'", device_name),
			 DEVICE_STATUS_DEVICE_ERROR);
	return;
    }

    if (colon) {
	char *p = NULL;
	long port = strtol(colon+1, &p, 10);

	if (port < 0 || port >= 65536 || p != at || (!port && EINVAL == errno)) {
	    device_set_error(dself,
			    g_strdup_printf("invalid ndmp port in device name '%s'",
					    device_name),
			    DEVICE_STATUS_DEVICE_ERROR);
	    return;
	}
	self->ndmp_port = (gint)port;
	self->ndmp_hostname = g_strndup(device_node, colon-device_node);
    } else {
	self->ndmp_port = 0; /* (use ndmjob's default, 10000) */
	self->ndmp_hostname = g_strndup(device_node, at-device_node);
    }
    self->ndmp_device_name = g_strdup(at+1);

    if (parent_class->open_device) {
        parent_class->open_device(dself, device_name, device_type, device_node);
    }
}

static void ndmp_device_finalize(GObject * obj_self)
{
    NdmpDevice       *self = NDMP_DEVICE (obj_self);

    if(G_OBJECT_CLASS(parent_class)->finalize)
        (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);

    (void)close_tape_agent(self); /* ignore any error */

    if (self->directtcp_conn)
	g_object_unref(self->directtcp_conn);

    if (self->listen_addrs)
	g_free(self->listen_addrs);

    close_connection(self);

    if (self->ndmp_hostname)
	g_free(self->ndmp_hostname);
    if (self->ndmp_device_name)
	g_free(self->ndmp_device_name);
    if (self->ndmp_username)
	g_free(self->ndmp_username);
    if (self->ndmp_password)
	g_free(self->ndmp_password);
    if (self->ndmp_auth)
	g_free(self->ndmp_auth);
    if (self->indirecttcp_sock != -1)
	close(self->indirecttcp_sock);
}

static DeviceStatusFlags
ndmp_device_read_label(
    Device *dself)
{
    NdmpDevice       *self = NDMP_DEVICE(dself);
    dumpfile_t       *header = NULL;
    gpointer buf = NULL;
    guint64 buf_size = 0;
    gsize read_block_size = 0;

    if (self->verbose) g_debug("ndmp_device_read_label");
    amfree(dself->volume_label);
    amfree(dself->volume_time);
    dumpfile_free(dself->volume_header);
    dself->volume_header = NULL;

    if (device_in_error(self)) return dself->status;

    if (!open_tape_agent(self)) {
	/* error status was set by open_tape_agent */
	return dself->status;
    }

    if (!single_ndmp_mtio(self, NDMP9_MTIO_REW)) {
	/* error message, if any, is set by single_ndmp_mtio */
	return dself->status;
    }

    /* read the tape header from the NDMP server */
    dself->status = 0;
    read_block_size = ndmp_device_read_size(self);
    buf = g_try_malloc(read_block_size);
    if (buf == NULL) {
	device_set_error(dself,
		g_strdup(_("Cannot allocate memory")),
			DEVICE_STATUS_DEVICE_ERROR);
	goto read_err;
    }
    if (!ndmp_connection_tape_read(self->ndmp,
	buf,
	read_block_size,
	&buf_size)) {

	/* handle known errors */
	switch (ndmp_connection_err_code(self->ndmp)) {
	    case NDMP9_NO_TAPE_LOADED_ERR:
		device_set_error(dself,
			g_strdup(_("no tape loaded")),
				DEVICE_STATUS_VOLUME_MISSING);
		goto read_err;

	    case NDMP9_IO_ERR:
		device_set_error(dself,
			g_strdup(_("IO error reading tape label")),
				DEVICE_STATUS_VOLUME_UNLABELED |
				DEVICE_STATUS_VOLUME_ERROR |
				DEVICE_STATUS_DEVICE_ERROR);
		goto read_err;

	    case NDMP9_EOM_ERR:
	    case NDMP9_EOF_ERR:
		device_set_error(dself,
			g_strdup(_("no tape label found")),
				DEVICE_STATUS_VOLUME_UNLABELED);
		header = dself->volume_header = g_new(dumpfile_t, 1);
		fh_init(header);
		goto read_err;

	    default:
		set_error_from_ndmp(self);
		goto read_err;
	    }
	}

	header = dself->volume_header = g_new(dumpfile_t, 1);
	fh_init(header);
	parse_file_header(buf, header, buf_size);

read_err:
    g_free(buf);

    if (dself->status != 0) {
	/* error already set above */
	return dself->status;
    }

    if (!header) {
	device_set_error(dself,
			 g_strdup(_("no header set")),
			 DEVICE_STATUS_VOLUME_UNLABELED);
	return dself->status;
    }

    /* handle a "weird" label */
    if (header->type != F_TAPESTART) {
	device_set_error(dself,
		g_strdup(_("No tapestart header -- unlabeled device?")),
			 DEVICE_STATUS_VOLUME_UNLABELED);
	return dself->status;
    }
    dself->volume_label = g_strdup(header->name);
    dself->volume_time = g_strdup(header->datestamp);
    /* dself->volume_header is already set */

    /* note: connection is left open, as well as the tape device */

    device_set_error(dself, NULL, DEVICE_STATUS_SUCCESS);

    return dself->status;
}


static gboolean
ndmp_device_start(
    Device           *dself,
    DeviceAccessMode  mode,
    char             *label,
    char             *timestamp)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    dumpfile_t *header;
    char       *header_buf;

    if (self->verbose) g_debug("ndmp_device_start");
    if (device_in_error(self)) return FALSE;

    if (!open_tape_agent(self)) {
	/* error status was set by open_tape_agent */
	return FALSE;
    }

    if (mode != ACCESS_WRITE && dself->volume_label == NULL) {
	if (ndmp_device_read_label(dself) != DEVICE_STATUS_SUCCESS)
	    /* the error was set by ndmp_device_read_label */
	    return FALSE;
    }

    dself->access_mode = mode;
    g_mutex_lock(dself->device_mutex);
    dself->in_file = FALSE;
    g_mutex_unlock(dself->device_mutex);

    if (!single_ndmp_mtio(self, NDMP9_MTIO_REW)) {
	/* single_ndmp_mtio already set our error message */
	return FALSE;
    }

    /* Position the tape */
    switch (mode) {
    case ACCESS_APPEND:
	device_set_error(dself,
	    g_strdup("operation not supported"),
	    DEVICE_STATUS_DEVICE_ERROR);
	return FALSE;
	break;

    case ACCESS_READ:
	dself->file = 0;
	break;

    case ACCESS_WRITE:
	header = make_tapestart_header(dself, label, timestamp);
	g_assert(header != NULL);

	header_buf = device_build_amanda_header(dself, header, NULL);
	if (header_buf == NULL) {
	    device_set_error(dself,
		g_strdup(_("Tapestart header won't fit in a single block!")),
		DEVICE_STATUS_DEVICE_ERROR);
	    dumpfile_free(header);
	    return FALSE;
	}

	switch (robust_write(self, header_buf, dself->block_size)) {
	    case ROBUST_WRITE_OK_LEOM:
		dself->is_eom = TRUE;
		/* fall through */
	    case ROBUST_WRITE_OK:
		break;

	    case ROBUST_WRITE_NO_SPACE:
		/* this would be an odd error to see writing the tape label, but
		 * oh well */
		device_set_error(dself,
		    g_strdup(_("No space left on device")),
		    DEVICE_STATUS_VOLUME_ERROR);
		dself->is_eom = TRUE;
		/* fall through */

	    case ROBUST_WRITE_ERROR:
		/* error was set by robust_write or above */
		dumpfile_free(header);
		amfree(header_buf);
		return FALSE;

	}
	amfree(header_buf);

	if (!single_ndmp_mtio(self, NDMP9_MTIO_EOF)) {
	    /* error was set by single_ndmp_mtio */
	    dumpfile_free(header);
	    return FALSE;
	}

	g_free(dself->volume_label);
	dself->volume_label = g_strdup(label);
	g_free(dself->volume_time);
	dself->volume_time = g_strdup(timestamp);
	dumpfile_free(dself->volume_header);
	dself->volume_header = header;

	/* unset the VOLUME_UNLABELED flag, if it was set */
	device_set_error(dself, NULL, DEVICE_STATUS_SUCCESS);
	dself->file = 0;
	break;

    default:
	g_assert_not_reached();
    }

    return TRUE;
}

static gboolean
ndmp_device_finish(
    Device *dself)
{
    gboolean rval;

    NdmpDevice *self = NDMP_DEVICE(dself);
    rval = !device_in_error(dself);

    /* we're not in a file anymore */
    dself->access_mode = ACCESS_NULL;

    if (!close_tape_agent(self)) {
	/* error is set by close_tape_agent */
	rval = FALSE;
    }

    if (self->ndmp)
	close_connection(self);

    return rval;
}

static gboolean
ndmp_device_eject(
    Device *dself)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    if (device_in_error(dself)) return FALSE;

    if (!open_connection(self)) {
	/* error was set by open_connection */
	return FALSE;
    }

    if (!single_ndmp_mtio(self, NDMP9_MTIO_OFF)) {
	/* error was set by single_ndmp_mtio */
	return FALSE;
    }

    return TRUE;
}


/* functions for writing */

static gboolean
ndmp_device_start_file(
    Device     *dself,
    dumpfile_t *header)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    char *header_buf;

    if (self->verbose) g_debug("ndmp_device_start_file");
    if (device_in_error(self)) return FALSE;

    dself->is_eof = FALSE;
    dself->is_eom = FALSE;
    g_mutex_lock(dself->device_mutex);
    dself->bytes_written = 0;
    g_mutex_unlock(dself->device_mutex);

    /* set the blocksize in the header properly */
    header->blocksize = dself->block_size;

    header_buf = device_build_amanda_header(dself, header, NULL);
    if (header_buf == NULL) {
	device_set_error(dself,
	    g_strdup(_("Amanda file header won't fit in a single block!")),
	    DEVICE_STATUS_DEVICE_ERROR);
	return FALSE;
    }

    switch (robust_write(self, header_buf, dself->block_size)) {
	case ROBUST_WRITE_OK_LEOM:
	    dself->is_eom = TRUE;
	    /* fall through */

	case ROBUST_WRITE_OK:
	    break;

	case ROBUST_WRITE_NO_SPACE:
	    /* this would be an odd error to see writing the tape label, but
	     * oh well */
	    device_set_error(dself,
		g_strdup(_("No space left on device")),
		DEVICE_STATUS_VOLUME_ERROR);
	    dself->is_eom = TRUE;
	    /* fall through */

	case ROBUST_WRITE_ERROR:
	    /* error was set by robust_write or above */
	    amfree(header_buf);
	    return FALSE;
    }
    amfree(header_buf);

    /* arrange the file numbers correctly */
    g_mutex_lock(dself->device_mutex);
    dself->in_file = TRUE;
    g_mutex_unlock(dself->device_mutex);
    if (!ndmp_get_state(self)) {
	/* error already set by ndmp_get_state */
	return FALSE;
    }

    /* double-check that the tape agent gave us a non-bogus file number */
    g_assert(dself->file > 0);

    return TRUE;
}

static DeviceWriteResult
ndmp_device_write_block(
    Device   *dself,
    guint     size,
    gpointer  data)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    gchar *replacement_buffer = NULL;

    if (device_in_error(self)) return WRITE_FAILED;

    /* zero out to the end of a short block -- tape devices only write
     * whole blocks. */
    if (size < dself->block_size) {
        replacement_buffer = g_try_malloc(dself->block_size);
	if (replacement_buffer == NULL) {
	    device_set_error(dself,
		g_strdup(_("Cannot allocate memory")),
			DEVICE_STATUS_DEVICE_ERROR);
	    return WRITE_FAILED;
	}
        memcpy(replacement_buffer, data, size);
        bzero(replacement_buffer+size, dself->block_size-size);

        data = replacement_buffer;
        size = dself->block_size;
    }

    switch (robust_write(self, data, size)) {
	case ROBUST_WRITE_OK_LEOM:
	    dself->is_eom = TRUE;
	    /* fall through */

	case ROBUST_WRITE_OK:
	    break;

	case ROBUST_WRITE_NO_SPACE:
	    /* this would be an odd error to see writing the tape label, but
	     * oh well */
	    device_set_error(dself,
		g_strdup(_("No space left on device")),
		DEVICE_STATUS_VOLUME_ERROR);
	    dself->is_eom = TRUE;
	    /* fall through */

	case ROBUST_WRITE_ERROR:
	    /* error was set by robust_write or above */
	    if (replacement_buffer) g_free(replacement_buffer);
	    return WRITE_FAILED;
    }

    dself->block++;
    g_mutex_lock(dself->device_mutex);
    dself->bytes_written += size;
    g_mutex_unlock(dself->device_mutex);

    if (replacement_buffer) g_free(replacement_buffer);
    return WRITE_SUCCEED;
}

static gboolean
ndmp_device_finish_file(
    Device *dself)
{
    NdmpDevice *self = NDMP_DEVICE(dself);

    if (!dself->in_file)
	return TRUE;

    /* we're not in a file anymore */
    g_mutex_lock(dself->device_mutex);
    dself->in_file = FALSE;
    g_mutex_unlock(dself->device_mutex);

    if (device_in_error(dself)) return FALSE;

    if (!single_ndmp_mtio(self, NDMP9_MTIO_EOF)) {
	/* error was set by single_ndmp_mtio */
        dself->is_eom = TRUE;
	return FALSE;
    }

    return TRUE;
}

/* functions for reading */

static dumpfile_t*
ndmp_device_seek_file(
    Device *dself,
    guint   file)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    gint delta;
    guint resid;
    gpointer buf;
    guint64 buf_size;
    dumpfile_t *header;
    gsize read_block_size = 0;

    if (self->verbose) g_debug("ndmp_device_seek_file %d", file);
    if (device_in_error(dself)) return FALSE;

    /* file 0 is the tape header, and isn't seekable as a distinct
     * Device-API-level file */
    if (file == 0) {
	device_set_error(dself,
	    g_strdup("cannot seek to file 0"),
	    DEVICE_STATUS_DEVICE_ERROR);
	return NULL;
    }

    /* first, make sure the file and block numbers are correct */
    if (!ndmp_get_state(self)) {
	/* error already set by ndmp_get_state */
	return FALSE;
    }

    /* now calculate the file delta */
    delta = file - dself->file;

    if (delta <= 0) {
	/* Note that this algorithm will rewind to the beginning of
	 * the current part, too */

	/* BSF *past* the filemark we want to seek to */
	if (!ndmp_connection_tape_mtio(self->ndmp, NDMP9_MTIO_BSF, -delta + 1, &resid)) {
	    set_error_from_ndmp(self);
	    return NULL;
	}
	if (resid != 0)
	    goto incomplete_bsf;

	/* now we are on the BOT side of the filemark, but we want to be
	 * on the EOT side of it.  An FSF will get us there.. */
	if (!ndmp_connection_tape_mtio(self->ndmp, NDMP9_MTIO_FSF, 1, &resid)) {
	    set_error_from_ndmp(self);
	    return NULL;
	}

	if (resid != 0) {
incomplete_bsf:
	    device_set_error(dself,
		g_strdup_printf("BSF operation failed to seek by %d files", resid),
		DEVICE_STATUS_DEVICE_ERROR);
	    return NULL;
	}
    } else /* (delta > 0) */ {
	if (!ndmp_connection_tape_mtio(self->ndmp, NDMP9_MTIO_FSF, delta, &resid)) {
	    set_error_from_ndmp(self);
	    return FALSE;
	}

	/* if we didn't seek all the way there, then we're past the tapeend */
	if (resid > 0) {
	    device_set_error(dself,
		g_strdup_printf(_("Could not seek forward to file %d"), file),
		DEVICE_STATUS_VOLUME_ERROR);
	    return NULL;
	}
    }

    /* fix up status */
    g_mutex_lock(dself->device_mutex);
    dself->in_file = TRUE;
    g_mutex_unlock(dself->device_mutex);
    dself->file = file;
    dself->block = 0;
    g_mutex_lock(dself->device_mutex);
    dself->bytes_read = 0;
    g_mutex_unlock(dself->device_mutex);

    /* now read the header */
    read_block_size = ndmp_device_read_size(self);
    buf = g_try_malloc(read_block_size);
    if (buf == NULL) {
	device_set_error(dself,
		g_strdup(_("Cannot allocate memory")),
			DEVICE_STATUS_DEVICE_ERROR);
	return NULL;
    }
    if (!ndmp_connection_tape_read(self->ndmp,
		buf, read_block_size, &buf_size)) {
	switch (ndmp_connection_err_code(self->ndmp)) {
	    case NDMP9_EOF_ERR:
	    case NDMP9_EOM_ERR:
		return make_tapeend_header();

	    default:
		set_error_from_ndmp(self);
		g_free(buf);
		return NULL;
	}
    }

    header = g_new(dumpfile_t, 1);
    fh_init(header);
    parse_file_header(buf, header, buf_size);
    g_free(buf);

    return header;
}

static gboolean
ndmp_device_seek_block(
    Device  *dself,
    guint64  block)
{
    if (device_in_error(dself)) return FALSE;

    dself->block = block;

    device_set_error(dself, g_strdup("operation not supported"), DEVICE_STATUS_DEVICE_ERROR);
    return FALSE;
}

static int
ndmp_device_read_block (Device * dself, gpointer data, int *size_req, int max_block G_GNUC_UNUSED) {
    NdmpDevice *self = NDMP_DEVICE(dself);
    guint64 requested, actual;
    gsize read_block_size = ndmp_device_read_size(self);

    /* We checked the NDMP device's blocksize when the device was opened, which should
     * catch any misalignent of server block size and Amanda block size */

    g_assert(read_block_size < INT_MAX); /* check data type mismatch */
    if (!data || *size_req < (int)(read_block_size)) {
	*size_req = (int)(read_block_size);
	return 0;
    }

    requested = *size_req;
    if (!ndmp_connection_tape_read(self->ndmp,
	data,
	requested,
	&actual)) {

	/* handle known errors */
	switch (ndmp_connection_err_code(self->ndmp)) {
	    case NDMP9_EOM_ERR:
	    case NDMP9_EOF_ERR:
		dself->is_eof = TRUE;
		return -1;

	    default:
		set_error_from_ndmp(self);
		return -1;
	}
    }

    *size_req = (int)actual; /* cast is OK - requested size was < INT_MAX too */
    g_mutex_lock(dself->device_mutex);
    dself->bytes_read += actual;
    g_mutex_unlock(dself->device_mutex);

    return *size_req;
}

static gboolean
indirecttcp_listen(
    NdmpDevice *self,
    DirectTCPAddr **addrs)
{
    in_port_t port;

    if (self->verbose) g_debug("indirecttcp_listen");
    self->indirecttcp_sock = stream_server(AF_INET, &port, 0, STREAM_BUFSIZE, 0);
    if (self->indirecttcp_sock < 0) {
	device_set_error(DEVICE(self),
	    g_strdup_printf("Could not bind indirecttcp socket: %s", strerror(errno)),
	    DEVICE_STATUS_DEVICE_ERROR);
	return FALSE;
    }

    /* An IndirectTCP address is 255.255.255.255:$port */
    self->listen_addrs = *addrs = g_new0(DirectTCPAddr, 2);
    addrs[0]->sin.sin_family = AF_INET;
    addrs[0]->sin.sin_addr.s_addr = htonl(0xffffffff);
    SU_SET_PORT(addrs[0], port);

    return TRUE;
}

static gboolean
listen_impl(
    Device *dself,
    gboolean for_writing,
    DirectTCPAddr **addrs)
{
    NdmpDevice *self = NDMP_DEVICE(dself);

    if (self->verbose) g_debug("listen_impl");
    if (device_in_error(self)) return FALSE;

    /* check status */
    g_assert(!self->listen_addrs);

    if (!open_tape_agent(self)) {
	/* error message was set by open_tape_agent */
	return FALSE;
    }

    self->for_writing = for_writing;

    /* first, set the window to an empty span so that the mover doesn't start
     * reading or writing data immediately.  NDMJOB tends to reset the record
     * size periodically (in direct contradiction to the spec), so we reset it
     * here as well. */
    if (!ndmp_connection_mover_set_record_size(self->ndmp,
		DEVICE(self)->block_size)) {
	set_error_from_ndmp(self);
	return FALSE;
    }

    if (for_writing) {
	/* if we're forcing indirecttcp, just do it */
	if (self->indirect) {
	    return indirecttcp_listen(self, addrs);
	}
	if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
	    /* NDMP9_ILLEGAL_ARGS_ERR means the NDMP server doesn't like a zero-byte
	     * mover window, so we'll ignore it */
	    if (ndmp_connection_err_code(self->ndmp) != NDMP9_ILLEGAL_ARGS_ERR) {
		set_error_from_ndmp(self);
		return FALSE;
	    }

	    g_debug("NDMP Device: cannot set zero-length mover window; "
		    "falling back to IndirectTCP");
	    /* In this case, we need to set up IndirectTCP */
	    return indirecttcp_listen(self, addrs);
	}
    } else {
	/* For reading, set the window to the second mover record, so that the
	 * mover will pause immediately when it wants to read the first mover
	 * record. */
	if (!ndmp_connection_mover_set_window(self->ndmp, 0, -1)) {
	    set_error_from_ndmp(self);
	    return FALSE;
	}
    }

    /* then tell it to start listening */
    if (!ndmp_connection_mover_listen(self->ndmp,
		for_writing? NDMP9_MOVER_MODE_READ : NDMP9_MOVER_MODE_WRITE,
		NDMP9_ADDR_TCP,
		addrs)) {
	set_error_from_ndmp(self);
	return FALSE;
    }
    self->listen_addrs = *addrs;

    return TRUE;
}


static gpointer
accept_wait_cond(
        gpointer data)
{
    NdmpDevice *self = NDMP_DEVICE(data);

    ndmp9_mover_state state;
    guint64 bytes_moved;

    gulong backoff = G_USEC_PER_SEC/20; /* 5 msec */
    if (self->verbose) g_debug("accept_wait_cond");
    g_mutex_lock(self->abort_mutex);
    while (1) {
	g_mutex_unlock(self->abort_mutex);
	if (!ndmp_connection_mover_get_state(self->ndmp,
		    &state, &bytes_moved, NULL, NULL)) {
	    g_mutex_lock(self->abort_mutex);
	    set_error_from_ndmp(self);
	    state = 0;
	    break;
	}
	g_mutex_lock(self->abort_mutex);

	if (state != NDMP9_MOVER_STATE_LISTEN)
	    break;

	/* back off a little bit to give the other side time to breathe,
	 * but not more than one second */
	g_mutex_unlock(self->abort_mutex);
	g_usleep(backoff);
	g_mutex_lock(self->abort_mutex);
	if (self->cancel)
	    break;
	backoff *= 2;
	if (backoff > G_USEC_PER_SEC)
	    backoff = G_USEC_PER_SEC;
    }

    self->cancel = TRUE;
    g_cond_broadcast(self->abort_cond);
    g_mutex_unlock(self->abort_mutex);
    return GINT_TO_POINTER(state);
}

static int
accept_impl(
    Device *dself,
    DirectTCPConnection **dtcpconn,
    gboolean *cancelled,
    GMutex *abort_mutex,
    GCond *abort_cond)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    ndmp9_mover_state state;
    ndmp9_mover_mode mode;
    ndmp9_mover_halt_reason mover_halt_reason  = NDMP9_MOVER_HALT_NA;
    ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
    guint64  seek_position;
    GThread *wait_thread;
    int      result;
    char    *err;

    if (self->verbose) g_debug("accept_impl");
    if (device_in_error(self)) return 1;

    self->abort_mutex = abort_mutex;
    self->abort_cond = abort_cond;
    self->cancelled = cancelled;
    self->cancel = FALSE;

    g_assert(self->listen_addrs);

    *dtcpconn = NULL;

    if (!self->for_writing) {
	/* when reading, we don't get any kind of notification that the
	 * connection has been established, but we can't call NDMP_MOVER_READ
	 * until the mover is active.  So we have to poll, waiting for ACTIVE.
	 * This is ugly. */

	wait_thread = g_thread_create(accept_wait_cond, (gpointer)self, TRUE,
				      NULL);
	while (!*cancelled && !self->cancel) {
	    g_cond_wait(self->abort_cond, self->abort_mutex);
	}
	self->cancel = TRUE;
	g_mutex_unlock(self->abort_mutex);
	state = GPOINTER_TO_INT(g_thread_join(wait_thread));
	g_mutex_lock(self->abort_mutex);

	if (*cancelled) {
	    result = 2;
	    goto accept_failed;
	}

	/* double-check state */
	if (state != NDMP9_MOVER_STATE_ACTIVE) {
	    device_set_error(DEVICE(self),
		g_strdup("mover did not enter the ACTIVE state as expected"),
		DEVICE_STATUS_DEVICE_ERROR);
	    result = 1;
	    goto accept_failed;
	}

	/* now, we need to get this into the PAUSED state, since right now we
	 * aren't allowed to perform any tape movement commands.  So we issue a
	 * MOVER_READ request for the whole darn image stream after setting the
	 * usual empty window. Note that this means the whole dump will be read
	 * in one MOVER_READ operation, even if it does not begin at the
	 * beginning of a part. */
	if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
	    set_error_from_ndmp(self);
	    result = 1;
	    goto accept_failed;
	}

	/* now we should expect a notice that the mover has paused */
    } else {
	/* when writing, the mover will pause as soon as the first byte comes
	 * in, so there's no need to do anything to trigger the pause. */

	if (self->indirecttcp_sock == -1) {
	    /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to
	     * write outside the window, while the standard specifies .._EOW,
	     * instead.  When reading to a connection, we get the appropriate
	     * .._SEEK.  It's easy enough to handle both.
	    */
	    result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
			NULL,
			&mover_halt_reason,
			&mover_pause_reason, &seek_position,
			cancelled,
			abort_mutex, abort_cond);

	    if (result == 2) {
		goto accept_failed;
	    }

	    err = NULL;
	    if (mover_pause_reason) {
		switch (mover_pause_reason) {
		    case NDMP9_MOVER_PAUSE_SEEK:
		    case NDMP9_MOVER_PAUSE_EOW:
			break;
		    default:
			err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
			break;
		}
	    } else if (mover_halt_reason) {
		err = "unexpected NOTIFY_MOVER_HALT";
	    }

	    if (err) {
		device_set_error(DEVICE(self),
		    g_strdup_printf("waiting NDMP_MOVER_PAUSE_SEEK: %s", err),
		    DEVICE_STATUS_DEVICE_ERROR);
		result = 1;
		goto accept_failed;
	    }
	}
    }

    /* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to write
     * outside the window, while the standard specifies .._EOW, instead.  When
     * reading to a connection, we get the appropriate .._SEEK.  It's easy
     * enough to handle both. */

    if (self->indirecttcp_sock == -1) {
	g_free(self->listen_addrs);
	self->listen_addrs = NULL;
    }

    if (self->for_writing)
	mode = NDMP9_MOVER_MODE_READ;
    else
	mode = NDMP9_MOVER_MODE_WRITE;

    /* set up the new directtcp connection */
    if (self->directtcp_conn)
	g_object_unref(self->directtcp_conn);
    self->directtcp_conn =
	directtcp_connection_ndmp_new(self->ndmp, mode);
    *dtcpconn = DIRECTTCP_CONNECTION(self->directtcp_conn);

    /* reference it for the caller */
    g_object_ref(*dtcpconn);

    return 0;

accept_failed:
    if (self->indirecttcp_sock == -1) {
        g_free(self->listen_addrs);
        self->listen_addrs = NULL;
    }
    return result;
}


static int
connect_impl(
    Device *dself,
    gboolean for_writing,
    DirectTCPAddr *addrs,
    DirectTCPConnection **dtcpconn,
    int    *cancelled,
    GMutex *abort_mutex,
    GCond  *abort_cond)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    ndmp9_mover_mode mode;
    ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
    ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
    guint64 seek_position;
    int result;

    if (self->verbose) g_debug("connect_impl");
    g_assert(!self->listen_addrs);

    *dtcpconn = NULL;
    self->for_writing = for_writing;

    if (!open_tape_agent(self)) {
	/* error message was set by open_tape_agent */
	return 1;
    }

    /* first, set the window to an empty span so that the mover doesn't start
     * reading or writing data immediately.  NDMJOB tends to reset the record
     * size periodically (in direct contradiction to the spec), so we reset it
     * here as well. */
    if (!ndmp_connection_mover_set_record_size(self->ndmp,
		DEVICE(self)->block_size)) {
	set_error_from_ndmp(self);
	return 1;
    }

    if (self->for_writing) {
	mode = NDMP9_MOVER_MODE_READ;
	if (!ndmp_connection_mover_set_window(self->ndmp, 0, 0)) {
	    set_error_from_ndmp(self);
	    return 1;
	}
    } else {
	mode = NDMP9_MOVER_MODE_WRITE;
	if (!ndmp_connection_mover_set_window(self->ndmp, 0, -1)) {
	    set_error_from_ndmp(self);
	    return 1;
	}
    }

    if (!ndmp_connection_mover_connect(self->ndmp, mode, addrs)) {
	set_error_from_ndmp(self);
	return 1;
    }

    if (!self->for_writing) {
	/* The agent is in the ACTIVE state, and will remain so until we tell
	 * it to do something else.  The thing we want to is for it to start
	 * reading data from the tape, which will immediately trigger an EOW or
	 * SEEK pause. */
	if (!ndmp_connection_mover_read(self->ndmp, 0, G_MAXUINT64)) {
	    set_error_from_ndmp(self);
	    return 1;
	}

    } else {
	/* when writing, the mover will pause as soon as the first byte comes
	 * in, so there's no need to do anything to trigger the pause.
	 */

	/* NDMJOB sends NDMP9_MOVER_PAUSE_SEEK to indicate that it wants to
	 * write outside the window, while the standard specifies .._EOW,
	 * instead.  When reading to a connection, we get the appropriate
	 * .._SEEK.  It's easy enough to handle both.
	 */

	result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
		NULL,
		&mover_halt_reason,
		&mover_pause_reason, &seek_position,
		cancelled,
		abort_mutex, abort_cond);

	if (result == 2) {
	    return 2;
	}

	if (mover_halt_reason != NDMP9_MOVER_HALT_NA) {
	    device_set_error(DEVICE(self),
		g_strdup_printf("got NDMP9_MOVER_HALT"),
		DEVICE_STATUS_DEVICE_ERROR);
	    return 1;
	}
	if (mover_pause_reason != NDMP9_MOVER_PAUSE_SEEK &&
	    mover_pause_reason != NDMP9_MOVER_PAUSE_EOW) {
	    device_set_error(DEVICE(self),
	    g_strdup_printf("got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK"),
	    DEVICE_STATUS_DEVICE_ERROR);
	    return 1;
	}
    }

    if (self->listen_addrs) {
	g_free(self->listen_addrs);
	self->listen_addrs = NULL;
    }

    /* set up the new directtcp connection */
    if (self->directtcp_conn)
	g_object_unref(self->directtcp_conn);
    self->directtcp_conn =
	directtcp_connection_ndmp_new(self->ndmp, mode);
    *dtcpconn = DIRECTTCP_CONNECTION(self->directtcp_conn);

    /* reference it for the caller */
    g_object_ref(*dtcpconn);

    return 0;
}

static gboolean
indirecttcp_start_writing(
	NdmpDevice *self)
{
    DirectTCPAddr *real_addrs, *iter;
    int conn_sock;

    /* The current state is that the other end is trying to connect to
     * indirecttcp_sock.  The mover remains IDLE, although its window is set
     * correctly for the part we are about to write. */

    g_debug("indirecttcp_start_writing, ready to accept");
    conn_sock = accept(self->indirecttcp_sock, NULL, NULL);
    if (conn_sock < 0) {
	device_set_error(DEVICE(self),
	    g_strdup_printf("Could not accept indirecttcp socket: %s", strerror(errno)),
	    DEVICE_STATUS_DEVICE_ERROR);
	return FALSE;
    }
    g_debug("indirecttcp_start_writing, accepted");

    close(self->indirecttcp_sock);
    self->indirecttcp_sock = -1;

    /* tell mover to start listening */
    g_assert(self->for_writing);
    if (!ndmp_connection_mover_listen(self->ndmp,
		NDMP9_MOVER_MODE_READ,
		NDMP9_ADDR_TCP,
		&real_addrs)) {
	set_error_from_ndmp(self);
	close(conn_sock);
	return FALSE;
    }

    /* format the addresses and send them down the socket */
    for (iter = real_addrs; iter && SU_GET_FAMILY(iter) != 0; iter++) {
	char inet[INET_ADDRSTRLEN];
	const char *addr;
	char *addrspec;

	addr = inet_ntop(AF_INET, &iter->sin.sin_addr.s_addr, inet, INET_ADDRSTRLEN);

	addrspec = g_strdup_printf("%s:%d%s", addr, SU_GET_PORT(iter),
		SU_GET_FAMILY(iter+1) !=0? " ":"");
	g_debug("indirecttcp_start_writing, send %s", addrspec);
	if (full_write(conn_sock, addrspec, strlen(addrspec)) < strlen(addrspec)) {
	    device_set_error(DEVICE(self),
		g_strdup_printf("writing to indirecttcp socket: %s", strerror(errno)),
		DEVICE_STATUS_DEVICE_ERROR);
	    close(conn_sock);
	    return FALSE;
	}
    }

    /* close the socket for good.  This ensures that the next call to
     * write_from_connection_impl will not go through the mover setup process.
     * */
    if (close(conn_sock) < 0) {
	device_set_error(DEVICE(self),
	    g_strdup_printf("closing indirecttcp socket: %s", strerror(errno)),
	    DEVICE_STATUS_DEVICE_ERROR);
	return FALSE;
    }
    conn_sock = -1;

    /* and free the listen_addrs, since we didn't free them in accept_impl */
    if (self->listen_addrs) {
	g_free(self->listen_addrs);
	self->listen_addrs = NULL;
    }

    /* Now it's up to the remote end to connect to the mover and start sending
     * data.  We won't get any notification when this happens, although we could
     * in principle poll for such a thing. */
    return TRUE;
}

static int
write_from_connection_impl(
    Device  *dself,
    guint64  size,
    guint64 *actual_size,
    int     *cancelled,
    GMutex  *abort_mutex,
    GCond   *abort_cond)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    DirectTCPConnectionNDMP *nconn = self->directtcp_conn;
    gboolean eom = FALSE, eof = FALSE, eow = FALSE;
    ndmp9_mover_state mover_state;
    ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
    ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
    guint64 bytes_moved_before, bytes_moved_after;
    gchar *err;
    int result;

    if (device_in_error(self)) return FALSE;

    g_debug("write_from_connection_impl");
    if (actual_size)
	*actual_size = 0;

    /* if this is false, then the caller did not use use_connection correctly */
    g_assert(self->directtcp_conn != NULL);
    g_assert(self->ndmp == nconn->ndmp);
    g_assert(nconn->mode == NDMP9_MOVER_MODE_READ);

    if (!ndmp_connection_mover_get_state(self->ndmp,
		&mover_state, &bytes_moved_before, NULL, NULL)) {
	set_error_from_ndmp(self);
	return 1;
    }

    if (self->indirecttcp_sock != -1) {
	/* If we're doing IndirectTCP, then we've deferred the whole
	 * mover_set_window mover_listen process.. until now.
	 * So the mover should be IDLE.
	 */
	g_assert(mover_state == NDMP9_MOVER_STATE_IDLE);
    } else {
	/* the mover had best be PAUSED right now */
	g_assert(mover_state == NDMP9_MOVER_STATE_PAUSED);
    }

    /* we want to set the window regardless of whether this is directtcp or
     * indirecttcp
     */
    if (!ndmp_connection_mover_set_window(self->ndmp,
		nconn->offset,
		size && size != G_MAXUINT64 ? size
					    : G_MAXUINT64 - nconn->offset)) {
	set_error_from_ndmp(self);
	return 1;
    }

    /* for DirectTCP, we just tell the mover to continue; IndirectTCP is more complicated. */
    if (self->indirecttcp_sock != -1) {
	if (!indirecttcp_start_writing(self)) {
	    return 1;
	}
    } else {
	if (!ndmp_connection_mover_continue(self->ndmp)) {
	    set_error_from_ndmp(self);
	    return 1;
	}
    }

    /* now wait for the mover to pause itself again, or halt on EOF or an error */
     result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
		    NULL,
		    &mover_halt_reason,
		    &mover_pause_reason, NULL,
		    cancelled, abort_mutex, abort_cond);
    if (result == 2) {
	return 2;
    }

    err = NULL;
    if (mover_pause_reason) {
	switch (mover_pause_reason) {
	    case NDMP9_MOVER_PAUSE_EOM:
		eom = TRUE;
		break;

	    /* ndmjob sends .._SEEK when it should send .._EOW, so deal with
		* both equivalently */
	    case NDMP9_MOVER_PAUSE_EOW:
	    case NDMP9_MOVER_PAUSE_SEEK:
		eow = TRUE;
		break;

	    default:
		err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
		break;
	}
    } else if (mover_halt_reason) {
	switch (mover_halt_reason) {
	    case NDMP9_MOVER_HALT_CONNECT_CLOSED:
		eof = TRUE;
		break;

	    default:
	    case NDMP9_MOVER_HALT_ABORTED:
	    /* case NDMP9_MOVER_HALT_MEDIA_ERROR: <-- not in ndmjob */
	    case NDMP9_MOVER_HALT_INTERNAL_ERROR:
	    case NDMP9_MOVER_HALT_CONNECT_ERROR:
		err = "unexpected NDMP_NOTIFY_MOVER_HALTED";
		break;
	}
    }

    if (err) {
	device_set_error(DEVICE(self),
	    g_strdup_printf("waiting for accept: %s", err),
	    DEVICE_STATUS_DEVICE_ERROR);
	return 1;
    }

    /* no error, so the mover stopped due to one of EOM (volume out of space),
     * EOF (data connection is done), or EOW (maximum part size was written).
     * In any case, we want to know how many bytes were written. */

    if (!ndmp_connection_mover_get_state(self->ndmp,
		&mover_state, &bytes_moved_after, NULL, NULL)) {
	set_error_from_ndmp(self);
	return 1;
    }
    size = bytes_moved_after - bytes_moved_before;
    nconn->offset += size;

    if (actual_size) {
	*actual_size = bytes_moved_after - bytes_moved_before;
    }

    if (eow) {
        ; /* mover finished the whole part -- nothing to report! */
    } else if (eof) {
        DEVICE(self)->is_eof = TRUE;
    } else if (eom) {
        /* this is a *lossless* EOM, so no need to set error, but
         * we do need to figure out the actual size */
        DEVICE(self)->is_eom = TRUE;
    } else {
	g_assert_not_reached();
        error("not reached");
    }

    return 0;
}

static int
read_to_connection_impl(
    Device *dself,
    guint64 size,
    guint64 *actual_size,
    int     *cancelled,
    GMutex  *abort_mutex,
    GCond   *abort_cond)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    DirectTCPConnectionNDMP *nconn = self->directtcp_conn;
    gboolean eom = FALSE, eof = FALSE, eow = FALSE;
    ndmp9_mover_state mover_state;
    ndmp9_mover_halt_reason mover_halt_reason = NDMP9_MOVER_HALT_NA;
    ndmp9_mover_pause_reason mover_pause_reason = NDMP9_MOVER_PAUSE_NA;
    guint64 bytes_moved_before, bytes_moved_after;
    gchar *err;
    int result;

    if (actual_size)
	*actual_size = 0;

    g_debug("read_to_connection_impl");
    if (device_in_error(self)) return 1;

    /* read_to_connection does not support IndirectTCP */
    g_assert(self->indirecttcp_sock == -1);

    /* if this is false, then the caller did not use use_connection correctly */
    g_assert(nconn != NULL);
    g_assert(self->ndmp == nconn->ndmp);
    g_assert(nconn->mode == NDMP9_MOVER_MODE_WRITE);

    if (!ndmp_connection_mover_get_state(self->ndmp,
		&mover_state, &bytes_moved_before, NULL, NULL)) {
	set_error_from_ndmp(self);
	return 1;
    }

    if (mover_state == NDMP9_MOVER_STATE_PAUSED) {
	/* the mover had best be PAUSED right now */
	g_assert(mover_state == NDMP9_MOVER_STATE_PAUSED);

	if (!ndmp_connection_mover_set_window(self->ndmp,
		nconn->offset,
		size && size != G_MAXUINT64 ? size
					    : G_MAXUINT64 - nconn->offset)) {
	    set_error_from_ndmp(self);
	    return 1;
	}

	if (!ndmp_connection_mover_continue(self->ndmp)) {
	    set_error_from_ndmp(self);
	    return 1;
	}
    }

    /* now wait for the mover to pause itself again, or halt on EOF or an error */
    result = ndmp_connection_wait_for_notify_with_cond(self->ndmp,
		    NULL,
		    &mover_halt_reason,
		    &mover_pause_reason, NULL,
		    cancelled, abort_mutex, abort_cond);
    if (result == 2) {
	return 2;
    }

    err = NULL;
    if (mover_pause_reason) {
	switch (mover_pause_reason) {
	    case NDMP9_MOVER_PAUSE_EOF:
		eof = TRUE;
		break;

	    /* ndmjob sends .._SEEK when it should send .._EOW, so deal with
		* both equivalently */
	    case NDMP9_MOVER_PAUSE_EOW:
	    case NDMP9_MOVER_PAUSE_SEEK:
		eow = TRUE;
		break;

	    default:
		err = "got NOTIFY_MOVER_PAUSED, but not because of EOW or SEEK";
		break;
	}
    } else if (mover_halt_reason) {
	switch (mover_halt_reason) {
	    case NDMP9_MOVER_HALT_CONNECT_CLOSED:
		eof = TRUE;
		break;

	    default:
	    case NDMP9_MOVER_HALT_ABORTED:
	    /* case NDMP9_MOVER_HALT_MEDIA_ERROR: <-- not in ndmjob */
	    case NDMP9_MOVER_HALT_INTERNAL_ERROR:
	    case NDMP9_MOVER_HALT_CONNECT_ERROR:
		err = "unexpected NDMP_NOTIFY_MOVER_HALTED";
		break;
	}
    }

    if (err) {
	device_set_error(DEVICE(self),
	    g_strdup_printf("waiting for accept: %s", err),
	    DEVICE_STATUS_DEVICE_ERROR);
	return 1;
    }

    /* no error, so the mover stopped due to one of EOM (volume out of space),
     * EOF (data connection is done), or EOW (maximum part size was written).
     * In any case, we want to know how many bytes were written. */

    if (!ndmp_connection_mover_get_state(self->ndmp,
		&mover_state, &bytes_moved_after, NULL, NULL)) {
	set_error_from_ndmp(self);
	return 1;
    }
    size = bytes_moved_after - self->bytes_moved_before;
    nconn->offset += size;

    if (actual_size) {
	*actual_size = size;
    }
    self->bytes_moved_before = bytes_moved_after;

    if (eow) {
        ; /* mover finished the whole part -- nothing to report! */
    } else if (eof) {
        DEVICE(self)->is_eof = TRUE;
    } else if (eom) {
        /* this is a *lossless* EOM, so no need to set error, but
         * we do need to figure out the actual size */
        DEVICE(self)->is_eom = TRUE;
    } else {
	g_assert_not_reached();
        error("not reached");
    }

    return 0;
}

static gboolean
use_connection_impl(
    Device *dself,
    DirectTCPConnection *conn)
{
    NdmpDevice *self = NDMP_DEVICE(dself);
    DirectTCPConnectionNDMP *nconn;

    if (self->verbose) g_debug("read_to_connection_impl");
    /* the device_use_connection_impl wrapper already made sure we're in
     * ACCESS_NULL, but we may have opened the tape service already to read
     * a label - so close it to be sure */
    if (!close_tape_agent(self)) {
	/* error was already set by close_tape_agent */
	return FALSE;
    }

    /* we had best not be listening when this is called */
    g_assert(!self->listen_addrs);

    if (!IS_DIRECTTCP_CONNECTION_NDMP(conn)) {
	device_set_error(DEVICE(self),
	    g_strdup("existing DirectTCPConnection is not compatible with this device"),
	    DEVICE_STATUS_DEVICE_ERROR);
	return FALSE;
    }

    if (self->directtcp_conn)
	g_object_unref(self->directtcp_conn);
    self->directtcp_conn = nconn = DIRECTTCP_CONNECTION_NDMP(conn);
    g_object_ref(self->directtcp_conn);

    /* if this is a different connection, use it */
    if (nconn->ndmp != self->ndmp) {
	if (self->ndmp)
	    close_connection(self);
	self->ndmp = nconn->ndmp;
	g_object_ref(self->ndmp);
    }

    return TRUE;
}

/*
 * Class mechanics
 */

static gboolean
ndmp_device_set_username_fn(Device *dself,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    NdmpDevice *self = NDMP_DEVICE(dself);

    amfree(self->ndmp_username);
    self->ndmp_username = g_value_dup_string(val);
    device_clear_volume_details(dself);

    return device_simple_property_set_fn(dself, base, val, surety, source);
}

static gboolean
ndmp_device_set_password_fn(Device *dself,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    NdmpDevice *self = NDMP_DEVICE(dself);

    amfree(self->ndmp_password);
    self->ndmp_password = g_value_dup_string(val);
    device_clear_volume_details(dself);

    return device_simple_property_set_fn(dself, base, val, surety, source);
}

static gboolean
ndmp_device_set_auth_fn(Device *dself,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    NdmpDevice *self = NDMP_DEVICE(dself);

    amfree(self->ndmp_auth);
    self->ndmp_auth = g_value_dup_string(val);
    device_clear_volume_details(dself);

    return device_simple_property_set_fn(dself, base, val, surety, source);
}

static gboolean
ndmp_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
    GValue *val, PropertySurety surety, PropertySource source)
{
    NdmpDevice *self = NDMP_DEVICE(p_self);

    self->verbose = g_value_get_boolean(val);

    /* if the connection is active, set up verbose logging or turn it off */
    if (self->ndmp) {
	ndmp_connection_set_verbose(self->ndmp, self->verbose);
    }


    return device_simple_property_set_fn(p_self, base, val, surety, source);
}

static gboolean
ndmp_device_set_read_block_size_fn(Device *p_self, DevicePropertyBase *base G_GNUC_UNUSED,
    GValue *val, PropertySurety surety, PropertySource source)
{
    NdmpDevice *self = NDMP_DEVICE(p_self);
    gsize read_block_size = g_value_get_uint(val);

    if (read_block_size != 0 &&
	    ((gsize)read_block_size < p_self->block_size ||
	     (gsize)read_block_size > p_self->max_block_size)) {
	device_set_error(p_self,
	    g_strdup_printf("Error setting READ-BLOCK-SIZE property to '%zu', it must be between %zu and %zu", read_block_size, p_self->block_size, p_self->max_block_size),
	    DEVICE_STATUS_DEVICE_ERROR);
	return FALSE;
    }

    self->read_block_size = read_block_size;

    /* use the READ_BLOCK_SIZE, even if we're invoked to get the old READ_BUFFER_SIZE */
    return device_simple_property_set_fn(p_self, base,
					val, surety, source);
}

static gboolean
ndmp_device_set_indirect_fn(Device *dself,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source)
{
    NdmpDevice *self = NDMP_DEVICE(dself);

    self->indirect = g_value_get_boolean(val);

    return device_simple_property_set_fn(dself, base, val, surety, source);
}

static gboolean
ndmp_device_set_leom_fn(Device *dself,
    DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
    PropertySurety surety G_GNUC_UNUSED, PropertySource source G_GNUC_UNUSED)
{
    gboolean leom = g_value_get_boolean(val);

    if (!leom) {
	device_set_error(dself,
	    g_strdup_printf("Error setting LEOM property, it must be TRUE"),
	    DEVICE_STATUS_DEVICE_ERROR);
	return FALSE;
    }
    return TRUE;
}

static void
ndmp_device_class_init(NdmpDeviceClass * c G_GNUC_UNUSED)
{
    GObjectClass *g_object_class = (GObjectClass*) c;
    DeviceClass *device_class = (DeviceClass *)c;

    parent_class = g_type_class_ref (TYPE_DEVICE);

    device_class->open_device = ndmp_device_open_device;
    device_class->read_label = ndmp_device_read_label;
    device_class->start = ndmp_device_start;
    device_class->finish = ndmp_device_finish;
    device_class->eject = ndmp_device_eject;

    device_class->start_file = ndmp_device_start_file;
    device_class->write_block = ndmp_device_write_block;
    device_class->finish_file = ndmp_device_finish_file;

    device_class->seek_file = ndmp_device_seek_file;
    device_class->seek_block = ndmp_device_seek_block;
    device_class->read_block = ndmp_device_read_block;

    device_class->directtcp_supported = TRUE;
    device_class->listen = listen_impl;
    device_class->accept= accept_impl;
    device_class->connect= connect_impl;
    device_class->write_from_connection = write_from_connection_impl;
    device_class->read_to_connection = read_to_connection_impl;
    device_class->use_connection = use_connection_impl;

    g_object_class->finalize = ndmp_device_finalize;

    device_class_register_property(device_class, PROPERTY_NDMP_USERNAME,
	    PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
	    device_simple_property_get_fn,
	    ndmp_device_set_username_fn);

    device_class_register_property(device_class, PROPERTY_NDMP_PASSWORD,
	    PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
	    device_simple_property_get_fn,
	    ndmp_device_set_password_fn);

    device_class_register_property(device_class, PROPERTY_NDMP_AUTH,
	    PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
	    device_simple_property_get_fn,
	    ndmp_device_set_auth_fn);

    device_class_register_property(device_class, PROPERTY_VERBOSE,
	    PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK,
	    device_simple_property_get_fn,
	    ndmp_device_set_verbose_fn);

    device_class_register_property(device_class, PROPERTY_INDIRECT,
	    PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK,
	    device_simple_property_get_fn,
	    ndmp_device_set_indirect_fn);

    device_class_register_property(device_class, PROPERTY_READ_BLOCK_SIZE,
	    PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
	    device_simple_property_get_fn,
	    ndmp_device_set_read_block_size_fn);

    device_class_register_property(device_class, PROPERTY_LEOM,
	    PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
	    device_simple_property_get_fn,
	    ndmp_device_set_leom_fn);

}

static void
ndmp_device_init(NdmpDevice *self)
{
    Device *dself = DEVICE(self);
    GValue response;

    /* begin unconnected */
    self->ndmp = NULL;

    /* decent defaults */
    dself->block_size = 32768;
    dself->min_block_size = 32768;
    dself->max_block_size = SIZE_MAX;
    dself->allow_take_scribe_from = FALSE;

    bzero(&response, sizeof(response));

    g_value_init(&response, CONCURRENCY_PARADIGM_TYPE);
    g_value_set_enum(&response, CONCURRENCY_PARADIGM_EXCLUSIVE);
    device_set_simple_property(dself, PROPERTY_CONCURRENCY,
	    &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&response);

    g_value_init(&response, STREAMING_REQUIREMENT_TYPE);
    g_value_set_enum(&response, STREAMING_REQUIREMENT_DESIRED);
    device_set_simple_property(dself, PROPERTY_STREAMING,
	    &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&response);

    g_value_init(&response, G_TYPE_BOOLEAN);
    g_value_set_boolean(&response, FALSE);
    device_set_simple_property(dself, PROPERTY_APPENDABLE,
	    &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&response);

    g_value_init(&response, G_TYPE_BOOLEAN);
    g_value_set_boolean(&response, FALSE);
    device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION,
	    &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&response);

    g_value_init(&response, G_TYPE_BOOLEAN);
    g_value_set_boolean(&response, FALSE);
    device_set_simple_property(dself, PROPERTY_FULL_DELETION,
	    &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&response);

    g_value_init(&response, G_TYPE_BOOLEAN);
    g_value_set_boolean(&response, TRUE);
    device_set_simple_property(dself, PROPERTY_LEOM,
	    &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&response);

    g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
    g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
    device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
	    &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
    g_value_unset(&response);

    self->read_block_size = 0;
    g_value_init(&response, G_TYPE_UINT);
    g_value_set_uint(&response, self->read_block_size);
    device_set_simple_property(dself, PROPERTY_READ_BLOCK_SIZE,
            &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
    g_value_unset(&response);

    g_value_init(&response, G_TYPE_STRING);
    g_value_set_string(&response, "ndmp");
    device_set_simple_property(dself, PROPERTY_NDMP_USERNAME,
	    &response, PROPERTY_SURETY_BAD, PROPERTY_SOURCE_DEFAULT);
    g_value_unset(&response);
    self->ndmp_username = g_strdup("ndmp");

    g_value_init(&response, G_TYPE_STRING);
    g_value_set_string(&response, "ndmp");
    device_set_simple_property(dself, PROPERTY_NDMP_PASSWORD,
	    &response, PROPERTY_SURETY_BAD, PROPERTY_SOURCE_DEFAULT);
    g_value_unset(&response);
    self->ndmp_password = g_strdup("ndmp");

    g_value_init(&response, G_TYPE_STRING);
    g_value_set_string(&response, "md5");
    device_set_simple_property(dself, PROPERTY_NDMP_AUTH,
	    &response, PROPERTY_SURETY_BAD, PROPERTY_SOURCE_DEFAULT);
    g_value_unset(&response);
    self->ndmp_auth = g_strdup("md5");

    g_value_init(&response, G_TYPE_BOOLEAN);
    g_value_set_boolean(&response, FALSE);
    device_set_simple_property(dself, PROPERTY_INDIRECT,
	    &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
    g_value_unset(&response);
    self->indirect = TRUE;

    self->indirecttcp_sock = -1;
}

static GType
ndmp_device_get_type(void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (NdmpDeviceClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) ndmp_device_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (NdmpDevice),
            0 /* n_preallocs */,
            (GInstanceInitFunc) ndmp_device_init,
            NULL
        };

        type = g_type_register_static (TYPE_DEVICE, "NdmpDevice", &info,
                                       (GTypeFlags)0);
    }

    return type;
}

static Device*
ndmp_device_factory(
    char *device_name,
    char *device_type,
    char *device_node)
{
    Device *rval;
    g_assert(g_str_equal(device_type, NDMP_DEVICE_NAME));
    rval = DEVICE(g_object_new(TYPE_NDMP_DEVICE, NULL));

    device_open_device(rval, device_name, device_type, device_node);
    return rval;
}

void
ndmp_device_register(void)
{
    static const char * device_prefix_list[] = { NDMP_DEVICE_NAME, NULL };

    /* register the device itself */
    register_device(ndmp_device_factory, device_prefix_list);

    device_property_fill_and_register(&device_property_ndmp_username,
                                      G_TYPE_STRING, "ndmp_username",
       "Username for access to the NDMP agent");
    device_property_fill_and_register(&device_property_ndmp_password,
                                      G_TYPE_STRING, "ndmp_password",
       "Password for access to the NDMP agent");
    device_property_fill_and_register(&device_property_ndmp_auth,
                                      G_TYPE_STRING, "ndmp_auth",
       "Authentication method for the NDMP agent - md5 (default), text, none, or void");
    device_property_fill_and_register(&device_property_indirect,
                                      G_TYPE_BOOLEAN, "indirect",
       "Use Indirect TCP mode, even if the NDMP server supports "
       "window length 0");
}

/*
 * DirectTCPConnectionNDMP implementation
 */

static char *
directtcp_connection_ndmp_close(DirectTCPConnection *dself)
{
    DirectTCPConnectionNDMP *self = DIRECTTCP_CONNECTION_NDMP(dself);
    char *rv = NULL;
    ndmp9_mover_state state;
    guint64 bytes_moved;
    ndmp9_mover_halt_reason mover_halt_reason;
    gboolean expect_notif = FALSE;

    /* based on the current state, we may need to abort or stop the
     * mover before closing it */
    if (!ndmp_connection_mover_get_state(self->ndmp, &state,
				    &bytes_moved, NULL, NULL)) {
	rv = ndmp_connection_err_msg(self->ndmp);
	goto error;
    }

    switch (state) {
	case NDMP9_MOVER_STATE_HALTED:
	    break; /* nothing to do but ndmp_mover_close, below */

	case NDMP9_MOVER_STATE_PAUSED:
	    if (!ndmp_connection_mover_close(self->ndmp)) {
		rv = ndmp_connection_err_msg(self->ndmp);
		goto error;
	    }
	    expect_notif = TRUE;
	    break;

	case NDMP9_MOVER_STATE_ACTIVE:
	default:
	    if (!ndmp_connection_mover_abort(self->ndmp)) {
		rv = ndmp_connection_err_msg(self->ndmp);
		goto error;
	    }
	    expect_notif = TRUE;
	    break;
    }

    /* the spec isn't entirely clear that mover_close and mover_abort should
     * generate a NOTIF_MOVER_HALTED, but ndmjob does it */
    if (expect_notif) {
	if (!ndmp_connection_wait_for_notify(self->ndmp,
		NULL,
		&mover_halt_reason, /* value is ignored.. */
		NULL, NULL)) {
	    goto error;
	}
    }

    if (!ndmp_connection_mover_stop(self->ndmp)) {
	rv = ndmp_connection_err_msg(self->ndmp);
	goto error;
    }

error:

    /* self->ndmp is always set */
    g_object_unref(self->ndmp);
    self->ndmp = NULL;

    return rv;
}

static void
directtcp_connection_ndmp_class_init(DirectTCPConnectionNDMPClass * c)
{
    DirectTCPConnectionClass *connc = (DirectTCPConnectionClass *)c;

    connc->close = directtcp_connection_ndmp_close;
}

GType
directtcp_connection_ndmp_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (DirectTCPConnectionNDMPClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) directtcp_connection_ndmp_class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (DirectTCPConnectionNDMP),
            0 /* n_preallocs */,
            (GInstanceInitFunc) NULL,
            NULL
        };

        type = g_type_register_static(TYPE_DIRECTTCP_CONNECTION,
                                "DirectTCPConnectionNDMP", &info, (GTypeFlags)0);
    }

    return type;
}

static DirectTCPConnectionNDMP *
directtcp_connection_ndmp_new(
    NDMPConnection *ndmp,
    ndmp9_mover_mode mode)
{
    DirectTCPConnectionNDMP *dcn = DIRECTTCP_CONNECTION_NDMP(
	    g_object_new(TYPE_DIRECTTCP_CONNECTION_NDMP, NULL));

    /* hang onto a copy of this NDMP connection */
    g_object_ref(ndmp);
    dcn->ndmp = ndmp;
    dcn->mode = mode;
    dcn->offset = 0;

    return dcn;
}