Blob Blame History Raw
/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 1991-1998 University of Maryland at College Park
 * Copyright (c) 2007-2012 Zmanda, Inc.  All Rights Reserved.
 * Copyright (c) 2013-2016 Carbonite, Inc.  All Rights Reserved.
 * All Rights Reserved.
 *
 * Permission to use, copy, modify, distribute, and sell this software and its
 * documentation for any purpose is hereby granted without fee, provided that
 * the above copyright notice appear in all copies and that both that
 * copyright notice and this permission notice appear in supporting
 * documentation, and that the name of U.M. not be used in advertising or
 * publicity pertaining to distribution of the software without specific,
 * written prior permission.  U.M. makes no representations about the
 * suitability of this software for any purpose.  It is provided "as is"
 * without express or implied warranty.
 *
 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *
 * Author: James da Silva, Systems Design and Analysis Group
 *			   Computer Science Department
 *			   University of Maryland at College Park
 */
/*
 * $Id: driverio.c,v 1.92 2006/08/24 01:57:16 paddy_s Exp $
 *
 * I/O-related functions for driver program
 */
#include "amanda.h"
#include "amutil.h"
#include "clock.h"
#include "server_util.h"
#include "conffile.h"
#include "diskfile.h"
#include "infofile.h"
#include "logfile.h"
#include "timestamp.h"

#define GLOBAL		/* the global variables defined here */
#include "driverio.h"

static const char *childstr(int);
static long generation = 1;

typedef struct serial_s {
    long   gen;
    job_t *job;
} serial_t;

static int max_serial;
static serial_t *stable;
static int max_jobs;
static job_t *jobs;

static taper_t * start_one_tape_process(char *taper_program,
					char *storage_name,
					gboolean  no_taper,
					int nb_taper);

void
init_driverio(
    int inparallel,
    int nb_storage,
    int sum_taper_parallel_write)
{
    dumper_t *dumper;

    tapetable = g_new0(taper_t, nb_storage+1);
    dmptable = g_new0(dumper_t, inparallel+1);
    chktable = g_new0(chunker_t, inparallel+1);
    for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) {
	dumper->fd = -1;
    }

    max_serial = inparallel + sum_taper_parallel_write;
    stable = g_new0(serial_t, max_serial);
    max_jobs = inparallel + sum_taper_parallel_write;
    jobs   = g_new0(job_t, max_jobs);
}


static const char *
childstr(
    int fd)
{
    static char buf[NUM_STR_SIZE + 32];
    dumper_t  *dumper;
    chunker_t *chunker;
    taper_t   *taper;

    for (taper = tapetable; taper->fd != 0; taper++) {
	if (taper->fd == fd)
	    return (taper->name);
    }

    for (dumper = dmptable; dumper->fd != 0; dumper++) {
	if (dumper->fd == fd)
	    return (dumper->name);
    }
    for (chunker = chktable; chunker->fd != 0; chunker++) {
	if (chunker->fd == fd)
	    return (chunker->name);
    }
    g_snprintf(buf, sizeof(buf), _("unknown child (fd %d)"), fd);
    return (buf);
}


static int nb_taper = 0;
int
startup_dump_tape_process(
    char      *taper_program,
    gboolean   no_taper)
{
    identlist_t  il;
    taper_t     *taper;

    for (il = getconf_identlist(CNF_STORAGE); il != NULL; il = il->next) {
	taper = start_one_tape_process(taper_program, (char *)il->data, no_taper, nb_taper);
	if (taper) {
	    taper->flush_storage = TRUE;
	    nb_taper++;
	}
    }
    return nb_taper;
}

int
startup_vault_tape_process(
    char      *taper_program,
    gboolean   no_taper)
{
    identlist_t  il;
    taper_t     *taper;

    for (il = getconf_identlist(CNF_VAULT_STORAGE); il != NULL; il = il->next) {
	taper = start_one_tape_process(taper_program, (char *)il->data, no_taper, nb_taper);
	if (taper) {
	    taper->vault_storage = TRUE;
	    nb_taper++;
	}
    }
    return nb_taper;
}

