Blob Blame History Raw
/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 *
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
 */

#include "amanda.h"
#include "fileheader.h"
#include "amxfer.h"
#include "xfer-server.h"
#include "conffile.h"
#include "holding.h"
#include "mem-ring.h"

/* A transfer destination that writes an entire dumpfile to one or more files
 * on one or more holding disks */

/* installcheck will fail if HOLDING_BLOCK_BYTES is increased because the
 * fake ENOSPC will no be on the same byte */
#define HEADER_BLOCK_BYTES  DISK_BLOCK_BYTES
#define HOLDING_BLOCK_BYTES DISK_BLOCK_BYTES

/*
 * Xfer Dest Holding
 */

static GObjectClass *parent_class = NULL;

typedef struct XferDestHolding {
    XferElement __parent__;

    /* object parameters
     *
     * These values are supplied to the constructor, and can be assumed
     * constant for the lifetime of the element.
     */

    char       *first_filename;
    /* The thread doing the actual writes to tape; this also handles buffering
     * for streaming */
    GThread *holding_thread;

    /* Ring Buffer
     *
     * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
     * blocksize), and serves as the interface between the holding_thread and
     * the thread calling push_buffer.  Ring_length is the total length of the
     * buffer in bytes, while ring_count is the number of data bytes currently
     * in the buffer.  The mem_ring->add_cond is signalled when data is added to the
     * buffer, while mem_ring->free_cond is signalled when data is removed.  Both
     * are governed by mem_ring->mutex, and both are signalled when the transfer is
     * cancelled.
     */

    mem_ring_t *mem_ring;

    /* Element State
     *
     * "state" includes all of the variables below (including holding
     * parameters).  Note that the holding_thread holdes this mutex for the
     * entire duration of writing a chunk.
     *
     * state_mutex should always be locked before mem_ring->mutex, if both are to be
     * held simultaneously.
     */
    GMutex     *state_mutex;
    GCond      *state_cond;
    gboolean    paused;
    char       *filename;
    char       *new_filename;
    dumpfile_t *chunk_header;
    int         fd;
    guint64     use_bytes;	      /* bytes we can still write to the chunk file */
    guint64     data_bytes_written;   /* bytes written in the current call */
    guint64     header_bytes_written;
    guint64     chunk_offset;         /* bytes written to the current */
				      /* chunk, including header      */

    enum { CHUNK_OK	 = 0,		/* */
	   CHUNK_EOF	 = 1,		/* we read the complete input */
	   CHUNK_EOC	 = 2,		/* we are not allowed to write more bytes tothe chunk file */
	   CHUNK_NO_ROOM = 4,		/* the last write failed with ENOSPC */
	   CHUNK_FAILED  = 8		/* any other error */
    } chunk_status;
} XferDestHolding;

static GType xfer_dest_holding_get_type(void);
#define XFER_DEST_HOLDING_TYPE (xfer_dest_holding_get_type())
#define XFER_DEST_HOLDING(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_holding_get_type(), XferDestHolding)
#define XFER_DEST_HOLDING_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_holding_get_type(), XferDestHolding const)
#define XFER_DEST_HOLDING_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_holding_get_type(), XferDestHoldingClass)
#define IS_XFER_DEST_HOLDING(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_holding_get_type ())
#define XFER_DEST_HOLDING_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_holding_get_type(), XferDestHoldingClass)

typedef struct {
    XferElementClass __parent__;

    void (*start_chunk)(XferDestHolding *self, dumpfile_t *chunk_header, char *filename, guint64 use_bytes);
    char * (*finish_chunk)(XferDestHolding *self);
    guint64 (*get_chunk_bytes_written)(XferDestHolding *self);

} XferDestHoldingClass;

/* local functions */
static int close_chunk(XferDestHolding *xdh, char *cont_filename, char **mesg);
static ssize_t write_header(XferDestHolding *xdh, int fd);
static size_t full_write_with_fake_enospc(int fd, const void *buf, size_t count);

/* we use a function pointer for full_write, so that we can "shim" in
 * full_write_with_fake_enospc for testing
 */
size_t (*db_full_write)(int fd, const void *buf, size_t count);
static off_t fake_enospc_at_byte = -1;

/*
 * Debug logging
 */

#define DBG(LEVEL, ...) if (debug_chunker >= LEVEL) { _xdh_dbg(__VA_ARGS__); }
static void
_xdh_dbg(const char *fmt, ...)
{
    va_list argp;
    gchar *msg;

    arglist_start(argp, fmt);
    msg = g_strdup_vprintf(fmt, argp);
    arglist_end(argp);
    g_debug("XDH: %s", msg);
    g_free(msg);
}


/*
 * Holding Thread
 */

/* Wait for at least one block, or EOF, to be available in the ring buffer.
 * Called with the ring mutex held. */
