/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 *
 * Contact information: Carbonite Inc., 756 N Pastoria Ave
 * Sunnyvale, CA 94085, or: http://www.zmanda.com
 */

#include "amanda.h"
#include "amxfer.h"
#include "xfer-device.h"
#include "conffile.h"

/* A transfer destination that writes an entire dumpfile to one or more files
 * on one or more devices, caching each part so that it can be rewritten on a
 * subsequent volume in the event of an unexpected EOM.   This is designed to
 * work in concert with Amanda::Taper::Scribe. */

/* Future Plans:
 * - capture EOF early enough to avoid wasting a tape when the part size is
 *   an even multiple of the volume size - maybe reader thread can just go
 *   back and tag previous slab with EOF in that case?
 * - use mmap to make the disk-cacher thread unnecessary, if supported, by
 *   simply mapping slabs into the disk cache file
 * - can we find a way to fall back to mem_cache when the disk cache gets
 *   ENOSPC? Does it even make sense to try, since this would change the
 *   part size?
 * - distinguish some permanent device errors and do not retry the part?
 *   (this will be a change of behavior)
 */

/*
 * Slabs
 *
 * Slabs are larger than blocks, and are the unit on which the element
 * operates.  They are designed to be a few times larger than a block, to
 * achieve a corresponding reduction in the number of locks and unlocks used
 * per block, and a similar reduction in the amount of memory overhead
 * required.
 */
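
/* (Purely as an illustration, with a hypothetical 32 KiB device block size:
 * the constructor below picks a default slab of 16 blocks, i.e. 512 KiB, so
 * each lock/unlock and each refcount operation covers 16 device blocks'
 * worth of data rather than one.) */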

typedef struct Slab {
    struct Slab *next;

    /* counts incoming pointers: the preceding slab's 'next' pointer, and pointers
     * from any threads operating on the slab */
    gint refcount;

    /* number of this slab in the sequence, global to this element's lifetime.
     * Since this counts slabs, which are about 1M, this can address 16
     * yottabytes of data before wrapping. */
    guint64 serial;

    /* slab size; this is only less than the element's slab size if the
     * transfer is at EOF. */
    gsize size;

    /* base of the slab_size buffer */
    gchar *base;
} Slab;

/*
 * Xfer Dest Taper
 */

static GObjectClass *parent_class = NULL;

typedef struct XferDestTaperCacher {
    XferDestTaper __parent__;

    /* object parameters
     *
     * These values are supplied to the constructor, and can be assumed
     * constant for the lifetime of the element.
     */

    /* maximum buffer space to use for streaming; this is unrelated to the
     * fallback_splitsize */
    gsize max_memory;

    /* split buffering info; if we're doing memory buffering, use_mem_cache is
     * true; if we're doing disk buffering, disk_cache_dirname is non-NULL and
     * contains the (allocated) name of the directory holding the cache file.  In any
     * case, part_size gives the desired part size.  If part_size is zero, then
     * no splitting takes place (so part_size is effectively infinite). */
    gboolean use_mem_cache;
    char *disk_cache_dirname;
    guint64 part_size; /* (bytes) */

    /*
     * threads
     */

    /* The thread doing the actual writes to tape; this also handles buffering
     * for streaming */
    GThread *device_thread;

    /* The thread writing slabs to the disk cache, if any */
    GThread *disk_cache_thread;

    /* slab train
     *
     * All in-memory data is contained in a linked list called the "slab
     * train".  Various components are operating simultaneously at different
     * points in this train.  Data from the upstream XferElement is appended to
     * the head of the train, and the device thread follows along behind,
     * writing data to the device.  When caching parts in memory, the slab
     * train just grows to eventually contain the whole part.  When using an
     * on-disk cache, the disk cache thread writes the tail of the train to
     * disk, freeing slabs to be re-used at the head of the train.  Some
     * careful coordination of these components allows them to operate as
     * independently as possible within the limits of the user's configuration.
     *
     * Slabs are rarely, if ever, freed: the oldest_slab reference generally
     * ensures that all slabs have refcount > 0, and this pointer is only
     * advanced when re-using slabs that have been flushed to the disk cache or
     * when freeing slabs after completion of the transfer. */
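
    /* A rough sketch of the train (arrows are 'next' pointers; the positions
     * of the intermediate pointers vary from moment to moment, and any of
     * them may be NULL while its thread waits at the head of the train):
     *
     *   oldest_slab -> ... -> disk_cacher_slab / mem_cache_slab /
     *       device_slab (somewhere in the middle) -> ... -> newest_slab -> NULL
     *
     * The reader appends fresh slabs at newest_slab via
     * add_reader_slab_to_train(). */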

    /* pointers into the slab train are all protected by this mutex.  Note that
     * the slabs themselves can be manipulated without this lock; it's only
     * when changing the pointers that the mutex must be held.  Furthermore, a
     * foo_slab variable which is not NULL will not be changed except by its
     * controlling thread (disk_cacher_slab is controlled by disk_cache_thread,
     * and device_slab is controlled by device_thread).  This means that a
     * controlling thread can drop the slab_mutex once it has ensured its slab
     * is non-NULL.
     *
     * Slab_cond is notified when a new slab is made available from the reader.
     * Slab_free_cond is notified when a slab becomes available for
     * reallocation.
     *
     * Any thread waiting on either condition variable should also check
     * elt->cancelled, and act appropriately if awakened in a cancelled state.
     */
    GMutex *slab_mutex; GCond *slab_cond; GCond *slab_free_cond;

    /* slabs in progress by each thread, or NULL if the thread is waiting on
     * slab_cond.  These can only be changed by their respective threads, except
     * when they are NULL (in which case the reader will point them to a new
     * slab and signal the slab_cond). */
    Slab *volatile disk_cacher_slab;
    Slab *volatile mem_cache_slab;
    Slab *volatile device_slab;

    /* tail and head of the slab train */
    Slab *volatile oldest_slab;
    Slab *volatile newest_slab;

    /* thread-specific information
     *
     * These values are only used by one thread, and thus are not
     * subject to any locking or concurrency constraints.
     */

    /* slab in progress by the reader (not in the slab train) */
    Slab *reader_slab;

    /* the serial to be assigned to reader_slab */
    guint64 next_serial;

    /* bytes written to the device in this part */
    guint64 bytes_written;

    /* bytes written to the device in the current slab */
    guint64 slab_bytes_written;

    /* element state
     *
     * "state" includes all of the variables below (including device
     * parameters).  Note that the device_thread reads state values when
     * paused is false without locking the mutex.  No other thread should
     * change state when the element is not paused.
     *
     * If there is ever any reason to lock both mutexes, acquire this one
     * first.
     *
     * Any thread waiting on this condition variable should also check
     * elt->cancelled, and act appropriately if awakened in a cancelled state.
     */
    GMutex *state_mutex;
    GCond *state_cond;
    volatile gboolean paused;

    /* The device to write to, and the header to write to it */
    Device *volatile device;
    dumpfile_t *volatile part_header;

    /* If true, when unpaused, the device should begin at the beginning of the
     * cache; if false, it should proceed to the next part. */
    volatile gboolean retry_part;

    /* If true, the previous part was completed successfully; only used for
     * assertions */
    volatile gboolean last_part_successful;

    /* part number in progress */
    volatile guint64 partnum;

    /* if true, the main thread should *not* call start_part */
    volatile gboolean no_more_parts;

    /* the first serial in this part, and the serial to stop at */
    volatile guint64 part_first_serial, part_stop_serial;

    /* read and write file descriptors for the disk cache file, in use by the
     * disk_cache_thread.  If these are -1, wait on state_cond until they are
     * not; once the value is set, it will not change. */
    volatile int disk_cache_read_fd;
    volatile int disk_cache_write_fd;

    /* device parameters
     *
     * Note that these values aren't known until we begin writing to the
     * device; if block_size is zero, threads should block on state_cond until
     * it is nonzero, at which point all of the dependent fields will have
     * their correct values.  Note that, since this value never changes after
     * it has been set, it is safe to read block_size without acquiring the
     * mutex first. */

    /* this device's need for streaming */
    StreamingRequirement streaming;

    /* block size expected by the target device */
    gsize block_size;

    /* Size of a slab - some multiple of the block size */
    gsize slab_size;

    /* maximum number of slabs allowed, rounded up to the next whole slab.  If
     * using mem cache, this is the equivalent of part_size bytes; otherwise,
     * it is equivalent to max_memory bytes. */
    guint64 max_slabs;

    /* number of slabs in a part */
    guint64 slabs_per_part;

    crc_t crc_before_part;
} XferDestTaperCacher;

static GType xfer_dest_taper_cacher_get_type(void);
#define XFER_DEST_TAPER_CACHER_TYPE (xfer_dest_taper_cacher_get_type())
#define XFER_DEST_TAPER_CACHER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacher)
#define XFER_DEST_TAPER_CACHER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacher const)
#define XFER_DEST_TAPER_CACHER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_cacher_get_type(), XferDestTaperCacherClass)
#define IS_XFER_DEST_TAPER_CACHER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_cacher_get_type ())
#define XFER_DEST_TAPER_CACHER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacherClass)

typedef struct {
    XferDestTaperClass __parent__;

} XferDestTaperCacherClass;

/*
 * Debug logging
 */

#define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
static void
_xdt_dbg(const char *fmt, ...)
{
    va_list argp;
    char msg[1024];

    arglist_start(argp, fmt);
    g_vsnprintf(msg, sizeof(msg), fmt, argp);
    arglist_end(argp);
    g_debug("XDTC: %s", msg);
}

/*
 * Slab handling
 */

/* called with the slab_mutex held, this gets a new slab to write into, with
 * refcount 1.  It will block if max_slabs slabs are already in use and the
 * oldest slab cannot be re-used, although allocation may be forced with the
 * 'force' parameter.
 *
 * If the memory allocation cannot be satisfied due to system constraints,
 * this function will send an XMSG_ERROR, wait for the transfer to cancel, and
 * return NULL.  If the transfer is cancelled by some other means while this
 * function is blocked awaiting a free slab, it will return NULL.
 *
 * @param self: the xfer element
 * @param force: allocate a slab even if it would exceed max_memory
 * @returns: a new slab, or NULL if the xfer is cancelled
 */
static Slab *
alloc_slab(
    XferDestTaperCacher *self,
    gboolean force)
{
    XferElement *elt = XFER_ELEMENT(self);
    Slab *rv;

    DBG(8, "alloc_slab(force=%d)", force);
    if (!force) {
	/* throttle based on maximum number of extant slabs */
	while (G_UNLIKELY(
            !elt->cancelled &&
	    self->oldest_slab &&
	    self->newest_slab &&
	    self->oldest_slab->refcount > 1 &&
	    (self->newest_slab->serial - self->oldest_slab->serial + 1) >= self->max_slabs)) {
	    DBG(9, "waiting for available slab");
	    g_cond_wait(self->slab_free_cond, self->slab_mutex);
	}
	DBG(9, "alloc_slab done waiting");

        if (elt->cancelled)
            return NULL;
    }

    /* if the oldest slab doesn't have anything else pointing to it, just use
     * that */
    if (self->oldest_slab && self->oldest_slab->refcount == 1) {
	rv = self->oldest_slab;
	self->oldest_slab = rv->next;
    } else {
	rv = g_new0(Slab, 1);
	rv->refcount = 1;
	rv->base = g_try_malloc(self->slab_size);
	if (!rv->base) {
	    xfer_cancel_with_error(XFER_ELEMENT(self),
		_("Could not allocate %zu bytes of memory: %s"), self->slab_size, strerror(errno));
	    g_free(rv);
	    return NULL;
	}
    }

    rv->next = NULL;
    rv->size = 0;
    return rv;
}

/* called with the slab_mutex held, this frees the given slab entirely.  The
 * reference count is not consulted.
 *
 * @param slab: slab to free
 */
static void
free_slab(
    Slab *slab)
{
    if (slab) {
	if (slab->base)
	    g_free(slab->base);
	g_free(slab);
    }
}

/* called with the slab_mutex held, this decrements the refcount of the
 * given slab
 *
 * @param self: xfer element
 * @param slab: slab to unreference
 */
static inline void
unref_slab(
    XferDestTaperCacher *self,
    Slab *slab)
{
    g_assert(slab->refcount > 1);
    slab->refcount--;
    if (G_UNLIKELY(slab->refcount == 1 && slab == self->oldest_slab)) {
	g_cond_broadcast(self->slab_free_cond);
    } else if (G_UNLIKELY(slab->refcount == 0)) {
	free_slab(slab);
    }
}

/* called with the slab_mutex held, this sets *slabp to *slabp->next,
 * adjusting refcounts appropriately, and returns the new value
 *
 * @param self: xfer element
 * @param slabp: slab pointer to advance
 * @returns: new value of *slabp
 */
static inline Slab *
next_slab(
    XferDestTaperCacher *self,
    Slab * volatile *slabp)
{
    Slab *next;

    if (!slabp || !*slabp)
	return NULL;

    next = (*slabp)->next;
    if (next)
	next->refcount++;
    if (*slabp)
	unref_slab(self, *slabp);
    *slabp = next;

    return next;
}

/*
 * Disk Cache
 *
 * The disk cache thread's job is simply to follow along the slab train at
 * maximum speed, writing slabs to the disk cache file. */
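
/* (Each part begins at offset 0 of the cache file: disk_cache_thread rewinds
 * the write file descriptor at the top of its outer loop, so the same file is
 * re-used for every part; see also release_part_cache().) */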

static gboolean
open_disk_cache_fds(
    XferDestTaperCacher *self)
{
    char * filename;

    g_assert(self->disk_cache_read_fd == -1);
    g_assert(self->disk_cache_write_fd == -1);

    g_mutex_lock(self->state_mutex);
    filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
                               self->disk_cache_dirname);

    self->disk_cache_write_fd = g_mkstemp(filename);
    if (self->disk_cache_write_fd < 0) {
	g_mutex_unlock(self->state_mutex);
	xfer_cancel_with_error(XFER_ELEMENT(self),
	    _("Error creating cache file in '%s': %s"), self->disk_cache_dirname,
	    strerror(errno));
	g_free(filename);
	return FALSE;
    }

    /* open a separate copy of the file for reading */
    self->disk_cache_read_fd = open(filename, O_RDONLY);
    if (self->disk_cache_read_fd < 0) {
	g_mutex_unlock(self->state_mutex);
	xfer_cancel_with_error(XFER_ELEMENT(self),
	    _("Error opening cache file in '%s': %s"), self->disk_cache_dirname,
	    strerror(errno));
	g_free(filename);
	return FALSE;
    }

    /* signal anyone waiting for this value */
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);

    /* errors from unlink are not fatal */
    if (unlink(filename) < 0) {
	g_warning("While unlinking '%s': %s (ignored)", filename, strerror(errno));
    }

    g_free(filename);
    return TRUE;
}

static gpointer
disk_cache_thread(
    gpointer data)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(data);
    XferElement *elt = XFER_ELEMENT(self);

    DBG(1, "(this is the disk cache thread)");

    /* open up the disk cache file first */
    if (!open_disk_cache_fds(self))
	return NULL;

    while (!elt->cancelled) {
	gboolean eof, eop;
	guint64 stop_serial;
	Slab *slab;

	/* rewind to the beginning of the disk cache file */
	if (lseek(self->disk_cache_write_fd, 0, SEEK_SET) == -1) {
	    xfer_cancel_with_error(XFER_ELEMENT(self),
		_("Error seeking disk cache file in '%s': %s"), self->disk_cache_dirname,
		strerror(errno));
	    return NULL;
	}

	/* we need to sit and wait for the next part to begin, first making sure
	 * we have a slab .. */
	g_mutex_lock(self->slab_mutex);
	while (!self->disk_cacher_slab && !elt->cancelled) {
	    DBG(9, "waiting for a disk slab");
	    g_cond_wait(self->slab_cond, self->slab_mutex);
	}
	DBG(9, "disk_cache_thread done waiting");
	g_mutex_unlock(self->slab_mutex);

	if (elt->cancelled)
	    break;

	/* this slab is now fixed until this thread changes it */
	g_assert(self->disk_cacher_slab != NULL);

	/* and then making sure we're ready to write that slab. */
	g_mutex_lock(self->state_mutex);
        while ((self->paused ||
		    (self->disk_cacher_slab && self->disk_cacher_slab->serial > self->part_first_serial))
		&& !elt->cancelled) {
            DBG(9, "waiting for the disk slab to become current and un-paused");
            g_cond_wait(self->state_cond, self->state_mutex);
        }
	DBG(9, "disk_cache_thread done waiting");

	stop_serial = self->part_stop_serial;
	g_mutex_unlock(self->state_mutex);

	if (elt->cancelled)
	    break;

	g_mutex_lock(self->slab_mutex);
	slab = self->disk_cacher_slab;
	eop = eof = FALSE;
	while (!eop && !eof) {
	    /* if we're at the head of the slab train, wait for more data */
	    while (!self->disk_cacher_slab && !elt->cancelled) {
		DBG(9, "waiting for the next disk slab");
		g_cond_wait(self->slab_cond, self->slab_mutex);
	    }
	    DBG(9, "disk_cache_thread done waiting");

            if (elt->cancelled)
                break;

	    /* drop the lock long enough to write the slab; the refcount
	     * protects the slab during this time */
	    slab = self->disk_cacher_slab;
	    g_mutex_unlock(self->slab_mutex);

	    if (full_write(self->disk_cache_write_fd, slab->base, slab->size) < slab->size) {
		xfer_cancel_with_error(XFER_ELEMENT(self),
		    _("Error writing to disk cache file in '%s': %s"), self->disk_cache_dirname,
		    strerror(errno));
		return NULL;
	    }

	    eof = slab->size < self->slab_size;
	    eop = (slab->serial + 1 == stop_serial);

	    g_mutex_lock(self->slab_mutex);
	    next_slab(self, &self->disk_cacher_slab);
	}
	g_mutex_unlock(self->slab_mutex);

	if (eof) {
	    /* this very thread should have just set this value to NULL, and since it's
	     * EOF, there should not be any 'next' slab */
	    g_assert(self->disk_cacher_slab == NULL);
	    break;
	}
    }

    return NULL;
}

/*
 * Device Thread
 *
 * The device thread's job is to write slabs to self->device, applying whatever
 * streaming algorithms are required.  It does this by alternately getting the
 * next slab from a "slab source" and writing that slab to the device.  Most of
 * the slab source functions assume that self->slab_mutex is held, but may
 * release the mutex (either explicitly or via a g_cond_wait), so it is not
 * valid to assume that any slab pointers remain unchanged after a slab_source
 * function invocation.
 */
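
/* A rough sketch of how the slab-source functions below are used by
 * device_thread_write_part() (simplified, with locking abbreviated and error
 * handling omitted):
 *
 *   slab_source_state state;
 *   if (!slab_source_setup(self, &state)) return;
 *   g_mutex_lock(self->slab_mutex);
 *   for (serial = part_first_serial; serial < part_stop_serial; serial++) {
 *       Slab *slab = slab_source_get(self, &state, serial);
 *       g_mutex_unlock(self->slab_mutex);
 *       write_slab_to_device(self, slab);
 *       g_mutex_lock(self->slab_mutex);
 *   }
 *   g_mutex_unlock(self->slab_mutex);
 *   slab_source_free(self, &state);
 */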

/* This struct tracks the current state of the slab source */
typedef struct slab_source_state {
    /* temporary slab used for reading from disk */
    Slab *tmp_slab;

    /* next serial to read from disk */
    guint64 next_serial;
} slab_source_state;

/* Called with the slab_mutex held, this function pre-buffers enough data into the slab
 * train to meet the device's streaming needs. */
static gboolean
slab_source_prebuffer(
    XferDestTaperCacher *self)
{
    XferElement *elt = XFER_ELEMENT(self);
    guint64 prebuffer_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
    guint64 i;
    Slab *slab;

    /* always prebuffer at least one slab, even if max_memory is 0 */
    if (prebuffer_slabs == 0) prebuffer_slabs = 1;

    /* pre-buffering is not necessary if we're retrying a part */
    if (self->retry_part)
	return TRUE;

    /* pre-buffering means waiting until we have at least prebuffer_slabs in the
     * slab train ahead of the device_slab, or the newest slab is at EOF. */
    while (!elt->cancelled) {
	gboolean eof_or_eop = FALSE;

	/* see if there's enough data yet */
	for (i = 0, slab = self->device_slab;
	     i < prebuffer_slabs && slab != NULL;
	     i++, slab = slab->next) {
	    eof_or_eop = (slab->size < self->slab_size)
		|| (slab->serial + 1 == self->part_stop_serial);
	}
	if (i == prebuffer_slabs || eof_or_eop)
	    break;

	DBG(9, "prebuffering wait");
	g_cond_wait(self->slab_cond, self->slab_mutex);
    }
    DBG(9, "slab_source_prebuffer done waiting");

    if (elt->cancelled) {
	self->last_part_successful = FALSE;
	self->no_more_parts = TRUE;
	return FALSE;
    }

    return TRUE;
}

/* Called without the slab_mutex held, this function sets up a new slab_source_state
 * object based on the configuration of the Xfer Element. */
static inline gboolean
slab_source_setup(
    XferDestTaperCacher *self,
    slab_source_state *state)
{
    XferElement *elt = XFER_ELEMENT(self);
    state->tmp_slab = NULL;
    state->next_serial = G_MAXUINT64;

    /* if we're to retry the part, rewind to the beginning */
    if (self->retry_part) {
	if (self->use_mem_cache) {
	    /* rewind device_slab to point to the mem_cache_slab */
	    g_mutex_lock(self->slab_mutex);
	    if (self->device_slab)
		unref_slab(self, self->device_slab);
	    self->device_slab = self->mem_cache_slab;
	    if(self->device_slab != NULL)
		self->device_slab->refcount++;
	    g_mutex_unlock(self->slab_mutex);
	} else {
	    g_mutex_lock(self->slab_mutex);

	    /* we're going to read from the disk cache until we get to the oldest useful
	     * slab in memory, so it had best exist */
	    g_assert(self->oldest_slab != NULL);

	    /* point device_slab at the oldest slab we have */
	    self->oldest_slab->refcount++;
	    if (self->device_slab)
		unref_slab(self, self->device_slab);
	    self->device_slab = self->oldest_slab;

	    /* and increment it until it is at least the slab we want to start from */
	    while (self->device_slab->serial < self->part_first_serial) {
		next_slab(self, &self->device_slab);
	    }

	    /* get a new, temporary slab for use while reading */
	    state->tmp_slab = alloc_slab(self, TRUE);

	    g_mutex_unlock(self->slab_mutex);

	    if (!state->tmp_slab) {
                /* if we couldn't allocate a slab, then we're cancelled, so we're done with
                 * this part. */
		self->last_part_successful = FALSE;
		self->no_more_parts = TRUE;
		return FALSE;
	    }

	    state->tmp_slab->size = self->slab_size;
	    state->next_serial = self->part_first_serial;

	    /* We're reading from the disk cache, so we need a file descriptor
	     * to read from, so wait for disk_cache_thread to open the
	     * disk_cache_read_fd */
	    g_assert(self->disk_cache_dirname);
	    g_mutex_lock(self->state_mutex);
	    while (self->disk_cache_read_fd == -1 && !elt->cancelled) {
		DBG(9, "waiting for disk_cache_thread to set disk_cache_read_fd");
		g_cond_wait(self->state_cond, self->state_mutex);
	    }
	    DBG(9, "slab_source_setup done waiting");
	    g_mutex_unlock(self->state_mutex);

	    if (elt->cancelled) {
		self->last_part_successful = FALSE;
		self->no_more_parts = TRUE;
		return FALSE;
	    }

	    /* rewind to the beginning */
	    if (lseek(self->disk_cache_read_fd, 0, SEEK_SET) == -1) {
		xfer_cancel_with_error(XFER_ELEMENT(self),
		    _("Could not seek disk cache file for reading: %s"),
		    strerror(errno));
		self->last_part_successful = FALSE;
		self->no_more_parts = TRUE;
		return FALSE;
	    }
	}
    }

    /* if the streaming mode requires it, pre-buffer */
    if (self->streaming == STREAMING_REQUIREMENT_DESIRED ||
	self->streaming == STREAMING_REQUIREMENT_REQUIRED) {
	gboolean prebuffer_ok;

	g_mutex_lock(self->slab_mutex);
	prebuffer_ok = slab_source_prebuffer(self);
	g_mutex_unlock(self->slab_mutex);
	if (!prebuffer_ok)
	    return FALSE;
    }

    return TRUE;
}

/* Called with the slab_mutex held, this does the work of slab_source_get when
 * reading from the disk cache.  Note that this explicitly releases the
 * slab_mutex during execution - do not depend on any protected values across a
 * call to this function.  The mutex is held on return. */
static Slab *
slab_source_get_from_disk(
    XferDestTaperCacher *self,
    slab_source_state *state,
    guint64 serial)
{
    XferDestTaper *xdt = XFER_DEST_TAPER(self);
    gsize bytes_read;

    g_assert(state->next_serial == serial);

    /* NOTE: slab_mutex is held, but we don't need it here, so release it for the moment */
    g_mutex_unlock(self->slab_mutex);

    bytes_read = read_fully(self->disk_cache_read_fd, state->tmp_slab->base,
        self->slab_size, NULL);

    if (bytes_read < self->slab_size) {
	xfer_cancel_with_error(XFER_ELEMENT(xdt),
	    _("Error reading disk cache: %s"),
	    errno ? strerror(errno) : _("Unexpected EOF"));
	goto fatal_error;
    }

    state->tmp_slab->serial = state->next_serial++;
    g_mutex_lock(self->slab_mutex);
    return state->tmp_slab;

fatal_error:
    g_mutex_lock(self->slab_mutex);
    self->last_part_successful = FALSE;
    self->no_more_parts = TRUE;
    return NULL;
}

/* Called with the slab_mutex held, this function gets the slab with the given
 * serial number, waiting if necessary for that slab to be available.  Note
 * that the slab_mutex may be released during execution, although it is always
 * held on return. */
static inline Slab *
slab_source_get(
    XferDestTaperCacher *self,
    slab_source_state *state,
    guint64 serial)
{
    XferElement *elt = (XferElement *)self;

    /* device_slab is only NULL if we're following the slab train, so wait for
     * a new slab */
    if (!self->device_slab) {
	/* if the streaming mode requires it, pre-buffer */
	if (self->streaming == STREAMING_REQUIREMENT_DESIRED) {
	    if (!slab_source_prebuffer(self))
		return NULL;

	    /* fall through to make sure we have a device_slab;
	     * slab_source_prebuffer doesn't guarantee device_slab != NULL */
	}

	while (self->device_slab == NULL && !elt->cancelled) {
	    DBG(9, "waiting for the next slab");
	    g_cond_wait(self->slab_cond, self->slab_mutex);
	}
	DBG(9, "slab_source_get done waiting");

	if (elt->cancelled)
	    goto fatal_error;
    }

    /* device slab is now set, and only this thread can change it */
    g_assert(self->device_slab);

    /* if the next item in the device slab is the one we want, then the job is
     * pretty easy */
    if (G_LIKELY(serial == self->device_slab->serial))
	return self->device_slab;

    /* otherwise, we're reading from disk */
    g_assert(serial < self->device_slab->serial);
    return slab_source_get_from_disk(self, state, serial);

fatal_error:
    self->last_part_successful = FALSE;
    self->no_more_parts = TRUE;
    return NULL;
}

/* Called without the slab_mutex held, this frees any resources assigned
 * to the slab source state */
static inline void
slab_source_free(
    XferDestTaperCacher *self,
    slab_source_state *state)
{
    if (state->tmp_slab) {
	g_mutex_lock(self->slab_mutex);
	free_slab(state->tmp_slab);
	g_mutex_unlock(self->slab_mutex);
    }
}

/* Called without the slab_mutex held, this writes the given slab to the device */
static gboolean
write_slab_to_device(
    XferDestTaperCacher *self,
    Slab *slab)
{
    XferElement *elt = XFER_ELEMENT(self);
    gchar *buf = slab->base;
    gsize remaining = slab->size;

    while (remaining && !elt->cancelled) {
	gsize write_size = MIN(self->block_size, remaining);
	DeviceWriteResult ok;
	ok = device_write_block(self->device, write_size, buf);
	if (ok != WRITE_SUCCEED) {
            self->bytes_written += slab->size - remaining;

            /* TODO: handle an error without is_eom
             * differently/fatally? or at least with a warning? */
	    self->last_part_successful = FALSE;
	    self->no_more_parts = FALSE;
	    return FALSE;
	}

	crc32_add((uint8_t *)buf, write_size, &elt->crc);
	buf += write_size;
	self->slab_bytes_written += write_size;
	remaining -= write_size;
    }

    if (elt->cancelled) {
	self->last_part_successful = FALSE;
	self->no_more_parts = TRUE;
        return FALSE;
    }

    self->bytes_written += slab->size;
    self->slab_bytes_written = 0;
    return TRUE;
}

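/* Write a single part to self->device, returning an XMSG_PART_DONE message
 * describing the result.  Called from device_thread() with no locks held. */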
static XMsg *
device_thread_write_part(
    XferDestTaperCacher *self)
{
    XferElement *elt = XFER_ELEMENT(self);
    GTimer *timer = g_timer_new();
    XMsg *msg;
    slab_source_state src_state = {0, 0};
    guint64 serial, stop_serial;
    gboolean eof = FALSE;
    int fileno = 0;
    int failed = 0;
    int slab_source_set = 0;

    self->last_part_successful = FALSE;
    self->bytes_written = 0;
    self->crc_before_part = elt->crc;

    if (!device_start_file(self->device, self->part_header)) {
	failed = 1;
	goto part_done;
    }

    dumpfile_free(self->part_header);
    self->part_header = NULL;

    fileno = self->device->file;
    g_assert(fileno > 0);

    if (!slab_source_setup(self, &src_state))
	goto part_done;
    slab_source_set = 1;

    g_timer_start(timer);

    stop_serial = self->part_stop_serial;
    g_mutex_lock(self->slab_mutex);
    for (serial = self->part_first_serial; serial < stop_serial && !eof; serial++) {
	Slab *slab = slab_source_get(self, &src_state, serial);
	DBG(8, "writing slab %p (serial %ju) to device", slab, serial);
	g_mutex_unlock(self->slab_mutex);
	if (!slab) {
	    failed = 1;
	    goto part_done;
	}

	eof = slab->size < self->slab_size;

	if (!write_slab_to_device(self, slab)) {
	    failed = 1;
	    goto part_done;
	}

	g_mutex_lock(self->slab_mutex);
	DBG(8, "wrote slab %p to device", slab);

	/* if we're reading from the slab train, advance self->device_slab. */
	if (slab == self->device_slab) {
	    next_slab(self, &self->device_slab);
	}
    }
    g_mutex_unlock(self->slab_mutex);

part_done:
    /* if we write all of the blocks, but the finish_file fails, then likely
     * there was some buffering going on in the device driver, and the blocks
     * did not all make it to permanent storage -- so it's a failed part. */
    if (self->device->in_file && !device_finish_file(self->device))
	failed = 1;

    if (slab_source_set) {
	slab_source_free(self, &src_state);
    }

    if (!failed) {
	self->last_part_successful = TRUE;
	self->no_more_parts = eof;
    } else {
	elt->crc = self->crc_before_part;
    }

    g_timer_stop(timer);

    msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
    msg->size = self->bytes_written;
    msg->duration = g_timer_elapsed(timer, NULL);
    msg->partnum = self->partnum;
    msg->fileno = fileno;
    msg->successful = self->last_part_successful;
    msg->eom = !self->last_part_successful;
    msg->eof = self->no_more_parts;

    /* time runs backward on some test boxes, so make sure this is positive */
    if (msg->duration < 0) msg->duration = 0;

    if (self->last_part_successful)
	self->partnum++;

    g_timer_destroy(timer);

    return msg;
}

/* Called with the state_mutex held, this frees any cached data for
 * a successful part */
static void
release_part_cache(
    XferDestTaperCacher *self)
{
    if (self->use_mem_cache && self->mem_cache_slab) {
	/* move up the mem_cache_slab to point to the first slab in
	 * the next part (probably NULL at this point), so that the
	 * reader can continue reading data into the new mem cache
	 * immediately. */
	g_mutex_lock(self->slab_mutex);
	unref_slab(self, self->mem_cache_slab);
	self->mem_cache_slab = self->device_slab;
	if (self->mem_cache_slab)
	    self->mem_cache_slab->refcount++;
	g_mutex_unlock(self->slab_mutex);
    }

    /* the disk cache gets reused automatically (rewinding to offset 0), so
     * there's nothing else to do */
}

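/* The device thread: loops once per part, writing each part to the device via
 * device_thread_write_part() and pausing between parts until the main thread
 * calls start_part() to un-pause it. */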
static gpointer
device_thread(
    gpointer data)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(data);
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;

    DBG(1, "(this is the device thread)");

    if (self->disk_cache_dirname) {
        GError *error = NULL;
	self->disk_cache_thread = g_thread_create(disk_cache_thread, (gpointer)self, TRUE, &error);
        if (!self->disk_cache_thread) {
            g_critical(_("Error creating new thread: %s (%s)"),
                error->message, errno? strerror(errno) : _("no error code"));
        }
    }

    /* This is the outer loop, that loops once for each split part written to
     * tape. */
    g_mutex_lock(self->state_mutex);
    while (1) {
	/* wait until the main thread un-pauses us, and check that we have
	 * the relevant device info available (block_size) */
	while (self->paused && !elt->cancelled) {
	    DBG(9, "waiting to be unpaused");
	    g_cond_wait(self->state_cond, self->state_mutex);
	}
	DBG(9, "device_thread done waiting");

        if (elt->cancelled)
	    break;

        g_mutex_unlock(self->state_mutex);
	self->slab_bytes_written = 0;
	DBG(2, "beginning to write part");
	msg = device_thread_write_part(self);
	DBG(2, "done writing part");
        g_mutex_lock(self->state_mutex);

	/* release any cache of a successful part, but don't bother at EOF */
	if (msg->successful && !msg->eof)
	    release_part_cache(self);

	xfer_queue_message(elt->xfer, msg);

	/* if this is the last part, we're done with the part loop */
	if (self->no_more_parts)
	    break;

	/* pause ourselves and await instructions from the main thread */
	self->paused = TRUE;
    }

    g_mutex_unlock(self->state_mutex);

    /* make sure the other thread is done before we send XMSG_DONE */
    if (self->disk_cache_thread)
        g_thread_join(self->disk_cache_thread);

    g_debug("sending XMSG_CRC message");
    g_debug("xfer-dest-taper-cacher CRC %08x      size %lld",
	    crc32_finish(&elt->crc), (long long)elt->crc.size);
    msg = xmsg_new(XFER_ELEMENT(self), XMSG_CRC, 0);
    msg->crc = crc32_finish(&elt->crc);
    msg->size = elt->crc.size;
    xfer_queue_message(elt->xfer, msg);

    /* tell the main thread we're done */
    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}

/*
 * Class mechanics
 */

/* called with the slab_mutex held, this adds the reader_slab to the head of
 * the slab train and signals the condition variable. */
static void
add_reader_slab_to_train(
    XferDestTaperCacher *self)
{
    Slab *slab = self->reader_slab;

    DBG(3, "adding slab of new data to the slab train");

    if (self->newest_slab) {
	self->newest_slab->next = slab;
	slab->refcount++;

	self->newest_slab->refcount--;
    }

    /* steal reader_slab's reference for newest_slab */
    self->newest_slab = slab;
    self->reader_slab = NULL;

    /* if any of the other pointers are waiting for this slab, update them */
    if (self->disk_cache_dirname && !self->disk_cacher_slab) {
	self->disk_cacher_slab = slab;
	slab->refcount++;
    }
    if (self->use_mem_cache && !self->mem_cache_slab) {
	self->mem_cache_slab = slab;
	slab->refcount++;
    }
    if (!self->device_slab) {
	self->device_slab = slab;
	slab->refcount++;
    }
    if (!self->oldest_slab) {
	self->oldest_slab = slab;
	slab->refcount++;
    }

    g_cond_broadcast(self->slab_cond);
}

static void
push_buffer_static_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    XferDestTaperCacher *self = (XferDestTaperCacher *)elt;
    char *p;

    DBG(3, "push_buffer_static(%p, %ju)", buf, (uintmax_t)size);

    /* do nothing if cancelled */
    if (G_UNLIKELY(elt->cancelled)) {
        goto free_and_finish;
    }

    /* handle EOF */
    if (G_UNLIKELY(buf == NULL) || size == 0) {
	/* send off the last, probably partial slab */
	g_mutex_lock(self->slab_mutex);

	/* create a new, empty slab if necessary */
	if (!self->reader_slab) {
	    self->reader_slab = alloc_slab(self, FALSE);
            if (!self->reader_slab) {
                /* we've been cancelled while waiting for a slab */
                g_mutex_unlock(self->slab_mutex);

                /* wait for the xfer to cancel, so we don't get another buffer
                 * pushed to us (and do so *without* the mutex held) */
                wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);

                goto free_and_finish;
            }
	    self->reader_slab->serial = self->next_serial++;
	}

	add_reader_slab_to_train(self);
	g_mutex_unlock(self->slab_mutex);

	goto free_and_finish;
    }

    p = buf;
    while (1) {
	gsize copy_size;

	/* get a fresh slab, if needed */
	if (G_UNLIKELY(!self->reader_slab) || self->reader_slab->size == self->slab_size) {
	    g_mutex_lock(self->slab_mutex);
	    if (self->reader_slab)
		add_reader_slab_to_train(self);
	    self->reader_slab = alloc_slab(self, FALSE);
            if (!self->reader_slab) {
                /* we've been cancelled while waiting for a slab */
                g_mutex_unlock(self->slab_mutex);

                /* wait for the xfer to cancel, so we don't get another buffer
                 * pushed to us (and do so *without* the mutex held) */
                wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);

                goto free_and_finish;
            }
	    self->reader_slab->serial = self->next_serial++;
	    g_mutex_unlock(self->slab_mutex);
	}

	if (size == 0)
	    break;

	copy_size = MIN(self->slab_size - self->reader_slab->size, size);
	memcpy(self->reader_slab->base+self->reader_slab->size, p, copy_size);

	self->reader_slab->size += copy_size;
	p += copy_size;
	size -= copy_size;
    }

free_and_finish:
    return;
}

static void
push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    push_buffer_static_impl(elt, buf, size);

    if (buf)
        g_free(buf);
}

/*
 * Element mechanics
 */

static gboolean
start_impl(
    XferElement *elt)
{
    XferDestTaperCacher *self = (XferDestTaperCacher *)elt;
    GError *error = NULL;

    self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
    if (!self->device_thread) {
        g_critical(_("Error creating new thread: %s (%s)"),
            error->message, errno? strerror(errno) : _("no error code"));
    }

    return TRUE;
}

static gboolean
cancel_impl(
    XferElement *elt,
    gboolean expect_eof)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(elt);
    gboolean rv;

    /* chain up first */
    rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);

    /* then signal all of our condition variables, so that threads waiting on them
     * wake up and see elt->cancelled. */
    g_mutex_lock(self->slab_mutex);
    g_cond_broadcast(self->slab_cond);
    g_cond_broadcast(self->slab_free_cond);
    g_mutex_unlock(self->slab_mutex);

    g_mutex_lock(self->state_mutex);
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);

    return rv;
}

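/* Called from the main thread while the device thread is paused: install the
 * header for the next part, note whether this is a retry of the previous
 * part, and un-pause the device thread, which will write the part and then
 * pause itself again after queueing an XMSG_PART_DONE message. */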
static void
start_part_impl(
    XferDestTaper *xdt,
    gboolean retry_part,
    dumpfile_t *header)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);

    g_assert(self->device != NULL);
    g_assert(!self->device->in_file);
    g_assert(header != NULL);

    DBG(1, "start_part(retry_part=%d)", retry_part);

    g_mutex_lock(self->state_mutex);
    g_assert(self->paused);
    g_assert(!self->no_more_parts);

    if (self->part_header)
	dumpfile_free(self->part_header);
    self->part_header = dumpfile_copy(header);

    if (retry_part) {
	g_assert(!self->last_part_successful);
	self->retry_part = TRUE;
    } else {
	g_assert(self->last_part_successful);
	self->retry_part = FALSE;
	self->part_first_serial = self->part_stop_serial;
	if (self->part_size != 0) {
	    self->part_stop_serial = self->part_first_serial + self->slabs_per_part;
	} else {
	    /* set part_stop_serial to an effectively infinite value */
	    self->part_stop_serial = G_MAXUINT64;
	}
    }

    DBG(1, "unpausing");
    self->paused = FALSE;
    g_cond_broadcast(self->state_cond);

    g_mutex_unlock(self->state_mutex);
}

static void
use_device_impl(
    XferDestTaper *xdt,
    Device *device)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);
    GValue val;

    /* short-circuit if nothing is changing */
    if (self->device == device)
	return;

    g_mutex_lock(self->state_mutex);
    if (self->device)
	g_object_unref(self->device);
    self->device = device;
    g_object_ref(device);

    /* get this new device's streaming requirements */
    bzero(&val, sizeof(val));
    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
        self->streaming = STREAMING_REQUIREMENT_REQUIRED;
    } else {
        self->streaming = g_value_get_enum(&val);
    }
    g_value_unset(&val);

    /* check that the blocksize hasn't changed */
    if (self->block_size != device->block_size) {
        g_mutex_unlock(self->state_mutex);
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("All devices used by the taper must have the same block size"));
        return;
    }
    g_mutex_unlock(self->state_mutex);
}

static guint64
get_part_bytes_written_impl(
    XferDestTaper *xdt)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);

    /* NOTE: this access is unsafe and may return inconsistent results (e.g., a
     * partial write to the 64-bit value on a 32-bit system).  This is ok for
     * the moment, as it's only informational, but be warned. */
    if (self->device) {
	return device_get_bytes_written(self->device);
    } else {
	return self->bytes_written + self->slab_bytes_written;
    }

}

static void
instance_init(
    XferElement *elt)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(elt);
    elt->can_generate_eof = FALSE;

    self->state_mutex = g_mutex_new();
    self->state_cond = g_cond_new();
    self->slab_mutex = g_mutex_new();
    self->slab_cond = g_cond_new();
    self->slab_free_cond = g_cond_new();

    self->last_part_successful = TRUE;
    self->paused = TRUE;
    self->part_stop_serial = 0;
    self->disk_cache_read_fd = -1;
    self->disk_cache_write_fd = -1;
    crc32_init(&elt->crc);
}

static void
finalize_impl(
    GObject * obj_self)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(obj_self);
    Slab *slab, *next_slab;

    if (self->disk_cache_dirname)
	g_free(self->disk_cache_dirname);

    g_mutex_free(self->state_mutex);
    g_cond_free(self->state_cond);

    g_mutex_free(self->slab_mutex);
    g_cond_free(self->slab_cond);
    g_cond_free(self->slab_free_cond);

    /* free the slab train, without reference to the refcounts */
    for (slab = self->oldest_slab; slab != NULL; slab = next_slab) {
        next_slab = slab->next;
        free_slab(slab);
    }
    self->disk_cacher_slab = NULL;
    self->mem_cache_slab = NULL;
    self->device_slab = NULL;
    self->oldest_slab = NULL;
    self->newest_slab = NULL;

    if (self->reader_slab) {
        free_slab(self->reader_slab);
        self->reader_slab = NULL;
    }

    if (self->part_header)
	dumpfile_free(self->part_header);

    if (self->disk_cache_read_fd != -1)
	close(self->disk_cache_read_fd); /* ignore error */
    if (self->disk_cache_write_fd != -1)
	close(self->disk_cache_write_fd); /* ignore error */

    if (self->device)
	g_object_unref(self->device);

    /* chain up */
    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}

static void
class_init(
    XferDestTaperCacherClass * selfc)
{
    XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
    XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
    GObjectClass *goc = G_OBJECT_CLASS(selfc);
    static xfer_element_mech_pair_t mech_pairs[] = {
	{ XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_PUSH_BUFFER_STATIC, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
	{ XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) }
    };

    klass->start = start_impl;
    klass->cancel = cancel_impl;
    klass->push_buffer = push_buffer_impl;
    klass->push_buffer_static = push_buffer_static_impl;
    xdt_klass->start_part = start_part_impl;
    xdt_klass->use_device = use_device_impl;
    xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
    goc->finalize = finalize_impl;

    klass->perl_class = "Amanda::Xfer::Dest::Taper::Cacher";
    klass->mech_pairs = mech_pairs;

    parent_class = g_type_class_peek_parent(selfc);
}

static GType
xfer_dest_taper_cacher_get_type (void)
{
    static GType type = 0;

    if (G_UNLIKELY(type == 0)) {
        static const GTypeInfo info = {
            sizeof (XferDestTaperCacherClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestTaperCacher),
            0 /* n_preallocs */,
            (GInstanceInitFunc) instance_init,
            NULL
        };

        type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperCacher", &info, 0);
    }

    return type;
}

/*
 * Constructor
 */
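
/* Purely as an illustration (hypothetical sizes and path): a caller wanting
 * 100 MiB parts buffered through a disk cache, with at most 16 MiB of memory
 * used for streaming, might construct this element roughly as
 *
 *   xfer_dest_taper_cacher(device, 16*1024*1024, 100*1024*1024,
 *                          FALSE, "/var/tmp/amanda-cache");
 *
 * where 16 MiB is max_memory, 100 MiB is part_size, FALSE is use_mem_cache,
 * and the path is disk_cache_dirname.  Exactly one of use_mem_cache and
 * disk_cache_dirname may be given, and both must be omitted when part_size
 * is zero (see the assertions below). */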

XferElement *
xfer_dest_taper_cacher(
    Device *first_device,
    size_t max_memory,
    guint64 part_size,
    gboolean use_mem_cache,
    const char *disk_cache_dirname)
{
    XferDestTaperCacher *self = (XferDestTaperCacher *)g_object_new(XFER_DEST_TAPER_CACHER_TYPE, NULL);

    self->max_memory = max_memory;
    self->part_size = part_size;
    self->partnum = 1;
    self->device = first_device;
    g_object_ref(self->device);

    /* pick only one caching mechanism, caller! */
    if (use_mem_cache)
	g_assert(!disk_cache_dirname);
    if (disk_cache_dirname)
	g_assert(!use_mem_cache);

    /* and if part size is zero, then we don't do any caching */
    g_assert(part_size != 0 || (!use_mem_cache && !disk_cache_dirname));

    self->use_mem_cache = use_mem_cache;
    if (disk_cache_dirname)
	self->disk_cache_dirname = g_strdup(disk_cache_dirname);

    /* calculate the device-dependent parameters */
    self->block_size = first_device->block_size;

    /* The slab size should be large enough to justify the overhead of all
     * of the mutexes, but it needs to be small enough to have a few slabs
     * available so that the threads are not constantly waiting on one
     * another.  The choice is sixteen blocks, not more than a quarter of
     * the part size, and not more than 10MB.  If we're not using the mem
     * cache, then avoid exceeding max_memory by keeping the slab size less
     * than a quarter of max_memory. */
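
    /* (Worked example, with purely hypothetical numbers: a 32 KiB block size
     * and a 1 GiB part give 16 * 32 KiB = 512 KiB, which is below part_size/4,
     * below the 10MB cap, and, when no mem cache is in use, below max_memory/4
     * for any max_memory of at least 2 MiB; so slab_size stays at 512 KiB.  A
     * tiny 64 KiB part would instead be capped at part_size/4 = 16 KiB, and
     * the rounding below would push it back up to one 32 KiB block.) */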

    self->slab_size = self->block_size * 16;
    if (self->part_size)
        self->slab_size = MIN(self->slab_size, self->part_size / 4);
    self->slab_size = MIN(self->slab_size, 10*1024*1024);
    if (!use_mem_cache)
        self->slab_size = MIN(self->slab_size, self->max_memory / 4);

    /* round slab size up to the nearest multiple of the block size */
    self->slab_size =
        ((self->slab_size + self->block_size - 1) / self->block_size) * self->block_size;

    /* round part size up to a multiple of the slab size */
    if (self->part_size != 0) {
        self->slabs_per_part = (self->part_size + self->slab_size - 1) / self->slab_size;
        self->part_size = self->slabs_per_part * self->slab_size;
    } else {
        self->slabs_per_part = 0;
    }

    /* set max_slabs */
    if (use_mem_cache) {
        self->max_slabs = self->slabs_per_part; /* increase max_slabs to serve as mem buf */
    } else {
	self->max_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
    }

    /* Note that max_slabs == 1 will cause deadlocks, due to some assumptions in
     * alloc_slab, so we check here that it's at least 2. */
    if (self->max_slabs < 2)
        self->max_slabs = 2;

    DBG(1, "using slab_size %zu and max_slabs %ju", self->slab_size, (uintmax_t)self->max_slabs);

    return XFER_ELEMENT(self);
}