/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
* Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
* Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Contact information: Carbonite Inc., 756 N Pastoria Ave
* Sunnyvale, CA 94085, or: http://www.zmanda.com
*/
#include "amanda.h"
#include "amxfer.h"
#include "xfer-device.h"
#include "conffile.h"
#include "device.h"
/* A transfer destination that writes an entire dumpfile to one or more files
* on one or more devices, without any caching. This destination supports both
* LEOM-based splitting (in which parts are never rewound) and cache_inform-based
* splitting (in which rewound parts are read from holding disk). */
/*
* File Slices - Cache Information
*
* The cache_inform implementation appends cache information, in order, to a
* singly-linked list of these objects. Each object describes a region of a
* file in which part data is stored. Note that we assume that slices are
* added *before* they are needed: the xfer element will fail if it tries to
* rewind and does not find a suitable slice.
*
* The slices should be "fast forwarded" after every part, so that the first
* byte in part_slices is the first byte of the part; when a retry of a part is
* required, use the iterator methods to properly open the various files and do
* the buffering.
*/
/* One contiguous region of an on-disk file holding already-received part
 * data.  Slices are created by cache_inform_impl and released either by
 * fast_forward_slices or by finalize_impl. */
typedef struct FileSlice {
/* next slice in the singly-linked list, or NULL for the last one */
struct FileSlice *next;
/* fully-qualified filename to read from (or NULL to read from
 * disk_cache_read_fd in XferDestTaperCacher).  This is a private copy made
 * with g_strdup in cache_inform_impl and is freed along with the slice.
 * NOTE(review): iterator_get_block in this file asserts that filename is
 * non-NULL, so the NULL case does not appear to be supported here. */
char *filename;
/* offset in file to start at */
guint64 offset;
/* length of data to read */
guint64 length;
} FileSlice;
/*
* Xfer Dest Taper
*/
/* Parent class, recorded by class_init; cancel_impl and finalize_impl use it
 * to chain up. */
static GObjectClass *parent_class = NULL;
/* Instance structure for the splitter element. */
typedef struct XferDestTaperSplitter {
XferDestTaper __parent__;
/* object parameters
 *
 * These values are supplied to the constructor, and can be assumed
 * constant for the lifetime of the element.
 */
/* Maximum size of each part (bytes); the constructor rounds it up to a
 * multiple of the block size.  Zero disables the per-part size limit. */
guint64 part_size;
/* the device's need for streaming (it's assumed that all subsequent devices
 * have the same needs) */
StreamingRequirement streaming;
/* block size expected by the target device */
gsize block_size;
/* TRUE if this element is expecting slices via cache_inform */
gboolean expect_cache_inform;
/* The thread doing the actual writes to tape; this also handles buffering
 * for streaming */
GThread *device_thread;
/* Ring Buffer
 *
 * ring_mutex/ring_cond guard ring_ready, which the device thread sets once
 * it has finished setting up the ring; push_buffer_impl waits for it.
 * mem_ring is assigned by the device thread for the PUSH_BUFFER and MEM_RING
 * input mechanisms only; for SHM_RING the code uses elt->shm_ring instead. */
GMutex *ring_mutex;
GCond *ring_cond;
mem_ring_t *mem_ring;
gboolean ring_ready;
/* Element State
 *
 * "state" includes all of the variables below (including device
 * parameters). Note that the device_thread holds this mutex for the
 * entire duration of writing a part.
 *
 * state_mutex should always be locked before mem_ring->mutex, if both are to be
 * held simultaneously.
 */
GMutex *state_mutex;
GCond *state_cond;
/* TRUE while the device thread should wait for start_part */
volatile gboolean paused;
/* The device to write to, and the header to write to it */
Device *volatile device;
dumpfile_t *volatile part_header;
/* bytes to read from cached slices before reading from the ring buffer */
guint64 bytes_to_read_from_slices;
/* ring size handed to the ring setup calls in device_thread; the constructor
 * rounds it up to a multiple of the block size */
guint64 max_memory;
/* part number in progress */
volatile guint64 partnum;
/* status of the last part */
gboolean last_part_eof;
gboolean last_part_eom;
gboolean last_part_successful;
/* true if the element is done writing to devices */
gboolean no_more_parts;
/* total bytes written in the current part */
volatile guint64 part_bytes_written;
/* The list of active slices for the current part. The cache_inform method
 * appends to this list. It is safe to read this linked list, beginning at
 * the head, *if* you can guarantee that slices will not be fast-forwarded
 * in the interim. The finalize method for this class will take care of
 * freeing any leftover slices. Take the part_slices mutex while modifying
 * the links in this list. */
FileSlice *part_slices;
GMutex *part_slices_mutex;
/* copy of elt->crc taken at the start of each part; restored if the part
 * fails so the failed part's bytes are not counted */
crc_t crc_before_part;
/* handshake with new_space_available: retry_write sets this to -1 while it
 * waits, and new_space_available_impl stores its made_space argument here;
 * retry_write stops retrying when the stored value is 0 */
int more_space;
} XferDestTaperSplitter;
/* Forward declaration: retry_write is defined at the end of the file but is
 * used by device_thread_write_part. */
static DeviceWriteResult retry_write(XferDestTaperSplitter *self, gsize to_write,
gpointer buf);
/* Forward declaration of the type-registration function used by the macros
 * below. */
static GType xfer_dest_taper_splitter_get_type(void);
/* GObject cast and type-check macros for this class */
#define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
#define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
#define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
#define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
#define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
#define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
/* Class structure; it adds nothing to the parent class, whose method slots
 * are filled in by class_init. */
typedef struct {
XferDestTaperClass __parent__;
} XferDestTaperSplitterClass;
/*
 * Debug logging
 */

/* Log a printf-style message when debug_taper is at least LEVEL.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to exactly
 * one statement: a bare "if (...) { ... }" would capture the "else" of an
 * enclosing unbraced if/else.  LEVEL is parenthesised so that an expression
 * argument is compared as a whole. */
#define DBG(LEVEL, ...) \
    do { \
        if (debug_taper >= (LEVEL)) { \
            _xdt_dbg(__VA_ARGS__); \
        } \
    } while (0)