static gsize
holding_thread_wait_for_block(
    XferDestHolding *self)
{
    XferElement *elt = XFER_ELEMENT(self);
    gsize bytes_needed = HOLDING_BLOCK_BYTES;
    gsize usable;

    while (1) {
	/* are we ready? */
	if (elt->cancelled)
	    break;

	if (self->mem_ring->written - self->mem_ring->readx > bytes_needed)
	    break;

	if (self->mem_ring->eof_flag)
	    break;

	/* nope - so wait */
	g_cond_wait(self->mem_ring->add_cond, self->mem_ring->mutex);
    }

    usable = MIN(self->mem_ring->written - self->mem_ring->readx, bytes_needed);

    return usable;
}

/* Mark WRITTEN bytes as free in the ring buffer.  Called with the ring mutex
 * held. */
static void
holding_thread_consume_block(
    XferDestHolding *self,
    gsize written)
{
    self->mem_ring->readx += written;
    self->mem_ring->read_offset += written;
    if (self->mem_ring->read_offset >= self->mem_ring->ring_size)
	self->mem_ring->read_offset -= self->mem_ring->ring_size;
    g_cond_broadcast(self->mem_ring->free_cond);
}

#ifdef FAILURE_CODE
static int port_open_header = -1;
static int port_write_header = -1;
static int port_write_data = -1;
static int shm_open_header = -1;
static int shm_write_header = -1;
static int shm_write_data = -1;
static int close_chunk_close = -1;
#endif

/* Write an entire chunk.  Called with the state_mutex held */
static gboolean
holding_thread_write_chunk(
    XferDestHolding  *self,
    char            **mesg)
{
    XferElement *elt = XFER_ELEMENT(self);

    self->chunk_status = CHUNK_OK;

    g_mutex_lock(self->mem_ring->mutex);
    while (1) {
	gsize to_write;
	size_t count;

	/* wait for at least one block, and (if necessary) prebuffer */
	to_write = holding_thread_wait_for_block(self);
	to_write = MIN(to_write, HOLDING_BLOCK_BYTES);
	if (elt->cancelled)
	    break;
	if (to_write == 0) {
	    self->chunk_status = CHUNK_EOF;
	    break;
	}
	if (self->chunk_status == CHUNK_EOC) {
	    break;
	}
	to_write = MIN(to_write, self->use_bytes);

	DBG(8, "writing %ju bytes to holding", (uintmax_t)to_write);

	/* note that it's OK to reference these ring_* vars here, as they
	 * are static at this point */
	g_mutex_unlock(self->mem_ring->mutex);
#ifdef FAILURE_CODE
	{
	    if (port_write_data == -1) {
		char *A = getenv("XFER_DEST_HOLDING_PORT_WRITE_DATA");
		if (A) {
		    port_write_data = atoi(A);
		}
	    }
	    if (port_write_data == 0) {
		count = 0;
		errno = ENOSPC;
		goto failure_port_write_data;
	    }
	    port_write_data--;
	}
#endif
	count = db_full_write(self->fd, self->mem_ring->buffer + self->mem_ring->read_offset,
			      (guint)to_write);
#ifdef FAILURE_CODE
failure_port_write_data:
#endif
	g_mutex_lock(self->mem_ring->mutex);

	if (count != to_write) {
	    amfree(*mesg);
	    *mesg = g_strdup_printf("Failed to write data to holding file '%s.tmp': %s", self->filename, strerror(errno));
	    if (count > 0) {
		if (ftruncate(self->fd, self->chunk_offset) != 0) {
		    g_debug("ftruncate failed: %s", strerror(errno));
		    g_mutex_unlock(self->mem_ring->mutex);
		    return FALSE;
		}
	    }
	    self->chunk_status = CHUNK_NO_ROOM;
	    break;
	}
	crc32_add((uint8_t *)(self->mem_ring->buffer + self->mem_ring->read_offset),
			 to_write, &elt->crc);
	self->chunk_offset += count;

	self->data_bytes_written += count;
	self->use_bytes -= count;
	holding_thread_consume_block(self, count);

	if (self->use_bytes <= 0) {
	    self->chunk_status = CHUNK_EOC;
	    /* loop to see if more data is available
	     * chunk_status might become CHUNK_EOF if at end of input file
	     */
	}
    }
    g_mutex_unlock(self->mem_ring->mutex);

    /* if we write all of the blocks, but the finish_file fails, then likely
     * there was some buffering going on in the holding driver, and the blocks
     * did not all make it to permanent storage -- so it's a failed part.  Note
     * that we try to finish_file even if the part failed, just to be thorough.
     */
    if (elt->cancelled) {
	return FALSE;
    }

    return TRUE;
}