static taper_t *
start_one_tape_process(
    char      *taper_program,
    char      *storage_n,
    gboolean   no_taper,
    int        nb_taper)
{
    int       fd[2];
    int       nb_wtaper;
    int       nb_worker;
    int       i;
    char    **config_options;
    char    **env;
    taper_t  *taper;
    wtaper_t *wtaper;

    storage_t *storage = lookup_storage(storage_n);
    char *tapetype;
    tapetype_t *tape;
    int   conf_flush_threshold_dumped;
    int   conf_flush_threshold_scheduled;
    int   conf_taperflush;

    taper = &tapetable[nb_taper];
    taper->pid = -1;
    for (i=0; i<nb_taper; i++) {
	if (tapetable[i].storage_name &&
	    g_str_equal(storage_n, tapetable[i].storage_name)) {
	    return NULL;
	}
    }

	taper->name = g_strdup_printf("taper%d", nb_taper);
	taper->storage_name = g_strdup(storage_n);
	taper->ev_read = NULL;
	taper->nb_wait_reply = 0;
	nb_worker = storage_get_taper_parallel_write(storage);
	taper->runtapes = storage_get_runtapes(storage);
	if (nb_worker > taper->runtapes)
	    nb_worker = taper->runtapes;
	taper->nb_worker = nb_worker;
	tapetype = storage_get_tapetype(storage);
	tape = lookup_tapetype(tapetype);
	taper->tape_length = tapetype_get_length(tape);
	taper->current_tape = 0;
	conf_flush_threshold_dumped = storage_get_flush_threshold_dumped(storage);
	conf_flush_threshold_scheduled = storage_get_flush_threshold_scheduled(storage);
	conf_taperflush = storage_get_taperflush(storage);
	taper->flush_threshold_dumped = (conf_flush_threshold_dumped * taper->tape_length) /100;
	taper->flush_threshold_scheduled = (conf_flush_threshold_scheduled * taper->tape_length) /100;
	taper->taperflush = (conf_taperflush * taper->tape_length) /100;
	g_debug("storage %s: tape_length %lld", storage_name(storage), (long long)taper->tape_length);
	g_debug("storage %s: flush_threshold_dumped %lld", storage_name(storage), (long long)taper->flush_threshold_dumped);
	g_debug("storage %s: flush_threshold_scheduled  %lld", storage_name(storage), (long long)taper->flush_threshold_scheduled );
	g_debug("storage %s: taperflush %lld", storage_name(storage), (long long)taper->taperflush);
	taper->max_dle_by_volume = storage_get_max_dle_by_volume(storage);
	taper->tapeq.head = NULL;
	taper->tapeq.tail = NULL;
	taper->vaultqss = NULL;
	taper->degraded_mode = no_taper;
	taper->down = FALSE;

	taper->wtapetable = g_new0(wtaper_t, tapetable[nb_taper].nb_worker + 1);
	if (!taper->wtapetable) {
	    error(_("could not g_malloc tapetable"));
	    /*NOTREACHED*/
	}

	for (wtaper = taper->wtapetable, nb_wtaper = 0; nb_wtaper < nb_worker; wtaper++, nb_wtaper++) {
	    wtaper->name = g_strdup_printf("worker%d-%d", nb_taper, nb_wtaper);
	    wtaper->sendresult = 0;
	    wtaper->input_error = NULL;
	    wtaper->tape_error = NULL;
	    wtaper->result = LAST_TOK;
	    wtaper->job = NULL;
	    wtaper->first_label = NULL;
	    wtaper->first_fileno = 0;
	    wtaper->state = TAPER_STATE_DEFAULT;
	    wtaper->left = 0;
	    wtaper->written = 0;
	    wtaper->vaultqs.src_labels_str = NULL;
	    wtaper->vaultqs.src_labels = NULL;
	    wtaper->vaultqs.vaultq.head = NULL;
	    wtaper->vaultqs.vaultq.tail = NULL;
	    wtaper->taper = taper;

	    /* jump right to degraded mode if there's no taper */
	    if (no_taper) {
		wtaper->tape_error = g_strdup("no taper started (--no-taper)");
		wtaper->result = BOGUS;
	    }
	}

	/* don't start the taper if we're not supposed to */
	taper->fd = -1;
	if (no_taper)
	    return NULL;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) {
	    error(_("taper pipe: %s"), strerror(errno));
	    /*NOTREACHED*/
	}
	if (fd[0] < 0 || fd[0] >= (int)FD_SETSIZE) {
	    error(_("taper socketpair 0: descriptor %d out of range (0 .. %d)\n"),
	          fd[0], (int)FD_SETSIZE-1);
            /*NOTREACHED*/
	}
	if (fd[1] < 0 || fd[1] >= (int)FD_SETSIZE) {
	    error(_("taper socketpair 1: descriptor %d out of range (0 .. %d)\n"),
	          fd[1], (int)FD_SETSIZE-1);
            /*NOTREACHED*/
	}

	switch(taper->pid = fork()) {
	case -1:
	    error(_("fork taper: %s"), strerror(errno));
	    /*NOTREACHED*/

	case 0:	/* child process */
	    aclose(fd[0]);
	    if (dup2(fd[1], 0) == -1 || dup2(fd[1], 1) == -1)
	        error(_("taper dup2: %s"), strerror(errno));
	    config_options = get_config_options(6);
	    config_options[0] = "taper";
	    config_options[1] = get_config_name();
	    config_options[2] = "--storage";
	    config_options[3] = storage_name(storage);
	    config_options[4] = "--log-filename";
	    config_options[5] = log_filename;
	    safe_fd(-1, 0);
	    env = safe_env();
	    execve(taper_program, config_options, env);
	    free_env(env);
	    error("exec %s: %s", taper_program, strerror(errno));
	    /*NOTREACHED*/

	default: /* parent process */
	    aclose(fd[1]);
	    taper->fd = fd[0];
	}
	g_fprintf(stderr, "driver: taper %s storage %s tape_size %lld\n", taper->name, taper->storage_name, (long long)taper->tape_length);

    return taper;
}

void
startup_dump_process(
    dumper_t *dumper,
    char *dumper_program)
{
    int    fd[2];
    char **config_options;
    char **env;

    if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) {
	error(_("%s pipe: %s"), dumper->name, strerror(errno));
	/*NOTREACHED*/
    }

    switch(dumper->pid = fork()) {
    case -1:
	error(_("fork %s: %s"), dumper->name, strerror(errno));
	/*NOTREACHED*/

    case 0:		/* child process */
	aclose(fd[0]);
	if(dup2(fd[1], 0) == -1 || dup2(fd[1], 1) == -1)
	    error(_("%s dup2: %s"), dumper->name, strerror(errno));
	config_options = get_config_options(4);
	config_options[0] = dumper->name ? dumper->name : "dumper",
	config_options[1] = get_config_name();
	config_options[2] = "--log-filename";
	config_options[3] = log_filename;
	safe_fd(-1, 0);
	env = safe_env();
	execve(dumper_program, config_options, env);
	free_env(env);
	error(_("exec %s (%s): %s"), dumper_program,
	      dumper->name, strerror(errno));
        /*NOTREACHED*/

    default:	/* parent process */
	aclose(fd[1]);
	dumper->fd = fd[0];
	dumper->ev_read = NULL;
	dumper->busy = dumper->down = 0;
	g_fprintf(stderr,_("driver: started %s pid %u\n"),
		dumper->name, (unsigned)dumper->pid);
	fflush(stderr);
    }
}