/* Format a message into a fixed 1024-byte buffer and pass it to g_debug with
 * an "XDTS: " prefix.  g_vsnprintf is bounded by the buffer size, so longer
 * messages are truncated.
 *
 * @param fmt: printf-style format string, followed by its arguments
 */
static void
_xdt_dbg(const char *fmt, ...)
{
    va_list argp;
    char msg[1024];

    arglist_start(argp, fmt);
    g_vsnprintf(msg, sizeof(msg), fmt, argp);
    arglist_end(argp);
    g_debug("XDTS: %s", msg);
}
/* Advance the head of the slice list by LENGTH bytes.
 *
 * Slices that are entirely covered are unlinked and freed (together with
 * their filename); a slice that is only partly covered has its offset moved
 * forward and its length reduced.  The list links are modified under
 * part_slices_mutex.  Running out of slices before LENGTH is consumed trips
 * the assertion.  This assumes the state mutex is locked during its
 * operation.
 *
 * @param self: element
 * @param length: number of bytes to fast forward
 */
static void
fast_forward_slices(
    XferDestTaperSplitter *self,
    guint64 length)
{
    guint64 remaining = length;

    g_mutex_lock(self->part_slices_mutex);
    while (remaining > 0) {
        FileSlice *head;

        g_assert(self->part_slices);
        head = self->part_slices;

        if (head->length > remaining) {
            /* partially consumed: trim the front of this slice and stop */
            head->offset += remaining;
            head->length -= remaining;
            break;
        }

        /* fully consumed: drop the slice and continue with the next one */
        remaining -= head->length;
        self->part_slices = head->next;
        g_free(head->filename);
        g_free(head);
    }
    g_mutex_unlock(self->part_slices_mutex);
}
/*
 * Slice Iterator
 */
/* A struct for use in iterating over data in the slices.  It is initialised
 * by iterate_slices, advanced by iterator_get_block and released by
 * iterator_free. */
typedef struct SliceIterator {
/* current slice; the iterator walks the element's list in place and does
 * not own the slices */
FileSlice *slice;
/* file descriptor of the current file, or -1 if it's not open yet */
int cur_fd;
/* bytes remaining in this slice; set from the slice's length when its file
 * is opened */
guint64 slice_remaining;
} SliceIterator;
/* Utility functions for SliceIterator */

/* Point a caller-allocated SliceIterator at the first byte of the first
 * slice of the current part.  No file is opened here; that happens in
 * iterator_get_block.  The caller must ensure that fast_forward_slices is
 * not called while an iteration is in progress.
 *
 * @param self: element whose part_slices list is iterated
 * @param iter: iterator to initialize
 */
static void
iterate_slices(
    XferDestTaperSplitter *self,
    SliceIterator *iter)
{
    FileSlice *first;

    /* Only the head pointer needs the lock: at worst a new entry is appended
     * to the list while the iteration is in progress. */
    g_mutex_lock(self->part_slices_mutex);
    first = self->part_slices;
    g_mutex_unlock(self->part_slices_mutex);

    iter->slice = first;
    iter->slice_remaining = 0;
    iter->cur_fd = -1;
}
/* Fill BUF with BYTES_NEEDED bytes read from the slices, opening, seeking
 * and closing the slice files as required.  A request may span several
 * slices.
 *
 * On any failure (open, seek, short read, close) the transfer is cancelled
 * with xfer_cancel_with_error and NULL is returned; NULL is also returned if
 * the element is found to be cancelled when a slice boundary is crossed.
 * The iterator may still hold an open descriptor after a failure, so the
 * caller must call iterator_free in every case.
 *
 * @param self: element (used for error reporting)
 * @param iter: iterator set up by iterate_slices
 * @param buf: caller-owned buffer of at least bytes_needed bytes
 * @param bytes_needed: number of bytes to place in buf
 * @returns: buf on success, NULL on error or cancellation
 */
static gpointer
iterator_get_block(
    XferDestTaperSplitter *self,
    SliceIterator *iter,
    gpointer buf,
    gsize bytes_needed)
{
    gsize buf_offset = 0;
    XferElement *elt = XFER_ELEMENT(self);

    g_assert(iter != NULL);
    g_assert(buf != NULL);

    while (bytes_needed > 0) {
        gsize read_size;
        gsize bytes_read;

        /* open the current slice's file lazily, positioned at its offset */
        if (iter->cur_fd < 0) {
            guint64 offset;

            g_assert(iter->slice != NULL);
            g_assert(iter->slice->filename != NULL);

            iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
            if (iter->cur_fd < 0) {
                xfer_cancel_with_error(elt,
                    _("Could not open '%s' for reading: %s"),
                    iter->slice->filename, strerror(errno));
                return NULL;
            }

            iter->slice_remaining = iter->slice->length;
            offset = iter->slice->offset;

            if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
                xfer_cancel_with_error(elt,
                    _("Could not seek '%s' for reading: %s"),
                    iter->slice->filename, strerror(errno));
                return NULL;
            }
        }

        read_size = MIN(iter->slice_remaining, bytes_needed);

        /* clear errno so that a short read without an error is reported as
         * an unexpected EOF rather than with a stale error code */
        errno = 0;
        bytes_read = read_fully(iter->cur_fd, (gchar *)buf + buf_offset, read_size,
                                NULL);
        if (bytes_read < read_size) {
            xfer_cancel_with_error(elt, _("Error reading '%s': %s"),
                iter->slice->filename,
                errno ? strerror(errno) : _("Unexpected EOF"));
            return NULL;
        }

        iter->slice_remaining -= bytes_read;
        buf_offset += bytes_read;
        bytes_needed -= bytes_read;

        if (iter->slice_remaining <= 0) {
            int finished_fd = iter->cur_fd;

            /* Forget the descriptor before closing it: if close() fails the
             * descriptor must not be closed a second time by
             * iterator_free. */
            iter->cur_fd = -1;
            if (close(finished_fd) < 0) {
                xfer_cancel_with_error(elt,
                    _("Could not close fd %d: %s"),
                    finished_fd, strerror(errno));
                return NULL;
            }

            iter->slice = iter->slice->next;

            if (elt->cancelled)
                return NULL;
        }
    }

    return buf;
}
/* Release whatever the iterator still holds: the file descriptor of the
 * current slice, if one is open.  The iterator structure itself belongs to
 * the caller and is not freed. */