static gpointer
holding_thread(
    gpointer data)
{
    XferDestHolding *self = XFER_DEST_HOLDING(data);
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;
    gchar *mesg = NULL;
    GTimer *timer = g_timer_new();

    DBG(1, "(this is the holding thread)");

    self->mem_ring = xfer_element_get_mem_ring(elt->upstream);
    mem_ring_consumer_set_size(self->mem_ring, HOLDING_BLOCK_BYTES*32, HOLDING_BLOCK_BYTES);

    /* This is the outer loop, that loops once for each holding file or
     * CONTINUE command */
    g_mutex_lock(self->state_mutex);
    while (1) {
	gboolean done;
	/* wait until the main thread un-pauses us, and check that we have
	 * the relevant holding info available */
	while (self->paused && !elt->cancelled) {
	    DBG(9, "waiting to be unpaused");
	    g_cond_wait(self->state_cond, self->state_mutex);
	}
	DBG(9, "holding_thread done waiting");

        if (elt->cancelled)
	    break;

	self->data_bytes_written = 0;
	self->header_bytes_written = 0;

	/* new holding file */
	if (self->filename == NULL ||
	    strcmp(self->filename, self->new_filename) != 0) {
	    char    *tmp_filename;
	    int      fd;
	    ssize_t  write_header_size;

	    if (self->use_bytes < HEADER_BLOCK_BYTES) {
		self->chunk_status = CHUNK_NO_ROOM;
		goto no_room;
	    }

	    tmp_filename = g_strjoin(NULL, self->new_filename, ".tmp", NULL);

#ifdef FAILURE_CODE
	    {
		if (port_open_header == -1) {
		    char *A = getenv("XFER_DEST_HOLDING_PORT_OPEN_HEADER");
		    if (A) {
			port_open_header = atoi(A);
		    }
		}
		if (port_open_header == 0) {
		    fd = -1;
		    errno = ENOSPC;
		    goto failure_port_open_header;
		}
		port_open_header--;
	    }
#endif
	    fd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600);
#ifdef FAILURE_CODE
failure_port_open_header:
#endif
	    if (fd < 0) {
		self->chunk_status = CHUNK_NO_ROOM;
		g_free(mesg);
		mesg = g_strdup_printf("Failed to open holding file '%s': %s",
				       tmp_filename, strerror(errno));
		g_free(tmp_filename);
		goto no_room;
	    }
	    if (self->filename == NULL) {
		self->chunk_header->type = F_DUMPFILE;
	    } else {
		self->chunk_header->type = F_CONT_DUMPFILE;
	    }
	    self->chunk_header->cont_filename[0] = '\0';

#ifdef FAILURE_CODE
	    {
		if (port_write_header == -1) {
		    char *A = getenv("XFER_DEST_HOLDING_PORT_WRITE_HEADER");
		    if (A) {
			port_write_header = atoi(A);
		    }
		}
		if (port_write_header == 0) {
		    write_header_size = 0;
		    errno = ENOSPC;
		    goto failure_port_write_header;
		}
		port_write_header--;
	    }
#endif
	    write_header_size = write_header(self, fd);
#ifdef FAILURE_CODE
failure_port_write_header:
#endif
	    if (write_header_size != HEADER_BLOCK_BYTES) {
		self->chunk_status = CHUNK_NO_ROOM;
		mesg = g_strdup_printf("Failed to write header to holding file '%s': %s",
				       tmp_filename, strerror(errno));
		aclose(fd);
		unlink(tmp_filename);
		g_free(tmp_filename);
		goto no_room;
	    }
	    g_free(tmp_filename);
	    self->use_bytes -= HEADER_BLOCK_BYTES;

	    /* rewrite old_header */
	    if (self->filename &&
		strcmp(self->filename, self->new_filename) != 0) {
		if (close_chunk(self, self->new_filename, &mesg) == -1) {
		    self->chunk_status = CHUNK_FAILED;
		    aclose(fd);
		    break;
		}
	    }
	    self->filename = self->new_filename;
	    self->new_filename = NULL;
	    self->fd = fd;
	    self->header_bytes_written = HEADER_BLOCK_BYTES;
	    self->chunk_offset = HEADER_BLOCK_BYTES;
	}

	DBG(2, "beginning to write chunk");
	done = holding_thread_write_chunk(self, &mesg);
	DBG(2, "done writing chunk");

	if (!done) /* cancelled */
	    break;

no_room:
	msg = xmsg_new(XFER_ELEMENT(self), XMSG_CHUNK_DONE, 0);
	msg->header_size = self->header_bytes_written;
	msg->data_size = self->data_bytes_written;
	msg->no_room = (self->chunk_status == CHUNK_NO_ROOM);
	if (mesg) {
	    msg->message = mesg;
	    mesg = NULL;
	}

	xfer_queue_message(elt->xfer, msg);

	/* pause ourselves and await instructions from the main thread */
	self->paused = TRUE;

	/* if this is the last part, we're done with the chunk loop */
	if (self->chunk_status == CHUNK_EOF) {
	    break;
	}
    }
    g_mutex_unlock(self->state_mutex);

    if (self->chunk_status == CHUNK_FAILED) {
	msg = xmsg_new(XFER_ELEMENT(self), XMSG_ERROR, 0);
	msg->message = g_strdup(mesg);
	xfer_queue_message(elt->xfer, msg);
    }

    g_debug("sending XMSG_CRC message");
    g_debug("xfer-dest-holding CRC: %08x     size: %lld",
	    crc32_finish(&elt->crc), (long long)elt->crc.size);
    msg = xmsg_new(XFER_ELEMENT(self), XMSG_CRC, 0);
    msg->crc = crc32_finish(&elt->crc);
    msg->size = elt->crc.size;
    xfer_queue_message(elt->xfer, msg);

    msg = xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0);
    msg->duration = g_timer_elapsed(timer, NULL);
    g_timer_destroy(timer);
    /* tell the main thread we're done */
    xfer_queue_message(elt->xfer, msg);

    return NULL;
}

