/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 *
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
 */

#include "amanda.h"
#include "amxfer.h"
#include "xfer-device.h"
#include "conffile.h"
#include "device.h"

/* A transfer destination that writes an entire dumpfile to one or more files
 * on one or more devices, without any caching.  This destination supports both
 * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
 * splitting (in which rewound parts are read from holding disk). */

/*
 * File Slices - Cache Information
 *
 * The cache_inform implementation adds cache information to a linked list of
 * these objects, in order.  The objects are arranged in a linked list, and
 * describe the files in which the part data is stored.  Note that we assume
 * that slices are added *before* they are needed: the xfer element will fail
 * if it tries to rewind and does not find a suitable slice.
 *
 * The slices should be "fast forwarded" after every part, so that the first
 * byte in part_slices is the first byte of the part; when a retry of a part is
 * required, use the iterator methods to properly open the various files and do
 * the buffering.
 */

/* One entry in the singly-linked list of cached part data; see the
 * "File Slices" comment above for the list's invariants. */
typedef struct FileSlice {
    /* next slice in the list, or NULL at the tail */
    struct FileSlice *next;

    /* fully-qualified filename to read from (or NULL to read from
     * disk_cache_read_fd in XferDestTaperCacher) */
    char *filename;

    /* offset in file to start at */
    guint64 offset;

    /* length of data to read */
    guint64 length;
} FileSlice;

/*
 * Xfer Dest Taper
 */

static GObjectClass *parent_class = NULL;

typedef struct XferDestTaperSplitter {
    XferDestTaper __parent__;

    /* object parameters
     *
     * These values are supplied to the constructor, and can be assumed
     * constant for the lifetime of the element.
     */

    /* Maximum size of each part (bytes) */
    guint64 part_size;

    /* the device's need for streaming (it's assumed that all subsequent devices
     * have the same needs) */
    StreamingRequirement streaming;

    /* block size expected by the target device */
    gsize block_size;

    /* TRUE if this element is expecting slices via cache_inform */
    gboolean expect_cache_inform;

    /* The thread doing the actual writes to tape; this also handles buffering
     * for streaming */
    GThread *device_thread;

    /* Ring Buffer */
    /* ring_mutex/ring_cond protect ring_ready, which the device thread sets
     * once the mem_ring exists; push_buffer_impl waits on this before using
     * mem_ring */
    GMutex *ring_mutex;
    GCond *ring_cond;
    /* in-memory ring used for PUSH_BUFFER / MEM_RING input (NULL when input
     * arrives via the element's shm_ring instead) */
    mem_ring_t *mem_ring;
    gboolean    ring_ready;

    /* Element State
     *
     * "state" includes all of the variables below (including device
     * parameters).  Note that the device_thread holds this mutex for the
     * entire duration of writing a part.
     *
     * state_mutex should always be locked before mem_ring->mutex, if both are to be
     * held simultaneously.
     */
    GMutex *state_mutex;
    GCond *state_cond;
    volatile gboolean paused;

    /* The device to write to, and the header to write to it */
    Device *volatile device;
    dumpfile_t *volatile part_header;

    /* bytes to read from cached slices before reading from the ring buffer */
    guint64 bytes_to_read_from_slices;
    /* memory budget used to size the ring buffers in the device thread */
    guint64 max_memory;

    /* part number in progress */
    volatile guint64 partnum;

    /* status of the last part */
    gboolean last_part_eof;
    gboolean last_part_eom;
    gboolean last_part_successful;

    /* true if the element is done writing to devices */
    gboolean no_more_parts;

    /* total bytes written in the current part */
    volatile guint64 part_bytes_written;

    /* The list of active slices for the current part.  The cache_inform method
     * appends to this list. It is safe to read this linked list, beginning at
     * the head, *if* you can guarantee that slices will not be fast-forwarded
     * in the interim.  The finalize method for this class will take care of
     * freeing any leftover slices. Take the part_slices mutex while modifying
     * the links in this list. */
    FileSlice *part_slices;
    GMutex *part_slices_mutex;

    /* CRC snapshot taken at the start of a part, restored if the part fails
     * so a retry does not double-count bytes */
    crc_t crc_before_part;
    /* amount of space reported by new_space_available_impl */
    int more_space;

} XferDestTaperSplitter;

/* Forward declaration: wait for more space on the device and retry the
 * failed write (definition is not visible in this chunk of the file). */
static DeviceWriteResult retry_write(XferDestTaperSplitter *self, gsize to_write,
				     gpointer buf);

/* Standard GObject boilerplate: type accessor plus cast/type-check macros */
static GType xfer_dest_taper_splitter_get_type(void);
#define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
#define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
#define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
#define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
#define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
#define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)

/* class structure; no additional class members beyond the parent's */
typedef struct {
    XferDestTaperClass __parent__;

} XferDestTaperSplitterClass;

/*
 * Debug logging
 */

#define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
/* Format a debug message and emit it through g_debug with an "XDTS:"
 * prefix; output longer than the local buffer is silently truncated. */
static void
_xdt_dbg(const char *fmt, ...)
{
    char message[1024];
    va_list args;

    arglist_start(args, fmt);
    g_vsnprintf(message, sizeof(message), fmt, args);
    arglist_end(args);

    g_debug("XDTS: %s", message);
}

/* "Fast forward" the slice list by the given number of bytes: free every
 * slice that is completely consumed, and trim the offset/length of the
 * first slice that is only partially consumed.  Takes part_slices_mutex
 * internally while manipulating the list.
 *
 * @param self: element
 * @param length: number of bytes to fast forward
 */