static void
iterator_free(
    SliceIterator *iter)
{
    if (iter->cur_fd < 0)
        return;     /* nothing is open */

    close(iter->cur_fd);
}
/*
 * Device Thread
 */
/* Wait for at least one block, or EOF, to be available in the ring buffer.
 * In the mem_ring case this is called with the ring mutex held (it is the
 * mutex passed to g_cond_wait); in the shm_ring case the wait is done on the
 * ring's sem_read semaphore instead.
 *
 * @param self: element
 * @param eof_flag: (output) set to the ring's EOF flag as last observed
 * @returns: number of bytes available to read from the ring, capped at the
 *   number of bytes still allowed in the current part when part_size is set
 */
static gsize
device_thread_wait_for_block(
XferDestTaperSplitter *self,
gboolean *eof_flag)
{
XferElement *elt = XFER_ELEMENT(self);
gsize bytes_needed = self->device->block_size;
gsize usable = 0;
*eof_flag = FALSE;
if (self->mem_ring) {
gsize max_ring_block_size = MAX(self->mem_ring->producer_block_size,
self->mem_ring->consumer_block_size);
/* for any kind of streaming, we need to fill the entire buffer before the
 * first byte */
if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
bytes_needed = self->mem_ring->ring_size - max_ring_block_size;
while (1) {
/* are we ready? */
if (elt->cancelled)
break;
/* written and readx are running totals, so their difference is the
 * number of unread bytes in the ring */
usable = self->mem_ring->written - self->mem_ring->readx;
*eof_flag = self->mem_ring->eof_flag;
if (usable >= bytes_needed) {
break;
}
/* at EOF no more data will arrive, so return whatever is there */
if (self->mem_ring->eof_flag)
break;
/* nope - so wait */
g_cond_wait(self->mem_ring->add_cond, self->mem_ring->mutex);
/* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
 * we need to wait for the entire buffer to fill */
if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
bytes_needed = self->mem_ring->ring_size - max_ring_block_size;
}
} else { // shm_ring
gsize max_ring_block_size = MAX(elt->shm_ring->mc->producer_block_size,
elt->shm_ring->mc->consumer_block_size);
/* same prebuffering rule as in the mem_ring branch */
if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
bytes_needed = elt->shm_ring->ring_size - max_ring_block_size;
while (!elt->cancelled &&
!elt->shm_ring->mc->cancelled) {
/* block until sem_read is posted; a non-zero return ends the wait */
if (shm_ring_sem_wait(elt->shm_ring, elt->shm_ring->sem_read) != 0)
break;
usable = elt->shm_ring->mc->written - elt->shm_ring->mc->readx;
*eof_flag = elt->shm_ring->mc->eof_flag;
if (elt->cancelled || elt->shm_ring->mc->cancelled)
break;
if (usable >= bytes_needed)
break;
if (*eof_flag)
break;
if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
bytes_needed = elt->shm_ring->ring_size - max_ring_block_size;
}
/* a ring cancelled by the other side cancels this transfer as well */
if (elt->shm_ring->mc->cancelled && !elt->cancelled) {
xfer_cancel_with_error(elt, "shm_ring_cancelled");
}
}
/* never report more than what still fits in the current part */
if (self->part_size)
usable = MIN(usable, self->part_size - self->part_bytes_written);
return usable;
}
/* Release READX bytes at the read position of whichever ring is in use.
 *
 * The read offset is advanced and wrapped at the ring size, the running
 * "readx" total is increased, and the producer is notified (free_cond for the
 * mem_ring, sem_write for the shm_ring).  In the mem_ring case this is called
 * with the ring mutex held.
 *
 * @param self: element
 * @param readx: number of bytes that have been consumed
 */
static void
device_thread_consume_block(
    XferDestTaperSplitter *self,
    gsize readx)
{
    XferElement *elt = XFER_ELEMENT(self);
    uint64_t next_offset;

    if (!self->mem_ring) {
        /* shm_ring */
        next_offset = elt->shm_ring->mc->read_offset + readx;
        if (next_offset >= elt->shm_ring->ring_size)
            next_offset -= elt->shm_ring->ring_size;

        elt->shm_ring->mc->readx += readx;
        elt->shm_ring->mc->read_offset = next_offset;
        sem_post(elt->shm_ring->sem_write);
        return;
    }

    /* mem_ring */
    next_offset = self->mem_ring->read_offset + readx;
    if (next_offset >= self->mem_ring->ring_size)
        next_offset -= self->mem_ring->ring_size;

    self->mem_ring->readx += readx;
    self->mem_ring->read_offset = next_offset;
    g_cond_broadcast(self->mem_ring->free_cond);
}
/* Write an entire part. Called with the state_mutex held.
 *
 * Steps: start a file on the device with the part header; if
 * bytes_to_read_from_slices is set, first re-send that many bytes from the
 * cached slices; then copy blocks from the ring buffer to the device until
 * EOF, the part-size limit, end of medium, a write failure or cancellation;
 * finally finish the file and build the result message.
 *
 * Only PART_FAILED counts as unsuccessful; PART_EOF, PART_LEOM and PART_EOP
 * are all reported as successful.  A failed part restores elt->crc to the
 * value it had when the part started.
 *
 * @param self: element
 * @returns: a new XMSG_PART_DONE message describing the part
 */
static XMsg *
device_thread_write_part(
XferDestTaperSplitter *self)
{
GTimer *timer = g_timer_new();
XferElement *elt = XFER_ELEMENT(self);
enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
int fileno = 0;
XMsg *msg;
void *buf;
self->part_bytes_written = 0;
/* remember the CRC so that a failed part can be rolled back */
self->crc_before_part = elt->crc;
g_timer_start(timer);
/* write the header; if this fails or hits LEOM, we consider this a
 * successful 0-byte part */
if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
part_status = PART_LEOM;
goto part_done;
}
fileno = self->device->file;
g_assert(fileno > 0);
/* free the header, now that it's written */
dumpfile_free(self->part_header);
self->part_header = NULL;
/* First, read the requisite number of bytes from the part_slices, if the part was
 * unsuccessful. */