/*
 * shm Holding Thread
 */

/* Wait for at least one block, or EOF, to be available in the ring buffer. */
static gsize
shm_holding_thread_wait_for_block(
    XferDestHolding *self)
{
    XferElement *elt = XFER_ELEMENT(self);
    gsize usable;

    while (!elt->cancelled &&
	   !elt->shm_ring->mc->cancelled &&
	   !elt->shm_ring->mc->eof_flag &&
	   !(elt->shm_ring->mc->written - elt->shm_ring->mc->readx > HOLDING_BLOCK_BYTES)) {

	if (shm_ring_sem_wait(elt->shm_ring, elt->shm_ring->sem_read) != 0)
	    break;
    }

    usable = MIN(elt->shm_ring->mc->written - elt->shm_ring->mc->readx, HOLDING_BLOCK_BYTES+1);

    return usable;
}

/* Mark WRITTEN bytes as free in the ring buffer.  Called with the ring mutex
 * held. */
static void
shm_holding_thread_consume_block(
    XferDestHolding *self,
    gsize written)
{
    XferElement *elt = XFER_ELEMENT(self);

    elt->shm_ring->mc->readx += written;
    elt->shm_ring->mc->read_offset += written;
    if (elt->shm_ring->mc->read_offset >= elt->shm_ring->mc->ring_size)
	elt->shm_ring->mc->read_offset -= elt->shm_ring->mc->ring_size;
    sem_post(elt->shm_ring->sem_write);
}

/* Write an entire chunk.  Called with the state_mutex held */
static gboolean
shm_holding_thread_write_chunk(
    XferDestHolding  *self,
    char            **mesg)
{
    XferElement *elt = XFER_ELEMENT(self);

    self->chunk_status = CHUNK_OK;

    while (!elt->cancelled &&
	   !elt->shm_ring->mc->cancelled) {
	gsize to_write;
	size_t count;

	/* wait for at least one block, and (if necessary) prebuffer */
	to_write = shm_holding_thread_wait_for_block(self);
	if (elt->cancelled ||
	    elt->shm_ring->mc->cancelled) {
	    break;
	}

	to_write = MIN(to_write, HOLDING_BLOCK_BYTES);
	if (to_write == 0) {
	    self->chunk_status = CHUNK_EOF;
	    break;
	}
	if (self->chunk_status == CHUNK_EOC) {
	    break;
	}
	to_write = MIN(to_write, self->use_bytes);

	DBG(8, "writing %ju bytes to holding", (uintmax_t)to_write);

#ifdef FAILURE_CODE
	{
	    if (shm_write_data == -1) {
		char *A = getenv("XFER_DEST_HOLDING_SHM_WRITE_DATA");
		if (A) {
		    shm_write_data = atoi(A);
		}
	    }
	    if (shm_write_data == 0) {
		count = 0;
		errno = ENOSPC;
		goto failure_shm_write_data;
	    }
	    shm_write_data--;
	}
#endif
	count = db_full_write(self->fd, elt->shm_ring->data + elt->shm_ring->mc->read_offset,
			      (guint)to_write);
#ifdef FAILURE_CODE
failure_shm_write_data:
#endif

	if (count != to_write) {
	    amfree(*mesg);
	    *mesg = g_strdup_printf("Failed to write data to holding file '%s.tmp': %s", self->filename, strerror(errno));
	    if (count > 0) {
		if (ftruncate(self->fd, self->chunk_offset) != 0) {
		    g_debug("ftruncate failed: %s", strerror(errno));
		    return FALSE;
		}
	    }
	    self->chunk_status = CHUNK_NO_ROOM;
	    break;
	}
	crc32_add((uint8_t *)(elt->shm_ring->data + elt->shm_ring->mc->read_offset),
			 to_write, &elt->crc);
	self->chunk_offset += count;

	self->data_bytes_written += count;
	self->use_bytes -= count;
	shm_holding_thread_consume_block(self, count);

	if (self->use_bytes <= 0) {
	    self->chunk_status = CHUNK_EOC;
	    /* loop to see if more data is available
	     * chunk_status might become CHUNK_EOF if at end of input file
	     */
	}
    }

    /* if we write all of the blocks, but the finish_file fails, then likely
     * there was some buffering going on in the holding driver, and the blocks
     * did not all make it to permanent storage -- so it's a failed part.  Note
     * that we try to finish_file even if the part failed, just to be thorough.
     */
    if (elt->cancelled) {
	elt->shm_ring->mc->cancelled = TRUE;
	sem_post(elt->shm_ring->sem_write);
	return FALSE;
    } else if (elt->shm_ring->mc->cancelled) {
	xfer_cancel_with_error(elt, "shm_ring cancelled");
	return FALSE;
    }

    return TRUE;
}

