/* * Amanda, The Advanced Maryland Automatic Network Disk Archiver * Copyright (c) 1991-1998 University of Maryland at College Park * Copyright (c) 2007-2012 Zmanda, Inc. All Rights Reserved. * Copyright (c) 2013-2016 Carbonite, Inc. All Rights Reserved. * All Rights Reserved. * * Permission to use, copy, modify, distribute, and sell this software and its * documentation for any purpose is hereby granted without fee, provided that * the above copyright notice appear in all copies and that both that * copyright notice and this permission notice appear in supporting * documentation, and that the name of U.M. not be used in advertising or * publicity pertaining to distribution of the software without specific, * written prior permission. U.M. makes no representations about the * suitability of this software for any purpose. It is provided "as is" * without express or implied warranty. * * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M. * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. * * Author: James da Silva, Systems Design and Analysis Group * Computer Science Department * University of Maryland at College Park */ /* * $Id: driverio.c,v 1.92 2006/08/24 01:57:16 paddy_s Exp $ * * I/O-related functions for driver program */ #include "amanda.h" #include "amutil.h" #include "clock.h" #include "server_util.h" #include "conffile.h" #include "diskfile.h" #include "infofile.h" #include "logfile.h" #include "timestamp.h" #define GLOBAL /* the global variables defined here */ #include "driverio.h" static const char *childstr(int); static long generation = 1; typedef struct serial_s { long gen; job_t *job; } serial_t; static int max_serial; static serial_t *stable; static int max_jobs; static job_t *jobs; static taper_t * start_one_tape_process(char *taper_program, char *storage_name, gboolean no_taper, int nb_taper); void init_driverio( int inparallel, int nb_storage, int sum_taper_parallel_write) { dumper_t *dumper; tapetable = g_new0(taper_t, nb_storage+1); dmptable = g_new0(dumper_t, inparallel+1); chktable = g_new0(chunker_t, inparallel+1); for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) { dumper->fd = -1; } max_serial = inparallel + sum_taper_parallel_write; stable = g_new0(serial_t, max_serial); max_jobs = inparallel + sum_taper_parallel_write; jobs = g_new0(job_t, max_jobs); } static const char * childstr( int fd) { static char buf[NUM_STR_SIZE + 32]; dumper_t *dumper; chunker_t *chunker; taper_t *taper; for (taper = tapetable; taper->fd != 0; taper++) { if (taper->fd == fd) return (taper->name); } for (dumper = dmptable; dumper->fd != 0; dumper++) { if (dumper->fd == fd) return (dumper->name); } for (chunker = chktable; chunker->fd != 0; chunker++) { if (chunker->fd == fd) return (chunker->name); } g_snprintf(buf, sizeof(buf), _("unknown child (fd %d)"), fd); return (buf); } static int nb_taper = 0; int startup_dump_tape_process( char *taper_program, gboolean no_taper) { identlist_t il; taper_t *taper; for (il = getconf_identlist(CNF_STORAGE); il != NULL; il = il->next) { taper = start_one_tape_process(taper_program, (char *)il->data, no_taper, nb_taper); if (taper) { taper->flush_storage = TRUE; nb_taper++; } } return nb_taper; } int startup_vault_tape_process( char *taper_program, gboolean no_taper) { identlist_t il; taper_t *taper; for (il = getconf_identlist(CNF_VAULT_STORAGE); il != NULL; il = il->next) { taper = start_one_tape_process(taper_program, (char *)il->data, no_taper, nb_taper); if (taper) { taper->vault_storage = TRUE; nb_taper++; } } return nb_taper; } static taper_t * start_one_tape_process( char *taper_program, char *storage_n, gboolean no_taper, int nb_taper) { int fd[2]; int nb_wtaper; int nb_worker; int i; char **config_options; char **env; taper_t *taper; wtaper_t *wtaper; storage_t *storage = lookup_storage(storage_n); char *tapetype; tapetype_t *tape; int conf_flush_threshold_dumped; int conf_flush_threshold_scheduled; int conf_taperflush; taper = &tapetable[nb_taper]; taper->pid = -1; for (i=0; iname = g_strdup_printf("taper%d", nb_taper); taper->storage_name = g_strdup(storage_n); taper->ev_read = NULL; taper->nb_wait_reply = 0; nb_worker = storage_get_taper_parallel_write(storage); taper->runtapes = storage_get_runtapes(storage); if (nb_worker > taper->runtapes) nb_worker = taper->runtapes; taper->nb_worker = nb_worker; tapetype = storage_get_tapetype(storage); tape = lookup_tapetype(tapetype); taper->tape_length = tapetype_get_length(tape); taper->current_tape = 0; conf_flush_threshold_dumped = storage_get_flush_threshold_dumped(storage); conf_flush_threshold_scheduled = storage_get_flush_threshold_scheduled(storage); conf_taperflush = storage_get_taperflush(storage); taper->flush_threshold_dumped = (conf_flush_threshold_dumped * taper->tape_length) /100; taper->flush_threshold_scheduled = (conf_flush_threshold_scheduled * taper->tape_length) /100; taper->taperflush = (conf_taperflush * taper->tape_length) /100; g_debug("storage %s: tape_length %lld", storage_name(storage), (long long)taper->tape_length); g_debug("storage %s: flush_threshold_dumped %lld", storage_name(storage), (long long)taper->flush_threshold_dumped); g_debug("storage %s: flush_threshold_scheduled %lld", storage_name(storage), (long long)taper->flush_threshold_scheduled ); g_debug("storage %s: taperflush %lld", storage_name(storage), (long long)taper->taperflush); taper->max_dle_by_volume = storage_get_max_dle_by_volume(storage); taper->tapeq.head = NULL; taper->tapeq.tail = NULL; taper->vaultqss = NULL; taper->degraded_mode = no_taper; taper->down = FALSE; taper->wtapetable = g_new0(wtaper_t, tapetable[nb_taper].nb_worker + 1); if (!taper->wtapetable) { error(_("could not g_malloc tapetable")); /*NOTREACHED*/ } for (wtaper = taper->wtapetable, nb_wtaper = 0; nb_wtaper < nb_worker; wtaper++, nb_wtaper++) { wtaper->name = g_strdup_printf("worker%d-%d", nb_taper, nb_wtaper); wtaper->sendresult = 0; wtaper->input_error = NULL; wtaper->tape_error = NULL; wtaper->result = LAST_TOK; wtaper->job = NULL; wtaper->first_label = NULL; wtaper->first_fileno = 0; wtaper->state = TAPER_STATE_DEFAULT; wtaper->left = 0; wtaper->written = 0; wtaper->vaultqs.src_labels_str = NULL; wtaper->vaultqs.src_labels = NULL; wtaper->vaultqs.vaultq.head = NULL; wtaper->vaultqs.vaultq.tail = NULL; wtaper->taper = taper; /* jump right to degraded mode if there's no taper */ if (no_taper) { wtaper->tape_error = g_strdup("no taper started (--no-taper)"); wtaper->result = BOGUS; } } /* don't start the taper if we're not supposed to */ taper->fd = -1; if (no_taper) return NULL; if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) { error(_("taper pipe: %s"), strerror(errno)); /*NOTREACHED*/ } if (fd[0] < 0 || fd[0] >= (int)FD_SETSIZE) { error(_("taper socketpair 0: descriptor %d out of range (0 .. %d)\n"), fd[0], (int)FD_SETSIZE-1); /*NOTREACHED*/ } if (fd[1] < 0 || fd[1] >= (int)FD_SETSIZE) { error(_("taper socketpair 1: descriptor %d out of range (0 .. %d)\n"), fd[1], (int)FD_SETSIZE-1); /*NOTREACHED*/ } switch(taper->pid = fork()) { case -1: error(_("fork taper: %s"), strerror(errno)); /*NOTREACHED*/ case 0: /* child process */ aclose(fd[0]); if (dup2(fd[1], 0) == -1 || dup2(fd[1], 1) == -1) error(_("taper dup2: %s"), strerror(errno)); config_options = get_config_options(6); config_options[0] = "taper"; config_options[1] = get_config_name(); config_options[2] = "--storage"; config_options[3] = storage_name(storage); config_options[4] = "--log-filename"; config_options[5] = log_filename; safe_fd(-1, 0); env = safe_env(); execve(taper_program, config_options, env); free_env(env); error("exec %s: %s", taper_program, strerror(errno)); /*NOTREACHED*/ default: /* parent process */ aclose(fd[1]); taper->fd = fd[0]; } g_fprintf(stderr, "driver: taper %s storage %s tape_size %lld\n", taper->name, taper->storage_name, (long long)taper->tape_length); return taper; } void startup_dump_process( dumper_t *dumper, char *dumper_program) { int fd[2]; char **config_options; char **env; if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) { error(_("%s pipe: %s"), dumper->name, strerror(errno)); /*NOTREACHED*/ } switch(dumper->pid = fork()) { case -1: error(_("fork %s: %s"), dumper->name, strerror(errno)); /*NOTREACHED*/ case 0: /* child process */ aclose(fd[0]); if(dup2(fd[1], 0) == -1 || dup2(fd[1], 1) == -1) error(_("%s dup2: %s"), dumper->name, strerror(errno)); config_options = get_config_options(4); config_options[0] = dumper->name ? dumper->name : "dumper", config_options[1] = get_config_name(); config_options[2] = "--log-filename"; config_options[3] = log_filename; safe_fd(-1, 0); env = safe_env(); execve(dumper_program, config_options, env); free_env(env); error(_("exec %s (%s): %s"), dumper_program, dumper->name, strerror(errno)); /*NOTREACHED*/ default: /* parent process */ aclose(fd[1]); dumper->fd = fd[0]; dumper->ev_read = NULL; dumper->busy = dumper->down = 0; g_fprintf(stderr,_("driver: started %s pid %u\n"), dumper->name, (unsigned)dumper->pid); fflush(stderr); } } void startup_dump_processes( char *dumper_program, int inparallel, char *timestamp) { int i; dumper_t *dumper; char number[NUM_STR_SIZE]; for(dumper = dmptable, i = 0; i < inparallel; dumper++, i++) { g_snprintf(number, sizeof(number), "%d", i); dumper->name = g_strconcat("dumper", number, NULL); dumper->job = NULL; chktable[i].name = g_strconcat("chunker", number, NULL); chktable[i].job = NULL; chktable[i].fd = -1; startup_dump_process(dumper, dumper_program); dumper_cmd(dumper, START, NULL, (void *)timestamp); } } void startup_chunk_process( chunker_t *chunker, char *chunker_program) { int fd[2]; char **config_options; char **env; if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) { error(_("%s pipe: %s"), chunker->name, strerror(errno)); /*NOTREACHED*/ } switch(chunker->pid = fork()) { case -1: error(_("fork %s: %s"), chunker->name, strerror(errno)); /*NOTREACHED*/ case 0: /* child process */ aclose(fd[0]); if(dup2(fd[1], 0) == -1 || dup2(fd[1], 1) == -1) { error(_("%s dup2: %s"), chunker->name, strerror(errno)); /*NOTREACHED*/ } config_options = get_config_options(4); config_options[0] = chunker->name ? chunker->name : "chunker", config_options[1] = get_config_name(); config_options[2] = "--log-filename"; config_options[3] = log_filename; safe_fd(-1, 0); env = safe_env(); execve(chunker_program, config_options, env); free_env(env); error(_("exec %s (%s): %s"), chunker_program, chunker->name, strerror(errno)); /*NOTREACHED*/ default: /* parent process */ aclose(fd[1]); chunker->down = 0; chunker->fd = fd[0]; chunker->ev_read = NULL; g_fprintf(stderr,_("driver: started %s pid %u\n"), chunker->name, (unsigned)chunker->pid); fflush(stderr); } } cmd_t getresult( int fd, int show, int *result_argc, char ***result_argv) { cmd_t t; char *line; if ((line = areads(fd)) == NULL) { if(errno) { g_fprintf(stderr, _("reading result from %s: %s"), childstr(fd), strerror(errno)); } *result_argv = NULL; *result_argc = 0; /* EOF */ } else { *result_argv = split_quoted_strings(line); *result_argc = g_strv_length(*result_argv); } if (show) { gchar *msg; msg = g_strdup_printf("driver: result time %s from %s: %s", walltime_str(curclock()), childstr(fd), line?line:"(eof)"); g_printf("%s\n", msg); fflush(stdout); g_debug("%s", msg); g_free(msg); } amfree(line); if (*result_argc < 1) return BOGUS; for (t = (cmd_t)(BOGUS+1); t < LAST_TOK; t++) if (g_str_equal((*result_argv)[0], cmdstr[t])) return t; return BOGUS; } static char * taper_splitting_args( char *storage_name, disk_t *dp) { GString *args = NULL; char *q = NULL; dumptype_t *dt = dp->config; storage_t *st; tapetype_t *tt; st = lookup_storage(storage_name); tt = lookup_tapetype(storage_get_tapetype(st)); g_assert(tt != NULL); args = g_string_new(""); /* old dumptype-based parameters, using empty strings when not seen */ if (dt) { /* 'dt' may be NULL for flushes */ if (dumptype_seen(dt, DUMPTYPE_TAPE_SPLITSIZE)) { g_string_append_printf(args, "%ju ", (uintmax_t)dumptype_get_tape_splitsize(dt)*1024); } else { g_string_append(args, "\"\" "); } q = quote_string(dumptype_seen(dt, DUMPTYPE_SPLIT_DISKBUFFER)? dumptype_get_split_diskbuffer(dt) : ""); g_string_append_printf(args, "%s ", q); g_free(q); if (dumptype_seen(dt, DUMPTYPE_FALLBACK_SPLITSIZE)) { g_string_append_printf(args, "%ju ", (uintmax_t)dumptype_get_fallback_splitsize(dt)*1024); } else { g_string_append(args, "\"\" "); } if (dumptype_seen(dt, DUMPTYPE_ALLOW_SPLIT)) { g_string_append_printf(args, "%d ", (int)dumptype_get_allow_split(dt)); } else { g_string_append(args, "\"\" "); } } else { g_string_append(args, "\"\" \"\" \"\" \"\" "); } /* new tapetype-based parameters */ if (tapetype_seen(tt, TAPETYPE_PART_SIZE)) { g_string_append_printf(args, "%ju ", (uintmax_t)tapetype_get_part_size(tt)*1024); } else { g_string_append(args, "\"\" "); } q = ""; if (tapetype_seen(tt, TAPETYPE_PART_CACHE_TYPE)) { switch (tapetype_get_part_cache_type(tt)) { default: case PART_CACHE_TYPE_NONE: q = "none"; break; case PART_CACHE_TYPE_MEMORY: q = "memory"; break; case PART_CACHE_TYPE_DISK: q = "disk"; break; } } q = quote_string(q); g_string_append_printf(args, "%s ", q); g_free(q); q = quote_string(tapetype_seen(tt, TAPETYPE_PART_CACHE_DIR)? tapetype_get_part_cache_dir(tt) : ""); g_string_append_printf(args, "%s ", q); g_free(q); if (tapetype_seen(tt, TAPETYPE_PART_CACHE_MAX_SIZE)) { g_string_append_printf(args, "%ju ", (uintmax_t)tapetype_get_part_cache_max_size(tt)*1024); } else { g_string_append(args, "\"\" "); } return g_string_free(args, FALSE); } int taper_cmd( taper_t *taper, wtaper_t *wtaper, cmd_t cmd, void *ptr, char *destname, int level, char *datestamp) { char *cmdline = NULL; char number[NUM_STR_SIZE]; char orig_kb[NUM_STR_SIZE]; char n_crc[NUM_STR_SIZE+11]; char c_crc[NUM_STR_SIZE+11]; char s_crc[NUM_STR_SIZE+11]; char *data_path; sched_t *sp; disk_t *dp; char *qname; char *qdest; char *q; char *splitargs; uintmax_t origsize; switch(cmd) { case START_TAPER: cmdline = g_strjoin(NULL, cmdstr[cmd], " ", taper->name, " ", wtaper->name, " ", taper->storage_name, " ", datestamp, "\n", NULL); break; case CLOSE_VOLUME: dp = (disk_t *) ptr; cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, "\n", NULL); break; case CLOSE_SOURCE_VOLUME: dp = (disk_t *) ptr; cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, "\n", NULL); break; case FILE_WRITE: sp = (sched_t *)ptr; dp = sp->disk; qname = quote_string(dp->name); qdest = quote_string(destname); g_snprintf(number, sizeof(number), "%d", level); if (sp->origsize >= 0) origsize = sp->origsize; else origsize = 0; g_snprintf(orig_kb, sizeof(orig_kb), "%ju", origsize); splitargs = taper_splitting_args(taper->storage_name, dp); cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), " ", qdest, " ", dp->host->hostname, " ", qname, " ", number, " ", datestamp, " ", splitargs, orig_kb, "\n", NULL); amfree(splitargs); amfree(qdest); amfree(qname); break; case PORT_WRITE: case SHM_WRITE: sp = (sched_t *)ptr; dp = sp->disk; qname = quote_string(dp->name); g_snprintf(number, sizeof(number), "%d", level); data_path = data_path_to_string(dp->data_path); /* If we haven't been given a place to buffer split dumps to disk, make the argument something besides and empty string so's taper won't get confused */ splitargs = taper_splitting_args(taper->storage_name, dp); cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), " ", dp->host->hostname, " ", qname, " ", number, " ", datestamp, " ", splitargs, data_path, "\n", NULL); amfree(splitargs); amfree(qname); break; case VAULT_WRITE: sp = (sched_t *) ptr; dp = sp->disk; qname = quote_string(dp->name); g_snprintf(number, sizeof(number), "%d", level); if (sp->origsize >= 0) origsize = sp->origsize; else origsize = 0; g_snprintf(orig_kb, sizeof(orig_kb), "%ju", origsize); splitargs = taper_splitting_args(taper->storage_name, dp); cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), " ", sp->src_storage, " ", sp->src_pool, " ", sp->src_label, " ", dp->host->hostname, " ", qname, " ", number, " ", datestamp, " ", splitargs, orig_kb, "\n", NULL); amfree(splitargs); amfree(qname); break; case DONE: /* handle */ sp = (sched_t *)ptr; dp = sp->disk; if (sp->origsize >= 0) origsize = sp->origsize; else origsize = 0; g_snprintf(number, sizeof(number), "%ju", origsize); g_snprintf(n_crc, sizeof(n_crc), "%08x:%lld", sp->native_crc.crc, (long long)sp->native_crc.size); g_snprintf(c_crc, sizeof(c_crc), "%08x:%lld", sp->client_crc.crc, (long long)sp->client_crc.size); if (dp->compress == COMP_SERVER_FAST || dp->compress == COMP_SERVER_BEST || dp->compress == COMP_SERVER_CUST || dp->encrypt == ENCRYPT_SERV_CUST) { /* The server-crc do not match the client-crc */ g_snprintf(s_crc, sizeof(s_crc), "00000000:0"); } else { /* The server-crc should match the client-crc */ g_snprintf(s_crc, sizeof(s_crc), "%08x:%lld", sp->client_crc.crc, (long long)sp->client_crc.size); } cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), " ", number, " ", n_crc, " ", c_crc, " ", s_crc, "\n", NULL); break; case FAILED: /* handle */ sp = (sched_t *)ptr; dp = sp->disk; cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), "\n", NULL); break; case NO_NEW_TAPE: sp = (sched_t *)ptr; dp = sp->disk; q = quote_string(destname); /* reason why no new tape */ cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), " ", q, "\n", NULL); amfree(q); break; case NEW_TAPE: sp = (sched_t *)ptr; dp = sp->disk; cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), "\n", NULL); break; case START_SCAN: sp = (sched_t *)ptr; dp = sp->disk; cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), "\n", NULL); break; case TAKE_SCRIBE_FROM: sp = (sched_t *)ptr; dp = sp->disk; cmdline = g_strjoin(NULL, cmdstr[cmd], " ", wtaper->name, " ", job2serial(wtaper->job), " ", destname, /* name of worker */ "\n", NULL); break; case QUIT: cmdline = g_strconcat(cmdstr[cmd], "\n", NULL); break; default: error(_("Don't know how to send %s command to taper"), cmdstr[cmd]); /*NOTREACHED*/ } /* * Note: cmdline already has a '\n'. */ g_printf(_("driver: send-cmd time %s to %s: %s"), walltime_str(curclock()), taper->name, cmdline); fflush(stdout); if ((full_write(taper->fd, cmdline, strlen(cmdline))) < strlen(cmdline)) { g_printf(_("writing taper command '%s' failed: %s\n"), cmdline, strerror(errno)); fflush(stdout); amfree(cmdline); return 0; } cmdline[strlen(cmdline)-1] = '\0'; g_debug("driver: send-cmd time %s to %s: %s", walltime_str(curclock()), taper->name, cmdline); if (cmd == QUIT) { aclose(taper->fd); amfree(taper->name); amfree(taper->storage_name); } amfree(cmdline); return 1; } int dumper_cmd( dumper_t *dumper, cmd_t cmd, sched_t *sp, char *mesg) { char *cmdline = NULL; char *qmesg; disk_t *dp; switch(cmd) { case START: cmdline = g_strdup_printf("%s %s\n", cmdstr[cmd], mesg); break; case PORT_DUMP: if (!sp) error("PORT-DUMP without sched pointer\n"); // fall through case SHM_DUMP: { application_t *application = NULL; GPtrArray *array; GString *strbuf; gchar **args; am_feature_t *features; char *device, *plugin; char *tmp; if (!sp) error("SHM-DUMP without sched pointer\n"); dp = sp->disk; array = g_ptr_array_new(); features = dp->host->features; device = (dp->device) ? dp->device : "NODEVICE"; if (dp->application != NULL) { application = lookup_application(dp->application); g_assert(application != NULL); } g_ptr_array_add(array, g_strdup(cmdstr[cmd])); g_ptr_array_add(array, g_strdup(job2serial(dumper->job))); g_ptr_array_add(array, g_strdup_printf("%d", dumper->output_port)); g_ptr_array_add(array, g_strdup(interface_get_src_ip(dp->host->netif->config))); g_ptr_array_add(array, g_strdup_printf("%d", dp->host->maxdumps)); g_ptr_array_add(array, g_strdup(dp->host->hostname)); g_ptr_array_add(array, am_feature_to_string(features)); g_ptr_array_add(array, quote_string(dp->name)); g_ptr_array_add(array, quote_string(device)); g_ptr_array_add(array, g_strdup_printf("%d", sp->level)); g_ptr_array_add(array, g_strdup(sp->dumpdate)); /* * Build the last argument */ strbuf = g_string_new("|"); if (am_has_feature(features, fe_req_xml)) { char *qtmp; tmp = xml_optionstr(dp, 1); qtmp = quote_string(tmp); g_free(tmp); g_string_append(strbuf, qtmp); g_free(qtmp); tmp = xml_dumptype_properties(dp); qtmp = quote_string(tmp); g_free(tmp); g_string_append(strbuf, qtmp); g_free(qtmp); if (application) { tmp = xml_application(dp, application, features); qtmp = quote_string(tmp); g_free(tmp); g_string_append(strbuf, qtmp); g_free(qtmp); } } else { tmp = optionstr(dp); g_string_append(strbuf, tmp); g_free(tmp); } g_string_append_c(strbuf, '\n'); g_assert(dp->program != NULL); if (g_str_equal(dp->program, "APPLICATION")) { g_assert(application != NULL); plugin = application_get_plugin(application); } else { plugin = dp->program; } g_ptr_array_add(array, quote_string(plugin)); g_ptr_array_add(array, quote_string(dp->amandad_path)); g_ptr_array_add(array, quote_string(dp->client_username)); g_ptr_array_add(array, quote_string(dp->ssl_fingerprint_file)); g_ptr_array_add(array, quote_string(dp->ssl_cert_file)); g_ptr_array_add(array, quote_string(dp->ssl_key_file)); g_ptr_array_add(array, quote_string(dp->ssl_ca_cert_file)); g_ptr_array_add(array, quote_string(dp->ssl_cipher_list)); g_ptr_array_add(array, g_strdup_printf("%d", dp->ssl_check_certificate_host)); g_ptr_array_add(array, quote_string(dp->client_port)); g_ptr_array_add(array, quote_string(dp->ssh_keys)); g_ptr_array_add(array, g_strdup(dp->auth)); g_ptr_array_add(array, g_strdup(data_path_to_string(dp->data_path))); if (cmd == PORT_DUMP) { g_ptr_array_add(array, g_strdup(dp->dataport_list)); } else { g_ptr_array_add(array, g_strdup(dp->shm_name)); } g_ptr_array_add(array, g_strdup_printf("%d", dp->max_warnings)); g_ptr_array_add(array, g_string_free(strbuf, FALSE)); g_ptr_array_add(array, NULL); args = (gchar **)g_ptr_array_free(array, FALSE); cmdline = g_strjoinv(" ", args); g_strfreev(args); break; } case ABORT: qmesg = quote_string(mesg); cmdline = g_strdup_printf("%s %s %s\n", cmdstr[cmd], job2serial(dumper->job), qmesg); amfree(qmesg); break; case QUIT: qmesg = quote_string(mesg); cmdline = g_strdup_printf("%s %s\n", cmdstr[cmd], qmesg); amfree(qmesg); break; default: error("Don't know how to send %s command to dumper", cmdstr[cmd]); /*NOTREACHED*/ } /* * Note: cmdline already has a '\n'. */ if (dumper->down) { g_printf(_("driver: send-cmd time %s ignored to down dumper %s: %s"), walltime_str(curclock()), dumper->name, cmdline); } else { g_printf(_("driver: send-cmd time %s to %s: %s"), walltime_str(curclock()), dumper->name, cmdline); fflush(stdout); if (full_write(dumper->fd, cmdline, strlen(cmdline)) < strlen(cmdline)) { g_printf(_("writing %s command: %s\n"), dumper->name, strerror(errno)); fflush(stdout); g_free(cmdline); return 0; } cmdline[strlen(cmdline)-1] = '\0'; g_debug("driver: send-cmd time %s to %s: %s", walltime_str(curclock()), dumper->name, cmdline); if (cmd == QUIT) aclose(dumper->fd); } g_free(cmdline); return 1; } int chunker_cmd( chunker_t *chunker, cmd_t cmd, sched_t *sp, char *mesg) { char *cmdline = NULL; char number[NUM_STR_SIZE]; char chunksize[NUM_STR_SIZE]; char use[NUM_STR_SIZE]; char c_crc[NUM_STR_SIZE+11]; char *o; int activehd=0; assignedhd_t **h=NULL; char *features; char *qname; char *qdest; disk_t *dp; switch(cmd) { case START: cmdline = g_strjoin(NULL, cmdstr[cmd], " ", mesg, "\n", NULL); break; case PORT_WRITE: case SHM_WRITE: dp = sp->disk; if(sp->holdp) { h = sp->holdp; activehd = sp->activehd; } if (dp && h) { qname = quote_string(dp->name); qdest = quote_string(sp->destname); h[activehd]->disk->allocated_dumpers++; g_snprintf(number, sizeof(number), "%d", sp->level); g_snprintf(chunksize, sizeof(chunksize), "%lld", (long long)holdingdisk_get_chunksize(h[0]->disk->hdisk)); g_snprintf(use, sizeof(use), "%lld", (long long)h[0]->reserved); features = am_feature_to_string(dp->host->features); o = optionstr(dp); cmdline = g_strjoin(NULL, cmdstr[cmd], " ", job2serial(chunker->job), " ", qdest, " ", dp->host->hostname, " ", features, " ", qname, " ", number, " ", mesg, /* datestamp */ " ", chunksize, " ", dp->program, " ", use, " |", o, "\n", NULL); amfree(features); amfree(o); amfree(qdest); amfree(qname); } else { error(_("%s command without disk and holding disk.\n"), cmdstr[cmd]); /*NOTREACHED*/ } break; case CONTINUE: dp = sp->disk; if(sp->holdp) { h = sp->holdp; activehd = sp->activehd; } if(dp && h) { qname = quote_string(dp->name); qdest = quote_string(h[activehd]->destname); h[activehd]->disk->allocated_dumpers++; g_snprintf(chunksize, sizeof(chunksize), "%lld", (long long)holdingdisk_get_chunksize(h[activehd]->disk->hdisk)); g_snprintf(use, sizeof(use), "%lld", (long long)(h[activehd]->reserved - h[activehd]->used)); cmdline = g_strjoin(NULL, cmdstr[cmd], " ", job2serial(chunker->job), " ", qdest, " ", chunksize, " ", use, "\n", NULL ); amfree(qdest); amfree(qname); } else { cmdline = g_strconcat(cmdstr[cmd], "\n", NULL); } break; case QUIT: cmdline = g_strjoin(NULL, cmdstr[cmd], "\n", NULL); break; case ABORT: { char *q = quote_string(mesg); cmdline = g_strjoin(NULL, cmdstr[cmd], " ", q, "\n", NULL); cmdline = g_strdup_printf("%s %s %s\n", cmdstr[cmd], job2serial(chunker->job), q); amfree(q); } break; case DONE: dp = sp->disk; if (dp) { if (sp->client_crc.crc == 0 || dp->compress == COMP_SERVER_FAST || dp->compress == COMP_SERVER_BEST || dp->compress == COMP_SERVER_CUST || dp->encrypt == ENCRYPT_SERV_CUST) { g_snprintf(c_crc, sizeof(c_crc), "00000000:0"); } else { g_snprintf(c_crc, sizeof(c_crc), "%08x:%lld", sp->client_crc.crc, (long long)sp->client_crc.size); } cmdline = g_strjoin(NULL, cmdstr[cmd], " ", job2serial(chunker->job), " ", c_crc, "\n", NULL); } else { cmdline = g_strjoin(NULL, cmdstr[cmd], "\n", NULL); } break; case FAILED: dp = sp->disk; if( dp ) { cmdline = g_strjoin(NULL, cmdstr[cmd], " ", job2serial(chunker->job), "\n", NULL); } else { cmdline = g_strjoin(NULL, cmdstr[cmd], "\n", NULL); } break; default: error(_("Don't know how to send %s command to chunker"), cmdstr[cmd]); /*NOTREACHED*/ } /* * Note: cmdline already has a '\n'. */ g_printf(_("driver: send-cmd time %s to %s: %s"), walltime_str(curclock()), chunker->name, cmdline); fflush(stdout); if (full_write(chunker->fd, cmdline, strlen(cmdline)) < strlen(cmdline)) { g_printf(_("writing %s command: %s\n"), chunker->name, strerror(errno)); fflush(stdout); amfree(cmdline); return 0; } cmdline[strlen(cmdline)-1] = '\0'; g_debug("driver: send-cmd time %s to %s: %s", walltime_str(curclock()), chunker->name, cmdline); if (cmd == QUIT) aclose(chunker->fd); amfree(cmdline); return 1; } job_t * alloc_job(void) { int i; for (i=0; iin_use = 0; job->sched = NULL; job->dumper = NULL; job->chunker = NULL; job->wtaper = NULL; } void free_sched( sched_t *sp) { g_free(sp->dumpdate); g_free(sp->degr_dumpdate); g_free(sp->destname); g_free(sp->datestamp); g_free(sp->degr_mesg); g_free(sp->src_storage); g_free(sp->src_pool); g_free(sp->src_label); g_free(sp); } job_t * serial2job( char *str) { int rc, s; long gen; rc = sscanf(str, "%d-%ld", &s, &gen); if(rc != 2) { error(_("error [serial2job \"%s\" parse error]"), str); /*NOTREACHED*/ } else if (s < 0 || s >= max_serial) { error(_("error [serial out of range 0..%d: %d]"), max_serial, s); /*NOTREACHED*/ } if(gen != stable[s].gen) g_printf("driver: serial2job error time %s serial gen mismatch %s %d %ld %ld\n", walltime_str(curclock()), str, s, gen, stable[s].gen); return stable[s].job; } void free_serial( char *str) { int rc, s; long gen; rc = sscanf(str, _("%d-%ld"), &s, &gen); if(!(rc == 2 && s >= 0 && s < max_serial)) { /* nuke self to get core dump for Brett */ g_fprintf(stderr, _("driver: free_serial: str \"%s\" rc %d s %d\n"), str, rc, s); fflush(stderr); abort(); } if(gen != stable[s].gen) g_printf(_("driver: free_serial error time %s serial gen mismatch %s\n"), walltime_str(curclock()),str); stable[s].gen = 0; stable[s].job = NULL; } void free_serial_job( job_t *job) { int s; for(s = 0; s < max_serial; s++) { if(stable[s].job == job) { //g_printf("free serial %02d-%05ld for disk\n", s, stable[s].gen); stable[s].gen = 0; stable[s].job = NULL; return; } } // Should try to print DB name found by dumper/chunker or wtaper. g_printf(_("driver: error time %s serial not found for job %p\n"), walltime_str(curclock()), job); } void check_unfree_serial(void) { int s; /* find used serial number */ for(s = 0; s < max_serial; s++) { if(stable[s].gen != 0 || stable[s].job != NULL) { g_printf(_("driver: error time %s bug: serial in use: %02d-%05ld\n"), walltime_str(curclock()), s, stable[s].gen); } } } char *job2serial( job_t *job) { int s; static char str[NUM_STR_SIZE]; for(s = 0; s < max_serial; s++) { if(stable[s].job == job) { g_snprintf(str, sizeof(str), "%02d-%05ld", s, stable[s].gen); return str; } } /* find unused serial number */ for(s = 0; s < max_serial; s++) if(stable[s].gen == 0 && stable[s].job == NULL) break; if(s >= max_serial) { g_printf(_("driver: error time %s bug: out of serial numbers\n"), walltime_str(curclock())); s = 0; } stable[s].gen = generation++; stable[s].job = job; g_snprintf(str, sizeof(str), "%02d-%05ld", s, stable[s].gen); return str; } void update_info_dumper( sched_t *sp, off_t origsize, off_t dumpsize, time_t dumptime) { int level, i; info_t info; stats_t *infp; perf_t *perfp; char *conf_infofile; disk_t *dp = sp->disk; level = sp->level; if (origsize == 0 || dumpsize == 0) { g_debug("not updating because origsize or dumpsize is 0"); return; } conf_infofile = config_dir_relative(getconf_str(CNF_INFOFILE)); if (open_infofile(conf_infofile)) { error(_("could not open info db \"%s\""), conf_infofile); /*NOTREACHED*/ } amfree(conf_infofile); get_info(dp->host->hostname, dp->name, &info); /* Clean up information about this and higher-level dumps. This assumes that update_info_dumper() is always run before update_info_taper(). */ for (i = level; i < DUMP_LEVELS; ++i) { infp = &info.inf[i]; infp->size = (off_t)-1; infp->csize = (off_t)-1; infp->secs = (time_t)-1; infp->date = (time_t)-1; infp->label[0] = '\0'; infp->filenum = 0; } /* now store information about this dump */ infp = &info.inf[level]; infp->size = origsize; infp->csize = dumpsize; infp->secs = dumptime; if (sp->timestamp == 0) { infp->date = 0; } else { infp->date = get_time_from_timestamp(sp->datestamp); } if(level == 0) perfp = &info.full; else perfp = &info.incr; /* Update the stats, but only if the new values are meaningful */ if(dp->compress != COMP_NONE && origsize > (off_t)0) { newperf(perfp->comp, (double)dumpsize/(double)origsize); } if(dumptime > (time_t)0) { if((off_t)dumptime >= dumpsize) newperf(perfp->rate, 1); else newperf(perfp->rate, (double)dumpsize/(double)dumptime); } if(origsize >= (off_t)0 && getconf_int(CNF_RESERVE)<100) { info.command = NO_COMMAND; } if (origsize >= (off_t)0 && level == info.last_level) { info.consecutive_runs++; } else if (origsize >= (off_t)0) { info.last_level = level; info.consecutive_runs = 1; } if(origsize >= (off_t)0 && dumpsize >= (off_t)0) { for(i=NB_HISTORY-1;i>0;i--) { info.history[i] = info.history[i-1]; } info.history[0].level = level; info.history[0].size = origsize; info.history[0].csize = dumpsize; if (sp->timestamp == 0) { info.history[0].date = 0; } else { info.history[0].date = get_time_from_timestamp(sp->datestamp); } info.history[0].secs = dumptime; } if (put_info(dp->host->hostname, dp->name, &info)) { int save_errno = errno; g_fprintf(stderr, _("infofile update failed (%s,'%s'): %s\n"), dp->host->hostname, dp->name, strerror(save_errno)); log_add(L_ERROR, _("infofile update failed (%s,'%s'): %s\n"), dp->host->hostname, dp->name, strerror(save_errno)); error(_("infofile update failed (%s,'%s'): %s\n"), dp->host->hostname, dp->name, strerror(save_errno)); /*NOTREACHED*/ } close_infofile(); } void update_info_taper( sched_t *sp, char *label, off_t filenum, int level) { info_t info; stats_t *infp; int rc; disk_t *dp = sp->disk; if (!label) { log_add(L_ERROR, "update_info_taper without label"); return; } rc = open_infofile(getconf_str(CNF_INFOFILE)); if(rc) { error(_("could not open infofile %s: %s (%d)"), getconf_str(CNF_INFOFILE), strerror(errno), rc); /*NOTREACHED*/ } get_info(dp->host->hostname, dp->name, &info); infp = &info.inf[level]; /* XXX - should we record these two if no-record? */ strncpy(infp->label, label, sizeof(infp->label)-1); infp->label[sizeof(infp->label)-1] = '\0'; infp->filenum = filenum; info.command = NO_COMMAND; if (put_info(dp->host->hostname, dp->name, &info)) { int save_errno = errno; g_fprintf(stderr, _("infofile update failed (%s,'%s'): %s\n"), dp->host->hostname, dp->name, strerror(save_errno)); log_add(L_ERROR, _("infofile update failed (%s,'%s'): %s\n"), dp->host->hostname, dp->name, strerror(save_errno)); error(_("infofile update failed (%s,'%s'): %s\n"), dp->host->hostname, dp->name, strerror(save_errno)); /*NOTREACHED*/ } close_infofile(); } /* Free an array of pointers to assignedhd_t after freeing the * assignedhd_t themselves. The array must be NULL-terminated. */ void free_assignedhd( assignedhd_t **ahd) { int i; if( !ahd ) { return; } for( i = 0; ahd[i]; i++ ) { amfree(ahd[i]->destname); amfree(ahd[i]); } amfree(ahd); }