if (self->bytes_to_read_from_slices) {
SliceIterator iter;
gsize to_write = self->block_size;
/* NOTE: this block-local buf shadows the function-level buf */
gpointer buf = g_malloc(to_write);
gboolean successful = TRUE;
guint64 bytes_from_slices = self->bytes_to_read_from_slices;
DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);
iterate_slices(self, &iter);
/* NOTE(review): the counter is decremented by a whole block each time, so
 * this loop relies on bytes_to_read_from_slices being a multiple of
 * block_size -- confirm against callers */
while (bytes_from_slices) {
DeviceWriteResult ok;
if (!iterator_get_block(self, &iter, buf, to_write)) {
part_status = PART_FAILED;
successful = FALSE;
break;
}
/* note that it's OK to reference these ring_* vars here, as they
 * are static at this point */
ok = device_write_block(self->device, (guint)to_write, buf);
DBG(8, "writing %ju bytes from slice to device", (uintmax_t)to_write);
/* out of space: ask for more and retry (retry_write waits on state_cond) */
if (ok == WRITE_SPACE)
ok = retry_write(self, to_write, buf);
if (ok == WRITE_FAILED) {
part_status = PART_FAILED;
successful = FALSE;
break;
} else if (ok == WRITE_FULL) {
part_status = PART_EOP;
successful = FALSE;
break;
} else if (ok == WRITE_SPACE) {
part_status = PART_EOP;
successful = FALSE;
break;
}
self->part_bytes_written += to_write;
bytes_from_slices -= to_write;
crc32_add((uint8_t *)buf, to_write, &elt->crc);
if (self->part_size && self->part_bytes_written >= self->part_size) {
part_status = PART_EOP;
successful = FALSE;
break;
} else if (self->device->is_eom) {
part_status = PART_LEOM;
successful = FALSE;
break;
}
}
iterator_free(&iter);
g_free(buf);
/* if we didn't finish, get out of here now */
if (!successful)
goto part_done;
}
/* from here on the ring mutex is held, except around the device writes */
if (self->mem_ring)
g_mutex_lock(self->mem_ring->mutex);
while (!elt->cancelled &&
(!elt->shm_ring || !elt->shm_ring->mc->cancelled)) {
DeviceWriteResult ok;
gboolean eof_flag;
/* wait for at least one block, and (if necessary) prebuffer */
gsize to_writeX = device_thread_wait_for_block(self, &eof_flag);
if (elt->cancelled || (elt->shm_ring && elt->shm_ring->mc->cancelled))
break;
/* write full blocks; at EOF also write the final short block, and end the
 * part once nothing is left */
while (to_writeX >= self->device->block_size || eof_flag) {
//crc_t block_crc;
gsize to_write = MIN(to_writeX, self->device->block_size);
if (elt->cancelled)
goto part_done_unlock;
if (to_write == 0) {
part_status = PART_EOF;
goto part_done_unlock;
}
/* release the ring mutex while the device write is in progress */
if (self->mem_ring)
g_mutex_unlock(self->mem_ring->mutex);
DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);
/* note that it's OK to reference these ring_* vars here, as they
 * are static at this point */
if (self->mem_ring) {
buf = self->mem_ring->buffer + self->mem_ring->read_offset;
} else {
buf = elt->shm_ring->data + elt->shm_ring->mc->read_offset;
}
ok = device_write_block(self->device, (guint)to_write, buf);
if (ok == WRITE_SPACE)
ok = retry_write(self, to_write, buf);
if (self->mem_ring)
g_mutex_lock(self->mem_ring->mutex);
if (ok == WRITE_FAILED) {
part_status = PART_FAILED;
goto part_done_unlock;
} else if (ok == WRITE_FULL) {
part_status = PART_EOP;
goto part_done_unlock;
} else if (ok == WRITE_SPACE) {
part_status = PART_EOP;
goto part_done_unlock;
}
/* the block reached the device: account for it and free it in the ring */
crc32_add((uint8_t *)(buf),
to_write, &elt->crc);
self->part_bytes_written += to_write;
device_thread_consume_block(self, to_write);
if (self->part_size && self->part_bytes_written >= self->part_size) {
part_status = PART_EOP;
goto part_done_unlock;
} else if (self->device->is_eom) {
part_status = PART_LEOM;
goto part_done_unlock;
}
to_writeX -= to_write;
}
}
part_done_unlock:
if (self->mem_ring)
g_mutex_unlock(self->mem_ring->mutex);
part_done:
if (elt->shm_ring) {
if (elt->cancelled) {
if (elt->shm_ring) {
g_debug("device_thread_write_part: cancelling shm-ring because xfer is cancelled");
elt->shm_ring->mc->cancelled = TRUE;
}
} else if (elt->shm_ring->mc->cancelled) {
part_status = PART_FAILED;
xfer_cancel_with_error(elt, "shm_ring cancelled");
}
/* NOTE(review): presumably these posts wake anything blocked on the ring
 * semaphores; the reason for posting sem_read three times is not visible
 * in this file */
sem_post(elt->shm_ring->sem_read);
sem_post(elt->shm_ring->sem_read);
sem_post(elt->shm_ring->sem_read);
sem_post(elt->shm_ring->sem_write);
}
/* if we write all of the blocks, but the finish_file fails, then likely
 * there was some buffering going on in the device driver, and the blocks
 * did not all make it to permanent storage -- so it's a failed part. Note
 * that we try to finish_file even if the part failed, just to be thorough. */
if (self->device->in_file) {
if (!device_finish_file(self->device)) {
if (!elt->cancelled) {
part_status = PART_FAILED;
}
}
}
g_timer_stop(timer);
/* a failed part must not contribute to the running CRC */
if (part_status == PART_FAILED) {
elt->crc = self->crc_before_part;
}
msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
msg->size = self->part_bytes_written;
msg->duration = g_timer_elapsed(timer, NULL);
msg->partnum = self->partnum;
msg->fileno = fileno;
msg->successful = self->last_part_successful = part_status != PART_FAILED;
msg->eom = self->last_part_eom = part_status == PART_LEOM || self->device->is_eom;
msg->eof = self->last_part_eof = part_status == PART_EOF;
/* time runs backward on some test boxes, so make sure this is positive */
if (msg->duration < 0) msg->duration = 0;
/* only successful, non-empty parts consume a part number */
if (msg->successful && msg->size > 0)
self->partnum++;
//self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);
self->no_more_parts = msg->eof;
g_timer_destroy(timer);
return msg;
}
/* Thread function for the device thread.
 *
 * It sets up the ring for the negotiated input mechanism, then loops: wait
 * to be un-paused, write one part, queue the resulting message and pause
 * again, until the last part has been written or the transfer is cancelled.
 * On the normal exit path it queues an XMSG_CRC message; XMSG_DONE is
 * queued on every exit path.
 *
 * @param data: the XferDestTaperSplitter element
 * @returns: always NULL
 */