static void
fast_forward_slices(
	XferDestTaperSplitter *self,
	guint64 length)
{
    g_mutex_lock(self->part_slices_mutex);
    while (length > 0) {
	FileSlice *head = self->part_slices;

	/* the slices must cover at least 'length' bytes; running off the
	 * end of the list is a programming error */
	g_assert(head);

	if (head->length > length) {
	    /* partially consumed: shrink it in place and stop */
	    head->offset += length;
	    head->length -= length;
	    break;
	}

	/* fully consumed: unlink and free it (g_free(NULL) is a no-op) */
	length -= head->length;
	self->part_slices = head->next;
	g_free(head->filename);
	g_free(head);
    }
    g_mutex_unlock(self->part_slices_mutex);
}

/*
 * Slice Iterator
 */

/* A struct for use in iterating over data in the slices */
/* A struct for use in iterating over data in the slices */
typedef struct SliceIterator {
    /* current slice (advances as each slice is exhausted) */
    FileSlice *slice;

    /* file descriptor of the current file, or -1 if it's not open yet */
    int cur_fd;

    /* bytes remaining in this slice */
    guint64 slice_remaining;
} SliceIterator;

/* Utility functions for SliceIterator */

/* Initialize a caller-provided SliceIterator so iteration begins at the
 * first byte of the first slice.  The caller must ensure that
 * fast_forward_slices is not called while an iteration is in progress.
 */
static void
iterate_slices(
	XferDestTaperSplitter *self,
	SliceIterator *iter)
{
    /* no file open yet; the first read will open the slice lazily */
    iter->cur_fd = -1;
    iter->slice_remaining = 0;

    /* snapshot the list head under the lock; at worst a new entry is
     * appended at the tail while the iterator is in progress, which is
     * safe for a reader starting from the head */
    g_mutex_lock(self->part_slices_mutex);
    iter->slice = self->part_slices;
    g_mutex_unlock(self->part_slices_mutex);
}


/* Get a block of data from the iterator, returning a pointer to a buffer
 * containing the data; the buffer remains the property of the iterator.
 * Returns NULL on error, after calling xfer_cancel_with_error with an
 * appropriate error message.  This function does not block, so it does not
 * check for cancellation.
 */
static gpointer
iterator_get_block(
	XferDestTaperSplitter *self,
	SliceIterator *iter,
	gpointer buf,
	gsize bytes_needed)
{
    /* number of bytes of buf already filled */
    gsize buf_offset = 0;
    XferElement *elt = XFER_ELEMENT(self);

    g_assert(iter != NULL);
    g_assert(buf != NULL);

    /* fill buf completely, crossing slice boundaries as needed */
    while (bytes_needed > 0) {
	gsize read_size;
	gsize bytes_read;

	/* lazily open the current slice's file on first use */
	if (iter->cur_fd < 0) {
	    guint64 offset;

	    /* slices must be added (via cache_inform) before they are
	     * needed; see the "File Slices" comment at the top of the file */
	    g_assert(iter->slice != NULL);
	    g_assert(iter->slice->filename != NULL);

	    iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
	    if (iter->cur_fd < 0) {
		xfer_cancel_with_error(elt,
		    _("Could not open '%s' for reading: %s"),
		    iter->slice->filename, strerror(errno));
		return NULL;
	    }

	    iter->slice_remaining = iter->slice->length;
	    offset = iter->slice->offset;

	    /* position at the slice's starting offset within the file
	     * (note: on failure the fd stays open; iterator_free closes it) */
	    if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
		xfer_cancel_with_error(elt,
		    _("Could not seek '%s' for reading: %s"),
		    iter->slice->filename, strerror(errno));
		return NULL;
	    }
	}

	/* read the smaller of what's left in the slice and what's needed */
	read_size = MIN(iter->slice_remaining, bytes_needed);
	bytes_read = read_fully(iter->cur_fd, (gchar *)buf + buf_offset, read_size,
	    NULL);
	/* a short read is an error: either a real I/O error (errno set) or
	 * the cached file is shorter than the slice claimed */
	if (bytes_read < read_size) {
	    xfer_cancel_with_error(elt, _("Error reading '%s': %s"),
		iter->slice->filename,
		errno ? strerror(errno) : _("Unexpected EOF"));
	    return NULL;
	}

	iter->slice_remaining -= bytes_read;
	buf_offset += bytes_read;
	bytes_needed -= bytes_read;

	/* slice exhausted (slice_remaining is unsigned, so <= 0 means == 0):
	 * close its file and advance to the next slice */
	if (iter->slice_remaining <= 0) {
	    if (close(iter->cur_fd) < 0) {
		xfer_cancel_with_error(elt,
		    _("Could not close fd %d: %s"),
		    iter->cur_fd, strerror(errno));
		return NULL;
	    }
	    iter->cur_fd = -1;

	    iter->slice = iter->slice->next;

	    if (elt->cancelled)
		return NULL;
	}
    }

    return buf;
}


/* Release any file descriptor still held by the iterator. */
static void
iterator_free(
	SliceIterator *iter)
{
    if (iter->cur_fd < 0)
	return;
    close(iter->cur_fd);
}

/*
 * Device Thread
 */

/* Wait for at least one block, or EOF, to be available in the ring buffer.
 * Returns the number of usable bytes, capped at what remains of the current
 * part, and sets *eof_flag if the producer signalled EOF.
 * Called with the ring mutex held. */