static gpointer
shm_holding_thread(
    gpointer data)
{
    XferDestHolding *self = XFER_DEST_HOLDING(data);
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;
    gchar *mesg = NULL;
    GTimer *timer = g_timer_new();

    DBG(1, "(this is the holding thread)");

    shm_ring_consumer_set_size(elt->shm_ring, HOLDING_BLOCK_BYTES*32, HOLDING_BLOCK_BYTES);

    /* This is the outer loop, that loops once for each holding file or
     * CONTINUE command */
    g_mutex_lock(self->state_mutex);
    while (1) {
	gboolean done;
	/* wait until the main thread un-pauses us, and check that we have
	 * the relevant holding info available */
	while (self->paused && !elt->cancelled) {
	    DBG(9, "waiting to be unpaused");
	    g_cond_wait(self->state_cond, self->state_mutex);
	}
	DBG(9, "shm_holding_thread done waiting");

        if (elt->cancelled)
	    break;

	self->data_bytes_written = 0;
	self->header_bytes_written = 0;

	/* new holding file */
	if (self->filename == NULL ||
	    strcmp(self->filename, self->new_filename) != 0) {
	    char    *tmp_filename;
	    int      fd;
	    ssize_t  write_header_size;

	    if (self->use_bytes < HEADER_BLOCK_BYTES) {
		self->chunk_status = CHUNK_NO_ROOM;
		goto no_room;
	    }

	    tmp_filename = g_strjoin(NULL, self->new_filename, ".tmp", NULL);

#ifdef FAILURE_CODE
	    {
		if (shm_open_header == -1) {
		    char *A = getenv("XFER_DEST_HOLDING_SHM_OPEN_HEADER");
		    if (A) {
			shm_open_header = atoi(A);
		    }
		}
		if (shm_open_header == 0) {
		    fd = -1;
		    errno = ENOSPC;
		    goto failure_shm_open_header;
		}
		shm_open_header--;
	    }
#endif
	    fd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600);
#ifdef FAILURE_CODE
failure_shm_open_header:
#endif
	    if (fd < 0) {
		self->chunk_status = CHUNK_NO_ROOM;
		g_free(mesg);
		mesg = g_strdup_printf("Failed to open holding file '%s': %s",
				       tmp_filename, strerror(errno));
		g_free(tmp_filename);
		goto no_room;
	    }
	    if (self->filename == NULL) {
		self->chunk_header->type = F_DUMPFILE;
	    } else {
		self->chunk_header->type = F_CONT_DUMPFILE;
	    }
	    self->chunk_header->cont_filename[0] = '\0';

#ifdef FAILURE_CODE
	    {
		if (shm_write_header == -1) {
		    char *A = getenv("XFER_DEST_HOLDING_SHM_WRITE_HEADER");
		    if (A) {
			shm_write_header = atoi(A);
		    }
		}
		if (shm_write_header == 0) {
		    write_header_size = 0;
		    errno = ENOSPC;
		    goto failure_shm_write_header;
		}
		shm_write_header--;
	    }
#endif
	    write_header_size = write_header(self, fd);
#ifdef FAILURE_CODE
failure_shm_write_header:
#endif
	    if (write_header_size != HEADER_BLOCK_BYTES) {
		self->chunk_status = CHUNK_NO_ROOM;
		mesg = g_strdup_printf("Failed to write header to holding file '%s': %s",
				       tmp_filename, strerror(errno));
		aclose(fd);
		unlink(tmp_filename);
		g_free(tmp_filename);
		goto no_room;
	    }
	    g_free(tmp_filename);
	    self->use_bytes -= HEADER_BLOCK_BYTES;

	    /* rewrite old_header */
	    if (self->filename &&
		strcmp(self->filename, self->new_filename) != 0) {
		if (close_chunk(self, self->new_filename, &mesg) == -1) {
		    self->chunk_status = CHUNK_FAILED;
		    aclose(fd);
		    break;
		}
	    }
	    self->filename = self->new_filename;
	    self->new_filename = NULL;
	    self->fd = fd;
	    self->header_bytes_written = HEADER_BLOCK_BYTES;
	    self->chunk_offset = HEADER_BLOCK_BYTES;
	}

	DBG(2, "beginning to write chunk");
	done = shm_holding_thread_write_chunk(self, &mesg);
	DBG(2, "done writing chunk");

	if (!done) /* cancelled */
	    break;

no_room:
	msg = xmsg_new(XFER_ELEMENT(self), XMSG_CHUNK_DONE, 0);
	msg->header_size = self->header_bytes_written;
	msg->data_size = self->data_bytes_written;
	msg->no_room = (self->chunk_status == CHUNK_NO_ROOM);
	if (mesg) {
	    msg->message = mesg;
	    mesg = NULL;
	}

	xfer_queue_message(elt->xfer, msg);

	/* pause ourselves and await instructions from the main thread */
	self->paused = TRUE;

	/* if this is the last part, we're done with the chunk loop */
	if (self->chunk_status == CHUNK_EOF) {
	    break;
	}
    }
    g_mutex_unlock(self->state_mutex);

    // notify the producer that everythinng is read
    sem_post(elt->shm_ring->sem_write);

    if (self->chunk_status == CHUNK_FAILED) {
	xfer_cancel_with_error(elt, "%s", mesg);
	msg = xmsg_new(XFER_ELEMENT(self), XMSG_ERROR, 0);
	msg->message = g_strdup(mesg);
	xfer_queue_message(elt->xfer, msg);
    }

    g_debug("sending XMSG_CRC message");
    g_debug("xfer-dest-holding CRC: %08x     size: %lld",
	    crc32_finish(&elt->crc), (long long)elt->crc.size);
    msg = xmsg_new(XFER_ELEMENT(self), XMSG_CRC, 0);
    msg->crc = crc32_finish(&elt->crc);
    msg->size = elt->crc.size;
    xfer_queue_message(elt->xfer, msg);

    msg = xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0);
    msg->duration = g_timer_elapsed(timer, NULL);
    g_timer_destroy(timer);
    /* tell the main thread we're done */
    xfer_queue_message(elt->xfer, msg);

    return NULL;
}