static gpointer
device_thread(
gpointer data)
{
XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
XferElement *elt = XFER_ELEMENT(self);
XMsg *msg;
DBG(1, "(this is the device thread)");
if (elt->input_mech == XFER_MECH_PUSH_BUFFER) {
/* push_buffer_impl feeds a ring created here */
self->mem_ring = create_mem_ring();
init_mem_ring(self->mem_ring, self->max_memory, self->device->block_size);
} else if (elt->input_mech == XFER_MECH_MEM_RING) {
/* the ring comes from the upstream element; only the consumer side is
 * sized here */
self->mem_ring = xfer_element_get_mem_ring(elt->upstream);
mem_ring_consumer_set_size(self->mem_ring, self->max_memory, self->device->block_size);
} else if (elt->input_mech == XFER_MECH_SHM_RING) {
shm_ring_consumer_set_size(elt->shm_ring, self->max_memory, self->device->block_size);
if (elt->input_mech == XFER_MECH_SHM_RING &&
elt->shm_ring->mc->cancelled) {
g_debug("A shm_ring is cancelled");
if (!elt->cancelled) {
xfer_cancel_with_error(XFER_ELEMENT(self)->upstream,
_("failed to set shm-ring"));
}
/* skips the CRC message, but XMSG_DONE is still sent */
goto device_thread_done;
}
}
crc32_init(&elt->crc);
/* tell push_buffer_impl that the ring is ready for use */
g_mutex_lock(self->ring_mutex);
self->ring_ready = TRUE;
g_cond_broadcast(self->ring_cond);
g_mutex_unlock(self->ring_mutex);
/* This is the outer loop, that loops once for each split part written to
 * tape. */
g_mutex_lock(self->state_mutex);
while (1) {
/* wait until the main thread un-pauses us, and check that we have
 * the relevant device info available (block_size) */
while (self->paused && !elt->cancelled) {
DBG(9, "device_thread waiting to be unpaused");
g_cond_wait(self->state_cond, self->state_mutex);
}
DBG(9, "device_thread done waiting");
if (elt->cancelled)
break;
if (elt->input_mech == XFER_MECH_SHM_RING &&
elt->shm_ring->mc->cancelled) {
g_debug("B shm_ring is cancelled");
if (!elt->cancelled) {
xfer_cancel_with_error(XFER_ELEMENT(self)->upstream,
_("shm-ring cancelled"));
}
break;
}
DBG(2, "device_thread beginning to write part");
msg = device_thread_write_part(self);
DBG(2, "device_thread done writing part");
/* NOTE(review): device_thread_write_part as defined in this file always
 * returns a message, so this check looks purely defensive */
if (!msg) /* cancelled */
break;
/* release the slices for this part, if there were any slices */
if (msg->successful && self->expect_cache_inform) {
fast_forward_slices(self, msg->size);
}
xfer_queue_message(elt->xfer, msg);
/* pause ourselves and await instructions from the main thread */
self->paused = TRUE;
/* if this is the last part, we're done with the part loop */
if (self->no_more_parts)
break;
}
g_mutex_unlock(self->state_mutex);
/* notify the producer that everything is read */
if (elt->input_mech == XFER_MECH_SHM_RING) {
sem_post(elt->shm_ring->sem_write);
}
g_debug("device_thread sending XMSG_CRC message");
DBG(2, "xfer-dest-taper-splitter CRC: %08x size %lld",
crc32_finish(&elt->crc), (long long)elt->crc.size);
msg = xmsg_new(XFER_ELEMENT(self), XMSG_CRC, 0);
msg->crc = crc32_finish(&elt->crc);
msg->size = elt->crc.size;
xfer_queue_message(elt->xfer, msg);
device_thread_done:
/* tell the main thread we're done */
xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
return NULL;
}
/*
 * Class mechanics
 */

/* push_buffer method: copy a pushed buffer into the memory ring for the
 * device thread to write out.
 *
 * A NULL buffer signals EOF, which is recorded in the ring and signalled to
 * the device thread.  Otherwise the data is copied in pieces, waiting on
 * free_cond whenever the ring is full.  The buffer is always freed before
 * returning, and nothing is copied once the element is cancelled.
 *
 * @param elt: the element
 * @param buf: buffer to consume (ownership passes to this function), or NULL
 *   for EOF
 * @param size: number of bytes in buf
 */