static gsize
device_thread_wait_for_block(
    XferDestTaperSplitter *self,
    gboolean *eof_flag)
{
    XferElement *elt = XFER_ELEMENT(self);
    /* by default, one device block is enough to proceed */
    gsize bytes_needed = self->device->block_size;
    gsize usable = 0;

    *eof_flag = FALSE;

    if (self->mem_ring) {
	gsize max_ring_block_size = MAX(self->mem_ring->producer_block_size,
					self->mem_ring->consumer_block_size);

	/* for any kind of streaming, we need to fill the entire buffer before the
	 * first byte */
	if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
	    bytes_needed = self->mem_ring->ring_size - max_ring_block_size;

	while (1) {
	    /* are we ready? */
	    if (elt->cancelled)
		break;

	    /* bytes the producer has written but we have not yet consumed */
	    usable = self->mem_ring->written - self->mem_ring->readx;
	    *eof_flag = self->mem_ring->eof_flag;
	    if (usable >= bytes_needed) {
		break;
	    }

	    /* at EOF, return whatever is left, even less than bytes_needed */
	    if (self->mem_ring->eof_flag)
		break;

	    /* nope - so wait */
	    g_cond_wait(self->mem_ring->add_cond, self->mem_ring->mutex);

	    /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
	     * we need to wait for the entire buffer to fill */
	    if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
		bytes_needed = self->mem_ring->ring_size - max_ring_block_size;
	}

    } else { // shm_ring
	gsize max_ring_block_size = MAX(elt->shm_ring->mc->producer_block_size,
					elt->shm_ring->mc->consumer_block_size);
	/* same streaming rule as above: prebuffer the whole ring before the
	 * first byte of the part */
	if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
	    bytes_needed = elt->shm_ring->ring_size - max_ring_block_size;

	while (!elt->cancelled &&
	       !elt->shm_ring->mc->cancelled) {

	    /* block until the producer posts sem_read (or the wait fails,
	     * e.g. because the ring was cancelled) */
	    if (shm_ring_sem_wait(elt->shm_ring, elt->shm_ring->sem_read) != 0)
		break;
	    usable = elt->shm_ring->mc->written - elt->shm_ring->mc->readx;
	    *eof_flag = elt->shm_ring->mc->eof_flag;

	    /* re-check cancellation after waking from the semaphore */
	    if (elt->cancelled || elt->shm_ring->mc->cancelled)
		break;

	    if (usable >= bytes_needed)
		break;

	    if (*eof_flag)
		break;


	    /* see the STREAMING_REQUIREMENT_REQUIRED note above */
	    if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
		bytes_needed = elt->shm_ring->ring_size - max_ring_block_size;
	}
	/* a cancelled shm-ring without a cancelled xfer means the producer
	 * side failed: propagate it as an xfer error */
	if (elt->shm_ring->mc->cancelled && !elt->cancelled) {
	    xfer_cancel_with_error(elt, "shm_ring_cancelled");
	}
    }
    /* never hand back more than the current part may still write */
    if (self->part_size)
       usable = MIN(usable, self->part_size - self->part_bytes_written);

    return usable;
}

/* Mark readx bytes as consumed in whichever ring buffer is in use,
 * advancing the read offset (with wrap-around) and waking the producer.
 * Called with the ring mutex held. */
static void
device_thread_consume_block(
    XferDestTaperSplitter *self,
    gsize readx)
{
    XferElement *elt = XFER_ELEMENT(self);

    if (self->mem_ring) {
	uint64_t new_offset = self->mem_ring->read_offset + readx;
	if (new_offset >= self->mem_ring->ring_size)
	    new_offset -= self->mem_ring->ring_size;
	self->mem_ring->readx += readx;
	self->mem_ring->read_offset = new_offset;
	/* wake a producer that may be waiting for free space */
	g_cond_broadcast(self->mem_ring->free_cond);
    } else { /* shm_ring */
	uint64_t new_offset = elt->shm_ring->mc->read_offset + readx;
	if (new_offset >= elt->shm_ring->ring_size)
	    new_offset -= elt->shm_ring->ring_size;
	elt->shm_ring->mc->readx += readx;
	elt->shm_ring->mc->read_offset = new_offset;
	sem_post(elt->shm_ring->sem_write);
    }
}

/* Write an entire part to the current device: the part header, then any
 * bytes being re-read from cached slices (on a retry), then data from the
 * ring buffer.  Returns an XMSG_PART_DONE message describing the outcome.
 * Called with the state_mutex held. */