void
startup_dump_processes(
    char *dumper_program,
    int inparallel,
    char *timestamp)
{
    int i;
    dumper_t *dumper;
    char number[NUM_STR_SIZE];

    for(dumper = dmptable, i = 0; i < inparallel; dumper++, i++) {
	g_snprintf(number, sizeof(number), "%d", i);
	dumper->name = g_strconcat("dumper", number, NULL);
	dumper->job = NULL;
	chktable[i].name = g_strconcat("chunker", number, NULL);
	chktable[i].job = NULL;
	chktable[i].fd = -1;

	startup_dump_process(dumper, dumper_program);
	dumper_cmd(dumper, START, NULL, (void *)timestamp);
    }
}

void
startup_chunk_process(
    chunker_t *chunker,
    char *chunker_program)
{
    int    fd[2];
    char **config_options;
    char **env;

    if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) {
	error(_("%s pipe: %s"), chunker->name, strerror(errno));
	/*NOTREACHED*/
    }

    switch(chunker->pid = fork()) {
    case -1:
	error(_("fork %s: %s"), chunker->name, strerror(errno));
	/*NOTREACHED*/

    case 0:		/* child process */
	aclose(fd[0]);
	if(dup2(fd[1], 0) == -1 || dup2(fd[1], 1) == -1) {
	    error(_("%s dup2: %s"), chunker->name, strerror(errno));
	    /*NOTREACHED*/
	}
	config_options = get_config_options(4);
	config_options[0] = chunker->name ? chunker->name : "chunker",
	config_options[1] = get_config_name();
	config_options[2] = "--log-filename";
	config_options[3] = log_filename;
	safe_fd(-1, 0);
	env = safe_env();
	execve(chunker_program, config_options, env);
	free_env(env);
	error(_("exec %s (%s): %s"), chunker_program,
	      chunker->name, strerror(errno));
        /*NOTREACHED*/

    default:	/* parent process */
	aclose(fd[1]);
	chunker->down = 0;
	chunker->fd = fd[0];
	chunker->ev_read = NULL;
	g_fprintf(stderr,_("driver: started %s pid %u\n"),
		chunker->name, (unsigned)chunker->pid);
	fflush(stderr);
    }
}

cmd_t
getresult(
    int fd,
    int show,
    int *result_argc,
    char ***result_argv)
{
    cmd_t t;
    char *line;

    if ((line = areads(fd)) == NULL) {
	if(errno) {
	    g_fprintf(stderr, _("reading result from %s: %s"), childstr(fd), strerror(errno));
	}
	*result_argv = NULL;
	*result_argc = 0;				/* EOF */
    } else {
	*result_argv = split_quoted_strings(line);
	*result_argc = g_strv_length(*result_argv);
    }

    if (show) {
	gchar *msg;
	msg = g_strdup_printf("driver: result time %s from %s: %s", walltime_str(curclock()), childstr(fd), line?line:"(eof)");
	g_printf("%s\n", msg);
	fflush(stdout);
	g_debug("%s", msg);
	g_free(msg);
    }
    amfree(line);

    if (*result_argc < 1) return BOGUS;

    for (t = (cmd_t)(BOGUS+1); t < LAST_TOK; t++)
	if (g_str_equal((*result_argv)[0], cmdstr[t])) return t;

    return BOGUS;
}


static char *
taper_splitting_args(
    char *storage_name,
    disk_t *dp)
{
    GString *args = NULL;
    char *q = NULL;
    dumptype_t *dt = dp->config;
    storage_t  *st;
    tapetype_t *tt;

    st = lookup_storage(storage_name);
    tt = lookup_tapetype(storage_get_tapetype(st));
    g_assert(tt != NULL);

    args = g_string_new("");

    /* old dumptype-based parameters, using empty strings when not seen */
    if (dt) { /* 'dt' may be NULL for flushes */
	if (dumptype_seen(dt, DUMPTYPE_TAPE_SPLITSIZE)) {
	    g_string_append_printf(args, "%ju ",
			(uintmax_t)dumptype_get_tape_splitsize(dt)*1024);
	} else {
	    g_string_append(args, "\"\" ");
	}

	q = quote_string(dumptype_seen(dt, DUMPTYPE_SPLIT_DISKBUFFER)?
		dumptype_get_split_diskbuffer(dt) : "");
	g_string_append_printf(args, "%s ", q);
	g_free(q);

	if (dumptype_seen(dt, DUMPTYPE_FALLBACK_SPLITSIZE)) {
	    g_string_append_printf(args, "%ju ",
			(uintmax_t)dumptype_get_fallback_splitsize(dt)*1024);
	} else {
	    g_string_append(args, "\"\" ");
	}

	if (dumptype_seen(dt, DUMPTYPE_ALLOW_SPLIT)) {
	    g_string_append_printf(args, "%d ",
			(int)dumptype_get_allow_split(dt));
	} else {
	    g_string_append(args, "\"\" ");
	}
    } else {
	g_string_append(args, "\"\" \"\" \"\" \"\" ");
    }

    /* new tapetype-based parameters */
    if (tapetype_seen(tt, TAPETYPE_PART_SIZE)) {
	g_string_append_printf(args, "%ju ",
		    (uintmax_t)tapetype_get_part_size(tt)*1024);
    } else {
	g_string_append(args, "\"\" ");
    }

    q = "";
    if (tapetype_seen(tt, TAPETYPE_PART_CACHE_TYPE)) {
	switch (tapetype_get_part_cache_type(tt)) {
	    default:
	    case PART_CACHE_TYPE_NONE:
		q = "none";
		break;

	    case PART_CACHE_TYPE_MEMORY:
		q = "memory";
		break;

	    case PART_CACHE_TYPE_DISK:
		q = "disk";
		break;
	}
    }
    q = quote_string(q);
    g_string_append_printf(args, "%s ", q);
    g_free(q);

    q = quote_string(tapetype_seen(tt, TAPETYPE_PART_CACHE_DIR)?
	    tapetype_get_part_cache_dir(tt) : "");
    g_string_append_printf(args, "%s ", q);
    g_free(q);

    if (tapetype_seen(tt, TAPETYPE_PART_CACHE_MAX_SIZE)) {
	g_string_append_printf(args, "%ju ",
		    (uintmax_t)tapetype_get_part_cache_max_size(tt)*1024);
    } else {
	g_string_append(args, "\"\" ");
    }


    return g_string_free(args, FALSE);
}