static void
push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
    gchar *p = buf;

    DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);

    /* do nothing if cancelled */
    if (G_UNLIKELY(elt->cancelled)) {
        goto free_and_finish;
    }

    /* wait for the device thread to finish setting up the ring */
    if (G_UNLIKELY(!self->ring_ready)) {
        g_mutex_lock(self->ring_mutex);
        while (!self->ring_ready && !elt->cancelled) {
            g_cond_wait(self->ring_cond, self->ring_mutex);
        }
        g_mutex_unlock(self->ring_mutex);

        /* If the wait ended because of cancellation, the ring may never have
         * been set up.  Leave without touching mem_ring or its mutex; only
         * ring_mutex was held here and it has already been released. */
        if (elt->cancelled)
            goto free_and_finish;
    }

    /* handle EOF */
    if (G_UNLIKELY(buf == NULL)) {
        /* indicate EOF to the device thread */
        g_mutex_lock(self->mem_ring->mutex);
        self->mem_ring->eof_flag = TRUE;
        g_cond_broadcast(self->mem_ring->add_cond);
        g_mutex_unlock(self->mem_ring->mutex);
        goto free_and_finish;
    }

    /* push the block into the ring buffer, in pieces if necessary */
    g_mutex_lock(self->mem_ring->mutex);
    while (size > 0) {
        gsize avail;

        /* wait for some space */
        while (self->mem_ring->written - self->mem_ring->readx == self->mem_ring->ring_size && !elt->cancelled) {
            DBG(9, "push_buffer waiting for any space to buffer pushed data");
            g_cond_wait(self->mem_ring->free_cond, self->mem_ring->mutex);
        }
        DBG(9, "push_buffer done waiting");

        if (elt->cancelled)
            goto unlock_and_free_and_finish;

        /* only copy to the end of the buffer, if the available space wraps
         * around to the beginning */
        avail = MIN(size, self->mem_ring->ring_size - (self->mem_ring->written - self->mem_ring->readx));
        avail = MIN(avail, self->mem_ring->ring_size - self->mem_ring->write_offset);

        /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
        memmove(self->mem_ring->buffer + self->mem_ring->write_offset, p, avail);

        /* reset the ring variables to represent this state */
        self->mem_ring->written += avail;
        self->mem_ring->write_offset += avail; /* will, at most, hit mem_ring->ring_size */
        if (self->mem_ring->write_offset == self->mem_ring->ring_size)
            self->mem_ring->write_offset = 0;
        p = (gpointer)((guchar *)p + avail);
        size -= avail;

        /* and give the device thread a notice that data is ready */
        g_cond_broadcast(self->mem_ring->add_cond);
    }

unlock_and_free_and_finish:
    /* reached only with mem_ring->mutex held */
    g_mutex_unlock(self->mem_ring->mutex);

free_and_finish:
    if (buf)
        g_free(buf);
}
/*
 * Element mechanics
 */

/* start method: spawn the device thread (created non-joinable).  A failure
 * to create the thread is reported through g_critical.
 *
 * @param elt: the element
 * @returns: always TRUE
 */
static gboolean
start_impl(
    XferElement *elt)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
    GError *thread_error = NULL;

    self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &thread_error);
    if (self->device_thread == NULL) {
        g_critical(_("Error creating new thread: %s (%s)"),
            thread_error->message,
            errno ? strerror(errno) : _("no error code"));
    }

    return TRUE;
}
/* cancel method: chain up to the parent class, then wake every thread that
 * may be blocked on one of this element's condition variables or on the
 * shm-ring semaphores, so that it notices elt->cancelled.
 *
 * @param elt: the element
 * @param expect_eof: passed through to the parent implementation
 * @returns: the parent implementation's return value
 */
static gboolean
cancel_impl(
    XferElement *elt,
    gboolean expect_eof)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
    gboolean parent_rv;

    /* chain up first */
    parent_rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);

    /* anyone waiting for the ring to become ready */
    g_mutex_lock(self->ring_mutex);
    g_cond_broadcast(self->ring_cond);
    g_mutex_unlock(self->ring_mutex);

    /* shared-memory ring: flag it as cancelled and post all its semaphores */
    if (elt->shm_ring && !elt->shm_ring->mc->cancelled) {
        /* the flag is tested a second time, exactly as before */
        if (!elt->shm_ring->mc->cancelled) {
            g_debug("XDTS:cancel_impl: cancelling shm-ring because xfer is cancelled");
            elt->shm_ring->mc->cancelled = TRUE;
        }
        sem_post(elt->shm_ring->sem_ready);
        sem_post(elt->shm_ring->sem_start);
        sem_post(elt->shm_ring->sem_read);
        sem_post(elt->shm_ring->sem_write);
    }

    /* in-memory ring: force EOF and wake both producer and consumer */
    if (self->mem_ring) {
        g_mutex_lock(self->mem_ring->mutex);
        self->mem_ring->eof_flag = TRUE;
        g_cond_broadcast(self->mem_ring->add_cond);
        g_cond_broadcast(self->mem_ring->free_cond);
        g_mutex_unlock(self->mem_ring->mutex);
    }

    /* the device thread waiting to be un-paused, or waiting in retry_write */
    g_mutex_lock(self->state_mutex);
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);

    return parent_rv;
}
/* start_part method: hand the device thread a copy of the header for the
 * next part and un-pause it.
 *
 * A retry is only accepted when the previous part failed and the element was
 * created to expect cache_inform slices; otherwise the transfer is cancelled
 * (along with the shm-ring, if there is one) and nothing is started.  For an
 * accepted retry, the bytes written in the failed part are re-read from the
 * slices.
 *
 * @param xdt: the element
 * @param retry_part: TRUE to re-send the previous, failed part
 * @param header: header for the part; it is copied, not retained
 */
static void
start_part_impl(
    XferDestTaper *xdt,
    gboolean retry_part,
    dumpfile_t *header)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
    XferElement *elt = XFER_ELEMENT(self);

    g_assert(self->device != NULL);
    g_assert(!self->device->in_file);
    g_assert(header != NULL);

    DBG(1, "start_part() start_part_impl");

    if (!retry_part) {
        /* don't read any bytes from the slices, since we're not retrying */
        self->bytes_to_read_from_slices = 0;
    } else {
        /* we can only retry the part if we're getting slices via
         * cache_inform's */
        const char *refusal = NULL;

        if (self->last_part_successful)
            refusal = _("Previous part did not fail; cannot retry");
        else if (!self->expect_cache_inform)
            refusal = _("No cache for previous failed part; cannot retry");

        if (refusal != NULL) {
            xfer_cancel_with_error(XFER_ELEMENT(self), "%s", refusal);
            if (elt->shm_ring && !elt->shm_ring->mc->cancelled) {
                elt->shm_ring->mc->cancelled = TRUE;
                sem_post(elt->shm_ring->sem_ready);
                sem_post(elt->shm_ring->sem_start);
                sem_post(elt->shm_ring->sem_read);
                sem_post(elt->shm_ring->sem_write);
            }
            return;
        }

        self->bytes_to_read_from_slices = self->part_bytes_written;
    }

    g_mutex_lock(self->state_mutex);
    g_assert(self->paused);
    g_assert(!self->no_more_parts);

    /* replace any header left over from an earlier part */
    if (self->part_header)
        dumpfile_free(self->part_header);
    self->part_header = dumpfile_copy(header);

    DBG(1, "unpausing");
    self->paused = FALSE;
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);
}
/* use_device method: switch subsequent parts to DEVICE.
 *
 * The element drops its reference to the old device and takes one on the new
 * device.  A differing streaming requirement only produces a warning; a
 * differing block size cancels the transfer (after the device has already
 * been switched).
 *
 * @param xdtself: the element
 * @param device: the device to use from now on
 */