static XMsg *
device_thread_write_part(
    XferDestTaperSplitter *self)
{
    GTimer *timer = g_timer_new();
    XferElement *elt = XFER_ELEMENT(self);

    /* how the part ended: clean EOF, logical end-of-medium, end-of-part
     * (size limit reached), or failure */
    enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
    int fileno = 0;
    XMsg *msg;
    void *buf;

    self->part_bytes_written = 0;
    /* snapshot the CRC so it can be rolled back if this part fails */
    self->crc_before_part = elt->crc;

    g_timer_start(timer);

    /* write the header; if this fails or hits LEOM, we consider this a
     * successful 0-byte part */
    if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
	part_status = PART_LEOM;
	goto part_done;
    }

    fileno = self->device->file;
    g_assert(fileno > 0);

    /* free the header, now that it's written */
    dumpfile_free(self->part_header);
    self->part_header = NULL;

    /* First, read the requisite number of bytes from the part_slices, if the part was
     * unsuccessful. */
    if (self->bytes_to_read_from_slices) {
	SliceIterator iter;
	gsize to_write = self->block_size;
	/* NOTE(review): this 'buf' shadows the outer 'void *buf' declared
	 * above; the two are never live at the same time, but the shadowing
	 * is easy to misread */
	gpointer buf = g_malloc(to_write);
	gboolean successful = TRUE;
	guint64 bytes_from_slices = self->bytes_to_read_from_slices;

	DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);

	iterate_slices(self, &iter);
	while (bytes_from_slices) {
	    DeviceWriteResult ok;

	    if (!iterator_get_block(self, &iter, buf, to_write)) {
		part_status = PART_FAILED;
		successful = FALSE;
		break;
	    }

	    /* note that it's OK to reference these ring_* vars here, as they
	     * are static at this point */
	    ok = device_write_block(self->device, (guint)to_write, buf);
	    DBG(8, "writing %ju bytes from slice to device", (uintmax_t)to_write);

	    /* out of space: wait for more space to be made available and
	     * retry the same block once */
	    if (ok == WRITE_SPACE)
		ok = retry_write(self, to_write, buf);

	    if (ok == WRITE_FAILED) {
		part_status = PART_FAILED;
		successful = FALSE;
		break;
	    } else if (ok == WRITE_FULL) {
		part_status = PART_EOP;
		successful = FALSE;
		break;
	    } else if (ok == WRITE_SPACE) {
		/* still no space even after the retry: end the part here */
		part_status = PART_EOP;
		successful = FALSE;
		break;
	    }

	    self->part_bytes_written += to_write;
	    bytes_from_slices -= to_write;
	    crc32_add((uint8_t *)buf, to_write, &elt->crc);

	    /* stop at the part-size limit or at logical end-of-medium */
	    if (self->part_size && self->part_bytes_written >= self->part_size) {
		part_status = PART_EOP;
		successful = FALSE;
		break;
	    } else if (self->device->is_eom) {
		part_status = PART_LEOM;
		successful = FALSE;
		break;
	    }
	}

	iterator_free(&iter);
	g_free(buf);

	/* if we didn't finish, get out of here now */
	if (!successful)
	    goto part_done;
    }

    /* Now write from the ring buffer until EOF, the part-size limit, EOM,
     * failure, or cancellation.  The mem_ring mutex (when in use) is held
     * except across the actual device write. */
    if (self->mem_ring)
	g_mutex_lock(self->mem_ring->mutex);
    while (!elt->cancelled &&
	   (!elt->shm_ring || !elt->shm_ring->mc->cancelled)) {
	DeviceWriteResult ok;
	gboolean eof_flag;

	/* wait for at least one block, and (if necessary) prebuffer */
	gsize to_writeX = device_thread_wait_for_block(self, &eof_flag);
	if (elt->cancelled || (elt->shm_ring && elt->shm_ring->mc->cancelled))
	    break;

	/* drain the available bytes one device block at a time; at EOF the
	 * final (possibly short, possibly zero-length) block is also written */
	while (to_writeX >= self->device->block_size || eof_flag) {
	    //crc_t block_crc;
	    gsize to_write = MIN(to_writeX, self->device->block_size);
	    if (elt->cancelled)
		goto part_done_unlock;

	    /* zero bytes left at EOF: the part (and the dump) is complete */
	    if (to_write == 0) {
		part_status = PART_EOF;
		goto part_done_unlock;
	    }

	    /* drop the ring mutex for the duration of the (slow) device
	     * write so the producer can keep filling the ring */
	    if (self->mem_ring)
		g_mutex_unlock(self->mem_ring->mutex);
	    DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);

	    /* note that it's OK to reference these ring_* vars here, as they
	     * are static at this point */
	    if (self->mem_ring) {
		buf = self->mem_ring->buffer + self->mem_ring->read_offset;
	    } else {
		buf = elt->shm_ring->data + elt->shm_ring->mc->read_offset;
	    }
	    ok = device_write_block(self->device, (guint)to_write, buf);

	    /* out of space: wait for space and retry once */
	    if (ok == WRITE_SPACE)
	    ok = retry_write(self, to_write, buf);

	    if (self->mem_ring)
		g_mutex_lock(self->mem_ring->mutex);

	    if (ok == WRITE_FAILED) {
		part_status = PART_FAILED;
		goto part_done_unlock;
	    } else if (ok == WRITE_FULL) {
		part_status = PART_EOP;
		goto part_done_unlock;
	    } else if (ok == WRITE_SPACE) {
		/* still no space after the retry: end the part */
		part_status = PART_EOP;
		goto part_done_unlock;
	    }

	    crc32_add((uint8_t *)(buf),
			 to_write, &elt->crc);
	    self->part_bytes_written += to_write;
	    /* release the consumed bytes back to the producer */
	    device_thread_consume_block(self, to_write);

	    if (self->part_size && self->part_bytes_written >= self->part_size) {
		part_status = PART_EOP;
		goto part_done_unlock;
	    } else if (self->device->is_eom) {
		part_status = PART_LEOM;
		goto part_done_unlock;
	    }
	    to_writeX -= to_write;
	}
    }
part_done_unlock:
    if (self->mem_ring)
	g_mutex_unlock(self->mem_ring->mutex);

part_done:
    if (elt->shm_ring) {
	if (elt->cancelled) {
	    if (elt->shm_ring) {
		g_debug("device_thread_write_part: cancelling shm-ring because xfer is cancelled");
		elt->shm_ring->mc->cancelled = TRUE;
	    }
	} else if (elt->shm_ring->mc->cancelled) {
	    part_status = PART_FAILED;
	    xfer_cancel_with_error(elt, "shm_ring cancelled");
	}
	/* post the semaphores several times so any thread blocked on either
	 * side of the ring is guaranteed to wake up and see the state */
	sem_post(elt->shm_ring->sem_read);
	sem_post(elt->shm_ring->sem_read);
	sem_post(elt->shm_ring->sem_read);
	sem_post(elt->shm_ring->sem_write);
    }
    /* if we write all of the blocks, but the finish_file fails, then likely
     * there was some buffering going on in the device driver, and the blocks
     * did not all make it to permanent storage -- so it's a failed part.  Note
     * that we try to finish_file even if the part failed, just to be thorough. */
    if (self->device->in_file) {
	if (!device_finish_file(self->device)) {
	    if (!elt->cancelled) {
		part_status = PART_FAILED;
	    }
	}
    }

    g_timer_stop(timer);
    /* a failed part contributes nothing to the CRC: roll it back */
    if (part_status == PART_FAILED) {
	elt->crc = self->crc_before_part;
    }

    /* build the XMSG_PART_DONE message summarizing this part */
    msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
    msg->size = self->part_bytes_written;
    msg->duration = g_timer_elapsed(timer, NULL);
    msg->partnum = self->partnum;
    msg->fileno = fileno;
    msg->successful = self->last_part_successful = part_status != PART_FAILED;
    msg->eom = self->last_part_eom = part_status == PART_LEOM || self->device->is_eom;
    msg->eof = self->last_part_eof = part_status == PART_EOF;

    /* time runs backward on some test boxes, so make sure this is positive */
    if (msg->duration < 0) msg->duration = 0;

    /* only count non-empty successful parts toward the part numbering */
    if (msg->successful && msg->size > 0)
	self->partnum++;
    //self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);
    self->no_more_parts = msg->eof;


    g_timer_destroy(timer);

    return msg;
}