int
taper_cmd(
    taper_t  *taper,
    wtaper_t *wtaper,
    cmd_t cmd,
    void *ptr,
    char *destname,
    int level,
    char *datestamp)
{
    char *cmdline = NULL;
    char number[NUM_STR_SIZE];
    char orig_kb[NUM_STR_SIZE];
    char n_crc[NUM_STR_SIZE+11];
    char c_crc[NUM_STR_SIZE+11];
    char s_crc[NUM_STR_SIZE+11];
    char *data_path;
    sched_t *sp;
    disk_t *dp;
    char *qname;
    char *qdest;
    char *q;
    char *splitargs;
    uintmax_t origsize;

    switch(cmd) {
    case START_TAPER:
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", taper->name,
			    " ", wtaper->name,
			    " ", taper->storage_name,
			    " ", datestamp,
			    "\n", NULL);
	break;
    case CLOSE_VOLUME:
	dp = (disk_t *) ptr;
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    "\n", NULL);
	break;
    case CLOSE_SOURCE_VOLUME:
	dp = (disk_t *) ptr;
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    "\n", NULL);
	break;
    case FILE_WRITE:
	sp = (sched_t *)ptr;
	dp = sp->disk;
        qname = quote_string(dp->name);
	qdest = quote_string(destname);
	g_snprintf(number, sizeof(number), "%d", level);
	if (sp->origsize >= 0)
	    origsize = sp->origsize;
	else
	    origsize = 0;
	g_snprintf(orig_kb, sizeof(orig_kb), "%ju", origsize);
	splitargs = taper_splitting_args(taper->storage_name, dp);
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    " ", qdest,
			    " ", dp->host->hostname,
			    " ", qname,
			    " ", number,
			    " ", datestamp,
			    " ", splitargs,
			         orig_kb,
			    "\n", NULL);
	amfree(splitargs);
	amfree(qdest);
	amfree(qname);
	break;

    case PORT_WRITE:
    case SHM_WRITE:
	sp = (sched_t *)ptr;
	dp = sp->disk;
        qname = quote_string(dp->name);
	g_snprintf(number, sizeof(number), "%d", level);
	data_path = data_path_to_string(dp->data_path);

	/*
          If we haven't been given a place to buffer split dumps to disk,
          make the argument something besides and empty string so's taper
          won't get confused
	*/
	splitargs = taper_splitting_args(taper->storage_name, dp);
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    " ", dp->host->hostname,
			    " ", qname,
			    " ", number,
			    " ", datestamp,
			    " ", splitargs,
			         data_path,
			    "\n", NULL);
	amfree(splitargs);
	amfree(qname);
	break;

    case VAULT_WRITE:
	sp = (sched_t *) ptr;
	dp = sp->disk;
        qname = quote_string(dp->name);
	g_snprintf(number, sizeof(number), "%d", level);
	if (sp->origsize >= 0)
	    origsize = sp->origsize;
	else
	    origsize = 0;
	g_snprintf(orig_kb, sizeof(orig_kb), "%ju", origsize);
	splitargs = taper_splitting_args(taper->storage_name, dp);
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    " ", sp->src_storage,
			    " ", sp->src_pool,
			    " ", sp->src_label,
			    " ", dp->host->hostname,
			    " ", qname,
			    " ", number,
			    " ", datestamp,
			    " ", splitargs,
				 orig_kb,
			    "\n", NULL);
	amfree(splitargs);
	amfree(qname);
	break;

    case DONE: /* handle */
	sp = (sched_t *)ptr;
	dp = sp->disk;
	if (sp->origsize >= 0)
	    origsize = sp->origsize;
	else
	    origsize = 0;
	g_snprintf(number, sizeof(number), "%ju", origsize);
	g_snprintf(n_crc, sizeof(n_crc), "%08x:%lld", sp->native_crc.crc,
		   (long long)sp->native_crc.size);
	g_snprintf(c_crc, sizeof(c_crc), "%08x:%lld", sp->client_crc.crc,
		   (long long)sp->client_crc.size);
	if (dp->compress == COMP_SERVER_FAST ||
	    dp->compress == COMP_SERVER_BEST ||
	    dp->compress == COMP_SERVER_CUST ||
	    dp->encrypt  == ENCRYPT_SERV_CUST) {
	    /* The server-crc do not match the client-crc */
	    g_snprintf(s_crc, sizeof(s_crc), "00000000:0");
	} else {
	    /* The server-crc should match the client-crc */
	    g_snprintf(s_crc, sizeof(s_crc), "%08x:%lld",
		       sp->client_crc.crc,
		       (long long)sp->client_crc.size);
	}
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    " ", number,
			    " ", n_crc,
			    " ", c_crc,
			    " ", s_crc,
			    "\n", NULL);
	break;
    case FAILED: /* handle */
	sp = (sched_t *)ptr;
	dp = sp->disk;
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    "\n", NULL);
	break;
    case NO_NEW_TAPE:
	sp = (sched_t *)ptr;
	dp = sp->disk;
	q = quote_string(destname);	/* reason why no new tape */
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    " ", q,
			    "\n", NULL);
	amfree(q);
	break;
    case NEW_TAPE:
	sp = (sched_t *)ptr;
	dp = sp->disk;
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    "\n", NULL);
	break;
    case START_SCAN:
	sp = (sched_t *)ptr;
	dp = sp->disk;
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    "\n", NULL);
	break;
    case TAKE_SCRIBE_FROM:
	sp = (sched_t *)ptr;
	dp = sp->disk;
	cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", wtaper->name,
			    " ", job2serial(wtaper->job),
			    " ", destname,  /* name of worker */
			    "\n", NULL);
	break;
    case QUIT:
	cmdline = g_strconcat(cmdstr[cmd], "\n", NULL);
	break;
    default:
	error(_("Don't know how to send %s command to taper"), cmdstr[cmd]);
	/*NOTREACHED*/
    }

    /*
     * Note: cmdline already has a '\n'.
     */
    g_printf(_("driver: send-cmd time %s to %s: %s"),
	   walltime_str(curclock()), taper->name, cmdline);
    fflush(stdout);
    if ((full_write(taper->fd, cmdline, strlen(cmdline))) < strlen(cmdline)) {
	g_printf(_("writing taper command '%s' failed: %s\n"),
		cmdline, strerror(errno));
	fflush(stdout);
	amfree(cmdline);
	return 0;
    }
    cmdline[strlen(cmdline)-1] = '\0';
    g_debug("driver: send-cmd time %s to %s: %s", walltime_str(curclock()), taper->name, cmdline);
    if (cmd == QUIT) {
	aclose(taper->fd);
	amfree(taper->name);
	amfree(taper->storage_name);
    }
    amfree(cmdline);
    return 1;
}