static void
use_device_impl(
    XferDestTaper *xdtself,
    Device *device)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
    StreamingRequirement wanted;
    gboolean block_size_differs;
    GValue val;

    DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");

    /* short-circuit if nothing is changing */
    if (self->device == device)
        return;

    g_mutex_lock(self->state_mutex);

    if (self->device)
        g_object_unref(self->device);
    self->device = device;
    g_object_ref(device);

    /* get this new device's streaming requirements */
    bzero(&val, sizeof(val));
    if (device_property_get(self->device, PROPERTY_STREAMING, &val)
        && G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
        wanted = g_value_get_enum(&val);
        if (wanted != self->streaming)
            g_warning("New device has different streaming requirements from the original; "
                      "ignoring new requirement");
    } else {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
    }
    g_value_unset(&val);

    /* check that the blocksize hasn't changed */
    block_size_differs = (self->block_size != device->block_size);

    g_mutex_unlock(self->state_mutex);

    if (block_size_differs) {
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("All devices used by the taper must have the same block size"));
    }
}
/* cache_inform method: record that LENGTH bytes of part data are available
 * in FILENAME starting at OFFSET, by appending a new slice to the end of the
 * part_slices list.
 *
 * @param xdt: the element
 * @param filename: file holding the data; the string is copied
 * @param offset: offset of the data within the file
 * @param length: number of bytes of data
 */
static void
cache_inform_impl(
    XferDestTaper *xdt,
    const char *filename,
    off_t offset,
    off_t length)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
    FileSlice *new_slice = g_new(FileSlice, 1);
    FileSlice **link;

    new_slice->filename = g_strdup(filename);
    new_slice->offset = offset;
    new_slice->length = length;
    new_slice->next = NULL;

    g_mutex_lock(self->part_slices_mutex);
    /* find the terminating NULL link (the head pointer when the list is
     * empty) and hang the new slice on it */
    link = &self->part_slices;
    while (*link != NULL)
        link = &(*link)->next;
    *link = new_slice;
    g_mutex_unlock(self->part_slices_mutex);
}
/* get_part_bytes_written method: report progress.
 *
 * NOTE: this access is unsafe and may return inconsistent results (e.g, a
 * partial write to the 64-bit value on a 32-bit system). This is ok for
 * the moment, as it's only informational, but be warned.
 *
 * @param xdtself: the element
 * @returns: the device's own byte count when a device is set, otherwise the
 *   element's part_bytes_written counter
 */
static guint64
get_part_bytes_written_impl(
    XferDestTaper *xdtself)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);

    if (!self->device)
        return self->part_bytes_written;

    return device_get_bytes_written(self->device);
}
/* new_space_available method: answer an XMSG_NO_SPACE request from the
 * device thread, which is waiting in retry_write.
 *
 * more_space is updated while holding state_mutex.  retry_write resets
 * more_space to -1 with that mutex held and only releases it inside
 * g_cond_wait; storing the answer before taking the mutex could therefore
 * be overwritten by that reset, leaving the device thread waiting for an
 * answer that has already been given.
 *
 * @param xdtself: the element
 * @param made_space: stored in more_space; retry_write gives up when it is 0
 */
static void
new_space_available_impl(
    XferDestTaper *xdtself,
    int made_space)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);

    g_mutex_lock(self->state_mutex);
    self->more_space = made_space;
    device_reset(self->device);
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);
}
/* GObject instance initializer: create the mutexes and condition variables
 * and put the element in its initial state (paused, no device, part number
 * 1, empty slice list, fresh CRC).
 *
 * @param elt: the instance being initialized
 */
static void
instance_init(
    XferElement *elt)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);

    elt->can_generate_eof = FALSE;

    /* synchronization primitives */
    self->ring_mutex = g_mutex_new();
    self->ring_cond = g_cond_new();
    self->state_mutex = g_mutex_new();
    self->state_cond = g_cond_new();
    self->part_slices_mutex = g_mutex_new();

    /* initial state: paused until the first start_part */
    self->paused = TRUE;
    self->device = NULL;
    self->part_header = NULL;
    self->part_slices = NULL;
    self->partnum = 1;
    self->part_bytes_written = 0;

    crc32_init(&elt->crc);
}
/* setup method: when the input mechanism is a shared-memory ring, create
 * that ring; the other mechanisms need no setup here.
 *
 * @param elt: the element
 * @returns: always TRUE
 */
static gboolean
setup_impl(
    XferElement *elt)
{
    if (elt->input_mech != XFER_MECH_SHM_RING)
        return TRUE;

    elt->shm_ring = shm_ring_create(NULL);
    return TRUE;
}
/* GObject finalizer: free the synchronization objects, the ring resources,
 * any slices left on the list, the pending part header and the device
 * reference, then chain up to the parent class.
 *
 * @param obj_self: the object being finalized
 */
static void
finalize_impl(
    GObject * obj_self)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
    XferElement *elt = XFER_ELEMENT(self);
    FileSlice *cur;

    g_mutex_free(self->ring_mutex);
    g_cond_free(self->ring_cond);

    g_mutex_free(self->state_mutex);
    g_cond_free(self->state_cond);

    /* in-memory ring: its synchronization objects and its data buffer */
    if (self->mem_ring) {
        g_mutex_free(self->mem_ring->mutex);
        g_cond_free(self->mem_ring->add_cond);
        g_cond_free(self->mem_ring->free_cond);
        g_free(self->mem_ring->buffer);
    }

    /* shared-memory ring: detach as the consumer */
    if (elt->shm_ring) {
        close_consumer_shm_ring(elt->shm_ring);
        elt->shm_ring = NULL;
    }

    g_mutex_free(self->part_slices_mutex);

    /* free any leftover slices */
    cur = self->part_slices;
    while (cur != NULL) {
        FileSlice *following = cur->next;

        g_free(cur->filename);
        g_free(cur);
        cur = following;
    }

    if (self->part_header)
        dumpfile_free(self->part_header);

    if (self->device)
        g_object_unref(self->device);

    /* chain up */
    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}