/* Main loop of the device thread: set up the appropriate ring buffer for the
 * element's input mechanism, then write one part per iteration until EOF,
 * cancellation, or an unrecoverable error; finally emit XMSG_CRC and
 * XMSG_DONE. */
static gpointer
device_thread(
    gpointer data)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;

    DBG(1, "(this is the device thread)");

    /* create or attach the ring buffer matching the input mechanism */
    if (elt->input_mech == XFER_MECH_PUSH_BUFFER) {
	self->mem_ring = create_mem_ring();
	init_mem_ring(self->mem_ring, self->max_memory, self->device->block_size);
    } else if (elt->input_mech == XFER_MECH_MEM_RING) {
	self->mem_ring = xfer_element_get_mem_ring(elt->upstream);
	mem_ring_consumer_set_size(self->mem_ring, self->max_memory, self->device->block_size);
    } else if (elt->input_mech == XFER_MECH_SHM_RING) {
	shm_ring_consumer_set_size(elt->shm_ring, self->max_memory, self->device->block_size);
	if (elt->input_mech == XFER_MECH_SHM_RING &&
	    elt->shm_ring->mc->cancelled) {
	    g_debug("A shm_ring is cancelled");
	    if (!elt->cancelled) {
		xfer_cancel_with_error(XFER_ELEMENT(self)->upstream,
			_("failed to set shm-ring"));
	    }
	    goto device_thread_done;
	}
    }
    crc32_init(&elt->crc);

    /* This is the outer loop, that loops once for each split part written to
     * tape. */
    /* announce that the ring exists, unblocking push_buffer_impl */
    g_mutex_lock(self->ring_mutex);
    self->ring_ready = TRUE;
    g_cond_broadcast(self->ring_cond);
    g_mutex_unlock(self->ring_mutex);
    /* state_mutex is held for the whole part loop; start_part_impl takes it
     * briefly to unpause us between parts */
    g_mutex_lock(self->state_mutex);
    while (1) {
	/* wait until the main thread un-pauses us, and check that we have
	 * the relevant device info available (block_size) */
	while (self->paused && !elt->cancelled) {
	    DBG(9, "device_thread waiting to be unpaused");
	    g_cond_wait(self->state_cond, self->state_mutex);
	}
	DBG(9, "device_thread done waiting");

        if (elt->cancelled)
	    break;

	if (elt->input_mech == XFER_MECH_SHM_RING &&
	    elt->shm_ring->mc->cancelled) {
	    g_debug("B shm_ring is cancelled");
	    if (!elt->cancelled) {
		xfer_cancel_with_error(XFER_ELEMENT(self)->upstream,
		    _("shm-ring cancelled"));
	    }
	    break;
	}

	DBG(2, "device_thread beginning to write part");
	msg = device_thread_write_part(self);
	DBG(2, "device_thread done writing part");

	if (!msg) /* cancelled */
	    break;

	/* release the slices for this part, if there were any slices */
	if (msg->successful && self->expect_cache_inform) {
	    fast_forward_slices(self, msg->size);
	}

	xfer_queue_message(elt->xfer, msg);

	/* pause ourselves and await instructions from the main thread */
	self->paused = TRUE;

	/* if this is the last part, we're done with the part loop */
	if (self->no_more_parts)
	    break;
    }
    g_mutex_unlock(self->state_mutex);

    // notify the producer that everything is read
    if (elt->input_mech == XFER_MECH_SHM_RING) {
	sem_post(elt->shm_ring->sem_write);
    }

    /* report the CRC of everything written across all parts */
    g_debug("device_thread sending XMSG_CRC message");
    DBG(2, "xfer-dest-taper-splitter CRC: %08x      size %lld",
	   crc32_finish(&elt->crc), (long long)elt->crc.size);
    msg = xmsg_new(XFER_ELEMENT(self), XMSG_CRC, 0);
    msg->crc = crc32_finish(&elt->crc);
    msg->size = elt->crc.size;
    xfer_queue_message(elt->xfer, msg);

device_thread_done:
    /* tell the main thread we're done */
    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

/*
 * Class mechanics
 */

/* Receive a buffer pushed from upstream (XFER_MECH_PUSH_BUFFER input) and
 * copy it into the mem_ring, blocking as needed for the device thread to
 * create the ring and to free space in it.  A NULL buf signals EOF.
 * Takes ownership of buf and frees it before returning. */