int
dumper_cmd(
    dumper_t *dumper,
    cmd_t cmd,
    sched_t *sp,
    char   *mesg)
{
    char *cmdline = NULL;
    char *qmesg;
    disk_t *dp;

    switch(cmd) {
    case START:
        cmdline = g_strdup_printf("%s %s\n", cmdstr[cmd], mesg);
	break;
    case PORT_DUMP:
        if (!sp)
            error("PORT-DUMP without sched pointer\n");
	// fall through
    case SHM_DUMP: {
        application_t *application = NULL;
        GPtrArray *array;
        GString *strbuf;
        gchar **args;
        am_feature_t *features;
        char *device, *plugin;
        char *tmp;

        if (!sp)
            error("SHM-DUMP without sched pointer\n");

	dp = sp->disk;
        array = g_ptr_array_new();
        features = dp->host->features;

        device = (dp->device) ? dp->device : "NODEVICE";

        if (dp->application != NULL) {
            application = lookup_application(dp->application);
            g_assert(application != NULL);
        }

        g_ptr_array_add(array, g_strdup(cmdstr[cmd]));
        g_ptr_array_add(array, g_strdup(job2serial(dumper->job)));
        g_ptr_array_add(array, g_strdup_printf("%d", dumper->output_port));
        g_ptr_array_add(array, g_strdup(interface_get_src_ip(dp->host->netif->config)));
        g_ptr_array_add(array, g_strdup_printf("%d", dp->host->maxdumps));
        g_ptr_array_add(array, g_strdup(dp->host->hostname));
        g_ptr_array_add(array, am_feature_to_string(features));
        g_ptr_array_add(array, quote_string(dp->name));
        g_ptr_array_add(array, quote_string(device));
        g_ptr_array_add(array, g_strdup_printf("%d", sp->level));
        g_ptr_array_add(array, g_strdup(sp->dumpdate));


        /*
         * Build the last argument
         */
        strbuf = g_string_new("|");

        if (am_has_feature(features, fe_req_xml)) {
            char *qtmp;

            tmp = xml_optionstr(dp, 1);
            qtmp = quote_string(tmp);
            g_free(tmp);

            g_string_append(strbuf, qtmp);
            g_free(qtmp);

	    tmp = xml_dumptype_properties(dp);
	    qtmp = quote_string(tmp);
	    g_free(tmp);
	    g_string_append(strbuf, qtmp);
	    g_free(qtmp);

            if (application) {
                tmp = xml_application(dp, application, features);
                qtmp = quote_string(tmp);
                g_free(tmp);

                g_string_append(strbuf, qtmp);
                g_free(qtmp);
            }
        } else {
            tmp = optionstr(dp);
            g_string_append(strbuf, tmp);
            g_free(tmp);
        }

        g_string_append_c(strbuf, '\n');

        g_assert(dp->program != NULL);

        if (g_str_equal(dp->program, "APPLICATION")) {
            g_assert(application != NULL);
            plugin = application_get_plugin(application);
        } else {
            plugin = dp->program;
        }

        g_ptr_array_add(array, quote_string(plugin));
        g_ptr_array_add(array, quote_string(dp->amandad_path));
        g_ptr_array_add(array, quote_string(dp->client_username));
        g_ptr_array_add(array, quote_string(dp->ssl_fingerprint_file));
        g_ptr_array_add(array, quote_string(dp->ssl_cert_file));
        g_ptr_array_add(array, quote_string(dp->ssl_key_file));
        g_ptr_array_add(array, quote_string(dp->ssl_ca_cert_file));
        g_ptr_array_add(array, quote_string(dp->ssl_cipher_list));
        g_ptr_array_add(array, g_strdup_printf("%d", dp->ssl_check_certificate_host));
        g_ptr_array_add(array, quote_string(dp->client_port));
        g_ptr_array_add(array, quote_string(dp->ssh_keys));
        g_ptr_array_add(array, g_strdup(dp->auth));
        g_ptr_array_add(array, g_strdup(data_path_to_string(dp->data_path)));
	if (cmd == PORT_DUMP) {
            g_ptr_array_add(array, g_strdup(dp->dataport_list));
	} else {
            g_ptr_array_add(array, g_strdup(dp->shm_name));
	}
        g_ptr_array_add(array, g_strdup_printf("%d", dp->max_warnings));
        g_ptr_array_add(array, g_string_free(strbuf, FALSE));
        g_ptr_array_add(array, NULL);

        args = (gchar **)g_ptr_array_free(array, FALSE);
        cmdline = g_strjoinv(" ", args);
        g_strfreev(args);

	break;
    }
    case ABORT:
	qmesg = quote_string(mesg);
        cmdline = g_strdup_printf("%s %s %s\n", cmdstr[cmd], job2serial(dumper->job), qmesg);
	amfree(qmesg);
	break;
    case QUIT:
	qmesg = quote_string(mesg);
        cmdline = g_strdup_printf("%s %s\n", cmdstr[cmd], qmesg);
	amfree(qmesg);
	break;
    default:
	error("Don't know how to send %s command to dumper", cmdstr[cmd]);
	/*NOTREACHED*/
    }

    /*
     * Note: cmdline already has a '\n'.
     */
    if (dumper->down) {
	g_printf(_("driver: send-cmd time %s ignored to down dumper %s: %s"),
	       walltime_str(curclock()), dumper->name, cmdline);
    } else {
	g_printf(_("driver: send-cmd time %s to %s: %s"),
	       walltime_str(curclock()), dumper->name, cmdline);
	fflush(stdout);
	if (full_write(dumper->fd, cmdline, strlen(cmdline)) < strlen(cmdline)) {
	    g_printf(_("writing %s command: %s\n"), dumper->name, strerror(errno));
	    fflush(stdout);
	    g_free(cmdline);
	    return 0;
	}
	cmdline[strlen(cmdline)-1] = '\0';
	g_debug("driver: send-cmd time %s to %s: %s", walltime_str(curclock()), dumper->name, cmdline);
	if (cmd == QUIT) aclose(dumper->fd);
    }
    g_free(cmdline);
    return 1;
}