/*
 * Element mechanics
 */

static gboolean
start_impl(
    XferElement *elt)
{
    XferDestHolding *self = (XferDestHolding *)elt;
    GError *error = NULL;

    if (elt->input_mech == XFER_MECH_SHM_RING) {
        self->holding_thread = g_thread_create(shm_holding_thread, (gpointer)self, FALSE, &error);
    } else {
        self->holding_thread = g_thread_create(holding_thread, (gpointer)self, FALSE, &error);
    }
    if (!self->holding_thread) {
        g_critical(_("Error creating new thread: %s (%s)"),
            error->message, errno? strerror(errno) : _("no error code"));
    }

    return TRUE;
}

static gboolean
cancel_impl(
    XferElement *elt,
    gboolean expect_eof)
{
    XferDestHolding *self = XFER_DEST_HOLDING(elt);
    gboolean rv;

    /* chain up first */
    rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);

    /* then signal all of our condition variables, so that threads waiting on them
     * wake up and see elt->cancelled. */
    if (self->mem_ring) {
	g_mutex_lock(self->mem_ring->mutex);
	g_cond_broadcast(self->mem_ring->add_cond);
	g_cond_broadcast(self->mem_ring->free_cond);
	g_mutex_unlock(self->mem_ring->mutex);
    }
    if (elt->shm_ring) {
	elt->shm_ring->mc->cancelled = TRUE;
	sem_post(elt->shm_ring->sem_ready);
	sem_post(elt->shm_ring->sem_start);
	sem_post(elt->shm_ring->sem_read);
	sem_post(elt->shm_ring->sem_write);
    }

    g_mutex_lock(self->state_mutex);
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);

    return rv;
}

static void
start_chunk_impl(
    XferDestHolding *xdh,
    dumpfile_t *chunk_header,
    char *filename,
    guint64 use_bytes)
{
    XferDestHolding *self = XFER_DEST_HOLDING(xdh);

    g_assert(chunk_header != NULL);

    DBG(1, "start_chunk(%s)", filename);

    g_mutex_lock(self->state_mutex);
    g_assert(self->paused);

    self->chunk_header = chunk_header;
    self->use_bytes = use_bytes;
    self->new_filename = g_strdup(filename);
    if (!self->first_filename) {
	self->first_filename = g_strdup(filename);
    }

    DBG(1, "unpausing");
    self->paused = FALSE;
    g_cond_broadcast(self->state_cond);

    g_mutex_unlock(self->state_mutex);
}