static void
push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
    gchar *p = buf;

    DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);

    /* do nothing if cancelled */
    if (G_UNLIKELY(elt->cancelled)) {
        goto free_and_finish;
    }

    /* wait for the device thread to create the mem_ring; ring_ready is
     * protected by ring_mutex, not by the (possibly not-yet-created)
     * mem_ring mutex */
    if (G_UNLIKELY(!self->ring_ready)) {
	g_mutex_lock(self->ring_mutex);
	while (!self->ring_ready && !elt->cancelled) {
	    g_cond_wait(self->ring_cond, self->ring_mutex);
	}
	if (elt->cancelled) {
	    /* BUG FIX: this path previously jumped to
	     * unlock_and_free_and_finish, which unlocks mem_ring->mutex --
	     * a mutex we do not hold here (and mem_ring may still be NULL
	     * if it was never created).  That left ring_mutex locked
	     * forever and invoked undefined behavior.  Unlock the mutex we
	     * actually hold instead. */
	    g_mutex_unlock(self->ring_mutex);
	    goto free_and_finish;
	}
	g_mutex_unlock(self->ring_mutex);
    }

    /* handle EOF */
    if (G_UNLIKELY(buf == NULL)) {
	/* indicate EOF to the device thread */
	g_mutex_lock(self->mem_ring->mutex);
	self->mem_ring->eof_flag = TRUE;
	g_cond_broadcast(self->mem_ring->add_cond);
	g_mutex_unlock(self->mem_ring->mutex);
	goto free_and_finish;
    }

    /* push the block into the ring buffer, in pieces if necessary */
    g_mutex_lock(self->mem_ring->mutex);
    while (size > 0) {
	gsize avail;

	/* wait for some space */
	while (self->mem_ring->written - self->mem_ring->readx == self->mem_ring->ring_size && !elt->cancelled) {
	    DBG(9, "push_buffer waiting for any space to buffer pushed data");
	    g_cond_wait(self->mem_ring->free_cond, self->mem_ring->mutex);
	}
	DBG(9, "push_buffer done waiting");

	if (elt->cancelled)
	    goto unlock_and_free_and_finish;

	/* only copy to the end of the buffer, if the available space wraps
	 * around to the beginning */
	avail = MIN(size, self->mem_ring->ring_size - (self->mem_ring->written - self->mem_ring->readx));
	avail = MIN(avail, self->mem_ring->ring_size - self->mem_ring->write_offset);

	/* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
	memmove(self->mem_ring->buffer + self->mem_ring->write_offset, p, avail);

	/* reset the ring variables to represent this state */
	self->mem_ring->written += avail;
	self->mem_ring->write_offset += avail; /* will, at most, hit mem_ring->ring_size */
	if (self->mem_ring->write_offset == self->mem_ring->ring_size)
	    self->mem_ring->write_offset = 0;
	p = (gpointer)((guchar *)p + avail);
	size -= avail;

	/* and give the device thread a notice that data is ready */
	g_cond_broadcast(self->mem_ring->add_cond);
    }

unlock_and_free_and_finish:
    g_mutex_unlock(self->mem_ring->mutex);

free_and_finish:
    if (buf)
        g_free(buf);
}

/*
 * Element mechanics
 */

/* Start the element: spawn the (non-joinable) device thread, which will
 * queue an XMSG_DONE when it finishes.  Always reports success. */
static gboolean
start_impl(
    XferElement *elt)
{
    GError *error = NULL;
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;

    self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
    if (self->device_thread == NULL) {
        g_critical(_("Error creating new thread: %s (%s)"),
            error->message, errno ? strerror(errno) : _("no error code"));
    }

    return TRUE;
}

/* Cancel the element: chain up to set elt->cancelled, then wake every
 * thread that may be blocked on one of our condition variables or on the
 * shm-ring semaphores so they observe the cancellation.
 *
 * @param elt: this element
 * @param expect_eof: passed through to the parent's cancel
 * @return: the parent class's cancel result
 */
static gboolean
cancel_impl(
    XferElement *elt,
    gboolean expect_eof)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
    gboolean rv;

    /* chain up first */
    rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);

    /* then signal all of our condition variables, so that threads waiting on them
     * wake up and see elt->cancelled. */
    g_mutex_lock(self->ring_mutex);
    g_cond_broadcast(self->ring_cond);
    g_mutex_unlock(self->ring_mutex);

    /* mark the shm-ring cancelled and post every semaphore so neither side
     * can stay blocked.  (The previous code re-checked mc->cancelled in a
     * nested `if` that duplicated the outer condition and was always true;
     * that redundant check has been removed.) */
    if (elt->shm_ring && !elt->shm_ring->mc->cancelled) {
	g_debug("XDTS:cancel_impl: cancelling shm-ring because xfer is cancelled");
	elt->shm_ring->mc->cancelled = TRUE;
	sem_post(elt->shm_ring->sem_ready);
	sem_post(elt->shm_ring->sem_start);
	sem_post(elt->shm_ring->sem_read);
	sem_post(elt->shm_ring->sem_write);
    }
    if (self->mem_ring) {
	/* set EOF and wake both producer and consumer of the mem_ring */
	g_mutex_lock(self->mem_ring->mutex);
	self->mem_ring->eof_flag = TRUE;
	g_cond_broadcast(self->mem_ring->add_cond);
	g_cond_broadcast(self->mem_ring->free_cond);
	g_mutex_unlock(self->mem_ring->mutex);
    }

    /* wake the device thread if it is paused on state_cond */
    g_mutex_lock(self->state_mutex);
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);

    return rv;
}