int
chunker_cmd(
    chunker_t *chunker,
    cmd_t cmd,
    sched_t *sp,
    char   *mesg)
{
    char *cmdline = NULL;
    char number[NUM_STR_SIZE];
    char chunksize[NUM_STR_SIZE];
    char use[NUM_STR_SIZE];
    char c_crc[NUM_STR_SIZE+11];
    char *o;
    int activehd=0;
    assignedhd_t **h=NULL;
    char *features;
    char *qname;
    char *qdest;
    disk_t *dp;

    switch(cmd) {
    case START:
	cmdline = g_strjoin(NULL, cmdstr[cmd], " ", mesg, "\n", NULL);
	break;
    case PORT_WRITE:
    case SHM_WRITE:
	dp = sp->disk;
	if(sp->holdp) {
	    h = sp->holdp;
	    activehd = sp->activehd;
	}

	if (dp && h) {
	    qname = quote_string(dp->name);
	    qdest = quote_string(sp->destname);
	    h[activehd]->disk->allocated_dumpers++;
	    g_snprintf(number, sizeof(number), "%d", sp->level);
	    g_snprintf(chunksize, sizeof(chunksize), "%lld",
		    (long long)holdingdisk_get_chunksize(h[0]->disk->hdisk));
	    g_snprintf(use, sizeof(use), "%lld",
		    (long long)h[0]->reserved);
	    features = am_feature_to_string(dp->host->features);
	    o = optionstr(dp);
	    cmdline = g_strjoin(NULL, cmdstr[cmd],
			    " ", job2serial(chunker->job),
			    " ", qdest,
			    " ", dp->host->hostname,
			    " ", features,
			    " ", qname,
			    " ", number,
			    " ", mesg,  /* datestamp */
			    " ", chunksize,
			    " ", dp->program,
			    " ", use,
			    " |", o,
			    "\n", NULL);
	    amfree(features);
	    amfree(o);
	    amfree(qdest);
	    amfree(qname);
	} else {
		error(_("%s command without disk and holding disk.\n"),
		      cmdstr[cmd]);
		/*NOTREACHED*/
	}
	break;
    case CONTINUE:
	dp = sp->disk;
	if(sp->holdp) {
	    h = sp->holdp;
	    activehd = sp->activehd;
	}

	if(dp && h) {
	    qname = quote_string(dp->name);
	    qdest = quote_string(h[activehd]->destname);
	    h[activehd]->disk->allocated_dumpers++;
	    g_snprintf(chunksize, sizeof(chunksize), "%lld",
		     (long long)holdingdisk_get_chunksize(h[activehd]->disk->hdisk));
	    g_snprintf(use, sizeof(use), "%lld",
		     (long long)(h[activehd]->reserved - h[activehd]->used));
	    cmdline = g_strjoin(NULL, cmdstr[cmd],
				" ", job2serial(chunker->job),
				" ", qdest,
				" ", chunksize,
				" ", use,
				"\n", NULL );
	    amfree(qdest);
	    amfree(qname);
	} else {
	    cmdline = g_strconcat(cmdstr[cmd], "\n", NULL);
	}
	break;
    case QUIT:
	cmdline = g_strjoin(NULL, cmdstr[cmd], "\n", NULL);
	break;
    case ABORT:
	{
	    char *q = quote_string(mesg);
	    cmdline = g_strjoin(NULL, cmdstr[cmd], " ", q, "\n", NULL);
            cmdline = g_strdup_printf("%s %s %s\n", cmdstr[cmd], job2serial(chunker->job), q);
	    amfree(q);
	}
	break;
    case DONE:
	dp = sp->disk;
	if (dp) {
	    if (sp->client_crc.crc == 0 ||
		dp->compress == COMP_SERVER_FAST ||
		dp->compress == COMP_SERVER_BEST ||
		dp->compress == COMP_SERVER_CUST ||
		dp->encrypt  == ENCRYPT_SERV_CUST) {
		g_snprintf(c_crc, sizeof(c_crc), "00000000:0");
	    } else {
		g_snprintf(c_crc, sizeof(c_crc), "%08x:%lld",
			   sp->client_crc.crc,
			   (long long)sp->client_crc.size);
	    }
	    cmdline = g_strjoin(NULL, cmdstr[cmd],
				" ", job2serial(chunker->job),
				" ", c_crc,
				"\n",  NULL);
	} else {
	    cmdline = g_strjoin(NULL, cmdstr[cmd], "\n", NULL);
	}
	break;
    case FAILED:
	dp = sp->disk;
	if( dp ) {
	    cmdline = g_strjoin(NULL, cmdstr[cmd],
				" ", job2serial(chunker->job),
				"\n",  NULL);
	} else {
	    cmdline = g_strjoin(NULL, cmdstr[cmd], "\n", NULL);
	}
	break;
    default:
	error(_("Don't know how to send %s command to chunker"), cmdstr[cmd]);
	/*NOTREACHED*/
    }

    /*
     * Note: cmdline already has a '\n'.
     */
    g_printf(_("driver: send-cmd time %s to %s: %s"),
	   walltime_str(curclock()), chunker->name, cmdline);
    fflush(stdout);
    if (full_write(chunker->fd, cmdline, strlen(cmdline)) < strlen(cmdline)) {
	g_printf(_("writing %s command: %s\n"), chunker->name, strerror(errno));
	fflush(stdout);
	amfree(cmdline);
	return 0;
    }
    cmdline[strlen(cmdline)-1] = '\0';
    g_debug("driver: send-cmd time %s to %s: %s", walltime_str(curclock()), chunker->name, cmdline);
    if (cmd == QUIT) aclose(chunker->fd);
    amfree(cmdline);
    return 1;
}