static char *
finish_chunk_impl(
    XferDestHolding *xdh)
{
    XferDestHolding *self = XFER_DEST_HOLDING(xdh);
    char *mesg = NULL;

    g_mutex_lock(self->state_mutex);
    if (close_chunk(self, NULL, &mesg) == -1) {
    }
    g_mutex_unlock(self->state_mutex);
    return mesg;
}

static int
close_chunk(
    XferDestHolding *xdh,
    char  *cont_filename,
    char **mesg)
{
    XferDestHolding *self = XFER_DEST_HOLDING(xdh);
    int close_result;
    int save_errno = 0;

    if (self->fd == -1) {
	errno = ENOSPC; /* BOGUS */
	return -1;
    }

    lseek(self->fd, 0L, SEEK_SET);
    if (strcmp(self->filename, self->first_filename) == 0) {
	self->chunk_header->type = F_DUMPFILE;
    } else {
	self->chunk_header->type = F_CONT_DUMPFILE;
    }
    if (cont_filename) {
	strncpy(self->chunk_header->cont_filename, cont_filename, sizeof(self->chunk_header->cont_filename));
	self->chunk_header->cont_filename[sizeof(self->chunk_header->cont_filename)-1] = '\0';
    } else {
	self->chunk_header->cont_filename[0] = '\0';
    }
    close_result = write_header(self, self->fd);
    if (close_result == -1) {
	save_errno = errno;
	*mesg = g_strdup_printf("Failed to rewrite header on holding file '%s': %s", self->filename, strerror(save_errno));
	close(self->fd);
	self->fd = -1;
	g_free(self->filename);
	self->filename = NULL;
	errno = save_errno;
	return close_result;
    }
#ifdef FAILURE_CODE
    {
	if (close_chunk_close == -1) {
	    char *A = getenv("XFER_DEST_HOLDING_CLOSE_CHUNK");
	    if (A) {
		close_chunk_close = atoi(A);
	    }
	}
	if (close_chunk_close == 0) {
	    close(self->fd);
	    close_result = -1;
	    save_errno = ENOSPC;
	    goto failure_close_chunk_close;
	}
	close_chunk_close--;
    }
#endif
    close_result = close(self->fd);
    save_errno = errno;
#ifdef FAILURE_CODE
failure_close_chunk_close:
#endif
    if (close_result == -1) {
	*mesg = g_strdup_printf("Failed to close holding file '%s': %s", self->filename, strerror(save_errno));
    }
    self->fd = -1;
    g_free(self->filename);
    self->filename = NULL;
    errno = save_errno;
    return close_result;
}

static guint64
get_chunk_bytes_written_impl(
    XferDestHolding *xdhself)
{
    XferDestHolding *self = XFER_DEST_HOLDING(xdhself);

    /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
     * partial write to the 64-bit value on a 32-bit system).  This is ok for
     * the moment, as it's only informational, but be warned. */
    return self->data_bytes_written;
}

static void
instance_init(
    XferElement *elt)
{
    XferDestHolding *self = XFER_DEST_HOLDING(elt);
    elt->can_generate_eof = FALSE;

    self->state_mutex = g_mutex_new();
    self->state_cond = g_cond_new();

    self->fd = -1;
    self->use_bytes = 0;
    self->paused = TRUE;
    self->chunk_header = NULL;
    self->filename = NULL;
    self->first_filename = NULL;
    self->new_filename = NULL;
    self->data_bytes_written = 0;
    self->header_bytes_written = 0;
    crc32_init(&elt->crc);
}

static gboolean
setup_impl(
    XferElement *elt)
{
    if (elt->input_mech == XFER_MECH_SHM_RING) {
	elt->shm_ring = shm_ring_create(NULL);
    }

    return TRUE;
}

static void
finalize_impl(
    GObject * obj_self)
{
    XferDestHolding *self = XFER_DEST_HOLDING(obj_self);
    XferElement *elt = XFER_ELEMENT(self);

    g_mutex_free(self->state_mutex);
    g_cond_free(self->state_cond);

    if (elt->shm_ring) {
	close_consumer_shm_ring(elt->shm_ring);
	elt->shm_ring = NULL;
    }

    self->mem_ring = NULL;
    amfree(self->filename);
    amfree(self->first_filename);
    amfree(self->new_filename);
    self->chunk_header = NULL;

    /* chain up */
    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}