static void
start_part_impl(
    XferDestTaper *xdt,
    gboolean retry_part,
    dumpfile_t *header)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
    XferElement *elt = XFER_ELEMENT(self);

    g_assert(self->device != NULL);
    g_assert(!self->device->in_file);
    g_assert(header != NULL);

    DBG(1, "start_part() start_part_impl");

    /* we can only retry the part if we're getting slices via cache_inform's */
    if (retry_part) {
	if (self->last_part_successful) {
	    xfer_cancel_with_error(XFER_ELEMENT(self),
		_("Previous part did not fail; cannot retry"));
	    if (elt->shm_ring && !elt->shm_ring->mc->cancelled) {
		elt->shm_ring->mc->cancelled = TRUE;
		sem_post(elt->shm_ring->sem_ready);
		sem_post(elt->shm_ring->sem_start);
		sem_post(elt->shm_ring->sem_read);
		sem_post(elt->shm_ring->sem_write);
	    }
	    return;
	}

	if (!self->expect_cache_inform) {
	    xfer_cancel_with_error(XFER_ELEMENT(self),
		_("No cache for previous failed part; cannot retry"));
	    if (elt->shm_ring && !elt->shm_ring->mc->cancelled) {
		elt->shm_ring->mc->cancelled = TRUE;
		sem_post(elt->shm_ring->sem_ready);
		sem_post(elt->shm_ring->sem_start);
		sem_post(elt->shm_ring->sem_read);
		sem_post(elt->shm_ring->sem_write);
	    }
	    return;
	}

	self->bytes_to_read_from_slices = self->part_bytes_written;
    } else {
	/* don't read any bytes from the slices, since we're not retrying */
	self->bytes_to_read_from_slices = 0;
    }

    g_mutex_lock(self->state_mutex);
    g_assert(self->paused);
    g_assert(!self->no_more_parts);

    if (self->part_header)
	dumpfile_free(self->part_header);
    self->part_header = dumpfile_copy(header);

    DBG(1, "unpausing");
    self->paused = FALSE;
    g_cond_broadcast(self->state_cond);

    g_mutex_unlock(self->state_mutex);
}

/* Switch to a new output device.  The new device must have the same block
 * size as the original; a differing streaming requirement is tolerated with
 * a warning.  No-op when the device is unchanged.
 *
 * @param xdtself: this element
 * @param device: the new device (a reference is taken)
 */
static void
use_device_impl(
    XferDestTaper *xdtself,
    Device *device)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
    StreamingRequirement newstreaming;
    /* a GValue must be zero-initialized before first use; the zero
     * initializer replaces the legacy bzero() call (bzero was removed in
     * POSIX.1-2008) */
    GValue val = { 0, };

    DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");

    /* short-circuit if nothing is changing */
    if (self->device == device)
	return;

    g_mutex_lock(self->state_mutex);
    if (self->device)
	g_object_unref(self->device);
    self->device = device;
    g_object_ref(device);

    /* get this new device's streaming requirements */
    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
    } else {
        newstreaming = g_value_get_enum(&val);
	if (newstreaming != self->streaming)
	    g_warning("New device has different streaming requirements from the original; "
		    "ignoring new requirement");
    }
    g_value_unset(&val);

    /* check that the blocksize hasn't changed */
    if (self->block_size != device->block_size) {
        g_mutex_unlock(self->state_mutex);
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("All devices used by the taper must have the same block size"));
        return;
    }
    g_mutex_unlock(self->state_mutex);
}

/* Record that [offset, offset+length) of the named holding-disk file holds
 * cached data for the current part, appending a new slice at the tail of
 * the part_slices list. */
static void
cache_inform_impl(
    XferDestTaper *xdt,
    const char *filename,
    off_t offset,
    off_t length)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
    FileSlice *slice;
    FileSlice **tailp;

    /* build the new slice */
    slice = g_new(FileSlice, 1);
    slice->next = NULL;
    slice->filename = g_strdup(filename);
    slice->offset = offset;
    slice->length = length;

    /* walk to the tail link under the list lock and hook the slice on */
    g_mutex_lock(self->part_slices_mutex);
    for (tailp = &self->part_slices; *tailp != NULL; tailp = &(*tailp)->next)
	;
    *tailp = slice;
    g_mutex_unlock(self->part_slices_mutex);
}

/* Report how many bytes of the current part have been written so far,
 * preferring the device's own counter when a device is attached. */
static guint64
get_part_bytes_written_impl(
    XferDestTaper *xdtself)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);

    /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
     * partial write to the 64-bit value on a 32-bit system).  This is ok for
     * the moment, as it's only informational, but be warned. */
    return self->device ? device_get_bytes_written(self->device)
			: self->part_bytes_written;
}

/* Called when the monitor reports whether more space was made available on
 * the volume.  made_space is stored in self->more_space, which retry_write
 * polls while waiting on state_cond. */
static void
new_space_available_impl(
    XferDestTaper *xdtself,
    int            made_space)
{
    XferDestTaperSplitter *self;

    self = XFER_DEST_TAPER_SPLITTER(xdtself);

    /* BUGFIX: more_space was previously written before taking state_mutex,
     * racing with retry_write, which reads it under that mutex inside a
     * g_cond_wait loop.  Store it inside the critical section so the waiter
     * cannot observe a torn or missed update. */
    g_mutex_lock(self->state_mutex);
    self->more_space = made_space;
    device_reset(self->device);
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);
}
static void
instance_init(
    XferElement *elt)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);

    /* this element never generates EOF on its own */
    elt->can_generate_eof = FALSE;

    /* synchronization primitives for the ring, the device state, and the
     * cache_inform slice list */
    self->ring_mutex = g_mutex_new();
    self->ring_cond = g_cond_new();
    self->state_mutex = g_mutex_new();
    self->state_cond = g_cond_new();
    self->part_slices_mutex = g_mutex_new();

    /* start with no device attached, paused, and at part number 1 */
    self->device = NULL;
    self->paused = TRUE;
    self->part_header = NULL;
    self->partnum = 1;
    self->part_bytes_written = 0;
    self->part_slices = NULL;

    crc32_init(&elt->crc);
}

static gboolean
setup_impl(
    XferElement *elt)
{
    /* nothing to prepare unless the upstream element will feed us through a
     * shared-memory ring */
    if (elt->input_mech != XFER_MECH_SHM_RING)
	return TRUE;

    elt->shm_ring = shm_ring_create(NULL);
    return TRUE;
}

/* GObject finalize: release every resource the instance owns, then chain up.
 * Note that mem_ring's sub-objects are freed here, but ownership of the
 * mem_ring struct itself is not established by this file -- TODO confirm. */