job_t *
alloc_job(void)
{
    int i;

    for (i=0; i<max_jobs; i++) {
	if (jobs[i].in_use == 0) {
	    jobs[i].in_use = 1;
	    return &jobs[i];
	}
    }
    error("All job in use");
}

void
free_job(
    job_t *job)
{
    job->in_use  = 0;
    job->sched   = NULL;
    job->dumper  = NULL;
    job->chunker = NULL;
    job->wtaper  = NULL;
}

void
free_sched(
    sched_t *sp)
{
    g_free(sp->dumpdate);
    g_free(sp->degr_dumpdate);
    g_free(sp->destname);
    g_free(sp->datestamp);
    g_free(sp->degr_mesg);
    g_free(sp->src_storage);
    g_free(sp->src_pool);
    g_free(sp->src_label);
    g_free(sp);
}

job_t *
serial2job(
    char *str)
{
    int rc, s;
    long gen;

    rc = sscanf(str, "%d-%ld", &s, &gen);
    if(rc != 2) {
	error(_("error [serial2job \"%s\" parse error]"), str);
	/*NOTREACHED*/
    } else if (s < 0 || s >= max_serial) {
	error(_("error [serial out of range 0..%d: %d]"), max_serial, s);
	/*NOTREACHED*/
    }
    if(gen != stable[s].gen)
	g_printf("driver: serial2job error time %s serial gen mismatch %s %d %ld %ld\n",
	       walltime_str(curclock()), str, s, gen, stable[s].gen);
    return stable[s].job;
}

void
free_serial(
    char *str)
{
    int rc, s;
    long gen;

    rc = sscanf(str, _("%d-%ld"), &s, &gen);
    if(!(rc == 2 && s >= 0 && s < max_serial)) {
	/* nuke self to get core dump for Brett */
	g_fprintf(stderr, _("driver: free_serial: str \"%s\" rc %d s %d\n"),
		str, rc, s);
	fflush(stderr);
	abort();
    }

    if(gen != stable[s].gen)
	g_printf(_("driver: free_serial error time %s serial gen mismatch %s\n"),
	       walltime_str(curclock()),str);
    stable[s].gen = 0;
    stable[s].job = NULL;
}


void
free_serial_job(
    job_t *job)
{
    int s;

    for(s = 0; s < max_serial; s++) {
	if(stable[s].job == job) {
	    //g_printf("free serial %02d-%05ld for disk\n", s, stable[s].gen);
	    stable[s].gen = 0;
	    stable[s].job = NULL;
	    return;
	}
    }

    // Should try to print DB name found by dumper/chunker or wtaper.
    g_printf(_("driver: error time %s serial not found for job %p\n"),
	   walltime_str(curclock()), job);
}


void
check_unfree_serial(void)
{
    int s;

    /* find used serial number */
    for(s = 0; s < max_serial; s++) {
	if(stable[s].gen != 0 || stable[s].job != NULL) {
	    g_printf(_("driver: error time %s bug: serial in use: %02d-%05ld\n"),
		   walltime_str(curclock()), s, stable[s].gen);
	}
    }
}

char *job2serial(
    job_t *job)
{
    int s;
    static char str[NUM_STR_SIZE];

    for(s = 0; s < max_serial; s++) {
	if(stable[s].job == job) {
	    g_snprintf(str, sizeof(str), "%02d-%05ld", s, stable[s].gen);
	    return str;
	}
    }

    /* find unused serial number */
    for(s = 0; s < max_serial; s++)
	if(stable[s].gen == 0 && stable[s].job == NULL)
	    break;
    if(s >= max_serial) {
	g_printf(_("driver: error time %s bug: out of serial numbers\n"),
	       walltime_str(curclock()));
	s = 0;
    }

    stable[s].gen = generation++;
    stable[s].job = job;

    g_snprintf(str, sizeof(str), "%02d-%05ld", s, stable[s].gen);
    return str;
}