static void
class_init(
    XferDestHoldingClass * selfc)
{
    XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
    XferDestHoldingClass *xdh_klass = XFER_DEST_HOLDING_CLASS(selfc);
    GObjectClass *goc = G_OBJECT_CLASS(selfc);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_MEM_RING, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_SHM_RING, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) }
    };

    klass->setup = setup_impl;
    klass->start = start_impl;
    klass->cancel = cancel_impl;
    xdh_klass->start_chunk = start_chunk_impl;
    xdh_klass->finish_chunk = finish_chunk_impl;
    xdh_klass->get_chunk_bytes_written = get_chunk_bytes_written_impl;
    goc->finalize = finalize_impl;

    klass->perl_class = "Amanda::Xfer::Dest::Holding";
    klass->mech_pairs = mech_pairs;

    parent_class = g_type_class_peek_parent(selfc);
}

static GType
xfer_dest_holding_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestHoldingClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestHolding),
            0 /* n_preallocs */,
            (GInstanceInitFunc) instance_init,
            NULL
        };

        type = g_type_register_static(XFER_ELEMENT_TYPE, "XferDestHolding",
				      &info, 0);
    }

    return type;
}

/*
 * Constructor
 */

XferElement *
xfer_dest_holding(
    size_t max_memory)
{
    XferDestHolding *self = (XferDestHolding *)g_object_new(XFER_DEST_HOLDING_TYPE, NULL);
    XferElement *elt = XFER_ELEMENT(self);
    char *env;

    /* max_memory get rounded up to the next multiple of block_size */
    max_memory = ((max_memory + HOLDING_BLOCK_BYTES - 1)
			/ HOLDING_BLOCK_BYTES) * HOLDING_BLOCK_BYTES;

    self->paused = TRUE;

    /* set up a fake ENOSPC for testing purposes.  Note that this counts
     * headers as well as data written to disk. */
    env = getenv("CHUNKER_FAKE_ENOSPC_AT");
    if (env) {
	fake_enospc_at_byte = (off_t)atoi(env); /* these values are never > MAXINT */
	db_full_write = full_write_with_fake_enospc;
	DBG(1,"will trigger fake ENOSPC at byte %d", (int)fake_enospc_at_byte);
    } else {
	db_full_write = full_write;
    }

    return elt;
}

/*
 * Send an Amanda dump header to the output file and set file->blocksize
 */
static ssize_t
write_header(
    XferDestHolding *self,
    int fd)
{
    char *buffer;
    size_t written;

    self->chunk_header->blocksize = HEADER_BLOCK_BYTES;
    if (debug_chunker > 1)
        dump_dumpfile_t((dumpfile_t *)self->chunk_header);
    buffer = build_header((dumpfile_t *)self->chunk_header, NULL, HEADER_BLOCK_BYTES);
    if (!buffer) /* this shouldn't happen */
        error(_("header does not fit in %zd bytes"), (size_t)HEADER_BLOCK_BYTES);

    written = db_full_write(fd, buffer, HEADER_BLOCK_BYTES);
    g_free(buffer);
    if(written == HEADER_BLOCK_BYTES) return HEADER_BLOCK_BYTES;

    /* fake ENOSPC when we get a short write without errno set */
    if(errno == 0)
        errno = ENOSPC;

    return (ssize_t)-1;
}

static size_t
full_write_with_fake_enospc(
    int fd,
    const void *buf,
    size_t count)
{
    size_t rc;

    //DBG(1,"HERE %zd %zd", count, (size_t)fake_enospc_at_byte);

    if (count <= (size_t)fake_enospc_at_byte) {
	fake_enospc_at_byte -= count;
	return full_write(fd, buf, count);
    }

    /* if we get here, the caller has requested a size that is less
     * than fake_enospc_at_byte. */
    count = fake_enospc_at_byte;
    DBG(1,"returning fake ENOSPC");

    if (fake_enospc_at_byte) {
	rc = full_write(fd, buf, fake_enospc_at_byte);
	if (rc == (size_t)fake_enospc_at_byte) {
	    /* full_write succeeded, so fake a failure */
	    errno = ENOSPC;
	}
    } else {
	/* no bytes to write; just fake an error */
	errno = ENOSPC;
	rc = 0;
    }

    /* switch back to calling full_write directly */
    fake_enospc_at_byte = -1;
    db_full_write = full_write;
    return rc;
}

void
xfer_dest_holding_start_chunk(
    XferElement *elt,
    dumpfile_t *chunk_header,
    char *filename,
    guint64 use_bytes)
{
    XferDestHoldingClass *klass;
    g_assert(IS_XFER_DEST_HOLDING(elt));

    klass = XFER_DEST_HOLDING_GET_CLASS(elt);
    klass->start_chunk(XFER_DEST_HOLDING(elt), chunk_header, filename,
                                         use_bytes);
}

char *
xfer_dest_holding_finish_chunk(
    XferElement *elt)
{
    XferDestHoldingClass *klass;
    g_assert(IS_XFER_DEST_HOLDING(elt));

    klass = XFER_DEST_HOLDING_GET_CLASS(elt);
    return klass->finish_chunk(XFER_DEST_HOLDING(elt));
}