static void
finalize_impl(
    GObject * obj_self)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
    XferElement *elt = XFER_ELEMENT(self);
    FileSlice *slice, *next_slice;

    g_mutex_free(self->ring_mutex);
    g_cond_free(self->ring_cond);
    g_mutex_free(self->state_mutex);
    g_cond_free(self->state_cond);
    g_mutex_free(self->part_slices_mutex);

    /* cleanup of all mem_ring members consolidated in one place; the buffer
     * was previously freed in a separate test far below.  g_free(NULL) is a
     * no-op, so no per-pointer guards are needed. */
    if (self->mem_ring) {
	g_mutex_free(self->mem_ring->mutex);
	g_cond_free(self->mem_ring->add_cond);
	g_cond_free(self->mem_ring->free_cond);
	g_free(self->mem_ring->buffer);
    }

    if (elt->shm_ring) {
	close_consumer_shm_ring(elt->shm_ring);
	elt->shm_ring = NULL;
    }

    /* free the cache_inform slice list; redundant NULL checks before g_free
     * removed (g_free ignores NULL) */
    for (slice = self->part_slices; slice; slice = next_slice) {
	next_slice = slice->next;
	g_free(slice->filename);
	g_free(slice);
    }

    if (self->part_header)
	dumpfile_free(self->part_header);

    if (self->device)
	g_object_unref(self->device);

    /* chain up */
    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}

static void
class_init(
    XferDestTaperSplitterClass * selfc)
{
    XferElementClass *xe_class = XFER_ELEMENT_CLASS(selfc);
    XferDestTaperClass *taper_class = XFER_DEST_TAPER_CLASS(selfc);
    GObjectClass *gobject_class = G_OBJECT_CLASS(selfc);
    /* supported input mechanisms; the all-NONE entry terminates the table */
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_MEM_RING, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(2), XFER_NALLOC(1) },
	{ XFER_MECH_SHM_RING, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };

    /* XferElement virtual methods */
    xe_class->setup = setup_impl;
    xe_class->start = start_impl;
    xe_class->cancel = cancel_impl;
    xe_class->push_buffer = push_buffer_impl;

    /* XferDestTaper virtual methods */
    taper_class->start_part = start_part_impl;
    taper_class->use_device = use_device_impl;
    taper_class->cache_inform = cache_inform_impl;
    taper_class->get_part_bytes_written = get_part_bytes_written_impl;
    taper_class->new_space_available = new_space_available_impl;

    gobject_class->finalize = finalize_impl;

    xe_class->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
    xe_class->mech_pairs = mech_pairs;

    parent_class = g_type_class_peek_parent(selfc);
}

static GType
xfer_dest_taper_splitter_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestTaperSplitterClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestTaperSplitter),
            0 /* n_preallocs */,
            (GInstanceInitFunc) instance_init,
            NULL
        };

        type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
    }

    return type;
}

/*
 * Constructor
 */

XferElement *
xfer_dest_taper_splitter(
    Device *first_device,
    size_t max_memory,
    guint64 part_size,
    gboolean expect_cache_inform)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
    GValue val;

    /* round max_memory and (if nonzero) part_size up to the next multiple of
     * the device's block size */
    max_memory = ((max_memory + first_device->block_size - 1)
			/ first_device->block_size) * first_device->block_size;
    if (part_size != 0) {
	part_size = ((part_size + first_device->block_size - 1)
			    / first_device->block_size) * first_device->block_size;
    }

    /* record sizing and take a reference on the initial device */
    self->part_size = part_size;
    self->max_memory = max_memory;
    self->block_size = first_device->block_size;
    self->device = first_device;
    g_object_ref(self->device);

    /* initial state: first part, paused, more parts expected */
    self->partnum = 1;
    self->paused = TRUE;
    self->no_more_parts = FALSE;

    /* query the device's streaming requirement; fall back to the strictest
     * setting (REQUIRED) when the property cannot be read */
    bzero(&val, sizeof(val));
    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
        self->streaming = STREAMING_REQUIREMENT_REQUIRED;
    } else {
        self->streaming = g_value_get_enum(&val);
    }
    g_value_unset(&val);

    /* grab data from cache_inform, just in case we hit PEOM */
    self->expect_cache_inform = expect_cache_inform;

    return XFER_ELEMENT(self);
}

/* Retry a failed device_write_block after the volume reports WRITE_SPACE.
 * Announces the out-of-space condition via an XMSG_NO_SPACE message, then
 * waits for new_space_available_impl to set more_space and signal state_cond.
 *
 * NOTE(review): this waits on state_cond, so the caller must hold
 * self->state_mutex when calling -- confirm at call sites.
 *
 * Returns the final DeviceWriteResult: WRITE_SUCCEED, WRITE_FAILED,
 * WRITE_FULL, or WRITE_SPACE (when no more space could be made), or
 * WRITE_FAILED if the transfer was cancelled while waiting. */
static DeviceWriteResult
retry_write(
    XferDestTaperSplitter *self,
    gsize to_write,
    gpointer buf)
{
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;
    DeviceWriteResult result;

    /* tell the monitor we are out of space, exactly once */
    msg = xmsg_new(XFER_ELEMENT(self), XMSG_NO_SPACE, 0);
    xfer_queue_message(elt->xfer, msg);

    while(1) {
	/* -1 marks "no answer yet"; wait until new_space_available_impl
	 * stores the monitor's answer and broadcasts state_cond */
	self->more_space = -1;
	while (self->more_space == -1 && !elt->cancelled) {
	    g_cond_wait(self->state_cond, self->state_mutex);
	}

	if (elt->cancelled) {
	    return WRITE_FAILED;
	}

	result = device_write_block(self->device, (guint)to_write, buf);
	if (result != WRITE_SPACE) {
	    return result;   /* WRITE_SUCCEED, WRITE_FAILED or WRITE_FULL */
	}
	/* more_space == 0 means no further space can be made: give up */
	if (self->more_space == 0) {
	    return result;   /* WRITE_SPACE */
	}
    }
}