/* GObject class initializer: record the parent class, install this class's
 * implementations of the XferElement, XferDestTaper and GObject methods, and
 * publish the supported input mechanisms.
 *
 * @param selfc: the class structure being initialized
 */
static void
class_init(
    XferDestTaperSplitterClass * selfc)
{
    /* supported (input, output) mechanism pairs; the all-NONE row terminates
     * the table */
    static xfer_element_mech_pair_t mech_pairs[] = {
        { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(2), XFER_NTHREADS(1), XFER_NALLOC(0) },
        { XFER_MECH_MEM_RING, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(2), XFER_NALLOC(1) },
        { XFER_MECH_SHM_RING, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1), XFER_NALLOC(0) },
        { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0), XFER_NALLOC(0) },
    };
    XferElementClass *elt_class = XFER_ELEMENT_CLASS(selfc);
    XferDestTaperClass *taper_class = XFER_DEST_TAPER_CLASS(selfc);
    GObjectClass *gobject_class = G_OBJECT_CLASS(selfc);

    parent_class = g_type_class_peek_parent(selfc);

    /* GObject */
    gobject_class->finalize = finalize_impl;

    /* XferElement */
    elt_class->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
    elt_class->mech_pairs = mech_pairs;
    elt_class->setup = setup_impl;
    elt_class->start = start_impl;
    elt_class->cancel = cancel_impl;
    elt_class->push_buffer = push_buffer_impl;

    /* XferDestTaper */
    taper_class->start_part = start_part_impl;
    taper_class->use_device = use_device_impl;
    taper_class->cache_inform = cache_inform_impl;
    taper_class->get_part_bytes_written = get_part_bytes_written_impl;
    taper_class->new_space_available = new_space_available_impl;
}
static GType
xfer_dest_taper_splitter_get_type (void)
{
static GType type = 0;
if (G_UNLIKELY(type == 0)) {
static const GTypeInfo info = {
sizeof (XferDestTaperSplitterClass),
(GBaseInitFunc) NULL,
(GBaseFinalizeFunc) NULL,
(GClassInitFunc) class_init,
(GClassFinalizeFunc) NULL,
NULL /* class_data */,
sizeof (XferDestTaperSplitter),
0 /* n_preallocs */,
(GInstanceInitFunc) instance_init,
NULL
};
type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
}
return type;
}
/*
 * Constructor
 */

/* Create a splitter element that writes to FIRST_DEVICE.
 *
 * max_memory and part_size are rounded up to the next multiple of the
 * device's block size (a part_size of zero is left as zero).  The element
 * takes its own reference on the device.  If the device's streaming
 * requirement cannot be read, STREAMING_REQUIREMENT_REQUIRED is assumed.
 *
 * @param first_device: device for the first part
 * @param max_memory: ring buffer size, in bytes
 * @param part_size: maximum part size in bytes, or 0
 * @param expect_cache_inform: TRUE if slices will be supplied via cache_inform
 * @returns: the new element
 */
XferElement *
xfer_dest_taper_splitter(
    Device *first_device,
    size_t max_memory,
    guint64 part_size,
    gboolean expect_cache_inform)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
    const gsize blk = first_device->block_size;
    GValue streaming_val;

    /* round both sizes up to whole blocks */
    max_memory = ((max_memory + blk - 1) / blk) * blk;
    if (part_size)
        part_size = ((part_size + blk - 1) / blk) * blk;

    self->part_size = part_size;
    self->max_memory = max_memory;
    self->block_size = first_device->block_size;
    self->partnum = 1;
    self->paused = TRUE;
    self->no_more_parts = FALSE;

    self->device = first_device;
    g_object_ref(self->device);

    /* get this new device's streaming requirements */
    bzero(&streaming_val, sizeof(streaming_val));
    if (device_property_get(self->device, PROPERTY_STREAMING, &streaming_val)
        && G_VALUE_HOLDS(&streaming_val, STREAMING_REQUIREMENT_TYPE)) {
        self->streaming = g_value_get_enum(&streaming_val);
    } else {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
        self->streaming = STREAMING_REQUIREMENT_REQUIRED;
    }
    g_value_unset(&streaming_val);

    /* grab data from cache_inform, just in case we hit PEOM */
    self->expect_cache_inform = expect_cache_inform;

    return XFER_ELEMENT(self);
}
/* Handle a WRITE_SPACE result: queue an XMSG_NO_SPACE message, then wait on
 * state_cond until new_space_available_impl stores an answer in more_space
 * (or the transfer is cancelled) and try the write again.  The wait/retry
 * cycle repeats for as long as the device keeps reporting WRITE_SPACE and
 * the stored answer is non-zero.  The caller must hold state_mutex, which is
 * the mutex passed to g_cond_wait.
 *
 * @param self: element
 * @param to_write: number of bytes in buf
 * @param buf: block to write
 * @returns: WRITE_FAILED on cancellation; otherwise the result of the last
 *   device_write_block call (still WRITE_SPACE if more_space was 0)
 */
static DeviceWriteResult
retry_write(
    XferDestTaperSplitter *self,
    gsize to_write,
    gpointer buf)
{
    XferElement *elt = XFER_ELEMENT(self);
    DeviceWriteResult outcome;
    XMsg *no_space_msg = xmsg_new(XFER_ELEMENT(self), XMSG_NO_SPACE, 0);

    xfer_queue_message(elt->xfer, no_space_msg);

    do {
        /* -1 means "no answer yet" */
        self->more_space = -1;
        while (self->more_space == -1 && !elt->cancelled) {
            g_cond_wait(self->state_cond, self->state_mutex);
        }

        if (elt->cancelled)
            return WRITE_FAILED;

        outcome = device_write_block(self->device, (guint)to_write, buf);

        /* keep going only while the device is still out of space and the
         * last answer was non-zero */
    } while (outcome == WRITE_SPACE && self->more_space != 0);

    return outcome; /* WRITE_SUCCEED, WRITE_FAILED, WRITE_FULL or WRITE_SPACE */
}