void
update_info_dumper(
     sched_t *sp,
     off_t origsize,
     off_t dumpsize,
     time_t dumptime)
{
    int level, i;
    info_t info;
    stats_t *infp;
    perf_t *perfp;
    char *conf_infofile;
    disk_t *dp = sp->disk;

    level = sp->level;

    if (origsize == 0 || dumpsize == 0) {
	g_debug("not updating because origsize or dumpsize is 0");
	return;
    }

    conf_infofile = config_dir_relative(getconf_str(CNF_INFOFILE));
    if (open_infofile(conf_infofile)) {
	error(_("could not open info db \"%s\""), conf_infofile);
	/*NOTREACHED*/
    }
    amfree(conf_infofile);

    get_info(dp->host->hostname, dp->name, &info);

    /* Clean up information about this and higher-level dumps.  This
       assumes that update_info_dumper() is always run before
       update_info_taper(). */
    for (i = level; i < DUMP_LEVELS; ++i) {
      infp = &info.inf[i];
      infp->size = (off_t)-1;
      infp->csize = (off_t)-1;
      infp->secs = (time_t)-1;
      infp->date = (time_t)-1;
      infp->label[0] = '\0';
      infp->filenum = 0;
    }

    /* now store information about this dump */
    infp = &info.inf[level];
    infp->size = origsize;
    infp->csize = dumpsize;
    infp->secs = dumptime;
    if (sp->timestamp == 0) {
	infp->date = 0;
    } else {
	infp->date = get_time_from_timestamp(sp->datestamp);
    }

    if(level == 0) perfp = &info.full;
    else perfp = &info.incr;

    /* Update the stats, but only if the new values are meaningful */
    if(dp->compress != COMP_NONE && origsize > (off_t)0) {
	newperf(perfp->comp, (double)dumpsize/(double)origsize);
    }
    if(dumptime > (time_t)0) {
	if((off_t)dumptime >= dumpsize)
	    newperf(perfp->rate, 1);
	else
	    newperf(perfp->rate, (double)dumpsize/(double)dumptime);
    }

    if(origsize >= (off_t)0 && getconf_int(CNF_RESERVE)<100) {
	info.command = NO_COMMAND;
    }

    if (origsize >= (off_t)0 && level == info.last_level) {
	info.consecutive_runs++;
    } else if (origsize >= (off_t)0) {
	info.last_level = level;
	info.consecutive_runs = 1;
    }

    if(origsize >= (off_t)0 && dumpsize >= (off_t)0) {
	for(i=NB_HISTORY-1;i>0;i--) {
	    info.history[i] = info.history[i-1];
	}

	info.history[0].level = level;
	info.history[0].size  = origsize;
	info.history[0].csize = dumpsize;
	if (sp->timestamp == 0) {
	    info.history[0].date = 0;
	} else {
	    info.history[0].date = get_time_from_timestamp(sp->datestamp);
	}
	info.history[0].secs  = dumptime;
    }

    if (put_info(dp->host->hostname, dp->name, &info)) {
	int save_errno = errno;
	g_fprintf(stderr, _("infofile update failed (%s,'%s'): %s\n"),
		  dp->host->hostname, dp->name, strerror(save_errno));
	log_add(L_ERROR, _("infofile update failed (%s,'%s'): %s\n"),
		dp->host->hostname, dp->name, strerror(save_errno));
	error(_("infofile update failed (%s,'%s'): %s\n"),
	      dp->host->hostname, dp->name, strerror(save_errno));
	/*NOTREACHED*/
    }

    close_infofile();
}

void
update_info_taper(
    sched_t *sp,
    char *label,
    off_t filenum,
    int level)
{
    info_t info;
    stats_t *infp;
    int rc;
    disk_t *dp = sp->disk;

    if (!label) {
	log_add(L_ERROR, "update_info_taper without label");
	return;
    }

    rc = open_infofile(getconf_str(CNF_INFOFILE));
    if(rc) {
	error(_("could not open infofile %s: %s (%d)"), getconf_str(CNF_INFOFILE),
	      strerror(errno), rc);
	/*NOTREACHED*/
    }

    get_info(dp->host->hostname, dp->name, &info);

    infp = &info.inf[level];
    /* XXX - should we record these two if no-record? */
    strncpy(infp->label, label, sizeof(infp->label)-1);
    infp->label[sizeof(infp->label)-1] = '\0';
    infp->filenum = filenum;

    info.command = NO_COMMAND;

    if (put_info(dp->host->hostname, dp->name, &info)) {
	int save_errno = errno;
	g_fprintf(stderr, _("infofile update failed (%s,'%s'): %s\n"),
		  dp->host->hostname, dp->name, strerror(save_errno));
	log_add(L_ERROR, _("infofile update failed (%s,'%s'): %s\n"),
		dp->host->hostname, dp->name, strerror(save_errno));
	error(_("infofile update failed (%s,'%s'): %s\n"),
	      dp->host->hostname, dp->name, strerror(save_errno));
	/*NOTREACHED*/
    }
    close_infofile();
}

/* Free an array of pointers to assignedhd_t after freeing the
 * assignedhd_t themselves. The array must be NULL-terminated.
 */
void free_assignedhd(
    assignedhd_t **ahd)
{
    int i;

    if( !ahd ) { return; }

    for( i = 0; ahd[i]; i++ ) {
	amfree(ahd[i]->destname);
	amfree(ahd[i]);
    }
    amfree(ahd);
}